Compare commits

..

50 Commits

Author SHA1 Message Date
Vikhyath Mondreti
11dc18a80d v0.5.74: autolayout improvements, clerk integration, auth enforcements 2026-01-27 20:37:39 -08:00
Vikhyath Mondreti
0d0209a108 fix(autolayout): pass through gridsize (#3042)
* fix(autolayout): pass through gridsize

* fix tests
2026-01-27 20:09:26 -08:00
Vikhyath Mondreti
500dcd4734 fix(openrouter): ignored when tools are configured but unused (#3041) 2026-01-27 19:43:44 -08:00
Vikhyath Mondreti
8bdba373c6 improvement(function): timeout increase to 5 min (#3040) 2026-01-27 19:32:11 -08:00
Vikhyath Mondreti
c8ffda1616 fix(gemini): token count (#3039)
* fix(gemini): token count

* fix to include tool call tokens
2026-01-27 19:16:54 -08:00
Waleed
b4a389a71f improvement(helm): update GPU device plugin and add cert-manager issuers (#3036)
* improvement(helm): update GPU device plugin and add cert-manager issuers

* fix(helm): address code review feedback for GPU plugin and cert-manager

* fix(helm): remove duplicate nodeSelector, add hook for CA issuer ordering

* fix(helm): remove incorrect hook, CA issuer auto-reconciles
2026-01-27 18:25:08 -08:00
Vikhyath Mondreti
65bc21608c improvement(block-inputs): must parse json accurately + models max_tokens fix (#3033)
* improvement(block-inputs): must parse json accurately

* fix sheets typing

* add reference comment

* fix models

* revert blocks changes

* fix param to follow openai new convention
2026-01-27 18:17:35 -08:00
Waleed
ef613ef035 fix(models): update cerebras and groq models (#3038) 2026-01-27 18:12:48 -08:00
Waleed
20b76e67b3 improvement(skills): extend skills (#3035) 2026-01-27 17:58:58 -08:00
Waleed
7640fdf742 feat(autolayout): add snap-to-grid support (#3031)
* feat(autolayout): add snap-to-grid support

* fix(autolayout): recalculate dimensions after grid snapping

* fix(autolayout): correct dimension calculation and propagate gridSize
2026-01-27 17:02:27 -08:00
Waleed
ab4e9dc72f v0.5.73: ci, helm updates, kb, ui fixes, note block enhancements 2026-01-26 22:04:35 -08:00
Vikhyath Mondreti
1c58c35bd8 v0.5.72: azure connection string, supabase improvement, multitrigger resolution, docs quick reference 2026-01-25 23:42:27 -08:00
Waleed
d63a5cb504 v0.5.71: ux, ci improvements, docs updates 2026-01-25 03:08:08 -08:00
Waleed
8bd5d41723 v0.5.70: router fix, anthropic agent response format adherence 2026-01-24 20:57:02 -08:00
Waleed
c12931bc50 v0.5.69: kb upgrades, blog, copilot improvements, auth consolidation (#2973)
* fix(subflows): tag dropdown + resolution logic (#2949)

* fix(subflows): tag dropdown + resolution logic

* fixes;

* revert parallel change

* chore(deps): bump posthog-js to 1.334.1 (#2948)

* fix(idempotency): add conflict target to atomicallyClaimDb query + remove redundant db namespace tracking (#2950)

* fix(idempotency): add conflict target to atomicallyClaimDb query

* delete needs to account for namespace

* simplify namespace filtering logic

* fix cleanup

* consistent target

* improvement(kb): add document filtering, select all, and React Query migration (#2951)

* improvement(kb): add document filtering, select all, and React Query migration

* test(kb): update tests for enabledFilter and removed userId params

* fix(kb): remove non-null assertion, add explicit guard

* improvement(logs): trace span, details (#2952)

* improvement(action-bar): ordering

* improvement(logs): details, trace span

* feat(blog): v0.5 release post (#2953)

* feat(blog): v0.5 post

* improvement(blog): simplify title and remove code block header

- Simplified blog title from Introducing Sim Studio v0.5 to Introducing Sim v0.5
- Removed language label header and copy button from code blocks for cleaner appearance

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* ack PR comments

* small styling improvements

* created system to create post-specific components

* updated component

* cache invalidation

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

* feat(admin): add credits endpoint to issue credits to users (#2954)

* feat(admin): add credits endpoint to issue credits to users

* fix(admin): use existing credit functions and handle enterprise seats

* fix(admin): reject NaN and Infinity in amount validation

* styling

* fix(admin): validate userId and email are strings

* improvement(copilot): fast mode, subagent tool responses and allow preferences (#2955)

* Improvements

* Fix actions mapping

* Remove console logs

* fix(billing): handle missing userStats and prevent crashes (#2956)

* fix(billing): handle missing userStats and prevent crashes

* fix(billing): correct import path for getFilledPillColor

* fix(billing): add Number.isFinite check to lastPeriodCost

* fix(logs): refresh logic to refresh logs details (#2958)

* fix(security): add authentication and input validation to API routes (#2959)

* fix(security): add authentication and input validation to API routes

* moved utils

* remove extraneous comments

* removed unused dep

* improvement(helm): add internal ingress support and same-host path consolidation (#2960)

* improvement(helm): add internal ingress support and same-host path consolidation

* improvement(helm): clean up ingress template comments

Simplify verbose inline Helm comments and section dividers to match the
minimal style used in services.yaml.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix(helm): add missing copilot path consolidation for realtime host

When copilot.host equals realtime.host but differs from app.host,
copilot paths were not being routed. Added logic to consolidate
copilot paths into the realtime rule for this scenario.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* improvement(helm): follow ingress best practices

- Remove orphan comments that appeared when services were disabled
- Add documentation about path ordering requirements
- Paths rendered in order: realtime, copilot, app (specific before catch-all)
- Clean template output matching industry Helm chart standards

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>

* feat(blog): enterprise post (#2961)

* feat(blog): enterprise post

* added more images, styling

* more content

* updated v0-5 post

* remove unused transition

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>

* fix(envvars): resolution standardized (#2957)

* fix(envvars): resolution standardized

* remove comments

* address bugbot

* fix highlighting for env vars

* remove comments

* address greptile

* address bugbot

* fix(copilot): mask credentials fix (#2963)

* Fix copilot masking

* Clean up

* Lint

* improvement(webhooks): remove dead code (#2965)

* fix(webhooks): subscription recreation path

* improvement(webhooks): remove dead code

* fix tests

* address bugbot comments

* fix restoration edge case

* fix more edge cases

* address bugbot comments

* fix gmail polling

* add warnings for UI indication for credential sets

* fix(preview): subblock values (#2969)

* fix(child-workflow): nested spans handoff (#2966)

* fix(child-workflow): nested spans handoff

* remove overly defensive programming

* update type check

* type more code

* remove more dead code

* address bugbot comments

* fix(security): restrict API key access on internal-only routes (#2964)

* fix(security): restrict API key access on internal-only routes

* test(security): update function execute tests for checkInternalAuth

* updated agent handler

* move session check higher in checkSessionOrInternalAuth

* extracted duplicate code into helper for resolving user from jwt

* fix(copilot): update copilot chat title (#2968)

* fix(hitl): fix condition blocks after hitl (#2967)

* fix(notes): ghost edges (#2970)

* fix(notes): ghost edges

* fix deployed state fallback

* fallback

* remove UI level checks

* annotation missing from autoconnect source check

* improvement(docs): loop and parallel var reference syntax (#2975)

* fix(blog): slash actions description (#2976)

* improvement(docs): loop and parallel var reference syntax

* fix(blog): slash actions description

* fix(auth): copilot routes (#2977)

* Fix copilot auth

* Fix

* Fix

* Fix

* fix(copilot): fix edit summary for loops/parallels (#2978)

* fix(integrations): hide from tool bar (#2544)

* fix(landing): ui (#2979)

* fix(edge-validation): race condition on collaborative add (#2980)

* fix(variables): boolean type support and input improvements (#2981)

* fix(variables): boolean type support and input improvements

* fix formatting

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
2026-01-24 14:29:53 -08:00
Waleed
e9c4251c1c v0.5.68: router block reasoning, executor improvements, variable resolution consolidation, helm updates (#2946)
* improvement(workflow-item): stabilize avatar layout and fix name truncation (#2939)

* improvement(workflow-item): stabilize avatar layout and fix name truncation

* fix(avatars): revert overflow bg to hardcoded color for contrast

* fix(executor): stop parallel execution when block errors (#2940)

* improvement(helm): add per-deployment extraVolumes support (#2942)

* fix(gmail): expose messageId field in read email block (#2943)

fix(resolver): consolidate reference resolution (#2941)

* fix(resolver): consolidate code to resolve references

* fix edge cases

* use already formatted error

* fix multi index

* fix backwards compat reachability

* handle backwards compatibility accurately

* use shared constant correctly

* feat(router): expose reasoning output in router v2 block (#2945)

* fix(copilot): always allow, credential masking (#2947)

* Fix always allow, credential validation

* Credential masking

* Autoload

* fix(executor): handle condition dead-end branches in loops (#2944)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com>
2026-01-22 13:48:15 -08:00
Waleed
cc2be33d6b v0.5.67: loading, password reset, ui improvements, helm updates (#2928)
* fix(zustand): updated to useShallow from deprecated createWithEqualityFn (#2919)

* fix(logger): use direct env access for webpack inlining (#2920)

* fix(notifications): text overflow with line-clamp (#2921)

* chore(helm): add env vars for Vertex AI, orgs, and telemetry (#2922)

* fix(auth): improve reset password flow and consolidate brand detection (#2924)

* fix(auth): improve reset password flow and consolidate brand detection

* fix(auth): set errorHandled for EMAIL_NOT_VERIFIED to prevent duplicate error

* fix(auth): clear success message on login errors

* chore(auth): fix import order per lint

* fix(action-bar): duplicate subflows with children (#2923)

* fix(action-bar): duplicate subflows with children

* fix(action-bar): add validateTriggerPaste for subflow duplicate

* fix(resolver): agent response format, input formats, root level (#2925)

* fix(resolvers): agent response format, input formats, root level

* fix response block initial seeding

* fix tests

* fix(messages-input): fix cursor alignment and auto-resize with overlay (#2926)

* fix(messages-input): fix cursor alignment and auto-resize with overlay

* fixed remaining zustand warnings

* fix(stores): remove dead code causing log spam on startup (#2927)

* fix(stores): remove dead code causing log spam on startup

* fix(stores): replace custom tools zustand store with react query cache

* improvement(ui): use BrandedButton and BrandedLink components (#2930)

- Refactor auth forms to use BrandedButton component
- Add BrandedLink component for changelog page
- Reduce code duplication in login, signup, reset-password forms
- Update star count default value

* fix(custom-tools): remove unsafe title fallback in getCustomTool (#2929)

* fix(custom-tools): remove unsafe title fallback in getCustomTool

* fix(custom-tools): restore title fallback in getCustomTool lookup

Custom tools are referenced by title (custom_${title}), not database ID.
The title fallback is required for client-side tool resolution to work.

* fix(null-bodies): empty bodies handling (#2931)

* fix(null-statuses): empty bodies handling

* address bugbot comment

* fix(token-refresh): microsoft, notion, x, linear (#2933)

* fix(microsoft): proactive refresh needed

* fix(x): missing token refresh flag

* notion and linear missing flag too

* address bugbot comment

* fix(auth): handle EMAIL_NOT_VERIFIED in onError callback (#2932)

* fix(auth): handle EMAIL_NOT_VERIFIED in onError callback

* refactor(auth): extract redirectToVerify helper to reduce duplication

* fix(workflow-selector): use dedicated selector for workflow dropdown (#2934)

* feat(workflow-block): preview (#2935)

* improvement(copilot): tool configs to show nested props (#2936)

* fix(auth): add genericOAuth providers to trustedProviders (#2937)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
2026-01-21 22:53:25 -08:00
Vikhyath Mondreti
45371e521e v0.5.66: external http requests fix, ring highlighting 2026-01-21 02:55:39 -08:00
Waleed
0ce0f98aa5 v0.5.65: gemini updates, textract integration, ui updates (#2909)
* fix(google): wrap primitive tool responses for Gemini API compatibility (#2900)

* fix(canonical): copilot path + update parent (#2901)

* fix(rss): add top-level title, link, pubDate fields to RSS trigger output (#2902)

* fix(rss): add top-level title, link, pubDate fields to RSS trigger output

* fix(imap): add top-level fields to IMAP trigger output

* improvement(browseruse): add profile id param (#2903)

* improvement(browseruse): add profile id param

* make request a stub since we have directExec

* improvement(executor): upgraded abort controller to handle aborts for loops and parallels (#2880)

* improvement(executor): upgraded abort controller to handle aborts for loops and parallels

* comments

* improvement(files): update execution for passing base64 strings (#2906)

* progress

* improvement(execution): update execution for passing base64 strings

* fix types

* cleanup comments

* path security vuln

* reject promise correctly

* fix redirect case

* remove proxy routes

* fix tests

* use ipaddr

* feat(tools): added textract, added v2 for mistral, updated tag dropdown (#2904)

* feat(tools): added textract

* cleanup

* ack pr comments

* reorder

* removed upload for textract async version

* fix additional fields dropdown in editor, update parser to leave validation to be done on the server

* added mistral v2, files v2, and finalized textract

* updated the rest of the old file patterns, updated mistral outputs for v2

* updated tag dropdown to parse non-operation fields as well

* updated extension finder

* cleanup

* added description for inputs to workflow

* use helper for internal route check

* fix tag dropdown merge conflict change

* remove duplicate code

---------

Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>

* fix(ui): change add inputs button to match output selector (#2907)

* fix(canvas): removed invite to workspace from canvas popover (#2908)

* fix(canvas): removed invite to workspace

* removed unused props

* fix(copilot): legacy tool display names (#2911)

* fix(a2a): canonical merge  (#2912)

* fix canonical merge

* fix empty array case

* fix(change-detection): copilot diffs have extra field (#2913)

* improvement(logs): improved logs ui bugs, added subflow disable UI (#2910)

* improvement(logs): improved logs ui bugs, added subflow disable UI

* added duplicate to action bar for subflows

* feat(broadcast): email v0.5 (#2905)

---------

Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyath@simstudio.ai>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
2026-01-20 23:54:55 -08:00
Waleed
dff1c9d083 v0.5.64: unsubscribe, search improvements, metrics, additional SSO configuration 2026-01-20 00:34:11 -08:00
Vikhyath Mondreti
b09f683072 v0.5.63: ui and performance improvements, more google tools 2026-01-18 15:22:42 -08:00
Vikhyath Mondreti
a8bb0db660 v0.5.62: webhook bug fixes, seeding default subblock values, block selection fixes 2026-01-16 20:27:06 -08:00
Waleed
af82820a28 v0.5.61: webhook improvements, workflow controls, react query for deployment status, chat fixes, reducto and pulse OCR, linear fixes 2026-01-16 18:06:23 -08:00
Waleed
4372841797 v0.5.60: invitation flow improvements, chat fixes, a2a improvements, additional copilot actions 2026-01-15 00:02:18 -08:00
Waleed
5e8c843241 v0.5.59: a2a support, documentation 2026-01-13 13:21:21 -08:00
Waleed
7bf3d73ee6 v0.5.58: export folders, new tools, permissions groups enhancements 2026-01-13 00:56:59 -08:00
Vikhyath Mondreti
7ffc11a738 v0.5.57: subagents, context menu improvements, bug fixes 2026-01-11 11:38:40 -08:00
Waleed
be578e2ed7 v0.5.56: batch operations, access control and permission groups, billing fixes 2026-01-10 00:31:34 -08:00
Waleed
f415e5edc4 v0.5.55: polling groups, bedrock provider, devcontainer fixes, workflow preview enhancements 2026-01-08 23:36:56 -08:00
Waleed
13a6e6c3fa v0.5.54: seo, model blacklist, helm chart updates, fireflies integration, autoconnect improvements, billing fixes 2026-01-07 16:09:45 -08:00
Waleed
f5ab7f21ae v0.5.53: hotkey improvements, added redis fallback, fixes for workflow tool 2026-01-06 23:34:52 -08:00
Waleed
bfb6fffe38 v0.5.52: new port-based router block, combobox expression and variable support 2026-01-06 16:14:10 -08:00
Waleed
4fbec0a43f v0.5.51: triggers, kb, condition block improvements, supabase and grain integration updates 2026-01-06 14:26:46 -08:00
Waleed
585f5e365b v0.5.50: import improvements, ui upgrades, kb styling and performance improvements 2026-01-05 00:35:55 -08:00
Waleed
3792bdd252 v0.5.49: hitl improvements, new email styles, imap trigger, logs context menu (#2672)
* feat(logs-context-menu): consolidated logs utils and types, added logs record context menu (#2659)

* feat(email): welcome email; improvement(emails): ui/ux (#2658)

* feat(email): welcome email; improvement(emails): ui/ux

* improvement(emails): links, accounts, preview

* refactor(emails): file structure and wrapper components

* added envvar for personal emails sent, added isHosted gate

* fixed failing tests, added env mock

* fix: removed comment

---------

Co-authored-by: waleed <walif6@gmail.com>

* fix(logging): hitl + trigger dev crash protection (#2664)

* hitl gaps

* deal with trigger worker crashes

* cleanup import structure

* feat(imap): added support for imap trigger (#2663)

* feat(tools): added support for imap trigger

* feat(imap): added parity, tested

* ack PR comments

* final cleanup

* feat(i18n): update translations (#2665)

Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>

* fix(grain): updated grain trigger to auto-establish trigger (#2666)

Co-authored-by: aadamgough <adam@sim.ai>

* feat(admin): routes to manage deployments (#2667)

* feat(admin): routes to manage deployments

* fix naming of deployed by

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date (#2668)

* feat(time-picker): added timepicker emcn component, added to playground, added searchable prop for dropdown, added more timezones for schedule, updated license and notice date

* removed unused params, cleaned up redundant utils

* improvement(invite): aligned styling (#2669)

* improvement(invite): aligned with rest of app

* fix(invite): error handling

* fix: addressed comments

---------

Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: waleedlatif1 <waleedlatif1@users.noreply.github.com>
Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: aadamgough <adam@sim.ai>
2026-01-03 13:19:18 -08:00
Waleed
eb5d1f3e5b v0.5.48: copy-paste workflow blocks, docs updates, mcp tool fixes 2025-12-31 18:00:04 -08:00
Waleed
54ab82c8dd v0.5.47: deploy workflow as mcp, kb chunks tokenizer, UI improvements, jira service management tools 2025-12-30 23:18:58 -08:00
Waleed
f895bf469b v0.5.46: build improvements, greptile, light mode improvements 2025-12-29 02:17:52 -08:00
Waleed
dd3209af06 v0.5.45: light mode fixes, realtime usage indicator, docker build improvements 2025-12-27 19:57:42 -08:00
Waleed
b6ba3b50a7 v0.5.44: keyboard shortcuts, autolayout, light mode, byok, testing improvements 2025-12-26 21:25:19 -08:00
Waleed
b304233062 v0.5.43: export logs, circleback, grain, vertex, code hygiene, schedule improvements 2025-12-23 19:19:18 -08:00
Vikhyath Mondreti
57e4b49bd6 v0.5.42: fix memory migration 2025-12-23 01:24:54 -08:00
Vikhyath Mondreti
e12dd204ed v0.5.41: memory fixes, copilot improvements, knowledgebase improvements, LLM providers standardization 2025-12-23 00:15:18 -08:00
Vikhyath Mondreti
3d9d9cbc54 v0.5.40: supabase ops to allow non-public schemas, jira uuid 2025-12-21 22:28:05 -08:00
Waleed
0f4ec962ad v0.5.39: notion, workflow variables fixes 2025-12-20 20:44:00 -08:00
Waleed
4827866f9a v0.5.38: snap to grid, copilot ux improvements, billing line items 2025-12-20 17:24:38 -08:00
Waleed
3e697d9ed9 v0.5.37: redaction utils consolidation, logs updates, autoconnect improvements, additional kb tag types 2025-12-19 22:31:55 -08:00
Martin Yankov
4431a1a484 fix(helm): add custom egress rules to realtime network policy (#2481)
The realtime service network policy was missing the custom egress rules section
that allows configuration of additional egress rules via values.yaml. This caused
the realtime pods to be unable to connect to external databases (e.g., PostgreSQL
on port 5432) when using external database configurations.

The app network policy already had this section, but the realtime network policy
was missing it, creating an inconsistency and preventing the realtime service
from accessing external databases configured via networkPolicy.egress values.

This fix adds the same custom egress rules template section to the realtime
network policy, matching the app network policy behavior and allowing users to
configure database connectivity via values.yaml.
2025-12-19 18:59:08 -08:00
Waleed
4d1a9a3f22 v0.5.36: hitl improvements, opengraph, slack fixes, one-click unsubscribe, auth checks, new db indexes 2025-12-19 01:27:49 -08:00
Vikhyath Mondreti
eb07a080fb v0.5.35: helm updates, copilot improvements, 404 for docs, salesforce fixes, subflow resize clamping 2025-12-18 16:23:19 -08:00
54 changed files with 607 additions and 2510 deletions

View File

@@ -55,21 +55,21 @@ export const {serviceName}{Action}Tool: ToolConfig<
},
params: {
-// Hidden params (system-injected)
+// Hidden params (system-injected, only use hidden for oauth accessToken)
accessToken: {
type: 'string',
required: true,
visibility: 'hidden',
description: 'OAuth access token',
},
-// User-only params (credentials, IDs user must provide)
+// User-only params (credentials, api key, IDs user must provide)
someId: {
type: 'string',
required: true,
visibility: 'user-only',
description: 'The ID of the resource',
},
-// User-or-LLM params (can be provided by user OR computed by LLM)
+// User-or-LLM params (everything else, can be provided by user OR computed by LLM)
query: {
type: 'string',
required: false, // Use false for optional
@@ -114,8 +114,8 @@ export const {serviceName}{Action}Tool: ToolConfig<
### Visibility Options
- `'hidden'` - System-injected (OAuth tokens, internal params). User never sees.
-- `'user-only'` - User must provide (credentials, account-specific IDs)
-- `'user-or-llm'` - User provides OR LLM can compute (search queries, content, filters)
+- `'user-only'` - User must provide (credentials, api keys, account-specific IDs)
+- `'user-or-llm'` - User provides OR LLM can compute (search queries, content, filters, most fall into this category)
### Parameter Types
- `'string'` - Text values
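
Read together with the template diff above, the three visibility levels compose like this. A minimal sketch with a locally defined param shape; the ParamConfig type and the example fields below are illustrative stand-ins, not the repository's actual ToolConfig definition:

type Visibility = 'hidden' | 'user-only' | 'user-or-llm'

interface ParamConfig {
  type: 'string' | 'number' | 'boolean'
  required: boolean
  visibility: Visibility
  description: string
}

// One param per visibility level, mirroring the template above.
const exampleParams: Record<string, ParamConfig> = {
  // hidden: system-injected; reserved for the OAuth accessToken, never shown to the user
  accessToken: {
    type: 'string',
    required: true,
    visibility: 'hidden',
    description: 'OAuth access token',
  },
  // user-only: credentials, api keys, and account-specific IDs the user must provide
  someId: {
    type: 'string',
    required: true,
    visibility: 'user-only',
    description: 'The ID of the resource',
  },
  // user-or-llm: everything else; the user can supply it or the LLM can compute it
  query: {
    type: 'string',
    required: false,
    visibility: 'user-or-llm',
    description: 'Search query',
  },
}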

View File

@@ -35,8 +35,7 @@ const AutoLayoutRequestSchema = z.object({
})
.optional()
.default({}),
-// Optional: if provided, use these blocks instead of loading from DB
-// This allows using blocks with live measurements from the UI
+gridSize: z.number().min(0).max(50).optional(),
blocks: z.record(z.any()).optional(),
edges: z.array(z.any()).optional(),
loops: z.record(z.any()).optional(),
@@ -53,7 +52,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const { id: workflowId } = await params
try {
-// Get the session
const session = await getSession()
if (!session?.user?.id) {
logger.warn(`[${requestId}] Unauthorized autolayout attempt for workflow ${workflowId}`)
@@ -62,7 +60,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
const userId = session.user.id
-// Parse request body
const body = await request.json()
const layoutOptions = AutoLayoutRequestSchema.parse(body)
@@ -70,7 +67,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
userId,
})
-// Fetch the workflow to check ownership/access
const accessContext = await getWorkflowAccessContext(workflowId, userId)
const workflowData = accessContext?.workflow
@@ -79,7 +75,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
-// Check if user has permission to update this workflow
const canUpdate =
accessContext?.isOwner ||
(workflowData.workspaceId
@@ -94,8 +89,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
-// Use provided blocks/edges if available (with live measurements from UI),
-// otherwise load from database
let currentWorkflowData: NormalizedWorkflowData | null
if (layoutOptions.blocks && layoutOptions.edges) {
@@ -125,6 +118,7 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
y: layoutOptions.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
alignment: layoutOptions.alignment,
+gridSize: layoutOptions.gridSize,
}
const layoutResult = applyAutoLayout(

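As a usage sketch for the route above: the body may carry an optional gridSize (validated as z.number().min(0).max(50)) alongside optional live blocks/edges from the UI. The endpoint path and response handling below are assumptions inferred from the handler, not confirmed by this diff:

// Hypothetical client call; the /api/workflows/{id}/autolayout path is an assumption.
async function requestAutoLayout(workflowId: string, gridSize?: number): Promise<unknown> {
  const res = await fetch(`/api/workflows/${workflowId}/autolayout`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    // Omit gridSize entirely when unset so the server-side .optional() applies
    body: JSON.stringify(gridSize === undefined ? {} : { gridSize }),
  })
  if (!res.ok) throw new Error(`Auto layout failed with status ${res.status}`)
  return res.json()
}
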
View File

@@ -1,216 +0,0 @@
import { db, workflow as workflowTable } from '@sim/db'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { SSE_HEADERS } from '@/lib/core/utils/sse'
import { markExecutionCancelled } from '@/lib/execution/cancellation'
import { LoggingSession } from '@/lib/logs/execution/logging-session'
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { createSSECallbacks } from '@/lib/workflows/executor/execution-events'
import { ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { ExecutionMetadata, SerializableExecutionState } from '@/executor/execution/types'
import { hasExecutionResult } from '@/executor/utils/errors'
const logger = createLogger('ExecuteFromBlockAPI')
const ExecuteFromBlockSchema = z.object({
startBlockId: z.string().min(1, 'Start block ID is required'),
sourceSnapshot: z.object({
blockStates: z.record(z.any()),
executedBlocks: z.array(z.string()),
blockLogs: z.array(z.any()),
decisions: z.object({
router: z.record(z.string()),
condition: z.record(z.string()),
}),
completedLoops: z.array(z.string()),
loopExecutions: z.record(z.any()).optional(),
parallelExecutions: z.record(z.any()).optional(),
parallelBlockMapping: z.record(z.any()).optional(),
activeExecutionPath: z.array(z.string()),
}),
input: z.any().optional(),
})
export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'
export async function POST(req: NextRequest, { params }: { params: Promise<{ id: string }> }) {
const requestId = generateRequestId()
const { id: workflowId } = await params
try {
const auth = await checkHybridAuth(req, { requireWorkflowId: false })
if (!auth.success || !auth.userId) {
return NextResponse.json({ error: auth.error || 'Unauthorized' }, { status: 401 })
}
const userId = auth.userId
let body: unknown
try {
body = await req.json()
} catch {
return NextResponse.json({ error: 'Invalid JSON body' }, { status: 400 })
}
const validation = ExecuteFromBlockSchema.safeParse(body)
if (!validation.success) {
logger.warn(`[${requestId}] Invalid request body:`, validation.error.errors)
return NextResponse.json(
{
error: 'Invalid request body',
details: validation.error.errors.map((e) => ({
path: e.path.join('.'),
message: e.message,
})),
},
{ status: 400 }
)
}
const { startBlockId, sourceSnapshot, input } = validation.data
const executionId = uuidv4()
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId, userId: workflowTable.userId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord?.workspaceId) {
return NextResponse.json({ error: 'Workflow not found or has no workspace' }, { status: 404 })
}
const workspaceId = workflowRecord.workspaceId
const workflowUserId = workflowRecord.userId
logger.info(`[${requestId}] Starting run-from-block execution`, {
workflowId,
startBlockId,
executedBlocksCount: sourceSnapshot.executedBlocks.length,
})
const loggingSession = new LoggingSession(workflowId, executionId, 'manual', requestId)
const abortController = new AbortController()
let isStreamClosed = false
const stream = new ReadableStream<Uint8Array>({
async start(controller) {
const { sendEvent, onBlockStart, onBlockComplete, onStream } = createSSECallbacks({
executionId,
workflowId,
controller,
isStreamClosed: () => isStreamClosed,
setStreamClosed: () => {
isStreamClosed = true
},
})
const metadata: ExecutionMetadata = {
requestId,
workflowId,
userId,
executionId,
triggerType: 'manual',
workspaceId,
workflowUserId,
useDraftState: true,
isClientSession: true,
startTime: new Date().toISOString(),
}
const snapshot = new ExecutionSnapshot(metadata, {}, input || {}, {})
try {
const startTime = new Date()
sendEvent({
type: 'execution:started',
timestamp: startTime.toISOString(),
executionId,
workflowId,
data: { startTime: startTime.toISOString() },
})
const result = await executeWorkflowCore({
snapshot,
loggingSession,
abortSignal: abortController.signal,
runFromBlock: {
startBlockId,
sourceSnapshot: sourceSnapshot as SerializableExecutionState,
},
callbacks: { onBlockStart, onBlockComplete, onStream },
})
if (result.status === 'cancelled') {
sendEvent({
type: 'execution:cancelled',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { duration: result.metadata?.duration || 0 },
})
} else {
sendEvent({
type: 'execution:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
success: result.success,
output: result.output,
duration: result.metadata?.duration || 0,
startTime: result.metadata?.startTime || startTime.toISOString(),
endTime: result.metadata?.endTime || new Date().toISOString(),
},
})
}
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Run-from-block execution failed: ${errorMessage}`)
const executionResult = hasExecutionResult(error) ? error.executionResult : undefined
sendEvent({
type: 'execution:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
error: executionResult?.error || errorMessage,
duration: executionResult?.metadata?.duration || 0,
},
})
} finally {
if (!isStreamClosed) {
try {
controller.enqueue(new TextEncoder().encode('data: [DONE]\n\n'))
controller.close()
} catch {}
}
}
},
cancel() {
isStreamClosed = true
abortController.abort()
markExecutionCancelled(executionId).catch(() => {})
},
})
return new NextResponse(stream, {
headers: { ...SSE_HEADERS, 'X-Execution-Id': executionId },
})
} catch (error: unknown) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error'
logger.error(`[${requestId}] Failed to start run-from-block execution:`, error)
return NextResponse.json(
{ error: errorMessage || 'Failed to start execution' },
{ status: 500 }
)
}
}

View File

@@ -53,7 +53,6 @@ const ExecuteWorkflowSchema = z.object({
parallels: z.record(z.any()).optional(),
})
.optional(),
-stopAfterBlockId: z.string().optional(),
})
export const runtime = 'nodejs'
@@ -223,7 +222,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
-stopAfterBlockId,
} = validation.data
// For API key and internal JWT auth, the entire body is the input (except for our control fields)
@@ -239,7 +237,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
includeFileBase64,
base64MaxBytes,
workflowStateOverride,
-stopAfterBlockId: _stopAfterBlockId,
workflowId: _workflowId, // Also exclude workflowId used for internal JWT auth
...rest
} = body
@@ -437,7 +434,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
loggingSession,
includeFileBase64,
base64MaxBytes,
-stopAfterBlockId,
})
const outputWithBase64 = includeFileBase64
@@ -726,7 +722,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
abortSignal: abortController.signal,
includeFileBase64,
base64MaxBytes,
-stopAfterBlockId,
})
if (result.status === 'paused') {

View File

@@ -1,108 +0,0 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { generateRequestId } from '@/lib/core/utils/request'
import { applyAutoLayout } from '@/lib/workflows/autolayout'
import {
DEFAULT_HORIZONTAL_SPACING,
DEFAULT_LAYOUT_PADDING,
DEFAULT_VERTICAL_SPACING,
} from '@/lib/workflows/autolayout/constants'
const logger = createLogger('YamlAutoLayoutAPI')
const AutoLayoutRequestSchema = z.object({
workflowState: z.object({
blocks: z.record(z.any()),
edges: z.array(z.any()),
loops: z.record(z.any()).optional().default({}),
parallels: z.record(z.any()).optional().default({}),
}),
options: z
.object({
spacing: z
.object({
horizontal: z.number().optional(),
vertical: z.number().optional(),
})
.optional(),
alignment: z.enum(['start', 'center', 'end']).optional(),
padding: z
.object({
x: z.number().optional(),
y: z.number().optional(),
})
.optional(),
})
.optional(),
})
export async function POST(request: NextRequest) {
const requestId = generateRequestId()
try {
const body = await request.json()
const { workflowState, options } = AutoLayoutRequestSchema.parse(body)
logger.info(`[${requestId}] Applying auto layout`, {
blockCount: Object.keys(workflowState.blocks).length,
edgeCount: workflowState.edges.length,
})
const autoLayoutOptions = {
horizontalSpacing: options?.spacing?.horizontal ?? DEFAULT_HORIZONTAL_SPACING,
verticalSpacing: options?.spacing?.vertical ?? DEFAULT_VERTICAL_SPACING,
padding: {
x: options?.padding?.x ?? DEFAULT_LAYOUT_PADDING.x,
y: options?.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
alignment: options?.alignment ?? 'center',
}
const layoutResult = applyAutoLayout(
workflowState.blocks,
workflowState.edges,
autoLayoutOptions
)
if (!layoutResult.success || !layoutResult.blocks) {
logger.error(`[${requestId}] Auto layout failed:`, {
error: layoutResult.error,
})
return NextResponse.json(
{
success: false,
errors: [layoutResult.error || 'Unknown auto layout error'],
},
{ status: 500 }
)
}
logger.info(`[${requestId}] Auto layout completed successfully:`, {
success: true,
blockCount: Object.keys(layoutResult.blocks).length,
})
const transformedResponse = {
success: true,
workflowState: {
blocks: layoutResult.blocks,
edges: workflowState.edges,
loops: workflowState.loops || {},
parallels: workflowState.parallels || {},
},
}
return NextResponse.json(transformedResponse)
} catch (error) {
logger.error(`[${requestId}] Auto layout failed:`, error)
return NextResponse.json(
{
success: false,
errors: [error instanceof Error ? error.message : 'Unknown auto layout error'],
},
{ status: 500 }
)
}
}

View File

@@ -1,13 +1,11 @@
import { memo, useCallback } from 'react'
-import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut, Play } from 'lucide-react'
+import { ArrowLeftRight, ArrowUpDown, Circle, CircleOff, LogOut } from 'lucide-react'
import { Button, Copy, Tooltip, Trash2 } from '@/components/emcn'
import { cn } from '@/lib/core/utils/cn'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
-import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import { validateTriggerPaste } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
-import { useExecutionStore } from '@/stores/execution'
import { useNotificationStore } from '@/stores/notifications'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
@@ -51,7 +49,6 @@ export const ActionBar = memo(
collaborativeBatchToggleBlockHandles,
} = useCollaborativeWorkflow()
const { setPendingSelection } = useWorkflowRegistry()
-const { handleRunFromBlock } = useWorkflowExecution()
const addNotification = useNotificationStore((s) => s.addNotification)
@@ -100,39 +97,12 @@ export const ActionBar = memo(
)
)
const { activeWorkflowId } = useWorkflowRegistry()
-const { isExecuting, getLastExecutionSnapshot } = useExecutionStore()
const userPermissions = useUserPermissionsContext()
-const edges = useWorkflowStore((state) => state.edges)
const isStartBlock = isInputDefinitionTrigger(blockType)
const isResponseBlock = blockType === 'response'
const isNoteBlock = blockType === 'note'
const isSubflowBlock = blockType === 'loop' || blockType === 'parallel'
const isInsideSubflow = parentId && (parentType === 'loop' || parentType === 'parallel')
-const snapshot = activeWorkflowId ? getLastExecutionSnapshot(activeWorkflowId) : null
-const incomingEdges = edges.filter((edge) => edge.target === blockId)
-const isTriggerBlock = incomingEdges.length === 0
-// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
-const isSourceSatisfied = (sourceId: string) => {
-if (snapshot?.executedBlocks.includes(sourceId)) return true
-// Check if source is a trigger (has no incoming edges itself)
-const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
-return sourceIncomingEdges.length === 0
-}
-// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
-const dependenciesSatisfied =
-isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
-const canRunFromBlock =
-dependenciesSatisfied && !isNoteBlock && !isInsideSubflow && !isExecuting
-const handleRunFromBlockClick = useCallback(() => {
-if (!activeWorkflowId || !canRunFromBlock) return
-handleRunFromBlock(blockId, activeWorkflowId)
-}, [blockId, activeWorkflowId, canRunFromBlock, handleRunFromBlock])
/**
* Get appropriate tooltip message based on disabled state
@@ -158,35 +128,30 @@ export const ActionBar = memo(
'dark:border-transparent dark:bg-[var(--surface-4)]'
)}
>
-{!isNoteBlock && !isInsideSubflow && (
+{!isNoteBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={(e) => {
e.stopPropagation()
-if (canRunFromBlock && !disabled) {
-handleRunFromBlockClick()
+if (!disabled) {
+collaborativeBatchToggleBlockEnabled([blockId])
}
}}
className={ACTION_BUTTON_STYLES}
-disabled={disabled || !canRunFromBlock}
+disabled={disabled}
>
-<Play className={ICON_SIZE} />
+{isEnabled ? <Circle className={ICON_SIZE} /> : <CircleOff className={ICON_SIZE} />}
</Button>
</Tooltip.Trigger>
<Tooltip.Content side='top'>
-{(() => {
-if (disabled) return getTooltipMessage('Run from block')
-if (isExecuting) return 'Execution in progress'
-if (!dependenciesSatisfied) return 'Run upstream blocks first'
-return 'Run from block'
-})()}
+{getTooltipMessage(isEnabled ? 'Disable Block' : 'Enable Block')}
</Tooltip.Content>
</Tooltip.Root>
)}
-{!isNoteBlock && (
+{isSubflowBlock && (
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button

View File

@@ -40,16 +40,9 @@ export interface BlockMenuProps {
onRemoveFromSubflow: () => void
onOpenEditor: () => void
onRename: () => void
-onRunFromBlock?: () => void
-onRunUntilBlock?: () => void
hasClipboard?: boolean
showRemoveFromSubflow?: boolean
-/** Whether run from block is available (has snapshot, was executed, not inside subflow) */
-canRunFromBlock?: boolean
disableEdit?: boolean
-isExecuting?: boolean
-/** Whether the selected block is a trigger (has no incoming edges) */
-isPositionalTrigger?: boolean
}
/**
@@ -72,14 +65,9 @@ export function BlockMenu({
onRemoveFromSubflow,
onOpenEditor,
onRename,
-onRunFromBlock,
-onRunUntilBlock,
hasClipboard = false,
showRemoveFromSubflow = false,
-canRunFromBlock = false,
disableEdit = false,
-isExecuting = false,
-isPositionalTrigger = false,
}: BlockMenuProps) {
const isSingleBlock = selectedBlocks.length === 1
@@ -90,15 +78,10 @@ export function BlockMenu({
(b) =>
TriggerUtils.requiresSingleInstance(b.type) || TriggerUtils.isSingleInstanceBlockType(b.type)
)
-// A block is a trigger if it's explicitly a trigger type OR has no incoming edges (positional trigger)
-const hasTriggerBlock =
-selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b)) || isPositionalTrigger
+const hasTriggerBlock = selectedBlocks.some((b) => TriggerUtils.isTriggerBlock(b))
const allNoteBlocks = selectedBlocks.every((b) => b.type === 'note')
const isSubflow =
isSingleBlock && (selectedBlocks[0]?.type === 'loop' || selectedBlocks[0]?.type === 'parallel')
-const isInsideSubflow =
-isSingleBlock &&
-(selectedBlocks[0]?.parentType === 'loop' || selectedBlocks[0]?.parentType === 'parallel')
const canRemoveFromSubflow = showRemoveFromSubflow && !hasTriggerBlock
@@ -220,38 +203,6 @@ export function BlockMenu({
</PopoverItem>
)}
-{/* Run from/until block - only for single non-note block, not inside subflows */}
-{isSingleBlock && !allNoteBlocks && !isInsideSubflow && (
-<>
-<PopoverDivider />
-<PopoverItem
-disabled={!canRunFromBlock || isExecuting}
-onClick={() => {
-if (canRunFromBlock && !isExecuting) {
-onRunFromBlock?.()
-onClose()
-}
-}}
->
-Run from block
-</PopoverItem>
-{/* Hide "Run until" for triggers - they're always at the start */}
-{!hasTriggerBlock && (
-<PopoverItem
-disabled={isExecuting}
-onClick={() => {
-if (!isExecuting) {
-onRunUntilBlock?.()
-onClose()
-}
-}}
->
-Run until block
-</PopoverItem>
-)}
-</>
-)}
{/* Destructive action */}
<PopoverDivider />
<PopoverItem

View File

@@ -3,6 +3,7 @@ import { createLogger } from '@sim/logger'
import { useReactFlow } from 'reactflow'
import type { AutoLayoutOptions } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/auto-layout-utils'
import { applyAutoLayoutAndUpdateStore as applyAutoLayoutStandalone } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils/auto-layout-utils'
+import { useSnapToGridSize } from '@/hooks/queries/general-settings'
import { useCanvasViewport } from '@/hooks/use-canvas-viewport'
export type { AutoLayoutOptions }
@@ -13,21 +14,28 @@ const logger = createLogger('useAutoLayout')
* Hook providing auto-layout functionality for workflows.
* Binds workflowId context and provides memoized callback for React components.
* Includes automatic fitView animation after successful layout.
+* Automatically uses the user's snap-to-grid setting for grid-aligned layout.
*
* Note: This hook requires a ReactFlowProvider ancestor.
*/
export function useAutoLayout(workflowId: string | null) {
const reactFlowInstance = useReactFlow()
const { fitViewToBounds } = useCanvasViewport(reactFlowInstance)
+const snapToGridSize = useSnapToGridSize()
const applyAutoLayoutAndUpdateStore = useCallback(
async (options: AutoLayoutOptions = {}) => {
if (!workflowId) {
return { success: false, error: 'No workflow ID provided' }
}
-return applyAutoLayoutStandalone(workflowId, options)
+// Include gridSize from user's snap-to-grid setting
+const optionsWithGrid: AutoLayoutOptions = {
+...options,
+gridSize: options.gridSize ?? (snapToGridSize > 0 ? snapToGridSize : undefined),
+}
+return applyAutoLayoutStandalone(workflowId, optionsWithGrid)
},
-[workflowId]
+[workflowId, snapToGridSize]
)
/**

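The grid defaulting introduced in this hook reduces to a single rule. A sketch of just that logic; treating a snap-to-grid setting of 0 as "snapping disabled" is inferred from the snapToGridSize > 0 check above:

// An explicit option wins; otherwise fall back to the user's snap-to-grid setting,
// omitting gridSize entirely when snapping is disabled (setting <= 0).
function resolveGridSize(
  optionGridSize: number | undefined,
  snapToGridSize: number
): number | undefined {
  return optionGridSize ?? (snapToGridSize > 0 ? snapToGridSize : undefined)
}

// resolveGridSize(undefined, 25) -> 25        (user's setting applies)
// resolveGridSize(10, 25)        -> 10        (explicit option wins)
// resolveGridSize(undefined, 0)  -> undefined (no grid snapping)
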
View File

@@ -15,16 +15,13 @@ import {
TriggerUtils,
} from '@/lib/workflows/triggers/triggers'
import { useCurrentWorkflow } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-current-workflow'
-import { getBlock } from '@/blocks'
-import type { SerializableExecutionState } from '@/executor/execution/types'
-import type { BlockLog, BlockState, ExecutionResult, StreamingExecution } from '@/executor/types'
+import type { BlockLog, ExecutionResult, StreamingExecution } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { coerceValue } from '@/executor/utils/start-block'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { WorkflowValidationError } from '@/serializer'
import { useExecutionStore } from '@/stores/execution'
-import { useNotificationStore } from '@/stores/notifications'
import { useVariablesStore } from '@/stores/panel'
import { useEnvironmentStore } from '@/stores/settings/environment'
import { type ConsoleEntry, useTerminalConsoleStore } from '@/stores/terminal'
@@ -101,15 +98,11 @@ export function useWorkflowExecution() {
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
-setLastExecutionSnapshot,
-getLastExecutionSnapshot,
-clearLastExecutionSnapshot,
} = useExecutionStore()
const [executionResult, setExecutionResult] = useState<ExecutionResult | null>(null)
const executionStream = useExecutionStream()
const currentChatExecutionIdRef = useRef<string | null>(null)
const isViewingDiff = useWorkflowDiffStore((state) => state.isShowingDiff)
-const addNotification = useNotificationStore((state) => state.addNotification)
/**
* Validates debug state before performing debug operations
@@ -675,8 +668,7 @@ export function useWorkflowExecution() {
onStream?: (se: StreamingExecution) => Promise<void>,
executionId?: string,
onBlockComplete?: (blockId: string, output: any) => Promise<void>,
-overrideTriggerType?: 'chat' | 'manual' | 'api',
-stopAfterBlockId?: string
+overrideTriggerType?: 'chat' | 'manual' | 'api'
): Promise<ExecutionResult | StreamingExecution> => {
// Use diff workflow for execution when available, regardless of canvas view state
const executionWorkflowState = null as {
@@ -884,8 +876,6 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
-const accumulatedBlockStates = new Map<string, BlockState>()
-const executedBlockIds = new Set<string>()
// Execute the workflow
try {
@@ -897,7 +887,6 @@ export function useWorkflowExecution() {
triggerType: overrideTriggerType || 'manual',
useDraftState: true,
isClientSession: true,
-stopAfterBlockId,
workflowStateOverride: executionWorkflowState
? {
blocks: executionWorkflowState.blocks,
@@ -927,22 +916,18 @@ export function useWorkflowExecution() {
logger.info('onBlockCompleted received:', { data })
activeBlocksSet.delete(data.blockId)
// Create a new Set to trigger React re-render
setActiveBlocks(new Set(activeBlocksSet))
// Track successful block execution in run path
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
// Edges already tracked in onBlockStarted, no need to track again
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
@@ -1071,53 +1056,6 @@ export function useWorkflowExecution() {
},
logs: accumulatedBlockLogs,
}
-// Add trigger block to executed blocks so downstream blocks can use run-from-block
-if (data.success && startBlockId) {
-executedBlockIds.add(startBlockId)
-}
-if (data.success && activeWorkflowId) {
-if (stopAfterBlockId) {
-const existingSnapshot = getLastExecutionSnapshot(activeWorkflowId)
-const mergedBlockStates = {
-...(existingSnapshot?.blockStates || {}),
-...Object.fromEntries(accumulatedBlockStates),
-}
-const mergedExecutedBlocks = new Set([
-...(existingSnapshot?.executedBlocks || []),
-...executedBlockIds,
-])
-const snapshot: SerializableExecutionState = {
-blockStates: mergedBlockStates,
-executedBlocks: Array.from(mergedExecutedBlocks),
-blockLogs: [...(existingSnapshot?.blockLogs || []), ...accumulatedBlockLogs],
-decisions: existingSnapshot?.decisions || { router: {}, condition: {} },
-completedLoops: existingSnapshot?.completedLoops || [],
-activeExecutionPath: Array.from(mergedExecutedBlocks),
-}
-setLastExecutionSnapshot(activeWorkflowId, snapshot)
-logger.info('Merged execution snapshot after run-until-block', {
-workflowId: activeWorkflowId,
-newBlocksExecuted: executedBlockIds.size,
-totalExecutedBlocks: mergedExecutedBlocks.size,
-})
-} else {
-const snapshot: SerializableExecutionState = {
-blockStates: Object.fromEntries(accumulatedBlockStates),
-executedBlocks: Array.from(executedBlockIds),
-blockLogs: accumulatedBlockLogs,
-decisions: { router: {}, condition: {} },
-completedLoops: [],
-activeExecutionPath: Array.from(executedBlockIds),
-}
-setLastExecutionSnapshot(activeWorkflowId, snapshot)
-logger.info('Stored execution snapshot for run-from-block', {
-workflowId: activeWorkflowId,
-executedBlocksCount: executedBlockIds.size,
-})
-}
-}
},
onExecutionError: (data) => {
@@ -1438,330 +1376,6 @@ export function useWorkflowExecution() {
setActiveBlocks,
])
/**
* Handles running workflow from a specific block using cached outputs
*/
const handleRunFromBlock = useCallback(
async (blockId: string, workflowId: string) => {
const snapshot = getLastExecutionSnapshot(workflowId)
const workflowEdges = useWorkflowStore.getState().edges
const incomingEdges = workflowEdges.filter((edge) => edge.target === blockId)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = workflowEdges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
if (!snapshot && !isTriggerBlock) {
logger.error('No execution snapshot available for run-from-block', { workflowId, blockId })
return
}
const dependenciesSatisfied =
isTriggerBlock || incomingEdges.every((edge) => isSourceSatisfied(edge.source))
if (!dependenciesSatisfied) {
logger.error('Upstream dependencies not satisfied for run-from-block', {
workflowId,
blockId,
})
return
}
// For trigger blocks, always use empty snapshot to prevent stale data from different
// execution paths from being resolved. For non-trigger blocks, use the existing snapshot.
const emptySnapshot: SerializableExecutionState = {
blockStates: {},
executedBlocks: [],
blockLogs: [],
decisions: { router: {}, condition: {} },
completedLoops: [],
activeExecutionPath: [],
}
const effectiveSnapshot: SerializableExecutionState = isTriggerBlock
? emptySnapshot
: snapshot || emptySnapshot
// Extract mock payload for trigger blocks
let workflowInput: any
if (isTriggerBlock) {
const workflowBlocks = useWorkflowStore.getState().blocks
const mergedStates = mergeSubblockState(workflowBlocks, workflowId)
const candidates = resolveStartCandidates(mergedStates, { execution: 'manual' })
const candidate = candidates.find((c) => c.blockId === blockId)
if (candidate) {
if (triggerNeedsMockPayload(candidate)) {
workflowInput = extractTriggerMockPayload(candidate)
} else if (
candidate.path === StartBlockPath.SPLIT_API ||
candidate.path === StartBlockPath.SPLIT_INPUT ||
candidate.path === StartBlockPath.UNIFIED
) {
const inputFormatValue = candidate.block.subBlocks?.inputFormat?.value
if (Array.isArray(inputFormatValue)) {
const testInput: Record<string, any> = {}
inputFormatValue.forEach((field: any) => {
if (field && typeof field === 'object' && field.name && field.value !== undefined) {
testInput[field.name] = coerceValue(field.type, field.value)
}
})
if (Object.keys(testInput).length > 0) {
workflowInput = testInput
}
}
}
} else {
// Fallback: block is trigger by position but not classified as start candidate
const block = mergedStates[blockId]
if (block) {
const blockConfig = getBlock(block.type)
const hasTriggers = blockConfig?.triggers?.available?.length
if (hasTriggers || block.triggerMode) {
workflowInput = extractTriggerMockPayload({
blockId,
block,
path: StartBlockPath.EXTERNAL_TRIGGER,
})
}
}
}
}
setIsExecuting(true)
const executionId = uuidv4()
const accumulatedBlockLogs: BlockLog[] = []
const accumulatedBlockStates = new Map<string, BlockState>()
const executedBlockIds = new Set<string>()
const activeBlocksSet = new Set<string>()
try {
await executionStream.executeFromBlock({
workflowId,
startBlockId: blockId,
sourceSnapshot: effectiveSnapshot,
input: workflowInput,
callbacks: {
onBlockStarted: (data) => {
activeBlocksSet.add(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
const incomingEdges = workflowEdges.filter((edge) => edge.target === data.blockId)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
},
onBlockCompleted: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'success')
executedBlockIds.add(data.blockId)
accumulatedBlockStates.set(data.blockId, {
output: data.output,
executed: true,
executionTime: data.durationMs,
})
const isContainerBlock = data.blockType === 'loop' || data.blockType === 'parallel'
if (isContainerBlock) return
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onBlockError: (data) => {
activeBlocksSet.delete(data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
addConsole({
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
workflowId,
blockId: data.blockId,
executionId,
blockName: data.blockName,
blockType: data.blockType,
iterationCurrent: data.iterationCurrent,
iterationTotal: data.iterationTotal,
iterationType: data.iterationType,
})
},
onExecutionCompleted: (data) => {
if (data.success) {
// Add the start block (trigger) to executed blocks
executedBlockIds.add(blockId)
const mergedBlockStates: Record<string, BlockState> = {
...effectiveSnapshot.blockStates,
}
for (const [bId, state] of accumulatedBlockStates) {
mergedBlockStates[bId] = state
}
const mergedExecutedBlocks = new Set([
...effectiveSnapshot.executedBlocks,
...executedBlockIds,
])
const updatedSnapshot: SerializableExecutionState = {
...effectiveSnapshot,
blockStates: mergedBlockStates,
executedBlocks: Array.from(mergedExecutedBlocks),
blockLogs: [...effectiveSnapshot.blockLogs, ...accumulatedBlockLogs],
activeExecutionPath: Array.from(mergedExecutedBlocks),
}
setLastExecutionSnapshot(workflowId, updatedSnapshot)
}
},
onExecutionError: (data) => {
const isWorkflowModified =
data.error?.includes('Block not found in workflow') ||
data.error?.includes('Upstream dependency not executed')
if (isWorkflowModified) {
clearLastExecutionSnapshot(workflowId)
addNotification({
level: 'error',
message:
'Workflow was modified. Run the workflow again to enable running from block.',
workflowId,
})
} else {
addNotification({
level: 'error',
message: data.error || 'Run from block failed',
workflowId,
})
}
},
},
})
} catch (error) {
if ((error as Error).name !== 'AbortError') {
logger.error('Run-from-block failed:', error)
}
} finally {
setIsExecuting(false)
setActiveBlocks(new Set())
}
},
[
getLastExecutionSnapshot,
setLastExecutionSnapshot,
clearLastExecutionSnapshot,
setIsExecuting,
setActiveBlocks,
setBlockRunStatus,
setEdgeRunStatus,
addNotification,
addConsole,
executionStream,
]
)
/**
* Handles running workflow until a specific block (stops after that block completes)
*/
const handleRunUntilBlock = useCallback(
async (blockId: string, workflowId: string) => {
if (!workflowId || workflowId !== activeWorkflowId) {
logger.error('Invalid workflow ID for run-until-block', { workflowId, activeWorkflowId })
return
}
logger.info('Starting run-until-block execution', { workflowId, stopAfterBlockId: blockId })
setExecutionResult(null)
setIsExecuting(true)
const executionId = uuidv4()
try {
const result = await executeWorkflow(
undefined,
undefined,
executionId,
undefined,
'manual',
blockId
)
if (result && 'success' in result) {
setExecutionResult(result)
}
} catch (error) {
const errorResult = handleExecutionError(error, { executionId })
return errorResult
} finally {
setIsExecuting(false)
setIsDebugging(false)
setActiveBlocks(new Set())
}
},
[activeWorkflowId, setExecutionResult, setIsExecuting, setIsDebugging, setActiveBlocks]
)
return {
isExecuting,
isDebugging,
@@ -1772,7 +1386,5 @@ export function useWorkflowExecution() {
handleResumeDebug,
handleCancelDebug,
handleCancelExecution,
- handleRunFromBlock,
- handleRunUntilBlock,
}
}

View File

@@ -21,6 +21,7 @@ export interface AutoLayoutOptions {
x?: number
y?: number
}
gridSize?: number
}
/**
@@ -62,6 +63,7 @@ export async function applyAutoLayoutAndUpdateStore(
x: options.padding?.x ?? DEFAULT_LAYOUT_PADDING.x,
y: options.padding?.y ?? DEFAULT_LAYOUT_PADDING.y,
},
gridSize: options.gridSize,
}
// Call the autolayout API route

View File

@@ -47,7 +47,6 @@ import {
useCurrentWorkflow,
useNodeUtilities,
useShiftSelectionLock,
- useWorkflowExecution,
} from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks'
import {
calculateContainerDimensions,
@@ -303,8 +302,6 @@ const WorkflowContent = React.memo(() => {
const showTrainingModal = useCopilotTrainingStore((state) => state.showModal)
- const { handleRunFromBlock, handleRunUntilBlock } = useWorkflowExecution()
+ const snapToGridSize = useSnapToGridSize()
+ const snapToGrid = snapToGridSize > 0
@@ -736,16 +733,13 @@ const WorkflowContent = React.memo(() => {
[collaborativeBatchAddBlocks, setSelectedEdges, setPendingSelection]
)
- const { activeBlockIds, pendingBlocks, isDebugging, isExecuting, getLastExecutionSnapshot } =
- useExecutionStore(
- useShallow((state) => ({
- activeBlockIds: state.activeBlockIds,
- pendingBlocks: state.pendingBlocks,
- isDebugging: state.isDebugging,
- isExecuting: state.isExecuting,
- getLastExecutionSnapshot: state.getLastExecutionSnapshot,
- }))
- )
+ const { activeBlockIds, pendingBlocks, isDebugging } = useExecutionStore(
+ useShallow((state) => ({
+ activeBlockIds: state.activeBlockIds,
+ pendingBlocks: state.pendingBlocks,
+ isDebugging: state.isDebugging,
+ }))
+ )
const [dragStartParentId, setDragStartParentId] = useState<string | null>(null)
@@ -1108,50 +1102,6 @@ const WorkflowContent = React.memo(() => {
}
}, [contextMenuBlocks])
const handleContextRunFromBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunFromBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunFromBlock])
const handleContextRunUntilBlock = useCallback(() => {
if (contextMenuBlocks.length !== 1) return
const blockId = contextMenuBlocks[0].id
handleRunUntilBlock(blockId, workflowIdParam)
}, [contextMenuBlocks, workflowIdParam, handleRunUntilBlock])
const runFromBlockState = useMemo(() => {
if (contextMenuBlocks.length !== 1) {
return { canRun: false, reason: undefined }
}
const block = contextMenuBlocks[0]
const snapshot = getLastExecutionSnapshot(workflowIdParam)
const incomingEdges = edges.filter((edge) => edge.target === block.id)
const isTriggerBlock = incomingEdges.length === 0
// Check if each source block is either executed OR is a trigger block (triggers don't need prior execution)
const isSourceSatisfied = (sourceId: string) => {
if (snapshot?.executedBlocks.includes(sourceId)) return true
// Check if source is a trigger (has no incoming edges itself)
const sourceIncomingEdges = edges.filter((edge) => edge.target === sourceId)
return sourceIncomingEdges.length === 0
}
// Non-trigger blocks need a snapshot to exist (so upstream outputs are available)
const dependenciesSatisfied =
isTriggerBlock || (snapshot && incomingEdges.every((edge) => isSourceSatisfied(edge.source)))
const isNoteBlock = block.type === 'note'
const isInsideSubflow =
block.parentId && (block.parentType === 'loop' || block.parentType === 'parallel')
if (isInsideSubflow) return { canRun: false, reason: 'Cannot run from inside subflow' }
if (!dependenciesSatisfied) return { canRun: false, reason: 'Run upstream blocks first' }
if (isNoteBlock) return { canRun: false, reason: undefined }
if (isExecuting) return { canRun: false, reason: undefined }
return { canRun: true, reason: undefined }
}, [contextMenuBlocks, edges, workflowIdParam, getLastExecutionSnapshot, isExecuting])
const handleContextAddBlock = useCallback(() => {
useSearchModalStore.getState().open()
}, [])
@@ -3468,19 +3418,11 @@ const WorkflowContent = React.memo(() => {
onRemoveFromSubflow={handleContextRemoveFromSubflow}
onOpenEditor={handleContextOpenEditor}
onRename={handleContextRename}
- onRunFromBlock={handleContextRunFromBlock}
- onRunUntilBlock={handleContextRunUntilBlock}
hasClipboard={hasClipboard()}
showRemoveFromSubflow={contextMenuBlocks.some(
(b) => b.parentId && (b.parentType === 'loop' || b.parentType === 'parallel')
)}
- canRunFromBlock={runFromBlockState.canRun}
disableEdit={!effectivePermissions.canEdit}
isExecuting={isExecuting}
- isPositionalTrigger={
- contextMenuBlocks.length === 1 &&
- edges.filter((e) => e.target === contextMenuBlocks[0]?.id).length === 0
- }
/>
<CanvasMenu

View File

@@ -33,15 +33,6 @@ export interface DAG {
parallelConfigs: Map<string, SerializedParallel>
}
export interface DAGBuildOptions {
/** Trigger block ID to start path construction from */
triggerBlockId?: string
/** Saved incoming edges from snapshot for resumption */
savedIncomingEdges?: Record<string, string[]>
/** Include all enabled blocks instead of only those reachable from trigger */
includeAllBlocks?: boolean
}
export class DAGBuilder {
private pathConstructor = new PathConstructor()
private loopConstructor = new LoopConstructor()
@@ -49,9 +40,11 @@ export class DAGBuilder {
private nodeConstructor = new NodeConstructor()
private edgeConstructor = new EdgeConstructor()
- build(workflow: SerializedWorkflow, options: DAGBuildOptions = {}): DAG {
- const { triggerBlockId, savedIncomingEdges, includeAllBlocks } = options
+ build(
+ workflow: SerializedWorkflow,
+ triggerBlockId?: string,
+ savedIncomingEdges?: Record<string, string[]>
+ ): DAG {
const dag: DAG = {
nodes: new Map(),
loopConfigs: new Map(),
@@ -60,7 +53,7 @@ export class DAGBuilder {
this.initializeConfigs(workflow, dag)
- const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId, includeAllBlocks)
+ const reachableBlocks = this.pathConstructor.execute(workflow, triggerBlockId)
this.loopConstructor.execute(dag, reachableBlocks)
this.parallelConstructor.execute(dag, reachableBlocks)

View File

@@ -6,16 +6,7 @@ import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
const logger = createLogger('PathConstructor')
export class PathConstructor {
- execute(
- workflow: SerializedWorkflow,
- triggerBlockId?: string,
- includeAllBlocks?: boolean
- ): Set<string> {
- // For run-from-block mode, include all enabled blocks regardless of trigger reachability
- if (includeAllBlocks) {
- return this.getAllEnabledBlocks(workflow)
- }
+ execute(workflow: SerializedWorkflow, triggerBlockId?: string): Set<string> {
const resolvedTriggerId = this.findTriggerBlock(workflow, triggerBlockId)
if (!resolvedTriggerId) {

View File

@@ -28,6 +28,7 @@ import type {
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
import { isJSONString } from '@/executor/utils/json'
import { filterOutputForLog } from '@/executor/utils/output-filter'
import { validateBlockType } from '@/executor/utils/permission-check'
import type { VariableResolver } from '@/executor/variables/resolver'
@@ -86,7 +87,7 @@ export class BlockExecutor {
resolvedInputs = this.resolver.resolveInputs(ctx, node.id, block.config.params, block)
if (blockLog) {
- blockLog.input = resolvedInputs
+ blockLog.input = this.parseJsonInputs(resolvedInputs)
}
} catch (error) {
cleanupSelfReference?.()
@@ -157,7 +158,14 @@ export class BlockExecutor {
const displayOutput = filterOutputForLog(block.metadata?.id || '', normalizedOutput, {
block,
})
- this.callOnBlockComplete(ctx, node, block, resolvedInputs, displayOutput, duration)
+ this.callOnBlockComplete(
+ ctx,
+ node,
+ block,
+ this.parseJsonInputs(resolvedInputs),
+ displayOutput,
+ duration
+ )
}
return normalizedOutput
@@ -233,7 +241,7 @@ export class BlockExecutor {
blockLog.durationMs = duration
blockLog.success = false
blockLog.error = errorMessage
- blockLog.input = input
+ blockLog.input = this.parseJsonInputs(input)
blockLog.output = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
}
@@ -248,7 +256,14 @@ export class BlockExecutor {
if (!isSentinel) {
const displayOutput = filterOutputForLog(block.metadata?.id || '', errorOutput, { block })
- this.callOnBlockComplete(ctx, node, block, input, displayOutput, duration)
+ this.callOnBlockComplete(
+ ctx,
+ node,
+ block,
+ this.parseJsonInputs(input),
+ displayOutput,
+ duration
+ )
}
const hasErrorPort = this.hasErrorPortEdge(node)
@@ -336,6 +351,36 @@ export class BlockExecutor {
return { result: output }
}
/**
* Parse JSON string inputs to objects for log display only.
* Attempts to parse any string that looks like JSON.
* Returns a new object - does not mutate the original inputs.
*/
private parseJsonInputs(inputs: Record<string, any>): Record<string, any> {
let result = inputs
let hasChanges = false
for (const [key, value] of Object.entries(inputs)) {
// isJSONString is a quick heuristic (checks for { or [), not a validator.
// Invalid JSON is safely caught below - this just avoids JSON.parse on every string.
if (typeof value !== 'string' || !isJSONString(value)) {
continue
}
try {
if (!hasChanges) {
result = { ...inputs }
hasChanges = true
}
result[key] = JSON.parse(value.trim())
} catch {
// Not valid JSON, keep original string
}
}
return result
}
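
A minimal sketch of the copy-on-write behavior above, using hypothetical input values (parseJsonInputs is private, so this is illustrative rather than a real call site):

// const raw = { body: '{"user":"ada","retries":3}', note: 'hello', bad: '{oops' }
// const shown = parseJsonInputs(raw)
// shown.body -> { user: 'ada', retries: 3 }  (JSON-like string, parsed)
// shown.note -> 'hello'                      (fails the isJSONString heuristic, skipped)
// shown.bad  -> '{oops'                      (passes the heuristic, fails JSON.parse, original kept)
// raw itself is never mutated; the spread copy is made only once a value actually parses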
private callOnBlockStart(ctx: ExecutionContext, node: DAGNode, block: SerializedBlock): void {
const blockId = node.id
const blockName = block.metadata?.name ?? blockId

View File

@@ -26,7 +26,6 @@ export class ExecutionEngine {
private allowResumeTriggers: boolean
private cancelledFlag = false
private errorFlag = false
- private stoppedEarlyFlag = false
private executionError: Error | null = null
private lastCancellationCheck = 0
private readonly useRedisCancellation: boolean
@@ -106,7 +105,7 @@ export class ExecutionEngine {
this.initializeQueue(triggerBlockId)
while (this.hasWork()) {
- if ((await this.checkCancellation()) || this.errorFlag || this.stoppedEarlyFlag) {
+ if ((await this.checkCancellation()) || this.errorFlag) {
break
}
await this.processQueue()
@@ -260,16 +259,6 @@ export class ExecutionEngine {
}
private initializeQueue(triggerBlockId?: string): void {
- if (this.context.runFromBlockContext) {
- const { startBlockId } = this.context.runFromBlockContext
- logger.info('Initializing queue for run-from-block mode', {
- startBlockId,
- dirtySetSize: this.context.runFromBlockContext.dirtySet.size,
- })
- this.addToQueue(startBlockId)
- return
- }
const pendingBlocks = this.context.metadata.pendingBlocks
const remainingEdges = (this.context.metadata as any).remainingEdges
@@ -396,17 +385,6 @@ export class ExecutionEngine {
this.finalOutput = output
}
- if (this.context.stopAfterBlockId === nodeId) {
- // For loop/parallel sentinels, only stop if the subflow has fully exited (all iterations done)
- // shouldContinue: true means more iterations, shouldExit: true means loop is done
- const shouldContinueLoop = output.shouldContinue === true
- if (!shouldContinueLoop) {
- logger.info('Stopping execution after target block', { nodeId })
- this.stoppedEarlyFlag = true
- return
- }
- }
const readyNodes = this.edgeManager.processOutgoingEdges(node, output, false)
logger.info('Processing outgoing edges', {

View File

@@ -5,31 +5,17 @@ import { BlockExecutor } from '@/executor/execution/block-executor'
import { EdgeManager } from '@/executor/execution/edge-manager'
import { ExecutionEngine } from '@/executor/execution/engine'
import { ExecutionState } from '@/executor/execution/state'
- import type {
- ContextExtensions,
- SerializableExecutionState,
- WorkflowInput,
- } from '@/executor/execution/types'
+ import type { ContextExtensions, WorkflowInput } from '@/executor/execution/types'
import { createBlockHandlers } from '@/executor/handlers/registry'
import { LoopOrchestrator } from '@/executor/orchestrators/loop'
import { NodeExecutionOrchestrator } from '@/executor/orchestrators/node'
import { ParallelOrchestrator } from '@/executor/orchestrators/parallel'
import type { BlockState, ExecutionContext, ExecutionResult } from '@/executor/types'
- import {
- computeExecutionSets,
- type RunFromBlockContext,
- resolveContainerToSentinelStart,
- validateRunFromBlock,
- } from '@/executor/utils/run-from-block'
import {
buildResolutionFromBlock,
buildStartBlockOutput,
resolveExecutorStartBlock,
} from '@/executor/utils/start-block'
- import {
- extractLoopIdFromSentinel,
- extractParallelIdFromSentinel,
- } from '@/executor/utils/subflow-utils'
import { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedWorkflow } from '@/serializer/types'
@@ -62,10 +48,7 @@ export class DAGExecutor {
async execute(workflowId: string, triggerBlockId?: string): Promise<ExecutionResult> {
const savedIncomingEdges = this.contextExtensions.dagIncomingEdges
const dag = this.dagBuilder.build(this.workflow, {
triggerBlockId,
savedIncomingEdges,
})
const dag = this.dagBuilder.build(this.workflow, triggerBlockId, savedIncomingEdges)
const { context, state } = this.createExecutionContext(workflowId, triggerBlockId)
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
@@ -106,151 +89,17 @@ export class DAGExecutor {
}
}
/**
* Execute from a specific block using cached outputs for upstream blocks.
*/
async executeFromBlock(
workflowId: string,
startBlockId: string,
sourceSnapshot: SerializableExecutionState
): Promise<ExecutionResult> {
// Build full DAG with all blocks to compute upstream set for snapshot filtering
// includeAllBlocks is needed because the startBlockId might be a trigger not reachable from the main trigger
const dag = this.dagBuilder.build(this.workflow, { includeAllBlocks: true })
const executedBlocks = new Set(sourceSnapshot.executedBlocks)
const validation = validateRunFromBlock(startBlockId, dag, executedBlocks)
if (!validation.valid) {
throw new Error(validation.error)
}
const { dirtySet, upstreamSet } = computeExecutionSets(dag, startBlockId)
const effectiveStartBlockId = resolveContainerToSentinelStart(startBlockId, dag) ?? startBlockId
// Extract container IDs from sentinel IDs in upstream set
const upstreamContainerIds = new Set<string>()
for (const nodeId of upstreamSet) {
const loopId = extractLoopIdFromSentinel(nodeId)
if (loopId) upstreamContainerIds.add(loopId)
const parallelId = extractParallelIdFromSentinel(nodeId)
if (parallelId) upstreamContainerIds.add(parallelId)
}
// Filter snapshot to only include upstream blocks - prevents references to non-upstream blocks
const filteredBlockStates: Record<string, any> = {}
for (const [blockId, state] of Object.entries(sourceSnapshot.blockStates)) {
if (upstreamSet.has(blockId) || upstreamContainerIds.has(blockId)) {
filteredBlockStates[blockId] = state
}
}
const filteredExecutedBlocks = sourceSnapshot.executedBlocks.filter(
(id) => upstreamSet.has(id) || upstreamContainerIds.has(id)
)
// Filter loop/parallel executions to only include upstream containers
const filteredLoopExecutions: Record<string, any> = {}
if (sourceSnapshot.loopExecutions) {
for (const [loopId, execution] of Object.entries(sourceSnapshot.loopExecutions)) {
if (upstreamContainerIds.has(loopId)) {
filteredLoopExecutions[loopId] = execution
}
}
}
const filteredParallelExecutions: Record<string, any> = {}
if (sourceSnapshot.parallelExecutions) {
for (const [parallelId, execution] of Object.entries(sourceSnapshot.parallelExecutions)) {
if (upstreamContainerIds.has(parallelId)) {
filteredParallelExecutions[parallelId] = execution
}
}
}
const filteredSnapshot: SerializableExecutionState = {
...sourceSnapshot,
blockStates: filteredBlockStates,
executedBlocks: filteredExecutedBlocks,
loopExecutions: filteredLoopExecutions,
parallelExecutions: filteredParallelExecutions,
}
logger.info('Executing from block', {
workflowId,
startBlockId,
effectiveStartBlockId,
dirtySetSize: dirtySet.size,
upstreamSetSize: upstreamSet.size,
})
// Remove incoming edges from non-dirty sources so convergent blocks don't wait for cached upstream
for (const nodeId of dirtySet) {
const node = dag.nodes.get(nodeId)
if (!node) continue
const nonDirtyIncoming: string[] = []
for (const sourceId of node.incomingEdges) {
if (!dirtySet.has(sourceId)) {
nonDirtyIncoming.push(sourceId)
}
}
for (const sourceId of nonDirtyIncoming) {
node.incomingEdges.delete(sourceId)
}
}
const runFromBlockContext = { startBlockId: effectiveStartBlockId, dirtySet }
const { context, state } = this.createExecutionContext(workflowId, undefined, {
snapshotState: filteredSnapshot,
runFromBlockContext,
})
const resolver = new VariableResolver(this.workflow, this.workflowVariables, state)
const loopOrchestrator = new LoopOrchestrator(dag, state, resolver)
loopOrchestrator.setContextExtensions(this.contextExtensions)
const parallelOrchestrator = new ParallelOrchestrator(dag, state)
parallelOrchestrator.setResolver(resolver)
parallelOrchestrator.setContextExtensions(this.contextExtensions)
const allHandlers = createBlockHandlers()
const blockExecutor = new BlockExecutor(allHandlers, resolver, this.contextExtensions, state)
const edgeManager = new EdgeManager(dag)
loopOrchestrator.setEdgeManager(edgeManager)
const nodeOrchestrator = new NodeExecutionOrchestrator(
dag,
state,
blockExecutor,
loopOrchestrator,
parallelOrchestrator
)
const engine = new ExecutionEngine(context, dag, edgeManager, nodeOrchestrator)
return await engine.run()
}
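
To make the edge pruning above concrete: with edges A → C and B → C, running from A yields dirtySet = { A, C, … }, so C's incoming edge from the non-dirty B is deleted and C becomes ready as soon as A finishes, reading B's value from the filtered snapshot. A small trace with hypothetical IDs:

// dirtySet = Set { 'A', 'C' }                 (B stays cached, not re-run)
// before: dag.nodes.get('C').incomingEdges -> Set { 'A', 'B' }
// after:  dag.nodes.get('C').incomingEdges -> Set { 'A' }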
private createExecutionContext(
workflowId: string,
- triggerBlockId?: string,
- overrides?: {
- snapshotState?: SerializableExecutionState
- runFromBlockContext?: RunFromBlockContext
- }
+ triggerBlockId?: string
): { context: ExecutionContext; state: ExecutionState } {
- const snapshotState = overrides?.snapshotState ?? this.contextExtensions.snapshotState
+ const snapshotState = this.contextExtensions.snapshotState
const blockStates = snapshotState?.blockStates
? new Map(Object.entries(snapshotState.blockStates))
: new Map<string, BlockState>()
- let executedBlocks = snapshotState?.executedBlocks
+ const executedBlocks = snapshotState?.executedBlocks
? new Set(snapshotState.executedBlocks)
: new Set<string>()
- if (overrides?.runFromBlockContext) {
- const { dirtySet } = overrides.runFromBlockContext
- executedBlocks = new Set([...executedBlocks].filter((id) => !dirtySet.has(id)))
- logger.info('Cleared executed status for dirty blocks', {
- dirtySetSize: dirtySet.size,
- remainingExecutedBlocks: executedBlocks.size,
- })
- }
const state = new ExecutionState(blockStates, executedBlocks)
const context: ExecutionContext = {
@@ -260,7 +109,7 @@ export class DAGExecutor {
userId: this.contextExtensions.userId,
isDeployedContext: this.contextExtensions.isDeployedContext,
blockStates: state.getBlockStates(),
- blockLogs: overrides?.runFromBlockContext ? [] : (snapshotState?.blockLogs ?? []),
+ blockLogs: snapshotState?.blockLogs ?? [],
metadata: {
...this.contextExtensions.metadata,
startTime: new Date().toISOString(),
@@ -320,8 +169,6 @@ export class DAGExecutor {
abortSignal: this.contextExtensions.abortSignal,
includeFileBase64: this.contextExtensions.includeFileBase64,
base64MaxBytes: this.contextExtensions.base64MaxBytes,
- runFromBlockContext: overrides?.runFromBlockContext,
- stopAfterBlockId: this.contextExtensions.stopAfterBlockId,
}
if (this.contextExtensions.resumeFromSnapshot) {
@@ -346,15 +193,6 @@ export class DAGExecutor {
pendingBlocks: context.metadata.pendingBlocks,
skipStarterBlockInit: true,
})
- } else if (overrides?.runFromBlockContext) {
- // In run-from-block mode, initialize the start block only if it's a regular block
- // Skip for sentinels/containers (loop/parallel) which aren't real blocks
- const startBlockId = overrides.runFromBlockContext.startBlockId
- const isRegularBlock = this.workflow.blocks.some((b) => b.id === startBlockId)
- if (isRegularBlock) {
- this.initializeStarterBlock(context, state, startBlockId)
- }
} else {
this.initializeStarterBlock(context, state, triggerBlockId)
}

View File

@@ -1,6 +1,5 @@
import type { Edge } from 'reactflow'
import type { BlockLog, BlockState, NormalizedBlockOutput } from '@/executor/types'
- import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
export interface ExecutionMetadata {
@@ -106,17 +105,6 @@ export interface ContextExtensions {
output: { input?: any; output: NormalizedBlockOutput; executionTime: number },
iterationContext?: IterationContext
) => Promise<void>
- /**
- * Run-from-block configuration. When provided, executor runs in partial
- * execution mode starting from the specified block.
- */
- runFromBlockContext?: RunFromBlockContext
- /**
- * Stop execution after this block completes. Used for "run until block" feature.
- */
- stopAfterBlockId?: string
}
export interface WorkflowInput {

View File

@@ -936,8 +936,12 @@ export class AgentBlockHandler implements BlockHandler {
systemPrompt: validMessages ? undefined : inputs.systemPrompt,
context: validMessages ? undefined : stringifyJSON(messages),
tools: formattedTools,
- temperature: inputs.temperature,
- maxTokens: inputs.maxTokens,
+ temperature:
+ inputs.temperature != null && inputs.temperature !== ''
+ ? Number(inputs.temperature)
+ : undefined,
+ maxTokens:
+ inputs.maxTokens != null && inputs.maxTokens !== '' ? Number(inputs.maxTokens) : undefined,
apiKey: inputs.apiKey,
azureEndpoint: inputs.azureEndpoint,
azureApiVersion: inputs.azureApiVersion,
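
The empty-string guard above matters because of JavaScript's coercion rules: Number('') evaluates to 0, so a cleared temperature field passed straight through Number() would silently become temperature: 0 instead of being omitted. A quick reference:

// Number('')        -> 0    (why '' must be filtered out before coercion)
// Number('0.7')     -> 0.7
// Number(undefined) -> NaN  (the != null check already rejects null and undefined)

This pairs with the AgentInputs change below, which retypes temperature and maxTokens as raw strings and leaves numeric coercion to this handler.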

View File

@@ -14,8 +14,8 @@ export interface AgentInputs {
slidingWindowSize?: string // For message-based sliding window
slidingWindowTokens?: string // For token-based sliding window
// LLM parameters
- temperature?: number
- maxTokens?: number
+ temperature?: string
+ maxTokens?: string
apiKey?: string
azureEndpoint?: string
azureApiVersion?: string

View File

@@ -276,16 +276,7 @@ export class LoopOrchestrator {
scope: LoopScope
): LoopContinuationResult {
const results = scope.allIterationOutputs
- const output = { results }
- this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME)
- // Emit onBlockComplete for the loop container so the UI can track it
- if (this.contextExtensions?.onBlockComplete) {
- this.contextExtensions.onBlockComplete(loopId, 'Loop', 'loop', {
- output,
- executionTime: DEFAULTS.EXECUTION_TIME,
- })
- }
+ this.state.setBlockOutput(loopId, { results }, DEFAULTS.EXECUTION_TIME)
return {
shouldContinue: false,

View File

@@ -31,18 +31,7 @@ export class NodeExecutionOrchestrator {
throw new Error(`Node not found in DAG: ${nodeId}`)
}
- if (ctx.runFromBlockContext && !ctx.runFromBlockContext.dirtySet.has(nodeId)) {
- const cachedOutput = this.state.getBlockOutput(nodeId) || {}
- logger.debug('Skipping non-dirty block in run-from-block mode', { nodeId })
- return {
- nodeId,
- output: cachedOutput,
- isFinalOutput: false,
- }
- }
- const isDirtyBlock = ctx.runFromBlockContext?.dirtySet.has(nodeId) ?? false
- if (!isDirtyBlock && this.state.hasExecuted(nodeId)) {
+ if (this.state.hasExecuted(nodeId)) {
const output = this.state.getBlockOutput(nodeId) || {}
return {
nodeId,

View File

@@ -228,17 +228,9 @@ export class ParallelOrchestrator {
const branchOutputs = scope.branchOutputs.get(i) || []
results.push(branchOutputs)
}
- const output = { results }
- this.state.setBlockOutput(parallelId, output)
- // Emit onBlockComplete for the parallel container so the UI can track it
- if (this.contextExtensions?.onBlockComplete) {
- this.contextExtensions.onBlockComplete(parallelId, 'Parallel', 'parallel', {
- output,
- executionTime: 0,
- })
- }
+ this.state.setBlockOutput(parallelId, {
+ results,
+ })
return {
allBranchesComplete: true,
results,

View File

@@ -1,7 +1,6 @@
import type { TraceSpan } from '@/lib/logs/types'
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import type { BlockOutput } from '@/blocks/types'
- import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SerializedBlock, SerializedWorkflow } from '@/serializer/types'
export interface UserFile {
@@ -251,17 +250,6 @@ export interface ExecutionContext {
* will not have their base64 content fetched.
*/
base64MaxBytes?: number
- /**
- * Context for "run from block" mode. When present, only blocks in dirtySet
- * will be executed; others return cached outputs from the source snapshot.
- */
- runFromBlockContext?: RunFromBlockContext
- /**
- * Stop execution after this block completes. Used for "run until block" feature.
- */
- stopAfterBlockId?: string
}
export interface ExecutionResult {

View File

@@ -1,610 +0,0 @@
import { describe, expect, it } from 'vitest'
import type { DAG, DAGNode } from '@/executor/dag/builder'
import type { DAGEdge, NodeMetadata } from '@/executor/dag/types'
import { computeExecutionSets, validateRunFromBlock } from '@/executor/utils/run-from-block'
import type { SerializedLoop, SerializedParallel } from '@/serializer/types'
/**
* Helper to extract dirty set from computeExecutionSets
*/
function computeDirtySet(dag: DAG, startBlockId: string): Set<string> {
return computeExecutionSets(dag, startBlockId).dirtySet
}
/**
* Helper to create a DAG node for testing
*/
function createNode(
id: string,
outgoingEdges: Array<{ target: string; sourceHandle?: string }> = [],
metadata: Partial<NodeMetadata> = {}
): DAGNode {
const edges = new Map<string, DAGEdge>()
for (const edge of outgoingEdges) {
edges.set(edge.target, { target: edge.target, sourceHandle: edge.sourceHandle })
}
return {
id,
block: {
id,
position: { x: 0, y: 0 },
config: { tool: 'test', params: {} },
inputs: {},
outputs: {},
metadata: { id: 'test', name: `block-${id}`, category: 'tools' },
enabled: true,
},
incomingEdges: new Set<string>(),
outgoingEdges: edges,
metadata: {
isParallelBranch: false,
isLoopNode: false,
isSentinel: false,
...metadata,
},
}
}
/**
* Helper to create a DAG for testing
*/
function createDAG(nodes: DAGNode[]): DAG {
const nodeMap = new Map<string, DAGNode>()
for (const node of nodes) {
nodeMap.set(node.id, node)
}
// Set up incoming edges based on outgoing edges
for (const node of nodes) {
for (const [, edge] of node.outgoingEdges) {
const targetNode = nodeMap.get(edge.target)
if (targetNode) {
targetNode.incomingEdges.add(node.id)
}
}
}
return {
nodes: nodeMap,
loopConfigs: new Map<string, SerializedLoop>(),
parallelConfigs: new Map<string, SerializedParallel>(),
}
}
describe('computeDirtySet', () => {
it('includes start block in dirty set', () => {
const dag = createDAG([createNode('A'), createNode('B'), createNode('C')])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('B')).toBe(true)
})
it('includes all downstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles branching paths', () => {
// A → B → C
// ↓
// D → E
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }, { target: 'D' }]),
createNode('C'),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('handles convergence points', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
// Run from A: should include A, C, D (but not B)
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(3)
})
it('handles diamond pattern', () => {
// B
// ↗ ↘
// A D
// ↘ ↗
// C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(4)
})
it('stops at graph boundaries', () => {
// A → B C → D (disconnected)
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B'),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(false)
expect(dirtySet.size).toBe(2)
})
it('handles single node workflow', () => {
const dag = createDAG([createNode('A')])
const dirtySet = computeDirtySet(dag, 'A')
expect(dirtySet.has('A')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('handles node not in DAG gracefully', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const dirtySet = computeDirtySet(dag, 'nonexistent')
// Should just contain the start block ID even if not found
expect(dirtySet.has('nonexistent')).toBe(true)
expect(dirtySet.size).toBe(1)
})
it('includes convergent block when running from one branch of parallel', () => {
// Parallel branches converging:
// A → B → D
// A → C → D
// Running from B should include B and D (but not A or C)
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'B')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles running from convergent block itself (all upstream non-dirty)', () => {
// A → C
// B → C
// Running from C should only include C
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.size).toBe(2)
})
it('handles deep downstream chains', () => {
// A → B → C → D → E → F
// Running from C should include C, D, E, F
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E', [{ target: 'F' }]),
createNode('F'),
])
const dirtySet = computeDirtySet(dag, 'C')
expect(dirtySet.has('A')).toBe(false)
expect(dirtySet.has('B')).toBe(false)
expect(dirtySet.has('C')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
expect(dirtySet.has('F')).toBe(true)
expect(dirtySet.size).toBe(4)
})
})
describe('validateRunFromBlock', () => {
it('accepts valid block', () => {
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
expect(result.error).toBeUndefined()
})
it('rejects block not found in DAG', () => {
const dag = createDAG([createNode('A')])
const executedBlocks = new Set(['A', 'B'])
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Block not found')
})
it('rejects blocks inside loops', () => {
const dag = createDAG([createNode('A', [], { isLoopNode: true, loopId: 'loop-1' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside loop')
expect(result.error).toContain('loop-1')
})
it('rejects blocks inside parallels', () => {
const dag = createDAG([
createNode('A', [], { isParallelBranch: true, parallelId: 'parallel-1' }),
])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('inside parallel')
expect(result.error).toContain('parallel-1')
})
it('rejects sentinel nodes', () => {
const dag = createDAG([createNode('A', [], { isSentinel: true, sentinelType: 'start' })])
const executedBlocks = new Set(['A'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('sentinel')
})
it('rejects blocks with unexecuted upstream dependencies', () => {
// A → B, only A executed but B depends on A
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const executedBlocks = new Set<string>() // A was not executed
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(false)
expect(result.error).toContain('Upstream dependency not executed')
})
it('allows running from block when immediate predecessor was executed (ignores transitive)', () => {
// A → X → B → C, where X is new (not executed)
// Running from C is allowed because B (immediate predecessor) was executed
// C will use B's cached output - doesn't matter that X is new
const dag = createDAG([
createNode('A', [{ target: 'X' }]),
createNode('X', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C'),
])
const executedBlocks = new Set(['A', 'B', 'C']) // X was not executed (new block)
const result = validateRunFromBlock('C', dag, executedBlocks)
// Valid because C's immediate predecessor B was executed
expect(result.valid).toBe(true)
})
it('allows blocks with no dependencies even if not previously executed', () => {
// A and B are independent (no edges)
const dag = createDAG([createNode('A'), createNode('B')])
const executedBlocks = new Set(['A']) // B was not executed but has no deps
const result = validateRunFromBlock('B', dag, executedBlocks)
expect(result.valid).toBe(true) // B has no incoming edges, so it's valid
})
it('accepts regular executed block', () => {
const dag = createDAG([
createNode('trigger', [{ target: 'A' }]),
createNode('A', [{ target: 'B' }]),
createNode('B'),
])
const executedBlocks = new Set(['trigger', 'A', 'B'])
const result = validateRunFromBlock('A', dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts loop container when executed', () => {
// Loop container with sentinel nodes
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set(['A', loopId, sentinelStartId, 'B', sentinelEndId, 'C'])
const result = validateRunFromBlock(loopId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('accepts parallel container when executed', () => {
// Parallel container with sentinel nodes
const parallelId = 'parallel-container-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const executedBlocks = new Set(['A', parallelId, sentinelStartId, 'B₍0₎', sentinelEndId, 'C'])
const result = validateRunFromBlock(parallelId, dag, executedBlocks)
expect(result.valid).toBe(true)
})
it('allows loop container with no upstream dependencies', () => {
// Loop containers are validated via their sentinel nodes, not incoming edges on the container itself
// If the loop has no upstream dependencies, it should be valid
const loopId = 'loop-container-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const dag = createDAG([
createNode(sentinelStartId, [], { isSentinel: true, sentinelType: 'start', loopId }),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: [], iterations: 3, loopType: 'for' } as any)
const executedBlocks = new Set<string>() // Nothing executed but loop has no deps
const result = validateRunFromBlock(loopId, dag, executedBlocks)
// Loop container validation doesn't check incoming edges (containers don't have nodes in dag.nodes)
// So this is valid - the loop can start fresh
expect(result.valid).toBe(true)
})
})
describe('computeDirtySet with containers', () => {
it('includes loop container and all downstream when running from loop', () => {
// A → loop-sentinel-start → B (inside loop) → loop-sentinel-end → C
const loopId = 'loop-1'
const sentinelStartId = `loop-${loopId}-sentinel-start`
const sentinelEndId = `loop-${loopId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B' }], {
isSentinel: true,
sentinelType: 'start',
loopId,
}),
createNode('B', [{ target: sentinelEndId }], { isLoopNode: true, loopId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
loopId,
}),
createNode('C'),
])
dag.loopConfigs.set(loopId, { id: loopId, nodes: ['B'], iterations: 3, loopType: 'for' } as any)
const dirtySet = computeDirtySet(dag, loopId)
// Should include loop container, sentinel-start, B, sentinel-end, C
expect(dirtySet.has(loopId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
it('includes parallel container and all downstream when running from parallel', () => {
// A → parallel-sentinel-start → B₍0₎ → parallel-sentinel-end → C
const parallelId = 'parallel-1'
const sentinelStartId = `parallel-${parallelId}-sentinel-start`
const sentinelEndId = `parallel-${parallelId}-sentinel-end`
const dag = createDAG([
createNode('A', [{ target: sentinelStartId }]),
createNode(sentinelStartId, [{ target: 'B₍0₎' }], {
isSentinel: true,
sentinelType: 'start',
parallelId,
}),
createNode('B₍0₎', [{ target: sentinelEndId }], { isParallelBranch: true, parallelId }),
createNode(sentinelEndId, [{ target: 'C' }], {
isSentinel: true,
sentinelType: 'end',
parallelId,
}),
createNode('C'),
])
dag.parallelConfigs.set(parallelId, { id: parallelId, nodes: ['B'], count: 2 } as any)
const dirtySet = computeDirtySet(dag, parallelId)
// Should include parallel container, sentinel-start, B₍0₎, sentinel-end, C
expect(dirtySet.has(parallelId)).toBe(true)
expect(dirtySet.has(sentinelStartId)).toBe(true)
expect(dirtySet.has('B₍0₎')).toBe(true)
expect(dirtySet.has(sentinelEndId)).toBe(true)
expect(dirtySet.has('C')).toBe(true)
// Should NOT include A (upstream)
expect(dirtySet.has('A')).toBe(false)
})
})
describe('computeExecutionSets upstream set', () => {
it('includes all upstream blocks in linear workflow', () => {
// A → B → C → D
const dag = createDAG([
createNode('A', [{ target: 'B' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet } = computeExecutionSets(dag, 'C')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(false) // start block not in upstream
expect(upstreamSet.has('D')).toBe(false) // downstream
})
it('includes all branches in convergent upstream', () => {
// A → C
// B → C → D
const dag = createDAG([
createNode('A', [{ target: 'C' }]),
createNode('B', [{ target: 'C' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet } = computeExecutionSets(dag, 'C')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(false)
expect(upstreamSet.has('D')).toBe(false)
})
it('excludes parallel branches not in upstream path', () => {
// A → B → D
// A → C → D
// Running from B: upstream is A only, not C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D'),
])
const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'B')
// Upstream should only contain A
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('C')).toBe(false) // parallel branch, not upstream of B
// Dirty should contain B and D
expect(dirtySet.has('B')).toBe(true)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('C')).toBe(false)
})
it('handles diamond pattern upstream correctly', () => {
// B
// ↗ ↘
// A D → E
// ↘ ↗
// C
// Running from D: upstream should be A, B, C
const dag = createDAG([
createNode('A', [{ target: 'B' }, { target: 'C' }]),
createNode('B', [{ target: 'D' }]),
createNode('C', [{ target: 'D' }]),
createNode('D', [{ target: 'E' }]),
createNode('E'),
])
const { upstreamSet, dirtySet } = computeExecutionSets(dag, 'D')
expect(upstreamSet.has('A')).toBe(true)
expect(upstreamSet.has('B')).toBe(true)
expect(upstreamSet.has('C')).toBe(true)
expect(upstreamSet.has('D')).toBe(false)
expect(dirtySet.has('D')).toBe(true)
expect(dirtySet.has('E')).toBe(true)
})
it('returns empty upstream set for root block', () => {
const dag = createDAG([createNode('A', [{ target: 'B' }]), createNode('B')])
const { upstreamSet } = computeExecutionSets(dag, 'A')
expect(upstreamSet.size).toBe(0)
})
})

View File

@@ -1,193 +0,0 @@
import { LOOP, PARALLEL } from '@/executor/constants'
import type { DAG } from '@/executor/dag/builder'
/**
* Builds the sentinel-start node ID for a loop.
*/
function buildLoopSentinelStartId(loopId: string): string {
return `${LOOP.SENTINEL.PREFIX}${loopId}${LOOP.SENTINEL.START_SUFFIX}`
}
/**
* Builds the sentinel-start node ID for a parallel.
*/
function buildParallelSentinelStartId(parallelId: string): string {
return `${PARALLEL.SENTINEL.PREFIX}${parallelId}${PARALLEL.SENTINEL.START_SUFFIX}`
}
/**
* Checks if a block ID is a loop or parallel container and returns the sentinel-start ID if so.
* Returns null if the block is not a container.
*/
export function resolveContainerToSentinelStart(blockId: string, dag: DAG): string | null {
if (dag.loopConfigs.has(blockId)) {
return buildLoopSentinelStartId(blockId)
}
if (dag.parallelConfigs.has(blockId)) {
return buildParallelSentinelStartId(blockId)
}
return null
}
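
Assuming the LOOP and PARALLEL sentinel constants expand to the same IDs the deleted tests above construct (loop-<id>-sentinel-start, parallel-<id>-sentinel-start), resolution looks like:

// dag.loopConfigs.has('loop-1') === true
// resolveContainerToSentinelStart('loop-1', dag)        -> 'loop-loop-1-sentinel-start'
// resolveContainerToSentinelStart('regular-block', dag) -> null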
/**
* Result of validating a block for run-from-block execution.
*/
export interface RunFromBlockValidation {
valid: boolean
error?: string
}
/**
* Context for run-from-block execution mode.
*/
export interface RunFromBlockContext {
/** The block ID to start execution from */
startBlockId: string
/** Set of block IDs that need re-execution (start block + all downstream) */
dirtySet: Set<string>
}
/**
* Result of computing execution sets for run-from-block mode.
*/
export interface ExecutionSets {
/** Blocks that need re-execution (start block + all downstream) */
dirtySet: Set<string>
/** Blocks that are upstream (ancestors) of the start block */
upstreamSet: Set<string>
}
/**
* Computes both the dirty set (downstream) and upstream set in a single traversal pass.
* - Dirty set: start block + all blocks reachable via outgoing edges (need re-execution)
* - Upstream set: all blocks reachable via incoming edges (can be referenced)
*
* For loop/parallel containers, starts from the sentinel-start node and includes
* the container ID itself in the dirty set.
*
* @param dag - The workflow DAG
* @param startBlockId - The block to start execution from
* @returns Object containing both dirtySet and upstreamSet
*/
export function computeExecutionSets(dag: DAG, startBlockId: string): ExecutionSets {
const dirty = new Set<string>([startBlockId])
const upstream = new Set<string>()
const sentinelStartId = resolveContainerToSentinelStart(startBlockId, dag)
const traversalStartId = sentinelStartId ?? startBlockId
if (sentinelStartId) {
dirty.add(sentinelStartId)
}
// BFS downstream for dirty set
const downstreamQueue = [traversalStartId]
while (downstreamQueue.length > 0) {
const nodeId = downstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const [, edge] of node.outgoingEdges) {
if (!dirty.has(edge.target)) {
dirty.add(edge.target)
downstreamQueue.push(edge.target)
}
}
}
// BFS upstream for upstream set
const upstreamQueue = [traversalStartId]
while (upstreamQueue.length > 0) {
const nodeId = upstreamQueue.shift()!
const node = dag.nodes.get(nodeId)
if (!node) continue
for (const sourceId of node.incomingEdges) {
if (!upstream.has(sourceId)) {
upstream.add(sourceId)
upstreamQueue.push(sourceId)
}
}
}
return { dirtySet: dirty, upstreamSet: upstream }
}
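
Reading the two traversals together on a simple chain A → B → C with start block B (mirroring the deleted tests above):

// computeExecutionSets(dag, 'B')
//   dirtySet    -> Set { 'B', 'C' }  (start block plus everything downstream)
//   upstreamSet -> Set { 'A' }       (ancestors only; the start block is excluded)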
/**
* Validates that a block can be used as a run-from-block starting point.
*
* Validation rules:
* - Block must exist in the DAG (or be a loop/parallel container)
* - Block cannot be inside a loop (but loop containers are allowed)
* - Block cannot be inside a parallel (but parallel containers are allowed)
* - Block cannot be a sentinel node
* - All upstream dependencies must have been executed (have cached outputs)
*
* @param blockId - The block ID to validate
* @param dag - The workflow DAG
* @param executedBlocks - Set of blocks that were executed in the source run
* @returns Validation result with error message if invalid
*/
export function validateRunFromBlock(
blockId: string,
dag: DAG,
executedBlocks: Set<string>
): RunFromBlockValidation {
const node = dag.nodes.get(blockId)
const isLoopContainer = dag.loopConfigs.has(blockId)
const isParallelContainer = dag.parallelConfigs.has(blockId)
const isContainer = isLoopContainer || isParallelContainer
if (!node && !isContainer) {
return { valid: false, error: `Block not found in workflow: ${blockId}` }
}
if (isContainer) {
const sentinelStartId = resolveContainerToSentinelStart(blockId, dag)
if (!sentinelStartId || !dag.nodes.has(sentinelStartId)) {
return {
valid: false,
error: `Container sentinel not found for: ${blockId}`,
}
}
}
if (node) {
if (node.metadata.isLoopNode) {
return {
valid: false,
error: `Cannot run from block inside loop: ${node.metadata.loopId}`,
}
}
if (node.metadata.isParallelBranch) {
return {
valid: false,
error: `Cannot run from block inside parallel: ${node.metadata.parallelId}`,
}
}
if (node.metadata.isSentinel) {
return { valid: false, error: 'Cannot run from sentinel node' }
}
// Check immediate upstream dependencies were executed
for (const sourceId of node.incomingEdges) {
const sourceNode = dag.nodes.get(sourceId)
// Skip sentinel nodes - they're internal and not in executedBlocks
if (sourceNode?.metadata.isSentinel) continue
// Skip trigger nodes - they're entry points and don't need prior execution
// A trigger node has no incoming edges
if (sourceNode && sourceNode.incomingEdges.size === 0) continue
if (!executedBlocks.has(sourceId)) {
return {
valid: false,
error: `Upstream dependency not executed: ${sourceId}`,
}
}
}
}
return { valid: true }
}

View File

@@ -1,85 +1,10 @@
import { useCallback, useRef } from 'react'
import { createLogger } from '@sim/logger'
import type { ExecutionEvent } from '@/lib/workflows/executor/execution-events'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { SubflowType } from '@/stores/workflows/workflow/types'
const logger = createLogger('useExecutionStream')
/**
* Processes SSE events from a response body and invokes appropriate callbacks.
*/
async function processSSEStream(
reader: ReadableStreamDefaultReader<Uint8Array>,
callbacks: ExecutionStreamCallbacks,
logPrefix: string
): Promise<void> {
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) continue
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info(`${logPrefix} stream completed`)
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
switch (event.type) {
case 'execution:started':
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
}
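
For context, the split on '\n\n' reflects standard Server-Sent Events framing: each event is a data: line terminated by a blank line. A sketch of the wire format this loop consumes, with hypothetical payloads:

data: {"type":"block:started","executionId":"abc123","data":{"blockId":"b1"}}

data: {"type":"block:completed","executionId":"abc123","data":{"blockId":"b1","output":{}}}

data: [DONE]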
export interface ExecutionStreamCallbacks {
onExecutionStarted?: (data: { startTime: string }) => void
onExecutionCompleted?: (data: {
@@ -143,15 +68,6 @@ export interface ExecuteStreamOptions {
loops?: Record<string, any>
parallels?: Record<string, any>
}
- stopAfterBlockId?: string
callbacks?: ExecutionStreamCallbacks
}
- export interface ExecuteFromBlockOptions {
- workflowId: string
- startBlockId: string
- sourceSnapshot: SerializableExecutionState
- input?: any
- callbacks?: ExecutionStreamCallbacks
- }
@@ -203,7 +119,91 @@ export function useExecutionStream() {
}
const reader = response.body.getReader()
- await processSSEStream(reader, callbacks, 'Execution')
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
break
}
buffer += decoder.decode(value, { stream: true })
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (!line.trim() || !line.startsWith('data: ')) {
continue
}
const data = line.substring(6).trim()
if (data === '[DONE]') {
logger.info('Stream completed')
continue
}
try {
const event = JSON.parse(data) as ExecutionEvent
logger.info('📡 SSE Event received:', {
type: event.type,
executionId: event.executionId,
data: event.data,
})
switch (event.type) {
case 'execution:started':
logger.info('🚀 Execution started')
callbacks.onExecutionStarted?.(event.data)
break
case 'execution:completed':
logger.info('✅ Execution completed')
callbacks.onExecutionCompleted?.(event.data)
break
case 'execution:error':
logger.error('❌ Execution error')
callbacks.onExecutionError?.(event.data)
break
case 'execution:cancelled':
logger.warn('🛑 Execution cancelled')
callbacks.onExecutionCancelled?.(event.data)
break
case 'block:started':
logger.info('🔷 Block started:', event.data.blockId)
callbacks.onBlockStarted?.(event.data)
break
case 'block:completed':
logger.info('✓ Block completed:', event.data.blockId)
callbacks.onBlockCompleted?.(event.data)
break
case 'block:error':
logger.error('✗ Block error:', event.data.blockId)
callbacks.onBlockError?.(event.data)
break
case 'stream:chunk':
callbacks.onStreamChunk?.(event.data)
break
case 'stream:done':
logger.info('Stream done:', event.data.blockId)
callbacks.onStreamDone?.(event.data)
break
default:
logger.warn('Unknown event type:', (event as any).type)
}
} catch (error) {
logger.error('Failed to parse SSE event:', error, { data })
}
}
}
} finally {
reader.releaseLock()
}
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Execution stream cancelled')
@@ -222,70 +222,6 @@ export function useExecutionStream() {
}
}, [])
const executeFromBlock = useCallback(async (options: ExecuteFromBlockOptions) => {
const { workflowId, startBlockId, sourceSnapshot, input, callbacks = {} } = options
if (abortControllerRef.current) {
abortControllerRef.current.abort()
}
const abortController = new AbortController()
abortControllerRef.current = abortController
currentExecutionRef.current = null
try {
const response = await fetch(`/api/workflows/${workflowId}/execute-from-block`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ startBlockId, sourceSnapshot, input }),
signal: abortController.signal,
})
if (!response.ok) {
let errorResponse: any
try {
errorResponse = await response.json()
} catch {
throw new Error(`Server error (${response.status}): ${response.statusText}`)
}
const error = new Error(errorResponse.error || 'Failed to start execution')
if (errorResponse && typeof errorResponse === 'object') {
Object.assign(error, { executionResult: errorResponse })
}
throw error
}
if (!response.body) {
throw new Error('No response body')
}
const executionId = response.headers.get('X-Execution-Id')
if (executionId) {
currentExecutionRef.current = { workflowId, executionId }
}
const reader = response.body.getReader()
await processSSEStream(reader, callbacks, 'Run-from-block')
} catch (error: any) {
if (error.name === 'AbortError') {
logger.info('Run-from-block execution cancelled')
callbacks.onExecutionCancelled?.({ duration: 0 })
} else {
logger.error('Run-from-block execution error:', error)
callbacks.onExecutionError?.({
error: error.message || 'Unknown error',
duration: 0,
})
}
throw error
} finally {
abortControllerRef.current = null
currentExecutionRef.current = null
}
}, [])
const cancel = useCallback(() => {
const execution = currentExecutionRef.current
if (execution) {
@@ -303,7 +239,6 @@ export function useExecutionStream() {
return {
execute,
- executeFromBlock,
cancel,
}
}
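
A hedged sketch of wiring the hook from a component; the option names follow ExecuteStreamOptions above, and the workflowId field plus the component context are assumptions:

// const { execute, cancel } = useExecutionStream()
// await execute({
//   workflowId: 'wf-123',
//   callbacks: {
//     onBlockCompleted: (data) => logger.info('Block done', data.blockId),
//     onExecutionError: (data) => logger.error(data.error),
//   },
// })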

View File

@@ -8,7 +8,7 @@ const ivm = require('isolated-vm')
const USER_CODE_START_LINE = 4
const pendingFetches = new Map()
let fetchIdCounter = 0
- const FETCH_TIMEOUT_MS = 30000
+ const FETCH_TIMEOUT_MS = 300000 // 5 minutes
/**
* Extract line and column from error stack or message

View File

@@ -34,6 +34,7 @@ export function layoutContainers(
: DEFAULT_CONTAINER_HORIZONTAL_SPACING,
verticalSpacing: options.verticalSpacing ?? DEFAULT_VERTICAL_SPACING,
padding: { x: CONTAINER_PADDING_X, y: CONTAINER_PADDING_Y },
gridSize: options.gridSize,
}
for (const [parentId, childIds] of children.entries()) {
@@ -56,18 +57,15 @@ export function layoutContainers(
continue
}
// Use the shared core layout function with container options
const { nodes, dimensions } = layoutBlocksCore(childBlocks, childEdges, {
isContainer: true,
layoutOptions: containerOptions,
})
// Apply positions back to blocks
for (const node of nodes.values()) {
blocks[node.id].position = node.position
}
// Update container dimensions
const calculatedWidth = dimensions.width
const calculatedHeight = dimensions.height

View File

@@ -9,6 +9,7 @@ import {
getBlockMetrics,
normalizePositions,
prepareBlockMetrics,
snapNodesToGrid,
} from '@/lib/workflows/autolayout/utils'
import { BLOCK_DIMENSIONS, HANDLE_POSITIONS } from '@/lib/workflows/blocks/block-dimensions'
import { EDGE } from '@/executor/constants'
@@ -84,7 +85,6 @@ export function assignLayers(
): Map<string, GraphNode> {
const nodes = new Map<string, GraphNode>()
// Initialize nodes
for (const [id, block] of Object.entries(blocks)) {
nodes.set(id, {
id,
@@ -97,7 +97,6 @@ export function assignLayers(
})
}
// Build a map of target node -> edges coming into it (to check sourceHandle later)
const incomingEdgesMap = new Map<string, Edge[]>()
for (const edge of edges) {
if (!incomingEdgesMap.has(edge.target)) {
@@ -106,7 +105,6 @@ export function assignLayers(
incomingEdgesMap.get(edge.target)!.push(edge)
}
// Build adjacency from edges
for (const edge of edges) {
const sourceNode = nodes.get(edge.source)
const targetNode = nodes.get(edge.target)
@@ -117,7 +115,6 @@ export function assignLayers(
}
}
// Find starter nodes (no incoming edges)
const starterNodes = Array.from(nodes.values()).filter((node) => node.incoming.size === 0)
if (starterNodes.length === 0 && nodes.size > 0) {
@@ -126,7 +123,6 @@ export function assignLayers(
logger.warn('No starter blocks found, using first block as starter', { blockId: firstNode.id })
}
// Topological sort using Kahn's algorithm
const inDegreeCount = new Map<string, number>()
for (const node of nodes.values()) {
@@ -144,8 +140,6 @@ export function assignLayers(
const node = nodes.get(nodeId)!
processed.add(nodeId)
// Calculate layer based on max incoming layer + 1
// For edges from subflow ends, add the subflow's internal depth (minus 1 to avoid double-counting)
if (node.incoming.size > 0) {
let maxEffectiveLayer = -1
const incomingEdges = incomingEdgesMap.get(nodeId) || []
@@ -153,16 +147,11 @@ export function assignLayers(
for (const incomingId of node.incoming) {
const incomingNode = nodes.get(incomingId)
if (incomingNode) {
// Find edges from this incoming node to check if it's a subflow end edge
const edgesFromSource = incomingEdges.filter((e) => e.source === incomingId)
let additionalDepth = 0
// Check if any edge from this source is a subflow end edge
const hasSubflowEndEdge = edgesFromSource.some(isSubflowEndEdge)
if (hasSubflowEndEdge && subflowDepths) {
// Get the internal depth of the subflow
// Subtract 1 because the +1 at the end of layer calculation already accounts for one layer
// E.g., if subflow has 2 internal layers (depth=2), we add 1 extra so total offset is 2
const depth = subflowDepths.get(incomingId) ?? 1
additionalDepth = Math.max(0, depth - 1)
}
@@ -174,7 +163,6 @@ export function assignLayers(
node.layer = maxEffectiveLayer + 1
}
// Add outgoing nodes when all dependencies processed
for (const targetId of node.outgoing) {
const currentCount = inDegreeCount.get(targetId) || 0
inDegreeCount.set(targetId, currentCount - 1)
@@ -185,7 +173,6 @@ export function assignLayers(
}
}
// Handle isolated nodes
for (const node of nodes.values()) {
if (!processed.has(node.id)) {
logger.debug('Isolated node detected, assigning to layer 0', { blockId: node.id })
@@ -224,7 +211,6 @@ function resolveVerticalOverlaps(nodes: GraphNode[], verticalSpacing: number): v
hasOverlap = false
iteration++
// Group nodes by layer for same-layer overlap resolution
const nodesByLayer = new Map<number, GraphNode[]>()
for (const node of nodes) {
if (!nodesByLayer.has(node.layer)) {
@@ -233,11 +219,9 @@ function resolveVerticalOverlaps(nodes: GraphNode[], verticalSpacing: number): v
nodesByLayer.get(node.layer)!.push(node)
}
// Process each layer independently
for (const [layer, layerNodes] of nodesByLayer) {
if (layerNodes.length < 2) continue
// Sort by Y position for consistent processing
layerNodes.sort((a, b) => a.position.y - b.position.y)
for (let i = 0; i < layerNodes.length - 1; i++) {
@@ -302,7 +286,6 @@ export function calculatePositions(
const layerNumbers = Array.from(layers.keys()).sort((a, b) => a - b)
// Calculate max width for each layer
const layerWidths = new Map<number, number>()
for (const layerNum of layerNumbers) {
const nodesInLayer = layers.get(layerNum)!
@@ -310,7 +293,6 @@ export function calculatePositions(
layerWidths.set(layerNum, maxWidth)
}
// Calculate cumulative X positions for each layer based on actual widths
const layerXPositions = new Map<number, number>()
let cumulativeX = padding.x
@@ -319,7 +301,6 @@ export function calculatePositions(
cumulativeX += layerWidths.get(layerNum)! + horizontalSpacing
}
// Build a flat map of all nodes for quick lookups
const allNodes = new Map<string, GraphNode>()
for (const nodesInLayer of layers.values()) {
for (const node of nodesInLayer) {
@@ -327,7 +308,6 @@ export function calculatePositions(
}
}
// Build incoming edges map for handle lookups
const incomingEdgesMap = new Map<string, Edge[]>()
for (const edge of edges) {
if (!incomingEdgesMap.has(edge.target)) {
@@ -336,20 +316,16 @@ export function calculatePositions(
incomingEdgesMap.get(edge.target)!.push(edge)
}
// Position nodes layer by layer, aligning with connected predecessors
for (const layerNum of layerNumbers) {
const nodesInLayer = layers.get(layerNum)!
const xPosition = layerXPositions.get(layerNum)!
// Separate containers and non-containers
const containersInLayer = nodesInLayer.filter(isContainerBlock)
const nonContainersInLayer = nodesInLayer.filter((n) => !isContainerBlock(n))
// For the first layer (layer 0), position sequentially from padding.y
if (layerNum === 0) {
let yOffset = padding.y
// Sort containers by height for visual balance
containersInLayer.sort((a, b) => b.metrics.height - a.metrics.height)
for (const node of containersInLayer) {
@@ -361,7 +337,6 @@ export function calculatePositions(
yOffset += CONTAINER_VERTICAL_CLEARANCE
}
// Sort non-containers by outgoing connections
nonContainersInLayer.sort((a, b) => b.outgoing.size - a.outgoing.size)
for (const node of nonContainersInLayer) {
@@ -371,9 +346,7 @@ export function calculatePositions(
continue
}
// For subsequent layers, align with connected predecessors (handle-to-handle)
for (const node of [...containersInLayer, ...nonContainersInLayer]) {
// Find the bottommost predecessor handle Y (highest value) and align to it
let bestSourceHandleY = -1
let bestEdge: Edge | null = null
const incomingEdges = incomingEdgesMap.get(node.id) || []
@@ -381,7 +354,6 @@ export function calculatePositions(
for (const edge of incomingEdges) {
const predecessor = allNodes.get(edge.source)
if (predecessor) {
// Calculate actual source handle Y position based on block type and handle
const sourceHandleOffset = getSourceHandleYOffset(predecessor.block, edge.sourceHandle)
const sourceHandleY = predecessor.position.y + sourceHandleOffset
@@ -392,20 +364,16 @@ export function calculatePositions(
}
}
// If no predecessors found (shouldn't happen for layer > 0), use padding
if (bestSourceHandleY < 0) {
bestSourceHandleY = padding.y + HANDLE_POSITIONS.DEFAULT_Y_OFFSET
}
// Calculate the target handle Y offset for this node
const targetHandleOffset = getTargetHandleYOffset(node.block, bestEdge?.targetHandle)
// Position node so its target handle aligns with the source handle Y
node.position = { x: xPosition, y: bestSourceHandleY - targetHandleOffset }
}
}
// Resolve vertical overlaps within layers (X overlaps prevented by cumulative positioning)
resolveVerticalOverlaps(Array.from(layers.values()).flat(), verticalSpacing)
}
@@ -435,7 +403,7 @@ export function layoutBlocksCore(
return { nodes: new Map(), dimensions: { width: 0, height: 0 } }
}
const layoutOptions =
const layoutOptions: LayoutOptions =
options.layoutOptions ??
(options.isContainer ? CONTAINER_LAYOUT_OPTIONS : DEFAULT_LAYOUT_OPTIONS)
@@ -452,7 +420,13 @@ export function layoutBlocksCore(
calculatePositions(layers, edges, layoutOptions)
// 5. Normalize positions
const dimensions = normalizePositions(nodes, { isContainer: options.isContainer })
let dimensions = normalizePositions(nodes, { isContainer: options.isContainer })
// 6. Snap to grid if gridSize is specified (recalculates dimensions)
const snappedDimensions = snapNodesToGrid(nodes, layoutOptions.gridSize)
if (snappedDimensions) {
dimensions = snappedDimensions
}
return { nodes, dimensions }
}
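
Taken together, the numbered steps above now run: assign layers (Kahn's algorithm with subflow-depth offsets), position layer by layer, normalize, then snap. A minimal sketch of invoking the core function with snapping enabled; the core module's import path is assumed and the spacing values are illustrative:

```typescript
import type { Edge } from '@/lib/workflows/autolayout/types'
import { layoutBlocksCore } from '@/lib/workflows/autolayout/core' // assumed path
import type { BlockState } from '@/stores/workflows/workflow/types'

declare const blocks: Record<string, BlockState> // hypothetical graph input
declare const edges: Edge[]

const { nodes, dimensions } = layoutBlocksCore(blocks, edges, {
  isContainer: false,
  layoutOptions: { horizontalSpacing: 550, verticalSpacing: 200, gridSize: 50 },
})
// With gridSize set, step 6 re-snaps every node position to multiples of 50,
// and `dimensions` reflects the post-snap bounding box, not the raw one.
console.log(nodes.size, dimensions)
```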

View File

@@ -36,14 +36,13 @@ export function applyAutoLayout(
const horizontalSpacing = options.horizontalSpacing ?? DEFAULT_HORIZONTAL_SPACING
const verticalSpacing = options.verticalSpacing ?? DEFAULT_VERTICAL_SPACING
// Pre-calculate container dimensions by laying out their children (bottom-up)
// This ensures accurate widths/heights before root-level layout
prepareContainerDimensions(
blocksCopy,
edges,
layoutBlocksCore,
horizontalSpacing,
verticalSpacing
verticalSpacing,
options.gridSize
)
const { root: rootBlockIds } = getBlocksByParent(blocksCopy)
@@ -58,8 +57,6 @@ export function applyAutoLayout(
(edge) => layoutRootIds.includes(edge.source) && layoutRootIds.includes(edge.target)
)
// Calculate subflow depths before laying out root blocks
// This ensures blocks connected to subflow ends are positioned correctly
const subflowDepths = calculateSubflowDepths(blocksCopy, edges, assignLayers)
if (Object.keys(rootBlocks).length > 0) {
@@ -95,13 +92,12 @@ export function applyAutoLayout(
}
export type { TargetedLayoutOptions } from '@/lib/workflows/autolayout/targeted'
// Function exports
export { applyTargetedLayout } from '@/lib/workflows/autolayout/targeted'
// Type exports
export type { Edge, LayoutOptions, LayoutResult } from '@/lib/workflows/autolayout/types'
export {
getBlockMetrics,
isContainerType,
shouldSkipAutoLayout,
snapPositionToGrid,
transferBlockHeights,
} from '@/lib/workflows/autolayout/utils'
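
For orientation, a hedged sketch of the top-level entry point with the new option; applyAutoLayout's full parameter list is truncated in this compare, so the (blocks, edges, options) shape is an assumption:

```typescript
import { applyAutoLayout } from '@/lib/workflows/autolayout' // assumed entry point
import type { Edge } from '@/lib/workflows/autolayout/types'
import type { BlockState } from '@/stores/workflows/workflow/types'

declare const blocks: Record<string, BlockState> // from the workflow store in practice
declare const edges: Edge[]

applyAutoLayout(blocks, edges, {
  horizontalSpacing: 550,
  verticalSpacing: 200,
  gridSize: 50, // threaded through container prep, root layout, and targeted layout
})
```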

View File

@@ -1,4 +1,3 @@
import { createLogger } from '@sim/logger'
import {
CONTAINER_PADDING,
DEFAULT_HORIZONTAL_SPACING,
@@ -14,12 +13,11 @@ import {
isContainerType,
prepareContainerDimensions,
shouldSkipAutoLayout,
snapPositionToGrid,
} from '@/lib/workflows/autolayout/utils'
import { CONTAINER_DIMENSIONS } from '@/lib/workflows/blocks/block-dimensions'
import type { BlockState } from '@/stores/workflows/workflow/types'
const logger = createLogger('AutoLayout:Targeted')
export interface TargetedLayoutOptions extends LayoutOptions {
changedBlockIds: string[]
verticalSpacing?: number
@@ -39,6 +37,7 @@ export function applyTargetedLayout(
changedBlockIds,
verticalSpacing = DEFAULT_VERTICAL_SPACING,
horizontalSpacing = DEFAULT_HORIZONTAL_SPACING,
gridSize,
} = options
if (!changedBlockIds || changedBlockIds.length === 0) {
@@ -48,19 +47,17 @@ export function applyTargetedLayout(
const changedSet = new Set(changedBlockIds)
const blocksCopy: Record<string, BlockState> = JSON.parse(JSON.stringify(blocks))
// Pre-calculate container dimensions by laying out their children (bottom-up)
// This ensures accurate widths/heights before root-level layout
prepareContainerDimensions(
blocksCopy,
edges,
layoutBlocksCore,
horizontalSpacing,
verticalSpacing
verticalSpacing,
gridSize
)
const groups = getBlocksByParent(blocksCopy)
// Calculate subflow depths before layout to properly position blocks after subflow ends
const subflowDepths = calculateSubflowDepths(blocksCopy, edges, assignLayers)
layoutGroup(
@@ -71,7 +68,8 @@ export function applyTargetedLayout(
changedSet,
verticalSpacing,
horizontalSpacing,
subflowDepths
subflowDepths,
gridSize
)
for (const [parentId, childIds] of groups.children.entries()) {
@@ -83,7 +81,8 @@ export function applyTargetedLayout(
changedSet,
verticalSpacing,
horizontalSpacing,
subflowDepths
subflowDepths,
gridSize
)
}
@@ -101,7 +100,8 @@ function layoutGroup(
changedSet: Set<string>,
verticalSpacing: number,
horizontalSpacing: number,
subflowDepths: Map<string, number>
subflowDepths: Map<string, number>,
gridSize?: number
): void {
if (childIds.length === 0) return
@@ -116,7 +116,6 @@ function layoutGroup(
return
}
// Determine which blocks need repositioning
const requestedLayout = layoutEligibleChildIds.filter((id) => {
const block = blocks[id]
if (!block) return false
@@ -141,7 +140,6 @@ function layoutGroup(
return
}
// Store old positions for anchor calculation
const oldPositions = new Map<string, { x: number; y: number }>()
for (const id of layoutEligibleChildIds) {
const block = blocks[id]
@@ -149,8 +147,6 @@ function layoutGroup(
oldPositions.set(id, { ...block.position })
}
// Compute layout positions using core function
// Only pass subflowDepths for root-level layout (not inside containers)
const layoutPositions = computeLayoutPositions(
layoutEligibleChildIds,
blocks,
@@ -158,7 +154,8 @@ function layoutGroup(
parentBlock,
horizontalSpacing,
verticalSpacing,
parentId === null ? subflowDepths : undefined
parentId === null ? subflowDepths : undefined,
gridSize
)
if (layoutPositions.size === 0) {
@@ -168,7 +165,6 @@ function layoutGroup(
return
}
// Find anchor block (unchanged block with a layout position)
let offsetX = 0
let offsetY = 0
@@ -185,20 +181,16 @@ function layoutGroup(
}
}
// Apply new positions only to blocks that need layout
for (const id of needsLayout) {
const block = blocks[id]
const newPos = layoutPositions.get(id)
if (!block || !newPos) continue
block.position = {
x: newPos.x + offsetX,
y: newPos.y + offsetY,
}
block.position = snapPositionToGrid({ x: newPos.x + offsetX, y: newPos.y + offsetY }, gridSize)
}
}
/**
* Computes layout positions for a subset of blocks using the core layout
* Computes layout positions for a subset of blocks using the core layout function
*/
function computeLayoutPositions(
childIds: string[],
@@ -207,7 +199,8 @@ function computeLayoutPositions(
parentBlock: BlockState | undefined,
horizontalSpacing: number,
verticalSpacing: number,
subflowDepths?: Map<string, number>
subflowDepths?: Map<string, number>,
gridSize?: number
): Map<string, { x: number; y: number }> {
const subsetBlocks: Record<string, BlockState> = {}
for (const id of childIds) {
@@ -228,11 +221,11 @@ function computeLayoutPositions(
layoutOptions: {
horizontalSpacing: isContainer ? horizontalSpacing * 0.85 : horizontalSpacing,
verticalSpacing,
gridSize,
},
subflowDepths,
})
// Update parent container dimensions if applicable
if (parentBlock) {
parentBlock.data = {
...parentBlock.data,
@@ -241,7 +234,6 @@ function computeLayoutPositions(
}
}
// Convert nodes to position map
const positions = new Map<string, { x: number; y: number }>()
for (const node of nodes.values()) {
positions.set(node.id, { x: node.position.x, y: node.position.y })

View File

@@ -7,6 +7,7 @@ export interface LayoutOptions {
horizontalSpacing?: number
verticalSpacing?: number
padding?: { x: number; y: number }
gridSize?: number
}
export interface LayoutResult {

View File

@@ -18,6 +18,61 @@ function resolveNumeric(value: number | undefined, fallback: number): number {
return typeof value === 'number' && Number.isFinite(value) ? value : fallback
}
/**
* Snaps a single coordinate value to the nearest grid position
*/
function snapToGrid(value: number, gridSize: number): number {
return Math.round(value / gridSize) * gridSize
}
/**
* Snaps a position to the nearest grid point.
 * Returns the original position unless gridSize is a positive number.
*/
export function snapPositionToGrid(
position: { x: number; y: number },
gridSize: number | undefined
): { x: number; y: number } {
if (!gridSize || gridSize <= 0) {
return position
}
return {
x: snapToGrid(position.x, gridSize),
y: snapToGrid(position.y, gridSize),
}
}
/**
* Snaps all node positions in a graph to grid positions and returns updated dimensions.
 * Returns null if gridSize is not set or there are no nodes to snap.
*/
export function snapNodesToGrid(
nodes: Map<string, GraphNode>,
gridSize: number | undefined
): { width: number; height: number } | null {
if (!gridSize || gridSize <= 0 || nodes.size === 0) {
return null
}
let minX = Number.POSITIVE_INFINITY
let minY = Number.POSITIVE_INFINITY
let maxX = Number.NEGATIVE_INFINITY
let maxY = Number.NEGATIVE_INFINITY
for (const node of nodes.values()) {
node.position = snapPositionToGrid(node.position, gridSize)
minX = Math.min(minX, node.position.x)
minY = Math.min(minY, node.position.y)
maxX = Math.max(maxX, node.position.x + node.metrics.width)
maxY = Math.max(maxY, node.position.y + node.metrics.height)
}
return {
width: maxX - minX + CONTAINER_PADDING * 2,
height: maxY - minY + CONTAINER_PADDING * 2,
}
}
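
A quick check of the rounding rule these helpers implement (assuming both functions above are in scope):

```typescript
snapPositionToGrid({ x: 137, y: 130 }, 50)        // { x: 150, y: 150 } — both round up
snapPositionToGrid({ x: 124, y: 130 }, 50)        // { x: 100, y: 150 } — 124/50 = 2.48 rounds down
snapPositionToGrid({ x: 137, y: 130 }, 0)         // unchanged: gridSize 0 disables snapping
snapPositionToGrid({ x: 137, y: 130 }, undefined) // unchanged as well
```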
/**
* Checks if a block type is a container (loop or parallel)
*/
@@ -314,6 +369,7 @@ export type LayoutFunction = (
horizontalSpacing?: number
verticalSpacing?: number
padding?: { x: number; y: number }
gridSize?: number
}
subflowDepths?: Map<string, number>
}
@@ -329,13 +385,15 @@ export type LayoutFunction = (
* @param layoutFn - The layout function to use for calculating dimensions
* @param horizontalSpacing - Horizontal spacing between blocks
* @param verticalSpacing - Vertical spacing between blocks
* @param gridSize - Optional grid size for snap-to-grid
*/
export function prepareContainerDimensions(
blocks: Record<string, BlockState>,
edges: Edge[],
layoutFn: LayoutFunction,
horizontalSpacing: number,
verticalSpacing: number
verticalSpacing: number,
gridSize?: number
): void {
const { children } = getBlocksByParent(blocks)
@@ -402,6 +460,7 @@ export function prepareContainerDimensions(
layoutOptions: {
horizontalSpacing: horizontalSpacing * 0.85,
verticalSpacing,
gridSize,
},
})

View File

@@ -23,11 +23,9 @@ import type {
ContextExtensions,
ExecutionCallbacks,
IterationContext,
SerializableExecutionState,
} from '@/executor/execution/types'
import type { ExecutionResult, NormalizedBlockOutput } from '@/executor/types'
import { hasExecutionResult } from '@/executor/utils/errors'
import { buildParallelSentinelEndId, buildSentinelEndId } from '@/executor/utils/subflow-utils'
import { Serializer } from '@/serializer'
const logger = createLogger('ExecutionCore')
@@ -42,12 +40,6 @@ export interface ExecuteWorkflowCoreOptions {
abortSignal?: AbortSignal
includeFileBase64?: boolean
base64MaxBytes?: number
stopAfterBlockId?: string
/** Run-from-block mode: execute starting from a specific block using cached upstream outputs */
runFromBlock?: {
startBlockId: string
sourceSnapshot: SerializableExecutionState
}
}
function parseVariableValueByType(value: unknown, type: string): unknown {
@@ -122,8 +114,6 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId,
runFromBlock,
} = options
const { metadata, workflow, input, workflowVariables, selectedOutputs } = snapshot
const { requestId, workflowId, userId, triggerType, executionId, triggerBlockId, useDraftState } =
@@ -256,16 +246,6 @@ export async function executeWorkflowCore(
processedInput = input || {}
// Resolve stopAfterBlockId for loop/parallel containers to their sentinel-end IDs
let resolvedStopAfterBlockId = stopAfterBlockId
if (stopAfterBlockId) {
if (serializedWorkflow.loops?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildSentinelEndId(stopAfterBlockId)
} else if (serializedWorkflow.parallels?.[stopAfterBlockId]) {
resolvedStopAfterBlockId = buildParallelSentinelEndId(stopAfterBlockId)
}
}
// Create and execute workflow with callbacks
if (resumeFromSnapshot) {
logger.info(`[${requestId}] Resume execution detected`, {
@@ -316,7 +296,6 @@ export async function executeWorkflowCore(
abortSignal,
includeFileBase64,
base64MaxBytes,
stopAfterBlockId: resolvedStopAfterBlockId,
}
const executorInstance = new Executor({
@@ -339,13 +318,10 @@ export async function executeWorkflowCore(
}
}
const result = runFromBlock
? ((await executorInstance.executeFromBlock(
workflowId,
runFromBlock.startBlockId,
runFromBlock.sourceSnapshot
)) as ExecutionResult)
: ((await executorInstance.execute(workflowId, resolvedTriggerBlockId)) as ExecutionResult)
const result = (await executorInstance.execute(
workflowId,
resolvedTriggerBlockId
)) as ExecutionResult
// Build trace spans for logging from the full execution result
const { traceSpans, totalDuration } = buildTraceSpans(result)

View File

@@ -180,140 +180,3 @@ export function formatSSEEvent(event: ExecutionEvent): string {
export function encodeSSEEvent(event: ExecutionEvent): Uint8Array {
return new TextEncoder().encode(formatSSEEvent(event))
}
/**
* Options for creating SSE execution callbacks
*/
export interface SSECallbackOptions {
executionId: string
workflowId: string
controller: ReadableStreamDefaultController<Uint8Array>
isStreamClosed: () => boolean
setStreamClosed: () => void
}
/**
* Creates SSE callbacks for workflow execution streaming
*/
export function createSSECallbacks(options: SSECallbackOptions) {
const { executionId, workflowId, controller, isStreamClosed, setStreamClosed } = options
const sendEvent = (event: ExecutionEvent) => {
if (isStreamClosed()) return
try {
controller.enqueue(encodeSSEEvent(event))
} catch {
setStreamClosed()
}
}
const onBlockStart = async (
blockId: string,
blockName: string,
blockType: string,
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
sendEvent({
type: 'block:started',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
...(iterationContext && {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}),
},
})
}
const onBlockComplete = async (
blockId: string,
blockName: string,
blockType: string,
callbackData: { input?: unknown; output: any; executionTime: number },
iterationContext?: { iterationCurrent: number; iterationTotal: number; iterationType: string }
) => {
const hasError = callbackData.output?.error
const iterationData = iterationContext
? {
iterationCurrent: iterationContext.iterationCurrent,
iterationTotal: iterationContext.iterationTotal,
iterationType: iterationContext.iterationType as any,
}
: {}
if (hasError) {
sendEvent({
type: 'block:error',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
error: callbackData.output.error,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
} else {
sendEvent({
type: 'block:completed',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: {
blockId,
blockName,
blockType,
input: callbackData.input,
output: callbackData.output,
durationMs: callbackData.executionTime || 0,
...iterationData,
},
})
}
}
const onStream = async (streamingExecution: unknown) => {
const streamingExec = streamingExecution as { stream: ReadableStream; execution: any }
const blockId = streamingExec.execution?.blockId
const reader = streamingExec.stream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
sendEvent({
type: 'stream:chunk',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId, chunk },
})
}
sendEvent({
type: 'stream:done',
timestamp: new Date().toISOString(),
executionId,
workflowId,
data: { blockId },
})
} finally {
try {
reader.releaseLock()
} catch {}
}
}
return { sendEvent, onBlockStart, onBlockComplete, onStream }
}
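
With createSSECallbacks removed, the file keeps only the framing helpers. A minimal client-side counterpart, assuming formatSSEEvent emits the standard `data: <json>` framing (the framing itself is not shown in this hunk):

```typescript
// Sketch: split SSE frames and parse their data payloads.
function parseSSEFrames(buffer: string): unknown[] {
  const events: unknown[] = []
  for (const frame of buffer.split('\n\n')) {
    for (const line of frame.split('\n')) {
      if (!line.startsWith('data: ')) continue
      try {
        events.push(JSON.parse(line.slice('data: '.length)))
      } catch {
        // Partial frame: callers should buffer across reads, as the hook above does.
      }
    }
  }
  return events
}
```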

View File

@@ -102,7 +102,7 @@ export const azureOpenAIProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
if (request.verbosity !== undefined) payload.verbosity = request.verbosity
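
The same two-part change recurs across the provider files below: the guard loosens from `!== undefined` to `!= null`, and the OpenAI-compatible providers move from `max_tokens` to `max_completion_tokens`. A small sketch of what the loosened guard buys:

```typescript
// `!= null` filters both null and undefined, so an explicit maxTokens: null
// no longer produces max_completion_tokens: null in the payload.
const payload: Record<string, unknown> = {}
const maxTokens: number | null | undefined = null // hypothetical request value
if (maxTokens != null) payload.max_completion_tokens = maxTokens
console.log(payload) // {} — nothing set for null or undefined
```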

View File

@@ -77,7 +77,7 @@ export const cerebrasProvider: ProviderConfig = {
messages: allMessages,
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {
type: 'json_schema',

View File

@@ -81,7 +81,7 @@ export const deepseekProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null

View File

@@ -349,7 +349,7 @@ export async function executeGeminiRequest(
if (request.temperature !== undefined) {
geminiConfig.temperature = request.temperature
}
if (request.maxTokens !== undefined) {
if (request.maxTokens != null) {
geminiConfig.maxOutputTokens = request.maxTokens
}
if (systemInstruction) {

View File

@@ -123,17 +123,21 @@ export function extractFunctionCallPart(candidate: Candidate | undefined): Part
}
/**
* Converts usage metadata from SDK response to our format
* Converts usage metadata from SDK response to our format.
* Per Gemini docs, total = promptTokenCount + candidatesTokenCount + toolUsePromptTokenCount + thoughtsTokenCount
* We include toolUsePromptTokenCount in input and thoughtsTokenCount in output for correct billing.
*/
export function convertUsageMetadata(
usageMetadata: GenerateContentResponseUsageMetadata | undefined
): GeminiUsage {
const promptTokenCount = usageMetadata?.promptTokenCount ?? 0
const candidatesTokenCount = usageMetadata?.candidatesTokenCount ?? 0
const thoughtsTokenCount = usageMetadata?.thoughtsTokenCount ?? 0
const toolUsePromptTokenCount = usageMetadata?.toolUsePromptTokenCount ?? 0
const promptTokenCount = (usageMetadata?.promptTokenCount ?? 0) + toolUsePromptTokenCount
const candidatesTokenCount = (usageMetadata?.candidatesTokenCount ?? 0) + thoughtsTokenCount
return {
promptTokenCount,
candidatesTokenCount,
totalTokenCount: usageMetadata?.totalTokenCount ?? promptTokenCount + candidatesTokenCount,
totalTokenCount: usageMetadata?.totalTokenCount ?? 0,
}
}
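
A worked example of the accounting described in the doc comment, with hypothetical counts:

```typescript
// Field names match the SDK metadata used above; values are illustrative.
const usage = convertUsageMetadata({
  promptTokenCount: 100,
  candidatesTokenCount: 40,
  thoughtsTokenCount: 25,
  toolUsePromptTokenCount: 10,
  totalTokenCount: 175,
})
// usage.promptTokenCount === 110      (100 prompt + 10 tool-use prompt)
// usage.candidatesTokenCount === 65   (40 candidates + 25 thoughts)
// usage.totalTokenCount === 175       (taken from the SDK when present, else 0)
```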

View File

@@ -74,7 +74,7 @@ export const groqProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -91,7 +91,7 @@ export const mistralProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -1130,7 +1130,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
id: 'cerebras',
name: 'Cerebras',
description: 'Cerebras Cloud LLMs',
defaultModel: 'cerebras/llama-3.3-70b',
defaultModel: 'cerebras/gpt-oss-120b',
modelPatterns: [/^cerebras/],
icon: CerebrasIcon,
capabilities: {
@@ -1138,44 +1138,64 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
},
models: [
{
id: 'cerebras/llama-3.1-8b',
id: 'cerebras/gpt-oss-120b',
pricing: {
input: 0.35,
output: 0.75,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131000,
},
{
id: 'cerebras/llama3.1-8b',
pricing: {
input: 0.1,
output: 0.1,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 32000,
},
{
id: 'cerebras/llama-3.1-70b',
pricing: {
input: 0.6,
output: 0.6,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'cerebras/llama-3.3-70b',
pricing: {
input: 0.6,
output: 0.6,
updatedAt: '2025-10-11',
input: 0.85,
output: 1.2,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'cerebras/llama-4-scout-17b-16e-instruct',
id: 'cerebras/qwen-3-32b',
pricing: {
input: 0.11,
output: 0.34,
updatedAt: '2025-10-11',
input: 0.4,
output: 0.8,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 10000000,
contextWindow: 131000,
},
{
id: 'cerebras/qwen-3-235b-a22b-instruct-2507',
pricing: {
input: 0.6,
output: 1.2,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131000,
},
{
id: 'cerebras/zai-glm-4.7',
pricing: {
input: 2.25,
output: 2.75,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131000,
},
],
},
@@ -1194,8 +1214,8 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
id: 'groq/openai/gpt-oss-120b',
pricing: {
input: 0.15,
output: 0.75,
updatedAt: '2025-10-11',
output: 0.6,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
@@ -1203,9 +1223,29 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
{
id: 'groq/openai/gpt-oss-20b',
pricing: {
input: 0.01,
output: 0.25,
updatedAt: '2025-10-11',
input: 0.075,
output: 0.3,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/openai/gpt-oss-safeguard-20b',
pricing: {
input: 0.075,
output: 0.3,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/qwen/qwen3-32b',
pricing: {
input: 0.29,
output: 0.59,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
@@ -1215,7 +1255,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.05,
output: 0.08,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
@@ -1225,27 +1265,17 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.59,
output: 0.79,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/llama-4-scout-17b-instruct',
id: 'groq/meta-llama/llama-4-scout-17b-16e-instruct',
pricing: {
input: 0.11,
output: 0.34,
updatedAt: '2025-10-11',
},
capabilities: {},
contextWindow: 131072,
},
{
id: 'groq/llama-4-maverick-17b-instruct',
pricing: {
input: 0.5,
output: 0.77,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
@@ -1253,9 +1283,9 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
{
id: 'groq/meta-llama/llama-4-maverick-17b-128e-instruct',
pricing: {
input: 0.5,
output: 0.77,
updatedAt: '2025-10-11',
input: 0.2,
output: 0.6,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
@@ -1265,7 +1295,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.04,
output: 0.04,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 8192,
@@ -1275,27 +1305,37 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
pricing: {
input: 0.59,
output: 0.79,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'groq/moonshotai/kimi-k2-instruct',
id: 'groq/deepseek-r1-distill-qwen-32b',
pricing: {
input: 0.69,
output: 0.69,
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 128000,
},
{
id: 'groq/moonshotai/kimi-k2-instruct-0905',
pricing: {
input: 1.0,
output: 3.0,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,
contextWindow: 262144,
},
{
id: 'groq/meta-llama/llama-guard-4-12b',
pricing: {
input: 0.2,
output: 0.2,
updatedAt: '2025-10-11',
updatedAt: '2026-01-27',
},
capabilities: {},
contextWindow: 131072,

View File

@@ -105,7 +105,7 @@ export const ollamaProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -81,7 +81,7 @@ export const openaiProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
if (request.verbosity !== undefined) payload.verbosity = request.verbosity

View File

@@ -121,7 +121,7 @@ export const openRouterProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
let hasActiveTools = false
@@ -516,7 +516,7 @@ export const openRouterProvider: ProviderConfig = {
return streamingResult as StreamingExecution
}
if (request.responseFormat && hasActiveTools && toolCalls.length > 0) {
if (request.responseFormat && hasActiveTools) {
const finalPayload: any = {
model: payload.model,
messages: [...currentMessages],

View File

@@ -135,7 +135,7 @@ export const vllmProvider: ProviderConfig = {
}
if (request.temperature !== undefined) payload.temperature = request.temperature
if (request.maxTokens !== undefined) payload.max_tokens = request.maxTokens
if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
if (request.responseFormat) {
payload.response_format = {

View File

@@ -92,7 +92,7 @@ export const xAIProvider: ProviderConfig = {
}
if (request.temperature !== undefined) basePayload.temperature = request.temperature
if (request.maxTokens !== undefined) basePayload.max_tokens = request.maxTokens
if (request.maxTokens != null) basePayload.max_completion_tokens = request.maxTokens
let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
if (tools?.length) {

View File

@@ -35,23 +35,4 @@ export const useExecutionStore = create<ExecutionState & ExecutionActions>()((se
},
clearRunPath: () => set({ lastRunPath: new Map(), lastRunEdges: new Map() }),
reset: () => set(initialState),
setLastExecutionSnapshot: (workflowId, snapshot) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.set(workflowId, snapshot)
set({ lastExecutionSnapshots: newSnapshots })
},
getLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
return lastExecutionSnapshots.get(workflowId)
},
clearLastExecutionSnapshot: (workflowId) => {
const { lastExecutionSnapshots } = get()
const newSnapshots = new Map(lastExecutionSnapshots)
newSnapshots.delete(workflowId)
set({ lastExecutionSnapshots: newSnapshots })
},
}))
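
The snapshot API is gone, but the run-path tracking stays. A hypothetical consumer of the retained surface; the store's import path and the status literal are assumptions, since BlockRunStatus and EdgeRunStatus are not shown in this hunk:

```typescript
import { useExecutionStore } from '@/stores/execution' // assumed path

useExecutionStore.getState().clearRunPath() // a new run starts with empty maps
useExecutionStore.getState().setEdgeRunStatus('edge-1', 'success' as any) // status union not shown
console.log(useExecutionStore.getState().lastRunEdges.get('edge-1'))
```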

View File

@@ -1,5 +1,4 @@
import type { Executor } from '@/executor'
import type { SerializableExecutionState } from '@/executor/execution/types'
import type { ExecutionContext } from '@/executor/types'
/**
@@ -19,9 +18,16 @@ export interface ExecutionState {
pendingBlocks: string[]
executor: Executor | null
debugContext: ExecutionContext | null
/**
* Tracks blocks from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators (rings on blocks).
*/
lastRunPath: Map<string, BlockRunStatus>
/**
* Tracks edges from the last execution run and their success/error status.
* Cleared when a new run starts. Used to show run path indicators on edges.
*/
lastRunEdges: Map<string, EdgeRunStatus>
lastExecutionSnapshots: Map<string, SerializableExecutionState>
}
export interface ExecutionActions {
@@ -35,9 +41,6 @@ export interface ExecutionActions {
setEdgeRunStatus: (edgeId: string, status: EdgeRunStatus) => void
clearRunPath: () => void
reset: () => void
setLastExecutionSnapshot: (workflowId: string, snapshot: SerializableExecutionState) => void
getLastExecutionSnapshot: (workflowId: string) => SerializableExecutionState | undefined
clearLastExecutionSnapshot: (workflowId: string) => void
}
export const initialState: ExecutionState = {
@@ -49,5 +52,4 @@ export const initialState: ExecutionState = {
debugContext: null,
lastRunPath: new Map(),
lastRunEdges: new Map(),
lastExecutionSnapshots: new Map(),
}

View File

@@ -253,23 +253,6 @@ describe('executeTool Function', () => {
vi.restoreAllMocks()
})
it('should handle errors from tools', async () => {
setupFetchMock({ status: 400, ok: false, json: { error: 'Bad request' } })
const result = await executeTool(
'http_request',
{
url: 'https://api.example.com/data',
method: 'GET',
},
true
)
expect(result.success).toBe(false)
expect(result.error).toBeDefined()
expect(result.timing).toBeDefined()
})
it('should add timing information to results', async () => {
const result = await executeTool(
'http_request',

View File

@@ -0,0 +1,84 @@
{{- if .Values.certManager.enabled }}
{{- /*
cert-manager Issuer Bootstrap Pattern
PREREQUISITE: cert-manager must be installed in your cluster before enabling this.
The root CA Certificate is created in the namespace specified by certManager.rootCA.namespace
(defaults to "cert-manager"). Ensure this namespace exists and cert-manager is running there.
Install cert-manager: https://cert-manager.io/docs/installation/
This implements the recommended pattern from cert-manager documentation:
1. A self-signed ClusterIssuer (for bootstrapping the root CA only)
2. A root CA Certificate (self-signed, used to sign other certificates)
3. A CA ClusterIssuer (uses the root CA to sign certificates)
Reference: https://cert-manager.io/docs/configuration/selfsigned/
*/ -}}
---
# 1. Self-Signed ClusterIssuer (Bootstrap Only)
# This issuer is used ONLY to create the root CA certificate.
# It should NOT be used directly for application certificates.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: {{ .Values.certManager.selfSignedIssuer.name }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
selfSigned: {}
---
# 2. Root CA Certificate
# This certificate is signed by the self-signed issuer and becomes the root of trust.
# The secret created here will be used by the CA issuer to sign certificates.
# NOTE: This must be created in the cert-manager namespace (or the namespace specified
# in certManager.rootCA.namespace). Ensure cert-manager is installed there first.
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
name: {{ .Values.certManager.rootCA.certificateName }}
namespace: {{ .Values.certManager.rootCA.namespace | default "cert-manager" }} # Must match cert-manager's cluster-resource-namespace
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
isCA: true
commonName: {{ .Values.certManager.rootCA.commonName }}
secretName: {{ .Values.certManager.rootCA.secretName }}
duration: {{ .Values.certManager.rootCA.duration | default "87600h" }}
renewBefore: {{ .Values.certManager.rootCA.renewBefore | default "2160h" }}
privateKey:
algorithm: {{ .Values.certManager.rootCA.privateKey.algorithm | default "RSA" }}
size: {{ .Values.certManager.rootCA.privateKey.size | default 4096 }}
subject:
organizations:
{{- if .Values.certManager.rootCA.subject.organizations }}
{{- toYaml .Values.certManager.rootCA.subject.organizations | nindent 6 }}
{{- else }}
- {{ .Release.Name }}
{{- end }}
issuerRef:
name: {{ .Values.certManager.selfSignedIssuer.name }}
kind: ClusterIssuer
group: cert-manager.io
---
# 3. CA ClusterIssuer
# This is the issuer that should be used by applications to obtain certificates.
# It signs certificates using the root CA created above.
# NOTE: This issuer may briefly show "not ready" on first install while cert-manager
# processes the Certificate above and creates the secret. It will auto-reconcile.
apiVersion: cert-manager.io/v1
kind: ClusterIssuer
metadata:
name: {{ .Values.certManager.caIssuer.name }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: cert-manager
spec:
ca:
secretName: {{ .Values.certManager.rootCA.secretName }}
{{- end }}
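
To make the pattern concrete, here is a hypothetical application Certificate that consumes the CA issuer defined above; everything except the sim-ca-issuer reference is illustrative:

```yaml
apiVersion: cert-manager.io/v1
kind: Certificate
metadata:
  name: my-app-tls          # hypothetical
  namespace: my-app         # hypothetical
spec:
  secretName: my-app-tls-secret
  dnsNames:
    - my-app.example.com
  issuerRef:
    name: sim-ca-issuer     # the CA ClusterIssuer created by this chart
    kind: ClusterIssuer
    group: cert-manager.io
```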

View File

@@ -1,6 +1,36 @@
{{- if and .Values.ollama.enabled .Values.ollama.gpu.enabled }}
---
# NVIDIA Device Plugin DaemonSet for GPU support
# 1. ConfigMap for NVIDIA Device Plugin Configuration
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "sim.fullname" . }}-nvidia-device-plugin-config
namespace: {{ .Release.Namespace }}
labels:
{{- include "sim.labels" . | nindent 4 }}
app.kubernetes.io/component: nvidia-device-plugin
data:
config.yaml: |
version: v1
flags:
{{- if eq .Values.ollama.gpu.strategy "mig" }}
migStrategy: "single"
{{- else }}
migStrategy: "none"
{{- end }}
failOnInitError: false
plugin:
passDeviceSpecs: true
deviceListStrategy: envvar
{{- if eq .Values.ollama.gpu.strategy "time-slicing" }}
sharing:
timeSlicing:
resources:
- name: nvidia.com/gpu
replicas: {{ .Values.ollama.gpu.timeSlicingReplicas | default 5 }}
{{- end }}
---
# 2. NVIDIA Device Plugin DaemonSet for GPU support
apiVersion: apps/v1
kind: DaemonSet
metadata:
@@ -35,9 +65,6 @@ spec:
# Only schedule on nodes with NVIDIA GPUs
accelerator: nvidia
priorityClassName: system-node-critical
runtimeClassName: nvidia
hostNetwork: true
hostPID: true
volumes:
- name: device-plugin
hostPath:
@@ -48,22 +75,21 @@ spec:
- name: sys
hostPath:
path: /sys
- name: proc-driver-nvidia
hostPath:
path: /proc/driver/nvidia
# Volume to mount the ConfigMap
- name: nvidia-device-plugin-config
configMap:
name: {{ include "sim.fullname" . }}-nvidia-device-plugin-config
containers:
- name: nvidia-device-plugin
image: nvcr.io/nvidia/k8s-device-plugin:v0.14.5
image: nvcr.io/nvidia/k8s-device-plugin:v0.18.2
imagePullPolicy: Always
args:
- --mig-strategy=single
- --pass-device-specs=true
- --fail-on-init-error=false
- --device-list-strategy=envvar
- --nvidia-driver-root=/host-sys/fs/cgroup
- "--config-file=/etc/device-plugin/config.yaml"
{{- if eq .Values.ollama.gpu.strategy "mig" }}
env:
- name: NVIDIA_MIG_MONITOR_DEVICES
value: all
{{- end }}
securityContext:
allowPrivilegeEscalation: false
capabilities:
@@ -74,29 +100,16 @@ spec:
- name: dev
mountPath: /dev
- name: sys
mountPath: /host-sys
mountPath: /sys
readOnly: true
- name: proc-driver-nvidia
mountPath: /proc/driver/nvidia
- name: nvidia-device-plugin-config
mountPath: /etc/device-plugin/
readOnly: true
resources:
requests:
cpu: 50m
memory: 10Mi
memory: 20Mi
limits:
cpu: 50m
memory: 20Mi
{{- if .Values.nodeSelector }}
nodeSelector:
{{- toYaml .Values.nodeSelector | nindent 8 }}
{{- end }}
---
# RuntimeClass for NVIDIA Container Runtime
apiVersion: node.k8s.io/v1
kind: RuntimeClass
metadata:
name: {{ include "sim.fullname" . }}-nvidia
labels:
{{- include "sim.labels" . | nindent 4 }}
handler: nvidia
{{- end }}
memory: 50Mi
{{- end }}

View File

@@ -400,8 +400,10 @@ postgresql:
algorithm: RSA # RSA or ECDSA
size: 4096 # Key size in bits
# Issuer reference (REQUIRED if tls.enabled is true)
# By default, references the CA issuer created by certManager.caIssuer
# Make sure certManager.enabled is true, or provide your own issuer
issuerRef:
name: selfsigned-cluster-issuer # Name of your cert-manager Issuer/ClusterIssuer
name: sim-ca-issuer # Name of your cert-manager Issuer/ClusterIssuer
kind: ClusterIssuer # ClusterIssuer or Issuer
group: "" # Optional: cert-manager.io (leave empty for default)
# Additional DNS names (optional)
@@ -463,20 +465,26 @@ externalDatabase:
ollama:
# Enable/disable Ollama deployment
enabled: false
# Image configuration
image:
repository: ollama/ollama
tag: latest
pullPolicy: Always
# Number of replicas
replicaCount: 1
# GPU configuration
gpu:
enabled: false
count: 1
# GPU sharing strategy: "mig" (Multi-Instance GPU) or "time-slicing"
# - mig: Hardware-level GPU partitioning (requires supported GPUs like A100)
# - time-slicing: Software-level GPU sharing (works with most NVIDIA GPUs)
strategy: "time-slicing"
# Number of time-slicing replicas (only used when strategy is "time-slicing")
timeSlicingReplicas: 5
# Node selector for GPU workloads (adjust labels based on your cluster configuration)
nodeSelector:
@@ -1185,4 +1193,53 @@ externalSecrets:
# External database password (when using managed database services)
externalDatabase:
# Path to external database password in external store
password: ""
password: ""
# cert-manager configuration
# Prerequisites: Install cert-manager in your cluster first
# See: https://cert-manager.io/docs/installation/
#
# This implements the recommended CA bootstrap pattern from cert-manager:
# 1. Self-signed ClusterIssuer (bootstrap only - creates root CA)
# 2. Root CA Certificate (self-signed, becomes the trust anchor)
# 3. CA ClusterIssuer (signs application certificates using root CA)
#
# Reference: https://cert-manager.io/docs/configuration/selfsigned/
certManager:
# Enable/disable cert-manager issuer resources
enabled: false
# Self-signed ClusterIssuer (used ONLY to bootstrap the root CA)
# Do not reference this issuer directly for application certificates
selfSignedIssuer:
name: "sim-selfsigned-bootstrap-issuer"
# Root CA Certificate configuration
# This certificate is signed by the self-signed issuer and used as the trust anchor
rootCA:
# Name of the Certificate resource
certificateName: "sim-root-ca"
# Namespace where the root CA certificate and secret will be created
# Must match cert-manager's cluster-resource-namespace (default: cert-manager)
namespace: "cert-manager"
# Common name for the root CA certificate
commonName: "sim-root-ca"
# Secret name where the root CA certificate and key will be stored
secretName: "sim-root-ca-secret"
# Certificate validity duration (default: 10 years)
duration: "87600h"
# Renew before expiry (default: 90 days)
renewBefore: "2160h"
# Private key configuration
privateKey:
algorithm: RSA
size: 4096
# Subject configuration
subject:
organizations: []
# If empty, defaults to the release name
# CA ClusterIssuer configuration
# This is the issuer that applications should reference for obtaining certificates
caIssuer:
name: "sim-ca-issuer"