Compare commits

..

8 Commits

Author SHA1 Message Date
Siddharth Ganesan
da5d4ac9d5 Fix 2026-01-26 17:16:35 -08:00
Siddharth Ganesan
e8534bea7a Fixes 2026-01-26 16:40:14 -08:00
Siddharth Ganesan
3d0b810a8e Run from block 2026-01-26 16:19:41 -08:00
Waleed
ebf2852733 fix(copilot): reliable zoom to changed blocks after diff applied (#3011) 2026-01-26 13:54:01 -08:00
Waleed
12495ef89c feat(ci): auto-create github releases and add workflow permissions (#3009) 2026-01-26 13:28:59 -08:00
Waleed
d8d85fccf0 feat(helm): add branding configmap for custom assets (#3008) 2026-01-26 13:19:23 -08:00
Waleed
56bc809c6f fix(docs): separate local and blob asset resolution for quick-reference (#3007)
* fix(docs): separate local and blob asset resolution for quick-reference

ActionImage now uses local paths directly for PNGs while ActionVideo
uses blob storage with proper path normalization (strips static/ prefix).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor(docs): simplify asset resolution by using correct paths directly

Remove path normalization logic from action-media component. Instead,
use the appropriate paths in MDX:
- PNGs: /static/quick-reference/... (local)
- MP4s: quick-reference/... (blob via getAssetUrl)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-26 13:07:11 -08:00
Vikhyath Mondreti
c7bd48573a fix(codegen): function prologue resolution edge cases (#3005)
* fix(codegen): function prologue resolution edge cases

* remove hacky fallback

* case insensitive lookup

* fix python nan and inf resolution

* remove template literal check

* fix tests

* consolidate literal gen
2026-01-26 10:16:13 -08:00
40 changed files with 1766 additions and 217 deletions

View File

@@ -10,6 +10,9 @@ concurrency:
group: ci-${{ github.ref }}
cancel-in-progress: false
permissions:
contents: read
jobs:
test-build:
name: Test and Build
@@ -278,3 +281,30 @@ jobs:
if: needs.check-docs-changes.outputs.docs_changed == 'true'
uses: ./.github/workflows/docs-embeddings.yml
secrets: inherit
# Create GitHub Release (only for version commits on main, after all builds complete)
create-release:
name: Create GitHub Release
runs-on: blacksmith-4vcpu-ubuntu-2404
needs: [create-ghcr-manifests, detect-version]
if: needs.detect-version.outputs.is_release == 'true'
permissions:
contents: write
steps:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Setup Bun
uses: oven-sh/setup-bun@v2
with:
bun-version: latest
- name: Install dependencies
run: bun install --frozen-lockfile
- name: Create release
env:
GH_PAT: ${{ secrets.GITHUB_TOKEN }}
run: bun run scripts/create-single-release.ts ${{ needs.detect-version.outputs.version }}

View File

@@ -4,6 +4,9 @@ on:
workflow_call:
workflow_dispatch: # Allow manual triggering
permissions:
contents: read
jobs:
process-docs-embeddings:
name: Process Documentation Embeddings

View File

@@ -4,6 +4,9 @@ on:
workflow_call:
workflow_dispatch:
permissions:
contents: read
jobs:
migrate:
name: Apply Database Migrations

View File

@@ -6,6 +6,9 @@ on:
paths:
- 'packages/cli/**'
permissions:
contents: read
jobs:
publish-npm:
runs-on: blacksmith-4vcpu-ubuntu-2404

View File

@@ -6,6 +6,9 @@ on:
paths:
- 'packages/python-sdk/**'
permissions:
contents: write
jobs:
publish-pypi:
runs-on: blacksmith-4vcpu-ubuntu-2404

View File

@@ -6,6 +6,9 @@ on:
paths:
- 'packages/ts-sdk/**'
permissions:
contents: write
jobs:
publish-npm:
runs-on: blacksmith-4vcpu-ubuntu-2404

View File

@@ -4,6 +4,9 @@ on:
workflow_call:
workflow_dispatch:
permissions:
contents: read
jobs:
test-build:
name: Test and Build

View File

@@ -13,11 +13,9 @@ interface ActionVideoProps {
}
export function ActionImage({ src, alt }: ActionImageProps) {
const resolvedSrc = getAssetUrl(src.startsWith('/') ? src.slice(1) : src)
return (
<img
src={resolvedSrc}
src={src}
alt={alt}
className='inline-block w-full max-w-[200px] rounded border border-neutral-200 dark:border-neutral-700'
/>
@@ -25,7 +23,7 @@ export function ActionImage({ src, alt }: ActionImageProps) {
}
export function ActionVideo({ src, alt }: ActionVideoProps) {
const resolvedSrc = getAssetUrl(src.startsWith('/') ? src.slice(1) : src)
const resolvedSrc = getAssetUrl(src)
return (
<video

View File

@@ -22,17 +22,17 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Create a workspace</td>
<td>Click workspace dropdown → **New Workspace**</td>
<td><ActionVideo src="/static/quick-reference/create-workspace.mp4" alt="Create workspace" /></td>
<td><ActionVideo src="quick-reference/create-workspace.mp4" alt="Create workspace" /></td>
</tr>
<tr>
<td>Switch workspaces</td>
<td>Click workspace dropdown → Select workspace</td>
<td><ActionVideo src="/static/quick-reference/switch-workspace.mp4" alt="Switch workspaces" /></td>
<td><ActionVideo src="quick-reference/switch-workspace.mp4" alt="Switch workspaces" /></td>
</tr>
<tr>
<td>Invite team members</td>
<td>Sidebar → **Invite**</td>
<td><ActionVideo src="/static/quick-reference/invite.mp4" alt="Invite team members" /></td>
<td><ActionVideo src="quick-reference/invite.mp4" alt="Invite team members" /></td>
</tr>
<tr>
<td>Rename a workspace</td>
@@ -69,7 +69,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Reorder / move workflows</td>
<td>Drag workflow up/down or onto a folder</td>
<td><ActionVideo src="/static/quick-reference/reordering.mp4" alt="Reorder workflows" /></td>
<td><ActionVideo src="quick-reference/reordering.mp4" alt="Reorder workflows" /></td>
</tr>
<tr>
<td>Import a workflow</td>
@@ -79,7 +79,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Multi-select workflows</td>
<td>`Mod+Click` or `Shift+Click` workflows in sidebar</td>
<td><ActionVideo src="/static/quick-reference/multiselect.mp4" alt="Multi-select workflows" /></td>
<td><ActionVideo src="quick-reference/multiselect.mp4" alt="Multi-select workflows" /></td>
</tr>
<tr>
<td>Open in new tab</td>
@@ -144,17 +144,17 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Add a block</td>
<td>Drag from Toolbar panel, or right-click canvas → **Add Block**</td>
<td><ActionVideo src="/static/quick-reference/add-block.mp4" alt="Add a block" /></td>
<td><ActionVideo src="quick-reference/add-block.mp4" alt="Add a block" /></td>
</tr>
<tr>
<td>Multi-select blocks</td>
<td>`Mod+Click` additional blocks, or shift-drag to draw selection box</td>
<td><ActionVideo src="/static/quick-reference/multiselect-blocks.mp4" alt="Multi-select blocks" /></td>
<td><ActionVideo src="quick-reference/multiselect-blocks.mp4" alt="Multi-select blocks" /></td>
</tr>
<tr>
<td>Copy blocks</td>
<td>`Mod+C` with blocks selected</td>
<td rowSpan={2}><ActionVideo src="/static/quick-reference/copy-paste.mp4" alt="Copy and paste blocks" /></td>
<td rowSpan={2}><ActionVideo src="quick-reference/copy-paste.mp4" alt="Copy and paste blocks" /></td>
</tr>
<tr>
<td>Paste blocks</td>
@@ -163,7 +163,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Duplicate blocks</td>
<td>Right-click → **Duplicate**</td>
<td><ActionVideo src="/static/quick-reference/duplicate-block.mp4" alt="Duplicate blocks" /></td>
<td><ActionVideo src="quick-reference/duplicate-block.mp4" alt="Duplicate blocks" /></td>
</tr>
<tr>
<td>Delete blocks</td>
@@ -173,7 +173,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Rename a block</td>
<td>Click block name in header, or edit in the Editor panel</td>
<td><ActionVideo src="/static/quick-reference/rename-block.mp4" alt="Rename a block" /></td>
<td><ActionVideo src="quick-reference/rename-block.mp4" alt="Rename a block" /></td>
</tr>
<tr>
<td>Enable/Disable a block</td>
@@ -183,12 +183,12 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Toggle handle orientation</td>
<td>Right-click → **Toggle Handles**</td>
<td><ActionVideo src="/static/quick-reference/toggle-handles.mp4" alt="Toggle handle orientation" /></td>
<td><ActionVideo src="quick-reference/toggle-handles.mp4" alt="Toggle handle orientation" /></td>
</tr>
<tr>
<td>Configure a block</td>
<td>Select block → use Editor panel on right</td>
<td><ActionVideo src="/static/quick-reference/configure-block.mp4" alt="Configure a block" /></td>
<td><ActionVideo src="quick-reference/configure-block.mp4" alt="Configure a block" /></td>
</tr>
</tbody>
</table>
@@ -203,17 +203,17 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Create a connection</td>
<td>Drag from output handle to input handle</td>
<td><ActionVideo src="/static/quick-reference/connect-blocks.mp4" alt="Connect blocks" /></td>
<td><ActionVideo src="quick-reference/connect-blocks.mp4" alt="Connect blocks" /></td>
</tr>
<tr>
<td>Delete a connection</td>
<td>Click edge to select → `Delete` key</td>
<td><ActionVideo src="/static/quick-reference/delete-connection.mp4" alt="Delete connection" /></td>
<td><ActionVideo src="quick-reference/delete-connection.mp4" alt="Delete connection" /></td>
</tr>
<tr>
<td>Use output in another block</td>
<td>Drag connection tag into input field</td>
<td><ActionVideo src="/static/quick-reference/connection-tag.mp4" alt="Use connection tag" /></td>
<td><ActionVideo src="quick-reference/connection-tag.mp4" alt="Use connection tag" /></td>
</tr>
</tbody>
</table>
@@ -228,7 +228,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Search toolbar</td>
<td>`Mod+F`</td>
<td><ActionVideo src="/static/quick-reference/search-toolbar.mp4" alt="Search toolbar" /></td>
<td><ActionVideo src="quick-reference/search-toolbar.mp4" alt="Search toolbar" /></td>
</tr>
<tr>
<td>Search everything</td>
@@ -243,7 +243,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
<tr>
<td>Collapse/expand sidebar</td>
<td>Click collapse button on sidebar</td>
<td><ActionVideo src="/static/quick-reference/collapse-sidebar.mp4" alt="Collapse sidebar" /></td>
<td><ActionVideo src="quick-reference/collapse-sidebar.mp4" alt="Collapse sidebar" /></td>
</tr>
</tbody>
</table>
@@ -337,7 +337,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
</tr>
<tr>
<td>Copy API endpoint</td>
<td>Deploy tab → Copy API endpoint URL</td>
<td>Deploy tab → API → Copy API cURL</td>
<td><ActionImage src="/static/quick-reference/copy-api.png" alt="Copy API endpoint" /></td>
</tr>
</tbody>
@@ -367,7 +367,7 @@ A quick lookup for everyday actions in the Sim workflow editor. For keyboard sho
</tr>
<tr>
<td>Reference an environment variable</td>
<td>Use `&#123;&#123;ENV_VAR&#125;&#125;` syntax in block inputs</td>
<td>Use `{{ENV_VAR}}` syntax in block inputs</td>
<td><ActionImage src="/static/quick-reference/env-variable-reference.png" alt="Reference environment variable" /></td>
</tr>
</tbody>

View File

@@ -31,6 +31,7 @@ const DEFAULT_ENABLED_MODELS: Record<CopilotModelId, boolean> = {
'claude-4.5-opus': true,
'claude-4.1-opus': false,
'gemini-3-pro': true,
'auto': true,
}
// GET - Fetch user's enabled models

View File

@@ -8,6 +8,7 @@ import { executeInIsolatedVM } from '@/lib/execution/isolated-vm'
import { CodeLanguage, DEFAULT_CODE_LANGUAGE, isValidCodeLanguage } from '@/lib/execution/languages'
import { escapeRegExp, normalizeName, REFERENCE } from '@/executor/constants'
import { type OutputSchema, resolveBlockReference } from '@/executor/utils/block-reference'
import { formatLiteralForCode } from '@/executor/utils/code-formatting'
import {
createEnvVarPattern,
createWorkflowVariablePattern,
@@ -387,7 +388,12 @@ function resolveWorkflowVariables(
if (type === 'number') {
variableValue = Number(variableValue)
} else if (type === 'boolean') {
variableValue = variableValue === 'true' || variableValue === true
if (typeof variableValue === 'boolean') {
// Already a boolean, keep as-is
} else {
const normalized = String(variableValue).toLowerCase().trim()
variableValue = normalized === 'true'
}
} else if (type === 'json' && typeof variableValue === 'string') {
try {
variableValue = JSON.parse(variableValue)
@@ -687,11 +693,7 @@ export async function POST(req: NextRequest) {
prologue += `const environmentVariables = JSON.parse(${JSON.stringify(JSON.stringify(envVars))});\n`
prologueLineCount++
for (const [k, v] of Object.entries(contextVariables)) {
if (v === undefined) {
prologue += `const ${k} = undefined;\n`
} else {
prologue += `const ${k} = JSON.parse(${JSON.stringify(JSON.stringify(v))});\n`
}
prologue += `const ${k} = ${formatLiteralForCode(v, 'javascript')};\n`
prologueLineCount++
}
@@ -762,11 +764,7 @@ export async function POST(req: NextRequest) {
prologue += `environmentVariables = json.loads(${JSON.stringify(JSON.stringify(envVars))})\n`
prologueLineCount++
for (const [k, v] of Object.entries(contextVariables)) {
if (v === undefined) {
prologue += `${k} = None\n`
} else {
prologue += `${k} = json.loads(${JSON.stringify(JSON.stringify(v))})\n`
}
prologue += `${k} = ${formatLiteralForCode(v, 'python')}\n`
prologueLineCount++
}
const wrapped = [

View File

@@ -0,0 +1,377 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { DAGExecutor } from '@/executor/execution/executor'
import type { IterationContext, SerializableExecutionState } from '@/executor/execution/types'
import type { NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
/**
* POST /api/workflows/[id]/execute-from-block
*
* Executes a workflow starting from a specific block using cached outputs
* for upstream/unaffected blocks from the source snapshot.
*/
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot } = validation.data
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
userId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const executionId = uuidv4()
// Load workflow record to get workspaceId
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
// Load workflow state
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
if (!workflowData) {
return NextResponse.json({ error: 'Workflow state not found' }, { status: 404 })
}
const { blocks, edges, loops, parallels } = workflowData
// Merge block states
const mergedStates = mergeSubblockState(blocks)
// Get environment variables
const { personalDecrypted, workspaceDecrypted } = await getPersonalAndWorkspaceEnv(
userId,
workspaceId
)
const decryptedEnvVars: Record<string, string> = { ...personalDecrypted, ...workspaceDecrypted }
// Serialize workflow
const serializedWorkflow = new Serializer().serializeWorkflow(
mergedStates,
edges,
loops,
parallels,
true
)
const encoder = new TextEncoder()
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
isStreamClosed = true
}
}
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: {
startTime: startTime.toISOString(),
},
})
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: IterationContext
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => {
const hasError = (callbackData.output as any)?.error
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: (callbackData.output as any).error,
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType,
}),
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} catch (error) {
logger.error(`[${requestId}] Error streaming block content:`, error)
} finally {
try {
reader.releaseLock()
} catch {}
}
}
// Create executor and run from block
const executor = new DAGExecutor({
workflow: serializedWorkflow,
envVarValues: decryptedEnvVars,
workflowInput: {},
workflowVariables: {},
contextExtensions: {
stream: true,
executionId,
workspaceId,
userId,
isDeployedContext: false,
onBlockStart,
onBlockComplete,
onStream,
abortSignal: abortController.signal,
},
})
const result = await executor.executeFromBlock(
workflowId,
startBlockId,
sourceSnapshot as SerializableExecutionState
)
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
duration: result.metadata?.duration || 0,
},
})
return
}
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
controller.close()
} catch {
// Stream already closed
}
}
}
},
cancel() {
isStreamClosed = true
logger.info(`[${requestId}] Client aborted SSE stream, signalling cancellation`)
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: {
...SSE_HEADERS,
'X-Execution-Id': executionId,
},
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}

View File

@@ -1,11 +1,12 @@
import { memo, useCallback } from 'react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -97,12 +98,42 @@ export const ActionBar = memo(
)
)
const { activeWorkflowId } = useWorkflowRegistry()
const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
const isNoteBlock = blockType === 'note'
const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
// Check if run-from-block is available
const hasExecutionSnapshot = activeWorkflowId
? !!getLastExecutionSnapshot(activeWorkflowId)
: false
const wasExecuted = activeWorkflowId
? getLastExecutionSnapshot(activeWorkflowId)?.executedBlocks.includes(blockId) ?? false
: false
const canRunFromBlock =
hasExecutionSnapshot &&
wasExecuted &&
!isStartBlock &&
!isNoteBlock &&
!isSubflowBlock &&
!isInsideSubflow &&
!isExecuting
const handleRunFromBlock = useCallback(() => {
if (!activeWorkflowId || !canRunFromBlock) return
// Dispatch a custom event to trigger run-from-block execution
window.dispatchEvent(
new CustomEvent('run-from-block', {
detail: { blockId, workflowId: activeWorkflowId },
})
)
}, [blockId, activeWorkflowId, canRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -151,7 +182,7 @@ export const ActionBar = memo(
</Tooltip.Root>
)}
{isSubflowBlock && (
{canRunFromBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
@@ -159,17 +190,17 @@ export const ActionBar = memo(
onClick={(e) => {
e.stopPropagation()
if (!disabled) {
collaborativeBatchToggleBlockEnabled([blockId])
handleRunFromBlock()
}
}}
className={ACTION_BUTTON_STYLES}
disabled={disabled}
disabled={disabled || isExecuting}
>
{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
<Play className={ICON_SIZE} />
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
{isExecuting ? 'Execution in progress' : getTooltipMessage('Run from this block')}
</Tooltip.Content>
</Tooltip.Root>
)}

View File

@@ -246,6 +246,7 @@ export function getCommandDisplayLabel(commandId: string): string {
* Model configuration options
*/
export const MODEL_OPTIONS = [
{ value: 'auto', label: 'Auto' },
{ value: 'claude-4.5-opus', label: 'Claude 4.5 Opus' },
{ value: 'claude-4.5-sonnet', label: 'Claude 4.5 Sonnet' },
{ value: 'claude-4.5-haiku', label: 'Claude 4.5 Haiku' },

View File

@@ -1,4 +1,4 @@
import { useCallback, useRef, useState } from 'react'
import { useCallback, useEffect, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { v4 as uuidv4 } from 'uuid'
@@ -15,7 +15,8 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
@@ -32,6 +33,9 @@ import { useWorkflowStore } from '@/stores/workflows/workflow/store'
const logger = createLogger('useWorkflowExecution')
// Module-level guard to prevent duplicate run-from-block executions across hook instances
let runFromBlockGlobalLock = false
// Debug state validation result
interface DebugValidationResult {
isValid: boolean
@@ -98,6 +102,8 @@ export function useWorkflowExecution() {
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
setLastExecutionSnapshot,
getLastExecutionSnapshot,
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
@@ -876,6 +882,8 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
// Execute the workflow
try {
@@ -922,6 +930,14 @@ export function useWorkflowExecution() {
// Track successful block execution in run path
setBlockRunStatus(data.blockId, 'success')
// Track block state for run-from-block snapshot
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
// Edges already tracked in onBlockStarted, no need to track again
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
@@ -1056,6 +1072,23 @@ export function useWorkflowExecution() {
},
logs: accumulatedBlockLogs,
}
// Store execution snapshot for run-from-block
if (data.success && activeWorkflowId) {
const snapshot: SerializableExecutionState = {
blockStates: Object.fromEntries(accumulatedBlockStates),
executedBlocks: Array.from(executedBlockIds),
blockLogs: accumulatedBlockLogs,
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: Array.from(executedBlockIds),
}
setLastExecutionSnapshot(activeWorkflowId, snapshot)
logger.info('Stored execution snapshot for run-from-block', {
workflowId: activeWorkflowId,
executedBlocksCount: executedBlockIds.size,
})
}
},
onExecutionError: (data) => {
@@ -1376,6 +1409,228 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Handles running workflow from a specific block using cached outputs
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
// Prevent duplicate executions across multiple hook instances (panel.tsx and chat.tsx)
if (runFromBlockGlobalLock) {
logger.debug('Run-from-block already in progress (global lock), ignoring duplicate request', {
workflowId,
blockId,
})
return
}
runFromBlockGlobalLock = true
const snapshot = getLastExecutionSnapshot(workflowId)
if (!snapshot) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
runFromBlockGlobalLock = false
return
}
if (!snapshot.executedBlocks.includes(blockId)) {
logger.error('Block was not executed in the source run', { workflowId, blockId })
runFromBlockGlobalLock = false
return
}
logger.info('Starting run-from-block execution', {
workflowId,
startBlockId: blockId,
snapshotExecutedBlocks: snapshot.executedBlocks.length,
})
setIsExecuting(true)
const workflowEdges = useWorkflowStore.getState().edges
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
try {
await executionStream.executeFromBlock({
workflowId,
startBlockId: blockId,
sourceSnapshot: snapshot,
callbacks: {
onExecutionStarted: (data) => {
logger.info('Run-from-block execution started:', data)
},
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
},
onBlockCompleted: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onBlockError: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName,
blockType: data.blockType,
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onExecutionCompleted: (data) => {
if (data.success) {
// Merge new states with snapshot states for updated snapshot
const mergedBlockStates: Record<string, BlockState> = { ...snapshot.blockStates }
for (const [bId, state] of accumulatedBlockStates) {
mergedBlockStates[bId] = state
}
const mergedExecutedBlocks = new Set([
...snapshot.executedBlocks,
...executedBlockIds,
])
const updatedSnapshot: SerializableExecutionState = {
...snapshot,
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...snapshot.blockLogs, ...accumulatedBlockLogs],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
logger.info('Updated execution snapshot after run-from-block', {
workflowId,
newBlocksExecuted: executedBlockIds.size,
})
}
},
onExecutionError: (data) => {
logger.error('Run-from-block execution error:', data.error)
},
onExecutionCancelled: () => {
logger.info('Run-from-block execution cancelled')
},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
logger.error('Run-from-block execution failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
runFromBlockGlobalLock = false
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
addConsole,
executionStream,
]
)
// Listen for run-from-block events from the action bar
useEffect(() => {
const handleRunFromBlockEvent = (event: CustomEvent<{ blockId: string; workflowId: string }>) => {
const { blockId, workflowId } = event.detail
handleRunFromBlock(blockId, workflowId)
}
window.addEventListener('run-from-block', handleRunFromBlockEvent as EventListener)
return () => {
window.removeEventListener('run-from-block', handleRunFromBlockEvent as EventListener)
}
}, [handleRunFromBlock])
return {
isExecuting,
isDebugging,
@@ -1386,5 +1641,6 @@ export function useWorkflowExecution() {
handleResumeDebug,
handleCancelDebug,
handleCancelExecution,
handleRunFromBlock,
}
}

View File

@@ -1641,51 +1641,36 @@ const WorkflowContent = React.memo(() => {
}, [screenToFlowPosition, handleToolbarDrop])
/**
* Focus canvas on changed blocks when diff appears
* Focuses on new/edited blocks rather than fitting the entire workflow
* Focus canvas on changed blocks when diff appears.
*/
const pendingZoomBlockIdsRef = useRef<Set<string> | null>(null)
const prevDiffReadyRef = useRef(false)
// Phase 1: When diff becomes ready, record which blocks we want to zoom to
// Phase 2 effect is located after displayNodes is defined (search for "Phase 2")
useEffect(() => {
// Only focus when diff transitions from not ready to ready
if (isDiffReady && !prevDiffReadyRef.current && diffAnalysis) {
// Diff just became ready - record blocks to zoom to
const changedBlockIds = [
...(diffAnalysis.new_blocks || []),
...(diffAnalysis.edited_blocks || []),
]
if (changedBlockIds.length > 0) {
const allNodes = getNodes()
const changedNodes = allNodes.filter((node) => changedBlockIds.includes(node.id))
if (changedNodes.length > 0) {
logger.info('Diff ready - focusing on changed blocks', {
changedBlockIds,
foundNodes: changedNodes.length,
})
requestAnimationFrame(() => {
fitViewToBounds({
nodes: changedNodes,
duration: 600,
padding: 0.1,
minZoom: 0.5,
maxZoom: 1.0,
})
})
} else {
logger.info('Diff ready - no changed nodes found, fitting all')
requestAnimationFrame(() => {
fitViewToBounds({ padding: 0.1, duration: 600 })
})
}
pendingZoomBlockIdsRef.current = new Set(changedBlockIds)
} else {
logger.info('Diff ready - no changed blocks, fitting all')
// No specific blocks to focus on, fit all after a frame
pendingZoomBlockIdsRef.current = null
requestAnimationFrame(() => {
fitViewToBounds({ padding: 0.1, duration: 600 })
})
}
} else if (!isDiffReady && prevDiffReadyRef.current) {
// Diff was cleared (accepted/rejected) - cancel any pending zoom
pendingZoomBlockIdsRef.current = null
}
prevDiffReadyRef.current = isDiffReady
}, [isDiffReady, diffAnalysis, fitViewToBounds, getNodes])
}, [isDiffReady, diffAnalysis, fitViewToBounds])
/** Displays trigger warning notifications. */
useEffect(() => {
@@ -2093,6 +2078,48 @@ const WorkflowContent = React.memo(() => {
})
}, [derivedNodes, blocks, pendingSelection, clearPendingSelection])
// Phase 2: When displayNodes updates, check if pending zoom blocks are ready
// (Phase 1 is located earlier in the file where pendingZoomBlockIdsRef is defined)
useEffect(() => {
const pendingBlockIds = pendingZoomBlockIdsRef.current
if (!pendingBlockIds || pendingBlockIds.size === 0) {
return
}
// Find the nodes we're waiting for
const pendingNodes = displayNodes.filter((node) => pendingBlockIds.has(node.id))
// Check if all expected nodes are present with valid dimensions
const allNodesReady =
pendingNodes.length === pendingBlockIds.size &&
pendingNodes.every(
(node) =>
typeof node.width === 'number' &&
typeof node.height === 'number' &&
node.width > 0 &&
node.height > 0
)
if (allNodesReady) {
logger.info('Diff ready - focusing on changed blocks', {
changedBlockIds: Array.from(pendingBlockIds),
foundNodes: pendingNodes.length,
})
// Clear pending state before zooming to prevent re-triggers
pendingZoomBlockIdsRef.current = null
// Use requestAnimationFrame to ensure React has finished rendering
requestAnimationFrame(() => {
fitViewToBounds({
nodes: pendingNodes,
duration: 600,
padding: 0.1,
minZoom: 0.5,
maxZoom: 1.0,
})
})
}
}, [displayNodes, fitViewToBounds])
/** Handles ActionBar remove-from-subflow events. */
useEffect(() => {
const handleRemoveFromSubflow = (event: Event) => {

View File

@@ -259,6 +259,17 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
// Run-from-block mode: start directly from specified block
if (this.context.runFromBlockContext) {
const { startBlockId } = this.context.runFromBlockContext
logger.info('Initializing queue for run-from-block mode', {
startBlockId,
dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
})
this.addToQueue(startBlockId)
return
}
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges

View File

@@ -5,12 +5,21 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import type {
ContextExtensions,
SerializableExecutionState,
WorkflowInput,
} from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
import {
computeDirtySet,
type RunFromBlockContext,
validateRunFromBlock,
} from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
@@ -89,17 +98,126 @@ export class DAGExecutor {
}
}
/**
* Execute workflow starting from a specific block, using cached outputs
* for all upstream/unaffected blocks from the source snapshot.
*
* This implements Jupyter notebook-style execution where:
* - The start block and all downstream blocks are re-executed
* - Upstream blocks retain their cached outputs from the source snapshot
* - The result is a merged execution state
*
* @param workflowId - The workflow ID
* @param startBlockId - The block to start execution from
* @param sourceSnapshot - The execution state from a previous run
* @returns Merged execution result with cached + fresh outputs
*/
async executeFromBlock(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
): Promise<ExecutionResult> {
// Build full DAG (no trigger constraint - we need all blocks for validation)
const dag = this.dagBuilder.build(this.workflow)
// Validate the start block
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
if (!validation.valid) {
throw new Error(validation.error)
}
// Compute dirty set (blocks that will be re-executed)
const dirtySet = computeDirtySet(dag, startBlockId)
logger.info('Executing from block', {
workflowId,
startBlockId,
dirtySetSize: dirtySet.size,
totalBlocks: dag.nodes.size,
dirtyBlocks: Array.from(dirtySet),
})
// For convergent blocks in the dirty set, remove incoming edges from non-dirty sources.
// This ensures that a dirty block waiting on multiple inputs doesn't wait for non-dirty
// upstream blocks (whose outputs are already cached).
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
logger.debug('Removed non-dirty incoming edge for run-from-block', {
nodeId,
sourceId,
})
}
}
// Create context with snapshot state + runFromBlockContext
const runFromBlockContext = { startBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
snapshotState: sourceSnapshot,
runFromBlockContext,
})
// Setup orchestrators and engine (same as execute())
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
blockExecutor,
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
// Run and return result
return await engine.run()
}
private createExecutionContext(
workflowId: string,
triggerBlockId?: string
triggerBlockId?: string,
overrides?: {
snapshotState?: SerializableExecutionState
runFromBlockContext?: RunFromBlockContext
}
): { context: ExecutionContext; state: ExecutionState } {
const snapshotState = this.contextExtensions.snapshotState
const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
const executedBlocks = snapshotState?.executedBlocks
let executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
// In run-from-block mode, clear the executed status for dirty blocks so they can be re-executed
if (overrides?.runFromBlockContext) {
const { dirtySet } = overrides.runFromBlockContext
executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
logger.info('Cleared executed status for dirty blocks', {
dirtySetSize: dirtySet.size,
remainingExecutedBlocks: executedBlocks.size,
})
}
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
@@ -169,6 +287,7 @@ export class DAGExecutor {
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
runFromBlockContext: overrides?.runFromBlockContext,
}
if (this.contextExtensions.resumeFromSnapshot) {
@@ -193,6 +312,12 @@ export class DAGExecutor {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
} else if (overrides?.runFromBlockContext) {
// In run-from-block mode, skip starter block initialization
// All block states come from the snapshot
logger.info('Run-from-block mode: skipping starter block initialization', {
startBlockId: overrides.runFromBlockContext.startBlockId,
})
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}

View File

@@ -1,5 +1,6 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -105,6 +106,12 @@ export interface ContextExtensions {
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => Promise<void>
/**
* Run-from-block configuration. When provided, executor runs in partial
* execution mode starting from the specified block.
*/
runFromBlockContext?: RunFromBlockContext
}
export interface WorkflowInput {

View File

@@ -31,7 +31,20 @@ export class NodeExecutionOrchestrator {
throw new Error(`Node not found in DAG: ${nodeId}`)
}
if (this.state.hasExecuted(nodeId)) {
// In run-from-block mode, skip execution for non-dirty blocks and return cached output
if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
const cachedOutput = this.state.getBlockOutput(nodeId) || {}
logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
return {
nodeId,
output: cachedOutput,
isFinalOutput: false,
}
}
// Skip hasExecuted check for dirty blocks in run-from-block mode - they need to be re-executed
const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
const output = this.state.getBlockOutput(nodeId) || {}
return {
nodeId,

View File

@@ -1,6 +1,7 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -250,6 +251,12 @@ export interface ExecutionContext {
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
/**
* Context for "run from block" mode. When present, only blocks in dirtySet
* will be executed; others return cached outputs from the source snapshot.
*/
runFromBlockContext?: RunFromBlockContext
}
export interface ExecutionResult {

View File

@@ -0,0 +1,48 @@
/**
* Formats a JavaScript/TypeScript value as a code literal for the target language.
* Handles special cases like null, undefined, booleans, and Python-specific number representations.
*
* @param value - The value to format
* @param language - Target language ('javascript' or 'python')
* @returns A string literal representation valid in the target language
*
* @example
* formatLiteralForCode(null, 'python') // => 'None'
* formatLiteralForCode(true, 'python') // => 'True'
* formatLiteralForCode(NaN, 'python') // => "float('nan')"
* formatLiteralForCode("hello", 'javascript') // => '"hello"'
* formatLiteralForCode({a: 1}, 'python') // => "json.loads('{\"a\":1}')"
*/
export function formatLiteralForCode(value: unknown, language: 'javascript' | 'python'): string {
const isPython = language === 'python'
if (value === undefined) {
return isPython ? 'None' : 'undefined'
}
if (value === null) {
return isPython ? 'None' : 'null'
}
if (typeof value === 'boolean') {
return isPython ? (value ? 'True' : 'False') : String(value)
}
if (typeof value === 'number') {
if (Number.isNaN(value)) {
return isPython ? "float('nan')" : 'NaN'
}
if (value === Number.POSITIVE_INFINITY) {
return isPython ? "float('inf')" : 'Infinity'
}
if (value === Number.NEGATIVE_INFINITY) {
return isPython ? "float('-inf')" : '-Infinity'
}
return String(value)
}
if (typeof value === 'string') {
return JSON.stringify(value)
}
// Objects and arrays - Python needs json.loads() because JSON true/false/null aren't valid Python
if (isPython) {
return `json.loads(${JSON.stringify(JSON.stringify(value))})`
}
return JSON.stringify(value)
}

View File

@@ -0,0 +1,336 @@
import { describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
import { computeDirtySet, validateRunFromBlock } from '@/executor/utils/run-from-block'
/**
* Helper to create a DAG node for testing
*/
function createNode(
id: string,
outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [],
metadata: Partial<NodeMetadata> = {}
): DAGNode {
const edges = new Map<string, DAGEdge>()
for (const edge of outgoingEdges) {
edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle })
}
return {
id,
block: {
id,
position: { x: 0, y: 0 },
config: { tool: 'test', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'test', name: `block-${id}`, category: 'tools' },
enabled: true,
},
incomingEdges: new Set<string>(),
outgoingEdges: edges,
metadata: {
isParallelBranch: false,
isLoopNode: false,
isSentinel: false,
...metadata,
},
}
}
/**
* Helper to create a DAG for testing
*/
function createDAG(nodes: DAGNode[]): DAG {
const nodeMap = new Map<string, DAGNode>()
for (const node of nodes) {
nodeMap.set(node.id, node)
}
// Set up incoming edges based on outgoing edges
for (const node of nodes) {
for (const [, edge] of node.outgoingEdges) {
const targetNode = nodeMap.get(edge.target)
if (targetNode) {
targetNode.incomingEdges.add(node.id)
}
}
}
return {
nodes: nodeMap,
loopConfigs: new Map<string, SerializedLoop>(),
parallelConfigs: new Map<string, SerializedParallel>(),
}
}
describe('computeDirtySet', () => {
it('includes start block in dirty set', () => {
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('B')).toBe(true)
})
it('includes all downstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles branching paths', () => {
// A → B → C
// ↓
// D → E
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }, { target: 'D' }]),
createNode('C'),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('handles convergence points', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
// Run from A: should include A, C, D (but not B)
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles diamond pattern', () => {
// B
// ↗ ↘
// A D
// ↘ ↗
// C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('stops at graph boundaries', () => {
// A → B C → D (disconnected)
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B'),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(false)
expect(dirtySet.size).toBe(2)
})
it('handles single node workflow', () => {
const dag = createDAG([createNode('A')])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('handles node not in DAG gracefully', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const dirtySet = computeDirtySet(dag, 'nonexistent')
// Should just contain the start block ID even if not found
expect(dirtySet.has('nonexistent')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('includes convergent block when running from one branch of parallel', () => {
// Parallel branches converging:
// A → B → D
// A → C → D
// Running from B should include B and D (but not A or C)
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles running from convergent block itself (all upstream non-dirty)', () => {
// A → C
// B → C
// Running from C should only include C
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles deep downstream chains', () => {
// A → B → C → D → E → F
// Running from C should include C, D, E, F
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E', [{ target: 'F' }]),
createNode('F'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.has('F')).toBe(true)
expect(dirtySet.size).toBe(4)
})
})
describe('validateRunFromBlock', () => {
it('accepts valid block', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
expect(result.error).toBeUndefined()
})
it('rejects block not found in DAG', () => {
const dag = createDAG([createNode('A')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Block not found')
})
it('rejects blocks inside loops', () => {
const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside loop')
expect(result.error).toContain('loop-1')
})
it('rejects blocks inside parallels', () => {
const dag = createDAG([createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside parallel')
expect(result.error).toContain('parallel-1')
})
it('rejects sentinel nodes', () => {
const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('sentinel')
})
it('rejects unexecuted blocks', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('was not executed')
})
it('accepts regular executed block', () => {
const dag = createDAG([
createNode('trigger', [{ target: 'A' }]),
createNode('A', [{ target: 'B' }]),
createNode('B'),
])
const executedBlocks = new Set(['trigger', 'A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
})
})

View File

@@ -0,0 +1,110 @@
import { createLogger } from '@sim/logger'
import type { DAG } from '@/executor/dag/builder'
const logger = createLogger('run-from-block')
/**
* Result of validating a block for run-from-block execution.
*/
export interface RunFromBlockValidation {
valid: boolean
error?: string
}
/**
* Context for run-from-block execution mode.
*/
export interface RunFromBlockContext {
/** The block ID to start execution from */
startBlockId: string
/** Set of block IDs that need re-execution (start block + all downstream) */
dirtySet: Set<string>
}
/**
* Computes all blocks that need re-execution when running from a specific block.
* Uses BFS to find all downstream blocks reachable via outgoing edges.
*
* @param dag - The workflow DAG
* @param startBlockId - The block to start execution from
* @returns Set of block IDs that are "dirty" and need re-execution
*/
export function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
const dirty = new Set<string>([startBlockId])
const queue = [startBlockId]
while (queue.length > 0) {
const nodeId = queue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const [, edge] of node.outgoingEdges) {
if (!dirty.has(edge.target)) {
dirty.add(edge.target)
queue.push(edge.target)
}
}
}
logger.debug('Computed dirty set', {
startBlockId,
dirtySetSize: dirty.size,
dirtyBlocks: Array.from(dirty),
})
return dirty
}
/**
* Validates that a block can be used as a run-from-block starting point.
*
* Validation rules:
* - Block must exist in the DAG
* - Block cannot be inside a loop
* - Block cannot be inside a parallel
* - Block cannot be a sentinel node
* - Block must have been executed in the source run
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
* @param executedBlocks - Set of blocks that were executed in the source run
* @returns Validation result with error message if invalid
*/
export function validateRunFromBlock(
blockId: string,
dag: DAG,
executedBlocks: Set<string>
): RunFromBlockValidation {
const node = dag.nodes.get(blockId)
if (!node) {
return { valid: false, error: `Block not found in workflow: ${blockId}` }
}
if (node.metadata.isLoopNode) {
return {
valid: false,
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
}
}
if (node.metadata.isParallelBranch) {
return {
valid: false,
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
}
}
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
if (!executedBlocks.has(blockId)) {
return {
valid: false,
error: `Block was not executed in source run: ${blockId}`,
}
}
return { valid: true }
}

View File

@@ -157,7 +157,14 @@ export class VariableResolver {
let replacementError: Error | null = null
// Use generic utility for smart variable reference replacement
const blockType = block?.metadata?.id
const language =
blockType === BlockType.FUNCTION
? ((block?.config?.params as Record<string, unknown> | undefined)?.language as
| string
| undefined)
: undefined
let result = replaceValidReferences(template, (match) => {
if (replacementError) return match
@@ -167,14 +174,7 @@ export class VariableResolver {
return match
}
const blockType = block?.metadata?.id
const isInTemplateLiteral =
blockType === BlockType.FUNCTION &&
template.includes('${') &&
template.includes('}') &&
template.includes('`')
return this.blockResolver.formatValueForBlock(resolved, blockType, isInTemplateLiteral)
return this.blockResolver.formatValueForBlock(resolved, blockType, language)
} catch (error) {
replacementError = error instanceof Error ? error : new Error(String(error))
return match

View File

@@ -257,15 +257,9 @@ describe('BlockResolver', () => {
expect(result).toBe('"hello"')
})
it.concurrent('should format string for function block in template literal', () => {
it.concurrent('should format object for function block', () => {
const resolver = new BlockResolver(createTestWorkflow())
const result = resolver.formatValueForBlock('hello', 'function', true)
expect(result).toBe('hello')
})
it.concurrent('should format object for function block in template literal', () => {
const resolver = new BlockResolver(createTestWorkflow())
const result = resolver.formatValueForBlock({ a: 1 }, 'function', true)
const result = resolver.formatValueForBlock({ a: 1 }, 'function')
expect(result).toBe('{"a":1}')
})

View File

@@ -10,6 +10,7 @@ import {
type OutputSchema,
resolveBlockReference,
} from '@/executor/utils/block-reference'
import { formatLiteralForCode } from '@/executor/utils/code-formatting'
import {
navigatePath,
type ResolutionContext,
@@ -159,17 +160,13 @@ export class BlockResolver implements Resolver {
return this.nameToBlockId.get(normalizeName(name))
}
public formatValueForBlock(
value: any,
blockType: string | undefined,
isInTemplateLiteral = false
): string {
public formatValueForBlock(value: any, blockType: string | undefined, language?: string): string {
if (blockType === 'condition') {
return this.stringifyForCondition(value)
}
if (blockType === 'function') {
return this.formatValueForCodeContext(value, isInTemplateLiteral)
return this.formatValueForCodeContext(value, language)
}
if (blockType === 'response') {
@@ -210,29 +207,7 @@ export class BlockResolver implements Resolver {
return String(value)
}
private formatValueForCodeContext(value: any, isInTemplateLiteral: boolean): string {
if (isInTemplateLiteral) {
if (typeof value === 'string') {
return value
}
if (typeof value === 'object' && value !== null) {
return JSON.stringify(value)
}
return String(value)
}
if (typeof value === 'string') {
return JSON.stringify(value)
}
if (typeof value === 'object' && value !== null) {
return JSON.stringify(value)
}
if (value === undefined) {
return 'undefined'
}
if (value === null) {
return 'null'
}
return String(value)
private formatValueForCodeContext(value: any, language?: string): string {
return formatLiteralForCode(value, language === 'python' ? 'python' : 'javascript')
}
}

View File

@@ -30,7 +30,10 @@ export function navigatePath(obj: any, path: string[]): any {
const arrayMatch = part.match(/^([^[]+)(\[.+)$/)
if (arrayMatch) {
const [, prop, bracketsPart] = arrayMatch
current = current[prop]
current =
typeof current === 'object' && current !== null
? (current as Record<string, unknown>)[prop]
: undefined
if (current === undefined || current === null) {
return undefined
}
@@ -49,7 +52,10 @@ export function navigatePath(obj: any, path: string[]): any {
const index = Number.parseInt(part, 10)
current = Array.isArray(current) ? current[index] : undefined
} else {
current = current[part]
current =
typeof current === 'object' && current !== null
? (current as Record<string, unknown>)[part]
: undefined
}
}
return current

View File

@@ -1,5 +1,6 @@
import { useCallback } from 'react'
import type { Node, ReactFlowInstance } from 'reactflow'
import { BLOCK_DIMENSIONS } from '@/lib/workflows/blocks/block-dimensions'
interface VisibleBounds {
width: number
@@ -139,8 +140,8 @@ export function useCanvasViewport(reactFlowInstance: ReactFlowInstance | null) {
let maxY = Number.NEGATIVE_INFINITY
nodes.forEach((node) => {
const nodeWidth = node.width ?? 200
const nodeHeight = node.height ?? 100
const nodeWidth = node.width ?? BLOCK_DIMENSIONS.FIXED_WIDTH
const nodeHeight = node.height ?? BLOCK_DIMENSIONS.MIN_HEIGHT
minX = Math.min(minX, node.position.x)
minY = Math.min(minY, node.position.y)

View File

@@ -1,10 +1,85 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -71,6 +146,13 @@ export interface ExecuteStreamOptions {
callbacks?: ExecutionStreamCallbacks
}
export interface ExecuteFromBlockOptions {
workflowId: string
startBlockId: string
sourceSnapshot: SerializableExecutionState
callbacks?: ExecutionStreamCallbacks
}
/**
* Hook for executing workflows via server-side SSE streaming
*/
@@ -119,91 +201,7 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
await processSSEStream(reader, callbacks, 'Execution')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -222,6 +220,65 @@ export function useExecutionStream() {
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot }),
signal: abortController.signal,
})
if (!response.ok) {
const errorResponse = await response.json()
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')
callbacks.onExecutionCancelled?.({ duration: 0 })
} else {
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
@@ -239,6 +296,7 @@ export function useExecutionStream() {
return {
execute,
executeFromBlock,
cancel,
}
}

View File

@@ -21,6 +21,7 @@ export const COPILOT_MODEL_IDS = [
'claude-4.5-opus',
'claude-4.1-opus',
'gemini-3-pro',
'auto',
] as const
export type CopilotModelId = (typeof COPILOT_MODEL_IDS)[number]

View File

@@ -132,6 +132,8 @@ async function executeCode(request) {
for (const [key, value] of Object.entries(contextVariables)) {
if (value === undefined) {
await jail.set(key, undefined)
} else if (value === null) {
await jail.set(key, null)
} else {
await jail.set(key, new ivm.ExternalCopy(value).copyInto())
}

View File

@@ -26,7 +26,7 @@ describe('VariableManager', () => {
it.concurrent('should handle boolean type variables', () => {
expect(VariableManager.parseInputForStorage('true', 'boolean')).toBe(true)
expect(VariableManager.parseInputForStorage('false', 'boolean')).toBe(false)
expect(VariableManager.parseInputForStorage('1', 'boolean')).toBe(true)
expect(VariableManager.parseInputForStorage('1', 'boolean')).toBe(false)
expect(VariableManager.parseInputForStorage('0', 'boolean')).toBe(false)
expect(VariableManager.parseInputForStorage('"true"', 'boolean')).toBe(true)
expect(VariableManager.parseInputForStorage("'false'", 'boolean')).toBe(false)
@@ -128,7 +128,7 @@ describe('VariableManager', () => {
expect(VariableManager.resolveForExecution(false, 'boolean')).toBe(false)
expect(VariableManager.resolveForExecution('true', 'boolean')).toBe(true)
expect(VariableManager.resolveForExecution('false', 'boolean')).toBe(false)
expect(VariableManager.resolveForExecution('1', 'boolean')).toBe(true)
expect(VariableManager.resolveForExecution('1', 'boolean')).toBe(false)
expect(VariableManager.resolveForExecution('0', 'boolean')).toBe(false)
})

View File

@@ -61,7 +61,7 @@ export class VariableManager {
// Special case for 'anything else' in the test
if (unquoted === 'anything else') return true
const normalized = String(unquoted).toLowerCase().trim()
return normalized === 'true' || normalized === '1'
return normalized === 'true'
}
case 'object':

View File

@@ -35,4 +35,23 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))

View File

@@ -1,4 +1,5 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'
/**
@@ -28,6 +29,11 @@ export interface ExecutionState {
* Cleared when a new run starts. Used to show run path indicators on edges.
*/
lastRunEdges: Map<string, EdgeRunStatus>
/**
* Stores the last successful execution snapshot per workflow.
* Used for run-from-block functionality.
*/
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
@@ -41,6 +47,18 @@ export interface ExecutionActions {
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
/**
* Store the execution snapshot for a workflow after successful execution.
*/
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
/**
* Get the last execution snapshot for a workflow.
*/
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
/**
* Clear the execution snapshot for a workflow.
*/
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
@@ -52,4 +70,5 @@ export const initialState: ExecutionState = {
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
lastExecutionSnapshots: new Map(),
}

View File

@@ -0,0 +1,25 @@
{{- if .Values.branding.enabled }}
---
# Branding ConfigMap
# Mounts custom branding assets (logos, CSS, etc.) into the application
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "sim.fullname" . }}-branding
namespace: {{ .Release.Namespace }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: branding
{{- if .Values.branding.files }}
data:
{{- range $key, $value := .Values.branding.files }}
{{ $key }}: {{ $value | quote }}
{{- end }}
{{- end }}
{{- if .Values.branding.binaryFiles }}
binaryData:
{{- range $key, $value := .Values.branding.binaryFiles }}
{{ $key }}: {{ $value }}
{{- end }}
{{- end }}
{{- end }}

View File

@@ -110,8 +110,13 @@ spec:
{{- end }}
{{- include "sim.resources" .Values.app | nindent 10 }}
{{- include "sim.securityContext" .Values.app | nindent 10 }}
{{- if or .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
{{- if or .Values.branding.enabled .Values.extraVolumeMounts .Values.app.extraVolumeMounts }}
volumeMounts:
{{- if .Values.branding.enabled }}
- name: branding
mountPath: {{ .Values.branding.mountPath | default "/app/public/branding" }}
readOnly: true
{{- end }}
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
@@ -119,8 +124,13 @@ spec:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- end }}
{{- if or .Values.extraVolumes .Values.app.extraVolumes }}
{{- if or .Values.branding.enabled .Values.extraVolumes .Values.app.extraVolumes }}
volumes:
{{- if .Values.branding.enabled }}
- name: branding
configMap:
name: {{ include "sim.fullname" . }}-branding
{{- end }}
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -738,6 +738,32 @@ sharedStorage:
extraVolumes: []
extraVolumeMounts: []
# Branding configuration
# Use this to inject custom branding assets (logos, CSS, etc.) into the application
branding:
# Enable/disable branding ConfigMap
enabled: false
# Mount path in the container where branding files will be available
mountPath: "/app/public/branding"
# Text files (CSS, JSON, HTML, etc.) - values are plain text
# Example:
# files:
# custom.css: |
# .logo { background-color: #ff0000; }
# config.json: |
# {"theme": "dark"}
files: {}
# Binary files (PNG, JPG, ICO, etc.) - values must be base64 encoded
# Generate base64 with: base64 -i logo.png | tr -d '\n'
# Example:
# binaryFiles:
# logo.png: "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mNk..."
# favicon.ico: "AAABAAEAEBAAAAEAIABoBAAAFgAAAA..."
binaryFiles: {}
# Additional environment variables for custom integrations
extraEnvVars: []

View File

@@ -197,7 +197,7 @@ async function getCommitsBetweenVersions(
const commitEntries = gitLog.split('\n').filter((line) => line.trim())
const nonVersionCommits = commitEntries.filter((line) => {
const [hash, message] = line.split('|')
const [, message] = line.split('|')
const isVersionCommit = message.match(/^v\d+\.\d+/)
if (isVersionCommit) {
console.log(`⏭️ Skipping version commit: ${message.substring(0, 50)}...`)
@@ -369,6 +369,25 @@ async function main() {
console.log(` No previous version found (this might be the first release)`)
}
try {
const existingRelease = await octokit.rest.repos.getReleaseByTag({
owner: REPO_OWNER,
repo: REPO_NAME,
tag: targetVersion,
})
if (existingRelease.data) {
console.log(` Release ${targetVersion} already exists, skipping creation`)
console.log(
`🔗 View release: https://github.com/${REPO_OWNER}/${REPO_NAME}/releases/tag/${targetVersion}`
)
return
}
} catch (error: any) {
if (error.status !== 404) {
throw error
}
}
const releaseBody = await generateReleaseBody(versionCommit, previousCommit || undefined)
console.log(`🚀 Creating GitHub release for ${targetVersion}...`)