This commit is contained in:
Siddharth Ganesan
2026-02-07 11:58:09 -08:00
parent 38e2aa0efa
commit c8e9aee55a
6 changed files with 181 additions and 25 deletions

View File

@@ -620,6 +620,11 @@ async function handleBuildToolCall(
abortSignal?: AbortSignal
): Promise<CallToolResult> {
try {
logger.info('[MCP-BUILD] handleBuildToolCall ENTER', {
hasAbortSignal: !!abortSignal,
abortSignalAborted: abortSignal?.aborted,
argsKeys: Object.keys(args),
})
const requestText = (args.request as string) || JSON.stringify(args)
const { model } = getCopilotModel('chat')
const workflowId = args.workflowId as string | undefined
@@ -661,6 +666,12 @@ async function handleBuildToolCall(
source: 'mcp',
}
logger.info('[MCP-BUILD] Calling orchestrateCopilotStream', {
workflowId: resolved.workflowId,
chatId,
hasAbortSignal: !!abortSignal,
})
const result = await orchestrateCopilotStream(requestPayload, {
userId,
workflowId: resolved.workflowId,
@@ -671,6 +682,14 @@ async function handleBuildToolCall(
abortSignal,
})
logger.info('[MCP-BUILD] orchestrateCopilotStream returned', {
success: result.success,
contentLength: result.content?.length,
toolCallCount: result.toolCalls?.length,
error: result.error,
errors: result.errors,
})
const responseData = {
success: result.success,
content: result.content,
@@ -683,7 +702,7 @@ async function handleBuildToolCall(
isError: !result.success,
}
} catch (error) {
logger.error('Build tool call failed', { error })
logger.error('[MCP-BUILD] Build tool call THREW', { error })
return {
content: [
{

View File

@@ -110,7 +110,12 @@ export const sseHandlers: Record<string, SSEHandler> = {
const toolCallId = (toolData.id as string | undefined) || event.toolCallId
const toolName = (toolData.name as string | undefined) || event.toolName
if (!toolCallId || !toolName) return
logger.info('[TOOL_CALL] Received', { toolCallId, toolName, hasToolData: !!toolData })
if (!toolCallId || !toolName) {
logger.warn('[TOOL_CALL] Missing toolCallId or toolName, returning early', { toolCallId, toolName })
return
}
const args = (toolData.arguments || toolData.input || asRecord(event.data).input) as
| Record<string, unknown>
@@ -118,12 +123,23 @@ export const sseHandlers: Record<string, SSEHandler> = {
const isPartial = toolData.partial === true
const existing = context.toolCalls.get(toolCallId)
logger.info('[TOOL_CALL] State check', {
toolCallId,
toolName,
isPartial,
hasExisting: !!existing,
existingStatus: existing?.status,
existingEndTime: existing?.endTime,
hasArgs: !!args,
})
// If we've already completed this tool call, ignore late/duplicate tool_call events
// to avoid resetting UI/state back to pending and re-executing.
if (
existing?.endTime ||
(existing && existing.status !== 'pending' && existing.status !== 'executing')
) {
logger.info('[TOOL_CALL] Already completed, skipping', { toolCallId, toolName, status: existing?.status })
if (!existing.params && args) {
existing.params = args
}
@@ -144,20 +160,31 @@ export const sseHandlers: Record<string, SSEHandler> = {
addContentBlock(context, { type: 'tool_call', toolCall: created })
}
if (isPartial) return
if (wasToolResultSeen(toolCallId)) return
if (isPartial) {
logger.info('[TOOL_CALL] Partial event, returning early', { toolCallId, toolName })
return
}
if (wasToolResultSeen(toolCallId)) {
logger.info('[TOOL_CALL] Result already seen (dedup), returning early', { toolCallId, toolName })
return
}
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (!toolCall) {
logger.warn('[TOOL_CALL] toolCall not found in map after set, returning early', { toolCallId })
return
}
// Subagent tools are executed by the copilot backend, not sim side.
if (SUBAGENT_TOOL_SET.has(toolName)) {
logger.info('[TOOL_CALL] Subagent tool, skipping local execution', { toolCallId, toolName })
return
}
// Respond tools are internal to copilot's subagent system - skip execution.
// The copilot backend handles these internally to signal subagent completion.
if (RESPOND_TOOL_SET.has(toolName)) {
logger.info('[TOOL_CALL] Respond tool, skipping', { toolCallId, toolName })
toolCall.status = 'success'
toolCall.endTime = Date.now()
toolCall.result = {
@@ -170,6 +197,14 @@ export const sseHandlers: Record<string, SSEHandler> = {
const isInterruptTool = isInterruptToolName(toolName)
const isInteractive = options.interactive === true
logger.info('[TOOL_CALL] Pre-execute check', {
toolCallId,
toolName,
isInterruptTool,
isInteractive,
autoExecuteTools: options.autoExecuteTools,
})
if (isInterruptTool && isInteractive) {
const decision = await waitForToolDecision(
toolCallId,
@@ -231,7 +266,11 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
if (options.autoExecuteTools !== false) {
logger.info('[TOOL_CALL] Calling executeToolAndReport', { toolCallId, toolName })
await executeToolAndReport(toolCallId, context, execContext, options)
logger.info('[TOOL_CALL] executeToolAndReport returned', { toolCallId, toolName })
} else {
logger.info('[TOOL_CALL] autoExecuteTools is false, skipping execution', { toolCallId, toolName })
}
},
reasoning: (event, context) => {

View File

@@ -32,14 +32,31 @@ export async function executeToolAndReport(
options?: OrchestratorOptions
): Promise<void> {
const toolCall = context.toolCalls.get(toolCallId)
if (!toolCall) return
if (!toolCall) {
logger.warn('[EXEC] toolCall not found in context map', { toolCallId })
return
}
if (toolCall.status === 'executing') return
if (wasToolResultSeen(toolCall.id)) return
if (toolCall.status === 'executing') {
logger.warn('[EXEC] toolCall already executing, skipping', { toolCallId, toolName: toolCall.name })
return
}
if (wasToolResultSeen(toolCall.id)) {
logger.warn('[EXEC] toolCall result already seen (dedup), skipping', { toolCallId, toolName: toolCall.name })
return
}
logger.info('[EXEC] Starting tool execution', { toolCallId, toolName: toolCall.name, params: toolCall.params ? Object.keys(toolCall.params) : [] })
toolCall.status = 'executing'
try {
const result = await executeToolServerSide(toolCall, execContext)
logger.info('[EXEC] executeToolServerSide returned', {
toolCallId,
toolName: toolCall.name,
success: result.success,
hasOutput: !!result.output,
error: result.error,
})
toolCall.status = result.success ? 'success' : 'error'
toolCall.result = result
toolCall.error = result.error
@@ -68,14 +85,21 @@ export async function executeToolAndReport(
// the SSE reader (our for-await loop) is paused while we're in this
// handler. Awaiting here would deadlock: sim waits for Go's response,
// Go waits for sim to drain the SSE stream.
logger.info('[EXEC] Firing markToolComplete (fire-and-forget)', {
toolCallId: toolCall.id,
toolName: toolCall.name,
status: result.success ? 200 : 500,
})
markToolComplete(
toolCall.id,
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed', {
).then((ok) => {
logger.info('[EXEC] markToolComplete resolved', { toolCallId: toolCall.id, toolName: toolCall.name, ok })
}).catch((err) => {
logger.error('[EXEC] markToolComplete fire-and-forget FAILED', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
@@ -96,7 +120,13 @@ export async function executeToolAndReport(
},
}
await options?.onEvent?.(resultEvent)
logger.info('[EXEC] executeToolAndReport complete', { toolCallId, toolName: toolCall.name })
} catch (error) {
logger.error('[EXEC] executeToolAndReport CAUGHT ERROR', {
toolCallId,
toolName: toolCall.name,
error: error instanceof Error ? error.message : String(error),
})
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
@@ -104,8 +134,10 @@ export async function executeToolAndReport(
markToolResultSeen(toolCall.id)
// Fire-and-forget (same reasoning as above).
markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error).catch((err) => {
logger.error('markToolComplete fire-and-forget failed', {
markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error).then((ok) => {
logger.info('[EXEC] markToolComplete (error path) resolved', { toolCallId: toolCall.id, ok })
}).catch((err) => {
logger.error('[EXEC] markToolComplete (error path) FAILED', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),

View File

@@ -1,6 +1,9 @@
import { createLogger } from '@sim/logger'
import { STREAM_BUFFER_MAX_DEDUP_ENTRIES } from '@/lib/copilot/constants'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
const logger = createLogger('CopilotSseUtils')
type EventDataObject = Record<string, unknown> | undefined
/** Safely cast event.data to a record for property access. */
@@ -107,7 +110,10 @@ export function shouldSkipToolCallEvent(event: SSEEvent): boolean {
if (!toolCallId) return false
const eventData = getEventData(event)
if (eventData?.partial === true) return false
if (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) {
const resultSeen = wasToolResultSeen(toolCallId)
const callSeen = wasToolCallSeen(toolCallId)
if (resultSeen || callSeen) {
logger.info('[DEDUP] Skipping tool_call event', { toolCallId, resultSeen, callSeen, seenToolCallsSize: seenToolCalls.size, seenToolResultsSize: seenToolResults.size })
return true
}
markToolCallSeen(toolCallId)
@@ -118,7 +124,10 @@ export function shouldSkipToolResultEvent(event: SSEEvent): boolean {
if (event.type !== 'tool_result') return false
const toolCallId = getToolCallIdFromEvent(event)
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
if (wasToolResultSeen(toolCallId)) {
logger.info('[DEDUP] Skipping tool_result event', { toolCallId, seenToolResultsSize: seenToolResults.size })
return true
}
markToolResultSeen(toolCallId)
return false
}

View File

@@ -89,8 +89,15 @@ export async function runStreamLoop(
const reader = response.body.getReader()
const decoder = new TextDecoder()
let eventCount = 0
logger.info('[STREAM] SSE stream connected, starting event loop', {
timeout,
hasAbortSignal: !!abortSignal,
})
const timeoutId = setTimeout(() => {
logger.warn('[STREAM] Timeout fired, cancelling reader', { timeout, eventCount })
context.errors.push('Request timed out')
context.streamComplete = true
reader.cancel().catch(() => {})
@@ -98,17 +105,37 @@ export async function runStreamLoop(
try {
for await (const event of parseSSEStream(reader, decoder, abortSignal)) {
eventCount++
if (abortSignal?.aborted) {
logger.warn('[STREAM] AbortSignal aborted, breaking', { eventCount })
context.wasAborted = true
break
}
const normalizedEvent = normalizeSseEvent(event)
logger.info('[STREAM] Event received', {
eventNum: eventCount,
type: normalizedEvent.type,
toolCallId: normalizedEvent.toolCallId,
toolName: normalizedEvent.toolName,
hasSubagent: !!normalizedEvent.subagent,
})
// Skip duplicate tool events.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (shouldSkipToolCall || shouldSkipToolResult) {
logger.info('[STREAM] Skipping duplicate event', {
type: normalizedEvent.type,
toolCallId: normalizedEvent.toolCallId,
skipToolCall: shouldSkipToolCall,
skipToolResult: shouldSkipToolResult,
})
}
if (!shouldSkipToolCall && !shouldSkipToolResult) {
try {
await options.onEvent?.(normalizedEvent)
@@ -156,10 +183,18 @@ export async function runStreamLoop(
// Main event handler dispatch.
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
logger.info('[STREAM] Dispatching to handler', { type: normalizedEvent.type, toolCallId: normalizedEvent.toolCallId })
await handler(normalizedEvent, context, execContext, options)
logger.info('[STREAM] Handler returned', { type: normalizedEvent.type, toolCallId: normalizedEvent.toolCallId, streamComplete: context.streamComplete })
} else {
logger.info('[STREAM] No handler for event type', { type: normalizedEvent.type })
}
if (context.streamComplete) {
logger.info('[STREAM] Stream marked complete, breaking', { eventCount, errors: context.errors })
break
}
if (context.streamComplete) break
}
logger.info('[STREAM] Event loop ended', { eventCount, streamComplete: context.streamComplete, wasAborted: context.wasAborted, errors: context.errors })
} finally {
clearTimeout(timeoutId)
}

View File

@@ -219,29 +219,50 @@ export async function markToolComplete(
message?: unknown,
data?: unknown
): Promise<boolean> {
const url = `${SIM_AGENT_API_URL}/api/tools/mark-complete`
logger.info('[MARK-COMPLETE] Starting', {
toolCallId,
toolName,
status,
url,
hasData: !!data,
hasCopilotApiKey: !!env.COPILOT_API_KEY,
})
try {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), MARK_COMPLETE_TIMEOUT_MS)
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
const body = JSON.stringify({
id: toolCallId,
name: toolName,
status,
message,
data,
})
logger.info('[MARK-COMPLETE] Sending POST', { toolCallId, toolName, bodyLength: body.length })
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify({
id: toolCallId,
name: toolName,
status,
message,
data,
}),
body,
signal: controller.signal,
})
logger.info('[MARK-COMPLETE] Response received', {
toolCallId,
toolName,
httpStatus: response.status,
ok: response.ok,
})
if (!response.ok) {
logger.warn('Mark-complete call failed', { toolCallId, toolName, status: response.status })
const responseText = await response.text().catch(() => '')
logger.warn('[MARK-COMPLETE] Non-OK response', { toolCallId, toolName, httpStatus: response.status, responseText })
return false
}
@@ -252,11 +273,12 @@ export async function markToolComplete(
} catch (error) {
const isTimeout =
error instanceof DOMException && error.name === 'AbortError'
logger.error('Mark-complete call failed', {
logger.error('[MARK-COMPLETE] FAILED', {
toolCallId,
toolName,
timedOut: isTimeout,
error: error instanceof Error ? error.message : String(error),
errorName: error instanceof Error ? error.name : undefined,
})
return false
}