mirror of https://github.com/simstudioai/sim.git (synced 2026-01-09 15:07:55 -05:00)

improvement(cleanup): remove old logging code (#779)

parent 386644e9f9 · commit 022a61b77a · committed via GitHub
@@ -279,11 +279,6 @@ export function mockExecutionDependencies() {
    }
  })

  vi.mock('@/lib/logs/execution-logger', () => ({
    persistExecutionLogs: vi.fn().mockResolvedValue(undefined),
    persistExecutionError: vi.fn().mockResolvedValue(undefined),
  }))

  vi.mock('@/lib/logs/trace-spans', () => ({
    buildTraceSpans: vi.fn().mockReturnValue({
      traceSpans: [],

@@ -380,7 +375,6 @@ export function mockWorkflowAccessValidation(shouldSucceed = true) {

export async function getMockedDependencies() {
  const utilsModule = await import('@/lib/utils')
  const logsModule = await import('@/lib/logs/execution-logger')
  const traceSpansModule = await import('@/lib/logs/trace-spans')
  const workflowUtilsModule = await import('@/lib/workflows/utils')
  const executorModule = await import('@/executor')

@@ -389,8 +383,6 @@ export async function getMockedDependencies() {

  return {
    decryptSecret: utilsModule.decryptSecret,
    persistExecutionLogs: logsModule.persistExecutionLogs,
    persistExecutionError: logsModule.persistExecutionError,
    buildTraceSpans: traceSpansModule.buildTraceSpans,
    updateWorkflowRunCounts: workflowUtilsModule.updateWorkflowRunCounts,
    Executor: executorModule.Executor,

@@ -1,759 +0,0 @@

/**
 * Tests for workflow logs API route
 *
 * @vitest-environment node
 */
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'

describe('Workflow Logs API Route', () => {
  const mockWorkflowLogs = [
    {
      id: 'log-1',
      workflowId: 'workflow-1',
      executionId: 'exec-1',
      level: 'info',
      message: 'Workflow started',
      duration: '1.2s',
      trigger: 'manual',
      createdAt: new Date('2024-01-01T10:00:00.000Z'),
    },
    {
      id: 'log-2',
      workflowId: 'workflow-1',
      executionId: 'exec-1',
      level: 'error',
      message: 'API call failed',
      duration: '0.5s',
      trigger: 'manual',
      createdAt: new Date('2024-01-01T10:01:00.000Z'),
    },
    {
      id: 'log-3',
      workflowId: 'workflow-2',
      executionId: 'exec-2',
      level: 'info',
      message: 'Task completed',
      duration: '2.1s',
      trigger: 'api',
      createdAt: new Date('2024-01-01T10:02:00.000Z'),
    },
    {
      id: 'log-4',
      workflowId: 'workflow-3',
      executionId: 'exec-3',
      level: 'info',
      message: 'Root workflow executed',
      duration: '0.8s',
      trigger: 'webhook',
      createdAt: new Date('2024-01-01T10:03:00.000Z'),
    },
  ]

  const mockWorkflows = [
    {
      id: 'workflow-1',
      userId: 'user-123',
      workspaceId: 'workspace-123',
      folderId: 'folder-1',
      name: 'Test Workflow 1',
      color: '#3972F6',
      description: 'First test workflow',
      state: {},
      createdAt: new Date('2024-01-01T00:00:00.000Z'),
      updatedAt: new Date('2024-01-01T00:00:00.000Z'),
    },
    {
      id: 'workflow-2',
      userId: 'user-123',
      workspaceId: 'workspace-123',
      folderId: 'folder-2',
      name: 'Test Workflow 2',
      color: '#FF6B6B',
      description: 'Second test workflow',
      state: {},
      createdAt: new Date('2024-01-01T00:00:00.000Z'),
      updatedAt: new Date('2024-01-01T00:00:00.000Z'),
    },
    {
      id: 'workflow-3',
      userId: 'user-123',
      workspaceId: 'workspace-123',
      folderId: null,
      name: 'Test Workflow 3',
      color: '#22C55E',
      description: 'Third test workflow (no folder)',
      state: {},
      createdAt: new Date('2024-01-01T00:00:00.000Z'),
      updatedAt: new Date('2024-01-01T00:00:00.000Z'),
    },
  ]

  beforeEach(() => {
    vi.resetModules()
    vi.clearAllMocks()

    vi.stubGlobal('crypto', {
      randomUUID: vi.fn().mockReturnValue('mock-request-id-12345678'),
    })

    vi.doMock('@/lib/logs/console-logger', () => ({
      createLogger: vi.fn().mockReturnValue({
        debug: vi.fn(),
        info: vi.fn(),
        warn: vi.fn(),
        error: vi.fn(),
      }),
    }))

    vi.doMock('@/lib/auth', () => ({
      getSession: vi.fn().mockResolvedValue({
        user: { id: 'user-123' },
      }),
    }))
  })

  afterEach(() => {
    vi.clearAllMocks()
  })

  function setupDatabaseMock({
    userWorkflows = mockWorkflows.filter((w) => w.userId === 'user-123'),
    logs = mockWorkflowLogs,
    workflows = mockWorkflows,
    throwError = false,
  } = {}) {
    const createChainableMock = (data: any[]) => {
      const mock = {
        select: vi.fn().mockReturnThis(),
        from: vi.fn().mockReturnThis(),
        where: vi.fn().mockReturnThis(),
        orderBy: vi.fn().mockReturnThis(),
        limit: vi.fn().mockReturnThis(),
        offset: vi.fn().mockReturnThis(),
        then: vi.fn((resolve) => resolve(data)),
      }
      return mock
    }
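    // Note: this mock works because Drizzle's query builder is awaitable.
    // Every chained method returns the mock itself, and the `then` property
    // makes the object a thenable, so `await db.select().from(...).where(...)`
    // resolves to the stubbed rows. A minimal standalone sketch of the same
    // trick (illustrative names, not from the repo):
    //
    //   const fakeQuery = {
    //     from() { return this },
    //     where() { return this },
    //     then(resolve: (rows: string[]) => void) { resolve(['row-1']) },
    //   }
    //   const rows = await (fakeQuery as any).from().where() // ['row-1']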

    let dbCallCount = 0

    vi.doMock('@/db', () => ({
      db: {
        select: vi.fn().mockImplementation((selection?: any) => {
          if (throwError) {
            throw new Error('Database connection failed')
          }

          dbCallCount++

          // First call: get user workflows
          if (dbCallCount === 1) {
            return createChainableMock(
              userWorkflows.map((w) => ({ id: w.id, folderId: w.folderId }))
            )
          }

          // Second call: get logs
          if (dbCallCount === 2) {
            return createChainableMock(logs)
          }

          // Third call: get count
          if (dbCallCount === 3) {
            // If selection is provided and has count property, return count result
            if (selection && Object.keys(selection).some((key) => key === 'count')) {
              return createChainableMock([{ count: logs.length }])
            }
            return createChainableMock([{ count: logs.length }])
          }

          // Fourth call: get workflows for includeWorkflow
          if (dbCallCount === 4) {
            return createChainableMock(workflows)
          }

          return createChainableMock([])
        }),
      },
    }))

    vi.doMock('drizzle-orm', () => ({
      eq: vi.fn().mockImplementation((field, value) => ({ type: 'eq', field, value })),
      and: vi.fn().mockImplementation((...conditions) => ({ type: 'and', conditions })),
      or: vi.fn().mockImplementation((...conditions) => ({ type: 'or', conditions })),
      gte: vi.fn().mockImplementation((field, value) => ({ type: 'gte', field, value })),
      lte: vi.fn().mockImplementation((field, value) => ({ type: 'lte', field, value })),
      sql: vi.fn().mockImplementation((strings, ...values) => ({
        type: 'sql',
        sql: strings,
        values,
      })),
    }))

    vi.doMock('@/db/schema', () => ({
      workflow: {
        id: 'workflow.id',
        userId: 'workflow.userId',
        workspaceId: 'workflow.workspaceId',
        name: 'workflow.name',
        color: 'workflow.color',
        description: 'workflow.description',
      },
      workflowLogs: {
        id: 'workflowLogs.id',
        workflowId: 'workflowLogs.workflowId',
        level: 'workflowLogs.level',
        trigger: 'workflowLogs.trigger',
        createdAt: 'workflowLogs.createdAt',
        message: 'workflowLogs.message',
        executionId: 'workflowLogs.executionId',
      },
    }))
  }

  describe('GET /api/logs', () => {
    it('should return logs successfully with default parameters', async () => {
      setupDatabaseMock()

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data).toHaveProperty('data')
      expect(data).toHaveProperty('total', 4)
      expect(data).toHaveProperty('page', 1)
      expect(data).toHaveProperty('pageSize', 100)
      expect(data).toHaveProperty('totalPages', 1)
      expect(Array.isArray(data.data)).toBe(true)
      expect(data.data).toHaveLength(4)
    })

    it('should include workflow data when includeWorkflow=true', async () => {
      setupDatabaseMock()

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&includeWorkflow=true'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data[0]).toHaveProperty('workflow')
      expect(data.data[0].workflow).toHaveProperty('name')
      expect(data.data[0].workflow).toHaveProperty('color')
    })

    it('should filter logs by level', async () => {
      const errorLogs = mockWorkflowLogs.filter((log) => log.level === 'error')
      setupDatabaseMock({ logs: errorLogs })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&level=error')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].level).toBe('error')
    })

    it('should filter logs by specific workflow IDs', async () => {
      const workflow1Logs = mockWorkflowLogs.filter((log) => log.workflowId === 'workflow-1')
      setupDatabaseMock({ logs: workflow1Logs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&workflowIds=workflow-1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.data.every((log: any) => log.workflowId === 'workflow-1')).toBe(true)
    })

    it('should filter logs by multiple workflow IDs', async () => {
      // Only get logs for workflow-1 and workflow-2 (not workflow-3)
      const filteredLogs = mockWorkflowLogs.filter(
        (log) => log.workflowId === 'workflow-1' || log.workflowId === 'workflow-2'
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&workflowIds=workflow-1,workflow-2'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(3)
    })

    it('should filter logs by date range', async () => {
      const startDate = '2024-01-01T10:00:30.000Z'
      const filteredLogs = mockWorkflowLogs.filter(
        (log) => new Date(log.createdAt) >= new Date(startDate)
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        `http://localhost:3000/api/logs?workspaceId=workspace-123&startDate=${startDate}`
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(filteredLogs.length)
    })

    it('should search logs by message content', async () => {
      const searchLogs = mockWorkflowLogs.filter((log) =>
        log.message.toLowerCase().includes('failed')
      )
      setupDatabaseMock({ logs: searchLogs })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&search=failed')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].message).toContain('failed')
    })

    it('should handle pagination correctly', async () => {
      const paginatedLogs = mockWorkflowLogs.slice(1, 3)
      setupDatabaseMock({ logs: paginatedLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&limit=2&offset=1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.page).toBe(1)
      expect(data.pageSize).toBe(2)
      expect(data.total).toBe(2)
      expect(data.totalPages).toBe(1)
    })

    it('should return empty array when user has no workflows', async () => {
      setupDatabaseMock({ userWorkflows: [], logs: [], workflows: [] })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toEqual([])
      expect(data.total).toBe(0)
    })

    it('should return 403 for unauthorized workflow access', async () => {
      // Set up mock to simulate user not owning the requested workflow
      setupDatabaseMock({
        userWorkflows: mockWorkflows.filter((w) => w.id !== 'unauthorized-workflow'),
      })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&workflowIds=unauthorized-workflow'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(403)
      expect(data).toHaveProperty('error', 'Unauthorized access to workflows')
    })

    it('should return 401 for unauthenticated requests', async () => {
      // Mock auth to return no session
      vi.doMock('@/lib/auth', () => ({
        getSession: vi.fn().mockResolvedValue(null),
      }))

      setupDatabaseMock()

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(401)
      expect(data).toHaveProperty('error', 'Unauthorized')
    })

    it('should validate query parameters', async () => {
      setupDatabaseMock()

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&limit=invalid')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(400)
      expect(data).toHaveProperty('error', 'Invalid request parameters')
      expect(data).toHaveProperty('details')
    })

    it('should handle database errors gracefully', async () => {
      setupDatabaseMock({ throwError: true })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(500)
      expect(data).toHaveProperty('error')
    })

    it('should combine multiple filters correctly', async () => {
      const filteredLogs = mockWorkflowLogs.filter(
        (log) =>
          log.level === 'info' &&
          log.workflowId === 'workflow-1' &&
          log.message.toLowerCase().includes('started')
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&level=info&workflowIds=workflow-1&search=started'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].level).toBe('info')
      expect(data.data[0].workflowId).toBe('workflow-1')
      expect(data.data[0].message).toContain('started')
    })

    it('should handle end date filter', async () => {
      const endDate = '2024-01-01T10:01:30.000Z'
      const filteredLogs = mockWorkflowLogs.filter(
        (log) => new Date(log.createdAt) <= new Date(endDate)
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        `http://localhost:3000/api/logs?workspaceId=workspace-123&endDate=${endDate}`
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
    })

    it('should handle large offset values', async () => {
      setupDatabaseMock({ logs: [] })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&limit=10&offset=1000'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toEqual([])
      expect(data.page).toBe(101) // (1000 / 10) + 1
      expect(data.total).toBe(0)
    })

    it('should handle search by execution ID', async () => {
      const searchLogs = mockWorkflowLogs.filter((log) => log.executionId?.includes('exec-1'))
      setupDatabaseMock({ logs: searchLogs })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&search=exec-1')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.data.every((log: any) => log.executionId === 'exec-1')).toBe(true)
    })

    it('should filter logs by single trigger type', async () => {
      const apiLogs = mockWorkflowLogs.filter((log) => log.trigger === 'api')
      setupDatabaseMock({ logs: apiLogs })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&triggers=api')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].trigger).toBe('api')
    })

    it('should filter logs by multiple trigger types', async () => {
      const manualAndApiLogs = mockWorkflowLogs.filter(
        (log) => log.trigger === 'manual' || log.trigger === 'api'
      )
      setupDatabaseMock({ logs: manualAndApiLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&triggers=manual,api'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(3)
      expect(data.data.every((log: any) => ['manual', 'api'].includes(log.trigger))).toBe(true)
    })

    it('should combine trigger filter with other filters', async () => {
      const filteredLogs = mockWorkflowLogs.filter(
        (log) => log.trigger === 'manual' && log.level === 'info' && log.workflowId === 'workflow-1'
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&triggers=manual&level=info&workflowIds=workflow-1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].trigger).toBe('manual')
      expect(data.data[0].level).toBe('info')
      expect(data.data[0].workflowId).toBe('workflow-1')
    })

    it('should filter logs by single folder ID', async () => {
      const folder1Logs = mockWorkflowLogs.filter((log) => log.workflowId === 'workflow-1')
      setupDatabaseMock({ logs: folder1Logs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.data.every((log: any) => log.workflowId === 'workflow-1')).toBe(true)
    })

    it('should filter logs by multiple folder IDs', async () => {
      const folder1And2Logs = mockWorkflowLogs.filter(
        (log) => log.workflowId === 'workflow-1' || log.workflowId === 'workflow-2'
      )
      setupDatabaseMock({ logs: folder1And2Logs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1,folder-2'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(3)
      expect(
        data.data.every((log: any) => ['workflow-1', 'workflow-2'].includes(log.workflowId))
      ).toBe(true)
    })

    it('should filter logs by root folder (workflows without folders)', async () => {
      const rootLogs = mockWorkflowLogs.filter((log) => log.workflowId === 'workflow-3')
      setupDatabaseMock({ logs: rootLogs })

      const url = new URL('http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=root')
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].workflowId).toBe('workflow-3')
      expect(data.data[0].message).toContain('Root workflow executed')
    })

    it('should combine root folder with other folders', async () => {
      const rootAndFolder1Logs = mockWorkflowLogs.filter(
        (log) => log.workflowId === 'workflow-1' || log.workflowId === 'workflow-3'
      )
      setupDatabaseMock({ logs: rootAndFolder1Logs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=root,folder-1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(3)
      expect(
        data.data.every((log: any) => ['workflow-1', 'workflow-3'].includes(log.workflowId))
      ).toBe(true)
    })

    it('should combine folder filter with workflow filter', async () => {
      // Filter by folder-1 and specific workflow-1 (should return same results)
      const filteredLogs = mockWorkflowLogs.filter((log) => log.workflowId === 'workflow-1')
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1&workflowIds=workflow-1'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.data.every((log: any) => log.workflowId === 'workflow-1')).toBe(true)
    })

    it('should return empty when folder and workflow filters conflict', async () => {
      // Try to filter by folder-1 but workflow-2 (which is in folder-2)
      setupDatabaseMock({ logs: [] })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1&workflowIds=workflow-2'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toEqual([])
      expect(data.total).toBe(0)
    })

    it('should combine folder filter with other filters', async () => {
      const filteredLogs = mockWorkflowLogs.filter(
        (log) => log.workflowId === 'workflow-1' && log.level === 'info'
      )
      setupDatabaseMock({ logs: filteredLogs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1&level=info'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(1)
      expect(data.data[0].workflowId).toBe('workflow-1')
      expect(data.data[0].level).toBe('info')
    })

    it('should return empty result when no workflows match folder filter', async () => {
      setupDatabaseMock({ logs: [] })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=non-existent-folder'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toEqual([])
      expect(data.total).toBe(0)
    })

    it('should handle folder filter with includeWorkflow=true', async () => {
      const folder1Logs = mockWorkflowLogs.filter((log) => log.workflowId === 'workflow-1')
      setupDatabaseMock({ logs: folder1Logs })

      const url = new URL(
        'http://localhost:3000/api/logs?workspaceId=workspace-123&folderIds=folder-1&includeWorkflow=true'
      )
      const req = new Request(url.toString())

      const { GET } = await import('./route')
      const response = await GET(req as any)
      const data = await response.json()

      expect(response.status).toBe(200)
      expect(data.data).toHaveLength(2)
      expect(data.data[0]).toHaveProperty('workflow')
      expect(data.data[0].workflow).toHaveProperty('name')
      expect(data.data.every((log: any) => log.workflowId === 'workflow-1')).toBe(true)
    })
  })
})

@@ -1,246 +0,0 @@

import { and, eq, gte, lte, or, type SQL, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { workflow, workflowLogs } from '@/db/schema'

const logger = createLogger('WorkflowLogsAPI')

export const dynamic = 'force-dynamic'
export const revalidate = 0

const QueryParamsSchema = z.object({
  includeWorkflow: z.enum(['true', 'false']).optional().default('false'),
  limit: z.coerce.number().optional().default(100),
  offset: z.coerce.number().optional().default(0),
  level: z.string().optional(),
  workflowIds: z.string().optional(), // Comma-separated list of workflow IDs
  folderIds: z.string().optional(), // Comma-separated list of folder IDs
  triggers: z.string().optional(), // Comma-separated list of trigger types
  startDate: z.string().optional(),
  endDate: z.string().optional(),
  search: z.string().optional(),
  workspaceId: z.string(),
})
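// The multi-value filters arrive as single comma-separated strings, as the
// tests above exercise. A representative (hypothetical) request:
//
//   GET /api/logs?workspaceId=workspace-123&level=error
//       &workflowIds=workflow-1,workflow-2&folderIds=root,folder-1
//       &triggers=manual,api&limit=50&offset=0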
// Used to retrieve and display workflow logs
export async function GET(request: NextRequest) {
  const requestId = crypto.randomUUID().slice(0, 8)

  try {
    const session = await getSession()
    if (!session?.user?.id) {
      logger.warn(`[${requestId}] Unauthorized workflow logs access attempt`)
      return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
    }

    const userId = session.user.id

    try {
      const { searchParams } = new URL(request.url)
      const params = QueryParamsSchema.parse(Object.fromEntries(searchParams.entries()))

      const workflowConditions = and(
        eq(workflow.workspaceId, params.workspaceId),
        eq(workflow.userId, userId)
      )

      const userWorkflows = await db
        .select({ id: workflow.id, folderId: workflow.folderId })
        .from(workflow)
        .where(workflowConditions)

      const userWorkflowIds = userWorkflows.map((w) => w.id)

      if (userWorkflowIds.length === 0) {
        return NextResponse.json({ data: [], total: 0 }, { status: 200 })
      }

      // Handle folder filtering
      let targetWorkflowIds = userWorkflowIds
      if (params.folderIds) {
        const requestedFolderIds = params.folderIds.split(',').map((id) => id.trim())

        // Filter workflows by folder IDs (including 'root' for workflows without folders)
        const workflowsInFolders = userWorkflows.filter((w) => {
          if (requestedFolderIds.includes('root')) {
            return requestedFolderIds.includes('root') && w.folderId === null
          }
          return w.folderId && requestedFolderIds.includes(w.folderId)
        })

        // Handle 'root' folder (workflows without folders)
        if (requestedFolderIds.includes('root')) {
          const rootWorkflows = userWorkflows.filter((w) => w.folderId === null)
          const folderWorkflows = userWorkflows.filter(
            (w) =>
              w.folderId && requestedFolderIds.filter((id) => id !== 'root').includes(w.folderId!)
          )
          targetWorkflowIds = [...rootWorkflows, ...folderWorkflows].map((w) => w.id)
        } else {
          targetWorkflowIds = workflowsInFolders.map((w) => w.id)
        }

        if (targetWorkflowIds.length === 0) {
          return NextResponse.json({ data: [], total: 0 }, { status: 200 })
        }
      }

      // Build the conditions for the query
      let conditions: SQL<unknown> | undefined

      // Apply workflow filtering
      if (params.workflowIds) {
        const requestedWorkflowIds = params.workflowIds.split(',').map((id) => id.trim())
        // Ensure all requested workflows belong to the user
        const unauthorizedIds = requestedWorkflowIds.filter((id) => !userWorkflowIds.includes(id))
        if (unauthorizedIds.length > 0) {
          logger.warn(`[${requestId}] Unauthorized access to workflow logs`, {
            unauthorizedWorkflowIds: unauthorizedIds,
          })
          return NextResponse.json({ error: 'Unauthorized access to workflows' }, { status: 403 })
        }
        // Further filter by folder constraints if both filters are active
        const finalWorkflowIds = params.folderIds
          ? requestedWorkflowIds.filter((id) => targetWorkflowIds.includes(id))
          : requestedWorkflowIds

        if (finalWorkflowIds.length === 0) {
          return NextResponse.json({ data: [], total: 0 }, { status: 200 })
        }
        conditions = or(...finalWorkflowIds.map((id) => eq(workflowLogs.workflowId, id)))
      } else {
        // No specific workflows requested, filter by target workflows (considering folder filter)
        if (targetWorkflowIds.length === 1) {
          conditions = eq(workflowLogs.workflowId, targetWorkflowIds[0])
        } else {
          conditions = or(...targetWorkflowIds.map((id) => eq(workflowLogs.workflowId, id)))
        }
      }

      // Apply additional filters if provided
      if (params.level) {
        conditions = and(conditions, eq(workflowLogs.level, params.level))
      }

      if (params.triggers) {
        const triggerTypes = params.triggers.split(',').map((trigger) => trigger.trim())
        if (triggerTypes.length === 1) {
          conditions = and(conditions, eq(workflowLogs.trigger, triggerTypes[0]))
        } else {
          conditions = and(
            conditions,
            or(...triggerTypes.map((trigger) => eq(workflowLogs.trigger, trigger)))
          )
        }
      }

      if (params.startDate) {
        const startDate = new Date(params.startDate)
        conditions = and(conditions, gte(workflowLogs.createdAt, startDate))
      }

      if (params.endDate) {
        const endDate = new Date(params.endDate)
        conditions = and(conditions, lte(workflowLogs.createdAt, endDate))
      }

      if (params.search) {
        const searchTerm = `%${params.search}%`
        conditions = and(
          conditions,
          or(
            sql`${workflowLogs.message} ILIKE ${searchTerm}`,
            sql`${workflowLogs.executionId} ILIKE ${searchTerm}`
          )
        )
      }

      // Execute the query with all conditions
      const logs = await db
        .select()
        .from(workflowLogs)
        .where(conditions)
        .orderBy(sql`${workflowLogs.createdAt} DESC`)
        .limit(params.limit)
        .offset(params.offset)

      // Get total count for pagination
      const countResult = await db
        .select({ count: sql<number>`count(*)` })
        .from(workflowLogs)
        .where(conditions)

      const count = countResult[0]?.count || 0

      // If includeWorkflow is true, fetch the associated workflow data
      if (params.includeWorkflow === 'true' && logs.length > 0) {
        // Get unique workflow IDs from logs
        const uniqueWorkflowIds = [...new Set(logs.map((log) => log.workflowId))]

        // Create conditions for workflow query
        let workflowConditions: SQL<unknown> | undefined

        if (uniqueWorkflowIds.length === 1) {
          workflowConditions = eq(workflow.id, uniqueWorkflowIds[0])
        } else {
          workflowConditions = or(...uniqueWorkflowIds.map((id) => eq(workflow.id, id)))
        }

        // Fetch workflows
        const workflowData = await db.select().from(workflow).where(workflowConditions)

        // Create a map of workflow data for easy lookup
        const workflowMap = new Map(workflowData.map((w) => [w.id, w]))

        // Attach workflow data to each log
        const logsWithWorkflow = logs.map((log) => ({
          ...log,
          workflow: workflowMap.get(log.workflowId) || null,
        }))

        return NextResponse.json(
          {
            data: logsWithWorkflow,
            total: Number(count),
            page: Math.floor(params.offset / params.limit) + 1,
            pageSize: params.limit,
            totalPages: Math.ceil(Number(count) / params.limit),
          },
          { status: 200 }
        )
      }

      // Return logs without workflow data
      return NextResponse.json(
        {
          data: logs,
          total: Number(count),
          page: Math.floor(params.offset / params.limit) + 1,
          pageSize: params.limit,
          totalPages: Math.ceil(Number(count) / params.limit),
        },
        { status: 200 }
      )
    } catch (validationError) {
      if (validationError instanceof z.ZodError) {
        logger.warn(`[${requestId}] Invalid workflow logs request parameters`, {
          errors: validationError.errors,
        })
        return NextResponse.json(
          {
            error: 'Invalid request parameters',
            details: validationError.errors,
          },
          { status: 400 }
        )
      }
      throw validationError
    }
  } catch (error: any) {
    logger.error(`[${requestId}] Workflow logs fetch error`, error)
    return NextResponse.json({ error: error.message }, { status: 500 })
  }
}
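A minimal sketch of the offset-based pagination math the deleted route reported in its responses (same formulas as above; the helper name is illustrative):

// Reports 1-indexed pages from a raw limit/offset pair.
// e.g. offset=1000, limit=10, total=0  =>  { page: 101, totalPages: 0 }
function pageInfo(offset: number, limit: number, total: number) {
  return {
    page: Math.floor(offset / limit) + 1,
    totalPages: Math.ceil(total / limit),
  }
}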
@@ -131,12 +131,6 @@ describe('Scheduled Workflow Execution API Route', () => {
  })

  it('should handle errors during scheduled execution gracefully', async () => {
    const persistExecutionErrorMock = vi.fn().mockResolvedValue(undefined)

    vi.doMock('@/lib/logs/execution-logger', () => ({
      persistExecutionError: persistExecutionErrorMock,
    }))

    vi.doMock('@/executor', () => ({
      Executor: vi.fn().mockImplementation(() => ({
        execute: vi.fn().mockRejectedValue(new Error('Execution failed')),

@@ -32,7 +32,6 @@ const executeMock = vi.fn().mockResolvedValue({
    endTime: new Date().toISOString(),
  },
})
const persistExecutionErrorMock = vi.fn().mockResolvedValue(undefined)

// Mock the DB schema objects
const webhookMock = {

@@ -78,10 +77,6 @@ vi.mock('@/executor', () => ({
  })),
}))

vi.mock('@/lib/logs/execution-logger', () => ({
  persistExecutionError: persistExecutionErrorMock,
}))

// Mock setTimeout and other timer functions
vi.mock('timers', () => {
  return {

@@ -157,11 +157,6 @@ describe('Workflow Execution API Route', () => {
      getRotatingApiKey: vi.fn().mockReturnValue('rotated-api-key'),
    }))

    vi.doMock('@/lib/logs/execution-logger', () => ({
      persistExecutionLogs: vi.fn().mockResolvedValue(undefined),
      persistExecutionError: vi.fn().mockResolvedValue(undefined),
    }))

    vi.doMock('@/lib/logs/enhanced-logging-session', () => ({
      EnhancedLoggingSession: vi.fn().mockImplementation(() => ({
        safeStart: vi.fn().mockResolvedValue(undefined),

@@ -23,7 +23,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
    const body = await request.json()
    const { logs, executionId, result } = body

    // If result is provided, use persistExecutionLogs for full tool call extraction
    // If result is provided, use enhanced logging system for full tool call extraction
    if (result) {
      logger.info(`[${requestId}] Persisting execution result for workflow: ${id}`, {
        executionId,

@@ -0,0 +1 @@

DROP TABLE "workflow_logs" CASCADE;

apps/sim/db/migrations/meta/0062_snapshot.json (new file, 5529 lines; file diff suppressed because it is too large)

@@ -428,6 +428,13 @@
      "when": 1753380613269,
      "tag": "0061_swift_doctor_spectrum",
      "breakpoints": true
    },
    {
      "idx": 62,
      "version": "7",
      "when": 1753383446084,
      "tag": "0062_previous_phantom_reporter",
      "breakpoints": true
    }
  ]
}

@@ -245,30 +245,6 @@ export const waitlist = pgTable('waitlist', {
  updatedAt: timestamp('updated_at').notNull().defaultNow(),
})

export const workflowLogs = pgTable(
  'workflow_logs',
  {
    id: text('id').primaryKey(),
    workflowId: text('workflow_id')
      .notNull()
      .references(() => workflow.id, { onDelete: 'cascade' }),
    executionId: text('execution_id'),
    level: text('level').notNull(), // "info", "error", etc.
    message: text('message').notNull(),
    duration: text('duration'), // Store as text to allow 'NA' for errors
    trigger: text('trigger'), // "api", "schedule", "manual"
    createdAt: timestamp('created_at').notNull().defaultNow(),
    metadata: json('metadata'),
  },
  (table) => ({
    workflowIdIdx: index('workflow_logs_workflow_id_idx').on(table.workflowId),
    workflowCreatedIdx: index('workflow_logs_workflow_created_idx').on(
      table.workflowId,
      table.createdAt
    ),
  })
)

export const workflowExecutionSnapshots = pgTable(
  'workflow_execution_snapshots',
  {

@@ -1,928 +0,0 @@

import { eq, sql } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import { getCostMultiplier } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console-logger'
import { redactApiKeys } from '@/lib/utils'
import { stripCustomToolPrefix } from '@/lib/workflows/utils'
import { db } from '@/db'
import { userStats, workflow, workflowLogs } from '@/db/schema'
import type { ExecutionResult as ExecutorResult } from '@/executor/types'
import { calculateCost } from '@/providers/utils'

const logger = createLogger('ExecutionLogger')

export interface LogEntry {
  id: string
  workflowId: string
  executionId: string
  level: string
  message: string
  createdAt: Date
  duration?: string
  trigger?: string
  metadata?: ToolCallMetadata | Record<string, any>
}

export interface ToolCallMetadata {
  toolCalls?: ToolCall[]
  cost?: {
    model?: string
    input?: number
    output?: number
    total?: number
    tokens?: {
      prompt?: number
      completion?: number
      total?: number
    }
    pricing?: {
      input: number
      output: number
      cachedInput?: number
      updatedAt: string
    }
  }
}

export interface ToolCall {
  name: string
  duration: number // in milliseconds
  startTime: string // ISO timestamp
  endTime: string // ISO timestamp
  status: 'success' | 'error' // Status of the tool call
  input?: Record<string, any> // Input parameters (optional)
  output?: Record<string, any> // Output data (optional)
  error?: string // Error message if status is 'error'
}

export async function persistLog(log: LogEntry) {
  await db.insert(workflowLogs).values(log)
}
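
// A hypothetical persistLog call, using only fields from the LogEntry
// interface above (the values are illustrative):
//
//   await persistLog({
//     id: uuidv4(),
//     workflowId: 'workflow-1',
//     executionId: 'exec-1',
//     level: 'info',
//     message: 'Workflow started',
//     duration: '1.2s',
//     trigger: 'manual',
//     createdAt: new Date(),
//   })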

/**
 * Persists logs for a workflow execution, including individual block logs and the final result
 * @param workflowId - The ID of the workflow
 * @param executionId - The ID of the execution
 * @param result - The execution result
 * @param triggerType - The type of trigger (api, webhook, schedule, manual, chat)
 */
export async function persistExecutionLogs(
  workflowId: string,
  executionId: string,
  result: ExecutorResult,
  triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
) {
  try {
    // Get the workflow record to get the userId
    const [workflowRecord] = await db
      .select()
      .from(workflow)
      .where(eq(workflow.id, workflowId))
      .limit(1)

    if (!workflowRecord) {
      logger.error(`Workflow ${workflowId} not found`)
      return
    }

    const userId = workflowRecord.userId

    // Track accumulated cost data across all LLM blocks (agent, router, and evaluator)
    let totalCost = 0
    let totalInputCost = 0
    let totalOutputCost = 0
    let totalPromptTokens = 0
    let totalCompletionTokens = 0
    let totalTokens = 0
    const modelCounts: Record<string, number> = {}
    let primaryModel = ''

    // Log each execution step
    for (const log of result.logs || []) {
      // Check for agent block and tool calls
      let metadata: ToolCallMetadata | undefined

      // If this is an agent, router, or evaluator block (all use LLM providers and generate costs)
      if (
        (log.blockType === 'agent' ||
          log.blockType === 'router' ||
          log.blockType === 'evaluator') &&
        log.output
      ) {
        logger.debug('Processing LLM-based block output for tool calls and cost tracking', {
          blockId: log.blockId,
          blockName: log.blockName,
          blockType: log.blockType,
          outputKeys: Object.keys(log.output),
          hasToolCalls: !!log.output.toolCalls,
          hasResponse: !!log.output,
        })

        // FIRST PASS - Check if this is a no-tool scenario with tokens data not propagated
        // In some cases, the token data from the streaming callback doesn't properly get into
        // the agent block response. This ensures we capture it.
        if (
          log.output &&
          (!log.output.tokens?.completion || log.output.tokens.completion === 0) &&
          (!log.output.toolCalls ||
            !log.output.toolCalls.list ||
            log.output.toolCalls.list.length === 0)
        ) {
          // Check if output has providerTiming - this indicates it's a streaming response
          if (log.output.providerTiming) {
            logger.debug('Processing streaming response without tool calls for token extraction', {
              blockId: log.blockId,
              hasTokens: !!log.output.tokens,
              hasProviderTiming: !!log.output.providerTiming,
            })

            // Only for no-tool streaming cases, extract content length and estimate token count
            const contentLength = log.output.content?.length || 0
            if (contentLength > 0) {
              // Estimate completion tokens based on content length as a fallback
              const estimatedCompletionTokens = Math.ceil(contentLength / 4)
              const promptTokens = log.output.tokens?.prompt || 8

              // Update the tokens object
              log.output.tokens = {
                prompt: promptTokens,
                completion: estimatedCompletionTokens,
                total: promptTokens + estimatedCompletionTokens,
              }

              // Update cost information using the provider's cost model
              const model = log.output.model || 'gpt-4o'
              const costInfo = calculateCost(model, promptTokens, estimatedCompletionTokens)
              log.output.cost = {
                input: costInfo.input,
                output: costInfo.output,
                total: costInfo.total,
                pricing: costInfo.pricing,
              }

              logger.debug('Updated token information for streaming no-tool response', {
                blockId: log.blockId,
                contentLength,
                estimatedCompletionTokens,
                tokens: log.output.tokens,
              })
            }
          }
        }
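
        // Worked example of the fallback above: a 1,000-character streamed
        // reply with no recorded token counts estimates to
        // Math.ceil(1000 / 4) = 250 completion tokens; with the default
        // prompt count of 8 that gives 258 total, which is then priced
        // through calculateCost just like a real token count.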
|
||||
// Special case for streaming responses from LLM blocks (agent, router, and evaluator)
|
||||
// This format has both stream and executionData properties
|
||||
if (log.output.stream && log.output.executionData) {
|
||||
logger.debug('Found streaming response with executionData', {
|
||||
blockId: log.blockId,
|
||||
hasExecutionData: !!log.output.executionData,
|
||||
executionDataKeys: log.output.executionData
|
||||
? Object.keys(log.output.executionData)
|
||||
: [],
|
||||
})
|
||||
|
||||
// Extract the executionData and use it as our primary source of information
|
||||
const executionData = log.output.executionData
|
||||
|
||||
// If executionData has output, merge it with our output
|
||||
// This is especially important for streaming responses where the final content
|
||||
// is set in the executionData structure by the executor
|
||||
if (executionData.output) {
|
||||
log.output = { ...log.output, ...executionData.output }
|
||||
logger.debug('Using output from executionData', {
|
||||
outputKeys: Object.keys(log.output),
|
||||
hasContent: !!log.output.content,
|
||||
contentLength: log.output.content?.length || 0,
|
||||
hasToolCalls: !!log.output.toolCalls,
|
||||
hasTokens: !!log.output.tokens,
|
||||
hasCost: !!log.output.cost,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Add cost information if available
|
||||
if (log.output?.cost) {
|
||||
const output = log.output
|
||||
if (!metadata) metadata = {}
|
||||
metadata.cost = {
|
||||
model: output.model,
|
||||
input: output.cost.input,
|
||||
output: output.cost.output,
|
||||
total: output.cost.total,
|
||||
tokens: output.tokens,
|
||||
pricing: output.cost.pricing,
|
||||
}
|
||||
|
||||
// Accumulate costs for workflow-level summary
|
||||
if (output.cost.total) {
|
||||
totalCost += output.cost.total
|
||||
totalInputCost += output.cost.input || 0
|
||||
totalOutputCost += output.cost.output || 0
|
||||
|
||||
// Track tokens
|
||||
if (output.tokens) {
|
||||
totalPromptTokens += output.tokens.prompt || 0
|
||||
totalCompletionTokens += output.tokens.completion || 0
|
||||
totalTokens += output.tokens.total || 0
|
||||
}
|
||||
|
||||
// Track model usage
|
||||
if (output.model) {
|
||||
modelCounts[output.model] = (modelCounts[output.model] || 0) + 1
|
||||
// Set the most frequently used model as primary
|
||||
if (!primaryModel || modelCounts[output.model] > modelCounts[primaryModel]) {
|
||||
primaryModel = output.model
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extract timing info - try various formats that providers might use
|
||||
const blockStartTime = log.startedAt
|
||||
const blockEndTime = log.endedAt || new Date().toISOString()
|
||||
const blockDuration = log.durationMs || 0
|
||||
let toolCallData: any[] = []
|
||||
|
||||
// Case 1: Direct toolCalls array
|
||||
if (Array.isArray(log.output.toolCalls)) {
|
||||
// Log raw timing data for debugging
|
||||
log.output.toolCalls.forEach((tc: any, idx: number) => {
|
||||
logger.debug(`Tool call ${idx} raw timing data:`, {
|
||||
name: stripCustomToolPrefix(tc.name),
|
||||
startTime: tc.startTime,
|
||||
endTime: tc.endTime,
|
||||
duration: tc.duration,
|
||||
timing: tc.timing,
|
||||
argumentKeys: tc.arguments ? Object.keys(tc.arguments) : undefined,
|
||||
})
|
||||
})
|
||||
|
||||
toolCallData = log.output.toolCalls.map((toolCall: any) => {
|
||||
// Extract timing info - try various formats that providers might use
|
||||
const duration = extractDuration(toolCall)
|
||||
const timing = extractTimingInfo(
|
||||
toolCall,
|
||||
blockStartTime ? new Date(blockStartTime) : undefined,
|
||||
blockEndTime ? new Date(blockEndTime) : undefined
|
||||
)
|
||||
|
||||
return {
|
||||
name: toolCall.name,
|
||||
duration: duration,
|
||||
startTime: timing.startTime,
|
||||
endTime: timing.endTime,
|
||||
status: toolCall.error ? 'error' : 'success',
|
||||
input: toolCall.input || toolCall.arguments,
|
||||
output: toolCall.output || toolCall.result,
|
||||
error: toolCall.error,
|
||||
}
|
||||
})
|
||||
}
|
||||
// Case 2: toolCalls with a list array (as seen in the screenshot)
|
||||
else if (log.output.toolCalls && Array.isArray(log.output.toolCalls.list)) {
|
||||
// Log raw timing data for debugging
|
||||
log.output.toolCalls.list.forEach((tc: any, idx: number) => {
|
||||
logger.debug(`Tool call list ${idx} raw timing data:`, {
|
||||
name: stripCustomToolPrefix(tc.name),
|
||||
startTime: tc.startTime,
|
||||
endTime: tc.endTime,
|
||||
duration: tc.duration,
|
||||
timing: tc.timing,
|
||||
argumentKeys: tc.arguments ? Object.keys(tc.arguments) : undefined,
|
||||
})
|
||||
})
|
||||
|
||||
toolCallData = log.output.toolCalls.list.map((toolCall: any) => {
|
||||
// Extract timing info - try various formats that providers might use
|
||||
const duration = extractDuration(toolCall)
|
||||
const timing = extractTimingInfo(
|
||||
toolCall,
|
||||
blockStartTime ? new Date(blockStartTime) : undefined,
|
||||
blockEndTime ? new Date(blockEndTime) : undefined
|
||||
)
|
||||
|
||||
// Log what we extracted
|
||||
logger.debug('Tool call list timing extracted:', {
|
||||
name: toolCall.name,
|
||||
extracted_duration: duration,
|
||||
extracted_startTime: timing.startTime,
|
||||
extracted_endTime: timing.endTime,
|
||||
})
|
||||
|
||||
return {
|
||||
name: toolCall.name,
|
||||
duration: duration,
|
||||
startTime: timing.startTime,
|
||||
endTime: timing.endTime,
|
||||
status: toolCall.error ? 'error' : 'success',
|
||||
input: toolCall.arguments || toolCall.input,
|
||||
output: toolCall.result || toolCall.output,
|
||||
error: toolCall.error,
|
||||
}
|
||||
})
|
||||
}
|
||||
// Case 3: toolCalls is an object and has a list property
|
||||
else if (
|
||||
log.output.toolCalls &&
|
||||
typeof log.output.toolCalls === 'object' &&
|
||||
log.output.toolCalls.list
|
||||
) {
|
||||
const toolCalls = log.output.toolCalls
|
||||
|
||||
logger.debug('Found toolCalls object with list property', {
|
||||
count: toolCalls.list.length,
|
||||
})
|
||||
|
||||
// Log raw timing data for debugging
|
||||
toolCalls.list.forEach((tc: any, idx: number) => {
|
||||
logger.debug(`toolCalls object list ${idx} raw timing data:`, {
|
||||
name: stripCustomToolPrefix(tc.name),
|
||||
startTime: tc.startTime,
|
||||
endTime: tc.endTime,
|
||||
duration: tc.duration,
|
||||
timing: tc.timing,
|
||||
argumentKeys: tc.arguments ? Object.keys(tc.arguments) : undefined,
|
||||
})
|
||||
})
|
||||
|
||||
toolCallData = toolCalls.list.map((toolCall: any) => {
|
||||
// Extract timing info - try various formats that providers might use
|
||||
const duration = extractDuration(toolCall)
|
||||
const timing = extractTimingInfo(
|
||||
toolCall,
|
||||
blockStartTime ? new Date(blockStartTime) : undefined,
|
||||
blockEndTime ? new Date(blockEndTime) : undefined
|
||||
)
|
||||
|
||||
// Log what we extracted
|
||||
logger.debug('toolCalls object list timing extracted:', {
|
||||
name: toolCall.name,
|
||||
extracted_duration: duration,
|
||||
extracted_startTime: timing.startTime,
|
||||
extracted_endTime: timing.endTime,
|
||||
})
|
||||
|
||||
return {
|
||||
name: toolCall.name,
|
||||
duration: duration,
|
||||
startTime: timing.startTime,
|
||||
endTime: timing.endTime,
|
||||
status: toolCall.error ? 'error' : 'success',
|
||||
input: toolCall.arguments || toolCall.input,
|
||||
output: toolCall.result || toolCall.output,
|
||||
error: toolCall.error,
|
||||
}
|
||||
})
|
||||
}
        // Case 4: Look in executionData.output for streaming responses
        else if (log.output.executionData?.output?.toolCalls) {
          const toolCallsObj = log.output.executionData.output.toolCalls
          const list = Array.isArray(toolCallsObj) ? toolCallsObj : toolCallsObj.list || []

          logger.debug('Found toolCalls in executionData output response', {
            count: list.length,
          })

          // Log raw timing data for debugging
          list.forEach((tc: any, idx: number) => {
            logger.debug(`executionData toolCalls ${idx} raw timing data:`, {
              name: stripCustomToolPrefix(tc.name),
              startTime: tc.startTime,
              endTime: tc.endTime,
              duration: tc.duration,
              timing: tc.timing,
              argumentKeys: tc.arguments ? Object.keys(tc.arguments) : undefined,
            })
          })

          toolCallData = list.map((toolCall: any) => {
            // Extract timing info - try various formats that providers might use
            const duration = extractDuration(toolCall)
            const timing = extractTimingInfo(
              toolCall,
              blockStartTime ? new Date(blockStartTime) : undefined,
              blockEndTime ? new Date(blockEndTime) : undefined
            )

            return {
              name: toolCall.name,
              duration: duration,
              startTime: timing.startTime,
              endTime: timing.endTime,
              status: toolCall.error ? 'error' : 'success',
              input: toolCall.arguments || toolCall.input,
              output: toolCall.result || toolCall.output,
              error: toolCall.error,
            }
          })
        }
        // Case 5: Parse the output string for toolCalls as a last resort
        else if (typeof log.output === 'string') {
          // Note: the object alternative ({[^}]*}) stops at the first closing brace,
          // so this only recovers un-nested toolCalls payloads - hence "last resort".
          const match = log.output.match(/"toolCalls"\s*:\s*({[^}]*}|(\[.*?\]))/s)
          if (match) {
            try {
              const toolCallsJson = JSON.parse(`{${match[0]}}`)
              const list = Array.isArray(toolCallsJson.toolCalls)
                ? toolCallsJson.toolCalls
                : toolCallsJson.toolCalls.list || []

              logger.debug('Found toolCalls in parsed response string', {
                count: list.length,
              })

              // Log raw timing data for debugging
              list.forEach((tc: any, idx: number) => {
                logger.debug(`Parsed response ${idx} raw timing data:`, {
                  name: stripCustomToolPrefix(tc.name),
                  startTime: tc.startTime,
                  endTime: tc.endTime,
                  duration: tc.duration,
                  timing: tc.timing,
                  argumentKeys: tc.arguments ? Object.keys(tc.arguments) : undefined,
                })
              })

              toolCallData = list.map((toolCall: any) => {
                // Extract timing info - try various formats that providers might use
                const duration = extractDuration(toolCall)
                const timing = extractTimingInfo(
                  toolCall,
                  blockStartTime ? new Date(blockStartTime) : undefined,
                  blockEndTime ? new Date(blockEndTime) : undefined
                )

                // Log what we extracted
                logger.debug('Parsed response timing extracted:', {
                  name: toolCall.name,
                  extracted_duration: duration,
                  extracted_startTime: timing.startTime,
                  extracted_endTime: timing.endTime,
                })

                return {
                  name: toolCall.name,
                  duration: duration,
                  startTime: timing.startTime,
                  endTime: timing.endTime,
                  status: toolCall.error ? 'error' : 'success',
                  input: toolCall.arguments || toolCall.input,
                  output: toolCall.result || toolCall.output,
                  error: toolCall.error,
                }
              })
            } catch (error) {
              logger.error('Error parsing toolCalls from output string', {
                error,
                output: log.output,
              })
            }
          }
        }
        // Verbose output debugging as a fallback
        else {
          logger.debug('Could not find tool calls in standard formats, output data:', {
            outputSample: `${JSON.stringify(log.output).substring(0, 500)}...`,
          })
        }
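        // Whichever case matched above, every entry in toolCallData is normalized
        // to the same shape. An illustrative (hypothetical) entry:
        //
        //   {
        //     name: 'search_web',                      // hypothetical tool name
        //     duration: 1250,                          // ms, from extractDuration
        //     startTime: '2024-01-01T10:00:00.000Z',
        //     endTime: '2024-01-01T10:00:01.250Z',
        //     status: 'success',                       // 'error' when toolCall.error is set
        //     input: { query: '...' },
        //     output: { results: [] },
        //     error: undefined,
        //   }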

        // Fill in missing timing information and merge with existing metadata
        if (toolCallData.length > 0) {
          const timedToolCalls = getToolCallTimings(
            toolCallData,
            blockStartTime,
            blockEndTime,
            blockDuration
          )

          const redactedToolCalls = timedToolCalls.map((toolCall) => ({
            ...toolCall,
            input: redactApiKeys(toolCall.input),
          }))

          // Merge with existing metadata instead of overwriting
          if (!metadata) metadata = {}
          metadata.toolCalls = redactedToolCalls

          logger.debug('Added tool calls to metadata', {
            count: redactedToolCalls.length,
            existingMetadata: Object.keys(metadata).filter((k) => k !== 'toolCalls'),
          })
        }
      }

      // Use a single id for both the stored log and the debug line below;
      // generating a second uuid would log an id that was never persisted.
      const blockLogId = uuidv4()
      await persistLog({
        id: blockLogId,
        workflowId,
        executionId,
        level: log.success ? 'info' : 'error',
        message: log.success
          ? `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${
              log.output?.content ||
              log.output?.executionData?.output?.content ||
              JSON.stringify(log.output || {})
            }`
          : `Block ${log.blockName || log.blockId} (${log.blockType || 'unknown'}): ${log.error || 'Failed'}`,
        duration: log.success ? `${log.durationMs}ms` : 'NA',
        trigger: triggerType,
        createdAt: new Date(log.endedAt || log.startedAt),
        metadata: {
          ...metadata,
          ...(log.input ? { blockInput: log.input } : {}),
        },
      })

      if (metadata) {
        logger.debug('Persisted log with metadata', {
          logId: blockLogId,
          executionId,
          toolCallCount: metadata.toolCalls?.length || 0,
        })
      }
    }

    // Calculate total duration from successful block logs
    const totalDuration = (result.logs || [])
      .filter((log) => log.success)
      .reduce((sum, log) => sum + log.durationMs, 0)

    // For parallel execution, calculate the actual duration from start to end times
    let actualDuration = totalDuration
    if (result.metadata?.startTime && result.metadata?.endTime) {
      const startTime = new Date(result.metadata.startTime).getTime()
      const endTime = new Date(result.metadata.endTime).getTime()
      actualDuration = endTime - startTime
    }
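    // Example: three blocks that each took 1000ms contribute totalDuration = 3000ms,
    // but if the run started at 10:00:00.000 and ended at 10:00:01.200 because the
    // blocks overlapped, actualDuration = 1200ms, the wall-clock figure reported below.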

    // Get trigger-specific message
    const successMessage = getTriggerSuccessMessage(triggerType)
    const errorPrefix = getTriggerErrorPrefix(triggerType)

    // Create workflow-level metadata with aggregated cost information
    const workflowMetadata: any = {
      traceSpans: (result as any).traceSpans || [],
      totalDuration: (result as any).totalDuration || actualDuration,
    }

    // Add accumulated cost data to workflow-level log
    if (totalCost > 0) {
      workflowMetadata.cost = {
        model: primaryModel,
        input: totalInputCost,
        output: totalOutputCost,
        total: totalCost,
        tokens: {
          prompt: totalPromptTokens,
          completion: totalCompletionTokens,
          total: totalTokens,
        },
      }

      // Include pricing info if we have a model
      if (primaryModel && result.logs && result.logs.length > 0) {
        // Find the first agent log with pricing info
        for (const log of result.logs) {
          if (log.output?.cost?.pricing) {
            workflowMetadata.cost.pricing = log.output.cost.pricing
            break
          }
        }
      }

      // If result has a direct cost field (for streaming responses completed with calculated cost),
      // use that as a safety check to ensure we have cost data.
      // Note: inside this branch workflowMetadata.cost.total was just set to totalCost (> 0),
      // so this check should not normally fire; it is kept as a defensive fallback.
      if (
        result.metadata &&
        'cost' in result.metadata &&
        (!workflowMetadata.cost || workflowMetadata.cost.total <= 0)
      ) {
        const resultCost = (result.metadata as any).cost
        workflowMetadata.cost = {
          model: primaryModel,
          total: typeof resultCost === 'number' ? resultCost : resultCost?.total || 0,
          input: resultCost?.input || 0,
          output: resultCost?.output || 0,
          tokens: {
            prompt: totalPromptTokens,
            completion: totalCompletionTokens,
            total: totalTokens,
          },
        }
      }

      if (userId) {
        try {
          const userStatsRecords = await db
            .select()
            .from(userStats)
            .where(eq(userStats.userId, userId))

          const costMultiplier = getCostMultiplier()
          const costToStore = totalCost * costMultiplier

          if (userStatsRecords.length === 0) {
            await db.insert(userStats).values({
              id: crypto.randomUUID(),
              userId: userId,
              totalManualExecutions: 0,
              totalApiCalls: 0,
              totalWebhookTriggers: 0,
              totalScheduledExecutions: 0,
              totalChatExecutions: 0,
              totalTokensUsed: totalTokens,
              totalCost: costToStore.toString(),
              currentPeriodCost: costToStore.toString(), // Initialize current period usage
              lastActive: new Date(),
            })
          } else {
            await db
              .update(userStats)
              .set({
                totalTokensUsed: sql`total_tokens_used + ${totalTokens}`,
                totalCost: sql`total_cost + ${costToStore}`,
                currentPeriodCost: sql`current_period_cost + ${costToStore}`, // Track current billing period usage
                lastActive: new Date(),
              })
              .where(eq(userStats.userId, userId))
          }
        } catch (error) {
          logger.error('Error upserting user stats:', error)
        }
      }
    }

    // Log the final execution result
    await persistLog({
      id: uuidv4(),
      workflowId,
      executionId,
      level: result.success ? 'info' : 'error',
      message: result.success ? successMessage : `${errorPrefix} execution failed: ${result.error}`,
      duration: result.success ? `${actualDuration}ms` : 'NA',
      trigger: triggerType,
      createdAt: new Date(),
      metadata: workflowMetadata,
    })
  } catch (error: any) {
    logger.error(`Error persisting execution logs: ${error.message}`, {
      error,
    })
  }
}
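// Hypothetical call site (the parameter list is inferred from the names used in
// the function body above; `executor.execute` is an assumed placeholder):
//
//   const result = await executor.execute(workflowId)
//   await persistExecutionLogs(workflowId, executionId, result, 'api')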

/**
 * Persists an error log for a workflow execution
 * @param workflowId - The ID of the workflow
 * @param executionId - The ID of the execution
 * @param error - The error that occurred
 * @param triggerType - The type of trigger (api, webhook, schedule, manual, chat)
 */
export async function persistExecutionError(
  workflowId: string,
  executionId: string,
  error: Error,
  triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
) {
  try {
    const errorPrefix = getTriggerErrorPrefix(triggerType)

    await persistLog({
      id: uuidv4(),
      workflowId,
      executionId,
      level: 'error',
      message: `${errorPrefix} execution failed: ${error.message}`,
      duration: 'NA',
      trigger: triggerType,
      createdAt: new Date(),
    })
  } catch (logError: any) {
    logger.error(`Error persisting execution error log: ${logError.message}`, {
      logError,
    })
  }
}
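// Example use in a route handler's catch block (illustrative; the surrounding
// handler shape is assumed, not taken from this codebase):
//
//   } catch (error: any) {
//     await persistExecutionError(workflowId, executionId, error, 'webhook')
//     return NextResponse.json({ error: error.message }, { status: 500 })
//   }
//
// Because persistExecutionError swallows its own logging failures, the handler's
// error response is never masked by a logging outage.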

// Helper functions for trigger-specific messages
function getTriggerSuccessMessage(
  triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
): string {
  switch (triggerType) {
    case 'api':
      return 'API workflow executed successfully'
    case 'webhook':
      return 'Webhook workflow executed successfully'
    case 'schedule':
      return 'Scheduled workflow executed successfully'
    case 'manual':
      return 'Manual workflow executed successfully'
    case 'chat':
      return 'Chat workflow executed successfully'
    default:
      return 'Workflow executed successfully'
  }
}

function getTriggerErrorPrefix(
  triggerType: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
): string {
  switch (triggerType) {
    case 'api':
      return 'API workflow'
    case 'webhook':
      return 'Webhook workflow'
    case 'schedule':
      return 'Scheduled workflow'
    case 'manual':
      return 'Manual workflow'
    case 'chat':
      return 'Chat workflow'
    default:
      return 'Workflow'
  }
}
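// For example, getTriggerSuccessMessage('schedule') yields
// 'Scheduled workflow executed successfully', and getTriggerErrorPrefix('api')
// produces messages like 'API workflow execution failed: <reason>'.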

/**
 * Fills in timing information for tool calls.
 * Preserves any real timing data already present on a tool call; when data is
 * missing, estimates it by dividing the block's duration evenly across calls.
 */
function getToolCallTimings(
  toolCalls: any[],
  blockStart: string,
  blockEnd: string,
  totalDuration: number
): any[] {
  if (!toolCalls || toolCalls.length === 0) return []

  logger.debug('Estimating tool call timings', {
    toolCallCount: toolCalls.length,
    blockStartTime: blockStart,
    blockEndTime: blockEnd,
    totalDuration,
  })

  // First, try to preserve any existing timing data
  const result = toolCalls.map((toolCall, index) => {
    // Start with the original tool call
    const enhancedToolCall = { ...toolCall }

    // If we don't have timing data, set it from the block timing info.
    // Divide block duration evenly among tools as a fallback.
    const toolDuration = totalDuration / toolCalls.length
    const toolStartOffset = index * toolDuration

    // Force a minimum duration of 1000ms if none exists
    if (!enhancedToolCall.duration || enhancedToolCall.duration === 0) {
      enhancedToolCall.duration = Math.max(1000, toolDuration)
    }

    // Force reasonable startTime and endTime if missing
    if (!enhancedToolCall.startTime) {
      const startTimestamp = new Date(blockStart).getTime() + toolStartOffset
      enhancedToolCall.startTime = new Date(startTimestamp).toISOString()
    }

    if (!enhancedToolCall.endTime) {
      const endTimestamp =
        new Date(enhancedToolCall.startTime).getTime() + enhancedToolCall.duration
      enhancedToolCall.endTime = new Date(endTimestamp).toISOString()
    }

    return enhancedToolCall
  })

  return result
}
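// Worked example of the fallback: a block that ran 10:00:00.000 to 10:00:03.000
// (totalDuration = 3000ms) with three untimed tool calls gives each call
// toolDuration = 1000ms at start offsets 0ms, 1000ms and 2000ms, laying the
// calls out back to back across the block's window.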

/**
 * Extracts the duration from a tool call object, trying various property formats
 * that different agent providers might use
 */
function extractDuration(toolCall: any): number {
  if (!toolCall) return 0

  // Direct duration fields (various formats providers might use)
  if (typeof toolCall.duration === 'number' && toolCall.duration > 0) return toolCall.duration
  if (typeof toolCall.durationMs === 'number' && toolCall.durationMs > 0) return toolCall.durationMs
  if (typeof toolCall.duration_ms === 'number' && toolCall.duration_ms > 0)
    return toolCall.duration_ms
  if (typeof toolCall.executionTime === 'number' && toolCall.executionTime > 0)
    return toolCall.executionTime
  if (typeof toolCall.execution_time === 'number' && toolCall.execution_time > 0)
    return toolCall.execution_time
  if (typeof toolCall.timing?.duration === 'number' && toolCall.timing.duration > 0)
    return toolCall.timing.duration

  // Try to calculate from timestamps if available
  if (toolCall.startTime && toolCall.endTime) {
    try {
      const start = new Date(toolCall.startTime).getTime()
      const end = new Date(toolCall.endTime).getTime()
      if (!Number.isNaN(start) && !Number.isNaN(end) && end >= start) {
        return end - start
      }
    } catch (_e) {
      // Silently fail if date parsing fails
    }
  }

  // Also check for startedAt/endedAt format
  if (toolCall.startedAt && toolCall.endedAt) {
    try {
      const start = new Date(toolCall.startedAt).getTime()
      const end = new Date(toolCall.endedAt).getTime()
      if (!Number.isNaN(start) && !Number.isNaN(end) && end >= start) {
        return end - start
      }
    } catch (_e) {
      // Silently fail if date parsing fails
    }
  }

  // For some providers, timing info might be in a separate object
  if (toolCall.timing) {
    if (toolCall.timing.startTime && toolCall.timing.endTime) {
      try {
        const start = new Date(toolCall.timing.startTime).getTime()
        const end = new Date(toolCall.timing.endTime).getTime()
        if (!Number.isNaN(start) && !Number.isNaN(end) && end >= start) {
          return end - start
        }
      } catch (_e) {
        // Silently fail if date parsing fails
      }
    }
  }

  // No duration info found
  return 0
}
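// Precedence example: { durationMs: 250, startTime: ..., endTime: ... } returns
// 250 because explicit duration fields win; a call carrying only
// startTime '...T10:00:00.000Z' and endTime '...T10:00:01.250Z' returns 1250.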

/**
 * Extract timing information from a tool call object
 * @param toolCall The tool call object
 * @param blockStartTime Optional block start time (for reference, not used as fallback anymore)
 * @param blockEndTime Optional block end time (for reference, not used as fallback anymore)
 * @returns Object with startTime and endTime properties
 */
function extractTimingInfo(
  toolCall: any,
  blockStartTime?: Date,
  blockEndTime?: Date
): { startTime?: Date; endTime?: Date } {
  let startTime: Date | undefined
  let endTime: Date | undefined

  // Try to get direct timing properties
  if (toolCall.startTime && isValidDate(toolCall.startTime)) {
    startTime = new Date(toolCall.startTime)
  } else if (toolCall.timing?.startTime && isValidDate(toolCall.timing.startTime)) {
    startTime = new Date(toolCall.timing.startTime)
  } else if (toolCall.timing?.start && isValidDate(toolCall.timing.start)) {
    startTime = new Date(toolCall.timing.start)
  } else if (toolCall.startedAt && isValidDate(toolCall.startedAt)) {
    startTime = new Date(toolCall.startedAt)
  }

  if (toolCall.endTime && isValidDate(toolCall.endTime)) {
    endTime = new Date(toolCall.endTime)
  } else if (toolCall.timing?.endTime && isValidDate(toolCall.timing.endTime)) {
    endTime = new Date(toolCall.timing.endTime)
  } else if (toolCall.timing?.end && isValidDate(toolCall.timing.end)) {
    endTime = new Date(toolCall.timing.end)
  } else if (toolCall.completedAt && isValidDate(toolCall.completedAt)) {
    endTime = new Date(toolCall.completedAt)
  }

  // Derive a missing end time from the start time plus any extractable duration
  if (startTime && !endTime) {
    const duration = extractDuration(toolCall)
    if (duration > 0) {
      endTime = new Date(startTime.getTime() + duration)
    }
  }

  logger.debug('Final extracted timing info', {
    tool: toolCall.name,
    startTime: startTime?.toISOString(),
    endTime: endTime?.toISOString(),
    hasStartTime: !!startTime,
    hasEndTime: !!endTime,
  })

  return { startTime, endTime }
}
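// Example: { startedAt: '2024-01-01T10:00:00.000Z', durationMs: 500 } has no end
// field, so endTime is derived as startTime + extractDuration(), i.e.
// 10:00:00.500Z; a call with no recognizable fields returns both as undefined.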

/**
 * Helper function to check if a string is a valid date
 */
function isValidDate(dateString: string): boolean {
  if (!dateString) return false

  try {
    const timestamp = Date.parse(dateString)
    return !Number.isNaN(timestamp)
  } catch (_e) {
    return false
  }
}
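// isValidDate('2024-01-01T10:00:00.000Z') -> true
// isValidDate('not a date') -> false (Date.parse returns NaN rather than throwing,
// so the try/catch above is purely defensive)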