This commit is contained in:
Siddharth Ganesan
2026-01-28 15:02:41 -08:00
parent 01e0723a3a
commit cee74f8eb5
30 changed files with 3530 additions and 125 deletions

View File

@@ -16,6 +16,10 @@ import {
createRequestTracker,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import {
handleToolCallEvent,
registerServerHandledTool,
} from '@/lib/copilot/server-executor/stream-handler'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import type { CopilotProviderConfig } from '@/lib/copilot/types'
import { env } from '@/lib/core/config/env'
@@ -618,6 +622,33 @@ export async function POST(req: NextRequest) {
toolCalls.push(event.data)
if (event.data?.id) {
announcedToolCallIds.add(event.data.id)
// Execute server-side tools automatically
// This runs async and calls mark-complete when done
handleToolCallEvent(
{
id: event.data.id,
name: event.data.name,
arguments: event.data.arguments || {},
partial: false,
},
{
userId: authenticatedUserId,
workflowId,
chatId: actualChatId,
}
).then((handledServerSide) => {
if (handledServerSide) {
registerServerHandledTool(event.data.id, event.data.name)
logger.info(
`[${tracker.requestId}] Tool will be executed server-side`,
{
toolCallId: event.data.id,
toolName: event.data.name,
}
)
}
})
}
}
break

View File

@@ -0,0 +1,64 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import {
authenticateCopilotRequestSessionOnly,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import { getToolExecutionState } from '@/lib/copilot/server-executor/stream-handler'
const logger = createLogger('ToolExecutionStateAPI')
/**
* GET /api/copilot/tools/execution-state/[toolCallId]
*
* Returns the execution state of a tool call.
* Useful for client reconnection scenarios.
*/
export async function GET(
req: NextRequest,
{ params }: { params: Promise<{ toolCallId: string }> }
) {
try {
const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly()
if (!isAuthenticated || !userId) {
return createUnauthorizedResponse()
}
const { toolCallId } = await params
if (!toolCallId) {
return NextResponse.json({ error: 'Tool call ID is required' }, { status: 400 })
}
const state = await getToolExecutionState(toolCallId)
if (!state) {
return NextResponse.json({ error: 'Tool call not found' }, { status: 404 })
}
// Verify the user owns this tool execution
if (state.userId !== userId) {
logger.warn("User attempted to access another user's tool execution", {
requestingUserId: userId,
ownerUserId: state.userId,
toolCallId,
})
return NextResponse.json({ error: 'Tool call not found' }, { status: 404 })
}
return NextResponse.json({
toolCallId: state.toolCallId,
toolName: state.toolName,
status: state.status,
startedAt: state.startedAt,
completedAt: state.completedAt,
result: state.result,
error: state.error,
})
} catch (error) {
logger.error('Error fetching tool execution state', {
error: error instanceof Error ? error.message : String(error),
})
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -0,0 +1,85 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import { getToolExecutionState } from '@/lib/copilot/server-executor/stream-handler'
const logger = createLogger('ToolExecutionStatesAPI')
const RequestSchema = z.object({
toolCallIds: z.array(z.string()).min(1).max(50),
})
/**
* POST /api/copilot/tools/execution-states
*
* Returns the execution states of multiple tool calls at once.
* Useful for efficient reconnection scenarios.
*/
export async function POST(req: NextRequest) {
try {
const { userId, isAuthenticated } = await authenticateCopilotRequestSessionOnly()
if (!isAuthenticated || !userId) {
return createUnauthorizedResponse()
}
const body = await req.json()
const { toolCallIds } = RequestSchema.parse(body)
const states: Record<
string,
{
toolCallId: string
toolName: string
status: string
startedAt: number
completedAt?: number
result?: unknown
error?: string
} | null
> = {}
// Fetch all states in parallel
const results = await Promise.all(
toolCallIds.map(async (toolCallId) => {
const state = await getToolExecutionState(toolCallId)
// Filter out states that don't belong to this user
if (state && state.userId !== userId) {
return { toolCallId, state: null }
}
return { toolCallId, state }
})
)
for (const { toolCallId, state } of results) {
if (state) {
states[toolCallId] = {
toolCallId: state.toolCallId,
toolName: state.toolName,
status: state.status,
startedAt: state.startedAt,
completedAt: state.completedAt,
result: state.result,
error: state.error,
}
} else {
states[toolCallId] = null
}
}
return NextResponse.json({ states })
} catch (error) {
if (error instanceof z.ZodError) {
return createBadRequestResponse('Invalid request body')
}
logger.error('Error fetching tool execution states', {
error: error instanceof Error ? error.message : String(error),
})
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -9,20 +9,13 @@ import {
createRequestTracker,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import { MarkCompletePayloadSchema } from '@/lib/copilot/tools/shared/schemas'
import { env } from '@/lib/core/config/env'
const logger = createLogger('CopilotMarkToolCompleteAPI')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const MarkCompleteSchema = z.object({
id: z.string(),
name: z.string(),
status: z.number().int(),
message: z.any().optional(),
data: z.any().optional(),
})
/**
* POST /api/copilot/tools/mark-complete
* Proxy to Sim Agent: POST /api/tools/mark-complete
@@ -46,7 +39,7 @@ export async function POST(req: NextRequest) {
})
} catch {}
const parsed = MarkCompleteSchema.parse(body)
const parsed = MarkCompletePayloadSchema.parse(body)
const messagePreview = (() => {
try {

View File

@@ -0,0 +1,14 @@
import { NextResponse } from 'next/server'
import { SERVER_EXECUTED_TOOLS } from '@/lib/copilot/server-executor/registry'
/**
* GET /api/copilot/tools/server-executed
*
* Returns the list of tools that are executed server-side.
* Clients can use this to avoid double-executing these tools.
*/
export async function GET() {
return NextResponse.json({
tools: SERVER_EXECUTED_TOOLS,
})
}

View File

@@ -0,0 +1,162 @@
/**
* Client-side utilities for server-executed tools.
*
* This module helps the client know which tools are executed server-side
* to avoid double-execution.
*/
import { createLogger } from '@sim/logger'
const logger = createLogger('ServerExecutedTools')
/**
* List of tools that are executed server-side.
* This is cached after the first fetch.
*/
let cachedServerExecutedTools: Set<string> | null = null
/**
* Tools currently being executed server-side.
* Maps toolCallId to tool info.
*/
const serverHandledToolCalls = new Map<
string,
{
toolName: string
startedAt: number
}
>()
/**
* Fetch the list of server-executed tools from the API.
* Results are cached for the session.
*/
export async function fetchServerExecutedTools(): Promise<Set<string>> {
if (cachedServerExecutedTools) {
return cachedServerExecutedTools
}
try {
const response = await fetch('/api/copilot/tools/server-executed')
if (!response.ok) {
throw new Error(`HTTP ${response.status}`)
}
const data = (await response.json()) as { tools: string[] }
cachedServerExecutedTools = new Set(data.tools)
logger.info('Fetched server-executed tools', {
count: cachedServerExecutedTools.size,
tools: Array.from(cachedServerExecutedTools),
})
return cachedServerExecutedTools
} catch (error) {
logger.warn('Failed to fetch server-executed tools, using empty set', {
error: error instanceof Error ? error.message : String(error),
})
// Return empty set on error - tools will execute client-side as fallback
return new Set()
}
}
/**
* Check if a tool is executed server-side.
* Uses cached list or fetches if not available.
*/
export async function isServerExecutedTool(toolName: string): Promise<boolean> {
const serverTools = await fetchServerExecutedTools()
return serverTools.has(toolName)
}
/**
* Synchronous check if a tool is server-executed.
* Returns false if cache is not yet populated.
*/
export function isServerExecutedToolSync(toolName: string): boolean {
if (!cachedServerExecutedTools) {
return false
}
return cachedServerExecutedTools.has(toolName)
}
/**
* Get the cached list of server-executed tools.
* Returns null if not yet fetched.
*/
export function getServerExecutedToolsSync(): Set<string> | null {
return cachedServerExecutedTools
}
/**
* Mark a tool call as being handled by the server.
* Used to prevent client from executing it.
*/
export function markToolCallServerHandled(toolCallId: string, toolName: string): void {
serverHandledToolCalls.set(toolCallId, {
toolName,
startedAt: Date.now(),
})
logger.debug('Marked tool call as server-handled', { toolCallId, toolName })
// Cleanup old entries (older than 1 hour)
const oneHourAgo = Date.now() - 60 * 60 * 1000
for (const [id, info] of serverHandledToolCalls.entries()) {
if (info.startedAt < oneHourAgo) {
serverHandledToolCalls.delete(id)
}
}
}
/**
* Check if a specific tool call is being handled by the server.
*/
export function isToolCallServerHandled(toolCallId: string): boolean {
return serverHandledToolCalls.has(toolCallId)
}
/**
* Remove a tool call from server-handled tracking.
* Called when tool_result is received.
*/
export function clearToolCallServerHandled(toolCallId: string): void {
serverHandledToolCalls.delete(toolCallId)
}
/**
* Get tool execution state from the server (for reconnection scenarios).
*/
export async function getToolExecutionState(toolCallId: string): Promise<{
status: 'pending' | 'executing' | 'completed' | 'failed' | 'unknown'
result?: unknown
error?: string
} | null> {
try {
const response = await fetch(`/api/copilot/tools/execution-state/${toolCallId}`)
if (!response.ok) {
if (response.status === 404) {
return null
}
throw new Error(`HTTP ${response.status}`)
}
return await response.json()
} catch (error) {
logger.warn('Failed to get tool execution state', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}
/**
* Pre-fetch server-executed tools list.
* Call this early in the app lifecycle.
*/
export function prefetchServerExecutedTools(): void {
fetchServerExecutedTools().catch(() => {
// Errors already logged in fetchServerExecutedTools
})
}

View File

@@ -0,0 +1,58 @@
/**
* Server-side tool executor.
*
* This module provides the ability to execute tools server-side (in Next.js API routes)
* rather than requiring the browser to execute them.
*
* Key function: executeToolOnServer()
* - Returns ToolResult if the tool was executed server-side
* - Returns null if the tool is not registered (client should handle)
*/
import { createLogger } from '@sim/logger'
import { executeRegisteredTool, isServerExecutedTool } from './registry'
import type { ExecutionContext, ToolResult } from './types'
const logger = createLogger('ServerExecutor')
/**
* Execute a tool on the server if it's registered.
*
* @param toolName - The name of the tool to execute
* @param args - The arguments to pass to the tool
* @param context - Execution context (userId, workflowId, etc.)
* @returns ToolResult if executed, null if tool not registered server-side
*/
export async function executeToolOnServer(
toolName: string,
args: unknown,
context: ExecutionContext
): Promise<ToolResult | null> {
// Check if this tool should be executed server-side
if (!isServerExecutedTool(toolName)) {
logger.debug('Tool not registered for server execution, client will handle', { toolName })
return null
}
logger.info('Executing tool server-side', {
toolName,
userId: context.userId,
workflowId: context.workflowId,
})
const startTime = Date.now()
const result = await executeRegisteredTool(toolName, args, context)
logger.info('Tool execution completed', {
toolName,
success: result.success,
durationMs: Date.now() - startTime,
})
return result
}
export { isServerExecutedTool, SERVER_EXECUTED_TOOLS } from './registry'
// Re-export types and utilities
export type { ExecutionContext, ToolResult } from './types'
export { errorResult, successResult } from './types'

View File

@@ -0,0 +1,381 @@
/**
* Server Tool Registry
*
* Central registry for all server-executed tools. This replaces the scattered
* executor files with a single, declarative registry.
*
* Benefits:
* - Single source of truth for tool registration
* - Type-safe with Zod schemas
* - No duplicate wrapper code
* - Easy to add new tools
*/
import { createLogger } from '@sim/logger'
import type { z } from 'zod'
import { getBlockConfigServerTool } from '../tools/server/blocks/get-block-config'
import { getBlockOptionsServerTool } from '../tools/server/blocks/get-block-options'
// Import server tool implementations
import { getBlocksAndToolsServerTool } from '../tools/server/blocks/get-blocks-and-tools'
import { getBlocksMetadataServerTool } from '../tools/server/blocks/get-blocks-metadata-tool'
import { getTriggerBlocksServerTool } from '../tools/server/blocks/get-trigger-blocks'
import { searchDocumentationServerTool } from '../tools/server/docs/search-documentation'
import { knowledgeBaseServerTool } from '../tools/server/knowledge/knowledge-base'
import { CheckoffTodoInput, checkoffTodoServerTool } from '../tools/server/other/checkoff-todo'
import { makeApiRequestServerTool } from '../tools/server/other/make-api-request'
import {
MarkTodoInProgressInput,
markTodoInProgressServerTool,
} from '../tools/server/other/mark-todo-in-progress'
import { searchOnlineServerTool } from '../tools/server/other/search-online'
import { SleepInput, sleepServerTool } from '../tools/server/other/sleep'
import { getCredentialsServerTool } from '../tools/server/user/get-credentials'
import { setEnvironmentVariablesServerTool } from '../tools/server/user/set-environment-variables'
import {
CheckDeploymentStatusInput,
checkDeploymentStatusServerTool,
} from '../tools/server/workflow/check-deployment-status'
import {
CreateWorkspaceMcpServerInput,
createWorkspaceMcpServerServerTool,
} from '../tools/server/workflow/create-workspace-mcp-server'
import { DeployApiInput, deployApiServerTool } from '../tools/server/workflow/deploy-api'
import { DeployChatInput, deployChatServerTool } from '../tools/server/workflow/deploy-chat'
import { DeployMcpInput, deployMcpServerTool } from '../tools/server/workflow/deploy-mcp'
import { editWorkflowServerTool } from '../tools/server/workflow/edit-workflow'
import {
GetBlockOutputsInput,
getBlockOutputsServerTool,
} from '../tools/server/workflow/get-block-outputs'
import {
GetUserWorkflowInput,
getUserWorkflowServerTool,
} from '../tools/server/workflow/get-user-workflow'
import { getWorkflowConsoleServerTool } from '../tools/server/workflow/get-workflow-console'
import {
GetWorkflowFromNameInput,
getWorkflowFromNameServerTool,
} from '../tools/server/workflow/get-workflow-from-name'
import { listUserWorkflowsServerTool } from '../tools/server/workflow/list-user-workflows'
import {
ListWorkspaceMcpServersInput,
listWorkspaceMcpServersServerTool,
} from '../tools/server/workflow/list-workspace-mcp-servers'
import { RedeployInput, redeployServerTool } from '../tools/server/workflow/redeploy'
import { RunWorkflowInput, runWorkflowServerTool } from '../tools/server/workflow/run-workflow'
import {
SetGlobalWorkflowVariablesInput,
setGlobalWorkflowVariablesServerTool,
} from '../tools/server/workflow/set-global-workflow-variables'
// Import schemas
import {
EditWorkflowInput,
GetBlockConfigInput,
GetBlockOptionsInput,
GetBlocksAndToolsInput,
GetBlocksMetadataInput,
GetCredentialsInput,
GetTriggerBlocksInput,
GetWorkflowConsoleInput,
KnowledgeBaseArgsSchema,
ListUserWorkflowsInput,
MakeApiRequestInput,
SearchDocumentationInput,
SearchOnlineInput,
SetEnvironmentVariablesInput,
} from '../tools/shared/schemas'
import type { ExecutionContext, ToolResult } from './types'
import { errorResult, successResult } from './types'
const logger = createLogger('ToolRegistry')
/**
* Context type for server tools.
*/
type ServerToolContext = { userId: string } | undefined
/**
* Helper to create a typed executor wrapper.
* This provides a clean boundary between our registry (unknown args)
* and the underlying typed server tools.
*
* The generic TArgs is inferred from the Zod schema, ensuring type safety
* at compile time while allowing runtime validation.
*/
function createExecutor<TArgs, TResult>(
serverTool: { execute: (args: TArgs, ctx?: ServerToolContext) => Promise<TResult> },
options: { passContext: boolean } = { passContext: true }
): (args: unknown, ctx: ServerToolContext) => Promise<unknown> {
return (args, ctx) => {
// After Zod validation, we know args matches TArgs
// This cast is safe because validation happens before execution
const typedArgs = args as TArgs
return options.passContext ? serverTool.execute(typedArgs, ctx) : serverTool.execute(typedArgs)
}
}
/**
* Tool registration entry.
*/
interface ToolRegistration {
/** Zod schema for input validation (optional) */
inputSchema?: z.ZodType
/** Whether this tool requires authentication */
requiresAuth: boolean
/** The underlying execute function */
execute: (args: unknown, context: ServerToolContext) => Promise<unknown>
}
/**
* The tool registry - maps tool names to their configurations.
*
* Each tool is registered with:
* - inputSchema: Zod schema for validation (optional)
* - requiresAuth: Whether userId is required
* - execute: The underlying server tool's execute function
*/
const TOOL_REGISTRY: Record<string, ToolRegistration> = {
// ─────────────────────────────────────────────────────────────────────────
// Block Tools
// ─────────────────────────────────────────────────────────────────────────
get_blocks_and_tools: {
inputSchema: GetBlocksAndToolsInput,
requiresAuth: true,
execute: createExecutor(getBlocksAndToolsServerTool),
},
get_block_config: {
inputSchema: GetBlockConfigInput,
requiresAuth: true,
execute: createExecutor(getBlockConfigServerTool),
},
get_block_options: {
inputSchema: GetBlockOptionsInput,
requiresAuth: true,
execute: createExecutor(getBlockOptionsServerTool),
},
get_blocks_metadata: {
inputSchema: GetBlocksMetadataInput,
requiresAuth: true,
execute: createExecutor(getBlocksMetadataServerTool),
},
get_trigger_blocks: {
inputSchema: GetTriggerBlocksInput,
requiresAuth: true,
execute: createExecutor(getTriggerBlocksServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// Workflow Tools
// ─────────────────────────────────────────────────────────────────────────
edit_workflow: {
inputSchema: EditWorkflowInput,
requiresAuth: true,
execute: createExecutor(editWorkflowServerTool),
},
get_workflow_console: {
inputSchema: GetWorkflowConsoleInput,
requiresAuth: false, // Tool validates workflowId itself
execute: createExecutor(getWorkflowConsoleServerTool, { passContext: false }),
},
list_user_workflows: {
inputSchema: ListUserWorkflowsInput,
requiresAuth: true,
execute: createExecutor(listUserWorkflowsServerTool),
},
get_workflow_from_name: {
inputSchema: GetWorkflowFromNameInput,
requiresAuth: true,
execute: createExecutor(getWorkflowFromNameServerTool),
},
check_deployment_status: {
inputSchema: CheckDeploymentStatusInput,
requiresAuth: true,
execute: createExecutor(checkDeploymentStatusServerTool),
},
list_workspace_mcp_servers: {
inputSchema: ListWorkspaceMcpServersInput,
requiresAuth: true,
execute: createExecutor(listWorkspaceMcpServersServerTool),
},
set_global_workflow_variables: {
inputSchema: SetGlobalWorkflowVariablesInput,
requiresAuth: true,
execute: createExecutor(setGlobalWorkflowVariablesServerTool),
},
redeploy: {
inputSchema: RedeployInput,
requiresAuth: true,
execute: createExecutor(redeployServerTool),
},
create_workspace_mcp_server: {
inputSchema: CreateWorkspaceMcpServerInput,
requiresAuth: true,
execute: createExecutor(createWorkspaceMcpServerServerTool),
},
deploy_api: {
inputSchema: DeployApiInput,
requiresAuth: true,
execute: createExecutor(deployApiServerTool),
},
deploy_chat: {
inputSchema: DeployChatInput,
requiresAuth: true,
execute: createExecutor(deployChatServerTool),
},
deploy_mcp: {
inputSchema: DeployMcpInput,
requiresAuth: true,
execute: createExecutor(deployMcpServerTool),
},
run_workflow: {
inputSchema: RunWorkflowInput,
requiresAuth: true,
execute: createExecutor(runWorkflowServerTool),
},
get_user_workflow: {
inputSchema: GetUserWorkflowInput,
requiresAuth: true,
execute: createExecutor(getUserWorkflowServerTool),
},
get_block_outputs: {
inputSchema: GetBlockOutputsInput,
requiresAuth: true,
execute: createExecutor(getBlockOutputsServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// Search Tools
// ─────────────────────────────────────────────────────────────────────────
search_documentation: {
inputSchema: SearchDocumentationInput,
requiresAuth: false,
execute: createExecutor(searchDocumentationServerTool, { passContext: false }),
},
search_online: {
inputSchema: SearchOnlineInput,
requiresAuth: false,
execute: createExecutor(searchOnlineServerTool, { passContext: false }),
},
make_api_request: {
inputSchema: MakeApiRequestInput,
requiresAuth: false,
execute: createExecutor(makeApiRequestServerTool, { passContext: false }),
},
// ─────────────────────────────────────────────────────────────────────────
// Knowledge Tools
// ─────────────────────────────────────────────────────────────────────────
knowledge_base: {
inputSchema: KnowledgeBaseArgsSchema,
requiresAuth: true,
execute: createExecutor(knowledgeBaseServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// User Tools
// ─────────────────────────────────────────────────────────────────────────
get_credentials: {
inputSchema: GetCredentialsInput,
requiresAuth: true,
execute: createExecutor(getCredentialsServerTool),
},
set_environment_variables: {
inputSchema: SetEnvironmentVariablesInput,
requiresAuth: true,
execute: createExecutor(setEnvironmentVariablesServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// Todo Tools
// ─────────────────────────────────────────────────────────────────────────
checkoff_todo: {
inputSchema: CheckoffTodoInput,
requiresAuth: false, // Just returns success, no auth needed
execute: createExecutor(checkoffTodoServerTool, { passContext: false }),
},
mark_todo_in_progress: {
inputSchema: MarkTodoInProgressInput,
requiresAuth: false,
execute: createExecutor(markTodoInProgressServerTool, { passContext: false }),
},
// ─────────────────────────────────────────────────────────────────────────
// Utility Tools
// ─────────────────────────────────────────────────────────────────────────
sleep: {
inputSchema: SleepInput,
requiresAuth: false,
execute: createExecutor(sleepServerTool, { passContext: false }),
},
}
/**
* List of all server-executed tool names.
* Export this so clients know which tools NOT to execute locally.
*/
export const SERVER_EXECUTED_TOOLS = Object.keys(TOOL_REGISTRY)
/**
* Check if a tool is registered for server execution.
*/
export function isServerExecutedTool(toolName: string): boolean {
return toolName in TOOL_REGISTRY
}
/**
* Execute a tool with proper validation and error handling.
*
* This is the main entry point for tool execution. It:
* 1. Looks up the tool in the registry
* 2. Validates input against the schema (if provided)
* 3. Checks authentication requirements
* 4. Executes the tool
* 5. Returns a standardized ToolResult
*/
export async function executeRegisteredTool(
toolName: string,
args: unknown,
context: ExecutionContext
): Promise<ToolResult> {
const registration = TOOL_REGISTRY[toolName]
if (!registration) {
logger.warn('Unknown tool requested', { toolName })
return errorResult('UNKNOWN_TOOL', `Tool '${toolName}' is not registered for server execution`)
}
// Check authentication requirement
if (registration.requiresAuth && !context.userId) {
logger.error('Authentication required but not provided', { toolName })
return errorResult('AUTH_REQUIRED', `Tool '${toolName}' requires authentication`)
}
// Validate input if schema is provided
let validatedArgs: unknown = args ?? {}
if (registration.inputSchema) {
const parseResult = registration.inputSchema.safeParse(args ?? {})
if (!parseResult.success) {
logger.warn('Input validation failed', {
toolName,
errors: parseResult.error.flatten(),
})
return errorResult('VALIDATION_ERROR', 'Invalid input arguments', {
errors: parseResult.error.flatten().fieldErrors,
})
}
validatedArgs = parseResult.data
}
// Execute the tool
try {
const result = await registration.execute(
validatedArgs,
context.userId ? { userId: context.userId } : undefined
)
return successResult(result)
} catch (error) {
const message = error instanceof Error ? error.message : String(error)
logger.error('Tool execution failed', { toolName, error: message })
return errorResult('EXECUTION_ERROR', message)
}
}

View File

@@ -0,0 +1,425 @@
/**
* SSE Stream Tool Execution Handler
*
* This module intercepts tool_call events from the Go copilot SSE stream
* and executes server-side tools, calling mark-complete to return results.
*
* Key features:
* - Non-blocking: Tool execution happens in parallel with stream forwarding
* - Resilient: Uses Redis for state persistence across disconnects
* - Transparent: Still forwards all events to browser for UI updates
*/
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { getRedisClient } from '@/lib/core/config/redis'
import { executeToolOnServer, isServerExecutedTool } from './index'
import type { ExecutionContext } from './types'
const logger = createLogger('StreamToolHandler')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
/** Redis key prefix for tool execution state */
const REDIS_KEY_PREFIX = 'copilot:tool_exec:'
/** TTL for Redis entries (1 hour) */
const REDIS_TTL_SECONDS = 60 * 60
/**
* Tool execution state stored in Redis
*/
interface ToolExecutionState {
toolCallId: string
toolName: string
status: 'pending' | 'executing' | 'completed' | 'failed'
userId: string
workflowId?: string
chatId?: string
startedAt: number
completedAt?: number
result?: unknown
error?: string
}
/**
* Tool call data from SSE event
*/
interface ToolCallEvent {
id: string
name: string
arguments: Record<string, unknown>
partial?: boolean
}
/**
* Save tool execution state to Redis.
*/
async function saveToolState(state: ToolExecutionState): Promise<void> {
const redis = getRedisClient()
if (!redis) {
logger.debug('Redis not available, skipping state save', {
toolCallId: state.toolCallId,
})
return
}
try {
const key = `${REDIS_KEY_PREFIX}${state.toolCallId}`
await redis.setex(key, REDIS_TTL_SECONDS, JSON.stringify(state))
logger.debug('Saved tool execution state to Redis', {
toolCallId: state.toolCallId,
status: state.status,
})
} catch (error) {
logger.warn('Failed to save tool state to Redis', {
toolCallId: state.toolCallId,
error: error instanceof Error ? error.message : String(error),
})
}
}
/**
* Get tool execution state from Redis.
*/
async function getToolState(toolCallId: string): Promise<ToolExecutionState | null> {
const redis = getRedisClient()
if (!redis) return null
try {
const key = `${REDIS_KEY_PREFIX}${toolCallId}`
const data = await redis.get(key)
if (!data) return null
return JSON.parse(data) as ToolExecutionState
} catch (error) {
logger.warn('Failed to get tool state from Redis', {
toolCallId,
error: error instanceof Error ? error.message : String(error),
})
return null
}
}
/**
* Mark a tool as complete by calling the Go copilot endpoint.
*/
async function markToolComplete(
toolCallId: string,
toolName: string,
status: number,
message?: unknown,
data?: unknown
): Promise<boolean> {
try {
const payload = {
id: toolCallId,
name: toolName,
status,
message,
data,
}
logger.info('Marking tool complete from server', {
toolCallId,
toolName,
status,
hasMessage: message !== undefined,
hasData: data !== undefined,
})
const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(payload),
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
logger.error('Failed to mark tool complete', {
toolCallId,
toolName,
status: response.status,
error: errorText,
})
return false
}
logger.info('Tool marked complete successfully', { toolCallId, toolName })
return true
} catch (error) {
logger.error('Error marking tool complete', {
toolCallId,
toolName,
error: error instanceof Error ? error.message : String(error),
})
return false
}
}
/**
* Handle a tool call event from the SSE stream.
*
* If the tool is server-executed:
* 1. Execute it using the server executor
* 2. Call mark-complete to return result to Go
*
* This runs asynchronously and doesn't block the stream.
*
* @returns true if this tool will be handled server-side, false if client should handle
*/
export async function handleToolCallEvent(
event: ToolCallEvent,
context: ExecutionContext
): Promise<boolean> {
// Skip partial tool calls (streaming arguments)
if (event.partial) {
return false
}
// Check if this tool should be executed server-side
if (!isServerExecutedTool(event.name)) {
logger.debug('Tool not server-executed, client will handle', {
toolCallId: event.id,
toolName: event.name,
})
return false
}
// Check if this tool is already being executed (recovery scenario)
const existingState = await getToolState(event.id)
if (existingState) {
if (existingState.status === 'executing') {
logger.info('Tool already being executed (recovery scenario)', {
toolCallId: event.id,
toolName: event.name,
startedAt: existingState.startedAt,
})
return true
}
if (existingState.status === 'completed') {
logger.info('Tool already completed (recovery scenario)', {
toolCallId: event.id,
toolName: event.name,
completedAt: existingState.completedAt,
})
return true
}
}
logger.info('Handling tool call server-side', {
toolCallId: event.id,
toolName: event.name,
userId: context.userId,
})
// Save initial state to Redis
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'pending',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: Date.now(),
})
// Execute asynchronously - don't await here to avoid blocking stream
executeToolServerSide(event, context).catch((error) => {
logger.error('Async tool execution failed', {
toolCallId: event.id,
toolName: event.name,
error: error instanceof Error ? error.message : String(error),
})
})
return true
}
/**
* Execute a tool server-side and mark it complete.
* This is called asynchronously from handleToolCallEvent.
*/
async function executeToolServerSide(
event: ToolCallEvent,
context: ExecutionContext
): Promise<void> {
const startTime = Date.now()
// Update state to executing
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'executing',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: startTime,
})
try {
const result = await executeToolOnServer(event.name, event.arguments, context)
if (!result) {
// This shouldn't happen since we checked isServerExecutedTool
logger.error('executeToolOnServer returned null for registered tool', {
toolCallId: event.id,
toolName: event.name,
})
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
error: 'Internal error: tool not found',
})
await markToolComplete(event.id, event.name, 500, 'Internal error: tool not found')
return
}
const durationMs = Date.now() - startTime
if (result.success) {
logger.info('Tool executed successfully', {
toolCallId: event.id,
toolName: event.name,
durationMs,
})
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'completed',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
result: result.data,
})
// Mark complete with success
await markToolComplete(
event.id,
event.name,
200,
undefined, // message
result.data // data
)
} else {
logger.warn('Tool execution failed', {
toolCallId: event.id,
toolName: event.name,
durationMs,
error: result.error,
})
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
error: result.error?.message,
})
// Mark complete with error
await markToolComplete(
event.id,
event.name,
400,
result.error?.message ?? 'Tool execution failed',
result.error?.details
)
}
} catch (error) {
const durationMs = Date.now() - startTime
const message = error instanceof Error ? error.message : String(error)
logger.error('Tool execution threw exception', {
toolCallId: event.id,
toolName: event.name,
durationMs,
error: message,
})
await saveToolState({
toolCallId: event.id,
toolName: event.name,
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
error: message,
})
// Mark complete with error
await markToolComplete(event.id, event.name, 500, message)
}
}
/**
* In-memory fallback for tracking server-handled tools when Redis is unavailable.
*/
const serverHandledTools = new Map<string, { toolName: string; handledAt: number }>()
/**
* Register a tool as being handled server-side.
*/
export function registerServerHandledTool(toolCallId: string, toolName: string): void {
serverHandledTools.set(toolCallId, {
toolName,
handledAt: Date.now(),
})
// Clean up old entries (older than 1 hour)
const oneHourAgo = Date.now() - 60 * 60 * 1000
for (const [id, info] of serverHandledTools.entries()) {
if (info.handledAt < oneHourAgo) {
serverHandledTools.delete(id)
}
}
}
/**
* Check if a tool was handled server-side.
*/
export async function wasToolHandledServerSide(toolCallId: string): Promise<boolean> {
// Check in-memory first
if (serverHandledTools.has(toolCallId)) {
return true
}
// Check Redis
const state = await getToolState(toolCallId)
return state !== null
}
/**
* Get the execution state of a tool.
* Useful for client reconnection scenarios.
*/
export async function getToolExecutionState(
toolCallId: string
): Promise<ToolExecutionState | null> {
return getToolState(toolCallId)
}
/**
* Get list of server-executed tool names for client reference.
*/
export { SERVER_EXECUTED_TOOLS } from './registry'

View File

@@ -0,0 +1,87 @@
/**
* Type definitions for the server executor.
*
* This provides a clean, type-safe interface for tool execution
* without any 'any' types.
*/
import type { z } from 'zod'
/**
* Standard result type for all tool executions.
* This is the contract between server executors and the chat route.
*/
export interface ToolResult<T = unknown> {
success: boolean
data?: T
error?: {
code: string
message: string
details?: Record<string, unknown>
}
}
/**
* Context passed to tool executors.
*/
export interface ExecutionContext {
userId: string
workflowId?: string
chatId?: string
}
/**
* Configuration for a registered tool.
* This defines how a tool should be validated and executed.
*/
export interface ToolConfig<
TInputSchema extends z.ZodType = z.ZodType,
TOutputSchema extends z.ZodType = z.ZodType,
> {
/** The canonical name of the tool */
name: string
/** Zod schema for validating input args (optional - if not provided, args pass through) */
inputSchema?: TInputSchema
/** Zod schema for validating output (optional - if not provided, output passes through) */
outputSchema?: TOutputSchema
/** Whether context (userId) is required for this tool */
requiresAuth?: boolean
/**
* The execute function.
* Takes validated args and context, returns result data.
*/
execute: (
args: TInputSchema extends z.ZodType ? z.infer<TInputSchema> : unknown,
context: ExecutionContext
) => Promise<TOutputSchema extends z.ZodType ? z.infer<TOutputSchema> : unknown>
}
/**
* Type for a tool executor function (after wrapping).
*/
export type ToolExecutor = (args: unknown, context: ExecutionContext) => Promise<ToolResult>
/**
* Helper to create a success result.
*/
export function successResult<T>(data: T): ToolResult<T> {
return { success: true, data }
}
/**
* Helper to create an error result.
*/
export function errorResult(
code: string,
message: string,
details?: Record<string, unknown>
): ToolResult {
return {
success: false,
error: { code, message, details },
}
}

View File

@@ -1,4 +1,12 @@
export interface BaseServerTool<TArgs = any, TResult = any> {
/**
* Base interface for server-executed tools.
*
* @template TArgs - The type of arguments the tool accepts
* @template TResult - The type of result the tool returns
*/
export interface BaseServerTool<TArgs = unknown, TResult = unknown> {
/** The canonical name of the tool (must match the registry key) */
name: string
/** Execute the tool with the given arguments and context */
execute(args: TArgs, context?: { userId: string }): Promise<TResult>
}

View File

@@ -0,0 +1,44 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('CheckoffTodoServerTool')
export const CheckoffTodoInput = z.object({
id: z.string().optional(),
todoId: z.string().optional(),
})
export const CheckoffTodoResult = z.object({
todoId: z.string(),
success: z.boolean(),
})
export type CheckoffTodoInputType = z.infer<typeof CheckoffTodoInput>
export type CheckoffTodoResultType = z.infer<typeof CheckoffTodoResult>
/**
* Server-side tool to mark a todo as complete.
* The actual UI update happens client-side when the store receives the tool_result event.
*/
export const checkoffTodoServerTool: BaseServerTool<CheckoffTodoInputType, CheckoffTodoResultType> =
{
name: 'checkoff_todo',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = CheckoffTodoInput.parse(args)
const todoId = parsed.id || parsed.todoId
if (!todoId) {
throw new Error('Missing todo id')
}
logger.info('Marking todo as complete', { todoId })
// The actual state update happens client-side via tool_result handler
// We just return success to signal the action was processed
return CheckoffTodoResult.parse({
todoId,
success: true,
})
},
}

View File

@@ -0,0 +1,45 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('MarkTodoInProgressServerTool')
export const MarkTodoInProgressInput = z.object({
id: z.string().optional(),
todoId: z.string().optional(),
})
export const MarkTodoInProgressResult = z.object({
todoId: z.string(),
success: z.boolean(),
})
export type MarkTodoInProgressInputType = z.infer<typeof MarkTodoInProgressInput>
export type MarkTodoInProgressResultType = z.infer<typeof MarkTodoInProgressResult>
/**
* Server-side tool to mark a todo as in progress.
* The actual UI update happens client-side when the store receives the tool_result event.
*/
export const markTodoInProgressServerTool: BaseServerTool<
MarkTodoInProgressInputType,
MarkTodoInProgressResultType
> = {
name: 'mark_todo_in_progress',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = MarkTodoInProgressInput.parse(args)
const todoId = parsed.id || parsed.todoId
if (!todoId) {
throw new Error('Missing todo id')
}
logger.info('Marking todo as in progress', { todoId })
// The actual state update happens client-side via tool_result handler
return MarkTodoInProgressResult.parse({
todoId,
success: true,
})
},
}

View File

@@ -0,0 +1,45 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('SleepServerTool')
/** Maximum sleep duration in seconds (3 minutes) */
const MAX_SLEEP_SECONDS = 180
export const SleepInput = z.object({
seconds: z.number().min(0).max(MAX_SLEEP_SECONDS).optional().default(0),
})
export const SleepResult = z.object({
sleptFor: z.number(),
success: z.boolean(),
})
export type SleepInputType = z.infer<typeof SleepInput>
export type SleepResultType = z.infer<typeof SleepResult>
export const sleepServerTool: BaseServerTool<SleepInputType, SleepResultType> = {
name: 'sleep',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = SleepInput.parse(args)
let seconds = parsed.seconds
// Clamp to max
if (seconds > MAX_SLEEP_SECONDS) {
seconds = MAX_SLEEP_SECONDS
}
logger.info('Starting sleep', { seconds })
// Actually sleep
await new Promise((resolve) => setTimeout(resolve, seconds * 1000))
logger.info('Sleep completed', { seconds })
return SleepResult.parse({
sleptFor: seconds,
success: true,
})
},
}

View File

@@ -1,66 +1,37 @@
import { createLogger } from '@sim/logger'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { getBlockConfigServerTool } from '@/lib/copilot/tools/server/blocks/get-block-config'
import { getBlockOptionsServerTool } from '@/lib/copilot/tools/server/blocks/get-block-options'
import { getBlocksAndToolsServerTool } from '@/lib/copilot/tools/server/blocks/get-blocks-and-tools'
import { getBlocksMetadataServerTool } from '@/lib/copilot/tools/server/blocks/get-blocks-metadata-tool'
import { getTriggerBlocksServerTool } from '@/lib/copilot/tools/server/blocks/get-trigger-blocks'
import { searchDocumentationServerTool } from '@/lib/copilot/tools/server/docs/search-documentation'
import { knowledgeBaseServerTool } from '@/lib/copilot/tools/server/knowledge/knowledge-base'
import { makeApiRequestServerTool } from '@/lib/copilot/tools/server/other/make-api-request'
import { searchOnlineServerTool } from '@/lib/copilot/tools/server/other/search-online'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import { setEnvironmentVariablesServerTool } from '@/lib/copilot/tools/server/user/set-environment-variables'
import { editWorkflowServerTool } from '@/lib/copilot/tools/server/workflow/edit-workflow'
import { getWorkflowConsoleServerTool } from '@/lib/copilot/tools/server/workflow/get-workflow-console'
import {
ExecuteResponseSuccessSchema,
GetBlockConfigInput,
GetBlockConfigResult,
GetBlockOptionsInput,
GetBlockOptionsResult,
GetBlocksAndToolsInput,
GetBlocksAndToolsResult,
GetBlocksMetadataInput,
GetBlocksMetadataResult,
GetTriggerBlocksInput,
GetTriggerBlocksResult,
KnowledgeBaseArgsSchema,
} from '@/lib/copilot/tools/shared/schemas'
/**
* Server Tool Router
*
* This module provides backwards compatibility for the execute-copilot-server-tool API route.
* It delegates to the new unified registry in server-executor/registry.ts
*
* @deprecated Use executeRegisteredTool from server-executor/registry.ts directly
*/
// Generic execute response schemas (success path only for this route; errors handled via HTTP status)
import { createLogger } from '@sim/logger'
import { executeRegisteredTool, isServerExecutedTool } from '@/lib/copilot/server-executor/registry'
import { ExecuteResponseSuccessSchema } from '@/lib/copilot/tools/shared/schemas'
const logger = createLogger('ServerToolRouter')
// Re-export for backwards compatibility
export { ExecuteResponseSuccessSchema }
export type ExecuteResponseSuccess = (typeof ExecuteResponseSuccessSchema)['_type']
// Define server tool registry for the new copilot runtime
const serverToolRegistry: Record<string, BaseServerTool<any, any>> = {}
const logger = createLogger('ServerToolRouter')
// Register tools
serverToolRegistry[getBlocksAndToolsServerTool.name] = getBlocksAndToolsServerTool
serverToolRegistry[getBlocksMetadataServerTool.name] = getBlocksMetadataServerTool
serverToolRegistry[getBlockOptionsServerTool.name] = getBlockOptionsServerTool
serverToolRegistry[getBlockConfigServerTool.name] = getBlockConfigServerTool
serverToolRegistry[getTriggerBlocksServerTool.name] = getTriggerBlocksServerTool
serverToolRegistry[editWorkflowServerTool.name] = editWorkflowServerTool
serverToolRegistry[getWorkflowConsoleServerTool.name] = getWorkflowConsoleServerTool
serverToolRegistry[searchDocumentationServerTool.name] = searchDocumentationServerTool
serverToolRegistry[searchOnlineServerTool.name] = searchOnlineServerTool
serverToolRegistry[setEnvironmentVariablesServerTool.name] = setEnvironmentVariablesServerTool
serverToolRegistry[getCredentialsServerTool.name] = getCredentialsServerTool
serverToolRegistry[makeApiRequestServerTool.name] = makeApiRequestServerTool
serverToolRegistry[knowledgeBaseServerTool.name] = knowledgeBaseServerTool
/**
* Route execution to the appropriate server tool.
*
* @deprecated Use executeRegisteredTool from server-executor/registry.ts directly
*/
export async function routeExecution(
toolName: string,
payload: unknown,
context?: { userId: string }
): Promise<any> {
const tool = serverToolRegistry[toolName]
if (!tool) {
): Promise<unknown> {
if (!isServerExecutedTool(toolName)) {
throw new Error(`Unknown server tool: ${toolName}`)
}
logger.debug('Routing to tool', {
logger.debug('Routing to tool via unified registry', {
toolName,
payloadPreview: (() => {
try {
@@ -71,43 +42,15 @@ export async function routeExecution(
})(),
})
let args: any = payload || {}
if (toolName === 'get_blocks_and_tools') {
args = GetBlocksAndToolsInput.parse(args)
}
if (toolName === 'get_blocks_metadata') {
args = GetBlocksMetadataInput.parse(args)
}
if (toolName === 'get_block_options') {
args = GetBlockOptionsInput.parse(args)
}
if (toolName === 'get_block_config') {
args = GetBlockConfigInput.parse(args)
}
if (toolName === 'get_trigger_blocks') {
args = GetTriggerBlocksInput.parse(args)
}
if (toolName === 'knowledge_base') {
args = KnowledgeBaseArgsSchema.parse(args)
const result = await executeRegisteredTool(toolName, payload, {
userId: context?.userId ?? '',
})
// The old API expected the raw result, not wrapped in ToolResult
// For backwards compatibility, unwrap and throw on error
if (!result.success) {
throw new Error(result.error?.message ?? 'Tool execution failed')
}
const result = await tool.execute(args, context)
if (toolName === 'get_blocks_and_tools') {
return GetBlocksAndToolsResult.parse(result)
}
if (toolName === 'get_blocks_metadata') {
return GetBlocksMetadataResult.parse(result)
}
if (toolName === 'get_block_options') {
return GetBlockOptionsResult.parse(result)
}
if (toolName === 'get_block_config') {
return GetBlockConfigResult.parse(result)
}
if (toolName === 'get_trigger_blocks') {
return GetTriggerBlocksResult.parse(result)
}
return result
return result.data
}

View File

@@ -0,0 +1,172 @@
import { db } from '@sim/db'
import {
chat,
workflow,
workflowDeploymentVersion,
workflowMcpServer,
workflowMcpTool,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { env } from '@/lib/core/config/env'
const logger = createLogger('CheckDeploymentStatusServerTool')
export const CheckDeploymentStatusInput = z.object({
workflowId: z.string(),
})
export const CheckDeploymentStatusResult = z.object({
isDeployed: z.boolean(),
deploymentTypes: z.array(z.string()),
api: z.object({
isDeployed: z.boolean(),
deployedAt: z.string().nullable(),
endpoint: z.string().nullable(),
}),
chat: z.object({
isDeployed: z.boolean(),
chatId: z.string().nullable(),
identifier: z.string().nullable(),
chatUrl: z.string().nullable(),
title: z.string().nullable(),
}),
mcp: z.object({
isDeployed: z.boolean(),
servers: z.array(
z.object({
serverId: z.string(),
serverName: z.string(),
toolName: z.string(),
})
),
}),
message: z.string(),
})
export type CheckDeploymentStatusInputType = z.infer<typeof CheckDeploymentStatusInput>
export type CheckDeploymentStatusResultType = z.infer<typeof CheckDeploymentStatusResult>
export const checkDeploymentStatusServerTool: BaseServerTool<
CheckDeploymentStatusInputType,
CheckDeploymentStatusResultType
> = {
name: 'check_deployment_status',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = CheckDeploymentStatusInput.parse(args)
const { workflowId } = parsed
logger.debug('Checking deployment status', { workflowId })
// Get workflow to find workspaceId
const [wf] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
const workspaceId = wf?.workspaceId
// Check API deployment (active deployment version)
const [apiDeploy] = await db
.select({
id: workflowDeploymentVersion.id,
createdAt: workflowDeploymentVersion.createdAt,
})
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const isApiDeployed = !!apiDeploy
const appUrl = env.NEXT_PUBLIC_APP_URL || 'https://simstudio.ai'
// Check chat deployment
const [chatDeploy] = await db
.select({
id: chat.id,
identifier: chat.identifier,
title: chat.title,
})
.from(chat)
.where(eq(chat.workflowId, workflowId))
.limit(1)
const isChatDeployed = !!chatDeploy
// Check MCP deployment
let mcpToolDeployments: { serverId: string; serverName: string; toolName: string }[] = []
if (workspaceId) {
const mcpTools = await db
.select({
toolName: workflowMcpTool.toolName,
serverId: workflowMcpTool.serverId,
serverName: workflowMcpServer.name,
})
.from(workflowMcpTool)
.innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id))
.where(eq(workflowMcpTool.workflowId, workflowId))
mcpToolDeployments = mcpTools.map((t) => ({
serverId: t.serverId,
serverName: t.serverName,
toolName: t.toolName,
}))
}
const isMcpDeployed = mcpToolDeployments.length > 0
// Build result
const deploymentTypes: string[] = []
if (isApiDeployed) deploymentTypes.push('api')
if (isChatDeployed) deploymentTypes.push('chat')
if (isMcpDeployed) deploymentTypes.push('mcp')
const isDeployed = isApiDeployed || isChatDeployed || isMcpDeployed
// Build summary message
let message = ''
if (!isDeployed) {
message = 'Workflow is not deployed'
} else {
const parts: string[] = []
if (isApiDeployed) parts.push('API')
if (isChatDeployed) parts.push(`Chat (${chatDeploy?.identifier})`)
if (isMcpDeployed) {
const serverNames = [...new Set(mcpToolDeployments.map((d) => d.serverName))].join(', ')
parts.push(`MCP (${serverNames})`)
}
message = `Workflow is deployed as: ${parts.join(', ')}`
}
logger.info('Checked deployment status', { workflowId, isDeployed, deploymentTypes })
return CheckDeploymentStatusResult.parse({
isDeployed,
deploymentTypes,
api: {
isDeployed: isApiDeployed,
deployedAt: apiDeploy?.createdAt?.toISOString() || null,
endpoint: isApiDeployed ? `${appUrl}/api/workflows/${workflowId}/execute` : null,
},
chat: {
isDeployed: isChatDeployed,
chatId: chatDeploy?.id || null,
identifier: chatDeploy?.identifier || null,
chatUrl: isChatDeployed ? `${appUrl}/chat/${chatDeploy?.identifier}` : null,
title: chatDeploy?.title || null,
},
mcp: {
isDeployed: isMcpDeployed,
servers: mcpToolDeployments,
},
message,
})
},
}

View File

@@ -0,0 +1,73 @@
import { db } from '@sim/db'
import { workflowMcpServer } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('CreateWorkspaceMcpServerServerTool')
export const CreateWorkspaceMcpServerInput = z.object({
name: z.string().min(1),
description: z.string().optional(),
workspaceId: z.string().min(1),
})
export const CreateWorkspaceMcpServerResult = z.object({
success: z.boolean(),
serverId: z.string().nullable(),
serverName: z.string().nullable(),
description: z.string().nullable(),
message: z.string(),
})
export type CreateWorkspaceMcpServerInputType = z.infer<typeof CreateWorkspaceMcpServerInput>
export type CreateWorkspaceMcpServerResultType = z.infer<typeof CreateWorkspaceMcpServerResult>
export const createWorkspaceMcpServerServerTool: BaseServerTool<
CreateWorkspaceMcpServerInputType,
CreateWorkspaceMcpServerResultType
> = {
name: 'create_workspace_mcp_server',
async execute(args: unknown, context?: { userId: string }) {
const parsed = CreateWorkspaceMcpServerInput.parse(args)
const { name, description, workspaceId } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Creating workspace MCP server', { name, workspaceId })
// Check if server with same name already exists
const existing = await db
.select({ id: workflowMcpServer.id })
.from(workflowMcpServer)
.where(eq(workflowMcpServer.workspaceId, workspaceId))
.limit(100)
// Generate unique ID
const serverId = crypto.randomUUID()
const now = new Date()
await db.insert(workflowMcpServer).values({
id: serverId,
workspaceId,
createdBy: context.userId,
name: name.trim(),
description: description?.trim() || null,
createdAt: now,
updatedAt: now,
})
logger.info('Created MCP server', { serverId, name })
return CreateWorkspaceMcpServerResult.parse({
success: true,
serverId,
serverName: name.trim(),
description: description?.trim() || null,
message: `MCP server "${name}" created successfully. You can now deploy workflows to it using deploy_mcp.`,
})
},
}

View File

@@ -0,0 +1,152 @@
import { db } from '@sim/db'
import { apiKey, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { desc, eq, or } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
const logger = createLogger('DeployApiServerTool')
export const DeployApiInput = z.object({
action: z.enum(['deploy', 'undeploy']).default('deploy'),
workflowId: z.string().min(1),
})
export const DeployApiResult = z.object({
success: z.boolean(),
action: z.string(),
isDeployed: z.boolean(),
deployedAt: z.string().nullable(),
endpoint: z.string().nullable(),
curlCommand: z.string().nullable(),
message: z.string(),
needsApiKey: z.boolean().optional(),
})
export type DeployApiInputType = z.infer<typeof DeployApiInput>
export type DeployApiResultType = z.infer<typeof DeployApiResult>
export const deployApiServerTool: BaseServerTool<DeployApiInputType, DeployApiResultType> = {
name: 'deploy_api',
async execute(args: unknown, context?: { userId: string }) {
const parsed = DeployApiInput.parse(args)
const { action, workflowId } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Deploy API', { action, workflowId })
// Get workflow info
const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
const workspaceId = wf.workspaceId
if (action === 'undeploy') {
// Deactivate all deployment versions
await db
.update(workflowDeploymentVersion)
.set({ isActive: false })
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
logger.info('Workflow undeployed', { workflowId })
return DeployApiResult.parse({
success: true,
action: 'undeploy',
isDeployed: false,
deployedAt: null,
endpoint: null,
curlCommand: null,
message: 'Workflow undeployed successfully.',
})
}
// Deploy action - check if user has API keys
const keys = await db
.select({ id: apiKey.id })
.from(apiKey)
.where(
or(
eq(apiKey.userId, context.userId),
workspaceId ? eq(apiKey.workspaceId, workspaceId) : undefined
)
)
.limit(1)
if (keys.length === 0) {
return DeployApiResult.parse({
success: false,
action: 'deploy',
isDeployed: false,
deployedAt: null,
endpoint: null,
curlCommand: null,
message:
'Cannot deploy without an API key. Please create an API key in settings first, then try deploying again.',
needsApiKey: true,
})
}
// Get current max version
const [maxVersion] = await db
.select({ version: workflowDeploymentVersion.version })
.from(workflowDeploymentVersion)
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
.orderBy(desc(workflowDeploymentVersion.version))
.limit(1)
const newVersion = (maxVersion?.version || 0) + 1
// Deactivate all existing versions
await db
.update(workflowDeploymentVersion)
.set({ isActive: false })
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
// Create new deployment version
const deploymentId = crypto.randomUUID()
const now = new Date()
// Load workflow state from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
const workflowState = {
blocks: normalizedData?.blocks || {},
edges: normalizedData?.edges || [],
loops: normalizedData?.loops || {},
parallels: normalizedData?.parallels || {},
}
await db.insert(workflowDeploymentVersion).values({
id: deploymentId,
workflowId,
version: newVersion,
state: workflowState,
isActive: true,
createdAt: now,
})
// Build API info
const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000'
const apiEndpoint = `${appUrl}/api/workflows/${workflowId}/execute`
const curlCommand = `curl -X POST -H "X-API-Key: $SIM_API_KEY" -H "Content-Type: application/json" ${apiEndpoint}`
logger.info('Workflow deployed as API', { workflowId, version: newVersion })
return DeployApiResult.parse({
success: true,
action: 'deploy',
isDeployed: true,
deployedAt: now.toISOString(),
endpoint: apiEndpoint,
curlCommand,
message: 'Workflow deployed successfully as API. You can now call it via REST.',
})
},
}

View File

@@ -0,0 +1,274 @@
import { db } from '@sim/db'
import { chat, workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
const logger = createLogger('DeployChatServerTool')
const OutputConfigSchema = z.object({
blockId: z.string(),
path: z.string(),
})
export const DeployChatInput = z.object({
action: z.enum(['deploy', 'undeploy']).default('deploy'),
workflowId: z.string().min(1),
identifier: z.string().optional(),
title: z.string().optional(),
description: z.string().optional(),
authType: z.enum(['public', 'password', 'email', 'sso']).optional().default('public'),
password: z.string().optional(),
allowedEmails: z.array(z.string()).optional(),
welcomeMessage: z.string().optional(),
outputConfigs: z.array(OutputConfigSchema).optional(),
})
export const DeployChatResult = z.object({
success: z.boolean(),
action: z.string(),
isDeployed: z.boolean(),
chatId: z.string().nullable(),
chatUrl: z.string().nullable(),
identifier: z.string().nullable(),
title: z.string().nullable(),
authType: z.string().nullable(),
message: z.string(),
error: z.string().optional(),
errorCode: z.string().optional(),
})
export type DeployChatInputType = z.infer<typeof DeployChatInput>
export type DeployChatResultType = z.infer<typeof DeployChatResult>
function generateIdentifier(workflowName: string): string {
return workflowName
.toLowerCase()
.replace(/[^a-z0-9]+/g, '-')
.replace(/^-|-$/g, '')
.substring(0, 50)
}
export const deployChatServerTool: BaseServerTool<DeployChatInputType, DeployChatResultType> = {
name: 'deploy_chat',
async execute(args: unknown, context?: { userId: string }) {
const parsed = DeployChatInput.parse(args)
const { action, workflowId, authType = 'public' } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Deploy Chat', { action, workflowId })
// Get workflow info
const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000'
// Check for existing deployment
const [existingChat] = await db
.select()
.from(chat)
.where(and(eq(chat.workflowId, workflowId), eq(chat.isActive, true)))
.limit(1)
if (action === 'undeploy') {
if (!existingChat) {
return DeployChatResult.parse({
success: false,
action: 'undeploy',
isDeployed: false,
chatId: null,
chatUrl: null,
identifier: null,
title: null,
authType: null,
message: 'No active chat deployment found for this workflow',
error: 'No active chat deployment found',
errorCode: 'VALIDATION_ERROR',
})
}
// Deactivate the chat deployment
await db.update(chat).set({ isActive: false }).where(eq(chat.id, existingChat.id))
logger.info('Chat undeployed', { workflowId, chatId: existingChat.id })
return DeployChatResult.parse({
success: true,
action: 'undeploy',
isDeployed: false,
chatId: null,
chatUrl: null,
identifier: null,
title: null,
authType: null,
message: 'Chat deployment removed successfully.',
})
}
// Deploy action
const identifier =
parsed.identifier || existingChat?.identifier || generateIdentifier(wf.name || 'chat')
const title = parsed.title || existingChat?.title || wf.name || 'Chat'
const description = parsed.description ?? existingChat?.description ?? ''
const welcomeMessage =
parsed.welcomeMessage ||
(existingChat?.customizations as any)?.welcomeMessage ||
'Hi there! How can I help you today?'
const primaryColor =
(existingChat?.customizations as any)?.primaryColor || 'var(--brand-primary-hover-hex)'
const existingAllowedEmails = Array.isArray(existingChat?.allowedEmails)
? existingChat.allowedEmails
: []
const allowedEmails = parsed.allowedEmails || existingAllowedEmails
const outputConfigs = parsed.outputConfigs || existingChat?.outputConfigs || []
// Validate requirements
if (authType === 'password' && !parsed.password && !existingChat?.password) {
throw new Error('Password is required when using password protection')
}
if ((authType === 'email' || authType === 'sso') && allowedEmails.length === 0) {
throw new Error(`At least one email or domain is required when using ${authType} access`)
}
// Check if identifier is already in use by another workflow
if (!existingChat) {
const [existingIdentifier] = await db
.select({ id: chat.id })
.from(chat)
.where(and(eq(chat.identifier, identifier), eq(chat.isActive, true)))
.limit(1)
if (existingIdentifier) {
return DeployChatResult.parse({
success: false,
action: 'deploy',
isDeployed: false,
chatId: null,
chatUrl: null,
identifier,
title: null,
authType: null,
message: `The identifier "${identifier}" is already in use. Please choose a different one.`,
error: `Identifier "${identifier}" is already taken`,
errorCode: 'IDENTIFIER_TAKEN',
})
}
}
// Ensure workflow is deployed as API first
const [deployment] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
if (!deployment) {
// Auto-deploy the API
const [maxVersion] = await db
.select({ version: workflowDeploymentVersion.version })
.from(workflowDeploymentVersion)
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
.orderBy(desc(workflowDeploymentVersion.version))
.limit(1)
const newVersion = (maxVersion?.version || 0) + 1
const deploymentId = crypto.randomUUID()
const now = new Date()
// Load workflow state from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
const workflowState = {
blocks: normalizedData?.blocks || {},
edges: normalizedData?.edges || [],
loops: normalizedData?.loops || {},
parallels: normalizedData?.parallels || {},
}
await db.insert(workflowDeploymentVersion).values({
id: deploymentId,
workflowId,
version: newVersion,
state: workflowState,
isActive: true,
createdAt: now,
})
logger.info('Auto-deployed API for chat', { workflowId, version: newVersion })
}
const now = new Date()
let chatId: string
if (existingChat) {
// Update existing deployment
await db
.update(chat)
.set({
identifier: identifier.trim(),
title: title.trim(),
description: description.trim(),
authType,
password: authType === 'password' ? parsed.password || existingChat.password : null,
allowedEmails: authType === 'email' || authType === 'sso' ? allowedEmails : [],
customizations: { primaryColor, welcomeMessage: welcomeMessage.trim() },
outputConfigs,
updatedAt: now,
})
.where(eq(chat.id, existingChat.id))
chatId = existingChat.id
logger.info('Updated chat deployment', { chatId })
} else {
// Create new deployment
chatId = crypto.randomUUID()
await db.insert(chat).values({
id: chatId,
workflowId,
userId: context.userId,
identifier: identifier.trim(),
title: title.trim(),
description: description.trim(),
authType,
password: authType === 'password' ? parsed.password : null,
allowedEmails: authType === 'email' || authType === 'sso' ? allowedEmails : [],
customizations: { primaryColor, welcomeMessage: welcomeMessage.trim() },
outputConfigs,
isActive: true,
createdAt: now,
updatedAt: now,
})
logger.info('Created chat deployment', { chatId })
}
const chatUrl = `${appUrl}/chat/${identifier}`
return DeployChatResult.parse({
success: true,
action: 'deploy',
isDeployed: true,
chatId,
chatUrl,
identifier,
title,
authType,
message: `Chat deployed successfully! Available at: ${chatUrl}`,
})
},
}

View File

@@ -0,0 +1,166 @@
import { db } from '@sim/db'
import {
workflow,
workflowDeploymentVersion,
workflowMcpServer,
workflowMcpTool,
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('DeployMcpServerTool')
const ParameterDescriptionSchema = z.object({
name: z.string(),
description: z.string(),
})
export const DeployMcpInput = z.object({
serverId: z.string().min(1),
workflowId: z.string().min(1),
toolName: z.string().optional(),
toolDescription: z.string().optional(),
parameterDescriptions: z.array(ParameterDescriptionSchema).optional(),
})
export const DeployMcpResult = z.object({
success: z.boolean(),
toolId: z.string().nullable(),
toolName: z.string().nullable(),
toolDescription: z.string().nullable(),
serverId: z.string().nullable(),
updated: z.boolean().optional(),
message: z.string(),
error: z.string().optional(),
})
export type DeployMcpInputType = z.infer<typeof DeployMcpInput>
export type DeployMcpResultType = z.infer<typeof DeployMcpResult>
export const deployMcpServerTool: BaseServerTool<DeployMcpInputType, DeployMcpResultType> = {
name: 'deploy_mcp',
async execute(args: unknown, context?: { userId: string }) {
const parsed = DeployMcpInput.parse(args)
const { serverId, workflowId, toolName, toolDescription, parameterDescriptions } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Deploy MCP', { serverId, workflowId })
// Get workflow info
const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
// Check if server exists
const [server] = await db
.select()
.from(workflowMcpServer)
.where(eq(workflowMcpServer.id, serverId))
.limit(1)
if (!server) {
throw new Error(
'MCP server not found. Use list_workspace_mcp_servers to see available servers.'
)
}
// Check if workflow is deployed as API
const [deployment] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
if (!deployment) {
throw new Error(
'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.'
)
}
// Build parameter schema if provided
let parameterSchema: Record<string, unknown> | null = null
if (parameterDescriptions && parameterDescriptions.length > 0) {
const properties: Record<string, { description: string }> = {}
for (const param of parameterDescriptions) {
properties[param.name] = { description: param.description }
}
parameterSchema = { properties }
}
const finalToolName = toolName?.trim() || wf.name || 'workflow'
const finalToolDescription = toolDescription?.trim() || null
// Check if tool already exists for this workflow on this server
const [existingTool] = await db
.select()
.from(workflowMcpTool)
.where(
and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId))
)
.limit(1)
const now = new Date()
if (existingTool) {
// Update existing tool
await db
.update(workflowMcpTool)
.set({
toolName: finalToolName,
toolDescription: finalToolDescription,
parameterSchema,
updatedAt: now,
})
.where(eq(workflowMcpTool.id, existingTool.id))
logger.info('Updated MCP tool', { toolId: existingTool.id, toolName: finalToolName })
return DeployMcpResult.parse({
success: true,
toolId: existingTool.id,
toolName: finalToolName,
toolDescription: finalToolDescription,
serverId,
updated: true,
message: `Workflow MCP tool updated to "${finalToolName}".`,
})
}
// Create new tool
const toolId = crypto.randomUUID()
await db.insert(workflowMcpTool).values({
id: toolId,
serverId,
workflowId,
toolName: finalToolName,
toolDescription: finalToolDescription,
parameterSchema,
createdAt: now,
updatedAt: now,
})
logger.info('Created MCP tool', { toolId, toolName: finalToolName })
return DeployMcpResult.parse({
success: true,
toolId,
toolName: finalToolName,
toolDescription: finalToolDescription,
serverId,
message: `Workflow deployed as MCP tool "${finalToolName}" to server.`,
})
},
}

View File

@@ -0,0 +1,181 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { normalizeName } from '@/executor/constants'
import type { Loop, Parallel } from '@/stores/workflows/workflow/types'
import type { BaseServerTool } from '../base-tool'
const logger = createLogger('GetBlockOutputsServerTool')
export const GetBlockOutputsInput = z.object({
workflowId: z.string().min(1),
blockIds: z.array(z.string()).optional(),
})
const BlockOutputSchema = z.object({
blockId: z.string(),
blockName: z.string(),
blockType: z.string(),
triggerMode: z.boolean().optional(),
outputs: z.array(z.string()),
insideSubflowOutputs: z.array(z.string()).optional(),
outsideSubflowOutputs: z.array(z.string()).optional(),
})
const VariableOutputSchema = z.object({
id: z.string(),
name: z.string(),
type: z.string(),
tag: z.string(),
})
export const GetBlockOutputsResult = z.object({
blocks: z.array(BlockOutputSchema),
variables: z.array(VariableOutputSchema).optional(),
})
export type GetBlockOutputsInputType = z.infer<typeof GetBlockOutputsInput>
export type GetBlockOutputsResultType = z.infer<typeof GetBlockOutputsResult>
interface Variable {
id: string
name: string
type: string
}
function formatOutputsWithPrefix(paths: string[], blockName: string): string[] {
const normalizedName = normalizeName(blockName)
return paths.map((path) => `${normalizedName}.${path}`)
}
function getSubflowInsidePaths(
blockType: 'loop' | 'parallel',
blockId: string,
loops: Record<string, Loop>,
parallels: Record<string, Parallel>
): string[] {
const paths = ['index']
if (blockType === 'loop') {
const loopType = loops[blockId]?.loopType || 'for'
if (loopType === 'forEach') {
paths.push('currentItem', 'items')
}
} else {
const parallelType = parallels[blockId]?.parallelType || 'count'
if (parallelType === 'collection') {
paths.push('currentItem', 'items')
}
}
return paths
}
export const getBlockOutputsServerTool: BaseServerTool<
GetBlockOutputsInputType,
GetBlockOutputsResultType
> = {
name: 'get_block_outputs',
async execute(args: unknown, context?: { userId: string }) {
const parsed = GetBlockOutputsInput.parse(args)
const { workflowId, blockIds } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Getting block outputs', { workflowId, blockIds })
// Load workflow from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalizedData?.blocks) {
throw new Error('Workflow state is empty or invalid')
}
const blocks = normalizedData.blocks
const loops = normalizedData.loops || {}
const parallels = normalizedData.parallels || {}
const targetBlockIds = blockIds && blockIds.length > 0 ? blockIds : Object.keys(blocks)
const blockOutputs: GetBlockOutputsResultType['blocks'] = []
for (const blockId of targetBlockIds) {
const block = blocks[blockId]
if (!block?.type) continue
const blockName = block.name || block.type
const blockOutput: GetBlockOutputsResultType['blocks'][0] = {
blockId,
blockName,
blockType: block.type,
outputs: [],
}
// Include triggerMode if the block is in trigger mode
if (block.triggerMode) {
blockOutput.triggerMode = true
}
if (block.type === 'loop' || block.type === 'parallel') {
const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels)
blockOutput.insideSubflowOutputs = formatOutputsWithPrefix(insidePaths, blockName)
blockOutput.outsideSubflowOutputs = formatOutputsWithPrefix(['results'], blockName)
} else {
// Compute output paths using the block's subBlocks
const outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode)
blockOutput.outputs = formatOutputsWithPrefix(outputPaths, blockName)
}
blockOutputs.push(blockOutput)
}
// Get workflow variables if no specific blockIds requested
let variables: GetBlockOutputsResultType['variables'] | undefined
const includeVariables = !blockIds || blockIds.length === 0
if (includeVariables) {
// Get variables from workflow record
const [wf] = await db
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
const workflowVariables = wf?.variables as Record<string, Variable> | null
if (workflowVariables && typeof workflowVariables === 'object') {
variables = Object.values(workflowVariables)
.filter(
(v): v is Variable =>
typeof v === 'object' &&
v !== null &&
'name' in v &&
typeof v.name === 'string' &&
v.name.trim() !== ''
)
.map((variable) => ({
id: variable.id,
name: variable.name,
type: variable.type || 'string',
tag: `variable.${normalizeName(variable.name)}`,
}))
}
}
logger.info('Retrieved block outputs', {
workflowId,
blockCount: blockOutputs.length,
variableCount: variables?.length ?? 0,
})
return GetBlockOutputsResult.parse({
blocks: blockOutputs,
...(variables && { variables }),
})
},
}

View File

@@ -0,0 +1,82 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
const logger = createLogger('GetUserWorkflowServerTool')
export const GetUserWorkflowInput = z.object({
workflowId: z.string().min(1),
})
export const GetUserWorkflowResult = z.object({
userWorkflow: z.string(),
workflowId: z.string(),
workflowName: z.string(),
})
export type GetUserWorkflowInputType = z.infer<typeof GetUserWorkflowInput>
export type GetUserWorkflowResultType = z.infer<typeof GetUserWorkflowResult>
export const getUserWorkflowServerTool: BaseServerTool<
GetUserWorkflowInputType,
GetUserWorkflowResultType
> = {
name: 'get_user_workflow',
async execute(args: unknown, context?: { userId: string }) {
const parsed = GetUserWorkflowInput.parse(args)
const { workflowId } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Getting user workflow', { workflowId })
// Get workflow metadata
const [wf] = await db
.select({ id: workflow.id, name: workflow.name, userId: workflow.userId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
// Load workflow from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalizedData?.blocks || Object.keys(normalizedData.blocks).length === 0) {
throw new Error('Workflow state is empty or invalid')
}
// Build workflow state
const workflowState = {
blocks: normalizedData.blocks,
edges: normalizedData.edges || [],
loops: normalizedData.loops || {},
parallels: normalizedData.parallels || {},
}
// Sanitize for copilot (remove UI-specific data)
const sanitizedState = sanitizeForCopilot(workflowState as any)
const userWorkflow = JSON.stringify(sanitizedState, null, 2)
logger.info('Retrieved user workflow', {
workflowId,
workflowName: wf.name,
blockCount: Object.keys(normalizedData.blocks).length,
})
return GetUserWorkflowResult.parse({
userWorkflow,
workflowId,
workflowName: wf.name || 'Untitled',
})
},
}

View File

@@ -0,0 +1,86 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, ilike } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
const logger = createLogger('GetWorkflowFromNameServerTool')
export const GetWorkflowFromNameInput = z.object({
workflow_name: z.string().min(1),
})
export const GetWorkflowFromNameResult = z.object({
userWorkflow: z.string(),
workflowId: z.string(),
workflowName: z.string(),
})
export type GetWorkflowFromNameInputType = z.infer<typeof GetWorkflowFromNameInput>
export type GetWorkflowFromNameResultType = z.infer<typeof GetWorkflowFromNameResult>
export const getWorkflowFromNameServerTool: BaseServerTool<
GetWorkflowFromNameInputType,
GetWorkflowFromNameResultType
> = {
name: 'get_workflow_from_name',
async execute(args: unknown, context?: { userId: string }) {
const parsed = GetWorkflowFromNameInput.parse(args)
const workflowName = parsed.workflow_name.trim()
logger.debug('Executing get_workflow_from_name', {
workflowName,
userId: context?.userId,
})
if (!context?.userId) {
throw new Error('User ID is required')
}
// Find workflow by name (case-insensitive)
const workflows = await db
.select({ id: workflow.id, name: workflow.name })
.from(workflow)
.where(and(eq(workflow.userId, context.userId), ilike(workflow.name, workflowName)))
.limit(1)
if (workflows.length === 0) {
throw new Error(`Workflow not found: ${workflowName}`)
}
const wf = workflows[0]
// Load workflow from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(wf.id)
if (!normalizedData?.blocks || Object.keys(normalizedData.blocks).length === 0) {
throw new Error('Workflow state is empty or invalid')
}
// Build workflow state from normalized data
const workflowState = {
blocks: normalizedData.blocks,
edges: normalizedData.edges || [],
loops: normalizedData.loops || {},
parallels: normalizedData.parallels || {},
}
// Sanitize for copilot
const sanitizedState = sanitizeForCopilot(workflowState as any)
const userWorkflow = JSON.stringify(sanitizedState, null, 2)
logger.info('Retrieved workflow by name', {
workflowId: wf.id,
workflowName: wf.name,
})
return GetWorkflowFromNameResult.parse({
userWorkflow,
workflowId: wf.id,
workflowName: wf.name || workflowName,
})
},
}

View File

@@ -0,0 +1,42 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import {
type ListUserWorkflowsInputType,
ListUserWorkflowsResult,
type ListUserWorkflowsResultType,
} from '@/lib/copilot/tools/shared/schemas'
const logger = createLogger('ListUserWorkflowsServerTool')
export const listUserWorkflowsServerTool: BaseServerTool<
ListUserWorkflowsInputType,
ListUserWorkflowsResultType
> = {
name: 'list_user_workflows',
async execute(_args: unknown, context?: { userId: string }) {
logger.debug('Executing list_user_workflows', { userId: context?.userId })
if (!context?.userId) {
throw new Error('User ID is required to list workflows')
}
const workflows = await db
.select({ id: workflow.id, name: workflow.name })
.from(workflow)
.where(eq(workflow.userId, context.userId))
const names = workflows
.map((w) => w.name)
.filter((n): n is string => typeof n === 'string' && n.length > 0)
logger.info('Found workflows', { count: names.length, userId: context.userId })
return ListUserWorkflowsResult.parse({
workflow_names: names,
count: names.length,
})
},
}

View File

@@ -0,0 +1,83 @@
import { db } from '@sim/db'
import { workflowMcpServer, workflowMcpTool } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('ListWorkspaceMcpServersServerTool')
export const ListWorkspaceMcpServersInput = z.object({
workspaceId: z.string(),
})
export const ListWorkspaceMcpServersResult = z.object({
servers: z.array(
z.object({
id: z.string(),
name: z.string(),
description: z.string().nullable(),
toolCount: z.number(),
toolNames: z.array(z.string()),
})
),
count: z.number(),
message: z.string(),
})
export type ListWorkspaceMcpServersInputType = z.infer<typeof ListWorkspaceMcpServersInput>
export type ListWorkspaceMcpServersResultType = z.infer<typeof ListWorkspaceMcpServersResult>
export const listWorkspaceMcpServersServerTool: BaseServerTool<
ListWorkspaceMcpServersInputType,
ListWorkspaceMcpServersResultType
> = {
name: 'list_workspace_mcp_servers',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = ListWorkspaceMcpServersInput.parse(args)
const { workspaceId } = parsed
logger.debug('Listing workspace MCP servers', { workspaceId })
// Get all MCP servers in the workspace with their tool counts
const servers = await db
.select({
id: workflowMcpServer.id,
name: workflowMcpServer.name,
description: workflowMcpServer.description,
})
.from(workflowMcpServer)
.where(eq(workflowMcpServer.workspaceId, workspaceId))
// Get tool names for each server
const serversWithTools = await Promise.all(
servers.map(async (server) => {
const tools = await db
.select({ toolName: workflowMcpTool.toolName })
.from(workflowMcpTool)
.where(eq(workflowMcpTool.serverId, server.id))
return {
id: server.id,
name: server.name,
description: server.description,
toolCount: tools.length,
toolNames: tools.map((t) => t.toolName),
}
})
)
const message =
serversWithTools.length === 0
? 'No MCP servers found in this workspace. Use create_workspace_mcp_server to create one.'
: `Found ${serversWithTools.length} MCP server(s) in the workspace.`
logger.info('Listed MCP servers', { workspaceId, count: serversWithTools.length })
return ListWorkspaceMcpServersResult.parse({
servers: serversWithTools,
count: serversWithTools.length,
message,
})
},
}

View File

@@ -0,0 +1,87 @@
import { db } from '@sim/db'
import { workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { desc, eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
const logger = createLogger('RedeployServerTool')
export const RedeployInput = z.object({
workflowId: z.string(),
})
export const RedeployResult = z.object({
success: z.boolean(),
workflowId: z.string(),
deployedAt: z.string().nullable(),
message: z.string(),
})
export type RedeployInputType = z.infer<typeof RedeployInput>
export type RedeployResultType = z.infer<typeof RedeployResult>
export const redeployServerTool: BaseServerTool<RedeployInputType, RedeployResultType> = {
name: 'redeploy',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = RedeployInput.parse(args)
const { workflowId } = parsed
logger.debug('Redeploying workflow', { workflowId })
// Get workflow state
const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
// Get current max version
const [maxVersion] = await db
.select({ version: workflowDeploymentVersion.version })
.from(workflowDeploymentVersion)
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
.orderBy(desc(workflowDeploymentVersion.version))
.limit(1)
const newVersion = (maxVersion?.version || 0) + 1
// Deactivate all existing versions
await db
.update(workflowDeploymentVersion)
.set({ isActive: false })
.where(eq(workflowDeploymentVersion.workflowId, workflowId))
// Create new deployment version
const deploymentId = crypto.randomUUID()
const now = new Date()
// Load workflow state from normalized tables
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
const workflowState = {
blocks: normalizedData?.blocks || {},
edges: normalizedData?.edges || [],
loops: normalizedData?.loops || {},
parallels: normalizedData?.parallels || {},
}
await db.insert(workflowDeploymentVersion).values({
id: deploymentId,
workflowId,
version: newVersion,
state: workflowState,
isActive: true,
createdAt: now,
})
logger.info('Workflow redeployed', { workflowId, version: newVersion })
return RedeployResult.parse({
success: true,
workflowId,
deployedAt: now.toISOString(),
message: `Workflow redeployed (version ${newVersion})`,
})
},
}

View File

@@ -0,0 +1,130 @@
import { db } from '@sim/db'
import { workflow, workflowDeploymentVersion } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('RunWorkflowServerTool')
export const RunWorkflowInput = z.object({
workflowId: z.string().min(1),
workflow_input: z.record(z.any()).optional(),
})
export const RunWorkflowResult = z.object({
success: z.boolean(),
executionId: z.string().nullable(),
executionStartTime: z.string().nullable(),
output: z.any().nullable(),
message: z.string(),
error: z.string().optional(),
})
export type RunWorkflowInputType = z.infer<typeof RunWorkflowInput>
export type RunWorkflowResultType = z.infer<typeof RunWorkflowResult>
export const runWorkflowServerTool: BaseServerTool<RunWorkflowInputType, RunWorkflowResultType> = {
name: 'run_workflow',
async execute(args: unknown, context?: { userId: string }) {
const parsed = RunWorkflowInput.parse(args)
const { workflowId, workflow_input } = parsed
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Running workflow', { workflowId, hasInput: !!workflow_input })
// Get workflow info
const [wf] = await db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
// Check if workflow is deployed
const [deployment] = await db
.select({ id: workflowDeploymentVersion.id })
.from(workflowDeploymentVersion)
.where(
and(
eq(workflowDeploymentVersion.workflowId, workflowId),
eq(workflowDeploymentVersion.isActive, true)
)
)
.limit(1)
const executionId = crypto.randomUUID()
const executionStartTime = new Date().toISOString()
// If workflow is deployed, we can use the execute API
// Otherwise we need to execute directly
const appUrl = process.env.NEXT_PUBLIC_APP_URL || 'http://localhost:3000'
try {
// Call the internal execution endpoint
// Note: For server-side execution without a browser, we call the API directly
const executeUrl = `${appUrl}/api/workflows/${workflowId}/execute`
const response = await fetch(executeUrl, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
// Use internal auth header
'X-Internal-Auth': context.userId,
},
body: JSON.stringify({
input: workflow_input || {},
executionId,
}),
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(errorText || `Execution failed with status ${response.status}`)
}
const result = await response.json()
// Determine success from result
const succeeded = result.success !== false
const output = result.output || result.result || null
if (succeeded) {
logger.info('Workflow execution completed', { workflowId, executionId })
return RunWorkflowResult.parse({
success: true,
executionId,
executionStartTime,
output,
message: `Workflow execution completed. Started at: ${executionStartTime}`,
})
}
const errorMessage = result.error || 'Workflow execution failed'
logger.error('Workflow execution failed', { workflowId, error: errorMessage })
return RunWorkflowResult.parse({
success: false,
executionId,
executionStartTime,
output: null,
message: errorMessage,
error: errorMessage,
})
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error)
logger.error('Workflow execution error', { workflowId, error: errorMessage })
return RunWorkflowResult.parse({
success: false,
executionId,
executionStartTime,
output: null,
message: errorMessage,
error: errorMessage,
})
}
},
}

View File

@@ -0,0 +1,161 @@
import { db } from '@sim/db'
import { workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { z } from 'zod'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('SetGlobalWorkflowVariablesServerTool')
const OperationItemSchema = z.object({
operation: z.enum(['add', 'edit', 'delete']),
name: z.string(),
type: z.enum(['plain', 'number', 'boolean', 'array', 'object']).optional(),
value: z.string().optional(),
})
export const SetGlobalWorkflowVariablesInput = z.object({
workflowId: z.string(),
operations: z.array(OperationItemSchema),
})
export const SetGlobalWorkflowVariablesResult = z.object({
success: z.boolean(),
message: z.string(),
variables: z.record(z.unknown()),
})
export type SetGlobalWorkflowVariablesInputType = z.infer<typeof SetGlobalWorkflowVariablesInput>
export type SetGlobalWorkflowVariablesResultType = z.infer<typeof SetGlobalWorkflowVariablesResult>
function coerceValue(
value: string | undefined,
type?: 'plain' | 'number' | 'boolean' | 'array' | 'object'
): unknown {
if (value === undefined) return value
const t = type || 'plain'
try {
if (t === 'number') {
const n = Number(value)
if (Number.isNaN(n)) return value
return n
}
if (t === 'boolean') {
const v = String(value).trim().toLowerCase()
if (v === 'true') return true
if (v === 'false') return false
return value
}
if (t === 'array' || t === 'object') {
const parsed = JSON.parse(value)
if (t === 'array' && Array.isArray(parsed)) return parsed
if (t === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed))
return parsed
return value
}
} catch {
// Fall through to return value as-is
}
return value
}
export const setGlobalWorkflowVariablesServerTool: BaseServerTool<
SetGlobalWorkflowVariablesInputType,
SetGlobalWorkflowVariablesResultType
> = {
name: 'set_global_workflow_variables',
async execute(args: unknown, _context?: { userId: string }) {
const parsed = SetGlobalWorkflowVariablesInput.parse(args)
const { workflowId, operations } = parsed
logger.debug('Setting workflow variables', { workflowId, operationCount: operations.length })
// Get current workflow variables
const [wf] = await db
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!wf) {
throw new Error(`Workflow not found: ${workflowId}`)
}
const currentVarsRecord = (wf.variables as Record<string, unknown>) || {}
// Build mutable map by variable name
const byName: Record<string, Record<string, unknown>> = {}
Object.values(currentVarsRecord).forEach((v: unknown) => {
if (v && typeof v === 'object' && 'id' in v && 'name' in v) {
const variable = v as Record<string, unknown>
byName[String(variable.name)] = variable
}
})
// Apply operations in order
for (const op of operations) {
const key = String(op.name)
const nextType = op.type || (byName[key]?.type as string) || 'plain'
if (op.operation === 'delete') {
delete byName[key]
continue
}
const typedValue = coerceValue(
op.value,
nextType as 'plain' | 'number' | 'boolean' | 'array' | 'object'
)
if (op.operation === 'add') {
byName[key] = {
id: crypto.randomUUID(),
workflowId,
name: key,
type: nextType,
value: typedValue,
}
continue
}
if (op.operation === 'edit') {
if (!byName[key]) {
// If editing a non-existent variable, create it
byName[key] = {
id: crypto.randomUUID(),
workflowId,
name: key,
type: nextType,
value: typedValue,
}
} else {
byName[key] = {
...byName[key],
type: nextType,
...(op.value !== undefined ? { value: typedValue } : {}),
}
}
}
}
// Convert byName (keyed by name) to record keyed by ID for storage
const variablesRecord: Record<string, unknown> = {}
for (const v of Object.values(byName)) {
variablesRecord[v.id as string] = v
}
// Update workflow variables
await db.update(workflow).set({ variables: variablesRecord }).where(eq(workflow.id, workflowId))
logger.info('Updated workflow variables', {
workflowId,
variableCount: Object.keys(byName).length,
})
return SetGlobalWorkflowVariablesResult.parse({
success: true,
message: 'Workflow variables updated',
variables: byName,
})
},
}

View File

@@ -1,5 +1,9 @@
import { z } from 'zod'
// ============================================================================
// Common Schemas (shared across multiple tools and API routes)
// ============================================================================
// Generic envelope used by client to validate API responses
export const ExecuteResponseSuccessSchema = z.object({
success: z.literal(true),
@@ -7,6 +11,85 @@ export const ExecuteResponseSuccessSchema = z.object({
})
export type ExecuteResponseSuccess = z.infer<typeof ExecuteResponseSuccessSchema>
/**
* Standard tool error structure.
* Used in ToolResult and tool_result events.
*/
export const ToolErrorSchema = z.object({
code: z.string(),
message: z.string(),
details: z.record(z.unknown()).optional(),
})
export type ToolError = z.infer<typeof ToolErrorSchema>
/**
* Standard tool result structure.
* This is the canonical format for tool execution results across the system.
*/
export const ToolResultSchema = z.object({
success: z.boolean(),
data: z.unknown().optional(),
error: ToolErrorSchema.optional(),
})
export type ToolResultType = z.infer<typeof ToolResultSchema>
/**
* Mark-complete payload schema.
* Used when calling the Go copilot's mark-complete endpoint.
*/
export const MarkCompletePayloadSchema = z.object({
/** Tool call ID */
id: z.string(),
/** Tool name */
name: z.string(),
/** HTTP-like status code (200 = success, 4xx = client error, 5xx = server error) */
status: z.number().int().min(100).max(599),
/** Optional message (typically error message or success description) */
message: z.unknown().optional(),
/** Optional data payload (tool result data) */
data: z.unknown().optional(),
})
export type MarkCompletePayload = z.infer<typeof MarkCompletePayloadSchema>
/**
* Tool result event from Go (received via SSE stream).
* This represents what we receive back after mark-complete.
*/
export const ToolResultEventSchema = z.object({
toolCallId: z.string().optional(),
data: z
.object({
id: z.string().optional(),
})
.optional(),
success: z.boolean().optional(),
failedDependency: z.boolean().optional(),
result: z
.object({
skipped: z.boolean().optional(),
})
.passthrough()
.optional(),
})
export type ToolResultEvent = z.infer<typeof ToolResultEventSchema>
/**
* Helper to extract toolCallId from tool_result event data.
* Handles the various formats Go might send.
*/
export function extractToolCallId(data: unknown): string | undefined {
if (!data || typeof data !== 'object') return undefined
const d = data as Record<string, unknown>
// Try direct toolCallId first
if (typeof d.toolCallId === 'string') return d.toolCallId
// Then try nested data.id
if (d.data && typeof d.data === 'object') {
const nested = d.data as Record<string, unknown>
if (typeof nested.id === 'string') return nested.id
}
return undefined
}
// get_blocks_and_tools
export const GetBlocksAndToolsInput = z.object({})
export const GetBlocksAndToolsResult = z.object({
@@ -176,3 +259,187 @@ export const GetBlockUpstreamReferencesResult = z.object({
})
export type GetBlockUpstreamReferencesInputType = z.infer<typeof GetBlockUpstreamReferencesInput>
export type GetBlockUpstreamReferencesResultType = z.infer<typeof GetBlockUpstreamReferencesResult>
// ============================================================================
// Search Tools
// ============================================================================
// search_documentation
export const SearchDocumentationInput = z.object({
query: z.string().min(1),
topK: z.number().min(1).max(50).optional().default(10),
threshold: z.number().min(0).max(1).optional(),
})
export const SearchDocumentationResult = z.object({
results: z.array(
z.object({
id: z.number(),
title: z.string(),
url: z.string(),
content: z.string(),
similarity: z.number(),
})
),
query: z.string(),
totalResults: z.number(),
})
export type SearchDocumentationInputType = z.infer<typeof SearchDocumentationInput>
export type SearchDocumentationResultType = z.infer<typeof SearchDocumentationResult>
// search_online
export const SearchOnlineInput = z.object({
query: z.string().min(1),
num: z.number().min(1).max(100).optional().default(10),
type: z.string().optional().default('search'),
gl: z.string().optional(),
hl: z.string().optional(),
})
export const SearchOnlineResult = z.object({
results: z.array(z.record(z.unknown())),
query: z.string(),
type: z.string(),
totalResults: z.number(),
source: z.enum(['exa', 'serper']),
})
export type SearchOnlineInputType = z.infer<typeof SearchOnlineInput>
export type SearchOnlineResultType = z.infer<typeof SearchOnlineResult>
// make_api_request
export const MakeApiRequestInput = z.object({
url: z.string().url(),
method: z.enum(['GET', 'POST', 'PUT']),
queryParams: z.record(z.union([z.string(), z.number(), z.boolean()])).optional(),
headers: z.record(z.string()).optional(),
body: z.unknown().optional(),
})
export const MakeApiRequestResult = z.object({
data: z.unknown(),
status: z.number(),
headers: z.record(z.unknown()).optional(),
truncated: z.boolean().optional(),
totalChars: z.number().optional(),
previewChars: z.number().optional(),
note: z.string().optional(),
})
export type MakeApiRequestInputType = z.infer<typeof MakeApiRequestInput>
export type MakeApiRequestResultType = z.infer<typeof MakeApiRequestResult>
// ============================================================================
// Workflow Tools
// ============================================================================
// edit_workflow - input is complex, using passthrough for flexibility
export const EditWorkflowInput = z.object({
workflowId: z.string(),
operations: z.array(z.record(z.unknown())),
currentUserWorkflow: z.unknown().optional(),
})
export const EditWorkflowResult = z.object({
success: z.boolean(),
workflowState: z.unknown(),
inputValidationErrors: z.array(z.string()).optional(),
inputValidationMessage: z.string().optional(),
skippedItems: z.array(z.string()).optional(),
skippedItemsMessage: z.string().optional(),
})
export type EditWorkflowInputType = z.infer<typeof EditWorkflowInput>
export type EditWorkflowResultType = z.infer<typeof EditWorkflowResult>
// get_workflow_console
export const GetWorkflowConsoleInput = z.object({
workflowId: z.string(),
limit: z.number().min(1).max(50).optional().default(2),
includeDetails: z.boolean().optional().default(false),
})
export const GetWorkflowConsoleResult = z.array(
z.object({
executionId: z.string(),
startedAt: z.string(),
blocks: z.array(
z.object({
id: z.string(),
name: z.string(),
startedAt: z.string(),
endedAt: z.string(),
durationMs: z.number(),
output: z.unknown(),
error: z.string().optional(),
})
),
})
)
export type GetWorkflowConsoleInputType = z.infer<typeof GetWorkflowConsoleInput>
export type GetWorkflowConsoleResultType = z.infer<typeof GetWorkflowConsoleResult>
// list_user_workflows
export const ListUserWorkflowsInput = z.object({})
export const ListUserWorkflowsResult = z.object({
workflow_names: z.array(z.string()),
count: z.number(),
})
export type ListUserWorkflowsInputType = z.infer<typeof ListUserWorkflowsInput>
export type ListUserWorkflowsResultType = z.infer<typeof ListUserWorkflowsResult>
// ============================================================================
// User Tools
// ============================================================================
// get_credentials
export const GetCredentialsInput = z.object({
workflowId: z.string().optional(),
})
export const GetCredentialsResult = z.object({
oauth: z.object({
connected: z.object({
credentials: z.array(
z.object({
id: z.string(),
name: z.string(),
provider: z.string(),
serviceName: z.string(),
lastUsed: z.string(),
isDefault: z.boolean(),
accessToken: z.string().nullable(),
})
),
total: z.number(),
}),
notConnected: z.object({
services: z.array(
z.object({
providerId: z.string(),
name: z.string(),
description: z.string().optional(),
baseProvider: z.string().optional(),
})
),
total: z.number(),
}),
}),
environment: z.object({
variableNames: z.array(z.string()),
count: z.number(),
personalVariables: z.array(z.string()),
workspaceVariables: z.array(z.string()),
conflicts: z.array(z.string()).optional(),
}),
})
export type GetCredentialsInputType = z.infer<typeof GetCredentialsInput>
export type GetCredentialsResultType = z.infer<typeof GetCredentialsResult>
// set_environment_variables
export const SetEnvironmentVariablesInput = z.object({
variables: z.array(
z.object({
key: z.string(),
value: z.string(),
})
),
workspaceId: z.string().optional(),
})
export const SetEnvironmentVariablesResult = z.object({
success: z.boolean(),
message: z.string(),
savedCount: z.number().optional(),
variables: z.array(z.string()).optional(),
})

View File

@@ -5,6 +5,11 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api'
import type { CopilotTransportMode } from '@/lib/copilot/models'
import {
isServerExecutedToolSync,
markToolCallServerHandled,
prefetchServerExecutedTools,
} from '@/lib/copilot/server-executed-tools'
import type {
BaseClientToolMetadata,
ClientToolDisplay,
@@ -75,6 +80,7 @@ import { ManageMcpToolClientTool } from '@/lib/copilot/tools/client/workflow/man
import { RedeployClientTool } from '@/lib/copilot/tools/client/workflow/redeploy'
import { RunWorkflowClientTool } from '@/lib/copilot/tools/client/workflow/run-workflow'
import { SetGlobalWorkflowVariablesClientTool } from '@/lib/copilot/tools/client/workflow/set-global-workflow-variables'
import { extractToolCallId } from '@/lib/copilot/tools/shared/schemas'
import { getQueryClient } from '@/app/_shell/providers/query-provider'
import { subscriptionKeys } from '@/hooks/queries/subscription'
import type {
@@ -347,6 +353,17 @@ function isBackgroundState(state: any): boolean {
}
}
/**
* Checks if a tool call state is aborted
*/
function isAbortedState(state: any): boolean {
try {
return state === 'aborted' || state === (ClientToolCallState as any).aborted
} catch {
return state === 'aborted'
}
}
/**
* Checks if a tool call state is terminal (success, error, rejected, aborted, review, or background)
*/
@@ -1138,8 +1155,8 @@ const sseHandlers: Record<string, SSEHandler> = {
},
tool_result: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const success: boolean | undefined = data?.success
const toolCallId = extractToolCallId(data)
const success: boolean | undefined = (data as Record<string, unknown>)?.success as boolean
const failedDependency: boolean = data?.failedDependency === true
const skipped: boolean = data?.result?.skipped === true
if (!toolCallId) return
@@ -1149,9 +1166,10 @@ const sseHandlers: Record<string, SSEHandler> = {
if (
isRejectedState(current.state) ||
isReviewState(current.state) ||
isBackgroundState(current.state)
isBackgroundState(current.state) ||
isAbortedState(current.state)
) {
// Preserve terminal review/rejected state; do not override
// Preserve terminal review/rejected/aborted state; do not override
return
}
const targetState = success
@@ -1207,7 +1225,8 @@ const sseHandlers: Record<string, SSEHandler> = {
if (
isRejectedState(b.toolCall?.state) ||
isReviewState(b.toolCall?.state) ||
isBackgroundState(b.toolCall?.state)
isBackgroundState(b.toolCall?.state) ||
isAbortedState(b.toolCall?.state)
)
break
const targetState = success
@@ -1236,8 +1255,8 @@ const sseHandlers: Record<string, SSEHandler> = {
},
tool_error: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const failedDependency: boolean = data?.failedDependency === true
const toolCallId = extractToolCallId(data)
const failedDependency: boolean = (data as Record<string, unknown>)?.failedDependency === true
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
@@ -1245,7 +1264,8 @@ const sseHandlers: Record<string, SSEHandler> = {
if (
isRejectedState(current.state) ||
isReviewState(current.state) ||
isBackgroundState(current.state)
isBackgroundState(current.state) ||
isAbortedState(current.state)
) {
return
}
@@ -1271,7 +1291,8 @@ const sseHandlers: Record<string, SSEHandler> = {
if (
isRejectedState(b.toolCall?.state) ||
isReviewState(b.toolCall?.state) ||
isBackgroundState(b.toolCall?.state)
isBackgroundState(b.toolCall?.state) ||
isAbortedState(b.toolCall?.state)
)
break
const targetState = failedDependency
@@ -1362,6 +1383,28 @@ const sseHandlers: Record<string, SSEHandler> = {
return
}
// Check if this tool is executed server-side
// If so, skip client execution - the server will handle it and send tool_result
if (name && isServerExecutedToolSync(name)) {
markToolCallServerHandled(id, name)
logger.info('[toolCallsById] Tool is server-executed, skipping client execution', {
id,
name,
})
// Update state to executing to show progress in UI
const executingMap = { ...get().toolCallsById }
executingMap[id] = {
...executingMap[id],
state: ClientToolCallState.executing,
display: resolveToolDisplay(name, ClientToolCallState.executing, id, args),
}
set({ toolCallsById: executingMap })
// Update inline content block
upsertToolCallBlock(context, executingMap[id])
updateStreamingMessage(set, context)
return
}
// Prefer interface-based registry to determine interrupt and execute
try {
const def = name ? getTool(name) : undefined
@@ -1419,11 +1462,12 @@ const sseHandlers: Record<string, SSEHandler> = {
? result.status >= 200 && result.status < 300
: true
const completeMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
// Do not override terminal review/rejected/aborted
if (
isRejectedState(completeMap[id]?.state) ||
isReviewState(completeMap[id]?.state) ||
isBackgroundState(completeMap[id]?.state)
isBackgroundState(completeMap[id]?.state) ||
isAbortedState(completeMap[id]?.state)
) {
return
}
@@ -1461,11 +1505,12 @@ const sseHandlers: Record<string, SSEHandler> = {
})
.catch((e) => {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
// Do not override terminal review/rejected/aborted
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
isBackgroundState(errorMap[id]?.state) ||
isAbortedState(errorMap[id]?.state)
) {
return
}
@@ -1530,11 +1575,12 @@ const sseHandlers: Record<string, SSEHandler> = {
})
.catch(() => {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
// Do not override terminal review/rejected/aborted
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
isBackgroundState(errorMap[id]?.state) ||
isAbortedState(errorMap[id]?.state)
) {
return
}
@@ -2157,8 +2203,8 @@ const subAgentSSEHandlers: Record<string, SSEHandler> = {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const success: boolean | undefined = data?.success !== false // Default to true if not specified
const toolCallId = extractToolCallId(data)
const success: boolean | undefined = (data as Record<string, unknown>)?.success !== false // Default to true if not specified
if (!toolCallId) return
// Initialize if needed
@@ -2173,6 +2219,12 @@ const subAgentSSEHandlers: Record<string, SSEHandler> = {
if (existingIndex >= 0) {
const existing = context.subAgentToolCalls[parentToolCallId][existingIndex]
// Preserve aborted state - don't override if user aborted
if (isAbortedState(existing.state)) {
return
}
const updatedSubAgentToolCall = {
...existing,
state: targetState,
@@ -2191,6 +2243,10 @@ const subAgentSSEHandlers: Record<string, SSEHandler> = {
// Update the individual tool call in toolCallsById so ToolCall component gets latest state
const { toolCallsById } = get()
if (toolCallsById[toolCallId]) {
// Also check toolCallsById state in case it was aborted there
if (isAbortedState(toolCallsById[toolCallId].state)) {
return
}
const updatedMap = {
...toolCallsById,
[toolCallId]: updatedSubAgentToolCall,
@@ -2385,6 +2441,9 @@ export const useCopilotStore = create<CopilotStore>()(
const { isSendingMessage } = get()
if (isSendingMessage) get().abortMessage()
// Prefetch server-executed tools list (for skipping client execution)
prefetchServerExecutedTools()
// Abort all in-progress tools and clear any diff preview
abortAllInProgressTools(set, get)
try {
@@ -3063,9 +3122,9 @@ export const useCopilotStore = create<CopilotStore>()(
const map = { ...get().toolCallsById }
const current = map[id]
if (!current) return
// Preserve rejected state from being overridden
// Preserve rejected/aborted state from being overridden with success
if (
isRejectedState(current.state) &&
(isRejectedState(current.state) || isAbortedState(current.state)) &&
(newState === 'success' || newState === (ClientToolCallState as any).success)
) {
return
@@ -3143,8 +3202,11 @@ export const useCopilotStore = create<CopilotStore>()(
if (!id) return
const current = toolCallsById[id]
if (!current) return
// Do not override a rejected tool with success
if (isRejectedState(current.state) && targetState === (ClientToolCallState as any).success) {
// Do not override a rejected or aborted tool with success
if (
(isRejectedState(current.state) || isAbortedState(current.state)) &&
targetState === (ClientToolCallState as any).success
) {
return
}
@@ -3862,11 +3924,12 @@ export const useCopilotStore = create<CopilotStore>()(
const success = result.success && result.result?.success
const completeMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
// Do not override terminal review/rejected/aborted
if (
isRejectedState(completeMap[id]?.state) ||
isReviewState(completeMap[id]?.state) ||
isBackgroundState(completeMap[id]?.state)
isBackgroundState(completeMap[id]?.state) ||
isAbortedState(completeMap[id]?.state)
) {
return
}
@@ -3911,11 +3974,12 @@ export const useCopilotStore = create<CopilotStore>()(
} catch {}
} catch (e) {
const errorMap = { ...get().toolCallsById }
// Do not override terminal review/rejected
// Do not override terminal review/rejected/aborted
if (
isRejectedState(errorMap[id]?.state) ||
isReviewState(errorMap[id]?.state) ||
isBackgroundState(errorMap[id]?.state)
isBackgroundState(errorMap[id]?.state) ||
isAbortedState(errorMap[id]?.state)
) {
return
}