Mirror of https://github.com/simstudioai/sim.git
Synced 2026-01-28 16:27:55 -05:00

Compare commits: feat/run-f...feat/descr (5 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 1a1f3e87e7 | |
| | 8aaaefcd72 | |
| | 242710f4d7 | |
| | 43f7f3bc00 | |
| | d4af644159 | |
**Workflow deployment version API route (PATCH)**

```diff
@@ -9,13 +9,24 @@ import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/
 const logger = createLogger('WorkflowDeploymentVersionAPI')
 
-const patchBodySchema = z.object({
-  name: z
-    .string()
-    .trim()
-    .min(1, 'Name cannot be empty')
-    .max(100, 'Name must be 100 characters or less'),
-})
+const patchBodySchema = z
+  .object({
+    name: z
+      .string()
+      .trim()
+      .min(1, 'Name cannot be empty')
+      .max(100, 'Name must be 100 characters or less')
+      .optional(),
+    description: z
+      .string()
+      .trim()
+      .max(500, 'Description must be 500 characters or less')
+      .nullable()
+      .optional(),
+  })
+  .refine((data) => data.name !== undefined || data.description !== undefined, {
+    message: 'At least one of name or description must be provided',
+  })
 
 export const dynamic = 'force-dynamic'
 export const runtime = 'nodejs'
@@ -88,33 +99,46 @@
       return createErrorResponse(validation.error.errors[0]?.message || 'Invalid request body', 400)
     }
 
-    const { name } = validation.data
+    const { name, description } = validation.data
+
+    const updateData: { name?: string; description?: string | null } = {}
+    if (name !== undefined) {
+      updateData.name = name
+    }
+    if (description !== undefined) {
+      updateData.description = description
+    }
 
     const [updated] = await db
       .update(workflowDeploymentVersion)
-      .set({ name })
+      .set(updateData)
       .where(
         and(
           eq(workflowDeploymentVersion.workflowId, id),
           eq(workflowDeploymentVersion.version, versionNum)
         )
       )
-      .returning({ id: workflowDeploymentVersion.id, name: workflowDeploymentVersion.name })
+      .returning({
+        id: workflowDeploymentVersion.id,
+        name: workflowDeploymentVersion.name,
+        description: workflowDeploymentVersion.description,
+      })
 
     if (!updated) {
       return createErrorResponse('Deployment version not found', 404)
     }
 
-    logger.info(
-      `[${requestId}] Renamed deployment version ${version} for workflow ${id} to "${name}"`
-    )
+    logger.info(`[${requestId}] Updated deployment version ${version} for workflow ${id}`, {
+      name: updateData.name,
+      description: updateData.description,
+    })
 
-    return createSuccessResponse({ name: updated.name })
+    return createSuccessResponse({ name: updated.name, description: updated.description })
   } catch (error: any) {
     logger.error(
-      `[${requestId}] Error renaming deployment version ${version} for workflow ${id}`,
+      `[${requestId}] Error updating deployment version ${version} for workflow ${id}`,
       error
     )
-    return createErrorResponse(error.message || 'Failed to rename deployment version', 500)
+    return createErrorResponse(error.message || 'Failed to update deployment version', 500)
   }
 }
```
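Taken together, the two hunks turn a rename-only endpoint into a general metadata update. A minimal client sketch against the new contract (the route path is taken from the fetch call the Versions component used before this change; treating the `createSuccessResponse` envelope as the plain payload is an assumption):

```ts
// Sketch only: updates a deployment version's name and/or description.
// The PATCH body must contain at least one of the two fields, per the
// .refine() rule in the schema above.
async function updateDeploymentVersion(
  workflowId: string,
  version: number,
  patch: { name?: string; description?: string | null }
) {
  const res = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(patch),
  })
  if (!res.ok) throw new Error(`Failed to update version: ${res.status}`)
  return res.json() // { name, description } on success
}
```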
**Workflow deployment versions API route (GET)**

```diff
@@ -26,6 +26,7 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
       id: workflowDeploymentVersion.id,
       version: workflowDeploymentVersion.version,
       name: workflowDeploymentVersion.name,
+      description: workflowDeploymentVersion.description,
       isActive: workflowDeploymentVersion.isActive,
       createdAt: workflowDeploymentVersion.createdAt,
       createdBy: workflowDeploymentVersion.createdBy,
```
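With `description` added to the selected columns, each item in the GET response carries roughly this shape (a sketch inferred from the column names above; the canonical `WorkflowDeploymentVersionResponse` type lives in `@/lib/workflows/persistence/utils` and is not shown in this diff):

```ts
// Inferred sketch; exact field types are assumptions.
interface DeploymentVersionItem {
  id: string
  version: number
  name: string | null
  description: string | null // newly included
  isActive: boolean
  createdAt: string
  createdBy: string
}
```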
**Execute-from-block API route** (`@@ -1,216 +0,0 @@`, entire file deleted). The removed `ExecuteFromBlockAPI` route:

```ts
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'

const logger = createLogger('ExecuteFromBlockAPI')

const ExecuteFromBlockSchema = z.object({
  startBlockId: z.string().min(1, 'Start block ID is required'),
  sourceSnapshot: z.object({
    blockStates: z.record(z.any()),
    executedBlocks: z.array(z.string()),
    blockLogs: z.array(z.any()),
    decisions: z.object({
      router: z.record(z.string()),
      condition: z.record(z.string()),
    }),
    completedLoops: z.array(z.string()),
    loopExecutions: z.record(z.any()).optional(),
    parallelExecutions: z.record(z.any()).optional(),
    parallelBlockMapping: z.record(z.any()).optional(),
    activeExecutionPath: z.array(z.string()),
  }),
  input: z.any().optional(),
})

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = generateRequestId()
  const { id: workflowId } = await params

  try {
    const auth = await checkHybridAuth(req, { requireWorkflowId: false })
    if (!auth.success || !auth.userId) {
      return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
    }
    const userId = auth.userId

    let body: unknown
    try {
      body = await req.json()
    } catch {
      return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
    }

    const validation = ExecuteFromBlockSchema.safeParse(body)
    if (!validation.success) {
      logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
      return NextResponse.json(
        {
          error: 'Invalid request body',
          details: validation.error.errors.map((e) => ({
            path: e.path.join('.'),
            message: e.message,
          })),
        },
        { status: 400 }
      )
    }

    const { startBlockId, sourceSnapshot, input } = validation.data
    const executionId = uuidv4()

    const [workflowRecord] = await db
      .select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
      .from(workflowTable)
      .where(eq(workflowTable.id, workflowId))
      .limit(1)

    if (!workflowRecord?.workspaceId) {
      return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
    }

    const workspaceId = workflowRecord.workspaceId
    const workflowUserId = workflowRecord.userId

    logger.info(`[${requestId}] Starting run-from-block execution`, {
      workflowId,
      startBlockId,
      executedBlocksCount: sourceSnapshot.executedBlocks.length,
    })

    const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
    const abortController = new AbortController()
    let isStreamClosed = false

    const stream = new ReadableStream<Uint8Array>({
      async start(controller) {
        const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
          executionId,
          workflowId,
          controller,
          isStreamClosed: () => isStreamClosed,
          setStreamClosed: () => {
            isStreamClosed = true
          },
        })

        const metadata: ExecutionMetadata = {
          requestId,
          workflowId,
          userId,
          executionId,
          triggerType: 'manual',
          workspaceId,
          workflowUserId,
          useDraftState: true,
          isClientSession: true,
          startTime: new Date().toISOString(),
        }

        const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})

        try {
          const startTime = new Date()

          sendEvent({
            type: 'execution:started',
            timestamp: startTime.toISOString(),
            executionId,
            workflowId,
            data: { startTime: startTime.toISOString() },
          })

          const result = await executeWorkflowCore({
            snapshot,
            loggingSession,
            abortSignal: abortController.signal,
            runFromBlock: {
              startBlockId,
              sourceSnapshot: sourceSnapshot as SerializableExecutionState,
            },
            callbacks: { onBlockStart, onBlockComplete, onStream },
          })

          if (result.status === 'cancelled') {
            sendEvent({
              type: 'execution:cancelled',
              timestamp: new Date().toISOString(),
              executionId,
              workflowId,
              data: { duration: result.metadata?.duration || 0 },
            })
          } else {
            sendEvent({
              type: 'execution:completed',
              timestamp: new Date().toISOString(),
              executionId,
              workflowId,
              data: {
                success: result.success,
                output: result.output,
                duration: result.metadata?.duration || 0,
                startTime: result.metadata?.startTime || startTime.toISOString(),
                endTime: result.metadata?.endTime || new Date().toISOString(),
              },
            })
          }
        } catch (error: unknown) {
          const errorMessage = error instanceof Error ? error.message : 'Unknown error'
          logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)

          const executionResult = hasExecutionResult(error) ? error.executionResult : undefined

          sendEvent({
            type: 'execution:error',
            timestamp: new Date().toISOString(),
            executionId,
            workflowId,
            data: {
              error: executionResult?.error || errorMessage,
              duration: executionResult?.metadata?.duration || 0,
            },
          })
        } finally {
          if (!isStreamClosed) {
            try {
              controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
              controller.close()
            } catch {}
          }
        }
      },
      cancel() {
        isStreamClosed = true
        abortController.abort()
        markExecutionCancelled(executionId).catch(() => {})
      },
    })

    return new NextResponse(stream, {
      headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
    })
  } catch (error: unknown) {
    const errorMessage = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
    return NextResponse.json(
      { error: errorMessage || 'Failed to start execution' },
      { status: 500 }
    )
  }
}
```
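For context on what was removed: the route resumed execution from an arbitrary block by replaying a client-supplied snapshot of prior results over an SSE stream. A request body matching `ExecuteFromBlockSchema` would have looked roughly like this (values are illustrative):

```ts
// Illustrative body for the deleted route, shaped by ExecuteFromBlockSchema.
const body = {
  startBlockId: 'block-2', // resume here, reusing upstream outputs
  sourceSnapshot: {
    blockStates: { 'block-1': { output: { value: 42 }, executed: true } },
    executedBlocks: ['block-1'],
    blockLogs: [],
    decisions: { router: {}, condition: {} },
    completedLoops: [],
    activeExecutionPath: ['block-1'],
  },
  input: undefined, // optional trigger payload
}
```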
**Execute workflow API route (drops `stopAfterBlockId`)**

```diff
@@ -53,7 +53,6 @@ const ExecuteWorkflowSchema = z.object({
       parallels: z.record(z.any()).optional(),
     })
     .optional(),
-  stopAfterBlockId: z.string().optional(),
 })
 
 export const runtime = 'nodejs'
@@ -223,7 +222,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
       includeFileBase64,
       base64MaxBytes,
       workflowStateOverride,
-      stopAfterBlockId,
     } = validation.data
 
     // For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -239,7 +237,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         includeFileBase64,
         base64MaxBytes,
         workflowStateOverride,
-        stopAfterBlockId: _stopAfterBlockId,
         workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
         ...rest
       } = body
@@ -437,7 +434,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         loggingSession,
         includeFileBase64,
         base64MaxBytes,
-        stopAfterBlockId,
       })
 
       const outputWithBase64 = includeFileBase64
@@ -726,7 +722,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
         abortSignal: abortController.signal,
         includeFileBase64,
         base64MaxBytes,
-        stopAfterBlockId,
       })
 
       if (result.status === 'paused') {
```
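These hunks retire `stopAfterBlockId` end to end: it leaves the request schema, both destructurings, and both `executeWorkflowCore` call sites. Before this change, run-until-block worked by passing the target block's id in the execute request, roughly as below (the route path is not shown in this diff and is assumed for illustration; only the field itself comes from the schema above):

```ts
// Illustrative only: the execute route's mount path is an assumption.
await fetch(`/api/workflows/${workflowId}/execute`, {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    ...inputPayload,
    stopAfterBlockId: 'block-3', // removed control field: stop once this block completes
  }),
})
```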
**ActionBar component (removes the run-from-block button)**

```diff
@@ -1,13 +1,11 @@
 import { memo, useCallback } from 'react'
-import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
+import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
 import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
 import { cn } from '@/lib/core/utils/cn'
 import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
 import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
-import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
 import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
 import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
-import { useExecutionStore } from '@/stores/execution'
 import { useNotificationStore } from '@/stores/notifications'
 import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
 import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -51,7 +49,6 @@ export const ActionBar = memo(
       collaborativeBatchToggleBlockHandles,
     } = useCollaborativeWorkflow()
     const { setPendingSelection } = useWorkflowRegistry()
-    const { handleRunFromBlock } = useWorkflowExecution()
 
     const addNotification = useNotificationStore((s) => s.addNotification)
 
@@ -100,39 +97,12 @@
         )
       )
 
     const { activeWorkflowId } = useWorkflowRegistry()
-    const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
     const userPermissions = useUserPermissionsContext()
-    const edges = useWorkflowStore((state) => state.edges)
 
     const isStartBlock = isInputDefinitionTrigger(blockType)
     const isResponseBlock = blockType === 'response'
     const isNoteBlock = blockType === 'note'
     const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
     const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
 
-    const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
-    const incomingEdges = edges.filter((edge) => edge.target === blockId)
-    const isTriggerBlock = incomingEdges.length === 0
-
-    // Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
-    const isSourceSatisfied = (sourceId: string) => {
-      if (snapshot?.executedBlocks.includes(sourceId)) return true
-      // Check if source is a trigger (has no incoming edges itself)
-      const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
-      return sourceIncomingEdges.length === 0
-    }
-
-    // Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
-    const dependenciesSatisfied =
-      isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
-    const canRunFromBlock =
-      dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
-
-    const handleRunFromBlockClick = useCallback(() => {
-      if (!activeWorkflowId || !canRunFromBlock) return
-      handleRunFromBlock(blockId, activeWorkflowId)
-    }, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
 
     /**
      * Get appropriate tooltip message based on disabled state
@@ -158,35 +128,30 @@
           'dark:border-transparent dark:bg-[var(--surface-4)]'
         )}
       >
-        {!isNoteBlock && !isInsideSubflow && (
+        {!isNoteBlock && (
           <Tooltip.Root>
             <Tooltip.Trigger asChild>
               <Button
                 variant='ghost'
                 onClick={(e) => {
                   e.stopPropagation()
-                  if (canRunFromBlock && !disabled) {
-                    handleRunFromBlockClick()
+                  if (!disabled) {
+                    collaborativeBatchToggleBlockEnabled([blockId])
                   }
                 }}
                 className={ACTION_BUTTON_STYLES}
-                disabled={disabled || !canRunFromBlock}
+                disabled={disabled}
               >
-                <Play className={ICON_SIZE} />
+                {isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
               </Button>
             </Tooltip.Trigger>
             <Tooltip.Content side='top'>
-              {(() => {
-                if (disabled) return getTooltipMessage('Run from block')
-                if (isExecuting) return 'Execution in progress'
-                if (!dependenciesSatisfied) return 'Run upstream blocks first'
-                return 'Run from block'
-              })()}
+              {getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
             </Tooltip.Content>
           </Tooltip.Root>
         )}
 
-        {!isNoteBlock && (
+        {isSubflowBlock && (
           <Tooltip.Root>
             <Tooltip.Trigger asChild>
               <Button
```
**BlockMenu component (removes "Run from block" / "Run until block" items)**

```diff
@@ -40,16 +40,9 @@ export interface BlockMenuProps {
   onRemoveFromSubflow: () => void
   onOpenEditor: () => void
   onRename: () => void
-  onRunFromBlock?: () => void
-  onRunUntilBlock?: () => void
   hasClipboard?: boolean
   showRemoveFromSubflow?: boolean
-  /** Whether run from block is available (has snapshot, was executed, not inside subflow) */
-  canRunFromBlock?: boolean
   disableEdit?: boolean
-  isExecuting?: boolean
-  /** Whether the selected block is a trigger (has no incoming edges) */
-  isPositionalTrigger?: boolean
 }
 
 /**
@@ -72,14 +65,9 @@ export function BlockMenu({
   onRemoveFromSubflow,
   onOpenEditor,
   onRename,
-  onRunFromBlock,
-  onRunUntilBlock,
   hasClipboard = false,
   showRemoveFromSubflow = false,
-  canRunFromBlock = false,
   disableEdit = false,
-  isExecuting = false,
-  isPositionalTrigger = false,
 }: BlockMenuProps) {
   const isSingleBlock = selectedBlocks.length === 1
 
@@ -90,15 +78,10 @@
     (b) =>
       TriggerUtils.requiresSingleInstance(b.type) || TriggerUtils.isSingleInstanceBlockType(b.type)
   )
-  // A block is a trigger if it's explicitly a trigger type OR has no incoming edges (positional trigger)
-  const hasTriggerBlock =
-    selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b)) || isPositionalTrigger
+  const hasTriggerBlock = selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b))
   const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
   const isSubflow =
     isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
-  const isInsideSubflow =
-    isSingleBlock &&
-    (selectedBlocks[0]?.parentType === 'loop' || selectedBlocks[0]?.parentType === 'parallel')
 
   const canRemoveFromSubflow = showRemoveFromSubflow && !hasTriggerBlock
 
@@ -220,38 +203,6 @@
           </PopoverItem>
         )}
 
-        {/* Run from/until block - only for single non-note block, not inside subflows */}
-        {isSingleBlock && !allNoteBlocks && !isInsideSubflow && (
-          <>
-            <PopoverDivider />
-            <PopoverItem
-              disabled={!canRunFromBlock || isExecuting}
-              onClick={() => {
-                if (canRunFromBlock && !isExecuting) {
-                  onRunFromBlock?.()
-                  onClose()
-                }
-              }}
-            >
-              Run from block
-            </PopoverItem>
-            {/* Hide "Run until" for triggers - they're always at the start */}
-            {!hasTriggerBlock && (
-              <PopoverItem
-                disabled={isExecuting}
-                onClick={() => {
-                  if (!isExecuting) {
-                    onRunUntilBlock?.()
-                    onClose()
-                  }
-                }}
-              >
-                Run until block
-              </PopoverItem>
-            )}
-          </>
-        )}
-
         {/* Destructive action */}
         <PopoverDivider />
         <PopoverItem
```
**version-description-modal.tsx** (`@@ -0,0 +1,170 @@`, new file):

```tsx
'use client'

import { useCallback, useRef, useState } from 'react'
import {
  Button,
  Modal,
  ModalBody,
  ModalContent,
  ModalFooter,
  ModalHeader,
  Textarea,
} from '@/components/emcn'
import {
  useGenerateVersionDescription,
  useUpdateDeploymentVersion,
} from '@/hooks/queries/deployments'

interface VersionDescriptionModalProps {
  open: boolean
  onOpenChange: (open: boolean) => void
  workflowId: string
  version: number
  versionName: string
  currentDescription: string | null | undefined
}

export function VersionDescriptionModal({
  open,
  onOpenChange,
  workflowId,
  version,
  versionName,
  currentDescription,
}: VersionDescriptionModalProps) {
  const initialDescriptionRef = useRef(currentDescription || '')
  const [description, setDescription] = useState(initialDescriptionRef.current)
  const [showUnsavedChangesAlert, setShowUnsavedChangesAlert] = useState(false)

  const updateMutation = useUpdateDeploymentVersion()
  const generateMutation = useGenerateVersionDescription()

  const hasChanges = description.trim() !== initialDescriptionRef.current.trim()
  const isGenerating = generateMutation.isPending

  const handleCloseAttempt = useCallback(() => {
    if (updateMutation.isPending || isGenerating) {
      return
    }
    if (hasChanges) {
      setShowUnsavedChangesAlert(true)
    } else {
      onOpenChange(false)
    }
  }, [hasChanges, updateMutation.isPending, isGenerating, onOpenChange])

  const handleDiscardChanges = useCallback(() => {
    setShowUnsavedChangesAlert(false)
    setDescription(initialDescriptionRef.current)
    onOpenChange(false)
  }, [onOpenChange])

  const handleGenerateDescription = useCallback(() => {
    generateMutation.mutate({
      workflowId,
      version,
      onStreamChunk: (accumulated) => {
        setDescription(accumulated)
      },
    })
  }, [workflowId, version, generateMutation])

  const handleSave = useCallback(() => {
    if (!workflowId) return

    updateMutation.mutate(
      {
        workflowId,
        version,
        description: description.trim() || null,
      },
      {
        onSuccess: () => {
          onOpenChange(false)
        },
      }
    )
  }, [workflowId, version, description, updateMutation, onOpenChange])

  return (
    <>
      <Modal open={open} onOpenChange={(openState) => !openState && handleCloseAttempt()}>
        <ModalContent className='max-w-[480px]'>
          <ModalHeader>
            <span>Version Description</span>
          </ModalHeader>
          <ModalBody className='space-y-[10px]'>
            <div className='flex items-center justify-between'>
              <p className='text-[12px] text-[var(--text-secondary)]'>
                {currentDescription ? 'Edit the' : 'Add a'} description for{' '}
                <span className='font-medium text-[var(--text-primary)]'>{versionName}</span>
              </p>
              <Button
                variant='active'
                className='-my-1 h-5 px-2 py-0 text-[11px]'
                onClick={handleGenerateDescription}
                disabled={isGenerating || updateMutation.isPending}
              >
                {isGenerating ? 'Generating...' : 'Generate'}
              </Button>
            </div>
            <Textarea
              placeholder='Describe the changes in this deployment version...'
              className='min-h-[120px] resize-none'
              value={description}
              onChange={(e) => setDescription(e.target.value)}
              maxLength={500}
              disabled={isGenerating}
            />
            <div className='flex items-center justify-between'>
              {(updateMutation.error || generateMutation.error) && (
                <p className='text-[12px] text-[var(--text-error)]'>
                  {updateMutation.error?.message || generateMutation.error?.message}
                </p>
              )}
              {!updateMutation.error && !generateMutation.error && <div />}
              <p className='text-[11px] text-[var(--text-tertiary)]'>{description.length}/500</p>
            </div>
          </ModalBody>
          <ModalFooter>
            <Button
              variant='default'
              onClick={handleCloseAttempt}
              disabled={updateMutation.isPending || isGenerating}
            >
              Cancel
            </Button>
            <Button
              variant='tertiary'
              onClick={handleSave}
              disabled={updateMutation.isPending || isGenerating || !hasChanges}
            >
              {updateMutation.isPending ? 'Saving...' : 'Save'}
            </Button>
          </ModalFooter>
        </ModalContent>
      </Modal>

      <Modal open={showUnsavedChangesAlert} onOpenChange={setShowUnsavedChangesAlert}>
        <ModalContent className='max-w-[400px]'>
          <ModalHeader>
            <span>Unsaved Changes</span>
          </ModalHeader>
          <ModalBody>
            <p className='text-[14px] text-[var(--text-secondary)]'>
              You have unsaved changes. Are you sure you want to discard them?
            </p>
          </ModalBody>
          <ModalFooter>
            <Button variant='default' onClick={() => setShowUnsavedChangesAlert(false)}>
              Keep Editing
            </Button>
            <Button variant='destructive' onClick={handleDiscardChanges}>
              Discard Changes
            </Button>
          </ModalFooter>
        </ModalContent>
      </Modal>
    </>
  )
}
```
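Wiring follows directly from the props interface; the Versions component further down mounts it like this (trimmed from that later hunk):

```tsx
// Trimmed from the Versions component diff below.
<VersionDescriptionModal
  key={versionData.version}
  open={descriptionModalVersion !== null}
  onOpenChange={(open) => !open && setDescriptionModalVersion(null)}
  workflowId={workflowId}
  version={versionData.version}
  versionName={versionData.name || `v${versionData.version}`}
  currentDescription={versionData.description}
/>
```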
**Versions component**

```diff
@@ -1,26 +1,31 @@
 'use client'
 
 import { useEffect, useRef, useState } from 'react'
 import { createLogger } from '@sim/logger'
 import clsx from 'clsx'
-import { MoreVertical, Pencil, RotateCcw, SendToBack } from 'lucide-react'
-import { Button, Popover, PopoverContent, PopoverItem, PopoverTrigger } from '@/components/emcn'
+import { FileText, MoreVertical, Pencil, RotateCcw, SendToBack } from 'lucide-react'
+import {
+  Button,
+  Popover,
+  PopoverContent,
+  PopoverItem,
+  PopoverTrigger,
+  Tooltip,
+} from '@/components/emcn'
 import { Skeleton } from '@/components/ui'
+import { formatDateTime } from '@/lib/core/utils/formatting'
 import type { WorkflowDeploymentVersionResponse } from '@/lib/workflows/persistence/utils'
+import { useUpdateDeploymentVersion } from '@/hooks/queries/deployments'
+import { VersionDescriptionModal } from './version-description-modal'
 
 const logger = createLogger('Versions')
 
 /** Shared styling constants aligned with terminal component */
 const HEADER_TEXT_CLASS = 'font-medium text-[var(--text-tertiary)] text-[12px]'
 const ROW_TEXT_CLASS = 'font-medium text-[var(--text-primary)] text-[12px]'
 const COLUMN_BASE_CLASS = 'flex-shrink-0'
 
 /** Column width configuration */
 const COLUMN_WIDTHS = {
   VERSION: 'w-[180px]',
   DEPLOYED_BY: 'w-[140px]',
   TIMESTAMP: 'flex-1',
-  ACTIONS: 'w-[32px]',
+  ACTIONS: 'w-[56px]',
 } as const
 
 interface VersionsProps {
@@ -31,34 +36,6 @@
   onSelectVersion: (version: number | null) => void
   onPromoteToLive: (version: number) => void
   onLoadDeployment: (version: number) => void
-  fetchVersions: () => Promise<void>
 }
 
-/**
- * Formats a timestamp into a readable string.
- * @param value - The date string or Date object to format
- * @returns Formatted string like "8:36 PM PT on Oct 11, 2025"
- */
-const formatDate = (value: string | Date): string => {
-  const date = value instanceof Date ? value : new Date(value)
-  if (Number.isNaN(date.getTime())) {
-    return '-'
-  }
-
-  const timePart = date.toLocaleTimeString('en-US', {
-    hour: 'numeric',
-    minute: '2-digit',
-    hour12: true,
-    timeZoneName: 'short',
-  })
-
-  const datePart = date.toLocaleDateString('en-US', {
-    month: 'short',
-    day: 'numeric',
-    year: 'numeric',
-  })
-
-  return `${timePart} on ${datePart}`
-}
-
 /**
@@ -73,14 +50,15 @@ export function Versions({
   onSelectVersion,
   onPromoteToLive,
   onLoadDeployment,
-  fetchVersions,
 }: VersionsProps) {
   const [editingVersion, setEditingVersion] = useState<number | null>(null)
   const [editValue, setEditValue] = useState('')
-  const [isRenaming, setIsRenaming] = useState(false)
   const [openDropdown, setOpenDropdown] = useState<number | null>(null)
+  const [descriptionModalVersion, setDescriptionModalVersion] = useState<number | null>(null)
   const inputRef = useRef<HTMLInputElement>(null)
+
+  const renameMutation = useUpdateDeploymentVersion()
 
   useEffect(() => {
     if (editingVersion !== null && inputRef.current) {
       inputRef.current.focus()
@@ -94,7 +72,8 @@
     setEditValue(currentName || `v${version}`)
   }
 
-  const handleSaveRename = async (version: number) => {
+  const handleSaveRename = (version: number) => {
+    if (renameMutation.isPending) return
     if (!workflowId || !editValue.trim()) {
       setEditingVersion(null)
       return
@@ -108,25 +87,21 @@
       return
     }
 
-    setIsRenaming(true)
-    try {
-      const res = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
-        method: 'PATCH',
-        headers: { 'Content-Type': 'application/json' },
-        body: JSON.stringify({ name: editValue.trim() }),
-      })
-
-      if (res.ok) {
-        await fetchVersions()
-        setEditingVersion(null)
-      } else {
-        logger.error('Failed to rename version')
+    renameMutation.mutate(
+      {
+        workflowId,
+        version,
+        name: editValue.trim(),
+      },
+      {
+        onSuccess: () => {
+          setEditingVersion(null)
+        },
+        onError: () => {
+          // Keep editing state open on error so user can retry
+        },
       }
-    } catch (error) {
-      logger.error('Error renaming version:', error)
-    } finally {
-      setIsRenaming(false)
-    }
+    )
   }
 
   const handleCancelRename = () => {
@@ -149,6 +124,16 @@
     onLoadDeployment(version)
   }
 
+  const handleOpenDescriptionModal = (version: number) => {
+    setOpenDropdown(null)
+    setDescriptionModalVersion(version)
+  }
+
+  const descriptionModalVersionData =
+    descriptionModalVersion !== null
+      ? versions.find((v) => v.version === descriptionModalVersion)
+      : null
+
   if (versionsLoading && versions.length === 0) {
     return (
       <div className='overflow-hidden rounded-[4px] border border-[var(--border)]'>
@@ -179,7 +164,14 @@
             <div className={clsx(COLUMN_WIDTHS.TIMESTAMP, 'min-w-0')}>
               <Skeleton className='h-[12px] w-[160px]' />
             </div>
-            <div className={clsx(COLUMN_WIDTHS.ACTIONS, COLUMN_BASE_CLASS, 'flex justify-end')}>
+            <div
+              className={clsx(
+                COLUMN_WIDTHS.ACTIONS,
+                COLUMN_BASE_CLASS,
+                'flex justify-end gap-[2px]'
+              )}
+            >
+              <Skeleton className='h-[20px] w-[20px] rounded-[4px]' />
               <Skeleton className='h-[20px] w-[20px] rounded-[4px]' />
             </div>
           </div>
@@ -257,7 +249,7 @@
                         'text-[var(--text-primary)] focus:outline-none focus:ring-0'
                       )}
                       maxLength={100}
-                      disabled={isRenaming}
+                      disabled={renameMutation.isPending}
                       autoComplete='off'
                       autoCorrect='off'
                       autoCapitalize='off'
@@ -289,14 +281,40 @@
                     <span
                       className={clsx('block truncate text-[var(--text-tertiary)]', ROW_TEXT_CLASS)}
                     >
-                      {formatDate(v.createdAt)}
+                      {formatDateTime(new Date(v.createdAt))}
                     </span>
                   </div>
 
                   <div
-                    className={clsx(COLUMN_WIDTHS.ACTIONS, COLUMN_BASE_CLASS, 'flex justify-end')}
+                    className={clsx(
+                      COLUMN_WIDTHS.ACTIONS,
+                      COLUMN_BASE_CLASS,
+                      'flex items-center justify-end gap-[2px]'
+                    )}
                     onClick={(e) => e.stopPropagation()}
                   >
+                    <Tooltip.Root>
+                      <Tooltip.Trigger asChild>
+                        <Button
+                          variant='ghost'
+                          className={clsx(
+                            '!p-1',
+                            !v.description &&
+                              'text-[var(--text-quaternary)] hover:text-[var(--text-tertiary)]'
+                          )}
+                          onClick={() => handleOpenDescriptionModal(v.version)}
+                        >
+                          <FileText className='h-3.5 w-3.5' />
+                        </Button>
+                      </Tooltip.Trigger>
+                      <Tooltip.Content side='top' className='max-w-[240px]'>
+                        {v.description ? (
+                          <p className='line-clamp-3 text-[12px]'>{v.description}</p>
+                        ) : (
+                          <p className='text-[12px]'>Add description</p>
+                        )}
+                      </Tooltip.Content>
+                    </Tooltip.Root>
                     <Popover
                       open={openDropdown === v.version}
                       onOpenChange={(open) => setOpenDropdown(open ? v.version : null)}
@@ -311,6 +329,10 @@
                         <Pencil className='h-3 w-3' />
                         <span>Rename</span>
                       </PopoverItem>
+                      <PopoverItem onClick={() => handleOpenDescriptionModal(v.version)}>
+                        <FileText className='h-3 w-3' />
+                        <span>{v.description ? 'Edit description' : 'Add description'}</span>
+                      </PopoverItem>
                       {!v.isActive && (
                         <PopoverItem onClick={() => handlePromote(v.version)}>
                           <RotateCcw className='h-3 w-3' />
@@ -328,6 +350,20 @@
           )
         })}
       </div>
+
+      {workflowId && descriptionModalVersionData && (
+        <VersionDescriptionModal
+          key={descriptionModalVersionData.version}
+          open={descriptionModalVersion !== null}
+          onOpenChange={(open) => !open && setDescriptionModalVersion(null)}
+          workflowId={workflowId}
+          version={descriptionModalVersionData.version}
+          versionName={
+            descriptionModalVersionData.name || `v${descriptionModalVersionData.version}`
+          }
+          currentDescription={descriptionModalVersionData.description}
+        />
+      )}
     </div>
   )
 }
```
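The manual fetch-then-refetch rename flow is replaced by a `useUpdateDeploymentVersion` mutation, which also explains why the `fetchVersions` prop disappears below: query-cache invalidation takes over from manual refetching. The hook itself is not part of this diff; a plausible sketch, assuming TanStack Query and the PATCH route from the first hunk:

```ts
// Hypothetical sketch of useUpdateDeploymentVersion; the real hook lives in
// '@/hooks/queries/deployments' and is not shown in this diff.
import { useMutation, useQueryClient } from '@tanstack/react-query'

interface UpdateVersionInput {
  workflowId: string
  version: number
  name?: string
  description?: string | null
}

export function useUpdateDeploymentVersion() {
  const queryClient = useQueryClient()
  return useMutation({
    mutationFn: async ({ workflowId, version, ...patch }: UpdateVersionInput) => {
      const res = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
        method: 'PATCH',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify(patch),
      })
      if (!res.ok) throw new Error('Failed to update deployment version')
      return res.json()
    },
    onSuccess: (_data, { workflowId }) => {
      // Refresh the cached version list so the UI reflects the change
      // (assumed invalidation key; the real key shape is not shown here).
      queryClient.invalidateQueries({ queryKey: ['deployment-versions', workflowId] })
    },
  })
}
```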
**GeneralDeploy component (drops `fetchVersions` plumbing)**

```diff
@@ -32,7 +32,6 @@ interface GeneralDeployProps {
   versionsLoading: boolean
   onPromoteToLive: (version: number) => Promise<void>
   onLoadDeploymentComplete: () => void
-  fetchVersions: () => Promise<void>
 }
 
 type PreviewMode = 'active' | 'selected'
@@ -48,7 +47,6 @@ export function GeneralDeploy({
   versionsLoading,
   onPromoteToLive,
   onLoadDeploymentComplete,
-  fetchVersions,
 }: GeneralDeployProps) {
   const [selectedVersion, setSelectedVersion] = useState<number | null>(null)
   const [previewMode, setPreviewMode] = useState<PreviewMode>('active')
@@ -229,7 +227,6 @@
             onSelectVersion={handleSelectVersion}
             onPromoteToLive={handlePromoteToLive}
             onLoadDeployment={handleLoadDeployment}
-            fetchVersions={fetchVersions}
           />
         </div>
       </div>
```
**DeployModal component**

```diff
@@ -135,11 +135,9 @@ export function DeployModal({
     refetch: refetchDeploymentInfo,
   } = useDeploymentInfo(workflowId, { enabled: open && isDeployed })
 
-  const {
-    data: versionsData,
-    isLoading: versionsLoading,
-    refetch: refetchVersions,
-  } = useDeploymentVersions(workflowId, { enabled: open })
+  const { data: versionsData, isLoading: versionsLoading } = useDeploymentVersions(workflowId, {
+    enabled: open,
+  })
 
   const {
     isLoading: isLoadingChat,
@@ -450,10 +448,6 @@
     deleteTrigger?.click()
   }, [])
 
-  const handleFetchVersions = useCallback(async () => {
-    await refetchVersions()
-  }, [refetchVersions])
-
   const isSubmitting = deployMutation.isPending
   const isUndeploying = undeployMutation.isPending
 
@@ -512,7 +506,6 @@
             versionsLoading={versionsLoading}
             onPromoteToLive={handlePromoteToLive}
             onLoadDeploymentComplete={handleCloseModal}
-            fetchVersions={handleFetchVersions}
           />
         </ModalTabsContent>
```
**useWorkflowExecution hook (removes `handleRunFromBlock` / `handleRunUntilBlock`)**

```diff
@@ -15,16 +15,13 @@ import {
   TriggerUtils,
 } from '@/lib/workflows/triggers/triggers'
 import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
-import { getBlock } from '@/blocks'
-import type { SerializableExecutionState } from '@/executor/execution/types'
-import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
+import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
 import { hasExecutionResult } from '@/executor/utils/errors'
-import { coerceValue } from '@/executor/utils/start-block'
 import { subscriptionKeys } from '@/hooks/queries/subscription'
 import { useExecutionStream } from '@/hooks/use-execution-stream'
 import { WorkflowValidationError } from '@/serializer'
 import { useExecutionStore } from '@/stores/execution'
 import { useNotificationStore } from '@/stores/notifications'
 import { useVariablesStore } from '@/stores/panel'
 import { useEnvironmentStore } from '@/stores/settings/environment'
 import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
@@ -101,15 +98,11 @@ export function useWorkflowExecution() {
     setActiveBlocks,
     setBlockRunStatus,
     setEdgeRunStatus,
-    setLastExecutionSnapshot,
-    getLastExecutionSnapshot,
-    clearLastExecutionSnapshot,
   } = useExecutionStore()
   const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
   const executionStream = useExecutionStream()
   const currentChatExecutionIdRef = useRef<string | null>(null)
   const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff)
   const addNotification = useNotificationStore((state) => state.addNotification)
 
   /**
    * Validates debug state before performing debug operations
@@ -675,8 +668,7 @@
       onStream?: (se: StreamingExecution) => Promise<void>,
       executionId?: string,
       onBlockComplete?: (blockId: string, output: any) => Promise<void>,
-      overrideTriggerType?: 'chat' | 'manual' | 'api',
-      stopAfterBlockId?: string
+      overrideTriggerType?: 'chat' | 'manual' | 'api'
     ): Promise<ExecutionResult | StreamingExecution> => {
       // Use diff workflow for execution when available, regardless of canvas view state
       const executionWorkflowState = null as {
@@ -884,8 +876,6 @@
       const activeBlocksSet = new Set<string>()
       const streamedContent = new Map<string, string>()
       const accumulatedBlockLogs: BlockLog[] = []
-      const accumulatedBlockStates = new Map<string, BlockState>()
-      const executedBlockIds = new Set<string>()
 
       // Execute the workflow
       try {
@@ -897,7 +887,6 @@
           triggerType: overrideTriggerType || 'manual',
           useDraftState: true,
           isClientSession: true,
-          stopAfterBlockId,
           workflowStateOverride: executionWorkflowState
             ? {
                 blocks: executionWorkflowState.blocks,
@@ -927,22 +916,18 @@
           logger.info('onBlockCompleted received:', { data })
 
           activeBlocksSet.delete(data.blockId)
           // Create a new Set to trigger React re-render
           setActiveBlocks(new Set(activeBlocksSet))
 
           // Track successful block execution in run path
           setBlockRunStatus(data.blockId, 'success')
 
-          executedBlockIds.add(data.blockId)
-          accumulatedBlockStates.set(data.blockId, {
-            output: data.output,
-            executed: true,
-            executionTime: data.durationMs,
-          })
-
           const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
           if (isContainerBlock) return
           // Edges already tracked in onBlockStarted, no need to track again
 
           const startedAt = new Date(Date.now() - data.durationMs).toISOString()
           const endedAt = new Date().toISOString()
 
           // Accumulate block log for the execution result
           accumulatedBlockLogs.push({
             blockId: data.blockId,
             blockName: data.blockName || 'Unknown Block',
@@ -1071,53 +1056,6 @@
             },
             logs: accumulatedBlockLogs,
           }
 
-          // Add trigger block to executed blocks so downstream blocks can use run-from-block
-          if (data.success && startBlockId) {
-            executedBlockIds.add(startBlockId)
-          }
-
-          if (data.success && activeWorkflowId) {
-            if (stopAfterBlockId) {
-              const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
-              const mergedBlockStates = {
-                ...(existingSnapshot?.blockStates || {}),
-                ...Object.fromEntries(accumulatedBlockStates),
-              }
-              const mergedExecutedBlocks = new Set([
-                ...(existingSnapshot?.executedBlocks || []),
-                ...executedBlockIds,
-              ])
-              const snapshot: SerializableExecutionState = {
-                blockStates: mergedBlockStates,
-                executedBlocks: Array.from(mergedExecutedBlocks),
-                blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
-                decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
-                completedLoops: existingSnapshot?.completedLoops || [],
-                activeExecutionPath: Array.from(mergedExecutedBlocks),
-              }
-              setLastExecutionSnapshot(activeWorkflowId, snapshot)
-              logger.info('Merged execution snapshot after run-until-block', {
-                workflowId: activeWorkflowId,
-                newBlocksExecuted: executedBlockIds.size,
-                totalExecutedBlocks: mergedExecutedBlocks.size,
-              })
-            } else {
-              const snapshot: SerializableExecutionState = {
-                blockStates: Object.fromEntries(accumulatedBlockStates),
-                executedBlocks: Array.from(executedBlockIds),
-                blockLogs: accumulatedBlockLogs,
-                decisions: { router: {}, condition: {} },
-                completedLoops: [],
-                activeExecutionPath: Array.from(executedBlockIds),
-              }
-              setLastExecutionSnapshot(activeWorkflowId, snapshot)
-              logger.info('Stored execution snapshot for run-from-block', {
-                workflowId: activeWorkflowId,
-                executedBlocksCount: executedBlockIds.size,
-              })
-            }
-          }
         },
 
         onExecutionError: (data) => {
@@ -1438,330 +1376,6 @@
     setActiveBlocks,
   ])
 
-  /**
-   * Handles running workflow from a specific block using cached outputs
-   */
-  const handleRunFromBlock = useCallback(
-    async (blockId: string, workflowId: string) => {
-      const snapshot = getLastExecutionSnapshot(workflowId)
-      const workflowEdges = useWorkflowStore.getState().edges
-      const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
-      const isTriggerBlock = incomingEdges.length === 0
-
-      // Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
-      const isSourceSatisfied = (sourceId: string) => {
-        if (snapshot?.executedBlocks.includes(sourceId)) return true
-        // Check if source is a trigger (has no incoming edges itself)
-        const sourceIncomingEdges = workflowEdges.filter((edge) => edge.target === sourceId)
-        return sourceIncomingEdges.length === 0
-      }
-
-      // Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
-      if (!snapshot && !isTriggerBlock) {
-        logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
-        return
-      }
-
-      const dependenciesSatisfied =
-        isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
-
-      if (!dependenciesSatisfied) {
-        logger.error('Upstream dependencies not satisfied for run-from-block', {
-          workflowId,
-          blockId,
-        })
-        return
-      }
-
-      // For trigger blocks, always use empty snapshot to prevent stale data from different
-      // execution paths from being resolved. For non-trigger blocks, use the existing snapshot.
-      const emptySnapshot: SerializableExecutionState = {
-        blockStates: {},
-        executedBlocks: [],
-        blockLogs: [],
-        decisions: { router: {}, condition: {} },
-        completedLoops: [],
-        activeExecutionPath: [],
-      }
-      const effectiveSnapshot: SerializableExecutionState = isTriggerBlock
-        ? emptySnapshot
-        : snapshot || emptySnapshot
-
-      // Extract mock payload for trigger blocks
-      let workflowInput: any
-      if (isTriggerBlock) {
-        const workflowBlocks = useWorkflowStore.getState().blocks
-        const mergedStates = mergeSubblockState(workflowBlocks, workflowId)
-        const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' })
-        const candidate = candidates.find((c) => c.blockId === blockId)
-
-        if (candidate) {
-          if (triggerNeedsMockPayload(candidate)) {
-            workflowInput = extractTriggerMockPayload(candidate)
-          } else if (
-            candidate.path === StartBlockPath.SPLIT_API ||
-            candidate.path === StartBlockPath.SPLIT_INPUT ||
-            candidate.path === StartBlockPath.UNIFIED
-          ) {
-            const inputFormatValue = candidate.block.subBlocks?.inputFormat?.value
-            if (Array.isArray(inputFormatValue)) {
-              const testInput: Record<string, any> = {}
-              inputFormatValue.forEach((field: any) => {
-                if (field && typeof field === 'object' && field.name && field.value !== undefined) {
-                  testInput[field.name] = coerceValue(field.type, field.value)
-                }
-              })
-              if (Object.keys(testInput).length > 0) {
-                workflowInput = testInput
-              }
-            }
-          }
-        } else {
-          // Fallback: block is trigger by position but not classified as start candidate
-          const block = mergedStates[blockId]
-          if (block) {
-            const blockConfig = getBlock(block.type)
-            const hasTriggers = blockConfig?.triggers?.available?.length
-
-            if (hasTriggers || block.triggerMode) {
-              workflowInput = extractTriggerMockPayload({
-                blockId,
-                block,
-                path: StartBlockPath.EXTERNAL_TRIGGER,
-              })
-            }
-          }
-        }
-      }
-
-      setIsExecuting(true)
-      const executionId = uuidv4()
-      const accumulatedBlockLogs: BlockLog[] = []
-      const accumulatedBlockStates = new Map<string, BlockState>()
-      const executedBlockIds = new Set<string>()
-      const activeBlocksSet = new Set<string>()
-
-      try {
-        await executionStream.executeFromBlock({
-          workflowId,
-          startBlockId: blockId,
-          sourceSnapshot: effectiveSnapshot,
-          input: workflowInput,
-          callbacks: {
-            onBlockStarted: (data) => {
-              activeBlocksSet.add(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
-              incomingEdges.forEach((edge) => {
-                setEdgeRunStatus(edge.id, 'success')
-              })
-            },
-
-            onBlockCompleted: (data) => {
-              activeBlocksSet.delete(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              setBlockRunStatus(data.blockId, 'success')
-
-              executedBlockIds.add(data.blockId)
-              accumulatedBlockStates.set(data.blockId, {
-                output: data.output,
-                executed: true,
-                executionTime: data.durationMs,
-              })
-
-              const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
-              if (isContainerBlock) return
-
-              const startedAt = new Date(Date.now() - data.durationMs).toISOString()
-              const endedAt = new Date().toISOString()
-
-              accumulatedBlockLogs.push({
-                blockId: data.blockId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                input: data.input || {},
-                output: data.output,
-                success: true,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-              })
-
-              addConsole({
-                input: data.input || {},
-                output: data.output,
-                success: true,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-                workflowId,
-                blockId: data.blockId,
-                executionId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              })
-            },
-
-            onBlockError: (data) => {
-              activeBlocksSet.delete(data.blockId)
-              setActiveBlocks(new Set(activeBlocksSet))
-
-              setBlockRunStatus(data.blockId, 'error')
-
-              const startedAt = new Date(Date.now() - data.durationMs).toISOString()
-              const endedAt = new Date().toISOString()
-
-              accumulatedBlockLogs.push({
-                blockId: data.blockId,
-                blockName: data.blockName || 'Unknown Block',
-                blockType: data.blockType || 'unknown',
-                input: data.input || {},
-                output: {},
-                success: false,
-                error: data.error,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-              })
-
-              addConsole({
-                input: data.input || {},
-                output: {},
-                success: false,
-                error: data.error,
-                durationMs: data.durationMs,
-                startedAt,
-                endedAt,
-                workflowId,
-                blockId: data.blockId,
-                executionId,
-                blockName: data.blockName,
-                blockType: data.blockType,
-                iterationCurrent: data.iterationCurrent,
-                iterationTotal: data.iterationTotal,
-                iterationType: data.iterationType,
-              })
-            },
-
-            onExecutionCompleted: (data) => {
-              if (data.success) {
-                // Add the start block (trigger) to executed blocks
-                executedBlockIds.add(blockId)
-
-                const mergedBlockStates: Record<string, BlockState> = {
-                  ...effectiveSnapshot.blockStates,
-                }
-                for (const [bId, state] of accumulatedBlockStates) {
-                  mergedBlockStates[bId] = state
-                }
-
-                const mergedExecutedBlocks = new Set([
-                  ...effectiveSnapshot.executedBlocks,
-                  ...executedBlockIds,
-                ])
-
-                const updatedSnapshot: SerializableExecutionState = {
-                  ...effectiveSnapshot,
-                  blockStates: mergedBlockStates,
-                  executedBlocks: Array.from(mergedExecutedBlocks),
-                  blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
-                  activeExecutionPath: Array.from(mergedExecutedBlocks),
-                }
-                setLastExecutionSnapshot(workflowId, updatedSnapshot)
-              }
-            },
-
-            onExecutionError: (data) => {
-              const isWorkflowModified =
-                data.error?.includes('Block not found in workflow') ||
-                data.error?.includes('Upstream dependency not executed')
-
-              if (isWorkflowModified) {
-                clearLastExecutionSnapshot(workflowId)
-                addNotification({
-                  level: 'error',
-                  message:
-                    'Workflow was modified. Run the workflow again to enable running from block.',
-                  workflowId,
-                })
-              } else {
-                addNotification({
-                  level: 'error',
-                  message: data.error || 'Run from block failed',
-                  workflowId,
-                })
-              }
-            },
-          },
-        })
-      } catch (error) {
-        if ((error as Error).name !== 'AbortError') {
-          logger.error('Run-from-block failed:', error)
-        }
-      } finally {
-        setIsExecuting(false)
-        setActiveBlocks(new Set())
-      }
-    },
-    [
-      getLastExecutionSnapshot,
-      setLastExecutionSnapshot,
-      clearLastExecutionSnapshot,
-      setIsExecuting,
-      setActiveBlocks,
-      setBlockRunStatus,
-      setEdgeRunStatus,
-      addNotification,
-      addConsole,
-      executionStream,
-    ]
-  )
-
-  /**
-   * Handles running workflow until a specific block (stops after that block completes)
-   */
-  const handleRunUntilBlock = useCallback(
-    async (blockId: string, workflowId: string) => {
-      if (!workflowId || workflowId !== activeWorkflowId) {
-        logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
-        return
-      }
-
-      logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
-
-      setExecutionResult(null)
-      setIsExecuting(true)
-
-      const executionId = uuidv4()
-      try {
-        const result = await executeWorkflow(
-          undefined,
-          undefined,
-          executionId,
-          undefined,
-          'manual',
-          blockId
-        )
-        if (result && 'success' in result) {
-          setExecutionResult(result)
-        }
-      } catch (error) {
-        const errorResult = handleExecutionError(error, { executionId })
-        return errorResult
-      } finally {
-        setIsExecuting(false)
-        setIsDebugging(false)
-        setActiveBlocks(new Set())
-      }
-    },
-    [activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
-  )
-
   return {
     isExecuting,
     isDebugging,
@@ -1772,7 +1386,5 @@ export function useWorkflowExecution() {
     handleResumeDebug,
     handleCancelDebug,
     handleCancelExecution,
-    handleRunFromBlock,
-    handleRunUntilBlock,
   }
 }
```
@@ -47,7 +47,6 @@ import {
|
||||
useCurrentWorkflow,
|
||||
useNodeUtilities,
|
||||
useShiftSelectionLock,
|
||||
useWorkflowExecution,
|
||||
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
|
||||
import {
|
||||
calculateContainerDimensions,
|
||||
@@ -303,8 +302,6 @@ const WorkflowContent = React.memo(() => {
|
||||
|
||||
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
|
||||
|
||||
const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
|
||||
|
||||
const snapToGridSize = useSnapToGridSize()
|
||||
const snapToGrid = snapToGridSize > 0
|
||||
|
||||
@@ -736,16 +733,13 @@ const WorkflowContent = React.memo(() => {
|
||||
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
|
||||
)
|
||||
|
||||
const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
|
||||
useExecutionStore(
|
||||
useShallow((state) => ({
|
||||
activeBlockIds: state.activeBlockIds,
|
||||
pendingBlocks: state.pendingBlocks,
|
||||
isDebugging: state.isDebugging,
|
||||
isExecuting: state.isExecuting,
|
||||
getLastExecutionSnapshot: state.getLastExecutionSnapshot,
|
||||
}))
|
||||
)
|
||||
const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
|
||||
useShallow((state) => ({
|
||||
activeBlockIds: state.activeBlockIds,
|
||||
pendingBlocks: state.pendingBlocks,
|
||||
isDebugging: state.isDebugging,
|
||||
}))
|
||||
)
|
||||
|
||||
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)
|
||||
|
||||
@@ -1108,50 +1102,6 @@ const WorkflowContent = React.memo(() => {
|
||||
}
|
||||
}, [contextMenuBlocks])
|
||||
|
||||
const handleContextRunFromBlock = useCallback(() => {
|
||||
if (contextMenuBlocks.length !== 1) return
|
||||
const blockId = contextMenuBlocks[0].id
|
||||
handleRunFromBlock(blockId, workflowIdParam)
|
||||
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
|
||||
|
||||
const handleContextRunUntilBlock = useCallback(() => {
|
||||
if (contextMenuBlocks.length !== 1) return
|
||||
const blockId = contextMenuBlocks[0].id
|
||||
handleRunUntilBlock(blockId, workflowIdParam)
|
||||
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
|
||||
|
||||
const runFromBlockState = useMemo(() => {
|
||||
if (contextMenuBlocks.length !== 1) {
|
||||
return { canRun: false, reason: undefined }
|
||||
}
|
||||
const block = contextMenuBlocks[0]
|
||||
const snapshot = getLastExecutionSnapshot(workflowIdParam)
|
||||
const incomingEdges = edges.filter((edge) => edge.target === block.id)
|
||||
const isTriggerBlock = incomingEdges.length === 0
|
||||
|
||||
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
|
||||
const isSourceSatisfied = (sourceId: string) => {
|
||||
if (snapshot?.executedBlocks.includes(sourceId)) return true
|
||||
// Check if source is a trigger (has no incoming edges itself)
|
||||
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
|
||||
return sourceIncomingEdges.length === 0
|
||||
}
|
||||
|
||||
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
|
||||
const dependenciesSatisfied =
|
||||
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
|
||||
const isNoteBlock = block.type === 'note'
|
||||
const isInsideSubflow =
|
||||
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
|
||||
|
||||
if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
|
||||
if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
|
||||
if (isNoteBlock) return { canRun: false, reason: undefined }
|
||||
if (isExecuting) return { canRun: false, reason: undefined }
|
||||
|
||||
return { canRun: true, reason: undefined }
|
||||
}, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])
|
||||
|
||||
const handleContextAddBlock = useCallback(() => {
|
||||
useSearchModalStore.getState().open()
|
||||
}, [])
|
||||
@@ -2352,12 +2302,33 @@ const WorkflowContent = React.memo(() => {
    window.removeEventListener('remove-from-subflow', handleRemoveFromSubflow as EventListener)
  }, [blocks, edgesForDisplay, getNodeAbsolutePosition, collaborativeBatchUpdateParent])

  /** Handles node changes - applies changes and resolves parent-child selection conflicts. */
  const onNodesChange = useCallback(
    (changes: NodeChange[]) => {
      selectedIdsRef.current = null
      setDisplayNodes((nds) => {
        const updated = applyNodeChanges(changes, nds)
        const hasSelectionChange = changes.some((c) => c.type === 'select')
        if (!hasSelectionChange) return updated
        const resolved = resolveParentChildSelectionConflicts(updated, blocks)
        selectedIdsRef.current = resolved.filter((node) => node.selected).map((node) => node.id)
        return resolved
      })
      const selectedIds = selectedIdsRef.current as string[] | null
      if (selectedIds !== null) {
        syncPanelWithSelection(selectedIds)
      }
    },
    [blocks]
  )

  /**
   * Updates container dimensions in displayNodes during drag or keyboard movement.
   * Updates container dimensions in displayNodes during drag.
   * This allows live resizing of containers as their children are dragged.
   */
  const updateContainerDimensionsDuringMove = useCallback(
    (movedNodeId: string, movedNodePosition: { x: number; y: number }) => {
      const parentId = blocks[movedNodeId]?.data?.parentId
  const updateContainerDimensionsDuringDrag = useCallback(
    (draggedNodeId: string, draggedNodePosition: { x: number; y: number }) => {
      const parentId = blocks[draggedNodeId]?.data?.parentId
      if (!parentId) return

      setDisplayNodes((currentNodes) => {
@@ -2365,7 +2336,7 @@ const WorkflowContent = React.memo(() => {
        if (childNodes.length === 0) return currentNodes

        const childPositions = childNodes.map((node) => {
          const nodePosition = node.id === movedNodeId ? movedNodePosition : node.position
          const nodePosition = node.id === draggedNodeId ? draggedNodePosition : node.position
          const { width, height } = getBlockDimensions(node.id)
          return { x: nodePosition.x, y: nodePosition.y, width, height }
        })
@@ -2396,55 +2367,6 @@ const WorkflowContent = React.memo(() => {
    [blocks, getBlockDimensions]
  )

  /** Handles node changes - applies changes and resolves parent-child selection conflicts. */
  const onNodesChange = useCallback(
    (changes: NodeChange[]) => {
      selectedIdsRef.current = null
      setDisplayNodes((nds) => {
        const updated = applyNodeChanges(changes, nds)
        const hasSelectionChange = changes.some((c) => c.type === 'select')
        if (!hasSelectionChange) return updated
        const resolved = resolveParentChildSelectionConflicts(updated, blocks)
        selectedIdsRef.current = resolved.filter((node) => node.selected).map((node) => node.id)
        return resolved
      })
      const selectedIds = selectedIdsRef.current as string[] | null
      if (selectedIds !== null) {
        syncPanelWithSelection(selectedIds)
      }

      // Handle position changes (e.g., from keyboard arrow key movement)
      // Update container dimensions when child nodes are moved and persist to backend
      // Only persist if not in a drag operation (drag-end is handled by onNodeDragStop)
      const isInDragOperation =
        getDragStartPosition() !== null || multiNodeDragStartRef.current.size > 0
      const keyboardPositionUpdates: Array<{ id: string; position: { x: number; y: number } }> = []
      for (const change of changes) {
        if (
          change.type === 'position' &&
          !change.dragging &&
          'position' in change &&
          change.position
        ) {
          updateContainerDimensionsDuringMove(change.id, change.position)
          if (!isInDragOperation) {
            keyboardPositionUpdates.push({ id: change.id, position: change.position })
          }
        }
      }
      // Persist keyboard movements to backend for collaboration sync
      if (keyboardPositionUpdates.length > 0) {
        collaborativeBatchUpdatePositions(keyboardPositionUpdates)
      }
    },
    [
      blocks,
      updateContainerDimensionsDuringMove,
      collaborativeBatchUpdatePositions,
      getDragStartPosition,
    ]
  )

  /**
   * Effect to resize loops when nodes change (add/remove/position change).
   * Runs on structural changes only - not during drag (position-only changes).
@@ -2689,7 +2611,7 @@ const WorkflowContent = React.memo(() => {

        // If the node is inside a container, update container dimensions during drag
        if (currentParentId) {
          updateContainerDimensionsDuringMove(node.id, node.position)
          updateContainerDimensionsDuringDrag(node.id, node.position)
        }

        // Check if this is a starter block - starter blocks should never be in containers
@@ -2806,7 +2728,7 @@ const WorkflowContent = React.memo(() => {
      blocks,
      getNodeAbsolutePosition,
      getNodeDepth,
      updateContainerDimensionsDuringMove,
      updateContainerDimensionsDuringDrag,
      highlightContainerNode,
    ]
  )
@@ -3496,19 +3418,11 @@ const WorkflowContent = React.memo(() => {
        onRemoveFromSubflow={handleContextRemoveFromSubflow}
        onOpenEditor={handleContextOpenEditor}
        onRename={handleContextRename}
        onRunFromBlock={handleContextRunFromBlock}
        onRunUntilBlock={handleContextRunUntilBlock}
        hasClipboard={hasClipboard()}
        showRemoveFromSubflow={contextMenuBlocks.some(
          (b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
        )}
        canRunFromBlock={runFromBlockState.canRun}
        disableEdit={!effectivePermissions.canEdit}
        isExecuting={isExecuting}
        isPositionalTrigger={
          contextMenuBlocks.length === 1 &&
          edges.filter((e) => e.target === contextMenuBlocks[0]?.id).length === 0
        }
      />

      <CanvasMenu

@@ -2,10 +2,9 @@

import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import { Command } from 'cmdk'
import { Database, HelpCircle, Layout, Settings } from 'lucide-react'
import { BookOpen, Layout, ScrollText } from 'lucide-react'
import { useParams, useRouter } from 'next/navigation'
import { createPortal } from 'react-dom'
import { Library } from '@/components/emcn'
import { useBrandConfig } from '@/lib/branding/branding'
import { cn } from '@/lib/core/utils/cn'
import { hasTriggerCapability } from '@/lib/workflows/triggers/trigger-utils'
@@ -16,7 +15,6 @@ import type {
  SearchDocItem,
  SearchToolOperationItem,
} from '@/stores/modals/search/types'
import { useSettingsModalStore } from '@/stores/modals/settings/store'

interface SearchModalProps {
  open: boolean
@@ -45,8 +43,7 @@ interface PageItem {
  id: string
  name: string
  icon: React.ComponentType<{ className?: string }>
  href?: string
  onClick?: () => void
  href: string
  shortcut?: string
}

@@ -64,7 +61,6 @@ export function SearchModal({
  const inputRef = useRef<HTMLInputElement>(null)
  const [search, setSearch] = useState('')
  const [mounted, setMounted] = useState(false)
  const openSettingsModal = useSettingsModalStore((state) => state.openModal)

  useEffect(() => {
    setMounted(true)
@@ -74,16 +70,12 @@ export function SearchModal({
    (state) => state.data
  )

  const openHelpModal = useCallback(() => {
    window.dispatchEvent(new CustomEvent('open-help-modal'))
  }, [])

  const pages = useMemo(
    (): PageItem[] => [
      {
        id: 'logs',
        name: 'Logs',
        icon: Library,
        icon: ScrollText,
        href: `/workspace/${workspaceId}/logs`,
        shortcut: '⌘⇧L',
      },
@@ -94,26 +86,13 @@ export function SearchModal({
        href: `/workspace/${workspaceId}/templates`,
      },
      {
        id: 'knowledge-base',
        name: 'Knowledge Base',
        icon: Database,
        href: `/workspace/${workspaceId}/knowledge`,
      },
      {
        id: 'help',
        name: 'Help',
        icon: HelpCircle,
        onClick: openHelpModal,
      },
      {
        id: 'settings',
        name: 'Settings',
        icon: Settings,
        onClick: openSettingsModal,
        shortcut: '⌘,',
        id: 'docs',
        name: 'Docs',
        icon: BookOpen,
        href: brand.documentationUrl || 'https://docs.sim.ai/',
      },
    ],
    [workspaceId, openHelpModal, openSettingsModal]
    [workspaceId, brand.documentationUrl]
  )

  useEffect(() => {
@@ -200,14 +179,10 @@ export function SearchModal({

  const handlePageSelect = useCallback(
    (page: PageItem) => {
      if (page.onClick) {
        page.onClick()
      } else if (page.href) {
        if (page.href.startsWith('http')) {
          window.open(page.href, '_blank', 'noopener,noreferrer')
        } else {
          router.push(page.href)
        }
      if (page.href.startsWith('http')) {
        window.open(page.href, '_blank', 'noopener,noreferrer')
      } else {
        router.push(page.href)
      }
      onOpenChange(false)
    },
@@ -294,7 +269,7 @@ export function SearchModal({
            {blocks.map((block) => (
              <CommandItem
                key={block.id}
                value={`${block.name} block-${block.id}`}
                value={block.name}
                keywords={[block.description]}
                onSelect={() => handleBlockSelect(block, 'block')}
                icon={block.icon}
@@ -312,7 +287,7 @@ export function SearchModal({
            {tools.map((tool) => (
              <CommandItem
                key={tool.id}
                value={`${tool.name} tool-${tool.id}`}
                value={tool.name}
                keywords={[tool.description]}
                onSelect={() => handleBlockSelect(tool, 'tool')}
                icon={tool.icon}
@@ -330,7 +305,7 @@ export function SearchModal({
            {triggers.map((trigger) => (
              <CommandItem
                key={trigger.id}
                value={`${trigger.name} trigger-${trigger.id}`}
                value={trigger.name}
                keywords={[trigger.description]}
                onSelect={() => handleBlockSelect(trigger, 'trigger')}
                icon={trigger.icon}
@@ -348,7 +323,7 @@ export function SearchModal({
            {workflows.map((workflow) => (
              <Command.Item
                key={workflow.id}
                value={`${workflow.name} workflow-${workflow.id}`}
                value={workflow.name}
                onSelect={() => handleWorkflowSelect(workflow)}
                className='group flex h-[28px] w-full cursor-pointer items-center gap-[8px] rounded-[6px] px-[10px] text-left text-[15px] aria-selected:bg-[var(--border)] aria-selected:shadow-sm data-[disabled=true]:pointer-events-none data-[disabled=true]:opacity-50'
              >
@@ -370,7 +345,7 @@ export function SearchModal({
            {toolOperations.map((op) => (
              <CommandItem
                key={op.id}
                value={`${op.searchValue} operation-${op.id}`}
                value={op.searchValue}
                keywords={op.keywords}
                onSelect={() => handleToolOperationSelect(op)}
                icon={op.icon}
@@ -388,7 +363,7 @@ export function SearchModal({
            {workspaces.map((workspace) => (
              <Command.Item
                key={workspace.id}
                value={`${workspace.name} workspace-${workspace.id}`}
                value={workspace.name}
                onSelect={() => handleWorkspaceSelect(workspace)}
                className='group flex h-[28px] w-full cursor-pointer items-center gap-[8px] rounded-[6px] px-[10px] text-left text-[15px] aria-selected:bg-[var(--border)] aria-selected:shadow-sm data-[disabled=true]:pointer-events-none data-[disabled=true]:opacity-50'
              >
@@ -406,7 +381,7 @@ export function SearchModal({
            {docs.map((doc) => (
              <CommandItem
                key={doc.id}
                value={`${doc.name} docs documentation doc-${doc.id}`}
                value={`${doc.name} docs documentation`}
                onSelect={() => handleDocSelect(doc)}
                icon={doc.icon}
                bgColor='#6B7280'
@@ -425,7 +400,7 @@ export function SearchModal({
            return (
              <Command.Item
                key={page.id}
                value={`${page.name} page-${page.id}`}
                value={page.name}
                onSelect={() => handlePageSelect(page)}
                className='group flex h-[28px] w-full cursor-pointer items-center gap-[8px] rounded-[6px] px-[10px] text-left text-[15px] aria-selected:bg-[var(--border)] aria-selected:shadow-sm data-[disabled=true]:pointer-events-none data-[disabled=true]:opacity-50'
              >

@@ -33,15 +33,6 @@ export interface DAG {
  parallelConfigs: Map<string, SerializedParallel>
}

export interface DAGBuildOptions {
  /** Trigger block ID to start path construction from */
  triggerBlockId?: string
  /** Saved incoming edges from snapshot for resumption */
  savedIncomingEdges?: Record<string, string[]>
  /** Include all enabled blocks instead of only those reachable from trigger */
  includeAllBlocks?: boolean
}

export class DAGBuilder {
  private pathConstructor = new PathConstructor()
  private loopConstructor = new LoopConstructor()
@@ -49,9 +40,11 @@ export class DAGBuilder {
  private nodeConstructor = new NodeConstructor()
  private edgeConstructor = new EdgeConstructor()

  build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
    const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options

  build(
    workflow: SerializedWorkflow,
    triggerBlockId?: string,
    savedIncomingEdges?: Record<string, string[]>
  ): DAG {
    const dag: DAG = {
      nodes: new Map(),
      loopConfigs: new Map(),
@@ -60,7 +53,7 @@ export class DAGBuilder {

    this.initializeConfigs(workflow, dag)

    const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
    const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)

    this.loopConstructor.execute(dag, reachableBlocks)
    this.parallelConstructor.execute(dag, reachableBlocks)

@@ -207,7 +207,6 @@ export class EdgeConstructor {
    for (const connection of workflow.connections) {
      let { source, target } = connection
      const originalSource = source
      const originalTarget = target
      let sourceHandle = this.generateSourceHandle(
        source,
        target,
@@ -258,12 +257,12 @@ export class EdgeConstructor {
        target = sentinelStartId
      }

      if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
        continue
      if (loopSentinelStartId) {
        this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
      }

      if (loopSentinelStartId && !blocksInLoops.has(originalTarget)) {
        this.addEdge(dag, loopSentinelStartId, target, EDGE.LOOP_EXIT, targetHandle)
      if (this.edgeCrossesLoopBoundary(source, target, blocksInLoops, dag)) {
        continue
      }

      if (!this.isEdgeReachable(source, target, reachableBlocks, dag)) {

@@ -6,16 +6,7 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathConstructor')

export class PathConstructor {
  execute(
    workflow: SerializedWorkflow,
    triggerBlockId?: string,
    includeAllBlocks?: boolean
  ): Set<string> {
    // For run-from-block mode, include all enabled blocks regardless of trigger reachability
    if (includeAllBlocks) {
      return this.getAllEnabledBlocks(workflow)
    }

  execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
    const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)

    if (!resolvedTriggerId) {

@@ -77,16 +77,15 @@ export class EdgeManager {
      }
    }

    if (output.selectedRoute !== EDGE.LOOP_EXIT && output.selectedRoute !== EDGE.PARALLEL_EXIT) {
      for (const { target } of edgesToDeactivate) {
        if (
          !readyNodes.includes(target) &&
          !activatedTargets.includes(target) &&
          this.nodesWithActivatedEdge.has(target) &&
          this.isTargetReady(target)
        ) {
          readyNodes.push(target)
        }
    // Check if any deactivation targets that previously received an activated edge are now ready
    for (const { target } of edgesToDeactivate) {
      if (
        !readyNodes.includes(target) &&
        !activatedTargets.includes(target) &&
        this.nodesWithActivatedEdge.has(target) &&
        this.isTargetReady(target)
      ) {
        readyNodes.push(target)
      }
    }

@@ -26,7 +26,6 @@ export class ExecutionEngine {
  private allowResumeTriggers: boolean
  private cancelledFlag = false
  private errorFlag = false
  private stoppedEarlyFlag = false
  private executionError: Error | null = null
  private lastCancellationCheck = 0
  private readonly useRedisCancellation: boolean
@@ -106,7 +105,7 @@ export class ExecutionEngine {
    this.initializeQueue(triggerBlockId)

    while (this.hasWork()) {
      if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
      if ((await this.checkCancellation()) || this.errorFlag) {
        break
      }
      await this.processQueue()
@@ -260,16 +259,6 @@ export class ExecutionEngine {
  }

  private initializeQueue(triggerBlockId?: string): void {
    if (this.context.runFromBlockContext) {
      const { startBlockId } = this.context.runFromBlockContext
      logger.info('Initializing queue for run-from-block mode', {
        startBlockId,
        dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
      })
      this.addToQueue(startBlockId)
      return
    }

    const pendingBlocks = this.context.metadata.pendingBlocks
    const remainingEdges = (this.context.metadata as any).remainingEdges

@@ -396,28 +385,11 @@ export class ExecutionEngine {
      this.finalOutput = output
    }

    if (this.context.stopAfterBlockId === nodeId) {
      // For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
      // shouldContinue: true means more iterations, shouldExit: true means loop is done
      const shouldContinueLoop = output.shouldContinue === true
      if (!shouldContinueLoop) {
        logger.info('Stopping execution after target block', { nodeId })
        this.stoppedEarlyFlag = true
        return
      }
    }

    const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)

    logger.info('Processing outgoing edges', {
      nodeId,
      outgoingEdgesCount: node.outgoingEdges.size,
      outgoingEdges: Array.from(node.outgoingEdges.entries()).map(([id, e]) => ({
        id,
        target: e.target,
        sourceHandle: e.sourceHandle,
      })),
      output,
      readyNodesCount: readyNodes.length,
      readyNodes,
    })

@@ -5,31 +5,17 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type {
  ContextExtensions,
  SerializableExecutionState,
  WorkflowInput,
} from '@/executor/execution/types'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
  computeExecutionSets,
  type RunFromBlockContext,
  resolveContainerToSentinelStart,
  validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
  buildResolutionFromBlock,
  buildStartBlockOutput,
  resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
import {
  extractLoopIdFromSentinel,
  extractParallelIdFromSentinel,
} from '@/executor/utils/subflow-utils'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'

@@ -62,10 +48,7 @@ export class DAGExecutor {

  async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
    const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
    const dag = this.dagBuilder.build(this.workflow, {
      triggerBlockId,
      savedIncomingEdges,
    })
    const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
    const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)

    const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
@@ -106,156 +89,17 @@ export class DAGExecutor {
    }
  }

  /**
   * Execute from a specific block using cached outputs for upstream blocks.
   */
  async executeFromBlock(
    workflowId: string,
    startBlockId: string,
    sourceSnapshot: SerializableExecutionState
  ): Promise<ExecutionResult> {
    // Build full DAG with all blocks to compute upstream set for snapshot filtering
    // includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
    const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })

    const executedBlocks = new Set(sourceSnapshot.executedBlocks)
    const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
    if (!validation.valid) {
      throw new Error(validation.error)
    }

    const { dirtySet, upstreamSet, reachableUpstreamSet } = computeExecutionSets(dag, startBlockId)
    const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId

    // Extract container IDs from sentinel IDs in reachable upstream set
    // Use reachableUpstreamSet (not upstreamSet) to preserve sibling branch outputs
    // Example: A->C, B->C where C references A.result || B.result
    // When running from A, B's output should be preserved for C to reference
    const reachableContainerIds = new Set<string>()
    for (const nodeId of reachableUpstreamSet) {
      const loopId = extractLoopIdFromSentinel(nodeId)
      if (loopId) reachableContainerIds.add(loopId)
      const parallelId = extractParallelIdFromSentinel(nodeId)
      if (parallelId) reachableContainerIds.add(parallelId)
    }

    // Filter snapshot to include all blocks reachable from dirty blocks
    // This preserves sibling branch outputs that dirty blocks may reference
    const filteredBlockStates: Record<string, any> = {}
    for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
      if (reachableUpstreamSet.has(blockId) || reachableContainerIds.has(blockId)) {
        filteredBlockStates[blockId] = state
      }
    }
    const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
      (id) => reachableUpstreamSet.has(id) || reachableContainerIds.has(id)
    )

    // Filter loop/parallel executions to only include reachable containers
    const filteredLoopExecutions: Record<string, any> = {}
    if (sourceSnapshot.loopExecutions) {
      for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
        if (reachableContainerIds.has(loopId)) {
          filteredLoopExecutions[loopId] = execution
        }
      }
    }
    const filteredParallelExecutions: Record<string, any> = {}
    if (sourceSnapshot.parallelExecutions) {
      for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
        if (reachableContainerIds.has(parallelId)) {
          filteredParallelExecutions[parallelId] = execution
        }
      }
    }

    const filteredSnapshot: SerializableExecutionState = {
      ...sourceSnapshot,
      blockStates: filteredBlockStates,
      executedBlocks: filteredExecutedBlocks,
      loopExecutions: filteredLoopExecutions,
      parallelExecutions: filteredParallelExecutions,
    }

    logger.info('Executing from block', {
      workflowId,
      startBlockId,
      effectiveStartBlockId,
      dirtySetSize: dirtySet.size,
      upstreamSetSize: upstreamSet.size,
      reachableUpstreamSetSize: reachableUpstreamSet.size,
    })

    // Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
    for (const nodeId of dirtySet) {
      const node = dag.nodes.get(nodeId)
      if (!node) continue

      const nonDirtyIncoming: string[] = []
      for (const sourceId of node.incomingEdges) {
        if (!dirtySet.has(sourceId)) {
          nonDirtyIncoming.push(sourceId)
        }
      }

      for (const sourceId of nonDirtyIncoming) {
        node.incomingEdges.delete(sourceId)
      }
    }

    const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
    const { context, state } = this.createExecutionContext(workflowId, undefined, {
      snapshotState: filteredSnapshot,
      runFromBlockContext,
    })

    const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
    const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
    loopOrchestrator.setContextExtensions(this.contextExtensions)
    const parallelOrchestrator = new ParallelOrchestrator(dag, state)
    parallelOrchestrator.setResolver(resolver)
    parallelOrchestrator.setContextExtensions(this.contextExtensions)
    const allHandlers = createBlockHandlers()
    const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
    const edgeManager = new EdgeManager(dag)
    loopOrchestrator.setEdgeManager(edgeManager)
    const nodeOrchestrator = new NodeExecutionOrchestrator(
      dag,
      state,
      blockExecutor,
      loopOrchestrator,
      parallelOrchestrator
    )
    const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)

    return await engine.run()
  }

  private createExecutionContext(
    workflowId: string,
    triggerBlockId?: string,
    overrides?: {
      snapshotState?: SerializableExecutionState
      runFromBlockContext?: RunFromBlockContext
    }
    triggerBlockId?: string
  ): { context: ExecutionContext; state: ExecutionState } {
    const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
    const snapshotState = this.contextExtensions.snapshotState
    const blockStates = snapshotState?.blockStates
      ? new Map(Object.entries(snapshotState.blockStates))
      : new Map<string, BlockState>()
    let executedBlocks = snapshotState?.executedBlocks
    const executedBlocks = snapshotState?.executedBlocks
      ? new Set(snapshotState.executedBlocks)
      : new Set<string>()

    if (overrides?.runFromBlockContext) {
      const { dirtySet } = overrides.runFromBlockContext
      executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
      logger.info('Cleared executed status for dirty blocks', {
        dirtySetSize: dirtySet.size,
        remainingExecutedBlocks: executedBlocks.size,
      })
    }

    const state = new ExecutionState(blockStates, executedBlocks)

    const context: ExecutionContext = {
@@ -265,7 +109,7 @@ export class DAGExecutor {
      userId: this.contextExtensions.userId,
      isDeployedContext: this.contextExtensions.isDeployedContext,
      blockStates: state.getBlockStates(),
      blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
      blockLogs: snapshotState?.blockLogs ?? [],
      metadata: {
        ...this.contextExtensions.metadata,
        startTime: new Date().toISOString(),
@@ -325,8 +169,6 @@ export class DAGExecutor {
      abortSignal: this.contextExtensions.abortSignal,
      includeFileBase64: this.contextExtensions.includeFileBase64,
      base64MaxBytes: this.contextExtensions.base64MaxBytes,
      runFromBlockContext: overrides?.runFromBlockContext,
      stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
    }

    if (this.contextExtensions.resumeFromSnapshot) {
@@ -351,15 +193,6 @@ export class DAGExecutor {
      pendingBlocks: context.metadata.pendingBlocks,
      skipStarterBlockInit: true,
    })
    } else if (overrides?.runFromBlockContext) {
      // In run-from-block mode, initialize the start block only if it's a regular block
      // Skip for sentinels/containers (loop/parallel) which aren't real blocks
      const startBlockId = overrides.runFromBlockContext.startBlockId
      const isRegularBlock = this.workflow.blocks.some((b) => b.id === startBlockId)

      if (isRegularBlock) {
        this.initializeStarterBlock(context, state, startBlockId)
      }
    } else {
      this.initializeStarterBlock(context, state, triggerBlockId)
    }

@@ -27,8 +27,6 @@ export interface ParallelScope {
  items?: any[]
  /** Error message if parallel validation failed (e.g., exceeded max branches) */
  validationError?: string
  /** Whether the parallel has an empty distribution and should be skipped */
  isEmpty?: boolean
}

export class ExecutionState implements BlockStateController {

@@ -1,6 +1,5 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'

export interface ExecutionMetadata {
@@ -106,17 +105,6 @@ export interface ContextExtensions {
    output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
    iterationContext?: IterationContext
  ) => Promise<void>

  /**
   * Run-from-block configuration. When provided, executor runs in partial
   * execution mode starting from the specified block.
   */
  runFromBlockContext?: RunFromBlockContext

  /**
   * Stop execution after this block completes. Used for "run until block" feature.
   */
  stopAfterBlockId?: string
}

export interface WorkflowInput {

@@ -276,16 +276,7 @@ export class LoopOrchestrator {
    scope: LoopScope
  ): LoopContinuationResult {
    const results = scope.allIterationOutputs
    const output = { results }
    this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)

    // Emit onBlockComplete for the loop container so the UI can track it
    if (this.contextExtensions?.onBlockComplete) {
      this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
        output,
        executionTime: DEFAULTS.EXECUTION_TIME,
      })
    }
    this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)

    return {
      shouldContinue: false,
@@ -395,10 +386,10 @@ export class LoopOrchestrator {
      return true
    }

    // forEach: skip if items array is empty
    if (scope.loopType === 'forEach') {
      if (!scope.items || scope.items.length === 0) {
        logger.info('ForEach loop has empty collection, skipping loop body', { loopId })
        this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
        logger.info('ForEach loop has empty items, skipping loop body', { loopId })
        return false
      }
      return true
@@ -408,8 +399,6 @@ export class LoopOrchestrator {
    if (scope.loopType === 'for') {
      if (scope.maxIterations === 0) {
        logger.info('For loop has 0 iterations, skipping loop body', { loopId })
        // Set empty output for the loop
        this.state.setBlockOutput(loopId, { results: [] }, DEFAULTS.EXECUTION_TIME)
        return false
      }
      return true

@@ -31,18 +31,7 @@ export class NodeExecutionOrchestrator {
      throw new Error(`Node not found in DAG: ${nodeId}`)
    }

    if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
      const cachedOutput = this.state.getBlockOutput(nodeId) || {}
      logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
      return {
        nodeId,
        output: cachedOutput,
        isFinalOutput: false,
      }
    }

    const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
    if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
    if (this.state.hasExecuted(nodeId)) {
      const output = this.state.getBlockOutput(nodeId) || {}
      return {
        nodeId,
@@ -108,7 +97,7 @@ export class NodeExecutionOrchestrator {
    if (loopId) {
      const shouldExecute = await this.loopOrchestrator.evaluateInitialCondition(ctx, loopId)
      if (!shouldExecute) {
        logger.info('Loop initial condition false, skipping loop body', { loopId })
        logger.info('While loop initial condition false, skipping loop body', { loopId })
        return {
          sentinelStart: true,
          shouldExit: true,
@@ -169,17 +158,6 @@ export class NodeExecutionOrchestrator {
        this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
      }
    }

    const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
    if (scope?.isEmpty) {
      logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
      return {
        sentinelStart: true,
        shouldExit: true,
        selectedRoute: EDGE.PARALLEL_EXIT,
      }
    }

    return { sentinelStart: true }
  }

@@ -61,13 +61,11 @@ export class ParallelOrchestrator {

    let items: any[] | undefined
    let branchCount: number
    let isEmpty = false

    try {
      const resolved = this.resolveBranchCount(ctx, parallelConfig, parallelId)
      const resolved = this.resolveBranchCount(ctx, parallelConfig)
      branchCount = resolved.branchCount
      items = resolved.items
      isEmpty = resolved.isEmpty ?? false
    } catch (error) {
      const errorMessage = `Parallel Items did not resolve: ${error instanceof Error ? error.message : String(error)}`
      logger.error(errorMessage, { parallelId, distribution: parallelConfig.distribution })
@@ -93,34 +91,6 @@ export class ParallelOrchestrator {
      throw new Error(branchError)
    }

    // Handle empty distribution - skip parallel body
    if (isEmpty || branchCount === 0) {
      const scope: ParallelScope = {
        parallelId,
        totalBranches: 0,
        branchOutputs: new Map(),
        completedCount: 0,
        totalExpectedNodes: 0,
        items: [],
        isEmpty: true,
      }

      if (!ctx.parallelExecutions) {
        ctx.parallelExecutions = new Map()
      }
      ctx.parallelExecutions.set(parallelId, scope)

      // Set empty output for the parallel
      this.state.setBlockOutput(parallelId, { results: [] })

      logger.info('Parallel scope initialized with empty distribution, skipping body', {
        parallelId,
        branchCount: 0,
      })

      return scope
    }

    const { entryNodes } = this.expander.expandParallel(this.dag, parallelId, branchCount, items)

    const scope: ParallelScope = {
@@ -157,17 +127,15 @@ export class ParallelOrchestrator {

  private resolveBranchCount(
    ctx: ExecutionContext,
    config: SerializedParallel,
    parallelId: string
  ): { branchCount: number; items?: any[]; isEmpty?: boolean } {
    config: SerializedParallel
  ): { branchCount: number; items?: any[] } {
    if (config.parallelType === 'count') {
      return { branchCount: config.count ?? 1 }
    }

    const items = this.resolveDistributionItems(ctx, config)
    if (items.length === 0) {
      logger.info('Parallel has empty distribution, skipping parallel body', { parallelId })
      return { branchCount: 0, items: [], isEmpty: true }
      return { branchCount: config.count ?? 1 }
    }

    return { branchCount: items.length, items }
@@ -260,17 +228,9 @@ export class ParallelOrchestrator {
      const branchOutputs = scope.branchOutputs.get(i) || []
      results.push(branchOutputs)
    }
    const output = { results }
    this.state.setBlockOutput(parallelId, output)

    // Emit onBlockComplete for the parallel container so the UI can track it
    if (this.contextExtensions?.onBlockComplete) {
      this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
        output,
        executionTime: 0,
      })
    }

    this.state.setBlockOutput(parallelId, {
      results,
    })
    return {
      allBranchesComplete: true,
      results,

@@ -1,7 +1,6 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'

export interface UserFile {
@@ -251,17 +250,6 @@ export interface ExecutionContext {
   * will not have their base64 content fetched.
   */
  base64MaxBytes?: number

  /**
   * Context for "run from block" mode. When present, only blocks in dirtySet
   * will be executed; others return cached outputs from the source snapshot.
   */
  runFromBlockContext?: RunFromBlockContext

  /**
   * Stop execution after this block completes. Used for "run until block" feature.
   */
  stopAfterBlockId?: string
}

export interface ExecutionResult {

File diff suppressed because it is too large
@@ -1,219 +0,0 @@
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'

/**
 * Builds the sentinel-start node ID for a loop.
 */
function buildLoopSentinelStartId(loopId: string): string {
  return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}

/**
 * Builds the sentinel-start node ID for a parallel.
 */
function buildParallelSentinelStartId(parallelId: string): string {
  return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}

/**
 * Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
 * Returns null if the block is not a container.
 */
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
  if (dag.loopConfigs.has(blockId)) {
    return buildLoopSentinelStartId(blockId)
  }
  if (dag.parallelConfigs.has(blockId)) {
    return buildParallelSentinelStartId(blockId)
  }
  return null
}

/**
 * Result of validating a block for run-from-block execution.
 */
export interface RunFromBlockValidation {
  valid: boolean
  error?: string
}

/**
 * Context for run-from-block execution mode.
 */
export interface RunFromBlockContext {
  /** The block ID to start execution from */
  startBlockId: string
  /** Set of block IDs that need re-execution (start block + all downstream) */
  dirtySet: Set<string>
}

/**
 * Result of computing execution sets for run-from-block mode.
 */
export interface ExecutionSets {
  /** Blocks that need re-execution (start block + all downstream) */
  dirtySet: Set<string>
  /** Blocks that are upstream (ancestors) of the start block */
  upstreamSet: Set<string>
  /** Blocks that are upstream of any dirty block (for snapshot preservation) */
  reachableUpstreamSet: Set<string>
}

/**
 * Computes the dirty set, upstream set, and reachable upstream set.
 * - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
 * - Upstream set: all blocks reachable via incoming edges from the start block
 * - Reachable upstream set: all non-dirty blocks that are upstream of ANY dirty block
 *   (includes sibling branches that dirty blocks may reference)
 *
 * For loop/parallel containers, starts from the sentinel-start node and includes
 * the container ID itself in the dirty set.
 *
 * @param dag - The workflow DAG
 * @param startBlockId - The block to start execution from
 * @returns Object containing dirtySet, upstreamSet, and reachableUpstreamSet
 */
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
  const dirty = new Set<string>([startBlockId])
  const upstream = new Set<string>()
  const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
  const traversalStartId = sentinelStartId ?? startBlockId

  if (sentinelStartId) {
    dirty.add(sentinelStartId)
  }

  // BFS downstream for dirty set
  const downstreamQueue = [traversalStartId]
  while (downstreamQueue.length > 0) {
    const nodeId = downstreamQueue.shift()!
    const node = dag.nodes.get(nodeId)
    if (!node) continue

    for (const [, edge] of node.outgoingEdges) {
      if (!dirty.has(edge.target)) {
        dirty.add(edge.target)
        downstreamQueue.push(edge.target)
      }
    }
  }

  // BFS upstream from start block for upstream set
  const upstreamQueue = [traversalStartId]
  while (upstreamQueue.length > 0) {
    const nodeId = upstreamQueue.shift()!
    const node = dag.nodes.get(nodeId)
    if (!node) continue

    for (const sourceId of node.incomingEdges) {
      if (!upstream.has(sourceId)) {
        upstream.add(sourceId)
        upstreamQueue.push(sourceId)
      }
    }
  }

  // Compute reachable upstream: all non-dirty blocks upstream of ANY dirty block
  // This handles the case where a dirty block (like C in A->C, B->C) may reference
  // sibling branches (like B when running from A)
  const reachableUpstream = new Set<string>()
  for (const dirtyNodeId of dirty) {
    const node = dag.nodes.get(dirtyNodeId)
    if (!node) continue

    // BFS upstream from this dirty node
    const queue = [...node.incomingEdges]
    while (queue.length > 0) {
      const sourceId = queue.shift()!
      if (reachableUpstream.has(sourceId) || dirty.has(sourceId)) continue

      reachableUpstream.add(sourceId)
      const sourceNode = dag.nodes.get(sourceId)
      if (sourceNode) {
        queue.push(...sourceNode.incomingEdges)
      }
    }
  }

  return { dirtySet: dirty, upstreamSet: upstream, reachableUpstreamSet: reachableUpstream }
}
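
For illustration, a worked example of the three sets on a hypothetical diamond-shaped DAG; the block IDs are invented for this sketch and are not from the codebase:

// Hypothetical workflow: A -> B, A -> C, B -> D, C -> D
// Starting from B:
//   dirtySet             = { B, D }   (B plus everything downstream of it)
//   upstreamSet          = { A }     (ancestors of B only)
//   reachableUpstreamSet = { A, C }  (non-dirty ancestors of any dirty block;
//                                     C is kept because D may reference C's cached output)
// const sets = computeExecutionSets(dag, 'B')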

/**
 * Validates that a block can be used as a run-from-block starting point.
 *
 * Validation rules:
 * - Block must exist in the DAG (or be a loop/parallel container)
 * - Block cannot be inside a loop (but loop containers are allowed)
 * - Block cannot be inside a parallel (but parallel containers are allowed)
 * - Block cannot be a sentinel node
 * - All upstream dependencies must have been executed (have cached outputs)
 *
 * @param blockId - The block ID to validate
 * @param dag - The workflow DAG
 * @param executedBlocks - Set of blocks that were executed in the source run
 * @returns Validation result with error message if invalid
 */
export function validateRunFromBlock(
  blockId: string,
  dag: DAG,
  executedBlocks: Set<string>
): RunFromBlockValidation {
  const node = dag.nodes.get(blockId)
  const isLoopContainer = dag.loopConfigs.has(blockId)
  const isParallelContainer = dag.parallelConfigs.has(blockId)
  const isContainer = isLoopContainer || isParallelContainer

  if (!node && !isContainer) {
    return { valid: false, error: `Block not found in workflow: ${blockId}` }
  }

  if (isContainer) {
    const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
    if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
      return {
        valid: false,
        error: `Container sentinel not found for: ${blockId}`,
      }
    }
  }

  if (node) {
    if (node.metadata.isLoopNode) {
      return {
        valid: false,
        error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
      }
    }

    if (node.metadata.isParallelBranch) {
      return {
        valid: false,
        error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
      }
    }

    if (node.metadata.isSentinel) {
      return { valid: false, error: 'Cannot run from sentinel node' }
    }

    // Check immediate upstream dependencies were executed
    for (const sourceId of node.incomingEdges) {
      const sourceNode = dag.nodes.get(sourceId)
      // Skip sentinel nodes - they're internal and not in executedBlocks
      if (sourceNode?.metadata.isSentinel) continue

      // Skip trigger nodes - they're entry points and don't need prior execution
      // A trigger node has no incoming edges
      if (sourceNode && sourceNode.incomingEdges.size === 0) continue

      if (!executedBlocks.has(sourceId)) {
        return {
          valid: false,
          error: `Upstream dependency not executed: ${sourceId}`,
        }
      }
    }
  }

  return { valid: true }
}
@@ -348,6 +348,210 @@ export function useUndeployWorkflow() {
  })
}

/**
 * Variables for update deployment version mutation
 */
interface UpdateDeploymentVersionVariables {
  workflowId: string
  version: number
  name?: string
  description?: string | null
}

/**
 * Response from update deployment version mutation
 */
interface UpdateDeploymentVersionResult {
  name: string | null
  description: string | null
}

/**
 * Mutation hook for updating a deployment version's name or description.
 * Invalidates versions query on success.
 */
export function useUpdateDeploymentVersion() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: async ({
      workflowId,
      version,
      name,
      description,
    }: UpdateDeploymentVersionVariables): Promise<UpdateDeploymentVersionResult> => {
      const response = await fetch(`/api/workflows/${workflowId}/deployments/${version}`, {
        method: 'PATCH',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ name, description }),
      })

      if (!response.ok) {
        const errorData = await response.json()
        throw new Error(errorData.error || 'Failed to update deployment version')
      }

      return response.json()
    },
    onSuccess: (_, variables) => {
      logger.info('Deployment version updated', {
        workflowId: variables.workflowId,
        version: variables.version,
      })

      queryClient.invalidateQueries({
        queryKey: deploymentKeys.versions(variables.workflowId),
      })
    },
    onError: (error) => {
      logger.error('Failed to update deployment version', { error })
    },
  })
}
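
A minimal usage sketch for this hook; the workflow ID and values are illustrative, not taken from the codebase:

const updateVersion = useUpdateDeploymentVersion()

// Rename version 3 and clear its stored description in a single PATCH.
// description: null is an accepted value per UpdateDeploymentVersionVariables.
updateVersion.mutate({
  workflowId: 'wf_123', // illustrative ID
  version: 3,
  name: 'Stable release',
  description: null,
})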

/**
 * Variables for generating a version description
 */
interface GenerateVersionDescriptionVariables {
  workflowId: string
  version: number
  onStreamChunk?: (accumulated: string) => void
}

const VERSION_DESCRIPTION_SYSTEM_PROMPT = `You are a technical writer generating concise deployment version descriptions.

Given a diff of changes between two workflow versions, write a brief, factual description (1-2 sentences, under 300 characters) that states ONLY what changed.

RULES:
- State specific values when provided (e.g. "model changed from X to Y")
- Do NOT wrap your response in quotes
- Do NOT add filler phrases like "streamlining the workflow", "for improved efficiency"
- Do NOT use markdown formatting
- Do NOT include version numbers
- Do NOT start with "This version" or similar phrases

Good examples:
- Changes model in Agent 1 from gpt-4o to claude-sonnet-4-20250514.
- Adds Slack notification block. Updates webhook URL to production endpoint.
- Removes Function block and its connection to Router.

Bad examples:
- "Changes model..." (NO - don't wrap in quotes)
- Changes model, streamlining the workflow. (NO - don't add filler)

Respond with ONLY the plain text description.`

/**
 * Hook for generating a version description using AI based on workflow diff
 */
export function useGenerateVersionDescription() {
  return useMutation({
    mutationFn: async ({
      workflowId,
      version,
      onStreamChunk,
    }: GenerateVersionDescriptionVariables): Promise<string> => {
      const { generateWorkflowDiffSummary, formatDiffSummaryForDescription } = await import(
        '@/lib/workflows/comparison/compare'
      )

      const currentResponse = await fetch(`/api/workflows/${workflowId}/deployments/${version}`)
      if (!currentResponse.ok) {
        throw new Error('Failed to fetch current version state')
      }
      const currentData = await currentResponse.json()
      const currentState = currentData.deployedState

      let previousState = null
      if (version > 1) {
        const previousResponse = await fetch(
          `/api/workflows/${workflowId}/deployments/${version - 1}`
        )
        if (previousResponse.ok) {
          const previousData = await previousResponse.json()
          previousState = previousData.deployedState
        }
      }

      const diffSummary = generateWorkflowDiffSummary(currentState, previousState)
      const diffText = formatDiffSummaryForDescription(diffSummary)

      const wandResponse = await fetch('/api/wand', {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'Cache-Control': 'no-cache, no-transform',
        },
        body: JSON.stringify({
          prompt: `Generate a deployment version description based on these changes:\n\n${diffText}`,
          systemPrompt: VERSION_DESCRIPTION_SYSTEM_PROMPT,
          stream: true,
          workflowId,
        }),
        cache: 'no-store',
      })

      if (!wandResponse.ok) {
        const errorText = await wandResponse.text()
        throw new Error(errorText || 'Failed to generate description')
      }

      if (!wandResponse.body) {
        throw new Error('Response body is null')
      }

      const reader = wandResponse.body.getReader()
      const decoder = new TextDecoder()
      let accumulatedContent = ''

      try {
        while (true) {
          const { done, value } = await reader.read()
          if (done) break

          const chunk = decoder.decode(value)
          const lines = chunk.split('\n\n')

          for (const line of lines) {
            if (line.startsWith('data: ')) {
              const lineData = line.substring(6)
              if (lineData === '[DONE]') continue

              try {
                const data = JSON.parse(lineData)
                if (data.error) throw new Error(data.error)
                if (data.chunk) {
                  accumulatedContent += data.chunk
                  onStreamChunk?.(accumulatedContent)
                }
                if (data.done) break
              } catch {
                // Skip unparseable lines
              }
            }
          }
        }
      } finally {
        reader.releaseLock()
      }

      if (!accumulatedContent) {
        throw new Error('Failed to generate description')
      }

      return accumulatedContent.trim()
    },
    onSuccess: (content) => {
      logger.info('Generated version description', { length: content.length })
    },
    onError: (error) => {
      logger.error('Failed to generate version description', { error })
    },
  })
}
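
A sketch of how the two hooks above might compose in a UI handler, assuming a local setDraft state setter (illustrative only):

const generateDescription = useGenerateVersionDescription()
const updateVersion = useUpdateDeploymentVersion()

const handleGenerate = async (workflowId: string, version: number) => {
  // Stream the AI draft into local state as it arrives, then persist the final text
  const draft = await generateDescription.mutateAsync({
    workflowId,
    version,
    onStreamChunk: (accumulated) => setDraft(accumulated), // setDraft is assumed local state
  })
  await updateVersion.mutateAsync({ workflowId, version, description: draft })
}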

/**
 * Variables for activate version mutation
 */

@@ -1,85 +1,10 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'

const logger = createLogger('useExecutionStream')

/**
 * Processes SSE events from a response body and invokes appropriate callbacks.
 */
async function processSSEStream(
  reader: ReadableStreamDefaultReader<Uint8Array>,
  callbacks: ExecutionStreamCallbacks,
  logPrefix: string
): Promise<void> {
  const decoder = new TextDecoder()
  let buffer = ''

  try {
    while (true) {
      const { done, value } = await reader.read()

      if (done) break

      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (!line.trim() || !line.startsWith('data: ')) continue

        const data = line.substring(6).trim()
        if (data === '[DONE]') {
          logger.info(`${logPrefix} stream completed`)
          continue
        }

        try {
          const event = JSON.parse(data) as ExecutionEvent

          switch (event.type) {
            case 'execution:started':
              callbacks.onExecutionStarted?.(event.data)
              break
            case 'execution:completed':
              callbacks.onExecutionCompleted?.(event.data)
              break
            case 'execution:error':
              callbacks.onExecutionError?.(event.data)
              break
            case 'execution:cancelled':
              callbacks.onExecutionCancelled?.(event.data)
              break
            case 'block:started':
              callbacks.onBlockStarted?.(event.data)
              break
            case 'block:completed':
              callbacks.onBlockCompleted?.(event.data)
              break
            case 'block:error':
              callbacks.onBlockError?.(event.data)
              break
            case 'stream:chunk':
              callbacks.onStreamChunk?.(event.data)
              break
            case 'stream:done':
              callbacks.onStreamDone?.(event.data)
              break
            default:
              logger.warn('Unknown event type:', (event as any).type)
          }
        } catch (error) {
          logger.error('Failed to parse SSE event:', error, { data })
        }
      }
    }
  } finally {
    reader.releaseLock()
  }
}
|
||||
|
||||
export interface ExecutionStreamCallbacks {
|
||||
onExecutionStarted?: (data: { startTime: string }) => void
|
||||
onExecutionCompleted?: (data: {
|
||||
@@ -143,15 +68,6 @@ export interface ExecuteStreamOptions {
|
||||
loops?: Record<string, any>
|
||||
parallels?: Record<string, any>
|
||||
}
|
||||
stopAfterBlockId?: string
|
||||
callbacks?: ExecutionStreamCallbacks
|
||||
}
|
||||
|
||||
export interface ExecuteFromBlockOptions {
|
||||
workflowId: string
|
||||
startBlockId: string
|
||||
sourceSnapshot: SerializableExecutionState
|
||||
input?: any
|
||||
callbacks?: ExecutionStreamCallbacks
|
||||
}
|
||||
|
||||
@@ -203,7 +119,91 @@ export function useExecutionStream() {
      }

      const reader = response.body.getReader()
      await processSSEStream(reader, callbacks, 'Execution')
      const decoder = new TextDecoder()
      let buffer = ''

      try {
        while (true) {
          const { done, value } = await reader.read()

          if (done) {
            break
          }

          buffer += decoder.decode(value, { stream: true })

          const lines = buffer.split('\n\n')

          buffer = lines.pop() || ''

          for (const line of lines) {
            if (!line.trim() || !line.startsWith('data: ')) {
              continue
            }

            const data = line.substring(6).trim()

            if (data === '[DONE]') {
              logger.info('Stream completed')
              continue
            }

            try {
              const event = JSON.parse(data) as ExecutionEvent

              logger.info('📡 SSE Event received:', {
                type: event.type,
                executionId: event.executionId,
                data: event.data,
              })

              switch (event.type) {
                case 'execution:started':
                  logger.info('🚀 Execution started')
                  callbacks.onExecutionStarted?.(event.data)
                  break
                case 'execution:completed':
                  logger.info('✅ Execution completed')
                  callbacks.onExecutionCompleted?.(event.data)
                  break
                case 'execution:error':
                  logger.error('❌ Execution error')
                  callbacks.onExecutionError?.(event.data)
                  break
                case 'execution:cancelled':
                  logger.warn('🛑 Execution cancelled')
                  callbacks.onExecutionCancelled?.(event.data)
                  break
                case 'block:started':
                  logger.info('🔷 Block started:', event.data.blockId)
                  callbacks.onBlockStarted?.(event.data)
                  break
                case 'block:completed':
                  logger.info('✓ Block completed:', event.data.blockId)
                  callbacks.onBlockCompleted?.(event.data)
                  break
                case 'block:error':
                  logger.error('✗ Block error:', event.data.blockId)
                  callbacks.onBlockError?.(event.data)
                  break
                case 'stream:chunk':
                  callbacks.onStreamChunk?.(event.data)
                  break
                case 'stream:done':
                  logger.info('Stream done:', event.data.blockId)
                  callbacks.onStreamDone?.(event.data)
                  break
                default:
                  logger.warn('Unknown event type:', (event as any).type)
              }
            } catch (error) {
              logger.error('Failed to parse SSE event:', error, { data })
            }
          }
        }
      } finally {
        reader.releaseLock()
      }
    } catch (error: any) {
      if (error.name === 'AbortError') {
        logger.info('Execution stream cancelled')
@@ -222,70 +222,6 @@ export function useExecutionStream() {
    }
  }, [])

  const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
    const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options

    if (abortControllerRef.current) {
      abortControllerRef.current.abort()
    }

    const abortController = new AbortController()
    abortControllerRef.current = abortController
    currentExecutionRef.current = null

    try {
      const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
        },
        body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
        signal: abortController.signal,
      })

      if (!response.ok) {
        let errorResponse: any
        try {
          errorResponse = await response.json()
        } catch {
          throw new Error(`Server error (${response.status}): ${response.statusText}`)
        }
        const error = new Error(errorResponse.error || 'Failed to start execution')
        if (errorResponse && typeof errorResponse === 'object') {
          Object.assign(error, { executionResult: errorResponse })
        }
        throw error
      }

      if (!response.body) {
        throw new Error('No response body')
      }

      const executionId = response.headers.get('X-Execution-Id')
      if (executionId) {
        currentExecutionRef.current = { workflowId, executionId }
      }

      const reader = response.body.getReader()
      await processSSEStream(reader, callbacks, 'Run-from-block')
    } catch (error: any) {
      if (error.name === 'AbortError') {
        logger.info('Run-from-block execution cancelled')
        callbacks.onExecutionCancelled?.({ duration: 0 })
      } else {
        logger.error('Run-from-block execution error:', error)
        callbacks.onExecutionError?.({
          error: error.message || 'Unknown error',
          duration: 0,
        })
      }
      throw error
    } finally {
      abortControllerRef.current = null
      currentExecutionRef.current = null
    }
  }, [])

  const cancel = useCallback(() => {
    const execution = currentExecutionRef.current
    if (execution) {
@@ -303,7 +239,6 @@ export function useExecutionStream() {

  return {
    execute,
    executeFromBlock,
    cancel,
  }
}
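
// An illustrative consumption sketch (not part of the diff): run-from-block
// re-execution wired to UI callbacks. The snapshot accessor comes from the
// execution store shown later in this diff; the UI handlers (markBlockRunning,
// markBlockDone, showError) are hypothetical names.
//
//   const { executeFromBlock, cancel } = useExecutionStream()
//   const snapshot = useExecutionStore.getState().getLastExecutionSnapshot(workflowId)
//
//   if (snapshot) {
//     await executeFromBlock({
//       workflowId,
//       startBlockId,
//       sourceSnapshot: snapshot,
//       callbacks: {
//         onBlockStarted: (data) => markBlockRunning(data.blockId),
//         onBlockCompleted: (data) => markBlockDone(data.blockId, data.output),
//         onExecutionError: (data) => showError(data.error),
//       },
//     })
//   }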

@@ -7,7 +7,11 @@ import {
} from '@sim/testing'
import { describe, expect, it } from 'vitest'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { hasWorkflowChanged } from './compare'
import {
  formatDiffSummaryForDescription,
  generateWorkflowDiffSummary,
  hasWorkflowChanged,
} from './compare'

/**
 * Type helper for converting test workflow state to app workflow state.
@@ -2735,3 +2739,299 @@ describe('hasWorkflowChanged', () => {
    })
  })
})

describe('generateWorkflowDiffSummary', () => {
  describe('Basic Cases', () => {
    it.concurrent('should return hasChanges=true when previousState is null', () => {
      const currentState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })
      const result = generateWorkflowDiffSummary(currentState, null)
      expect(result.hasChanges).toBe(true)
      expect(result.addedBlocks).toHaveLength(1)
      expect(result.addedBlocks[0].id).toBe('block1')
    })

    it.concurrent('should return hasChanges=false for identical states', () => {
      const state = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })
      const result = generateWorkflowDiffSummary(state, state)
      expect(result.hasChanges).toBe(false)
      expect(result.addedBlocks).toHaveLength(0)
      expect(result.removedBlocks).toHaveLength(0)
      expect(result.modifiedBlocks).toHaveLength(0)
    })
  })

  describe('Block Changes', () => {
    it.concurrent('should detect added blocks', () => {
      const previousState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })
      const currentState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.addedBlocks).toHaveLength(1)
      expect(result.addedBlocks[0].id).toBe('block2')
    })

    it.concurrent('should detect removed blocks', () => {
      const previousState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
      })
      const currentState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.removedBlocks).toHaveLength(1)
      expect(result.removedBlocks[0].id).toBe('block2')
    })

    it.concurrent('should detect modified blocks with field changes', () => {
      const previousState = createWorkflowState({
        blocks: {
          block1: createBlock('block1', {
            subBlocks: { model: { id: 'model', type: 'dropdown', value: 'gpt-4o' } },
          }),
        },
      })
      const currentState = createWorkflowState({
        blocks: {
          block1: createBlock('block1', {
            subBlocks: { model: { id: 'model', type: 'dropdown', value: 'claude-sonnet' } },
          }),
        },
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.modifiedBlocks).toHaveLength(1)
      expect(result.modifiedBlocks[0].id).toBe('block1')
      expect(result.modifiedBlocks[0].changes.length).toBeGreaterThan(0)
      const modelChange = result.modifiedBlocks[0].changes.find((c) => c.field === 'model')
      expect(modelChange).toBeDefined()
      expect(modelChange?.oldValue).toBe('gpt-4o')
      expect(modelChange?.newValue).toBe('claude-sonnet')
    })
  })

  describe('Edge Changes', () => {
    it.concurrent('should detect added edges', () => {
      const previousState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
        edges: [],
      })
      const currentState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
        edges: [{ id: 'e1', source: 'block1', target: 'block2' }],
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.edgeChanges.added).toBe(1)
      expect(result.edgeChanges.removed).toBe(0)
    })

    it.concurrent('should detect removed edges', () => {
      const previousState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
        edges: [{ id: 'e1', source: 'block1', target: 'block2' }],
      })
      const currentState = createWorkflowState({
        blocks: {
          block1: createBlock('block1'),
          block2: createBlock('block2'),
        },
        edges: [],
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.edgeChanges.added).toBe(0)
      expect(result.edgeChanges.removed).toBe(1)
    })
  })

  describe('Variable Changes', () => {
    it.concurrent('should detect added variables', () => {
      const previousState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
        variables: {},
      })
      const currentState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
        variables: { var1: { id: 'var1', name: 'test', type: 'string', value: 'hello' } },
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.variableChanges.added).toBe(1)
    })

    it.concurrent('should detect modified variables', () => {
      const previousState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
        variables: { var1: { id: 'var1', name: 'test', type: 'string', value: 'hello' } },
      })
      const currentState = createWorkflowState({
        blocks: { block1: createBlock('block1') },
        variables: { var1: { id: 'var1', name: 'test', type: 'string', value: 'world' } },
      })
      const result = generateWorkflowDiffSummary(currentState, previousState)
      expect(result.hasChanges).toBe(true)
      expect(result.variableChanges.modified).toBe(1)
    })
  })

  describe('Consistency with hasWorkflowChanged', () => {
    it.concurrent('hasChanges should match hasWorkflowChanged result', () => {
      const state1 = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })
      const state2 = createWorkflowState({
        blocks: {
          block1: createBlock('block1', {
            subBlocks: { prompt: { id: 'prompt', type: 'long-input', value: 'new value' } },
          }),
        },
      })

      const diffResult = generateWorkflowDiffSummary(state2, state1)
      const hasChangedResult = hasWorkflowChanged(state2, state1)

      expect(diffResult.hasChanges).toBe(hasChangedResult)
    })

    it.concurrent('should return same result as hasWorkflowChanged for no changes', () => {
      const state = createWorkflowState({
        blocks: { block1: createBlock('block1') },
      })

      const diffResult = generateWorkflowDiffSummary(state, state)
      const hasChangedResult = hasWorkflowChanged(state, state)

      expect(diffResult.hasChanges).toBe(hasChangedResult)
      expect(diffResult.hasChanges).toBe(false)
    })
  })
})

describe('formatDiffSummaryForDescription', () => {
  it.concurrent('should return no changes message when hasChanges is false', () => {
    const summary = {
      addedBlocks: [],
      removedBlocks: [],
      modifiedBlocks: [],
      edgeChanges: { added: 0, removed: 0 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 0, removed: 0, modified: 0 },
      hasChanges: false,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('No structural changes')
  })

  it.concurrent('should format added blocks', () => {
    const summary = {
      addedBlocks: [{ id: 'block1', type: 'agent', name: 'My Agent' }],
      removedBlocks: [],
      modifiedBlocks: [],
      edgeChanges: { added: 0, removed: 0 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 0, removed: 0, modified: 0 },
      hasChanges: true,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('Added block: My Agent (agent)')
  })

  it.concurrent('should format removed blocks', () => {
    const summary = {
      addedBlocks: [],
      removedBlocks: [{ id: 'block1', type: 'function', name: 'Old Function' }],
      modifiedBlocks: [],
      edgeChanges: { added: 0, removed: 0 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 0, removed: 0, modified: 0 },
      hasChanges: true,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('Removed block: Old Function (function)')
  })

  it.concurrent('should format modified blocks with field changes', () => {
    const summary = {
      addedBlocks: [],
      removedBlocks: [],
      modifiedBlocks: [
        {
          id: 'block1',
          type: 'agent',
          name: 'Agent 1',
          changes: [{ field: 'model', oldValue: 'gpt-4o', newValue: 'claude-sonnet' }],
        },
      ],
      edgeChanges: { added: 0, removed: 0 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 0, removed: 0, modified: 0 },
      hasChanges: true,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('Modified Agent 1')
    expect(result).toContain('model')
    expect(result).toContain('gpt-4o')
    expect(result).toContain('claude-sonnet')
  })

  it.concurrent('should format edge changes', () => {
    const summary = {
      addedBlocks: [],
      removedBlocks: [],
      modifiedBlocks: [],
      edgeChanges: { added: 2, removed: 1 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 0, removed: 0, modified: 0 },
      hasChanges: true,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('Added 2 connection(s)')
    expect(result).toContain('Removed 1 connection(s)')
  })

  it.concurrent('should format variable changes', () => {
    const summary = {
      addedBlocks: [],
      removedBlocks: [],
      modifiedBlocks: [],
      edgeChanges: { added: 0, removed: 0 },
      loopChanges: { added: 0, removed: 0 },
      parallelChanges: { added: 0, removed: 0 },
      variableChanges: { added: 1, removed: 0, modified: 2 },
      hasChanges: true,
    }
    const result = formatDiffSummaryForDescription(summary)
    expect(result).toContain('Variables:')
    expect(result).toContain('1 added')
    expect(result).toContain('2 modified')
  })
})

@@ -10,7 +10,6 @@ import {
  sanitizeInputFormat,
  sanitizeTools,
  sanitizeVariable,
  sortEdges,
} from './normalize'

/** Block with optional diff markers added by copilot */
@@ -28,60 +27,109 @@ type SubBlockWithDiffMarker = {
}

/**
 * Compare the current workflow state with the deployed state to detect meaningful changes
 * @param currentState - The current workflow state
 * @param deployedState - The deployed workflow state
 * @returns True if there are meaningful changes, false if only position changes or no changes
 * Compare the current workflow state with the deployed state to detect meaningful changes.
 * Uses generateWorkflowDiffSummary internally to ensure consistent change detection.
 */
export function hasWorkflowChanged(
  currentState: WorkflowState,
  deployedState: WorkflowState | null
): boolean {
  // If no deployed state exists, then the workflow has changed
  if (!deployedState) return true
  return generateWorkflowDiffSummary(currentState, deployedState).hasChanges
}

  // 1. Compare edges (connections between blocks)
  const currentEdges = currentState.edges || []
  const deployedEdges = deployedState.edges || []
/**
 * Represents a single field change with old and new values
 */
export interface FieldChange {
  field: string
  oldValue: unknown
  newValue: unknown
}

  const normalizedCurrentEdges = sortEdges(currentEdges.map(normalizeEdge))
  const normalizedDeployedEdges = sortEdges(deployedEdges.map(normalizeEdge))
/**
 * Result of workflow diff analysis between two workflow states
 */
export interface WorkflowDiffSummary {
  addedBlocks: Array<{ id: string; type: string; name?: string }>
  removedBlocks: Array<{ id: string; type: string; name?: string }>
  modifiedBlocks: Array<{ id: string; type: string; name?: string; changes: FieldChange[] }>
  edgeChanges: { added: number; removed: number }
  loopChanges: { added: number; removed: number }
  parallelChanges: { added: number; removed: number }
  variableChanges: { added: number; removed: number; modified: number }
  hasChanges: boolean
}

  if (
    normalizedStringify(normalizedCurrentEdges) !== normalizedStringify(normalizedDeployedEdges)
  ) {
    return true
/**
 * Generate a detailed diff summary between two workflow states
 */
export function generateWorkflowDiffSummary(
  currentState: WorkflowState,
  previousState: WorkflowState | null
): WorkflowDiffSummary {
  const result: WorkflowDiffSummary = {
    addedBlocks: [],
    removedBlocks: [],
    modifiedBlocks: [],
    edgeChanges: { added: 0, removed: 0 },
    loopChanges: { added: 0, removed: 0 },
    parallelChanges: { added: 0, removed: 0 },
    variableChanges: { added: 0, removed: 0, modified: 0 },
    hasChanges: false,
  }

  // 2. Compare blocks and their configurations
  const currentBlockIds = Object.keys(currentState.blocks || {}).sort()
  const deployedBlockIds = Object.keys(deployedState.blocks || {}).sort()

  if (
    currentBlockIds.length !== deployedBlockIds.length ||
    normalizedStringify(currentBlockIds) !== normalizedStringify(deployedBlockIds)
  ) {
    return true
  if (!previousState) {
    const currentBlocks = currentState.blocks || {}
    for (const [id, block] of Object.entries(currentBlocks)) {
      result.addedBlocks.push({
        id,
        type: block.type,
        name: block.name,
      })
    }
    result.edgeChanges.added = (currentState.edges || []).length
    result.loopChanges.added = Object.keys(currentState.loops || {}).length
    result.parallelChanges.added = Object.keys(currentState.parallels || {}).length
    result.variableChanges.added = Object.keys(currentState.variables || {}).length
    result.hasChanges = true
    return result
  }

  // 3. Build normalized representations of blocks for comparison
  const normalizedCurrentBlocks: Record<string, unknown> = {}
  const normalizedDeployedBlocks: Record<string, unknown> = {}
  const currentBlocks = currentState.blocks || {}
  const previousBlocks = previousState.blocks || {}
  const currentBlockIds = new Set(Object.keys(currentBlocks))
  const previousBlockIds = new Set(Object.keys(previousBlocks))

  for (const blockId of currentBlockIds) {
    const currentBlock = currentState.blocks[blockId]
    const deployedBlock = deployedState.blocks[blockId]
  for (const id of currentBlockIds) {
    if (!previousBlockIds.has(id)) {
      const block = currentBlocks[id]
      result.addedBlocks.push({
        id,
        type: block.type,
        name: block.name,
      })
    }
  }

    // Destructure and exclude non-functional fields:
    // - position: visual positioning only
    // - subBlocks: handled separately below
    // - layout: contains measuredWidth/measuredHeight from autolayout
    // - height: block height measurement from autolayout
    // - outputs: derived from subBlocks (e.g., inputFormat), already compared via subBlocks
    // - is_diff, field_diffs: diff markers from copilot edits
    const currentBlockWithDiff = currentBlock as BlockWithDiffMarkers
    const deployedBlockWithDiff = deployedBlock as BlockWithDiffMarkers
  for (const id of previousBlockIds) {
    if (!currentBlockIds.has(id)) {
      const block = previousBlocks[id]
      result.removedBlocks.push({
        id,
        type: block.type,
        name: block.name,
      })
    }
  }

  for (const id of currentBlockIds) {
    if (!previousBlockIds.has(id)) continue

    const currentBlock = currentBlocks[id] as BlockWithDiffMarkers
    const previousBlock = previousBlocks[id] as BlockWithDiffMarkers
    const changes: FieldChange[] = []

    // Compare block-level properties (excluding visual-only fields)
    const {
      position: _currentPos,
      subBlocks: currentSubBlocks = {},
@@ -91,187 +139,308 @@ export function hasWorkflowChanged(
      is_diff: _currentIsDiff,
      field_diffs: _currentFieldDiffs,
      ...currentRest
    } = currentBlockWithDiff
    } = currentBlock

    const {
      position: _deployedPos,
      subBlocks: deployedSubBlocks = {},
      layout: _deployedLayout,
      height: _deployedHeight,
      outputs: _deployedOutputs,
      is_diff: _deployedIsDiff,
      field_diffs: _deployedFieldDiffs,
      ...deployedRest
    } = deployedBlockWithDiff
      position: _previousPos,
      subBlocks: previousSubBlocks = {},
      layout: _previousLayout,
      height: _previousHeight,
      outputs: _previousOutputs,
      is_diff: _previousIsDiff,
      field_diffs: _previousFieldDiffs,
      ...previousRest
    } = previousBlock

    // Also exclude width/height from data object (container dimensions from autolayout)
    const {
      width: _currentDataWidth,
      height: _currentDataHeight,
      ...currentDataRest
    } = currentRest.data || {}
    const {
      width: _deployedDataWidth,
      height: _deployedDataHeight,
      ...deployedDataRest
    } = deployedRest.data || {}
    // Exclude width/height from data object (container dimensions from autolayout)
    const { width: _cw, height: _ch, ...currentDataRest } = currentRest.data || {}
    const { width: _pw, height: _ph, ...previousDataRest } = previousRest.data || {}

    normalizedCurrentBlocks[blockId] = {
      ...currentRest,
      data: currentDataRest,
    const normalizedCurrentBlock = { ...currentRest, data: currentDataRest, subBlocks: undefined }
    const normalizedPreviousBlock = {
      ...previousRest,
      data: previousDataRest,
      subBlocks: undefined,
    }

    normalizedDeployedBlocks[blockId] = {
      ...deployedRest,
      data: deployedDataRest,
      subBlocks: undefined,
    if (
      normalizedStringify(normalizedCurrentBlock) !== normalizedStringify(normalizedPreviousBlock)
    ) {
      if (currentBlock.type !== previousBlock.type) {
        changes.push({ field: 'type', oldValue: previousBlock.type, newValue: currentBlock.type })
      }
      if (currentBlock.name !== previousBlock.name) {
        changes.push({ field: 'name', oldValue: previousBlock.name, newValue: currentBlock.name })
      }
      if (currentBlock.enabled !== previousBlock.enabled) {
        changes.push({
          field: 'enabled',
          oldValue: previousBlock.enabled,
          newValue: currentBlock.enabled,
        })
      }
      // Check other block properties
      const blockFields = ['horizontalHandles', 'advancedMode', 'triggerMode'] as const
      for (const field of blockFields) {
        if (currentBlock[field] !== previousBlock[field]) {
          changes.push({
            field,
            oldValue: previousBlock[field],
            newValue: currentBlock[field],
          })
        }
      }
      if (normalizedStringify(currentDataRest) !== normalizedStringify(previousDataRest)) {
        changes.push({ field: 'data', oldValue: previousDataRest, newValue: currentDataRest })
      }
    }

    // Get all subBlock IDs from both states, excluding runtime metadata and UI-only elements
    // Compare subBlocks
    const allSubBlockIds = [
      ...new Set([...Object.keys(currentSubBlocks), ...Object.keys(deployedSubBlocks)]),
      ...new Set([...Object.keys(currentSubBlocks), ...Object.keys(previousSubBlocks)]),
    ]
      .filter(
        (id) => !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(id) && !SYSTEM_SUBBLOCK_IDS.includes(id)
        (subId) =>
          !TRIGGER_RUNTIME_SUBBLOCK_IDS.includes(subId) && !SYSTEM_SUBBLOCK_IDS.includes(subId)
      )
      .sort()

    // Normalize and compare each subBlock
    for (const subBlockId of allSubBlockIds) {
      // If the subBlock doesn't exist in either state, there's a difference
      if (!currentSubBlocks[subBlockId] || !deployedSubBlocks[subBlockId]) {
        return true
    for (const subId of allSubBlockIds) {
      const currentSub = currentSubBlocks[subId]
      const previousSub = previousSubBlocks[subId]

      if (!currentSub || !previousSub) {
        changes.push({
          field: subId,
          oldValue: previousSub?.value ?? null,
          newValue: currentSub?.value ?? null,
        })
        continue
      }

      // Get values with special handling for null/undefined
      // Using unknown type since sanitization functions return different types
      let currentValue: unknown = currentSubBlocks[subBlockId].value ?? null
      let deployedValue: unknown = deployedSubBlocks[subBlockId].value ?? null
      // Compare subBlock values with sanitization
      let currentValue: unknown = currentSub.value ?? null
      let previousValue: unknown = previousSub.value ?? null

      if (subBlockId === 'tools' && Array.isArray(currentValue) && Array.isArray(deployedValue)) {
      if (subId === 'tools' && Array.isArray(currentValue) && Array.isArray(previousValue)) {
        currentValue = sanitizeTools(currentValue)
        deployedValue = sanitizeTools(deployedValue)
        previousValue = sanitizeTools(previousValue)
      }

      if (
        subBlockId === 'inputFormat' &&
        Array.isArray(currentValue) &&
        Array.isArray(deployedValue)
      ) {
      if (subId === 'inputFormat' && Array.isArray(currentValue) && Array.isArray(previousValue)) {
        currentValue = sanitizeInputFormat(currentValue)
        deployedValue = sanitizeInputFormat(deployedValue)
        previousValue = sanitizeInputFormat(previousValue)
      }

      // For string values, compare directly to catch even small text changes
      if (typeof currentValue === 'string' && typeof deployedValue === 'string') {
        if (currentValue !== deployedValue) {
          return true
      if (typeof currentValue === 'string' && typeof previousValue === 'string') {
        if (currentValue !== previousValue) {
          changes.push({ field: subId, oldValue: previousSub.value, newValue: currentSub.value })
        }
      } else {
        // For other types, use normalized comparison
        const normalizedCurrentValue = normalizeValue(currentValue)
        const normalizedDeployedValue = normalizeValue(deployedValue)

        if (
          normalizedStringify(normalizedCurrentValue) !==
          normalizedStringify(normalizedDeployedValue)
        ) {
          return true
        const normalizedCurrent = normalizeValue(currentValue)
        const normalizedPrevious = normalizeValue(previousValue)
        if (normalizedStringify(normalizedCurrent) !== normalizedStringify(normalizedPrevious)) {
          changes.push({ field: subId, oldValue: previousSub.value, newValue: currentSub.value })
        }
      }

      // Compare type and other properties (excluding diff markers and value)
      const currentSubBlockWithDiff = currentSubBlocks[subBlockId] as SubBlockWithDiffMarker
      const deployedSubBlockWithDiff = deployedSubBlocks[subBlockId] as SubBlockWithDiffMarker
      const { value: _cv, is_diff: _cd, ...currentSubBlockRest } = currentSubBlockWithDiff
      const { value: _dv, is_diff: _dd, ...deployedSubBlockRest } = deployedSubBlockWithDiff
      // Compare subBlock REST properties (type, id, etc. - excluding value and is_diff)
      const currentSubWithDiff = currentSub as SubBlockWithDiffMarker
      const previousSubWithDiff = previousSub as SubBlockWithDiffMarker
      const { value: _cv, is_diff: _cd, ...currentSubRest } = currentSubWithDiff
      const { value: _pv, is_diff: _pd, ...previousSubRest } = previousSubWithDiff

      if (normalizedStringify(currentSubBlockRest) !== normalizedStringify(deployedSubBlockRest)) {
        return true
      if (normalizedStringify(currentSubRest) !== normalizedStringify(previousSubRest)) {
        changes.push({
          field: `${subId}.properties`,
          oldValue: previousSubRest,
          newValue: currentSubRest,
        })
      }
    }

    const blocksEqual =
      normalizedStringify(normalizedCurrentBlocks[blockId]) ===
      normalizedStringify(normalizedDeployedBlocks[blockId])

    if (!blocksEqual) {
      return true
    if (changes.length > 0) {
      result.modifiedBlocks.push({
        id,
        type: currentBlock.type,
        name: currentBlock.name,
        changes,
      })
    }
  }

  // 4. Compare loops
  const currentEdges = (currentState.edges || []).map(normalizeEdge)
  const previousEdges = (previousState.edges || []).map(normalizeEdge)
  const currentEdgeSet = new Set(currentEdges.map(normalizedStringify))
  const previousEdgeSet = new Set(previousEdges.map(normalizedStringify))

  for (const edge of currentEdgeSet) {
    if (!previousEdgeSet.has(edge)) result.edgeChanges.added++
  }
  for (const edge of previousEdgeSet) {
    if (!currentEdgeSet.has(edge)) result.edgeChanges.removed++
  }

  const currentLoops = currentState.loops || {}
  const deployedLoops = deployedState.loops || {}
  const previousLoops = previousState.loops || {}
  const currentLoopIds = Object.keys(currentLoops)
  const previousLoopIds = Object.keys(previousLoops)

  const currentLoopIds = Object.keys(currentLoops).sort()
  const deployedLoopIds = Object.keys(deployedLoops).sort()

  if (
    currentLoopIds.length !== deployedLoopIds.length ||
    normalizedStringify(currentLoopIds) !== normalizedStringify(deployedLoopIds)
  ) {
    return true
  for (const id of currentLoopIds) {
    if (!previousLoopIds.includes(id)) {
      result.loopChanges.added++
    } else {
      const normalizedCurrent = normalizeValue(normalizeLoop(currentLoops[id]))
      const normalizedPrevious = normalizeValue(normalizeLoop(previousLoops[id]))
      if (normalizedStringify(normalizedCurrent) !== normalizedStringify(normalizedPrevious)) {
        result.loopChanges.added++
        result.loopChanges.removed++
      }
    }
  }

  for (const loopId of currentLoopIds) {
    const normalizedCurrentLoop = normalizeValue(normalizeLoop(currentLoops[loopId]))
    const normalizedDeployedLoop = normalizeValue(normalizeLoop(deployedLoops[loopId]))

    if (
      normalizedStringify(normalizedCurrentLoop) !== normalizedStringify(normalizedDeployedLoop)
    ) {
      return true
  for (const id of previousLoopIds) {
    if (!currentLoopIds.includes(id)) {
      result.loopChanges.removed++
    }
  }

  // 5. Compare parallels
  const currentParallels = currentState.parallels || {}
  const deployedParallels = deployedState.parallels || {}
  const previousParallels = previousState.parallels || {}
  const currentParallelIds = Object.keys(currentParallels)
  const previousParallelIds = Object.keys(previousParallels)

  const currentParallelIds = Object.keys(currentParallels).sort()
  const deployedParallelIds = Object.keys(deployedParallels).sort()

  if (
    currentParallelIds.length !== deployedParallelIds.length ||
    normalizedStringify(currentParallelIds) !== normalizedStringify(deployedParallelIds)
  ) {
    return true
  for (const id of currentParallelIds) {
    if (!previousParallelIds.includes(id)) {
      result.parallelChanges.added++
    } else {
      const normalizedCurrent = normalizeValue(normalizeParallel(currentParallels[id]))
      const normalizedPrevious = normalizeValue(normalizeParallel(previousParallels[id]))
      if (normalizedStringify(normalizedCurrent) !== normalizedStringify(normalizedPrevious)) {
        result.parallelChanges.added++
        result.parallelChanges.removed++
      }
    }
  }

  for (const parallelId of currentParallelIds) {
    const normalizedCurrentParallel = normalizeValue(
      normalizeParallel(currentParallels[parallelId])
    )
    const normalizedDeployedParallel = normalizeValue(
      normalizeParallel(deployedParallels[parallelId])
    )

    if (
      normalizedStringify(normalizedCurrentParallel) !==
      normalizedStringify(normalizedDeployedParallel)
    ) {
      return true
  for (const id of previousParallelIds) {
    if (!currentParallelIds.includes(id)) {
      result.parallelChanges.removed++
    }
  }

  // 6. Compare variables
  const currentVariables = normalizeVariables(currentState.variables)
  const deployedVariables = normalizeVariables(deployedState.variables)
  const currentVars = normalizeVariables(currentState.variables)
  const previousVars = normalizeVariables(previousState.variables)
  const currentVarIds = Object.keys(currentVars)
  const previousVarIds = Object.keys(previousVars)

  const normalizedCurrentVars = normalizeValue(
    Object.fromEntries(Object.entries(currentVariables).map(([id, v]) => [id, sanitizeVariable(v)]))
  )
  const normalizedDeployedVars = normalizeValue(
    Object.fromEntries(
      Object.entries(deployedVariables).map(([id, v]) => [id, sanitizeVariable(v)])
    )
  )
  result.variableChanges.added = currentVarIds.filter((id) => !previousVarIds.includes(id)).length
  result.variableChanges.removed = previousVarIds.filter((id) => !currentVarIds.includes(id)).length

  if (normalizedStringify(normalizedCurrentVars) !== normalizedStringify(normalizedDeployedVars)) {
    return true
  for (const id of currentVarIds) {
    if (!previousVarIds.includes(id)) continue
    const currentVar = normalizeValue(sanitizeVariable(currentVars[id]))
    const previousVar = normalizeValue(sanitizeVariable(previousVars[id]))
    if (normalizedStringify(currentVar) !== normalizedStringify(previousVar)) {
      result.variableChanges.modified++
    }
  }

  return false
  result.hasChanges =
    result.addedBlocks.length > 0 ||
    result.removedBlocks.length > 0 ||
    result.modifiedBlocks.length > 0 ||
    result.edgeChanges.added > 0 ||
    result.edgeChanges.removed > 0 ||
    result.loopChanges.added > 0 ||
    result.loopChanges.removed > 0 ||
    result.parallelChanges.added > 0 ||
    result.parallelChanges.removed > 0 ||
    result.variableChanges.added > 0 ||
    result.variableChanges.removed > 0 ||
    result.variableChanges.modified > 0

  return result
}

function formatValueForDisplay(value: unknown): string {
  if (value === null || value === undefined) return '(none)'
  if (typeof value === 'string') {
    if (value.length > 50) return `${value.slice(0, 50)}...`
    return value || '(empty)'
  }
  if (typeof value === 'boolean') return value ? 'enabled' : 'disabled'
  if (typeof value === 'number') return String(value)
  if (Array.isArray(value)) return `[${value.length} items]`
  if (typeof value === 'object') return `${JSON.stringify(value).slice(0, 50)}...`
  return String(value)
}

/**
 * Convert a WorkflowDiffSummary to a human-readable string for AI description generation
 */
export function formatDiffSummaryForDescription(summary: WorkflowDiffSummary): string {
  if (!summary.hasChanges) {
    return 'No structural changes detected (configuration may have changed)'
  }

  const changes: string[] = []

  for (const block of summary.addedBlocks) {
    const name = block.name || block.type
    changes.push(`Added block: ${name} (${block.type})`)
  }

  for (const block of summary.removedBlocks) {
    const name = block.name || block.type
    changes.push(`Removed block: ${name} (${block.type})`)
  }

  for (const block of summary.modifiedBlocks) {
    const name = block.name || block.type
    for (const change of block.changes.slice(0, 3)) {
      const oldStr = formatValueForDisplay(change.oldValue)
      const newStr = formatValueForDisplay(change.newValue)
      changes.push(`Modified ${name}: ${change.field} changed from "${oldStr}" to "${newStr}"`)
    }
    if (block.changes.length > 3) {
      changes.push(` ...and ${block.changes.length - 3} more changes in ${name}`)
    }
  }

  if (summary.edgeChanges.added > 0) {
    changes.push(`Added ${summary.edgeChanges.added} connection(s)`)
  }
  if (summary.edgeChanges.removed > 0) {
    changes.push(`Removed ${summary.edgeChanges.removed} connection(s)`)
  }

  if (summary.loopChanges.added > 0) {
    changes.push(`Added ${summary.loopChanges.added} loop(s)`)
  }
  if (summary.loopChanges.removed > 0) {
    changes.push(`Removed ${summary.loopChanges.removed} loop(s)`)
  }

  if (summary.parallelChanges.added > 0) {
    changes.push(`Added ${summary.parallelChanges.added} parallel group(s)`)
  }
  if (summary.parallelChanges.removed > 0) {
    changes.push(`Removed ${summary.parallelChanges.removed} parallel group(s)`)
  }

  const varChanges: string[] = []
  if (summary.variableChanges.added > 0) {
    varChanges.push(`${summary.variableChanges.added} added`)
  }
  if (summary.variableChanges.removed > 0) {
    varChanges.push(`${summary.variableChanges.removed} removed`)
  }
  if (summary.variableChanges.modified > 0) {
    varChanges.push(`${summary.variableChanges.modified} modified`)
  }
  if (varChanges.length > 0) {
    changes.push(`Variables: ${varChanges.join(', ')}`)
  }

  return changes.join('\n')
}
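
// For orientation (not part of the diff): given a summary with one added block,
// one modified field, and two new connections, the formatter above emits lines
// of the shape:
//
//   Added block: My Agent (agent)
//   Modified Agent 1: model changed from "gpt-4o" to "claude-sonnet"
//   Added 2 connection(s)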

@@ -23,11 +23,9 @@ import type {
  ContextExtensions,
  ExecutionCallbacks,
  IterationContext,
  SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'
import { Serializer } from '@/serializer'

const logger = createLogger('ExecutionCore')
@@ -42,12 +40,6 @@ export interface ExecuteWorkflowCoreOptions {
  abortSignal?: AbortSignal
  includeFileBase64?: boolean
  base64MaxBytes?: number
  stopAfterBlockId?: string
  /** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
  runFromBlock?: {
    startBlockId: string
    sourceSnapshot: SerializableExecutionState
  }
}

function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -122,8 +114,6 @@ export async function executeWorkflowCore(
    abortSignal,
    includeFileBase64,
    base64MaxBytes,
    stopAfterBlockId,
    runFromBlock,
  } = options
  const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
  const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -256,16 +246,6 @@ export async function executeWorkflowCore(

    processedInput = input || {}

    // Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
    let resolvedStopAfterBlockId = stopAfterBlockId
    if (stopAfterBlockId) {
      if (serializedWorkflow.loops?.[stopAfterBlockId]) {
        resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
      } else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
        resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
      }
    }

    // Create and execute workflow with callbacks
    if (resumeFromSnapshot) {
      logger.info(`[${requestId}] Resume execution detected`, {
@@ -316,7 +296,6 @@ export async function executeWorkflowCore(
      abortSignal,
      includeFileBase64,
      base64MaxBytes,
      stopAfterBlockId: resolvedStopAfterBlockId,
    }

    const executorInstance = new Executor({
@@ -339,13 +318,10 @@ export async function executeWorkflowCore(
      }
    }

    const result = runFromBlock
      ? ((await executorInstance.executeFromBlock(
          workflowId,
          runFromBlock.startBlockId,
          runFromBlock.sourceSnapshot
        )) as ExecutionResult)
      : ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
    const result = (await executorInstance.execute(
      workflowId,
      resolvedTriggerBlockId
    )) as ExecutionResult

    // Build trace spans for logging from the full execution result
    const { traceSpans, totalDuration } = buildTraceSpans(result)

@@ -180,140 +180,3 @@ export function formatSSEEvent(event: ExecutionEvent): string {
export function encodeSSEEvent(event: ExecutionEvent): Uint8Array {
  return new TextEncoder().encode(formatSSEEvent(event))
}
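
// For reference (not part of the diff): judging from the client-side parsers
// above, which split on '\n\n' and strip a 'data: ' prefix, each event goes over
// the wire in standard SSE framing, e.g.:
//
//   data: {"type":"block:completed","timestamp":"...","executionId":"...","workflowId":"...","data":{"blockId":"..."}}
//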

/**
 * Options for creating SSE execution callbacks
 */
export interface SSECallbackOptions {
  executionId: string
  workflowId: string
  controller: ReadableStreamDefaultController<Uint8Array>
  isStreamClosed: () => boolean
  setStreamClosed: () => void
}

/**
 * Creates SSE callbacks for workflow execution streaming
 */
export function createSSECallbacks(options: SSECallbackOptions) {
  const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options

  const sendEvent = (event: ExecutionEvent) => {
    if (isStreamClosed()) return
    try {
      controller.enqueue(encodeSSEEvent(event))
    } catch {
      setStreamClosed()
    }
  }

  const onBlockStart = async (
    blockId: string,
    blockName: string,
    blockType: string,
    iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
  ) => {
    sendEvent({
      type: 'block:started',
      timestamp: new Date().toISOString(),
      executionId,
      workflowId,
      data: {
        blockId,
        blockName,
        blockType,
        ...(iterationContext && {
          iterationCurrent: iterationContext.iterationCurrent,
          iterationTotal: iterationContext.iterationTotal,
          iterationType: iterationContext.iterationType as any,
        }),
      },
    })
  }

  const onBlockComplete = async (
    blockId: string,
    blockName: string,
    blockType: string,
    callbackData: { input?: unknown; output: any; executionTime: number },
    iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
  ) => {
    const hasError = callbackData.output?.error
    const iterationData = iterationContext
      ? {
          iterationCurrent: iterationContext.iterationCurrent,
          iterationTotal: iterationContext.iterationTotal,
          iterationType: iterationContext.iterationType as any,
        }
      : {}

    if (hasError) {
      sendEvent({
        type: 'block:error',
        timestamp: new Date().toISOString(),
        executionId,
        workflowId,
        data: {
          blockId,
          blockName,
          blockType,
          input: callbackData.input,
          error: callbackData.output.error,
          durationMs: callbackData.executionTime || 0,
          ...iterationData,
        },
      })
    } else {
      sendEvent({
        type: 'block:completed',
        timestamp: new Date().toISOString(),
        executionId,
        workflowId,
        data: {
          blockId,
          blockName,
          blockType,
          input: callbackData.input,
          output: callbackData.output,
          durationMs: callbackData.executionTime || 0,
          ...iterationData,
        },
      })
    }
  }

  const onStream = async (streamingExecution: unknown) => {
    const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
    const blockId = streamingExec.execution?.blockId
    const reader = streamingExec.stream.getReader()
    const decoder = new TextDecoder()

    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        const chunk = decoder.decode(value, { stream: true })
        sendEvent({
          type: 'stream:chunk',
          timestamp: new Date().toISOString(),
          executionId,
          workflowId,
          data: { blockId, chunk },
        })
      }
      sendEvent({
        type: 'stream:done',
        timestamp: new Date().toISOString(),
        executionId,
        workflowId,
        data: { blockId },
      })
    } finally {
      try {
        reader.releaseLock()
      } catch {}
    }
  }

  return { sendEvent, onBlockStart, onBlockComplete, onStream }
}

@@ -27,6 +27,7 @@ export interface WorkflowDeploymentVersionResponse {
  id: string
  version: number
  name?: string | null
  description?: string | null
  isActive: boolean
  createdAt: string
  createdBy?: string | null

@@ -35,23 +35,4 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
  },
  clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
  reset: () => set(initialState),

  setLastExecutionSnapshot: (workflowId, snapshot) => {
    const { lastExecutionSnapshots } = get()
    const newSnapshots = new Map(lastExecutionSnapshots)
    newSnapshots.set(workflowId, snapshot)
    set({ lastExecutionSnapshots: newSnapshots })
  },

  getLastExecutionSnapshot: (workflowId) => {
    const { lastExecutionSnapshots } = get()
    return lastExecutionSnapshots.get(workflowId)
  },

  clearLastExecutionSnapshot: (workflowId) => {
    const { lastExecutionSnapshots } = get()
    const newSnapshots = new Map(lastExecutionSnapshots)
    newSnapshots.delete(workflowId)
    set({ lastExecutionSnapshots: newSnapshots })
  },
}))

@@ -1,5 +1,4 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'

/**
@@ -19,9 +18,16 @@ export interface ExecutionState {
  pendingBlocks: string[]
  executor: Executor | null
  debugContext: ExecutionContext | null
  /**
   * Tracks blocks from the last execution run and their success/error status.
   * Cleared when a new run starts. Used to show run path indicators (rings on blocks).
   */
  lastRunPath: Map<string, BlockRunStatus>
  /**
   * Tracks edges from the last execution run and their success/error status.
   * Cleared when a new run starts. Used to show run path indicators on edges.
   */
  lastRunEdges: Map<string, EdgeRunStatus>
  lastExecutionSnapshots: Map<string, SerializableExecutionState>
}

export interface ExecutionActions {
@@ -35,9 +41,6 @@ export interface ExecutionActions {
  setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
  clearRunPath: () => void
  reset: () => void
  setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
  getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
  clearLastExecutionSnapshot: (workflowId: string) => void
}

export const initialState: ExecutionState = {
@@ -49,5 +52,4 @@ export const initialState: ExecutionState = {
  debugContext: null,
  lastRunPath: new Map(),
  lastRunEdges: new Map(),
  lastExecutionSnapshots: new Map(),
}

@@ -129,7 +129,7 @@ export const useSearchModalStore = create<SearchModalState>()(
          .filter((op) => allowedBlockTypes.has(op.blockType))
          .map((op) => ({
            id: op.id,
            name: op.operationName,
            name: `${op.serviceName}: ${op.operationName}`,
            searchValue: `${op.serviceName} ${op.operationName}`,
            icon: op.icon,
            bgColor: op.bgColor,

@@ -4,9 +4,8 @@
# Global configuration
global:
  imageRegistry: "ghcr.io"
  # Use "managed-csi-premium" for Premium SSD, "managed-csi" for Standard SSD
  # IMPORTANT: For production, use a StorageClass with reclaimPolicy: Retain
  # to protect database volumes from accidental deletion.
  # Use "managed-csi-premium" for Premium SSD (requires Premium storage-capable VMs like Standard_DS*)
  # Use "managed-csi" for Standard SSD (works with all VM types)
  storageClass: "managed-csi"

# Main application

@@ -4,7 +4,6 @@
# Global configuration
global:
  imageRegistry: "ghcr.io"
  # For production, use a StorageClass with reclaimPolicy: Retain
  storageClass: "managed-csi-premium"

# Main application

@@ -11,12 +11,12 @@ spec:
  duration: {{ .Values.postgresql.tls.duration | default "87600h" }} # Default: 10 years
  renewBefore: {{ .Values.postgresql.tls.renewBefore | default "2160h" }} # Default: 90 days before expiry
  isCA: false
  {{- if .Values.postgresql.tls.rotationPolicy }}
  rotationPolicy: {{ .Values.postgresql.tls.rotationPolicy }}
  {{- end }}
  privateKey:
    algorithm: {{ .Values.postgresql.tls.privateKey.algorithm | default "RSA" }}
    size: {{ .Values.postgresql.tls.privateKey.size | default 4096 }}
  {{- if .Values.postgresql.tls.rotationPolicy }}
  rotationPolicy: {{ .Values.postgresql.tls.rotationPolicy }}
  {{- end }}
  usages:
    - server auth
    - client auth

@@ -1,4 +1,4 @@
{{- if and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
{{- if .Values.branding.enabled }}
---
# Branding ConfigMap
# Mounts custom branding assets (logos, CSS, etc.) into the application

@@ -110,10 +110,9 @@ spec:
          {{- end }}
          {{- include "sim.resources" .Values.app | nindent 10 }}
          {{- include "sim.securityContext" .Values.app | nindent 10 }}
          {{- $hasBranding := and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
          {{- if or $hasBranding .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
          {{- if or .Values.branding.enabled .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
          volumeMounts:
            {{- if $hasBranding }}
            {{- if .Values.branding.enabled }}
            - name: branding
              mountPath: {{ .Values.branding.mountPath | default "/app/public/branding" }}
              readOnly: true
@@ -125,10 +124,9 @@ spec:
            {{- toYaml . | nindent 12 }}
            {{- end }}
          {{- end }}
      {{- $hasBranding := and .Values.branding.enabled (or .Values.branding.files .Values.branding.binaryFiles) }}
      {{- if or $hasBranding .Values.extraVolumes .Values.app.extraVolumes }}
      {{- if or .Values.branding.enabled .Values.extraVolumes .Values.app.extraVolumes }}
      volumes:
        {{- if $hasBranding }}
        {{- if .Values.branding.enabled }}
        - name: branding
          configMap:
            name: {{ include "sim.fullname" . }}-branding

packages/db/migrations/0148_aberrant_venom.sql (new file, 1 line)
@@ -0,0 +1 @@
ALTER TABLE "workflow_deployment_version" ADD COLUMN "description" text;

packages/db/migrations/meta/0148_snapshot.json (new file, 10347 lines; diff suppressed because it is too large)
@@ -1030,6 +1030,13 @@
      "when": 1769134350805,
      "tag": "0147_rare_firebrand",
      "breakpoints": true
    },
    {
      "idx": 148,
      "version": "7",
      "when": 1769626313827,
      "tag": "0148_aberrant_venom",
      "breakpoints": true
    }
  ]
}

@@ -1634,6 +1634,7 @@ export const workflowDeploymentVersion = pgTable(
      .references(() => workflow.id, { onDelete: 'cascade' }),
    version: integer('version').notNull(),
    name: text('name'),
    description: text('description'),
    state: json('state').notNull(),
    isActive: boolean('is_active').notNull().default(false),
    createdAt: timestamp('created_at').notNull().defaultNow(),
|