feat(nested-workflow-spans): nested child workflow spans in logs sidepanel (#1561)

* feat(nested-workflow-logs): nested workflow logs display

* logs UX consistency between success and error cases

* fix chat execution

* fix schedules trigger

* update all deployment-version-dependent executions to use api key owner instead of workflow owner

* fix tests

* simplify tests
This commit is contained in:
Vikhyath Mondreti
2025-10-07 12:32:04 -07:00
committed by GitHub
parent f03f395225
commit 991a020917
19 changed files with 927 additions and 341 deletions

View File

@@ -403,7 +403,10 @@ export function mockExecutionDependencies() {
provider: 'provider',
providerConfig: 'providerConfig',
},
workflow: { id: 'id', userId: 'userId' },
workflow: {
id: 'id',
userId: 'userId',
},
workflowSchedule: {
id: 'id',
workflowId: 'workflowId',

View File

@@ -1,6 +1,6 @@
import { db } from '@sim/db'
import { chat, userStats, workflow } from '@sim/db/schema'
import { eq, sql } from 'drizzle-orm'
import { chat, workflow } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { checkServerSideUsageLimits } from '@/lib/billing'
@@ -16,7 +16,7 @@ import { TriggerUtils } from '@/lib/workflows/triggers'
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
import { getBlock } from '@/blocks'
import { Executor } from '@/executor'
import type { BlockLog, ExecutionResult } from '@/executor/types'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -548,6 +548,7 @@ export async function executeWorkflowForChat(
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
let executionResultForLogging: ExecutionResult | null = null
try {
const streamedContent = new Map<string, string>()
@@ -603,6 +604,7 @@ export async function executeWorkflowForChat(
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: errorMessage },
traceSpans: [],
})
sessionCompleted = true
}
@@ -644,16 +646,24 @@ export async function executeWorkflowForChat(
// Set up logging on the executor
loggingSession.setupExecutor(executor)
let result
let result: ExecutionResult | StreamingExecution | undefined
try {
result = await executor.execute(workflowId, startBlockId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
if (!sessionCompleted) {
const executionResult = error?.executionResult || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Chat workflow execution failed' },
traceSpans,
})
sessionCompleted = true
}
@@ -677,17 +687,14 @@ export async function executeWorkflowForChat(
? (result.execution as ExecutionResult)
: (result as ExecutionResult)
if (executionResult?.logs) {
// Update streamed content and apply tokenization - process regardless of overall success
// This ensures partial successes (some agents succeed, some fail) still return results
executionResultForLogging = executionResult
// Add newlines between different agent outputs for better readability
if (executionResult?.logs) {
const processedOutputs = new Set<string>()
executionResult.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId)
if (log.output && content) {
// Add newline separation between different outputs (but not before the first one)
const separator = processedOutputs.size > 0 ? '\n\n' : ''
log.output.content = separator + content
processedOutputs.add(log.blockId)
@@ -695,13 +702,10 @@ export async function executeWorkflowForChat(
}
})
// Also process non-streamed outputs from selected blocks (like function blocks)
// This uses the same logic as the chat panel to ensure identical behavior
const nonStreamingLogs = executionResult.logs.filter(
(log: BlockLog) => !streamedContent.has(log.blockId)
)
// Extract the exact same functions used by the chat panel
const extractBlockIdFromOutputId = (outputId: string): string => {
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
}
@@ -719,7 +723,6 @@ export async function executeWorkflowForChat(
try {
return JSON.parse(output.content)
} catch (e) {
// Fallback to original structure if parsing fails
return output
}
}
@@ -727,13 +730,11 @@ export async function executeWorkflowForChat(
return output
}
// Filter outputs that have matching logs (exactly like chat panel)
const outputsToRender = selectedOutputIds.filter((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
})
// Process each selected output (exactly like chat panel)
for (const outputId of outputsToRender) {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
@@ -743,7 +744,6 @@ export async function executeWorkflowForChat(
let outputValue: any = log.output
if (path) {
// Parse JSON content safely (exactly like chat panel)
outputValue = parseOutputContentSafely(outputValue)
const pathParts = path.split('.')
@@ -758,16 +758,13 @@ export async function executeWorkflowForChat(
}
if (outputValue !== undefined) {
// Add newline separation between different outputs
const separator = processedOutputs.size > 0 ? '\n\n' : ''
// Format the output exactly like the chat panel
const formattedOutput =
typeof outputValue === 'string'
? outputValue
: JSON.stringify(outputValue, null, 2)
// Update the log content
if (!log.output.content) {
log.output.content = separator + formattedOutput
} else {
@@ -778,7 +775,6 @@ export async function executeWorkflowForChat(
}
}
// Process all logs for streaming tokenization
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
@@ -793,23 +789,6 @@ export async function executeWorkflowForChat(
}
;(enrichedResult.metadata as any).conversationId = conversationId
}
// Use the executionId created at the beginning of this function
logger.debug(`Using execution ID for deployed chat: ${executionId}`)
if (executionResult.success) {
try {
await db
.update(userStats)
.set({
totalChatExecutions: sql`total_chat_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, deployment.userId))
logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`)
} catch (error) {
logger.error(`Failed to update user stats for deployed chat:`, error)
}
}
}
if (!(result && typeof result === 'object' && 'stream' in result)) {
@@ -833,30 +812,35 @@ export async function executeWorkflowForChat(
controller.close()
} catch (error: any) {
// Handle any errors that occur in the stream
logger.error(`[${requestId}] Stream error:`, error)
logger.error(`[${requestId}] Chat execution streaming error:`, error)
// Send error event to client
const encoder = new TextEncoder()
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'An unexpected error occurred',
})}\n\n`
)
)
// Try to complete the logging session with error if not already completed
if (!sessionCompleted && loggingSession) {
const executionResult = executionResultForLogging ||
(error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Stream processing error' },
traceSpans,
})
sessionCompleted = true
}
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'Stream processing error',
})}\n\n`
)
)
controller.close()
}
},

View File

@@ -4,6 +4,7 @@ import { and, eq, lte, not, sql } from 'drizzle-orm'
import { NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
@@ -17,7 +18,7 @@ import {
getSubBlockValue,
} from '@/lib/schedules/utils'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { blockExistsInDeployment, loadDeployedWorkflowState } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
@@ -106,12 +107,22 @@ export async function GET() {
continue
}
const actorUserId = await getApiKeyOwnerUserId(workflowRecord.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(
`[${requestId}] Skipping schedule ${schedule.id}: pinned API key required to attribute usage.`
)
runningExecutions.delete(schedule.workflowId)
continue
}
// Check rate limits for scheduled execution (checks both personal and org subscriptions)
const userSubscription = await getHighestPrioritySubscription(workflowRecord.userId)
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
workflowRecord.userId,
actorUserId,
userSubscription,
'schedule',
false // schedules are always sync
@@ -149,7 +160,7 @@ export async function GET() {
continue
}
const usageCheck = await checkServerSideUsageLimits(workflowRecord.userId)
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${workflowRecord.userId} has exceeded usage limits. Skipping scheduled execution.`,
@@ -159,26 +170,19 @@ export async function GET() {
workflowId: schedule.workflowId,
}
)
// Error logging handled by logging session
const retryDelay = 24 * 60 * 60 * 1000 // 24 hour delay for exceeded limits
const nextRetryAt = new Date(now.getTime() + retryDelay)
try {
const deployedData = await loadDeployedWorkflowState(schedule.workflowId)
const nextRunAt = calculateNextRunTime(schedule, deployedData.blocks as any)
await db
.update(workflowSchedule)
.set({
updatedAt: now,
nextRunAt: nextRetryAt,
})
.set({ updatedAt: now, nextRunAt })
.where(eq(workflowSchedule.id, schedule.id))
logger.debug(`[${requestId}] Updated next retry time due to usage limits`)
} catch (updateError) {
logger.error(`[${requestId}] Error updating schedule for usage limits:`, updateError)
} catch (calcErr) {
logger.warn(
`[${requestId}] Unable to calculate nextRunAt while skipping schedule ${schedule.id}`,
calcErr
)
}
runningExecutions.delete(schedule.workflowId)
continue
}
@@ -206,11 +210,25 @@ export async function GET() {
const parallels = deployedData.parallels
logger.info(`[${requestId}] Loaded deployed workflow ${schedule.workflowId}`)
// Validate that the schedule's trigger block exists in the deployed state
if (schedule.blockId) {
const blockExists = await blockExistsInDeployment(
schedule.workflowId,
schedule.blockId
)
if (!blockExists) {
logger.warn(
`[${requestId}] Schedule trigger block ${schedule.blockId} not found in deployed workflow ${schedule.workflowId}. Skipping execution.`
)
return { skip: true, blocks: {} as Record<string, BlockState> }
}
}
const mergedStates = mergeSubblockState(blocks)
// Retrieve environment variables with workspace precedence
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
workflowRecord.userId,
actorUserId,
workflowRecord.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({
@@ -355,7 +373,6 @@ export async function GET() {
)
const input = {
workflowId: schedule.workflowId,
_context: {
workflowId: schedule.workflowId,
},
@@ -363,7 +380,7 @@ export async function GET() {
// Start logging with environment variables
await loggingSession.safeStart({
userId: workflowRecord.userId,
userId: actorUserId,
workspaceId: workflowRecord.workspaceId || '',
variables: variables || {},
})
@@ -407,7 +424,7 @@ export async function GET() {
totalScheduledExecutions: sql`total_scheduled_executions + 1`,
lastActive: now,
})
.where(eq(userStats.userId, workflowRecord.userId))
.where(eq(userStats.userId, actorUserId))
logger.debug(`[${requestId}] Updated user stats for scheduled execution`)
} catch (statsError) {
@@ -446,6 +463,7 @@ export async function GET() {
message: `Schedule execution failed before workflow started: ${earlyError.message}`,
stackTrace: earlyError.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(
@@ -459,6 +477,12 @@ export async function GET() {
}
})()
// Check if execution was skipped (e.g., trigger block not found)
if ('skip' in executionSuccess && executionSuccess.skip) {
runningExecutions.delete(schedule.workflowId)
continue
}
if (executionSuccess.success) {
logger.info(`[${requestId}] Workflow ${schedule.workflowId} executed successfully`)
@@ -565,6 +589,7 @@ export async function GET() {
message: `Schedule execution failed: ${error.message}`,
stackTrace: error.stack,
},
traceSpans: [],
})
} catch (loggingError) {
logger.error(

View File

@@ -106,6 +106,24 @@ describe('Webhook Trigger API Route', () => {
mockExecutionDependencies()
mockTriggerDevSdk()
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
vi.doMock('@/lib/api-key/service', async () => {
const actual = await vi.importActual('@/lib/api-key/service')
return {
...(actual as Record<string, unknown>),
getApiKeyOwnerUserId: vi
.fn()
.mockImplementation(async (pinnedApiKeyId: string | null | undefined) =>
pinnedApiKeyId ? 'test-user-id' : null
),
}
})
vi.doMock('@/services/queue', () => ({
RateLimiter: vi.fn().mockImplementation(() => ({
checkRateLimit: vi.fn().mockResolvedValue({
@@ -222,6 +240,7 @@ describe('Webhook Trigger API Route', () => {
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
const req = createMockRequest('POST', { event: 'test', id: 'test-123' })
@@ -250,7 +269,11 @@ describe('Webhook Trigger API Route', () => {
providerConfig: { requireAuth: true, token: 'test-token-123' },
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
const headers = {
'Content-Type': 'application/json',
@@ -281,7 +304,11 @@ describe('Webhook Trigger API Route', () => {
},
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
const headers = {
'Content-Type': 'application/json',
@@ -308,7 +335,11 @@ describe('Webhook Trigger API Route', () => {
providerConfig: { requireAuth: true, token: 'case-test-token' },
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
@@ -354,7 +385,11 @@ describe('Webhook Trigger API Route', () => {
},
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
globalMockData.workflows.push({
id: 'test-workflow-id',
userId: 'test-user-id',
pinnedApiKeyId: 'test-pinned-api-key-id',
})
vi.doMock('@trigger.dev/sdk', () => ({
tasks: {
@@ -391,7 +426,6 @@ describe('Webhook Trigger API Route', () => {
providerConfig: { requireAuth: true, token: 'correct-token' },
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
const headers = {
'Content-Type': 'application/json',
@@ -424,7 +458,6 @@ describe('Webhook Trigger API Route', () => {
},
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
const headers = {
'Content-Type': 'application/json',
@@ -453,7 +486,6 @@ describe('Webhook Trigger API Route', () => {
providerConfig: { requireAuth: true, token: 'required-token' },
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
const req = createMockRequest('POST', { event: 'no.auth.test' })
const params = Promise.resolve({ path: 'test-path' })
@@ -482,7 +514,6 @@ describe('Webhook Trigger API Route', () => {
},
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
const headers = {
'Content-Type': 'application/json',
@@ -515,7 +546,6 @@ describe('Webhook Trigger API Route', () => {
},
workflowId: 'test-workflow-id',
})
globalMockData.workflows.push({ id: 'test-workflow-id', userId: 'test-user-id' })
const headers = {
'Content-Type': 'application/json',

View File

@@ -293,6 +293,13 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
}
}
// Attribution: this route is UI-only; require session user as actor
const actorUserId: string | null = session?.user?.id ?? null
if (!actorUserId) {
logger.warn(`[${requestId}] Unable to resolve actor user for workflow deployment: ${id}`)
return createErrorResponse('Unable to determine deploying user', 400)
}
await db.transaction(async (tx) => {
const [{ maxVersion }] = await tx
.select({ maxVersion: sql`COALESCE(MAX("version"), 0)` })
@@ -318,7 +325,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
state: currentState,
isActive: true,
createdAt: deployedAt,
createdBy: userId,
createdBy: actorUserId,
})
const updateData: Record<string, unknown> = {

View File

@@ -5,6 +5,7 @@ import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
import { getSession } from '@/lib/auth'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
@@ -23,6 +24,7 @@ import {
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
import { Executor } from '@/executor'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { RateLimitError, RateLimiter, type TriggerType } from '@/services/queue'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -65,8 +67,8 @@ class UsageLimitError extends Error {
async function executeWorkflow(
workflow: any,
requestId: string,
input?: any,
executingUserId?: string
input: any | undefined,
actorUserId: string
): Promise<any> {
const workflowId = workflow.id
const executionId = uuidv4()
@@ -85,8 +87,8 @@ async function executeWorkflow(
// Rate limiting is now handled before entering the sync queue
// Check if the user has exceeded their usage limits
const usageCheck = await checkServerSideUsageLimits(workflow.userId)
// Check if the actor has exceeded their usage limits
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(`[${requestId}] User ${workflow.userId} has exceeded usage limits`, {
currentUsage: usageCheck.currentUsage,
@@ -132,13 +134,13 @@ async function executeWorkflow(
// Load personal (for the executing user) and workspace env (workspace overrides personal)
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
executingUserId || workflow.userId,
actorUserId,
workflow.workspaceId || undefined
)
const variables = EnvVarsSchema.parse({ ...personalEncrypted, ...workspaceEncrypted })
await loggingSession.safeStart({
userId: executingUserId || workflow.userId,
userId: actorUserId,
workspaceId: workflow.workspaceId,
variables,
})
@@ -340,7 +342,7 @@ async function executeWorkflow(
totalApiCalls: sql`total_api_calls + 1`,
lastActive: sql`now()`,
})
.where(eq(userStats.userId, workflow.userId))
.where(eq(userStats.userId, actorUserId))
}
await loggingSession.safeComplete({
@@ -354,6 +356,13 @@ async function executeWorkflow(
} catch (error: any) {
logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, error)
const executionResultForError = (error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResultForError)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
@@ -361,6 +370,7 @@ async function executeWorkflow(
message: error.message || 'Workflow execution failed',
stackTrace: error.stack,
},
traceSpans,
})
throw error
@@ -396,19 +406,30 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
// Synchronous execution
try {
// Check rate limits BEFORE entering queue for GET requests
if (triggerType === 'api') {
// Get user subscription (checks both personal and org subscriptions)
const userSubscription = await getHighestPrioritySubscription(validation.workflow.userId)
// Resolve actor user id
let actorUserId: string | null = null
if (triggerType === 'manual') {
actorUserId = session!.user!.id
} else {
const apiKeyHeader = request.headers.get('X-API-Key')
const auth = apiKeyHeader ? await authenticateApiKeyFromHeader(apiKeyHeader) : null
if (!auth?.success || !auth.userId) {
return createErrorResponse('Unauthorized', 401)
}
actorUserId = auth.userId
if (auth.keyId) {
void updateApiKeyLastUsed(auth.keyId).catch(() => {})
}
// Check rate limits BEFORE entering execution for API requests
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
validation.workflow.userId,
actorUserId,
userSubscription,
triggerType,
false // isAsync = false for sync calls
'api',
false
)
if (!rateLimitCheck.allowed) {
throw new RateLimitError(
`Rate limit exceeded. You have ${rateLimitCheck.remaining} requests remaining. Resets at ${rateLimitCheck.resetAt.toISOString()}`
@@ -420,8 +441,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
validation.workflow,
requestId,
undefined,
// Executing user (manual run): if session present, use that user for fallback
(await getSession())?.user?.id || undefined
actorUserId as string
)
// Check if the workflow execution contains a response block output
@@ -508,14 +528,19 @@ export async function POST(
let triggerType: TriggerType = 'manual'
const session = await getSession()
if (session?.user?.id) {
const apiKeyHeader = request.headers.get('X-API-Key')
if (session?.user?.id && !apiKeyHeader) {
authenticatedUserId = session.user.id
triggerType = 'manual' // UI session (not rate limited)
} else {
const apiKeyHeader = request.headers.get('X-API-Key')
if (apiKeyHeader) {
authenticatedUserId = validation.workflow.userId
triggerType = 'api'
triggerType = 'manual'
} else if (apiKeyHeader) {
const auth = await authenticateApiKeyFromHeader(apiKeyHeader)
if (!auth.success || !auth.userId) {
return createErrorResponse('Unauthorized', 401)
}
authenticatedUserId = auth.userId
triggerType = 'api'
if (auth.keyId) {
void updateApiKeyLastUsed(auth.keyId).catch(() => {})
}
}

View File

@@ -44,15 +44,17 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
variables: {},
})
const { traceSpans } = buildTraceSpans(result)
if (result.success === false) {
const message = result.error || 'Workflow execution failed'
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,
error: { message },
traceSpans,
})
} else {
const { traceSpans } = buildTraceSpans(result)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: result.metadata?.duration || 0,

View File

@@ -13,6 +13,67 @@ import {
import { cn, redactApiKeys } from '@/lib/utils'
import type { TraceSpan } from '@/stores/logs/filters/types'
function getSpanKey(span: TraceSpan): string {
  // Prefer the span's own id as a stable identity when available.
  if (span.id) {
    return span.id
  }
  // Otherwise derive a composite key from name and time bounds,
  // substituting placeholders for any missing fields.
  const parts = [
    span.name || 'span',
    span.startTime || 'unknown-start',
    span.endTime || 'unknown-end',
  ]
  return parts.join('|')
}
function mergeTraceSpanChildren(...groups: TraceSpan[][]): TraceSpan[] {
  // Concatenate all groups while deduplicating by span key, keeping the
  // first occurrence order across groups.
  const seenKeys = new Set<string>()
  const result: TraceSpan[] = []
  for (const group of groups) {
    for (const span of group) {
      const key = getSpanKey(span)
      if (!seenKeys.has(key)) {
        seenKeys.add(key)
        result.push(span)
      }
    }
  }
  return result
}
function normalizeChildWorkflowSpan(span: TraceSpan): TraceSpan {
  // Shallow-copy the span (and its output) so normalization never mutates
  // the caller's data.
  const copy: TraceSpan = { ...span }
  if (copy.output && typeof copy.output === 'object') {
    copy.output = { ...copy.output }
  }

  // Recursively normalize explicitly nested children.
  const directChildren: TraceSpan[] = Array.isArray(span.children)
    ? span.children.map((child) => normalizeChildWorkflowSpan(child))
    : []

  // Child-workflow executions surface their spans via output.childTraceSpans;
  // normalize those too so they render like regular children.
  const embeddedChildren: TraceSpan[] = Array.isArray(span.output?.childTraceSpans)
    ? (span.output!.childTraceSpans as TraceSpan[]).map((child) =>
        normalizeChildWorkflowSpan(child)
      )
    : []

  const combined = mergeTraceSpanChildren(directChildren, embeddedChildren)

  // Strip childTraceSpans from the copied output now that they have been
  // hoisted into the children array.
  if (copy.output && 'childTraceSpans' in copy.output) {
    const { childTraceSpans: _hoisted, ...rest } = copy.output as {
      childTraceSpans?: TraceSpan[]
    } & Record<string, unknown>
    copy.output = rest
  }

  copy.children = combined.length > 0 ? combined : undefined
  return copy
}
interface TraceSpansDisplayProps {
traceSpans?: TraceSpan[]
totalDuration?: number
@@ -310,22 +371,23 @@ export function TraceSpansDisplay({
</div>
<div className='w-full overflow-hidden rounded-md border shadow-sm'>
{traceSpans.map((span, index) => {
const normalizedSpan = normalizeChildWorkflowSpan(span)
const hasSubItems = Boolean(
(span.children && span.children.length > 0) ||
(span.toolCalls && span.toolCalls.length > 0) ||
span.input ||
span.output
(normalizedSpan.children && normalizedSpan.children.length > 0) ||
(normalizedSpan.toolCalls && normalizedSpan.toolCalls.length > 0) ||
normalizedSpan.input ||
normalizedSpan.output
)
return (
<TraceSpanItem
key={index}
span={span}
span={normalizedSpan}
depth={0}
totalDuration={
actualTotalDuration !== undefined ? actualTotalDuration : totalDuration
}
isLast={index === traceSpans.length - 1}
parentStartTime={new Date(span.startTime).getTime()}
parentStartTime={new Date(normalizedSpan.startTime).getTime()}
workflowStartTime={workflowStartTime}
onToggle={handleSpanToggle}
expandedSpans={expandedSpans}
@@ -612,17 +674,19 @@ function TraceSpanItem({
{hasChildren && (
<div>
{span.children?.map((childSpan, index) => {
const enrichedChildSpan = normalizeChildWorkflowSpan(childSpan)
const childHasSubItems = Boolean(
(childSpan.children && childSpan.children.length > 0) ||
(childSpan.toolCalls && childSpan.toolCalls.length > 0) ||
childSpan.input ||
childSpan.output
(enrichedChildSpan.children && enrichedChildSpan.children.length > 0) ||
(enrichedChildSpan.toolCalls && enrichedChildSpan.toolCalls.length > 0) ||
enrichedChildSpan.input ||
enrichedChildSpan.output
)
return (
<TraceSpanItem
key={index}
span={childSpan}
span={enrichedChildSpan}
depth={depth + 1}
totalDuration={totalDuration}
isLast={index === (span.children?.length || 0) - 1}

View File

@@ -44,6 +44,56 @@ interface DebugValidationResult {
error?: string
}
const WORKFLOW_EXECUTION_FAILURE_MESSAGE = 'Workflow execution failed'
function isRecord(value: unknown): value is Record<string, unknown> {
  // Any non-null object qualifies (arrays included), matching the original
  // "typeof === 'object' && !== null" check.
  return value !== null && typeof value === 'object'
}
function sanitizeMessage(value: unknown): string | undefined {
  // Only strings can yield a message.
  if (typeof value !== 'string') return undefined
  const cleaned = value.trim()
  // Reject empty strings and the known "undefined (undefined)" placeholder
  // produced by failed API responses.
  return cleaned && cleaned !== 'undefined (undefined)' ? cleaned : undefined
}
function normalizeErrorMessage(error: unknown): string {
  // Error instances and raw strings carry the message directly.
  if (error instanceof Error) {
    const msg = sanitizeMessage(error.message)
    if (msg) return msg
  } else if (typeof error === 'string') {
    const msg = sanitizeMessage(error)
    if (msg) return msg
  }

  // Object-shaped errors: prefer a top-level `message`, then look one level
  // down at `error.error`, which may itself be a record or a plain string.
  if (isRecord(error)) {
    const topLevel = sanitizeMessage(error.message)
    if (topLevel) return topLevel

    const inner = error.error
    const innerMessage = isRecord(inner)
      ? sanitizeMessage(inner.message)
      : sanitizeMessage(inner)
    if (innerMessage) return innerMessage
  }

  // Nothing usable found — fall back to the generic failure message.
  return WORKFLOW_EXECUTION_FAILURE_MESSAGE
}
function isExecutionResult(value: unknown): value is ExecutionResult {
  // An execution result must be a record carrying a boolean `success` flag
  // and an object-shaped `output`.
  return (
    isRecord(value) && typeof value.success === 'boolean' && isRecord(value.output)
  )
}
function extractExecutionResult(error: unknown): ExecutionResult | null {
  // Executor failures attach the partial result to the thrown error as
  // `executionResult`; pull it out when present and well-formed.
  if (!isRecord(error)) return null
  const attached = error.executionResult
  if (isExecutionResult(attached)) {
    return attached
  }
  return null
}
export function useWorkflowExecution() {
const currentWorkflow = useCurrentWorkflow()
const { activeWorkflowId, workflows } = useWorkflowRegistry()
@@ -862,74 +912,56 @@ export function useWorkflowExecution() {
return newExecutor.execute(activeWorkflowId || '', startBlockId)
}
const handleExecutionError = (error: any, options?: { executionId?: string }) => {
let errorMessage = 'Unknown error'
if (error instanceof Error) {
errorMessage = error.message || `Error: ${String(error)}`
} else if (typeof error === 'string') {
errorMessage = error
} else if (error && typeof error === 'object') {
if (
error.message === 'undefined (undefined)' ||
(error.error &&
typeof error.error === 'object' &&
error.error.message === 'undefined (undefined)')
) {
errorMessage = 'API request failed - no specific error details available'
} else if (error.message) {
errorMessage = error.message
} else if (error.error && typeof error.error === 'string') {
errorMessage = error.error
} else if (error.error && typeof error.error === 'object' && error.error.message) {
errorMessage = error.error.message
} else {
try {
errorMessage = `Error details: ${JSON.stringify(error)}`
} catch {
errorMessage = 'Error occurred but details could not be displayed'
}
const handleExecutionError = (error: unknown, options?: { executionId?: string }) => {
const normalizedMessage = normalizeErrorMessage(error)
const executionResultFromError = extractExecutionResult(error)
let errorResult: ExecutionResult
if (executionResultFromError) {
const logs = Array.isArray(executionResultFromError.logs) ? executionResultFromError.logs : []
errorResult = {
...executionResultFromError,
success: false,
error: executionResultFromError.error ?? normalizedMessage,
logs,
}
}
} else {
if (!executor) {
try {
let blockId = 'serialization'
let blockName = 'Workflow'
let blockType = 'serializer'
if (error instanceof WorkflowValidationError) {
blockId = error.blockId || blockId
blockName = error.blockName || blockName
blockType = error.blockType || blockType
}
if (errorMessage === 'undefined (undefined)') {
errorMessage = 'API request failed - no specific error details available'
}
useConsoleStore.getState().addConsole({
input: {},
output: {},
success: false,
error: normalizedMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId,
executionId: options?.executionId,
blockName,
blockType,
})
} catch {}
}
// If we failed before creating an executor (e.g., serializer validation), add a console entry
if (!executor) {
try {
// Prefer attributing to specific subflow if we have a structured error
let blockId = 'serialization'
let blockName = 'Workflow'
let blockType = 'serializer'
if (error instanceof WorkflowValidationError) {
blockId = error.blockId || blockId
blockName = error.blockName || blockName
blockType = error.blockType || blockType
}
useConsoleStore.getState().addConsole({
input: {},
output: {},
success: false,
error: errorMessage,
durationMs: 0,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
workflowId: activeWorkflowId || '',
blockId,
executionId: options?.executionId,
blockName,
blockType,
})
} catch {}
}
const errorResult: ExecutionResult = {
success: false,
output: {},
error: errorMessage,
logs: [],
errorResult = {
success: false,
output: {},
error: normalizedMessage,
logs: [],
}
}
setExecutionResult(errorResult)
@@ -937,16 +969,14 @@ export function useWorkflowExecution() {
setIsDebugging(false)
setActiveBlocks(new Set())
let notificationMessage = 'Workflow execution failed'
if (error?.request?.url) {
if (error.request.url && error.request.url.trim() !== '') {
notificationMessage += `: Request to ${error.request.url} failed`
if (error.status) {
notificationMessage += ` (Status: ${error.status})`
}
let notificationMessage = WORKFLOW_EXECUTION_FAILURE_MESSAGE
if (isRecord(error) && isRecord(error.request) && sanitizeMessage(error.request.url)) {
notificationMessage += `: Request to ${(error.request.url as string).trim()} failed`
if ('status' in error && typeof error.status === 'number') {
notificationMessage += ` (Status: ${error.status})`
}
} else {
notificationMessage += `: ${errorMessage}`
} else if (sanitizeMessage(errorResult.error)) {
notificationMessage += `: ${errorResult.error}`
}
return errorResult

View File

@@ -17,6 +17,7 @@ import {
} from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { Executor } from '@/executor'
import type { ExecutionResult } from '@/executor/types'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
@@ -386,6 +387,13 @@ async function executeWebhookJobInternal(
// Complete logging session with error (matching workflow-execution pattern)
try {
const executionResult = (error?.executionResult as ExecutionResult | undefined) || {
success: false,
output: {},
logs: [],
}
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
@@ -393,6 +401,7 @@ async function executeWebhookJobInternal(
message: error.message || 'Webhook execution failed',
stackTrace: error.stack,
},
traceSpans,
})
} catch (loggingError) {
logger.error(`[${requestId}] Failed to complete logging session`, loggingError)

View File

@@ -192,6 +192,9 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
stack: error.stack,
})
const executionResult = error?.executionResult || { success: false, output: {}, logs: [] }
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
@@ -199,6 +202,7 @@ export async function executeWorkflowJob(payload: WorkflowExecutionPayload) {
message: error.message || 'Workflow execution failed',
stackTrace: error.stack,
},
traceSpans,
})
throw error

View File

@@ -12,7 +12,8 @@ export enum BlockType {
API = 'api',
EVALUATOR = 'evaluator',
RESPONSE = 'response',
WORKFLOW = 'workflow',
WORKFLOW = 'workflow', // Deprecated - kept for backwards compatibility
WORKFLOW_INPUT = 'workflow_input', // Current workflow block type
STARTER = 'starter',
}
@@ -27,3 +28,10 @@ export const ALL_BLOCK_TYPES = Object.values(BlockType) as string[]
export function isValidBlockType(type: string): type is BlockType {
return ALL_BLOCK_TYPES.includes(type)
}
/**
 * Helper to check if a block type is a workflow block (current or deprecated)
 */
export function isWorkflowBlockType(blockType: string | undefined): boolean {
  switch (blockType) {
    case BlockType.WORKFLOW:
    case BlockType.WORKFLOW_INPUT:
      return true
    default:
      return false
  }
}

View File

@@ -1,17 +1,29 @@
import { generateInternalToken } from '@/lib/auth/internal'
import { createLogger } from '@/lib/logs/console/logger'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import type { TraceSpan } from '@/lib/logs/types'
import { getBaseUrl } from '@/lib/urls/utils'
import type { BlockOutput } from '@/blocks/types'
import { Executor } from '@/executor'
import { BlockType } from '@/executor/consts'
import type { BlockHandler, ExecutionContext, StreamingExecution } from '@/executor/types'
import type {
BlockHandler,
ExecutionContext,
ExecutionResult,
StreamingExecution,
} from '@/executor/types'
import { Serializer } from '@/serializer'
import type { SerializedBlock } from '@/serializer/types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
const logger = createLogger('WorkflowBlockHandler')
type WorkflowTraceSpan = TraceSpan & {
metadata?: Record<string, unknown>
children?: WorkflowTraceSpan[]
output?: (Record<string, unknown> & { childTraceSpans?: WorkflowTraceSpan[] }) | null
}
// Maximum allowed depth for nested workflow executions
const MAX_WORKFLOW_DEPTH = 10
@@ -125,13 +137,19 @@ export class WorkflowBlockHandler implements BlockHandler {
// Use the actual child workflow ID for authentication, not the execution ID
// This ensures knowledge base and other API calls can properly authenticate
const result = await subExecutor.execute(workflowId)
const executionResult = this.toExecutionResult(result)
const duration = performance.now() - startTime
logger.info(`Child workflow ${childWorkflowName} completed in ${Math.round(duration)}ms`)
const childTraceSpans = this.captureChildWorkflowLogs(result, childWorkflowName, context)
const childTraceSpans = this.captureChildWorkflowLogs(
executionResult,
childWorkflowName,
context
)
const mappedResult = this.mapChildOutputToParent(
result,
executionResult,
workflowId,
childWorkflowName,
duration,
@@ -146,6 +164,7 @@ export class WorkflowBlockHandler implements BlockHandler {
// Attach trace spans and name for higher-level logging to consume
errorWithSpans.childTraceSpans = childTraceSpans
errorWithSpans.childWorkflowName = childWorkflowName
errorWithSpans.executionResult = executionResult
throw errorWithSpans
}
@@ -162,7 +181,19 @@ export class WorkflowBlockHandler implements BlockHandler {
throw error // Re-throw as-is to avoid duplication
}
throw new Error(`Error in child workflow "${childWorkflowName}": ${originalError}`)
const wrappedError = new Error(
`Error in child workflow "${childWorkflowName}": ${originalError}`
) as any
if (error.childTraceSpans) {
wrappedError.childTraceSpans = error.childTraceSpans
}
if (error.childWorkflowName) {
wrappedError.childWorkflowName = error.childWorkflowName
}
if (error.executionResult) {
wrappedError.executionResult = error.executionResult
}
throw wrappedError
}
}
@@ -318,10 +349,10 @@ export class WorkflowBlockHandler implements BlockHandler {
* Captures and transforms child workflow logs into trace spans
*/
private captureChildWorkflowLogs(
childResult: any,
childResult: ExecutionResult,
childWorkflowName: string,
parentContext: ExecutionContext
): any[] {
): WorkflowTraceSpan[] {
try {
if (!childResult.logs || !Array.isArray(childResult.logs)) {
return []
@@ -333,9 +364,15 @@ export class WorkflowBlockHandler implements BlockHandler {
return []
}
const transformedSpans = traceSpans.map((span: any) => {
return this.transformSpanForChildWorkflow(span, childWorkflowName)
})
const processedSpans = this.processChildWorkflowSpans(traceSpans)
if (processedSpans.length === 0) {
return []
}
const transformedSpans = processedSpans.map((span) =>
this.transformSpanForChildWorkflow(span, childWorkflowName)
)
return transformedSpans
} catch (error) {
@@ -347,67 +384,111 @@ export class WorkflowBlockHandler implements BlockHandler {
/**
* Transforms trace span for child workflow context
*/
private transformSpanForChildWorkflow(span: any, childWorkflowName: string): any {
const transformedSpan = {
private transformSpanForChildWorkflow(
span: WorkflowTraceSpan,
childWorkflowName: string
): WorkflowTraceSpan {
const metadata: Record<string, unknown> = {
...(span.metadata ?? {}),
isFromChildWorkflow: true,
childWorkflowName,
}
const transformedChildren = Array.isArray(span.children)
? span.children.map((childSpan) =>
this.transformSpanForChildWorkflow(childSpan, childWorkflowName)
)
: undefined
return {
...span,
name: this.cleanChildSpanName(span.name, childWorkflowName),
metadata: {
...span.metadata,
isFromChildWorkflow: true,
childWorkflowName,
},
metadata,
...(transformedChildren ? { children: transformedChildren } : {}),
}
if (span.children && Array.isArray(span.children)) {
transformedSpan.children = span.children.map((childSpan: any) =>
this.transformSpanForChildWorkflow(childSpan, childWorkflowName)
)
}
if (span.output?.childTraceSpans) {
transformedSpan.output = {
...transformedSpan.output,
childTraceSpans: span.output.childTraceSpans,
}
}
return transformedSpan
}
/**
* Cleans up child span names for readability
*/
private cleanChildSpanName(spanName: string, childWorkflowName: string): string {
if (spanName.includes(`${childWorkflowName}:`)) {
const cleanName = spanName.replace(`${childWorkflowName}:`, '').trim()
private processChildWorkflowSpans(spans: TraceSpan[]): WorkflowTraceSpan[] {
const processed: WorkflowTraceSpan[] = []
if (cleanName === 'Workflow Execution') {
return `${childWorkflowName} workflow`
spans.forEach((span) => {
if (this.isSyntheticWorkflowWrapper(span)) {
if (span.children && Array.isArray(span.children)) {
processed.push(...this.processChildWorkflowSpans(span.children))
}
return
}
if (cleanName.startsWith('Agent ')) {
return `${cleanName}`
const workflowSpan: WorkflowTraceSpan = {
...span,
}
return `${cleanName}`
}
if (Array.isArray(workflowSpan.children)) {
workflowSpan.children = this.processChildWorkflowSpans(workflowSpan.children as TraceSpan[])
}
if (spanName === 'Workflow Execution') {
return `${childWorkflowName} workflow`
}
processed.push(workflowSpan)
})
return `${spanName}`
return processed
}
/**
 * Recursively flattens child workflow trace spans.
 *
 * Synthetic "workflow execution" wrapper spans (workflow-typed spans without a
 * blockId) contribute only their children; real spans are copied, their
 * children flattened recursively, and any spans carried in
 * `output.childTraceSpans` are hoisted into the span's children array while the
 * `childTraceSpans` key is removed from the output.
 */
private flattenChildWorkflowSpans(spans: TraceSpan[]): WorkflowTraceSpan[] {
  const result: WorkflowTraceSpan[] = []
  for (const span of spans) {
    // Wrappers never appear in the output themselves.
    if (this.isSyntheticWorkflowWrapper(span)) {
      if (Array.isArray(span.children)) {
        result.push(...this.flattenChildWorkflowSpans(span.children))
      }
      continue
    }
    const copy: WorkflowTraceSpan = { ...span }
    if (Array.isArray(copy.children)) {
      copy.children = this.flattenChildWorkflowSpans(copy.children as TraceSpan[])
    }
    if (copy.output && typeof copy.output === 'object') {
      const { childTraceSpans, ...remainingOutput } = copy.output as {
        childTraceSpans?: TraceSpan[]
      } & Record<string, unknown>
      if (Array.isArray(childTraceSpans) && childTraceSpans.length > 0) {
        // Hoist nested spans up to be siblings of this span's own children.
        copy.children = [
          ...(copy.children || []),
          ...this.flattenChildWorkflowSpans(childTraceSpans),
        ]
      }
      copy.output = remainingOutput
    }
    result.push(copy)
  }
  return result
}
/**
 * Unwraps a StreamingExecution into its underlying ExecutionResult.
 * Plain ExecutionResults pass through unchanged.
 */
private toExecutionResult(result: ExecutionResult | StreamingExecution): ExecutionResult {
  if ('execution' in result) {
    return result.execution
  }
  return result
}
/**
 * A synthetic workflow wrapper is a workflow-typed span with no blockId —
 * i.e. a span generated around a child execution rather than a real block.
 */
private isSyntheticWorkflowWrapper(span: TraceSpan | undefined): boolean {
  return span?.type === 'workflow' && !span.blockId
}
/**
* Maps child workflow output to parent block output
*/
private mapChildOutputToParent(
childResult: any,
childResult: ExecutionResult,
childWorkflowId: string,
childWorkflowName: string,
duration: number,
childTraceSpans?: any[]
childTraceSpans?: WorkflowTraceSpan[]
): BlockOutput {
const success = childResult.success !== false
if (!success) {

View File

@@ -3,7 +3,7 @@ import { createLogger } from '@/lib/logs/console/logger'
import type { TraceSpan } from '@/lib/logs/types'
import { getBlock } from '@/blocks'
import type { BlockOutput } from '@/blocks/types'
import { BlockType } from '@/executor/consts'
import { BlockType, isWorkflowBlockType } from '@/executor/consts'
import {
AgentBlockHandler,
ApiBlockHandler,
@@ -2182,7 +2182,7 @@ export class Executor {
new Date(blockLog.endedAt).getTime() - new Date(blockLog.startedAt).getTime()
// If this error came from a child workflow execution, persist its trace spans on the log
if (block.metadata?.id === BlockType.WORKFLOW) {
if (isWorkflowBlockType(block.metadata?.id)) {
this.attachChildWorkflowSpansToLog(blockLog, error)
}
@@ -2272,7 +2272,7 @@ export class Executor {
}
// Preserve child workflow spans on the block state so downstream logging can render them
if (block.metadata?.id === BlockType.WORKFLOW) {
if (isWorkflowBlockType(block.metadata?.id)) {
this.attachChildWorkflowSpansToOutput(errorOutput, error)
}
@@ -2283,7 +2283,42 @@ export class Executor {
executionTime: blockLog.durationMs,
})
// If there are error paths to follow, return error output instead of throwing
const failureEndTime = context.metadata.endTime ?? new Date().toISOString()
if (!context.metadata.endTime) {
context.metadata.endTime = failureEndTime
}
const failureDuration = context.metadata.startTime
? Math.max(
0,
new Date(failureEndTime).getTime() - new Date(context.metadata.startTime).getTime()
)
: (context.metadata.duration ?? 0)
context.metadata.duration = failureDuration
const failureMetadata = {
...context.metadata,
endTime: failureEndTime,
duration: failureDuration,
workflowConnections: this.actualWorkflow.connections.map((conn) => ({
source: conn.source,
target: conn.target,
})),
}
const upstreamExecutionResult = (error as { executionResult?: ExecutionResult } | null)
?.executionResult
const executionResultPayload: ExecutionResult = {
success: false,
output: upstreamExecutionResult?.output ?? errorOutput,
error: upstreamExecutionResult?.error ?? this.extractErrorMessage(error),
logs: [...context.blockLogs],
metadata: {
...failureMetadata,
...(upstreamExecutionResult?.metadata ?? {}),
workflowConnections: failureMetadata.workflowConnections,
},
}
if (hasErrorPath) {
// Return the error output to allow execution to continue along error path
return errorOutput
@@ -2316,7 +2351,17 @@ export class Executor {
errorMessage: this.extractErrorMessage(error),
})
throw new Error(errorMessage)
const executionError = new Error(errorMessage)
;(executionError as any).executionResult = executionResultPayload
if (Array.isArray((error as { childTraceSpans?: TraceSpan[] } | null)?.childTraceSpans)) {
;(executionError as any).childTraceSpans = (
error as { childTraceSpans?: TraceSpan[] }
).childTraceSpans
;(executionError as any).childWorkflowName = (
error as { childWorkflowName?: string }
).childWorkflowName
}
throw executionError
}
}
@@ -2329,11 +2374,12 @@ export class Executor {
error as { childTraceSpans?: TraceSpan[]; childWorkflowName?: string } | null | undefined
)?.childTraceSpans
if (Array.isArray(spans) && spans.length > 0) {
const childWorkflowName = (error as { childWorkflowName?: string } | null | undefined)
?.childWorkflowName
blockLog.output = {
...(blockLog.output || {}),
childTraceSpans: spans,
childWorkflowName: (error as { childWorkflowName?: string } | null | undefined)
?.childWorkflowName,
childWorkflowName,
}
}
}
@@ -2516,7 +2562,7 @@ export class Executor {
* Preserves child workflow trace spans for proper nesting
*/
private integrateChildWorkflowLogs(block: SerializedBlock, output: NormalizedBlockOutput): void {
if (block.metadata?.id !== BlockType.WORKFLOW) {
if (!isWorkflowBlockType(block.metadata?.id)) {
return
}

View File

@@ -125,6 +125,27 @@ export async function updateApiKeyLastUsed(keyId: string): Promise<void> {
}
}
/**
 * Given a pinned API key ID, resolve the owning userId (actor).
 * Returns null when no key ID is provided, when no matching key exists,
 * or when the lookup fails (failures are logged, not thrown).
 */
export async function getApiKeyOwnerUserId(
  pinnedApiKeyId: string | null | undefined
): Promise<string | null> {
  if (!pinnedApiKeyId) {
    return null
  }
  try {
    const [match] = await db
      .select({ userId: apiKeyTable.userId })
      .from(apiKeyTable)
      .where(eq(apiKeyTable.id, pinnedApiKeyId))
      .limit(1)
    return match?.userId ?? null
  } catch (error) {
    // Treat lookup errors as "no owner" so callers can apply their own fallback.
    logger.error('Error resolving API key owner', { error, pinnedApiKeyId })
    return null
  }
}
/**
* Get the API encryption key from the environment
* @returns The API encryption key

View File

@@ -37,6 +37,7 @@ export interface SessionErrorCompleteParams {
message?: string
stackTrace?: string
}
traceSpans?: TraceSpan[]
}
export class LoggingSession {
@@ -131,7 +132,7 @@ export class LoggingSession {
async completeWithError(params: SessionErrorCompleteParams = {}): Promise<void> {
try {
const { endedAt, totalDurationMs, error } = params
const { endedAt, totalDurationMs, error, traceSpans } = params
const endTime = endedAt ? new Date(endedAt) : new Date()
const durationMs = typeof totalDurationMs === 'number' ? totalDurationMs : 0
@@ -151,19 +152,19 @@ export class LoggingSession {
const message = error?.message || 'Execution failed before starting blocks'
const syntheticErrorSpan: TraceSpan[] = [
{
id: 'pre-execution-validation',
name: 'Workflow Error',
type: 'validation',
duration: Math.max(1, durationMs),
startTime: startTime.toISOString(),
endTime: endTime.toISOString(),
status: 'error',
children: [],
output: { error: message },
},
]
const hasProvidedSpans = Array.isArray(traceSpans) && traceSpans.length > 0
const errorSpan: TraceSpan = {
id: 'workflow-error-root',
name: 'Workflow Error',
type: 'workflow',
duration: Math.max(1, durationMs),
startTime: startTime.toISOString(),
endTime: endTime.toISOString(),
status: 'error',
...(hasProvidedSpans ? {} : { children: [] }),
output: { error: message },
}
await executionLogger.completeWorkflowExecution({
executionId: this.executionId,
@@ -171,7 +172,7 @@ export class LoggingSession {
totalDurationMs: Math.max(1, durationMs),
costSummary,
finalOutput: { error: message },
traceSpans: syntheticErrorSpan,
traceSpans: hasProvidedSpans ? traceSpans : [errorSpan],
})
if (this.requestId) {

View File

@@ -582,6 +582,195 @@ describe('buildTraceSpans', () => {
// Verify no toolCalls property exists (since we're using children instead)
expect(agentSpan.toolCalls).toBeUndefined()
})
// Verifies that synthetic "Workflow Execution" wrapper spans produced by child
// workflow executions are stripped at every nesting depth: real block spans are
// re-parented directly under the parent workflow block span, and spans carried
// in a nested workflow block's output.childTraceSpans are hoisted recursively.
test('should flatten nested child workflow trace spans recursively', () => {
  // Workflow block span inside the child workflow; its output carries a second
  // level of child spans wrapped in another synthetic "Workflow Execution" span.
  const nestedChildSpan = {
    id: 'nested-workflow-span',
    name: 'Nested Workflow Block',
    type: 'workflow',
    blockId: 'nested-workflow-block-id',
    duration: 3000,
    startTime: '2024-01-01T10:00:01.000Z',
    endTime: '2024-01-01T10:00:04.000Z',
    status: 'success' as const,
    output: {
      childTraceSpans: [
        {
          // Synthetic wrapper (workflow type, no blockId) — should be removed.
          id: 'grand-wrapper',
          name: 'Workflow Execution',
          type: 'workflow',
          duration: 3000,
          startTime: '2024-01-01T10:00:01.000Z',
          endTime: '2024-01-01T10:00:04.000Z',
          status: 'success' as const,
          children: [
            {
              id: 'grand-child-block',
              name: 'Deep API Call',
              type: 'api',
              duration: 1500,
              startTime: '2024-01-01T10:00:01.500Z',
              endTime: '2024-01-01T10:00:03.000Z',
              status: 'success' as const,
              input: { path: '/v1/test' },
              output: { result: 'ok' },
            },
          ],
        },
      ],
    },
  }
  // Sibling non-workflow span in the child workflow; must survive flattening.
  const toolSpan = {
    id: 'child-tool-span',
    name: 'Helper Tool',
    type: 'tool',
    duration: 1000,
    startTime: '2024-01-01T10:00:04.000Z',
    endTime: '2024-01-01T10:00:05.000Z',
    status: 'success' as const,
  }
  const mockExecutionResult: ExecutionResult = {
    success: true,
    output: { result: 'parent output' },
    logs: [
      {
        blockId: 'workflow-1',
        blockName: 'Child Workflow',
        blockType: 'workflow',
        startedAt: '2024-01-01T10:00:00.000Z',
        endedAt: '2024-01-01T10:00:05.000Z',
        durationMs: 5000,
        success: true,
        output: {
          childWorkflowName: 'Child Workflow',
          childTraceSpans: [
            {
              // Top-level synthetic wrapper around the child workflow's spans.
              id: 'child-wrapper',
              name: 'Workflow Execution',
              type: 'workflow',
              duration: 5000,
              startTime: '2024-01-01T10:00:00.000Z',
              endTime: '2024-01-01T10:00:05.000Z',
              status: 'success' as const,
              children: [nestedChildSpan, toolSpan],
            },
          ],
        },
      },
    ],
  }
  const { traceSpans } = buildTraceSpans(mockExecutionResult)
  expect(traceSpans).toHaveLength(1)
  const workflowSpan = traceSpans[0]
  expect(workflowSpan.type).toBe('workflow')
  // Wrapper removed: both real spans are direct children of the workflow block.
  expect(workflowSpan.children).toBeDefined()
  expect(workflowSpan.children).toHaveLength(2)
  const nestedWorkflowSpan = workflowSpan.children?.find((span) => span.type === 'workflow')
  expect(nestedWorkflowSpan).toBeDefined()
  expect(nestedWorkflowSpan?.name).toBe('Nested Workflow Block')
  // The grandchild wrapper was flattened too: the API call is a direct child.
  expect(nestedWorkflowSpan?.children).toBeDefined()
  expect(nestedWorkflowSpan?.children).toHaveLength(1)
  expect(nestedWorkflowSpan?.children?.[0].name).toBe('Deep API Call')
  expect(nestedWorkflowSpan?.children?.[0].type).toBe('api')
  const helperToolSpan = workflowSpan.children?.find((span) => span.id === 'child-tool-span')
  expect(helperToolSpan?.type).toBe('tool')
  // No synthetic wrapper spans should remain anywhere among the children.
  const syntheticWrappers = workflowSpan.children?.filter(
    (span) => span.name === 'Workflow Execution'
  )
  expect(syntheticWrappers).toHaveLength(0)
})
// Verifies that when a child workflow fails inside another child workflow, the
// error spans keep their full hierarchy (parent workflow block → nested workflow
// block → failing block) and error status is visible at the root span.
test('should handle nested child workflow errors with proper hierarchy', () => {
  // The deepest failing block inside the innermost child workflow.
  const functionErrorSpan = {
    id: 'function-error-span',
    name: 'Function 1',
    type: 'function',
    duration: 200,
    startTime: '2024-01-01T10:01:02.000Z',
    endTime: '2024-01-01T10:01:02.200Z',
    status: 'error' as const,
    blockId: 'function-1',
    output: {
      error: 'Syntax Error: Line 1: `retur "HELLO"` - Unexpected string',
    },
  }
  // Intermediate child workflow block ("rainbow-cupcake") that failed because
  // of the function error above; it has a blockId, so it is a real block span
  // and must NOT be flattened away like a synthetic wrapper.
  const rainbowCupcakeSpan = {
    id: 'rainbow-workflow-span',
    name: 'Rainbow Cupcake',
    type: 'workflow',
    duration: 300,
    startTime: '2024-01-01T10:01:02.000Z',
    endTime: '2024-01-01T10:01:02.300Z',
    status: 'error' as const,
    blockId: 'workflow-rainbow',
    output: {
      childWorkflowName: 'rainbow-cupcake',
      error: 'Syntax Error: Line 1: `retur "HELLO"` - Unexpected string',
      childTraceSpans: [functionErrorSpan],
    },
  }
  const mockExecutionResult: ExecutionResult = {
    success: false,
    output: { result: null },
    metadata: {
      duration: 3000,
      startTime: '2024-01-01T10:01:00.000Z',
    },
    logs: [
      {
        blockId: 'workflow-silk',
        blockName: 'Silk Pond',
        blockType: 'workflow',
        startedAt: '2024-01-01T10:01:00.000Z',
        endedAt: '2024-01-01T10:01:03.000Z',
        durationMs: 3000,
        success: false,
        error:
          'Error in child workflow "silk-pond": Error in child workflow "rainbow-cupcake": Syntax Error',
        output: {
          childWorkflowName: 'silk-pond',
          childTraceSpans: [rainbowCupcakeSpan],
        },
      },
    ],
  }
  const { traceSpans } = buildTraceSpans(mockExecutionResult)
  // Root span is the overall (failed) workflow execution.
  expect(traceSpans).toHaveLength(1)
  const workflowExecutionSpan = traceSpans[0]
  expect(workflowExecutionSpan.name).toBe('Workflow Execution')
  expect(workflowExecutionSpan.status).toBe('error')
  expect(workflowExecutionSpan.children).toBeDefined()
  expect(workflowExecutionSpan.children).toHaveLength(1)
  const silkPondSpan = workflowExecutionSpan.children?.[0]
  expect(silkPondSpan?.name).toBe('Silk Pond')
  expect(silkPondSpan?.status).toBe('error')
  // The nested workflow block keeps its own error child in the hierarchy.
  expect(silkPondSpan?.children).toBeDefined()
  expect(silkPondSpan?.children).toHaveLength(1)
  const rainbowSpan = silkPondSpan?.children?.[0]
  expect(rainbowSpan?.name).toBe('Rainbow Cupcake')
  expect(rainbowSpan?.status).toBe('error')
  expect(rainbowSpan?.type).toBe('workflow')
  expect(rainbowSpan?.children).toBeDefined()
  expect(rainbowSpan?.children).toHaveLength(1)
  const functionSpan = rainbowSpan?.children?.[0]
  expect(functionSpan?.name).toBe('Function 1')
  expect(functionSpan?.status).toBe('error')
  expect((functionSpan?.output as { error?: string })?.error).toContain('Syntax Error')
})
})
describe('stripCustomToolPrefix', () => {

View File

@@ -1,9 +1,63 @@
import { createLogger } from '@/lib/logs/console/logger'
import type { TraceSpan } from '@/lib/logs/types'
import { isWorkflowBlockType } from '@/executor/consts'
import type { ExecutionResult } from '@/executor/types'
const logger = createLogger('TraceSpans')
/**
 * A synthetic wrapper is a workflow-typed span with no blockId: it was
 * generated to wrap a child execution rather than representing a real block.
 */
function isSyntheticWorkflowWrapper(span: TraceSpan | undefined): boolean {
  if (span == null) {
    return false
  }
  return span.type === 'workflow' && !span.blockId
}
/**
 * Flattens a list of child workflow spans: synthetic wrappers are replaced by
 * their (recursively flattened) children, while real spans are normalized via
 * ensureNestedWorkflowsProcessed and kept in order.
 */
function flattenWorkflowChildren(spans: TraceSpan[]): TraceSpan[] {
  const result: TraceSpan[] = []
  for (const span of spans) {
    if (isSyntheticWorkflowWrapper(span)) {
      // Only the wrapper's children survive.
      if (Array.isArray(span.children)) {
        result.push(...flattenWorkflowChildren(span.children))
      }
    } else {
      result.push(ensureNestedWorkflowsProcessed(span))
    }
  }
  return result
}
/**
 * Derives a stable dedupe key for a trace span: the span id when present,
 * otherwise a composite of the span's name and start/end timestamps.
 */
function getTraceSpanKey(span: TraceSpan): string {
  if (span.id) {
    return span.id
  }
  return [
    span.name || 'span',
    span.startTime || 'unknown-start',
    span.endTime || 'unknown-end',
  ].join('|')
}
/**
 * Merges several groups of child spans into one list, preserving first-seen
 * order and dropping duplicates identified by getTraceSpanKey.
 */
function mergeTraceSpanChildren(...childGroups: TraceSpan[][]): TraceSpan[] {
  const seen = new Set<string>()
  const merged: TraceSpan[] = []
  for (const group of childGroups) {
    for (const child of group) {
      const key = getTraceSpanKey(child)
      if (!seen.has(key)) {
        seen.add(key)
        merged.push(child)
      }
    }
  }
  return merged
}
// Helper function to build a tree of trace spans from execution logs
export function buildTraceSpans(result: ExecutionResult): {
traceSpans: TraceSpan[]
@@ -56,11 +110,8 @@ export function buildTraceSpans(result: ExecutionResult): {
}
}
// Prefer human-friendly workflow block naming if provided by child execution mapping
const displayName =
log.blockType === 'workflow' && log.output?.childWorkflowName
? `${log.output.childWorkflowName} workflow`
: log.blockName || log.blockId
// Use block name consistently for all block types
const displayName = log.blockName || log.blockId
const span: TraceSpan = {
id: spanId,
@@ -106,42 +157,11 @@ export function buildTraceSpans(result: ExecutionResult): {
;(span as any).model = log.output.model
}
// Handle child workflow spans for workflow blocks
if (
log.blockType === 'workflow' &&
log.output?.childTraceSpans &&
Array.isArray(log.output.childTraceSpans)
) {
// Convert child trace spans to be direct children of this workflow block span
const childTraceSpans = log.output.childTraceSpans as TraceSpan[]
// Process child workflow spans and add them as children
const flatChildSpans: TraceSpan[] = []
childTraceSpans.forEach((childSpan) => {
// Skip the synthetic workflow span wrapper - we only want the actual block executions
if (
childSpan.type === 'workflow' &&
(childSpan.name === 'Workflow Execution' || childSpan.name.endsWith(' workflow'))
) {
// Add its children directly, skipping the synthetic wrapper
if (childSpan.children && Array.isArray(childSpan.children)) {
flatChildSpans.push(...childSpan.children)
}
} else {
// This is a regular span, add it directly
// But first, ensure nested workflow blocks in this span are also processed
const processedSpan = ensureNestedWorkflowsProcessed(childSpan)
flatChildSpans.push(processedSpan)
}
})
// Add the child spans as children of this workflow block
span.children = flatChildSpans
}
// Enhanced approach: Use timeSegments for sequential flow if available
// This provides the actual model→tool→model execution sequence
// Skip for workflow blocks since they will be processed via output.childTraceSpans at the end
if (
!isWorkflowBlockType(log.blockType) &&
log.output?.providerTiming?.timeSegments &&
Array.isArray(log.output.providerTiming.timeSegments)
) {
@@ -250,6 +270,17 @@ export function buildTraceSpans(result: ExecutionResult): {
}
}
// Handle child workflow spans for workflow blocks - process at the end to avoid being overwritten
if (
isWorkflowBlockType(log.blockType) &&
log.output?.childTraceSpans &&
Array.isArray(log.output.childTraceSpans)
) {
const childTraceSpans = log.output.childTraceSpans as TraceSpan[]
const flattenedChildren = flattenWorkflowChildren(childTraceSpans)
span.children = mergeTraceSpanChildren(span.children || [], flattenedChildren)
}
// Store in map
spanMap.set(spanId, span)
})
@@ -327,7 +358,7 @@ export function buildTraceSpans(result: ExecutionResult): {
}
// Check if this span could be a parent to future spans
if (log.blockType === 'agent' || log.blockType === 'workflow') {
if (log.blockType === 'agent' || isWorkflowBlockType(log.blockType)) {
spanStack.push(span)
}
})
@@ -594,36 +625,41 @@ function groupIterationBlocks(spans: TraceSpan[]): TraceSpan[] {
}
function ensureNestedWorkflowsProcessed(span: TraceSpan): TraceSpan {
const processedSpan = { ...span }
const processedSpan: TraceSpan = { ...span }
if (
span.type === 'workflow' &&
span.output?.childTraceSpans &&
Array.isArray(span.output.childTraceSpans)
) {
const childTraceSpans = span.output.childTraceSpans as TraceSpan[]
const nestedChildren: TraceSpan[] = []
childTraceSpans.forEach((childSpan) => {
if (
childSpan.type === 'workflow' &&
(childSpan.name === 'Workflow Execution' || childSpan.name.endsWith(' workflow'))
) {
if (childSpan.children && Array.isArray(childSpan.children)) {
childSpan.children.forEach((grandchildSpan) => {
nestedChildren.push(ensureNestedWorkflowsProcessed(grandchildSpan))
})
}
} else {
nestedChildren.push(ensureNestedWorkflowsProcessed(childSpan))
}
})
processedSpan.children = nestedChildren
} else if (span.children && Array.isArray(span.children)) {
processedSpan.children = span.children.map((child) => ensureNestedWorkflowsProcessed(child))
if (processedSpan.output && typeof processedSpan.output === 'object') {
processedSpan.output = { ...processedSpan.output }
}
const normalizedChildren = Array.isArray(span.children)
? span.children.map((child) => ensureNestedWorkflowsProcessed(child))
: []
const outputChildSpans = (() => {
if (!processedSpan.output || typeof processedSpan.output !== 'object') {
return [] as TraceSpan[]
}
const maybeChildSpans = (processedSpan.output as { childTraceSpans?: TraceSpan[] })
.childTraceSpans
if (!Array.isArray(maybeChildSpans) || maybeChildSpans.length === 0) {
return [] as TraceSpan[]
}
return flattenWorkflowChildren(maybeChildSpans)
})()
const mergedChildren = mergeTraceSpanChildren(normalizedChildren, outputChildSpans)
if (processedSpan.output && 'childTraceSpans' in processedSpan.output) {
const { childTraceSpans, ...cleanOutput } = processedSpan.output as {
childTraceSpans?: TraceSpan[]
} & Record<string, unknown>
processedSpan.output = cleanOutput
}
processedSpan.children = mergedChildren.length > 0 ? mergedChildren : undefined
return processedSpan
}

View File

@@ -2,6 +2,7 @@ import { db, webhook, workflow } from '@sim/db'
import { tasks } from '@trigger.dev/sdk'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getApiKeyOwnerUserId } from '@/lib/api-key/service'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
import { env, isTruthy } from '@/lib/env'
@@ -268,18 +269,25 @@ export async function checkRateLimits(
requestId: string
): Promise<NextResponse | null> {
try {
const userSubscription = await getHighestPrioritySubscription(foundWorkflow.userId)
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`)
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
}
const userSubscription = await getHighestPrioritySubscription(actorUserId)
const rateLimiter = new RateLimiter()
const rateLimitCheck = await rateLimiter.checkRateLimitWithSubscription(
foundWorkflow.userId,
actorUserId,
userSubscription,
'webhook',
true
)
if (!rateLimitCheck.allowed) {
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${foundWorkflow.userId}`, {
logger.warn(`[${requestId}] Rate limit exceeded for webhook user ${actorUserId}`, {
provider: foundWebhook.provider,
remaining: rateLimitCheck.remaining,
resetAt: rateLimitCheck.resetAt,
@@ -319,10 +327,17 @@ export async function checkUsageLimits(
}
try {
const usageCheck = await checkServerSideUsageLimits(foundWorkflow.userId)
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(`[${requestId}] Webhook requires pinned API key to attribute usage`)
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
}
const usageCheck = await checkServerSideUsageLimits(actorUserId)
if (usageCheck.isExceeded) {
logger.warn(
`[${requestId}] User ${foundWorkflow.userId} has exceeded usage limits. Skipping webhook execution.`,
`[${requestId}] User ${actorUserId} has exceeded usage limits. Skipping webhook execution.`,
{
currentUsage: usageCheck.currentUsage,
limit: usageCheck.limit,
@@ -361,10 +376,16 @@ export async function queueWebhookExecution(
options: WebhookProcessorOptions
): Promise<NextResponse> {
try {
const actorUserId = await getApiKeyOwnerUserId(foundWorkflow.pinnedApiKeyId)
if (!actorUserId) {
logger.warn(`[${options.requestId}] Webhook requires pinned API key to attribute usage`)
return NextResponse.json({ message: 'Pinned API key required' }, { status: 200 })
}
const payload = {
webhookId: foundWebhook.id,
workflowId: foundWorkflow.id,
userId: foundWorkflow.userId,
userId: actorUserId,
provider: foundWebhook.provider,
body,
headers: Object.fromEntries(request.headers.entries()),