improvement(cleanup): remove workflow_execution_blocks table (#778)

* improvement(cleanup): remove workflow_execution_blocks table

* remove reference
This commit is contained in:
Vikhyath Mondreti
2025-07-24 11:29:49 -07:00
committed by GitHub
parent 14e1c179dc
commit 386644e9f9
8 changed files with 5687 additions and 387 deletions

View File

@@ -151,7 +151,7 @@ export async function GET(request: NextRequest) {
results.enhancedLogs.archived++
try {
// Delete enhanced log (will cascade to workflowExecutionBlocks due to foreign key)
// Delete enhanced log
const deleteResult = await db
.delete(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.id, log.id))

View File

@@ -4,7 +4,7 @@ import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console-logger'
import { db } from '@/db'
import { permissions, workflow, workflowExecutionBlocks, workflowExecutionLogs } from '@/db/schema'
import { permissions, workflow, workflowExecutionLogs } from '@/db/schema'
const logger = createLogger('EnhancedLogsAPI')
@@ -183,56 +183,8 @@ export async function GET(request: NextRequest) {
const count = countResult[0]?.count || 0
// Get block executions for all workflow executions
const executionIds = logs.map((log) => log.executionId)
let blockExecutionsByExecution: Record<string, any[]> = {}
if (executionIds.length > 0) {
const blockLogs = await db
.select()
.from(workflowExecutionBlocks)
.where(inArray(workflowExecutionBlocks.executionId, executionIds))
.orderBy(workflowExecutionBlocks.startedAt)
// Group block logs by execution ID
blockExecutionsByExecution = blockLogs.reduce(
(acc, blockLog) => {
if (!acc[blockLog.executionId]) {
acc[blockLog.executionId] = []
}
acc[blockLog.executionId].push({
id: blockLog.id,
blockId: blockLog.blockId,
blockName: blockLog.blockName || '',
blockType: blockLog.blockType,
startedAt: blockLog.startedAt.toISOString(),
endedAt: blockLog.endedAt?.toISOString() || blockLog.startedAt.toISOString(),
durationMs: blockLog.durationMs || 0,
status: blockLog.status,
errorMessage: blockLog.errorMessage || undefined,
errorStackTrace: blockLog.errorStackTrace || undefined,
inputData: blockLog.inputData,
outputData: blockLog.outputData,
cost: blockLog.costTotal
? {
input: Number(blockLog.costInput) || 0,
output: Number(blockLog.costOutput) || 0,
total: Number(blockLog.costTotal) || 0,
tokens: {
prompt: blockLog.tokensPrompt || 0,
completion: blockLog.tokensCompletion || 0,
total: blockLog.tokensTotal || 0,
},
model: blockLog.modelUsed || '',
}
: undefined,
metadata: blockLog.metadata || {},
})
return acc
},
{} as Record<string, any[]>
)
}
// Block executions are now extracted from trace spans instead of separate table
const blockExecutionsByExecution: Record<string, any[]> = {}
// Create clean trace spans from block executions
const createTraceSpans = (blockExecutions: any[]) => {
@@ -397,87 +349,38 @@ export async function GET(request: NextRequest) {
// Include block execution data if requested
if (params.includeBlocks) {
const executionIds = logs.map((log) => log.executionId)
// Block executions are now extracted from stored trace spans in metadata
const blockLogsByExecution: Record<string, any[]> = {}
if (executionIds.length > 0) {
const blockLogs = await db
.select()
.from(workflowExecutionBlocks)
.where(inArray(workflowExecutionBlocks.executionId, executionIds))
.orderBy(workflowExecutionBlocks.startedAt)
logs.forEach((log) => {
const storedTraceSpans = (log.metadata as any)?.traceSpans
if (storedTraceSpans && Array.isArray(storedTraceSpans)) {
blockLogsByExecution[log.executionId] =
extractBlockExecutionsFromTraceSpans(storedTraceSpans)
} else {
blockLogsByExecution[log.executionId] = []
}
})
// Group block logs by execution ID
const blockLogsByExecution = blockLogs.reduce(
(acc, blockLog) => {
if (!acc[blockLog.executionId]) {
acc[blockLog.executionId] = []
}
acc[blockLog.executionId].push({
id: blockLog.id,
blockId: blockLog.blockId,
blockName: blockLog.blockName || '',
blockType: blockLog.blockType,
startedAt: blockLog.startedAt.toISOString(),
endedAt: blockLog.endedAt?.toISOString() || blockLog.startedAt.toISOString(),
durationMs: blockLog.durationMs || 0,
status: blockLog.status,
errorMessage: blockLog.errorMessage || undefined,
inputData: blockLog.inputData,
outputData: blockLog.outputData,
cost: blockLog.costTotal
? {
input: Number(blockLog.costInput) || 0,
output: Number(blockLog.costOutput) || 0,
total: Number(blockLog.costTotal) || 0,
tokens: {
prompt: blockLog.tokensPrompt || 0,
completion: blockLog.tokensCompletion || 0,
total: blockLog.tokensTotal || 0,
},
model: blockLog.modelUsed || '',
}
: undefined,
})
return acc
},
{} as Record<string, any[]>
)
// Add block logs to metadata
const logsWithBlocks = enhancedLogs.map((log) => ({
...log,
metadata: {
...log.metadata,
blockExecutions: blockLogsByExecution[log.executionId] || [],
},
}))
// For executions with no block logs in the database,
// extract block executions from stored trace spans in metadata
logs.forEach((log) => {
if (
!blockLogsByExecution[log.executionId] ||
blockLogsByExecution[log.executionId].length === 0
) {
const storedTraceSpans = (log.metadata as any)?.traceSpans
if (storedTraceSpans && Array.isArray(storedTraceSpans)) {
blockLogsByExecution[log.executionId] =
extractBlockExecutionsFromTraceSpans(storedTraceSpans)
}
}
})
// Add block logs to metadata
const logsWithBlocks = enhancedLogs.map((log) => ({
...log,
metadata: {
...log.metadata,
blockExecutions: blockLogsByExecution[log.executionId] || [],
},
}))
return NextResponse.json(
{
data: logsWithBlocks,
total: Number(count),
page: Math.floor(params.offset / params.limit) + 1,
pageSize: params.limit,
totalPages: Math.ceil(Number(count) / params.limit),
},
{ status: 200 }
)
}
return NextResponse.json(
{
data: logsWithBlocks,
total: Number(count),
page: Math.floor(params.offset / params.limit) + 1,
pageSize: params.limit,
totalPages: Math.ceil(Number(count) / params.limit),
},
{ status: 200 }
)
}
// Return basic logs

View File

@@ -0,0 +1 @@
DROP TABLE "workflow_execution_blocks" CASCADE;

File diff suppressed because it is too large Load Diff

View File

@@ -421,6 +421,13 @@
"when": 1753323514125,
"tag": "0060_ordinary_nick_fury",
"breakpoints": true
},
{
"idx": 61,
"version": "7",
"when": 1753380613269,
"tag": "0061_swift_doctor_spectrum",
"breakpoints": true
}
]
}

View File

@@ -338,59 +338,6 @@ export const workflowExecutionLogs = pgTable(
})
)
export const workflowExecutionBlocks = pgTable(
'workflow_execution_blocks',
{
id: text('id').primaryKey(),
executionId: text('execution_id').notNull(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
blockId: text('block_id').notNull(),
blockName: text('block_name'),
blockType: text('block_type').notNull(),
startedAt: timestamp('started_at').notNull(),
endedAt: timestamp('ended_at'),
durationMs: integer('duration_ms'),
status: text('status').notNull(), // 'success', 'error', 'skipped'
errorMessage: text('error_message'),
errorStackTrace: text('error_stack_trace'),
inputData: jsonb('input_data'),
outputData: jsonb('output_data'),
costInput: decimal('cost_input', { precision: 10, scale: 6 }),
costOutput: decimal('cost_output', { precision: 10, scale: 6 }),
costTotal: decimal('cost_total', { precision: 10, scale: 6 }),
tokensPrompt: integer('tokens_prompt'),
tokensCompletion: integer('tokens_completion'),
tokensTotal: integer('tokens_total'),
modelUsed: text('model_used'),
metadata: jsonb('metadata'),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
executionIdIdx: index('execution_blocks_execution_id_idx').on(table.executionId),
workflowIdIdx: index('execution_blocks_workflow_id_idx').on(table.workflowId),
blockIdIdx: index('execution_blocks_block_id_idx').on(table.blockId),
statusIdx: index('execution_blocks_status_idx').on(table.status),
durationIdx: index('execution_blocks_duration_idx').on(table.durationMs),
costIdx: index('execution_blocks_cost_idx').on(table.costTotal),
workflowExecutionIdx: index('execution_blocks_workflow_execution_idx').on(
table.workflowId,
table.executionId
),
executionStatusIdx: index('execution_blocks_execution_status_idx').on(
table.executionId,
table.status
),
startedAtIdx: index('execution_blocks_started_at_idx').on(table.startedAt),
})
)
export const environment = pgTable('environment', {
id: text('id').primaryKey(), // Use the user id as the key
userId: text('user_id')

View File

@@ -4,12 +4,9 @@ import { getCostMultiplier } from '@/lib/environment'
import { createLogger } from '@/lib/logs/console-logger'
import { snapshotService } from '@/lib/logs/snapshot-service'
import { db } from '@/db'
import { userStats, workflow, workflowExecutionBlocks, workflowExecutionLogs } from '@/db/schema'
import { userStats, workflow, workflowExecutionLogs } from '@/db/schema'
import type {
BlockExecutionLog,
BlockInputData,
BlockOutputData,
CostBreakdown,
ExecutionEnvironment,
ExecutionTrigger,
ExecutionLoggerService as IExecutionLoggerService,
@@ -111,102 +108,6 @@ export class EnhancedExecutionLogger implements IExecutionLoggerService {
}
}
async logBlockExecution(params: {
executionId: string
workflowId: string
blockId: string
blockName: string
blockType: string
input: BlockInputData
output: BlockOutputData
timing: {
startedAt: string
endedAt: string
durationMs: number
}
status: BlockExecutionLog['status']
error?: {
message: string
stackTrace?: string
}
cost?: CostBreakdown
metadata?: BlockExecutionLog['metadata']
toolCalls?: ToolCall[]
}): Promise<BlockExecutionLog> {
const {
executionId,
workflowId,
blockId,
blockName,
blockType,
input,
output,
timing,
status,
error,
cost,
metadata,
toolCalls,
} = params
logger.debug(`Logging block execution ${blockId} for execution ${executionId}`)
const blockLogId = uuidv4()
const [blockLog] = await db
.insert(workflowExecutionBlocks)
.values({
id: blockLogId,
executionId,
workflowId,
blockId,
blockName,
blockType,
startedAt: new Date(timing.startedAt),
endedAt: new Date(timing.endedAt),
durationMs: timing.durationMs,
status,
errorMessage: error?.message || null,
errorStackTrace: error?.stackTrace || null,
inputData: input,
outputData: output,
costInput: cost?.input ? cost.input.toString() : null,
costOutput: cost?.output ? cost.output.toString() : null,
costTotal: cost?.total ? cost.total.toString() : null,
tokensPrompt: cost?.tokens?.prompt || null,
tokensCompletion: cost?.tokens?.completion || null,
tokensTotal: cost?.tokens?.total || null,
modelUsed: cost?.model || null,
metadata: {
...(metadata || {}),
...(toolCalls && toolCalls.length > 0 ? { toolCalls } : {}),
},
})
.returning()
logger.debug(`Created block log ${blockLog.id} for block ${blockId}`)
return {
id: blockLog.id,
executionId: blockLog.executionId,
workflowId: blockLog.workflowId,
blockId: blockLog.blockId,
blockName: blockLog.blockName || '',
blockType: blockLog.blockType,
startedAt: blockLog.startedAt.toISOString(),
endedAt: blockLog.endedAt?.toISOString() || timing.endedAt,
durationMs: blockLog.durationMs || timing.durationMs,
status: blockLog.status as BlockExecutionLog['status'],
errorMessage: blockLog.errorMessage || undefined,
errorStackTrace: blockLog.errorStackTrace || undefined,
inputData: input,
outputData: output,
cost: cost || null,
metadata: (blockLog.metadata as BlockExecutionLog['metadata']) || {},
createdAt: blockLog.createdAt.toISOString(),
}
}
async completeWorkflowExecution(params: {
executionId: string
endedAt: string
@@ -316,51 +217,6 @@ export class EnhancedExecutionLogger implements IExecutionLoggerService {
}
}
async getBlockExecutionsForWorkflow(executionId: string): Promise<BlockExecutionLog[]> {
const blockLogs = await db
.select()
.from(workflowExecutionBlocks)
.where(eq(workflowExecutionBlocks.executionId, executionId))
.orderBy(workflowExecutionBlocks.startedAt)
return blockLogs.map((log) => ({
id: log.id,
executionId: log.executionId,
workflowId: log.workflowId,
blockId: log.blockId,
blockName: log.blockName || '',
blockType: log.blockType,
startedAt: log.startedAt.toISOString(),
endedAt: log.endedAt?.toISOString() || log.startedAt.toISOString(),
durationMs: log.durationMs || 0,
status: log.status as BlockExecutionLog['status'],
errorMessage: log.errorMessage || undefined,
errorStackTrace: log.errorStackTrace || undefined,
inputData: log.inputData as BlockInputData,
outputData: log.outputData as BlockOutputData,
cost: log.costTotal
? {
input: Number(log.costInput) || 0,
output: Number(log.costOutput) || 0,
total: Number(log.costTotal) || 0,
tokens: {
prompt: log.tokensPrompt || 0,
completion: log.tokensCompletion || 0,
total: log.tokensTotal || 0,
},
model: log.modelUsed || '',
pricing: {
input: 0,
output: 0,
updatedAt: new Date().toISOString(),
},
}
: null,
metadata: (log.metadata as BlockExecutionLog['metadata']) || {},
createdAt: log.createdAt.toISOString(),
}))
}
async getWorkflowExecution(executionId: string): Promise<WorkflowExecutionLog | null> {
const [workflowLog] = await db
.select()

View File

@@ -113,35 +113,6 @@ export interface WorkflowExecutionLog {
export type WorkflowExecutionLogInsert = Omit<WorkflowExecutionLog, 'id' | 'createdAt'>
export type WorkflowExecutionLogSelect = WorkflowExecutionLog
export interface BlockExecutionLog {
id: string
executionId: string
workflowId: string
blockId: string
blockName: string
blockType: string
startedAt: string
endedAt: string
durationMs: number
status: 'success' | 'error' | 'skipped'
errorMessage?: string
errorStackTrace?: string
inputData: BlockInputData
outputData: BlockOutputData
cost: CostBreakdown | null
metadata: {
toolCalls?: ToolCall[]
iterationIndex?: number
virtualBlockId?: string
parentBlockId?: string
environmentSnapshot?: Record<string, string>
}
createdAt: string
}
export type BlockExecutionLogInsert = Omit<BlockExecutionLog, 'id' | 'createdAt'>
export type BlockExecutionLogSelect = BlockExecutionLog
export interface TraceSpan {
id: string
name: string
@@ -200,7 +171,7 @@ export interface BlockExecutionSummary {
startedAt: string
endedAt: string
durationMs: number
status: BlockExecutionLog['status']
status: 'success' | 'error' | 'skipped'
errorMessage?: string
cost?: CostBreakdown
inputSummary: {
@@ -214,13 +185,6 @@ export interface BlockExecutionSummary {
}
}
export interface BlockExecutionDetail extends BlockExecutionSummary {
inputData: BlockInputData
outputData: BlockOutputData
metadata: BlockExecutionLog['metadata']
toolCalls?: ToolCall[]
}
export interface PaginatedResponse<T> {
data: T[]
pagination: {
@@ -329,28 +293,6 @@ export interface ExecutionLoggerService {
snapshot: WorkflowExecutionSnapshot
}>
logBlockExecution(params: {
executionId: string
workflowId: string
blockId: string
blockName: string
blockType: string
input: BlockInputData
output: BlockOutputData
timing: {
startedAt: string
endedAt: string
durationMs: number
}
status: BlockExecutionLog['status']
error?: {
message: string
stackTrace?: string
}
cost?: CostBreakdown
metadata?: BlockExecutionLog['metadata']
}): Promise<BlockExecutionLog>
completeWorkflowExecution(params: {
executionId: string
endedAt: string