diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index e3a0b35fc..f2286de75 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -7,6 +7,7 @@ import { handleSubagentRouting, markToolCallSeen, markToolResultSeen, + normalizeSseEvent, sseHandlers, subAgentHandlers, wasToolCallSeen, @@ -95,35 +96,33 @@ export async function orchestrateCopilotStream( break } + const normalizedEvent = normalizeSseEvent(event) + // Skip tool_result events for tools the sim-side already executed. // The sim-side emits its own tool_result with complete data. // For server-side tools (not executed by sim), we still forward the Go backend's tool_result. - const toolCallId = getToolCallIdFromEvent(event) - const eventData = - typeof event.data === 'string' - ? (() => { - try { - return JSON.parse(event.data) - } catch { - return undefined - } - })() - : event.data + const toolCallId = getToolCallIdFromEvent(normalizedEvent) + const eventData = normalizedEvent.data - const isPartialToolCall = event.type === 'tool_call' && eventData?.partial === true + const isPartialToolCall = normalizedEvent.type === 'tool_call' && eventData?.partial === true const shouldSkipToolCall = - event.type === 'tool_call' && + normalizedEvent.type === 'tool_call' && !!toolCallId && !isPartialToolCall && (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) - if (event.type === 'tool_call' && toolCallId && !isPartialToolCall && !shouldSkipToolCall) { + if ( + normalizedEvent.type === 'tool_call' && + toolCallId && + !isPartialToolCall && + !shouldSkipToolCall + ) { markToolCallSeen(toolCallId) } const shouldSkipToolResult = - event.type === 'tool_result' && + normalizedEvent.type === 'tool_result' && (() => { if (!toolCallId) return false if (wasToolResultSeen(toolCallId)) return true @@ -132,11 +131,11 @@ export async function orchestrateCopilotStream( })() if (!shouldSkipToolCall && !shouldSkipToolResult) { - await forwardEvent(event, options) + await forwardEvent(normalizedEvent, options) } - if (event.type === 'subagent_start') { - const toolCallId = event.data?.tool_call_id + if (normalizedEvent.type === 'subagent_start') { + const toolCallId = normalizedEvent.data?.tool_call_id if (toolCallId) { context.subAgentParentToolCallId = toolCallId context.subAgentContent[toolCallId] = '' @@ -145,23 +144,23 @@ export async function orchestrateCopilotStream( continue } - if (event.type === 'subagent_end') { + if (normalizedEvent.type === 'subagent_end') { context.subAgentParentToolCallId = undefined continue } - if (handleSubagentRouting(event, context)) { - const handler = subAgentHandlers[event.type] + if (handleSubagentRouting(normalizedEvent, context)) { + const handler = subAgentHandlers[normalizedEvent.type] if (handler) { - await handler(event, context, execContext, options) + await handler(normalizedEvent, context, execContext, options) } if (context.streamComplete) break continue } - const handler = sseHandlers[event.type] + const handler = sseHandlers[normalizedEvent.type] if (handler) { - await handler(event, context, execContext, options) + await handler(normalizedEvent, context, execContext, options) } if (context.streamComplete) break } diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts index 0198599c9..4ad167345 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts @@ -38,7 +38,9 @@ type EventDataObject = Record | undefined const parseEventData = (data: unknown): EventDataObject => { if (!data) return undefined - if (typeof data !== 'string') return data as EventDataObject + if (typeof data !== 'string') { + return data as EventDataObject + } try { return JSON.parse(data) as EventDataObject } catch { @@ -46,13 +48,51 @@ const parseEventData = (data: unknown): EventDataObject => { } } -const getEventData = (event: SSEEvent): EventDataObject => parseEventData(event.data) +const hasToolFields = (data: EventDataObject): boolean => { + if (!data) return false + return ( + data.id !== undefined || + data.toolCallId !== undefined || + data.name !== undefined || + data.success !== undefined || + data.result !== undefined || + data.arguments !== undefined + ) +} + +const getEventData = (event: SSEEvent): EventDataObject => { + const topLevel = parseEventData(event.data) + if (!topLevel) return undefined + if (hasToolFields(topLevel)) return topLevel + const nested = parseEventData(topLevel.data) + return nested || topLevel +} export function getToolCallIdFromEvent(event: SSEEvent): string | undefined { const data = getEventData(event) return event.toolCallId || data?.id || data?.toolCallId } +/** Normalizes SSE events so tool metadata is available at the top level. */ +export function normalizeSseEvent(event: SSEEvent): SSEEvent { + if (!event) return event + const data = getEventData(event) + if (!data) return event + const toolCallId = event.toolCallId || data.id || data.toolCallId + const toolName = event.toolName || data.name || data.toolName + const success = event.success ?? data.success + const result = event.result ?? data.result + const normalizedData = typeof event.data === 'string' ? data : event.data + return { + ...event, + data: normalizedData, + toolCallId, + toolName, + success, + result, + } +} + /** * Mark a tool call as executed by the sim-side. * This prevents the Go backend's duplicate tool_result from being forwarded. diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts index 11f651870..b1b37bb55 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -6,6 +6,25 @@ const logger = createLogger('CopilotStreamBuffer') const STREAM_TTL_SECONDS = 60 * 60 const STREAM_EVENT_LIMIT = 5000 +const APPEND_STREAM_EVENT_LUA = ` +local seqKey = KEYS[1] +local eventsKey = KEYS[2] +local ttl = tonumber(ARGV[1]) +local limit = tonumber(ARGV[2]) +local streamId = ARGV[3] +local eventJson = ARGV[4] + +local id = redis.call('INCR', seqKey) +local entry = '{"eventId":' .. id .. ',"streamId":' .. cjson.encode(streamId) .. ',"event":' .. eventJson .. '}' +redis.call('ZADD', eventsKey, id, entry) +redis.call('EXPIRE', eventsKey, ttl) +redis.call('EXPIRE', seqKey, ttl) +if limit > 0 then + redis.call('ZREMRANGEBYRANK', eventsKey, 0, -limit-1) +end +return id +` + function getStreamKeyPrefix(streamId: string) { return `copilot_stream:${streamId}` } @@ -99,22 +118,19 @@ export async function appendStreamEvent( } try { - const nextId = await redis.incr(getSeqKey(streamId)) - const entry: StreamEventEntry = { eventId: nextId, streamId, event } - await redis.zadd(getEventsKey(streamId), nextId, JSON.stringify(entry)) - - const count = await redis.zcard(getEventsKey(streamId)) - if (count > STREAM_EVENT_LIMIT) { - const trimCount = count - STREAM_EVENT_LIMIT - if (trimCount > 0) { - await redis.zremrangebyrank(getEventsKey(streamId), 0, trimCount - 1) - } - } - - await redis.expire(getEventsKey(streamId), STREAM_TTL_SECONDS) - await redis.expire(getSeqKey(streamId), STREAM_TTL_SECONDS) - - return entry + const eventJson = JSON.stringify(event) + const nextId = await redis.eval( + APPEND_STREAM_EVENT_LUA, + 2, + getSeqKey(streamId), + getEventsKey(streamId), + STREAM_TTL_SECONDS, + STREAM_EVENT_LIMIT, + streamId, + eventJson + ) + const eventId = typeof nextId === 'number' ? nextId : Number(nextId) + return { eventId, streamId, event } } catch (error) { logger.warn('Failed to append stream event', { streamId, diff --git a/apps/sim/lib/copilot/orchestrator/subagent.ts b/apps/sim/lib/copilot/orchestrator/subagent.ts index 3d3795f7b..1d649eb3c 100644 --- a/apps/sim/lib/copilot/orchestrator/subagent.ts +++ b/apps/sim/lib/copilot/orchestrator/subagent.ts @@ -6,6 +6,7 @@ import { handleSubagentRouting, markToolCallSeen, markToolResultSeen, + normalizeSseEvent, sseHandlers, subAgentHandlers, wasToolCallSeen, @@ -112,35 +113,33 @@ export async function orchestrateSubagentStream( break } + const normalizedEvent = normalizeSseEvent(event) + // Skip tool_result events for tools the sim-side already executed. // The sim-side emits its own tool_result with complete data. // For server-side tools (not executed by sim), we still forward the Go backend's tool_result. - const toolCallId = getToolCallIdFromEvent(event) - const eventData = - typeof event.data === 'string' - ? (() => { - try { - return JSON.parse(event.data) - } catch { - return undefined - } - })() - : event.data + const toolCallId = getToolCallIdFromEvent(normalizedEvent) + const eventData = normalizedEvent.data - const isPartialToolCall = event.type === 'tool_call' && eventData?.partial === true + const isPartialToolCall = normalizedEvent.type === 'tool_call' && eventData?.partial === true const shouldSkipToolCall = - event.type === 'tool_call' && + normalizedEvent.type === 'tool_call' && !!toolCallId && !isPartialToolCall && (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) - if (event.type === 'tool_call' && toolCallId && !isPartialToolCall && !shouldSkipToolCall) { + if ( + normalizedEvent.type === 'tool_call' && + toolCallId && + !isPartialToolCall && + !shouldSkipToolCall + ) { markToolCallSeen(toolCallId) } const shouldSkipToolResult = - event.type === 'tool_result' && + normalizedEvent.type === 'tool_result' && (() => { if (!toolCallId) return false if (wasToolResultSeen(toolCallId)) return true @@ -149,18 +148,18 @@ export async function orchestrateSubagentStream( })() if (!shouldSkipToolCall && !shouldSkipToolResult) { - await forwardEvent(event, options) + await forwardEvent(normalizedEvent, options) } - if (event.type === 'structured_result' || event.type === 'subagent_result') { - structuredResult = normalizeStructuredResult(event.data) + if (normalizedEvent.type === 'structured_result' || normalizedEvent.type === 'subagent_result') { + structuredResult = normalizeStructuredResult(normalizedEvent.data) context.streamComplete = true continue } // Handle subagent_start/subagent_end events to track nested subagent calls - if (event.type === 'subagent_start') { - const toolCallId = event.data?.tool_call_id + if (normalizedEvent.type === 'subagent_start') { + const toolCallId = normalizedEvent.data?.tool_call_id if (toolCallId) { context.subAgentParentToolCallId = toolCallId context.subAgentContent[toolCallId] = '' @@ -169,7 +168,7 @@ export async function orchestrateSubagentStream( continue } - if (event.type === 'subagent_end') { + if (normalizedEvent.type === 'subagent_end') { context.subAgentParentToolCallId = undefined continue } @@ -177,22 +176,23 @@ export async function orchestrateSubagentStream( // For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery") // but no subagent_start event because this IS the top-level agent. Skip subagent routing // for events where the subagent field matches the current agentId - these are top-level events. - const isTopLevelSubagentEvent = event.subagent === agentId && !context.subAgentParentToolCallId + const isTopLevelSubagentEvent = + normalizedEvent.subagent === agentId && !context.subAgentParentToolCallId // Only route to subagent handlers for nested subagent events (not matching current agentId) - if (!isTopLevelSubagentEvent && handleSubagentRouting(event, context)) { - const handler = subAgentHandlers[event.type] + if (!isTopLevelSubagentEvent && handleSubagentRouting(normalizedEvent, context)) { + const handler = subAgentHandlers[normalizedEvent.type] if (handler) { - await handler(event, context, execContext, options) + await handler(normalizedEvent, context, execContext, options) } if (context.streamComplete) break continue } // Process as a regular SSE event (including top-level subagent events) - const handler = sseHandlers[event.type] + const handler = sseHandlers[normalizedEvent.type] if (handler) { - await handler(event, context, execContext, options) + await handler(normalizedEvent, context, execContext, options) } if (context.streamComplete) break }