fix(custom-tools, copilot): custom tools state + copilot fixes (#2264)

* Workspace env vars

* Fix execution animation on copilot run

* Custom tools toolg

* Custom tools

* Fix custom tool

* remove extra fallback

* Fix lint
This commit is contained in:
Siddharth Ganesan
2025-12-08 20:14:49 -08:00
committed by GitHub
parent b7a1e8f5cf
commit 6b4d76298f
25 changed files with 1456 additions and 554 deletions

View File

@@ -353,10 +353,10 @@ export async function POST(req: NextRequest) {
executeLocally: true,
},
]
// Fetch user credentials (OAuth + API keys)
// Fetch user credentials (OAuth + API keys) - pass workflowId to get workspace env vars
try {
const rawCredentials = await getCredentialsServerTool.execute(
{},
{ workflowId },
{ userId: authenticatedUserId }
)
@@ -840,9 +840,36 @@ export async function POST(req: NextRequest) {
}
} catch (error) {
logger.error(`[${tracker.requestId}] Error processing stream:`, error)
controller.error(error)
// Send an error event to the client before closing so it knows what happened
try {
const errorMessage =
error instanceof Error && error.message === 'terminated'
? 'Connection to AI service was interrupted. Please try again.'
: 'An unexpected error occurred while processing the response.'
const encoder = new TextEncoder()
// Send error as content so it shows in the chat
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n`
)
)
// Send done event to properly close the stream on client
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`))
} catch (enqueueError) {
// Stream might already be closed, that's ok
logger.warn(
`[${tracker.requestId}] Could not send error event to client:`,
enqueueError
)
}
} finally {
controller.close()
try {
controller.close()
} catch {
// Controller might already be closed
}
}
},
})

View File

