Startpos stuff

This commit is contained in:
Siddharth Ganesan
2025-08-29 11:54:06 -07:00
parent de06e8c35c
commit 4bba1eb8f6
4 changed files with 305 additions and 26 deletions

View File

@@ -12,6 +12,7 @@ import {
Square,
X,
Flag,
Undo2,
} from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Checkbox } from '@/components/ui/checkbox'
@@ -33,6 +34,7 @@ import { mergeSubblockState } from '@/stores/workflows/utils'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import { getTool } from '@/tools/utils'
import { getTrigger, getTriggersByProvider } from '@/triggers'
import { useDebugSnapshotStore } from '@/stores/execution/debug-snapshots/store'
export function DebugPanel() {
const {
@@ -104,6 +106,227 @@ export function DebugPanel() {
[forwardAdj]
)
const handleRevertToStartPos = () => {
const snapshot = useDebugSnapshotStore.getState()
if (!debugContext) return
const startIds = Array.from(startPositionIds)
if (startIds.length !== 1) {
// Require exactly one start position to revert deterministically
return
}
// Build a fresh context based on last snapshot, resolving everything except downstream of start positions
try {
const newCtx = { ...debugContext }
// Reset executed flags for all blocks
newCtx.executedBlocks = new Set<string>()
newCtx.activeExecutionPath = new Set<string>()
// Reapply environment/workflow vars from snapshot if present
if (snapshot.envVarValues) newCtx.environmentVariables = snapshot.envVarValues
if (snapshot.workflowVariables) newCtx.workflowVariables = snapshot.workflowVariables
// Rebuild blockStates from snapshot outputs
const rebuilt = new Map(debugContext.blockStates)
// Mark everything as executed initially based on snapshot
snapshot.blockSnapshots.forEach((snap, id) => {
rebuilt.set(id, {
output: snap.output,
executed: true,
executionTime: snap.executionTime ?? 0,
})
})
// Compute all downstream of start positions; they will become pending and not executed
const downstream = new Set<string>()
const queue = [...startIds]
const visited = new Set<string>()
while (queue.length) {
const n = queue.shift() as string
if (visited.has(n)) continue
visited.add(n)
const next = forwardAdj[n] || []
for (const m of next) {
if (!visited.has(m)) {
downstream.add(m)
queue.push(m)
}
}
}
// Unset executed for start positions and their downstream; keep outputs from snapshot but mark not executed
const startSet = new Set(startIds)
for (const id of startIds) {
const prev = rebuilt.get(id)
rebuilt.set(id, { output: prev?.output ?? {}, executed: false, executionTime: 0 })
}
downstream.forEach((id) => {
const prev = rebuilt.get(id)
rebuilt.set(id, { output: prev?.output ?? {}, executed: false, executionTime: 0 })
})
// Active execution path should include all nodes reachable from any start
const path = new Set<string>()
const q2 = [...startIds]
const seen2 = new Set<string>()
while (q2.length) {
const n = q2.shift() as string
if (seen2.has(n)) continue
seen2.add(n)
path.add(n)
const next = forwardAdj[n] || []
for (const m of next) if (!seen2.has(m)) q2.push(m)
}
newCtx.activeExecutionPath = path
// Set executedBlocks set to upstream executed nodes (not in start or downstream)
const executedSet = new Set<string>()
snapshot.blockSnapshots.forEach((_snap, id) => {
if (!startSet.has(id) && !downstream.has(id)) executedSet.add(id)
})
newCtx.executedBlocks = executedSet
// Active execution path: union of downstream from starts and downstream from executed upstream nodes
const pathFromStarts = path
const pathFromExecuted = new Set<string>()
const q3 = Array.from(executedSet)
const seen3 = new Set<string>()
while (q3.length) {
const n = q3.shift() as string
if (seen3.has(n)) continue
seen3.add(n)
pathFromExecuted.add(n)
const next = forwardAdj[n] || []
for (const m of next) if (!seen3.has(m)) q3.push(m)
}
newCtx.activeExecutionPath = new Set<string>([...pathFromStarts, ...pathFromExecuted])
// Compute minimal ready layer if any start is not ready (due to unmet dependencies)
const reverseAdj: Record<string, string[]> = {}
for (const e of edgesList) {
const s = (e as any).source
const t = (e as any).target
if (!reverseAdj[t]) reverseAdj[t] = []
reverseAdj[t].push(s)
}
const isExecuted = (id: string) => (rebuilt.get(id)?.executed ? true : false)
const memoReady = new Map<string, Set<string>>()
const visiting = new Set<string>()
const collectReadyLayerFor = (id: string): Set<string> => {
if (memoReady.has(id)) return memoReady.get(id) as Set<string>
if (visiting.has(id)) return new Set<string>()
visiting.add(id)
const incoming = reverseAdj[id] || []
if (incoming.length === 0) {
const base = isExecuted(id) ? new Set<string>() : new Set<string>([id])
memoReady.set(id, base)
visiting.delete(id)
return base
}
let allParentsExecuted = true
const need: Set<string> = new Set()
for (const src of incoming) {
if (!isExecuted(src)) {
allParentsExecuted = false
const sub = collectReadyLayerFor(src)
sub.forEach((n) => need.add(n))
}
}
const result = allParentsExecuted && !isExecuted(id) ? new Set<string>([id]) : need
memoReady.set(id, result)
visiting.delete(id)
return result
}
const initialReady = new Set<string>()
for (const sid of startIds) {
const ready = collectReadyLayerFor(sid)
ready.forEach((n) => initialReady.add(n))
}
// If there is a non-empty ready layer different from direct starts, use it as pending and include in path
let initialPending = startIds
if (initialReady.size > 0) {
initialPending = Array.from(initialReady)
for (const n of initialReady) newCtx.activeExecutionPath.add(n)
}
// Depth-based reset: reset all branches to the same depth as initialPending
// Compute BFS depth map from starter
const depthMap = new Map<string, number>()
const qd: string[] = []
if (starterId) {
depthMap.set(starterId, 0)
qd.push(starterId)
}
while (qd.length) {
const u = qd.shift() as string
const du = depthMap.get(u) || 0
const nexts = forwardAdj[u] || []
for (const v of nexts) {
if (!depthMap.has(v)) {
depthMap.set(v, du + 1)
qd.push(v)
}
}
}
// Determine target depth from initialPending (fallback to min depth of startIds)
const idsForDepth = initialPending.length > 0 ? initialPending : startIds
let targetDepth = Infinity
for (const nid of idsForDepth) {
const d = depthMap.get(nid)
if (d !== undefined && d < targetDepth) targetDepth = d
}
if (!Number.isFinite(targetDepth)) targetDepth = 0
// Apply depth-based execution state across all branches
const newExecutedSet = new Set<string>()
rebuilt.forEach((_state, id) => {
const d = depthMap.get(id)
const snap = snapshot.blockSnapshots.get(id)
const shouldBeExecuted = !!snap?.executed && (d === undefined ? false : d < (targetDepth as number))
const prev = rebuilt.get(id)
rebuilt.set(id, {
output: prev?.output ?? snap?.output ?? {},
executed: shouldBeExecuted,
executionTime: shouldBeExecuted ? (prev?.executionTime ?? snap?.executionTime ?? 0) : 0,
} as any)
if (shouldBeExecuted) newExecutedSet.add(id)
})
newCtx.executedBlocks = newExecutedSet
// Active execution path: all nodes at or beyond target depth
const futurePath = new Set<string>()
depthMap.forEach((d, id) => {
if (d >= (targetDepth as number)) futurePath.add(id)
})
newCtx.activeExecutionPath = new Set<string>([...newCtx.activeExecutionPath, ...futurePath])
// Compute global ready layer at target depth so parallel branches at same depth are included
const globalReady: string[] = []
depthMap.forEach((d, id) => {
if (d === (targetDepth as number)) {
const parents = reverseAdj[id] || []
const allParentsExec = parents.every((p) => (rebuilt.get(p)?.executed ? true : false))
if (allParentsExec) globalReady.push(id)
}
})
// Pending selection: prefer global ready layer if available; else minimal chain-based ready
const pendingSelection = globalReady.length > 0 ? globalReady : initialPending
// Apply rebuilt block states and clear parallel mapping current id
newCtx.blockStates = rebuilt as any
newCtx.currentVirtualBlockId = undefined
// Pending are either the global layer or the minimal ready layer/start positions
setPendingBlocks(pendingSelection)
setDebugContext(newCtx)
// Also update panel focus to first pending for clarity
setPanelFocusedBlockId(pendingSelection[0] || null)
} catch {}
}
// Helper to format strings with clickable var/env tokens
const renderWithTokens = (text: string, options?: { truncateAt?: number }) => {
const truncateAt = options?.truncateAt
@@ -1470,6 +1693,21 @@ export function DebugPanel() {
</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Button
size='icon'
variant='ghost'
onClick={handleRevertToStartPos}
aria-label='Revert to Start Pos'
className='h-8 w-8 rounded-md bg-purple-500/10 text-purple-600 hover:bg-purple-500/20'
>
<Undo2 className='h-4 w-4' />
</Button>
</TooltipTrigger>
<TooltipContent>Revert to Start Position</TooltipContent>
</Tooltip>
<Tooltip>
<TooltipTrigger asChild>
<Button
@@ -1625,8 +1863,7 @@ export function DebugPanel() {
// Check logs
const errorLog = debugContext?.blockLogs?.find(
(log) =>
(log.blockId === id || resolveOriginalBlockId(log.blockId) === id) &&
!log.success
(log.blockId === id || resolveOriginalBlockId(log.blockId) === id) && !log.success
)
if (errorLog?.error) {
return (

View File

@@ -17,7 +17,7 @@ import { useGeneralStore } from '@/stores/settings/general/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { useCurrentWorkflow } from './use-current-workflow'
import { BlockPathCalculator } from '@/lib/block-path-calculator'
import { useDebugSnapshotStore } from '@/stores/execution/debug-snapshots/store'
const logger = createLogger('useWorkflowExecution')
@@ -157,6 +157,10 @@ export function useWorkflowExecution() {
// Update debug context and pending blocks
if (result.metadata?.context) {
setDebugContext(result.metadata.context)
// Capture snapshot for revert
try {
useDebugSnapshotStore.getState().captureFromContext(result.metadata.context as any)
} catch {}
}
if (result.metadata?.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
@@ -475,14 +479,13 @@ export function useWorkflowExecution() {
const result = await executeWorkflow(workflowInput, undefined, executionId, enableDebug)
if (result && 'metadata' in result && result.metadata?.isDebugSession) {
setDebugContext(result.metadata.context || null)
if (result.metadata.pendingBlocks) {
// If start positions are set, override pending blocks with those
if (startPositionIds && startPositionIds.size > 0) {
const next = Array.from(startPositionIds)
setPendingBlocks(next)
} else {
setPendingBlocks(result.metadata.pendingBlocks)
try {
if (result.metadata?.context) {
useDebugSnapshotStore.getState().captureFromContext(result.metadata.context as any)
}
} catch {}
if (result.metadata.pendingBlocks) {
setPendingBlocks(result.metadata.pendingBlocks)
}
} else if (result && 'success' in result) {
setExecutionResult(result)
@@ -693,15 +696,8 @@ export function useWorkflowExecution() {
// Execute workflow
const execResult = await newExecutor.execute(activeWorkflowId || '')
// If we have start positions for debug, update context/pending accordingly
if (
debugRequested === true &&
execResult &&
'metadata' in execResult &&
(execResult as any).metadata?.isDebugSession &&
startPositionIds &&
startPositionIds.size > 0
) {
// If we have start positions for debug, update context/pending accordingly (only on explicit user revert, not on restart)
if (false) {
try {
const ctx = (execResult as any).metadata.context
// Build forward adjacency from serialized connections
@@ -730,6 +726,9 @@ export function useWorkflowExecution() {
setPendingBlocks(Array.from(startPositionIds))
;(execResult as any).metadata.pendingBlocks = Array.from(startPositionIds)
setDebugContext(ctx)
try {
useDebugSnapshotStore.getState().captureFromContext(ctx as any)
} catch {}
} catch (e) {
logger.warn('Failed to apply start positions', e)
}

View File

@@ -0,0 +1,46 @@
import { create } from 'zustand'
import type { ExecutionContext } from '@/executor/types'
interface BlockSnapshot {
output: any
executed: boolean
executionTime?: number
}
interface DebugSnapshotState {
blockSnapshots: Map<string, BlockSnapshot>
envVarValues?: Record<string, string>
workflowVariables?: Record<string, any>
}
interface DebugSnapshotActions {
captureFromContext: (ctx: ExecutionContext) => void
clear: () => void
}
export const useDebugSnapshotStore = create<DebugSnapshotState & DebugSnapshotActions>()((set) => ({
blockSnapshots: new Map<string, BlockSnapshot>(),
envVarValues: undefined,
workflowVariables: undefined,
captureFromContext: (ctx: ExecutionContext) => {
const next = new Map<string, BlockSnapshot>()
try {
ctx.blockStates.forEach((state, key) => {
next.set(String(key), {
output: state?.output ?? {},
executed: !!state?.executed,
executionTime: state?.executionTime,
})
})
} catch {}
set({
blockSnapshots: next,
envVarValues: ctx.environmentVariables || undefined,
workflowVariables: ctx.workflowVariables || undefined,
})
},
clear: () => set({ blockSnapshots: new Map(), envVarValues: undefined, workflowVariables: undefined }),
}))

View File

@@ -65,15 +65,12 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
setExecutingBlockIds: (ids) => set({ executingBlockIds: new Set(ids) }),
setBreakpointId: (id) => set({ breakpointId: id }),
setStartPositions: (ids) => set({ startPositionIds: new Set(ids) }),
setStartPositions: (ids) => set({ startPositionIds: new Set(Array.from(ids).slice(0, 1)) }),
toggleStartPosition: (id) => {
set((state) => {
const next = new Set(state.startPositionIds)
if (next.has(id)) {
next.delete(id)
} else {
next.add(id)
}
const isActive = state.startPositionIds.has(id)
// Enforce single selection
const next = isActive ? new Set<string>() : new Set<string>([id])
return { startPositionIds: next }
})
},