Merge pull request #1286 from simstudioai/fix/copilot-custom-tools

fix(copilot): custom tools
This commit is contained in:
Siddharth Ganesan
2025-09-08 17:23:58 -07:00
committed by GitHub
4 changed files with 279 additions and 146 deletions

View File

@@ -5,6 +5,7 @@ import { getSession } from '@/lib/auth'
import { createLogger } from '@/lib/logs/console/logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/db-helpers'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/validation'
import { db } from '@/db'
import { workflow } from '@/db/schema'
@@ -168,11 +169,14 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// Sanitize custom tools in agent blocks before saving
const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(state.blocks as any)
// Save to normalized tables
// Ensure all required fields are present for WorkflowState type
// Filter out blocks without type or name before saving
const filteredBlocks = Object.entries(state.blocks).reduce(
(acc, [blockId, block]) => {
const filteredBlocks = Object.entries(sanitizedBlocks).reduce(
(acc, [blockId, block]: [string, any]) => {
if (block.type && block.name) {
// Ensure all required fields are present
acc[blockId] = {
@@ -184,7 +188,6 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
height: block.height !== undefined ? block.height : 0,
subBlocks: block.subBlocks || {},
outputs: block.outputs || {},
data: block.data || {},
}
}
return acc
@@ -226,30 +229,21 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const elapsed = Date.now() - startTime
logger.info(`[${requestId}] Successfully saved workflow ${workflowId} state in ${elapsed}ms`)
return NextResponse.json(
{
success: true,
blocksCount: Object.keys(filteredBlocks).length,
edgesCount: state.edges.length,
},
{ status: 200 }
)
} catch (error: unknown) {
return NextResponse.json({ success: true, warnings }, { status: 200 })
} catch (error: any) {
const elapsed = Date.now() - startTime
if (error instanceof z.ZodError) {
logger.warn(`[${requestId}] Invalid workflow state data for ${workflowId}`, {
errors: error.errors,
})
return NextResponse.json(
{ error: 'Invalid state data', details: error.errors },
{ status: 400 }
)
}
logger.error(
`[${requestId}] Error saving workflow ${workflowId} state after ${elapsed}ms`,
error
)
if (error instanceof z.ZodError) {
return NextResponse.json(
{ error: 'Invalid request body', details: error.errors },
{ status: 400 }
)
}
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
}
}

View File

@@ -1,39 +1,63 @@
import crypto from 'crypto'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { env } from '@/lib/env'
import { createLogger } from '@/lib/logs/console/logger'
import { getUserEntityPermissions } from '@/lib/permissions/utils'
import { SIM_AGENT_API_URL_DEFAULT, simAgentClient } from '@/lib/sim-agent'
import { simAgentClient } from '@/lib/sim-agent'
import {
loadWorkflowFromNormalizedTables,
saveWorkflowToNormalizedTables,
} from '@/lib/workflows/db-helpers'
import { getUserId as getOAuthUserId } from '@/app/api/auth/oauth/utils'
import { getBlock } from '@/blocks'
import { getAllBlocks } from '@/blocks/registry'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/validation'
import { getUserId } from '@/app/api/auth/oauth/utils'
import { getAllBlocks, getBlock } from '@/blocks'
import type { BlockConfig } from '@/blocks/types'
import { resolveOutputType } from '@/blocks/utils'
import { db } from '@/db'
import { workflowCheckpoints, workflow as workflowTable } from '@/db/schema'
import { customTools, workflowCheckpoints, workflow as workflowTable } from '@/db/schema'
import { generateLoopBlocks, generateParallelBlocks } from '@/stores/workflows/workflow/utils'
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
export const dynamic = 'force-dynamic'
const logger = createLogger('WorkflowYamlAPI')
const logger = createLogger('YamlWorkflowAPI')
const YamlWorkflowRequestSchema = z.object({
yamlContent: z.string().min(1, 'YAML content is required'),
description: z.string().optional(),
chatId: z.string().optional(), // For copilot checkpoints
source: z.enum(['copilot', 'import', 'editor']).default('editor'),
applyAutoLayout: z.boolean().default(true),
createCheckpoint: z.boolean().default(false),
chatId: z.string().optional(),
source: z.enum(['copilot', 'editor', 'import']).default('editor'),
applyAutoLayout: z.boolean().optional().default(false),
createCheckpoint: z.boolean().optional().default(false),
})
type YamlWorkflowRequest = z.infer<typeof YamlWorkflowRequestSchema>
function updateBlockReferences(
value: any,
blockIdMapping: Map<string, string>,
requestId: string
): any {
if (typeof value === 'string') {
// Replace references in string values
for (const [oldId, newId] of blockIdMapping.entries()) {
if (value.includes(oldId)) {
value = value.replaceAll(`<${oldId}.`, `<${newId}.`).replaceAll(`%${oldId}.`, `%${newId}.`)
}
}
return value
}
if (Array.isArray(value)) {
return value.map((item) => updateBlockReferences(item, blockIdMapping, requestId))
}
if (value && typeof value === 'object') {
const result: Record<string, any> = {}
for (const [key, val] of Object.entries(value)) {
result[key] = updateBlockReferences(val, blockIdMapping, requestId)
}
return result
}
return value
}
/**
* Helper function to create a checkpoint before workflow changes
@@ -68,7 +92,7 @@ async function createWorkflowCheckpoint(
{} as Record<string, BlockConfig>
)
const generateResponse = await fetch(`${SIM_AGENT_API_URL}/api/workflow/to-yaml`, {
const generateResponse = await fetch(`${env.SIM_AGENT_API_URL}/api/workflow/to-yaml`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -114,118 +138,104 @@ async function createWorkflowCheckpoint(
}
}
/**
* Helper function to get user ID with proper authentication for both tool calls and direct requests
*/
async function getUserId(requestId: string, workflowId: string): Promise<string | null> {
// Use the OAuth utils function that handles both session and workflow-based auth
const userId = await getOAuthUserId(requestId, workflowId)
if (!userId) {
logger.warn(`[${requestId}] Could not determine user ID for workflow ${workflowId}`)
return null
}
// For additional security, verify the user has permission to access this workflow
const workflowData = await db
.select()
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.then((rows) => rows[0])
if (!workflowData) {
logger.warn(`[${requestId}] Workflow ${workflowId} not found`)
return null
}
// Check if user has permission to update this workflow
let canUpdate = false
// Case 1: User owns the workflow
if (workflowData.userId === userId) {
canUpdate = true
}
// Case 2: Workflow belongs to a workspace and user has write or admin permission
if (!canUpdate && workflowData.workspaceId) {
try {
const userPermission = await getUserEntityPermissions(
userId,
'workspace',
workflowData.workspaceId
)
if (userPermission === 'write' || userPermission === 'admin') {
canUpdate = true
}
} catch (error) {
logger.warn(`[${requestId}] Error checking workspace permissions:`, error)
}
}
if (!canUpdate) {
logger.warn(`[${requestId}] User ${userId} denied permission to update workflow ${workflowId}`)
return null
}
return userId
}
/**
* Helper function to update block references in values with new mapped IDs
*/
function updateBlockReferences(
value: any,
blockIdMapping: Map<string, string>,
async function upsertCustomToolsFromBlocks(
userId: string,
blocks: Record<string, any>,
requestId: string
): any {
if (typeof value === 'string' && value.includes('<') && value.includes('>')) {
let processedValue = value
const blockMatches = value.match(/<([^>]+)>/g)
): Promise<{ created: number; updated: number }> {
try {
// Collect custom tools from all agent blocks
const collected: Array<{ title: string; schema: any; code: string }> = []
if (blockMatches) {
for (const match of blockMatches) {
const path = match.slice(1, -1)
const [blockRef] = path.split('.')
for (const block of Object.values(blocks)) {
if (!block || block.type !== 'agent') continue
const toolsSub = block.subBlocks?.tools
if (!toolsSub) continue
// Skip system references (start, loop, parallel, variable)
if (['start', 'loop', 'parallel', 'variable'].includes(blockRef.toLowerCase())) {
let value = toolsSub.value
if (!value) continue
if (typeof value === 'string') {
try {
value = JSON.parse(value)
} catch {
continue
}
}
if (!Array.isArray(value)) continue
// Check if this references an old block ID that needs mapping
const newMappedId = blockIdMapping.get(blockRef)
if (newMappedId) {
logger.info(`[${requestId}] Updating block reference: ${blockRef} -> ${newMappedId}`)
processedValue = processedValue.replace(
new RegExp(`<${blockRef}\\.`, 'g'),
`<${newMappedId}.`
)
processedValue = processedValue.replace(
new RegExp(`<${blockRef}>`, 'g'),
`<${newMappedId}>`
)
for (const tool of value) {
if (
tool &&
tool.type === 'custom-tool' &&
tool.schema &&
tool.schema.function &&
tool.schema.function.name &&
typeof tool.code === 'string'
) {
collected.push({
title: tool.title || tool.schema.function.name,
schema: tool.schema,
code: tool.code,
})
}
}
}
return processedValue
}
if (collected.length === 0) return { created: 0, updated: 0 }
// Handle arrays
if (Array.isArray(value)) {
return value.map((item) => updateBlockReferences(item, blockIdMapping, requestId))
}
// Handle objects
if (value !== null && typeof value === 'object') {
const result = { ...value }
for (const key in result) {
result[key] = updateBlockReferences(result[key], blockIdMapping, requestId)
// Ensure unique by function name
const byName = new Map<string, { title: string; schema: any; code: string }>()
for (const t of collected) {
const name = t.schema.function.name
if (!byName.has(name)) byName.set(name, t)
}
return result
}
return value
// Load existing user's tools
const existing = await db.select().from(customTools).where(eq(customTools.userId, userId))
const existingByName = new Map<string, (typeof existing)[number]>()
for (const row of existing) {
try {
const fnName = (row.schema as any)?.function?.name
if (fnName) existingByName.set(fnName, row as any)
} catch {}
}
let created = 0
let updated = 0
const now = new Date()
// Upsert by function name
for (const [name, tool] of byName.entries()) {
const match = existingByName.get(name)
if (!match) {
await db.insert(customTools).values({
id: crypto.randomUUID(),
userId,
title: tool.title,
schema: tool.schema,
code: tool.code,
createdAt: now,
updatedAt: now,
})
created++
} else {
await db
.update(customTools)
.set({ title: tool.title, schema: tool.schema, code: tool.code, updatedAt: now })
.where(eq(customTools.id, match.id))
updated++
}
}
logger.info(`[${requestId}] Upserted custom tools from YAML`, { created, updated })
return { created, updated }
} catch (err) {
logger.warn(`[${requestId}] Failed to upsert custom tools from YAML`, {
error: err instanceof Error ? err.message : String(err),
})
return { created: 0, updated: 0 }
}
}
/**
@@ -281,7 +291,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
{} as Record<string, BlockConfig>
)
const conversionResponse = await fetch(`${SIM_AGENT_API_URL}/api/yaml/to-workflow`, {
const conversionResponse = await fetch(`${env.SIM_AGENT_API_URL}/api/yaml/to-workflow`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
@@ -317,7 +327,9 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
const conversionResult = await conversionResponse.json()
if (!conversionResult.success || !conversionResult.workflowState) {
const workflowState = conversionResult.workflowState || conversionResult.diff?.proposedState
if (!conversionResult.success || !workflowState) {
logger.error(`[${requestId}] YAML conversion failed`, {
errors: conversionResult.errors,
warnings: conversionResult.warnings,
@@ -330,8 +342,6 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
})
}
const { workflowState } = conversionResult
// Ensure all blocks have required fields
Object.values(workflowState.blocks).forEach((block: any) => {
if (block.enabled === undefined) {
@@ -541,7 +551,6 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
}
}
// Debug: Log block parent-child relationships before generating loops
// Generate loop and parallel configurations
const loops = generateLoopBlocks(newWorkflowState.blocks)
const parallels = generateParallelBlocks(newWorkflowState.blocks)
@@ -626,6 +635,18 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
}
}
// Sanitize custom tools in agent blocks before saving
const { blocks: sanitizedBlocks, warnings: sanitationWarnings } = sanitizeAgentToolsInBlocks(
newWorkflowState.blocks
)
if (sanitationWarnings.length > 0) {
logger.warn(`[${requestId}] Tool sanitation produced ${sanitationWarnings.length} warning(s)`)
}
newWorkflowState.blocks = sanitizedBlocks
// Upsert custom tools from blocks
await upsertCustomToolsFromBlocks(userId, newWorkflowState.blocks, requestId)
// Save to database
const saveResult = await saveWorkflowToNormalizedTables(workflowId, newWorkflowState)
@@ -635,7 +656,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
success: false,
message: `Database save failed: ${saveResult.error || 'Unknown error'}`,
errors: [saveResult.error || 'Database save failed'],
warnings,
warnings: [...warnings, ...sanitationWarnings],
})
}
@@ -687,7 +708,7 @@ export async function PUT(request: NextRequest, { params }: { params: Promise<{
parallelsCount: Object.keys(parallels).length,
},
errors: [],
warnings,
warnings: [...warnings, ...sanitationWarnings],
})
} catch (error) {
const elapsed = Date.now() - startTime

View File

@@ -1,5 +1,6 @@
import { eq } from 'drizzle-orm'
import { createLogger } from '@/lib/logs/console/logger'
import { sanitizeAgentToolsInBlocks } from '@/lib/workflows/validation'
import { db } from '@/db'
import { workflow, workflowBlocks, workflowEdges, workflowSubflows } from '@/db/schema'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -114,6 +115,14 @@ export async function loadWorkflowFromNormalizedTables(
}
})
// Sanitize any invalid custom tools in agent blocks to prevent client crashes
const { blocks: sanitizedBlocks, warnings } = sanitizeAgentToolsInBlocks(blocksMap)
if (warnings.length > 0) {
logger.warn(`Sanitized workflow ${workflowId} tools with ${warnings.length} warning(s)`, {
warnings,
})
}
// Convert edges to the expected format
const edgesArray = edges.map((edge) => ({
id: edge.id,
@@ -146,7 +155,7 @@ export async function loadWorkflowFromNormalizedTables(
})
return {
blocks: blocksMap,
blocks: sanitizedBlocks,
edges: edgesArray,
loops,
parallels,

View File

@@ -0,0 +1,109 @@
import { createLogger } from '@/lib/logs/console/logger'
const logger = createLogger('WorkflowValidation')
function isValidCustomToolSchema(tool: any): boolean {
try {
if (!tool || typeof tool !== 'object') return false
if (tool.type !== 'custom-tool') return true // non-custom tools are validated elsewhere
const schema = tool.schema
if (!schema || typeof schema !== 'object') return false
const fn = schema.function
if (!fn || typeof fn !== 'object') return false
if (!fn.name || typeof fn.name !== 'string') return false
const params = fn.parameters
if (!params || typeof params !== 'object') return false
if (params.type !== 'object') return false
if (!params.properties || typeof params.properties !== 'object') return false
return true
} catch (_err) {
return false
}
}
export function sanitizeAgentToolsInBlocks(blocks: Record<string, any>): {
blocks: Record<string, any>
warnings: string[]
} {
const warnings: string[] = []
// Shallow clone to avoid mutating callers
const sanitizedBlocks: Record<string, any> = { ...blocks }
for (const [blockId, block] of Object.entries(sanitizedBlocks)) {
try {
if (!block || block.type !== 'agent') continue
const subBlocks = block.subBlocks || {}
const toolsSubBlock = subBlocks.tools
if (!toolsSubBlock) continue
let value = toolsSubBlock.value
// Parse legacy string format
if (typeof value === 'string') {
try {
value = JSON.parse(value)
} catch (_e) {
warnings.push(
`Block ${block.name || blockId}: invalid tools JSON; resetting tools to empty array`
)
value = []
}
}
if (!Array.isArray(value)) {
// Force to array to keep client safe
warnings.push(`Block ${block.name || blockId}: tools value is not an array; resetting`)
toolsSubBlock.value = []
continue
}
const originalLength = value.length
const cleaned = value
.filter((tool: any) => {
// Allow non-custom tools to pass through as-is
if (!tool || typeof tool !== 'object') return false
if (tool.type !== 'custom-tool') return true
const ok = isValidCustomToolSchema(tool)
if (!ok) {
logger.warn('Removing invalid custom tool from workflow', {
blockId,
blockName: block.name,
})
}
return ok
})
.map((tool: any) => {
if (tool.type === 'custom-tool') {
// Ensure required defaults to avoid client crashes
if (!tool.code || typeof tool.code !== 'string') {
tool.code = ''
}
if (!tool.usageControl) {
tool.usageControl = 'auto'
}
}
return tool
})
if (cleaned.length !== originalLength) {
warnings.push(
`Block ${block.name || blockId}: removed ${originalLength - cleaned.length} invalid tool(s)`
)
}
toolsSubBlock.value = cleaned
// Reassign in case caller uses object identity
sanitizedBlocks[blockId] = { ...block, subBlocks: { ...subBlocks, tools: toolsSubBlock } }
} catch (err: any) {
warnings.push(
`Block ${block?.name || blockId}: tools sanitation failed: ${err?.message || String(err)}`
)
}
}
return { blocks: sanitizedBlocks, warnings }
}