feat(child-workflows): nested execution snapshots (#3059)

* feat(child-workflows): nested execution snapshots

* cleanup typing

* address bugbot comments and fix tests

* do not cascade delete logs/snapshots

* fix few more inconsitencies

* fix external logs route

* add fallback color
This commit is contained in:
Vikhyath Mondreti
2026-01-28 19:40:52 -08:00
committed by GitHub
parent 20bb7cdec6
commit e0f1e66f4f
32 changed files with 10658 additions and 95 deletions

View File

@@ -56,7 +56,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
deploymentVersionName: workflowDeploymentVersion.name,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
@@ -65,7 +65,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, userId)
)
)
@@ -77,17 +77,19 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Not found' }, { status: 404 })
}
const workflowSummary = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
const workflowSummary = log.workflowId
? {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
: null
const response = {
id: log.id,

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { subscription, user, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { subscription, user, workflowExecutionLogs, workspace } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
@@ -40,17 +40,17 @@ export async function GET(request: NextRequest) {
const freeUserIds = freeUsers.map((u) => u.userId)
const workflowsQuery = await db
.select({ id: workflow.id })
.from(workflow)
.where(inArray(workflow.userId, freeUserIds))
const workspacesQuery = await db
.select({ id: workspace.id })
.from(workspace)
.where(inArray(workspace.billedAccountUserId, freeUserIds))
if (workflowsQuery.length === 0) {
logger.info('No workflows found for free users')
return NextResponse.json({ message: 'No workflows found for cleanup' })
if (workspacesQuery.length === 0) {
logger.info('No workspaces found for free users')
return NextResponse.json({ message: 'No workspaces found for cleanup' })
}
const workflowIds = workflowsQuery.map((w) => w.id)
const workspaceIds = workspacesQuery.map((w) => w.id)
const results = {
enhancedLogs: {
@@ -77,7 +77,7 @@ export async function GET(request: NextRequest) {
let batchesProcessed = 0
let hasMoreLogs = true
logger.info(`Starting enhanced logs cleanup for ${workflowIds.length} workflows`)
logger.info(`Starting enhanced logs cleanup for ${workspaceIds.length} workspaces`)
while (hasMoreLogs && batchesProcessed < MAX_BATCHES) {
const oldEnhancedLogs = await db
@@ -99,7 +99,7 @@ export async function GET(request: NextRequest) {
.from(workflowExecutionLogs)
.where(
and(
inArray(workflowExecutionLogs.workflowId, workflowIds),
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
lt(workflowExecutionLogs.createdAt, retentionDate)
)
)
@@ -127,7 +127,7 @@ export async function GET(request: NextRequest) {
customKey: enhancedLogKey,
metadata: {
logId: String(log.id),
workflowId: String(log.workflowId),
workflowId: String(log.workflowId ?? ''),
executionId: String(log.executionId),
logType: 'enhanced',
archivedAt: new Date().toISOString(),

View File

@@ -6,10 +6,11 @@ import {
workflowExecutionSnapshots,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { and, eq, inArray } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
const logger = createLogger('LogsByExecutionIdAPI')
@@ -48,14 +49,15 @@ export async function GET(
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
executionData: workflowExecutionLogs.executionData,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflow.workspaceId),
eq(permissions.entityId, workflowExecutionLogs.workspaceId),
eq(permissions.userId, authenticatedUserId)
)
)
@@ -78,10 +80,42 @@ export async function GET(
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
}
const executionData = workflowLog.executionData as WorkflowExecutionLog['executionData']
const traceSpans = (executionData?.traceSpans as TraceSpan[]) || []
const childSnapshotIds = new Set<string>()
const collectSnapshotIds = (spans: TraceSpan[]) => {
spans.forEach((span) => {
const snapshotId = span.childWorkflowSnapshotId
if (typeof snapshotId === 'string') {
childSnapshotIds.add(snapshotId)
}
if (span.children?.length) {
collectSnapshotIds(span.children)
}
})
}
if (traceSpans.length > 0) {
collectSnapshotIds(traceSpans)
}
const childWorkflowSnapshots =
childSnapshotIds.size > 0
? await db
.select()
.from(workflowExecutionSnapshots)
.where(inArray(workflowExecutionSnapshots.id, Array.from(childSnapshotIds)))
: []
const childSnapshotMap = childWorkflowSnapshots.reduce<Record<string, unknown>>((acc, snap) => {
acc[snap.id] = snap.stateData
return acc
}, {})
const response = {
executionId,
workflowId: workflowLog.workflowId,
workflowState: snapshot.stateData,
childWorkflowSnapshots: childSnapshotMap,
executionMetadata: {
trigger: workflowLog.trigger,
startedAt: workflowLog.startedAt.toISOString(),

View File

@@ -1,7 +1,7 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { and, desc, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
@@ -41,7 +41,7 @@ export async function GET(request: NextRequest) {
totalDurationMs: workflowExecutionLogs.totalDurationMs,
cost: workflowExecutionLogs.cost,
executionData: workflowExecutionLogs.executionData,
workflowName: workflow.name,
workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
}
const workspaceCondition = eq(workflowExecutionLogs.workspaceId, params.workspaceId)
@@ -74,7 +74,7 @@ export async function GET(request: NextRequest) {
const rows = await db
.select(selectColumns)
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(

View File

@@ -116,7 +116,7 @@ export async function GET(request: NextRequest) {
workflowDeploymentVersion,
eq(workflowDeploymentVersion.id, workflowExecutionLogs.deploymentVersionId)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -190,7 +190,7 @@ export async function GET(request: NextRequest) {
pausedExecutions,
eq(pausedExecutions.executionId, workflowExecutionLogs.executionId)
)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -314,17 +314,19 @@ export async function GET(request: NextRequest) {
} catch {}
}
const workflowSummary = {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
const workflowSummary = log.workflowId
? {
id: log.workflowId,
name: log.workflowName,
description: log.workflowDescription,
color: log.workflowColor,
folderId: log.workflowFolderId,
userId: log.workflowUserId,
workspaceId: log.workflowWorkspaceId,
createdAt: log.workflowCreatedAt,
updatedAt: log.workflowUpdatedAt,
}
: null
return {
id: log.id,

View File

@@ -72,7 +72,7 @@ export async function GET(request: NextRequest) {
maxTime: sql<string>`MAX(${workflowExecutionLogs.startedAt})`,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -103,8 +103,8 @@ export async function GET(request: NextRequest) {
const statsQuery = await db
.select({
workflowId: workflowExecutionLogs.workflowId,
workflowName: workflow.name,
workflowId: sql<string>`COALESCE(${workflowExecutionLogs.workflowId}, 'deleted')`,
workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
segmentIndex:
sql<number>`FLOOR(EXTRACT(EPOCH FROM (${workflowExecutionLogs.startedAt} - ${startTimeIso}::timestamp)) * 1000 / ${segmentMs})`.as(
'segment_index'
@@ -120,7 +120,7 @@ export async function GET(request: NextRequest) {
),
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.leftJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.innerJoin(
permissions,
and(
@@ -130,7 +130,11 @@ export async function GET(request: NextRequest) {
)
)
.where(whereCondition)
.groupBy(workflowExecutionLogs.workflowId, workflow.name, sql`segment_index`)
.groupBy(
sql`COALESCE(${workflowExecutionLogs.workflowId}, 'deleted')`,
sql`COALESCE(${workflow.name}, 'Deleted Workflow')`,
sql`segment_index`
)
const workflowMap = new Map<
string,

View File

@@ -215,6 +215,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
}
for (const log of logs) {
if (!log.workflowId) continue // Skip logs for deleted workflows
const idx = Math.min(
segments - 1,
Math.max(0, Math.floor((log.startedAt.getTime() - start.getTime()) / segmentMs))

View File

@@ -1,5 +1,9 @@
import { memo } from 'react'
import { cn } from '@/lib/core/utils/cn'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
} from '@/app/workspace/[workspaceId]/logs/utils'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { StatusBar, type StatusBarSegment } from '..'
@@ -61,22 +65,32 @@ export function WorkflowsList({
<div>
{filteredExecutions.map((workflow, idx) => {
const isSelected = expandedWorkflowId === workflow.workflowId
const isDeletedWorkflow = workflow.workflowName === DELETED_WORKFLOW_LABEL
const workflowColor = isDeletedWorkflow
? DELETED_WORKFLOW_COLOR
: workflows[workflow.workflowId]?.color || '#64748b'
const canToggle = !isDeletedWorkflow
return (
<div
key={workflow.workflowId}
className={cn(
'flex h-[44px] cursor-pointer items-center gap-[16px] px-[24px] hover:bg-[var(--surface-3)] dark:hover:bg-[var(--surface-4)]',
'flex h-[44px] items-center gap-[16px] px-[24px] hover:bg-[var(--surface-3)] dark:hover:bg-[var(--surface-4)]',
canToggle ? 'cursor-pointer' : 'cursor-default',
isSelected && 'bg-[var(--surface-3)] dark:bg-[var(--surface-4)]'
)}
onClick={() => onToggleWorkflow(workflow.workflowId)}
onClick={() => {
if (canToggle) {
onToggleWorkflow(workflow.workflowId)
}
}}
>
{/* Workflow name with color */}
<div className='flex w-[160px] flex-shrink-0 items-center gap-[8px] pr-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{
backgroundColor: workflows[workflow.workflowId]?.color || '#64748b',
backgroundColor: workflowColor,
}}
/>
<span className='min-w-0 truncate font-medium text-[12px] text-[var(--text-primary)]'>

View File

@@ -80,6 +80,9 @@ export function ExecutionSnapshot({
}, [executionId, closeMenu])
const workflowState = data?.workflowState as WorkflowState | undefined
const childWorkflowSnapshots = data?.childWorkflowSnapshots as
| Record<string, WorkflowState>
| undefined
const renderContent = () => {
if (isLoading) {
@@ -148,6 +151,7 @@ export function ExecutionSnapshot({
key={executionId}
workflowState={workflowState}
traceSpans={traceSpans}
childWorkflowSnapshots={childWorkflowSnapshots}
className={className}
height={height}
width={width}

View File

@@ -26,6 +26,8 @@ import {
} from '@/app/workspace/[workspaceId]/logs/components'
import { useLogDetailsResize } from '@/app/workspace/[workspaceId]/logs/hooks'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
formatDate,
getDisplayStatus,
StatusBadge,
@@ -386,22 +388,25 @@ export const LogDetails = memo(function LogDetails({
</div>
{/* Workflow Card */}
{log.workflow && (
<div className='flex w-0 min-w-0 flex-1 flex-col gap-[8px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Workflow
</div>
<div className='flex min-w-0 items-center gap-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{ backgroundColor: log.workflow?.color }}
/>
<span className='min-w-0 flex-1 truncate font-medium text-[14px] text-[var(--text-secondary)]'>
{log.workflow.name}
</span>
</div>
<div className='flex w-0 min-w-0 flex-1 flex-col gap-[8px]'>
<div className='font-medium text-[12px] text-[var(--text-tertiary)]'>
Workflow
</div>
)}
<div className='flex min-w-0 items-center gap-[8px]'>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{
backgroundColor:
log.workflow?.color ||
(!log.workflowId ? DELETED_WORKFLOW_COLOR : undefined),
}}
/>
<span className='min-w-0 flex-1 truncate font-medium text-[14px] text-[var(--text-secondary)]'>
{log.workflow?.name ||
(!log.workflowId ? DELETED_WORKFLOW_LABEL : 'Unknown')}
</span>
</div>
</div>
</div>
{/* Execution ID */}

View File

@@ -7,6 +7,8 @@ import { List, type RowComponentProps, useListRef } from 'react-window'
import { Badge, buttonVariants } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import {
DELETED_WORKFLOW_COLOR,
DELETED_WORKFLOW_LABEL,
formatDate,
formatDuration,
getDisplayStatus,
@@ -33,6 +35,11 @@ interface LogRowProps {
const LogRow = memo(
function LogRow({ log, isSelected, onClick, onContextMenu, selectedRowRef }: LogRowProps) {
const formattedDate = useMemo(() => formatDate(log.createdAt), [log.createdAt])
const isDeletedWorkflow = !log.workflow?.id && !log.workflowId
const workflowName = isDeletedWorkflow
? DELETED_WORKFLOW_LABEL
: log.workflow?.name || 'Unknown'
const workflowColor = isDeletedWorkflow ? DELETED_WORKFLOW_COLOR : log.workflow?.color
const handleClick = useCallback(() => onClick(log), [onClick, log])
@@ -78,10 +85,15 @@ const LogRow = memo(
>
<div
className='h-[10px] w-[10px] flex-shrink-0 rounded-[3px]'
style={{ backgroundColor: log.workflow?.color }}
style={{ backgroundColor: workflowColor }}
/>
<span className='min-w-0 truncate font-medium text-[12px] text-[var(--text-primary)]'>
{log.workflow?.name || 'Unknown'}
<span
className={cn(
'min-w-0 truncate font-medium text-[12px]',
isDeletedWorkflow ? 'text-[var(--text-tertiary)]' : 'text-[var(--text-primary)]'
)}
>
{workflowName}
</span>
</div>

View File

@@ -27,6 +27,9 @@ export const LOG_COLUMN_ORDER: readonly LogColumnKey[] = [
'duration',
] as const
export const DELETED_WORKFLOW_LABEL = 'Deleted Workflow'
export const DELETED_WORKFLOW_COLOR = 'var(--text-tertiary)'
export type LogStatus = 'error' | 'pending' | 'running' | 'info' | 'cancelled'
/**

View File

@@ -23,6 +23,7 @@ interface SelectorComboboxProps {
readOnly?: boolean
onOptionChange?: (value: string) => void
allowSearch?: boolean
missingOptionLabel?: string
}
export function SelectorCombobox({
@@ -37,6 +38,7 @@ export function SelectorCombobox({
readOnly,
onOptionChange,
allowSearch = true,
missingOptionLabel,
}: SelectorComboboxProps) {
const [storeValueRaw, setStoreValue] = useSubBlockValue<string | null | undefined>(
blockId,
@@ -60,7 +62,16 @@ export function SelectorCombobox({
detailId: activeValue,
})
const optionMap = useSelectorOptionMap(options, detailOption ?? undefined)
const selectedLabel = activeValue ? (optionMap.get(activeValue)?.label ?? activeValue) : ''
const hasMissingOption =
Boolean(activeValue) &&
Boolean(missingOptionLabel) &&
!isLoading &&
!optionMap.get(activeValue!)
const selectedLabel = activeValue
? hasMissingOption
? missingOptionLabel
: (optionMap.get(activeValue)?.label ?? activeValue)
: ''
const [inputValue, setInputValue] = useState(selectedLabel)
const previousActiveValue = useRef<string | undefined>(activeValue)

View File

@@ -1,6 +1,7 @@
'use client'
import { useMemo } from 'react'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { SelectorCombobox } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/selector-combobox/selector-combobox'
import type { SubBlockConfig } from '@/blocks/types'
import type { SelectorContext } from '@/hooks/selectors/types'
@@ -40,6 +41,7 @@ export function WorkflowSelectorInput({
isPreview={isPreview}
previewValue={previewValue}
placeholder={subBlock.placeholder || 'Select workflow...'}
missingOptionLabel={DELETED_WORKFLOW_LABEL}
/>
)
}

View File

@@ -37,6 +37,7 @@ import {
isSubBlockFeatureEnabled,
isSubBlockVisibleForMode,
} from '@/lib/workflows/subblocks/visibility'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { SubBlock } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components'
import { PreviewContextMenu } from '@/app/workspace/[workspaceId]/w/components/preview/components/preview-context-menu'
import { PreviewWorkflow } from '@/app/workspace/[workspaceId]/w/components/preview/components/preview-workflow'
@@ -693,6 +694,7 @@ interface ExecutionData {
output?: unknown
status?: string
durationMs?: number
childWorkflowSnapshotId?: string
}
interface WorkflowVariable {
@@ -717,6 +719,8 @@ interface PreviewEditorProps {
parallels?: Record<string, Parallel>
/** When true, shows "Not Executed" badge if no executionData is provided */
isExecutionMode?: boolean
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
childWorkflowSnapshots?: Record<string, WorkflowState>
/** Optional close handler - if not provided, no close button is shown */
onClose?: () => void
/** Callback to drill down into a nested workflow block */
@@ -742,6 +746,7 @@ function PreviewEditorContent({
loops,
parallels,
isExecutionMode = false,
childWorkflowSnapshots,
onClose,
onDrillDown,
}: PreviewEditorProps) {
@@ -771,17 +776,35 @@ function PreviewEditorContent({
const { data: childWorkflowState, isLoading: isLoadingChildWorkflow } = useWorkflowState(
childWorkflowId ?? undefined
)
const childWorkflowSnapshotId = executionData?.childWorkflowSnapshotId
const childWorkflowSnapshotState = childWorkflowSnapshotId
? childWorkflowSnapshots?.[childWorkflowSnapshotId]
: undefined
const resolvedChildWorkflowState = isExecutionMode
? childWorkflowSnapshotState
: childWorkflowState
const resolvedIsLoadingChildWorkflow = isExecutionMode ? false : isLoadingChildWorkflow
const isMissingChildWorkflow =
Boolean(childWorkflowId) && !resolvedIsLoadingChildWorkflow && !resolvedChildWorkflowState
/** Drills down into the child workflow or opens it in a new tab */
const handleExpandChildWorkflow = useCallback(() => {
if (!childWorkflowId || !childWorkflowState) return
if (!childWorkflowId) return
if (isExecutionMode && onDrillDown) {
onDrillDown(block.id, childWorkflowState)
if (!childWorkflowSnapshotState) return
onDrillDown(block.id, childWorkflowSnapshotState)
} else if (workspaceId) {
window.open(`/workspace/${workspaceId}/w/${childWorkflowId}`, '_blank', 'noopener,noreferrer')
}
}, [childWorkflowId, childWorkflowState, isExecutionMode, onDrillDown, block.id, workspaceId])
}, [
childWorkflowId,
childWorkflowSnapshotState,
isExecutionMode,
onDrillDown,
block.id,
workspaceId,
])
const contentRef = useRef<HTMLDivElement>(null)
const subBlocksRef = useRef<HTMLDivElement>(null)
@@ -1347,7 +1370,7 @@ function PreviewEditorContent({
Workflow Preview
</div>
<div className='relative h-[160px] overflow-hidden rounded-[4px] border border-[var(--border)]'>
{isLoadingChildWorkflow ? (
{resolvedIsLoadingChildWorkflow ? (
<div className='flex h-full items-center justify-center bg-[var(--surface-3)]'>
<div
className='h-[18px] w-[18px] animate-spin rounded-full'
@@ -1360,11 +1383,11 @@ function PreviewEditorContent({
}}
/>
</div>
) : childWorkflowState ? (
) : resolvedChildWorkflowState ? (
<>
<div className='[&_*:active]:!cursor-grabbing [&_*]:!cursor-grab [&_.react-flow__handle]:!hidden h-full w-full'>
<PreviewWorkflow
workflowState={childWorkflowState}
workflowState={resolvedChildWorkflowState}
height={160}
width='100%'
isPannable={true}
@@ -1396,7 +1419,9 @@ function PreviewEditorContent({
) : (
<div className='flex h-full items-center justify-center bg-[var(--surface-3)]'>
<span className='text-[13px] text-[var(--text-tertiary)]'>
Unable to load preview
{isMissingChildWorkflow
? DELETED_WORKFLOW_LABEL
: 'Unable to load preview'}
</span>
</div>
)}

View File

@@ -9,6 +9,7 @@ import {
isSubBlockFeatureEnabled,
isSubBlockVisibleForMode,
} from '@/lib/workflows/subblocks/visibility'
import { DELETED_WORKFLOW_LABEL } from '@/app/workspace/[workspaceId]/logs/utils'
import { getDisplayValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/workflow-block/workflow-block'
import { getBlock } from '@/blocks'
import { SELECTOR_TYPES_HYDRATION_REQUIRED, type SubBlockConfig } from '@/blocks/types'
@@ -112,7 +113,7 @@ function resolveWorkflowName(
if (!rawValue || typeof rawValue !== 'string') return null
const workflowMap = useWorkflowRegistry.getState().workflows
return workflowMap[rawValue]?.name ?? null
return workflowMap[rawValue]?.name ?? DELETED_WORKFLOW_LABEL
}
/**

View File

@@ -19,6 +19,8 @@ interface TraceSpan {
status?: string
duration?: number
children?: TraceSpan[]
childWorkflowSnapshotId?: string
childWorkflowId?: string
}
interface BlockExecutionData {
@@ -28,6 +30,7 @@ interface BlockExecutionData {
durationMs: number
/** Child trace spans for nested workflow blocks */
children?: TraceSpan[]
childWorkflowSnapshotId?: string
}
/** Represents a level in the workflow navigation stack */
@@ -90,6 +93,7 @@ export function buildBlockExecutions(spans: TraceSpan[]): Record<string, BlockEx
status: span.status || 'unknown',
durationMs: span.duration || 0,
children: span.children,
childWorkflowSnapshotId: span.childWorkflowSnapshotId,
}
}
}
@@ -104,6 +108,8 @@ interface PreviewProps {
traceSpans?: TraceSpan[]
/** Pre-computed block executions (optional - will be built from traceSpans if not provided) */
blockExecutions?: Record<string, BlockExecutionData>
/** Child workflow snapshots keyed by snapshot ID (execution mode only) */
childWorkflowSnapshots?: Record<string, WorkflowState>
/** Additional CSS class names */
className?: string
/** Height of the component */
@@ -136,6 +142,7 @@ export function Preview({
workflowState: rootWorkflowState,
traceSpans: rootTraceSpans,
blockExecutions: providedBlockExecutions,
childWorkflowSnapshots,
className,
height = '100%',
width = '100%',
@@ -287,6 +294,7 @@ export function Preview({
loops={workflowState.loops}
parallels={workflowState.parallels}
isExecutionMode={isExecutionMode}
childWorkflowSnapshots={childWorkflowSnapshots}
onClose={handleEditorClose}
onDrillDown={handleDrillDown}
/>

View File

@@ -6,6 +6,7 @@ interface ChildWorkflowErrorOptions {
childWorkflowName: string
childTraceSpans?: TraceSpan[]
executionResult?: ExecutionResult
childWorkflowSnapshotId?: string
cause?: Error
}
@@ -16,6 +17,7 @@ export class ChildWorkflowError extends Error {
readonly childTraceSpans: TraceSpan[]
readonly childWorkflowName: string
readonly executionResult?: ExecutionResult
readonly childWorkflowSnapshotId?: string
constructor(options: ChildWorkflowErrorOptions) {
super(options.message, { cause: options.cause })
@@ -23,6 +25,7 @@ export class ChildWorkflowError extends Error {
this.childWorkflowName = options.childWorkflowName
this.childTraceSpans = options.childTraceSpans ?? []
this.executionResult = options.executionResult
this.childWorkflowSnapshotId = options.childWorkflowSnapshotId
}
static isChildWorkflowError(error: unknown): error is ChildWorkflowError {

View File

@@ -237,6 +237,9 @@ export class BlockExecutor {
if (ChildWorkflowError.isChildWorkflowError(error)) {
errorOutput.childTraceSpans = error.childTraceSpans
errorOutput.childWorkflowName = error.childWorkflowName
if (error.childWorkflowSnapshotId) {
errorOutput.childWorkflowSnapshotId = error.childWorkflowSnapshotId
}
}
this.state.setBlockOutput(node.id, errorOutput, duration)

View File

@@ -198,6 +198,7 @@ describe('WorkflowBlockHandler', () => {
expect(result).toEqual({
success: true,
childWorkflowId: 'child-id',
childWorkflowName: 'Child Workflow',
result: { data: 'test result' },
childTraceSpans: [],
@@ -235,6 +236,7 @@ describe('WorkflowBlockHandler', () => {
expect(result).toEqual({
success: true,
childWorkflowId: 'child-id',
childWorkflowName: 'Child Workflow',
result: { nested: 'data' },
childTraceSpans: [],

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
import { buildTraceSpans } from '@/lib/logs/execution/trace-spans/trace-spans'
import type { TraceSpan } from '@/lib/logs/types'
import type { BlockOutput } from '@/blocks/types'
@@ -57,6 +58,7 @@ export class WorkflowBlockHandler implements BlockHandler {
const workflowMetadata = workflows[workflowId]
let childWorkflowName = workflowMetadata?.name || workflowId
let childWorkflowSnapshotId: string | undefined
try {
const currentDepth = (ctx.workflowId?.split('_sub_').length || 1) - 1
if (currentDepth >= DEFAULTS.MAX_WORKFLOW_DEPTH) {
@@ -107,6 +109,12 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowInput = inputs.input
}
const childSnapshotResult = await snapshotService.createSnapshotWithDeduplication(
workflowId,
childWorkflow.workflowState
)
childWorkflowSnapshotId = childSnapshotResult.snapshot.id
const subExecutor = new Executor({
workflow: childWorkflow.serializedState,
workflowInput: childWorkflowInput,
@@ -139,7 +147,8 @@ export class WorkflowBlockHandler implements BlockHandler {
workflowId,
childWorkflowName,
duration,
childTraceSpans
childTraceSpans,
childWorkflowSnapshotId
)
return mappedResult
@@ -172,6 +181,7 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowName,
childTraceSpans,
executionResult,
childWorkflowSnapshotId,
cause: error instanceof Error ? error : undefined,
})
}
@@ -279,6 +289,10 @@ export class WorkflowBlockHandler implements BlockHandler {
)
const workflowVariables = (workflowData.variables as Record<string, any>) || {}
const workflowStateWithVariables = {
...workflowState,
variables: workflowVariables,
}
if (Object.keys(workflowVariables).length > 0) {
logger.info(
@@ -290,6 +304,7 @@ export class WorkflowBlockHandler implements BlockHandler {
name: workflowData.name,
serializedState: serializedWorkflow,
variables: workflowVariables,
workflowState: workflowStateWithVariables,
rawBlocks: workflowState.blocks,
}
}
@@ -358,11 +373,16 @@ export class WorkflowBlockHandler implements BlockHandler {
)
const workflowVariables = (wfData?.variables as Record<string, any>) || {}
const workflowStateWithVariables = {
...deployedState,
variables: workflowVariables,
}
return {
name: wfData?.name || DEFAULTS.WORKFLOW_NAME,
serializedState: serializedWorkflow,
variables: workflowVariables,
workflowState: workflowStateWithVariables,
rawBlocks: deployedState.blocks,
}
}
@@ -504,7 +524,8 @@ export class WorkflowBlockHandler implements BlockHandler {
childWorkflowId: string,
childWorkflowName: string,
duration: number,
childTraceSpans?: WorkflowTraceSpan[]
childTraceSpans?: WorkflowTraceSpan[],
childWorkflowSnapshotId?: string
): BlockOutput {
const success = childResult.success !== false
const result = childResult.output || {}
@@ -515,12 +536,15 @@ export class WorkflowBlockHandler implements BlockHandler {
message: `"${childWorkflowName}" failed: ${childResult.error || 'Child workflow execution failed'}`,
childWorkflowName,
childTraceSpans: childTraceSpans || [],
childWorkflowSnapshotId,
})
}
return {
success: true,
childWorkflowName,
childWorkflowId,
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
result,
childTraceSpans: childTraceSpans || [],
} as Record<string, any>

View File

@@ -210,6 +210,7 @@ export interface ExecutionSnapshotData {
executionId: string
workflowId: string
workflowState: Record<string, unknown>
childWorkflowSnapshots?: Record<string, Record<string, unknown>>
executionMetadata: {
trigger: string
startedAt: string

View File

@@ -50,6 +50,8 @@ function prepareLogData(
export async function emitWorkflowExecutionCompleted(log: WorkflowExecutionLog): Promise<void> {
try {
if (!log.workflowId) return
const workflowData = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)

View File

@@ -293,7 +293,10 @@ export class ExecutionLogger implements IExecutionLoggerService {
}
try {
const [wf] = await db.select().from(workflow).where(eq(workflow.id, updatedLog.workflowId))
// Skip workflow lookup if workflow was deleted
const wf = updatedLog.workflowId
? (await db.select().from(workflow).where(eq(workflow.id, updatedLog.workflowId)))[0]
: undefined
if (wf) {
const [usr] = await db
.select({ id: userTable.id, email: userTable.email, name: userTable.name })
@@ -461,7 +464,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
* Maintains same logic as original execution logger for billing consistency
*/
private async updateUserStats(
workflowId: string,
workflowId: string | null,
costSummary: {
totalCost: number
totalInputCost: number
@@ -494,6 +497,11 @@ export class ExecutionLogger implements IExecutionLoggerService {
return
}
if (!workflowId) {
logger.debug('Workflow was deleted, skipping user stats update')
return
}
try {
// Get the workflow record to get the userId
const [workflowRecord] = await db

View File

@@ -1,8 +1,8 @@
import { createHash } from 'crypto'
import { db } from '@sim/db'
import { workflowExecutionSnapshots } from '@sim/db/schema'
import { workflowExecutionLogs, workflowExecutionSnapshots } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, lt } from 'drizzle-orm'
import { and, eq, lt, notExists } from 'drizzle-orm'
import { v4 as uuidv4 } from 'uuid'
import type {
SnapshotService as ISnapshotService,
@@ -121,7 +121,17 @@ export class SnapshotService implements ISnapshotService {
const deletedSnapshots = await db
.delete(workflowExecutionSnapshots)
.where(lt(workflowExecutionSnapshots.createdAt, cutoffDate))
.where(
and(
lt(workflowExecutionSnapshots.createdAt, cutoffDate),
notExists(
db
.select({ id: workflowExecutionLogs.id })
.from(workflowExecutionLogs)
.where(eq(workflowExecutionLogs.stateSnapshotId, workflowExecutionSnapshots.id))
)
)
)
.returning({ id: workflowExecutionSnapshots.id })
const deletedCount = deletedSnapshots.length

View File

@@ -112,6 +112,26 @@ export function buildTraceSpans(result: ExecutionResult): {
const duration = log.durationMs || 0
let output = log.output || {}
let childWorkflowSnapshotId: string | undefined
let childWorkflowId: string | undefined
if (output && typeof output === 'object') {
const outputRecord = output as Record<string, unknown>
childWorkflowSnapshotId =
typeof outputRecord.childWorkflowSnapshotId === 'string'
? outputRecord.childWorkflowSnapshotId
: undefined
childWorkflowId =
typeof outputRecord.childWorkflowId === 'string' ? outputRecord.childWorkflowId : undefined
if (childWorkflowSnapshotId || childWorkflowId) {
const {
childWorkflowSnapshotId: _childSnapshotId,
childWorkflowId: _childWorkflowId,
...outputRest
} = outputRecord
output = outputRest
}
}
if (log.error) {
output = {
@@ -134,6 +154,8 @@ export function buildTraceSpans(result: ExecutionResult): {
blockId: log.blockId,
input: log.input || {},
output: output,
...(childWorkflowSnapshotId ? { childWorkflowSnapshotId } : {}),
...(childWorkflowId ? { childWorkflowId } : {}),
...(log.loopId && { loopId: log.loopId }),
...(log.parallelId && { parallelId: log.parallelId }),
...(log.iterationIndex !== undefined && { iterationIndex: log.iterationIndex }),

View File

@@ -69,7 +69,7 @@ export interface ExecutionStatus {
export interface WorkflowExecutionSnapshot {
id: string
workflowId: string
workflowId: string | null
stateHash: string
stateData: WorkflowState
createdAt: string
@@ -80,7 +80,7 @@ export type WorkflowExecutionSnapshotSelect = WorkflowExecutionSnapshot
export interface WorkflowExecutionLog {
id: string
workflowId: string
workflowId: string | null
executionId: string
stateSnapshotId: string
level: 'info' | 'error'
@@ -178,6 +178,8 @@ export interface TraceSpan {
blockId?: string
input?: Record<string, unknown>
output?: Record<string, unknown>
childWorkflowSnapshotId?: string
childWorkflowId?: string
model?: string
cost?: {
input?: number

View File

@@ -102,7 +102,7 @@ export interface TraceSpan {
export interface WorkflowLog {
id: string
workflowId: string
workflowId: string | null
executionId?: string | null
deploymentVersion?: number | null
deploymentVersionName?: string | null

View File

@@ -0,0 +1,8 @@
ALTER TABLE "workflow_execution_logs" DROP CONSTRAINT "workflow_execution_logs_workflow_id_workflow_id_fk";
--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" DROP CONSTRAINT "workflow_execution_snapshots_workflow_id_workflow_id_fk";
--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ALTER COLUMN "workflow_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" ALTER COLUMN "workflow_id" DROP NOT NULL;--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" ADD CONSTRAINT "workflow_execution_snapshots_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE set null ON UPDATE no action;

File diff suppressed because it is too large Load Diff

View File

@@ -1037,6 +1037,13 @@
"when": 1769626313827,
"tag": "0148_aberrant_venom",
"breakpoints": true
},
{
"idx": 149,
"version": "7",
"when": 1769656977701,
"tag": "0149_next_cerise",
"breakpoints": true
}
]
}

View File

@@ -268,9 +268,7 @@ export const workflowExecutionSnapshots = pgTable(
'workflow_execution_snapshots',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
stateHash: text('state_hash').notNull(),
stateData: jsonb('state_data').notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
@@ -290,9 +288,7 @@ export const workflowExecutionLogs = pgTable(
'workflow_execution_logs',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
workflowId: text('workflow_id').references(() => workflow.id, { onDelete: 'set null' }),
workspaceId: text('workspace_id')
.notNull()
.references(() => workspace.id, { onDelete: 'cascade' }),