Implement basic tooling for workflow.apply

This commit is contained in:
Siddharth Ganesan
2026-02-11 16:30:11 -08:00
parent 52aff4d60b
commit 462aa15341
15 changed files with 2008 additions and 38 deletions

View File

@@ -14,6 +14,15 @@ const logger = createLogger('DiffControls')
const NOTIFICATION_WIDTH = 240
const NOTIFICATION_GAP = 16
function isWorkflowEditToolCall(name?: string, params?: Record<string, unknown>): boolean {
if (name === 'edit_workflow') return true
if (name !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
export const DiffControls = memo(function DiffControls() {
const isTerminalResizing = useTerminalStore((state) => state.isResizing)
const isPanelResizing = usePanelStore((state) => state.isResizing)
@@ -64,7 +73,7 @@ export const DiffControls = memo(function DiffControls() {
const b = blocks[bi]
if (b?.type === 'tool_call') {
const tn = b.toolCall?.name
if (tn === 'edit_workflow') {
if (isWorkflowEditToolCall(tn, b.toolCall?.params)) {
id = b.toolCall?.id
break outer
}
@@ -72,7 +81,9 @@ export const DiffControls = memo(function DiffControls() {
}
}
if (!id) {
const candidates = Object.values(toolCallsById).filter((t) => t.name === 'edit_workflow')
const candidates = Object.values(toolCallsById).filter((t) =>
isWorkflowEditToolCall(t.name, t.params)
)
id = candidates.length ? candidates[candidates.length - 1].id : undefined
}
if (id) updatePreviewToolCallState('accepted', id)
@@ -102,7 +113,7 @@ export const DiffControls = memo(function DiffControls() {
const b = blocks[bi]
if (b?.type === 'tool_call') {
const tn = b.toolCall?.name
if (tn === 'edit_workflow') {
if (isWorkflowEditToolCall(tn, b.toolCall?.params)) {
id = b.toolCall?.id
break outer
}
@@ -110,7 +121,9 @@ export const DiffControls = memo(function DiffControls() {
}
}
if (!id) {
const candidates = Object.values(toolCallsById).filter((t) => t.name === 'edit_workflow')
const candidates = Object.values(toolCallsById).filter((t) =>
isWorkflowEditToolCall(t.name, t.params)
)
id = candidates.length ? candidates[candidates.length - 1].id : undefined
}
if (id) updatePreviewToolCallState('rejected', id)

View File

@@ -47,6 +47,28 @@ interface ParsedTags {
cleanContent: string
}
function getToolCallParams(toolCall?: CopilotToolCall): Record<string, unknown> {
const candidate = ((toolCall as any)?.parameters ||
(toolCall as any)?.input ||
(toolCall as any)?.params ||
{}) as Record<string, unknown>
return candidate && typeof candidate === 'object' ? candidate : {}
}
function isWorkflowChangeApplyMode(toolCall?: CopilotToolCall): boolean {
if (!toolCall || toolCall.name !== 'workflow_change') return false
const params = getToolCallParams(toolCall)
const mode = typeof params.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params.proposalId === 'string' && params.proposalId.length > 0
}
function isWorkflowEditSummaryTool(toolCall?: CopilotToolCall): boolean {
if (!toolCall) return false
if (toolCall.name === 'edit_workflow') return true
return isWorkflowChangeApplyMode(toolCall)
}
/**
* Extracts plan steps from plan_respond tool calls in subagent blocks.
* @param blocks - The subagent content blocks to search
@@ -871,7 +893,10 @@ const SubagentContentRenderer = memo(function SubagentContentRenderer({
)
}
if (segment.type === 'tool' && segment.block.toolCall) {
if (toolCall.name === 'edit' && segment.block.toolCall.name === 'edit_workflow') {
if (
(toolCall.name === 'edit' || toolCall.name === 'build') &&
isWorkflowEditSummaryTool(segment.block.toolCall)
) {
return (
<div key={`tool-${segment.block.toolCall.id || index}`}>
<WorkflowEditSummary toolCall={segment.block.toolCall} />
@@ -968,12 +993,11 @@ const WorkflowEditSummary = memo(function WorkflowEditSummary({
}
}, [blocks])
if (toolCall.name !== 'edit_workflow') {
if (!isWorkflowEditSummaryTool(toolCall)) {
return null
}
const params =
(toolCall as any).parameters || (toolCall as any).input || (toolCall as any).params || {}
const params = getToolCallParams(toolCall)
let operations = Array.isArray(params.operations) ? params.operations : []
if (operations.length === 0 && Array.isArray((toolCall as any).operations)) {
@@ -2087,7 +2111,7 @@ export function ToolCall({
}
}
const isEditWorkflow = toolCall.name === 'edit_workflow'
const isEditWorkflow = isWorkflowEditSummaryTool(toolCall)
const shouldShowDetails = isRunWorkflow || (isExpandableTool && expanded)
const hasOperations = Array.isArray(params.operations) && params.operations.length > 0
const hideTextForEditWorkflow = isEditWorkflow && hasOperations
@@ -2155,7 +2179,7 @@ export function ToolCall({
</Button>
</div>
) : null}
{/* Workflow edit summary - shows block changes after edit_workflow completes */}
{/* Workflow edit summary - shows block changes after edit_workflow/workflow_change(apply) */}
<WorkflowEditSummary toolCall={toolCall} />
{/* Render subagent content as thinking text */}

View File

@@ -26,6 +26,47 @@ const MAX_BATCH_INTERVAL = 50
const MIN_BATCH_INTERVAL = 16
const MAX_QUEUE_SIZE = 5
function isWorkflowEditToolCall(toolName?: string, params?: Record<string, unknown>): boolean {
if (toolName === 'edit_workflow') return true
if (toolName !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function isWorkflowChangeApplyCall(toolName?: string, params?: Record<string, unknown>): boolean {
if (toolName !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function extractWorkflowStateFromResultPayload(
resultPayload: Record<string, unknown>
): WorkflowState | null {
const directState = asRecord(resultPayload.workflowState)
if (directState) return directState as unknown as WorkflowState
const editResult = asRecord(resultPayload.editResult)
const nestedState = asRecord(editResult?.workflowState)
if (nestedState) return nestedState as unknown as WorkflowState
return null
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
const operations = resultPayload.operations
if (Array.isArray(operations)) return operations as Array<Record<string, unknown>>
const compiled = resultPayload.compiledOperations
if (Array.isArray(compiled)) return compiled as Array<Record<string, unknown>>
return undefined
}
/**
* Send an auto-accept confirmation to the server for auto-allowed tools.
* The server-side orchestrator polls Redis for this decision.
@@ -252,6 +293,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
let paramsForCurrentToolCall: Record<string, unknown> | undefined = current?.params
if (current) {
if (
isRejectedState(current.state) ||
@@ -265,11 +307,34 @@ export const sseHandlers: Record<string, SSEHandler> = {
: failedDependency || skipped
? ClientToolCallState.rejected
: ClientToolCallState.error
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
if (
targetState === ClientToolCallState.success &&
isWorkflowChangeApplyCall(current.name, paramsForCurrentToolCall)
) {
const operations = extractOperationListFromResultPayload(resultPayload || {})
if (operations && operations.length > 0) {
paramsForCurrentToolCall = {
...(current.params || {}),
operations,
}
}
}
const updatedMap = { ...toolCallsById }
updatedMap[toolCallId] = {
...current,
params: paramsForCurrentToolCall,
state: targetState,
display: resolveToolDisplay(current.name, targetState, current.id, current.params),
display: resolveToolDisplay(
current.name,
targetState,
current.id,
paramsForCurrentToolCall
),
}
set({ toolCallsById: updatedMap })
@@ -312,31 +377,39 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
if (current.name === 'edit_workflow') {
if (
targetState === ClientToolCallState.success &&
isWorkflowEditToolCall(current.name, paramsForCurrentToolCall)
) {
try {
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const workflowState = asRecord(resultPayload?.workflowState)
const hasWorkflowState = !!resultPayload?.workflowState
logger.info('[SSE] edit_workflow result received', {
const workflowState = resultPayload
? extractWorkflowStateFromResultPayload(resultPayload)
: null
const hasWorkflowState = !!workflowState
logger.info('[SSE] workflow edit result received', {
toolName: current.name,
hasWorkflowState,
blockCount: hasWorkflowState ? Object.keys(workflowState.blocks ?? {}).length : 0,
edgeCount: Array.isArray(workflowState.edges) ? workflowState.edges.length : 0,
blockCount: hasWorkflowState
? Object.keys((workflowState as any).blocks ?? {}).length
: 0,
edgeCount:
hasWorkflowState && Array.isArray((workflowState as any).edges)
? (workflowState as any).edges.length
: 0,
})
if (hasWorkflowState) {
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore
.setProposedChanges(resultPayload.workflowState as WorkflowState)
.catch((err) => {
logger.error('[SSE] Failed to apply edit_workflow diff', {
error: err instanceof Error ? err.message : String(err),
})
diffStore.setProposedChanges(workflowState).catch((err) => {
logger.error('[SSE] Failed to apply workflow edit diff', {
error: err instanceof Error ? err.message : String(err),
toolName: current.name,
})
})
}
} catch (err) {
logger.error('[SSE] edit_workflow result handling failed', {
logger.error('[SSE] workflow edit result handling failed', {
error: err instanceof Error ? err.message : String(err),
toolName: current.name,
})
}
}
@@ -460,16 +533,21 @@ export const sseHandlers: Record<string, SSEHandler> = {
: failedDependency || skipped
? ClientToolCallState.rejected
: ClientToolCallState.error
const paramsForBlock =
b.toolCall?.id === toolCallId
? paramsForCurrentToolCall || b.toolCall?.params
: b.toolCall?.params
context.contentBlocks[i] = {
...b,
toolCall: {
...b.toolCall,
params: paramsForBlock,
state: targetState,
display: resolveToolDisplay(
b.toolCall?.name,
targetState,
toolCallId,
b.toolCall?.params
paramsForBlock
),
},
}

View File

@@ -9,6 +9,8 @@ import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import { resolveToolDisplay } from '@/lib/copilot/store-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import {
type SSEHandler,
sendAutoAcceptConfirmation,
@@ -24,6 +26,39 @@ type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
function isWorkflowChangeApplyCall(toolCall: CopilotToolCall): boolean {
if (toolCall.name !== 'workflow_change') return false
const params = (toolCall.params || {}) as Record<string, unknown>
const mode = typeof params.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params.proposalId === 'string' && params.proposalId.length > 0
}
function extractWorkflowStateFromResultPayload(
resultPayload: Record<string, unknown>
): WorkflowState | null {
const directState = asRecord(resultPayload.workflowState)
if (directState) return directState as unknown as WorkflowState
const editResult = asRecord(resultPayload.editResult)
const nestedState = asRecord(editResult?.workflowState)
if (nestedState) return nestedState as unknown as WorkflowState
return null
}
function extractOperationListFromResultPayload(
resultPayload: Record<string, unknown>
): Array<Record<string, unknown>> | undefined {
const operations = resultPayload.operations
if (Array.isArray(operations)) return operations as Array<Record<string, unknown>>
const compiled = resultPayload.compiledOperations
if (Array.isArray(compiled)) return compiled as Array<Record<string, unknown>>
return undefined
}
export function appendSubAgentContent(
context: ClientStreamingContext,
parentToolCallId: string,
@@ -282,10 +317,29 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
if (existingIndex >= 0) {
const existing = context.subAgentToolCalls[parentToolCallId][existingIndex]
let nextParams = existing.params
const resultPayload = asRecord(
data?.result || resultData.result || resultData.data || data?.data
)
if (
targetState === ClientToolCallState.success &&
isWorkflowChangeApplyCall(existing) &&
resultPayload
) {
const operations = extractOperationListFromResultPayload(resultPayload)
if (operations && operations.length > 0) {
nextParams = {
...(existing.params || {}),
operations,
}
}
}
const updatedSubAgentToolCall = {
...existing,
params: nextParams,
state: targetState,
display: resolveToolDisplay(existing.name, targetState, toolCallId, existing.params),
display: resolveToolDisplay(existing.name, targetState, toolCallId, nextParams),
}
context.subAgentToolCalls[parentToolCallId][existingIndex] = updatedSubAgentToolCall
@@ -309,6 +363,23 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
state: targetState,
})
}
if (
targetState === ClientToolCallState.success &&
resultPayload &&
isWorkflowChangeApplyCall(updatedSubAgentToolCall)
) {
const workflowState = extractWorkflowStateFromResultPayload(resultPayload)
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore.setProposedChanges(workflowState).catch((error) => {
logger.error('[SubAgent] Failed to apply workflow_change diff', {
error: error instanceof Error ? error.message : String(error),
toolCallId,
})
})
}
}
}
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)

View File

@@ -325,6 +325,10 @@ const SERVER_TOOLS = new Set<string>([
'get_block_config',
'get_trigger_blocks',
'edit_workflow',
'workflow_context_get',
'workflow_context_expand',
'workflow_change',
'workflow_verify',
'get_workflow_console',
'search_documentation',
'search_online',

View File

@@ -609,6 +609,83 @@ const META_edit_workflow: ToolMetadata = {
},
}
const META_workflow_change: ToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: { text: 'Planning workflow changes', icon: Loader2 },
[ClientToolCallState.executing]: { text: 'Applying workflow changes', icon: Loader2 },
[ClientToolCallState.success]: { text: 'Updated your workflow', icon: Grid2x2Check },
[ClientToolCallState.error]: { text: 'Failed to update your workflow', icon: XCircle },
[ClientToolCallState.review]: { text: 'Review your workflow changes', icon: Grid2x2 },
[ClientToolCallState.rejected]: { text: 'Rejected workflow changes', icon: Grid2x2X },
[ClientToolCallState.aborted]: { text: 'Aborted workflow changes', icon: MinusCircle },
[ClientToolCallState.pending]: { text: 'Planning workflow changes', icon: Loader2 },
},
getDynamicText: (params, state) => {
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'dry_run') {
switch (state) {
case ClientToolCallState.success:
return 'Planned workflow changes'
case ClientToolCallState.executing:
case ClientToolCallState.generating:
case ClientToolCallState.pending:
return 'Planning workflow changes'
}
}
if (mode === 'apply' || typeof params?.proposalId === 'string') {
switch (state) {
case ClientToolCallState.success:
return 'Applied workflow changes'
case ClientToolCallState.executing:
case ClientToolCallState.generating:
case ClientToolCallState.pending:
return 'Applying workflow changes'
}
}
return undefined
},
uiConfig: {
isSpecial: true,
customRenderer: 'edit_summary',
},
}
const META_workflow_context_get: ToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: { text: 'Gathering workflow context', icon: Loader2 },
[ClientToolCallState.pending]: { text: 'Gathering workflow context', icon: Loader2 },
[ClientToolCallState.executing]: { text: 'Gathering workflow context', icon: Loader2 },
[ClientToolCallState.success]: { text: 'Gathered workflow context', icon: FileText },
[ClientToolCallState.error]: { text: 'Failed to gather workflow context', icon: XCircle },
[ClientToolCallState.rejected]: { text: 'Skipped workflow context', icon: MinusCircle },
[ClientToolCallState.aborted]: { text: 'Aborted workflow context', icon: MinusCircle },
},
}
const META_workflow_context_expand: ToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: { text: 'Expanding workflow schemas', icon: Loader2 },
[ClientToolCallState.pending]: { text: 'Expanding workflow schemas', icon: Loader2 },
[ClientToolCallState.executing]: { text: 'Expanding workflow schemas', icon: Loader2 },
[ClientToolCallState.success]: { text: 'Expanded workflow schemas', icon: FileText },
[ClientToolCallState.error]: { text: 'Failed to expand workflow schemas', icon: XCircle },
[ClientToolCallState.rejected]: { text: 'Skipped schema expansion', icon: MinusCircle },
[ClientToolCallState.aborted]: { text: 'Aborted schema expansion', icon: MinusCircle },
},
}
const META_workflow_verify: ToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: { text: 'Verifying workflow', icon: Loader2 },
[ClientToolCallState.pending]: { text: 'Verifying workflow', icon: Loader2 },
[ClientToolCallState.executing]: { text: 'Verifying workflow', icon: Loader2 },
[ClientToolCallState.success]: { text: 'Verified workflow', icon: CheckCircle2 },
[ClientToolCallState.error]: { text: 'Workflow verification failed', icon: XCircle },
[ClientToolCallState.rejected]: { text: 'Skipped workflow verification', icon: MinusCircle },
[ClientToolCallState.aborted]: { text: 'Aborted workflow verification', icon: MinusCircle },
},
}
const META_evaluate: ToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: { text: 'Evaluating', icon: Loader2 },
@@ -2542,6 +2619,10 @@ const TOOL_METADATA_BY_ID: Record<string, ToolMetadata> = {
deploy_mcp: META_deploy_mcp,
edit: META_edit,
edit_workflow: META_edit_workflow,
workflow_context_get: META_workflow_context_get,
workflow_context_expand: META_workflow_context_expand,
workflow_change: META_workflow_change,
workflow_verify: META_workflow_verify,
evaluate: META_evaluate,
get_block_config: META_get_block_config,
get_block_options: META_get_block_options,

View File

@@ -13,6 +13,12 @@ import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-cr
import { setEnvironmentVariablesServerTool } from '@/lib/copilot/tools/server/user/set-environment-variables'
import { editWorkflowServerTool } from '@/lib/copilot/tools/server/workflow/edit-workflow'
import { getWorkflowConsoleServerTool } from '@/lib/copilot/tools/server/workflow/get-workflow-console'
import { workflowChangeServerTool } from '@/lib/copilot/tools/server/workflow/workflow-change'
import {
workflowContextExpandServerTool,
workflowContextGetServerTool,
} from '@/lib/copilot/tools/server/workflow/workflow-context'
import { workflowVerifyServerTool } from '@/lib/copilot/tools/server/workflow/workflow-verify'
import { ExecuteResponseSuccessSchema } from '@/lib/copilot/tools/shared/schemas'
export { ExecuteResponseSuccessSchema }
@@ -35,6 +41,10 @@ const serverToolRegistry: Record<string, BaseServerTool> = {
[getCredentialsServerTool.name]: getCredentialsServerTool,
[makeApiRequestServerTool.name]: makeApiRequestServerTool,
[knowledgeBaseServerTool.name]: knowledgeBaseServerTool,
[workflowContextGetServerTool.name]: workflowContextGetServerTool,
[workflowContextExpandServerTool.name]: workflowContextExpandServerTool,
[workflowChangeServerTool.name]: workflowChangeServerTool,
[workflowVerifyServerTool.name]: workflowVerifyServerTool,
}
/**

View File

@@ -0,0 +1,93 @@
import crypto from 'crypto'
type StoreEntry<T> = {
value: T
expiresAt: number
}
const DEFAULT_TTL_MS = 30 * 60 * 1000
const MAX_ENTRIES = 500
class TTLStore<T> {
private readonly data = new Map<string, StoreEntry<T>>()
constructor(private readonly ttlMs = DEFAULT_TTL_MS) {}
set(value: T): string {
this.gc()
if (this.data.size >= MAX_ENTRIES) {
const firstKey = this.data.keys().next().value as string | undefined
if (firstKey) {
this.data.delete(firstKey)
}
}
const id = crypto.randomUUID()
this.data.set(id, {
value,
expiresAt: Date.now() + this.ttlMs,
})
return id
}
get(id: string): T | null {
const entry = this.data.get(id)
if (!entry) return null
if (entry.expiresAt <= Date.now()) {
this.data.delete(id)
return null
}
return entry.value
}
private gc(): void {
const now = Date.now()
for (const [key, entry] of this.data.entries()) {
if (entry.expiresAt <= now) {
this.data.delete(key)
}
}
}
}
export type WorkflowContextPack = {
workflowId: string
snapshotHash: string
workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}
schemasByType: Record<string, any>
schemaRefsByType: Record<string, string>
summary: Record<string, any>
}
export type WorkflowChangeProposal = {
workflowId: string
baseSnapshotHash: string
compiledOperations: Array<Record<string, any>>
diffSummary: Record<string, any>
warnings: string[]
diagnostics: string[]
touchedBlocks: string[]
}
const contextPackStore = new TTLStore<WorkflowContextPack>()
const proposalStore = new TTLStore<WorkflowChangeProposal>()
export function saveContextPack(pack: WorkflowContextPack): string {
return contextPackStore.set(pack)
}
export function getContextPack(id: string): WorkflowContextPack | null {
return contextPackStore.get(id)
}
export function saveProposal(proposal: WorkflowChangeProposal): string {
return proposalStore.set(proposal)
}
export function getProposal(id: string): WorkflowChangeProposal | null {
return proposalStore.get(id)
}

View File

@@ -0,0 +1,987 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { getBlock } from '@/blocks/registry'
import { getUserPermissionConfig } from '@/ee/access-control/utils/permission-check'
import {
getContextPack,
getProposal,
saveProposal,
type WorkflowChangeProposal,
} from './change-store'
import { editWorkflowServerTool } from './edit-workflow'
import { applyOperationsToWorkflowState } from './edit-workflow/engine'
import { preValidateCredentialInputs } from './edit-workflow/validation'
import { hashWorkflowState, loadWorkflowStateFromDb } from './workflow-state'
const logger = createLogger('WorkflowChangeServerTool')
const TargetSchema = z
.object({
blockId: z.string().optional(),
alias: z.string().optional(),
match: z
.object({
type: z.string().optional(),
name: z.string().optional(),
})
.optional(),
})
.strict()
const CredentialSelectionSchema = z
.object({
strategy: z.enum(['first_connected', 'by_id', 'by_name']).optional(),
id: z.string().optional(),
name: z.string().optional(),
})
.strict()
const ChangeOperationSchema = z
.object({
op: z.enum(['set', 'unset', 'merge', 'append', 'remove', 'attach_credential']),
path: z.string().optional(),
value: z.any().optional(),
provider: z.string().optional(),
selection: CredentialSelectionSchema.optional(),
required: z.boolean().optional(),
})
.strict()
const MutationSchema = z
.object({
action: z.enum([
'ensure_block',
'patch_block',
'remove_block',
'connect',
'disconnect',
'ensure_variable',
'set_variable',
]),
target: TargetSchema.optional(),
type: z.string().optional(),
name: z.string().optional(),
inputs: z.record(z.any()).optional(),
triggerMode: z.boolean().optional(),
advancedMode: z.boolean().optional(),
enabled: z.boolean().optional(),
changes: z.array(ChangeOperationSchema).optional(),
from: TargetSchema.optional(),
to: TargetSchema.optional(),
handle: z.string().optional(),
toHandle: z.string().optional(),
mode: z.enum(['set', 'append', 'remove']).optional(),
})
.strict()
const LinkEndpointSchema = z
.object({
blockId: z.string().optional(),
alias: z.string().optional(),
match: z
.object({
type: z.string().optional(),
name: z.string().optional(),
})
.optional(),
handle: z.string().optional(),
})
.strict()
const LinkSchema = z
.object({
from: LinkEndpointSchema,
to: LinkEndpointSchema,
mode: z.enum(['set', 'append', 'remove']).optional(),
})
.strict()
const ChangeSpecSchema = z
.object({
objective: z.string().optional(),
constraints: z.record(z.any()).optional(),
resources: z.record(z.any()).optional(),
mutations: z.array(MutationSchema).optional(),
links: z.array(LinkSchema).optional(),
acceptance: z.array(z.any()).optional(),
})
.strict()
const WorkflowChangeInputSchema = z
.object({
mode: z.enum(['dry_run', 'apply']),
workflowId: z.string().optional(),
contextPackId: z.string().optional(),
proposalId: z.string().optional(),
baseSnapshotHash: z.string().optional(),
expectedSnapshotHash: z.string().optional(),
changeSpec: ChangeSpecSchema.optional(),
})
.strict()
type WorkflowChangeParams = z.input<typeof WorkflowChangeInputSchema>
type ChangeSpec = z.input<typeof ChangeSpecSchema>
type TargetRef = z.input<typeof TargetSchema>
type ChangeOperation = z.input<typeof ChangeOperationSchema>
type CredentialRecord = {
id: string
name: string
provider: string
isDefault?: boolean
}
type ConnectionTarget = {
block: string
handle?: string
}
type ConnectionState = Map<string, Map<string, ConnectionTarget[]>>
function createDraftBlockId(seed?: string): string {
const suffix = crypto.randomUUID().slice(0, 8)
const base = seed ? seed.replace(/[^a-zA-Z0-9]/g, '').slice(0, 24) : 'draft'
return `${base || 'draft'}_${suffix}`
}
function normalizeHandle(handle?: string): string {
if (!handle) return 'source'
if (handle === 'success') return 'source'
return handle
}
function deepClone<T>(value: T): T {
return JSON.parse(JSON.stringify(value))
}
function stableUnique(values: string[]): string[] {
return [...new Set(values.filter(Boolean))]
}
function buildConnectionState(workflowState: {
edges: Array<Record<string, any>>
}): ConnectionState {
const state: ConnectionState = new Map()
for (const edge of workflowState.edges || []) {
const source = String(edge.source || '')
const target = String(edge.target || '')
if (!source || !target) continue
const sourceHandle = normalizeHandle(String(edge.sourceHandle || 'source'))
const targetHandle = edge.targetHandle ? String(edge.targetHandle) : undefined
let handleMap = state.get(source)
if (!handleMap) {
handleMap = new Map()
state.set(source, handleMap)
}
const existing = handleMap.get(sourceHandle) || []
existing.push({ block: target, handle: targetHandle })
handleMap.set(sourceHandle, existing)
}
return state
}
function connectionStateToPayload(state: Map<string, ConnectionTarget[]>): Record<string, any> {
const payload: Record<string, any> = {}
for (const [handle, targets] of state.entries()) {
if (!targets || targets.length === 0) continue
const normalizedTargets = targets.map((target) => {
if (!target.handle || target.handle === 'target') {
return target.block
}
return { block: target.block, handle: target.handle }
})
payload[handle] = normalizedTargets.length === 1 ? normalizedTargets[0] : normalizedTargets
}
return payload
}
function findMatchingBlockId(
workflowState: { blocks: Record<string, any> },
target: TargetRef
): string | null {
if (target.blockId && workflowState.blocks[target.blockId]) {
return target.blockId
}
if (target.match) {
const type = target.match.type
const name = target.match.name?.toLowerCase()
const matches = Object.entries(workflowState.blocks || {}).filter(([_, block]) => {
const blockType = String((block as Record<string, unknown>).type || '')
const blockName = String((block as Record<string, unknown>).name || '').toLowerCase()
const typeOk = type ? blockType === type : true
const nameOk = name ? blockName === name : true
return typeOk && nameOk
})
if (matches.length === 1) {
return matches[0][0]
}
if (matches.length > 1) {
throw new Error(
`ambiguous_target: target match resolved to ${matches.length} blocks (${matches.map(([id]) => id).join(', ')})`
)
}
}
return null
}
function getNestedValue(value: any, path: string[]): any {
let cursor = value
for (const segment of path) {
if (cursor == null || typeof cursor !== 'object') return undefined
cursor = cursor[segment]
}
return cursor
}
function setNestedValue(base: any, path: string[], nextValue: any): any {
if (path.length === 0) return nextValue
const out = Array.isArray(base) ? [...base] : { ...(base || {}) }
let cursor: any = out
for (let i = 0; i < path.length - 1; i++) {
const key = path[i]
const current = cursor[key]
cursor[key] =
current && typeof current === 'object'
? Array.isArray(current)
? [...current]
: { ...current }
: {}
cursor = cursor[key]
}
cursor[path[path.length - 1]] = nextValue
return out
}
function removeArrayItem(arr: unknown[], value: unknown): unknown[] {
return arr.filter((item) => JSON.stringify(item) !== JSON.stringify(value))
}
function selectCredentialId(
availableCredentials: CredentialRecord[],
provider: string,
selection: z.infer<typeof CredentialSelectionSchema> | undefined
): string | null {
const providerLower = provider.toLowerCase()
const providerMatches = availableCredentials.filter((credential) => {
const credentialProvider = credential.provider.toLowerCase()
return (
credentialProvider === providerLower || credentialProvider.startsWith(`${providerLower}-`)
)
})
const pool = providerMatches.length > 0 ? providerMatches : availableCredentials
const strategy = selection?.strategy || 'first_connected'
if (strategy === 'by_id') {
const id = selection?.id
if (!id) return null
return pool.find((credential) => credential.id === id)?.id || null
}
if (strategy === 'by_name') {
const name = selection?.name?.toLowerCase()
if (!name) return null
const exact = pool.find((credential) => credential.name.toLowerCase() === name)
if (exact) return exact.id
const partial = pool.find((credential) => credential.name.toLowerCase().includes(name))
return partial?.id || null
}
const defaultCredential = pool.find((credential) => credential.isDefault)
if (defaultCredential) return defaultCredential.id
return pool[0]?.id || null
}
function selectCredentialFieldId(blockType: string, provider: string): string | null {
const blockConfig = getBlock(blockType)
if (!blockConfig) return null
const oauthFields = (blockConfig.subBlocks || []).filter(
(subBlock) => subBlock.type === 'oauth-input'
)
if (oauthFields.length === 0) return null
const providerKey = provider.replace(/[^a-zA-Z0-9]/g, '').toLowerCase()
const fieldMatch = oauthFields.find((subBlock) =>
subBlock.id
.replace(/[^a-zA-Z0-9]/g, '')
.toLowerCase()
.includes(providerKey)
)
if (fieldMatch) return fieldMatch.id
return oauthFields[0].id
}
function ensureConnectionTarget(
existing: ConnectionTarget[],
target: ConnectionTarget,
mode: 'set' | 'append' | 'remove'
): ConnectionTarget[] {
if (mode === 'set') {
return [target]
}
if (mode === 'remove') {
return existing.filter(
(item) =>
!(item.block === target.block && (item.handle || 'target') === (target.handle || 'target'))
)
}
const duplicate = existing.some(
(item) =>
item.block === target.block && (item.handle || 'target') === (target.handle || 'target')
)
if (duplicate) return existing
return [...existing, target]
}
async function compileChangeSpec(params: {
changeSpec: ChangeSpec
workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}
userId: string
workflowId: string
}): Promise<{
operations: Array<Record<string, any>>
warnings: string[]
diagnostics: string[]
touchedBlocks: string[]
}> {
const { changeSpec, workflowState, userId, workflowId } = params
const operations: Array<Record<string, any>> = []
const diagnostics: string[] = []
const warnings: string[] = []
const touchedBlocks = new Set<string>()
const aliasMap = new Map<string, string>()
const workingState = deepClone(workflowState)
const connectionState = buildConnectionState(workingState)
const connectionTouchedSources = new Set<string>()
const plannedBlockTypes = new Map<string, string>()
// Seed aliases from existing block names.
for (const [blockId, block] of Object.entries(workingState.blocks || {})) {
const blockName = String((block as Record<string, unknown>).name || '')
if (!blockName) continue
const normalizedAlias = blockName.replace(/[^a-zA-Z0-9]/g, '')
if (normalizedAlias && !aliasMap.has(normalizedAlias)) {
aliasMap.set(normalizedAlias, blockId)
}
}
const credentialsResponse = await getCredentialsServerTool.execute({ workflowId }, { userId })
const availableCredentials: CredentialRecord[] =
credentialsResponse?.oauth?.connected?.credentials?.map((credential: any) => ({
id: String(credential.id || ''),
name: String(credential.name || ''),
provider: String(credential.provider || ''),
isDefault: Boolean(credential.isDefault),
})) || []
const resolveTarget = (
target: TargetRef | undefined,
allowCreateAlias = false
): string | null => {
if (!target) return null
if (target.blockId) {
if (workingState.blocks[target.blockId] || plannedBlockTypes.has(target.blockId)) {
return target.blockId
}
return allowCreateAlias ? target.blockId : null
}
if (target.alias) {
if (aliasMap.has(target.alias)) return aliasMap.get(target.alias) || null
const byMatch = findMatchingBlockId(workingState, { alias: target.alias })
if (byMatch) {
aliasMap.set(target.alias, byMatch)
return byMatch
}
return allowCreateAlias ? target.alias : null
}
const matched = findMatchingBlockId(workingState, target)
if (matched) return matched
return null
}
const applyPatchChange = (
targetId: string,
blockType: string | null,
change: ChangeOperation,
paramsOut: Record<string, any>
): void => {
if (change.op === 'attach_credential') {
const provider = change.provider
if (!provider) {
diagnostics.push(`attach_credential on ${targetId} is missing provider`)
return
}
if (!blockType) {
diagnostics.push(`attach_credential on ${targetId} failed: unknown block type`)
return
}
const credentialFieldId = selectCredentialFieldId(blockType, provider)
if (!credentialFieldId) {
const msg = `No oauth input field found for block type "${blockType}" on ${targetId}`
if (change.required) diagnostics.push(msg)
else warnings.push(msg)
return
}
const credentialId = selectCredentialId(availableCredentials, provider, change.selection)
if (!credentialId) {
const msg = `No credential found for provider "${provider}" on ${targetId}`
if (change.required) diagnostics.push(msg)
else warnings.push(msg)
return
}
paramsOut.inputs = paramsOut.inputs || {}
paramsOut.inputs[credentialFieldId] = credentialId
return
}
if (!change.path) {
diagnostics.push(`${change.op} on ${targetId} requires a path`)
return
}
const pathSegments = change.path.split('.').filter(Boolean)
if (pathSegments.length === 0) {
diagnostics.push(`${change.op} on ${targetId} has an invalid path "${change.path}"`)
return
}
if (pathSegments[0] === 'inputs') {
const inputKey = pathSegments[1]
if (!inputKey) {
diagnostics.push(`${change.op} on ${targetId} has invalid input path "${change.path}"`)
return
}
const currentInputValue =
paramsOut.inputs?.[inputKey] ??
workingState.blocks[targetId]?.subBlocks?.[inputKey]?.value ??
null
let nextInputValue = currentInputValue
const nestedPath = pathSegments.slice(2)
if (change.op === 'set') {
nextInputValue =
nestedPath.length > 0
? setNestedValue(currentInputValue ?? {}, nestedPath, change.value)
: change.value
} else if (change.op === 'unset') {
nextInputValue =
nestedPath.length > 0 ? setNestedValue(currentInputValue ?? {}, nestedPath, null) : null
} else if (change.op === 'merge') {
if (nestedPath.length > 0) {
const baseObject = getNestedValue(currentInputValue ?? {}, nestedPath) || {}
if (
baseObject &&
typeof baseObject === 'object' &&
change.value &&
typeof change.value === 'object'
) {
nextInputValue = setNestedValue(currentInputValue ?? {}, nestedPath, {
...baseObject,
...(change.value as Record<string, unknown>),
})
} else {
diagnostics.push(`merge on ${targetId} at "${change.path}" requires object values`)
return
}
} else if (
currentInputValue &&
typeof currentInputValue === 'object' &&
!Array.isArray(currentInputValue) &&
change.value &&
typeof change.value === 'object' &&
!Array.isArray(change.value)
) {
nextInputValue = { ...currentInputValue, ...(change.value as Record<string, unknown>) }
} else if (currentInputValue == null && change.value && typeof change.value === 'object') {
nextInputValue = change.value
} else {
diagnostics.push(`merge on ${targetId} at "${change.path}" requires object values`)
return
}
} else if (change.op === 'append') {
const arr = Array.isArray(currentInputValue) ? [...currentInputValue] : []
arr.push(change.value)
nextInputValue = arr
} else if (change.op === 'remove') {
if (!Array.isArray(currentInputValue)) {
diagnostics.push(`remove on ${targetId} at "${change.path}" requires an array value`)
return
}
nextInputValue = removeArrayItem(currentInputValue, change.value)
}
paramsOut.inputs = paramsOut.inputs || {}
paramsOut.inputs[inputKey] = nextInputValue
return
}
if (pathSegments.length !== 1) {
diagnostics.push(
`Unsupported path "${change.path}" on ${targetId}. Use inputs.* or top-level field names.`
)
return
}
const topLevelField = pathSegments[0]
if (!['name', 'type', 'triggerMode', 'advancedMode', 'enabled'].includes(topLevelField)) {
diagnostics.push(`Unsupported top-level path "${change.path}" on ${targetId}`)
return
}
paramsOut[topLevelField] = change.op === 'unset' ? null : change.value
}
for (const mutation of changeSpec.mutations || []) {
if (mutation.action === 'ensure_block') {
const targetId = resolveTarget(mutation.target, true)
if (!targetId) {
diagnostics.push('ensure_block is missing a resolvable target')
continue
}
const existingBlock = workingState.blocks[targetId]
if (existingBlock) {
const editParams: Record<string, any> = {}
if (mutation.name) editParams.name = mutation.name
if (mutation.type) editParams.type = mutation.type
if (mutation.inputs) editParams.inputs = mutation.inputs
if (mutation.triggerMode !== undefined) editParams.triggerMode = mutation.triggerMode
if (mutation.advancedMode !== undefined) editParams.advancedMode = mutation.advancedMode
if (mutation.enabled !== undefined) editParams.enabled = mutation.enabled
operations.push({
operation_type: 'edit',
block_id: targetId,
params: editParams,
})
touchedBlocks.add(targetId)
} else {
if (!mutation.type || !mutation.name) {
diagnostics.push(`ensure_block for "${targetId}" requires type and name when creating`)
continue
}
const blockId =
mutation.target?.blockId || mutation.target?.alias || createDraftBlockId(mutation.name)
const addParams: Record<string, any> = {
type: mutation.type,
name: mutation.name,
}
if (mutation.inputs) addParams.inputs = mutation.inputs
if (mutation.triggerMode !== undefined) addParams.triggerMode = mutation.triggerMode
if (mutation.advancedMode !== undefined) addParams.advancedMode = mutation.advancedMode
if (mutation.enabled !== undefined) addParams.enabled = mutation.enabled
operations.push({
operation_type: 'add',
block_id: blockId,
params: addParams,
})
workingState.blocks[blockId] = {
id: blockId,
type: mutation.type,
name: mutation.name,
subBlocks: Object.fromEntries(
Object.entries(mutation.inputs || {}).map(([key, value]) => [
key,
{ id: key, value, type: 'short-input' },
])
),
triggerMode: mutation.triggerMode || false,
advancedMode: mutation.advancedMode || false,
enabled: mutation.enabled !== undefined ? mutation.enabled : true,
}
plannedBlockTypes.set(blockId, mutation.type)
touchedBlocks.add(blockId)
if (mutation.target?.alias) aliasMap.set(mutation.target.alias, blockId)
}
continue
}
if (mutation.action === 'patch_block') {
const targetId = resolveTarget(mutation.target)
if (!targetId) {
diagnostics.push('patch_block target could not be resolved')
continue
}
const blockType =
String(workingState.blocks[targetId]?.type || '') || plannedBlockTypes.get(targetId) || null
const editParams: Record<string, any> = {}
for (const change of mutation.changes || []) {
applyPatchChange(targetId, blockType, change, editParams)
}
if (Object.keys(editParams).length === 0) {
warnings.push(`patch_block for ${targetId} had no effective changes`)
continue
}
operations.push({
operation_type: 'edit',
block_id: targetId,
params: editParams,
})
touchedBlocks.add(targetId)
continue
}
if (mutation.action === 'remove_block') {
const targetId = resolveTarget(mutation.target)
if (!targetId) {
diagnostics.push('remove_block target could not be resolved')
continue
}
operations.push({
operation_type: 'delete',
block_id: targetId,
params: {},
})
touchedBlocks.add(targetId)
connectionState.delete(targetId)
for (const [source, handles] of connectionState.entries()) {
for (const [handle, targets] of handles.entries()) {
const nextTargets = targets.filter((target) => target.block !== targetId)
handles.set(handle, nextTargets)
}
connectionTouchedSources.add(source)
}
continue
}
if (mutation.action === 'connect' || mutation.action === 'disconnect') {
const from = resolveTarget(mutation.from)
const to = resolveTarget(mutation.to)
if (!from || !to) {
diagnostics.push(`${mutation.action} requires resolvable from/to targets`)
continue
}
const sourceHandle = normalizeHandle(mutation.handle)
const targetHandle = mutation.toHandle || 'target'
let sourceMap = connectionState.get(from)
if (!sourceMap) {
sourceMap = new Map()
connectionState.set(from, sourceMap)
}
const existingTargets = sourceMap.get(sourceHandle) || []
const mode = mutation.action === 'disconnect' ? 'remove' : mutation.mode || 'set'
const nextTargets = ensureConnectionTarget(
existingTargets,
{ block: to, handle: targetHandle },
mode
)
sourceMap.set(sourceHandle, nextTargets)
connectionTouchedSources.add(from)
touchedBlocks.add(from)
}
}
for (const link of changeSpec.links || []) {
const from = resolveTarget(
{
blockId: link.from.blockId,
alias: link.from.alias,
match: link.from.match,
},
true
)
const to = resolveTarget(
{
blockId: link.to.blockId,
alias: link.to.alias,
match: link.to.match,
},
true
)
if (!from || !to) {
diagnostics.push('link contains unresolved from/to target')
continue
}
const sourceHandle = normalizeHandle(link.from.handle)
const targetHandle = link.to.handle || 'target'
let sourceMap = connectionState.get(from)
if (!sourceMap) {
sourceMap = new Map()
connectionState.set(from, sourceMap)
}
const existingTargets = sourceMap.get(sourceHandle) || []
const nextTargets = ensureConnectionTarget(
existingTargets,
{ block: to, handle: targetHandle },
link.mode || 'set'
)
sourceMap.set(sourceHandle, nextTargets)
connectionTouchedSources.add(from)
touchedBlocks.add(from)
}
for (const sourceBlockId of stableUnique([...connectionTouchedSources])) {
if (!connectionState.has(sourceBlockId)) continue
const sourceConnections = connectionState.get(sourceBlockId)!
operations.push({
operation_type: 'edit',
block_id: sourceBlockId,
params: {
connections: connectionStateToPayload(sourceConnections),
},
})
}
return {
operations,
warnings,
diagnostics,
touchedBlocks: [...touchedBlocks],
}
}
function summarizeDiff(
beforeState: { blocks: Record<string, any>; edges: Array<Record<string, any>> },
afterState: { blocks: Record<string, any>; edges: Array<Record<string, any>> },
operations: Array<Record<string, any>>
): Record<string, any> {
const beforeBlocks = Object.keys(beforeState.blocks || {}).length
const afterBlocks = Object.keys(afterState.blocks || {}).length
const beforeEdges = (beforeState.edges || []).length
const afterEdges = (afterState.edges || []).length
const counts = operations.reduce<Record<string, number>>((acc, operation) => {
const opType = String(operation.operation_type || 'unknown')
acc[opType] = (acc[opType] || 0) + 1
return acc
}, {})
return {
operationCounts: counts,
blocks: {
before: beforeBlocks,
after: afterBlocks,
delta: afterBlocks - beforeBlocks,
},
edges: {
before: beforeEdges,
after: afterEdges,
delta: afterEdges - beforeEdges,
},
}
}
async function validateAndSimulateOperations(params: {
workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}
operations: Array<Record<string, any>>
userId: string
}): Promise<{
operationsForApply: Array<Record<string, any>>
simulatedState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}
warnings: string[]
diagnostics: string[]
}> {
const diagnostics: string[] = []
const warnings: string[] = []
const permissionConfig = await getUserPermissionConfig(params.userId)
const { filteredOperations, errors: preValidationErrors } = await preValidateCredentialInputs(
params.operations as any,
{ userId: params.userId },
params.workflowState
)
for (const error of preValidationErrors) {
warnings.push(error.error)
}
const { state, validationErrors, skippedItems } = applyOperationsToWorkflowState(
params.workflowState,
filteredOperations as any,
permissionConfig
)
for (const validationError of validationErrors) {
warnings.push(validationError.error)
}
for (const skippedItem of skippedItems) {
warnings.push(skippedItem.reason)
}
if (Object.keys(state.blocks || {}).length === 0) {
diagnostics.push('Simulation produced an empty workflow state')
}
return {
operationsForApply: filteredOperations as Array<Record<string, any>>,
simulatedState: state,
warnings,
diagnostics,
}
}
export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any> = {
name: 'workflow_change',
inputSchema: WorkflowChangeInputSchema,
async execute(params: WorkflowChangeParams, context?: { userId: string }): Promise<any> {
if (!context?.userId) {
throw new Error('Unauthorized workflow access')
}
if (params.mode === 'dry_run') {
const workflowId = params.workflowId || getContextPack(params.contextPackId || '')?.workflowId
if (!workflowId) {
throw new Error('workflowId is required for dry_run')
}
if (!params.changeSpec) {
throw new Error('changeSpec is required for dry_run')
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId,
userId: context.userId,
action: 'write',
})
if (!authorization.allowed) {
throw new Error(authorization.message || 'Unauthorized workflow access')
}
const { workflowState } = await loadWorkflowStateFromDb(workflowId)
const currentHash = hashWorkflowState(workflowState as unknown as Record<string, unknown>)
const requestedHash = params.baseSnapshotHash
if (requestedHash && requestedHash !== currentHash) {
throw new Error(
`snapshot_mismatch: expected ${requestedHash} but current state is ${currentHash}`
)
}
const compileResult = await compileChangeSpec({
changeSpec: params.changeSpec,
workflowState,
userId: context.userId,
workflowId,
})
const simulation = await validateAndSimulateOperations({
workflowState,
operations: compileResult.operations,
userId: context.userId,
})
const diffSummary = summarizeDiff(
workflowState,
simulation.simulatedState,
simulation.operationsForApply
)
const diagnostics = [...compileResult.diagnostics, ...simulation.diagnostics]
const warnings = [...compileResult.warnings, ...simulation.warnings]
const proposal: WorkflowChangeProposal = {
workflowId,
baseSnapshotHash: currentHash,
compiledOperations: simulation.operationsForApply,
diffSummary,
warnings,
diagnostics,
touchedBlocks: compileResult.touchedBlocks,
}
const proposalId = saveProposal(proposal)
logger.info('Compiled workflow_change dry run', {
workflowId,
proposalId,
operationCount: proposal.compiledOperations.length,
warningCount: warnings.length,
diagnosticsCount: diagnostics.length,
})
return {
success: diagnostics.length === 0,
mode: 'dry_run',
workflowId,
proposalId,
baseSnapshotHash: currentHash,
compiledOperations: proposal.compiledOperations,
diffSummary,
warnings,
diagnostics,
touchedBlocks: proposal.touchedBlocks,
}
}
// apply mode
const proposalId = params.proposalId
if (!proposalId) {
throw new Error('proposalId is required for apply')
}
const proposal = getProposal(proposalId)
if (!proposal) {
throw new Error(`Proposal not found or expired: ${proposalId}`)
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: proposal.workflowId,
userId: context.userId,
action: 'write',
})
if (!authorization.allowed) {
throw new Error(authorization.message || 'Unauthorized workflow access')
}
const { workflowState } = await loadWorkflowStateFromDb(proposal.workflowId)
const currentHash = hashWorkflowState(workflowState as unknown as Record<string, unknown>)
const expectedHash = params.expectedSnapshotHash || proposal.baseSnapshotHash
if (expectedHash && expectedHash !== currentHash) {
throw new Error(`snapshot_mismatch: expected ${expectedHash} but current is ${currentHash}`)
}
const applyResult = await editWorkflowServerTool.execute(
{
workflowId: proposal.workflowId,
operations: proposal.compiledOperations as any,
},
{ userId: context.userId }
)
const appliedWorkflowState = (applyResult as any)?.workflowState
const newSnapshotHash = appliedWorkflowState
? hashWorkflowState(appliedWorkflowState as Record<string, unknown>)
: null
return {
success: true,
mode: 'apply',
workflowId: proposal.workflowId,
proposalId,
baseSnapshotHash: proposal.baseSnapshotHash,
newSnapshotHash,
operations: proposal.compiledOperations,
workflowState: appliedWorkflowState || null,
appliedDiff: proposal.diffSummary,
warnings: proposal.warnings,
diagnostics: proposal.diagnostics,
editResult: applyResult,
}
},
}

View File

@@ -0,0 +1,158 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { getContextPack, saveContextPack } from './change-store'
import {
buildSchemasByType,
getAllKnownBlockTypes,
hashWorkflowState,
loadWorkflowStateFromDb,
summarizeWorkflowState,
} from './workflow-state'
const logger = createLogger('WorkflowContextServerTool')
const WorkflowContextGetInputSchema = z.object({
workflowId: z.string(),
objective: z.string().optional(),
includeBlockTypes: z.array(z.string()).optional(),
includeAllSchemas: z.boolean().optional(),
})
type WorkflowContextGetParams = z.infer<typeof WorkflowContextGetInputSchema>
const WorkflowContextExpandInputSchema = z.object({
contextPackId: z.string(),
blockTypes: z.array(z.string()).optional(),
schemaRefs: z.array(z.string()).optional(),
})
type WorkflowContextExpandParams = z.infer<typeof WorkflowContextExpandInputSchema>
function parseSchemaRefToBlockType(schemaRef: string): string | null {
if (!schemaRef) return null
const [blockType] = schemaRef.split('@')
return blockType || null
}
function buildAvailableBlockCatalog(
schemaRefsByType: Record<string, string>
): Array<Record<string, any>> {
return Object.entries(schemaRefsByType)
.sort((a, b) => a[0].localeCompare(b[0]))
.map(([blockType, schemaRef]) => ({
blockType,
schemaRef,
}))
}
export const workflowContextGetServerTool: BaseServerTool<WorkflowContextGetParams, any> = {
name: 'workflow_context_get',
inputSchema: WorkflowContextGetInputSchema,
async execute(params: WorkflowContextGetParams, context?: { userId: string }): Promise<any> {
if (!context?.userId) {
throw new Error('Unauthorized workflow access')
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: params.workflowId,
userId: context.userId,
action: 'read',
})
if (!authorization.allowed) {
throw new Error(authorization.message || 'Unauthorized workflow access')
}
const { workflowState } = await loadWorkflowStateFromDb(params.workflowId)
const snapshotHash = hashWorkflowState(workflowState as unknown as Record<string, unknown>)
const blockTypesInWorkflow = Object.values(workflowState.blocks || {}).map((block: any) =>
String(block?.type || '')
)
const requestedTypes = params.includeBlockTypes || []
const includeAllSchemas = params.includeAllSchemas === true
const candidateTypes = includeAllSchemas
? getAllKnownBlockTypes()
: [...blockTypesInWorkflow, ...requestedTypes]
const { schemasByType, schemaRefsByType } = buildSchemasByType(candidateTypes)
const summary = summarizeWorkflowState(workflowState)
const packId = saveContextPack({
workflowId: params.workflowId,
snapshotHash,
workflowState,
schemasByType,
schemaRefsByType,
summary: {
...summary,
objective: params.objective || null,
},
})
logger.info('Generated workflow context pack', {
workflowId: params.workflowId,
contextPackId: packId,
schemaCount: Object.keys(schemaRefsByType).length,
})
return {
success: true,
contextPackId: packId,
workflowId: params.workflowId,
snapshotHash,
summary: {
...summary,
objective: params.objective || null,
},
schemaRefsByType,
availableBlockCatalog: buildAvailableBlockCatalog(schemaRefsByType),
inScopeSchemas: schemasByType,
}
},
}
export const workflowContextExpandServerTool: BaseServerTool<WorkflowContextExpandParams, any> = {
name: 'workflow_context_expand',
inputSchema: WorkflowContextExpandInputSchema,
async execute(params: WorkflowContextExpandParams, context?: { userId: string }): Promise<any> {
if (!context?.userId) {
throw new Error('Unauthorized workflow access')
}
const contextPack = getContextPack(params.contextPackId)
if (!contextPack) {
throw new Error(`Context pack not found or expired: ${params.contextPackId}`)
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: contextPack.workflowId,
userId: context.userId,
action: 'read',
})
if (!authorization.allowed) {
throw new Error(authorization.message || 'Unauthorized workflow access')
}
const requestedBlockTypes = new Set<string>()
for (const blockType of params.blockTypes || []) {
if (blockType) requestedBlockTypes.add(blockType)
}
for (const schemaRef of params.schemaRefs || []) {
const blockType = parseSchemaRefToBlockType(schemaRef)
if (blockType) requestedBlockTypes.add(blockType)
}
const typesToExpand = [...requestedBlockTypes]
const { schemasByType, schemaRefsByType } = buildSchemasByType(typesToExpand)
return {
success: true,
contextPackId: params.contextPackId,
workflowId: contextPack.workflowId,
snapshotHash: contextPack.snapshotHash,
schemasByType,
schemaRefsByType,
}
},
}

View File

@@ -0,0 +1,226 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { workflow as workflowTable } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { getAllBlockTypes, getBlock } from '@/blocks/registry'
import type { SubBlockConfig } from '@/blocks/types'
const logger = createLogger('WorkflowContextState')
function stableSortValue(value: any): any {
if (Array.isArray(value)) {
return value.map(stableSortValue)
}
if (value && typeof value === 'object') {
const sorted: Record<string, any> = {}
for (const key of Object.keys(value).sort()) {
sorted[key] = stableSortValue(value[key])
}
return sorted
}
return value
}
export function hashWorkflowState(state: Record<string, unknown>): string {
const stable = stableSortValue(state)
const payload = JSON.stringify(stable)
return `sha256:${crypto.createHash('sha256').update(payload).digest('hex')}`
}
function normalizeOptions(options: unknown): string[] | null {
if (!Array.isArray(options)) return null
const normalized = options
.map((option) => {
if (option == null) return null
if (typeof option === 'object') {
const optionRecord = option as Record<string, unknown>
const id = optionRecord.id
if (typeof id === 'string') return id
const label = optionRecord.label
if (typeof label === 'string') return label
return null
}
return String(option)
})
.filter((value): value is string => Boolean(value))
return normalized.length > 0 ? normalized : null
}
function serializeRequired(required: SubBlockConfig['required']): boolean | Record<string, any> {
if (typeof required === 'boolean') return required
if (!required) return false
if (typeof required === 'object') {
const out: Record<string, any> = {}
const record = required as Record<string, unknown>
for (const key of ['field', 'operator', 'value']) {
if (record[key] !== undefined) {
out[key] = record[key]
}
}
return out
}
return false
}
function serializeSubBlock(subBlock: SubBlockConfig): Record<string, unknown> {
const staticOptions =
typeof subBlock.options === 'function' ? null : normalizeOptions(subBlock.options)
return {
id: subBlock.id,
type: subBlock.type,
title: subBlock.title,
description: subBlock.description || null,
mode: subBlock.mode || null,
placeholder: subBlock.placeholder || null,
hidden: Boolean(subBlock.hidden),
multiSelect: Boolean(subBlock.multiSelect),
required: serializeRequired(subBlock.required),
hasDynamicOptions: typeof subBlock.options === 'function',
options: staticOptions,
defaultValue: subBlock.defaultValue ?? null,
min: subBlock.min ?? null,
max: subBlock.max ?? null,
}
}
function serializeBlockSchema(blockType: string): Record<string, unknown> | null {
const blockConfig = getBlock(blockType)
if (!blockConfig) return null
const subBlocks = Array.isArray(blockConfig.subBlocks)
? blockConfig.subBlocks.map(serializeSubBlock)
: []
const outputs = blockConfig.outputs || {}
const outputKeys = Object.keys(outputs)
return {
blockType,
blockName: blockConfig.name || blockType,
category: blockConfig.category,
triggerAllowed: Boolean(blockConfig.triggerAllowed || blockConfig.triggers?.enabled),
hasTriggersConfig: Boolean(blockConfig.triggers?.enabled),
subBlocks,
outputKeys,
longDescription: blockConfig.longDescription || null,
}
}
export function buildSchemasByType(blockTypes: string[]): {
schemasByType: Record<string, any>
schemaRefsByType: Record<string, string>
} {
const schemasByType: Record<string, any> = {}
const schemaRefsByType: Record<string, string> = {}
const uniqueTypes = [...new Set(blockTypes.filter(Boolean))]
for (const blockType of uniqueTypes) {
const schema = serializeBlockSchema(blockType)
if (!schema) continue
const stableSchema = stableSortValue(schema)
const schemaHash = crypto
.createHash('sha256')
.update(JSON.stringify(stableSchema))
.digest('hex')
schemasByType[blockType] = stableSchema
schemaRefsByType[blockType] = `${blockType}@sha256:${schemaHash}`
}
return { schemasByType, schemaRefsByType }
}
export async function loadWorkflowStateFromDb(workflowId: string): Promise<{
workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}
workspaceId?: string
}> {
const [workflowRecord] = await db
.select({ workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!workflowRecord) {
throw new Error(`Workflow ${workflowId} not found`)
}
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalized) {
throw new Error(`Workflow ${workflowId} has no normalized data`)
}
const blocks = { ...normalized.blocks }
const invalidBlockIds: string[] = []
for (const [blockId, block] of Object.entries(blocks)) {
if (!(block as { type?: unknown })?.type) {
invalidBlockIds.push(blockId)
}
}
for (const blockId of invalidBlockIds) {
delete blocks[blockId]
}
const invalidSet = new Set(invalidBlockIds)
const edges = (normalized.edges || []).filter(
(edge: any) => !invalidSet.has(edge.source) && !invalidSet.has(edge.target)
)
if (invalidBlockIds.length > 0) {
logger.warn('Dropped blocks without type while loading workflow state', {
workflowId,
dropped: invalidBlockIds,
})
}
return {
workflowState: {
blocks,
edges,
loops: normalized.loops || {},
parallels: normalized.parallels || {},
},
workspaceId: workflowRecord.workspaceId || undefined,
}
}
export function summarizeWorkflowState(workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
loops: Record<string, any>
parallels: Record<string, any>
}): Record<string, unknown> {
const blocks = workflowState.blocks || {}
const edges = workflowState.edges || []
const blockTypes: Record<string, number> = {}
const triggerBlocks: Array<{ id: string; name: string; type: string }> = []
for (const [blockId, block] of Object.entries(blocks)) {
const blockType = String((block as Record<string, unknown>).type || 'unknown')
blockTypes[blockType] = (blockTypes[blockType] || 0) + 1
if ((block as Record<string, unknown>).triggerMode === true) {
triggerBlocks.push({
id: blockId,
name: String((block as Record<string, unknown>).name || blockType),
type: blockType,
})
}
}
return {
blockCount: Object.keys(blocks).length,
edgeCount: edges.length,
loopCount: Object.keys(workflowState.loops || {}).length,
parallelCount: Object.keys(workflowState.parallels || {}).length,
blockTypes,
triggerBlocks,
}
}
export function getAllKnownBlockTypes(): string[] {
return getAllBlockTypes()
}

View File

@@ -0,0 +1,194 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { hashWorkflowState, loadWorkflowStateFromDb } from './workflow-state'
const logger = createLogger('WorkflowVerifyServerTool')
const AcceptanceItemSchema = z.union([
z.string(),
z.object({
kind: z.string().optional(),
assert: z.string(),
}),
])
const WorkflowVerifyInputSchema = z
.object({
workflowId: z.string(),
acceptance: z.array(AcceptanceItemSchema).optional(),
baseSnapshotHash: z.string().optional(),
})
.strict()
type WorkflowVerifyParams = z.infer<typeof WorkflowVerifyInputSchema>
function normalizeName(value: string): string {
return value.trim().toLowerCase()
}
function resolveBlockToken(
workflowState: { blocks: Record<string, any> },
token: string
): string | null {
if (!token) return null
if (workflowState.blocks[token]) return token
const normalized = normalizeName(token)
for (const [blockId, block] of Object.entries(workflowState.blocks || {})) {
const blockName = normalizeName(String((block as Record<string, unknown>).name || ''))
if (blockName === normalized) return blockId
}
return null
}
function hasPath(
workflowState: { edges: Array<Record<string, any>> },
blockPath: string[]
): boolean {
if (blockPath.length < 2) return true
const adjacency = new Map<string, string[]>()
for (const edge of workflowState.edges || []) {
const source = String(edge.source || '')
const target = String(edge.target || '')
if (!source || !target) continue
const existing = adjacency.get(source) || []
existing.push(target)
adjacency.set(source, existing)
}
for (let i = 0; i < blockPath.length - 1; i++) {
const from = blockPath[i]
const to = blockPath[i + 1]
const next = adjacency.get(from) || []
if (!next.includes(to)) return false
}
return true
}
function evaluateAssertions(params: {
workflowState: {
blocks: Record<string, any>
edges: Array<Record<string, any>>
}
assertions: string[]
}): { failures: string[]; checks: Array<Record<string, any>> } {
const failures: string[] = []
const checks: Array<Record<string, any>> = []
for (const assertion of params.assertions) {
if (assertion.startsWith('block_exists:')) {
const token = assertion.slice('block_exists:'.length).trim()
const blockId = resolveBlockToken(params.workflowState, token)
const passed = Boolean(blockId)
checks.push({ assert: assertion, passed, resolvedBlockId: blockId || null })
if (!passed) failures.push(`Assertion failed: ${assertion}`)
continue
}
if (assertion.startsWith('trigger_exists:')) {
const triggerType = normalizeName(assertion.slice('trigger_exists:'.length))
const triggerBlock = Object.values(params.workflowState.blocks || {}).find((block: any) => {
if (block?.triggerMode !== true) return false
return normalizeName(String(block?.type || '')) === triggerType
})
const passed = Boolean(triggerBlock)
checks.push({ assert: assertion, passed })
if (!passed) failures.push(`Assertion failed: ${assertion}`)
continue
}
if (assertion.startsWith('path_exists:')) {
const rawPath = assertion.slice('path_exists:'.length).trim()
const tokens = rawPath
.split('->')
.map((token) => token.trim())
.filter(Boolean)
const resolvedPath = tokens
.map((token) => resolveBlockToken(params.workflowState, token))
.filter((value): value is string => Boolean(value))
const resolvedAll = resolvedPath.length === tokens.length
const passed = resolvedAll && hasPath(params.workflowState, resolvedPath)
checks.push({
assert: assertion,
passed,
resolvedPath,
})
if (!passed) failures.push(`Assertion failed: ${assertion}`)
continue
}
// Unknown assertion format - mark as warning failure for explicit visibility.
checks.push({ assert: assertion, passed: false, reason: 'unknown_assertion_type' })
failures.push(`Unknown assertion format: ${assertion}`)
}
return { failures, checks }
}
export const workflowVerifyServerTool: BaseServerTool<WorkflowVerifyParams, any> = {
name: 'workflow_verify',
inputSchema: WorkflowVerifyInputSchema,
async execute(params: WorkflowVerifyParams, context?: { userId: string }): Promise<any> {
if (!context?.userId) {
throw new Error('Unauthorized workflow access')
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: params.workflowId,
userId: context.userId,
action: 'read',
})
if (!authorization.allowed) {
throw new Error(authorization.message || 'Unauthorized workflow access')
}
const { workflowState } = await loadWorkflowStateFromDb(params.workflowId)
const snapshotHash = hashWorkflowState(workflowState as unknown as Record<string, unknown>)
if (params.baseSnapshotHash && params.baseSnapshotHash !== snapshotHash) {
return {
success: false,
verified: false,
reason: 'snapshot_mismatch',
expected: params.baseSnapshotHash,
current: snapshotHash,
}
}
const validation = validateWorkflowState(workflowState as any, { sanitize: false })
const assertions = (params.acceptance || []).map((item) =>
typeof item === 'string' ? item : item.assert
)
const assertionResults = evaluateAssertions({
workflowState,
assertions,
})
const verified =
validation.valid && assertionResults.failures.length === 0 && validation.errors.length === 0
logger.info('Workflow verification complete', {
workflowId: params.workflowId,
verified,
errorCount: validation.errors.length,
warningCount: validation.warnings.length,
assertionFailures: assertionResults.failures.length,
})
return {
success: true,
verified,
snapshotHash,
validation: {
valid: validation.valid,
errors: validation.errors,
warnings: validation.warnings,
},
assertions: assertionResults.checks,
failures: assertionResults.failures,
}
},
}

View File

@@ -84,6 +84,15 @@ function isPageUnloading(): boolean {
return _isPageUnloading
}
function isWorkflowEditToolCall(name?: string, params?: Record<string, unknown>): boolean {
if (name === 'edit_workflow') return true
if (name !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
function readActiveStreamFromStorage(): CopilotStreamInfo | null {
if (typeof window === 'undefined') return null
try {
@@ -1705,7 +1714,7 @@ export const useCopilotStore = create<CopilotStore>()(
const b = blocks[bi]
if (b?.type === 'tool_call') {
const tn = b.toolCall?.name
if (tn === 'edit_workflow') {
if (isWorkflowEditToolCall(tn, b.toolCall?.params)) {
id = b.toolCall?.id
break outer
}
@@ -1714,7 +1723,9 @@ export const useCopilotStore = create<CopilotStore>()(
}
// Fallback to map if not found in messages
if (!id) {
const candidates = Object.values(toolCallsById).filter((t) => t.name === 'edit_workflow')
const candidates = Object.values(toolCallsById).filter((t) =>
isWorkflowEditToolCall(t.name, t.params)
)
id = candidates.length ? candidates[candidates.length - 1].id : undefined
}
}

View File

@@ -15,7 +15,7 @@ import {
captureBaselineSnapshot,
cloneWorkflowState,
createBatchedUpdater,
findLatestEditWorkflowToolCallId,
findLatestWorkflowEditToolCallId,
getLatestUserMessageId,
persistWorkflowStateToServer,
} from './utils'
@@ -334,7 +334,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
}
findLatestEditWorkflowToolCallId().then((toolCallId) => {
findLatestWorkflowEditToolCallId().then((toolCallId) => {
if (toolCallId) {
import('@/stores/panel/copilot/store')
.then(({ useCopilotStore }) => {
@@ -439,7 +439,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
}
findLatestEditWorkflowToolCallId().then((toolCallId) => {
findLatestWorkflowEditToolCallId().then((toolCallId) => {
if (toolCallId) {
import('@/stores/panel/copilot/store')
.then(({ useCopilotStore }) => {

View File

@@ -126,6 +126,21 @@ export async function getLatestUserMessageId(): Promise<string | null> {
}
export async function findLatestEditWorkflowToolCallId(): Promise<string | undefined> {
return findLatestWorkflowEditToolCallId()
}
function isWorkflowEditToolCall(name?: string, params?: Record<string, unknown>): boolean {
if (name === 'edit_workflow') return true
if (name !== 'workflow_change') return false
const mode = typeof params?.mode === 'string' ? params.mode.toLowerCase() : ''
if (mode === 'apply') return true
// Be permissive for legacy/incomplete events: apply calls always include proposalId.
return typeof params?.proposalId === 'string' && params.proposalId.length > 0
}
export async function findLatestWorkflowEditToolCallId(): Promise<string | undefined> {
try {
const { useCopilotStore } = await import('@/stores/panel/copilot/store')
const { messages, toolCallsById } = useCopilotStore.getState()
@@ -134,17 +149,22 @@ export async function findLatestEditWorkflowToolCallId(): Promise<string | undef
const message = messages[mi]
if (message.role !== 'assistant' || !message.contentBlocks) continue
for (const block of message.contentBlocks) {
if (block?.type === 'tool_call' && block.toolCall?.name === 'edit_workflow') {
if (
block?.type === 'tool_call' &&
isWorkflowEditToolCall(block.toolCall?.name, block.toolCall?.params)
) {
return block.toolCall?.id
}
}
}
const fallback = Object.values(toolCallsById).filter((call) => call.name === 'edit_workflow')
const fallback = Object.values(toolCallsById).filter((call) =>
isWorkflowEditToolCall(call.name, call.params)
)
return fallback.length ? fallback[fallback.length - 1].id : undefined
} catch (error) {
logger.warn('Failed to resolve edit_workflow tool call id', { error })
logger.warn('Failed to resolve workflow edit tool call id', { error })
return undefined
}
}