mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-13 07:55:09 -05:00
Checkpoint
This commit is contained in:
89
apps/sim/app/api/mcp/copilot/route.test.ts
Normal file
89
apps/sim/app/api/mcp/copilot/route.test.ts
Normal file
@@ -0,0 +1,89 @@
|
||||
/**
|
||||
* @vitest-environment node
|
||||
*/
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
|
||||
|
||||
describe('mcp copilot route manifest contract', () => {
|
||||
const previousInternalSecret = process.env.INTERNAL_API_SECRET
|
||||
const previousAgentUrl = process.env.SIM_AGENT_API_URL
|
||||
const previousFetch = global.fetch
|
||||
|
||||
beforeEach(() => {
|
||||
vi.resetModules()
|
||||
process.env.INTERNAL_API_SECRET = 'x'.repeat(32)
|
||||
process.env.SIM_AGENT_API_URL = 'https://copilot.sim.ai'
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
vi.restoreAllMocks()
|
||||
global.fetch = previousFetch
|
||||
if (previousInternalSecret === undefined) {
|
||||
delete process.env.INTERNAL_API_SECRET
|
||||
} else {
|
||||
process.env.INTERNAL_API_SECRET = previousInternalSecret
|
||||
}
|
||||
if (previousAgentUrl === undefined) {
|
||||
delete process.env.SIM_AGENT_API_URL
|
||||
} else {
|
||||
process.env.SIM_AGENT_API_URL = previousAgentUrl
|
||||
}
|
||||
})
|
||||
|
||||
it('loads and caches tool manifest from copilot backend', async () => {
|
||||
const payload = {
|
||||
directTools: [
|
||||
{
|
||||
name: 'list_workspaces',
|
||||
description: 'List workspaces',
|
||||
inputSchema: { type: 'object', properties: {} },
|
||||
toolId: 'list_user_workspaces',
|
||||
},
|
||||
],
|
||||
subagentTools: [
|
||||
{
|
||||
name: 'sim_build',
|
||||
description: 'Build workflows',
|
||||
inputSchema: { type: 'object', properties: {} },
|
||||
agentId: 'build',
|
||||
},
|
||||
],
|
||||
generatedAt: '2026-02-12T00:00:00Z',
|
||||
}
|
||||
|
||||
const fetchSpy = vi.spyOn(global, 'fetch').mockResolvedValue(
|
||||
new Response(JSON.stringify(payload), {
|
||||
status: 200,
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
})
|
||||
)
|
||||
|
||||
const mod = await import('./route')
|
||||
mod.clearMcpToolManifestCacheForTests()
|
||||
|
||||
const first = await mod.getMcpToolManifest()
|
||||
const second = await mod.getMcpToolManifest()
|
||||
|
||||
expect(first).toEqual(payload)
|
||||
expect(second).toEqual(payload)
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1)
|
||||
expect(fetchSpy.mock.calls[0]?.[0]).toBe('https://copilot.sim.ai/api/mcp/tools/manifest')
|
||||
})
|
||||
|
||||
it('rejects invalid manifest payloads from copilot backend', async () => {
|
||||
const fetchSpy = vi.spyOn(global, 'fetch').mockResolvedValue(
|
||||
new Response(JSON.stringify({ tools: [] }), {
|
||||
status: 200,
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
})
|
||||
)
|
||||
|
||||
const mod = await import('./route')
|
||||
mod.clearMcpToolManifestCacheForTests()
|
||||
|
||||
await expect(mod.fetchMcpToolManifestFromCopilot()).rejects.toThrow(
|
||||
'invalid manifest payload from copilot'
|
||||
)
|
||||
expect(fetchSpy).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
})
|
||||
@@ -28,7 +28,6 @@ import {
|
||||
executeToolServerSide,
|
||||
prepareExecutionContext,
|
||||
} from '@/lib/copilot/orchestrator/tool-executor'
|
||||
import { DIRECT_TOOL_DEFS, SUBAGENT_TOOL_DEFS } from '@/lib/copilot/tools/mcp/definitions'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { RateLimiter } from '@/lib/core/rate-limiter'
|
||||
import {
|
||||
@@ -39,6 +38,32 @@ import {
|
||||
const logger = createLogger('CopilotMcpAPI')
|
||||
const mcpRateLimiter = new RateLimiter()
|
||||
const DEFAULT_COPILOT_MODEL = 'claude-opus-4-6'
|
||||
const MCP_TOOL_MANIFEST_CACHE_TTL_MS = 60_000
|
||||
|
||||
type McpDirectToolDef = {
|
||||
name: string
|
||||
description: string
|
||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
||||
toolId: string
|
||||
}
|
||||
|
||||
type McpSubagentToolDef = {
|
||||
name: string
|
||||
description: string
|
||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
||||
agentId: string
|
||||
}
|
||||
|
||||
type McpToolManifest = {
|
||||
directTools: McpDirectToolDef[]
|
||||
subagentTools: McpSubagentToolDef[]
|
||||
generatedAt?: string
|
||||
}
|
||||
|
||||
let cachedMcpToolManifest: {
|
||||
value: McpToolManifest
|
||||
expiresAt: number
|
||||
} | null = null
|
||||
|
||||
export const dynamic = 'force-dynamic'
|
||||
export const runtime = 'nodejs'
|
||||
@@ -112,6 +137,58 @@ async function authenticateCopilotApiKey(apiKey: string): Promise<CopilotKeyAuth
|
||||
}
|
||||
}
|
||||
|
||||
export function isMcpToolManifest(value: unknown): value is McpToolManifest {
|
||||
if (!value || typeof value !== 'object') return false
|
||||
const payload = value as Record<string, unknown>
|
||||
return Array.isArray(payload.directTools) && Array.isArray(payload.subagentTools)
|
||||
}
|
||||
|
||||
export async function fetchMcpToolManifestFromCopilot(): Promise<McpToolManifest> {
|
||||
const internalSecret = env.INTERNAL_API_SECRET
|
||||
if (!internalSecret) {
|
||||
throw new Error('INTERNAL_API_SECRET not configured')
|
||||
}
|
||||
|
||||
const res = await fetch(`${SIM_AGENT_API_URL}/api/mcp/tools/manifest`, {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
'x-api-key': internalSecret,
|
||||
},
|
||||
signal: AbortSignal.timeout(10_000),
|
||||
})
|
||||
|
||||
if (!res.ok) {
|
||||
const bodyText = await res.text().catch(() => '')
|
||||
throw new Error(`manifest fetch failed (${res.status}): ${bodyText || res.statusText}`)
|
||||
}
|
||||
|
||||
const payload: unknown = await res.json()
|
||||
if (!isMcpToolManifest(payload)) {
|
||||
throw new Error('invalid manifest payload from copilot')
|
||||
}
|
||||
|
||||
return payload
|
||||
}
|
||||
|
||||
export async function getMcpToolManifest(): Promise<McpToolManifest> {
|
||||
const now = Date.now()
|
||||
if (cachedMcpToolManifest && cachedMcpToolManifest.expiresAt > now) {
|
||||
return cachedMcpToolManifest.value
|
||||
}
|
||||
|
||||
const manifest = await fetchMcpToolManifestFromCopilot()
|
||||
cachedMcpToolManifest = {
|
||||
value: manifest,
|
||||
expiresAt: now + MCP_TOOL_MANIFEST_CACHE_TTL_MS,
|
||||
}
|
||||
return manifest
|
||||
}
|
||||
|
||||
export function clearMcpToolManifestCacheForTests(): void {
|
||||
cachedMcpToolManifest = null
|
||||
}
|
||||
|
||||
/**
|
||||
* MCP Server instructions that guide LLMs on how to use the Sim copilot tools.
|
||||
* This is included in the initialize response to help external LLMs understand
|
||||
@@ -380,13 +457,15 @@ function buildMcpServer(abortSignal?: AbortSignal): Server {
|
||||
)
|
||||
|
||||
server.setRequestHandler(ListToolsRequestSchema, async () => {
|
||||
const directTools = DIRECT_TOOL_DEFS.map((tool) => ({
|
||||
const manifest = await getMcpToolManifest()
|
||||
|
||||
const directTools = manifest.directTools.map((tool) => ({
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
inputSchema: tool.inputSchema,
|
||||
}))
|
||||
|
||||
const subagentTools = SUBAGENT_TOOL_DEFS.map((tool) => ({
|
||||
const subagentTools = manifest.subagentTools.map((tool) => ({
|
||||
name: tool.name,
|
||||
description: tool.description,
|
||||
inputSchema: tool.inputSchema,
|
||||
@@ -455,12 +534,15 @@ function buildMcpServer(abortSignal?: AbortSignal): Server {
|
||||
throw new McpError(ErrorCode.InvalidParams, 'Tool name required')
|
||||
}
|
||||
|
||||
const manifest = await getMcpToolManifest()
|
||||
|
||||
const result = await handleToolsCall(
|
||||
{
|
||||
name: params.name,
|
||||
arguments: params.arguments,
|
||||
},
|
||||
authResult.userId,
|
||||
manifest,
|
||||
abortSignal
|
||||
)
|
||||
|
||||
@@ -556,16 +638,17 @@ function trackMcpCopilotCall(userId: string): void {
|
||||
async function handleToolsCall(
|
||||
params: { name: string; arguments?: Record<string, unknown> },
|
||||
userId: string,
|
||||
manifest: McpToolManifest,
|
||||
abortSignal?: AbortSignal
|
||||
): Promise<CallToolResult> {
|
||||
const args = params.arguments || {}
|
||||
|
||||
const directTool = DIRECT_TOOL_DEFS.find((tool) => tool.name === params.name)
|
||||
const directTool = manifest.directTools.find((tool) => tool.name === params.name)
|
||||
if (directTool) {
|
||||
return handleDirectToolCall(directTool, args, userId)
|
||||
}
|
||||
|
||||
const subagentTool = SUBAGENT_TOOL_DEFS.find((tool) => tool.name === params.name)
|
||||
const subagentTool = manifest.subagentTools.find((tool) => tool.name === params.name)
|
||||
if (subagentTool) {
|
||||
return handleSubagentToolCall(subagentTool, args, userId, abortSignal)
|
||||
}
|
||||
@@ -574,7 +657,7 @@ async function handleToolsCall(
|
||||
}
|
||||
|
||||
async function handleDirectToolCall(
|
||||
toolDef: (typeof DIRECT_TOOL_DEFS)[number],
|
||||
toolDef: McpDirectToolDef,
|
||||
args: Record<string, unknown>,
|
||||
userId: string
|
||||
): Promise<CallToolResult> {
|
||||
@@ -711,7 +794,7 @@ async function handleBuildToolCall(
|
||||
}
|
||||
|
||||
async function handleSubagentToolCall(
|
||||
toolDef: (typeof SUBAGENT_TOOL_DEFS)[number],
|
||||
toolDef: McpSubagentToolDef,
|
||||
args: Record<string, unknown>,
|
||||
userId: string,
|
||||
abortSignal?: AbortSignal
|
||||
|
||||
170
apps/sim/lib/copilot/client-sse/tool-effects.test.ts
Normal file
170
apps/sim/lib/copilot/client-sse/tool-effects.test.ts
Normal file
@@ -0,0 +1,170 @@
|
||||
/**
 * @vitest-environment node
 *
 * Tests for applyToolEffects: verifies that a proposed workflow diff is pushed
 * into the diff store either from an explicit `workflow.diff.proposed` effect
 * or, for the `workflow_change` tool only, as a fallback extracted from the
 * tool's result payload.
 */

import { loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

vi.mock('@sim/logger', () => loggerMock)

// vi.hoisted runs before the hoisted vi.mock factories below, so these spies
// are available when the mocked store modules are constructed.
const mocked = vi.hoisted(() => ({
  setProposedChanges: vi.fn().mockResolvedValue(undefined),
  loadEnvironmentVariables: vi.fn(),
  loadVariablesForWorkflow: vi.fn(),
  getWorkflowDeploymentStatus: vi.fn().mockReturnValue(null),
  setDeploymentStatus: vi.fn(),
  registryState: {
    activeWorkflowId: 'workflow-active',
  },
}))

// Each store is mocked with a minimal zustand-like `getState()` surface
// exposing only the members applyToolEffects touches.
vi.mock('@/stores/workflow-diff/store', () => ({
  useWorkflowDiffStore: {
    getState: () => ({
      setProposedChanges: mocked.setProposedChanges,
    }),
  },
}))

vi.mock('@/stores/settings/environment/store', () => ({
  useEnvironmentStore: {
    getState: () => ({
      loadEnvironmentVariables: mocked.loadEnvironmentVariables,
    }),
  },
}))

vi.mock('@/stores/panel/variables/store', () => ({
  useVariablesStore: {
    getState: () => ({
      loadForWorkflow: mocked.loadVariablesForWorkflow,
    }),
  },
}))

vi.mock('@/stores/workflows/registry/store', () => ({
  useWorkflowRegistry: {
    getState: () => ({
      activeWorkflowId: mocked.registryState.activeWorkflowId,
      getWorkflowDeploymentStatus: mocked.getWorkflowDeploymentStatus,
      setDeploymentStatus: mocked.setDeploymentStatus,
    }),
  },
}))

import { applyToolEffects } from '@/lib/copilot/client-sse/tool-effects'

describe('applyToolEffects', () => {
  beforeEach(() => {
    vi.clearAllMocks()
    // Reset mutable registry state in case a test changes the active workflow.
    mocked.registryState.activeWorkflowId = 'workflow-active'
  })

  it('applies workflow_change fallback diff when effects are absent', () => {
    const workflowState = {
      blocks: {
        start: { id: 'start', metadata: { id: 'start', type: 'start' }, inputs: {}, outputs: {} },
      },
      edges: [],
      loops: {},
      parallels: {},
    }

    // No effects: the state should be lifted from resultPayload.workflowState.
    applyToolEffects({
      effectsRaw: [],
      toolCall: {
        id: 'tool-1',
        name: 'workflow_change',
        state: 'success',
        params: { workflowId: 'workflow-123' },
      } as any,
      resultPayload: {
        workflowState,
      },
    })

    expect(mocked.setProposedChanges).toHaveBeenCalledTimes(1)
    expect(mocked.setProposedChanges).toHaveBeenCalledWith(workflowState)
  })

  it('applies workflow_change fallback diff from nested editResult.workflowState', () => {
    const workflowState = {
      blocks: {
        start: { id: 'start', metadata: { id: 'start', type: 'start' }, inputs: {}, outputs: {} },
      },
      edges: [],
      loops: {},
      parallels: {},
    }

    // State nested one level deeper (resultPayload.editResult.workflowState)
    // must also be discovered by the fallback path.
    applyToolEffects({
      effectsRaw: [],
      toolCall: {
        id: 'tool-2',
        name: 'workflow_change',
        state: 'success',
      } as any,
      resultPayload: {
        editResult: {
          workflowState,
        },
      },
    })

    expect(mocked.setProposedChanges).toHaveBeenCalledTimes(1)
    expect(mocked.setProposedChanges).toHaveBeenCalledWith(workflowState)
  })

  it('applies explicit workflow.diff.proposed effect', () => {
    const workflowState = {
      blocks: {
        start: { id: 'start', metadata: { id: 'start', type: 'start' }, inputs: {}, outputs: {} },
      },
      edges: [],
      loops: {},
      parallels: {},
    }

    // An explicit effect carries the state in its own payload; no fallback needed.
    applyToolEffects({
      effectsRaw: [
        {
          kind: 'workflow.diff.proposed',
          payload: {
            workflowState,
          },
        },
      ],
      toolCall: {
        id: 'tool-3',
        name: 'workflow_change',
        state: 'success',
      } as any,
    })

    expect(mocked.setProposedChanges).toHaveBeenCalledTimes(1)
    expect(mocked.setProposedChanges).toHaveBeenCalledWith(workflowState)
  })

  it('does not apply fallback diff for non-workflow_change tools', () => {
    const workflowState = {
      blocks: {},
      edges: [],
      loops: {},
      parallels: {},
    }

    // The fallback is scoped to workflow_change; other tools (list_workflows)
    // must not trigger a proposed diff even when a workflowState is present.
    applyToolEffects({
      effectsRaw: [],
      toolCall: {
        id: 'tool-4',
        name: 'list_workflows',
        state: 'success',
      } as any,
      resultPayload: {
        workflowState,
      },
    })

    expect(mocked.setProposedChanges).not.toHaveBeenCalled()
  })
})
|
||||
@@ -14,6 +14,11 @@ type ParsedToolEffect = {
|
||||
payload: Record<string, unknown>
|
||||
}
|
||||
|
||||
function asNonEmptyRecord(value: unknown): Record<string, unknown> | null {
|
||||
const record = asRecord(value)
|
||||
return Object.keys(record).length > 0 ? record : null
|
||||
}
|
||||
|
||||
function parseToolEffects(raw: unknown): ParsedToolEffect[] {
|
||||
if (!Array.isArray(raw)) return []
|
||||
const effects: ParsedToolEffect[] = []
|
||||
@@ -47,14 +52,14 @@ function resolveWorkflowState(
|
||||
payload: Record<string, unknown>,
|
||||
resultPayload?: Record<string, unknown>
|
||||
): WorkflowState | null {
|
||||
const payloadState = asRecord(payload.workflowState)
|
||||
const payloadState = asNonEmptyRecord(payload.workflowState)
|
||||
if (payloadState) return payloadState as unknown as WorkflowState
|
||||
|
||||
if (resultPayload) {
|
||||
const directState = asRecord(resultPayload.workflowState)
|
||||
const directState = asNonEmptyRecord(resultPayload.workflowState)
|
||||
if (directState) return directState as unknown as WorkflowState
|
||||
const editResult = asRecord(resultPayload.editResult)
|
||||
const nestedState = asRecord(editResult?.workflowState)
|
||||
const nestedState = asNonEmptyRecord(editResult?.workflowState)
|
||||
if (nestedState) return nestedState as unknown as WorkflowState
|
||||
}
|
||||
|
||||
|
||||
@@ -1,606 +0,0 @@
|
||||
export type DirectToolDef = {
|
||||
name: string
|
||||
description: string
|
||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
||||
toolId: string
|
||||
}
|
||||
|
||||
export type SubagentToolDef = {
|
||||
name: string
|
||||
description: string
|
||||
inputSchema: { type: 'object'; properties?: Record<string, unknown>; required?: string[] }
|
||||
agentId: string
|
||||
}
|
||||
|
||||
/**
|
||||
* Direct tools that execute immediately without LLM orchestration.
|
||||
* These are fast database queries that don't need AI reasoning.
|
||||
*/
|
||||
export const DIRECT_TOOL_DEFS: DirectToolDef[] = [
|
||||
{
|
||||
name: 'list_workspaces',
|
||||
toolId: 'list_user_workspaces',
|
||||
description:
|
||||
'List all workspaces the user has access to. Returns workspace IDs, names, and roles. Use this first to determine which workspace to operate in.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'list_workflows',
|
||||
toolId: 'list_user_workflows',
|
||||
description:
|
||||
'List all workflows the user has access to. Returns workflow IDs, names, workspace, and folder info. Use workspaceId/folderId to scope results.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workspaceId: {
|
||||
type: 'string',
|
||||
description: 'Optional workspace ID to filter workflows.',
|
||||
},
|
||||
folderId: {
|
||||
type: 'string',
|
||||
description: 'Optional folder ID to filter workflows.',
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'list_folders',
|
||||
toolId: 'list_folders',
|
||||
description:
|
||||
'List all folders in a workspace. Returns folder IDs, names, and parent relationships for organizing workflows.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workspaceId: {
|
||||
type: 'string',
|
||||
description: 'Workspace ID to list folders from.',
|
||||
},
|
||||
},
|
||||
required: ['workspaceId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'get_workflow',
|
||||
toolId: 'get_user_workflow',
|
||||
description:
|
||||
'Get a workflow by ID. Returns the full workflow definition including all blocks, connections, and configuration.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'Workflow ID to retrieve.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'create_workflow',
|
||||
toolId: 'create_workflow',
|
||||
description:
|
||||
'Create a new empty workflow. Returns the new workflow ID. Always call this FIRST before sim_build for new workflows. Use workspaceId to place it in a specific workspace.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
name: {
|
||||
type: 'string',
|
||||
description: 'Name for the new workflow.',
|
||||
},
|
||||
workspaceId: {
|
||||
type: 'string',
|
||||
description: 'Optional workspace ID. Uses default workspace if not provided.',
|
||||
},
|
||||
folderId: {
|
||||
type: 'string',
|
||||
description: 'Optional folder ID to place the workflow in.',
|
||||
},
|
||||
description: {
|
||||
type: 'string',
|
||||
description: 'Optional description for the workflow.',
|
||||
},
|
||||
},
|
||||
required: ['name'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'create_folder',
|
||||
toolId: 'create_folder',
|
||||
description:
|
||||
'Create a new folder for organizing workflows. Use parentId to create nested folder hierarchies.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
name: {
|
||||
type: 'string',
|
||||
description: 'Name for the new folder.',
|
||||
},
|
||||
workspaceId: {
|
||||
type: 'string',
|
||||
description: 'Optional workspace ID. Uses default workspace if not provided.',
|
||||
},
|
||||
parentId: {
|
||||
type: 'string',
|
||||
description: 'Optional parent folder ID for nested folders.',
|
||||
},
|
||||
},
|
||||
required: ['name'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'rename_workflow',
|
||||
toolId: 'rename_workflow',
|
||||
description: 'Rename an existing workflow.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'The workflow ID to rename.',
|
||||
},
|
||||
name: {
|
||||
type: 'string',
|
||||
description: 'The new name for the workflow.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId', 'name'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'move_workflow',
|
||||
toolId: 'move_workflow',
|
||||
description:
|
||||
'Move a workflow into a different folder. Omit folderId or pass empty string to move to workspace root.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'The workflow ID to move.',
|
||||
},
|
||||
folderId: {
|
||||
type: 'string',
|
||||
description: 'Target folder ID. Omit or pass empty string to move to workspace root.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'move_folder',
|
||||
toolId: 'move_folder',
|
||||
description:
|
||||
'Move a folder into another folder. Omit parentId or pass empty string to move to workspace root.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
folderId: {
|
||||
type: 'string',
|
||||
description: 'The folder ID to move.',
|
||||
},
|
||||
parentId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'Target parent folder ID. Omit or pass empty string to move to workspace root.',
|
||||
},
|
||||
},
|
||||
required: ['folderId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'workflow_run',
|
||||
toolId: 'workflow_run',
|
||||
description:
|
||||
'Run a workflow using one unified interface. Supports full runs and partial execution modes.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'REQUIRED. The workflow ID to run.',
|
||||
},
|
||||
mode: {
|
||||
type: 'string',
|
||||
description: 'Execution mode: full, until_block, from_block, or block. Default: full.',
|
||||
enum: ['full', 'until_block', 'from_block', 'block'],
|
||||
},
|
||||
workflow_input: {
|
||||
type: 'object',
|
||||
description:
|
||||
'JSON object with input values. Keys should match workflow start block input names.',
|
||||
},
|
||||
stopAfterBlockId: {
|
||||
type: 'string',
|
||||
description: 'Required when mode is until_block.',
|
||||
},
|
||||
startBlockId: {
|
||||
type: 'string',
|
||||
description: 'Required when mode is from_block.',
|
||||
},
|
||||
blockId: {
|
||||
type: 'string',
|
||||
description: 'Required when mode is block.',
|
||||
},
|
||||
executionId: {
|
||||
type: 'string',
|
||||
description: 'Optional execution snapshot ID for from_block or block modes.',
|
||||
},
|
||||
useDeployedState: {
|
||||
type: 'boolean',
|
||||
description: 'When true, runs deployed state instead of draft. Default: false.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'get_deployed_workflow_state',
|
||||
toolId: 'get_deployed_workflow_state',
|
||||
description:
|
||||
'Get the deployed (production) state of a workflow. Returns the full workflow definition as deployed, or indicates if the workflow is not yet deployed.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'REQUIRED. The workflow ID to get the deployed state for.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'generate_api_key',
|
||||
toolId: 'generate_api_key',
|
||||
description:
|
||||
'Generate a new workspace API key for calling workflow API endpoints. The key is only shown once — tell the user to save it immediately.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
name: {
|
||||
type: 'string',
|
||||
description:
|
||||
'A descriptive name for the API key (e.g., "production-key", "dev-testing").',
|
||||
},
|
||||
workspaceId: {
|
||||
type: 'string',
|
||||
description: "Optional workspace ID. Defaults to user's default workspace.",
|
||||
},
|
||||
},
|
||||
required: ['name'],
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
export const SUBAGENT_TOOL_DEFS: SubagentToolDef[] = [
|
||||
{
|
||||
name: 'sim_build',
|
||||
agentId: 'build',
|
||||
description: `Build a workflow end-to-end in a single step. This is the fast mode equivalent for headless/MCP usage.
|
||||
|
||||
USE THIS WHEN:
|
||||
- Building a new workflow from scratch
|
||||
- Modifying an existing workflow
|
||||
- You want to gather information and build in one pass without separate plan→edit steps
|
||||
|
||||
WORKFLOW ID (REQUIRED):
|
||||
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
|
||||
- For EXISTING workflows: Always pass the workflowId parameter
|
||||
|
||||
CAN DO:
|
||||
- Gather information about blocks, credentials, patterns
|
||||
- Search documentation and patterns for best practices
|
||||
- Add, modify, or remove blocks
|
||||
- Configure block settings and connections
|
||||
- Set environment variables and workflow variables
|
||||
|
||||
CANNOT DO:
|
||||
- Run or test workflows (use sim_test separately)
|
||||
- Deploy workflows (use sim_deploy separately)
|
||||
|
||||
WORKFLOW:
|
||||
1. Call create_workflow to get a workflowId (for new workflows)
|
||||
2. Call sim_build with the request and workflowId
|
||||
3. Build agent gathers info and builds in one pass
|
||||
4. Call sim_test to verify it works
|
||||
5. Optionally call sim_deploy to make it externally accessible`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: {
|
||||
type: 'string',
|
||||
description: 'What you want to build or modify in the workflow.',
|
||||
},
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
|
||||
},
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request', 'workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_discovery',
|
||||
agentId: 'discovery',
|
||||
description: `Find workflows by their contents or functionality when the user doesn't know the exact name or ID.
|
||||
|
||||
USE THIS WHEN:
|
||||
- User describes a workflow by what it does: "the one that sends emails", "my Slack notification workflow"
|
||||
- User refers to workflow contents: "the workflow with the OpenAI block"
|
||||
- User needs to search/match workflows by functionality or description
|
||||
|
||||
DO NOT USE (use direct tools instead):
|
||||
- User knows the workflow name → use get_workflow
|
||||
- User wants to list all workflows → use list_workflows
|
||||
- User wants to list workspaces → use list_workspaces
|
||||
- User wants to list folders → use list_folders`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
workspaceId: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_plan',
|
||||
agentId: 'plan',
|
||||
description: `Plan workflow changes by gathering required information. For most cases, prefer sim_build which combines planning and editing in one step.
|
||||
|
||||
USE THIS WHEN:
|
||||
- You need fine-grained control over the build process
|
||||
- You want to inspect the plan before executing it
|
||||
|
||||
WORKFLOW ID (REQUIRED):
|
||||
- For NEW workflows: First call create_workflow to get a workflowId, then pass it here
|
||||
- For EXISTING workflows: Always pass the workflowId parameter
|
||||
|
||||
This tool gathers information about available blocks, credentials, and the current workflow state.
|
||||
|
||||
RETURNS: A plan object containing block configurations, connections, and technical details.
|
||||
IMPORTANT: Pass the returned plan EXACTLY to sim_edit - do not modify or summarize it.`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: {
|
||||
type: 'string',
|
||||
description: 'What you want to build or modify in the workflow.',
|
||||
},
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'REQUIRED. The workflow ID. For new workflows, call create_workflow first to get this.',
|
||||
},
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request', 'workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_edit',
|
||||
agentId: 'edit',
|
||||
description: `Execute a workflow plan from sim_plan. For most cases, prefer sim_build which combines planning and editing in one step.
|
||||
|
||||
WORKFLOW ID (REQUIRED):
|
||||
- You MUST provide the workflowId parameter
|
||||
|
||||
PLAN (REQUIRED):
|
||||
- Pass the EXACT plan object from sim_plan in the context.plan field
|
||||
- Do NOT modify, summarize, or interpret the plan - pass it verbatim
|
||||
|
||||
After sim_edit completes, you can test immediately with sim_test, or deploy with sim_deploy to make it accessible externally.`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
message: { type: 'string', description: 'Optional additional instructions for the edit.' },
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description:
|
||||
'REQUIRED. The workflow ID to edit. Get this from create_workflow for new workflows.',
|
||||
},
|
||||
plan: {
|
||||
type: 'object',
|
||||
description: 'The plan object from sim_plan. Pass it EXACTLY as returned, do not modify.',
|
||||
},
|
||||
context: {
|
||||
type: 'object',
|
||||
description:
|
||||
'Additional context. Put the plan in context.plan if not using the plan field directly.',
|
||||
},
|
||||
},
|
||||
required: ['workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_deploy',
|
||||
agentId: 'deploy',
|
||||
description: `Deploy a workflow to make it accessible externally. Workflows can be tested without deploying, but deployment is needed for API access, chat UIs, or MCP exposure.
|
||||
|
||||
DEPLOYMENT TYPES:
|
||||
- "deploy as api" - REST API endpoint for programmatic access
|
||||
- "deploy as chat" - Managed chat UI with auth options
|
||||
- "deploy as mcp" - Expose as MCP tool on an MCP server for AI agents to call
|
||||
|
||||
MCP DEPLOYMENT FLOW:
|
||||
The deploy subagent will automatically: list available MCP servers → create one if needed → deploy the workflow as an MCP tool to that server. You can specify server name, tool name, and tool description.
|
||||
|
||||
ALSO CAN:
|
||||
- Get the deployed (production) state to compare with draft
|
||||
- Generate workspace API keys for calling deployed workflows
|
||||
- List and create MCP servers in the workspace`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: {
|
||||
type: 'string',
|
||||
description: 'The deployment request, e.g. "deploy as api" or "deploy as chat"',
|
||||
},
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'REQUIRED. The workflow ID to deploy.',
|
||||
},
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request', 'workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_test',
|
||||
agentId: 'test',
|
||||
description: `Run a workflow and verify its outputs. Works on both deployed and undeployed (draft) workflows. Use after building to verify correctness.
|
||||
|
||||
Supports full and partial execution:
|
||||
- Full run with test inputs using workflow_run mode "full"
|
||||
- Stop after a specific block using workflow_run mode "until_block"
|
||||
- Run a single block in isolation using workflow_run mode "block"
|
||||
- Resume from a specific block using workflow_run mode "from_block"`,
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
workflowId: {
|
||||
type: 'string',
|
||||
description: 'REQUIRED. The workflow ID to test.',
|
||||
},
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request', 'workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_debug',
|
||||
agentId: 'debug',
|
||||
description:
|
||||
'Diagnose errors or unexpected workflow behavior. Provide the error message and workflowId. Returns root cause analysis and fix suggestions.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
error: { type: 'string', description: 'The error message or description of the issue.' },
|
||||
workflowId: { type: 'string', description: 'REQUIRED. The workflow ID to debug.' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['error', 'workflowId'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_auth',
|
||||
agentId: 'auth',
|
||||
description:
|
||||
'Check OAuth connection status, list connected services, and initiate new OAuth connections. Use when a workflow needs third-party service access (Google, Slack, GitHub, etc.). In MCP/headless mode, returns an authorization URL the user must open in their browser to complete the OAuth flow.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_knowledge',
|
||||
agentId: 'knowledge',
|
||||
description:
|
||||
'Manage knowledge bases for RAG-powered document retrieval. Supports listing, creating, updating, and deleting knowledge bases. Knowledge bases can be attached to agent blocks for context-aware responses.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_custom_tool',
|
||||
agentId: 'custom_tool',
|
||||
description:
|
||||
'Manage custom tools (reusable API integrations). Supports listing, creating, updating, and deleting custom tools. Custom tools can be added to agent blocks as callable functions.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_info',
|
||||
agentId: 'info',
|
||||
description:
|
||||
"Inspect a workflow's blocks, connections, outputs, variables, and metadata. Use for questions about the Sim platform itself — how blocks work, what integrations are available, platform concepts, etc. Always provide workflowId to scope results to a specific workflow.",
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
workflowId: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_workflow',
|
||||
agentId: 'workflow',
|
||||
description:
|
||||
'Manage workflow-level configuration: environment variables, settings, scheduling, and deployment status. Use for any data about a specific workflow — its settings, credentials, variables, or deployment state.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
workflowId: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_research',
|
||||
agentId: 'research',
|
||||
description:
|
||||
'Research external APIs and documentation. Use when you need to understand third-party services, external APIs, authentication flows, or data formats OUTSIDE of Sim. For questions about Sim itself, use sim_info instead.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_superagent',
|
||||
agentId: 'superagent',
|
||||
description:
|
||||
'Execute direct actions NOW: send an email, post to Slack, make an API call, etc. Use when the user wants to DO something immediately rather than build a workflow for it.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
{
|
||||
name: 'sim_platform',
|
||||
agentId: 'tour',
|
||||
description:
|
||||
'Get help with Sim platform navigation, keyboard shortcuts, and UI actions. Use when the user asks "how do I..." about the Sim editor, wants keyboard shortcuts, or needs to know what actions are available in the UI.',
|
||||
inputSchema: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
request: { type: 'string' },
|
||||
context: { type: 'object' },
|
||||
},
|
||||
required: ['request'],
|
||||
},
|
||||
},
|
||||
]
|
||||
@@ -78,6 +78,7 @@ export type WorkflowChangeProposal = {
|
||||
warnings: string[]
|
||||
diagnostics: string[]
|
||||
touchedBlocks: string[]
|
||||
resolvedIds?: Record<string, string>
|
||||
acceptanceAssertions: string[]
|
||||
postApply?: {
|
||||
verify?: boolean
|
||||
|
||||
@@ -1,298 +0,0 @@
|
||||
import { db } from '@sim/db'
|
||||
import { workflow as workflowTable } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
|
||||
import { applyAutoLayout } from '@/lib/workflows/autolayout'
|
||||
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
|
||||
import {
|
||||
loadWorkflowFromNormalizedTables,
|
||||
saveWorkflowToNormalizedTables,
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { getUserPermissionConfig } from '@/ee/access-control/utils/permission-check'
|
||||
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
|
||||
import { applyOperationsToWorkflowState } from './engine'
|
||||
import type { EditWorkflowParams, ValidationError } from './types'
|
||||
import { preValidateCredentialInputs, validateWorkflowSelectorIds } from './validation'
|
||||
|
||||
async function getCurrentWorkflowStateFromDb(
|
||||
workflowId: string
|
||||
): Promise<{ workflowState: any; subBlockValues: Record<string, Record<string, any>> }> {
|
||||
const logger = createLogger('EditWorkflowServerTool')
|
||||
const [workflowRecord] = await db
|
||||
.select()
|
||||
.from(workflowTable)
|
||||
.where(eq(workflowTable.id, workflowId))
|
||||
.limit(1)
|
||||
if (!workflowRecord) throw new Error(`Workflow ${workflowId} not found in database`)
|
||||
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
|
||||
if (!normalized) throw new Error('Workflow has no normalized data')
|
||||
|
||||
// Validate and fix blocks without types
|
||||
const blocks = { ...normalized.blocks }
|
||||
const invalidBlocks: string[] = []
|
||||
|
||||
Object.entries(blocks).forEach(([id, block]: [string, any]) => {
|
||||
if (!block.type) {
|
||||
logger.warn(`Block ${id} loaded without type from database`, {
|
||||
blockKeys: Object.keys(block),
|
||||
blockName: block.name,
|
||||
})
|
||||
invalidBlocks.push(id)
|
||||
}
|
||||
})
|
||||
|
||||
// Remove invalid blocks
|
||||
invalidBlocks.forEach((id) => delete blocks[id])
|
||||
|
||||
// Remove edges connected to invalid blocks
|
||||
const edges = normalized.edges.filter(
|
||||
(edge: any) => !invalidBlocks.includes(edge.source) && !invalidBlocks.includes(edge.target)
|
||||
)
|
||||
|
||||
const workflowState: any = {
|
||||
blocks,
|
||||
edges,
|
||||
loops: normalized.loops || {},
|
||||
parallels: normalized.parallels || {},
|
||||
}
|
||||
const subBlockValues: Record<string, Record<string, any>> = {}
|
||||
Object.entries(normalized.blocks).forEach(([blockId, block]) => {
|
||||
subBlockValues[blockId] = {}
|
||||
Object.entries((block as any).subBlocks || {}).forEach(([subId, sub]) => {
|
||||
if ((sub as any).value !== undefined) subBlockValues[blockId][subId] = (sub as any).value
|
||||
})
|
||||
})
|
||||
return { workflowState, subBlockValues }
|
||||
}
|
||||
|
||||
export const applyWorkflowOperationsServerTool: BaseServerTool<EditWorkflowParams, unknown> = {
|
||||
name: '__internal_apply_workflow_operations',
|
||||
async execute(params: EditWorkflowParams, context?: { userId: string }): Promise<unknown> {
|
||||
const logger = createLogger('EditWorkflowServerTool')
|
||||
const { operations, workflowId, currentUserWorkflow } = params
|
||||
if (!Array.isArray(operations) || operations.length === 0) {
|
||||
throw new Error('operations are required and must be an array')
|
||||
}
|
||||
if (!workflowId) throw new Error('workflowId is required')
|
||||
if (!context?.userId) {
|
||||
throw new Error('Unauthorized workflow access')
|
||||
}
|
||||
|
||||
const authorization = await authorizeWorkflowByWorkspacePermission({
|
||||
workflowId,
|
||||
userId: context.userId,
|
||||
action: 'write',
|
||||
})
|
||||
if (!authorization.allowed) {
|
||||
throw new Error(authorization.message || 'Unauthorized workflow access')
|
||||
}
|
||||
|
||||
logger.info('Executing internal workflow operation apply', {
|
||||
operationCount: operations.length,
|
||||
workflowId,
|
||||
hasCurrentUserWorkflow: !!currentUserWorkflow,
|
||||
})
|
||||
|
||||
// Get current workflow state
|
||||
let workflowState: any
|
||||
if (currentUserWorkflow) {
|
||||
try {
|
||||
workflowState = JSON.parse(currentUserWorkflow)
|
||||
} catch (error) {
|
||||
logger.error('Failed to parse currentUserWorkflow', error)
|
||||
throw new Error('Invalid currentUserWorkflow format')
|
||||
}
|
||||
} else {
|
||||
const fromDb = await getCurrentWorkflowStateFromDb(workflowId)
|
||||
workflowState = fromDb.workflowState
|
||||
}
|
||||
|
||||
// Get permission config for the user
|
||||
const permissionConfig = context?.userId ? await getUserPermissionConfig(context.userId) : null
|
||||
|
||||
// Pre-validate credential and apiKey inputs before applying operations
|
||||
// This filters out invalid credentials and apiKeys for hosted models
|
||||
let operationsToApply = operations
|
||||
const credentialErrors: ValidationError[] = []
|
||||
if (context?.userId) {
|
||||
const { filteredOperations, errors: credErrors } = await preValidateCredentialInputs(
|
||||
operations,
|
||||
{ userId: context.userId },
|
||||
workflowState
|
||||
)
|
||||
operationsToApply = filteredOperations
|
||||
credentialErrors.push(...credErrors)
|
||||
}
|
||||
|
||||
// Apply operations directly to the workflow state
|
||||
const {
|
||||
state: modifiedWorkflowState,
|
||||
validationErrors,
|
||||
skippedItems,
|
||||
} = applyOperationsToWorkflowState(workflowState, operationsToApply, permissionConfig)
|
||||
|
||||
// Add credential validation errors
|
||||
validationErrors.push(...credentialErrors)
|
||||
|
||||
// Get workspaceId for selector validation
|
||||
let workspaceId: string | undefined
|
||||
try {
|
||||
const [workflowRecord] = await db
|
||||
.select({ workspaceId: workflowTable.workspaceId })
|
||||
.from(workflowTable)
|
||||
.where(eq(workflowTable.id, workflowId))
|
||||
.limit(1)
|
||||
workspaceId = workflowRecord?.workspaceId ?? undefined
|
||||
} catch (error) {
|
||||
logger.warn('Failed to get workspaceId for selector validation', { error, workflowId })
|
||||
}
|
||||
|
||||
// Validate selector IDs exist in the database
|
||||
if (context?.userId) {
|
||||
try {
|
||||
const selectorErrors = await validateWorkflowSelectorIds(modifiedWorkflowState, {
|
||||
userId: context.userId,
|
||||
workspaceId,
|
||||
})
|
||||
validationErrors.push(...selectorErrors)
|
||||
} catch (error) {
|
||||
logger.warn('Selector ID validation failed', {
|
||||
error: error instanceof Error ? error.message : String(error),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Validate the workflow state
|
||||
const validation = validateWorkflowState(modifiedWorkflowState, { sanitize: true })
|
||||
|
||||
if (!validation.valid) {
|
||||
logger.error('Edited workflow state is invalid', {
|
||||
errors: validation.errors,
|
||||
warnings: validation.warnings,
|
||||
})
|
||||
throw new Error(`Invalid edited workflow: ${validation.errors.join('; ')}`)
|
||||
}
|
||||
|
||||
if (validation.warnings.length > 0) {
|
||||
logger.warn('Edited workflow validation warnings', {
|
||||
warnings: validation.warnings,
|
||||
})
|
||||
}
|
||||
|
||||
// Extract and persist custom tools to database (reuse workspaceId from selector validation)
|
||||
if (context?.userId && workspaceId) {
|
||||
try {
|
||||
const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
|
||||
const { saved, errors } = await extractAndPersistCustomTools(
|
||||
finalWorkflowState,
|
||||
workspaceId,
|
||||
context.userId
|
||||
)
|
||||
|
||||
if (saved > 0) {
|
||||
logger.info(`Persisted ${saved} custom tool(s) to database`, { workflowId })
|
||||
}
|
||||
|
||||
if (errors.length > 0) {
|
||||
logger.warn('Some custom tools failed to persist', { errors, workflowId })
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Failed to persist custom tools', { error, workflowId })
|
||||
}
|
||||
} else if (context?.userId && !workspaceId) {
|
||||
logger.warn('Workflow has no workspaceId, skipping custom tools persistence', {
|
||||
workflowId,
|
||||
})
|
||||
} else {
|
||||
logger.warn('No userId in context - skipping custom tools persistence', { workflowId })
|
||||
}
|
||||
|
||||
logger.info('Internal workflow operation apply succeeded', {
|
||||
operationCount: operations.length,
|
||||
blocksCount: Object.keys(modifiedWorkflowState.blocks).length,
|
||||
edgesCount: modifiedWorkflowState.edges.length,
|
||||
inputValidationErrors: validationErrors.length,
|
||||
skippedItemsCount: skippedItems.length,
|
||||
schemaValidationErrors: validation.errors.length,
|
||||
validationWarnings: validation.warnings.length,
|
||||
})
|
||||
|
||||
// Format validation errors for LLM feedback
|
||||
const inputErrors =
|
||||
validationErrors.length > 0
|
||||
? validationErrors.map((e) => `Block "${e.blockId}" (${e.blockType}): ${e.error}`)
|
||||
: undefined
|
||||
|
||||
// Format skipped items for LLM feedback
|
||||
const skippedMessages =
|
||||
skippedItems.length > 0 ? skippedItems.map((item) => item.reason) : undefined
|
||||
|
||||
// Persist the workflow state to the database
|
||||
const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
|
||||
|
||||
// Apply autolayout to position blocks properly
|
||||
const layoutResult = applyAutoLayout(finalWorkflowState.blocks, finalWorkflowState.edges, {
|
||||
horizontalSpacing: 250,
|
||||
verticalSpacing: 100,
|
||||
padding: { x: 100, y: 100 },
|
||||
})
|
||||
|
||||
const layoutedBlocks =
|
||||
layoutResult.success && layoutResult.blocks ? layoutResult.blocks : finalWorkflowState.blocks
|
||||
|
||||
if (!layoutResult.success) {
|
||||
logger.warn('Autolayout failed, using default positions', {
|
||||
workflowId,
|
||||
error: layoutResult.error,
|
||||
})
|
||||
}
|
||||
|
||||
const workflowStateForDb = {
|
||||
blocks: layoutedBlocks,
|
||||
edges: finalWorkflowState.edges,
|
||||
loops: generateLoopBlocks(layoutedBlocks as any),
|
||||
parallels: generateParallelBlocks(layoutedBlocks as any),
|
||||
lastSaved: Date.now(),
|
||||
isDeployed: false,
|
||||
}
|
||||
|
||||
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowStateForDb as any)
|
||||
if (!saveResult.success) {
|
||||
logger.error('Failed to persist workflow state to database', {
|
||||
workflowId,
|
||||
error: saveResult.error,
|
||||
})
|
||||
throw new Error(`Failed to save workflow: ${saveResult.error}`)
|
||||
}
|
||||
|
||||
// Update workflow's lastSynced timestamp
|
||||
await db
|
||||
.update(workflowTable)
|
||||
.set({
|
||||
lastSynced: new Date(),
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(workflowTable.id, workflowId))
|
||||
|
||||
logger.info('Workflow state persisted to database', { workflowId })
|
||||
|
||||
// Return the modified workflow state with autolayout applied
|
||||
return {
|
||||
success: true,
|
||||
workflowState: { ...finalWorkflowState, blocks: layoutedBlocks },
|
||||
// Include input validation errors so the LLM can see what was rejected
|
||||
...(inputErrors && {
|
||||
inputValidationErrors: inputErrors,
|
||||
inputValidationMessage: `${inputErrors.length} input(s) were rejected due to validation errors. The workflow was still updated with valid inputs only. Errors: ${inputErrors.join('; ')}`,
|
||||
}),
|
||||
// Include skipped items so the LLM can see what operations were skipped
|
||||
...(skippedMessages && {
|
||||
skippedItems: skippedMessages,
|
||||
skippedItemsMessage: `${skippedItems.length} operation(s) were skipped due to invalid references. Details: ${skippedMessages.join('; ')}`,
|
||||
}),
|
||||
}
|
||||
},
|
||||
}
|
||||
@@ -19,9 +19,9 @@ import {
|
||||
saveProposal,
|
||||
type WorkflowChangeProposal,
|
||||
} from './change-store'
|
||||
import { applyWorkflowOperationsServerTool } from './edit-workflow'
|
||||
import { applyOperationsToWorkflowState } from './edit-workflow/engine'
|
||||
import { preValidateCredentialInputs } from './edit-workflow/validation'
|
||||
import { applyWorkflowOperations } from './workflow-operations/apply'
|
||||
import { applyOperationsToWorkflowState } from './workflow-operations/engine'
|
||||
import { preValidateCredentialInputs } from './workflow-operations/validation'
|
||||
import { workflowVerifyServerTool } from './workflow-verify'
|
||||
import { hashWorkflowState, loadWorkflowStateFromDb } from './workflow-state'
|
||||
|
||||
@@ -287,10 +287,10 @@ type ConnectionTarget = {
|
||||
|
||||
type ConnectionState = Map<string, Map<string, ConnectionTarget[]>>
|
||||
|
||||
function createDraftBlockId(seed?: string): string {
|
||||
const suffix = crypto.randomUUID().slice(0, 8)
|
||||
const base = seed ? seed.replace(/[^a-zA-Z0-9]/g, '').slice(0, 24) : 'draft'
|
||||
return `${base || 'draft'}_${suffix}`
|
||||
const UUID_REGEX = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i
|
||||
|
||||
function createDraftBlockId(_seed?: string): string {
|
||||
return crypto.randomUUID()
|
||||
}
|
||||
|
||||
function normalizeHandle(handle?: string): string {
|
||||
@@ -314,6 +314,38 @@ function normalizeAcceptance(assertions: ChangeSpec['acceptance'] | undefined):
|
||||
.filter((item): item is string => typeof item === 'string' && item.trim().length > 0)
|
||||
}
|
||||
|
||||
function materializeAcceptanceAssertions(
|
||||
assertions: string[],
|
||||
resolvedIds?: Record<string, string>
|
||||
): string[] {
|
||||
if (!resolvedIds || Object.keys(resolvedIds).length === 0) {
|
||||
return assertions
|
||||
}
|
||||
|
||||
const resolveToken = (token: string): string => {
|
||||
const trimmed = token.trim()
|
||||
return resolvedIds[trimmed] || trimmed
|
||||
}
|
||||
|
||||
return assertions.map((assertion) => {
|
||||
if (assertion.startsWith('block_exists:')) {
|
||||
const token = assertion.slice('block_exists:'.length)
|
||||
return `block_exists:${resolveToken(token)}`
|
||||
}
|
||||
|
||||
if (assertion.startsWith('path_exists:')) {
|
||||
const rawPath = assertion.slice('path_exists:'.length).trim()
|
||||
const mapped = rawPath
|
||||
.split('->')
|
||||
.map((token) => resolveToken(token))
|
||||
.join('->')
|
||||
return `path_exists:${mapped}`
|
||||
}
|
||||
|
||||
return assertion
|
||||
})
|
||||
}
|
||||
|
||||
function normalizePostApply(postApply?: PostApply): NormalizedPostApply {
|
||||
const run = postApply?.run
|
||||
const evaluator = postApply?.evaluator
|
||||
@@ -670,12 +702,14 @@ async function compileChangeSpec(params: {
|
||||
warnings: string[]
|
||||
diagnostics: string[]
|
||||
touchedBlocks: string[]
|
||||
resolvedIds: Record<string, string>
|
||||
}> {
|
||||
const { changeSpec, workflowState, userId, workflowId } = params
|
||||
const operations: Array<Record<string, any>> = []
|
||||
const diagnostics: string[] = []
|
||||
const warnings: string[] = []
|
||||
const touchedBlocks = new Set<string>()
|
||||
const resolvedIds: Record<string, string> = { ...(changeSpec.resolvedIds || {}) }
|
||||
|
||||
const aliasMap = new Map<string, string>()
|
||||
const workingState = deepClone(workflowState)
|
||||
@@ -683,6 +717,11 @@ async function compileChangeSpec(params: {
|
||||
const connectionTouchedSources = new Set<string>()
|
||||
const plannedBlockTypes = new Map<string, string>()
|
||||
|
||||
const recordResolved = (token: string | undefined, blockId: string | null | undefined): void => {
|
||||
if (!token || !blockId) return
|
||||
resolvedIds[token] = blockId
|
||||
}
|
||||
|
||||
// Seed aliases from existing block names.
|
||||
for (const [blockId, block] of Object.entries(workingState.blocks || {})) {
|
||||
const blockName = String((block as Record<string, unknown>).name || '')
|
||||
@@ -708,24 +747,40 @@ async function compileChangeSpec(params: {
|
||||
): string | null => {
|
||||
if (!target) return null
|
||||
if (target.blockId) {
|
||||
if (aliasMap.has(target.blockId)) {
|
||||
const mapped = aliasMap.get(target.blockId) || null
|
||||
recordResolved(target.blockId, mapped)
|
||||
return mapped
|
||||
}
|
||||
if (workingState.blocks[target.blockId] || plannedBlockTypes.has(target.blockId)) {
|
||||
recordResolved(target.blockId, target.blockId)
|
||||
return target.blockId
|
||||
}
|
||||
return allowCreateAlias ? target.blockId : null
|
||||
}
|
||||
|
||||
if (target.alias) {
|
||||
if (aliasMap.has(target.alias)) return aliasMap.get(target.alias) || null
|
||||
if (aliasMap.has(target.alias)) {
|
||||
const mapped = aliasMap.get(target.alias) || null
|
||||
recordResolved(target.alias, mapped)
|
||||
return mapped
|
||||
}
|
||||
const byMatch = findMatchingBlockId(workingState, { alias: target.alias })
|
||||
if (byMatch) {
|
||||
aliasMap.set(target.alias, byMatch)
|
||||
recordResolved(target.alias, byMatch)
|
||||
return byMatch
|
||||
}
|
||||
return allowCreateAlias ? target.alias : null
|
||||
}
|
||||
|
||||
const matched = findMatchingBlockId(workingState, target)
|
||||
if (matched) return matched
|
||||
if (matched) {
|
||||
if (target.match?.name) {
|
||||
recordResolved(target.match.name, matched)
|
||||
}
|
||||
return matched
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
@@ -891,8 +946,11 @@ async function compileChangeSpec(params: {
|
||||
diagnostics.push(`ensure_block for "${targetId}" requires type and name when creating`)
|
||||
continue
|
||||
}
|
||||
const requestedBlockId = mutation.target?.blockId
|
||||
const blockId =
|
||||
mutation.target?.blockId || mutation.target?.alias || createDraftBlockId(mutation.name)
|
||||
requestedBlockId && UUID_REGEX.test(requestedBlockId)
|
||||
? requestedBlockId
|
||||
: createDraftBlockId(mutation.name)
|
||||
const addParams: Record<string, any> = {
|
||||
type: mutation.type,
|
||||
name: mutation.name,
|
||||
@@ -922,7 +980,15 @@ async function compileChangeSpec(params: {
|
||||
}
|
||||
plannedBlockTypes.set(blockId, mutation.type)
|
||||
touchedBlocks.add(blockId)
|
||||
if (mutation.target?.alias) aliasMap.set(mutation.target.alias, blockId)
|
||||
if (requestedBlockId) {
|
||||
aliasMap.set(requestedBlockId, blockId)
|
||||
recordResolved(requestedBlockId, blockId)
|
||||
}
|
||||
if (mutation.target?.alias) {
|
||||
aliasMap.set(mutation.target.alias, blockId)
|
||||
recordResolved(mutation.target.alias, blockId)
|
||||
}
|
||||
recordResolved(targetId, blockId)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -1061,6 +1127,7 @@ async function compileChangeSpec(params: {
|
||||
warnings,
|
||||
diagnostics,
|
||||
touchedBlocks: [...touchedBlocks],
|
||||
resolvedIds,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1212,6 +1279,10 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
const diagnostics = [...compileResult.diagnostics, ...simulation.diagnostics]
|
||||
const warnings = [...compileResult.warnings, ...simulation.warnings]
|
||||
const acceptanceAssertions = normalizeAcceptance(params.changeSpec.acceptance)
|
||||
const materializedAcceptance = materializeAcceptanceAssertions(
|
||||
acceptanceAssertions,
|
||||
compileResult.resolvedIds
|
||||
)
|
||||
const normalizedPostApply = normalizePostApply(
|
||||
(params.postApply as PostApply | undefined) || params.changeSpec.postApply
|
||||
)
|
||||
@@ -1224,12 +1295,13 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
warnings,
|
||||
diagnostics,
|
||||
touchedBlocks: compileResult.touchedBlocks,
|
||||
acceptanceAssertions,
|
||||
resolvedIds: compileResult.resolvedIds,
|
||||
acceptanceAssertions: materializedAcceptance,
|
||||
postApply: normalizedPostApply,
|
||||
handoff: {
|
||||
objective: params.changeSpec.objective,
|
||||
constraints: params.changeSpec.constraints,
|
||||
resolvedIds: params.changeSpec.resolvedIds,
|
||||
resolvedIds: compileResult.resolvedIds,
|
||||
assumptions: params.changeSpec.assumptions,
|
||||
unresolvedRisks: params.changeSpec.unresolvedRisks,
|
||||
},
|
||||
@@ -1256,7 +1328,8 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
warnings,
|
||||
diagnostics,
|
||||
touchedBlocks: proposal.touchedBlocks,
|
||||
acceptance: proposal.acceptanceAssertions,
|
||||
resolvedIds: proposal.resolvedIds || {},
|
||||
acceptance: materializedAcceptance,
|
||||
postApply: normalizedPostApply,
|
||||
handoff: proposal.handoff,
|
||||
}
|
||||
@@ -1294,18 +1367,24 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
throw new Error(`snapshot_mismatch: expected ${expectedHash} but current is ${currentHash}`)
|
||||
}
|
||||
|
||||
const applyResult = await applyWorkflowOperationsServerTool.execute(
|
||||
{
|
||||
workflowId: proposal.workflowId,
|
||||
operations: proposal.compiledOperations as any,
|
||||
},
|
||||
{ userId: context.userId }
|
||||
const applyResult = await applyWorkflowOperations({
|
||||
workflowId: proposal.workflowId,
|
||||
operations: proposal.compiledOperations as any,
|
||||
userId: context.userId,
|
||||
})
|
||||
|
||||
const resolvedIds = proposal.resolvedIds || proposal.handoff?.resolvedIds || {}
|
||||
const acceptanceAssertions = materializeAcceptanceAssertions(
|
||||
proposal.acceptanceAssertions,
|
||||
resolvedIds
|
||||
)
|
||||
|
||||
const appliedWorkflowState = (applyResult as any)?.workflowState
|
||||
const newSnapshotHash = appliedWorkflowState
|
||||
? hashWorkflowState(appliedWorkflowState as Record<string, unknown>)
|
||||
: null
|
||||
// Canonicalize post-apply state from persisted DB snapshot to avoid
|
||||
// in-memory serialization drift and transient hash mismatches.
|
||||
const { workflowState: persistedWorkflowState } = await loadWorkflowStateFromDb(proposal.workflowId)
|
||||
const newSnapshotHash = hashWorkflowState(
|
||||
persistedWorkflowState as unknown as Record<string, unknown>
|
||||
)
|
||||
const normalizedPostApply = normalizePostApply(
|
||||
(params.postApply as PostApply | undefined) || (proposal.postApply as PostApply | undefined)
|
||||
)
|
||||
@@ -1315,8 +1394,9 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
verifyResult = await workflowVerifyServerTool.execute(
|
||||
{
|
||||
workflowId: proposal.workflowId,
|
||||
baseSnapshotHash: newSnapshotHash || undefined,
|
||||
acceptance: proposal.acceptanceAssertions,
|
||||
// Intentionally omit baseSnapshotHash for same-request post-apply verification
|
||||
// to avoid false negatives from benign persistence reorderings.
|
||||
acceptance: acceptanceAssertions,
|
||||
},
|
||||
{ userId: context.userId }
|
||||
)
|
||||
@@ -1349,10 +1429,12 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
|
||||
baseSnapshotHash: proposal.baseSnapshotHash,
|
||||
newSnapshotHash,
|
||||
operations: proposal.compiledOperations,
|
||||
workflowState: appliedWorkflowState || null,
|
||||
workflowState: persistedWorkflowState || null,
|
||||
appliedDiff: proposal.diffSummary,
|
||||
warnings: proposal.warnings,
|
||||
diagnostics: proposal.diagnostics,
|
||||
resolvedIds,
|
||||
acceptance: acceptanceAssertions,
|
||||
editResult: applyResult,
|
||||
postApply: {
|
||||
ok: evaluatorGate.passed,
|
||||
|
||||
@@ -0,0 +1,286 @@
|
||||
import { db } from '@sim/db'
|
||||
import { workflow as workflowTable } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { applyAutoLayout } from '@/lib/workflows/autolayout'
|
||||
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
|
||||
import {
|
||||
loadWorkflowFromNormalizedTables,
|
||||
saveWorkflowToNormalizedTables,
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
|
||||
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
|
||||
import { getUserPermissionConfig } from '@/ee/access-control/utils/permission-check'
|
||||
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
|
||||
import { applyOperationsToWorkflowState } from './engine'
|
||||
import type { EditWorkflowOperation, ValidationError } from './types'
|
||||
import { preValidateCredentialInputs, validateWorkflowSelectorIds } from './validation'
|
||||
|
||||
type ApplyWorkflowOperationsParams = {
|
||||
operations: EditWorkflowOperation[]
|
||||
workflowId: string
|
||||
userId: string
|
||||
currentUserWorkflow?: string
|
||||
}
|
||||
|
||||
async function getCurrentWorkflowStateFromDb(
|
||||
workflowId: string
|
||||
): Promise<{ workflowState: any; subBlockValues: Record<string, Record<string, any>> }> {
|
||||
const logger = createLogger('WorkflowOperationApply')
|
||||
const [workflowRecord] = await db
|
||||
.select()
|
||||
.from(workflowTable)
|
||||
.where(eq(workflowTable.id, workflowId))
|
||||
.limit(1)
|
||||
if (!workflowRecord) throw new Error(`Workflow ${workflowId} not found in database`)
|
||||
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
|
||||
if (!normalized) throw new Error('Workflow has no normalized data')
|
||||
|
||||
// Validate and fix blocks without types
|
||||
const blocks = { ...normalized.blocks }
|
||||
const invalidBlocks: string[] = []
|
||||
|
||||
Object.entries(blocks).forEach(([id, block]: [string, any]) => {
|
||||
if (!block.type) {
|
||||
logger.warn(`Block ${id} loaded without type from database`, {
|
||||
blockKeys: Object.keys(block),
|
||||
blockName: block.name,
|
||||
})
|
||||
invalidBlocks.push(id)
|
||||
}
|
||||
})
|
||||
|
||||
// Remove invalid blocks
|
||||
invalidBlocks.forEach((id) => delete blocks[id])
|
||||
|
||||
// Remove edges connected to invalid blocks
|
||||
const edges = normalized.edges.filter(
|
||||
(edge: any) => !invalidBlocks.includes(edge.source) && !invalidBlocks.includes(edge.target)
|
||||
)
|
||||
|
||||
const workflowState: any = {
|
||||
blocks,
|
||||
edges,
|
||||
loops: normalized.loops || {},
|
||||
parallels: normalized.parallels || {},
|
||||
}
|
||||
const subBlockValues: Record<string, Record<string, any>> = {}
|
||||
Object.entries(normalized.blocks).forEach(([blockId, block]) => {
|
||||
subBlockValues[blockId] = {}
|
||||
Object.entries((block as any).subBlocks || {}).forEach(([subId, sub]) => {
|
||||
if ((sub as any).value !== undefined) subBlockValues[blockId][subId] = (sub as any).value
|
||||
})
|
||||
})
|
||||
return { workflowState, subBlockValues }
|
||||
}
|
||||
|
||||
/**
 * Applies copilot-generated edit operations to a workflow and persists the result.
 *
 * Flow: authorize (workspace write) -> load current state (client snapshot or DB) ->
 * pre-validate credential/apiKey inputs -> apply operations -> validate selector IDs
 * and schema -> persist custom tools -> autolayout -> save to normalized tables ->
 * bump sync timestamps.
 *
 * @param params.operations - Non-empty array of operations to apply (required).
 * @param params.workflowId - Target workflow id (required).
 * @param params.currentUserWorkflow - Optional JSON snapshot of the client's current
 *   workflow state; when absent the state is loaded from the normalized DB tables.
 * @param params.userId - Acting user; must have write permission on the workspace.
 * @returns Success payload containing the final (laid-out) workflow state, plus optional
 *   input-validation and skipped-operation messages formatted for LLM feedback.
 * @throws Error on missing/invalid params, failed authorization, unparseable snapshot,
 *   schema-invalid edited workflow, or a failed DB save.
 */
export async function applyWorkflowOperations(params: ApplyWorkflowOperationsParams): Promise<any> {
  const logger = createLogger('WorkflowOperationApply')
  const { operations, workflowId, currentUserWorkflow, userId } = params
  if (!Array.isArray(operations) || operations.length === 0) {
    throw new Error('operations are required and must be an array')
  }
  if (!workflowId) throw new Error('workflowId is required')
  if (!userId) throw new Error('Unauthorized workflow access')

  // Require write permission on the workflow's workspace before mutating anything.
  const authorization = await authorizeWorkflowByWorkspacePermission({
    workflowId,
    userId,
    action: 'write',
  })
  if (!authorization.allowed) {
    throw new Error(authorization.message || 'Unauthorized workflow access')
  }

  logger.info('Executing workflow operation apply', {
    operationCount: operations.length,
    workflowId,
    hasCurrentUserWorkflow: !!currentUserWorkflow,
  })

  // Get current workflow state: prefer the client-provided JSON snapshot,
  // otherwise fall back to the normalized DB tables.
  let workflowState: any
  if (currentUserWorkflow) {
    try {
      workflowState = JSON.parse(currentUserWorkflow)
    } catch (error) {
      logger.error('Failed to parse currentUserWorkflow', error)
      throw new Error('Invalid currentUserWorkflow format')
    }
  } else {
    const fromDb = await getCurrentWorkflowStateFromDb(workflowId)
    workflowState = fromDb.workflowState
  }

  // Get permission config for the user
  const permissionConfig = await getUserPermissionConfig(userId)

  // Pre-validate credential and apiKey inputs before applying operations
  // This filters out invalid credentials and apiKeys for hosted models
  let operationsToApply = operations
  const credentialErrors: ValidationError[] = []
  const { filteredOperations, errors: credErrors } = await preValidateCredentialInputs(
    operations,
    { userId },
    workflowState
  )
  operationsToApply = filteredOperations
  credentialErrors.push(...credErrors)

  // Apply operations directly to the workflow state
  const {
    state: modifiedWorkflowState,
    validationErrors,
    skippedItems,
  } = applyOperationsToWorkflowState(workflowState, operationsToApply, permissionConfig)

  // Add credential validation errors
  validationErrors.push(...credentialErrors)

  // Get workspaceId for selector validation (best-effort; missing workspaceId only
  // degrades validation and skips custom-tool persistence below).
  let workspaceId: string | undefined
  try {
    const [workflowRecord] = await db
      .select({ workspaceId: workflowTable.workspaceId })
      .from(workflowTable)
      .where(eq(workflowTable.id, workflowId))
      .limit(1)
    workspaceId = workflowRecord?.workspaceId ?? undefined
  } catch (error) {
    logger.warn('Failed to get workspaceId for selector validation', { error, workflowId })
  }

  // Validate selector IDs exist in the database.
  // Best-effort: a thrown validation error is logged but does not block the apply.
  try {
    const selectorErrors = await validateWorkflowSelectorIds(modifiedWorkflowState, {
      userId,
      workspaceId,
    })
    validationErrors.push(...selectorErrors)
  } catch (error) {
    logger.warn('Selector ID validation failed', {
      error: error instanceof Error ? error.message : String(error),
    })
  }

  // Validate the workflow state (sanitize: true may produce a cleaned copy used below)
  const validation = validateWorkflowState(modifiedWorkflowState, { sanitize: true })

  if (!validation.valid) {
    logger.error('Edited workflow state is invalid', {
      errors: validation.errors,
      warnings: validation.warnings,
    })
    throw new Error(`Invalid edited workflow: ${validation.errors.join('; ')}`)
  }

  if (validation.warnings.length > 0) {
    logger.warn('Edited workflow validation warnings', {
      warnings: validation.warnings,
    })
  }

  // Extract and persist custom tools to database (reuse workspaceId from selector validation)
  if (workspaceId) {
    try {
      const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
      const { saved, errors } = await extractAndPersistCustomTools(finalWorkflowState, workspaceId, userId)

      if (saved > 0) {
        logger.info(`Persisted ${saved} custom tool(s) to database`, { workflowId })
      }

      if (errors.length > 0) {
        logger.warn('Some custom tools failed to persist', { errors, workflowId })
      }
    } catch (error) {
      // Custom-tool persistence is non-fatal; the apply continues on failure.
      logger.error('Failed to persist custom tools', { error, workflowId })
    }
  } else {
    logger.warn('Workflow has no workspaceId, skipping custom tools persistence', {
      workflowId,
    })
  }

  logger.info('Workflow operation apply succeeded', {
    operationCount: operations.length,
    blocksCount: Object.keys(modifiedWorkflowState.blocks).length,
    edgesCount: modifiedWorkflowState.edges.length,
    inputValidationErrors: validationErrors.length,
    skippedItemsCount: skippedItems.length,
    schemaValidationErrors: validation.errors.length,
    validationWarnings: validation.warnings.length,
  })

  // Format validation errors for LLM feedback
  const inputErrors =
    validationErrors.length > 0
      ? validationErrors.map((e) => `Block "${e.blockId}" (${e.blockType}): ${e.error}`)
      : undefined

  // Format skipped items for LLM feedback
  const skippedMessages =
    skippedItems.length > 0 ? skippedItems.map((item) => item.reason) : undefined

  // Persist the workflow state to the database
  const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState

  // Apply autolayout to position blocks properly
  const layoutResult = applyAutoLayout(finalWorkflowState.blocks, finalWorkflowState.edges, {
    horizontalSpacing: 250,
    verticalSpacing: 100,
    padding: { x: 100, y: 100 },
  })

  const layoutedBlocks =
    layoutResult.success && layoutResult.blocks ? layoutResult.blocks : finalWorkflowState.blocks

  if (!layoutResult.success) {
    logger.warn('Autolayout failed, using default positions', {
      workflowId,
      error: layoutResult.error,
    })
  }

  // Loops/parallels are regenerated from the laid-out blocks rather than reused.
  // NOTE(review): isDeployed is reset to false on every apply — confirm intended.
  const workflowStateForDb = {
    blocks: layoutedBlocks,
    edges: finalWorkflowState.edges,
    loops: generateLoopBlocks(layoutedBlocks as any),
    parallels: generateParallelBlocks(layoutedBlocks as any),
    lastSaved: Date.now(),
    isDeployed: false,
  }

  const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowStateForDb as any)
  if (!saveResult.success) {
    logger.error('Failed to persist workflow state to database', {
      workflowId,
      error: saveResult.error,
    })
    throw new Error(`Failed to save workflow: ${saveResult.error}`)
  }

  // Update workflow's lastSynced timestamp
  await db
    .update(workflowTable)
    .set({
      lastSynced: new Date(),
      updatedAt: new Date(),
    })
    .where(eq(workflowTable.id, workflowId))

  logger.info('Workflow state persisted to database', { workflowId })

  // Returned state uses the laid-out blocks; feedback fields are only present
  // when there is something to report.
  return {
    success: true,
    workflowState: { ...finalWorkflowState, blocks: layoutedBlocks },
    ...(inputErrors && {
      inputValidationErrors: inputErrors,
      inputValidationMessage: `${inputErrors.length} input(s) were rejected due to validation errors. The workflow was still updated with valid inputs only. Errors: ${inputErrors.join('; ')}`,
    }),
    ...(skippedMessages && {
      skippedItems: skippedMessages,
      skippedItemsMessage: `${skippedItems.length} operation(s) were skipped due to invalid references. Details: ${skippedMessages.join('; ')}`,
    }),
  }
}
|
||||
@@ -29,6 +29,10 @@ function normalizeName(value: string): string {
|
||||
return value.trim().toLowerCase()
|
||||
}
|
||||
|
||||
function canonicalizeToken(value: string): string {
|
||||
return normalizeName(value).replace(/[^a-z0-9]/g, '')
|
||||
}
|
||||
|
||||
function resolveBlockToken(
|
||||
workflowState: { blocks: Record<string, any> },
|
||||
token: string
|
||||
@@ -36,9 +40,11 @@ function resolveBlockToken(
|
||||
if (!token) return null
|
||||
if (workflowState.blocks[token]) return token
|
||||
const normalized = normalizeName(token)
|
||||
const canonical = canonicalizeToken(token)
|
||||
for (const [blockId, block] of Object.entries(workflowState.blocks || {})) {
|
||||
const blockName = normalizeName(String((block as Record<string, unknown>).name || ''))
|
||||
if (blockName === normalized) return blockId
|
||||
if (canonicalizeToken(blockName) === canonical) return blockId
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user