First cleanup pass complete - untested

This commit is contained in:
Siddharth Ganesan
2026-02-05 13:24:29 -08:00
parent 57cba2ab1e
commit ea22b1da4d
27 changed files with 2715 additions and 2574 deletions

View File

@@ -18,9 +18,11 @@ export async function GET(_req: NextRequest) {
return NextResponse.json({ success: true, result })
} catch (error) {
return NextResponse.json(
{ success: false, error: error instanceof Error ? error.message : 'Failed to load credentials' },
{
success: false,
error: error instanceof Error ? error.message : 'Failed to load credentials',
},
{ status: 500 }
)
}
}

View File

@@ -355,13 +355,23 @@ async function handleBuildToolCall(
const { model } = getCopilotModel('chat')
const workflowId = args.workflowId as string | undefined
const resolved = workflowId
? { workflowId }
: await resolveWorkflowIdForUser(userId)
const resolved = workflowId ? { workflowId } : await resolveWorkflowIdForUser(userId)
if (!resolved?.workflowId) {
const response: CallToolResult = {
content: [{ type: 'text', text: JSON.stringify({ success: false, error: 'workflowId is required for build. Call create_workflow first.' }, null, 2) }],
content: [
{
type: 'text',
text: JSON.stringify(
{
success: false,
error: 'workflowId is required for build. Call create_workflow first.',
},
null,
2
),
},
],
isError: true,
}
return NextResponse.json(createResponse(id, response))
@@ -410,10 +420,9 @@ async function handleBuildToolCall(
return NextResponse.json(createResponse(id, response))
} catch (error) {
logger.error('Build tool call failed', { error })
return NextResponse.json(
createError(id, ErrorCode.InternalError, `Build failed: ${error}`),
{ status: 500 }
)
return NextResponse.json(createError(id, ErrorCode.InternalError, `Build failed: ${error}`), {
status: 500,
})
}
}

View File

@@ -5,10 +5,43 @@ import { CheckCircle, ChevronDown, ChevronRight, Loader2, Settings, XCircle } fr
import { Badge } from '@/components/emcn'
import { Button } from '@/components/ui/button'
import { Collapsible, CollapsibleContent, CollapsibleTrigger } from '@/components/ui/collapsible'
import type { ToolCallGroup, ToolCallState } from '@/lib/copilot/types'
import { cn } from '@/lib/core/utils/cn'
import { formatDuration } from '@/lib/core/utils/formatting'
interface ToolCallState {
id: string
name: string
displayName?: string
parameters?: Record<string, unknown>
state:
| 'detecting'
| 'pending'
| 'executing'
| 'completed'
| 'error'
| 'rejected'
| 'applied'
| 'ready_for_review'
| 'aborted'
| 'skipped'
| 'background'
startTime?: number
endTime?: number
duration?: number
result?: unknown
error?: string
progress?: string
}
interface ToolCallGroup {
id: string
toolCalls: ToolCallState[]
status: 'pending' | 'in_progress' | 'completed' | 'error'
startTime?: number
endTime?: number
summary?: string
}
interface ToolCallProps {
toolCall: ToolCallState
isCompact?: boolean

View File

@@ -4,4 +4,8 @@ export const SIM_AGENT_API_URL_DEFAULT = 'https://copilot.sim.ai'
export const SIM_AGENT_VERSION = '1.0.3'
/** Resolved copilot backend URL — reads from env with fallback to default. */
export const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const rawAgentUrl = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
export const SIM_AGENT_API_URL =
rawAgentUrl.startsWith('http://') || rawAgentUrl.startsWith('https://')
? rawAgentUrl
: SIM_AGENT_API_URL_DEFAULT

View File

@@ -1,21 +1,10 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import { env } from '@/lib/core/config/env'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
OrchestratorOptions,
OrchestratorResult,
SSEEvent,
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
import type { OrchestratorOptions, OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { env } from '@/lib/core/config/env'
import { buildToolCallSummaries, createStreamingContext, runStreamLoop } from './stream-core'
const logger = createLogger('CopilotOrchestrator')
export interface OrchestrateStreamOptions extends OrchestratorOptions {
@@ -24,118 +13,43 @@ export interface OrchestrateStreamOptions extends OrchestratorOptions {
chatId?: string
}
/**
* Orchestrate a copilot SSE stream and execute tool calls server-side.
*/
export async function orchestrateCopilotStream(
requestPayload: Record<string, any>,
options: OrchestrateStreamOptions
): Promise<OrchestratorResult> {
const { userId, workflowId, chatId, timeout = 300000, abortSignal } = options
const { userId, workflowId, chatId } = options
const execContext = await prepareExecutionContext(userId, workflowId)
const context: StreamingContext = {
const context = createStreamingContext({
chatId,
conversationId: undefined,
messageId: requestPayload?.messageId || crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
})
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion-streaming`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
await runStreamLoop(
`${SIM_AGENT_API_URL}/api/chat-completion-streaming`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
},
body: JSON.stringify(requestPayload),
signal: abortSignal,
})
context,
execContext,
options
)
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(
`Copilot backend error (${response.status}): ${errorText || response.statusText}`
)
const result: OrchestratorResult = {
success: context.errors.length === 0,
content: context.accumulatedContent,
contentBlocks: context.contentBlocks,
toolCalls: buildToolCallSummaries(context),
chatId: context.chatId,
conversationId: context.conversationId,
errors: context.errors.length ? context.errors : undefined,
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
const normalizedEvent = normalizeSseEvent(event)
// Skip duplicate tool events to prevent state regressions.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(normalizedEvent, options)
}
if (normalizedEvent.type === 'subagent_start') {
const eventData = normalizedEvent.data as Record<string, unknown> | undefined
const toolCallId = eventData?.tool_call_id as string | undefined
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
}
continue
}
if (normalizedEvent.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
if (handleSubagentRouting(normalizedEvent, context)) {
const handler = subAgentHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
continue
}
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
}
} finally {
clearTimeout(timeoutId)
}
const result = buildResult(context)
await options.onComplete?.(result)
return result
} catch (error) {
@@ -153,37 +67,3 @@ export async function orchestrateCopilotStream(
}
}
}
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
try {
await options.onEvent?.(event)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: event.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
function buildResult(context: StreamingContext): OrchestratorResult {
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
return {
success: context.errors.length === 0,
content: context.accumulatedContent,
contentBlocks: context.contentBlocks,
toolCalls,
chatId: context.chatId,
conversationId: context.conversationId,
errors: context.errors.length ? context.errors : undefined,
}
}

View File

@@ -1,8 +1,9 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('@sim/logger', () => loggerMock)
@@ -92,4 +93,3 @@ describe('sse-handlers tool lifecycle', () => {
expect(markToolComplete).toHaveBeenCalledTimes(1)
})
})

View File

@@ -1,17 +1,12 @@
import { createLogger } from '@sim/logger'
import {
INTERRUPT_TOOL_SET,
RESPOND_TOOL_SET,
SUBAGENT_TOOL_SET,
} from '@/lib/copilot/orchestrator/config'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import { RESPOND_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
import {
asRecord,
getEventData,
markToolResultSeen,
wasToolResultSeen,
} from '@/lib/copilot/orchestrator/sse-utils'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import { markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ContentBlock,
ExecutionContext,
@@ -20,6 +15,7 @@ import type {
StreamingContext,
ToolCallState,
} from '@/lib/copilot/orchestrator/types'
import { executeToolAndReport, isInterruptToolName, waitForToolDecision } from './tool-execution'
const logger = createLogger('CopilotSseHandlers')
@@ -39,100 +35,6 @@ function addContentBlock(context: StreamingContext, block: Omit<ContentBlock, 't
})
}
async function executeToolAndReport(
toolCallId: string,
context: StreamingContext,
execContext: ExecutionContext,
options?: OrchestratorOptions
): Promise<void> {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (toolCall.status === 'executing') return
if (wasToolResultSeen(toolCall.id)) return
toolCall.status = 'executing'
try {
const result = await executeToolServerSide(toolCall, execContext)
toolCall.status = result.success ? 'success' : 'error'
toolCall.result = result
toolCall.error = result.error
toolCall.endTime = Date.now()
// If create_workflow was successful, update the execution context with the new workflowId
// This ensures subsequent tools in the same stream have access to the workflowId
const output = asRecord(result.output)
if (
toolCall.name === 'create_workflow' &&
result.success &&
output.workflowId &&
!execContext.workflowId
) {
execContext.workflowId = output.workflowId as string
if (output.workspaceId) {
execContext.workspaceId = output.workspaceId as string
}
}
markToolResultSeen(toolCall.id)
await markToolComplete(
toolCall.id,
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
)
await options?.onEvent?.({
type: 'tool_result',
toolCallId: toolCall.id,
toolName: toolCall.name,
success: result.success,
result: result.output,
data: {
id: toolCall.id,
name: toolCall.name,
success: result.success,
result: result.output,
},
})
} catch (error) {
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
markToolResultSeen(toolCall.id)
await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)
await options?.onEvent?.({
type: 'tool_error',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
error: toolCall.error,
},
})
}
}
async function waitForToolDecision(
toolCallId: string,
timeoutMs: number
): Promise<{ status: string; message?: string } | null> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
const decision = await getToolConfirmation(toolCallId)
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
return null
}
export const sseHandlers: Record<string, SSEHandler> = {
chat_id: (event, context) => {
context.chatId = asRecord(event.data).chatId
@@ -145,13 +47,13 @@ export const sseHandlers: Record<string, SSEHandler> = {
const current = context.toolCalls.get(toolCallId)
if (!current) return
// Determine success: explicit success field, or if there's result data without explicit failure
// Determine success: explicit success field, or if there's result data without explicit failure.
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const hasResultData = data?.result !== undefined || data?.data !== undefined
const hasError = !!data?.error || !!data?.result?.error
// If explicitly set, use that; otherwise infer from data presence
// If explicitly set, use that; otherwise infer from data presence.
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
current.status = success ? 'success' : 'error'
@@ -232,13 +134,13 @@ export const sseHandlers: Record<string, SSEHandler> = {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
// Subagent tools are executed by the copilot backend, not sim side
// Subagent tools are executed by the copilot backend, not sim side.
if (SUBAGENT_TOOL_SET.has(toolName)) {
return
}
// Respond tools are internal to copilot's subagent system - skip execution
// The copilot backend handles these internally to signal subagent completion
// Respond tools are internal to copilot's subagent system - skip execution.
// The copilot backend handles these internally to signal subagent completion.
if (RESPOND_TOOL_SET.has(toolName)) {
toolCall.status = 'success'
toolCall.endTime = Date.now()
@@ -249,7 +151,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
return
}
const isInterruptTool = INTERRUPT_TOOL_SET.has(toolName)
const isInterruptTool = isInterruptToolName(toolName)
const isInteractive = options.interactive === true
if (isInterruptTool && isInteractive) {
@@ -358,8 +260,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
},
error: (event, context) => {
const d = asRecord(event.data)
const message =
d.message || d.error || (typeof event.data === 'string' ? event.data : null)
const message = d.message || d.error || (typeof event.data === 'string' ? event.data : null)
if (message) {
context.errors.push(message)
}
@@ -388,7 +289,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
const args = toolData.arguments || toolData.input || asRecord(event.data).input
const existing = context.toolCalls.get(toolCallId)
// Ignore late/duplicate tool_call events once we already have a result
// Ignore late/duplicate tool_call events once we already have a result.
if (wasToolResultSeen(toolCallId) || existing?.endTime) {
return
}
@@ -401,7 +302,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
startTime: Date.now(),
}
// Store in both places - but do NOT overwrite existing tool call state for the same id
// Store in both places - but do NOT overwrite existing tool call state for the same id.
if (!context.subAgentToolCalls[parentToolCallId]) {
context.subAgentToolCalls[parentToolCallId] = []
}
@@ -414,7 +315,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
if (isPartial) return
// Respond tools are internal to copilot's subagent system - skip execution
// Respond tools are internal to copilot's subagent system - skip execution.
if (RESPOND_TOOL_SET.has(toolName)) {
toolCall.status = 'success'
toolCall.endTime = Date.now()
@@ -436,14 +337,14 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
const toolCallId = event.toolCallId || data?.id
if (!toolCallId) return
// Update in subAgentToolCalls
// Update in subAgentToolCalls.
const toolCalls = context.subAgentToolCalls[parentToolCallId] || []
const subAgentToolCall = toolCalls.find((tc) => tc.id === toolCallId)
// Also update in main toolCalls (where we added it for execution)
// Also update in main toolCalls (where we added it for execution).
const mainToolCall = context.toolCalls.get(toolCallId)
// Use same success inference logic as main handler
// Use same success inference logic as main handler.
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const hasResultData = data?.result !== undefined || data?.data !== undefined

View File

@@ -0,0 +1,2 @@
export type { SSEHandler } from './handlers'
export { handleSubagentRouting, sseHandlers, subAgentHandlers } from './handlers'

View File

@@ -0,0 +1,117 @@
import { createLogger } from '@sim/logger'
import { INTERRUPT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import {
asRecord,
markToolResultSeen,
wasToolResultSeen,
} from '@/lib/copilot/orchestrator/sse-utils'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ExecutionContext,
OrchestratorOptions,
SSEEvent,
StreamingContext,
} from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotSseToolExecution')
export function isInterruptToolName(toolName: string): boolean {
return INTERRUPT_TOOL_SET.has(toolName)
}
export async function executeToolAndReport(
toolCallId: string,
context: StreamingContext,
execContext: ExecutionContext,
options?: OrchestratorOptions
): Promise<void> {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (toolCall.status === 'executing') return
if (wasToolResultSeen(toolCall.id)) return
toolCall.status = 'executing'
try {
const result = await executeToolServerSide(toolCall, execContext)
toolCall.status = result.success ? 'success' : 'error'
toolCall.result = result
toolCall.error = result.error
toolCall.endTime = Date.now()
// If create_workflow was successful, update the execution context with the new workflowId.
// This ensures subsequent tools in the same stream have access to the workflowId.
const output = asRecord(result.output)
if (
toolCall.name === 'create_workflow' &&
result.success &&
output.workflowId &&
!execContext.workflowId
) {
execContext.workflowId = output.workflowId as string
if (output.workspaceId) {
execContext.workspaceId = output.workspaceId as string
}
}
markToolResultSeen(toolCall.id)
await markToolComplete(
toolCall.id,
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
)
const resultEvent: SSEEvent = {
type: 'tool_result',
toolCallId: toolCall.id,
toolName: toolCall.name,
success: result.success,
result: result.output,
data: {
id: toolCall.id,
name: toolCall.name,
success: result.success,
result: result.output,
},
}
await options?.onEvent?.(resultEvent)
} catch (error) {
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
markToolResultSeen(toolCall.id)
await markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error)
const errorEvent: SSEEvent = {
type: 'tool_error',
toolCallId: toolCall.id,
data: {
id: toolCall.id,
name: toolCall.name,
error: toolCall.error,
},
}
await options?.onEvent?.(errorEvent)
}
}
export async function waitForToolDecision(
toolCallId: string,
timeoutMs: number
): Promise<{ status: string; message?: string } | null> {
const start = Date.now()
while (Date.now() - start < timeoutMs) {
const decision = await getToolConfirmation(toolCallId)
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
return null
}

View File

@@ -40,4 +40,3 @@ describe('sse-utils', () => {
expect(shouldSkipToolResultEvent(event as any)).toBe(true)
})
})

View File

@@ -120,4 +120,3 @@ export function shouldSkipToolResultEvent(event: SSEEvent): boolean {
markToolResultSeen(toolCallId)
return false
}

View File

@@ -1,8 +1,9 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'
vi.mock('@sim/logger', () => loggerMock)
@@ -25,27 +26,29 @@ const createRedisStub = () => {
hset: vi.fn().mockResolvedValue(1),
hgetall: vi.fn().mockResolvedValue({}),
expire: vi.fn().mockResolvedValue(1),
eval: vi.fn().mockImplementation(
(
_lua: string,
_keysCount: number,
seqKey: string,
eventsKey: string,
_ttl: number,
_limit: number,
streamId: string,
eventJson: string
) => {
const current = counters.get(seqKey) || 0
const next = current + 1
counters.set(seqKey, next)
const entry = JSON.stringify({ eventId: next, streamId, event: JSON.parse(eventJson) })
const list = events.get(eventsKey) || []
list.push({ score: next, value: entry })
events.set(eventsKey, list)
return next
}
),
eval: vi
.fn()
.mockImplementation(
(
_lua: string,
_keysCount: number,
seqKey: string,
eventsKey: string,
_ttl: number,
_limit: number,
streamId: string,
eventJson: string
) => {
const current = counters.get(seqKey) || 0
const next = current + 1
counters.set(seqKey, next)
const entry = JSON.stringify({ eventId: next, streamId, event: JSON.parse(eventJson) })
const list = events.get(eventsKey) || []
list.push({ score: next, value: entry })
events.set(eventsKey, list)
return next
}
),
incrby: vi.fn().mockImplementation((key: string, amount: number) => {
const current = counters.get(key) || 0
const next = current + amount
@@ -58,19 +61,18 @@ const createRedisStub = () => {
return Promise.resolve(readEntries(key, minVal, maxVal))
}),
pipeline: vi.fn().mockImplementation(() => {
const api = {
zadd: vi.fn().mockImplementation((key: string, ...args: Array<string | number>) => {
const list = events.get(key) || []
for (let i = 0; i < args.length; i += 2) {
list.push({ score: Number(args[i]), value: String(args[i + 1]) })
}
events.set(key, list)
return api
}),
expire: vi.fn().mockReturnValue(api),
zremrangebyrank: vi.fn().mockReturnValue(api),
exec: vi.fn().mockResolvedValue([]),
}
const api: Record<string, any> = {}
api.zadd = vi.fn().mockImplementation((key: string, ...args: Array<string | number>) => {
const list = events.get(key) || []
for (let i = 0; i < args.length; i += 2) {
list.push({ score: Number(args[i]), value: String(args[i + 1]) })
}
events.set(key, list)
return api
})
api.expire = vi.fn().mockReturnValue(api)
api.zremrangebyrank = vi.fn().mockReturnValue(api)
api.exec = vi.fn().mockResolvedValue([])
return api
}),
}
@@ -115,4 +117,3 @@ describe('stream-buffer', () => {
expect(events.map((entry) => entry.event.data)).toEqual(['a', 'b'])
})
})

View File

@@ -31,7 +31,10 @@ export function getStreamBufferConfig(): StreamBufferConfig {
ttlSeconds: parseNumber(env.COPILOT_STREAM_TTL_SECONDS, STREAM_DEFAULTS.ttlSeconds),
eventLimit: parseNumber(env.COPILOT_STREAM_EVENT_LIMIT, STREAM_DEFAULTS.eventLimit),
reserveBatch: parseNumber(env.COPILOT_STREAM_RESERVE_BATCH, STREAM_DEFAULTS.reserveBatch),
flushIntervalMs: parseNumber(env.COPILOT_STREAM_FLUSH_INTERVAL_MS, STREAM_DEFAULTS.flushIntervalMs),
flushIntervalMs: parseNumber(
env.COPILOT_STREAM_FLUSH_INTERVAL_MS,
STREAM_DEFAULTS.flushIntervalMs
),
flushMaxBatch: parseNumber(env.COPILOT_STREAM_FLUSH_MAX_BATCH, STREAM_DEFAULTS.flushMaxBatch),
}
}
@@ -190,8 +193,6 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
let nextEventId = 0
let maxReservedId = 0
let flushTimer: ReturnType<typeof setTimeout> | null = null
let isFlushing = false
const scheduleFlush = () => {
if (flushTimer) return
flushTimer = setTimeout(() => {
@@ -210,9 +211,11 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
}
}
const flush = async () => {
if (isFlushing || pending.length === 0) return
isFlushing = true
let flushPromise: Promise<void> | null = null
let closed = false
const doFlush = async () => {
if (pending.length === 0) return
const batch = pending
pending = []
try {
@@ -233,13 +236,25 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
error: error instanceof Error ? error.message : String(error),
})
pending = batch.concat(pending)
}
}
const flush = async () => {
if (flushPromise) {
await flushPromise
return
}
flushPromise = doFlush()
try {
await flushPromise
} finally {
isFlushing = false
flushPromise = null
if (pending.length > 0) scheduleFlush()
}
}
const write = async (event: Record<string, any>) => {
if (closed) return { eventId: 0, streamId, event }
if (nextEventId === 0 || nextEventId > maxReservedId) {
await reserveIds(1)
}
@@ -255,6 +270,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
}
const close = async () => {
closed = true
if (flushTimer) {
clearTimeout(flushTimer)
flushTimer = null

View File

@@ -0,0 +1,179 @@
import { createLogger } from '@sim/logger'
import {
handleSubagentRouting,
sseHandlers,
subAgentHandlers,
} from '@/lib/copilot/orchestrator/sse-handlers'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type {
ExecutionContext,
OrchestratorOptions,
SSEEvent,
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotStreamCore')
/**
* Options for the shared stream processing loop.
*/
export interface StreamLoopOptions extends OrchestratorOptions {
/**
* Called for each normalized event BEFORE standard handler dispatch.
* Return true to skip the default handler for this event.
*/
onBeforeDispatch?: (event: SSEEvent, context: StreamingContext) => boolean | void
}
/**
* Create a fresh StreamingContext.
*/
export function createStreamingContext(overrides?: Partial<StreamingContext>): StreamingContext {
return {
chatId: undefined,
conversationId: undefined,
messageId: crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
...overrides,
}
}
/**
* Run the SSE stream processing loop.
*
* Handles: fetch -> parse -> normalize -> dedupe -> subagent routing -> handler dispatch.
* Callers provide the fetch URL/options and can intercept events via onBeforeDispatch.
*/
export async function runStreamLoop(
fetchUrl: string,
fetchOptions: RequestInit,
context: StreamingContext,
execContext: ExecutionContext,
options: StreamLoopOptions
): Promise<void> {
const { timeout = 300000, abortSignal } = options
const response = await fetch(fetchUrl, {
...fetchOptions,
signal: abortSignal,
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(`Copilot backend error (${response.status}): ${errorText || response.statusText}`)
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
const normalizedEvent = normalizeSseEvent(event)
// Skip duplicate tool events.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (!shouldSkipToolCall && !shouldSkipToolResult) {
try {
await options.onEvent?.(normalizedEvent)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: normalizedEvent.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
// Let the caller intercept before standard dispatch.
if (options.onBeforeDispatch?.(normalizedEvent, context)) {
if (context.streamComplete) break
continue
}
// Standard subagent start/end handling.
if (normalizedEvent.type === 'subagent_start') {
const eventData = normalizedEvent.data as Record<string, unknown> | undefined
const toolCallId = eventData?.tool_call_id as string | undefined
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
}
continue
}
if (normalizedEvent.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
// Subagent event routing.
if (handleSubagentRouting(normalizedEvent, context)) {
const handler = subAgentHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
continue
}
// Main event handler dispatch.
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
}
} finally {
clearTimeout(timeoutId)
}
}
/**
* Build a ToolCallSummary array from the streaming context.
*/
export function buildToolCallSummaries(context: StreamingContext): ToolCallSummary[] {
return Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
}

View File

@@ -1,13 +1,5 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import { env } from '@/lib/core/config/env'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ExecutionContext,
@@ -16,7 +8,9 @@ import type {
StreamingContext,
ToolCallSummary,
} from '@/lib/copilot/orchestrator/types'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { buildToolCallSummaries, createStreamingContext, runStreamLoop } from './stream-core'
const logger = createLogger('CopilotSubagentOrchestrator')
@@ -46,131 +40,58 @@ export async function orchestrateSubagentStream(
requestPayload: Record<string, any>,
options: SubagentOrchestratorOptions
): Promise<SubagentOrchestratorResult> {
const { userId, workflowId, workspaceId, timeout = 300000, abortSignal } = options
const { userId, workflowId, workspaceId } = options
const execContext = await buildExecutionContext(userId, workflowId, workspaceId)
const context: StreamingContext = {
chatId: undefined,
conversationId: undefined,
const context = createStreamingContext({
messageId: requestPayload?.messageId || crypto.randomUUID(),
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
})
let structuredResult: SubagentOrchestratorResult['structuredResult']
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/subagent/${agentId}`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
await runStreamLoop(
`${SIM_AGENT_API_URL}/api/subagent/${agentId}`,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify({ ...requestPayload, stream: true }),
},
body: JSON.stringify({ ...requestPayload, stream: true }),
signal: abortSignal,
})
if (!response.ok) {
const errorText = await response.text().catch(() => '')
throw new Error(
`Copilot backend error (${response.status}): ${errorText || response.statusText}`
)
}
if (!response.body) {
throw new Error('Copilot backend response missing body')
}
const reader = response.body.getReader()
const decoder = new TextDecoder()
const timeoutId = setTimeout(() => {
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
}, timeout)
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
if (abortSignal?.aborted) {
context.wasAborted = true
break
}
const normalizedEvent = normalizeSseEvent(event)
// Skip duplicate tool events to prevent state regressions.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(normalizedEvent, options)
}
if (
normalizedEvent.type === 'structured_result' ||
normalizedEvent.type === 'subagent_result'
) {
structuredResult = normalizeStructuredResult(normalizedEvent.data)
context.streamComplete = true
continue
}
// Handle subagent_start/subagent_end events to track nested subagent calls
if (normalizedEvent.type === 'subagent_start') {
const eventData = normalizedEvent.data as Record<string, unknown> | undefined
const toolCallId = eventData?.tool_call_id as string | undefined
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
context.subAgentToolCalls[toolCallId] = []
context,
execContext,
{
...options,
onBeforeDispatch: (event: SSEEvent, ctx: StreamingContext) => {
// Handle structured_result / subagent_result - subagent-specific.
if (event.type === 'structured_result' || event.type === 'subagent_result') {
structuredResult = normalizeStructuredResult(event.data)
ctx.streamComplete = true
return true // skip default dispatch
}
continue
}
if (normalizedEvent.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
// For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery")
// but no subagent_start event because this IS the top-level agent. Skip subagent routing
// for events where the subagent field matches the current agentId - these are top-level events.
const isTopLevelSubagentEvent =
normalizedEvent.subagent === agentId && !context.subAgentParentToolCallId
// Only route to subagent handlers for nested subagent events (not matching current agentId)
if (!isTopLevelSubagentEvent && handleSubagentRouting(normalizedEvent, context)) {
const handler = subAgentHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
// For direct subagent calls, events may have the subagent field set
// but no subagent_start because this IS the top-level agent.
// Skip subagent routing for events where the subagent field matches
// the current agentId - these are top-level events.
if (event.subagent === agentId && !ctx.subAgentParentToolCallId) {
return false // let default dispatch handle it
}
if (context.streamComplete) break
continue
}
// Process as a regular SSE event (including top-level subagent events)
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
return false // let default dispatch handle it
},
}
} finally {
clearTimeout(timeoutId)
}
)
const result = buildResult(context, structuredResult)
const result: SubagentOrchestratorResult = {
success: context.errors.length === 0 && !context.wasAborted,
content: context.accumulatedContent,
toolCalls: buildToolCallSummaries(context),
structuredResult,
errors: context.errors.length ? context.errors : undefined,
}
await options.onComplete?.(result)
return result
} catch (error) {
@@ -186,26 +107,14 @@ export async function orchestrateSubagentStream(
}
}
async function forwardEvent(event: SSEEvent, options: OrchestratorOptions): Promise<void> {
try {
await options.onEvent?.(event)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: event.type,
error: error instanceof Error ? error.message : String(error),
})
}
}
function normalizeStructuredResult(data: any): SubagentOrchestratorResult['structuredResult'] {
if (!data || typeof data !== 'object') {
return undefined
}
function normalizeStructuredResult(data: unknown): SubagentOrchestratorResult['structuredResult'] {
if (!data || typeof data !== 'object') return undefined
const d = data as Record<string, any>
return {
type: data.result_type || data.type,
summary: data.summary,
data: data.data ?? data,
success: data.success,
type: d.result_type || d.type,
summary: d.summary,
data: d.data ?? d,
success: d.success,
}
}
@@ -217,7 +126,6 @@ async function buildExecutionContext(
if (workflowId) {
return prepareExecutionContext(userId, workflowId)
}
const decryptedEnvVars = await getEffectiveDecryptedEnv(userId, workspaceId)
return {
userId,
@@ -226,27 +134,3 @@ async function buildExecutionContext(
decryptedEnvVars,
}
}
function buildResult(
context: StreamingContext,
structuredResult?: SubagentOrchestratorResult['structuredResult']
): SubagentOrchestratorResult {
const toolCalls: ToolCallSummary[] = Array.from(context.toolCalls.values()).map((toolCall) => ({
id: toolCall.id,
name: toolCall.name,
status: toolCall.status,
params: toolCall.params,
result: toolCall.result?.output,
error: toolCall.error,
durationMs:
toolCall.endTime && toolCall.startTime ? toolCall.endTime - toolCall.startTime : undefined,
}))
return {
success: context.errors.length === 0 && !context.wasAborted,
content: context.accumulatedContent,
toolCalls,
structuredResult,
errors: context.errors.length ? context.errors : undefined,
}
}

View File

@@ -127,4 +127,3 @@ export async function getAccessibleWorkflowsForUser(
.where(or(...workflowConditions))
.orderBy(asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id))
}

View File

@@ -1,15 +1,16 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { chat, workflow, workflowMcpTool } from '@sim/db/schema'
import { chat, workflowMcpTool } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils'
import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils'
import { ensureWorkflowAccess } from '../access'
import type { DeployApiParams, DeployChatParams, DeployMcpParams } from '../param-types'
export async function executeDeployApi(
params: Record<string, any>,
params: DeployApiParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -52,7 +53,7 @@ export async function executeDeployApi(
}
export async function executeDeployChat(
params: Record<string, any>,
params: DeployChatParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -126,10 +127,12 @@ export async function executeDeployChat(
description: String(params.description || existingDeployment?.description || ''),
customizations: {
primaryColor:
params.customizations?.primaryColor || existingCustomizations.primaryColor ||
params.customizations?.primaryColor ||
existingCustomizations.primaryColor ||
'var(--brand-primary-hover-hex)',
welcomeMessage:
params.customizations?.welcomeMessage || existingCustomizations.welcomeMessage ||
params.customizations?.welcomeMessage ||
existingCustomizations.welcomeMessage ||
'Hi there! How can I help you today?',
},
authType: params.authType || existingDeployment?.authType || 'public',
@@ -184,7 +187,7 @@ export async function executeDeployChat(
}
export async function executeDeployMcp(
params: Record<string, any>,
params: DeployMcpParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -217,14 +220,18 @@ export async function executeDeployMcp(
const existingTool = await db
.select()
.from(workflowMcpTool)
.where(and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId)))
.where(
and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId))
)
.limit(1)
const toolName = sanitizeToolName(
params.toolName || workflowRecord.name || `workflow_${workflowId}`
)
const toolDescription =
params.toolDescription || workflowRecord.description || `Execute ${workflowRecord.name} workflow`
params.toolDescription ||
workflowRecord.description ||
`Execute ${workflowRecord.name} workflow`
const parameterSchema = params.parameterSchema || {}
if (existingTool.length > 0) {

View File

@@ -6,9 +6,14 @@ import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrato
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
import { ensureWorkflowAccess } from '../access'
import type {
CheckDeploymentStatusParams,
CreateWorkspaceMcpServerParams,
ListWorkspaceMcpServersParams,
} from '../param-types'
export async function executeCheckDeploymentStatus(
params: Record<string, any>,
params: CheckDeploymentStatusParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -85,7 +90,7 @@ export async function executeCheckDeploymentStatus(
}
export async function executeListWorkspaceMcpServers(
params: Record<string, any>,
params: ListWorkspaceMcpServersParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -141,7 +146,7 @@ export async function executeListWorkspaceMcpServers(
}
export async function executeCreateWorkspaceMcpServer(
params: Record<string, any>,
params: CreateWorkspaceMcpServerParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {

View File

@@ -11,21 +11,7 @@ import type {
import { routeExecution } from '@/lib/copilot/tools/server/router'
import { env } from '@/lib/core/config/env'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { executeIntegrationToolDirect } from './integration-tools'
import {
executeGetBlockOutputs,
executeGetBlockUpstreamReferences,
executeGetUserWorkflow,
executeGetWorkflowData,
executeGetWorkflowFromName,
executeListFolders,
executeListUserWorkflows,
executeListUserWorkspaces,
executeCreateWorkflow,
executeCreateFolder,
executeRunWorkflow,
executeSetGlobalWorkflowVariables,
} from './workflow-tools'
import { getTool, resolveToolId } from '@/tools/utils'
import {
executeCheckDeploymentStatus,
executeCreateWorkspaceMcpServer,
@@ -35,7 +21,40 @@ import {
executeListWorkspaceMcpServers,
executeRedeploy,
} from './deployment-tools'
import { getTool, resolveToolId } from '@/tools/utils'
import { executeIntegrationToolDirect } from './integration-tools'
import type {
CheckDeploymentStatusParams,
CreateFolderParams,
CreateWorkflowParams,
CreateWorkspaceMcpServerParams,
DeployApiParams,
DeployChatParams,
DeployMcpParams,
GetBlockOutputsParams,
GetBlockUpstreamReferencesParams,
GetUserWorkflowParams,
GetWorkflowDataParams,
GetWorkflowFromNameParams,
ListFoldersParams,
ListUserWorkflowsParams,
ListWorkspaceMcpServersParams,
RunWorkflowParams,
SetGlobalWorkflowVariablesParams,
} from './param-types'
import {
executeCreateFolder,
executeCreateWorkflow,
executeGetBlockOutputs,
executeGetBlockUpstreamReferences,
executeGetUserWorkflow,
executeGetWorkflowData,
executeGetWorkflowFromName,
executeListFolders,
executeListUserWorkflows,
executeListUserWorkspaces,
executeRunWorkflow,
executeSetGlobalWorkflowVariables,
} from './workflow-tools'
const logger = createLogger('CopilotToolExecutor')
@@ -144,43 +163,43 @@ async function executeSimWorkflowTool(
): Promise<ToolCallResult> {
switch (toolName) {
case 'get_user_workflow':
return executeGetUserWorkflow(params, context)
return executeGetUserWorkflow(params as GetUserWorkflowParams, context)
case 'get_workflow_from_name':
return executeGetWorkflowFromName(params, context)
return executeGetWorkflowFromName(params as GetWorkflowFromNameParams, context)
case 'list_user_workflows':
return executeListUserWorkflows(params, context)
return executeListUserWorkflows(params as ListUserWorkflowsParams, context)
case 'list_user_workspaces':
return executeListUserWorkspaces(context)
case 'list_folders':
return executeListFolders(params, context)
return executeListFolders(params as ListFoldersParams, context)
case 'create_workflow':
return executeCreateWorkflow(params, context)
return executeCreateWorkflow(params as CreateWorkflowParams, context)
case 'create_folder':
return executeCreateFolder(params, context)
return executeCreateFolder(params as CreateFolderParams, context)
case 'get_workflow_data':
return executeGetWorkflowData(params, context)
return executeGetWorkflowData(params as GetWorkflowDataParams, context)
case 'get_block_outputs':
return executeGetBlockOutputs(params, context)
return executeGetBlockOutputs(params as GetBlockOutputsParams, context)
case 'get_block_upstream_references':
return executeGetBlockUpstreamReferences(params, context)
return executeGetBlockUpstreamReferences(params as GetBlockUpstreamReferencesParams, context)
case 'run_workflow':
return executeRunWorkflow(params, context)
return executeRunWorkflow(params as RunWorkflowParams, context)
case 'set_global_workflow_variables':
return executeSetGlobalWorkflowVariables(params, context)
return executeSetGlobalWorkflowVariables(params as SetGlobalWorkflowVariablesParams, context)
case 'deploy_api':
return executeDeployApi(params, context)
return executeDeployApi(params as DeployApiParams, context)
case 'deploy_chat':
return executeDeployChat(params, context)
return executeDeployChat(params as DeployChatParams, context)
case 'deploy_mcp':
return executeDeployMcp(params, context)
return executeDeployMcp(params as DeployMcpParams, context)
case 'redeploy':
return executeRedeploy(context)
case 'check_deployment_status':
return executeCheckDeploymentStatus(params, context)
return executeCheckDeploymentStatus(params as CheckDeploymentStatusParams, context)
case 'list_workspace_mcp_servers':
return executeListWorkspaceMcpServers(params, context)
return executeListWorkspaceMcpServers(params as ListWorkspaceMcpServersParams, context)
case 'create_workspace_mcp_server':
return executeCreateWorkspaceMcpServer(params, context)
return executeCreateWorkspaceMcpServer(params as CreateWorkspaceMcpServerParams, context)
default:
return { success: false, error: `Unsupported workflow tool: ${toolName}` }
}

View File

@@ -15,7 +15,10 @@ import { resolveToolId } from '@/tools/utils'
export async function executeIntegrationToolDirect(
toolCall: ToolCallState,
toolConfig: any,
toolConfig: {
oauth?: { required?: boolean; provider?: string }
params?: { apiKey?: { required?: boolean } }
},
context: ExecutionContext
): Promise<ToolCallResult> {
const { userId, workflowId } = context
@@ -35,6 +38,9 @@ export async function executeIntegrationToolDirect(
const decryptedEnvVars =
context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId))
// Deep resolution walks nested objects to replace {{ENV_VAR}} references.
// Safe because tool arguments originate from the LLM (not direct user input)
// and env vars belong to the user themselves.
const executionParams: Record<string, any> = resolveEnvVarReferences(toolArgs, decryptedEnvVars, {
deep: true,
}) as Record<string, any>
@@ -97,4 +103,3 @@ export async function executeIntegrationToolDirect(
error: result.error,
}
}

View File

@@ -0,0 +1,127 @@
/**
* Typed parameter interfaces for tool executor functions.
* Replaces Record<string, any> with specific shapes based on actual property access.
*/
// === Workflow Query Params ===
export interface GetUserWorkflowParams {
workflowId?: string
}
export interface GetWorkflowFromNameParams {
workflow_name?: string
}
export interface ListUserWorkflowsParams {
workspaceId?: string
folderId?: string
}
export interface GetWorkflowDataParams {
workflowId?: string
data_type?: string
dataType?: string
}
export interface GetBlockOutputsParams {
workflowId?: string
blockIds?: string[]
}
export interface GetBlockUpstreamReferencesParams {
workflowId?: string
blockIds: string[]
}
export interface ListFoldersParams {
workspaceId?: string
}
// === Workflow Mutation Params ===
export interface CreateWorkflowParams {
name?: string
workspaceId?: string
folderId?: string
description?: string
}
export interface CreateFolderParams {
name?: string
workspaceId?: string
parentId?: string
}
export interface RunWorkflowParams {
workflowId?: string
workflow_input?: unknown
input?: unknown
}
export interface VariableOperation {
name: string
operation: 'add' | 'edit' | 'delete'
value?: unknown
type?: string
}
export interface SetGlobalWorkflowVariablesParams {
workflowId?: string
operations?: VariableOperation[]
}
// === Deployment Params ===
export interface DeployApiParams {
workflowId?: string
action?: 'deploy' | 'undeploy'
}
export interface DeployChatParams {
workflowId?: string
action?: 'deploy' | 'undeploy' | 'update'
identifier?: string
title?: string
description?: string
customizations?: {
primaryColor?: string
secondaryColor?: string
welcomeMessage?: string
iconUrl?: string
}
authType?: 'none' | 'password' | 'public' | 'email' | 'sso'
password?: string
subdomain?: string
allowedEmails?: string[]
outputConfigs?: unknown[]
}
export interface DeployMcpParams {
workflowId?: string
action?: 'deploy' | 'undeploy'
toolName?: string
toolDescription?: string
serverId?: string
parameterSchema?: Record<string, unknown>
}
export interface CheckDeploymentStatusParams {
workflowId?: string
}
export interface ListWorkspaceMcpServersParams {
workspaceId?: string
workflowId?: string
}
export interface CreateWorkspaceMcpServerParams {
workflowId?: string
name?: string
description?: string
toolName?: string
toolDescription?: string
serverName?: string
isPublic?: boolean
workflowIds?: string[]
}

View File

@@ -1,2 +1,2 @@
export * from './queries'
export * from './mutations'
export * from './queries'

View File

@@ -8,9 +8,16 @@ import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
import { ensureWorkflowAccess, ensureWorkspaceAccess, getDefaultWorkspaceId } from '../access'
import type {
CreateFolderParams,
CreateWorkflowParams,
RunWorkflowParams,
SetGlobalWorkflowVariablesParams,
VariableOperation,
} from '../param-types'
export async function executeCreateWorkflow(
params: Record<string, any>,
params: CreateWorkflowParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -18,10 +25,16 @@ export async function executeCreateWorkflow(
if (!name) {
return { success: false, error: 'name is required' }
}
if (name.length > 200) {
return { success: false, error: 'Workflow name must be 200 characters or less' }
}
const description = typeof params?.description === 'string' ? params.description : null
if (description && description.length > 2000) {
return { success: false, error: 'Description must be 2000 characters or less' }
}
const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
const folderId = params?.folderId || null
const description = typeof params?.description === 'string' ? params.description : null
await ensureWorkspaceAccess(workspaceId, context.userId, true)
@@ -73,7 +86,7 @@ export async function executeCreateWorkflow(
}
export async function executeCreateFolder(
params: Record<string, any>,
params: CreateFolderParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -81,6 +94,9 @@ export async function executeCreateFolder(
if (!name) {
return { success: false, error: 'name is required' }
}
if (name.length > 200) {
return { success: false, error: 'Folder name must be 200 characters or less' }
}
const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
const parentId = params?.parentId || null
@@ -117,7 +133,7 @@ export async function executeCreateFolder(
}
export async function executeRunWorkflow(
params: Record<string, any>,
params: RunWorkflowParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -156,7 +172,7 @@ export async function executeRunWorkflow(
}
export async function executeSetGlobalWorkflowVariables(
params: Record<string, any>,
params: SetGlobalWorkflowVariablesParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -164,7 +180,9 @@ export async function executeSetGlobalWorkflowVariables(
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const operations = Array.isArray(params.operations) ? params.operations : []
const operations: VariableOperation[] = Array.isArray(params.operations)
? params.operations
: []
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const currentVarsRecord = (workflowRecord.variables as Record<string, any>) || {}

View File

@@ -13,16 +13,25 @@ import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { normalizeName } from '@/executor/constants'
import {
ensureWorkflowAccess,
ensureWorkspaceAccess,
getAccessibleWorkflowsForUser,
getDefaultWorkspaceId,
} from '../access'
import { normalizeName } from '@/executor/constants'
import type {
GetBlockOutputsParams,
GetBlockUpstreamReferencesParams,
GetUserWorkflowParams,
GetWorkflowDataParams,
GetWorkflowFromNameParams,
ListFoldersParams,
ListUserWorkflowsParams,
} from '../param-types'
export async function executeGetUserWorkflow(
params: Record<string, any>,
params: GetUserWorkflowParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -57,7 +66,7 @@ export async function executeGetUserWorkflow(
}
export async function executeGetWorkflowFromName(
params: Record<string, any>,
params: GetWorkflowFromNameParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -95,7 +104,7 @@ export async function executeGetWorkflowFromName(
}
export async function executeListUserWorkflows(
params: Record<string, any>,
params: ListUserWorkflowsParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -119,7 +128,9 @@ export async function executeListUserWorkflows(
}
}
export async function executeListUserWorkspaces(context: ExecutionContext): Promise<ToolCallResult> {
export async function executeListUserWorkspaces(
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workspaces = await db
.select({
@@ -146,7 +157,7 @@ export async function executeListUserWorkspaces(context: ExecutionContext): Prom
}
export async function executeListFolders(
params: Record<string, any>,
params: ListFoldersParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -179,7 +190,7 @@ export async function executeListFolders(
}
export async function executeGetWorkflowData(
params: Record<string, any>,
params: GetWorkflowDataParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -271,7 +282,7 @@ export async function executeGetWorkflowData(
}
export async function executeGetBlockOutputs(
params: Record<string, any>,
params: GetBlockOutputsParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -343,7 +354,7 @@ export async function executeGetBlockOutputs(
}
export async function executeGetBlockUpstreamReferences(
params: Record<string, any>,
params: GetBlockUpstreamReferencesParams,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -524,4 +535,3 @@ function formatOutputsWithPrefix(paths: string[], blockName: string): string[] {
const normalizedName = normalizeName(blockName)
return paths.map((path) => `${normalizedName}.${path}`)
}

View File

@@ -20,7 +20,10 @@ export interface ClientToolDisplay {
export interface BaseClientToolMetadata {
displayNames: Partial<Record<ClientToolCallState, ClientToolDisplay>>
uiConfig?: Record<string, unknown>
getDynamicText?: (params: Record<string, unknown>, state: ClientToolCallState) => string | undefined
getDynamicText?: (
params: Record<string, unknown>,
state: ClientToolCallState
) => string | undefined
}
export type DynamicTextFormatter = (

File diff suppressed because it is too large Load Diff

View File

@@ -1,58 +1,4 @@
/**
* Copilot Types - Consolidated from various locations
* This file contains all copilot-related type definitions
*/
// Tool call state types (from apps/sim/types/tool-call.ts)
export interface ToolCallState {
id: string
name: string
displayName?: string
parameters?: Record<string, any>
state:
| 'detecting'
| 'pending'
| 'executing'
| 'completed'
| 'error'
| 'rejected'
| 'applied'
| 'ready_for_review'
| 'aborted'
| 'skipped'
| 'background'
startTime?: number
endTime?: number
duration?: number
result?: any
error?: string
progress?: string
}
export interface ToolCallGroup {
id: string
toolCalls: ToolCallState[]
status: 'pending' | 'in_progress' | 'completed' | 'error'
startTime?: number
endTime?: number
summary?: string
}
export interface InlineContent {
type: 'text' | 'tool_call'
content: string
toolCall?: ToolCallState
}
export interface ParsedMessageContent {
textContent: string
toolCalls: ToolCallState[]
toolGroups: ToolCallGroup[]
inlineContent?: InlineContent[]
}
import type { ProviderId } from '@/providers/types'
// Copilot Tools Type Definitions (from workspace copilot lib)
import type { CopilotToolCall, ToolState } from '@/stores/panel'
export type NotificationStatus =
@@ -63,82 +9,10 @@ export type NotificationStatus =
| 'rejected'
| 'background'
// Export the consolidated types
export type { CopilotToolCall, ToolState }
// Display configuration for different states
export interface StateDisplayConfig {
displayName: string
icon?: string
className?: string
}
// Complete display configuration for a tool
export interface ToolDisplayConfig {
states: {
[K in ToolState]?: StateDisplayConfig
}
getDynamicDisplayName?: (state: ToolState, params: Record<string, any>) => string | null
}
// Schema for tool parameters (OpenAI function calling format)
export interface ToolSchema {
name: string
description: string
parameters?: {
type: 'object'
properties: Record<string, any>
required?: string[]
}
}
// Tool metadata - all the static configuration
export interface ToolMetadata {
id: string
displayConfig: ToolDisplayConfig
schema: ToolSchema
requiresInterrupt: boolean
allowBackgroundExecution?: boolean
stateMessages?: Partial<Record<NotificationStatus, string>>
}
// Result from executing a tool
export interface ToolExecuteResult {
success: boolean
data?: any
error?: string
}
// Response from the confirmation API
export interface ToolConfirmResponse {
success: boolean
message?: string
}
// Options for tool execution
export interface ToolExecutionOptions {
onStateChange?: (state: ToolState) => void
beforeExecute?: () => Promise<boolean>
afterExecute?: (result: ToolExecuteResult) => Promise<void>
context?: Record<string, any>
}
// The main tool interface that all tools must implement
export interface Tool {
metadata: ToolMetadata
execute(toolCall: CopilotToolCall, options?: ToolExecutionOptions): Promise<ToolExecuteResult>
getDisplayName(toolCall: CopilotToolCall): string
getIcon(toolCall: CopilotToolCall): string
handleUserAction(
toolCall: CopilotToolCall,
action: 'run' | 'skip' | 'background',
options?: ToolExecutionOptions
): Promise<void>
requiresConfirmation(toolCall: CopilotToolCall): boolean
}
// Provider configuration for Sim Agent requests
// This type is only for the `provider` field in requests sent to the Sim Agent
// Provider configuration for Sim Agent requests.
// This type is only for the `provider` field in requests sent to the Sim Agent.
export type CopilotProviderConfig =
| {
provider: 'azure-openai'