@@ -451,15 +451,6 @@ function RunSkipButtons({
const actionInProgressRef = useRef(false)
const { setToolCallState, addAutoAllowedTool } = useCopilotStore()
const instance = getClientTool(toolCall.id)
const interruptDisplays = instance?.getInterruptDisplays?.()
const isIntegration = isIntegrationTool(toolCall.name)
// For integration tools: Allow, Always Allow, Skip
// For client tools with interrupts: Run, Skip (or custom labels)
const acceptLabel = isIntegration ? 'Allow' : interruptDisplays?.accept?.text || 'Run'
const rejectLabel = interruptDisplays?.reject?.text || 'Skip'
const onRun = async () => {
// Prevent race condition - check ref synchronously
if (actionInProgressRef.current) return
@@ -507,20 +498,19 @@ function RunSkipButtons({
if (buttonsHidden) return null
// Standardized buttons for all interrupt tools: Allow, Always Allow, Skip
return (
<div className='mt-[12px] flex gap-[6px]'>
<Button onClick={onRun} disabled={isProcessing} variant='primary'>
{isProcessing ? <Loader2 className='mr-1 h-3 w-3 animate-spin' /> : null}
{acceptLabel}
Allow
</Button>
<Button onClick={onAlwaysAllow} disabled={isProcessing} variant='default'>
{isProcessing ? <Loader2 className='mr-1 h-3 w-3 animate-spin' /> : null}
Always Allow
</Button>
{isIntegration && (
<Button onClick={onAlwaysAllow} disabled={isProcessing} variant='default'>
{isProcessing ? <Loader2 className='mr-1 h-3 w-3 animate-spin' /> : null}
Always Allow
</Button>
)}
<Button onClick={onSkip} disabled={isProcessing} variant='default'>
{rejectLabel}
Skip
</Button>
</div>
)

View File

@@ -190,15 +190,25 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
/**
* Cleanup on component unmount (page refresh, navigation, etc.)
* Uses a ref to track sending state to avoid stale closure issues
* Note: Parent workflow.tsx also has useStreamCleanup for page-level cleanup
*/
const isSendingRef = useRef(isSendingMessage)
isSendingRef.current = isSendingMessage
const abortMessageRef = useRef(abortMessage)
abortMessageRef.current = abortMessage
useEffect(() => {
return () => {
if (isSendingMessage) {
abortMessage()
// Use refs to check current values, not stale closure values
if (isSendingRef.current) {
abortMessageRef.current()
logger.info('Aborted active message streaming due to component unmount')
}
}
}, [isSendingMessage, abortMessage])
// Empty deps - only run cleanup on actual unmount, not on re-renders
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])
/**
* Container-level click capture to cancel edit mode when clicking outside the current edit area

View File

@@ -58,6 +58,7 @@ interface CustomToolModalProps {
export interface CustomTool {
type: 'custom-tool'
id?: string
title: string
name: string
description: string
@@ -433,6 +434,8 @@ try {
}
}
let savedToolId: string | undefined
if (isEditing && toolIdToUpdate) {
await updateToolMutation.mutateAsync({
workspaceId,
@@ -443,8 +446,9 @@ try {
code: functionCode || '',
},
})
savedToolId = toolIdToUpdate
} else {
await createToolMutation.mutateAsync({
const result = await createToolMutation.mutateAsync({
workspaceId,
tool: {
title: name,
@@ -452,10 +456,13 @@ try {
code: functionCode || '',
},
})
// Get the ID from the created tool
savedToolId = result?.[0]?.id
}
const customTool: CustomTool = {
type: 'custom-tool',
id: savedToolId,
title: name,
name,
description,

View File

@@ -51,7 +51,10 @@ import {
import { ToolCredentialSelector } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/tool-input/components/tool-credential-selector'
import { useSubBlockValue } from '@/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/hooks/use-sub-block-value'
import { getAllBlocks } from '@/blocks'
import { useCustomTools } from '@/hooks/queries/custom-tools'
import {
type CustomTool as CustomToolDefinition,
useCustomTools,
} from '@/hooks/queries/custom-tools'
import { useWorkflows } from '@/hooks/queries/workflows'
import { useMcpTools } from '@/hooks/use-mcp-tools'
import { getProviderFromModel, supportsToolUsageControl } from '@/providers/utils'
@@ -85,21 +88,28 @@ interface ToolInputProps {
/**
* Represents a tool selected and configured in the workflow
*
* @remarks
* For custom tools (new format), we only store: type, customToolId, usageControl, isExpanded.
* Everything else (title, schema, code) is loaded dynamically from the database.
* Legacy custom tools with inline schema/code are still supported for backwards compatibility.
*/
interface StoredTool {
/** Block type identifier */
type: string
/** Display title for the tool */
title: string
/** Direct tool ID for execution */
toolId: string
/** Parameter values configured by the user */
params: Record<string, string>
/** Display title for the tool (optional for new custom tool format) */
title?: string
/** Direct tool ID for execution (optional for new custom tool format) */
toolId?: string
/** Parameter values configured by the user (optional for new custom tool format) */
params?: Record<string, string>
/** Whether the tool details are expanded in UI */
isExpanded?: boolean
/** Tool schema for custom tools */
/** Database ID for custom tools (new format - reference only) */
customToolId?: string
/** Tool schema for custom tools (legacy format - inline) */
schema?: any
/** Implementation code for custom tools */
/** Implementation code for custom tools (legacy format - inline) */
code?: string
/** Selected operation for multi-operation tools */
operation?: string
@@ -107,6 +117,55 @@ interface StoredTool {
usageControl?: 'auto' | 'force' | 'none'
}
/**
* Resolves a custom tool reference to its full definition
*
* @remarks
* Custom tools can be stored in two formats:
* 1. Reference-only (new): { customToolId: "...", usageControl: "auto" } - loads from database
* 2. Inline (legacy): { schema: {...}, code: "..." } - uses embedded definition
*
* @param storedTool - The stored tool reference
* @param customToolsList - List of custom tools from the database
* @returns The resolved custom tool with full definition, or null if not found
*/
function resolveCustomToolFromReference(
storedTool: StoredTool,
customToolsList: CustomToolDefinition[]
): { schema: any; code: string; title: string } | null {
// If the tool has a customToolId (new reference format), look it up
if (storedTool.customToolId) {
const customTool = customToolsList.find((t) => t.id === storedTool.customToolId)
if (customTool) {
return {
schema: customTool.schema,
code: customTool.code,
title: customTool.title,
}
}
// If not found by ID, fall through to try other methods
logger.warn(`Custom tool not found by ID: ${storedTool.customToolId}`)
}
// Legacy format: inline schema and code
if (storedTool.schema && storedTool.code !== undefined) {
return {
schema: storedTool.schema,
code: storedTool.code,
title: storedTool.title || '',
}
}
return null
}
/**
* Checks if a stored custom tool is a reference-only format (no inline code/schema)
*/
function isCustomToolReference(storedTool: StoredTool): boolean {
return storedTool.type === 'custom-tool' && !!storedTool.customToolId && !storedTool.code
}
/**
* Generic sync wrapper that synchronizes store values with local component state
*
@@ -954,18 +1013,25 @@ export function ToolInput({
(customTool: CustomTool) => {
if (isPreview || disabled) return
const customToolId = `custom-${customTool.schema?.function?.name || 'unknown'}`
const newTool: StoredTool = {
type: 'custom-tool',
title: customTool.title,
toolId: customToolId,
params: {},
isExpanded: true,
schema: customTool.schema,
code: customTool.code || '',
usageControl: 'auto',
}
// If the tool has a database ID, store minimal reference
// Otherwise, store inline for backwards compatibility
const newTool: StoredTool = customTool.id
? {
type: 'custom-tool',
customToolId: customTool.id,
usageControl: 'auto',
isExpanded: true,
}
: {
type: 'custom-tool',
title: customTool.title,
toolId: `custom-${customTool.schema?.function?.name || 'unknown'}`,
params: {},
isExpanded: true,
schema: customTool.schema,
code: customTool.code || '',
usageControl: 'auto',
}
setStoreValue([...selectedTools.map((tool) => ({ ...tool, isExpanded: false })), newTool])
},
@@ -975,12 +1041,21 @@ export function ToolInput({
const handleEditCustomTool = useCallback(
(toolIndex: number) => {
const tool = selectedTools[toolIndex]
if (tool.type !== 'custom-tool' || !tool.schema) return
if (tool.type !== 'custom-tool') return
// For reference-only tools, we need to resolve the tool from the database
// The modal will handle loading the full definition
const resolved = resolveCustomToolFromReference(tool, customTools)
if (!resolved && !tool.schema) {
// Tool not found and no inline definition - can't edit
logger.warn('Cannot edit custom tool - not found in database and no inline definition')
return
}
setEditingToolIndex(toolIndex)
setCustomToolModalOpen(true)
},
[selectedTools]
[selectedTools, customTools]
)
const handleSaveCustomTool = useCallback(
@@ -988,17 +1063,26 @@ export function ToolInput({
if (isPreview || disabled) return
if (editingToolIndex !== null) {
const existingTool = selectedTools[editingToolIndex]
// If the tool has a database ID, convert to minimal reference format
// Otherwise keep inline for backwards compatibility
const updatedTool: StoredTool = customTool.id
? {
type: 'custom-tool',
customToolId: customTool.id,
usageControl: existingTool.usageControl || 'auto',
isExpanded: existingTool.isExpanded,
}
: {
...existingTool,
title: customTool.title,
schema: customTool.schema,
code: customTool.code || '',
}
setStoreValue(
selectedTools.map((tool, index) =>
index === editingToolIndex
? {
...tool,
title: customTool.title,
schema: customTool.schema,
code: customTool.code || '',
}
: tool
)
selectedTools.map((tool, index) => (index === editingToolIndex ? updatedTool : tool))
)
setEditingToolIndex(null)
} else {
@@ -1019,8 +1103,15 @@ export function ToolInput({
const handleDeleteTool = useCallback(
(toolId: string) => {
const updatedTools = selectedTools.filter((tool) => {
if (tool.type !== 'custom-tool') return true
// New format: check customToolId
if (tool.customToolId === toolId) {
return false
}
// Legacy format: check by function name match
if (
tool.type === 'custom-tool' &&
tool.schema?.function?.name &&
customTools.some(
(customTool) =>
@@ -1083,12 +1174,12 @@ export function ToolInput({
const initialParams = initializeToolParams(newToolId, toolParams.userInputParameters, blockId)
const oldToolParams = getToolParametersConfig(tool.toolId, tool.type)
const oldToolParams = tool.toolId ? getToolParametersConfig(tool.toolId, tool.type) : null
const oldParamIds = new Set(oldToolParams?.userInputParameters.map((p) => p.id) || [])
const newParamIds = new Set(toolParams.userInputParameters.map((p) => p.id))
const preservedParams: Record<string, string> = {}
Object.entries(tool.params).forEach(([paramId, value]) => {
Object.entries(tool.params || {}).forEach(([paramId, value]) => {
if (newParamIds.has(paramId) && value) {
preservedParams[paramId] = value
}
@@ -1666,15 +1757,13 @@ export function ToolInput({
key={customTool.id}
value={customTool.title}
onSelect={() => {
// Store minimal reference - only ID, usageControl, isExpanded
// Everything else (title, toolId, params) loaded dynamically
const newTool: StoredTool = {
type: 'custom-tool',
title: customTool.title,
toolId: `custom-${customTool.schema?.function?.name || 'unknown'}`,
params: {},
isExpanded: true,
schema: customTool.schema,
code: customTool.code,
customToolId: customTool.id,
usageControl: 'auto',
isExpanded: true,
}
setStoreValue([
@@ -1757,22 +1846,33 @@ export function ToolInput({
// Get the current tool ID (may change based on operation)
const currentToolId =
!isCustomTool && !isMcpTool
? getToolIdForOperation(tool.type, tool.operation) || tool.toolId
: tool.toolId
? getToolIdForOperation(tool.type, tool.operation) || tool.toolId || ''
: tool.toolId || ''
// Get tool parameters using the new utility with block type for UI components
const toolParams =
!isCustomTool && !isMcpTool ? getToolParametersConfig(currentToolId, tool.type) : null
!isCustomTool && !isMcpTool && currentToolId
? getToolParametersConfig(currentToolId, tool.type)
: null
// For custom tools, extract parameters from schema
// For custom tools, resolve from reference (new format) or use inline (legacy)
const resolvedCustomTool = isCustomTool
? resolveCustomToolFromReference(tool, customTools)
: null
// Derive title and schema from resolved tool or inline data
const customToolTitle = isCustomTool
? tool.title || resolvedCustomTool?.title || 'Unknown Tool'
: null
const customToolSchema = isCustomTool ? tool.schema || resolvedCustomTool?.schema : null
const customToolParams =
isCustomTool && tool.schema && tool.schema.function?.parameters?.properties
? Object.entries(tool.schema.function.parameters.properties || {}).map(
isCustomTool && customToolSchema?.function?.parameters?.properties
? Object.entries(customToolSchema.function.parameters.properties || {}).map(
([paramId, param]: [string, any]) => ({
id: paramId,
type: param.type || 'string',
description: param.description || '',
visibility: (tool.schema.function.parameters.required?.includes(paramId)
visibility: (customToolSchema.function.parameters.required?.includes(paramId)
? 'user-or-llm'
: 'user-only') as 'user-or-llm' | 'user-only' | 'llm-only' | 'hidden',
})
@@ -1805,9 +1905,12 @@ export function ToolInput({
: toolParams?.userInputParameters || []
// Check if tool requires OAuth
const requiresOAuth = !isCustomTool && !isMcpTool && toolRequiresOAuth(currentToolId)
const requiresOAuth =
!isCustomTool && !isMcpTool && currentToolId && toolRequiresOAuth(currentToolId)
const oauthConfig =
!isCustomTool && !isMcpTool ? getToolOAuthConfig(currentToolId) : null
!isCustomTool && !isMcpTool && currentToolId
? getToolOAuthConfig(currentToolId)
: null
// Tools are always expandable so users can access the interface
const isExpandedForDisplay = isPreview
@@ -1816,7 +1919,7 @@ export function ToolInput({
return (
<div
key={`${tool.toolId}-${toolIndex}`}
key={`${tool.customToolId || tool.toolId || toolIndex}-${toolIndex}`}
className={cn(
'group relative flex flex-col overflow-visible rounded-[4px] border border-[var(--border-strong)] bg-[var(--surface-4)] transition-all duration-200 ease-in-out',
draggedIndex === toolIndex ? 'scale-95 opacity-40' : '',
@@ -1872,7 +1975,7 @@ export function ToolInput({
)}
</div>
<span className='truncate font-medium text-[13px] text-[var(--text-primary)]'>
{tool.title}
{isCustomTool ? customToolTitle : tool.title}
</span>
</div>
<div className='flex flex-shrink-0 items-center gap-[8px]'>
@@ -1968,7 +2071,7 @@ export function ToolInput({
</div>
<div className='w-full min-w-0'>
<ToolCredentialSelector
value={tool.params.credential || ''}
value={tool.params?.credential || ''}
onChange={(value) => handleParamChange(toolIndex, 'credential', value)}
provider={oauthConfig.provider as OAuthProvider}
requiredScopes={
@@ -2016,7 +2119,7 @@ export function ToolInput({
const firstParam = params[0] as ToolParameterConfig
const groupValue = JSON.stringify(
params.reduce(
(acc, p) => ({ ...acc, [p.id]: tool.params[p.id] === 'true' }),
(acc, p) => ({ ...acc, [p.id]: tool.params?.[p.id] === 'true' }),
{}
)
)
@@ -2075,10 +2178,10 @@ export function ToolInput({
{param.uiComponent ? (
renderParameterInput(
param,
tool.params[param.id] || '',
tool.params?.[param.id] || '',
(value) => handleParamChange(toolIndex, param.id, value),
toolIndex,
tool.params
tool.params || {}
)
) : (
<ShortInput
@@ -2094,7 +2197,7 @@ export function ToolInput({
type: 'short-input',
title: param.id,
}}
value={tool.params[param.id] || ''}
value={tool.params?.[param.id] || ''}
onChange={(value) =>
handleParamChange(toolIndex, param.id, value)
}
@@ -2267,15 +2370,35 @@ export function ToolInput({
blockId={blockId}
initialValues={
editingToolIndex !== null && selectedTools[editingToolIndex]?.type === 'custom-tool'
? {
id: customTools.find(
(tool) =>
tool.schema?.function?.name ===
selectedTools[editingToolIndex].schema?.function?.name
)?.id,
schema: selectedTools[editingToolIndex].schema,
code: selectedTools[editingToolIndex].code || '',
}
? (() => {
const storedTool = selectedTools[editingToolIndex]
// Resolve the full tool definition from reference or inline
const resolved = resolveCustomToolFromReference(storedTool, customTools)
if (resolved) {
// Find the database ID
const dbTool = storedTool.customToolId
? customTools.find((t) => t.id === storedTool.customToolId)
: customTools.find(
(t) => t.schema?.function?.name === resolved.schema?.function?.name
)
return {
id: dbTool?.id,
schema: resolved.schema,
code: resolved.code,
}
}
// Fallback to inline definition (legacy format)
return {
id: customTools.find(
(tool) => tool.schema?.function?.name === storedTool.schema?.function?.name
)?.id,
schema: storedTool.schema,
code: storedTool.code || '',
}
})()
: undefined
}
/>

View File

@@ -1,7 +1,9 @@
import { v4 as uuidv4 } from 'uuid'
import type { ExecutionResult, StreamingExecution } from '@/executor/types'
import { useExecutionStore } from '@/stores/execution/store'
import { useTerminalConsoleStore } from '@/stores/terminal'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
export interface WorkflowExecutionOptions {
workflowInput?: any
@@ -26,6 +28,11 @@ export async function executeWorkflowWithFullLogging(
const executionId = options.executionId || uuidv4()
const { addConsole } = useTerminalConsoleStore.getState()
const { setActiveBlocks, setBlockRunStatus, setEdgeRunStatus } = useExecutionStore.getState()
const workflowEdges = useWorkflowStore.getState().edges
// Track active blocks for pulsing animation
const activeBlocksSet = new Set<string>()
const payload: any = {
input: options.workflowInput,
@@ -81,7 +88,29 @@ export async function executeWorkflowWithFullLogging(
const event = JSON.parse(data)
switch (event.type) {
case 'block:started': {
// Add block to active set for pulsing animation
activeBlocksSet.add(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
// Track edges that led to this block as soon as execution starts
const incomingEdges = workflowEdges.filter(
(edge) => edge.target === event.data.blockId
)
incomingEdges.forEach((edge) => {
setEdgeRunStatus(edge.id, 'success')
})
break
}
case 'block:completed':
// Remove block from active set
activeBlocksSet.delete(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
// Track successful block execution in run path
setBlockRunStatus(event.data.blockId, 'success')
addConsole({
input: event.data.input || {},
output: event.data.output,
@@ -105,6 +134,13 @@ export async function executeWorkflowWithFullLogging(
break
case 'block:error':
// Remove block from active set
activeBlocksSet.delete(event.data.blockId)
setActiveBlocks(new Set(activeBlocksSet))
// Track failed block execution in run path
setBlockRunStatus(event.data.blockId, 'error')
addConsole({
input: event.data.input || {},
output: {},
@@ -147,6 +183,8 @@ export async function executeWorkflowWithFullLogging(
}
} finally {
reader.releaseLock()
// Clear active blocks when execution ends
setActiveBlocks(new Set())
}
return executionResult

View File

@@ -127,7 +127,8 @@ export class AgentBlockHandler implements BlockHandler {
})
.map(async (tool) => {
try {
if (tool.type === 'custom-tool' && tool.schema) {
// Handle custom tools - either inline (schema) or reference (customToolId)
if (tool.type === 'custom-tool' && (tool.schema || tool.customToolId)) {
return await this.createCustomTool(ctx, tool)
}
if (tool.type === 'mcp') {
@@ -151,24 +152,47 @@ export class AgentBlockHandler implements BlockHandler {
private async createCustomTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const userProvidedParams = tool.params || {}
// Resolve tool definition - either inline or from database reference
let schema = tool.schema
let code = tool.code
let title = tool.title
// If this is a reference-only tool (has customToolId but no schema), fetch from API
if (tool.customToolId && !schema) {
const resolved = await this.fetchCustomToolById(ctx, tool.customToolId)
if (!resolved) {
logger.error(`Custom tool not found: ${tool.customToolId}`)
return null
}
schema = resolved.schema
code = resolved.code
title = resolved.title
}
// Validate we have the required data
if (!schema?.function) {
logger.error('Custom tool missing schema:', { customToolId: tool.customToolId, title })
return null
}
const { filterSchemaForLLM, mergeToolParameters } = await import('@/tools/params')
const filteredSchema = filterSchemaForLLM(tool.schema.function.parameters, userProvidedParams)
const filteredSchema = filterSchemaForLLM(schema.function.parameters, userProvidedParams)
const toolId = `${AGENT.CUSTOM_TOOL_PREFIX}${tool.title}`
const toolId = `${AGENT.CUSTOM_TOOL_PREFIX}${title}`
const base: any = {
id: toolId,
name: tool.schema.function.name,
description: tool.schema.function.description || '',
name: schema.function.name,
description: schema.function.description || '',
params: userProvidedParams,
parameters: {
...filteredSchema,
type: tool.schema.function.parameters.type,
type: schema.function.parameters.type,
},
usageControl: tool.usageControl || 'auto',
}
if (tool.code) {
if (code) {
base.executeFunction = async (callParams: Record<string, any>) => {
const mergedParams = mergeToolParameters(userProvidedParams, callParams)
@@ -177,7 +201,7 @@ export class AgentBlockHandler implements BlockHandler {
const result = await executeTool(
'function_execute',
{
code: tool.code,
code,
...mergedParams,
timeout: tool.timeout ?? AGENT.DEFAULT_FUNCTION_TIMEOUT,
envVars: ctx.environmentVariables || {},
@@ -205,6 +229,78 @@ export class AgentBlockHandler implements BlockHandler {
return base
}
/**
* Fetches a custom tool definition from the database by ID
* Uses Zustand store in browser, API call on server
*/
private async fetchCustomToolById(
ctx: ExecutionContext,
customToolId: string
): Promise<{ schema: any; code: string; title: string } | null> {
// In browser, use the Zustand store which has cached data from React Query
if (typeof window !== 'undefined') {
try {
const { useCustomToolsStore } = await import('@/stores/custom-tools/store')
const tool = useCustomToolsStore.getState().getTool(customToolId)
if (tool) {
return {
schema: tool.schema,
code: tool.code || '',
title: tool.title,
}
}
logger.warn(`Custom tool not found in store: ${customToolId}`)
} catch (error) {
logger.error('Error accessing custom tools store:', { error })
}
}
// Server-side: fetch from API
try {
const headers = await buildAuthHeaders()
const params: Record<string, string> = {}
if (ctx.workspaceId) {
params.workspaceId = ctx.workspaceId
}
if (ctx.workflowId) {
params.workflowId = ctx.workflowId
}
const url = buildAPIUrl('/api/tools/custom', params)
const response = await fetch(url.toString(), {
method: 'GET',
headers,
})
if (!response.ok) {
logger.error(`Failed to fetch custom tools: ${response.status}`)
return null
}
const data = await response.json()
if (!data.data || !Array.isArray(data.data)) {
logger.error('Invalid custom tools API response')
return null
}
const tool = data.data.find((t: any) => t.id === customToolId)
if (!tool) {
logger.warn(`Custom tool not found by ID: ${customToolId}`)
return null
}
return {
schema: tool.schema,
code: tool.code || '',
title: tool.title,
}
} catch (error) {
logger.error('Error fetching custom tool:', { customToolId, error })
return null
}
}
private async createMcpTool(ctx: ExecutionContext, tool: ToolInput): Promise<any> {
const { serverId, toolName, ...userProvidedParams } = tool.params || {}

View File

@@ -32,6 +32,8 @@ export interface ToolInput {
timeout?: number
usageControl?: 'auto' | 'force' | 'none'
operation?: string
/** Database ID for custom tools (new reference format) */
customToolId?: string
}
export interface Message {

View File

@@ -245,9 +245,11 @@ export class LoopOrchestrator {
}
/**
* Evaluates the initial condition for while loops at the sentinel start.
* For while loops, the condition must be checked BEFORE the first iteration.
* If the condition is false, the loop body should be skipped entirely.
* Evaluates the initial condition for loops at the sentinel start.
* - For while loops, the condition must be checked BEFORE the first iteration.
* - For forEach loops, skip if the items array is empty.
* - For for loops, skip if maxIterations is 0.
* - For doWhile loops, always execute at least once.
*
* @returns true if the loop should execute, false if it should be skipped
*/
@@ -258,27 +260,47 @@ export class LoopOrchestrator {
return true
}
// Only while loops need an initial condition check
// - for/forEach: always execute based on iteration count/items
// - doWhile: always execute at least once, check condition after
// - while: check condition before first iteration
if (scope.loopType !== 'while') {
// forEach: skip if items array is empty
if (scope.loopType === 'forEach') {
if (!scope.items || scope.items.length === 0) {
logger.info('ForEach loop has empty items, skipping loop body', { loopId })
return false
}
return true
}
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
return false
// for: skip if maxIterations is 0
if (scope.loopType === 'for') {
if (scope.maxIterations === 0) {
logger.info('For loop has 0 iterations, skipping loop body', { loopId })
return false
}
return true
}
const result = this.evaluateWhileCondition(ctx, scope.condition, scope)
logger.info('While loop initial condition evaluation', {
loopId,
condition: scope.condition,
result,
})
// doWhile: always execute at least once
if (scope.loopType === 'doWhile') {
return true
}
return result
// while: check condition before first iteration
if (scope.loopType === 'while') {
if (!scope.condition) {
logger.warn('No condition defined for while loop', { loopId })
return false
}
const result = this.evaluateWhileCondition(ctx, scope.condition, scope)
logger.info('While loop initial condition evaluation', {
loopId,
condition: scope.condition,
result,
})
return result
}
return true
}
shouldExecuteLoopNode(_ctx: ExecutionContext, _nodeId: string, _loopId: string): boolean {

View File

@@ -38,17 +38,42 @@ export function extractLoopIdFromSentinel(sentinelId: string): string | null {
/**
* Parse distribution items from parallel config
* Handles: arrays, JSON strings, and references
* Handles: arrays, JSON strings, objects, and references
* Note: References (starting with '<') cannot be resolved at DAG construction time,
* they must be resolved at runtime. This function returns [] for references.
*/
export function parseDistributionItems(config: SerializedParallel): any[] {
const rawItems = config.distribution ?? []
if (typeof rawItems === 'string' && rawItems.startsWith(REFERENCE.START)) {
return []
// Already an array - return as-is
if (Array.isArray(rawItems)) {
return rawItems
}
// Object - convert to entries array (consistent with loop forEach behavior)
if (typeof rawItems === 'object' && rawItems !== null) {
return Object.entries(rawItems)
}
// String handling
if (typeof rawItems === 'string') {
// References cannot be resolved at DAG construction time
if (rawItems.startsWith(REFERENCE.START) && rawItems.endsWith(REFERENCE.END)) {
return []
}
// Try to parse as JSON
try {
const normalizedJSON = rawItems.replace(/'/g, '"')
return JSON.parse(normalizedJSON)
const parsed = JSON.parse(normalizedJSON)
if (Array.isArray(parsed)) {
return parsed
}
// Parsed to non-array (e.g. object) - convert to entries
if (typeof parsed === 'object' && parsed !== null) {
return Object.entries(parsed)
}
return []
} catch (error) {
logger.error('Failed to parse distribution items', {
rawItems,
@@ -57,12 +82,7 @@ export function parseDistributionItems(config: SerializedParallel): any[] {
return []
}
}
if (Array.isArray(rawItems)) {
return rawItems
}
if (typeof rawItems === 'object' && rawItems !== null) {
return [rawItems]
}
return []
}
/**

View File

@@ -98,16 +98,43 @@ export class ParallelResolver implements Resolver {
return undefined
}
private getDistributionItems(parallelConfig: any): any {
let distributionItems = parallelConfig.distributionItems || parallelConfig.distribution || []
if (typeof distributionItems === 'string' && !distributionItems.startsWith('<')) {
private getDistributionItems(parallelConfig: any): any[] {
const rawItems = parallelConfig.distributionItems || parallelConfig.distribution || []
// Already an array - return as-is
if (Array.isArray(rawItems)) {
return rawItems
}
// Object - convert to entries array (consistent with loop forEach behavior)
if (typeof rawItems === 'object' && rawItems !== null) {
return Object.entries(rawItems)
}
// String handling
if (typeof rawItems === 'string') {
// Skip references - they should be resolved by the variable resolver
if (rawItems.startsWith('<')) {
return []
}
// Try to parse as JSON
try {
distributionItems = JSON.parse(distributionItems.replace(/'/g, '"'))
const parsed = JSON.parse(rawItems.replace(/'/g, '"'))
if (Array.isArray(parsed)) {
return parsed
}
// Parsed to non-array (e.g. object) - convert to entries
if (typeof parsed === 'object' && parsed !== null) {
return Object.entries(parsed)
}
return []
} catch (e) {
logger.error('Failed to parse distribution items', { distributionItems })
logger.error('Failed to parse distribution items', { rawItems })
return []
}
}
return distributionItems
return []
}
}

View File

@@ -31,6 +31,7 @@ export const ToolIds = z.enum([
'check_deployment_status',
'navigate_ui',
'knowledge_base',
'manage_custom_tool',
])
export type ToolId = z.infer<typeof ToolIds>
@@ -187,6 +188,45 @@ export const ToolArgSchemas = {
}),
knowledge_base: KnowledgeBaseArgsSchema,
manage_custom_tool: z.object({
operation: z
.enum(['add', 'edit', 'delete'])
.describe('The operation to perform: add (create new), edit (update existing), or delete'),
toolId: z
.string()
.optional()
.describe(
'Required for edit and delete operations. The database ID of the custom tool (e.g., "0robnW7_JUVwZrDkq1mqj"). Use get_workflow_data with data_type "custom_tools" to get the list of tools and their IDs. Do NOT use the function name - use the actual "id" field from the tool.'
),
title: z
.string()
.optional()
.describe(
'The display title of the custom tool. Required for add. Should always be provided for edit/delete so the user knows which tool is being modified.'
),
schema: z
.object({
type: z.literal('function'),
function: z.object({
name: z.string().describe('The function name (camelCase, e.g. getWeather)'),
description: z.string().optional().describe('What the function does'),
parameters: z.object({
type: z.string(),
properties: z.record(z.any()),
required: z.array(z.string()).optional(),
}),
}),
})
.optional()
.describe('Required for add. The OpenAI function calling format schema.'),
code: z
.string()
.optional()
.describe(
'Required for add. The JavaScript function body code. Use {{ENV_VAR}} for environment variables and reference parameters directly by name.'
),
}),
} as const
export type ToolArgSchemaMap = typeof ToolArgSchemas
@@ -251,6 +291,7 @@ export const ToolSSESchemas = {
),
navigate_ui: toolCallSSEFor('navigate_ui', ToolArgSchemas.navigate_ui),
knowledge_base: toolCallSSEFor('knowledge_base', ToolArgSchemas.knowledge_base),
manage_custom_tool: toolCallSSEFor('manage_custom_tool', ToolArgSchemas.manage_custom_tool),
} as const
export type ToolSSESchemaMap = typeof ToolSSESchemas
@@ -471,6 +512,13 @@ export const ToolResultSchemas = {
navigated: z.boolean(),
}),
knowledge_base: KnowledgeBaseResultSchema,
manage_custom_tool: z.object({
success: z.boolean(),
operation: z.enum(['add', 'edit', 'delete']),
toolId: z.string().optional(),
title: z.string().optional(),
message: z.string().optional(),
}),
} as const
export type ToolResultSchemaMap = typeof ToolResultSchemas

View File

@@ -4,6 +4,12 @@ import { createLogger } from '@/lib/logs/console/logger'
const baseToolLogger = createLogger('BaseClientTool')
/** Default timeout for tool execution (5 minutes) */
const DEFAULT_TOOL_TIMEOUT_MS = 2 * 60 * 1000
/** Timeout for tools that run workflows (10 minutes) */
export const WORKFLOW_EXECUTION_TIMEOUT_MS = 10 * 60 * 1000
// Client tool call states used by the new runtime
export enum ClientToolCallState {
generating = 'generating',
@@ -52,6 +58,8 @@ export class BaseClientTool {
readonly name: string
protected state: ClientToolCallState
protected metadata: BaseClientToolMetadata
protected isMarkedComplete = false
protected timeoutMs: number = DEFAULT_TOOL_TIMEOUT_MS
constructor(toolCallId: string, name: string, metadata: BaseClientToolMetadata) {
this.toolCallId = toolCallId
@@ -60,14 +68,98 @@ export class BaseClientTool {
this.state = ClientToolCallState.generating
}
/**
* Set a custom timeout for this tool (in milliseconds)
*/
setTimeoutMs(ms: number): void {
this.timeoutMs = ms
}
/**
* Check if this tool has been marked complete
*/
hasBeenMarkedComplete(): boolean {
return this.isMarkedComplete
}
/**
* Ensure the tool is marked complete. If not already marked, marks it with error.
* This should be called in finally blocks to prevent leaked tool calls.
*/
async ensureMarkedComplete(
fallbackMessage = 'Tool execution did not complete properly'
): Promise<void> {
if (!this.isMarkedComplete) {
baseToolLogger.warn('Tool was not marked complete, marking with error', {
toolCallId: this.toolCallId,
toolName: this.name,
state: this.state,
})
await this.markToolComplete(500, fallbackMessage)
this.setState(ClientToolCallState.error)
}
}
/**
* Execute with timeout protection. Wraps the execution in a timeout and ensures
* markToolComplete is always called.
*/
async executeWithTimeout(executeFn: () => Promise<void>, timeoutMs?: number): Promise<void> {
const timeout = timeoutMs ?? this.timeoutMs
let timeoutId: NodeJS.Timeout | null = null
try {
await Promise.race([
executeFn(),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error(`Tool execution timed out after ${timeout / 1000} seconds`))
}, timeout)
}),
])
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
baseToolLogger.error('Tool execution failed or timed out', {
toolCallId: this.toolCallId,
toolName: this.name,
error: message,
})
// Only mark complete if not already marked
if (!this.isMarkedComplete) {
await this.markToolComplete(500, message)
this.setState(ClientToolCallState.error)
}
} finally {
if (timeoutId) clearTimeout(timeoutId)
// Ensure tool is always marked complete
await this.ensureMarkedComplete()
}
}
// Intentionally left empty - specific tools can override
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async execute(_args?: Record<string, any>): Promise<void> {
return
}
// Mark a tool as complete on the server (proxies to server-side route)
/**
* Mark a tool as complete on the server (proxies to server-side route).
* Once called, the tool is considered complete and won't be marked again.
*/
async markToolComplete(status: number, message?: any, data?: any): Promise<boolean> {
// Prevent double-marking
if (this.isMarkedComplete) {
baseToolLogger.warn('markToolComplete called but tool already marked complete', {
toolCallId: this.toolCallId,
toolName: this.name,
existingState: this.state,
attemptedStatus: status,
})
return true
}
this.isMarkedComplete = true
try {
baseToolLogger.info('markToolComplete called', {
toolCallId: this.toolCallId,
@@ -78,6 +170,7 @@ export class BaseClientTool {
hasData: data !== undefined,
})
} catch {}
try {
const res = await fetch('/api/copilot/tools/mark-complete', {
method: 'POST',
@@ -104,7 +197,11 @@ export class BaseClientTool {
const json = (await res.json()) as { success?: boolean }
return json?.success === true
} catch (e) {
// Default failure path
// Default failure path - but tool is still marked complete locally
baseToolLogger.error('Failed to mark tool complete on server', {
toolCallId: this.toolCallId,
error: e instanceof Error ? e.message : String(e),
})
return false
}
}

View File

@@ -197,9 +197,15 @@ export class EditWorkflowClientTool extends BaseClientTool {
async execute(args?: EditWorkflowArgs): Promise<void> {
const logger = createLogger('EditWorkflowClientTool')
try {
// Use timeout protection to ensure tool always completes
await this.executeWithTimeout(async () => {
if (this.hasExecuted) {
logger.info('execute skipped (already executed)', { toolCallId: this.toolCallId })
// Even if skipped, ensure we mark complete
if (!this.hasBeenMarkedComplete()) {
await this.markToolComplete(200, 'Tool already executed')
}
return
}
this.hasExecuted = true
@@ -252,137 +258,136 @@ export class EditWorkflowClientTool extends BaseClientTool {
}
}
const res = await fetch('/api/copilot/execute-copilot-server-tool', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
toolName: 'edit_workflow',
payload: {
operations,
workflowId,
...(currentUserWorkflow ? { currentUserWorkflow } : {}),
},
}),
})
if (!res.ok) {
const errorText = await res.text().catch(() => '')
try {
const errorJson = JSON.parse(errorText)
throw new Error(errorJson.error || errorText || `Server error (${res.status})`)
} catch {
throw new Error(errorText || `Server error (${res.status})`)
}
}
// Fetch with AbortController for timeout support
const controller = new AbortController()
const fetchTimeout = setTimeout(() => controller.abort(), 60000) // 60s fetch timeout
const json = await res.json()
const parsed = ExecuteResponseSuccessSchema.parse(json)
const result = parsed.result as any
this.lastResult = result
logger.info('server result parsed', {
hasWorkflowState: !!result?.workflowState,
blocksCount: result?.workflowState
? Object.keys(result.workflowState.blocks || {}).length
: 0,
hasSkippedItems: !!result?.skippedItems,
skippedItemsCount: result?.skippedItems?.length || 0,
hasInputValidationErrors: !!result?.inputValidationErrors,
inputValidationErrorsCount: result?.inputValidationErrors?.length || 0,
})
// Log skipped items and validation errors for visibility
if (result?.skippedItems?.length > 0) {
logger.warn('Some operations were skipped during edit_workflow', {
skippedItems: result.skippedItems,
try {
const res = await fetch('/api/copilot/execute-copilot-server-tool', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
toolName: 'edit_workflow',
payload: {
operations,
workflowId,
...(currentUserWorkflow ? { currentUserWorkflow } : {}),
},
}),
signal: controller.signal,
})
}
if (result?.inputValidationErrors?.length > 0) {
logger.warn('Some inputs were rejected during edit_workflow', {
inputValidationErrors: result.inputValidationErrors,
})
}
// Update diff directly with workflow state - no YAML conversion needed!
// The diff engine may transform the workflow state (e.g., assign new IDs), so we must use
// the returned proposedState rather than the original result.workflowState
let actualDiffWorkflow: WorkflowState | null = null
clearTimeout(fetchTimeout)
if (result.workflowState) {
try {
if (!this.hasAppliedDiff) {
const diffStore = useWorkflowDiffStore.getState()
// setProposedChanges applies the state directly to the workflow store
await diffStore.setProposedChanges(result.workflowState)
logger.info('diff proposed changes set for edit_workflow with direct workflow state')
this.hasAppliedDiff = true
// Read back the applied state from the workflow store
const workflowStore = useWorkflowStore.getState()
actualDiffWorkflow = workflowStore.getWorkflowState()
} else {
logger.info('skipping diff apply (already applied)')
// If we already applied, read from workflow store
const workflowStore = useWorkflowStore.getState()
actualDiffWorkflow = workflowStore.getWorkflowState()
if (!res.ok) {
const errorText = await res.text().catch(() => '')
try {
const errorJson = JSON.parse(errorText)
throw new Error(errorJson.error || errorText || `Server error (${res.status})`)
} catch {
throw new Error(errorText || `Server error (${res.status})`)
}
} catch (e) {
logger.warn('Failed to set proposed changes in diff store', e as any)
throw new Error('Failed to create workflow diff')
}
} else {
throw new Error('No workflow state returned from server')
}
if (!actualDiffWorkflow) {
throw new Error('Failed to retrieve workflow from diff store after setting changes')
}
const json = await res.json()
const parsed = ExecuteResponseSuccessSchema.parse(json)
const result = parsed.result as any
this.lastResult = result
logger.info('server result parsed', {
hasWorkflowState: !!result?.workflowState,
blocksCount: result?.workflowState
? Object.keys(result.workflowState.blocks || {}).length
: 0,
hasSkippedItems: !!result?.skippedItems,
skippedItemsCount: result?.skippedItems?.length || 0,
hasInputValidationErrors: !!result?.inputValidationErrors,
inputValidationErrorsCount: result?.inputValidationErrors?.length || 0,
})
// Get the workflow state that was just applied, merge subblocks, and sanitize
// This matches what get_user_workflow would return (the true state after edits were applied)
const workflowJson = this.getSanitizedWorkflowJson(actualDiffWorkflow)
// Build sanitized data including workflow JSON and any skipped/validation info
const sanitizedData: Record<string, any> = {}
if (workflowJson) {
sanitizedData.userWorkflow = workflowJson
}
// Include skipped items and validation errors in the response for LLM feedback
if (result?.skippedItems?.length > 0) {
sanitizedData.skippedItems = result.skippedItems
sanitizedData.skippedItemsMessage = result.skippedItemsMessage
}
if (result?.inputValidationErrors?.length > 0) {
sanitizedData.inputValidationErrors = result.inputValidationErrors
sanitizedData.inputValidationMessage = result.inputValidationMessage
}
// Build a message that includes info about skipped items
let completeMessage = 'Workflow diff ready for review'
if (result?.skippedItems?.length > 0 || result?.inputValidationErrors?.length > 0) {
const parts: string[] = []
// Log skipped items and validation errors for visibility
if (result?.skippedItems?.length > 0) {
parts.push(`${result.skippedItems.length} operation(s) skipped`)
logger.warn('Some operations were skipped during edit_workflow', {
skippedItems: result.skippedItems,
})
}
if (result?.inputValidationErrors?.length > 0) {
parts.push(`${result.inputValidationErrors.length} input(s) rejected`)
logger.warn('Some inputs were rejected during edit_workflow', {
inputValidationErrors: result.inputValidationErrors,
})
}
completeMessage = `Workflow diff ready for review. Note: ${parts.join(', ')}.`
// Update diff directly with workflow state - no YAML conversion needed!
if (!result.workflowState) {
throw new Error('No workflow state returned from server')
}
let actualDiffWorkflow: WorkflowState | null = null
if (!this.hasAppliedDiff) {
const diffStore = useWorkflowDiffStore.getState()
// setProposedChanges applies the state optimistically to the workflow store
await diffStore.setProposedChanges(result.workflowState)
logger.info('diff proposed changes set for edit_workflow with direct workflow state')
this.hasAppliedDiff = true
}
// Read back the applied state from the workflow store
const workflowStore = useWorkflowStore.getState()
actualDiffWorkflow = workflowStore.getWorkflowState()
if (!actualDiffWorkflow) {
throw new Error('Failed to retrieve workflow state after applying changes')
}
// Get the workflow state that was just applied, merge subblocks, and sanitize
// This matches what get_user_workflow would return (the true state after edits were applied)
const workflowJson = this.getSanitizedWorkflowJson(actualDiffWorkflow)
// Build sanitized data including workflow JSON and any skipped/validation info
const sanitizedData: Record<string, any> = {}
if (workflowJson) {
sanitizedData.userWorkflow = workflowJson
}
// Include skipped items and validation errors in the response for LLM feedback
if (result?.skippedItems?.length > 0) {
sanitizedData.skippedItems = result.skippedItems
sanitizedData.skippedItemsMessage = result.skippedItemsMessage
}
if (result?.inputValidationErrors?.length > 0) {
sanitizedData.inputValidationErrors = result.inputValidationErrors
sanitizedData.inputValidationMessage = result.inputValidationMessage
}
// Build a message that includes info about skipped items
let completeMessage = 'Workflow diff ready for review'
if (result?.skippedItems?.length > 0 || result?.inputValidationErrors?.length > 0) {
const parts: string[] = []
if (result?.skippedItems?.length > 0) {
parts.push(`${result.skippedItems.length} operation(s) skipped`)
}
if (result?.inputValidationErrors?.length > 0) {
parts.push(`${result.inputValidationErrors.length} input(s) rejected`)
}
completeMessage = `Workflow diff ready for review. Note: ${parts.join(', ')}.`
}
// Mark complete early to unblock LLM stream
await this.markToolComplete(
200,
completeMessage,
Object.keys(sanitizedData).length > 0 ? sanitizedData : undefined
)
// Move into review state
this.setState(ClientToolCallState.review, { result })
} catch (fetchError: any) {
clearTimeout(fetchTimeout)
if (fetchError.name === 'AbortError') {
throw new Error('Server request timed out')
}
throw fetchError
}
// Mark complete early to unblock LLM stream
await this.markToolComplete(
200,
completeMessage,
Object.keys(sanitizedData).length > 0 ? sanitizedData : undefined
)
// Move into review state
this.setState(ClientToolCallState.review, { result })
} catch (error: any) {
const message = error instanceof Error ? error.message : String(error)
logger.error('execute error', { message })
await this.markToolComplete(500, message)
this.setState(ClientToolCallState.error)
}
})
}
}

View File

@@ -0,0 +1,387 @@
import { Check, Loader2, Plus, X, XCircle } from 'lucide-react'
import {
BaseClientTool,
type BaseClientToolMetadata,
ClientToolCallState,
} from '@/lib/copilot/tools/client/base-tool'
import { createLogger } from '@/lib/logs/console/logger'
import { useCustomToolsStore } from '@/stores/custom-tools/store'
import { useCopilotStore } from '@/stores/panel/copilot/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
interface CustomToolSchema {
type: 'function'
function: {
name: string
description?: string
parameters: {
type: string
properties: Record<string, any>
required?: string[]
}
}
}
interface ManageCustomToolArgs {
operation: 'add' | 'edit' | 'delete'
toolId?: string
title?: string
schema?: CustomToolSchema
code?: string
}
const API_ENDPOINT = '/api/tools/custom'
/**
* Client tool for creating, editing, and deleting custom tools via the copilot.
*/
export class ManageCustomToolClientTool extends BaseClientTool {
static readonly id = 'manage_custom_tool'
private currentArgs?: ManageCustomToolArgs
constructor(toolCallId: string) {
super(toolCallId, ManageCustomToolClientTool.id, ManageCustomToolClientTool.metadata)
}
static readonly metadata: BaseClientToolMetadata = {
displayNames: {
[ClientToolCallState.generating]: {
text: 'Managing custom tool',
icon: Loader2,
},
[ClientToolCallState.pending]: { text: 'Manage custom tool?', icon: Plus },
[ClientToolCallState.executing]: { text: 'Managing custom tool', icon: Loader2 },
[ClientToolCallState.success]: { text: 'Managed custom tool', icon: Check },
[ClientToolCallState.error]: { text: 'Failed to manage custom tool', icon: X },
[ClientToolCallState.aborted]: {
text: 'Aborted managing custom tool',
icon: XCircle,
},
[ClientToolCallState.rejected]: {
text: 'Skipped managing custom tool',
icon: XCircle,
},
},
interrupt: {
accept: { text: 'Allow', icon: Check },
reject: { text: 'Skip', icon: XCircle },
},
getDynamicText: (params, state) => {
const operation = params?.operation as 'add' | 'edit' | 'delete' | undefined
// Return undefined if no operation yet - use static defaults
if (!operation) return undefined
// Get tool name from params, or look it up from the store by toolId
let toolName = params?.title || params?.schema?.function?.name
if (!toolName && params?.toolId) {
try {
const tool = useCustomToolsStore.getState().getTool(params.toolId)
toolName = tool?.title || tool?.schema?.function?.name
} catch {
// Ignore errors accessing store
}
}
const getActionText = (verb: 'present' | 'past' | 'gerund') => {
switch (operation) {
case 'add':
return verb === 'present' ? 'Create' : verb === 'past' ? 'Created' : 'Creating'
case 'edit':
return verb === 'present' ? 'Edit' : verb === 'past' ? 'Edited' : 'Editing'
case 'delete':
return verb === 'present' ? 'Delete' : verb === 'past' ? 'Deleted' : 'Deleting'
}
}
// For add: only show tool name in past tense (success)
// For edit/delete: always show tool name
const shouldShowToolName = (currentState: ClientToolCallState) => {
if (operation === 'add') {
return currentState === ClientToolCallState.success
}
return true // edit and delete always show tool name
}
const nameText = shouldShowToolName(state) && toolName ? ` ${toolName}` : ' custom tool'
switch (state) {
case ClientToolCallState.success:
return `${getActionText('past')}${nameText}`
case ClientToolCallState.executing:
return `${getActionText('gerund')}${nameText}`
case ClientToolCallState.generating:
return `${getActionText('gerund')}${nameText}`
case ClientToolCallState.pending:
return `${getActionText('present')}${nameText}?`
case ClientToolCallState.error:
return `Failed to ${getActionText('present')?.toLowerCase()}${nameText}`
case ClientToolCallState.aborted:
return `Aborted ${getActionText('gerund')?.toLowerCase()}${nameText}`
case ClientToolCallState.rejected:
return `Skipped ${getActionText('gerund')?.toLowerCase()}${nameText}`
}
return undefined
},
}
/**
* Gets the tool call args from the copilot store (needed before execute() is called)
*/
private getArgsFromStore(): ManageCustomToolArgs | undefined {
try {
const { toolCallsById } = useCopilotStore.getState()
const toolCall = toolCallsById[this.toolCallId]
return (toolCall as any)?.params as ManageCustomToolArgs | undefined
} catch {
return undefined
}
}
/**
* Override getInterruptDisplays to only show confirmation for edit and delete operations.
* Add operations execute directly without confirmation.
*/
getInterruptDisplays(): BaseClientToolMetadata['interrupt'] | undefined {
// Try currentArgs first, then fall back to store (for when called before execute())
const args = this.currentArgs || this.getArgsFromStore()
const operation = args?.operation
if (operation === 'edit' || operation === 'delete') {
return this.metadata.interrupt
}
return undefined
}
async handleReject(): Promise<void> {
await super.handleReject()
this.setState(ClientToolCallState.rejected)
}
async handleAccept(args?: ManageCustomToolArgs): Promise<void> {
const logger = createLogger('ManageCustomToolClientTool')
try {
this.setState(ClientToolCallState.executing)
await this.executeOperation(args, logger)
} catch (e: any) {
logger.error('execute failed', { message: e?.message })
this.setState(ClientToolCallState.error)
await this.markToolComplete(500, e?.message || 'Failed to manage custom tool')
}
}
async execute(args?: ManageCustomToolArgs): Promise<void> {
this.currentArgs = args
// For add operation, execute directly without confirmation
// For edit/delete, the copilot store will check hasInterrupt() and wait for confirmation
if (args?.operation === 'add') {
await this.handleAccept(args)
}
// edit/delete will wait for user confirmation via handleAccept
}
/**
* Executes the custom tool operation (add, edit, or delete)
*/
private async executeOperation(
args: ManageCustomToolArgs | undefined,
logger: ReturnType<typeof createLogger>
): Promise<void> {
if (!args?.operation) {
throw new Error('Operation is required')
}
const { operation, toolId, title, schema, code } = args
// Get workspace ID from the workflow registry
const { hydration } = useWorkflowRegistry.getState()
const workspaceId = hydration.workspaceId
if (!workspaceId) {
throw new Error('No active workspace found')
}
logger.info(`Executing custom tool operation: ${operation}`, {
operation,
toolId,
title,
workspaceId,
})
switch (operation) {
case 'add':
await this.addCustomTool({ title, schema, code, workspaceId }, logger)
break
case 'edit':
await this.editCustomTool({ toolId, title, schema, code, workspaceId }, logger)
break
case 'delete':
await this.deleteCustomTool({ toolId, workspaceId }, logger)
break
default:
throw new Error(`Unknown operation: ${operation}`)
}
}
/**
* Creates a new custom tool
*/
private async addCustomTool(
params: {
title?: string
schema?: CustomToolSchema
code?: string
workspaceId: string
},
logger: ReturnType<typeof createLogger>
): Promise<void> {
const { title, schema, code, workspaceId } = params
if (!title) {
throw new Error('Title is required for adding a custom tool')
}
if (!schema) {
throw new Error('Schema is required for adding a custom tool')
}
if (!code) {
throw new Error('Code is required for adding a custom tool')
}
const response = await fetch(API_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tools: [{ title, schema, code }],
workspaceId,
}),
})
const data = await response.json()
if (!response.ok) {
throw new Error(data.error || 'Failed to create custom tool')
}
if (!data.data || !Array.isArray(data.data) || data.data.length === 0) {
throw new Error('Invalid API response: missing tool data')
}
const createdTool = data.data[0]
logger.info(`Created custom tool: ${title}`, { toolId: createdTool.id })
this.setState(ClientToolCallState.success)
await this.markToolComplete(200, `Created custom tool "${title}"`, {
success: true,
operation: 'add',
toolId: createdTool.id,
title,
})
}
/**
* Updates an existing custom tool
*/
private async editCustomTool(
params: {
toolId?: string
title?: string
schema?: CustomToolSchema
code?: string
workspaceId: string
},
logger: ReturnType<typeof createLogger>
): Promise<void> {
const { toolId, title, schema, code, workspaceId } = params
if (!toolId) {
throw new Error('Tool ID is required for editing a custom tool')
}
// At least one of title, schema, or code must be provided
if (!title && !schema && !code) {
throw new Error('At least one of title, schema, or code must be provided for editing')
}
// We need to send the full tool data to the API for updates
// First, fetch the existing tool to merge with updates
const existingResponse = await fetch(`${API_ENDPOINT}?workspaceId=${workspaceId}`)
const existingData = await existingResponse.json()
if (!existingResponse.ok) {
throw new Error(existingData.error || 'Failed to fetch existing tools')
}
const existingTool = existingData.data?.find((t: any) => t.id === toolId)
if (!existingTool) {
throw new Error(`Tool with ID ${toolId} not found`)
}
// Merge updates with existing tool
const updatedTool = {
id: toolId,
title: title ?? existingTool.title,
schema: schema ?? existingTool.schema,
code: code ?? existingTool.code,
}
const response = await fetch(API_ENDPOINT, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tools: [updatedTool],
workspaceId,
}),
})
const data = await response.json()
if (!response.ok) {
throw new Error(data.error || 'Failed to update custom tool')
}
logger.info(`Updated custom tool: ${updatedTool.title}`, { toolId })
this.setState(ClientToolCallState.success)
await this.markToolComplete(200, `Updated custom tool "${updatedTool.title}"`, {
success: true,
operation: 'edit',
toolId,
title: updatedTool.title,
})
}
/**
* Deletes a custom tool
*/
private async deleteCustomTool(
params: {
toolId?: string
workspaceId: string
},
logger: ReturnType<typeof createLogger>
): Promise<void> {
const { toolId, workspaceId } = params
if (!toolId) {
throw new Error('Tool ID is required for deleting a custom tool')
}
const url = `${API_ENDPOINT}?id=${toolId}&workspaceId=${workspaceId}`
const response = await fetch(url, {
method: 'DELETE',
})
const data = await response.json()
if (!response.ok) {
throw new Error(data.error || 'Failed to delete custom tool')
}
logger.info(`Deleted custom tool: ${toolId}`)
this.setState(ClientToolCallState.success)
await this.markToolComplete(200, `Deleted custom tool`, {
success: true,
operation: 'delete',
toolId,
})
}
}

View File

@@ -4,6 +4,7 @@ import {
BaseClientTool,
type BaseClientToolMetadata,
ClientToolCallState,
WORKFLOW_EXECUTION_TIMEOUT_MS,
} from '@/lib/copilot/tools/client/base-tool'
import { createLogger } from '@/lib/logs/console/logger'
import { executeWorkflowWithFullLogging } from '@/app/workspace/[workspaceId]/w/[workflowId]/utils'
@@ -74,7 +75,9 @@ export class RunWorkflowClientTool extends BaseClientTool {
async handleAccept(args?: RunWorkflowArgs): Promise<void> {
const logger = createLogger('RunWorkflowClientTool')
try {
// Use longer timeout for workflow execution (10 minutes)
await this.executeWithTimeout(async () => {
const params = args || {}
logger.debug('handleAccept() called', {
toolCallId: this.toolCallId,
@@ -124,60 +127,54 @@ export class RunWorkflowClientTool extends BaseClientTool {
toolCallId: this.toolCallId,
})
const result = await executeWorkflowWithFullLogging({
workflowInput,
executionId,
})
setIsExecuting(false)
// Determine success for both non-streaming and streaming executions
let succeeded = true
let errorMessage: string | undefined
try {
if (result && typeof result === 'object' && 'success' in (result as any)) {
succeeded = Boolean((result as any).success)
if (!succeeded) {
errorMessage = (result as any)?.error || (result as any)?.output?.error
}
} else if (
result &&
typeof result === 'object' &&
'execution' in (result as any) &&
(result as any).execution &&
typeof (result as any).execution === 'object'
) {
succeeded = Boolean((result as any).execution.success)
if (!succeeded) {
errorMessage =
(result as any).execution?.error || (result as any).execution?.output?.error
const result = await executeWorkflowWithFullLogging({
workflowInput,
executionId,
})
// Determine success for both non-streaming and streaming executions
let succeeded = true
let errorMessage: string | undefined
try {
if (result && typeof result === 'object' && 'success' in (result as any)) {
succeeded = Boolean((result as any).success)
if (!succeeded) {
errorMessage = (result as any)?.error || (result as any)?.output?.error
}
} else if (
result &&
typeof result === 'object' &&
'execution' in (result as any) &&
(result as any).execution &&
typeof (result as any).execution === 'object'
) {
succeeded = Boolean((result as any).execution.success)
if (!succeeded) {
errorMessage =
(result as any).execution?.error || (result as any).execution?.output?.error
}
}
} catch {}
if (succeeded) {
logger.debug('Workflow execution finished with success')
this.setState(ClientToolCallState.success)
await this.markToolComplete(
200,
`Workflow execution completed. Started at: ${executionStartTime}`
)
} else {
const msg = errorMessage || 'Workflow execution failed'
logger.error('Workflow execution finished with failure', { message: msg })
this.setState(ClientToolCallState.error)
await this.markToolComplete(500, msg)
}
} catch {}
if (succeeded) {
logger.debug('Workflow execution finished with success')
this.setState(ClientToolCallState.success)
await this.markToolComplete(
200,
`Workflow execution completed. Started at: ${executionStartTime}`
)
} else {
const msg = errorMessage || 'Workflow execution failed'
logger.error('Workflow execution finished with failure', { message: msg })
this.setState(ClientToolCallState.error)
await this.markToolComplete(500, msg)
} finally {
// Always clean up execution state
setIsExecuting(false)
}
} catch (error: any) {
const message = error instanceof Error ? error.message : String(error)
const failedDependency = typeof message === 'string' && /dependency/i.test(message)
const status = failedDependency ? 424 : 500
logger.error('Run workflow failed', { message })
this.setState(ClientToolCallState.error)
await this.markToolComplete(status, failedDependency ? undefined : message)
}
}, WORKFLOW_EXECUTION_TIMEOUT_MS)
}
async execute(args?: RunWorkflowArgs): Promise<void> {

View File

@@ -5,7 +5,7 @@ import { jwtDecode } from 'jwt-decode'
import { createPermissionError, verifyWorkflowAccess } from '@/lib/copilot/auth/permissions'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { generateRequestId } from '@/lib/core/utils/request'
import { getEnvironmentVariableKeys } from '@/lib/environment/utils'
import { getPersonalAndWorkspaceEnv } from '@/lib/environment/utils'
import { createLogger } from '@/lib/logs/console/logger'
import { getAllOAuthServices } from '@/lib/oauth/oauth'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
@@ -26,8 +26,13 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
const authenticatedUserId = context.userId
let workspaceId: string | undefined
if (params?.workflowId) {
const { hasAccess } = await verifyWorkflowAccess(authenticatedUserId, params.workflowId)
const { hasAccess, workspaceId: wId } = await verifyWorkflowAccess(
authenticatedUserId,
params.workflowId
)
if (!hasAccess) {
const errorMessage = createPermissionError('access credentials in')
@@ -37,6 +42,8 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
})
throw new Error(errorMessage)
}
workspaceId = wId
}
const userId = authenticatedUserId
@@ -122,14 +129,23 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
baseProvider: service.baseProvider,
}))
// Fetch environment variables
const envResult = await getEnvironmentVariableKeys(userId)
// Fetch environment variables from both personal and workspace
const envResult = await getPersonalAndWorkspaceEnv(userId, workspaceId)
// Get all unique variable names from both personal and workspace
const personalVarNames = Object.keys(envResult.personalEncrypted)
const workspaceVarNames = Object.keys(envResult.workspaceEncrypted)
const allVarNames = [...new Set([...personalVarNames, ...workspaceVarNames])]
logger.info('Fetched credentials', {
userId,
workspaceId,
connectedCount: connectedCredentials.length,
notConnectedCount: notConnectedServices.length,
envVarCount: envResult.count,
personalEnvVarCount: personalVarNames.length,
workspaceEnvVarCount: workspaceVarNames.length,
totalEnvVarCount: allVarNames.length,
conflicts: envResult.conflicts,
})
return {
@@ -144,8 +160,11 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
},
},
environment: {
variableNames: envResult.variableNames,
count: envResult.count,
variableNames: allVarNames,
count: allVarNames.length,
personalVariables: personalVarNames,
workspaceVariables: workspaceVarNames,
conflicts: envResult.conflicts,
},
}
},

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { environment } from '@sim/db/schema'
import { workspaceEnvironment } from '@sim/db/schema'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import { createPermissionError, verifyWorkflowAccess } from '@/lib/copilot/auth/permissions'
@@ -52,28 +52,33 @@ export const setEnvironmentVariablesServerTool: BaseServerTool<SetEnvironmentVar
const authenticatedUserId = context.userId
const { variables, workflowId } = params || ({} as SetEnvironmentVariablesParams)
if (workflowId) {
const { hasAccess } = await verifyWorkflowAccess(authenticatedUserId, workflowId)
if (!hasAccess) {
const errorMessage = createPermissionError('modify environment variables in')
logger.error('Unauthorized attempt to set environment variables', {
workflowId,
authenticatedUserId,
})
throw new Error(errorMessage)
}
if (!workflowId) {
throw new Error('workflowId is required to set workspace environment variables')
}
const userId = authenticatedUserId
const { hasAccess, workspaceId } = await verifyWorkflowAccess(authenticatedUserId, workflowId)
if (!hasAccess) {
const errorMessage = createPermissionError('modify environment variables in')
logger.error('Unauthorized attempt to set environment variables', {
workflowId,
authenticatedUserId,
})
throw new Error(errorMessage)
}
if (!workspaceId) {
throw new Error('Could not determine workspace for this workflow')
}
const normalized = normalizeVariables(variables || {})
const { variables: validatedVariables } = EnvVarSchema.parse({ variables: normalized })
// Fetch existing workspace environment variables
const existingData = await db
.select()
.from(environment)
.where(eq(environment.userId, userId))
.from(workspaceEnvironment)
.where(eq(workspaceEnvironment.workspaceId, workspaceId))
.limit(1)
const existingEncrypted = (existingData[0]?.variables as Record<string, string>) || {}
@@ -109,26 +114,36 @@ export const setEnvironmentVariablesServerTool: BaseServerTool<SetEnvironmentVar
const finalEncrypted = { ...existingEncrypted, ...newlyEncrypted }
// Save to workspace environment variables
await db
.insert(environment)
.insert(workspaceEnvironment)
.values({
id: crypto.randomUUID(),
userId,
workspaceId,
variables: finalEncrypted,
updatedAt: new Date(),
})
.onConflictDoUpdate({
target: [environment.userId],
target: [workspaceEnvironment.workspaceId],
set: { variables: finalEncrypted, updatedAt: new Date() },
})
logger.info('Saved workspace environment variables', {
workspaceId,
workflowId,
addedCount: added.length,
updatedCount: updated.length,
totalCount: Object.keys(finalEncrypted).length,
})
return {
message: `Successfully processed ${Object.keys(validatedVariables).length} environment variable(s): ${added.length} added, ${updated.length} updated`,
message: `Successfully processed ${Object.keys(validatedVariables).length} workspace environment variable(s): ${added.length} added, ${updated.length} updated`,
variableCount: Object.keys(validatedVariables).length,
variableNames: Object.keys(validatedVariables),
totalVariableCount: Object.keys(finalEncrypted).length,
addedVariables: added,
updatedVariables: updated,
workspaceId,
}
},
}

View File

@@ -645,19 +645,29 @@ function createBlockFromParams(
function normalizeTools(tools: any[]): any[] {
return tools.map((tool) => {
if (tool.type === 'custom-tool') {
// Reconstruct sanitized custom tool fields
// New reference format: minimal fields only
if (tool.customToolId && !tool.schema && !tool.code) {
return {
type: tool.type,
customToolId: tool.customToolId,
usageControl: tool.usageControl || 'auto',
isExpanded: tool.isExpanded ?? true,
}
}
// Legacy inline format: include all fields
const normalized: any = {
...tool,
params: tool.params || {},
isExpanded: tool.isExpanded ?? true,
}
// Ensure schema has proper structure
// Ensure schema has proper structure (for inline format)
if (normalized.schema?.function) {
normalized.schema = {
type: 'function',
function: {
name: tool.title, // Derive name from title
name: normalized.schema.function.name || tool.title, // Preserve name or derive from title
description: normalized.schema.function.description,
parameters: normalized.schema.function.parameters,
},

View File

@@ -19,8 +19,32 @@ interface CustomTool {
usageControl?: string
}
/**
* Stored tool format that may contain either reference or inline definition
*/
interface StoredCustomTool {
type: string
title?: string
toolId?: string
customToolId?: string
schema?: any
code?: string
usageControl?: string
}
/**
* Checks if a stored tool is a reference-only custom tool (no inline definition)
*/
function isCustomToolReference(tool: StoredCustomTool): boolean {
return tool.type === 'custom-tool' && !!tool.customToolId && !tool.code
}
/**
* Extract all custom tools from agent blocks in the workflow state
*
* @remarks
* Only extracts tools with inline definitions (legacy format).
* Reference-only tools (new format with customToolId) are skipped since they're already in the database.
*/
export function extractCustomToolsFromWorkflowState(workflowState: any): CustomTool[] {
const customToolsMap = new Map<string, CustomTool>()
@@ -60,14 +84,18 @@ export function extractCustomToolsFromWorkflowState(workflowState: any): CustomT
}
for (const tool of tools) {
if (
tool &&
typeof tool === 'object' &&
tool.type === 'custom-tool' &&
tool.title &&
tool.schema?.function &&
tool.code
) {
if (!tool || typeof tool !== 'object' || tool.type !== 'custom-tool') {
continue
}
// Skip reference-only tools - they're already in the database
if (isCustomToolReference(tool)) {
logger.debug(`Skipping reference-only custom tool: ${tool.title || tool.customToolId}`)
continue
}
// Only persist tools with inline definitions (legacy format)
if (tool.title && tool.schema?.function && tool.code) {
const toolKey = tool.toolId || tool.title
if (!customToolsMap.has(toolKey)) {

View File

@@ -128,6 +128,16 @@ function sanitizeConditions(conditionsJson: string): string {
function sanitizeTools(tools: any[]): any[] {
return tools.map((tool) => {
if (tool.type === 'custom-tool') {
// New reference format: minimal fields only
if (tool.customToolId && !tool.schema && !tool.code) {
return {
type: tool.type,
customToolId: tool.customToolId,
usageControl: tool.usageControl,
}
}
// Legacy inline format: include all fields
const sanitized: any = {
type: tool.type,
title: tool.title,
@@ -135,15 +145,19 @@ function sanitizeTools(tools: any[]): any[] {
usageControl: tool.usageControl,
}
// Include schema for inline format (legacy format)
if (tool.schema?.function) {
sanitized.schema = {
type: tool.schema.type || 'function',
function: {
name: tool.schema.function.name,
description: tool.schema.function.description,
parameters: tool.schema.function.parameters,
},
}
}
// Include code for inline format (legacy format)
if (tool.code) {
sanitized.code = tool.code
}

View File

@@ -5,6 +5,9 @@ import { getTool } from '@/tools/utils'
const logger = createLogger('WorkflowValidation')
/**
* Checks if a custom tool has a valid inline schema
*/
function isValidCustomToolSchema(tool: any): boolean {
try {
if (!tool || typeof tool !== 'object') return false
@@ -27,6 +30,26 @@ function isValidCustomToolSchema(tool: any): boolean {
}
}
/**
* Checks if a custom tool is a valid reference-only format (new format)
*/
function isValidCustomToolReference(tool: any): boolean {
try {
if (!tool || typeof tool !== 'object') return false
if (tool.type !== 'custom-tool') return false
// Reference format: has customToolId but no inline schema/code
// This is valid - the tool will be loaded dynamically during execution
if (tool.customToolId && typeof tool.customToolId === 'string') {
return true
}
return false
} catch (_err) {
return false
}
}
export function sanitizeAgentToolsInBlocks(blocks: Record<string, any>): {
blocks: Record<string, any>
warnings: string[]
@@ -70,24 +93,34 @@ export function sanitizeAgentToolsInBlocks(blocks: Record<string, any>): {
// Allow non-custom tools to pass through as-is
if (!tool || typeof tool !== 'object') return false
if (tool.type !== 'custom-tool') return true
// Check if it's a valid reference-only format (new format)
if (isValidCustomToolReference(tool)) {
return true
}
// Check if it's a valid inline schema format (legacy format)
const ok = isValidCustomToolSchema(tool)
if (!ok) {
logger.warn('Removing invalid custom tool from workflow', {
blockId,
blockName: block.name,
hasCustomToolId: !!tool.customToolId,
hasSchema: !!tool.schema,
})
}
return ok
})
.map((tool: any) => {
if (tool.type === 'custom-tool') {
// Ensure required defaults to avoid client crashes
if (!tool.code || typeof tool.code !== 'string') {
tool.code = ''
}
// For reference-only tools, ensure usageControl default
if (!tool.usageControl) {
tool.usageControl = 'auto'
}
// For inline tools (legacy), also ensure code default
if (!tool.customToolId && (!tool.code || typeof tool.code !== 'string')) {
tool.code = ''
}
}
return tool
})

View File

@@ -147,89 +147,8 @@ export class Serializer {
loops: Record<string, Loop>,
parallels: Record<string, Parallel>
): void {
// Validate loops in forEach mode
Object.values(loops || {}).forEach((loop) => {
if (!loop) return
if (loop.loopType === 'forEach') {
const items = (loop as any).forEachItems
const hasNonEmptyCollection = (() => {
if (items === undefined || items === null) return false
if (Array.isArray(items)) return items.length > 0
if (typeof items === 'object') return Object.keys(items).length > 0
if (typeof items === 'string') {
const trimmed = items.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
// Non-JSON or invalid JSON string allow non-empty string (could be a reference like <start.items>)
return true
}
}
// Non-JSON string allow (may be a variable reference/expression)
return true
}
return false
})()
if (!hasNonEmptyCollection) {
const blockName = blocks[loop.id]?.name || 'Loop'
const error = new WorkflowValidationError(
`${blockName} requires a collection for forEach mode. Provide a non-empty array/object or a variable reference.`,
loop.id,
'loop',
blockName
)
throw error
}
}
})
// Validate parallels in collection mode
Object.values(parallels || {}).forEach((parallel) => {
if (!parallel) return
if ((parallel as any).parallelType === 'collection') {
const distribution = (parallel as any).distribution
const hasNonEmptyDistribution = (() => {
if (distribution === undefined || distribution === null) return false
if (Array.isArray(distribution)) return distribution.length > 0
if (typeof distribution === 'object') return Object.keys(distribution).length > 0
if (typeof distribution === 'string') {
const trimmed = distribution.trim()
if (trimmed.length === 0) return false
// If it looks like JSON, parse to confirm non-empty [] / {}
if (trimmed.startsWith('[') || trimmed.startsWith('{')) {
try {
const parsed = JSON.parse(trimmed)
if (Array.isArray(parsed)) return parsed.length > 0
if (parsed && typeof parsed === 'object') return Object.keys(parsed).length > 0
} catch {
return true
}
}
return true
}
return false
})()
if (!hasNonEmptyDistribution) {
const blockName = blocks[parallel.id]?.name || 'Parallel'
const error = new WorkflowValidationError(
`${blockName} requires a collection for collection mode. Provide a non-empty array/object or a variable reference.`,
parallel.id,
'parallel',
blockName
)
throw error
}
}
})
// Note: Empty collections in forEach loops and parallel collection mode are handled gracefully
// at runtime - the loop/parallel will simply be skipped. No build-time validation needed.
}
private serializeBlock(

View File

@@ -43,6 +43,7 @@ import { GetWorkflowConsoleClientTool } from '@/lib/copilot/tools/client/workflo
import { GetWorkflowDataClientTool } from '@/lib/copilot/tools/client/workflow/get-workflow-data'
import { GetWorkflowFromNameClientTool } from '@/lib/copilot/tools/client/workflow/get-workflow-from-name'
import { ListUserWorkflowsClientTool } from '@/lib/copilot/tools/client/workflow/list-user-workflows'
import { ManageCustomToolClientTool } from '@/lib/copilot/tools/client/workflow/manage-custom-tool'
import { RunWorkflowClientTool } from '@/lib/copilot/tools/client/workflow/run-workflow'
import { SetGlobalWorkflowVariablesClientTool } from '@/lib/copilot/tools/client/workflow/set-global-workflow-variables'
import { createLogger } from '@/lib/logs/console/logger'
@@ -100,6 +101,7 @@ const CLIENT_TOOL_INSTANTIATORS: Record<string, (id: string) => any> = {
deploy_workflow: (id) => new DeployWorkflowClientTool(id),
check_deployment_status: (id) => new CheckDeploymentStatusClientTool(id),
navigate_ui: (id) => new NavigateUIClientTool(id),
manage_custom_tool: (id) => new ManageCustomToolClientTool(id),
}
// Read-only static metadata for class-based tools (no instances)
@@ -135,6 +137,7 @@ export const CLASS_TOOL_METADATA: Record<string, BaseClientToolMetadata | undefi
deploy_workflow: (DeployWorkflowClientTool as any)?.metadata,
check_deployment_status: (CheckDeploymentStatusClientTool as any)?.metadata,
navigate_ui: (NavigateUIClientTool as any)?.metadata,
manage_custom_tool: (ManageCustomToolClientTool as any)?.metadata,
}
function ensureClientToolInstance(toolName: string | undefined, toolCallId: string | undefined) {

View File

@@ -207,149 +207,114 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
throw new Error('No active workflow found')
}
const previousState = cloneWorkflowState(useWorkflowStore.getState().getWorkflowState())
batchedUpdate({ isDiffReady: false, diffError: null })
// Capture baseline if needed (synchronous, fast)
let baselineWorkflow = get().baselineWorkflow
let baselineWorkflowId = get().baselineWorkflowId
let capturedBaseline = false
if (!baselineWorkflow || baselineWorkflowId !== activeWorkflowId) {
try {
baselineWorkflow = captureBaselineSnapshot(activeWorkflowId)
baselineWorkflowId = activeWorkflowId
capturedBaseline = true
logger.info('Captured baseline snapshot for diff workflow', {
workflowId: activeWorkflowId,
blockCount: Object.keys(baselineWorkflow.blocks || {}).length,
})
} catch (error) {
const message = 'Failed to capture workflow snapshot before applying diff'
logger.error(message, { error })
batchedUpdate({ diffError: message, isDiffReady: false })
throw error instanceof Error ? error : new Error(message)
}
baselineWorkflow = captureBaselineSnapshot(activeWorkflowId)
baselineWorkflowId = activeWorkflowId
capturedBaseline = true
logger.info('Captured baseline snapshot for diff workflow', {
workflowId: activeWorkflowId,
blockCount: Object.keys(baselineWorkflow.blocks || {}).length,
})
}
try {
const diffResult = await diffEngine.createDiffFromWorkflowState(
proposedState,
diffAnalysis,
baselineWorkflow ?? undefined
)
// Create diff (this is fast, just computes the diff)
const diffResult = await diffEngine.createDiffFromWorkflowState(
proposedState,
diffAnalysis,
baselineWorkflow ?? undefined
)
if (!diffResult.success || !diffResult.diff) {
const errorMessage = diffResult.errors?.join(', ') || 'Failed to create diff'
logger.error(errorMessage)
throw new Error(errorMessage)
}
if (!diffResult.success || !diffResult.diff) {
const errorMessage = diffResult.errors?.join(', ') || 'Failed to create diff'
logger.error(errorMessage)
throw new Error(errorMessage)
}
const candidateState = diffResult.diff.proposedState
const candidateState = diffResult.diff.proposedState
// Validate proposed workflow using serializer round-trip
try {
const serializer = new Serializer()
const serialized = serializer.serializeWorkflow(
candidateState.blocks,
candidateState.edges,
candidateState.loops,
candidateState.parallels,
false
)
serializer.deserializeWorkflow(serialized)
} catch (error) {
const message =
error instanceof Error ? error.message : 'Invalid workflow in proposed changes'
logger.error('[DiffStore] Diff validation failed', { message, error })
throw new Error(message)
}
// Validate proposed workflow using serializer round-trip
const serializer = new Serializer()
const serialized = serializer.serializeWorkflow(
candidateState.blocks,
candidateState.edges,
candidateState.loops,
candidateState.parallels,
false
)
serializer.deserializeWorkflow(serialized)
// Log to verify diff markers are present
const sampleBlock = Object.values(candidateState.blocks)[0] as any
logger.info('Applying candidate state with diff markers', {
sampleBlockId: sampleBlock?.id,
sampleBlockHasDiff: !!sampleBlock?.is_diff,
sampleBlockDiffStatus: sampleBlock?.is_diff,
totalBlocks: Object.keys(candidateState.blocks).length,
blocksWithDiff: Object.values(candidateState.blocks).filter((b: any) => b.is_diff)
.length,
})
// OPTIMISTIC: Apply state immediately to stores (this is what makes UI update)
applyWorkflowStateToStores(activeWorkflowId, candidateState)
// Apply the candidate state WITH markers locally (for visual diff)
applyWorkflowStateToStores(activeWorkflowId, candidateState)
// OPTIMISTIC: Update diff state immediately so UI shows the diff
const triggerMessageId =
capturedBaseline && !get()._triggerMessageId
? await getLatestUserMessageId()
: get()._triggerMessageId
// Broadcast state change to other users (without markers)
const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(candidateState))
await enqueueReplaceWorkflowState({
workflowId: activeWorkflowId,
state: cleanState,
})
set({
hasActiveDiff: true,
isShowingDiff: true,
isDiffReady: true,
baselineWorkflow: baselineWorkflow,
baselineWorkflowId,
diffAnalysis: diffResult.diff.diffAnalysis || null,
diffMetadata: diffResult.diff.metadata,
diffError: null,
_triggerMessageId: triggerMessageId ?? null,
})
// Persist to database
const persisted = await persistWorkflowStateToServer(activeWorkflowId, candidateState)
logger.info('Workflow diff applied optimistically', {
workflowId: activeWorkflowId,
blocks: Object.keys(candidateState.blocks || {}).length,
edges: candidateState.edges?.length || 0,
})
if (!persisted) {
logger.error('Failed to persist copilot edits, restoring previous workflow state')
applyWorkflowStateToStores(activeWorkflowId, previousState)
batchedUpdate({
hasActiveDiff: Boolean(baselineWorkflow),
isShowingDiff: Boolean(baselineWorkflow),
isDiffReady: Boolean(baselineWorkflow),
diffError: 'Failed to save Copilot changes. Please try again.',
})
throw new Error('Failed to save Copilot changes')
}
// BACKGROUND: Broadcast and persist without blocking
// These operations happen after the UI has already updated
const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(candidateState))
const triggerMessageId =
capturedBaseline && !get()._triggerMessageId
? await getLatestUserMessageId()
: get()._triggerMessageId
// Fire and forget: broadcast to other users (don't await)
enqueueReplaceWorkflowState({
workflowId: activeWorkflowId,
state: cleanState,
}).catch((error) => {
logger.warn('Failed to broadcast workflow state (non-blocking)', { error })
})
batchedUpdate({
hasActiveDiff: true,
isShowingDiff: true,
isDiffReady: true,
baselineWorkflow: baselineWorkflow,
baselineWorkflowId,
diffAnalysis: diffResult.diff.diffAnalysis || null,
diffMetadata: diffResult.diff.metadata,
diffError: null,
_triggerMessageId: triggerMessageId ?? null,
})
// Emit event for undo/redo recording (unless we're in an undo/redo operation)
if (!(window as any).__skipDiffRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
type: 'apply-diff',
baselineSnapshot: baselineWorkflow,
proposedState: candidateState,
diffAnalysis: diffResult.diff.diffAnalysis,
},
// Fire and forget: persist to database (don't await)
persistWorkflowStateToServer(activeWorkflowId, candidateState)
.then((persisted) => {
if (!persisted) {
logger.warn('Failed to persist copilot edits (state already applied locally)')
// Don't revert - user can retry or state will sync on next save
} else {
logger.info('Workflow diff persisted to database', {
workflowId: activeWorkflowId,
})
)
}
logger.info('Workflow diff applied and persisted to main store', {
workflowId: activeWorkflowId,
blocks: Object.keys(candidateState.blocks || {}).length,
edges: candidateState.edges?.length || 0,
}
})
} catch (error) {
logger.error('Failed to set proposed changes', { error })
if (capturedBaseline) {
batchedUpdate({
baselineWorkflow: null,
baselineWorkflowId: null,
hasActiveDiff: false,
isShowingDiff: false,
.catch((error) => {
logger.warn('Failed to persist workflow state (non-blocking)', { error })
})
// Emit event for undo/redo recording
if (!(window as any).__skipDiffRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
type: 'apply-diff',
baselineSnapshot: baselineWorkflow,
proposedState: candidateState,
diffAnalysis: diffResult.diff.diffAnalysis,
},
})
}
const message =
error instanceof Error ? error.message : 'Failed to create workflow diff'
batchedUpdate({ diffError: message, isDiffReady: false })
throw error
)
}
},