Compare commits

...

8 Commits

Author SHA1 Message Date
Vikhyath Mondreti
e2e86a7b19 fix(snapshot): consolidate to use hasWorkflowChanges check 2026-01-28 13:23:57 -08:00
Vikhyath Mondreti
78410eef84 improvement(inputs): sanitize trigger inputs better (#3047) 2026-01-28 12:57:20 -08:00
Siddharth Ganesan
655fe4f3b7 feat(executor): run from/until block (#3029)
* Run from block

* Fixes

* Fix

* Fix

* Minor improvements

* Fix

* Fix trace spans

* Fix loop logs

* Change ordering

* Run until block

* Lint

* Clean up

* Fix

* Allow run from block for triggers

* Consolidation

* Fix lint

* Fix

* Fix mock payload

* Fix

* Fix trigger clear snapshot

* Fix loops and parallels

* Fix

* Cleanup

* Fix test

* Fix bugs

* Catch error

* Fix

* Fix

* I think it works??

* Fix

* Fix

* Add tests

* Fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-28 12:53:23 -08:00
Waleed
72a2f79701 improvement(search-modal): add quick navigation items and fix cmdk value uniqueness (#3050)
* improvement(search-modal): add quick navigation items and fix cmdk value uniqueness

* rerank
2026-01-28 12:39:00 -08:00
Waleed
2c2b485f81 fix(workflow): update container dimensions on keyboard movement (#3043)
* fix(workflow): update container dimensions on keyboard movement

* fix(workflow): avoid duplicate container updates during drag

Add !change.dragging check to only handle keyboard movements in
onNodesChange, since mouse drags are already handled by onNodeDrag.

* fix(workflow): persist keyboard movements to backend

Keyboard arrow key movements now call collaborativeBatchUpdatePositions
to sync position changes to the backend for persistence and real-time
collaboration.

* improvement(cmdk): refactor search modal to use cmdk + fix icon SVG IDs (#3044)

* improvement(cmdk): refactor search modal to use cmdk + fix icon SVG IDs

* chore: remove unrelated workflow.tsx changes

* chore: remove comments

* chore: add devtools middleware to search modal store

* fix: allow search data re-initialization when permissions change

* fix: include keywords in search filter + show service name in tool operations

* fix: correct filterBlocks type signature

* fix: move generic to function parameter position

* fix(mcp): correct event handler type for onInput

* perf: always render command palette for instant opening

* fix: clear search input when modal reopens

* fix(helm): move rotationPolicy under privateKey for cert-manager compatibility (#3046)

* fix(helm): move rotationPolicy under privateKey for cert-manager compatibility

* docs(helm): add reclaimPolicy Retain guidance for production database storage

* fix(helm): prevent empty branding ConfigMap creation

* fix(workflow): avoid duplicate position updates on drag end

Check isInDragOperation before persisting in onNodesChange to prevent
duplicate calls. Drag-end events have dragStartPosition still set,
while keyboard movements don't, allowing proper distinction.
2026-01-28 12:31:38 -08:00
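A minimal sketch of the keyboard-movement handling described in the commit above, assuming React Flow's NodeChange union; collaborativeBatchUpdatePositions is the helper name used in this PR, but its signature here is inferred from the diff, and the guard details are illustrative rather than the exact implementation (which appears in the workflow.tsx hunk further below):

// Sketch: detect keyboard (non-drag) position changes and persist them in one batch.
import type { NodeChange } from 'reactflow'

function persistKeyboardMoves(
  changes: NodeChange[],
  isInDragOperation: boolean, // true while a mouse drag is active (drag-end included)
  persist: (updates: Array<{ id: string; position: { x: number; y: number } }>) => void
) {
  const updates: Array<{ id: string; position: { x: number; y: number } }> = []
  for (const change of changes) {
    // Mouse drags report dragging: true and are handled by onNodeDrag/onNodeDragStop;
    // keyboard arrow moves arrive as position changes with dragging falsy.
    if (change.type === 'position' && !change.dragging && change.position && !isInDragOperation) {
      updates.push({ id: change.id, position: change.position })
    }
  }
  if (updates.length > 0) persist(updates)
}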
Siddharth Ganesan
01e0723a3a fix(loops): fix loops on empty collection (#3049)
* Fix

* Cleanup

* order of ops for validations

* only reachable subflow nodes should hit validation

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-28 12:16:36 -08:00
Waleed
6814f33243 fix(helm): move rotationPolicy under privateKey for cert-manager compatibility (#3046)
* fix(helm): move rotationPolicy under privateKey for cert-manager compatibility

* docs(helm): add reclaimPolicy Retain guidance for production database storage

* fix(helm): prevent empty branding ConfigMap creation
2026-01-28 10:51:19 -08:00
Waleed
304cf717a4 improvement(cmdk): refactor search modal to use cmdk + fix icon SVG IDs (#3044)
* improvement(cmdk): refactor search modal to use cmdk + fix icon SVG IDs

* chore: remove unrelated workflow.tsx changes

* chore: remove comments

* chore: add devtools middleware to search modal store

* fix: allow search data re-initialization when permissions change

* fix: include keywords in search filter + show service name in tool operations

* fix: correct filterBlocks type signature

* fix: move generic to function parameter position

* fix(mcp): correct event handler type for onInput

* perf: always render command palette for instant opening

* fix: clear search input when modal reopens
2026-01-28 10:38:09 -08:00
45 changed files with 4678 additions and 1747 deletions

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,216 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
input: z.any().optional(),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4()
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
executionId,
workflowId,
controller,
isStreamClosed: () => isStreamClosed,
setStreamClosed: () => {
isStreamClosed = true
},
})
const metadata: ExecutionMetadata = {
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: { startTime: startTime.toISOString() },
})
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: { onBlockStart, onBlockComplete, onStream },
})
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {}
}
}
},
cancel() {
isStreamClosed = true
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}
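A hedged sketch of a client calling this route. The URL path is an assumption (the dump does not show the file's location, only that the handler takes an { id } param), and the minimal sourceSnapshot simply mirrors ExecuteFromBlockSchema above; a real caller would pass the snapshot captured from a prior run:

// Assumed path; adjust to the real route location.
async function runFromBlock(workflowId: string, startBlockId: string): Promise<void> {
  const res = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({
      startBlockId,
      // Minimal shape that satisfies ExecuteFromBlockSchema.
      sourceSnapshot: {
        blockStates: {},
        executedBlocks: [],
        blockLogs: [],
        decisions: { router: {}, condition: {} },
        completedLoops: [],
        activeExecutionPath: [],
      },
    }),
  })
  if (!res.body) throw new Error(`No stream (status ${res.status})`)
  const reader = res.body.getReader()
  const decoder = new TextDecoder()
  for (;;) {
    const { done, value } = await reader.read()
    if (done) break
    // SSE frames: "data: {...}\n\n" events, terminated by "data: [DONE]"
    console.log(decoder.decode(value))
  }
}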

View File

@@ -53,6 +53,7 @@ const ExecuteWorkflowSchema = z.object({
parallels: z.record(z.any()).optional(),
})
.optional(),
stopAfterBlockId: z.string().optional(),
})
export const runtime = 'nodejs'
@@ -222,6 +223,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -237,6 +239,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
stopAfterBlockId: _stopAfterBlockId,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -434,6 +437,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
const outputWithBase64 = includeFileBase64
@@ -722,6 +726,7 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
})
if (result.status === 'paused') {

View File

@@ -1,11 +1,13 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -49,6 +51,7 @@ export const ActionBar = memo(
collaborativeBatchToggleBlockHandles,
} = useCollaborativeWorkflow()
const { setPendingSelection } = useWorkflowRegistry()
const { handleRunFromBlock } = useWorkflowExecution()
const addNotification = useNotificationStore((s) => s.addNotification)
@@ -97,12 +100,39 @@ export const ActionBar = memo(
)
)
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const edges = useWorkflowStore((state) => state.edges)
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
const isNoteBlock = blockType === 'note'
const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
const incomingEdges = edges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
const dependenciesSatisfied =
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
const canRunFromBlock =
dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
const handleRunFromBlockClick = useCallback(() => {
if (!activeWorkflowId || !canRunFromBlock) return
handleRunFromBlock(blockId, activeWorkflowId)
}, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -128,30 +158,35 @@ export const ActionBar = memo(
'dark:border-transparent dark:bg-[var(--surface-4)]'
)}
>
{!isNoteBlock && (
{!isNoteBlock && !isInsideSubflow && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
if (canRunFromBlock && !disabled) {
handleRunFromBlockClick()
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
disabled={disabled || !canRunFromBlock}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
<Play className={ICON_SIZE} />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
{(() => {
if (disabled) return getTooltipMessage('Run from block')
if (isExecuting) return 'Execution in progress'
if (!dependenciesSatisfied) return 'Run upstream blocks first'
return 'Run from block'
})()}
</Tooltip.Content>
</Tooltip.Root>
)}
{isSubflowBlock && (
{!isNoteBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

View File

@@ -40,9 +40,16 @@ export interface BlockMenuProps {
onRemoveFromSubflow: () => void
onOpenEditor: () => void
onRename: () => void
onRunFromBlock?: () => void
onRunUntilBlock?: () => void
hasClipboard?: boolean
showRemoveFromSubflow?: boolean
/** Whether run from block is available (has snapshot, was executed, not inside subflow) */
canRunFromBlock?: boolean
disableEdit?: boolean
isExecuting?: boolean
/** Whether the selected block is a trigger (has no incoming edges) */
isPositionalTrigger?: boolean
}
/**
@@ -65,9 +72,14 @@ export function BlockMenu({
onRemoveFromSubflow,
onOpenEditor,
onRename,
onRunFromBlock,
onRunUntilBlock,
hasClipboard = false,
showRemoveFromSubflow = false,
canRunFromBlock = false,
disableEdit = false,
isExecuting = false,
isPositionalTrigger = false,
}: BlockMenuProps) {
const isSingleBlock = selectedBlocks.length === 1
@@ -78,10 +90,15 @@ export function BlockMenu({
(b) =>
TriggerUtils.requiresSingleInstance(b.type) || TriggerUtils.isSingleInstanceBlockType(b.type)
)
const hasTriggerBlock = selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b))
// A block is a trigger if it's explicitly a trigger type OR has no incoming edges (positional trigger)
const hasTriggerBlock =
selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b)) || isPositionalTrigger
const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
const isSubflow =
isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
const isInsideSubflow =
isSingleBlock &&
(selectedBlocks[0]?.parentType === 'loop' || selectedBlocks[0]?.parentType === 'parallel')
const canRemoveFromSubflow = showRemoveFromSubflow && !hasTriggerBlock
@@ -203,6 +220,38 @@ export function BlockMenu({
</PopoverItem>
)}
{/* Run from/until block - only for single non-note block, not inside subflows */}
{isSingleBlock && !allNoteBlocks && !isInsideSubflow && (
<>
<PopoverDivider />
<PopoverItem
disabled={!canRunFromBlock || isExecuting}
onClick={() => {
if (canRunFromBlock && !isExecuting) {
onRunFromBlock?.()
onClose()
}
}}
>
Run from block
</PopoverItem>
{/* Hide "Run until" for triggers - they're always at the start */}
{!hasTriggerBlock && (
<PopoverItem
disabled={isExecuting}
onClick={() => {
if (!isExecuting) {
onRunUntilBlock?.()
onClose()
}
}}
>
Run until block
</PopoverItem>
)}
</>
)}
{/* Destructive action */}
<PopoverDivider />
<PopoverItem

View File

@@ -15,13 +15,16 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { getBlock } from '@/blocks'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
@@ -98,11 +101,15 @@ export function useWorkflowExecution() {
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
setLastExecutionSnapshot,
getLastExecutionSnapshot,
clearLastExecutionSnapshot,
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
const currentChatExecutionIdRef = useRef<string | null>(null)
const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff)
const addNotification = useNotificationStore((state) => state.addNotification)
/**
* Validates debug state before performing debug operations
@@ -668,7 +675,8 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
overrideTriggerType?: 'chat' | 'manual' | 'api'
overrideTriggerType?: 'chat' | 'manual' | 'api',
stopAfterBlockId?: string
): Promise<ExecutionResult | StreamingExecution> => {
// Use diff workflow for execution when available, regardless of canvas view state
const executionWorkflowState = null as {
@@ -876,6 +884,8 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
// Execute the workflow
try {
@@ -887,6 +897,7 @@ export function useWorkflowExecution() {
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
isClientSession: true,
stopAfterBlockId,
workflowStateOverride: executionWorkflowState
? {
blocks: executionWorkflowState.blocks,
@@ -916,18 +927,22 @@ export function useWorkflowExecution() {
logger.info('onBlockCompleted received:', { data })
activeBlocksSet.delete(data.blockId)
// Create a new Set to trigger React re-render
setActiveBlocks(new Set(activeBlocksSet))
// Track successful block execution in run path
setBlockRunStatus(data.blockId, 'success')
// Edges already tracked in onBlockStarted, no need to track again
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
@@ -1056,6 +1071,53 @@ export function useWorkflowExecution() {
},
logs: accumulatedBlockLogs,
}
// Add trigger block to executed blocks so downstream blocks can use run-from-block
if (data.success && startBlockId) {
executedBlockIds.add(startBlockId)
}
if (data.success && activeWorkflowId) {
if (stopAfterBlockId) {
const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
const mergedBlockStates = {
...(existingSnapshot?.blockStates || {}),
...Object.fromEntries(accumulatedBlockStates),
}
const mergedExecutedBlocks = new Set([
...(existingSnapshot?.executedBlocks || []),
...executedBlockIds,
])
const snapshot: SerializableExecutionState = {
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
completedLoops: existingSnapshot?.completedLoops || [],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Merged execution snapshot after run-until-block', {
workflowId: activeWorkflowId,
newBlocksExecuted: executedBlockIds.size,
totalExecutedBlocks: mergedExecutedBlocks.size,
})
} else {
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
}
},
onExecutionError: (data) => {
@@ -1376,6 +1438,330 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Handles running workflow from a specific block using cached outputs
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
const snapshot = getLastExecutionSnapshot(workflowId)
const workflowEdges = useWorkflowStore.getState().edges
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = workflowEdges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
if (!snapshot && !isTriggerBlock) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
return
}
const dependenciesSatisfied =
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
if (!dependenciesSatisfied) {
logger.error('Upstream dependencies not satisfied for run-from-block', {
workflowId,
blockId,
})
return
}
// For trigger blocks, always use empty snapshot to prevent stale data from different
// execution paths from being resolved. For non-trigger blocks, use the existing snapshot.
const emptySnapshot: SerializableExecutionState = {
blockStates: {},
executedBlocks: [],
blockLogs: [],
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: [],
}
const effectiveSnapshot: SerializableExecutionState = isTriggerBlock
? emptySnapshot
: snapshot || emptySnapshot
// Extract mock payload for trigger blocks
let workflowInput: any
if (isTriggerBlock) {
const workflowBlocks = useWorkflowStore.getState().blocks
const mergedStates = mergeSubblockState(workflowBlocks, workflowId)
const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' })
const candidate = candidates.find((c) => c.blockId === blockId)
if (candidate) {
if (triggerNeedsMockPayload(candidate)) {
workflowInput = extractTriggerMockPayload(candidate)
} else if (
candidate.path === StartBlockPath.SPLIT_API ||
candidate.path === StartBlockPath.SPLIT_INPUT ||
candidate.path === StartBlockPath.UNIFIED
) {
const inputFormatValue = candidate.block.subBlocks?.inputFormat?.value
if (Array.isArray(inputFormatValue)) {
const testInput: Record<string, any> = {}
inputFormatValue.forEach((field: any) => {
if (field && typeof field === 'object' && field.name && field.value !== undefined) {
testInput[field.name] = coerceValue(field.type, field.value)
}
})
if (Object.keys(testInput).length > 0) {
workflowInput = testInput
}
}
}
} else {
// Fallback: block is trigger by position but not classified as start candidate
const block = mergedStates[blockId]
if (block) {
const blockConfig = getBlock(block.type)
const hasTriggers = blockConfig?.triggers?.available?.length
if (hasTriggers || block.triggerMode) {
workflowInput = extractTriggerMockPayload({
blockId,
block,
path: StartBlockPath.EXTERNAL_TRIGGER,
})
}
}
}
}
setIsExecuting(true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
try {
await executionStream.executeFromBlock({
workflowId,
startBlockId: blockId,
sourceSnapshot: effectiveSnapshot,
input: workflowInput,
callbacks: {
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
},
onBlockCompleted: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onBlockError: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName,
blockType: data.blockType,
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onExecutionCompleted: (data) => {
if (data.success) {
// Add the start block (trigger) to executed blocks
executedBlockIds.add(blockId)
const mergedBlockStates: Record<string, BlockState> = {
...effectiveSnapshot.blockStates,
}
for (const [bId, state] of accumulatedBlockStates) {
mergedBlockStates[bId] = state
}
const mergedExecutedBlocks = new Set([
...effectiveSnapshot.executedBlocks,
...executedBlockIds,
])
const updatedSnapshot: SerializableExecutionState = {
...effectiveSnapshot,
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
}
},
onExecutionError: (data) => {
const isWorkflowModified =
data.error?.includes('Block not found in workflow') ||
data.error?.includes('Upstream dependency not executed')
if (isWorkflowModified) {
clearLastExecutionSnapshot(workflowId)
addNotification({
level: 'error',
message:
'Workflow was modified. Run the workflow again to enable running from block.',
workflowId,
})
} else {
addNotification({
level: 'error',
message: data.error || 'Run from block failed',
workflowId,
})
}
},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
clearLastExecutionSnapshot,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
addNotification,
addConsole,
executionStream,
]
)
/**
* Handles running workflow until a specific block (stops after that block completes)
*/
const handleRunUntilBlock = useCallback(
async (blockId: string, workflowId: string) => {
if (!workflowId || workflowId !== activeWorkflowId) {
logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
return
}
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
)
return {
isExecuting,
isDebugging,
@@ -1386,5 +1772,7 @@ export function useWorkflowExecution() {
handleResumeDebug,
handleCancelDebug,
handleCancelExecution,
handleRunFromBlock,
handleRunUntilBlock,
}
}
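Hypothetical component usage of the two handlers the hook now returns; the component and prop names are illustrative, only the hook API comes from the diff above:

import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'

function RunControls({ blockId, workflowId }: { blockId: string; workflowId: string }) {
  const { handleRunFromBlock, handleRunUntilBlock, isExecuting } = useWorkflowExecution()
  return (
    <>
      <button disabled={isExecuting} onClick={() => handleRunFromBlock(blockId, workflowId)}>
        Run from block
      </button>
      <button disabled={isExecuting} onClick={() => handleRunUntilBlock(blockId, workflowId)}>
        Run until block
      </button>
    </>
  )
}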

View File

@@ -47,6 +47,7 @@ import {
useCurrentWorkflow,
useNodeUtilities,
useShiftSelectionLock,
useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
calculateContainerDimensions,
@@ -302,6 +303,8 @@ const WorkflowContent = React.memo(() => {
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
const snapToGridSize = useSnapToGridSize()
const snapToGrid = snapToGridSize > 0
@@ -733,13 +736,16 @@ const WorkflowContent = React.memo(() => {
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
)
const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
}))
)
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
useExecutionStore(
useShallow((state) => ({
activeBlockIds: state.activeBlockIds,
pendingBlocks: state.pendingBlocks,
isDebugging: state.isDebugging,
isExecuting: state.isExecuting,
getLastExecutionSnapshot: state.getLastExecutionSnapshot,
}))
)
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)
@@ -1102,6 +1108,50 @@ const WorkflowContent = React.memo(() => {
}
}, [contextMenuBlocks])
const handleContextRunFromBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunFromBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
const handleContextRunUntilBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunUntilBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
const runFromBlockState = useMemo(() => {
if (contextMenuBlocks.length !== 1) {
return { canRun: false, reason: undefined }
}
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
const dependenciesSatisfied =
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
if (isNoteBlock) return { canRun: false, reason: undefined }
if (isExecuting) return { canRun: false, reason: undefined }
return { canRun: true, reason: undefined }
}, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])
const handleContextAddBlock = useCallback(() => {
useSearchModalStore.getState().open()
}, [])
@@ -2302,33 +2352,12 @@ const WorkflowContent = React.memo(() => {
window.removeEventListener('remove-from-subflow', handleRemoveFromSubflow as EventListener)
}, [blocks, edgesForDisplay, getNodeAbsolutePosition, collaborativeBatchUpdateParent])
/** Handles node changes - applies changes and resolves parent-child selection conflicts. */
const onNodesChange = useCallback(
(changes: NodeChange[]) => {
selectedIdsRef.current = null
setDisplayNodes((nds) => {
const updated = applyNodeChanges(changes, nds)
const hasSelectionChange = changes.some((c) => c.type === 'select')
if (!hasSelectionChange) return updated
const resolved = resolveParentChildSelectionConflicts(updated, blocks)
selectedIdsRef.current = resolved.filter((node) => node.selected).map((node) => node.id)
return resolved
})
const selectedIds = selectedIdsRef.current as string[] | null
if (selectedIds !== null) {
syncPanelWithSelection(selectedIds)
}
},
[blocks]
)
/**
* Updates container dimensions in displayNodes during drag.
* This allows live resizing of containers as their children are dragged.
* Updates container dimensions in displayNodes during drag or keyboard movement.
*/
const updateContainerDimensionsDuringDrag = useCallback(
(draggedNodeId: string, draggedNodePosition: { x: number; y: number }) => {
const parentId = blocks[draggedNodeId]?.data?.parentId
const updateContainerDimensionsDuringMove = useCallback(
(movedNodeId: string, movedNodePosition: { x: number; y: number }) => {
const parentId = blocks[movedNodeId]?.data?.parentId
if (!parentId) return
setDisplayNodes((currentNodes) => {
@@ -2336,7 +2365,7 @@ const WorkflowContent = React.memo(() => {
if (childNodes.length === 0) return currentNodes
const childPositions = childNodes.map((node) => {
const nodePosition = node.id === draggedNodeId ? draggedNodePosition : node.position
const nodePosition = node.id === movedNodeId ? movedNodePosition : node.position
const { width, height } = getBlockDimensions(node.id)
return { x: nodePosition.x, y: nodePosition.y, width, height }
})
@@ -2367,6 +2396,55 @@ const WorkflowContent = React.memo(() => {
[blocks, getBlockDimensions]
)
/** Handles node changes - applies changes and resolves parent-child selection conflicts. */
const onNodesChange = useCallback(
(changes: NodeChange[]) => {
selectedIdsRef.current = null
setDisplayNodes((nds) => {
const updated = applyNodeChanges(changes, nds)
const hasSelectionChange = changes.some((c) => c.type === 'select')
if (!hasSelectionChange) return updated
const resolved = resolveParentChildSelectionConflicts(updated, blocks)
selectedIdsRef.current = resolved.filter((node) => node.selected).map((node) => node.id)
return resolved
})
const selectedIds = selectedIdsRef.current as string[] | null
if (selectedIds !== null) {
syncPanelWithSelection(selectedIds)
}
// Handle position changes (e.g., from keyboard arrow key movement)
// Update container dimensions when child nodes are moved and persist to backend
// Only persist if not in a drag operation (drag-end is handled by onNodeDragStop)
const isInDragOperation =
getDragStartPosition() !== null || multiNodeDragStartRef.current.size > 0
const keyboardPositionUpdates: Array<{ id: string; position: { x: number; y: number } }> = []
for (const change of changes) {
if (
change.type === 'position' &&
!change.dragging &&
'position' in change &&
change.position
) {
updateContainerDimensionsDuringMove(change.id, change.position)
if (!isInDragOperation) {
keyboardPositionUpdates.push({ id: change.id, position: change.position })
}
}
}
// Persist keyboard movements to backend for collaboration sync
if (keyboardPositionUpdates.length > 0) {
collaborativeBatchUpdatePositions(keyboardPositionUpdates)
}
},
[
blocks,
updateContainerDimensionsDuringMove,
collaborativeBatchUpdatePositions,
getDragStartPosition,
]
)
/**
* Effect to resize loops when nodes change (add/remove/position change).
* Runs on structural changes only - not during drag (position-only changes).
@@ -2611,7 +2689,7 @@ const WorkflowContent = React.memo(() => {
// If the node is inside a container, update container dimensions during drag
if (currentParentId) {
updateContainerDimensionsDuringDrag(node.id, node.position)
updateContainerDimensionsDuringMove(node.id, node.position)
}
// Check if this is a starter block - starter blocks should never be in containers
@@ -2728,7 +2806,7 @@ const WorkflowContent = React.memo(() => {
blocks,
getNodeAbsolutePosition,
getNodeDepth,
updateContainerDimensionsDuringDrag,
updateContainerDimensionsDuringMove,
highlightContainerNode,
]
)
@@ -3418,11 +3496,19 @@ const WorkflowContent = React.memo(() => {
onRemoveFromSubflow={handleContextRemoveFromSubflow}
onOpenEditor={handleContextOpenEditor}
onRename={handleContextRename}
onRunFromBlock={handleContextRunFromBlock}
onRunUntilBlock={handleContextRunUntilBlock}
hasClipboard={hasClipboard()}
showRemoveFromSubflow={contextMenuBlocks.some(
(b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
)}
canRunFromBlock={runFromBlockState.canRun}
disableEdit={!effectivePermissions.canEdit}
isExecuting={isExecuting}
isPositionalTrigger={
contextMenuBlocks.length === 1 &&
edges.filter((e) => e.target === contextMenuBlocks[0]?.id).length === 0
}
/>
<CanvasMenu

View File

@@ -1,241 +0,0 @@
/**
* Search utility functions for tiered matching algorithm
* Provides predictable search results prioritizing exact matches over fuzzy matches
*/
export interface SearchableItem {
id: string
name: string
description?: string
type: string
aliases?: string[]
[key: string]: any
}
export interface SearchResult<T extends SearchableItem> {
item: T
score: number
matchType: 'exact' | 'prefix' | 'alias' | 'word-boundary' | 'substring' | 'description'
}
const SCORE_EXACT_MATCH = 10000
const SCORE_PREFIX_MATCH = 5000
const SCORE_ALIAS_MATCH = 3000
const SCORE_WORD_BOUNDARY = 1000
const SCORE_SUBSTRING_MATCH = 100
const DESCRIPTION_WEIGHT = 0.3
/**
* Calculate match score for a single field
* Returns 0 if no match found
*/
function calculateFieldScore(
query: string,
field: string
): {
score: number
matchType: 'exact' | 'prefix' | 'word-boundary' | 'substring' | null
} {
const normalizedQuery = query.toLowerCase().trim()
const normalizedField = field.toLowerCase().trim()
if (!normalizedQuery || !normalizedField) {
return { score: 0, matchType: null }
}
// Tier 1: Exact match
if (normalizedField === normalizedQuery) {
return { score: SCORE_EXACT_MATCH, matchType: 'exact' }
}
// Tier 2: Prefix match (starts with query)
if (normalizedField.startsWith(normalizedQuery)) {
return { score: SCORE_PREFIX_MATCH, matchType: 'prefix' }
}
// Tier 3: Word boundary match (query matches start of a word)
const words = normalizedField.split(/[\s-_/]+/)
const hasWordBoundaryMatch = words.some((word) => word.startsWith(normalizedQuery))
if (hasWordBoundaryMatch) {
return { score: SCORE_WORD_BOUNDARY, matchType: 'word-boundary' }
}
// Tier 4: Substring match (query appears anywhere)
if (normalizedField.includes(normalizedQuery)) {
return { score: SCORE_SUBSTRING_MATCH, matchType: 'substring' }
}
// No match
return { score: 0, matchType: null }
}
/**
* Check if query matches any alias in the item's aliases array
* Returns the alias score if a match is found, 0 otherwise
*/
function calculateAliasScore(
query: string,
aliases?: string[]
): { score: number; matchType: 'alias' | null } {
if (!aliases || aliases.length === 0) {
return { score: 0, matchType: null }
}
const normalizedQuery = query.toLowerCase().trim()
for (const alias of aliases) {
const normalizedAlias = alias.toLowerCase().trim()
if (normalizedAlias === normalizedQuery) {
return { score: SCORE_ALIAS_MATCH, matchType: 'alias' }
}
if (normalizedAlias.startsWith(normalizedQuery)) {
return { score: SCORE_ALIAS_MATCH * 0.8, matchType: 'alias' }
}
if (normalizedQuery.includes(normalizedAlias) || normalizedAlias.includes(normalizedQuery)) {
return { score: SCORE_ALIAS_MATCH * 0.6, matchType: 'alias' }
}
}
return { score: 0, matchType: null }
}
/**
* Calculate multi-word match score
* Each word in the query must appear somewhere in the field
* Returns a score based on how well the words match
*/
function calculateMultiWordScore(
queryWords: string[],
field: string
): { score: number; matchType: 'word-boundary' | 'substring' | null } {
const normalizedField = field.toLowerCase().trim()
const fieldWords = normalizedField.split(/[\s\-_/:]+/)
let allWordsMatch = true
let totalScore = 0
let hasWordBoundary = false
for (const queryWord of queryWords) {
const wordBoundaryMatch = fieldWords.some((fw) => fw.startsWith(queryWord))
const substringMatch = normalizedField.includes(queryWord)
if (wordBoundaryMatch) {
totalScore += SCORE_WORD_BOUNDARY
hasWordBoundary = true
} else if (substringMatch) {
totalScore += SCORE_SUBSTRING_MATCH
} else {
allWordsMatch = false
break
}
}
if (!allWordsMatch) {
return { score: 0, matchType: null }
}
return {
score: totalScore / queryWords.length,
matchType: hasWordBoundary ? 'word-boundary' : 'substring',
}
}
/**
* Search items using tiered matching algorithm
* Returns items sorted by relevance (highest score first)
*/
export function searchItems<T extends SearchableItem>(
query: string,
items: T[]
): SearchResult<T>[] {
const normalizedQuery = query.trim()
if (!normalizedQuery) {
return []
}
const results: SearchResult<T>[] = []
const queryWords = normalizedQuery.toLowerCase().split(/\s+/).filter(Boolean)
const isMultiWord = queryWords.length > 1
for (const item of items) {
const nameMatch = calculateFieldScore(normalizedQuery, item.name)
const descMatch = item.description
? calculateFieldScore(normalizedQuery, item.description)
: { score: 0, matchType: null }
const aliasMatch = calculateAliasScore(normalizedQuery, item.aliases)
let nameScore = nameMatch.score
let descScore = descMatch.score * DESCRIPTION_WEIGHT
const aliasScore = aliasMatch.score
let bestMatchType = nameMatch.matchType
// For multi-word queries, also try matching each word independently and take the better score
if (isMultiWord) {
const multiWordNameMatch = calculateMultiWordScore(queryWords, item.name)
if (multiWordNameMatch.score > nameScore) {
nameScore = multiWordNameMatch.score
bestMatchType = multiWordNameMatch.matchType
}
if (item.description) {
const multiWordDescMatch = calculateMultiWordScore(queryWords, item.description)
const multiWordDescScore = multiWordDescMatch.score * DESCRIPTION_WEIGHT
if (multiWordDescScore > descScore) {
descScore = multiWordDescScore
}
}
}
const bestScore = Math.max(nameScore, descScore, aliasScore)
if (bestScore > 0) {
let matchType: SearchResult<T>['matchType'] = 'substring'
if (nameScore >= descScore && nameScore >= aliasScore) {
matchType = bestMatchType || 'substring'
} else if (aliasScore >= descScore) {
matchType = 'alias'
} else {
matchType = 'description'
}
results.push({
item,
score: bestScore,
matchType,
})
}
}
results.sort((a, b) => b.score - a.score)
return results
}
/**
* Get a human-readable match type label
*/
export function getMatchTypeLabel(matchType: SearchResult<any>['matchType']): string {
switch (matchType) {
case 'exact':
return 'Exact match'
case 'prefix':
return 'Starts with'
case 'alias':
return 'Similar to'
case 'word-boundary':
return 'Word match'
case 'substring':
return 'Contains'
case 'description':
return 'In description'
default:
return 'Match'
}
}
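For reference, a short example of how the removed tiered matcher ranked results (this PR deletes the file in favor of cmdk's built-in filtering); the items are made up:

const items = [
  { id: '1', name: 'Slack', description: 'Send messages', type: 'tool', aliases: ['chat'] },
  { id: '2', name: 'Slack Canvas', description: 'Edit canvases', type: 'tool' },
]
const ranked = searchItems('slack', items)
// 'Slack' scores SCORE_EXACT_MATCH (10000); 'Slack Canvas' scores SCORE_PREFIX_MATCH (5000)
console.log(ranked.map((r) => `${r.item.name}: ${r.score} (${r.matchType})`))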

View File

@@ -176,7 +176,7 @@ function FormattedInput({
onChange,
onScroll,
}: FormattedInputProps) {
const handleScroll = (e: React.UIEvent<HTMLInputElement>) => {
const handleScroll = (e: { currentTarget: HTMLInputElement }) => {
onScroll(e.currentTarget.scrollLeft)
}

View File

@@ -73,7 +73,12 @@ export const Sidebar = memo(function Sidebar() {
const { data: sessionData, isPending: sessionLoading } = useSession()
const { canEdit } = useUserPermissionsContext()
const { config: permissionConfig } = usePermissionConfig()
const { config: permissionConfig, filterBlocks } = usePermissionConfig()
const initializeSearchData = useSearchModalStore((state) => state.initializeData)
useEffect(() => {
initializeSearchData(filterBlocks)
}, [initializeSearchData, filterBlocks])
/**
* Sidebar state from store with hydration tracking to prevent SSR mismatch.

File diff suppressed because one or more lines are too long

View File

@@ -33,6 +33,15 @@ export interface DAG {
parallelConfigs: Map<string, SerializedParallel>
}
export interface DAGBuildOptions {
/** Trigger block ID to start path construction from */
triggerBlockId?: string
/** Saved incoming edges from snapshot for resumption */
savedIncomingEdges?: Record<string, string[]>
/** Include all enabled blocks instead of only those reachable from trigger */
includeAllBlocks?: boolean
}
export class DAGBuilder {
private pathConstructor = new PathConstructor()
private loopConstructor = new LoopConstructor()
@@ -40,11 +49,9 @@ export class DAGBuilder {
private nodeConstructor = new NodeConstructor()
private edgeConstructor = new EdgeConstructor()
build(
workflow: SerializedWorkflow,
triggerBlockId?: string,
savedIncomingEdges?: Record<string, string[]>
): DAG {
build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options
const dag: DAG = {
nodes: new Map(),
loopConfigs: new Map(),
@@ -53,7 +60,7 @@ export class DAGBuilder {
this.initializeConfigs(workflow, dag)
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
this.loopConstructor.execute(dag, reachableBlocks)
this.parallelConstructor.execute(dag, reachableBlocks)
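Call-site migration implied by the new signature, with the surrounding variables stubbed via declare for illustration:

// Old: dagBuilder.build(workflow, triggerBlockId, savedIncomingEdges)
// New: a single DAGBuildOptions object; includeAllBlocks enables run-from-block mode.
declare const dagBuilder: DAGBuilder
declare const workflow: SerializedWorkflow
declare const triggerBlockId: string | undefined
declare const savedIncomingEdges: Record<string, string[]> | undefined

const dag = dagBuilder.build(workflow, { triggerBlockId, savedIncomingEdges })
// Run-from-block builds the full graph instead:
const fullDag = dagBuilder.build(workflow, { includeAllBlocks: true })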

View File

@@ -207,6 +207,7 @@ export class EdgeConstructor {
for (const connection of workflow.connections) {
let { source, target } = connection
const originalSource = source
const originalTarget = target
let sourceHandle = this.generateSourceHandle(
source,
target,
@@ -257,14 +258,14 @@ export class EdgeConstructor {
target = sentinelStartId
}
if (loopSentinelStartId) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
continue
}
if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
}
if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {
continue
}

View File

@@ -6,7 +6,16 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathConstructor')
export class PathConstructor {
execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
execute(
workflow: SerializedWorkflow,
triggerBlockId?: string,
includeAllBlocks?: boolean
): Set<string> {
// For run-from-block mode, include all enabled blocks regardless of trigger reachability
if (includeAllBlocks) {
return this.getAllEnabledBlocks(workflow)
}
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
if (!resolvedTriggerId) {

View File

@@ -4,6 +4,7 @@ import {
containsUserFileWithMetadata,
hydrateUserFilesWithBase64,
} from '@/lib/uploads/utils/user-file-base64.server'
import { sanitizeInputFormat, sanitizeTools } from '@/lib/workflows/comparison/normalize'
import {
BlockType,
buildResumeApiUrl,
@@ -34,6 +35,7 @@ import { validateBlockType } from '@/executor/utils/permission-check'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedBlock } from '@/serializer/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS } from '@/triggers/constants'
const logger = createLogger('BlockExecutor')
@@ -87,7 +89,7 @@ export class BlockExecutor {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
if (blockLog) {
blockLog.input = this.parseJsonInputs(resolvedInputs)
blockLog.input = this.sanitizeInputsForLog(resolvedInputs)
}
} catch (error) {
cleanupSelfReference?.()
@@ -162,7 +164,7 @@ export class BlockExecutor {
ctx,
node,
block,
this.parseJsonInputs(resolvedInputs),
this.sanitizeInputsForLog(resolvedInputs),
displayOutput,
duration
)
@@ -241,7 +243,7 @@ export class BlockExecutor {
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
blockLog.input = this.parseJsonInputs(input)
blockLog.input = this.sanitizeInputsForLog(input)
blockLog.output = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
}
@@ -260,7 +262,7 @@ export class BlockExecutor {
ctx,
node,
block,
this.parseJsonInputs(input),
this.sanitizeInputsForLog(input),
displayOutput,
duration
)
@@ -352,29 +354,41 @@ export class BlockExecutor {
}
/**
* Parse JSON string inputs to objects for log display only.
* Attempts to parse any string that looks like JSON.
* Sanitizes inputs for log display.
* - Filters out system fields (UI-only, readonly, internal flags)
* - Removes UI state from inputFormat items (e.g., collapsed)
* - Parses JSON strings to objects for readability
* Returns a new object - does not mutate the original inputs.
*/
private parseJsonInputs(inputs: Record<string, any>): Record<string, any> {
let result = inputs
let hasChanges = false
private sanitizeInputsForLog(inputs: Record<string, any>): Record<string, any> {
const result: Record<string, any> = {}
for (const [key, value] of Object.entries(inputs)) {
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value !== 'string' || !isJSONString(value)) {
if (SYSTEM_SUBBLOCK_IDS.includes(key) || key === 'triggerMode') {
continue
}
try {
if (!hasChanges) {
result = { ...inputs }
hasChanges = true
if (key === 'inputFormat' && Array.isArray(value)) {
result[key] = sanitizeInputFormat(value)
continue
}
if (key === 'tools' && Array.isArray(value)) {
result[key] = sanitizeTools(value)
continue
}
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value === 'string' && isJSONString(value)) {
try {
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
result[key] = value
}
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
} else {
result[key] = value
}
}
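An illustrative before/after for the sanitizer, based on the doc comment above; the keys are hypothetical and the real system-field list lives in SYSTEM_SUBBLOCK_IDS from '@/triggers/constants':

// Hypothetical input (keys chosen for illustration):
const raw = {
  triggerMode: true, // dropped: internal flag
  inputFormat: [{ name: 'q', type: 'string', collapsed: true }], // UI state stripped
  payload: '{"a": 1}', // JSON string parsed for readability
  note: 'plain text', // kept as-is
}
// sanitizeInputsForLog(raw) ->
// { inputFormat: [{ name: 'q', type: 'string' }], payload: { a: 1 }, note: 'plain text' }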

View File

@@ -77,15 +77,16 @@ export class EdgeManager {
}
}
// Check if any deactivation targets that previously received an activated edge are now ready
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
for (const { target } of edgesToDeactivate) {
if (
!readyNodes.includes(target) &&
!activatedTargets.includes(target) &&
this.nodesWithActivatedEdge.has(target) &&
this.isTargetReady(target)
) {
readyNodes.push(target)
}
}
}

View File

@@ -26,6 +26,7 @@ export class ExecutionEngine {
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
@@ -105,7 +106,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
if ((await this.checkCancellation()) || this.errorFlag) {
if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
break
}
await this.processQueue()
@@ -259,6 +260,16 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
if (this.context.runFromBlockContext) {
const { startBlockId } = this.context.runFromBlockContext
logger.info('Initializing queue for run-from-block mode', {
startBlockId,
dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
})
this.addToQueue(startBlockId)
return
}
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges
@@ -385,11 +396,28 @@ export class ExecutionEngine {
this.finalOutput = output
}
if (this.context.stopAfterBlockId === nodeId) {
// For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
// shouldContinue: true means more iterations, shouldExit: true means loop is done
const shouldContinueLoop = output.shouldContinue === true
if (!shouldContinueLoop) {
logger.info('Stopping execution after target block', { nodeId })
this.stoppedEarlyFlag = true
return
}
}
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
logger.info('Processing outgoing edges', {
nodeId,
outgoingEdgesCount: node.outgoingEdges.size,
outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
id,
target: e.target,
sourceHandle: e.sourceHandle,
})),
output,
readyNodesCount: readyNodes.length,
readyNodes,
})

View File

@@ -5,17 +5,31 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import type {
ContextExtensions,
SerializableExecutionState,
WorkflowInput,
} from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
computeExecutionSets,
type RunFromBlockContext,
resolveContainerToSentinelStart,
validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
import {
extractLoopIdFromSentinel,
extractParallelIdFromSentinel,
} from '@/executor/utils/subflow-utils'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'
@@ -48,7 +62,10 @@ export class DAGExecutor {
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const dag = this.dagBuilder.build(this.workflow, {
triggerBlockId,
savedIncomingEdges,
})
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
@@ -89,17 +106,156 @@ export class DAGExecutor {
}
}
/**
* Execute from a specific block using cached outputs for upstream blocks.
*/
async executeFromBlock(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
): Promise<ExecutionResult> {
// Build full DAG with all blocks to compute upstream set for snapshot filtering
// includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
if (!validation.valid) {
throw new Error(validation.error)
}
const { dirtySet, upstreamSet, reachableUpstreamSet } = computeExecutionSets(dag, startBlockId)
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
// Extract container IDs from sentinel IDs in reachable upstream set
// Use reachableUpstreamSet (not upstreamSet) to preserve sibling branch outputs
// Example: A->C, B->C where C references A.result || B.result
// When running from A, B's output should be preserved for C to reference
const reachableContainerIds = new Set<string>()
for (const nodeId of reachableUpstreamSet) {
const loopId = extractLoopIdFromSentinel(nodeId)
if (loopId) reachableContainerIds.add(loopId)
const parallelId = extractParallelIdFromSentinel(nodeId)
if (parallelId) reachableContainerIds.add(parallelId)
}
// Filter snapshot to include all blocks reachable from dirty blocks
// This preserves sibling branch outputs that dirty blocks may reference
const filteredBlockStates: Record<string, any> = {}
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
if (reachableUpstreamSet.has(blockId) || reachableContainerIds.has(blockId)) {
filteredBlockStates[blockId] = state
}
}
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
(id) => reachableUpstreamSet.has(id) || reachableContainerIds.has(id)
)
// Filter loop/parallel executions to only include reachable containers
const filteredLoopExecutions: Record<string, any> = {}
if (sourceSnapshot.loopExecutions) {
for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
if (reachableContainerIds.has(loopId)) {
filteredLoopExecutions[loopId] = execution
}
}
}
const filteredParallelExecutions: Record<string, any> = {}
if (sourceSnapshot.parallelExecutions) {
for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
if (reachableContainerIds.has(parallelId)) {
filteredParallelExecutions[parallelId] = execution
}
}
}
const filteredSnapshot: SerializableExecutionState = {
...sourceSnapshot,
blockStates: filteredBlockStates,
executedBlocks: filteredExecutedBlocks,
loopExecutions: filteredLoopExecutions,
parallelExecutions: filteredParallelExecutions,
}
logger.info('Executing from block', {
workflowId,
startBlockId,
effectiveStartBlockId,
dirtySetSize: dirtySet.size,
upstreamSetSize: upstreamSet.size,
reachableUpstreamSetSize: reachableUpstreamSet.size,
})
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
}
}
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
snapshotState: filteredSnapshot,
runFromBlockContext,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
blockExecutor,
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
return await engine.run()
}
private createExecutionContext(
workflowId: string,
triggerBlockId?: string
triggerBlockId?: string,
overrides?: {
snapshotState?: SerializableExecutionState
runFromBlockContext?: RunFromBlockContext
}
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = this.contextExtensions.snapshotState
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
const executedBlocks = snapshotState?.executedBlocks
let executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
if (overrides?.runFromBlockContext) {
const { dirtySet } = overrides.runFromBlockContext
executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
logger.info('Cleared executed status for dirty blocks', {
dirtySetSize: dirtySet.size,
remainingExecutedBlocks: executedBlocks.size,
})
}
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
@@ -109,7 +265,7 @@ export class DAGExecutor {
userId: this.contextExtensions.userId,
isDeployedContext: this.contextExtensions.isDeployedContext,
blockStates: state.getBlockStates(),
blockLogs: snapshotState?.blockLogs ?? [],
blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
metadata: {
...this.contextExtensions.metadata,
startTime: new Date().toISOString(),
@@ -169,6 +325,8 @@ export class DAGExecutor {
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
}
if (this.contextExtensions.resumeFromSnapshot) {
@@ -193,6 +351,15 @@ export class DAGExecutor {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
} else if (overrides?.runFromBlockContext) {
// In run-from-block mode, initialize the start block only if it's a regular block
// Skip for sentinels/containers (loop/parallel) which aren't real blocks
const startBlockId = overrides.runFromBlockContext.startBlockId
const isRegularBlock = this.workflow.blocks.some((b) => b.id === startBlockId)
if (isRegularBlock) {
this.initializeStarterBlock(context, state, startBlockId)
}
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}
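The edge pruning above ("remove incoming edges from non-dirty sources") is what lets convergent blocks fire without waiting on cached parents. A toy restatement with an abbreviated node shape (the real DAG node carries more fields):

interface ToyNode {
  incomingEdges: Set<string>
}

// Convergent dirty nodes must not wait on cached (non-dirty) parents.
function pruneCachedIncoming(nodes: Map<string, ToyNode>, dirtySet: Set<string>): void {
  for (const nodeId of dirtySet) {
    const node = nodes.get(nodeId)
    if (!node) continue
    for (const sourceId of [...node.incomingEdges]) {
      if (!dirtySet.has(sourceId)) node.incomingEdges.delete(sourceId) // cached parent: already satisfied
    }
  }
}

// A -> C, B -> C; running from A marks {A, C} dirty, so C's edge from cached B is dropped.
const toyNodes = new Map<string, ToyNode>([
  ['A', { incomingEdges: new Set() }],
  ['B', { incomingEdges: new Set() }],
  ['C', { incomingEdges: new Set(['A', 'B']) }],
])
pruneCachedIncoming(toyNodes, new Set(['A', 'C']))
// toyNodes.get('C')!.incomingEdges is now {'A'}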

View File

@@ -27,6 +27,8 @@ export interface ParallelScope {
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
/** Whether the parallel has an empty distribution and should be skipped */
isEmpty?: boolean
}
export class ExecutionState implements BlockStateController {

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -105,6 +106,17 @@ export interface ContextExtensions {
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => Promise<void>
/**
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface WorkflowInput {

View File

@@ -276,7 +276,16 @@ export class LoopOrchestrator {
scope: LoopScope
): LoopContinuationResult {
const results = scope.allIterationOutputs
this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
const output = { results }
this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
// Emit onBlockComplete for the loop container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
output,
executionTime: DEFAULTS.EXECUTION_TIME,
})
}
return {
shouldContinue: false,
@@ -386,10 +395,10 @@ export class LoopOrchestrator {
return true
}
// forEach: skip if items array is empty
if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true
@@ -399,6 +408,8 @@ export class LoopOrchestrator {
if (scope.loopType === 'for') {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
// Set empty output for the loop
this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
return false
}
return true

View File

@@ -31,7 +31,18 @@ export class NodeExecutionOrchestrator {
throw new Error(`Node not found in DAG: ${nodeId}`)
}
if (this.state.hasExecuted(nodeId)) {
if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
const cachedOutput = this.state.getBlockOutput(nodeId) || {}
logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
return {
nodeId,
output: cachedOutput,
isFinalOutput: false,
}
}
const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
const output = this.state.getBlockOutput(nodeId) || {}
return {
nodeId,
@@ -97,7 +108,7 @@ export class NodeExecutionOrchestrator {
if (loopId) {
const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
if (!shouldExecute) {
logger.info('While loop initial condition false, skipping loop body', { loopId })
logger.info('Loop initial condition false, skipping loop body', { loopId })
return {
sentinelStart: true,
shouldExit: true,
@@ -158,6 +169,17 @@ export class NodeExecutionOrchestrator {
this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
}
}
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (scope?.isEmpty) {
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return {
sentinelStart: true,
shouldExit: true,
selectedRoute: EDGE.PARALLEL_EXIT,
}
}
return { sentinelStart: true }
}
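The skip rule introduced above reduces to a small predicate; a toy restatement (shapes assumed, not the orchestrator's real signature):

// Cached output is used only when the block has already executed and is not
// marked dirty by run-from-block; dirty blocks always re-execute.
function shouldUseCachedOutput(
  nodeId: string,
  hasExecuted: boolean,
  dirtySet?: Set<string>
): boolean {
  const isDirty = dirtySet?.has(nodeId) ?? false
  return !isDirty && hasExecuted
}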

View File

@@ -61,11 +61,13 @@ export class ParallelOrchestrator {
let items: any[] | undefined
let branchCount: number
let isEmpty = false
try {
const resolved = this.resolveBranchCount(ctx, parallelConfig)
const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
branchCount = resolved.branchCount
items = resolved.items
isEmpty = resolved.isEmpty ?? false
} catch (error) {
const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -91,6 +93,34 @@ export class ParallelOrchestrator {
throw new Error(branchError)
}
// Handle empty distribution - skip parallel body
if (isEmpty || branchCount === 0) {
const scope: ParallelScope = {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}
if (!ctx.parallelExecutions) {
ctx.parallelExecutions = new Map()
}
ctx.parallelExecutions.set(parallelId, scope)
// Set empty output for the parallel
this.state.setBlockOutput(parallelId, { results: [] })
logger.info('Parallel scope initialized with empty distribution, skipping body', {
parallelId,
branchCount: 0,
})
return scope
}
const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)
const scope: ParallelScope = {
@@ -127,15 +157,17 @@ export class ParallelOrchestrator {
private resolveBranchCount(
ctx: ExecutionContext,
config: SerializedParallel
): { branchCount: number; items?: any[] } {
config: SerializedParallel,
parallelId: string
): { branchCount: number; items?: any[]; isEmpty?: boolean } {
if (config.parallelType === 'count') {
return { branchCount: config.count ?? 1 }
}
const items = this.resolveDistributionItems(ctx, config)
if (items.length === 0) {
return { branchCount: config.count ?? 1 }
logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
return { branchCount: 0, items: [], isEmpty: true }
}
return { branchCount: items.length, items }
@@ -228,9 +260,17 @@ export class ParallelOrchestrator {
const branchOutputs = scope.branchOutputs.get(i) || []
results.push(branchOutputs)
}
this.state.setBlockOutput(parallelId, {
results,
})
const output = { results }
this.state.setBlockOutput(parallelId, output)
// Emit onBlockComplete for the parallel container so the UI can track it
if (this.contextExtensions?.onBlockComplete) {
this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
output,
executionTime: 0,
})
}
return {
allBranchesComplete: true,
results,

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -250,6 +251,17 @@ export interface ExecutionContext {
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
/**
* Context for "run from block" mode. When present, only blocks in dirtySet
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: RunFromBlockContext
/**
* Stop execution after this block completes. Used for "run until block" feature.
*/
stopAfterBlockId?: string
}
export interface ExecutionResult {

File diff suppressed because it is too large

View File

@@ -0,0 +1,219 @@
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
/**
* Builds the sentinel-start node ID for a loop.
*/
function buildLoopSentinelStartId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}
/**
* Builds the sentinel-start node ID for a parallel.
*/
function buildParallelSentinelStartId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}
/**
* Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
* Returns null if the block is not a container.
*/
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
if (dag.loopConfigs.has(blockId)) {
return buildLoopSentinelStartId(blockId)
}
if (dag.parallelConfigs.has(blockId)) {
return buildParallelSentinelStartId(blockId)
}
return null
}
/**
* Result of validating a block for run-from-block execution.
*/
export interface RunFromBlockValidation {
valid: boolean
error?: string
}
/**
* Context for run-from-block execution mode.
*/
export interface RunFromBlockContext {
/** The block ID to start execution from */
startBlockId: string
/** Set of block IDs that need re-execution (start block + all downstream) */
dirtySet: Set<string>
}
/**
* Result of computing execution sets for run-from-block mode.
*/
export interface ExecutionSets {
/** Blocks that need re-execution (start block + all downstream) */
dirtySet: Set<string>
/** Blocks that are upstream (ancestors) of the start block */
upstreamSet: Set<string>
/** Blocks that are upstream of any dirty block (for snapshot preservation) */
reachableUpstreamSet: Set<string>
}
/**
* Computes the dirty set, upstream set, and reachable upstream set.
* - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
* - Upstream set: all blocks reachable via incoming edges from the start block
* - Reachable upstream set: all non-dirty blocks that are upstream of ANY dirty block
* (includes sibling branches that dirty blocks may reference)
*
* For loop/parallel containers, starts from the sentinel-start node and includes
* the container ID itself in the dirty set.
*
* @param dag - The workflow DAG
* @param startBlockId - The block to start execution from
* @returns Object containing dirtySet, upstreamSet, and reachableUpstreamSet
*/
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
const dirty = new Set<string>([startBlockId])
const upstream = new Set<string>()
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
const traversalStartId = sentinelStartId ?? startBlockId
if (sentinelStartId) {
dirty.add(sentinelStartId)
}
// BFS downstream for dirty set
const downstreamQueue = [traversalStartId]
while (downstreamQueue.length > 0) {
const nodeId = downstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const [, edge] of node.outgoingEdges) {
if (!dirty.has(edge.target)) {
dirty.add(edge.target)
downstreamQueue.push(edge.target)
}
}
}
// BFS upstream from start block for upstream set
const upstreamQueue = [traversalStartId]
while (upstreamQueue.length > 0) {
const nodeId = upstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const sourceId of node.incomingEdges) {
if (!upstream.has(sourceId)) {
upstream.add(sourceId)
upstreamQueue.push(sourceId)
}
}
}
// Compute reachable upstream: all non-dirty blocks upstream of ANY dirty block
// This handles the case where a dirty block (like C in A->C, B->C) may reference
// sibling branches (like B when running from A)
const reachableUpstream = new Set<string>()
for (const dirtyNodeId of dirty) {
const node = dag.nodes.get(dirtyNodeId)
if (!node) continue
// BFS upstream from this dirty node
const queue = [...node.incomingEdges]
while (queue.length > 0) {
const sourceId = queue.shift()!
if (reachableUpstream.has(sourceId) || dirty.has(sourceId)) continue
reachableUpstream.add(sourceId)
const sourceNode = dag.nodes.get(sourceId)
if (sourceNode) {
queue.push(...sourceNode.incomingEdges)
}
}
}
return { dirtySet: dirty, upstreamSet: upstream, reachableUpstreamSet: reachableUpstream }
}
/**
* Validates that a block can be used as a run-from-block starting point.
*
* Validation rules:
* - Block must exist in the DAG (or be a loop/parallel container)
* - Block cannot be inside a loop (but loop containers are allowed)
* - Block cannot be inside a parallel (but parallel containers are allowed)
* - Block cannot be a sentinel node
* - All upstream dependencies must have been executed (have cached outputs)
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
* @param executedBlocks - Set of blocks that were executed in the source run
* @returns Validation result with error message if invalid
*/
export function validateRunFromBlock(
blockId: string,
dag: DAG,
executedBlocks: Set<string>
): RunFromBlockValidation {
const node = dag.nodes.get(blockId)
const isLoopContainer = dag.loopConfigs.has(blockId)
const isParallelContainer = dag.parallelConfigs.has(blockId)
const isContainer = isLoopContainer || isParallelContainer
if (!node && !isContainer) {
return { valid: false, error: `Block not found in workflow: ${blockId}` }
}
if (isContainer) {
const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
return {
valid: false,
error: `Container sentinel not found for: ${blockId}`,
}
}
}
if (node) {
if (node.metadata.isLoopNode) {
return {
valid: false,
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
}
}
if (node.metadata.isParallelBranch) {
return {
valid: false,
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
}
}
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
// Check immediate upstream dependencies were executed
for (const sourceId of node.incomingEdges) {
const sourceNode = dag.nodes.get(sourceId)
// Skip sentinel nodes - they're internal and not in executedBlocks
if (sourceNode?.metadata.isSentinel) continue
// Skip trigger nodes - they're entry points and don't need prior execution
// A trigger node has no incoming edges
if (sourceNode && sourceNode.incomingEdges.size === 0) continue
if (!executedBlocks.has(sourceId)) {
return {
valid: false,
error: `Upstream dependency not executed: ${sourceId}`,
}
}
}
}
return { valid: true }
}
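A hedged worked example of the three sets computed above, using toy adjacency maps rather than the real DAG type: with edges D -> A, A -> C, B -> C and start block A, the results are dirty = {A, C}, upstream = {D}, and reachableUpstream = {D, B} (B is preserved because dirty C may reference its output).

const out = new Map<string, string[]>([
  ['D', ['A']],
  ['A', ['C']],
  ['B', ['C']],
  ['C', []],
])
const incoming = new Map<string, string[]>([
  ['D', []],
  ['A', ['D']],
  ['B', []],
  ['C', ['A', 'B']],
])

// Generic BFS over an adjacency map; excludes the start nodes themselves.
const bfs = (start: string[], next: Map<string, string[]>): Set<string> => {
  const seen = new Set<string>()
  const queue = [...start]
  while (queue.length > 0) {
    const node = queue.shift()!
    for (const n of next.get(node) ?? []) {
      if (!seen.has(n)) {
        seen.add(n)
        queue.push(n)
      }
    }
  }
  return seen
}

const dirty = new Set<string>(['A', ...bfs(['A'], out)]) // {A, C}
const upstream = bfs(['A'], incoming) // {D}
const reachableUpstream = new Set(
  [...dirty].flatMap((d) => [...bfs([d], incoming)]).filter((n) => !dirty.has(n))
) // {D, B}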

View File

@@ -1,10 +1,85 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
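A hedged caller-side sketch of the extracted helper (the endpoint path here is assumed for illustration; callbacks are trimmed to two):

async function consumeExecutionStream(workflowId: string): Promise<void> {
  const response = await fetch(`/api/workflows/${workflowId}/execute`, { method: 'POST' })
  if (!response.body) throw new Error('No response body')
  await processSSEStream(
    response.body.getReader(),
    {
      onBlockCompleted: (data) => logger.info('Block completed', data),
      onExecutionCompleted: (data) => logger.info('Execution completed', data),
    },
    'Demo'
  )
}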
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -68,6 +143,15 @@ export interface ExecuteStreamOptions {
loops?: Record<string, any>
parallels?: Record<string, any>
}
stopAfterBlockId?: string
callbacks?: ExecutionStreamCallbacks
}
export interface ExecuteFromBlockOptions {
workflowId: string
startBlockId: string
sourceSnapshot: SerializableExecutionState
input?: any
callbacks?: ExecutionStreamCallbacks
}
@@ -119,91 +203,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -222,6 +222,70 @@ export function useExecutionStream() {
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
signal: abortController.signal,
})
if (!response.ok) {
let errorResponse: any
try {
errorResponse = await response.json()
} catch {
throw new Error(`Server error (${response.status}): ${response.statusText}`)
}
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')
callbacks.onExecutionCancelled?.({ duration: 0 })
} else {
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
@@ -239,6 +303,7 @@ export function useExecutionStream() {
return {
execute,
executeFromBlock,
cancel,
}
}
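A sketch of a consumer of the new executeFromBlock API; the surrounding UI wiring is assumed, not part of this diff:

function useRerunFromBlock(workflowId: string) {
  const { executeFromBlock } = useExecutionStream()
  return useCallback(
    (startBlockId: string, sourceSnapshot: SerializableExecutionState) =>
      executeFromBlock({
        workflowId,
        startBlockId,
        sourceSnapshot,
        callbacks: {
          onExecutionError: (data) => logger.error('Partial run failed', data),
        },
      }),
    [workflowId, executeFromBlock]
  )
}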

View File

@@ -86,7 +86,13 @@ describe('SnapshotService', () => {
type: 'agent',
position: { x: 100, y: 200 },
subBlocks: {},
subBlocks: {
prompt: {
id: 'prompt',
type: 'short-input',
value: 'Hello world',
},
},
outputs: {},
enabled: true,
horizontalHandles: true,
@@ -104,8 +110,14 @@ describe('SnapshotService', () => {
blocks: {
block1: {
...baseState.blocks.block1,
// Different block state - we can change outputs to make it different
outputs: { response: { type: 'string', description: 'different result' } },
// Different subBlock value - this is a meaningful change
subBlocks: {
prompt: {
id: 'prompt',
type: 'short-input',
value: 'Different prompt',
},
},
},
},
}

View File

@@ -11,12 +11,7 @@ import type {
WorkflowExecutionSnapshotInsert,
WorkflowState,
} from '@/lib/logs/types'
import {
normalizedStringify,
normalizeEdge,
normalizeValue,
sortEdges,
} from '@/lib/workflows/comparison'
import { normalizedStringify, normalizeWorkflowState } from '@/lib/workflows/comparison'
const logger = createLogger('SnapshotService')
@@ -38,7 +33,9 @@ export class SnapshotService implements ISnapshotService {
const existingSnapshot = await this.getSnapshotByHash(workflowId, stateHash)
if (existingSnapshot) {
logger.debug(`Reusing existing snapshot for workflow ${workflowId} with hash ${stateHash}`)
logger.info(
`Reusing existing snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}...)`
)
return {
snapshot: existingSnapshot,
isNew: false,
@@ -59,8 +56,9 @@ export class SnapshotService implements ISnapshotService {
.values(snapshotData)
.returning()
logger.debug(`Created new snapshot for workflow ${workflowId} with hash ${stateHash}`)
logger.debug(`Stored full state with ${Object.keys(state.blocks || {}).length} blocks`)
logger.info(
`Created new snapshot for workflow ${workflowId} (hash: ${stateHash.slice(0, 12)}..., blocks: ${Object.keys(state.blocks || {}).length})`
)
return {
snapshot: {
...newSnapshot,
@@ -112,7 +110,7 @@ export class SnapshotService implements ISnapshotService {
}
computeStateHash(state: WorkflowState): string {
const normalizedState = this.normalizeStateForHashing(state)
const normalizedState = normalizeWorkflowState(state)
const stateString = normalizedStringify(normalizedState)
return createHash('sha256').update(stateString).digest('hex')
}
@@ -130,69 +128,6 @@ export class SnapshotService implements ISnapshotService {
logger.info(`Cleaned up ${deletedCount} orphaned snapshots older than ${olderThanDays} days`)
return deletedCount
}
private normalizeStateForHashing(state: WorkflowState): any {
// 1. Normalize and sort edges
const normalizedEdges = sortEdges((state.edges || []).map(normalizeEdge))
// 2. Normalize blocks
const normalizedBlocks: Record<string, any> = {}
for (const [blockId, block] of Object.entries(state.blocks || {})) {
const { position, layout, height, ...blockWithoutLayoutFields } = block
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _dataWidth,
height: _dataHeight,
...dataRest
} = blockWithoutLayoutFields.data || {}
// Normalize subBlocks
const subBlocks = blockWithoutLayoutFields.subBlocks || {}
const normalizedSubBlocks: Record<string, any> = {}
for (const [subBlockId, subBlock] of Object.entries(subBlocks)) {
const value = subBlock.value ?? null
normalizedSubBlocks[subBlockId] = {
type: subBlock.type,
value: normalizeValue(value),
...Object.fromEntries(
Object.entries(subBlock).filter(([key]) => key !== 'value' && key !== 'type')
),
}
}
normalizedBlocks[blockId] = {
...blockWithoutLayoutFields,
data: dataRest,
subBlocks: normalizedSubBlocks,
}
}
// 3. Normalize loops and parallels
const normalizedLoops: Record<string, any> = {}
for (const [loopId, loop] of Object.entries(state.loops || {})) {
normalizedLoops[loopId] = normalizeValue(loop)
}
const normalizedParallels: Record<string, any> = {}
for (const [parallelId, parallel] of Object.entries(state.parallels || {})) {
normalizedParallels[parallelId] = normalizeValue(parallel)
}
// 4. Normalize variables (if present)
const normalizedVariables = state.variables ? normalizeValue(state.variables) : undefined
return {
blocks: normalizedBlocks,
edges: normalizedEdges,
loops: normalizedLoops,
parallels: normalizedParallels,
...(normalizedVariables !== undefined && { variables: normalizedVariables }),
}
}
}
export const snapshotService = new SnapshotService()
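A minimal sketch of the hashing path after this refactor, mirroring computeStateHash above (imports as shown in this file's diff):

import { createHash } from 'crypto'
import type { WorkflowState } from '@/lib/logs/types'
import { normalizedStringify, normalizeWorkflowState } from '@/lib/workflows/comparison'

// The same normalization now backs both change detection and snapshot hashing,
// so states that normalize equal always dedupe to a single snapshot row.
function computeStateHashDemo(state: WorkflowState): string {
  return createHash('sha256')
    .update(normalizedStringify(normalizeWorkflowState(state)))
    .digest('hex')
}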

View File

@@ -1,34 +1,10 @@
import type { BlockState, WorkflowState } from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS, TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
import {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
normalizeVariables,
sanitizeInputFormat,
sanitizeTools,
sanitizeVariable,
sortEdges,
} from './normalize'
/** Block with optional diff markers added by copilot */
type BlockWithDiffMarkers = BlockState & {
is_diff?: string
field_diffs?: Record<string, unknown>
}
/** SubBlock with optional diff marker */
type SubBlockWithDiffMarker = {
id: string
type: string
value: unknown
is_diff?: string
}
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { normalizedStringify, normalizeWorkflowState } from './normalize'
/**
* Compare the current workflow state with the deployed state to detect meaningful changes
* Compare the current workflow state with the deployed state to detect meaningful changes.
* Uses the shared normalizeWorkflowState function to ensure consistency with snapshot hashing.
*
* @param currentState - The current workflow state
* @param deployedState - The deployed workflow state
* @returns True if there are meaningful changes, false if only position changes or no changes
@@ -40,236 +16,106 @@ export function hasWorkflowChanged(
// If no deployed state exists, then the workflow has changed
if (!deployedState) return true
// 1. Compare edges (connections between blocks)
const currentEdges = currentState.edges || []
const deployedEdges = deployedState.edges || []
const normalizedCurrent = normalizeWorkflowState(currentState)
const normalizedDeployed = normalizeWorkflowState(deployedState)
const normalizedCurrentEdges = sortEdges(currentEdges.map(normalizeEdge))
const normalizedDeployedEdges = sortEdges(deployedEdges.map(normalizeEdge))
const currentStr = normalizedStringify(normalizedCurrent)
const deployedStr = normalizedStringify(normalizedDeployed)
if (
normalizedStringify(normalizedCurrentEdges) !== normalizedStringify(normalizedDeployedEdges)
) {
return true
}
if (currentStr !== deployedStr) {
// Debug: Find what's different
console.log('[hasWorkflowChanged] Detected differences:')
// 2. Compare blocks and their configurations
const currentBlockIds = Object.keys(currentState.blocks || {}).sort()
const deployedBlockIds = Object.keys(deployedState.blocks || {}).sort()
if (
currentBlockIds.length !== deployedBlockIds.length ||
normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)
) {
return true
}
// 3. Build normalized representations of blocks for comparison
const normalizedCurrentBlocks: Record<string, unknown> = {}
const normalizedDeployedBlocks: Record<string, unknown> = {}
for (const blockId of currentBlockIds) {
const currentBlock = currentState.blocks[blockId]
const deployedBlock = deployedState.blocks[blockId]
// Destructure and exclude non-functional fields:
// - position: visual positioning only
// - subBlocks: handled separately below
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
// - outputs: derived from subBlocks (e.g., inputFormat), already compared via subBlocks
// - is_diff, field_diffs: diff markers from copilot edits
const currentBlockWithDiff = currentBlock as BlockWithDiffMarkers
const deployedBlockWithDiff = deployedBlock as BlockWithDiffMarkers
const {
position: _currentPos,
subBlocks: currentSubBlocks = {},
layout: _currentLayout,
height: _currentHeight,
outputs: _currentOutputs,
is_diff: _currentIsDiff,
field_diffs: _currentFieldDiffs,
...currentRest
} = currentBlockWithDiff
const {
position: _deployedPos,
subBlocks: deployedSubBlocks = {},
layout: _deployedLayout,
height: _deployedHeight,
outputs: _deployedOutputs,
is_diff: _deployedIsDiff,
field_diffs: _deployedFieldDiffs,
...deployedRest
} = deployedBlockWithDiff
// Also exclude width/height from data object (container dimensions from autolayout)
const {
width: _currentDataWidth,
height: _currentDataHeight,
...currentDataRest
} = currentRest.data || {}
const {
width: _deployedDataWidth,
height: _deployedDataHeight,
...deployedDataRest
} = deployedRest.data || {}
normalizedCurrentBlocks[blockId] = {
...currentRest,
data: currentDataRest,
subBlocks: undefined,
// Compare edges
if (
normalizedStringify(normalizedCurrent.edges) !== normalizedStringify(normalizedDeployed.edges)
) {
console.log(' - Edges differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.edges, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.edges, null, 2))
}
normalizedDeployedBlocks[blockId] = {
...deployedRest,
data: deployedDataRest,
subBlocks: undefined,
}
// Compare blocks
const currentBlockIds = Object.keys(normalizedCurrent.blocks).sort()
const deployedBlockIds = Object.keys(normalizedDeployed.blocks).sort()
// Get all subBlock IDs from both states, excluding runtime metadata and UI-only elements
const allSubBlockIds = [
...new Set([...Object.keys(currentSubBlocks), ...Object.keys(deployedSubBlocks)]),
]
.filter(
(id) => !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(id) && !SYSTEM_SUBBLOCK_IDS.includes(id)
)
.sort()
if (normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)) {
console.log(' - Block IDs differ')
console.log(' Current:', currentBlockIds)
console.log(' Deployed:', deployedBlockIds)
} else {
for (const blockId of currentBlockIds) {
const currentBlock = normalizedCurrent.blocks[blockId]
const deployedBlock = normalizedDeployed.blocks[blockId]
// Normalize and compare each subBlock
for (const subBlockId of allSubBlockIds) {
// If the subBlock doesn't exist in either state, there's a difference
if (!currentSubBlocks[subBlockId] || !deployedSubBlocks[subBlockId]) {
return true
}
if (normalizedStringify(currentBlock) !== normalizedStringify(deployedBlock)) {
console.log(` - Block "${blockId}" differs:`)
// Get values with special handling for null/undefined
// Using unknown type since sanitization functions return different types
let currentValue: unknown = currentSubBlocks[subBlockId].value ?? null
let deployedValue: unknown = deployedSubBlocks[subBlockId].value ?? null
// Compare subBlocks
const currentSubBlockIds = Object.keys(currentBlock.subBlocks || {}).sort()
const deployedSubBlockIds = Object.keys(deployedBlock.subBlocks || {}).sort()
if (subBlockId === 'tools' && Array.isArray(currentValue) && Array.isArray(deployedValue)) {
currentValue = sanitizeTools(currentValue)
deployedValue = sanitizeTools(deployedValue)
}
if (
normalizedStringify(currentSubBlockIds) !== normalizedStringify(deployedSubBlockIds)
) {
console.log(' SubBlock IDs differ:')
console.log(' Current:', currentSubBlockIds)
console.log(' Deployed:', deployedSubBlockIds)
} else {
for (const subBlockId of currentSubBlockIds) {
const currentSub = currentBlock.subBlocks[subBlockId]
const deployedSub = deployedBlock.subBlocks[subBlockId]
if (
subBlockId === 'inputFormat' &&
Array.isArray(currentValue) &&
Array.isArray(deployedValue)
) {
currentValue = sanitizeInputFormat(currentValue)
deployedValue = sanitizeInputFormat(deployedValue)
}
if (normalizedStringify(currentSub) !== normalizedStringify(deployedSub)) {
console.log(` SubBlock "${subBlockId}" differs:`)
console.log(' Current:', JSON.stringify(currentSub, null, 2))
console.log(' Deployed:', JSON.stringify(deployedSub, null, 2))
}
}
}
// For string values, compare directly to catch even small text changes
if (typeof currentValue === 'string' && typeof deployedValue === 'string') {
if (currentValue !== deployedValue) {
return true
}
} else {
// For other types, use normalized comparison
const normalizedCurrentValue = normalizeValue(currentValue)
const normalizedDeployedValue = normalizeValue(deployedValue)
// Compare block properties (excluding subBlocks)
const { subBlocks: _cs, ...currentBlockRest } = currentBlock
const { subBlocks: _ds, ...deployedBlockRest } = deployedBlock
if (
normalizedStringify(normalizedCurrentValue) !==
normalizedStringify(normalizedDeployedValue)
) {
return true
if (normalizedStringify(currentBlockRest) !== normalizedStringify(deployedBlockRest)) {
console.log(' Block properties differ:')
console.log(' Current:', JSON.stringify(currentBlockRest, null, 2))
console.log(' Deployed:', JSON.stringify(deployedBlockRest, null, 2))
}
}
}
// Compare type and other properties (excluding diff markers and value)
const currentSubBlockWithDiff = currentSubBlocks[subBlockId] as SubBlockWithDiffMarker
const deployedSubBlockWithDiff = deployedSubBlocks[subBlockId] as SubBlockWithDiffMarker
const { value: _cv, is_diff: _cd, ...currentSubBlockRest } = currentSubBlockWithDiff
const { value: _dv, is_diff: _dd, ...deployedSubBlockRest } = deployedSubBlockWithDiff
if (normalizedStringify(currentSubBlockRest) !== normalizedStringify(deployedSubBlockRest)) {
return true
}
}
const blocksEqual =
normalizedStringify(normalizedCurrentBlocks[blockId]) ===
normalizedStringify(normalizedDeployedBlocks[blockId])
if (!blocksEqual) {
return true
}
}
// 4. Compare loops
const currentLoops = currentState.loops || {}
const deployedLoops = deployedState.loops || {}
const currentLoopIds = Object.keys(currentLoops).sort()
const deployedLoopIds = Object.keys(deployedLoops).sort()
if (
currentLoopIds.length !== deployedLoopIds.length ||
normalizedStringify(currentLoopIds) !== normalizedStringify(deployedLoopIds)
) {
return true
}
for (const loopId of currentLoopIds) {
const normalizedCurrentLoop = normalizeValue(normalizeLoop(currentLoops[loopId]))
const normalizedDeployedLoop = normalizeValue(normalizeLoop(deployedLoops[loopId]))
// Compare loops
if (
normalizedStringify(normalizedCurrentLoop) !== normalizedStringify(normalizedDeployedLoop)
normalizedStringify(normalizedCurrent.loops) !== normalizedStringify(normalizedDeployed.loops)
) {
return true
console.log(' - Loops differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.loops, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.loops, null, 2))
}
}
// 5. Compare parallels
const currentParallels = currentState.parallels || {}
const deployedParallels = deployedState.parallels || {}
const currentParallelIds = Object.keys(currentParallels).sort()
const deployedParallelIds = Object.keys(deployedParallels).sort()
if (
currentParallelIds.length !== deployedParallelIds.length ||
normalizedStringify(currentParallelIds) !== normalizedStringify(deployedParallelIds)
) {
return true
}
for (const parallelId of currentParallelIds) {
const normalizedCurrentParallel = normalizeValue(
normalizeParallel(currentParallels[parallelId])
)
const normalizedDeployedParallel = normalizeValue(
normalizeParallel(deployedParallels[parallelId])
)
// Compare parallels
if (
normalizedStringify(normalizedCurrentParallel) !==
normalizedStringify(normalizedDeployedParallel)
normalizedStringify(normalizedCurrent.parallels) !==
normalizedStringify(normalizedDeployed.parallels)
) {
return true
console.log(' - Parallels differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.parallels, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.parallels, null, 2))
}
}
// 6. Compare variables
const currentVariables = normalizeVariables(currentState.variables)
const deployedVariables = normalizeVariables(deployedState.variables)
// Compare variables
if (
normalizedStringify(normalizedCurrent.variables) !==
normalizedStringify(normalizedDeployed.variables)
) {
console.log(' - Variables differ')
console.log(' Current:', JSON.stringify(normalizedCurrent.variables, null, 2))
console.log(' Deployed:', JSON.stringify(normalizedDeployed.variables, null, 2))
}
const normalizedCurrentVars = normalizeValue(
Object.fromEntries(Object.entries(currentVariables).map(([id, v]) => [id, sanitizeVariable(v)]))
)
const normalizedDeployedVars = normalizeValue(
Object.fromEntries(
Object.entries(deployedVariables).map(([id, v]) => [id, sanitizeVariable(v)])
)
)
if (normalizedStringify(normalizedCurrentVars) !== normalizedStringify(normalizedDeployedVars)) {
return true
}

View File

@@ -1,7 +1,15 @@
export { hasWorkflowChanged } from './compare'
export type { NormalizedWorkflowState } from './normalize'
export {
normalizedStringify,
normalizeEdge,
normalizeLoop,
normalizeParallel,
normalizeValue,
normalizeVariables,
normalizeWorkflowState,
sanitizeInputFormat,
sanitizeTools,
sanitizeVariable,
sortEdges,
} from './normalize'

View File

@@ -4,7 +4,14 @@
*/
import type { Edge } from 'reactflow'
import type { Loop, Parallel, Variable } from '@/stores/workflows/workflow/types'
import type {
BlockState,
Loop,
Parallel,
Variable,
WorkflowState,
} from '@/stores/workflows/workflow/types'
import { SYSTEM_SUBBLOCK_IDS, TRIGGER_RUNTIME_SUBBLOCK_IDS } from '@/triggers/constants'
/**
* Normalizes a value for consistent comparison by sorting object keys recursively
@@ -220,3 +227,155 @@ export function sortEdges(
)
)
}
/** Block with optional diff markers added by copilot */
type BlockWithDiffMarkers = BlockState & {
is_diff?: string
field_diffs?: Record<string, unknown>
}
/** SubBlock with optional diff marker */
type SubBlockWithDiffMarker = {
id: string
type: string
value: unknown
is_diff?: string
}
/** Normalized block structure for comparison */
interface NormalizedBlock {
[key: string]: unknown
data: Record<string, unknown>
subBlocks: Record<string, NormalizedSubBlock>
}
/** Normalized subBlock structure */
interface NormalizedSubBlock {
[key: string]: unknown
value: unknown
}
/** Normalized workflow state structure */
export interface NormalizedWorkflowState {
blocks: Record<string, NormalizedBlock>
edges: Array<{
source: string
sourceHandle?: string | null
target: string
targetHandle?: string | null
}>
loops: Record<string, unknown>
parallels: Record<string, unknown>
variables: unknown
}
/**
* Normalizes a workflow state for comparison or hashing.
* Excludes non-functional fields (position, layout, height, outputs, diff markers)
* and system/trigger runtime subBlocks.
*
* @param state - The workflow state to normalize
* @returns A normalized workflow state suitable for comparison or hashing
*/
export function normalizeWorkflowState(state: WorkflowState): NormalizedWorkflowState {
// 1. Normalize and sort edges (connection-relevant fields only)
const normalizedEdges = sortEdges((state.edges || []).map(normalizeEdge))
// 2. Normalize blocks
const normalizedBlocks: Record<string, NormalizedBlock> = {}
for (const [blockId, block] of Object.entries(state.blocks || {})) {
const blockWithDiff = block as BlockWithDiffMarkers
// Exclude non-functional fields:
// - position: visual positioning only
// - layout: contains measuredWidth/measuredHeight from autolayout
// - height: block height measurement from autolayout
// - outputs: derived from subBlocks, already compared via subBlocks
// - is_diff, field_diffs: diff markers from copilot edits
// - subBlocks: handled separately
const {
position: _position,
subBlocks: blockSubBlocks = {},
layout: _layout,
height: _height,
outputs: _outputs,
is_diff: _isDiff,
field_diffs: _fieldDiffs,
...blockRest
} = blockWithDiff
// Exclude from data object:
// - width/height: container dimensions from autolayout
// - nodes: subflow node membership (derived/runtime for parallel/loop blocks)
// - distribution: parallel distribution (derived/runtime)
const {
width: _dataWidth,
height: _dataHeight,
nodes: _dataNodes,
distribution: _dataDistribution,
...dataRest
} = (blockRest.data || {}) as Record<string, unknown>
// Filter and normalize subBlocks (exclude system/trigger runtime subBlocks)
const normalizedSubBlocks: Record<string, NormalizedSubBlock> = {}
const subBlockIds = Object.keys(blockSubBlocks)
.filter(
(id) => !SYSTEM_SUBBLOCK_IDS.includes(id) && !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(id)
)
.sort()
for (const subBlockId of subBlockIds) {
const subBlock = blockSubBlocks[subBlockId] as SubBlockWithDiffMarker
let value: unknown = subBlock.value ?? null
// Sanitize UI-only fields from tools and inputFormat
if (subBlockId === 'tools' && Array.isArray(value)) {
value = sanitizeTools(value)
}
if (subBlockId === 'inputFormat' && Array.isArray(value)) {
value = sanitizeInputFormat(value)
}
// Exclude diff markers from subBlock
const { value: _v, is_diff: _sd, ...subBlockRest } = subBlock
normalizedSubBlocks[subBlockId] = {
...subBlockRest,
value: normalizeValue(value),
}
}
normalizedBlocks[blockId] = {
...blockRest,
data: dataRest,
subBlocks: normalizedSubBlocks,
}
}
// 3. Normalize loops using specialized normalizeLoop (extracts only type-relevant fields)
const normalizedLoops: Record<string, unknown> = {}
for (const [loopId, loop] of Object.entries(state.loops || {})) {
normalizedLoops[loopId] = normalizeValue(normalizeLoop(loop))
}
// 4. Normalize parallels using specialized normalizeParallel
const normalizedParallels: Record<string, unknown> = {}
for (const [parallelId, parallel] of Object.entries(state.parallels || {})) {
normalizedParallels[parallelId] = normalizeValue(normalizeParallel(parallel))
}
// 5. Normalize variables (remove UI-only validationError field)
const variables = normalizeVariables(state.variables)
const normalizedVariablesObj = normalizeValue(
Object.fromEntries(Object.entries(variables).map(([id, v]) => [id, sanitizeVariable(v)]))
)
return {
blocks: normalizedBlocks,
edges: normalizedEdges,
loops: normalizedLoops,
parallels: normalizedParallels,
variables: normalizedVariablesObj,
}
}
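A hedged sketch of the invariant this function provides, assuming normalizedStringify and WorkflowState are in scope as above (block shape abbreviated, hence the cast; real blocks carry more fields):

const makeState = (position: { x: number; y: number }) =>
  ({
    blocks: { b1: { id: 'b1', type: 'agent', position, subBlocks: {} } },
    edges: [],
    loops: {},
    parallels: {},
  }) as unknown as WorkflowState

// true: position is excluded during normalization, so the snapshot hash is stable too
normalizedStringify(normalizeWorkflowState(makeState({ x: 0, y: 0 }))) ===
  normalizedStringify(normalizeWorkflowState(makeState({ x: 50, y: 80 })))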

View File

@@ -23,9 +23,11 @@ import type {
ContextExtensions,
ExecutionCallbacks,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'
import { Serializer } from '@/serializer'
const logger = createLogger('ExecutionCore')
@@ -40,6 +42,12 @@ export interface ExecuteWorkflowCoreOptions {
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
/** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -114,6 +122,8 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -246,6 +256,16 @@ export async function executeWorkflowCore(
processedInput = input || {}
// Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
let resolvedStopAfterBlockId = stopAfterBlockId
if (stopAfterBlockId) {
if (serializedWorkflow.loops?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
} else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
}
}
// Create and execute workflow with callbacks
if (resumeFromSnapshot) {
logger.info(`[${requestId}] Resume execution detected`, {
@@ -296,6 +316,7 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId: resolvedStopAfterBlockId,
}
const executorInstance = new Executor({
@@ -318,10 +339,13 @@ export async function executeWorkflowCore(
}
}
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
)) as ExecutionResult
const result = runFromBlock
? ((await executorInstance.executeFromBlock(
workflowId,
runFromBlock.startBlockId,
runFromBlock.sourceSnapshot
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -180,3 +180,140 @@ export function formatSSEEvent(event: ExecutionEvent): string {
export function encodeSSEEvent(event: ExecutionEvent): Uint8Array {
return new TextEncoder().encode(formatSSEEvent(event))
}
/**
* Options for creating SSE execution callbacks
*/
export interface SSECallbackOptions {
executionId: string
workflowId: string
controller: ReadableStreamDefaultController<Uint8Array>
isStreamClosed: () => boolean
setStreamClosed: () => void
}
/**
* Creates SSE callbacks for workflow execution streaming
*/
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: any; executionTime: number },
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}
: {}
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
}
return { sendEvent, onBlockStart, onBlockComplete, onStream }
}
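Editor's note — a minimal consumer sketch, assuming a ReadableStream-based route handler (the wiring below is illustrative, not part of this diff):

function createExecutionStream(executionId: string, workflowId: string) {
  let closed = false
  return new ReadableStream<Uint8Array>({
    start(controller) {
      // isStreamClosed/setStreamClosed let sendEvent fail soft once the client
      // disconnects, instead of throwing on an already-closed controller.
      const callbacks = createSSECallbacks({
        executionId,
        workflowId,
        controller,
        isStreamClosed: () => closed,
        setStreamClosed: () => {
          closed = true
        },
      })
      // callbacks.onBlockStart / onBlockComplete / onStream would be handed to
      // the executor here (executor wiring omitted); the caller is responsible
      // for closing the controller when execution finishes.
      void callbacks
    },
    cancel() {
      closed = true
    },
  })
}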

View File

@@ -35,4 +35,23 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))
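Editor's note — a sketch of the intended round trip (call sites assumed; the `result.snapshot` field name is hypothetical): a finished run stores its serializable state per workflow, and a later run-from-block reads it back.

const { setLastExecutionSnapshot, getLastExecutionSnapshot } = useExecutionStore.getState()

// After a run completes, persist its state for this workflow:
setLastExecutionSnapshot(workflowId, result.snapshot) // field name assumed

// Later, when the user re-runs from a specific block:
const sourceSnapshot = getLastExecutionSnapshot(workflowId)
if (sourceSnapshot) {
  // pass { startBlockId, sourceSnapshot } as runFromBlock (see the executor hunk above)
}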

View File

@@ -1,4 +1,5 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'
/**
@@ -18,16 +19,9 @@ export interface ExecutionState {
pendingBlocks: string[]
executor: Executor | null
debugContext: ExecutionContext | null
/**
* Tracks blocks from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators (rings on blocks).
*/
lastRunPath: Map<string, BlockRunStatus>
/**
* Tracks edges from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators on edges.
*/
lastRunEdges: Map<string, EdgeRunStatus>
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
@@ -41,6 +35,9 @@ export interface ExecutionActions {
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
@@ -52,4 +49,5 @@ export const initialState: ExecutionState = {
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
lastExecutionSnapshots: new Map(),
}

View File

@@ -1,15 +1,155 @@
import { RepeatIcon, SplitIcon } from 'lucide-react'
import { create } from 'zustand'
import type { SearchModalState } from './types'
import { devtools } from 'zustand/middleware'
import { getToolOperationsIndex } from '@/lib/search/tool-operations'
import { getTriggersForSidebar } from '@/lib/workflows/triggers/trigger-utils'
import { getAllBlocks } from '@/blocks'
import type {
SearchBlockItem,
SearchData,
SearchDocItem,
SearchModalState,
SearchToolOperationItem,
} from './types'
export const useSearchModalStore = create<SearchModalState>((set) => ({
isOpen: false,
setOpen: (open: boolean) => {
set({ isOpen: open })
},
open: () => {
set({ isOpen: true })
},
close: () => {
set({ isOpen: false })
},
}))
const initialData: SearchData = {
blocks: [],
tools: [],
triggers: [],
toolOperations: [],
docs: [],
isInitialized: false,
}
export const useSearchModalStore = create<SearchModalState>()(
devtools(
(set, get) => ({
isOpen: false,
data: initialData,
setOpen: (open: boolean) => {
set({ isOpen: open })
},
open: () => {
set({ isOpen: true })
},
close: () => {
set({ isOpen: false })
},
initializeData: (filterBlocks) => {
const allBlocks = getAllBlocks()
const filteredAllBlocks = filterBlocks(allBlocks) as typeof allBlocks
const regularBlocks: SearchBlockItem[] = []
const tools: SearchBlockItem[] = []
const docs: SearchDocItem[] = []
for (const block of filteredAllBlocks) {
if (block.hideFromToolbar) continue
const searchItem: SearchBlockItem = {
id: block.type,
name: block.name,
description: block.description || '',
icon: block.icon,
bgColor: block.bgColor || '#6B7280',
type: block.type,
}
if (block.category === 'blocks' && block.type !== 'starter') {
regularBlocks.push(searchItem)
} else if (block.category === 'tools') {
tools.push(searchItem)
}
if (block.docsLink) {
docs.push({
id: `docs-${block.type}`,
name: block.name,
icon: block.icon,
href: block.docsLink,
})
}
}
const specialBlocks: SearchBlockItem[] = [
{
id: 'loop',
name: 'Loop',
description: 'Create a Loop',
icon: RepeatIcon,
bgColor: '#2FB3FF',
type: 'loop',
},
{
id: 'parallel',
name: 'Parallel',
description: 'Parallel Execution',
icon: SplitIcon,
bgColor: '#FEE12B',
type: 'parallel',
},
]
const blocks = [...regularBlocks, ...(filterBlocks(specialBlocks) as SearchBlockItem[])]
const allTriggers = getTriggersForSidebar()
const filteredTriggers = filterBlocks(allTriggers) as typeof allTriggers
const priorityOrder = ['Start', 'Schedule', 'Webhook']
const sortedTriggers = [...filteredTriggers].sort((a, b) => {
const aIndex = priorityOrder.indexOf(a.name)
const bIndex = priorityOrder.indexOf(b.name)
const aHasPriority = aIndex !== -1
const bHasPriority = bIndex !== -1
if (aHasPriority && bHasPriority) return aIndex - bIndex
if (aHasPriority) return -1
if (bHasPriority) return 1
return a.name.localeCompare(b.name)
})
const triggers = sortedTriggers.map(
(block): SearchBlockItem => ({
id: block.type,
name: block.name,
description: block.description || '',
icon: block.icon,
bgColor: block.bgColor || '#6B7280',
type: block.type,
config: block,
})
)
const allowedBlockTypes = new Set(tools.map((t) => t.type))
const toolOperations: SearchToolOperationItem[] = getToolOperationsIndex()
.filter((op) => allowedBlockTypes.has(op.blockType))
.map((op) => ({
id: op.id,
name: op.operationName,
searchValue: `${op.serviceName} ${op.operationName}`,
icon: op.icon,
bgColor: op.bgColor,
blockType: op.blockType,
operationId: op.operationId,
keywords: op.aliases,
}))
set({
data: {
blocks,
tools,
triggers,
toolOperations,
docs,
isInitialized: true,
},
})
},
}),
{ name: 'search-modal-store' }
)
)
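Editor's note — a usage sketch for initializeData (only the signature is taken from the types file; the permission filter below is hypothetical):

const filterByPermissions = <T extends { type: string }>(blocks: T[]): T[] =>
  blocks.filter((block) => allowedTypes.has(block.type)) // `allowedTypes` is assumed

useSearchModalStore.getState().initializeData(filterByPermissions)

Because the filter is injected rather than imported, the store can be re-initialized with a new filter when the user's permissions change.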

View File

@@ -1,3 +1,55 @@
import type { ComponentType } from 'react'
import type { BlockConfig } from '@/blocks/types'
/**
* Represents a block item in the search results.
*/
export interface SearchBlockItem {
id: string
name: string
description: string
icon: ComponentType<{ className?: string }>
bgColor: string
type: string
config?: BlockConfig
}
/**
* Represents a tool operation item in the search results.
*/
export interface SearchToolOperationItem {
id: string
name: string
searchValue: string
icon: ComponentType<{ className?: string }>
bgColor: string
blockType: string
operationId: string
keywords: string[]
}
/**
* Represents a doc item in the search results.
*/
export interface SearchDocItem {
id: string
name: string
icon: ComponentType<{ className?: string }>
href: string
}
/**
* Pre-computed search data that is initialized on app load.
*/
export interface SearchData {
blocks: SearchBlockItem[]
tools: SearchBlockItem[]
triggers: SearchBlockItem[]
toolOperations: SearchToolOperationItem[]
docs: SearchDocItem[]
isInitialized: boolean
}
/**
* Global state for the universal search modal.
*
@@ -8,18 +60,27 @@
export interface SearchModalState {
/** Whether the search modal is currently open. */
isOpen: boolean
/** Pre-computed search data. */
data: SearchData
/**
* Explicitly set the open state of the modal.
*
* @param open - New open state.
*/
setOpen: (open: boolean) => void
/**
* Convenience method to open the modal.
*/
open: () => void
/**
* Convenience method to close the modal.
*/
close: () => void
/**
* Initialize search data. Called on app load, and again if the permission-based filter changes.
*/
initializeData: (filterBlocks: <T extends { type: string }>(blocks: T[]) => T[]) => void
}
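Editor's note — to illustrate how these fields are meant to be consumed, a hypothetical render sketch assuming cmdk's <Command.Item>, which accepts value and keywords props (the onSelect body is an assumption):

import { Command } from 'cmdk'

function ToolOperationRow({ item }: { item: SearchToolOperationItem }) {
  return (
    <Command.Item
      value={item.searchValue} // unique per item, per this PR's cmdk value-uniqueness fix
      keywords={item.keywords} // aliases included in cmdk's fuzzy matching
      onSelect={() => {
        // insert a block of item.blockType with item.operationId preselected
      }}
    >
      {item.name}
    </Command.Item>
  )
}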

View File

@@ -10,6 +10,7 @@ export const SYSTEM_SUBBLOCK_IDS: string[] = [
'webhookUrlDisplay', // Webhook URL display
'samplePayload', // Example payload display
'setupScript', // Setup script code (e.g., Apps Script)
'scheduleInfo', // Schedule status display (next run, last run)
]
/**

View File

@@ -4,8 +4,9 @@
# Global configuration
global:
imageRegistry: "ghcr.io"
# Use "managed-csi-premium" for Premium SSD (requires Premium storage-capable VMs like Standard_DS*)
# Use "managed-csi" for Standard SSD (works with all VM types)
# Use "managed-csi-premium" for Premium SSD, "managed-csi" for Standard SSD
# IMPORTANT: For production, use a StorageClass with reclaimPolicy: Retain
# to protect database volumes from accidental deletion.
storageClass: "managed-csi"
# Main application
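Editor's note — if the built-in classes don't retain volumes, a custom StorageClass along these lines would satisfy the guidance above (a sketch for Azure's disk CSI driver; the name and skuName are assumptions):

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: managed-csi-retain
provisioner: disk.csi.azure.com
parameters:
  skuName: StandardSSD_LRS   # or Premium_LRS for Premium SSD
reclaimPolicy: Retain        # PVs survive PVC deletion
volumeBindingMode: WaitForFirstConsumer
allowVolumeExpansion: true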

View File

@@ -4,6 +4,7 @@
# Global configuration
global:
imageRegistry: "ghcr.io"
# For production, use a StorageClass with reclaimPolicy: Retain
storageClass: "managed-csi-premium"
# Main application

View File

@@ -11,12 +11,12 @@ spec:
duration: {{ .Values.postgresql.tls.duration | default "87600h" }} # Default: 10 years
renewBefore: {{ .Values.postgresql.tls.renewBefore | default "2160h" }} # Default: 90 days before expiry
isCA: false
{{- if .Values.postgresql.tls.rotationPolicy }}
rotationPolicy: {{ .Values.postgresql.tls.rotationPolicy }}
{{- end }}
privateKey:
algorithm: {{ .Values.postgresql.tls.privateKey.algorithm | default "RSA" }}
size: {{ .Values.postgresql.tls.privateKey.size | default 4096 }}
{{- if .Values.postgresql.tls.rotationPolicy }}
rotationPolicy: {{ .Values.postgresql.tls.rotationPolicy }}
{{- end }}
usages:
- server auth
- client auth
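Editor's note — with rotationPolicy nested this way, the rendered manifest matches cert-manager's schema, where rotationPolicy is a field of spec.privateKey. A rendered sketch using the defaults above, assuming rotationPolicy is set to Always:

spec:
  duration: 87600h
  renewBefore: 2160h
  isCA: false
  privateKey:
    algorithm: RSA
    size: 4096
    rotationPolicy: Always   # only emitted when .Values.postgresql.tls.rotationPolicy is set
  usages:
    - server auth
    - client auth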

View File

@@ -1,4 +1,4 @@
{{- if .Values.branding.enabled }}
{{- if and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
---
# Branding ConfigMap
# Mounts custom branding assets (logos, CSS, etc.) into the application
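Editor's note — under the new guard, values like the following (a hypothetical example) are what cause the ConfigMap to render; branding.enabled alone no longer creates an empty one:

branding:
  enabled: true
  files:
    custom.css: |
      :root { --brand-accent: #2FB3FF; }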

View File

@@ -110,9 +110,10 @@ spec:
{{- end }}
{{- include "sim.resources" .Values.app | nindent 10 }}
{{- include "sim.securityContext" .Values.app | nindent 10 }}
{{- if or .Values.branding.enabled .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
{{- $hasBranding := and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
{{- if or $hasBranding .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
volumeMounts:
{{- if .Values.branding.enabled }}
{{- if $hasBranding }}
- name: branding
mountPath: {{ .Values.branding.mountPath | default "/app/public/branding" }}
readOnly: true
@@ -124,9 +125,10 @@ spec:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- if or .Values.branding.enabled .Values.extraVolumes .Values.app.extraVolumes }}
{{- $hasBranding := and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
{{- if or $hasBranding .Values.extraVolumes .Values.app.extraVolumes }}
volumes:
{{- if .Values.branding.enabled }}
{{- if $hasBranding }}
- name: branding
configMap:
name: {{ include "sim.fullname" . }}-branding