From fd510d582e2ae95ab656aa36900608598bc711fe Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Sun, 9 Mar 2025 04:37:50 -0700 Subject: [PATCH] improvement(logging): created common logging utility for all executions --- app/api/scheduled/execute/route.ts | 53 ++--------- app/api/webhooks/trigger/[path]/route.ts | 32 +++---- app/api/workflow/[id]/execute/route.ts | 49 +--------- lib/logging.ts | 115 +++++++++++++++++++++++ 4 files changed, 140 insertions(+), 109 deletions(-) diff --git a/app/api/scheduled/execute/route.ts b/app/api/scheduled/execute/route.ts index f2af3e0a4..c267b52f6 100644 --- a/app/api/scheduled/execute/route.ts +++ b/app/api/scheduled/execute/route.ts @@ -3,7 +3,7 @@ import { Cron } from 'croner' import { eq, lte } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' -import { persistLog } from '@/lib/logging' +import { persistExecutionError, persistExecutionLogs } from '@/lib/logging' import { decryptSecret } from '@/lib/utils' import { mergeSubblockState } from '@/stores/workflows/utils' import { BlockState, WorkflowState } from '@/stores/workflows/workflow/types' @@ -280,40 +280,8 @@ export async function GET(req: NextRequest) { const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars) const result = await executor.execute(schedule.workflowId) - // Log each execution step - for (const log of result.logs || []) { - await persistLog({ - id: uuidv4(), - workflowId: schedule.workflowId, - executionId, - level: log.success ? 'info' : 'error', - message: log.success - ? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}` - : `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`, - duration: log.success ? `${log.durationMs}ms` : 'NA', - trigger: 'schedule', - createdAt: new Date(log.endedAt || log.startedAt), - }) - } - - // Calculate total duration from successful block logs - const totalDuration = (result.logs || []) - .filter((log) => log.success) - .reduce((sum, log) => sum + log.durationMs, 0) - - // Log the final execution result - await persistLog({ - id: uuidv4(), - workflowId: schedule.workflowId, - executionId, - level: result.success ? 'info' : 'error', - message: result.success - ? 'Scheduled workflow executed successfully' - : `Scheduled workflow execution failed: ${result.error}`, - duration: result.success ? `${totalDuration}ms` : 'NA', - trigger: 'schedule', - createdAt: new Date(), - }) + // Log each execution step and the final result + await persistExecutionLogs(schedule.workflowId, executionId, result, 'schedule') // Only update next_run_at if execution was successful if (result.success) { @@ -348,17 +316,10 @@ export async function GET(req: NextRequest) { console.log('Execution failed, scheduled retry at:', nextRetryAt.toISOString()) } } catch (error: any) { - console.error('Error executing workflow:', error) - await persistLog({ - id: uuidv4(), - workflowId: schedule.workflowId, - executionId, - level: 'error', - message: `Scheduled workflow execution failed: ${error.message || 'Unknown error'}`, - duration: 'NA', - trigger: 'schedule', - createdAt: new Date(), - }) + console.error(`Error executing scheduled workflow ${schedule.workflowId}:`, error) + + // Log the error + await persistExecutionError(schedule.workflowId, executionId, error, 'schedule') // On error, increment next_run_at by a small delay to prevent immediate retries const retryDelay = 1 * 60 * 1000 // 1 minute delay diff --git a/app/api/webhooks/trigger/[path]/route.ts b/app/api/webhooks/trigger/[path]/route.ts index 091feba0c..60c073aba 100644 --- a/app/api/webhooks/trigger/[path]/route.ts +++ b/app/api/webhooks/trigger/[path]/route.ts @@ -1,7 +1,7 @@ import { NextRequest, NextResponse } from 'next/server' import { and, eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' -import { persistLog } from '@/lib/logging' +import { persistExecutionError, persistExecutionLogs } from '@/lib/logging' import { decryptSecret } from '@/lib/utils' import { mergeSubblockState } from '@/stores/workflows/utils' import { db } from '@/db' @@ -98,6 +98,9 @@ export async function POST( request: NextRequest, { params }: { params: Promise<{ path: string }> } ) { + const executionId = uuidv4() + let foundWorkflow: any = null + try { const path = (await params).path @@ -120,8 +123,8 @@ export async function POST( return new NextResponse('Webhook not found', { status: 404 }) } - const { webhook: foundWebhook, workflow: foundWorkflow } = webhooks[0] - const executionId = uuidv4() + const { webhook: foundWebhook, workflow: workflowData } = webhooks[0] + foundWorkflow = workflowData // Handle provider-specific verification and authentication if (foundWebhook.provider && foundWebhook.provider !== 'whatsapp') { @@ -257,26 +260,19 @@ export async function POST( console.log(`Successfully executed workflow ${foundWorkflow.id}`) - // Log each execution step - for (const log of result.logs || []) { - await persistLog({ - id: uuidv4(), - workflowId: foundWorkflow.id, - executionId, - level: log.success ? 'info' : 'error', - message: log.success - ? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}` - : `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`, - duration: log.success ? `${log.durationMs}ms` : 'NA', - trigger: 'webhook', - createdAt: new Date(log.endedAt || log.startedAt), - }) - } + // Log each execution step and the final result + await persistExecutionLogs(foundWorkflow.id, executionId, result, 'webhook') // Return the execution result return NextResponse.json(result, { status: 200 }) } catch (error: any) { console.error('Error processing webhook:', error) + + // Log the error if we have a workflow ID + if (foundWorkflow?.id) { + await persistExecutionError(foundWorkflow.id, executionId, error, 'webhook') + } + return new NextResponse(`Internal Server Error: ${error.message}`, { status: 500 }) } } diff --git a/app/api/workflow/[id]/execute/route.ts b/app/api/workflow/[id]/execute/route.ts index e408bf99e..7cd4ee620 100644 --- a/app/api/workflow/[id]/execute/route.ts +++ b/app/api/workflow/[id]/execute/route.ts @@ -2,7 +2,7 @@ import { NextRequest } from 'next/server' import { eq } from 'drizzle-orm' import { v4 as uuidv4 } from 'uuid' import { z } from 'zod' -import { persistLog } from '@/lib/logging' +import { persistExecutionError, persistExecutionLogs } from '@/lib/logging' import { decryptSecret } from '@/lib/utils' import { mergeSubblockState } from '@/stores/workflows/utils' import { WorkflowState } from '@/stores/workflows/workflow/types' @@ -116,54 +116,13 @@ async function executeWorkflow(workflow: any, input?: any) { const executor = new Executor(serializedWorkflow, currentBlockStates, decryptedEnvVars, input) const result = await executor.execute(workflowId) - // Log each execution step - for (const log of result.logs || []) { - await persistLog({ - id: uuidv4(), - workflowId, - executionId, - level: log.success ? 'info' : 'error', - message: log.success - ? `Block ${log.blockName || log.blockId} (${log.blockType}): ${JSON.stringify(log.output?.response || {})}` - : `Block ${log.blockName || log.blockId} (${log.blockType}): ${log.error || 'Failed'}`, - duration: log.success ? `${log.durationMs}ms` : 'NA', - trigger: 'api', - createdAt: new Date(log.endedAt || log.startedAt), - }) - } - - // Calculate total duration from successful block logs - const totalDuration = (result.logs || []) - .filter((log) => log.success) - .reduce((sum, log) => sum + log.durationMs, 0) - - // Log the final execution result - await persistLog({ - id: uuidv4(), - workflowId, - executionId, - level: result.success ? 'info' : 'error', - message: result.success - ? 'API workflow executed successfully' - : `API workflow execution failed: ${result.error}`, - duration: result.success ? `${totalDuration}ms` : 'NA', - trigger: 'api', - createdAt: new Date(), - }) + // Log each execution step and the final result + await persistExecutionLogs(workflowId, executionId, result, 'api') return result } catch (error: any) { // Log the error - await persistLog({ - id: uuidv4(), - workflowId, - executionId, - level: 'error', - message: `API workflow execution failed: ${error.message}`, - duration: 'NA', - trigger: 'api', - createdAt: new Date(), - }) + await persistExecutionError(workflowId, executionId, error, 'api') throw error } finally { runningExecutions.delete(workflowId) diff --git a/lib/logging.ts b/lib/logging.ts index 3ad47f0c3..2145b2cab 100644 --- a/lib/logging.ts +++ b/lib/logging.ts @@ -1,5 +1,7 @@ +import { v4 as uuidv4 } from 'uuid' import { db } from '@/db' import { workflowLogs } from '@/db/schema' +import { BlockLog, ExecutionResult as ExecutorResult } from '@/executor/types' export interface LogEntry { id: string @@ -15,3 +17,116 @@ export interface LogEntry { export async function persistLog(log: LogEntry) { await db.insert(workflowLogs).values(log) } + +/** + * Persists logs for a workflow execution, including individual block logs and the final result + * @param workflowId - The ID of the workflow + * @param executionId - The ID of the execution + * @param result - The execution result + * @param triggerType - The type of trigger (api, webhook, schedule) + */ +export async function persistExecutionLogs( + workflowId: string, + executionId: string, + result: ExecutorResult, + triggerType: 'api' | 'webhook' | 'schedule' +) { + try { + // Log each execution step + for (const log of result.logs || []) { + await persistLog({ + id: uuidv4(), + workflowId, + executionId, + level: log.success ? 'info' : 'error', + message: log.success + ? `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${JSON.stringify(log.output?.response || {})}` + : `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${log.error || 'Failed'}`, + duration: log.success ? `${log.durationMs}ms` : 'NA', + trigger: triggerType, + createdAt: new Date(log.endedAt || log.startedAt), + }) + } + + // Calculate total duration from successful block logs + const totalDuration = (result.logs || []) + .filter((log) => log.success) + .reduce((sum, log) => sum + log.durationMs, 0) + + // Get trigger-specific message + const successMessage = getTriggerSuccessMessage(triggerType) + const errorPrefix = getTriggerErrorPrefix(triggerType) + + // Log the final execution result + await persistLog({ + id: uuidv4(), + workflowId, + executionId, + level: result.success ? 'info' : 'error', + message: result.success ? successMessage : `${errorPrefix} execution failed: ${result.error}`, + duration: result.success ? `${totalDuration}ms` : 'NA', + trigger: triggerType, + createdAt: new Date(), + }) + } catch (error: any) { + console.error(`Error persisting execution logs: ${error.message}`, error) + } +} + +/** + * Persists an error log for a workflow execution + * @param workflowId - The ID of the workflow + * @param executionId - The ID of the execution + * @param error - The error that occurred + * @param triggerType - The type of trigger (api, webhook, schedule) + */ +export async function persistExecutionError( + workflowId: string, + executionId: string, + error: Error, + triggerType: 'api' | 'webhook' | 'schedule' +) { + try { + const errorPrefix = getTriggerErrorPrefix(triggerType) + + await persistLog({ + id: uuidv4(), + workflowId, + executionId, + level: 'error', + message: `${errorPrefix} execution failed: ${error.message}`, + duration: 'NA', + trigger: triggerType, + createdAt: new Date(), + }) + } catch (logError: any) { + console.error(`Error persisting execution error log: ${logError.message}`, logError) + } +} + +// Helper functions for trigger-specific messages +function getTriggerSuccessMessage(triggerType: 'api' | 'webhook' | 'schedule'): string { + switch (triggerType) { + case 'api': + return 'API workflow executed successfully' + case 'webhook': + return 'Webhook workflow executed successfully' + case 'schedule': + return 'Scheduled workflow executed successfully' + default: + return 'Workflow executed successfully' + } +} + +function getTriggerErrorPrefix(triggerType: 'api' | 'webhook' | 'schedule'): string { + switch (triggerType) { + case 'api': + return 'API workflow' + case 'webhook': + return 'Webhook workflow' + case 'schedule': + return 'Scheduled workflow' + default: + return 'Workflow' + } +}