feat(logs-api): expose logs as api + can subscribe to workflow execution using webhook url (#1287)

* feat(logs-api): expose logs as api + can subscribe to workflow exection using webhook url

* fix scroll

* Update apps/docs/content/docs/execution/api.mdx

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* fix rate limits

* address greptile comments

* remove unused file

* address more greptile comments

* minor UI changes

* fix atomicity to prevent races

* make search param sensible

---------

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
This commit is contained in:
Vikhyath Mondreti
2025-09-09 11:34:18 -07:00
committed by GitHub
parent cf4a935575
commit 0785f6e920
24 changed files with 9929 additions and 26 deletions

View File

@@ -0,0 +1,532 @@
---
title: External API
description: Query workflow execution logs and set up webhooks for real-time notifications
---
import { Accordion, Accordions } from 'fumadocs-ui/components/accordion'
import { Callout } from 'fumadocs-ui/components/callout'
import { Tab, Tabs } from 'fumadocs-ui/components/tabs'
import { CodeBlock } from 'fumadocs-ui/components/codeblock'
Sim provides a comprehensive external API for querying workflow execution logs and setting up webhooks for real-time notifications when workflows complete.
## Authentication
All API requests require an API key passed in the `x-api-key` header:
```bash
curl -H "x-api-key: YOUR_API_KEY" \
https://sim.ai/api/v1/logs?workspaceId=YOUR_WORKSPACE_ID
```
You can generate API keys from your user settings in the Sim dashboard.
## Logs API
All API responses include information about your workflow execution limits and usage:
```json
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"limit": 60, // Max sync workflow executions per minute
"remaining": 58, // Remaining sync workflow executions
"resetAt": "..." // When the window resets
},
"async": {
"limit": 60, // Max async workflow executions per minute
"remaining": 59, // Remaining async workflow executions
"resetAt": "..." // When the window resets
}
},
"usage": {
"currentPeriodCost": 1.234, // Current billing period usage in USD
"limit": 10, // Usage limit in USD
"plan": "pro", // Current subscription plan
"isExceeded": false // Whether limit is exceeded
}
}
```
**Note:** The rate limits in the response body are for workflow executions. The rate limits for calling this API endpoint are in the response headers (`X-RateLimit-*`).
### Query Logs
Query workflow execution logs with extensive filtering options.
<Tabs items={['Request', 'Response']}>
<Tab value="Request">
```http
GET /api/v1/logs
```
**Required Parameters:**
- `workspaceId` - Your workspace ID
**Optional Filters:**
- `workflowIds` - Comma-separated workflow IDs
- `folderIds` - Comma-separated folder IDs
- `triggers` - Comma-separated trigger types: `api`, `webhook`, `schedule`, `manual`, `chat`
- `level` - Filter by level: `info`, `error`
- `startDate` - ISO timestamp for date range start
- `endDate` - ISO timestamp for date range end
- `executionId` - Exact execution ID match
- `minDurationMs` - Minimum execution duration in milliseconds
- `maxDurationMs` - Maximum execution duration in milliseconds
- `minCost` - Minimum execution cost
- `maxCost` - Maximum execution cost
- `model` - Filter by AI model used
**Pagination:**
- `limit` - Results per page (default: 100)
- `cursor` - Cursor for next page
- `order` - Sort order: `desc`, `asc` (default: desc)
**Detail Level:**
- `details` - Response detail level: `basic`, `full` (default: basic)
- `includeTraceSpans` - Include trace spans (default: false)
- `includeFinalOutput` - Include final output (default: false)
</Tab>
<Tab value="Response">
```json
{
"data": [
{
"id": "log_abc123",
"workflowId": "wf_xyz789",
"executionId": "exec_def456",
"level": "info",
"trigger": "api",
"startedAt": "2025-01-01T12:34:56.789Z",
"endedAt": "2025-01-01T12:34:57.123Z",
"totalDurationMs": 334,
"cost": {
"total": 0.00234
},
"files": null
}
],
"nextCursor": "eyJzIjoiMjAyNS0wMS0wMVQxMjozNDo1Ni43ODlaIiwiaWQiOiJsb2dfYWJjMTIzIn0",
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"limit": 60,
"remaining": 58,
"resetAt": "2025-01-01T12:35:56.789Z"
},
"async": {
"limit": 60,
"remaining": 59,
"resetAt": "2025-01-01T12:35:56.789Z"
}
},
"usage": {
"currentPeriodCost": 1.234,
"limit": 10,
"plan": "pro",
"isExceeded": false
}
}
}
```
</Tab>
</Tabs>
### Get Log Details
Retrieve detailed information about a specific log entry.
<Tabs items={['Request', 'Response']}>
<Tab value="Request">
```http
GET /api/v1/logs/{id}
```
</Tab>
<Tab value="Response">
```json
{
"data": {
"id": "log_abc123",
"workflowId": "wf_xyz789",
"executionId": "exec_def456",
"level": "info",
"trigger": "api",
"startedAt": "2025-01-01T12:34:56.789Z",
"endedAt": "2025-01-01T12:34:57.123Z",
"totalDurationMs": 334,
"workflow": {
"id": "wf_xyz789",
"name": "My Workflow",
"description": "Process customer data"
},
"executionData": {
"traceSpans": [...],
"finalOutput": {...}
},
"cost": {
"total": 0.00234,
"tokens": {
"prompt": 123,
"completion": 456,
"total": 579
},
"models": {
"gpt-4o": {
"input": 0.001,
"output": 0.00134,
"total": 0.00234,
"tokens": {
"prompt": 123,
"completion": 456,
"total": 579
}
}
}
},
"limits": {
"workflowExecutionRateLimit": {
"sync": {
"limit": 60,
"remaining": 58,
"resetAt": "2025-01-01T12:35:56.789Z"
},
"async": {
"limit": 60,
"remaining": 59,
"resetAt": "2025-01-01T12:35:56.789Z"
}
},
"usage": {
"currentPeriodCost": 1.234,
"limit": 10,
"plan": "pro",
"isExceeded": false
}
}
}
}
```
</Tab>
</Tabs>
### Get Execution Details
Retrieve execution details including the workflow state snapshot.
<Tabs items={['Request', 'Response']}>
<Tab value="Request">
```http
GET /api/v1/logs/executions/{executionId}
```
</Tab>
<Tab value="Response">
```json
{
"executionId": "exec_def456",
"workflowId": "wf_xyz789",
"workflowState": {
"blocks": {...},
"edges": [...],
"loops": {...},
"parallels": {...}
},
"executionMetadata": {
"trigger": "api",
"startedAt": "2025-01-01T12:34:56.789Z",
"endedAt": "2025-01-01T12:34:57.123Z",
"totalDurationMs": 334,
"cost": {...}
}
}
```
</Tab>
</Tabs>
## Webhook Subscriptions
Get real-time notifications when workflow executions complete. Webhooks are configured through the Sim UI in the workflow editor.
### Configuration
Webhooks can be configured for each workflow through the workflow editor UI. Click the webhook icon in the control bar to set up your webhook subscriptions.
**Available Configuration Options:**
- `url`: Your webhook endpoint URL
- `secret`: Optional secret for HMAC signature verification
- `includeFinalOutput`: Include the workflow's final output in the payload
- `includeTraceSpans`: Include detailed execution trace spans
- `includeRateLimits`: Include the workflow owner's rate limit information
- `includeUsageData`: Include the workflow owner's usage and billing data
- `levelFilter`: Array of log levels to receive (`info`, `error`)
- `triggerFilter`: Array of trigger types to receive (`api`, `webhook`, `schedule`, `manual`, `chat`)
- `active`: Enable/disable the webhook subscription
### Webhook Payload
When a workflow execution completes, Sim sends a POST request to your webhook URL:
```json
{
"id": "evt_123",
"type": "workflow.execution.completed",
"timestamp": 1735925767890,
"data": {
"workflowId": "wf_xyz789",
"executionId": "exec_def456",
"status": "success",
"level": "info",
"trigger": "api",
"startedAt": "2025-01-01T12:34:56.789Z",
"endedAt": "2025-01-01T12:34:57.123Z",
"totalDurationMs": 334,
"cost": {
"total": 0.00234,
"tokens": {
"prompt": 123,
"completion": 456,
"total": 579
},
"models": {
"gpt-4o": {
"input": 0.001,
"output": 0.00134,
"total": 0.00234,
"tokens": {
"prompt": 123,
"completion": 456,
"total": 579
}
}
}
},
"files": null,
"finalOutput": {...}, // Only if includeFinalOutput=true
"traceSpans": [...], // Only if includeTraceSpans=true
"rateLimits": {...}, // Only if includeRateLimits=true
"usage": {...} // Only if includeUsageData=true
},
"links": {
"log": "/v1/logs/log_abc123",
"execution": "/v1/logs/executions/exec_def456"
}
}
```
### Webhook Headers
Each webhook request includes these headers:
- `sim-event`: Event type (always `workflow.execution.completed`)
- `sim-timestamp`: Unix timestamp in milliseconds
- `sim-delivery-id`: Unique delivery ID for idempotency
- `sim-signature`: HMAC-SHA256 signature for verification (if secret configured)
- `Idempotency-Key`: Same as delivery ID for duplicate detection
### Signature Verification
If you configure a webhook secret, verify the signature to ensure the webhook is from Sim:
<Tabs items={['Node.js', 'Python']}>
<Tab value="Node.js">
```javascript
import crypto from 'crypto';
function verifyWebhookSignature(body, signature, secret) {
const [timestampPart, signaturePart] = signature.split(',');
const timestamp = timestampPart.replace('t=', '');
const expectedSignature = signaturePart.replace('v1=', '');
const signatureBase = `${timestamp}.${body}`;
const hmac = crypto.createHmac('sha256', secret);
hmac.update(signatureBase);
const computedSignature = hmac.digest('hex');
return computedSignature === expectedSignature;
}
// In your webhook handler
app.post('/webhook', (req, res) => {
const signature = req.headers['sim-signature'];
const body = JSON.stringify(req.body);
if (!verifyWebhookSignature(body, signature, process.env.WEBHOOK_SECRET)) {
return res.status(401).send('Invalid signature');
}
// Process the webhook...
});
```
</Tab>
<Tab value="Python">
```python
import hmac
import hashlib
import json
def verify_webhook_signature(body: str, signature: str, secret: str) -> bool:
timestamp_part, signature_part = signature.split(',')
timestamp = timestamp_part.replace('t=', '')
expected_signature = signature_part.replace('v1=', '')
signature_base = f"{timestamp}.{body}"
computed_signature = hmac.new(
secret.encode(),
signature_base.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(computed_signature, expected_signature)
# In your webhook handler
@app.route('/webhook', methods=['POST'])
def webhook():
signature = request.headers.get('sim-signature')
body = json.dumps(request.json)
if not verify_webhook_signature(body, signature, os.environ['WEBHOOK_SECRET']):
return 'Invalid signature', 401
# Process the webhook...
```
</Tab>
</Tabs>
### Retry Policy
Failed webhook deliveries are retried with exponential backoff and jitter:
- Maximum attempts: 5
- Retry delays: 5 seconds, 15 seconds, 1 minute, 3 minutes, 10 minutes
- Jitter: Up to 10% additional delay to prevent thundering herd
- Only HTTP 5xx and 429 responses trigger retries
- Deliveries timeout after 30 seconds
<Callout type="info">
Webhook deliveries are processed asynchronously and don't affect workflow execution performance.
</Callout>
## Best Practices
1. **Polling Strategy**: When polling for logs, use cursor-based pagination with `order=asc` and `startDate` to fetch new logs efficiently.
2. **Webhook Security**: Always configure a webhook secret and verify signatures to ensure requests are from Sim.
3. **Idempotency**: Use the `Idempotency-Key` header to detect and handle duplicate webhook deliveries.
4. **Privacy**: By default, `finalOutput` and `traceSpans` are excluded from responses. Only enable these if you need the data and understand the privacy implications.
5. **Rate Limiting**: Implement exponential backoff when you receive 429 responses. Check the `Retry-After` header for the recommended wait time.
## Rate Limiting
The API implements rate limiting to ensure fair usage:
- **Free plan**: 10 requests per minute
- **Pro plan**: 30 requests per minute
- **Team plan**: 60 requests per minute
- **Enterprise plan**: Custom limits
Rate limit information is included in response headers:
- `X-RateLimit-Limit`: Maximum requests per window
- `X-RateLimit-Remaining`: Requests remaining in current window
- `X-RateLimit-Reset`: ISO timestamp when the window resets
## Example: Polling for New Logs
```javascript
let cursor = null;
const workspaceId = 'YOUR_WORKSPACE_ID';
const startDate = new Date().toISOString();
async function pollLogs() {
const params = new URLSearchParams({
workspaceId,
startDate,
order: 'asc',
limit: '100'
});
if (cursor) {
params.append('cursor', cursor);
}
const response = await fetch(
`https://sim.ai/api/v1/logs?${params}`,
{
headers: {
'x-api-key': 'YOUR_API_KEY'
}
}
);
if (response.ok) {
const data = await response.json();
// Process new logs
for (const log of data.data) {
console.log(`New execution: ${log.executionId}`);
}
// Update cursor for next poll
if (data.nextCursor) {
cursor = data.nextCursor;
}
}
}
// Poll every 30 seconds
setInterval(pollLogs, 30000);
```
## Example: Processing Webhooks
```javascript
import express from 'express';
import crypto from 'crypto';
const app = express();
app.use(express.json());
app.post('/sim-webhook', (req, res) => {
// Verify signature
const signature = req.headers['sim-signature'];
const body = JSON.stringify(req.body);
if (!verifyWebhookSignature(body, signature, process.env.WEBHOOK_SECRET)) {
return res.status(401).send('Invalid signature');
}
// Check timestamp to prevent replay attacks
const timestamp = parseInt(req.headers['sim-timestamp']);
const fiveMinutesAgo = Date.now() - (5 * 60 * 1000);
if (timestamp < fiveMinutesAgo) {
return res.status(401).send('Timestamp too old');
}
// Process the webhook
const event = req.body;
switch (event.type) {
case 'workflow.execution.completed':
const { workflowId, executionId, status, cost } = event.data;
if (status === 'error') {
console.error(`Workflow ${workflowId} failed: ${executionId}`);
// Handle error...
} else {
console.log(`Workflow ${workflowId} completed: ${executionId}`);
console.log(`Cost: $${cost.total}`);
// Process successful execution...
}
break;
}
// Return 200 to acknowledge receipt
res.status(200).send('OK');
});
app.listen(3000, () => {
console.log('Webhook server listening on port 3000');
});
```

View File

@@ -1,4 +1,4 @@
{
"title": "Execution",
"pages": ["basics", "advanced"]
"pages": ["basics", "advanced", "api"]
}

View File

@@ -0,0 +1,64 @@
import { eq } from 'drizzle-orm'
import type { NextRequest } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { db } from '@/db'
import { apiKey as apiKeyTable } from '@/db/schema'
const logger = createLogger('V1Auth')
export interface AuthResult {
authenticated: boolean
userId?: string
error?: string
}
export async function authenticateApiKey(request: NextRequest): Promise<AuthResult> {
const apiKey = request.headers.get('x-api-key')
if (!apiKey) {
return {
authenticated: false,
error: 'API key required',
}
}
try {
const [keyRecord] = await db
.select({
userId: apiKeyTable.userId,
expiresAt: apiKeyTable.expiresAt,
})
.from(apiKeyTable)
.where(eq(apiKeyTable.key, apiKey))
.limit(1)
if (!keyRecord) {
logger.warn('Invalid API key attempted', { keyPrefix: apiKey.slice(0, 8) })
return {
authenticated: false,
error: 'Invalid API key',
}
}
if (keyRecord.expiresAt && keyRecord.expiresAt < new Date()) {
logger.warn('Expired API key attempted', { userId: keyRecord.userId })
return {
authenticated: false,
error: 'API key expired',
}
}
await db.update(apiKeyTable).set({ lastUsed: new Date() }).where(eq(apiKeyTable.key, apiKey))
return {
authenticated: true,
userId: keyRecord.userId,
}
} catch (error) {
logger.error('API key authentication error', { error })
return {
authenticated: false,
error: 'Authentication failed',
}
}
}

View File

@@ -0,0 +1,106 @@
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'
import { db } from '@/db'
import { permissions, workflow, workflowExecutionLogs } from '@/db/schema'
const logger = createLogger('V1LogDetailsAPI')
export const revalidate = 0
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
const rateLimit = await checkRateLimit(request, 'logs-detail')
if (!rateLimit.allowed) {
return createRateLimitResponse(rateLimit)
}
const userId = rateLimit.userId!
const { id } = await params
const rows = await db
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
executionId: workflowExecutionLogs.executionId,
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
level: workflowExecutionLogs.level,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
executionData: workflowExecutionLogs.executionData,
cost: workflowExecutionLogs.cost,
files: workflowExecutionLogs.files,
createdAt: workflowExecutionLogs.createdAt,
workflowName: workflow.name,
workflowDescription: workflow.description,
workflowColor: workflow.color,
workflowFolderId: workflow.folderId,
workflowUserId: workflow.userId,
workflowWorkspaceId: workflow.workspaceId,
workflowCreatedAt: workflow.createdAt,
workflowUpdatedAt: workflow.updatedAt,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflowExecutionLogs.id, id))
.limit(1)
const log = rows[0]
if (!log) {
return NextResponse.json({ error: 'Log not found' }, { status: 404 })
}
const workflowSummary = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
const response = {
id: log.id,
workflowId: log.workflowId,
executionId: log.executionId,
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt.toISOString(),
endedAt: log.endedAt?.toISOString() || null,
totalDurationMs: log.totalDurationMs,
files: log.files || undefined,
workflow: workflowSummary,
executionData: log.executionData as any,
cost: log.cost as any,
createdAt: log.createdAt.toISOString(),
}
// Get user's workflow execution limits and usage
const limits = await getUserLimits(userId)
// Create response with limits information
const apiResponse = createApiResponse({ data: response }, limits, rateLimit)
return NextResponse.json(apiResponse.body, { headers: apiResponse.headers })
} catch (error: any) {
logger.error(`[${requestId}] Log details fetch error`, { error: error.message })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,100 @@
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { createLogger } from '@/lib/logs/console/logger'
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'
import { db } from '@/db'
import {
permissions,
workflow,
workflowExecutionLogs,
workflowExecutionSnapshots,
} from '@/db/schema'
const logger = createLogger('V1ExecutionAPI')
export async function GET(
request: NextRequest,
{ params }: { params: Promise<{ executionId: string }> }
) {
try {
const rateLimit = await checkRateLimit(request, 'logs-detail')
if (!rateLimit.allowed) {
return createRateLimitResponse(rateLimit)
}
const userId = rateLimit.userId!
const { executionId } = await params
logger.debug(`Fetching execution data for: ${executionId}`)
const rows = await db
.select({
log: workflowExecutionLogs,
workflow: workflow,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
if (rows.length === 0) {
return NextResponse.json({ error: 'Workflow execution not found' }, { status: 404 })
}
const { log: workflowLog } = rows[0]
const [snapshot] = await db
.select()
.from(workflowExecutionSnapshots)
.where(eq(workflowExecutionSnapshots.id, workflowLog.stateSnapshotId))
.limit(1)
if (!snapshot) {
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}
const response = {
executionId,
workflowId: workflowLog.workflowId,
workflowState: snapshot.stateData,
executionMetadata: {
trigger: workflowLog.trigger,
startedAt: workflowLog.startedAt.toISOString(),
endedAt: workflowLog.endedAt?.toISOString(),
totalDurationMs: workflowLog.totalDurationMs,
cost: workflowLog.cost || null,
},
}
logger.debug(`Successfully fetched execution data for: ${executionId}`)
logger.debug(
`Workflow state contains ${Object.keys((snapshot.stateData as any)?.blocks || {}).length} blocks`
)
// Get user's workflow execution limits and usage
const limits = await getUserLimits(userId)
// Create response with limits information
const apiResponse = createApiResponse(
{
...response,
},
limits,
rateLimit
)
return NextResponse.json(apiResponse.body, { headers: apiResponse.headers })
} catch (error) {
logger.error('Error fetching execution data:', error)
return NextResponse.json({ error: 'Failed to fetch execution data' }, { status: 500 })
}
}

View File

@@ -0,0 +1,110 @@
import { and, desc, eq, gte, inArray, lte, type SQL, sql } from 'drizzle-orm'
import { workflow, workflowExecutionLogs } from '@/db/schema'
export interface LogFilters {
workspaceId: string
workflowIds?: string[]
folderIds?: string[]
triggers?: string[]
level?: 'info' | 'error'
startDate?: Date
endDate?: Date
executionId?: string
minDurationMs?: number
maxDurationMs?: number
minCost?: number
maxCost?: number
model?: string
cursor?: {
startedAt: string
id: string
}
order?: 'desc' | 'asc'
}
export function buildLogFilters(filters: LogFilters): SQL<unknown> {
const conditions: SQL<unknown>[] = []
// Required: workspace and permissions check
conditions.push(eq(workflow.workspaceId, filters.workspaceId))
// Cursor-based pagination
if (filters.cursor) {
const cursorDate = new Date(filters.cursor.startedAt)
if (filters.order === 'desc') {
conditions.push(
sql`(${workflowExecutionLogs.startedAt}, ${workflowExecutionLogs.id}) < (${cursorDate}, ${filters.cursor.id})`
)
} else {
conditions.push(
sql`(${workflowExecutionLogs.startedAt}, ${workflowExecutionLogs.id}) > (${cursorDate}, ${filters.cursor.id})`
)
}
}
// Workflow IDs filter
if (filters.workflowIds && filters.workflowIds.length > 0) {
conditions.push(inArray(workflow.id, filters.workflowIds))
}
// Folder IDs filter
if (filters.folderIds && filters.folderIds.length > 0) {
conditions.push(inArray(workflow.folderId, filters.folderIds))
}
// Triggers filter
if (filters.triggers && filters.triggers.length > 0 && !filters.triggers.includes('all')) {
conditions.push(inArray(workflowExecutionLogs.trigger, filters.triggers))
}
// Level filter
if (filters.level) {
conditions.push(eq(workflowExecutionLogs.level, filters.level))
}
// Date range filters
if (filters.startDate) {
conditions.push(gte(workflowExecutionLogs.startedAt, filters.startDate))
}
if (filters.endDate) {
conditions.push(lte(workflowExecutionLogs.startedAt, filters.endDate))
}
// Search filter (execution ID)
if (filters.executionId) {
conditions.push(eq(workflowExecutionLogs.executionId, filters.executionId))
}
// Duration filters
if (filters.minDurationMs !== undefined) {
conditions.push(gte(workflowExecutionLogs.totalDurationMs, filters.minDurationMs))
}
if (filters.maxDurationMs !== undefined) {
conditions.push(lte(workflowExecutionLogs.totalDurationMs, filters.maxDurationMs))
}
// Cost filters
if (filters.minCost !== undefined) {
conditions.push(sql`(${workflowExecutionLogs.cost}->>'total')::numeric >= ${filters.minCost}`)
}
if (filters.maxCost !== undefined) {
conditions.push(sql`(${workflowExecutionLogs.cost}->>'total')::numeric <= ${filters.maxCost}`)
}
// Model filter
if (filters.model) {
conditions.push(sql`${workflowExecutionLogs.cost}->>'models' ? ${filters.model}`)
}
// Combine all conditions with AND
return conditions.length > 0 ? and(...conditions)! : sql`true`
}
export function getOrderBy(order: 'desc' | 'asc' = 'desc') {
return order === 'desc'
? desc(workflowExecutionLogs.startedAt)
: sql`${workflowExecutionLogs.startedAt} ASC`
}

View File

@@ -0,0 +1,78 @@
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getEffectiveCurrentPeriodCost } from '@/lib/billing/core/usage'
import { RateLimiter } from '@/services/queue'
export interface UserLimits {
workflowExecutionRateLimit: {
sync: {
limit: number
remaining: number
resetAt: string
}
async: {
limit: number
remaining: number
resetAt: string
}
}
usage: {
currentPeriodCost: number
limit: number
plan: string
isExceeded: boolean
}
}
export async function getUserLimits(userId: string): Promise<UserLimits> {
const [userSubscription, usageCheck, effectiveCost, rateLimiter] = await Promise.all([
getHighestPrioritySubscription(userId),
checkServerSideUsageLimits(userId),
getEffectiveCurrentPeriodCost(userId),
Promise.resolve(new RateLimiter()),
])
const [syncStatus, asyncStatus] = await Promise.all([
rateLimiter.getRateLimitStatusWithSubscription(userId, userSubscription, 'api', false),
rateLimiter.getRateLimitStatusWithSubscription(userId, userSubscription, 'api', true),
])
return {
workflowExecutionRateLimit: {
sync: {
limit: syncStatus.limit,
remaining: syncStatus.remaining,
resetAt: syncStatus.resetAt.toISOString(),
},
async: {
limit: asyncStatus.limit,
remaining: asyncStatus.remaining,
resetAt: asyncStatus.resetAt.toISOString(),
},
},
usage: {
currentPeriodCost: effectiveCost,
limit: usageCheck.limit,
plan: userSubscription?.plan || 'free',
isExceeded: usageCheck.isExceeded,
},
}
}
export function createApiResponse<T>(
data: T,
limits: UserLimits,
apiRateLimit: { limit: number; remaining: number; resetAt: Date }
) {
return {
body: {
...data,
limits,
},
headers: {
'X-RateLimit-Limit': apiRateLimit.limit.toString(),
'X-RateLimit-Remaining': apiRateLimit.remaining.toString(),
'X-RateLimit-Reset': apiRateLimit.resetAt.toISOString(),
},
}
}

View File

@@ -0,0 +1,212 @@
import { and, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { createLogger } from '@/lib/logs/console/logger'
import { buildLogFilters, getOrderBy } from '@/app/api/v1/logs/filters'
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'
import { db } from '@/db'
import { permissions, workflow, workflowExecutionLogs } from '@/db/schema'
const logger = createLogger('V1LogsAPI')
export const dynamic = 'force-dynamic'
export const revalidate = 0
const QueryParamsSchema = z.object({
workspaceId: z.string(),
workflowIds: z.string().optional(),
folderIds: z.string().optional(),
triggers: z.string().optional(),
level: z.enum(['info', 'error']).optional(),
startDate: z.string().optional(),
endDate: z.string().optional(),
executionId: z.string().optional(),
minDurationMs: z.coerce.number().optional(),
maxDurationMs: z.coerce.number().optional(),
minCost: z.coerce.number().optional(),
maxCost: z.coerce.number().optional(),
model: z.string().optional(),
details: z.enum(['basic', 'full']).optional().default('basic'),
includeTraceSpans: z.coerce.boolean().optional().default(false),
includeFinalOutput: z.coerce.boolean().optional().default(false),
limit: z.coerce.number().optional().default(100),
cursor: z.string().optional(),
order: z.enum(['desc', 'asc']).optional().default('desc'),
})
interface CursorData {
startedAt: string
id: string
}
function encodeCursor(data: CursorData): string {
return Buffer.from(JSON.stringify(data)).toString('base64')
}
function decodeCursor(cursor: string): CursorData | null {
try {
return JSON.parse(Buffer.from(cursor, 'base64').toString())
} catch {
return null
}
}
export async function GET(request: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
try {
const rateLimit = await checkRateLimit(request, 'logs')
if (!rateLimit.allowed) {
return createRateLimitResponse(rateLimit)
}
const userId = rateLimit.userId!
const { searchParams } = new URL(request.url)
const rawParams = Object.fromEntries(searchParams.entries())
const validationResult = QueryParamsSchema.safeParse(rawParams)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid parameters', details: validationResult.error.errors },
{ status: 400 }
)
}
const params = validationResult.data
logger.info(`[${requestId}] Fetching logs for workspace ${params.workspaceId}`, {
userId,
filters: {
workflowIds: params.workflowIds,
triggers: params.triggers,
level: params.level,
},
})
// Build filter conditions
const filters = {
workspaceId: params.workspaceId,
workflowIds: params.workflowIds?.split(',').filter(Boolean),
folderIds: params.folderIds?.split(',').filter(Boolean),
triggers: params.triggers?.split(',').filter(Boolean),
level: params.level,
startDate: params.startDate ? new Date(params.startDate) : undefined,
endDate: params.endDate ? new Date(params.endDate) : undefined,
executionId: params.executionId,
minDurationMs: params.minDurationMs,
maxDurationMs: params.maxDurationMs,
minCost: params.minCost,
maxCost: params.maxCost,
model: params.model,
cursor: params.cursor ? decodeCursor(params.cursor) || undefined : undefined,
order: params.order,
}
const conditions = buildLogFilters(filters)
const orderBy = getOrderBy(params.order)
// Build and execute query
const baseQuery = db
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
executionId: workflowExecutionLogs.executionId,
level: workflowExecutionLogs.level,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
files: workflowExecutionLogs.files,
executionData: params.details === 'full' ? workflowExecutionLogs.executionData : sql`null`,
workflowName: workflow.name,
workflowDescription: workflow.description,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, params.workspaceId),
eq(permissions.userId, userId)
)
)
const logs = await baseQuery
.where(conditions)
.orderBy(orderBy)
.limit(params.limit + 1)
const hasMore = logs.length > params.limit
const data = logs.slice(0, params.limit)
let nextCursor: string | undefined
if (hasMore && data.length > 0) {
const lastLog = data[data.length - 1]
nextCursor = encodeCursor({
startedAt: lastLog.startedAt.toISOString(),
id: lastLog.id,
})
}
const formattedLogs = data.map((log) => {
const result: any = {
id: log.id,
workflowId: log.workflowId,
executionId: log.executionId,
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt.toISOString(),
endedAt: log.endedAt?.toISOString() || null,
totalDurationMs: log.totalDurationMs,
cost: log.cost ? { total: (log.cost as any).total } : null,
files: log.files || null,
}
if (params.details === 'full') {
result.workflow = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
}
if (log.cost) {
result.cost = log.cost
}
if (log.executionData) {
const execData = log.executionData as any
if (params.includeFinalOutput && execData.finalOutput) {
result.finalOutput = execData.finalOutput
}
if (params.includeTraceSpans && execData.traceSpans) {
result.traceSpans = execData.traceSpans
}
}
}
return result
})
// Get user's workflow execution limits and usage
const limits = await getUserLimits(userId)
// Create response with limits information
// The rateLimit object from checkRateLimit is for THIS API endpoint's rate limits
const response = createApiResponse(
{
data: formattedLogs,
nextCursor,
},
limits,
rateLimit // This is the API endpoint rate limit, not workflow execution limits
)
return NextResponse.json(response.body, { headers: response.headers })
} catch (error: any) {
logger.error(`[${requestId}] Logs fetch error`, { error: error.message })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,108 @@
import { type NextRequest, NextResponse } from 'next/server'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { createLogger } from '@/lib/logs/console/logger'
import { RateLimiter } from '@/services/queue/RateLimiter'
import { authenticateApiKey } from './auth'
const logger = createLogger('V1Middleware')
const rateLimiter = new RateLimiter()
export interface RateLimitResult {
allowed: boolean
remaining: number
resetAt: Date
limit: number
userId?: string
error?: string
}
export async function checkRateLimit(
request: NextRequest,
endpoint: 'logs' | 'logs-detail' = 'logs'
): Promise<RateLimitResult> {
try {
const auth = await authenticateApiKey(request)
if (!auth.authenticated) {
return {
allowed: false,
remaining: 0,
limit: 10, // Default to free tier limit
resetAt: new Date(),
error: auth.error,
}
}
const userId = auth.userId!
const subscription = await getHighestPrioritySubscription(userId)
// Use api-endpoint trigger type for external API rate limiting
const result = await rateLimiter.checkRateLimitWithSubscription(
userId,
subscription,
'api-endpoint',
false // Not relevant for api-endpoint trigger type
)
if (!result.allowed) {
logger.warn(`Rate limit exceeded for user ${userId}`, {
endpoint,
remaining: result.remaining,
resetAt: result.resetAt,
})
}
// Get the actual rate limit for this user's plan
const rateLimitStatus = await rateLimiter.getRateLimitStatusWithSubscription(
userId,
subscription,
'api-endpoint',
false
)
return {
...result,
limit: rateLimitStatus.limit,
userId,
}
} catch (error) {
logger.error('Rate limit check error', { error })
return {
allowed: false,
remaining: 0,
limit: 10,
resetAt: new Date(Date.now() + 60000),
error: 'Rate limit check failed',
}
}
}
export function createRateLimitResponse(result: RateLimitResult): NextResponse {
const headers = {
'X-RateLimit-Limit': result.limit.toString(),
'X-RateLimit-Remaining': result.remaining.toString(),
'X-RateLimit-Reset': result.resetAt.toISOString(),
}
if (result.error) {
return NextResponse.json({ error: result.error || 'Unauthorized' }, { status: 401, headers })
}
if (!result.allowed) {
return NextResponse.json(
{
error: 'Rate limit exceeded',
message: `API rate limit exceeded. Please retry after ${result.resetAt.toISOString()}`,
retryAfter: result.resetAt.getTime(),
},
{
status: 429,
headers: {
...headers,
'Retry-After': Math.ceil((result.resetAt.getTime() - Date.now()) / 1000).toString(),
},
}
)
}
return NextResponse.json({ error: 'Bad request' }, { status: 400, headers })
}

View File

@@ -0,0 +1,221 @@
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { encryptSecret } from '@/lib/utils'
import { db } from '@/db'
import { permissions, workflow, workflowLogWebhook } from '@/db/schema'
const logger = createLogger('WorkflowLogWebhookUpdate')
type WebhookUpdatePayload = Pick<
typeof workflowLogWebhook.$inferInsert,
| 'url'
| 'includeFinalOutput'
| 'includeTraceSpans'
| 'includeRateLimits'
| 'includeUsageData'
| 'levelFilter'
| 'triggerFilter'
| 'secret'
| 'updatedAt'
>
const UpdateWebhookSchema = z.object({
url: z.string().url('Invalid webhook URL'),
secret: z.string().optional(),
includeFinalOutput: z.boolean(),
includeTraceSpans: z.boolean(),
includeRateLimits: z.boolean(),
includeUsageData: z.boolean(),
levelFilter: z.array(z.enum(['info', 'error'])),
triggerFilter: z.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat'])),
})
export async function PUT(
request: NextRequest,
{ params }: { params: Promise<{ id: string; webhookId: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId, webhookId } = await params
const userId = session.user.id
// Check if user has access to the workflow
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Check if webhook exists and belongs to this workflow
const existingWebhook = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.limit(1)
if (existingWebhook.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const body = await request.json()
const validationResult = UpdateWebhookSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
// Check for duplicate URL (excluding current webhook)
const duplicateWebhook = await db
.select({ id: workflowLogWebhook.id })
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, workflowId), eq(workflowLogWebhook.url, data.url))
)
.limit(1)
if (duplicateWebhook.length > 0 && duplicateWebhook[0].id !== webhookId) {
return NextResponse.json(
{ error: 'A webhook with this URL already exists for this workflow' },
{ status: 409 }
)
}
// Prepare update data
const updateData: WebhookUpdatePayload = {
url: data.url,
includeFinalOutput: data.includeFinalOutput,
includeTraceSpans: data.includeTraceSpans,
includeRateLimits: data.includeRateLimits,
includeUsageData: data.includeUsageData,
levelFilter: data.levelFilter,
triggerFilter: data.triggerFilter,
updatedAt: new Date(),
}
// Only update secret if provided
if (data.secret) {
const { encrypted } = await encryptSecret(data.secret)
updateData.secret = encrypted
}
const updatedWebhooks = await db
.update(workflowLogWebhook)
.set(updateData)
.where(eq(workflowLogWebhook.id, webhookId))
.returning()
if (updatedWebhooks.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const updatedWebhook = updatedWebhooks[0]
logger.info('Webhook updated', {
webhookId,
workflowId,
userId,
})
return NextResponse.json({
data: {
id: updatedWebhook.id,
url: updatedWebhook.url,
includeFinalOutput: updatedWebhook.includeFinalOutput,
includeTraceSpans: updatedWebhook.includeTraceSpans,
includeRateLimits: updatedWebhook.includeRateLimits,
includeUsageData: updatedWebhook.includeUsageData,
levelFilter: updatedWebhook.levelFilter,
triggerFilter: updatedWebhook.triggerFilter,
active: updatedWebhook.active,
createdAt: updatedWebhook.createdAt.toISOString(),
updatedAt: updatedWebhook.updatedAt.toISOString(),
},
})
} catch (error) {
logger.error('Failed to update webhook', { error })
return NextResponse.json({ error: 'Failed to update webhook' }, { status: 500 })
}
}
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string; webhookId: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId, webhookId } = await params
const userId = session.user.id
// Check if user has access to the workflow
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// Delete the webhook (will cascade delete deliveries)
const deletedWebhook = await db
.delete(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.returning()
if (deletedWebhook.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
logger.info('Webhook deleted', {
webhookId,
workflowId,
userId,
})
return NextResponse.json({ success: true })
} catch (error) {
logger.error('Failed to delete webhook', { error })
return NextResponse.json({ error: 'Failed to delete webhook' }, { status: 500 })
}
}

View File

@@ -0,0 +1,248 @@
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { encryptSecret } from '@/lib/utils'
import { db } from '@/db'
import { permissions, workflow, workflowLogWebhook } from '@/db/schema'
const logger = createLogger('WorkflowLogWebhookAPI')
const CreateWebhookSchema = z.object({
url: z.string().url(),
secret: z.string().optional(),
includeFinalOutput: z.boolean().optional().default(false),
includeTraceSpans: z.boolean().optional().default(false),
includeRateLimits: z.boolean().optional().default(false),
includeUsageData: z.boolean().optional().default(false),
levelFilter: z
.array(z.enum(['info', 'error']))
.optional()
.default(['info', 'error']),
triggerFilter: z
.array(z.enum(['api', 'webhook', 'schedule', 'manual', 'chat']))
.optional()
.default(['api', 'webhook', 'schedule', 'manual', 'chat']),
active: z.boolean().optional().default(true),
})
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const webhooks = await db
.select({
id: workflowLogWebhook.id,
url: workflowLogWebhook.url,
includeFinalOutput: workflowLogWebhook.includeFinalOutput,
includeTraceSpans: workflowLogWebhook.includeTraceSpans,
includeRateLimits: workflowLogWebhook.includeRateLimits,
includeUsageData: workflowLogWebhook.includeUsageData,
levelFilter: workflowLogWebhook.levelFilter,
triggerFilter: workflowLogWebhook.triggerFilter,
active: workflowLogWebhook.active,
createdAt: workflowLogWebhook.createdAt,
updatedAt: workflowLogWebhook.updatedAt,
})
.from(workflowLogWebhook)
.where(eq(workflowLogWebhook.workflowId, workflowId))
return NextResponse.json({ data: webhooks })
} catch (error) {
logger.error('Error fetching log webhooks', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const body = await request.json()
const validationResult = CreateWebhookSchema.safeParse(body)
if (!validationResult.success) {
return NextResponse.json(
{ error: 'Invalid request', details: validationResult.error.errors },
{ status: 400 }
)
}
const data = validationResult.data
// Check for duplicate URL
const existingWebhook = await db
.select({ id: workflowLogWebhook.id })
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, workflowId), eq(workflowLogWebhook.url, data.url))
)
.limit(1)
if (existingWebhook.length > 0) {
return NextResponse.json(
{ error: 'A webhook with this URL already exists for this workflow' },
{ status: 409 }
)
}
let encryptedSecret: string | null = null
if (data.secret) {
const { encrypted } = await encryptSecret(data.secret)
encryptedSecret = encrypted
}
const [webhook] = await db
.insert(workflowLogWebhook)
.values({
id: uuidv4(),
workflowId,
url: data.url,
secret: encryptedSecret,
includeFinalOutput: data.includeFinalOutput,
includeTraceSpans: data.includeTraceSpans,
includeRateLimits: data.includeRateLimits,
includeUsageData: data.includeUsageData,
levelFilter: data.levelFilter,
triggerFilter: data.triggerFilter,
active: data.active,
})
.returning()
logger.info('Created log webhook', {
workflowId,
webhookId: webhook.id,
url: data.url,
})
return NextResponse.json({
data: {
id: webhook.id,
url: webhook.url,
includeFinalOutput: webhook.includeFinalOutput,
includeTraceSpans: webhook.includeTraceSpans,
includeRateLimits: webhook.includeRateLimits,
includeUsageData: webhook.includeUsageData,
levelFilter: webhook.levelFilter,
triggerFilter: webhook.triggerFilter,
active: webhook.active,
createdAt: webhook.createdAt,
updatedAt: webhook.updatedAt,
},
})
} catch (error) {
logger.error('Error creating log webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}
export async function DELETE(
request: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const { searchParams } = new URL(request.url)
const webhookId = searchParams.get('webhookId')
if (!webhookId) {
return NextResponse.json({ error: 'webhookId is required' }, { status: 400 })
}
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const deleted = await db
.delete(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.returning({ id: workflowLogWebhook.id })
if (deleted.length === 0) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
logger.info('Deleted log webhook', {
workflowId,
webhookId,
})
return NextResponse.json({ success: true })
} catch (error) {
logger.error('Error deleting log webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,233 @@
import { createHmac } from 'crypto'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { decryptSecret } from '@/lib/utils'
import { db } from '@/db'
import { permissions, workflow, workflowLogWebhook } from '@/db/schema'
const logger = createLogger('WorkflowLogWebhookTestAPI')
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
try {
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const { id: workflowId } = await params
const userId = session.user.id
const { searchParams } = new URL(request.url)
const webhookId = searchParams.get('webhookId')
if (!webhookId) {
return NextResponse.json({ error: 'webhookId is required' }, { status: 400 })
}
const hasAccess = await db
.select({ id: workflow.id })
.from(workflow)
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.userId, userId)
)
)
.where(eq(workflow.id, workflowId))
.limit(1)
if (hasAccess.length === 0) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
const [webhook] = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.id, webhookId), eq(workflowLogWebhook.workflowId, workflowId))
)
.limit(1)
if (!webhook) {
return NextResponse.json({ error: 'Webhook not found' }, { status: 404 })
}
const timestamp = Date.now()
const eventId = `evt_test_${uuidv4()}`
const executionId = `exec_test_${uuidv4()}`
const logId = `log_test_${uuidv4()}`
const payload = {
id: eventId,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId,
executionId,
status: 'success',
level: 'info',
trigger: 'manual',
startedAt: new Date(timestamp - 5000).toISOString(),
endedAt: new Date(timestamp).toISOString(),
totalDurationMs: 5000,
cost: {
total: 0.00123,
tokens: { prompt: 100, completion: 50, total: 150 },
models: {
'gpt-4o': {
input: 0.001,
output: 0.00023,
total: 0.00123,
tokens: { prompt: 100, completion: 50, total: 150 },
},
},
},
files: null,
},
links: {
log: `/v1/logs/${logId}`,
execution: `/v1/logs/executions/${executionId}`,
},
}
if (webhook.includeFinalOutput) {
;(payload.data as any).finalOutput = {
message: 'This is a test webhook delivery',
test: true,
}
}
if (webhook.includeTraceSpans) {
;(payload.data as any).traceSpans = [
{
id: 'span_test_1',
name: 'Test Block',
type: 'block',
status: 'success',
startTime: new Date(timestamp - 5000).toISOString(),
endTime: new Date(timestamp).toISOString(),
duration: 5000,
},
]
}
if (webhook.includeRateLimits) {
;(payload.data as any).rateLimits = {
workflowExecutionRateLimit: {
sync: {
limit: 60,
remaining: 45,
resetAt: new Date(timestamp + 60000).toISOString(),
},
async: {
limit: 60,
remaining: 50,
resetAt: new Date(timestamp + 60000).toISOString(),
},
},
}
}
if (webhook.includeUsageData) {
;(payload.data as any).usage = {
currentPeriodCost: 2.45,
limit: 10,
plan: 'pro',
isExceeded: false,
}
}
const body = JSON.stringify(payload)
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': timestamp.toString(),
'sim-delivery-id': `delivery_test_${uuidv4()}`,
'Idempotency-Key': `delivery_test_${uuidv4()}`,
}
if (webhook.secret) {
const { decrypted } = await decryptSecret(webhook.secret)
const signature = generateSignature(decrypted, timestamp, body)
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
logger.info(`Sending test webhook to ${webhook.url}`, { workflowId, webhookId })
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 10000)
try {
const response = await fetch(webhook.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const responseBody = await response.text().catch(() => '')
const truncatedBody = responseBody.slice(0, 500)
const result = {
success: response.ok,
status: response.status,
statusText: response.statusText,
headers: Object.fromEntries(response.headers.entries()),
body: truncatedBody,
timestamp: new Date().toISOString(),
}
logger.info(`Test webhook completed`, {
workflowId,
webhookId,
status: response.status,
success: response.ok,
})
return NextResponse.json({ data: result })
} catch (error: any) {
clearTimeout(timeoutId)
if (error.name === 'AbortError') {
logger.error(`Test webhook timed out`, { workflowId, webhookId })
return NextResponse.json({
data: {
success: false,
error: 'Request timeout after 10 seconds',
timestamp: new Date().toISOString(),
},
})
}
logger.error(`Test webhook failed`, {
workflowId,
webhookId,
error: error.message,
})
return NextResponse.json({
data: {
success: false,
error: error.message,
timestamp: new Date().toISOString(),
},
})
}
} catch (error) {
logger.error('Error testing webhook', { error })
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -3,3 +3,4 @@ export { DeploymentControls } from './deployment-controls/deployment-controls'
export { ExportControls } from './export-controls/export-controls'
export { TemplateModal } from './template-modal/template-modal'
export { UserAvatarStack } from './user-avatar-stack/user-avatar-stack'
export { WebhookSettings } from './webhook-settings/webhook-settings'

View File

@@ -0,0 +1,799 @@
'use client'
import { useEffect, useState } from 'react'
import {
AlertCircle,
Bell,
Copy,
Edit2,
Eye,
EyeOff,
Plus,
RefreshCw,
Trash2,
Webhook,
} from 'lucide-react'
import { toast } from 'sonner'
import {
Button,
Card,
CardContent,
CardDescription,
CardHeader,
CardTitle,
Checkbox,
Dialog,
DialogContent,
DialogDescription,
DialogHeader,
DialogTitle,
Input,
Label,
Separator,
Tabs,
TabsContent,
TabsList,
TabsTrigger,
Tooltip,
TooltipContent,
TooltipTrigger,
} from '@/components/ui'
import { createLogger } from '@/lib/logs/console/logger'
import type {
LogLevel as StoreLogLevel,
TriggerType as StoreTriggerType,
} from '@/stores/logs/filters/types'
const logger = createLogger('WebhookSettings')
type NotificationLogLevel = Exclude<StoreLogLevel, 'all'>
type NotificationTrigger = Exclude<StoreTriggerType, 'all'>
interface WebhookConfig {
id: string
url: string
includeFinalOutput: boolean
includeTraceSpans: boolean
includeRateLimits: boolean
includeUsageData: boolean
levelFilter: NotificationLogLevel[]
triggerFilter: NotificationTrigger[]
active: boolean
createdAt: string
updatedAt: string
}
interface WebhookSettingsProps {
workflowId: string
open: boolean
onOpenChange: (open: boolean) => void
}
export function WebhookSettings({ workflowId, open, onOpenChange }: WebhookSettingsProps) {
const [webhooks, setWebhooks] = useState<WebhookConfig[]>([])
const [isLoading, setIsLoading] = useState(true)
const [isCreating, setIsCreating] = useState(false)
const [isTesting, setIsTesting] = useState<string | null>(null)
const [showSecret, setShowSecret] = useState(false)
const [editingWebhookId, setEditingWebhookId] = useState<string | null>(null)
const [activeTab, setActiveTab] = useState<string>('webhooks')
interface EditableWebhookPayload {
url: string
secret: string
includeFinalOutput: boolean
includeTraceSpans: boolean
includeRateLimits: boolean
includeUsageData: boolean
levelFilter: NotificationLogLevel[]
triggerFilter: NotificationTrigger[]
}
const [newWebhook, setNewWebhook] = useState<EditableWebhookPayload>({
url: '',
secret: '',
includeFinalOutput: false,
includeTraceSpans: false,
includeRateLimits: false,
includeUsageData: false,
levelFilter: ['info', 'error'],
triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'],
})
const [formError, setFormError] = useState<string | null>(null)
useEffect(() => {
if (open) {
loadWebhooks()
}
}, [open, workflowId])
const loadWebhooks = async () => {
try {
setIsLoading(true)
const response = await fetch(`/api/workflows/${workflowId}/log-webhook`)
if (response.ok) {
const data = await response.json()
setWebhooks(data.data || [])
}
} catch (error) {
logger.error('Failed to load webhooks', { error })
toast.error('Failed to load webhook configurations')
} finally {
setIsLoading(false)
}
}
const createWebhook = async () => {
setFormError(null) // Clear any previous errors
if (!newWebhook.url) {
setFormError('Please enter a webhook URL')
return
}
// Validate URL format
try {
const url = new URL(newWebhook.url)
if (!['http:', 'https:'].includes(url.protocol)) {
setFormError('URL must start with http:// or https://')
return
}
} catch {
setFormError('Please enter a valid URL (e.g., https://example.com/webhook)')
return
}
// Validate filters are not empty
if (newWebhook.levelFilter.length === 0) {
setFormError('Please select at least one log level filter')
return
}
if (newWebhook.triggerFilter.length === 0) {
setFormError('Please select at least one trigger filter')
return
}
// Check for duplicate URL
const existingWebhook = webhooks.find((w) => w.url === newWebhook.url)
if (existingWebhook) {
setFormError('A webhook with this URL already exists')
return
}
try {
setIsCreating(true)
setFormError(null)
const response = await fetch(`/api/workflows/${workflowId}/log-webhook`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(newWebhook),
})
if (response.ok) {
// Refresh the webhooks list to ensure consistency and avoid duplicates
await loadWebhooks()
setNewWebhook({
url: '',
secret: '',
includeFinalOutput: false,
includeTraceSpans: false,
includeRateLimits: false,
includeUsageData: false,
levelFilter: ['info', 'error'],
triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'],
})
setFormError(null)
toast.success('Webhook created successfully')
} else {
const error = await response.json()
// Show detailed validation errors if available
if (error.details && Array.isArray(error.details)) {
const errorMessages = error.details.map((e: any) => e.message || e.path?.join('.'))
setFormError(`Validation failed: ${errorMessages.join(', ')}`)
} else {
setFormError(error.error || 'Failed to create webhook')
}
}
} catch (error) {
logger.error('Failed to create webhook', { error })
setFormError('Failed to create webhook. Please try again.')
} finally {
setIsCreating(false)
}
}
const deleteWebhook = async (webhookId: string) => {
try {
const response = await fetch(
`/api/workflows/${workflowId}/log-webhook?webhookId=${webhookId}`,
{
method: 'DELETE',
}
)
if (response.ok) {
// Refresh the webhooks list to ensure consistency
await loadWebhooks()
toast.success('Webhook deleted')
} else {
toast.error('Failed to delete webhook')
}
} catch (error) {
logger.error('Failed to delete webhook', { error })
toast.error('Failed to delete webhook')
}
}
const testWebhook = async (webhookId: string) => {
try {
setIsTesting(webhookId)
const response = await fetch(
`/api/workflows/${workflowId}/log-webhook/test?webhookId=${webhookId}`,
{
method: 'POST',
}
)
if (response.ok) {
const data = await response.json()
if (data.data.success) {
toast.success(`Test webhook sent successfully (${data.data.status})`)
} else {
toast.error(`Test webhook failed: ${data.data.error || data.data.statusText}`)
}
} else {
toast.error('Failed to send test webhook')
}
} catch (error) {
logger.error('Failed to test webhook', { error })
toast.error('Failed to test webhook')
} finally {
setIsTesting(null)
}
}
const copyWebhookId = (id: string) => {
navigator.clipboard.writeText(id)
toast.success('Webhook ID copied')
}
const startEditWebhook = (webhook: WebhookConfig) => {
setEditingWebhookId(webhook.id)
setNewWebhook({
url: webhook.url,
secret: '', // Don't expose the existing secret
includeFinalOutput: webhook.includeFinalOutput,
includeTraceSpans: webhook.includeTraceSpans,
includeRateLimits: webhook.includeRateLimits || false,
includeUsageData: webhook.includeUsageData || false,
levelFilter: webhook.levelFilter,
triggerFilter: webhook.triggerFilter,
})
}
const cancelEdit = () => {
setEditingWebhookId(null)
setFormError(null)
setNewWebhook({
url: '',
secret: '',
includeFinalOutput: false,
includeTraceSpans: false,
includeRateLimits: false,
includeUsageData: false,
levelFilter: ['info', 'error'],
triggerFilter: ['api', 'webhook', 'schedule', 'manual', 'chat'],
})
}
const updateWebhook = async () => {
if (!editingWebhookId) return
// Validate URL format
try {
const url = new URL(newWebhook.url)
if (!['http:', 'https:'].includes(url.protocol)) {
toast.error('URL must start with http:// or https://')
return
}
} catch {
toast.error('Please enter a valid URL (e.g., https://example.com/webhook)')
return
}
// Validate filters are not empty
if (newWebhook.levelFilter.length === 0) {
toast.error('Please select at least one log level filter')
return
}
if (newWebhook.triggerFilter.length === 0) {
toast.error('Please select at least one trigger filter')
return
}
// Check for duplicate URL (excluding current webhook)
const existingWebhook = webhooks.find(
(w) => w.url === newWebhook.url && w.id !== editingWebhookId
)
if (existingWebhook) {
toast.error('A webhook with this URL already exists')
return
}
try {
setIsCreating(true)
interface UpdateWebhookPayload {
url: string
includeFinalOutput: boolean
includeTraceSpans: boolean
includeRateLimits: boolean
includeUsageData: boolean
levelFilter: NotificationLogLevel[]
triggerFilter: NotificationTrigger[]
secret?: string
active?: boolean
}
let updateData: UpdateWebhookPayload = {
url: newWebhook.url,
includeFinalOutput: newWebhook.includeFinalOutput,
includeTraceSpans: newWebhook.includeTraceSpans,
includeRateLimits: newWebhook.includeRateLimits,
includeUsageData: newWebhook.includeUsageData,
levelFilter: newWebhook.levelFilter,
triggerFilter: newWebhook.triggerFilter,
}
// Only include secret if it was changed
if (newWebhook.secret) {
updateData = { ...updateData, secret: newWebhook.secret }
}
const response = await fetch(`/api/workflows/${workflowId}/log-webhook/${editingWebhookId}`, {
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(updateData),
})
if (response.ok) {
await loadWebhooks()
cancelEdit()
toast.success('Webhook updated successfully')
} else {
const error = await response.json()
toast.error(error.error || 'Failed to update webhook')
}
} catch (error) {
logger.error('Failed to update webhook', { error })
toast.error('Failed to update webhook')
} finally {
setIsCreating(false)
}
}
return (
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent className='flex max-h-[85vh] max-w-3xl flex-col'>
<DialogHeader>
<DialogTitle className='flex items-center gap-2'>
<Webhook className='h-5 w-5' />
Webhook Notifications
</DialogTitle>
<DialogDescription>
Configure webhooks to receive notifications when workflow executions complete
</DialogDescription>
</DialogHeader>
<Tabs
value={editingWebhookId ? 'create' : activeTab}
className='mt-4 flex min-h-0 flex-1 flex-col'
onValueChange={(value) => {
setActiveTab(value)
setFormError(null) // Clear any form errors when switching tabs
if (value === 'webhooks') {
loadWebhooks()
cancelEdit() // Cancel any ongoing edit
}
}}
>
<TabsList className='grid w-full grid-cols-2'>
<TabsTrigger value='webhooks'>Active Webhooks</TabsTrigger>
<TabsTrigger value='create'>
{editingWebhookId ? 'Edit Webhook' : 'Create New'}
</TabsTrigger>
</TabsList>
<TabsContent value='webhooks' className='flex min-h-0 flex-1 flex-col overflow-hidden'>
<div className='min-h-[600px] flex-1 overflow-y-auto px-4'>
{isLoading ? (
<div className='flex h-full items-center justify-center'>
<RefreshCw className='h-5 w-5 animate-spin text-muted-foreground' />
</div>
) : webhooks.length === 0 ? (
<div className='flex h-full items-center justify-center'>
<Card className='w-full'>
<CardContent className='flex flex-col items-center justify-center py-8'>
<Bell className='mb-3 h-8 w-8 text-muted-foreground' />
<p className='text-center text-muted-foreground text-sm'>
No webhooks configured yet
</p>
<p className='text-center text-muted-foreground text-xs'>
Create a webhook to receive execution notifications
</p>
</CardContent>
</Card>
</div>
) : (
<div className='space-y-3'>
{webhooks.map((webhook) => (
<Card key={webhook.id}>
<CardHeader className='pb-3'>
<div className='flex items-start justify-between'>
<div className='flex-1'>
<CardTitle className='font-mono text-sm'>{webhook.url}</CardTitle>
<CardDescription className='mt-1 flex items-center gap-4 text-xs'>
<span className='flex items-center gap-1'>
<span
className={`h-2 w-2 rounded-full ${
webhook.active ? 'bg-green-500' : 'bg-gray-400'
}`}
/>
{webhook.active ? 'Active' : 'Inactive'}
</span>
<span>
Created {new Date(webhook.createdAt).toLocaleDateString()}
</span>
</CardDescription>
</div>
<div className='flex gap-1'>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant='ghost'
size='sm'
onClick={() => copyWebhookId(webhook.id)}
>
<Copy className='h-4 w-4' />
</Button>
</TooltipTrigger>
<TooltipContent>Copy Webhook ID</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant='ghost'
size='sm'
onClick={() => testWebhook(webhook.id)}
disabled={isTesting === webhook.id}
>
{isTesting === webhook.id ? (
<RefreshCw className='h-4 w-4 animate-spin' />
) : (
<Bell className='h-4 w-4' />
)}
</Button>
</TooltipTrigger>
<TooltipContent>
{isTesting === webhook.id ? 'Testing...' : 'Test Webhook'}
</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant='ghost'
size='sm'
onClick={() => startEditWebhook(webhook)}
>
<Edit2 className='h-4 w-4' />
</Button>
</TooltipTrigger>
<TooltipContent>Edit Webhook</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant='ghost'
size='sm'
onClick={() => deleteWebhook(webhook.id)}
>
<Trash2 className='h-4 w-4' />
</Button>
</TooltipTrigger>
<TooltipContent>Delete Webhook</TooltipContent>
</Tooltip>
</div>
</div>
</CardHeader>
<CardContent className='space-y-3 text-xs'>
<div className='flex gap-6'>
<div>
<span className='text-muted-foreground'>Levels:</span>{' '}
{webhook.levelFilter.join(', ')}
</div>
<div>
<span className='text-muted-foreground'>Triggers:</span>{' '}
{webhook.triggerFilter.join(', ')}
</div>
</div>
<div className='flex flex-wrap gap-x-6 gap-y-2.5'>
<div className='flex items-center gap-1'>
<Checkbox checked={webhook.includeFinalOutput} disabled />
<span className='text-muted-foreground'>Include output</span>
</div>
<div className='flex items-center gap-1'>
<Checkbox checked={webhook.includeTraceSpans} disabled />
<span className='text-muted-foreground'>Include trace spans</span>
</div>
<div className='flex items-center gap-1'>
<Checkbox checked={webhook.includeUsageData} disabled />
<span className='text-muted-foreground'>Include usage data</span>
</div>
<div className='flex items-center gap-1'>
<Checkbox checked={webhook.includeRateLimits} disabled />
<span className='text-muted-foreground'>Include rate limits</span>
</div>
</div>
</CardContent>
</Card>
))}
</div>
)}
</div>
</TabsContent>
<TabsContent value='create' className='flex min-h-0 flex-1 flex-col overflow-hidden'>
<div className='flex-1 overflow-y-auto px-4'>
{formError && (
<div className='mb-4 rounded-md border border-red-200 bg-red-50 p-3'>
<div className='flex items-start gap-2'>
<AlertCircle className='mt-0.5 h-4 w-4 shrink-0 text-red-600' />
<p className='text-red-800 text-sm'>{formError}</p>
</div>
</div>
)}
<div className='space-y-4 pb-6'>
<div>
<Label htmlFor='url'>Webhook URL</Label>
<Input
id='url'
type='url'
placeholder='https://your-app.com/webhook'
value={newWebhook.url}
onChange={(e) => {
setNewWebhook({ ...newWebhook, url: e.target.value })
setFormError(null) // Clear error when user types
}}
className='mt-1.5'
/>
</div>
<div>
<Label htmlFor='secret'>Secret (optional)</Label>
<div className='relative mt-1.5'>
<Input
id='secret'
type={showSecret ? 'text' : 'password'}
placeholder='Webhook secret for signature verification'
value={newWebhook.secret}
onChange={(e) => {
setNewWebhook({ ...newWebhook, secret: e.target.value })
setFormError(null) // Clear error when user types
}}
className='pr-10'
/>
<Button
type='button'
variant='ghost'
size='sm'
className='absolute top-0 right-0 h-full px-3'
onClick={() => setShowSecret(!showSecret)}
>
{showSecret ? <EyeOff className='h-4 w-4' /> : <Eye className='h-4 w-4' />}
</Button>
</div>
<p className='mt-2 text-muted-foreground text-xs'>
Used to sign webhook payloads with HMAC-SHA256
</p>
</div>
<Separator />
<div className='space-y-3'>
<Label>Filter by Level</Label>
<div className='flex gap-4'>
<div className='flex items-center gap-2'>
<Checkbox
id='level-info'
checked={newWebhook.levelFilter.includes('info')}
onCheckedChange={(checked) => {
if (checked) {
setNewWebhook({
...newWebhook,
levelFilter: [...newWebhook.levelFilter, 'info'],
})
} else {
setNewWebhook({
...newWebhook,
levelFilter: newWebhook.levelFilter.filter((l) => l !== 'info'),
})
}
}}
/>
<Label htmlFor='level-info' className='font-normal text-sm'>
Info
</Label>
</div>
<div className='flex items-center gap-2'>
<Checkbox
id='level-error'
checked={newWebhook.levelFilter.includes('error')}
onCheckedChange={(checked) => {
if (checked) {
setNewWebhook({
...newWebhook,
levelFilter: [...newWebhook.levelFilter, 'error'],
})
} else {
setNewWebhook({
...newWebhook,
levelFilter: newWebhook.levelFilter.filter((l) => l !== 'error'),
})
}
}}
/>
<Label htmlFor='level-error' className='font-normal text-sm'>
Error
</Label>
</div>
</div>
</div>
<div className='space-y-3'>
<Label>Filter by Trigger</Label>
<div className='grid grid-cols-3 gap-3'>
{(
['api', 'webhook', 'schedule', 'manual', 'chat'] as NotificationTrigger[]
).map((trigger) => (
<div key={trigger} className='flex items-center gap-2'>
<Checkbox
id={`trigger-${trigger}`}
checked={newWebhook.triggerFilter.includes(trigger)}
onCheckedChange={(checked) => {
if (checked) {
setNewWebhook({
...newWebhook,
triggerFilter: [...newWebhook.triggerFilter, trigger],
})
} else {
setNewWebhook({
...newWebhook,
triggerFilter: newWebhook.triggerFilter.filter(
(t) => t !== trigger
),
})
}
}}
/>
<Label
htmlFor={`trigger-${trigger}`}
className='font-normal text-sm capitalize'
>
{trigger}
</Label>
</div>
))}
</div>
</div>
<Separator />
<div className='space-y-3'>
<Label>Include in Payload</Label>
<div className='space-y-2'>
<div className='flex items-center gap-2'>
<Checkbox
id='include-output'
checked={newWebhook.includeFinalOutput}
onCheckedChange={(checked) =>
setNewWebhook({ ...newWebhook, includeFinalOutput: !!checked })
}
/>
<Label htmlFor='include-output' className='font-normal text-sm'>
Final output
</Label>
</div>
<div className='flex items-center gap-2'>
<Checkbox
id='include-spans'
checked={newWebhook.includeTraceSpans}
onCheckedChange={(checked) =>
setNewWebhook({ ...newWebhook, includeTraceSpans: !!checked })
}
/>
<Label htmlFor='include-spans' className='font-normal text-sm'>
Trace spans (detailed execution steps)
</Label>
</div>
<div className='flex items-center gap-2'>
<Checkbox
id='include-rate-limits'
checked={newWebhook.includeRateLimits}
onCheckedChange={(checked) =>
setNewWebhook({ ...newWebhook, includeRateLimits: !!checked })
}
/>
<Label htmlFor='include-rate-limits' className='font-normal text-sm'>
Rate limits (workflow execution limits)
</Label>
</div>
<div className='flex items-center gap-2'>
<Checkbox
id='include-usage-data'
checked={newWebhook.includeUsageData}
onCheckedChange={(checked) =>
setNewWebhook({ ...newWebhook, includeUsageData: !!checked })
}
/>
<Label htmlFor='include-usage-data' className='font-normal text-sm'>
Usage data (billing period cost and limits)
</Label>
</div>
</div>
<p className='mt-1 text-muted-foreground text-xs'>
By default, only basic metadata and cost information is included
</p>
</div>
</div>
</div>
<div className='flex-shrink-0 border-t bg-background p-4'>
{editingWebhookId && (
<Button
variant='outline'
onClick={cancelEdit}
disabled={isCreating}
className='mr-2'
>
Cancel
</Button>
)}
<Button
onClick={editingWebhookId ? updateWebhook : createWebhook}
disabled={
isCreating ||
!newWebhook.url ||
newWebhook.levelFilter.length === 0 ||
newWebhook.triggerFilter.length === 0
}
className={editingWebhookId ? '' : 'w-full'}
>
{isCreating ? (
<>
<RefreshCw className='mr-2 h-4 w-4 animate-spin' />
{editingWebhookId ? 'Updating...' : 'Creating...'}
</>
) : (
<>
{editingWebhookId ? (
<>
<Edit2 className='mr-2 h-4 w-4' />
Update Webhook
</>
) : (
<>
<Plus className='mr-2 h-4 w-4' />
Create Webhook
</>
)}
</>
)}
</Button>
</div>
</TabsContent>
</Tabs>
</DialogContent>
</Dialog>
)
}

View File

@@ -12,6 +12,7 @@ import {
StepForward,
Store,
Trash2,
Webhook,
WifiOff,
X,
} from 'lucide-react'
@@ -31,6 +32,7 @@ import {
TooltipTrigger,
} from '@/components/ui'
import { useSession } from '@/lib/auth-client'
import { getEnv, isTruthy } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { cn } from '@/lib/utils'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
@@ -38,6 +40,7 @@ import {
DeploymentControls,
ExportControls,
TemplateModal,
WebhookSettings,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/components/control-bar/components'
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution'
import {
@@ -111,6 +114,7 @@ export function ControlBar({ hasValidationErrors = false }: ControlBarProps) {
const [, forceUpdate] = useState({})
const [isExpanded, setIsExpanded] = useState(false)
const [isTemplateModalOpen, setIsTemplateModalOpen] = useState(false)
const [isWebhookSettingsOpen, setIsWebhookSettingsOpen] = useState(false)
const [isAutoLayouting, setIsAutoLayouting] = useState(false)
// Delete workflow state - grouped for better organization
@@ -705,6 +709,41 @@ export function ControlBar({ hasValidationErrors = false }: ControlBarProps) {
/>
)
/**
* Render webhook settings button
*/
const renderWebhookButton = () => {
// Only show webhook button if Trigger.dev is enabled
const isTriggerEnabled = isTruthy(getEnv('NEXT_PUBLIC_TRIGGER_DEV_ENABLED'))
if (!isTriggerEnabled) return null
const canEdit = userPermissions.canEdit
const isDisabled = !canEdit
const getTooltipText = () => {
if (!canEdit) return 'Admin permission required to configure webhooks'
return 'Configure webhook notifications'
}
return (
<Tooltip>
<TooltipTrigger asChild>
<Button
variant='outline'
size='icon'
disabled={isDisabled}
onClick={() => setIsWebhookSettingsOpen(true)}
className='h-12 w-12 rounded-[11px] border bg-card text-card-foreground shadow-xs hover:bg-secondary'
>
<Webhook className='h-5 w-5' />
<span className='sr-only'>Webhook Settings</span>
</Button>
</TooltipTrigger>
<TooltipContent>{getTooltipText()}</TooltipContent>
</Tooltip>
)
}
/**
* Render workflow duplicate button
*/
@@ -1215,6 +1254,7 @@ export function ControlBar({ hasValidationErrors = false }: ControlBarProps) {
<div className='fixed top-4 right-4 z-20 flex items-center gap-1'>
{renderDisconnectionNotice()}
{renderToggleButton()}
{isExpanded && renderWebhookButton()}
{isExpanded && <ExportControls />}
{isExpanded && renderAutoLayoutButton()}
{isExpanded && renderPublishButton()}
@@ -1232,6 +1272,15 @@ export function ControlBar({ hasValidationErrors = false }: ControlBarProps) {
workflowId={activeWorkflowId}
/>
)}
{/* Webhook Settings */}
{activeWorkflowId && (
<WebhookSettings
open={isWebhookSettingsOpen}
onOpenChange={setIsWebhookSettingsOpen}
workflowId={activeWorkflowId}
/>
)}
</div>
)
}

View File

@@ -0,0 +1,404 @@
import { createHmac } from 'crypto'
import { task, wait } from '@trigger.dev/sdk'
import { and, eq, isNull, lte, or, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import type { WorkflowExecutionLog } from '@/lib/logs/types'
import { decryptSecret } from '@/lib/utils'
import { db } from '@/db'
import {
workflowLogWebhook,
workflowLogWebhookDelivery,
workflow as workflowTable,
} from '@/db/schema'
const logger = createLogger('LogsWebhookDelivery')
// Quick retry strategy: 5 attempts over ~15 minutes
// Most webhook failures are transient and resolve quickly
const MAX_ATTEMPTS = 5
const RETRY_DELAYS = [
5 * 1000, // 5 seconds (1st retry)
15 * 1000, // 15 seconds (2nd retry)
60 * 1000, // 1 minute (3rd retry)
3 * 60 * 1000, // 3 minutes (4th retry)
10 * 60 * 1000, // 10 minutes (5th and final retry)
]
// Add jitter to prevent thundering herd problem (up to 10% of delay)
function getRetryDelayWithJitter(baseDelay: number): number {
const jitter = Math.random() * 0.1 * baseDelay
return Math.floor(baseDelay + jitter)
}
interface WebhookPayload {
id: string
type: 'workflow.execution.completed'
timestamp: number
data: {
workflowId: string
executionId: string
status: 'success' | 'error'
level: string
trigger: string
startedAt: string
endedAt: string
totalDurationMs: number
cost?: any
files?: any
finalOutput?: any
traceSpans?: any[]
rateLimits?: {
sync: {
limit: number
remaining: number
resetAt: string
}
async: {
limit: number
remaining: number
resetAt: string
}
}
usage?: {
currentPeriodCost: number
limit: number
plan: string
isExceeded: boolean
}
}
links: {
log: string
execution: string
}
}
function generateSignature(secret: string, timestamp: number, body: string): string {
const signatureBase = `${timestamp}.${body}`
const hmac = createHmac('sha256', secret)
hmac.update(signatureBase)
return hmac.digest('hex')
}
export const logsWebhookDelivery = task({
id: 'logs-webhook-delivery',
retry: {
maxAttempts: 1, // We handle retries manually within the task
},
run: async (params: {
deliveryId: string
subscriptionId: string
log: WorkflowExecutionLog
}) => {
const { deliveryId, subscriptionId, log } = params
try {
const [subscription] = await db
.select()
.from(workflowLogWebhook)
.where(eq(workflowLogWebhook.id, subscriptionId))
.limit(1)
if (!subscription || !subscription.active) {
logger.warn(`Subscription ${subscriptionId} not found or inactive`)
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
errorMessage: 'Subscription not found or inactive',
updatedAt: new Date(),
})
.where(eq(workflowLogWebhookDelivery.id, deliveryId))
return
}
// Atomically claim this delivery row for processing and increment attempts
const claimed = await db
.update(workflowLogWebhookDelivery)
.set({
status: 'in_progress',
attempts: sql`${workflowLogWebhookDelivery.attempts} + 1`,
lastAttemptAt: new Date(),
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'pending'),
// Only claim if not scheduled in the future or schedule has arrived
or(
isNull(workflowLogWebhookDelivery.nextAttemptAt),
lte(workflowLogWebhookDelivery.nextAttemptAt, new Date())
)
)
)
.returning({ attempts: workflowLogWebhookDelivery.attempts })
if (claimed.length === 0) {
logger.info(`Delivery ${deliveryId} not claimable (already in progress or not due)`)
return
}
const attempts = claimed[0].attempts
const timestamp = Date.now()
const eventId = `evt_${uuidv4()}`
const payload: WebhookPayload = {
id: eventId,
type: 'workflow.execution.completed',
timestamp,
data: {
workflowId: log.workflowId,
executionId: log.executionId,
status: log.level === 'error' ? 'error' : 'success',
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt,
endedAt: log.endedAt || log.startedAt,
totalDurationMs: log.totalDurationMs,
cost: log.cost,
files: (log as any).files,
},
links: {
log: `/v1/logs/${log.id}`,
execution: `/v1/logs/executions/${log.executionId}`,
},
}
if (subscription.includeFinalOutput && log.executionData) {
payload.data.finalOutput = (log.executionData as any).finalOutput
}
if (subscription.includeTraceSpans && log.executionData) {
payload.data.traceSpans = (log.executionData as any).traceSpans
}
// Fetch rate limits and usage data if requested
if ((subscription.includeRateLimits || subscription.includeUsageData) && log.executionData) {
const executionData = log.executionData as any
const needsRateLimits = subscription.includeRateLimits && executionData.includeRateLimits
const needsUsage = subscription.includeUsageData && executionData.includeUsageData
if (needsRateLimits || needsUsage) {
const { getUserLimits } = await import('@/app/api/v1/logs/meta')
const workflow = await db
.select()
.from(workflowTable)
.where(eq(workflowTable.id, log.workflowId))
.limit(1)
if (workflow.length > 0) {
try {
const limits = await getUserLimits(workflow[0].userId)
if (needsRateLimits) {
payload.data.rateLimits = limits.workflowExecutionRateLimit
}
if (needsUsage) {
payload.data.usage = limits.usage
}
} catch (error) {
logger.warn('Failed to fetch limits/usage for webhook', { error })
}
}
}
}
const body = JSON.stringify(payload)
const headers: Record<string, string> = {
'Content-Type': 'application/json',
'sim-event': 'workflow.execution.completed',
'sim-timestamp': timestamp.toString(),
'sim-delivery-id': deliveryId,
'Idempotency-Key': deliveryId,
}
if (subscription.secret) {
const { decrypted } = await decryptSecret(subscription.secret)
const signature = generateSignature(decrypted, timestamp, body)
headers['sim-signature'] = `t=${timestamp},v1=${signature}`
}
logger.info(`Attempting webhook delivery ${deliveryId} (attempt ${attempts})`, {
url: subscription.url,
executionId: log.executionId,
})
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), 30000)
try {
const response = await fetch(subscription.url, {
method: 'POST',
headers,
body,
signal: controller.signal,
})
clearTimeout(timeoutId)
const responseBody = await response.text().catch(() => '')
const truncatedBody = responseBody.slice(0, 1000)
if (response.ok) {
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'success',
attempts,
lastAttemptAt: new Date(),
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: null,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
logger.info(`Webhook delivery ${deliveryId} succeeded`, {
status: response.status,
executionId: log.executionId,
})
return { success: true }
}
const isRetryable = response.status >= 500 || response.status === 429
if (!isRetryable || attempts >= MAX_ATTEMPTS) {
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
attempts,
lastAttemptAt: new Date(),
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: `HTTP ${response.status}`,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
logger.warn(`Webhook delivery ${deliveryId} failed permanently`, {
status: response.status,
attempts,
executionId: log.executionId,
})
return { success: false }
}
const baseDelay = RETRY_DELAYS[Math.min(attempts - 1, RETRY_DELAYS.length - 1)]
const delayWithJitter = getRetryDelayWithJitter(baseDelay)
const nextAttemptAt = new Date(Date.now() + delayWithJitter)
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'pending',
attempts,
lastAttemptAt: new Date(),
nextAttemptAt,
responseStatus: response.status,
responseBody: truncatedBody,
errorMessage: `HTTP ${response.status} - will retry`,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
// Schedule the next retry
await wait.for({ seconds: delayWithJitter / 1000 })
// Recursively call the task for retry
await logsWebhookDelivery.trigger({
deliveryId,
subscriptionId,
log,
})
return { success: false, retrying: true }
} catch (error: any) {
clearTimeout(timeoutId)
if (error.name === 'AbortError') {
logger.error(`Webhook delivery ${deliveryId} timed out`, {
executionId: log.executionId,
attempts,
})
error.message = 'Request timeout after 30 seconds'
}
const baseDelay = RETRY_DELAYS[Math.min(attempts - 1, RETRY_DELAYS.length - 1)]
const delayWithJitter = getRetryDelayWithJitter(baseDelay)
const nextAttemptAt = new Date(Date.now() + delayWithJitter)
await db
.update(workflowLogWebhookDelivery)
.set({
status: attempts >= MAX_ATTEMPTS ? 'failed' : 'pending',
attempts,
lastAttemptAt: new Date(),
nextAttemptAt: attempts >= MAX_ATTEMPTS ? null : nextAttemptAt,
errorMessage: error.message,
updatedAt: new Date(),
})
.where(
and(
eq(workflowLogWebhookDelivery.id, deliveryId),
eq(workflowLogWebhookDelivery.status, 'in_progress')
)
)
if (attempts >= MAX_ATTEMPTS) {
logger.error(`Webhook delivery ${deliveryId} failed after ${attempts} attempts`, {
error: error.message,
executionId: log.executionId,
})
return { success: false }
}
// Schedule the next retry
await wait.for({ seconds: delayWithJitter / 1000 })
// Recursively call the task for retry
await logsWebhookDelivery.trigger({
deliveryId,
subscriptionId,
log,
})
return { success: false, retrying: true }
}
} catch (error: any) {
logger.error(`Webhook delivery ${deliveryId} encountered unexpected error`, {
error: error.message,
stack: error.stack,
})
// Mark as failed for unexpected errors
await db
.update(workflowLogWebhookDelivery)
.set({
status: 'failed',
errorMessage: `Unexpected error: ${error.message}`,
updatedAt: new Date(),
})
.where(eq(workflowLogWebhookDelivery.id, deliveryId))
return { success: false, error: error.message }
}
},
})

View File

@@ -0,0 +1,43 @@
CREATE TYPE "public"."webhook_delivery_status" AS ENUM('pending', 'in_progress', 'success', 'failed');--> statement-breakpoint
CREATE TABLE "workflow_log_webhook" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"url" text NOT NULL,
"secret" text,
"include_final_output" boolean DEFAULT false NOT NULL,
"include_trace_spans" boolean DEFAULT false NOT NULL,
"include_rate_limits" boolean DEFAULT false NOT NULL,
"include_usage_data" boolean DEFAULT false NOT NULL,
"level_filter" text[] DEFAULT ARRAY['info', 'error']::text[] NOT NULL,
"trigger_filter" text[] DEFAULT ARRAY['api', 'webhook', 'schedule', 'manual', 'chat']::text[] NOT NULL,
"active" boolean DEFAULT true NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workflow_log_webhook_delivery" (
"id" text PRIMARY KEY NOT NULL,
"subscription_id" text NOT NULL,
"workflow_id" text NOT NULL,
"execution_id" text NOT NULL,
"status" "webhook_delivery_status" DEFAULT 'pending' NOT NULL,
"attempts" integer DEFAULT 0 NOT NULL,
"last_attempt_at" timestamp,
"next_attempt_at" timestamp,
"response_status" integer,
"response_body" text,
"error_message" text,
"created_at" timestamp DEFAULT now() NOT NULL,
"updated_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "user_rate_limits" ADD COLUMN "api_endpoint_requests" integer DEFAULT 0 NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_log_webhook" ADD CONSTRAINT "workflow_log_webhook_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_log_webhook_delivery" ADD CONSTRAINT "workflow_log_webhook_delivery_subscription_id_workflow_log_webhook_id_fk" FOREIGN KEY ("subscription_id") REFERENCES "public"."workflow_log_webhook"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_log_webhook_delivery" ADD CONSTRAINT "workflow_log_webhook_delivery_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_workflow_id_idx" ON "workflow_log_webhook" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_active_idx" ON "workflow_log_webhook" USING btree ("active");--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_delivery_subscription_id_idx" ON "workflow_log_webhook_delivery" USING btree ("subscription_id");--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_delivery_execution_id_idx" ON "workflow_log_webhook_delivery" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_delivery_status_idx" ON "workflow_log_webhook_delivery" USING btree ("status");--> statement-breakpoint
CREATE INDEX "workflow_log_webhook_delivery_next_attempt_idx" ON "workflow_log_webhook_delivery" USING btree ("next_attempt_at");

File diff suppressed because it is too large Load Diff

View File

@@ -596,6 +596,13 @@
"when": 1757348840739,
"tag": "0085_daffy_blacklash",
"breakpoints": true
},
{
"idx": 86,
"version": "7",
"when": 1757441740591,
"tag": "0086_breezy_sister_grimm",
"breakpoints": true
}
]
}

View File

@@ -434,6 +434,75 @@ export const webhook = pgTable(
}
)
export const workflowLogWebhook = pgTable(
'workflow_log_webhook',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
url: text('url').notNull(),
secret: text('secret'),
includeFinalOutput: boolean('include_final_output').notNull().default(false),
includeTraceSpans: boolean('include_trace_spans').notNull().default(false),
includeRateLimits: boolean('include_rate_limits').notNull().default(false),
includeUsageData: boolean('include_usage_data').notNull().default(false),
levelFilter: text('level_filter')
.array()
.notNull()
.default(sql`ARRAY['info', 'error']::text[]`),
triggerFilter: text('trigger_filter')
.array()
.notNull()
.default(sql`ARRAY['api', 'webhook', 'schedule', 'manual', 'chat']::text[]`),
active: boolean('active').notNull().default(true),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
workflowIdIdx: index('workflow_log_webhook_workflow_id_idx').on(table.workflowId),
activeIdx: index('workflow_log_webhook_active_idx').on(table.active),
})
)
export const webhookDeliveryStatusEnum = pgEnum('webhook_delivery_status', [
'pending',
'in_progress',
'success',
'failed',
])
export const workflowLogWebhookDelivery = pgTable(
'workflow_log_webhook_delivery',
{
id: text('id').primaryKey(),
subscriptionId: text('subscription_id')
.notNull()
.references(() => workflowLogWebhook.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
status: webhookDeliveryStatusEnum('status').notNull().default('pending'),
attempts: integer('attempts').notNull().default(0),
lastAttemptAt: timestamp('last_attempt_at'),
nextAttemptAt: timestamp('next_attempt_at'),
responseStatus: integer('response_status'),
responseBody: text('response_body'),
errorMessage: text('error_message'),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
subscriptionIdIdx: index('workflow_log_webhook_delivery_subscription_id_idx').on(
table.subscriptionId
),
executionIdIdx: index('workflow_log_webhook_delivery_execution_id_idx').on(table.executionId),
statusIdx: index('workflow_log_webhook_delivery_status_idx').on(table.status),
nextAttemptIdx: index('workflow_log_webhook_delivery_next_attempt_idx').on(table.nextAttemptAt),
})
)
export const apiKey = pgTable('api_key', {
id: text('id').primaryKey(),
userId: text('user_id')
@@ -536,6 +605,7 @@ export const userRateLimits = pgTable('user_rate_limits', {
referenceId: text('reference_id').primaryKey(), // Can be userId or organizationId for pooling
syncApiRequests: integer('sync_api_requests').notNull().default(0), // Sync API requests counter
asyncApiRequests: integer('async_api_requests').notNull().default(0), // Async API requests counter
apiEndpointRequests: integer('api_endpoint_requests').notNull().default(0), // External API endpoint requests counter
windowStart: timestamp('window_start').notNull().defaultNow(),
lastRequestAt: timestamp('last_request_at').notNull().defaultNow(),
isRateLimited: boolean('is_rate_limited').notNull().default(false),

101
apps/sim/lib/logs/events.ts Normal file
View File

@@ -0,0 +1,101 @@
import { and, eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { createLogger } from '@/lib/logs/console/logger'
import type { WorkflowExecutionLog } from '@/lib/logs/types'
import { logsWebhookDelivery } from '@/background/logs-webhook-delivery'
import { db } from '@/db'
import { workflowLogWebhook, workflowLogWebhookDelivery } from '@/db/schema'
const logger = createLogger('LogsEventEmitter')
export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): Promise<void> {
try {
const subscriptions = await db
.select()
.from(workflowLogWebhook)
.where(
and(eq(workflowLogWebhook.workflowId, log.workflowId), eq(workflowLogWebhook.active, true))
)
if (subscriptions.length === 0) {
return
}
logger.debug(
`Found ${subscriptions.length} active webhook subscriptions for workflow ${log.workflowId}`
)
for (const subscription of subscriptions) {
const levelMatches = subscription.levelFilter?.includes(log.level) ?? true
const triggerMatches = subscription.triggerFilter?.includes(log.trigger) ?? true
if (!levelMatches || !triggerMatches) {
logger.debug(`Skipping subscription ${subscription.id} due to filter mismatch`, {
level: log.level,
trigger: log.trigger,
levelFilter: subscription.levelFilter,
triggerFilter: subscription.triggerFilter,
})
continue
}
const deliveryId = uuidv4()
await db.insert(workflowLogWebhookDelivery).values({
id: deliveryId,
subscriptionId: subscription.id,
workflowId: log.workflowId,
executionId: log.executionId,
status: 'pending',
attempts: 0,
nextAttemptAt: new Date(),
})
// Prepare the log data based on subscription settings
const webhookLog = {
...log,
executionData: {},
}
// Only include executionData fields that are requested
if (log.executionData) {
const data = log.executionData as any
const webhookData: any = {}
if (subscription.includeFinalOutput && data.finalOutput) {
webhookData.finalOutput = data.finalOutput
}
if (subscription.includeTraceSpans && data.traceSpans) {
webhookData.traceSpans = data.traceSpans
}
// For rate limits and usage, we'll need to fetch them in the webhook delivery
// since they're user-specific and may change
if (subscription.includeRateLimits) {
webhookData.includeRateLimits = true
}
if (subscription.includeUsageData) {
webhookData.includeUsageData = true
}
webhookLog.executionData = webhookData
}
await logsWebhookDelivery.trigger({
deliveryId,
subscriptionId: subscription.id,
log: webhookLog,
})
logger.info(`Enqueued webhook delivery ${deliveryId} for subscription ${subscription.id}`)
}
} catch (error) {
logger.error('Failed to emit workflow execution completed event', {
error,
workflowId: log.workflowId,
executionId: log.executionId,
})
}
}

View File

@@ -4,6 +4,7 @@ import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { checkUsageStatus, maybeSendUsageThresholdEmail } from '@/lib/billing/core/usage'
import { getCostMultiplier, isBillingEnabled } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console/logger'
import { emitWorkflowExecutionCompleted } from '@/lib/logs/events'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import type {
BlockOutputData,
@@ -306,7 +307,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
logger.debug(`Completed workflow execution ${executionId}`)
return {
const completedLog: WorkflowExecutionLog = {
id: updatedLog.id,
workflowId: updatedLog.workflowId,
executionId: updatedLog.executionId,
@@ -320,6 +321,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
cost: updatedLog.cost as any,
createdAt: updatedLog.createdAt.toISOString(),
}
emitWorkflowExecutionCompleted(completedLog).catch((error) => {
logger.error('Failed to emit workflow execution completed event', {
error,
executionId,
})
})
return completedLog
}
async getWorkflowExecution(executionId: string): Promise<WorkflowExecutionLog | null> {

View File

@@ -7,6 +7,7 @@ import {
MANUAL_EXECUTION_LIMIT,
RATE_LIMIT_WINDOW_MS,
RATE_LIMITS,
type RateLimitCounterType,
type SubscriptionPlan,
type TriggerType,
} from '@/services/queue/types'
@@ -43,6 +44,50 @@ export class RateLimiter {
return userId
}
/**
* Determine which counter type to use based on trigger type and async flag
*/
private getCounterType(triggerType: TriggerType, isAsync: boolean): RateLimitCounterType {
if (triggerType === 'api-endpoint') {
return 'api-endpoint'
}
return isAsync ? 'async' : 'sync'
}
/**
* Get the rate limit for a specific counter type
*/
private getRateLimitForCounter(
config: (typeof RATE_LIMITS)[SubscriptionPlan],
counterType: RateLimitCounterType
): number {
switch (counterType) {
case 'api-endpoint':
return config.apiEndpointRequestsPerMinute
case 'async':
return config.asyncApiExecutionsPerMinute
case 'sync':
return config.syncApiExecutionsPerMinute
}
}
/**
* Get the current count from a rate limit record for a specific counter type
*/
private getCountFromRecord(
record: { syncApiRequests: number; asyncApiRequests: number; apiEndpointRequests: number },
counterType: RateLimitCounterType
): number {
switch (counterType) {
case 'api-endpoint':
return record.apiEndpointRequests
case 'async':
return record.asyncApiRequests
case 'sync':
return record.syncApiRequests
}
}
/**
* Check if user can execute a workflow with organization-aware rate limiting
* Manual executions bypass rate limiting entirely
@@ -64,11 +109,10 @@ export class RateLimiter {
const subscriptionPlan = (subscription?.plan || 'free') as SubscriptionPlan
const rateLimitKey = this.getRateLimitKey(userId, subscription)
const limit = RATE_LIMITS[subscriptionPlan]
const execLimit = isAsync
? limit.asyncApiExecutionsPerMinute
: limit.syncApiExecutionsPerMinute
const counterType = this.getCounterType(triggerType, isAsync)
const execLimit = this.getRateLimitForCounter(limit, counterType)
const now = new Date()
const windowStart = new Date(now.getTime() - RATE_LIMIT_WINDOW_MS)
@@ -86,8 +130,9 @@ export class RateLimiter {
.insert(userRateLimits)
.values({
referenceId: rateLimitKey,
syncApiRequests: isAsync ? 0 : 1,
asyncApiRequests: isAsync ? 1 : 0,
syncApiRequests: counterType === 'sync' ? 1 : 0,
asyncApiRequests: counterType === 'async' ? 1 : 0,
apiEndpointRequests: counterType === 'api-endpoint' ? 1 : 0,
windowStart: now,
lastRequestAt: now,
isRateLimited: false,
@@ -96,8 +141,9 @@ export class RateLimiter {
target: userRateLimits.referenceId,
set: {
// Only reset if window is still expired (avoid race condition)
syncApiRequests: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${isAsync ? 0 : 1} ELSE ${userRateLimits.syncApiRequests} + ${isAsync ? 0 : 1} END`,
asyncApiRequests: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${isAsync ? 1 : 0} ELSE ${userRateLimits.asyncApiRequests} + ${isAsync ? 1 : 0} END`,
syncApiRequests: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${counterType === 'sync' ? 1 : 0} ELSE ${userRateLimits.syncApiRequests} + ${counterType === 'sync' ? 1 : 0} END`,
asyncApiRequests: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${counterType === 'async' ? 1 : 0} ELSE ${userRateLimits.asyncApiRequests} + ${counterType === 'async' ? 1 : 0} END`,
apiEndpointRequests: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${counterType === 'api-endpoint' ? 1 : 0} ELSE ${userRateLimits.apiEndpointRequests} + ${counterType === 'api-endpoint' ? 1 : 0} END`,
windowStart: sql`CASE WHEN ${userRateLimits.windowStart} < ${windowStart.toISOString()} THEN ${now.toISOString()} ELSE ${userRateLimits.windowStart} END`,
lastRequestAt: now,
isRateLimited: false,
@@ -107,13 +153,12 @@ export class RateLimiter {
.returning({
syncApiRequests: userRateLimits.syncApiRequests,
asyncApiRequests: userRateLimits.asyncApiRequests,
apiEndpointRequests: userRateLimits.apiEndpointRequests,
windowStart: userRateLimits.windowStart,
})
const insertedRecord = result[0]
const actualCount = isAsync
? insertedRecord.asyncApiRequests
: insertedRecord.syncApiRequests
const actualCount = this.getCountFromRecord(insertedRecord, counterType)
// Check if we exceeded the limit
if (actualCount > execLimit) {
@@ -160,21 +205,22 @@ export class RateLimiter {
const updateResult = await db
.update(userRateLimits)
.set({
...(isAsync
? { asyncApiRequests: sql`${userRateLimits.asyncApiRequests} + 1` }
: { syncApiRequests: sql`${userRateLimits.syncApiRequests} + 1` }),
...(counterType === 'api-endpoint'
? { apiEndpointRequests: sql`${userRateLimits.apiEndpointRequests} + 1` }
: counterType === 'async'
? { asyncApiRequests: sql`${userRateLimits.asyncApiRequests} + 1` }
: { syncApiRequests: sql`${userRateLimits.syncApiRequests} + 1` }),
lastRequestAt: now,
})
.where(eq(userRateLimits.referenceId, rateLimitKey))
.returning({
asyncApiRequests: userRateLimits.asyncApiRequests,
syncApiRequests: userRateLimits.syncApiRequests,
apiEndpointRequests: userRateLimits.apiEndpointRequests,
})
const updatedRecord = updateResult[0]
const actualNewRequests = isAsync
? updatedRecord.asyncApiRequests
: updatedRecord.syncApiRequests
const actualNewRequests = this.getCountFromRecord(updatedRecord, counterType)
// Check if we exceeded the limit AFTER the atomic increment
if (actualNewRequests > execLimit) {
@@ -264,11 +310,11 @@ export class RateLimiter {
const subscriptionPlan = (subscription?.plan || 'free') as SubscriptionPlan
const rateLimitKey = this.getRateLimitKey(userId, subscription)
const limit = RATE_LIMITS[subscriptionPlan]
const execLimit = isAsync
? limit.asyncApiExecutionsPerMinute
: limit.syncApiExecutionsPerMinute
const counterType = this.getCounterType(triggerType, isAsync)
const execLimit = this.getRateLimitForCounter(limit, counterType)
const now = new Date()
const windowStart = new Date(now.getTime() - RATE_LIMIT_WINDOW_MS)
@@ -287,7 +333,7 @@ export class RateLimiter {
}
}
const used = isAsync ? rateLimitRecord.asyncApiRequests : rateLimitRecord.syncApiRequests
const used = this.getCountFromRecord(rateLimitRecord, counterType)
return {
used,
limit: execLimit,

View File

@@ -6,15 +6,19 @@ import type { userRateLimits } from '@/db/schema'
export type UserRateLimit = InferSelectModel<typeof userRateLimits>
// Trigger types for rate limiting
export type TriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
export type TriggerType = 'api' | 'webhook' | 'schedule' | 'manual' | 'chat' | 'api-endpoint'
// Rate limit counter types - which counter to increment in the database
export type RateLimitCounterType = 'sync' | 'async' | 'api-endpoint'
// Subscription plan types
export type SubscriptionPlan = 'free' | 'pro' | 'team' | 'enterprise'
// Rate limit configuration (applies to all non-manual trigger types: api, webhook, schedule, chat)
// Rate limit configuration (applies to all non-manual trigger types: api, webhook, schedule, chat, api-endpoint)
export interface RateLimitConfig {
syncApiExecutionsPerMinute: number
asyncApiExecutionsPerMinute: number
apiEndpointRequestsPerMinute: number // For external API endpoints like /api/v1/logs
}
// Rate limit window duration in milliseconds
@@ -27,18 +31,22 @@ export const RATE_LIMITS: Record<SubscriptionPlan, RateLimitConfig> = {
free: {
syncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_FREE_SYNC) || 10,
asyncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_FREE_ASYNC) || 50,
apiEndpointRequestsPerMinute: 10,
},
pro: {
syncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_PRO_SYNC) || 25,
asyncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_PRO_ASYNC) || 200,
apiEndpointRequestsPerMinute: 30,
},
team: {
syncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_TEAM_SYNC) || 75,
asyncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_TEAM_ASYNC) || 500,
apiEndpointRequestsPerMinute: 60,
},
enterprise: {
syncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_ENTERPRISE_SYNC) || 150,
asyncApiExecutionsPerMinute: Number.parseInt(env.RATE_LIMIT_ENTERPRISE_ASYNC) || 1000,
apiEndpointRequestsPerMinute: 120,
},
}