This commit is contained in:
Siddharth Ganesan
2026-02-04 11:27:45 -08:00
parent 2198a6caae
commit 8cea43d926
4 changed files with 124 additions and 69 deletions

View File

@@ -7,6 +7,7 @@ import {
handleSubagentRouting,
markToolCallSeen,
markToolResultSeen,
normalizeSseEvent,
sseHandlers,
subAgentHandlers,
wasToolCallSeen,
@@ -95,35 +96,33 @@ export async function orchestrateCopilotStream(
break
}
const normalizedEvent = normalizeSseEvent(event)
// Skip tool_result events for tools the sim-side already executed.
// The sim-side emits its own tool_result with complete data.
// For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
const toolCallId = getToolCallIdFromEvent(event)
const eventData =
typeof event.data === 'string'
? (() => {
try {
return JSON.parse(event.data)
} catch {
return undefined
}
})()
: event.data
const toolCallId = getToolCallIdFromEvent(normalizedEvent)
const eventData = normalizedEvent.data
const isPartialToolCall = event.type === 'tool_call' && eventData?.partial === true
const isPartialToolCall = normalizedEvent.type === 'tool_call' && eventData?.partial === true
const shouldSkipToolCall =
event.type === 'tool_call' &&
normalizedEvent.type === 'tool_call' &&
!!toolCallId &&
!isPartialToolCall &&
(wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
if (event.type === 'tool_call' && toolCallId && !isPartialToolCall && !shouldSkipToolCall) {
if (
normalizedEvent.type === 'tool_call' &&
toolCallId &&
!isPartialToolCall &&
!shouldSkipToolCall
) {
markToolCallSeen(toolCallId)
}
const shouldSkipToolResult =
event.type === 'tool_result' &&
normalizedEvent.type === 'tool_result' &&
(() => {
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
@@ -132,11 +131,11 @@ export async function orchestrateCopilotStream(
})()
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(event, options)
await forwardEvent(normalizedEvent, options)
}
if (event.type === 'subagent_start') {
const toolCallId = event.data?.tool_call_id
if (normalizedEvent.type === 'subagent_start') {
const toolCallId = normalizedEvent.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
@@ -145,23 +144,23 @@ export async function orchestrateCopilotStream(
continue
}
if (event.type === 'subagent_end') {
if (normalizedEvent.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
if (handleSubagentRouting(event, context)) {
const handler = subAgentHandlers[event.type]
if (handleSubagentRouting(normalizedEvent, context)) {
const handler = subAgentHandlers[normalizedEvent.type]
if (handler) {
await handler(event, context, execContext, options)
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
continue
}
const handler = sseHandlers[event.type]
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
await handler(event, context, execContext, options)
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
}

View File

@@ -38,7 +38,9 @@ type EventDataObject = Record<string, any> | undefined
const parseEventData = (data: unknown): EventDataObject => {
if (!data) return undefined
if (typeof data !== 'string') return data as EventDataObject
if (typeof data !== 'string') {
return data as EventDataObject
}
try {
return JSON.parse(data) as EventDataObject
} catch {
@@ -46,13 +48,51 @@ const parseEventData = (data: unknown): EventDataObject => {
}
}
const getEventData = (event: SSEEvent): EventDataObject => parseEventData(event.data)
const hasToolFields = (data: EventDataObject): boolean => {
if (!data) return false
return (
data.id !== undefined ||
data.toolCallId !== undefined ||
data.name !== undefined ||
data.success !== undefined ||
data.result !== undefined ||
data.arguments !== undefined
)
}
const getEventData = (event: SSEEvent): EventDataObject => {
const topLevel = parseEventData(event.data)
if (!topLevel) return undefined
if (hasToolFields(topLevel)) return topLevel
const nested = parseEventData(topLevel.data)
return nested || topLevel
}
export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
const data = getEventData(event)
return event.toolCallId || data?.id || data?.toolCallId
}
/** Normalizes SSE events so tool metadata is available at the top level. */
export function normalizeSseEvent(event: SSEEvent): SSEEvent {
if (!event) return event
const data = getEventData(event)
if (!data) return event
const toolCallId = event.toolCallId || data.id || data.toolCallId
const toolName = event.toolName || data.name || data.toolName
const success = event.success ?? data.success
const result = event.result ?? data.result
const normalizedData = typeof event.data === 'string' ? data : event.data
return {
...event,
data: normalizedData,
toolCallId,
toolName,
success,
result,
}
}
/**
* Mark a tool call as executed by the sim-side.
* This prevents the Go backend's duplicate tool_result from being forwarded.

View File

@@ -6,6 +6,25 @@ const logger = createLogger('CopilotStreamBuffer')
const STREAM_TTL_SECONDS = 60 * 60
const STREAM_EVENT_LIMIT = 5000
const APPEND_STREAM_EVENT_LUA = `
local seqKey = KEYS[1]
local eventsKey = KEYS[2]
local ttl = tonumber(ARGV[1])
local limit = tonumber(ARGV[2])
local streamId = ARGV[3]
local eventJson = ARGV[4]
local id = redis.call('INCR', seqKey)
local entry = '{"eventId":' .. id .. ',"streamId":' .. cjson.encode(streamId) .. ',"event":' .. eventJson .. '}'
redis.call('ZADD', eventsKey, id, entry)
redis.call('EXPIRE', eventsKey, ttl)
redis.call('EXPIRE', seqKey, ttl)
if limit > 0 then
redis.call('ZREMRANGEBYRANK', eventsKey, 0, -limit-1)
end
return id
`
function getStreamKeyPrefix(streamId: string) {
return `copilot_stream:${streamId}`
}
@@ -99,22 +118,19 @@ export async function appendStreamEvent(
}
try {
const nextId = await redis.incr(getSeqKey(streamId))
const entry: StreamEventEntry = { eventId: nextId, streamId, event }
await redis.zadd(getEventsKey(streamId), nextId, JSON.stringify(entry))
const count = await redis.zcard(getEventsKey(streamId))
if (count > STREAM_EVENT_LIMIT) {
const trimCount = count - STREAM_EVENT_LIMIT
if (trimCount > 0) {
await redis.zremrangebyrank(getEventsKey(streamId), 0, trimCount - 1)
}
}
await redis.expire(getEventsKey(streamId), STREAM_TTL_SECONDS)
await redis.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
return entry
const eventJson = JSON.stringify(event)
const nextId = await redis.eval(
APPEND_STREAM_EVENT_LUA,
2,
getSeqKey(streamId),
getEventsKey(streamId),
STREAM_TTL_SECONDS,
STREAM_EVENT_LIMIT,
streamId,
eventJson
)
const eventId = typeof nextId === 'number' ? nextId : Number(nextId)
return { eventId, streamId, event }
} catch (error) {
logger.warn('Failed to append stream event', {
streamId,

View File

@@ -6,6 +6,7 @@ import {
handleSubagentRouting,
markToolCallSeen,
markToolResultSeen,
normalizeSseEvent,
sseHandlers,
subAgentHandlers,
wasToolCallSeen,
@@ -112,35 +113,33 @@ export async function orchestrateSubagentStream(
break
}
const normalizedEvent = normalizeSseEvent(event)
// Skip tool_result events for tools the sim-side already executed.
// The sim-side emits its own tool_result with complete data.
// For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
const toolCallId = getToolCallIdFromEvent(event)
const eventData =
typeof event.data === 'string'
? (() => {
try {
return JSON.parse(event.data)
} catch {
return undefined
}
})()
: event.data
const toolCallId = getToolCallIdFromEvent(normalizedEvent)
const eventData = normalizedEvent.data
const isPartialToolCall = event.type === 'tool_call' && eventData?.partial === true
const isPartialToolCall = normalizedEvent.type === 'tool_call' && eventData?.partial === true
const shouldSkipToolCall =
event.type === 'tool_call' &&
normalizedEvent.type === 'tool_call' &&
!!toolCallId &&
!isPartialToolCall &&
(wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
if (event.type === 'tool_call' && toolCallId && !isPartialToolCall && !shouldSkipToolCall) {
if (
normalizedEvent.type === 'tool_call' &&
toolCallId &&
!isPartialToolCall &&
!shouldSkipToolCall
) {
markToolCallSeen(toolCallId)
}
const shouldSkipToolResult =
event.type === 'tool_result' &&
normalizedEvent.type === 'tool_result' &&
(() => {
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
@@ -149,18 +148,18 @@ export async function orchestrateSubagentStream(
})()
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(event, options)
await forwardEvent(normalizedEvent, options)
}
if (event.type === 'structured_result' || event.type === 'subagent_result') {
structuredResult = normalizeStructuredResult(event.data)
if (normalizedEvent.type === 'structured_result' || normalizedEvent.type === 'subagent_result') {
structuredResult = normalizeStructuredResult(normalizedEvent.data)
context.streamComplete = true
continue
}
// Handle subagent_start/subagent_end events to track nested subagent calls
if (event.type === 'subagent_start') {
const toolCallId = event.data?.tool_call_id
if (normalizedEvent.type === 'subagent_start') {
const toolCallId = normalizedEvent.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
context.subAgentContent[toolCallId] = ''
@@ -169,7 +168,7 @@ export async function orchestrateSubagentStream(
continue
}
if (event.type === 'subagent_end') {
if (normalizedEvent.type === 'subagent_end') {
context.subAgentParentToolCallId = undefined
continue
}
@@ -177,22 +176,23 @@ export async function orchestrateSubagentStream(
// For direct subagent calls, events may have the subagent field set (e.g., subagent: "discovery")
// but no subagent_start event because this IS the top-level agent. Skip subagent routing
// for events where the subagent field matches the current agentId - these are top-level events.
const isTopLevelSubagentEvent = event.subagent === agentId && !context.subAgentParentToolCallId
const isTopLevelSubagentEvent =
normalizedEvent.subagent === agentId && !context.subAgentParentToolCallId
// Only route to subagent handlers for nested subagent events (not matching current agentId)
if (!isTopLevelSubagentEvent && handleSubagentRouting(event, context)) {
const handler = subAgentHandlers[event.type]
if (!isTopLevelSubagentEvent && handleSubagentRouting(normalizedEvent, context)) {
const handler = subAgentHandlers[normalizedEvent.type]
if (handler) {
await handler(event, context, execContext, options)
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
continue
}
// Process as a regular SSE event (including top-level subagent events)
const handler = sseHandlers[event.type]
const handler = sseHandlers[normalizedEvent.type]
if (handler) {
await handler(event, context, execContext, options)
await handler(normalizedEvent, context, execContext, options)
}
if (context.streamComplete) break
}