improvement(logging): created common logging utility for all executions

This commit is contained in:
Waleed Latif
2025-03-09 04:37:50 -07:00
parent 3bb6b5bf02
commit fd510d582e
4 changed files with 140 additions and 109 deletions

View File

@@ -3,7 +3,7 @@ import { Cron } from 'croner'
import { eq, lte } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { persistLog } from '@/lib/logging'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logging'
import { decryptSecret } from '@/lib/utils'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
@@ -280,40 +280,8 @@ export async function GET(req: NextRequest) {
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars)
const result = await executor.execute(schedule.workflowId)
// Log each execution step
for (const log of result.logs || []) {
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: log.success ? 'info' : 'error',
message: log.success
? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}`
: `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
trigger: 'schedule',
createdAt: new Date(log.endedAt || log.startedAt),
})
}
// Calculate total duration from successful block logs
const totalDuration = (result.logs || [])
.filter((log) => log.success)
.reduce((sum, log) => sum + log.durationMs, 0)
// Log the final execution result
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success
? 'Scheduled workflow executed successfully'
: `Scheduled workflow execution failed: ${result.error}`,
duration: result.success ? `${totalDuration}ms` : 'NA',
trigger: 'schedule',
createdAt: new Date(),
})
// Log each execution step and the final result
await persistExecutionLogs(schedule.workflowId, executionId, result, 'schedule')
// Only update next_run_at if execution was successful
if (result.success) {
@@ -348,17 +316,10 @@ export async function GET(req: NextRequest) {
console.log('Execution failed, scheduled retry at:', nextRetryAt.toISOString())
}
} catch (error: any) {
console.error('Error executing workflow:', error)
await persistLog({
id: uuidv4(),
workflowId: schedule.workflowId,
executionId,
level: 'error',
message: `Scheduled workflow execution failed: ${error.message || 'Unknown error'}`,
duration: 'NA',
trigger: 'schedule',
createdAt: new Date(),
})
console.error(`Error executing scheduled workflow ${schedule.workflowId}:`, error)
// Log the error
await persistExecutionError(schedule.workflowId, executionId, error, 'schedule')
// On error, increment next_run_at by a small delay to prevent immediate retries
const retryDelay = 1 * 60 * 1000 // 1 minute delay

View File

@@ -1,7 +1,7 @@
import { NextRequest, NextResponse } from 'next/server'
import { and, eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { persistLog } from '@/lib/logging'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logging'
import { decryptSecret } from '@/lib/utils'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { db } from '@/db'
@@ -98,6 +98,9 @@ export async function POST(
request: NextRequest,
{ params }: { params: Promise<{ path: string }> }
) {
const executionId = uuidv4()
let foundWorkflow: any = null
try {
const path = (await params).path
@@ -120,8 +123,8 @@ export async function POST(
return new NextResponse('Webhook not found', { status: 404 })
}
const { webhook: foundWebhook, workflow: foundWorkflow } = webhooks[0]
const executionId = uuidv4()
const { webhook: foundWebhook, workflow: workflowData } = webhooks[0]
foundWorkflow = workflowData
// Handle provider-specific verification and authentication
if (foundWebhook.provider && foundWebhook.provider !== 'whatsapp') {
@@ -257,26 +260,19 @@ export async function POST(
console.log(`Successfully executed workflow ${foundWorkflow.id}`)
// Log each execution step
for (const log of result.logs || []) {
await persistLog({
id: uuidv4(),
workflowId: foundWorkflow.id,
executionId,
level: log.success ? 'info' : 'error',
message: log.success
? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}`
: `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
trigger: 'webhook',
createdAt: new Date(log.endedAt || log.startedAt),
})
}
// Log each execution step and the final result
await persistExecutionLogs(foundWorkflow.id, executionId, result, 'webhook')
// Return the execution result
return NextResponse.json(result, { status: 200 })
} catch (error: any) {
console.error('Error processing webhook:', error)
// Log the error if we have a workflow ID
if (foundWorkflow?.id) {
await persistExecutionError(foundWorkflow.id, executionId, error, 'webhook')
}
return new NextResponse(`Internal Server Error: ${error.message}`, { status: 500 })
}
}

View File

@@ -2,7 +2,7 @@ import { NextRequest } from 'next/server'
import { eq } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { persistLog } from '@/lib/logging'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logging'
import { decryptSecret } from '@/lib/utils'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -116,54 +116,13 @@ async function executeWorkflow(workflow: any, input?: any) {
const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars, input)
const result = await executor.execute(workflowId)
// Log each execution step
for (const log of result.logs || []) {
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: log.success ? 'info' : 'error',
message: log.success
? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}`
: `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
trigger: 'api',
createdAt: new Date(log.endedAt || log.startedAt),
})
}
// Calculate total duration from successful block logs
const totalDuration = (result.logs || [])
.filter((log) => log.success)
.reduce((sum, log) => sum + log.durationMs, 0)
// Log the final execution result
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success
? 'API workflow executed successfully'
: `API workflow execution failed: ${result.error}`,
duration: result.success ? `${totalDuration}ms` : 'NA',
trigger: 'api',
createdAt: new Date(),
})
// Log each execution step and the final result
await persistExecutionLogs(workflowId, executionId, result, 'api')
return result
} catch (error: any) {
// Log the error
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: 'error',
message: `API workflow execution failed: ${error.message}`,
duration: 'NA',
trigger: 'api',
createdAt: new Date(),
})
await persistExecutionError(workflowId, executionId, error, 'api')
throw error
} finally {
runningExecutions.delete(workflowId)

View File

@@ -1,5 +1,7 @@
import { v4 as uuidv4 } from 'uuid'
import { db } from '@/db'
import { workflowLogs } from '@/db/schema'
import { BlockLog, ExecutionResult as ExecutorResult } from '@/executor/types'
export interface LogEntry {
id: string
@@ -15,3 +17,116 @@ export interface LogEntry {
export async function persistLog(log: LogEntry) {
await db.insert(workflowLogs).values(log)
}
/**
* Persists logs for a workflow execution, including individual block logs and the final result
* @param workflowId - The ID of the workflow
* @param executionId - The ID of the execution
* @param result - The execution result
* @param triggerType - The type of trigger (api, webhook, schedule)
*/
export async function persistExecutionLogs(
workflowId: string,
executionId: string,
result: ExecutorResult,
triggerType: 'api' | 'webhook' | 'schedule'
) {
try {
// Log each execution step
for (const log of result.logs || []) {
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: log.success ? 'info' : 'error',
message: log.success
? `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${JSON.stringify(log.output?.response || {})}`
: `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${log.error || 'Failed'}`,
duration: log.success ? `${log.durationMs}ms` : 'NA',
trigger: triggerType,
createdAt: new Date(log.endedAt || log.startedAt),
})
}
// Calculate total duration from successful block logs
const totalDuration = (result.logs || [])
.filter((log) => log.success)
.reduce((sum, log) => sum + log.durationMs, 0)
// Get trigger-specific message
const successMessage = getTriggerSuccessMessage(triggerType)
const errorPrefix = getTriggerErrorPrefix(triggerType)
// Log the final execution result
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: result.success ? 'info' : 'error',
message: result.success ? successMessage : `${errorPrefix} execution failed: ${result.error}`,
duration: result.success ? `${totalDuration}ms` : 'NA',
trigger: triggerType,
createdAt: new Date(),
})
} catch (error: any) {
console.error(`Error persisting execution logs: ${error.message}`, error)
}
}
/**
* Persists an error log for a workflow execution
* @param workflowId - The ID of the workflow
* @param executionId - The ID of the execution
* @param error - The error that occurred
* @param triggerType - The type of trigger (api, webhook, schedule)
*/
export async function persistExecutionError(
workflowId: string,
executionId: string,
error: Error,
triggerType: 'api' | 'webhook' | 'schedule'
) {
try {
const errorPrefix = getTriggerErrorPrefix(triggerType)
await persistLog({
id: uuidv4(),
workflowId,
executionId,
level: 'error',
message: `${errorPrefix} execution failed: ${error.message}`,
duration: 'NA',
trigger: triggerType,
createdAt: new Date(),
})
} catch (logError: any) {
console.error(`Error persisting execution error log: ${logError.message}`, logError)
}
}
// Helper functions for trigger-specific messages
function getTriggerSuccessMessage(triggerType: 'api' | 'webhook' | 'schedule'): string {
switch (triggerType) {
case 'api':
return 'API workflow executed successfully'
case 'webhook':
return 'Webhook workflow executed successfully'
case 'schedule':
return 'Scheduled workflow executed successfully'
default:
return 'Workflow executed successfully'
}
}
function getTriggerErrorPrefix(triggerType: 'api' | 'webhook' | 'schedule'): string {
switch (triggerType) {
case 'api':
return 'API workflow'
case 'webhook':
return 'Webhook workflow'
case 'schedule':
return 'Scheduled workflow'
default:
return 'Workflow'
}
}