feat(error-notifications): workspace-level configuration of slack, email, webhook notifications for workflow execution (#2157)

* feat(notification): slack, email, webhook notifications from logs

* retain search params for filters to link in notification

* add alerting rules

* update selector

* fix lint

* add limits on num of emails and notification triggers per workspace

* address greptile comments

* add search to combobox

* move notifications to react query

* fix lint

* fix email formatting

* add more alert types

* fix imports

* fix test route

* use emcn componentfor modal

* refactor: consolidate notification config fields into jsonb objects

* regen migration

* fix delete notif modal ui

* make them multiselect dropdowns

* update tag styling

* combobox font size with multiselect tags'
This commit is contained in:
Vikhyath Mondreti
2025-12-04 18:29:22 -08:00
committed by GitHub
parent dcbdcb43aa
commit 3b9f0f9ce2
32 changed files with 12466 additions and 2458 deletions

View File

@@ -240,32 +240,78 @@ Retrieve execution details including the workflow state snapshot.
</Tab>
</Tabs>
## Webhook Subscriptions
## Notifications
Get real-time notifications when workflow executions complete. Webhooks are configured through the Sim UI in the workflow editor.
Get real-time notifications when workflow executions complete via webhook, email, or Slack. Notifications are configured at the workspace level from the Logs page.
### Configuration
Webhooks can be configured for each workflow through the workflow editor UI. Click the webhook icon in the control bar to set up your webhook subscriptions.
Configure notifications from the Logs page by clicking the menu button and selecting "Configure Notifications".
<div className="mx-auto w-full overflow-hidden rounded-lg">
<Video src="configure-webhook.mp4" width={700} height={450} />
</div>
**Notification Channels:**
- **Webhook**: Send HTTP POST requests to your endpoint
- **Email**: Receive email notifications with execution details
- **Slack**: Post messages to a Slack channel
**Available Configuration Options:**
**Workflow Selection:**
- Select specific workflows to monitor
- Or choose "All Workflows" to include current and future workflows
**Filtering Options:**
- `levelFilter`: Log levels to receive (`info`, `error`)
- `triggerFilter`: Trigger types to receive (`api`, `webhook`, `schedule`, `manual`, `chat`)
**Optional Data:**
- `includeFinalOutput`: Include the workflow's final output
- `includeTraceSpans`: Include detailed execution trace spans
- `includeRateLimits`: Include rate limit information (sync/async limits and remaining)
- `includeUsageData`: Include billing period usage and limits
### Alert Rules
Instead of receiving notifications for every execution, configure alert rules to be notified only when issues are detected:
**Consecutive Failures**
- Alert after X consecutive failed executions (e.g., 3 failures in a row)
- Resets when an execution succeeds
**Failure Rate**
- Alert when failure rate exceeds X% over the last Y hours
- Requires minimum 5 executions in the window
- Only triggers after the full time window has elapsed
**Latency Threshold**
- Alert when any execution takes longer than X seconds
- Useful for catching slow or hanging workflows
**Latency Spike**
- Alert when execution is X% slower than the average
- Compares against the average duration over the configured time window
- Requires minimum 5 executions to establish baseline
**Cost Threshold**
- Alert when a single execution costs more than $X
- Useful for catching expensive LLM calls
**No Activity**
- Alert when no executions occur within X hours
- Useful for monitoring scheduled workflows that should run regularly
**Error Count**
- Alert when error count exceeds X within a time window
- Tracks total errors, not consecutive
All alert types include a 1-hour cooldown to prevent notification spam.
### Webhook Configuration
For webhooks, additional options are available:
- `url`: Your webhook endpoint URL
- `secret`: Optional secret for HMAC signature verification
- `includeFinalOutput`: Include the workflow's final output in the payload
- `includeTraceSpans`: Include detailed execution trace spans
- `includeRateLimits`: Include the workflow owner's rate limit information
- `includeUsageData`: Include the workflow owner's usage and billing data
- `levelFilter`: Array of log levels to receive (`info`, `error`)
- `triggerFilter`: Array of trigger types to receive (`api`, `webhook`, `schedule`, `manual`, `chat`)
- `active`: Enable/disable the webhook subscription
### Webhook Payload
### Payload Structure
When a workflow execution completes, Sim sends a POST request to your webhook URL:
When a workflow execution completes, Sim sends the following payload (via webhook POST, email, or Slack):
```json
{
@@ -316,7 +362,7 @@ When a workflow execution completes, Sim sends a POST request to your webhook UR
### Webhook Headers
Each webhook request includes these headers:
Each webhook request includes these headers (webhook channel only):
- `sim-event`: Event type (always `workflow.execution.completed`)
- `sim-timestamp`: Unix timestamp in milliseconds

View File

@@ -147,4 +147,4 @@ The snapshot provides:
- Learn about [Cost Calculation](/execution/costs) to understand workflow pricing
- Explore the [External API](/execution/api) for programmatic log access
- Set up [Webhook notifications](/execution/api#webhook-subscriptions) for real-time alerts
- Set up [Notifications](/execution/api#notifications) for real-time alerts via webhook, email, or Slack

View File

@@ -0,0 +1,40 @@
import { db } from '@sim/db'
import { account } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('AuthAccountsAPI')
export async function GET(request: NextRequest) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { searchParams } = new URL(request.url)
const provider = searchParams.get('provider')
const whereConditions = [eq(account.userId, session.user.id)]
if (provider) {
whereConditions.push(eq(account.providerId, provider))
}
const accounts = await db
.select({
id: account.id,
accountId: account.accountId,
providerId: account.providerId,
})
.from(account)
.where(and(...whereConditions))
return NextResponse.json({ accounts })
} catch (error) {
logger.error('Failed to fetch accounts', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,62 @@
import { nanoid } from 'nanoid'
import { type NextRequest, NextResponse } from 'next/server'
import { verifyCronAuth } from '@/lib/auth/internal'
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
import { createLogger } from '@/lib/logs/console/logger'
import { pollInactivityAlerts } from '@/lib/notifications/inactivity-polling'
const logger = createLogger('InactivityAlertPoll')
export const maxDuration = 120
const LOCK_KEY = 'inactivity-alert-polling-lock'
const LOCK_TTL_SECONDS = 120
export async function GET(request: NextRequest) {
const requestId = nanoid()
logger.info(`Inactivity alert polling triggered (${requestId})`)
try {
const authError = verifyCronAuth(request, 'Inactivity alert polling')
if (authError) {
return authError
}
const locked = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS)
if (!locked) {
return NextResponse.json(
{
success: true,
message: 'Polling already in progress skipped',
requestId,
status: 'skip',
},
{ status: 202 }
)
}
const results = await pollInactivityAlerts()
return NextResponse.json({
success: true,
message: 'Inactivity alert polling completed',
requestId,
status: 'completed',
...results,
})
} catch (error) {
logger.error(`Error during inactivity alert polling (${requestId}):`, error)
return NextResponse.json(
{
success: false,
message: 'Inactivity alert polling failed',
error: error instanceof Error ? error.message : 'Unknown error',
requestId,
},
{ status: 500 }
)
} finally {
await releaseLock(LOCK_KEY).catch(() => {})
}
}

View File

@@ -1,221 +0,0 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowLogWebhook } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('WorkflowLogWebhookUpdate')
type WebhookUpdatePayload = Pick<
typeof workflowLogWebhook.$inferInsert,
| 'url'
| 'includeFinalOutput'
| 'includeTraceSpans'
| 'includeRateLimits'
| 'includeUsageData'
| 'levelFilter'
| 'triggerFilter'
| 'secret'
| 'updatedAt'
>
const UpdateWebhookSchema = z.object({
url: z.string().url('Invalid webhook URL'),
secret: z.string().optional(),
includeFinalOutput: z.boolean(),
includeTraceSpans: z.boolean(),
includeRateLimits: z.boolean(),
includeUsageData: z.boolean(),
levelFilter: z.array(z.enum(['info', 'error'])),
triggerFilter: z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat'])),
})
export async function PUT(
request: NextRequest,
{ params }: { params: Promise<{ id: string; webhookId: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId, webhookId } = await params
const userId = session.user.id
// Check if user has access to the workflow
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Check if webhook exists and belongs to this workflow
const existingWebhook = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.limit(1)
if (existingWebhook.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const body = await request.json()
const validationResult = UpdateWebhookSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
// Check for duplicate URL (excluding current webhook)
const duplicateWebhook = await db
.select({ id: workflowLogWebhook.id })
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, workflowId), eq(workflowLogWebhook.url, data.url))
)
.limit(1)
if (duplicateWebhook.length > 0 && duplicateWebhook[0].id !== webhookId) {
return NextResponse.json(
{ error: 'A webhook with this URL already exists for this workflow' },
{ status: 409 }
)
}
// Prepare update data
const updateData: WebhookUpdatePayload = {
url: data.url,
includeFinalOutput: data.includeFinalOutput,
includeTraceSpans: data.includeTraceSpans,
includeRateLimits: data.includeRateLimits,
includeUsageData: data.includeUsageData,
levelFilter: data.levelFilter,
triggerFilter: data.triggerFilter,
updatedAt: new Date(),
}
// Only update secret if provided
if (data.secret) {
const { encrypted } = await encryptSecret(data.secret)
updateData.secret = encrypted
}
const updatedWebhooks = await db
.update(workflowLogWebhook)
.set(updateData)
.where(eq(workflowLogWebhook.id, webhookId))
.returning()
if (updatedWebhooks.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const updatedWebhook = updatedWebhooks[0]
logger.info('Webhook updated', {
webhookId,
workflowId,
userId,
})
return NextResponse.json({
data: {
id: updatedWebhook.id,
url: updatedWebhook.url,
includeFinalOutput: updatedWebhook.includeFinalOutput,
includeTraceSpans: updatedWebhook.includeTraceSpans,
includeRateLimits: updatedWebhook.includeRateLimits,
includeUsageData: updatedWebhook.includeUsageData,
levelFilter: updatedWebhook.levelFilter,
triggerFilter: updatedWebhook.triggerFilter,
active: updatedWebhook.active,
createdAt: updatedWebhook.createdAt.toISOString(),
updatedAt: updatedWebhook.updatedAt.toISOString(),
},
})
} catch (error) {
logger.error('Failed to update webhook', { error })
return NextResponse.json({ error: 'Failed to update webhook' }, { status: 500 })
}
}
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string; webhookId: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId, webhookId } = await params
const userId = session.user.id
// Check if user has access to the workflow
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Delete the webhook (will cascade delete deliveries)
const deletedWebhook = await db
.delete(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.returning()
if (deletedWebhook.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
logger.info('Webhook deleted', {
webhookId,
workflowId,
userId,
})
return NextResponse.json({ success: true })
} catch (error) {
logger.error('Failed to delete webhook', { error })
return NextResponse.json({ error: 'Failed to delete webhook' }, { status: 500 })
}
}

View File

@@ -1,248 +0,0 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowLogWebhook } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('WorkflowLogWebhookAPI')
const CreateWebhookSchema = z.object({
url: z.string().url(),
secret: z.string().optional(),
includeFinalOutput: z.boolean().optional().default(false),
includeTraceSpans: z.boolean().optional().default(false),
includeRateLimits: z.boolean().optional().default(false),
includeUsageData: z.boolean().optional().default(false),
levelFilter: z
.array(z.enum(['info', 'error']))
.optional()
.default(['info', 'error']),
triggerFilter: z
.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
.optional()
.default(['api', 'webhook', 'schedule', 'manual', 'chat']),
active: z.boolean().optional().default(true),
})
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const webhooks = await db
.select({
id: workflowLogWebhook.id,
url: workflowLogWebhook.url,
includeFinalOutput: workflowLogWebhook.includeFinalOutput,
includeTraceSpans: workflowLogWebhook.includeTraceSpans,
includeRateLimits: workflowLogWebhook.includeRateLimits,
includeUsageData: workflowLogWebhook.includeUsageData,
levelFilter: workflowLogWebhook.levelFilter,
triggerFilter: workflowLogWebhook.triggerFilter,
active: workflowLogWebhook.active,
createdAt: workflowLogWebhook.createdAt,
updatedAt: workflowLogWebhook.updatedAt,
})
.from(workflowLogWebhook)
.where(eq(workflowLogWebhook.workflowId, workflowId))
return NextResponse.json({ data: webhooks })
} catch (error) {
logger.error('Error fetching log webhooks', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const body = await request.json()
const validationResult = CreateWebhookSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
// Check for duplicate URL
const existingWebhook = await db
.select({ id: workflowLogWebhook.id })
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, workflowId), eq(workflowLogWebhook.url, data.url))
)
.limit(1)
if (existingWebhook.length > 0) {
return NextResponse.json(
{ error: 'A webhook with this URL already exists for this workflow' },
{ status: 409 }
)
}
let encryptedSecret: string | null = null
if (data.secret) {
const { encrypted } = await encryptSecret(data.secret)
encryptedSecret = encrypted
}
const [webhook] = await db
.insert(workflowLogWebhook)
.values({
id: uuidv4(),
workflowId,
url: data.url,
secret: encryptedSecret,
includeFinalOutput: data.includeFinalOutput,
includeTraceSpans: data.includeTraceSpans,
includeRateLimits: data.includeRateLimits,
includeUsageData: data.includeUsageData,
levelFilter: data.levelFilter,
triggerFilter: data.triggerFilter,
active: data.active,
})
.returning()
logger.info('Created log webhook', {
workflowId,
webhookId: webhook.id,
url: data.url,
})
return NextResponse.json({
data: {
id: webhook.id,
url: webhook.url,
includeFinalOutput: webhook.includeFinalOutput,
includeTraceSpans: webhook.includeTraceSpans,
includeRateLimits: webhook.includeRateLimits,
includeUsageData: webhook.includeUsageData,
levelFilter: webhook.levelFilter,
triggerFilter: webhook.triggerFilter,
active: webhook.active,
createdAt: webhook.createdAt,
updatedAt: webhook.updatedAt,
},
})
} catch (error) {
logger.error('Error creating log webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const { searchParams } = new URL(request.url)
const webhookId = searchParams.get('webhookId')
if (!webhookId) {
return NextResponse.json({ error: 'webhookId is required' }, { status: 400 })
}
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const deleted = await db
.delete(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.returning({ id: workflowLogWebhook.id })
if (deleted.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
logger.info('Deleted log webhook', {
workflowId,
webhookId,
})
return NextResponse.json({ success: true })
} catch (error) {
logger.error('Error deleting log webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,232 +0,0 @@
import { createHmac } from 'crypto'
import { db } from '@sim/db'
import { permissions, workflow, workflowLogWebhook } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { getSession } from '@/lib/auth'
import { decryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('WorkflowLogWebhookTestAPI')
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const { searchParams } = new URL(request.url)
const webhookId = searchParams.get('webhookId')
if (!webhookId) {
return NextResponse.json({ error: 'webhookId is required' }, { status: 400 })
}
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const [webhook] = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.limit(1)
if (!webhook) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const timestamp = Date.now()
const eventId = `evt_test_${uuidv4()}`
const executionId = `exec_test_${uuidv4()}`
const logId = `log_test_${uuidv4()}`
const payload = {
id: eventId,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId,
executionId,
status: 'success',
level: 'info',
trigger: 'manual',
startedAt: new Date(timestamp - 5000).toISOString(),
endedAt: new Date(timestamp).toISOString(),
totalDurationMs: 5000,
cost: {
total: 0.00123,
tokens: { prompt: 100, completion: 50, total: 150 },
models: {
'gpt-4o': {
input: 0.001,
output: 0.00023,
total: 0.00123,
tokens: { prompt: 100, completion: 50, total: 150 },
},
},
},
files: null,
},
links: {
log: `/v1/logs/${logId}`,
execution: `/v1/logs/executions/${executionId}`,
},
}
if (webhook.includeFinalOutput) {
;(payload.data as any).finalOutput = {
message: 'This is a test webhook delivery',
test: true,
}
}
if (webhook.includeTraceSpans) {
;(payload.data as any).traceSpans = [
{
id: 'span_test_1',
name: 'Test Block',
type: 'block',
status: 'success',
startTime: new Date(timestamp - 5000).toISOString(),
endTime: new Date(timestamp).toISOString(),
duration: 5000,
},
]
}
if (webhook.includeRateLimits) {
;(payload.data as any).rateLimits = {
sync: {
limit: 150,
remaining: 45,
resetAt: new Date(timestamp + 60000).toISOString(),
},
async: {
limit: 1000,
remaining: 50,
resetAt: new Date(timestamp + 60000).toISOString(),
},
}
}
if (webhook.includeUsageData) {
;(payload.data as any).usage = {
currentPeriodCost: 2.45,
limit: 10,
plan: 'pro',
isExceeded: false,
}
}
const body = JSON.stringify(payload)
const deliveryId = `delivery_test_${uuidv4()}`
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': timestamp.toString(),
'sim-delivery-id': deliveryId,
'Idempotency-Key': deliveryId,
}
if (webhook.secret) {
const { decrypted } = await decryptSecret(webhook.secret)
const signature = generateSignature(decrypted, timestamp, body)
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
logger.info(`Sending test webhook to ${webhook.url}`, { workflowId, webhookId })
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 10000)
try {
const response = await fetch(webhook.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const responseBody = await response.text().catch(() => '')
const truncatedBody = responseBody.slice(0, 500)
const result = {
success: response.ok,
status: response.status,
statusText: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
body: truncatedBody,
timestamp: new Date().toISOString(),
}
logger.info(`Test webhook completed`, {
workflowId,
webhookId,
status: response.status,
success: response.ok,
})
return NextResponse.json({ data: result })
} catch (error: any) {
clearTimeout(timeoutId)
if (error.name === 'AbortError') {
logger.error(`Test webhook timed out`, { workflowId, webhookId })
return NextResponse.json({
data: {
success: false,
error: 'Request timeout after 10 seconds',
timestamp: new Date().toISOString(),
},
})
}
logger.error(`Test webhook failed`, {
workflowId,
webhookId,
error: error.message,
})
return NextResponse.json({
data: {
success: false,
error: error.message,
timestamp: new Date().toISOString(),
},
})
}
} catch (error) {
logger.error('Error testing webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,318 @@
import { db } from '@sim/db'
import { workflow, workspaceNotificationSubscription } from '@sim/db/schema'
import { and, eq, inArray } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { MAX_EMAIL_RECIPIENTS, MAX_WORKFLOW_IDS } from '../constants'
const logger = createLogger('WorkspaceNotificationAPI')
const levelFilterSchema = z.array(z.enum(['info', 'error']))
const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
const alertRuleSchema = z.enum([
'consecutive_failures',
'failure_rate',
'latency_threshold',
'latency_spike',
'cost_threshold',
'no_activity',
'error_count',
])
const alertConfigSchema = z
.object({
rule: alertRuleSchema,
consecutiveFailures: z.number().int().min(1).max(100).optional(),
failureRatePercent: z.number().int().min(1).max(100).optional(),
windowHours: z.number().int().min(1).max(168).optional(),
durationThresholdMs: z.number().int().min(1000).max(3600000).optional(),
latencySpikePercent: z.number().int().min(10).max(1000).optional(),
costThresholdDollars: z.number().min(0.01).max(1000).optional(),
inactivityHours: z.number().int().min(1).max(168).optional(),
errorCountThreshold: z.number().int().min(1).max(1000).optional(),
})
.refine(
(data) => {
switch (data.rule) {
case 'consecutive_failures':
return data.consecutiveFailures !== undefined
case 'failure_rate':
return data.failureRatePercent !== undefined && data.windowHours !== undefined
case 'latency_threshold':
return data.durationThresholdMs !== undefined
case 'latency_spike':
return data.latencySpikePercent !== undefined && data.windowHours !== undefined
case 'cost_threshold':
return data.costThresholdDollars !== undefined
case 'no_activity':
return data.inactivityHours !== undefined
case 'error_count':
return data.errorCountThreshold !== undefined && data.windowHours !== undefined
default:
return false
}
},
{ message: 'Missing required fields for alert rule' }
)
.nullable()
const webhookConfigSchema = z.object({
url: z.string().url(),
secret: z.string().optional(),
})
const slackConfigSchema = z.object({
channelId: z.string(),
channelName: z.string(),
accountId: z.string(),
})
const updateNotificationSchema = z
.object({
workflowIds: z.array(z.string()).max(MAX_WORKFLOW_IDS).optional(),
allWorkflows: z.boolean().optional(),
levelFilter: levelFilterSchema.optional(),
triggerFilter: triggerFilterSchema.optional(),
includeFinalOutput: z.boolean().optional(),
includeTraceSpans: z.boolean().optional(),
includeRateLimits: z.boolean().optional(),
includeUsageData: z.boolean().optional(),
alertConfig: alertConfigSchema.optional(),
webhookConfig: webhookConfigSchema.optional(),
emailRecipients: z.array(z.string().email()).max(MAX_EMAIL_RECIPIENTS).optional(),
slackConfig: slackConfigSchema.optional(),
active: z.boolean().optional(),
})
.refine((data) => !(data.allWorkflows && data.workflowIds && data.workflowIds.length > 0), {
message: 'Cannot specify both allWorkflows and workflowIds',
})
type RouteParams = { params: Promise<{ id: string; notificationId: string }> }
async function checkWorkspaceWriteAccess(
userId: string,
workspaceId: string
): Promise<{ hasAccess: boolean; permission: string | null }> {
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
const hasAccess = permission === 'write' || permission === 'admin'
return { hasAccess, permission }
}
async function getSubscription(notificationId: string, workspaceId: string) {
const [subscription] = await db
.select()
.from(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.id, notificationId),
eq(workspaceNotificationSubscription.workspaceId, workspaceId)
)
)
.limit(1)
return subscription
}
export async function GET(request: NextRequest, { params }: RouteParams) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId, notificationId } = await params
const permission = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permission) {
return NextResponse.json({ error: 'Workspace not found' }, { status: 404 })
}
const subscription = await getSubscription(notificationId, workspaceId)
if (!subscription) {
return NextResponse.json({ error: 'Notification not found' }, { status: 404 })
}
return NextResponse.json({
data: {
id: subscription.id,
notificationType: subscription.notificationType,
workflowIds: subscription.workflowIds,
allWorkflows: subscription.allWorkflows,
levelFilter: subscription.levelFilter,
triggerFilter: subscription.triggerFilter,
includeFinalOutput: subscription.includeFinalOutput,
includeTraceSpans: subscription.includeTraceSpans,
includeRateLimits: subscription.includeRateLimits,
includeUsageData: subscription.includeUsageData,
webhookConfig: subscription.webhookConfig,
emailRecipients: subscription.emailRecipients,
slackConfig: subscription.slackConfig,
alertConfig: subscription.alertConfig,
active: subscription.active,
createdAt: subscription.createdAt,
updatedAt: subscription.updatedAt,
},
})
} catch (error) {
logger.error('Error fetching notification', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function PUT(request: NextRequest, { params }: RouteParams) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId, notificationId } = await params
const { hasAccess } = await checkWorkspaceWriteAccess(session.user.id, workspaceId)
if (!hasAccess) {
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
}
const existingSubscription = await getSubscription(notificationId, workspaceId)
if (!existingSubscription) {
return NextResponse.json({ error: 'Notification not found' }, { status: 404 })
}
const body = await request.json()
const validationResult = updateNotificationSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
if (data.workflowIds && data.workflowIds.length > 0) {
const workflowsInWorkspace = await db
.select({ id: workflow.id })
.from(workflow)
.where(and(eq(workflow.workspaceId, workspaceId), inArray(workflow.id, data.workflowIds)))
const validIds = new Set(workflowsInWorkspace.map((w) => w.id))
const invalidIds = data.workflowIds.filter((id) => !validIds.has(id))
if (invalidIds.length > 0) {
return NextResponse.json(
{ error: 'Some workflow IDs do not belong to this workspace', invalidIds },
{ status: 400 }
)
}
}
const updateData: Record<string, unknown> = { updatedAt: new Date() }
if (data.workflowIds !== undefined) updateData.workflowIds = data.workflowIds
if (data.allWorkflows !== undefined) updateData.allWorkflows = data.allWorkflows
if (data.levelFilter !== undefined) updateData.levelFilter = data.levelFilter
if (data.triggerFilter !== undefined) updateData.triggerFilter = data.triggerFilter
if (data.includeFinalOutput !== undefined)
updateData.includeFinalOutput = data.includeFinalOutput
if (data.includeTraceSpans !== undefined) updateData.includeTraceSpans = data.includeTraceSpans
if (data.includeRateLimits !== undefined) updateData.includeRateLimits = data.includeRateLimits
if (data.includeUsageData !== undefined) updateData.includeUsageData = data.includeUsageData
if (data.alertConfig !== undefined) updateData.alertConfig = data.alertConfig
if (data.emailRecipients !== undefined) updateData.emailRecipients = data.emailRecipients
if (data.slackConfig !== undefined) updateData.slackConfig = data.slackConfig
if (data.active !== undefined) updateData.active = data.active
// Handle webhookConfig with secret encryption
if (data.webhookConfig !== undefined) {
let webhookConfig = data.webhookConfig
if (webhookConfig?.secret) {
const { encrypted } = await encryptSecret(webhookConfig.secret)
webhookConfig = { ...webhookConfig, secret: encrypted }
}
updateData.webhookConfig = webhookConfig
}
const [subscription] = await db
.update(workspaceNotificationSubscription)
.set(updateData)
.where(eq(workspaceNotificationSubscription.id, notificationId))
.returning()
logger.info('Updated notification subscription', {
workspaceId,
subscriptionId: subscription.id,
})
return NextResponse.json({
data: {
id: subscription.id,
notificationType: subscription.notificationType,
workflowIds: subscription.workflowIds,
allWorkflows: subscription.allWorkflows,
levelFilter: subscription.levelFilter,
triggerFilter: subscription.triggerFilter,
includeFinalOutput: subscription.includeFinalOutput,
includeTraceSpans: subscription.includeTraceSpans,
includeRateLimits: subscription.includeRateLimits,
includeUsageData: subscription.includeUsageData,
webhookConfig: subscription.webhookConfig,
emailRecipients: subscription.emailRecipients,
slackConfig: subscription.slackConfig,
alertConfig: subscription.alertConfig,
active: subscription.active,
createdAt: subscription.createdAt,
updatedAt: subscription.updatedAt,
},
})
} catch (error) {
logger.error('Error updating notification', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function DELETE(request: NextRequest, { params }: RouteParams) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId, notificationId } = await params
const { hasAccess } = await checkWorkspaceWriteAccess(session.user.id, workspaceId)
if (!hasAccess) {
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
}
const deleted = await db
.delete(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.id, notificationId),
eq(workspaceNotificationSubscription.workspaceId, workspaceId)
)
)
.returning({ id: workspaceNotificationSubscription.id })
if (deleted.length === 0) {
return NextResponse.json({ error: 'Notification not found' }, { status: 404 })
}
logger.info('Deleted notification subscription', {
workspaceId,
subscriptionId: notificationId,
})
return NextResponse.json({ success: true })
} catch (error) {
logger.error('Error deleting notification', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,319 @@
import { createHmac } from 'crypto'
import { db } from '@sim/db'
import { account, workspaceNotificationSubscription } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { getSession } from '@/lib/auth'
import { decryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import { sendEmail } from '@/lib/messaging/email/mailer'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
const logger = createLogger('WorkspaceNotificationTestAPI')
type RouteParams = { params: Promise<{ id: string; notificationId: string }> }
interface WebhookConfig {
url: string
secret?: string
}
interface SlackConfig {
channelId: string
channelName: string
accountId: string
}
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
function buildTestPayload(subscription: typeof workspaceNotificationSubscription.$inferSelect) {
const timestamp = Date.now()
const eventId = `evt_test_${uuidv4()}`
const executionId = `exec_test_${uuidv4()}`
const payload: Record<string, unknown> = {
id: eventId,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId: 'test-workflow-id',
workflowName: 'Test Workflow',
executionId,
status: 'success',
level: 'info',
trigger: 'manual',
startedAt: new Date(timestamp - 5000).toISOString(),
endedAt: new Date(timestamp).toISOString(),
totalDurationMs: 5000,
cost: {
total: 0.00123,
tokens: { prompt: 100, completion: 50, total: 150 },
},
},
links: {
log: `/workspace/logs`,
},
}
const data = payload.data as Record<string, unknown>
if (subscription.includeFinalOutput) {
data.finalOutput = { message: 'This is a test notification', test: true }
}
if (subscription.includeTraceSpans) {
data.traceSpans = [
{
id: 'span_test_1',
name: 'Test Block',
type: 'block',
status: 'success',
startTime: new Date(timestamp - 5000).toISOString(),
endTime: new Date(timestamp).toISOString(),
duration: 5000,
},
]
}
if (subscription.includeRateLimits) {
data.rateLimits = {
sync: { limit: 150, remaining: 45, resetAt: new Date(timestamp + 60000).toISOString() },
async: { limit: 1000, remaining: 50, resetAt: new Date(timestamp + 60000).toISOString() },
}
}
if (subscription.includeUsageData) {
data.usage = { currentPeriodCost: 2.45, limit: 10, plan: 'pro', isExceeded: false }
}
return { payload, timestamp }
}
async function testWebhook(subscription: typeof workspaceNotificationSubscription.$inferSelect) {
const webhookConfig = subscription.webhookConfig as WebhookConfig | null
if (!webhookConfig?.url) {
return { success: false, error: 'No webhook URL configured' }
}
const { payload, timestamp } = buildTestPayload(subscription)
const body = JSON.stringify(payload)
const deliveryId = `delivery_test_${uuidv4()}`
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': timestamp.toString(),
'sim-delivery-id': deliveryId,
'Idempotency-Key': deliveryId,
}
if (webhookConfig.secret) {
const { decrypted } = await decryptSecret(webhookConfig.secret)
const signature = generateSignature(decrypted, timestamp, body)
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 10000)
try {
const response = await fetch(webhookConfig.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const responseBody = await response.text().catch(() => '')
return {
success: response.ok,
status: response.status,
statusText: response.statusText,
body: responseBody.slice(0, 500),
timestamp: new Date().toISOString(),
}
} catch (error: unknown) {
clearTimeout(timeoutId)
const err = error as Error & { name?: string }
if (err.name === 'AbortError') {
return { success: false, error: 'Request timeout after 10 seconds' }
}
return { success: false, error: err.message }
}
}
async function testEmail(subscription: typeof workspaceNotificationSubscription.$inferSelect) {
if (!subscription.emailRecipients || subscription.emailRecipients.length === 0) {
return { success: false, error: 'No email recipients configured' }
}
const { payload } = buildTestPayload(subscription)
const data = (payload as Record<string, unknown>).data as Record<string, unknown>
const result = await sendEmail({
to: subscription.emailRecipients,
subject: `[Test] Workflow Execution: ${data.workflowName}`,
text: `This is a test notification from Sim Studio.\n\nWorkflow: ${data.workflowName}\nStatus: ${data.status}\nDuration: ${data.totalDurationMs}ms\n\nThis notification is configured for workspace notifications.`,
html: `
<div style="font-family: sans-serif; max-width: 600px; margin: 0 auto;">
<h2 style="color: #7F2FFF;">Test Notification</h2>
<p>This is a test notification from Sim Studio.</p>
<table style="width: 100%; border-collapse: collapse; margin: 20px 0;">
<tr><td style="padding: 8px; border: 1px solid #eee;"><strong>Workflow</strong></td><td style="padding: 8px; border: 1px solid #eee;">${data.workflowName}</td></tr>
<tr><td style="padding: 8px; border: 1px solid #eee;"><strong>Status</strong></td><td style="padding: 8px; border: 1px solid #eee;">${data.status}</td></tr>
<tr><td style="padding: 8px; border: 1px solid #eee;"><strong>Duration</strong></td><td style="padding: 8px; border: 1px solid #eee;">${data.totalDurationMs}ms</td></tr>
</table>
<p style="color: #666; font-size: 12px;">This notification is configured for workspace notifications.</p>
</div>
`,
emailType: 'notifications',
})
return {
success: result.success,
message: result.message,
timestamp: new Date().toISOString(),
}
}
async function testSlack(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
userId: string
) {
const slackConfig = subscription.slackConfig as SlackConfig | null
if (!slackConfig?.channelId || !slackConfig?.accountId) {
return { success: false, error: 'No Slack channel or account configured' }
}
const [slackAccount] = await db
.select({ accessToken: account.accessToken })
.from(account)
.where(and(eq(account.id, slackConfig.accountId), eq(account.userId, userId)))
.limit(1)
if (!slackAccount?.accessToken) {
return { success: false, error: 'Slack account not found or not connected' }
}
const { payload } = buildTestPayload(subscription)
const data = (payload as Record<string, unknown>).data as Record<string, unknown>
const slackPayload = {
channel: slackConfig.channelId,
blocks: [
{
type: 'header',
text: { type: 'plain_text', text: '🧪 Test Notification', emoji: true },
},
{
type: 'section',
fields: [
{ type: 'mrkdwn', text: `*Workflow:*\n${data.workflowName}` },
{ type: 'mrkdwn', text: `*Status:*\n✅ ${data.status}` },
{ type: 'mrkdwn', text: `*Duration:*\n${data.totalDurationMs}ms` },
{ type: 'mrkdwn', text: `*Trigger:*\n${data.trigger}` },
],
},
{
type: 'context',
elements: [
{
type: 'mrkdwn',
text: 'This is a test notification from Sim Studio workspace notifications.',
},
],
},
],
text: `Test notification: ${data.workflowName} - ${data.status}`,
}
try {
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${slackAccount.accessToken}`,
},
body: JSON.stringify(slackPayload),
})
const result = await response.json()
return {
success: result.ok,
error: result.error,
channel: result.channel,
timestamp: new Date().toISOString(),
}
} catch (error: unknown) {
const err = error as Error
return { success: false, error: err.message }
}
}
export async function POST(request: NextRequest, { params }: RouteParams) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId, notificationId } = await params
const permission = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (permission !== 'write' && permission !== 'admin') {
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
}
const [subscription] = await db
.select()
.from(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.id, notificationId),
eq(workspaceNotificationSubscription.workspaceId, workspaceId)
)
)
.limit(1)
if (!subscription) {
return NextResponse.json({ error: 'Notification not found' }, { status: 404 })
}
let result: Record<string, unknown>
switch (subscription.notificationType) {
case 'webhook':
result = await testWebhook(subscription)
break
case 'email':
result = await testEmail(subscription)
break
case 'slack':
result = await testSlack(subscription, session.user.id)
break
default:
return NextResponse.json({ error: 'Unknown notification type' }, { status: 400 })
}
logger.info('Test notification sent', {
workspaceId,
subscriptionId: notificationId,
type: subscription.notificationType,
success: result.success,
})
return NextResponse.json({ data: result })
} catch (error) {
logger.error('Error testing notification', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,8 @@
/** Maximum email recipients per notification */
export const MAX_EMAIL_RECIPIENTS = 10
/** Maximum notifications per type per workspace */
export const MAX_NOTIFICATIONS_PER_TYPE = 10
/** Maximum workflow IDs per notification */
export const MAX_WORKFLOW_IDS = 1000

View File

@@ -0,0 +1,284 @@
import { db } from '@sim/db'
import { workflow, workspaceNotificationSubscription } from '@sim/db/schema'
import { and, eq, inArray } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { encryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
import { MAX_EMAIL_RECIPIENTS, MAX_NOTIFICATIONS_PER_TYPE, MAX_WORKFLOW_IDS } from './constants'
const logger = createLogger('WorkspaceNotificationsAPI')
const notificationTypeSchema = z.enum(['webhook', 'email', 'slack'])
const levelFilterSchema = z.array(z.enum(['info', 'error']))
const triggerFilterSchema = z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
const alertRuleSchema = z.enum([
'consecutive_failures',
'failure_rate',
'latency_threshold',
'latency_spike',
'cost_threshold',
'no_activity',
'error_count',
])
const alertConfigSchema = z
.object({
rule: alertRuleSchema,
consecutiveFailures: z.number().int().min(1).max(100).optional(),
failureRatePercent: z.number().int().min(1).max(100).optional(),
windowHours: z.number().int().min(1).max(168).optional(),
durationThresholdMs: z.number().int().min(1000).max(3600000).optional(),
latencySpikePercent: z.number().int().min(10).max(1000).optional(),
costThresholdDollars: z.number().min(0.01).max(1000).optional(),
inactivityHours: z.number().int().min(1).max(168).optional(),
errorCountThreshold: z.number().int().min(1).max(1000).optional(),
})
.refine(
(data) => {
switch (data.rule) {
case 'consecutive_failures':
return data.consecutiveFailures !== undefined
case 'failure_rate':
return data.failureRatePercent !== undefined && data.windowHours !== undefined
case 'latency_threshold':
return data.durationThresholdMs !== undefined
case 'latency_spike':
return data.latencySpikePercent !== undefined && data.windowHours !== undefined
case 'cost_threshold':
return data.costThresholdDollars !== undefined
case 'no_activity':
return data.inactivityHours !== undefined
case 'error_count':
return data.errorCountThreshold !== undefined && data.windowHours !== undefined
default:
return false
}
},
{ message: 'Missing required fields for alert rule' }
)
.nullable()
const webhookConfigSchema = z.object({
url: z.string().url(),
secret: z.string().optional(),
})
const slackConfigSchema = z.object({
channelId: z.string(),
channelName: z.string(),
accountId: z.string(),
})
const createNotificationSchema = z
.object({
notificationType: notificationTypeSchema,
workflowIds: z.array(z.string()).max(MAX_WORKFLOW_IDS).default([]),
allWorkflows: z.boolean().default(false),
levelFilter: levelFilterSchema.default(['info', 'error']),
triggerFilter: triggerFilterSchema.default(['api', 'webhook', 'schedule', 'manual', 'chat']),
includeFinalOutput: z.boolean().default(false),
includeTraceSpans: z.boolean().default(false),
includeRateLimits: z.boolean().default(false),
includeUsageData: z.boolean().default(false),
alertConfig: alertConfigSchema.optional(),
webhookConfig: webhookConfigSchema.optional(),
emailRecipients: z.array(z.string().email()).max(MAX_EMAIL_RECIPIENTS).optional(),
slackConfig: slackConfigSchema.optional(),
})
.refine(
(data) => {
if (data.notificationType === 'webhook') return !!data.webhookConfig?.url
if (data.notificationType === 'email')
return !!data.emailRecipients && data.emailRecipients.length > 0
if (data.notificationType === 'slack')
return !!data.slackConfig?.channelId && !!data.slackConfig?.accountId
return false
},
{ message: 'Missing required fields for notification type' }
)
.refine((data) => !(data.allWorkflows && data.workflowIds.length > 0), {
message: 'Cannot specify both allWorkflows and workflowIds',
})
async function checkWorkspaceWriteAccess(
userId: string,
workspaceId: string
): Promise<{ hasAccess: boolean; permission: string | null }> {
const permission = await getUserEntityPermissions(userId, 'workspace', workspaceId)
const hasAccess = permission === 'write' || permission === 'admin'
return { hasAccess, permission }
}
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId } = await params
const permission = await getUserEntityPermissions(session.user.id, 'workspace', workspaceId)
if (!permission) {
return NextResponse.json({ error: 'Workspace not found' }, { status: 404 })
}
const subscriptions = await db
.select({
id: workspaceNotificationSubscription.id,
notificationType: workspaceNotificationSubscription.notificationType,
workflowIds: workspaceNotificationSubscription.workflowIds,
allWorkflows: workspaceNotificationSubscription.allWorkflows,
levelFilter: workspaceNotificationSubscription.levelFilter,
triggerFilter: workspaceNotificationSubscription.triggerFilter,
includeFinalOutput: workspaceNotificationSubscription.includeFinalOutput,
includeTraceSpans: workspaceNotificationSubscription.includeTraceSpans,
includeRateLimits: workspaceNotificationSubscription.includeRateLimits,
includeUsageData: workspaceNotificationSubscription.includeUsageData,
webhookConfig: workspaceNotificationSubscription.webhookConfig,
emailRecipients: workspaceNotificationSubscription.emailRecipients,
slackConfig: workspaceNotificationSubscription.slackConfig,
alertConfig: workspaceNotificationSubscription.alertConfig,
active: workspaceNotificationSubscription.active,
createdAt: workspaceNotificationSubscription.createdAt,
updatedAt: workspaceNotificationSubscription.updatedAt,
})
.from(workspaceNotificationSubscription)
.where(eq(workspaceNotificationSubscription.workspaceId, workspaceId))
.orderBy(workspaceNotificationSubscription.createdAt)
return NextResponse.json({ data: subscriptions })
} catch (error) {
logger.error('Error fetching notifications', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workspaceId } = await params
const { hasAccess } = await checkWorkspaceWriteAccess(session.user.id, workspaceId)
if (!hasAccess) {
return NextResponse.json({ error: 'Insufficient permissions' }, { status: 403 })
}
const body = await request.json()
const validationResult = createNotificationSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
const existingCount = await db
.select({ id: workspaceNotificationSubscription.id })
.from(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.workspaceId, workspaceId),
eq(workspaceNotificationSubscription.notificationType, data.notificationType)
)
)
if (existingCount.length >= MAX_NOTIFICATIONS_PER_TYPE) {
return NextResponse.json(
{
error: `Maximum ${MAX_NOTIFICATIONS_PER_TYPE} ${data.notificationType} notifications per workspace`,
},
{ status: 400 }
)
}
if (!data.allWorkflows && data.workflowIds.length > 0) {
const workflowsInWorkspace = await db
.select({ id: workflow.id })
.from(workflow)
.where(and(eq(workflow.workspaceId, workspaceId), inArray(workflow.id, data.workflowIds)))
const validIds = new Set(workflowsInWorkspace.map((w) => w.id))
const invalidIds = data.workflowIds.filter((id) => !validIds.has(id))
if (invalidIds.length > 0) {
return NextResponse.json(
{ error: 'Some workflow IDs do not belong to this workspace', invalidIds },
{ status: 400 }
)
}
}
// Encrypt webhook secret if provided
let webhookConfig = data.webhookConfig || null
if (webhookConfig?.secret) {
const { encrypted } = await encryptSecret(webhookConfig.secret)
webhookConfig = { ...webhookConfig, secret: encrypted }
}
const [subscription] = await db
.insert(workspaceNotificationSubscription)
.values({
id: uuidv4(),
workspaceId,
notificationType: data.notificationType,
workflowIds: data.workflowIds,
allWorkflows: data.allWorkflows,
levelFilter: data.levelFilter,
triggerFilter: data.triggerFilter,
includeFinalOutput: data.includeFinalOutput,
includeTraceSpans: data.includeTraceSpans,
includeRateLimits: data.includeRateLimits,
includeUsageData: data.includeUsageData,
alertConfig: data.alertConfig || null,
webhookConfig,
emailRecipients: data.emailRecipients || null,
slackConfig: data.slackConfig || null,
createdBy: session.user.id,
})
.returning()
logger.info('Created notification subscription', {
workspaceId,
subscriptionId: subscription.id,
type: data.notificationType,
})
return NextResponse.json({
data: {
id: subscription.id,
notificationType: subscription.notificationType,
workflowIds: subscription.workflowIds,
allWorkflows: subscription.allWorkflows,
levelFilter: subscription.levelFilter,
triggerFilter: subscription.triggerFilter,
includeFinalOutput: subscription.includeFinalOutput,
includeTraceSpans: subscription.includeTraceSpans,
includeRateLimits: subscription.includeRateLimits,
includeUsageData: subscription.includeUsageData,
webhookConfig: subscription.webhookConfig,
emailRecipients: subscription.emailRecipients,
slackConfig: subscription.slackConfig,
alertConfig: subscription.alertConfig,
active: subscription.active,
createdAt: subscription.createdAt,
updatedAt: subscription.updatedAt,
},
})
} catch (error) {
logger.error('Error creating notification', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,11 +1,36 @@
import type { ReactNode } from 'react'
import { ArrowUp, Loader2, RefreshCw, Search } from 'lucide-react'
import { Button, Tooltip } from '@/components/emcn'
import { ArrowUp, Bell, Loader2, RefreshCw, Search } from 'lucide-react'
import {
Button,
Popover,
PopoverContent,
PopoverItem,
PopoverScrollArea,
PopoverTrigger,
Tooltip,
} from '@/components/emcn'
import { MoreHorizontal } from '@/components/emcn/icons'
import { Input } from '@/components/ui/input'
import { cn } from '@/lib/core/utils/cn'
import { soehne } from '@/app/_styles/fonts/soehne/soehne'
import Timeline from '@/app/workspace/[workspaceId]/logs/components/filters/components/timeline'
interface ControlsProps {
searchQuery?: string
setSearchQuery?: (v: string) => void
isRefetching: boolean
resetToNow: () => void
live: boolean
setLive: (v: (prev: boolean) => boolean) => void
viewMode: string
setViewMode: (mode: 'logs' | 'dashboard') => void
searchComponent?: ReactNode
showExport?: boolean
onExport?: () => void
canConfigureNotifications?: boolean
onConfigureNotifications?: () => void
}
export function Controls({
searchQuery,
setSearchQuery,
@@ -17,19 +42,9 @@ export function Controls({
setViewMode,
searchComponent,
onExport,
}: {
searchQuery?: string
setSearchQuery?: (v: string) => void
isRefetching: boolean
resetToNow: () => void
live: boolean
setLive: (v: (prev: boolean) => boolean) => void
viewMode: string
setViewMode: (mode: 'logs' | 'dashboard') => void
searchComponent?: ReactNode
showExport?: boolean
onExport?: () => void
}) {
canConfigureNotifications,
onConfigureNotifications,
}: ControlsProps) {
return (
<div
className={cn(
@@ -72,20 +87,29 @@ export function Controls({
<div className='ml-auto flex flex-shrink-0 items-center gap-3'>
{viewMode !== 'dashboard' && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={onExport}
className='h-9 w-9 p-0 hover:bg-secondary'
aria-label='Export CSV'
>
<ArrowUp className='h-4 w-4' />
<span className='sr-only'>Export CSV</span>
<Popover>
<PopoverTrigger asChild>
<Button variant='ghost' className='h-9 w-9 p-0 hover:bg-secondary'>
<MoreHorizontal className='h-4 w-4' />
<span className='sr-only'>More options</span>
</Button>
</Tooltip.Trigger>
<Tooltip.Content>Export CSV</Tooltip.Content>
</Tooltip.Root>
</PopoverTrigger>
<PopoverContent align='end' sideOffset={4}>
<PopoverScrollArea>
<PopoverItem onClick={onExport}>
<ArrowUp className='h-3 w-3' />
<span>Export as CSV</span>
</PopoverItem>
<PopoverItem
onClick={canConfigureNotifications ? onConfigureNotifications : undefined}
disabled={!canConfigureNotifications}
>
<Bell className='h-3 w-3' />
<span>Configure Notifications</span>
</PopoverItem>
</PopoverScrollArea>
</PopoverContent>
</Popover>
)}
<Tooltip.Root>

View File

@@ -0,0 +1,2 @@
export { NotificationSettings } from './notification-settings'
export { WorkflowSelector } from './workflow-selector'

View File

@@ -0,0 +1,116 @@
'use client'
import { useCallback, useEffect, useState } from 'react'
import { Hash, Lock } from 'lucide-react'
import { Combobox, type ComboboxOption } from '@/components/emcn'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('SlackChannelSelector')
interface SlackChannel {
id: string
name: string
isPrivate: boolean
}
interface SlackChannelSelectorProps {
accountId: string
value: string
onChange: (channelId: string, channelName: string) => void
disabled?: boolean
error?: string
}
/**
* Standalone Slack channel selector that fetches channels for a given account.
*/
export function SlackChannelSelector({
accountId,
value,
onChange,
disabled = false,
error,
}: SlackChannelSelectorProps) {
const [channels, setChannels] = useState<SlackChannel[]>([])
const [isLoading, setIsLoading] = useState(false)
const [fetchError, setFetchError] = useState<string | null>(null)
const fetchChannels = useCallback(async () => {
if (!accountId) {
setChannels([])
return
}
setIsLoading(true)
setFetchError(null)
try {
const response = await fetch('/api/tools/slack/channels', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ credential: accountId }),
})
if (!response.ok) {
const data = await response.json().catch(() => ({}))
throw new Error(data.error || 'Failed to fetch channels')
}
const data = await response.json()
setChannels(data.channels || [])
} catch (err) {
logger.error('Failed to fetch Slack channels', { error: err })
setFetchError(err instanceof Error ? err.message : 'Failed to fetch channels')
setChannels([])
} finally {
setIsLoading(false)
}
}, [accountId])
useEffect(() => {
fetchChannels()
}, [fetchChannels])
const options: ComboboxOption[] = channels.map((channel) => ({
label: channel.name,
value: channel.id,
icon: channel.isPrivate ? Lock : Hash,
}))
const selectedChannel = channels.find((c) => c.id === value)
if (!accountId) {
return (
<div className='rounded-[8px] border border-dashed p-3 text-center'>
<p className='text-muted-foreground text-sm'>Select a Slack account first</p>
</div>
)
}
const handleChange = (channelId: string) => {
const channel = channels.find((c) => c.id === channelId)
onChange(channelId, channel?.name || '')
}
return (
<div className='space-y-1'>
<Combobox
options={options}
value={value}
onChange={handleChange}
placeholder={
channels.length === 0 && !isLoading ? 'No channels available' : 'Select channel...'
}
disabled={disabled || channels.length === 0}
isLoading={isLoading}
error={fetchError}
/>
{selectedChannel && !fetchError && (
<p className='text-muted-foreground text-xs'>
{selectedChannel.isPrivate ? 'Private' : 'Public'} channel: #{selectedChannel.name}
</p>
)}
{error && <p className='text-red-400 text-xs'>{error}</p>}
</div>
)
}

View File

@@ -0,0 +1,183 @@
'use client'
import { useEffect, useMemo, useState } from 'react'
import { Layers, X } from 'lucide-react'
import { Button, Combobox, type ComboboxOption } from '@/components/emcn'
import { Label, Skeleton } from '@/components/ui'
interface WorkflowSelectorProps {
workspaceId: string
selectedIds: string[]
allWorkflows: boolean
onChange: (ids: string[], allWorkflows: boolean) => void
error?: string
}
const ALL_WORKFLOWS_VALUE = '__all_workflows__'
/**
* Multi-select workflow selector with "All Workflows" option.
*/
export function WorkflowSelector({
workspaceId,
selectedIds,
allWorkflows,
onChange,
error,
}: WorkflowSelectorProps) {
const [workflows, setWorkflows] = useState<Array<{ id: string; name: string }>>([])
const [isLoading, setIsLoading] = useState(true)
useEffect(() => {
const load = async () => {
try {
setIsLoading(true)
const response = await fetch(`/api/workflows?workspaceId=${workspaceId}`)
if (response.ok) {
const data = await response.json()
setWorkflows(data.data || [])
}
} catch {
setWorkflows([])
} finally {
setIsLoading(false)
}
}
load()
}, [workspaceId])
const options: ComboboxOption[] = useMemo(() => {
const workflowOptions = workflows.map((w) => ({
label: w.name,
value: w.id,
}))
return [
{
label: 'All Workflows',
value: ALL_WORKFLOWS_VALUE,
icon: Layers,
},
...workflowOptions,
]
}, [workflows])
const currentValues = useMemo(() => {
if (allWorkflows) {
return [ALL_WORKFLOWS_VALUE]
}
return selectedIds
}, [allWorkflows, selectedIds])
const handleMultiSelectChange = (values: string[]) => {
const hasAllWorkflows = values.includes(ALL_WORKFLOWS_VALUE)
const hadAllWorkflows = allWorkflows
if (hasAllWorkflows && !hadAllWorkflows) {
// User selected "All Workflows" - clear individual selections
onChange([], true)
} else if (!hasAllWorkflows && hadAllWorkflows) {
// User deselected "All Workflows" - switch to individual selection
onChange(
values.filter((v) => v !== ALL_WORKFLOWS_VALUE),
false
)
} else {
// Normal individual workflow selection/deselection
onChange(
values.filter((v) => v !== ALL_WORKFLOWS_VALUE),
false
)
}
}
const handleRemove = (e: React.MouseEvent, id: string) => {
e.preventDefault()
e.stopPropagation()
if (id === ALL_WORKFLOWS_VALUE) {
onChange([], false)
} else {
onChange(
selectedIds.filter((i) => i !== id),
false
)
}
}
const selectedWorkflows = useMemo(() => {
return workflows.filter((w) => selectedIds.includes(w.id))
}, [workflows, selectedIds])
// Render overlay content showing selected items as tags
const overlayContent = useMemo(() => {
if (allWorkflows) {
return (
<div className='flex items-center gap-1'>
<Button
variant='outline'
className='pointer-events-auto h-6 gap-1 rounded-[6px] px-2 text-[11px]'
onMouseDown={(e) => handleRemove(e, ALL_WORKFLOWS_VALUE)}
>
<Layers className='h-3 w-3' />
All Workflows
<X className='h-3 w-3' />
</Button>
</div>
)
}
if (selectedWorkflows.length === 0) {
return null
}
return (
<div className='flex items-center gap-1 overflow-hidden'>
{selectedWorkflows.slice(0, 2).map((w) => (
<Button
key={w.id}
variant='outline'
className='pointer-events-auto h-6 gap-1 rounded-[6px] px-2 text-[11px]'
onMouseDown={(e) => handleRemove(e, w.id)}
>
{w.name}
<X className='h-3 w-3' />
</Button>
))}
{selectedWorkflows.length > 2 && (
<span className='flex h-6 items-center rounded-[6px] border px-2 text-[11px]'>
+{selectedWorkflows.length - 2}
</span>
)}
</div>
)
}, [allWorkflows, selectedWorkflows, selectedIds])
if (isLoading) {
return (
<div className='space-y-2'>
<Label className='font-medium text-sm'>Workflows</Label>
<Skeleton className='h-9 w-full rounded-[4px]' />
</div>
)
}
return (
<div className='space-y-2'>
<Label className='font-medium text-sm'>Workflows</Label>
<Combobox
options={options}
multiSelect
multiSelectValues={currentValues}
onMultiSelectChange={handleMultiSelectChange}
placeholder='Select workflows...'
error={error}
overlayContent={overlayContent}
searchable
searchPlaceholder='Search workflows...'
/>
<p className='text-muted-foreground text-xs'>
Select which workflows should trigger this notification
</p>
</div>
)
}

View File

@@ -1,6 +1,6 @@
'use client'
import { useEffect, useMemo, useState } from 'react'
import { useEffect, useMemo, useRef, useState } from 'react'
import { Search, X } from 'lucide-react'
import { useParams } from 'next/navigation'
import { Button, Popover, PopoverAnchor, PopoverContent } from '@/components/emcn'
@@ -120,6 +120,17 @@ export function AutocompleteSearch({
getSuggestions: (input) => suggestionEngine.getSuggestions(input),
})
const lastExternalValue = useRef(value)
useEffect(() => {
// Only re-initialize if value changed externally (not from user typing)
if (value !== lastExternalValue.current) {
lastExternalValue.current = value
const parsed = parseQuery(value)
initializeFromQuery(parsed.textSearch, parsed.filters)
}
}, [value, initializeFromQuery])
// Initial sync on mount
useEffect(() => {
if (value) {
const parsed = parseQuery(value)

View File

@@ -8,10 +8,12 @@ import { cn } from '@/lib/core/utils/cn'
import { getIntegrationMetadata } from '@/lib/logs/get-trigger-options'
import { parseQuery, queryToApiParams } from '@/lib/logs/query-parser'
import Controls from '@/app/workspace/[workspaceId]/logs/components/dashboard/controls'
import { NotificationSettings } from '@/app/workspace/[workspaceId]/logs/components/notification-settings/notification-settings'
import { AutocompleteSearch } from '@/app/workspace/[workspaceId]/logs/components/search/search'
import { Sidebar } from '@/app/workspace/[workspaceId]/logs/components/sidebar/sidebar'
import Dashboard from '@/app/workspace/[workspaceId]/logs/dashboard'
import { formatDate } from '@/app/workspace/[workspaceId]/logs/utils'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useFolders } from '@/hooks/queries/folders'
import { useLogDetail, useLogsList } from '@/hooks/queries/logs'
import { useDebounce } from '@/hooks/use-debounce'
@@ -58,7 +60,6 @@ export default function Logs() {
level,
workflowIds,
folderIds,
searchQuery: storeSearchQuery,
setSearchQuery: setStoreSearchQuery,
triggers,
viewMode,
@@ -77,14 +78,24 @@ export default function Logs() {
const scrollContainerRef = useRef<HTMLDivElement>(null)
const isInitialized = useRef<boolean>(false)
const [searchQuery, setSearchQuery] = useState(storeSearchQuery)
const [searchQuery, setSearchQuery] = useState('')
const debouncedSearchQuery = useDebounce(searchQuery, 300)
// Sync search query from URL on mount (client-side only)
useEffect(() => {
const urlSearch = new URLSearchParams(window.location.search).get('search') || ''
if (urlSearch && urlSearch !== searchQuery) {
setSearchQuery(urlSearch)
}
}, [])
const [, setAvailableWorkflows] = useState<string[]>([])
const [, setAvailableFolders] = useState<string[]>([])
const [isLive, setIsLive] = useState(false)
const isSearchOpenRef = useRef<boolean>(false)
const [isNotificationSettingsOpen, setIsNotificationSettingsOpen] = useState(false)
const userPermissions = useUserPermissionsContext()
const logFilters = useMemo(
() => ({
@@ -111,10 +122,6 @@ export default function Logs() {
return logsQuery.data.pages.flatMap((page) => page.logs)
}, [logsQuery.data?.pages])
useEffect(() => {
setSearchQuery(storeSearchQuery)
}, [storeSearchQuery])
const foldersQuery = useFolders(workspaceId)
const { getFolderTree } = useFolderStore()
@@ -166,10 +173,10 @@ export default function Logs() {
}, [workspaceId, getFolderTree, foldersQuery.data])
useEffect(() => {
if (isInitialized.current && debouncedSearchQuery !== storeSearchQuery) {
if (isInitialized.current) {
setStoreSearchQuery(debouncedSearchQuery)
}
}, [debouncedSearchQuery, storeSearchQuery])
}, [debouncedSearchQuery, setStoreSearchQuery])
const handleLogClick = (log: WorkflowLog) => {
setSelectedLog(log)
@@ -249,6 +256,8 @@ export default function Logs() {
useEffect(() => {
const handlePopState = () => {
initializeFromURL()
const params = new URLSearchParams(window.location.search)
setSearchQuery(params.get('search') || '')
}
window.addEventListener('popstate', handlePopState)
@@ -381,6 +390,8 @@ export default function Logs() {
}
showExport={true}
onExport={handleExport}
canConfigureNotifications={userPermissions.canEdit}
onConfigureNotifications={() => setIsNotificationSettingsOpen(true)}
/>
{/* Table container */}
@@ -599,6 +610,12 @@ export default function Logs() {
hasNext={selectedLogIndex < logs.length - 1}
hasPrev={selectedLogIndex > 0}
/>
<NotificationSettings
workspaceId={workspaceId}
open={isNotificationSettingsOpen}
onOpenChange={setIsNotificationSettingsOpen}
/>
</div>
)
}

View File

@@ -1,404 +0,0 @@
import { createHmac } from 'crypto'
import { db } from '@sim/db'
import {
workflowLogWebhook,
workflowLogWebhookDelivery,
workflow as workflowTable,
} from '@sim/db/schema'
import { task, wait } from '@trigger.dev/sdk'
import { and, eq, isNull, lte, or, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { decryptSecret } from '@/lib/core/security/encryption'
import { createLogger } from '@/lib/logs/console/logger'
import type { WorkflowExecutionLog } from '@/lib/logs/types'
const logger = createLogger('LogsWebhookDelivery')
// Quick retry strategy: 5 attempts over ~15 minutes
// Most webhook failures are transient and resolve quickly
const MAX_ATTEMPTS = 5
const RETRY_DELAYS = [
5 * 1000, // 5 seconds (1st retry)
15 * 1000, // 15 seconds (2nd retry)
60 * 1000, // 1 minute (3rd retry)
3 * 60 * 1000, // 3 minutes (4th retry)
10 * 60 * 1000, // 10 minutes (5th and final retry)
]
// Add jitter to prevent thundering herd problem (up to 10% of delay)
function getRetryDelayWithJitter(baseDelay: number): number {
const jitter = Math.random() * 0.1 * baseDelay
return Math.floor(baseDelay + jitter)
}
interface WebhookPayload {
id: string
type: 'workflow.execution.completed'
timestamp: number
data: {
workflowId: string
executionId: string
status: 'success' | 'error'
level: string
trigger: string
startedAt: string
endedAt: string
totalDurationMs: number
cost?: any
files?: any
finalOutput?: any
traceSpans?: any[]
rateLimits?: {
sync: {
limit: number
remaining: number
resetAt: string
}
async: {
limit: number
remaining: number
resetAt: string
}
}
usage?: {
currentPeriodCost: number
limit: number
plan: string
isExceeded: boolean
}
}
links: {
log: string
execution: string
}
}
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
export const logsWebhookDelivery = task({
id: 'logs-webhook-delivery',
retry: {
maxAttempts: 1, // We handle retries manually within the task
},
run: async (params: {
deliveryId: string
subscriptionId: string
log: WorkflowExecutionLog
}) => {
const { deliveryId, subscriptionId, log } = params
try {
const [subscription] = await db
.select()
.from(workflowLogWebhook)
.where(eq(workflowLogWebhook.id, subscriptionId))
.limit(1)
if (!subscription || !subscription.active) {
logger.warn(`Subscription ${subscriptionId} not found or inactive`)
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
errorMessage: 'Subscription not found or inactive',
updatedAt: new Date(),
})
.where(eq(workflowLogWebhookDelivery.id, deliveryId))
return
}
// Atomically claim this delivery row for processing and increment attempts
const claimed = await db
.update(workflowLogWebhookDelivery)
.set({
status: 'in_progress',
attempts: sql`${workflowLogWebhookDelivery.attempts} + 1`,
lastAttemptAt: new Date(),
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'pending'),
// Only claim if not scheduled in the future or schedule has arrived
or(
isNull(workflowLogWebhookDelivery.nextAttemptAt),
lte(workflowLogWebhookDelivery.nextAttemptAt, new Date())
)
)
)
.returning({ attempts: workflowLogWebhookDelivery.attempts })
if (claimed.length === 0) {
logger.info(`Delivery ${deliveryId} not claimable (already in progress or not due)`)
return
}
const attempts = claimed[0].attempts
const timestamp = Date.now()
const eventId = `evt_${uuidv4()}`
const payload: WebhookPayload = {
id: eventId,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId: log.workflowId,
executionId: log.executionId,
status: log.level === 'error' ? 'error' : 'success',
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt,
endedAt: log.endedAt || log.startedAt,
totalDurationMs: log.totalDurationMs,
cost: log.cost,
files: (log as any).files,
},
links: {
log: `/v1/logs/${log.id}`,
execution: `/v1/logs/executions/${log.executionId}`,
},
}
if (subscription.includeFinalOutput && log.executionData) {
payload.data.finalOutput = (log.executionData as any).finalOutput
}
if (subscription.includeTraceSpans && log.executionData) {
payload.data.traceSpans = (log.executionData as any).traceSpans
}
// Fetch rate limits and usage data if requested
if ((subscription.includeRateLimits || subscription.includeUsageData) && log.executionData) {
const executionData = log.executionData as any
const needsRateLimits = subscription.includeRateLimits && executionData.includeRateLimits
const needsUsage = subscription.includeUsageData && executionData.includeUsageData
if (needsRateLimits || needsUsage) {
const { getUserLimits } = await import('@/app/api/v1/logs/meta')
const workflow = await db
.select()
.from(workflowTable)
.where(eq(workflowTable.id, log.workflowId))
.limit(1)
if (workflow.length > 0) {
try {
const limits = await getUserLimits(workflow[0].userId)
if (needsRateLimits) {
payload.data.rateLimits = limits.workflowExecutionRateLimit
}
if (needsUsage) {
payload.data.usage = limits.usage
}
} catch (error) {
logger.warn('Failed to fetch limits/usage for webhook', { error })
}
}
}
}
const body = JSON.stringify(payload)
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': timestamp.toString(),
'sim-delivery-id': deliveryId,
'Idempotency-Key': deliveryId,
}
if (subscription.secret) {
const { decrypted } = await decryptSecret(subscription.secret)
const signature = generateSignature(decrypted, timestamp, body)
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
logger.info(`Attempting webhook delivery ${deliveryId} (attempt ${attempts})`, {
url: subscription.url,
executionId: log.executionId,
})
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 30000)
try {
const response = await fetch(subscription.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const responseBody = await response.text().catch(() => '')
const truncatedBody = responseBody.slice(0, 1000)
if (response.ok) {
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'success',
attempts,
lastAttemptAt: new Date(),
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: null,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
logger.info(`Webhook delivery ${deliveryId} succeeded`, {
status: response.status,
executionId: log.executionId,
})
return { success: true }
}
const isRetryable = response.status >= 500 || response.status === 429
if (!isRetryable || attempts >= MAX_ATTEMPTS) {
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
attempts,
lastAttemptAt: new Date(),
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: `HTTP ${response.status}`,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
logger.warn(`Webhook delivery ${deliveryId} failed permanently`, {
status: response.status,
attempts,
executionId: log.executionId,
})
return { success: false }
}
const baseDelay = RETRY_DELAYS[Math.min(attempts - 1, RETRY_DELAYS.length - 1)]
const delayWithJitter = getRetryDelayWithJitter(baseDelay)
const nextAttemptAt = new Date(Date.now() + delayWithJitter)
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'pending',
attempts,
lastAttemptAt: new Date(),
nextAttemptAt,
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: `HTTP ${response.status} - will retry`,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
// Schedule the next retry
await wait.for({ seconds: delayWithJitter / 1000 })
// Recursively call the task for retry
await logsWebhookDelivery.trigger({
deliveryId,
subscriptionId,
log,
})
return { success: false, retrying: true }
} catch (error: any) {
clearTimeout(timeoutId)
if (error.name === 'AbortError') {
logger.error(`Webhook delivery ${deliveryId} timed out`, {
executionId: log.executionId,
attempts,
})
error.message = 'Request timeout after 30 seconds'
}
const baseDelay = RETRY_DELAYS[Math.min(attempts - 1, RETRY_DELAYS.length - 1)]
const delayWithJitter = getRetryDelayWithJitter(baseDelay)
const nextAttemptAt = new Date(Date.now() + delayWithJitter)
await db
.update(workflowLogWebhookDelivery)
.set({
status: attempts >= MAX_ATTEMPTS ? 'failed' : 'pending',
attempts,
lastAttemptAt: new Date(),
nextAttemptAt: attempts >= MAX_ATTEMPTS ? null : nextAttemptAt,
errorMessage: error.message,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
if (attempts >= MAX_ATTEMPTS) {
logger.error(`Webhook delivery ${deliveryId} failed after ${attempts} attempts`, {
error: error.message,
executionId: log.executionId,
})
return { success: false }
}
// Schedule the next retry
await wait.for({ seconds: delayWithJitter / 1000 })
// Recursively call the task for retry
await logsWebhookDelivery.trigger({
deliveryId,
subscriptionId,
log,
})
return { success: false, retrying: true }
}
} catch (error: any) {
logger.error(`Webhook delivery ${deliveryId} encountered unexpected error`, {
error: error.message,
stack: error.stack,
})
// Mark as failed for unexpected errors
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
errorMessage: `Unexpected error: ${error.message}`,
updatedAt: new Date(),
})
.where(eq(workflowLogWebhookDelivery.id, deliveryId))
return { success: false, error: error.message }
}
},
})

View File

@@ -0,0 +1,686 @@
import { createHmac } from 'crypto'
import { db } from '@sim/db'
import {
account,
workflow as workflowTable,
workspaceNotificationDelivery,
workspaceNotificationSubscription,
} from '@sim/db/schema'
import { task } from '@trigger.dev/sdk'
import { and, eq, isNull, lte, or, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { checkUsageStatus } from '@/lib/billing/calculations/usage-monitor'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { decryptSecret } from '@/lib/core/security/encryption'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { createLogger } from '@/lib/logs/console/logger'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
import { sendEmail } from '@/lib/messaging/email/mailer'
import type { AlertConfig } from '@/lib/notifications/alert-rules'
import { RateLimiter } from '@/services/queue'
const logger = createLogger('WorkspaceNotificationDelivery')
const MAX_ATTEMPTS = 5
const RETRY_DELAYS = [5 * 1000, 15 * 1000, 60 * 1000, 3 * 60 * 1000, 10 * 60 * 1000]
function getRetryDelayWithJitter(baseDelay: number): number {
const jitter = Math.random() * 0.1 * baseDelay
return Math.floor(baseDelay + jitter)
}
interface NotificationPayload {
id: string
type: 'workflow.execution.completed'
timestamp: number
data: {
workflowId: string
workflowName?: string
executionId: string
status: 'success' | 'error'
level: string
trigger: string
startedAt: string
endedAt: string
totalDurationMs: number
cost?: Record<string, unknown>
finalOutput?: unknown
traceSpans?: unknown[]
rateLimits?: Record<string, unknown>
usage?: Record<string, unknown>
}
}
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
async function buildPayload(
log: WorkflowExecutionLog,
subscription: typeof workspaceNotificationSubscription.$inferSelect
): Promise<NotificationPayload> {
const workflowData = await db
.select({ name: workflowTable.name, userId: workflowTable.userId })
.from(workflowTable)
.where(eq(workflowTable.id, log.workflowId))
.limit(1)
const timestamp = Date.now()
const executionData = (log.executionData || {}) as Record<string, unknown>
const userId = workflowData[0]?.userId
const payload: NotificationPayload = {
id: `evt_${uuidv4()}`,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId: log.workflowId,
workflowName: workflowData[0]?.name || 'Unknown Workflow',
executionId: log.executionId,
status: log.level === 'error' ? 'error' : 'success',
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt,
endedAt: log.endedAt,
totalDurationMs: log.totalDurationMs,
cost: log.cost as Record<string, unknown>,
},
}
if (subscription.includeFinalOutput && executionData.finalOutput) {
payload.data.finalOutput = executionData.finalOutput
}
if (subscription.includeTraceSpans && executionData.traceSpans) {
payload.data.traceSpans = executionData.traceSpans as unknown[]
}
if (subscription.includeRateLimits && userId) {
try {
const userSubscription = await getHighestPrioritySubscription(userId)
const rateLimiter = new RateLimiter()
const triggerType = log.trigger === 'api' ? 'api' : 'manual'
const [syncStatus, asyncStatus] = await Promise.all([
rateLimiter.getRateLimitStatusWithSubscription(
userId,
userSubscription,
triggerType,
false
),
rateLimiter.getRateLimitStatusWithSubscription(userId, userSubscription, triggerType, true),
])
payload.data.rateLimits = {
sync: {
limit: syncStatus.limit,
remaining: syncStatus.remaining,
resetAt: syncStatus.resetAt.toISOString(),
},
async: {
limit: asyncStatus.limit,
remaining: asyncStatus.remaining,
resetAt: asyncStatus.resetAt.toISOString(),
},
}
} catch (error) {
logger.warn('Failed to fetch rate limits for notification', { error, userId })
}
}
if (subscription.includeUsageData && userId) {
try {
const usageData = await checkUsageStatus(userId)
payload.data.usage = {
currentPeriodCost: usageData.currentUsage,
limit: usageData.limit,
percentUsed: usageData.percentUsed,
isExceeded: usageData.isExceeded,
}
} catch (error) {
logger.warn('Failed to fetch usage data for notification', { error, userId })
}
}
return payload
}
interface WebhookConfig {
url: string
secret?: string
}
interface SlackConfig {
channelId: string
channelName: string
accountId: string
}
async function deliverWebhook(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
payload: NotificationPayload
): Promise<{ success: boolean; status?: number; error?: string }> {
const webhookConfig = subscription.webhookConfig as WebhookConfig | null
if (!webhookConfig?.url) {
return { success: false, error: 'No webhook URL configured' }
}
const body = JSON.stringify(payload)
const deliveryId = `delivery_${uuidv4()}`
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': payload.timestamp.toString(),
'sim-delivery-id': deliveryId,
'Idempotency-Key': deliveryId,
}
if (webhookConfig.secret) {
const { decrypted } = await decryptSecret(webhookConfig.secret)
const signature = generateSignature(decrypted, payload.timestamp, body)
headers['sim-signature'] = `t=${payload.timestamp},v1=${signature}`
}
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 30000)
try {
const response = await fetch(webhookConfig.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
return {
success: response.ok,
status: response.status,
error: response.ok ? undefined : `HTTP ${response.status}`,
}
} catch (error: unknown) {
clearTimeout(timeoutId)
const err = error as Error & { name?: string }
return {
success: false,
error: err.name === 'AbortError' ? 'Request timeout' : err.message,
}
}
}
function formatDuration(ms: number): string {
if (ms < 1000) return `${ms}ms`
if (ms < 60000) return `${(ms / 1000).toFixed(1)}s`
return `${(ms / 60000).toFixed(1)}m`
}
function formatCost(cost?: Record<string, unknown>): string {
if (!cost?.total) return 'N/A'
const total = cost.total as number
return `$${total.toFixed(4)}`
}
function buildLogUrl(workspaceId: string, executionId: string): string {
return `${getBaseUrl()}/workspace/${workspaceId}/logs?search=${encodeURIComponent(executionId)}`
}
function formatAlertReason(alertConfig: AlertConfig): string {
switch (alertConfig.rule) {
case 'consecutive_failures':
return `${alertConfig.consecutiveFailures} consecutive failures detected`
case 'failure_rate':
return `Failure rate exceeded ${alertConfig.failureRatePercent}% over ${alertConfig.windowHours}h`
case 'latency_threshold':
return `Execution exceeded ${Math.round((alertConfig.durationThresholdMs || 0) / 1000)}s duration threshold`
case 'latency_spike':
return `Execution was ${alertConfig.latencySpikePercent}% slower than average`
case 'cost_threshold':
return `Execution cost exceeded $${alertConfig.costThresholdDollars} threshold`
case 'no_activity':
return `No workflow activity detected in ${alertConfig.inactivityHours}h`
case 'error_count':
return `${alertConfig.errorCountThreshold} errors detected in ${alertConfig.windowHours}h window`
default:
return 'Alert condition met'
}
}
function formatJsonForEmail(data: unknown, label: string): string {
if (!data) return ''
const json = JSON.stringify(data, null, 2)
const escapedJson = json.replace(/</g, '&lt;').replace(/>/g, '&gt;')
return `
<div style="margin-top: 20px;">
<h3 style="color: #1a1a1a; font-size: 14px; margin-bottom: 8px;">${label}</h3>
<pre style="background: #f5f5f5; padding: 12px; border-radius: 6px; overflow-x: auto; font-size: 12px; color: #333; white-space: pre-wrap; word-wrap: break-word;">${escapedJson}</pre>
</div>
`
}
async function deliverEmail(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
payload: NotificationPayload,
alertConfig?: AlertConfig
): Promise<{ success: boolean; error?: string }> {
if (!subscription.emailRecipients || subscription.emailRecipients.length === 0) {
return { success: false, error: 'No email recipients configured' }
}
const isError = payload.data.status !== 'success'
const statusText = isError ? 'Error' : 'Success'
const logUrl = buildLogUrl(subscription.workspaceId, payload.data.executionId)
const baseUrl = getBaseUrl()
const alertReason = alertConfig ? formatAlertReason(alertConfig) : null
// Build subject line
const subject = alertReason
? `Alert: ${payload.data.workflowName}`
: isError
? `Error Alert: ${payload.data.workflowName}`
: `Workflow Completed: ${payload.data.workflowName}`
let includedDataHtml = ''
let includedDataText = ''
if (payload.data.finalOutput) {
includedDataHtml += formatJsonForEmail(payload.data.finalOutput, 'Final Output')
includedDataText += `\n\nFinal Output:\n${JSON.stringify(payload.data.finalOutput, null, 2)}`
}
if (
payload.data.traceSpans &&
Array.isArray(payload.data.traceSpans) &&
payload.data.traceSpans.length > 0
) {
includedDataHtml += formatJsonForEmail(payload.data.traceSpans, 'Trace Spans')
includedDataText += `\n\nTrace Spans:\n${JSON.stringify(payload.data.traceSpans, null, 2)}`
}
if (payload.data.rateLimits) {
includedDataHtml += formatJsonForEmail(payload.data.rateLimits, 'Rate Limits')
includedDataText += `\n\nRate Limits:\n${JSON.stringify(payload.data.rateLimits, null, 2)}`
}
if (payload.data.usage) {
includedDataHtml += formatJsonForEmail(payload.data.usage, 'Usage Data')
includedDataText += `\n\nUsage Data:\n${JSON.stringify(payload.data.usage, null, 2)}`
}
const result = await sendEmail({
to: subscription.emailRecipients,
subject,
html: `
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
</head>
<body style="background-color: #f5f5f7; font-family: HelveticaNeue, Helvetica, Arial, sans-serif; margin: 0; padding: 0;">
<div style="max-width: 580px; margin: 30px auto; background-color: #ffffff; border-radius: 5px; overflow: hidden;">
<!-- Header with Logo -->
<div style="padding: 30px 0; text-align: center;">
<img src="${baseUrl}/logo/reverse/text/medium.png" width="114" alt="Sim Studio" style="margin: 0 auto;" />
</div>
<!-- Section Border -->
<div style="display: flex; width: 100%;">
<div style="border-bottom: 1px solid #eeeeee; width: 249px;"></div>
<div style="border-bottom: 1px solid #6F3DFA; width: 102px;"></div>
<div style="border-bottom: 1px solid #eeeeee; width: 249px;"></div>
</div>
<!-- Content -->
<div style="padding: 5px 30px 20px 30px;">
<h2 style="font-size: 20px; color: #333333; margin: 20px 0;">
${alertReason ? 'Alert Triggered' : isError ? 'Workflow Execution Failed' : 'Workflow Execution Completed'}
</h2>
${alertReason ? `<p style="color: #d97706; background: #fef3c7; padding: 12px; border-radius: 6px; margin-bottom: 20px; font-size: 14px;"><strong>Reason:</strong> ${alertReason}</p>` : ''}
<table style="width: 100%; border-collapse: collapse; margin-bottom: 20px;">
<tr style="border-bottom: 1px solid #eee;">
<td style="padding: 12px 0; color: #666; width: 140px;">Workflow</td>
<td style="padding: 12px 0; color: #333; font-weight: 500;">${payload.data.workflowName}</td>
</tr>
<tr style="border-bottom: 1px solid #eee;">
<td style="padding: 12px 0; color: #666;">Status</td>
<td style="padding: 12px 0; color: ${isError ? '#ef4444' : '#22c55e'}; font-weight: 500;">${statusText}</td>
</tr>
<tr style="border-bottom: 1px solid #eee;">
<td style="padding: 12px 0; color: #666;">Trigger</td>
<td style="padding: 12px 0; color: #333;">${payload.data.trigger}</td>
</tr>
<tr style="border-bottom: 1px solid #eee;">
<td style="padding: 12px 0; color: #666;">Duration</td>
<td style="padding: 12px 0; color: #333;">${formatDuration(payload.data.totalDurationMs)}</td>
</tr>
<tr style="border-bottom: 1px solid #eee;">
<td style="padding: 12px 0; color: #666;">Cost</td>
<td style="padding: 12px 0; color: #333;">${formatCost(payload.data.cost)}</td>
</tr>
</table>
<a href="${logUrl}" style="display: inline-block; background-color: #6F3DFA; color: #ffffff; font-weight: bold; font-size: 16px; padding: 12px 30px; border-radius: 5px; text-decoration: none; text-align: center; margin: 20px 0;">
View Execution Log →
</a>
${includedDataHtml}
<p style="font-size: 16px; line-height: 1.5; color: #333333; margin-top: 30px;">
Best regards,<br />
The Sim Team
</p>
</div>
</div>
<!-- Footer -->
<div style="max-width: 580px; margin: 0 auto; padding: 20px 0; text-align: center;">
<p style="font-size: 12px; color: #706a7b; margin: 8px 0 0 0;">
© ${new Date().getFullYear()} Sim Studio, All Rights Reserved
</p>
<p style="font-size: 12px; color: #706a7b; margin: 8px 0 0 0;">
<a href="${baseUrl}/privacy" style="color: #706a7b; text-decoration: underline;">Privacy Policy</a> •
<a href="${baseUrl}/terms" style="color: #706a7b; text-decoration: underline;">Terms of Service</a>
</p>
</div>
</body>
</html>
`,
text: `${subject}\n${alertReason ? `\nReason: ${alertReason}\n` : ''}\nWorkflow: ${payload.data.workflowName}\nStatus: ${statusText}\nTrigger: ${payload.data.trigger}\nDuration: ${formatDuration(payload.data.totalDurationMs)}\nCost: ${formatCost(payload.data.cost)}\n\nView Log: ${logUrl}${includedDataText}`,
emailType: 'notifications',
})
return { success: result.success, error: result.success ? undefined : result.message }
}
async function deliverSlack(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
payload: NotificationPayload,
alertConfig?: AlertConfig
): Promise<{ success: boolean; error?: string }> {
const slackConfig = subscription.slackConfig as SlackConfig | null
if (!slackConfig?.channelId || !slackConfig?.accountId) {
return { success: false, error: 'No Slack channel or account configured' }
}
const [slackAccount] = await db
.select({ accessToken: account.accessToken, userId: account.userId })
.from(account)
.where(eq(account.id, slackConfig.accountId))
.limit(1)
if (!slackAccount?.accessToken) {
return { success: false, error: 'Slack account not found or not connected' }
}
const alertReason = alertConfig ? formatAlertReason(alertConfig) : null
const statusEmoji = alertReason
? ':warning:'
: payload.data.status === 'success'
? ':white_check_mark:'
: ':x:'
const statusColor = alertReason
? '#d97706'
: payload.data.status === 'success'
? '#22c55e'
: '#ef4444'
const logUrl = buildLogUrl(subscription.workspaceId, payload.data.executionId)
const blocks: Array<Record<string, unknown>> = []
if (alertReason) {
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: `*Reason:* ${alertReason}`,
},
})
}
blocks.push(
{
type: 'section',
fields: [
{ type: 'mrkdwn', text: `*Status:*\n${payload.data.status}` },
{ type: 'mrkdwn', text: `*Trigger:*\n${payload.data.trigger}` },
{ type: 'mrkdwn', text: `*Duration:*\n${formatDuration(payload.data.totalDurationMs)}` },
{ type: 'mrkdwn', text: `*Cost:*\n${formatCost(payload.data.cost)}` },
],
},
{
type: 'actions',
elements: [
{
type: 'button',
text: { type: 'plain_text', text: 'View Log →', emoji: true },
url: logUrl,
style: 'primary',
},
],
}
)
if (payload.data.finalOutput) {
const outputStr = JSON.stringify(payload.data.finalOutput, null, 2)
const truncated = outputStr.length > 2900 ? `${outputStr.slice(0, 2900)}...` : outputStr
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: `*Final Output:*\n\`\`\`${truncated}\`\`\``,
},
})
}
if (
payload.data.traceSpans &&
Array.isArray(payload.data.traceSpans) &&
payload.data.traceSpans.length > 0
) {
const spansSummary = (payload.data.traceSpans as TraceSpan[])
.map((span) => {
const status = span.status === 'success' ? '✓' : '✗'
return `${status} ${span.name || 'Unknown'} (${formatDuration(span.duration || 0)})`
})
.join('\n')
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: `*Trace Spans:*\n\`\`\`${spansSummary}\`\`\``,
},
})
}
if (payload.data.rateLimits) {
const limitsStr = JSON.stringify(payload.data.rateLimits, null, 2)
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: `*Rate Limits:*\n\`\`\`${limitsStr}\`\`\``,
},
})
}
if (payload.data.usage) {
const usageStr = JSON.stringify(payload.data.usage, null, 2)
blocks.push({
type: 'section',
text: {
type: 'mrkdwn',
text: `*Usage Data:*\n\`\`\`${usageStr}\`\`\``,
},
})
}
blocks.push({
type: 'context',
elements: [{ type: 'mrkdwn', text: `Execution ID: \`${payload.data.executionId}\`` }],
})
const fallbackText = alertReason
? `⚠️ Alert: ${payload.data.workflowName} - ${alertReason}`
: `${payload.data.status === 'success' ? '✅' : '❌'} Workflow ${payload.data.workflowName}: ${payload.data.status}`
const slackPayload = {
channel: slackConfig.channelId,
attachments: [{ color: statusColor, blocks }],
text: fallbackText,
}
try {
const response = await fetch('https://slack.com/api/chat.postMessage', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${slackAccount.accessToken}`,
},
body: JSON.stringify(slackPayload),
})
const result = await response.json()
return { success: result.ok, error: result.ok ? undefined : result.error }
} catch (error: unknown) {
const err = error as Error
return { success: false, error: err.message }
}
}
async function updateDeliveryStatus(
deliveryId: string,
status: 'success' | 'failed' | 'pending',
error?: string,
responseStatus?: number,
nextAttemptAt?: Date
) {
await db
.update(workspaceNotificationDelivery)
.set({
status,
errorMessage: error || null,
responseStatus: responseStatus || null,
nextAttemptAt: nextAttemptAt || null,
updatedAt: new Date(),
})
.where(eq(workspaceNotificationDelivery.id, deliveryId))
}
export interface NotificationDeliveryParams {
deliveryId: string
subscriptionId: string
notificationType: 'webhook' | 'email' | 'slack'
log: WorkflowExecutionLog
alertConfig?: AlertConfig
}
export async function executeNotificationDelivery(params: NotificationDeliveryParams) {
const { deliveryId, subscriptionId, notificationType, log, alertConfig } = params
try {
const [subscription] = await db
.select()
.from(workspaceNotificationSubscription)
.where(eq(workspaceNotificationSubscription.id, subscriptionId))
.limit(1)
if (!subscription || !subscription.active) {
logger.warn(`Subscription ${subscriptionId} not found or inactive`)
await updateDeliveryStatus(deliveryId, 'failed', 'Subscription not found or inactive')
return
}
const claimed = await db
.update(workspaceNotificationDelivery)
.set({
status: 'in_progress',
attempts: sql`${workspaceNotificationDelivery.attempts} + 1`,
lastAttemptAt: new Date(),
updatedAt: new Date(),
})
.where(
and(
eq(workspaceNotificationDelivery.id, deliveryId),
eq(workspaceNotificationDelivery.status, 'pending'),
or(
isNull(workspaceNotificationDelivery.nextAttemptAt),
lte(workspaceNotificationDelivery.nextAttemptAt, new Date())
)
)
)
.returning({ attempts: workspaceNotificationDelivery.attempts })
if (claimed.length === 0) {
logger.info(`Delivery ${deliveryId} not claimable`)
return
}
const attempts = claimed[0].attempts
const payload = await buildPayload(log, subscription)
let result: { success: boolean; status?: number; error?: string }
switch (notificationType) {
case 'webhook':
result = await deliverWebhook(subscription, payload)
break
case 'email':
result = await deliverEmail(subscription, payload, alertConfig)
break
case 'slack':
result = await deliverSlack(subscription, payload, alertConfig)
break
default:
result = { success: false, error: 'Unknown notification type' }
}
if (result.success) {
await updateDeliveryStatus(deliveryId, 'success', undefined, result.status)
logger.info(`${notificationType} notification delivered successfully`, { deliveryId })
} else {
if (attempts < MAX_ATTEMPTS) {
const retryDelay = getRetryDelayWithJitter(
RETRY_DELAYS[attempts - 1] || RETRY_DELAYS[RETRY_DELAYS.length - 1]
)
const nextAttemptAt = new Date(Date.now() + retryDelay)
await updateDeliveryStatus(
deliveryId,
'pending',
result.error,
result.status,
nextAttemptAt
)
logger.info(
`${notificationType} notification failed, scheduled retry ${attempts}/${MAX_ATTEMPTS}`,
{
deliveryId,
error: result.error,
}
)
} else {
await updateDeliveryStatus(deliveryId, 'failed', result.error, result.status)
logger.error(`${notificationType} notification failed after ${MAX_ATTEMPTS} attempts`, {
deliveryId,
error: result.error,
})
}
}
} catch (error) {
logger.error('Notification delivery failed', { deliveryId, error })
await updateDeliveryStatus(deliveryId, 'failed', 'Internal error')
}
}
export const workspaceNotificationDeliveryTask = task({
id: 'workspace-notification-delivery',
retry: { maxAttempts: 1 },
run: async (params: NotificationDeliveryParams) => executeNotificationDelivery(params),
})

View File

@@ -13,7 +13,7 @@ import {
useState,
} from 'react'
import { cva, type VariantProps } from 'class-variance-authority'
import { ChevronDown, Loader2 } from 'lucide-react'
import { ChevronDown, Loader2, Search } from 'lucide-react'
import { cn } from '@/lib/core/utils/cn'
import { Input } from '../input/input'
import { Popover, PopoverAnchor, PopoverContent, PopoverScrollArea } from '../popover/popover'
@@ -86,6 +86,10 @@ export interface ComboboxProps
error?: string | null
/** Callback when popover open state changes */
onOpenChange?: (open: boolean) => void
/** Enable search input in dropdown (useful for multiselect) */
searchable?: boolean
/** Placeholder for search input */
searchPlaceholder?: string
/** Size variant */
size?: 'default' | 'sm'
/** Dropdown alignment */
@@ -122,6 +126,8 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
isLoading = false,
error = null,
onOpenChange,
searchable = false,
searchPlaceholder = 'Search...',
align = 'start',
dropdownWidth = 'trigger',
...props
@@ -130,6 +136,8 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
) => {
const [open, setOpen] = useState(false)
const [highlightedIndex, setHighlightedIndex] = useState(-1)
const [searchQuery, setSearchQuery] = useState('')
const searchInputRef = useRef<HTMLInputElement>(null)
const containerRef = useRef<HTMLDivElement>(null)
const dropdownRef = useRef<HTMLDivElement>(null)
const internalInputRef = useRef<HTMLInputElement>(null)
@@ -143,26 +151,38 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
)
/**
* Filter options based on current value
* Filter options based on current value or search query
*/
const filteredOptions = useMemo(() => {
if (!filterOptions || !value || !open) return options
let result = options
const currentValue = value.toString().toLowerCase()
// Filter by editable input value
if (filterOptions && value && open) {
const currentValue = value.toString().toLowerCase()
const exactMatch = options.find(
(opt) => opt.value === value || opt.label.toLowerCase() === currentValue
)
if (!exactMatch) {
result = result.filter((option) => {
const label = option.label.toLowerCase()
const optionValue = option.value.toLowerCase()
return label.includes(currentValue) || optionValue.includes(currentValue)
})
}
}
// If value exactly matches an option, show all
const exactMatch = options.find(
(opt) => opt.value === value || opt.label.toLowerCase() === currentValue
)
if (exactMatch) return options
// Filter by search query (for searchable mode)
if (searchable && searchQuery) {
const query = searchQuery.toLowerCase()
result = result.filter((option) => {
const label = option.label.toLowerCase()
const optionValue = option.value.toLowerCase()
return label.includes(query) || optionValue.includes(query)
})
}
// Filter options
return options.filter((option) => {
const label = option.label.toLowerCase()
const optionValue = option.value.toLowerCase()
return label.includes(currentValue) || optionValue.includes(currentValue)
})
}, [options, value, open, filterOptions])
return result
}, [options, value, open, filterOptions, searchable, searchQuery])
/**
* Handles selection of an option
@@ -348,6 +368,7 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
open={open}
onOpenChange={(next) => {
setOpen(next)
if (!next) setSearchQuery('')
onOpenChange?.(next)
}}
>
@@ -453,6 +474,9 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
style={typeof dropdownWidth === 'number' ? { width: `${dropdownWidth}px` } : undefined}
onOpenAutoFocus={(e) => {
e.preventDefault()
if (searchable) {
setTimeout(() => searchInputRef.current?.focus(), 0)
}
}}
onInteractOutside={(e) => {
// If the user clicks the anchor/trigger while the popover is open,
@@ -464,6 +488,24 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
}
}}
>
{searchable && (
<div className='flex items-center px-[8px] py-[6px]'>
<Search className='mr-2 h-[14px] w-[14px] shrink-0 text-[var(--text-muted)]' />
<input
ref={searchInputRef}
className='w-full bg-transparent text-sm text-[var(--text-primary)] placeholder:text-[var(--text-muted)] focus:outline-none'
placeholder={searchPlaceholder}
value={searchQuery}
onChange={(e) => setSearchQuery(e.target.value)}
onKeyDown={(e) => {
if (e.key === 'Escape') {
setOpen(false)
setSearchQuery('')
}
}}
/>
</div>
)}
<PopoverScrollArea
className='!flex-none max-h-48 p-[4px]'
onWheelCapture={(e) => {
@@ -498,7 +540,9 @@ const Combobox = forwardRef<HTMLDivElement, ComboboxProps>(
</div>
) : filteredOptions.length === 0 ? (
<div className='py-[14px] text-center font-medium font-sans text-[var(--text-muted)] text-sm'>
{editable && value ? 'No matching options found' : 'No options available'}
{searchQuery || (editable && value)
? 'No matching options found'
: 'No options available'}
</div>
) : (
<div className='space-y-[2px]'>

View File

@@ -0,0 +1,297 @@
import { useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('NotificationQueries')
/**
* Query key factories for notification-related queries
*/
export const notificationKeys = {
all: ['notifications'] as const,
lists: () => [...notificationKeys.all, 'list'] as const,
list: (workspaceId: string | undefined) =>
[...notificationKeys.lists(), workspaceId ?? ''] as const,
details: () => [...notificationKeys.all, 'detail'] as const,
detail: (workspaceId: string, notificationId: string) =>
[...notificationKeys.details(), workspaceId, notificationId] as const,
}
type NotificationType = 'webhook' | 'email' | 'slack'
type LogLevel = 'info' | 'error'
type TriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
type AlertRuleType =
| 'consecutive_failures'
| 'failure_rate'
| 'latency_threshold'
| 'latency_spike'
| 'cost_threshold'
| 'no_activity'
| 'error_count'
interface AlertConfig {
rule: AlertRuleType
consecutiveFailures?: number
failureRatePercent?: number
windowHours?: number
durationThresholdMs?: number
latencySpikePercent?: number
costThresholdDollars?: number
inactivityHours?: number
errorCountThreshold?: number
}
interface WebhookConfig {
url: string
secret?: string
}
interface SlackConfig {
channelId: string
channelName: string
accountId: string
}
export interface NotificationSubscription {
id: string
notificationType: NotificationType
workflowIds: string[]
allWorkflows: boolean
levelFilter: LogLevel[]
triggerFilter: TriggerType[]
includeFinalOutput: boolean
includeTraceSpans: boolean
includeRateLimits: boolean
includeUsageData: boolean
webhookConfig?: WebhookConfig | null
emailRecipients?: string[] | null
slackConfig?: SlackConfig | null
alertConfig?: AlertConfig | null
active: boolean
createdAt: string
updatedAt: string
}
/**
* Fetch notifications for a workspace
*/
async function fetchNotifications(workspaceId: string): Promise<NotificationSubscription[]> {
const response = await fetch(`/api/workspaces/${workspaceId}/notifications`)
if (!response.ok) {
throw new Error('Failed to fetch notifications')
}
const data = await response.json()
return data.data || []
}
/**
* Hook to fetch notifications for a workspace
*/
export function useNotifications(workspaceId?: string) {
return useQuery({
queryKey: notificationKeys.list(workspaceId),
queryFn: () => fetchNotifications(workspaceId!),
enabled: Boolean(workspaceId),
staleTime: 30 * 1000,
})
}
interface CreateNotificationParams {
workspaceId: string
data: {
notificationType: NotificationType
workflowIds: string[]
allWorkflows: boolean
levelFilter: LogLevel[]
triggerFilter: TriggerType[]
includeFinalOutput: boolean
includeTraceSpans: boolean
includeRateLimits: boolean
includeUsageData: boolean
alertConfig?: AlertConfig | null
webhookConfig?: WebhookConfig
emailRecipients?: string[]
slackConfig?: SlackConfig
}
}
/**
* Hook to create a notification
*/
export function useCreateNotification() {
const queryClient = useQueryClient()
return useMutation({
mutationFn: async ({ workspaceId, data }: CreateNotificationParams) => {
const response = await fetch(`/api/workspaces/${workspaceId}/notifications`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data),
})
if (!response.ok) {
const error = await response.json().catch(() => ({}))
throw new Error(error.error || 'Failed to create notification')
}
return response.json()
},
onSuccess: (_, { workspaceId }) => {
queryClient.invalidateQueries({ queryKey: notificationKeys.list(workspaceId) })
},
onError: (error) => {
logger.error('Failed to create notification', { error })
},
})
}
interface UpdateNotificationParams {
workspaceId: string
notificationId: string
data: Partial<CreateNotificationParams['data']> & { active?: boolean }
}
/**
* Hook to update a notification
*/
export function useUpdateNotification() {
const queryClient = useQueryClient()
return useMutation({
mutationFn: async ({ workspaceId, notificationId, data }: UpdateNotificationParams) => {
const response = await fetch(
`/api/workspaces/${workspaceId}/notifications/${notificationId}`,
{
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data),
}
)
if (!response.ok) {
const error = await response.json().catch(() => ({}))
throw new Error(error.error || 'Failed to update notification')
}
return response.json()
},
onSuccess: (_, { workspaceId }) => {
queryClient.invalidateQueries({ queryKey: notificationKeys.list(workspaceId) })
},
onError: (error) => {
logger.error('Failed to update notification', { error })
},
})
}
/**
* Hook to toggle notification active state with optimistic update
*/
export function useToggleNotificationActive() {
const queryClient = useQueryClient()
return useMutation({
mutationFn: async ({
workspaceId,
notificationId,
active,
}: {
workspaceId: string
notificationId: string
active: boolean
}) => {
const response = await fetch(
`/api/workspaces/${workspaceId}/notifications/${notificationId}`,
{
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ active }),
}
)
if (!response.ok) {
throw new Error('Failed to toggle notification')
}
return response.json()
},
onMutate: async ({ workspaceId, notificationId, active }) => {
// Cancel outgoing refetches
await queryClient.cancelQueries({ queryKey: notificationKeys.list(workspaceId) })
// Snapshot previous value
const previousNotifications = queryClient.getQueryData<NotificationSubscription[]>(
notificationKeys.list(workspaceId)
)
// Optimistically update
queryClient.setQueryData<NotificationSubscription[]>(
notificationKeys.list(workspaceId),
(old) => old?.map((n) => (n.id === notificationId ? { ...n, active } : n))
)
return { previousNotifications }
},
onError: (error, { workspaceId }, context) => {
// Rollback on error
if (context?.previousNotifications) {
queryClient.setQueryData(notificationKeys.list(workspaceId), context.previousNotifications)
}
logger.error('Failed to toggle notification', { error })
},
})
}
interface DeleteNotificationParams {
workspaceId: string
notificationId: string
}
/**
* Hook to delete a notification
*/
export function useDeleteNotification() {
const queryClient = useQueryClient()
return useMutation({
mutationFn: async ({ workspaceId, notificationId }: DeleteNotificationParams) => {
const response = await fetch(
`/api/workspaces/${workspaceId}/notifications/${notificationId}`,
{
method: 'DELETE',
}
)
if (!response.ok) {
throw new Error('Failed to delete notification')
}
return response.json()
},
onSuccess: (_, { workspaceId }) => {
queryClient.invalidateQueries({ queryKey: notificationKeys.list(workspaceId) })
},
onError: (error) => {
logger.error('Failed to delete notification', { error })
},
})
}
interface TestNotificationParams {
workspaceId: string
notificationId: string
}
/**
* Hook to test a notification
*/
export function useTestNotification() {
return useMutation({
mutationFn: async ({ workspaceId, notificationId }: TestNotificationParams) => {
const response = await fetch(
`/api/workspaces/${workspaceId}/notifications/${notificationId}/test`,
{ method: 'POST' }
)
if (!response.ok) {
const error = await response.json().catch(() => ({}))
throw new Error(error.error || 'Failed to send test notification')
}
return response.json()
},
onError: (error) => {
logger.error('Failed to test notification', { error })
},
})
}

View File

@@ -0,0 +1,51 @@
import { useCallback, useEffect, useState } from 'react'
interface SlackAccount {
id: string
accountId: string
providerId: string
}
interface UseSlackAccountsResult {
accounts: SlackAccount[]
isLoading: boolean
error: string | null
refetch: () => Promise<void>
}
/**
* Fetches and manages connected Slack accounts for the current user.
* @returns Object containing accounts array, loading state, error state, and refetch function
*/
export function useSlackAccounts(): UseSlackAccountsResult {
const [accounts, setAccounts] = useState<SlackAccount[]>([])
const [isLoading, setIsLoading] = useState(true)
const [error, setError] = useState<string | null>(null)
const fetchAccounts = useCallback(async () => {
try {
setIsLoading(true)
setError(null)
const response = await fetch('/api/auth/accounts?provider=slack')
if (response.ok) {
const data = await response.json()
setAccounts(data.accounts || [])
} else {
const data = await response.json().catch(() => ({}))
setError(data.error || 'Failed to load Slack accounts')
setAccounts([])
}
} catch {
setError('Failed to load Slack accounts')
setAccounts([])
} finally {
setIsLoading(false)
}
}, [])
useEffect(() => {
fetchAccounts()
}, [])
return { accounts, isLoading, error, refetch: fetchAccounts }
}

View File

@@ -1,28 +1,83 @@
import { db } from '@sim/db'
import { workflowLogWebhook, workflowLogWebhookDelivery } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import {
workflow,
workspaceNotificationDelivery,
workspaceNotificationSubscription,
} from '@sim/db/schema'
import { and, eq, or, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { env, isTruthy } from '@/lib/core/config/env'
import { createLogger } from '@/lib/logs/console/logger'
import type { WorkflowExecutionLog } from '@/lib/logs/types'
import { logsWebhookDelivery } from '@/background/logs-webhook-delivery'
import {
type AlertCheckContext,
type AlertConfig,
shouldTriggerAlert,
} from '@/lib/notifications/alert-rules'
import {
executeNotificationDelivery,
workspaceNotificationDeliveryTask,
} from '@/background/workspace-notification-delivery'
const logger = createLogger('LogsEventEmitter')
export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): Promise<void> {
try {
const subscriptions = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, log.workflowId), eq(workflowLogWebhook.active, true))
)
function prepareLogData(
log: WorkflowExecutionLog,
subscription: {
includeFinalOutput: boolean
includeTraceSpans: boolean
}
) {
const preparedLog = { ...log, executionData: {} as Record<string, unknown> }
if (subscriptions.length === 0) {
return
if (log.executionData) {
const data = log.executionData as Record<string, unknown>
const webhookData: Record<string, unknown> = {}
if (subscription.includeFinalOutput && data.finalOutput) {
webhookData.finalOutput = data.finalOutput
}
if (subscription.includeTraceSpans && data.traceSpans) {
webhookData.traceSpans = data.traceSpans
}
preparedLog.executionData = webhookData
}
return preparedLog
}
export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): Promise<void> {
try {
const workflowData = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, log.workflowId))
.limit(1)
if (workflowData.length === 0 || !workflowData[0].workspaceId) return
const workspaceId = workflowData[0].workspaceId
const subscriptions = await db
.select()
.from(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.workspaceId, workspaceId),
eq(workspaceNotificationSubscription.active, true),
or(
eq(workspaceNotificationSubscription.allWorkflows, true),
sql`${log.workflowId} = ANY(${workspaceNotificationSubscription.workflowIds})`
)
)
)
if (subscriptions.length === 0) return
logger.debug(
`Found ${subscriptions.length} active webhook subscriptions for workflow ${log.workflowId}`
`Found ${subscriptions.length} active notification subscriptions for workspace ${workspaceId}`
)
for (const subscription of subscriptions) {
@@ -30,18 +85,42 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
const triggerMatches = subscription.triggerFilter?.includes(log.trigger) ?? true
if (!levelMatches || !triggerMatches) {
logger.debug(`Skipping subscription ${subscription.id} due to filter mismatch`, {
level: log.level,
trigger: log.trigger,
levelFilter: subscription.levelFilter,
triggerFilter: subscription.triggerFilter,
})
logger.debug(`Skipping subscription ${subscription.id} due to filter mismatch`)
continue
}
const alertConfig = subscription.alertConfig as AlertConfig | null
if (alertConfig) {
const context: AlertCheckContext = {
workflowId: log.workflowId,
executionId: log.executionId,
status: log.level === 'error' ? 'error' : 'success',
durationMs: log.totalDurationMs || 0,
cost: (log.cost as { total?: number })?.total || 0,
}
const shouldAlert = await shouldTriggerAlert(alertConfig, context, subscription.lastAlertAt)
if (!shouldAlert) {
logger.debug(`Alert condition not met for subscription ${subscription.id}`)
continue
}
await db
.update(workspaceNotificationSubscription)
.set({ lastAlertAt: new Date() })
.where(eq(workspaceNotificationSubscription.id, subscription.id))
logger.info(`Alert triggered for subscription ${subscription.id}`, {
workflowId: log.workflowId,
alertConfig,
})
}
const deliveryId = uuidv4()
await db.insert(workflowLogWebhookDelivery).values({
await db.insert(workspaceNotificationDelivery).values({
id: deliveryId,
subscriptionId: subscription.id,
workflowId: log.workflowId,
@@ -51,45 +130,29 @@ export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog):
nextAttemptAt: new Date(),
})
// Prepare the log data based on subscription settings
const webhookLog = {
...log,
executionData: {},
}
const notificationLog = prepareLogData(log, subscription)
// Only include executionData fields that are requested
if (log.executionData) {
const data = log.executionData as any
const webhookData: any = {}
if (subscription.includeFinalOutput && data.finalOutput) {
webhookData.finalOutput = data.finalOutput
}
if (subscription.includeTraceSpans && data.traceSpans) {
webhookData.traceSpans = data.traceSpans
}
// For rate limits and usage, we'll need to fetch them in the webhook delivery
// since they're user-specific and may change
if (subscription.includeRateLimits) {
webhookData.includeRateLimits = true
}
if (subscription.includeUsageData) {
webhookData.includeUsageData = true
}
webhookLog.executionData = webhookData
}
await logsWebhookDelivery.trigger({
const payload = {
deliveryId,
subscriptionId: subscription.id,
log: webhookLog,
})
notificationType: subscription.notificationType,
log: notificationLog,
alertConfig: alertConfig || undefined,
}
logger.info(`Enqueued webhook delivery ${deliveryId} for subscription ${subscription.id}`)
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
await workspaceNotificationDeliveryTask.trigger(payload)
logger.info(
`Enqueued ${subscription.notificationType} notification ${deliveryId} via Trigger.dev`
)
} else {
void executeNotificationDelivery(payload).catch((error) => {
logger.error(`Direct notification delivery failed for ${deliveryId}`, { error })
})
logger.info(`Enqueued ${subscription.notificationType} notification ${deliveryId} directly`)
}
}
} catch (error) {
logger.error('Failed to emit workflow execution completed event', {

View File

@@ -0,0 +1,327 @@
import { db } from '@sim/db'
import { workflowExecutionLogs } from '@sim/db/schema'
import { and, avg, count, desc, eq, gte } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('AlertRules')
/**
* Alert rule types supported by the notification system
*/
export type AlertRuleType =
| 'consecutive_failures'
| 'failure_rate'
| 'latency_threshold'
| 'latency_spike'
| 'cost_threshold'
| 'no_activity'
| 'error_count'
/**
* Configuration for alert rules
*/
export interface AlertConfig {
rule: AlertRuleType
consecutiveFailures?: number
failureRatePercent?: number
windowHours?: number
durationThresholdMs?: number
latencySpikePercent?: number
costThresholdDollars?: number
inactivityHours?: number
errorCountThreshold?: number
}
/**
* Metadata for alert rule types
*/
export interface AlertRuleDefinition {
type: AlertRuleType
name: string
description: string
requiredFields: (keyof AlertConfig)[]
defaultValues: Partial<AlertConfig>
}
/**
* Registry of all alert rule definitions
*/
export const ALERT_RULES: Record<AlertRuleType, AlertRuleDefinition> = {
consecutive_failures: {
type: 'consecutive_failures',
name: 'Consecutive Failures',
description: 'Alert after X consecutive failed executions',
requiredFields: ['consecutiveFailures'],
defaultValues: { consecutiveFailures: 3 },
},
failure_rate: {
type: 'failure_rate',
name: 'Failure Rate',
description: 'Alert when failure rate exceeds X% over a time window',
requiredFields: ['failureRatePercent', 'windowHours'],
defaultValues: { failureRatePercent: 50, windowHours: 24 },
},
latency_threshold: {
type: 'latency_threshold',
name: 'Latency Threshold',
description: 'Alert when execution duration exceeds a threshold',
requiredFields: ['durationThresholdMs'],
defaultValues: { durationThresholdMs: 30000 },
},
latency_spike: {
type: 'latency_spike',
name: 'Latency Spike',
description: 'Alert when execution is X% slower than average',
requiredFields: ['latencySpikePercent', 'windowHours'],
defaultValues: { latencySpikePercent: 100, windowHours: 24 },
},
cost_threshold: {
type: 'cost_threshold',
name: 'Cost Threshold',
description: 'Alert when execution cost exceeds a threshold',
requiredFields: ['costThresholdDollars'],
defaultValues: { costThresholdDollars: 1 },
},
no_activity: {
type: 'no_activity',
name: 'No Activity',
description: 'Alert when no executions occur within a time window',
requiredFields: ['inactivityHours'],
defaultValues: { inactivityHours: 24 },
},
error_count: {
type: 'error_count',
name: 'Error Count',
description: 'Alert when error count exceeds threshold within time window',
requiredFields: ['errorCountThreshold', 'windowHours'],
defaultValues: { errorCountThreshold: 10, windowHours: 1 },
},
}
/**
* Cooldown period in hours to prevent alert spam
*/
export const ALERT_COOLDOWN_HOURS = 1
/**
* Minimum executions required for rate-based alerts
*/
export const MIN_EXECUTIONS_FOR_RATE_ALERT = 5
/**
* Validates an alert configuration
*/
export function validateAlertConfig(config: AlertConfig): { valid: boolean; error?: string } {
const definition = ALERT_RULES[config.rule]
if (!definition) {
return { valid: false, error: `Unknown alert rule: ${config.rule}` }
}
for (const field of definition.requiredFields) {
if (config[field] === undefined || config[field] === null) {
return { valid: false, error: `Missing required field: ${field}` }
}
}
return { valid: true }
}
/**
* Checks if a subscription is within its cooldown period
*/
export function isInCooldown(lastAlertAt: Date | null): boolean {
if (!lastAlertAt) return false
const cooldownEnd = new Date(lastAlertAt.getTime() + ALERT_COOLDOWN_HOURS * 60 * 60 * 1000)
return new Date() < cooldownEnd
}
/**
* Context passed to alert check functions
*/
export interface AlertCheckContext {
workflowId: string
executionId: string
status: 'success' | 'error'
durationMs: number
cost: number
}
/**
* Check if consecutive failures threshold is met
*/
async function checkConsecutiveFailures(workflowId: string, threshold: number): Promise<boolean> {
const recentLogs = await db
.select({ level: workflowExecutionLogs.level })
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.workflowId, workflowId))
.orderBy(desc(workflowExecutionLogs.createdAt))
.limit(threshold)
if (recentLogs.length < threshold) return false
return recentLogs.every((log) => log.level === 'error')
}
/**
* Check if failure rate exceeds threshold
*/
async function checkFailureRate(
workflowId: string,
ratePercent: number,
windowHours: number
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
const logs = await db
.select({
level: workflowExecutionLogs.level,
createdAt: workflowExecutionLogs.createdAt,
})
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
gte(workflowExecutionLogs.createdAt, windowStart)
)
)
.orderBy(workflowExecutionLogs.createdAt)
if (logs.length < MIN_EXECUTIONS_FOR_RATE_ALERT) return false
const oldestLog = logs[0]
if (oldestLog && oldestLog.createdAt > windowStart) {
return false
}
const errorCount = logs.filter((log) => log.level === 'error').length
const failureRate = (errorCount / logs.length) * 100
return failureRate >= ratePercent
}
/**
* Check if execution duration exceeds threshold
*/
function checkLatencyThreshold(durationMs: number, thresholdMs: number): boolean {
return durationMs > thresholdMs
}
/**
* Check if execution duration is significantly above average
*/
async function checkLatencySpike(
workflowId: string,
currentDurationMs: number,
spikePercent: number,
windowHours: number
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
const result = await db
.select({
avgDuration: avg(workflowExecutionLogs.totalDurationMs),
count: count(),
})
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
gte(workflowExecutionLogs.createdAt, windowStart)
)
)
const avgDuration = result[0]?.avgDuration
const execCount = result[0]?.count || 0
if (!avgDuration || execCount < MIN_EXECUTIONS_FOR_RATE_ALERT) return false
const avgMs = Number(avgDuration)
const threshold = avgMs * (1 + spikePercent / 100)
return currentDurationMs > threshold
}
/**
* Check if execution cost exceeds threshold
*/
function checkCostThreshold(cost: number, thresholdDollars: number): boolean {
return cost > thresholdDollars
}
/**
* Check if error count exceeds threshold within window
*/
async function checkErrorCount(
workflowId: string,
threshold: number,
windowHours: number
): Promise<boolean> {
const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000)
const result = await db
.select({ count: count() })
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
eq(workflowExecutionLogs.level, 'error'),
gte(workflowExecutionLogs.createdAt, windowStart)
)
)
const errorCount = result[0]?.count || 0
return errorCount >= threshold
}
/**
* Evaluates if an alert should be triggered based on the configuration
*/
export async function shouldTriggerAlert(
config: AlertConfig,
context: AlertCheckContext,
lastAlertAt: Date | null
): Promise<boolean> {
if (isInCooldown(lastAlertAt)) {
logger.debug('Subscription in cooldown, skipping alert check')
return false
}
const { rule } = config
const { workflowId, status, durationMs, cost } = context
switch (rule) {
case 'consecutive_failures':
if (status !== 'error') return false
return checkConsecutiveFailures(workflowId, config.consecutiveFailures!)
case 'failure_rate':
if (status !== 'error') return false
return checkFailureRate(workflowId, config.failureRatePercent!, config.windowHours!)
case 'latency_threshold':
return checkLatencyThreshold(durationMs, config.durationThresholdMs!)
case 'latency_spike':
return checkLatencySpike(
workflowId,
durationMs,
config.latencySpikePercent!,
config.windowHours!
)
case 'cost_threshold':
return checkCostThreshold(cost, config.costThresholdDollars!)
case 'no_activity':
// no_activity alerts are handled by the hourly polling job, not execution events
return false
case 'error_count':
if (status !== 'error') return false
return checkErrorCount(workflowId, config.errorCountThreshold!, config.windowHours!)
default:
logger.warn(`Unknown alert rule: ${rule}`)
return false
}
}

View File

@@ -0,0 +1,213 @@
import { db } from '@sim/db'
import {
workflow,
workflowExecutionLogs,
workspaceNotificationDelivery,
workspaceNotificationSubscription,
} from '@sim/db/schema'
import { and, eq, gte, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { env, isTruthy } from '@/lib/core/config/env'
import { createLogger } from '@/lib/logs/console/logger'
import {
executeNotificationDelivery,
workspaceNotificationDeliveryTask,
} from '@/background/workspace-notification-delivery'
import type { AlertConfig } from './alert-rules'
import { isInCooldown } from './alert-rules'
const logger = createLogger('InactivityPolling')
interface InactivityCheckResult {
subscriptionId: string
workflowId: string
triggered: boolean
reason?: string
}
/**
* Checks a single workflow for inactivity and triggers notification if needed
*/
async function checkWorkflowInactivity(
subscription: typeof workspaceNotificationSubscription.$inferSelect,
workflowId: string,
alertConfig: AlertConfig
): Promise<InactivityCheckResult> {
const result: InactivityCheckResult = {
subscriptionId: subscription.id,
workflowId,
triggered: false,
}
if (isInCooldown(subscription.lastAlertAt)) {
result.reason = 'in_cooldown'
return result
}
const windowStart = new Date(Date.now() - (alertConfig.inactivityHours || 24) * 60 * 60 * 1000)
const recentLogs = await db
.select({ id: workflowExecutionLogs.id })
.from(workflowExecutionLogs)
.where(
and(
eq(workflowExecutionLogs.workflowId, workflowId),
gte(workflowExecutionLogs.createdAt, windowStart)
)
)
.limit(1)
if (recentLogs.length > 0) {
result.reason = 'has_activity'
return result
}
const [workflowData] = await db
.select({
name: workflow.name,
workspaceId: workflow.workspaceId,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowData || !workflowData.workspaceId) {
result.reason = 'workflow_not_found'
return result
}
await db
.update(workspaceNotificationSubscription)
.set({ lastAlertAt: new Date() })
.where(eq(workspaceNotificationSubscription.id, subscription.id))
const deliveryId = uuidv4()
await db.insert(workspaceNotificationDelivery).values({
id: deliveryId,
subscriptionId: subscription.id,
workflowId,
executionId: `inactivity_${Date.now()}`,
status: 'pending',
attempts: 0,
nextAttemptAt: new Date(),
})
const now = new Date().toISOString()
const mockLog = {
id: `inactivity_log_${uuidv4()}`,
workflowId,
executionId: `inactivity_${Date.now()}`,
stateSnapshotId: '',
level: 'info' as const,
trigger: 'system' as const,
startedAt: now,
endedAt: now,
totalDurationMs: 0,
executionData: {},
cost: { total: 0 },
workspaceId: workflowData.workspaceId,
createdAt: now,
}
const payload = {
deliveryId,
subscriptionId: subscription.id,
notificationType: subscription.notificationType,
log: mockLog,
alertConfig,
}
const useTrigger = isTruthy(env.TRIGGER_DEV_ENABLED)
if (useTrigger) {
await workspaceNotificationDeliveryTask.trigger(payload)
} else {
void executeNotificationDelivery(payload).catch((error) => {
logger.error(`Direct notification delivery failed for ${deliveryId}`, { error })
})
}
result.triggered = true
result.reason = 'alert_sent'
logger.info(`Inactivity alert triggered for workflow ${workflowId}`, {
subscriptionId: subscription.id,
inactivityHours: alertConfig.inactivityHours,
})
return result
}
/**
* Polls all active no_activity subscriptions and triggers alerts as needed
*/
export async function pollInactivityAlerts(): Promise<{
total: number
triggered: number
skipped: number
details: InactivityCheckResult[]
}> {
logger.info('Starting inactivity alert polling')
const subscriptions = await db
.select()
.from(workspaceNotificationSubscription)
.where(
and(
eq(workspaceNotificationSubscription.active, true),
sql`${workspaceNotificationSubscription.alertConfig}->>'rule' = 'no_activity'`
)
)
if (subscriptions.length === 0) {
logger.info('No active no_activity subscriptions found')
return { total: 0, triggered: 0, skipped: 0, details: [] }
}
logger.info(`Found ${subscriptions.length} no_activity subscriptions to check`)
const results: InactivityCheckResult[] = []
let triggered = 0
let skipped = 0
for (const subscription of subscriptions) {
const alertConfig = subscription.alertConfig as AlertConfig
if (!alertConfig || alertConfig.rule !== 'no_activity') {
continue
}
let workflowIds: string[] = []
if (subscription.allWorkflows) {
const workflows = await db
.select({ id: workflow.id })
.from(workflow)
.where(eq(workflow.workspaceId, subscription.workspaceId))
workflowIds = workflows.map((w) => w.id)
} else {
workflowIds = subscription.workflowIds || []
}
for (const workflowId of workflowIds) {
const result = await checkWorkflowInactivity(subscription, workflowId, alertConfig)
results.push(result)
if (result.triggered) {
triggered++
} else {
skipped++
}
}
}
logger.info(`Inactivity polling completed: ${triggered} alerts triggered, ${skipped} skipped`)
return {
total: results.length,
triggered,
skipped,
details: results,
}
}

View File

@@ -209,10 +209,8 @@ export const useFilterStore = create<FilterState>((set, get) => ({
folderIds,
triggers,
searchQuery,
_isInitializing: false, // Clear the flag after initialization
_isInitializing: false,
})
get().syncWithURL()
},
syncWithURL: () => {

View File

@@ -0,0 +1,96 @@
CREATE TYPE "public"."notification_delivery_status" AS ENUM('pending', 'in_progress', 'success', 'failed');--> statement-breakpoint
CREATE TYPE "public"."notification_type" AS ENUM('webhook', 'email', 'slack');--> statement-breakpoint
CREATE TABLE "workspace_notification_delivery" (
"id" text PRIMARY KEY NOT NULL,
"subscription_id" text NOT NULL,
"workflow_id" text NOT NULL,
"execution_id" text NOT NULL,
"status" "notification_delivery_status" DEFAULT 'pending' NOT NULL,
"attempts" integer DEFAULT 0 NOT NULL,
"last_attempt_at" timestamp,
"next_attempt_at" timestamp,
"response_status" integer,
"response_body" text,
"error_message" text,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workspace_notification_subscription" (
"id" text PRIMARY KEY NOT NULL,
"workspace_id" text NOT NULL,
"notification_type" "notification_type" NOT NULL,
"workflow_ids" text[] DEFAULT '{}'::text[] NOT NULL,
"all_workflows" boolean DEFAULT false NOT NULL,
"level_filter" text[] DEFAULT ARRAY['info', 'error']::text[] NOT NULL,
"trigger_filter" text[] DEFAULT ARRAY['api', 'webhook', 'schedule', 'manual', 'chat']::text[] NOT NULL,
"include_final_output" boolean DEFAULT false NOT NULL,
"include_trace_spans" boolean DEFAULT false NOT NULL,
"include_rate_limits" boolean DEFAULT false NOT NULL,
"include_usage_data" boolean DEFAULT false NOT NULL,
"webhook_config" jsonb,
"email_recipients" text[],
"slack_config" jsonb,
"alert_config" jsonb,
"last_alert_at" timestamp,
"active" boolean DEFAULT true NOT NULL,
"created_by" text NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
INSERT INTO "workspace_notification_subscription" (
"id",
"workspace_id",
"notification_type",
"workflow_ids",
"all_workflows",
"level_filter",
"trigger_filter",
"include_final_output",
"include_trace_spans",
"include_rate_limits",
"include_usage_data",
"webhook_config",
"active",
"created_by",
"created_at",
"updated_at"
)
SELECT
wlw.id,
w.workspace_id,
'webhook'::"notification_type",
ARRAY[wlw.workflow_id],
false,
wlw.level_filter,
wlw.trigger_filter,
wlw.include_final_output,
wlw.include_trace_spans,
wlw.include_rate_limits,
wlw.include_usage_data,
jsonb_build_object('url', wlw.url, 'secret', wlw.secret),
wlw.active,
w.user_id,
wlw.created_at,
wlw.updated_at
FROM workflow_log_webhook wlw
JOIN workflow w ON w.id = wlw.workflow_id
WHERE w.workspace_id IS NOT NULL;--> statement-breakpoint
DROP TABLE "workflow_log_webhook_delivery" CASCADE;--> statement-breakpoint
DROP TABLE "workflow_log_webhook" CASCADE;--> statement-breakpoint
ALTER TABLE "workspace_notification_delivery" ADD CONSTRAINT "workspace_notification_delivery_subscription_id_workspace_notification_subscription_id_fk" FOREIGN KEY ("subscription_id") REFERENCES "public"."workspace_notification_subscription"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workspace_notification_delivery" ADD CONSTRAINT "workspace_notification_delivery_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workspace_notification_subscription" ADD CONSTRAINT "workspace_notification_subscription_workspace_id_workspace_id_fk" FOREIGN KEY ("workspace_id") REFERENCES "public"."workspace"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workspace_notification_subscription" ADD CONSTRAINT "workspace_notification_subscription_created_by_user_id_fk" FOREIGN KEY ("created_by") REFERENCES "public"."user"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "workspace_notification_delivery_subscription_id_idx" ON "workspace_notification_delivery" USING btree ("subscription_id");--> statement-breakpoint
CREATE INDEX "workspace_notification_delivery_execution_id_idx" ON "workspace_notification_delivery" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workspace_notification_delivery_status_idx" ON "workspace_notification_delivery" USING btree ("status");--> statement-breakpoint
CREATE INDEX "workspace_notification_delivery_next_attempt_idx" ON "workspace_notification_delivery" USING btree ("next_attempt_at");--> statement-breakpoint
CREATE INDEX "workspace_notification_workspace_id_idx" ON "workspace_notification_subscription" USING btree ("workspace_id");--> statement-breakpoint
CREATE INDEX "workspace_notification_active_idx" ON "workspace_notification_subscription" USING btree ("active");--> statement-breakpoint
CREATE INDEX "workspace_notification_type_idx" ON "workspace_notification_subscription" USING btree ("notification_type");--> statement-breakpoint
ALTER TABLE "settings" DROP COLUMN "auto_pan";--> statement-breakpoint
ALTER TABLE "settings" DROP COLUMN "console_expanded_by_default";--> statement-breakpoint
ALTER TABLE "settings" DROP COLUMN "show_floating_controls";--> statement-breakpoint
DROP TYPE "public"."webhook_delivery_status";

File diff suppressed because it is too large Load Diff

View File

@@ -806,6 +806,13 @@
"when": 1764477997303,
"tag": "0115_redundant_cerebro",
"breakpoints": true
},
{
"idx": 116,
"version": "7",
"when": 1764820826997,
"tag": "0116_flimsy_shape",
"breakpoints": true
}
]
}

View File

@@ -493,19 +493,25 @@ export const webhook = pgTable(
}
)
export const workflowLogWebhook = pgTable(
'workflow_log_webhook',
export const notificationTypeEnum = pgEnum('notification_type', ['webhook', 'email', 'slack'])
export const notificationDeliveryStatusEnum = pgEnum('notification_delivery_status', [
'pending',
'in_progress',
'success',
'failed',
])
export const workspaceNotificationSubscription = pgTable(
'workspace_notification_subscription',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
workspaceId: text('workspace_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
url: text('url').notNull(),
secret: text('secret'),
includeFinalOutput: boolean('include_final_output').notNull().default(false),
includeTraceSpans: boolean('include_trace_spans').notNull().default(false),
includeRateLimits: boolean('include_rate_limits').notNull().default(false),
includeUsageData: boolean('include_usage_data').notNull().default(false),
.references(() => workspace.id, { onDelete: 'cascade' }),
notificationType: notificationTypeEnum('notification_type').notNull(),
workflowIds: text('workflow_ids').array().notNull().default(sql`'{}'::text[]`),
allWorkflows: boolean('all_workflows').notNull().default(false),
levelFilter: text('level_filter')
.array()
.notNull()
@@ -514,35 +520,46 @@ export const workflowLogWebhook = pgTable(
.array()
.notNull()
.default(sql`ARRAY['api', 'webhook', 'schedule', 'manual', 'chat']::text[]`),
includeFinalOutput: boolean('include_final_output').notNull().default(false),
includeTraceSpans: boolean('include_trace_spans').notNull().default(false),
includeRateLimits: boolean('include_rate_limits').notNull().default(false),
includeUsageData: boolean('include_usage_data').notNull().default(false),
// Channel-specific configuration
webhookConfig: jsonb('webhook_config'),
emailRecipients: text('email_recipients').array(),
slackConfig: jsonb('slack_config'),
// Alert rule configuration (if null, sends on every execution)
alertConfig: jsonb('alert_config'),
lastAlertAt: timestamp('last_alert_at'),
active: boolean('active').notNull().default(true),
createdBy: text('created_by')
.notNull()
.references(() => user.id, { onDelete: 'cascade' }),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
workflowIdIdx: index('workflow_log_webhook_workflow_id_idx').on(table.workflowId),
activeIdx: index('workflow_log_webhook_active_idx').on(table.active),
workspaceIdIdx: index('workspace_notification_workspace_id_idx').on(table.workspaceId),
activeIdx: index('workspace_notification_active_idx').on(table.active),
typeIdx: index('workspace_notification_type_idx').on(table.notificationType),
})
)
export const webhookDeliveryStatusEnum = pgEnum('webhook_delivery_status', [
'pending',
'in_progress',
'success',
'failed',
])
export const workflowLogWebhookDelivery = pgTable(
'workflow_log_webhook_delivery',
export const workspaceNotificationDelivery = pgTable(
'workspace_notification_delivery',
{
id: text('id').primaryKey(),
subscriptionId: text('subscription_id')
.notNull()
.references(() => workflowLogWebhook.id, { onDelete: 'cascade' }),
.references(() => workspaceNotificationSubscription.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
status: webhookDeliveryStatusEnum('status').notNull().default('pending'),
status: notificationDeliveryStatusEnum('status').notNull().default('pending'),
attempts: integer('attempts').notNull().default(0),
lastAttemptAt: timestamp('last_attempt_at'),
nextAttemptAt: timestamp('next_attempt_at'),
@@ -553,12 +570,14 @@ export const workflowLogWebhookDelivery = pgTable(
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
subscriptionIdIdx: index('workflow_log_webhook_delivery_subscription_id_idx').on(
subscriptionIdIdx: index('workspace_notification_delivery_subscription_id_idx').on(
table.subscriptionId
),
executionIdIdx: index('workflow_log_webhook_delivery_execution_id_idx').on(table.executionId),
statusIdx: index('workflow_log_webhook_delivery_status_idx').on(table.status),
nextAttemptIdx: index('workflow_log_webhook_delivery_next_attempt_idx').on(table.nextAttemptAt),
executionIdIdx: index('workspace_notification_delivery_execution_id_idx').on(table.executionId),
statusIdx: index('workspace_notification_delivery_status_idx').on(table.status),
nextAttemptIdx: index('workspace_notification_delivery_next_attempt_idx').on(
table.nextAttemptAt
),
})
)
@@ -570,17 +589,16 @@ export const apiKey = pgTable(
.notNull()
.references(() => user.id, { onDelete: 'cascade' }),
workspaceId: text('workspace_id').references(() => workspace.id, { onDelete: 'cascade' }), // Only set for workspace keys
createdBy: text('created_by').references(() => user.id, { onDelete: 'set null' }), // Who created the workspace key
createdBy: text('created_by').references(() => user.id, { onDelete: 'set null' }),
name: text('name').notNull(),
key: text('key').notNull().unique(),
type: text('type').notNull().default('personal'), // 'personal' or 'workspace'
type: text('type').notNull().default('personal'),
lastUsed: timestamp('last_used'),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
expiresAt: timestamp('expires_at'),
},
(table) => ({
// Ensure workspace keys have a workspace_id and personal keys don't
workspaceTypeCheck: check(
'workspace_type_check',
sql`(type = 'workspace' AND workspace_id IS NOT NULL) OR (type = 'personal' AND workspace_id IS NULL)`