This commit is contained in:
Siddharth Ganesan
2026-01-28 17:31:47 -08:00
parent f808fd6c69
commit a6e2e4bdb9
9 changed files with 481 additions and 26 deletions

View File

@@ -1,5 +1,5 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { copilotChats, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
@@ -40,11 +40,24 @@ const FileAttachmentSchema = z.object({
size: z.number(),
})
/**
* Session context for headless mode.
* In headless mode, workflowId may not be known at start.
* The set_context tool can be used to establish context mid-conversation.
*/
const SessionContextSchema = z.object({
workflowId: z.string().optional(),
workspaceId: z.string().optional(),
})
const ChatMessageSchema = z.object({
message: z.string().min(1, 'Message is required'),
userMessageId: z.string().optional(), // ID from frontend for the user message
chatId: z.string().optional(),
workflowId: z.string().min(1, 'Workflow ID is required'),
// workflowId is optional for headless mode - can be set via set_context tool
workflowId: z.string().optional(),
// Session context for headless mode - provides initial context that can be updated via set_context
sessionContext: SessionContextSchema.optional(),
model: z.enum(COPILOT_MODEL_IDS).optional().default('claude-4.5-opus'),
mode: z.enum(COPILOT_REQUEST_MODES).optional().default('agent'),
prefetch: z.boolean().optional(),
@@ -105,6 +118,7 @@ export async function POST(req: NextRequest) {
userMessageId,
chatId,
workflowId,
sessionContext,
model,
mode,
prefetch,
@@ -117,6 +131,41 @@ export async function POST(req: NextRequest) {
contexts,
commands,
} = ChatMessageSchema.parse(body)
// ─────────────────────────────────────────────────────────────────────────
// Resolve execution context (workflowId, workspaceId)
// In client mode: workflowId comes from request, we look up workspaceId
// In headless mode: may start without workflowId, set via set_context tool
// ─────────────────────────────────────────────────────────────────────────
const resolvedWorkflowId = workflowId || sessionContext?.workflowId
let resolvedWorkspaceId = sessionContext?.workspaceId
// If we have a workflowId but no workspaceId, look it up once
if (resolvedWorkflowId && !resolvedWorkspaceId) {
try {
const [wf] = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, resolvedWorkflowId))
.limit(1)
resolvedWorkspaceId = wf?.workspaceId ?? undefined
} catch (error) {
logger.warn(`[${tracker.requestId}] Failed to lookup workspaceId for workflow`, {
workflowId: resolvedWorkflowId,
error: error instanceof Error ? error.message : String(error),
})
}
}
// Build execution context that will be passed to Go and used for tool execution
const executionContext = {
userId: authenticatedUserId,
workflowId: resolvedWorkflowId,
workspaceId: resolvedWorkspaceId,
}
logger.debug(`[${tracker.requestId}] Resolved execution context`, executionContext)
// Ensure we have a consistent user message ID for this request
const userMessageIdToUse = userMessageId || crypto.randomUUID()
try {
@@ -431,7 +480,7 @@ export async function POST(req: NextRequest) {
const requestPayload = {
message: message, // Just send the current user message text
workflowId,
workflowId: resolvedWorkflowId,
userId: authenticatedUserId,
stream: stream,
streamToolCalls: true,
@@ -439,6 +488,9 @@ export async function POST(req: NextRequest) {
mode: transportMode,
messageId: userMessageIdToUse,
version: SIM_AGENT_VERSION,
// Execution context for Go to maintain and echo back in tool_call events
// This enables headless mode where context can be set dynamically via set_context tool
executionContext,
...(providerConfig ? { provider: providerConfig } : {}),
...(effectiveConversationId ? { conversationId: effectiveConversationId } : {}),
...(typeof prefetch === 'boolean' ? { prefetch: prefetch } : {}),
@@ -625,6 +677,16 @@ export async function POST(req: NextRequest) {
// Execute server-side tools automatically
// This runs async and calls mark-complete when done
// Use context from Go's event.data.executionContext if provided,
// falling back to the initial resolved context
const toolContext = {
userId: authenticatedUserId,
workflowId:
event.data.executionContext?.workflowId || resolvedWorkflowId,
workspaceId:
event.data.executionContext?.workspaceId || resolvedWorkspaceId,
chatId: actualChatId,
}
handleToolCallEvent(
{
id: event.data.id,
@@ -632,11 +694,7 @@ export async function POST(req: NextRequest) {
arguments: event.data.arguments || {},
partial: false,
},
{
userId: authenticatedUserId,
workflowId,
chatId: actualChatId,
}
toolContext
).then((handledServerSide) => {
if (handledServerSide) {
registerServerHandledTool(event.data.id, event.data.name)

View File

@@ -0,0 +1,186 @@
/**
* POST /api/copilot/test
*
* Simple test endpoint for copilot without authentication.
* Pass just a query and optional userId to test headless mode.
*
* Request body:
* {
* query: string, // Required - the message to send
* userId?: string, // Optional - defaults to 'test-user'
* workflowId?: string, // Optional - workflow context
* workspaceId?: string, // Optional - workspace context
* stream?: boolean, // Optional - defaults to true
* }
*/
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import {
handleToolCallEvent,
registerServerHandledTool,
} from '@/lib/copilot/server-executor/stream-handler'
import { env } from '@/lib/core/config/env'
const logger = createLogger('CopilotTestAPI')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const TestRequestSchema = z.object({
query: z.string().min(1, 'Query is required'),
userId: z.string().optional().default('test-user'),
workflowId: z.string().optional(),
workspaceId: z.string().optional(),
stream: z.boolean().optional().default(true),
})
export async function POST(req: NextRequest) {
try {
const body = await req.json()
const { query, userId, workflowId, workspaceId, stream } = TestRequestSchema.parse(body)
logger.info('Test copilot request', { query, userId, workflowId, workspaceId, stream })
// Build execution context
const executionContext = {
userId,
workflowId,
workspaceId,
}
// Build request payload for Go copilot
const requestPayload = {
message: query,
workflowId,
userId,
stream: stream,
streamToolCalls: true,
model: 'claude-sonnet-4-20250514',
mode: 'agent',
messageId: crypto.randomUUID(),
version: SIM_AGENT_VERSION,
executionContext,
}
logger.info('Sending to Go copilot', { url: `${SIM_AGENT_API_URL}/api/chat-completion-streaming` })
const simAgentResponse = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
})
if (!simAgentResponse.ok) {
const errorText = await simAgentResponse.text().catch(() => '')
logger.error('Go copilot error', { status: simAgentResponse.status, error: errorText })
return NextResponse.json(
{ error: `Copilot error: ${simAgentResponse.statusText}`, details: errorText },
{ status: simAgentResponse.status }
)
}
if (stream && simAgentResponse.body) {
// Create streaming response
const transformedStream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
const reader = simAgentResponse.body!.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
const chunk = decoder.decode(value, { stream: true })
buffer += chunk
const lines = buffer.split('\n')
buffer = lines.pop() || ''
for (const line of lines) {
if (line.trim() === '') continue
if (line.startsWith('data: ') && line.length > 6) {
try {
const jsonStr = line.slice(6)
const event = JSON.parse(jsonStr)
// Handle tool calls server-side
if (event.type === 'tool_call' && !event.data?.partial && event.data?.id) {
const toolContext = {
userId,
workflowId: event.data.executionContext?.workflowId || workflowId,
workspaceId: event.data.executionContext?.workspaceId || workspaceId,
chatId: undefined,
}
handleToolCallEvent(
{
id: event.data.id,
name: event.data.name,
arguments: event.data.arguments || {},
partial: false,
},
toolContext
).then((handledServerSide) => {
if (handledServerSide) {
registerServerHandledTool(event.data.id, event.data.name)
logger.info('Tool executed server-side', {
toolCallId: event.data.id,
toolName: event.data.name,
})
}
})
}
} catch {
// Ignore parse errors
}
}
// Forward all events to client
controller.enqueue(encoder.encode(line + '\n'))
}
}
// Handle remaining buffer
if (buffer.trim()) {
controller.enqueue(encoder.encode(buffer + '\n'))
}
} catch (error) {
logger.error('Stream error', { error })
} finally {
controller.close()
}
},
})
return new Response(transformedStream, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
})
} else {
// Non-streaming response
const text = await simAgentResponse.text()
return NextResponse.json({ response: text })
}
} catch (error) {
logger.error('Test endpoint error', { error })
if (error instanceof z.ZodError) {
return NextResponse.json({ error: 'Invalid request', details: error.errors }, { status: 400 })
}
return NextResponse.json(
{ error: 'Internal error', message: error instanceof Error ? error.message : String(error) },
{ status: 500 }
)
}
}

View File

@@ -29,6 +29,7 @@ import {
} from '../tools/server/other/mark-todo-in-progress'
import { searchOnlineServerTool } from '../tools/server/other/search-online'
import { SleepInput, sleepServerTool } from '../tools/server/other/sleep'
import { setContextServerTool } from '../tools/server/context/set-context'
import { getCredentialsServerTool } from '../tools/server/user/get-credentials'
import { setEnvironmentVariablesServerTool } from '../tools/server/user/set-environment-variables'
import {
@@ -98,6 +99,7 @@ import {
MakeApiRequestInput,
SearchDocumentationInput,
SearchOnlineInput,
SetContextInput,
SetEnvironmentVariablesInput,
} from '../tools/shared/schemas'
import type { ExecutionContext, ToolResult } from './types'
@@ -107,8 +109,15 @@ const logger = createLogger('ToolRegistry')
/**
* Context type for server tools.
* This is the full execution context passed to tools that need workflow/workspace info.
*/
type ServerToolContext = { userId: string } | undefined
type ServerToolContext =
| {
userId: string
workflowId?: string
workspaceId?: string
}
| undefined
/**
* Helper to create a typed executor wrapper.
@@ -321,6 +330,15 @@ const TOOL_REGISTRY: Record<string, ToolRegistration> = {
execute: createExecutor(setEnvironmentVariablesServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// Context Tools (for headless mode)
// ─────────────────────────────────────────────────────────────────────────
set_context: {
inputSchema: SetContextInput,
requiresAuth: true,
execute: createExecutor(setContextServerTool),
},
// ─────────────────────────────────────────────────────────────────────────
// Todo Tools
// ─────────────────────────────────────────────────────────────────────────
@@ -404,10 +422,15 @@ export async function executeRegisteredTool(
// Execute the tool
try {
const result = await registration.execute(
validatedArgs,
context.userId ? { userId: context.userId } : undefined
)
// Pass the full execution context so tools can access workflowId/workspaceId
const toolContext = context.userId
? {
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
}
: undefined
const result = await registration.execute(validatedArgs, toolContext)
return successResult(result)
} catch (error) {
const message = error instanceof Error ? error.message : String(error)

View File

@@ -36,6 +36,7 @@ interface ToolExecutionState {
status: 'pending' | 'executing' | 'completed' | 'failed'
userId: string
workflowId?: string
workspaceId?: string
chatId?: string
startedAt: number
completedAt?: number
@@ -223,6 +224,7 @@ export async function handleToolCallEvent(
status: 'pending',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: Date.now(),
})
@@ -256,6 +258,7 @@ async function executeToolServerSide(
status: 'executing',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: startTime,
})
@@ -276,6 +279,7 @@ async function executeToolServerSide(
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
@@ -301,6 +305,7 @@ async function executeToolServerSide(
status: 'completed',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
@@ -329,6 +334,7 @@ async function executeToolServerSide(
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),
@@ -361,6 +367,7 @@ async function executeToolServerSide(
status: 'failed',
userId: context.userId,
workflowId: context.workflowId,
workspaceId: context.workspaceId,
chatId: context.chatId,
startedAt: startTime,
completedAt: Date.now(),

View File

@@ -23,10 +23,15 @@ export interface ToolResult<T = unknown> {
/**
* Context passed to tool executors.
*
* This context is passed from Go copilot to SIM on each tool_call event.
* In client mode, workflowId/workspaceId come from the initial request.
* In headless mode, they can be set dynamically via the set_context tool.
*/
export interface ExecutionContext {
userId: string
workflowId?: string
workspaceId?: string
chatId?: string
}

View File

@@ -0,0 +1,81 @@
/**
* Set Context Server Tool
*
* Allows headless mode sessions to dynamically set the workflow context.
* When called, this tool validates that the user has access to the specified
* workflow and returns the resolved context (including workspaceId).
*
* Go copilot should update its internal session state with the returned context
* and include it in subsequent tool_call events.
*/
import { createLogger } from '@sim/logger'
import { verifyWorkflowAccess } from '@/lib/copilot/auth/permissions'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('SetContextServerTool')
export interface SetContextParams {
/** The workflow ID to set as the current context */
workflowId: string
}
export interface SetContextResult {
success: boolean
/** The resolved execution context - Go should store this and include in tool_call events */
executionContext: {
workflowId: string
workspaceId?: string
userId: string
}
message: string
}
export const setContextServerTool: BaseServerTool<SetContextParams, SetContextResult> = {
name: 'set_context',
async execute(params: SetContextParams, context?: { userId: string }): Promise<SetContextResult> {
if (!context?.userId) {
logger.error('Unauthorized attempt to set context - no authenticated user')
throw new Error('Authentication required')
}
const { workflowId } = params
if (!workflowId) {
throw new Error('workflowId is required')
}
logger.info('Setting execution context', {
workflowId,
userId: context.userId,
})
// Verify the user has access to this workflow
const { hasAccess, workspaceId } = await verifyWorkflowAccess(context.userId, workflowId)
if (!hasAccess) {
logger.warn('User does not have access to workflow', {
workflowId,
userId: context.userId,
})
throw new Error(`Access denied to workflow ${workflowId}`)
}
logger.info('Context set successfully', {
workflowId,
workspaceId,
userId: context.userId,
})
return {
success: true,
executionContext: {
workflowId,
workspaceId,
userId: context.userId,
},
message: `Context set to workflow ${workflowId}${workspaceId ? ` (workspace: ${workspaceId})` : ''}`,
}
},
}

View File

@@ -8,7 +8,10 @@ import { validateSelectorIds } from '@/lib/copilot/validation/selector-validator
import type { PermissionGroupConfig } from '@/lib/permission-groups/types'
import { getBlockOutputs } from '@/lib/workflows/blocks/block-outputs'
import { extractAndPersistCustomTools } from '@/lib/workflows/persistence/custom-tools-persistence'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import {
loadWorkflowFromNormalizedTables,
saveWorkflowToNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { isValidKey } from '@/lib/workflows/sanitization/key-validation'
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
import { buildCanonicalIndex, isCanonicalPair } from '@/lib/workflows/subblocks/visibility'
@@ -2626,13 +2629,22 @@ async function getCurrentWorkflowStateFromDb(
export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
name: 'edit_workflow',
async execute(params: EditWorkflowParams, context?: { userId: string }): Promise<any> {
async execute(
params: EditWorkflowParams,
context?: { userId: string; workflowId?: string }
): Promise<any> {
const logger = createLogger('EditWorkflowServerTool')
const { operations, workflowId, currentUserWorkflow } = params
const { operations, currentUserWorkflow } = params
// Use workflowId from params if provided, otherwise fall back to context
const workflowId = params.workflowId || context?.workflowId
if (!Array.isArray(operations) || operations.length === 0) {
throw new Error('operations are required and must be an array')
}
if (!workflowId) throw new Error('workflowId is required')
if (!workflowId) {
throw new Error(
'No workflow specified. Please provide a workflowId or ensure you have an active workflow open.'
)
}
logger.info('Executing edit_workflow', {
operationCount: operations.length,
@@ -2737,10 +2749,66 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
logger.warn('No userId in context - skipping custom tools persistence', { workflowId })
}
logger.info('edit_workflow successfully applied operations', {
// Prepare the final workflow state for persistence
const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState
// ─────────────────────────────────────────────────────────────────────────
// PERSIST THE CHANGES TO THE DATABASE
// This is critical for headless mode and ensures changes are saved
// ─────────────────────────────────────────────────────────────────────────
const workflowStateForPersistence = {
blocks: finalWorkflowState.blocks,
edges: finalWorkflowState.edges,
loops: finalWorkflowState.loops || {},
parallels: finalWorkflowState.parallels || {},
lastSaved: Date.now(),
}
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowStateForPersistence)
if (!saveResult.success) {
logger.error('Failed to persist workflow changes to database', {
workflowId,
error: saveResult.error,
})
throw new Error(`Failed to save workflow: ${saveResult.error}`)
}
// Update workflow's lastSynced timestamp
await db
.update(workflowTable)
.set({
lastSynced: new Date(),
updatedAt: new Date(),
})
.where(eq(workflowTable.id, workflowId))
// Notify socket server so connected clients can refresh
// This uses the copilot-specific endpoint to trigger UI refresh
try {
const socketUrl = process.env.SOCKET_SERVER_URL || 'http://localhost:3002'
const operationsSummary = operations
.map((op: any) => `${op.operation_type} ${op.block_id || 'block'}`)
.slice(0, 3)
.join(', ')
const description = `Applied ${operations.length} operation(s): ${operationsSummary}${operations.length > 3 ? '...' : ''}`
await fetch(`${socketUrl}/api/copilot-workflow-edit`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ workflowId, description }),
}).catch((err) => {
logger.warn('Failed to notify socket server about copilot edit', { error: err.message })
})
} catch (notifyError) {
// Non-fatal - log and continue
logger.warn('Error notifying socket server', { error: notifyError })
}
logger.info('edit_workflow successfully applied and persisted operations', {
operationCount: operations.length,
blocksCount: Object.keys(modifiedWorkflowState.blocks).length,
edgesCount: modifiedWorkflowState.edges.length,
blocksCount: Object.keys(finalWorkflowState.blocks).length,
edgesCount: finalWorkflowState.edges.length,
inputValidationErrors: validationErrors.length,
skippedItemsCount: skippedItems.length,
schemaValidationErrors: validation.errors.length,
@@ -2760,7 +2828,7 @@ export const editWorkflowServerTool: BaseServerTool<EditWorkflowParams, any> = {
// Return the modified workflow state for the client to convert to YAML if needed
return {
success: true,
workflowState: validation.sanitizedState || modifiedWorkflowState,
workflowState: finalWorkflowState,
// Include input validation errors so the LLM can see what was rejected
...(inputErrors && {
inputValidationErrors: inputErrors,

View File

@@ -9,8 +9,9 @@ import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
const logger = createLogger('GetUserWorkflowServerTool')
// workflowId is optional - if not provided, we use the one from execution context
export const GetUserWorkflowInput = z.object({
workflowId: z.string().min(1),
workflowId: z.string().optional(),
})
export const GetUserWorkflowResult = z.object({
@@ -27,15 +28,22 @@ export const getUserWorkflowServerTool: BaseServerTool<
GetUserWorkflowResultType
> = {
name: 'get_user_workflow',
async execute(args: unknown, context?: { userId: string }) {
async execute(args: unknown, context?: { userId: string; workflowId?: string }) {
const parsed = GetUserWorkflowInput.parse(args)
const { workflowId } = parsed
// Use workflowId from args if provided, otherwise fall back to context
const workflowId = parsed.workflowId || context?.workflowId
if (!context?.userId) {
throw new Error('User authentication required')
}
logger.debug('Getting user workflow', { workflowId })
if (!workflowId) {
throw new Error(
'No workflow specified. Please provide a workflowId or ensure you have an active workflow open.'
)
}
logger.debug('Getting user workflow', { workflowId, fromContext: !parsed.workflowId })
// Get workflow metadata
const [wf] = await db

View File

@@ -329,8 +329,9 @@ export type MakeApiRequestResultType = z.infer<typeof MakeApiRequestResult>
// ============================================================================
// edit_workflow - input is complex, using passthrough for flexibility
// workflowId is optional - if not provided, uses the active workflow from context
export const EditWorkflowInput = z.object({
workflowId: z.string(),
workflowId: z.string().optional(),
operations: z.array(z.record(z.unknown())),
currentUserWorkflow: z.unknown().optional(),
})
@@ -443,3 +444,21 @@ export const SetEnvironmentVariablesResult = z.object({
savedCount: z.number().optional(),
variables: z.array(z.string()).optional(),
})
// set_context - for headless mode to dynamically set the workflow context
export const SetContextInput = z.object({
/** The workflow ID to set as the current context */
workflowId: z.string().min(1, 'workflowId is required'),
})
export const SetContextResult = z.object({
success: z.boolean(),
/** The resolved execution context - Go should store this and include in tool_call events */
executionContext: z.object({
workflowId: z.string(),
workspaceId: z.string().optional(),
userId: z.string(),
}),
message: z.string(),
})
export type SetContextInputType = z.infer<typeof SetContextInput>
export type SetContextResultType = z.infer<typeof SetContextResult>