Checkpoint

This commit is contained in:
Siddharth Ganesan
2026-02-12 17:12:54 -08:00
parent 3d5336994b
commit 541665e41a
14 changed files with 475 additions and 223 deletions

View File

@@ -273,7 +273,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
chatListTitle: updatedState.chats.find((c) => c.id === targetChatId)?.title || null,
})
},
tool_result: (data, context, get, set) => {
'copilot.tool.result': (data, context, get, set) => {
try {
const eventData = asRecord(data?.data)
const toolCallId: string | undefined =
@@ -428,123 +428,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
})
}
},
tool_error: (data, context, get, set) => {
try {
const errorData = asRecord(data?.data)
const toolCallId: string | undefined =
data?.toolCallId ||
(errorData.id as string | undefined) ||
(errorData.callId as string | undefined)
const failedDependency: boolean = data?.failedDependency === true
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
if (current) {
if (
isRejectedState(current.state) ||
isReviewState(current.state) ||
isBackgroundState(current.state)
) {
return
}
const targetState = errorData.state
? mapServerStateToClientState(errorData.state)
: failedDependency
? ClientToolCallState.rejected
: ClientToolCallState.error
const uiMetadata = extractToolUiMetadata(errorData)
const executionMetadata = extractToolExecutionMetadata(errorData)
const updatedMap = { ...toolCallsById }
updatedMap[toolCallId] = {
...current,
ui: uiMetadata || current.ui,
execution: executionMetadata || current.execution,
state: targetState,
display: resolveDisplayFromServerUi(
current.name,
targetState,
current.id,
current.params,
uiMetadata || current.ui
),
}
set({ toolCallsById: updatedMap })
}
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i]
if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) {
if (
isRejectedState(b.toolCall?.state) ||
isReviewState(b.toolCall?.state) ||
isBackgroundState(b.toolCall?.state)
)
break
const targetState = errorData.state
? mapServerStateToClientState(errorData.state)
: failedDependency
? ClientToolCallState.rejected
: ClientToolCallState.error
const uiMetadata = extractToolUiMetadata(errorData)
const executionMetadata = extractToolExecutionMetadata(errorData)
context.contentBlocks[i] = {
...b,
toolCall: {
...b.toolCall,
ui: uiMetadata || b.toolCall?.ui,
execution: executionMetadata || b.toolCall?.execution,
state: targetState,
display: resolveDisplayFromServerUi(
b.toolCall?.name,
targetState,
toolCallId,
b.toolCall?.params,
uiMetadata || b.toolCall?.ui
),
},
}
break
}
}
updateStreamingMessage(set, context)
} catch (error) {
logger.warn('Failed to process tool_error SSE event', {
error: error instanceof Error ? error.message : String(error),
})
}
},
tool_generating: (data, context, get, set) => {
const eventData = asRecord(data?.data)
const toolCallId =
data?.toolCallId ||
(eventData.id as string | undefined) ||
(eventData.callId as string | undefined)
const toolName =
data?.toolName ||
(eventData.name as string | undefined) ||
(eventData.toolName as string | undefined)
if (!toolCallId || !toolName) return
const { toolCallsById } = get()
if (!toolCallsById[toolCallId]) {
const initialState = ClientToolCallState.generating
const uiMetadata = extractToolUiMetadata(eventData)
const tc: CopilotToolCall = {
id: toolCallId,
name: toolName,
state: initialState,
ui: uiMetadata,
execution: extractToolExecutionMetadata(eventData),
display: resolveDisplayFromServerUi(toolName, initialState, toolCallId, undefined, uiMetadata),
}
const updated = { ...toolCallsById, [toolCallId]: tc }
set({ toolCallsById: updated })
logger.info('[toolCallsById] map updated', updated)
upsertToolCallBlock(context, tc)
updateStreamingMessage(set, context)
}
},
tool_call: (data, context, get, set) => {
'copilot.tool.call': (data, context, get, set) => {
const toolData = asRecord(data?.data)
const id: string | undefined =
(toolData.id as string | undefined) ||
@@ -647,7 +531,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
return
},
reasoning: (data, context, _get, set) => {
'copilot.phase.progress': (data, context, _get, set) => {
const phase = (data && (data.phase || data?.data?.phase)) as string | undefined
if (phase === 'start') {
beginThinkingBlock(context)
@@ -664,7 +548,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
appendThinkingContent(context, chunk)
updateStreamingMessage(set, context)
},
content: (data, context, get, set) => {
'copilot.content': (data, context, get, set) => {
if (!data.data) return
context.pendingContent += data.data
@@ -879,7 +763,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
updateStreamingMessage(set, context)
}
},
done: (_data, context) => {
'copilot.phase.completed': (_data, context) => {
logger.info('[SSE] DONE EVENT RECEIVED', {
doneEventCount: context.doneEventCount,
data: _data,
@@ -890,7 +774,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
context.streamComplete = true
}
},
error: (data, context, _get, set) => {
'copilot.error': (data, context, _get, set) => {
logger.error('Stream error:', data.error)
set((state: CopilotStore) => ({
messages: state.messages.map((msg) =>
@@ -905,6 +789,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
}))
context.streamComplete = true
},
'copilot.phase.started': () => {},
stream_end: (_data, context, _get, set) => {
if (context.pendingContent) {
if (context.isInThinkingBlock && context.currentThinkingBlock) {
@@ -919,3 +804,8 @@ export const sseHandlers: Record<string, SSEHandler> = {
},
default: () => {},
}
sseHandlers['copilot.tool.interrupt_required'] = sseHandlers['copilot.tool.call']
sseHandlers['copilot.workflow.patch'] = sseHandlers['copilot.tool.result']
sseHandlers['copilot.workflow.verify'] = sseHandlers['copilot.tool.result']
sseHandlers['copilot.tool.interrupt_resolved'] = sseHandlers['copilot.tool.result']

View File

@@ -0,0 +1,172 @@
/**
* @vitest-environment node
*/
import { describe, expect, it, vi } from 'vitest'
import { applySseEvent } from '@/lib/copilot/client-sse/subagent-handlers'
import type { ClientStreamingContext } from '@/lib/copilot/client-sse/types'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
function createTestStore(initialToolCalls: Record<string, CopilotToolCall>) {
const state: Partial<CopilotStore> = {
messages: [{ id: 'assistant-msg', role: 'assistant', content: '', timestamp: new Date().toISOString() }],
toolCallsById: { ...initialToolCalls },
currentChat: null,
chats: [],
activeStream: null,
updatePlanTodoStatus: vi.fn(),
handleNewChatCreation: vi.fn().mockResolvedValue(undefined),
}
const get = () => state as CopilotStore
const set: StoreSet = (partial) => {
const patch = typeof partial === 'function' ? partial(get()) : partial
Object.assign(state, patch)
}
return { get, set }
}
function createStreamingContext(): ClientStreamingContext {
return {
messageId: 'assistant-msg',
accumulatedContent: '',
contentBlocks: [],
currentTextBlock: null,
isInThinkingBlock: false,
currentThinkingBlock: null,
isInDesignWorkflowBlock: false,
designWorkflowContent: '',
pendingContent: '',
doneEventCount: 0,
streamComplete: false,
subAgentContent: {},
subAgentToolCalls: {},
subAgentBlocks: {},
suppressStreamingUpdates: true,
}
}
describe('client SSE copilot.* stream smoke', () => {
it('processes main tool call/result events with copilot.* keys', async () => {
const { get, set } = createTestStore({})
const context = createStreamingContext()
await applySseEvent(
{
type: 'copilot.tool.call',
data: { id: 'main-tool-1', name: 'get_user_workflow', state: 'executing', arguments: {} },
} as any,
context,
get,
set
)
await applySseEvent(
{
type: 'copilot.tool.result',
toolCallId: 'main-tool-1',
success: true,
result: { ok: true },
data: {
id: 'main-tool-1',
name: 'get_user_workflow',
phase: 'completed',
state: 'success',
success: true,
result: { ok: true },
},
} as any,
context,
get,
set
)
expect(get().toolCallsById['main-tool-1']).toBeDefined()
expect(get().toolCallsById['main-tool-1'].state).toBe(ClientToolCallState.success)
expect(
context.contentBlocks.some(
(block) => block.type === 'tool_call' && block.toolCall?.id === 'main-tool-1'
)
).toBe(true)
})
it('processes subagent start/tool/result/end with copilot.* keys', async () => {
const parentToolCallId = 'parent-edit-tool'
const { get, set } = createTestStore({
[parentToolCallId]: {
id: parentToolCallId,
name: 'edit',
state: ClientToolCallState.executing,
},
})
const context = createStreamingContext()
await applySseEvent(
{
type: 'copilot.subagent.started',
subagent: 'edit',
data: { tool_call_id: parentToolCallId },
} as any,
context,
get,
set
)
await applySseEvent(
{
type: 'copilot.tool.call',
subagent: 'edit',
data: {
id: 'sub-tool-1',
name: 'workflow_context_get',
state: 'executing',
arguments: { includeSchemas: false },
},
} as any,
context,
get,
set
)
await applySseEvent(
{
type: 'copilot.tool.result',
subagent: 'edit',
data: {
id: 'sub-tool-1',
name: 'workflow_context_get',
phase: 'completed',
state: 'success',
success: true,
result: { contextPackId: 'pack-1' },
},
} as any,
context,
get,
set
)
await applySseEvent(
{
type: 'copilot.subagent.completed',
subagent: 'edit',
data: {},
} as any,
context,
get,
set
)
const parentToolCall = get().toolCallsById[parentToolCallId]
expect(parentToolCall).toBeDefined()
expect(parentToolCall.subAgentStreaming).toBe(false)
expect(parentToolCall.subAgentToolCalls?.length).toBe(1)
expect(parentToolCall.subAgentToolCalls?.[0]?.id).toBe('sub-tool-1')
expect(parentToolCall.subAgentToolCalls?.[0]?.state).toBe(ClientToolCallState.success)
})
})

View File

@@ -124,11 +124,11 @@ export function updateToolCallWithSubAgentData(
}
export const subAgentSSEHandlers: Record<string, SSEHandler> = {
start: () => {
// Subagent start event - no action needed, parent is already tracked from subagent_start
'copilot.phase.started': () => {
// No-op: parent subagent association is handled by copilot.subagent.started.
},
content: (data, context, get, set) => {
'copilot.content': (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
const contentStr = typeof data.data === 'string' ? data.data : data.content || ''
logger.info('[SubAgent] content event', {
@@ -149,7 +149,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
reasoning: (data, context, get, set) => {
'copilot.phase.progress': (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
const dataObj = asRecord(data?.data)
const phase = data?.phase || (dataObj.phase as string | undefined)
@@ -165,11 +165,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
tool_generating: () => {
// Tool generating event - no action needed, we'll handle the actual tool_call
},
tool_call: async (data, context, get, set) => {
'copilot.tool.call': async (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
@@ -269,7 +265,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
}
},
tool_result: (data, context, get, set) => {
'copilot.tool.result': (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
@@ -368,7 +364,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
done: (_data, context, get, set) => {
'copilot.phase.completed': (_data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
@@ -376,6 +372,11 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
},
}
subAgentSSEHandlers['copilot.tool.interrupt_required'] = subAgentSSEHandlers['copilot.tool.call']
subAgentSSEHandlers['copilot.workflow.patch'] = subAgentSSEHandlers['copilot.tool.result']
subAgentSSEHandlers['copilot.workflow.verify'] = subAgentSSEHandlers['copilot.tool.result']
subAgentSSEHandlers['copilot.tool.interrupt_resolved'] = subAgentSSEHandlers['copilot.tool.result']
export async function applySseEvent(
rawData: SSEEvent,
context: ClientStreamingContext,
@@ -388,7 +389,7 @@ export async function applySseEvent(
}
const data = normalizedEvent
if (data.type === 'subagent_start') {
if (data.type === 'copilot.subagent.started') {
const startData = asRecord(data.data)
const toolCallId = startData.tool_call_id as string | undefined
if (toolCallId) {
@@ -411,7 +412,7 @@ export async function applySseEvent(
return true
}
if (data.type === 'subagent_end') {
if (data.type === 'copilot.subagent.completed') {
const parentToolCallId = context.subAgentParentToolCallId
if (parentToolCallId) {
const { toolCallsById } = get()

View File

@@ -54,14 +54,14 @@ describe('sse-handlers tool lifecycle', () => {
}
})
it('executes tool_call and emits tool_result + mark-complete', async () => {
it('executes copilot.tool.call and emits copilot.tool.result + mark-complete', async () => {
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
markToolComplete.mockResolvedValueOnce(true)
const onEvent = vi.fn()
await sseHandlers.tool_call(
await sseHandlers['copilot.tool.call'](
{
type: 'tool_call',
type: 'copilot.tool.call',
data: { id: 'tool-1', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
} as any,
context,
@@ -73,7 +73,7 @@ describe('sse-handlers tool lifecycle', () => {
expect(markToolComplete).toHaveBeenCalledTimes(1)
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: 'tool_result',
type: 'copilot.tool.result',
toolCallId: 'tool-1',
success: true,
})
@@ -84,17 +84,17 @@ describe('sse-handlers tool lifecycle', () => {
expect(updated?.result?.output).toEqual({ ok: true })
})
it('skips duplicate tool_call after result', async () => {
it('skips duplicate copilot.tool.call after result', async () => {
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
markToolComplete.mockResolvedValueOnce(true)
const event = {
type: 'tool_call',
type: 'copilot.tool.call',
data: { id: 'tool-dup', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
}
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
await sseHandlers['copilot.tool.call'](event as any, context, execContext, { interactive: false })
await sseHandlers['copilot.tool.call'](event as any, context, execContext, { interactive: false })
expect(executeToolServerSide).toHaveBeenCalledTimes(1)
expect(markToolComplete).toHaveBeenCalledTimes(1)

View File

@@ -266,7 +266,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
context.chatId = asRecord(event.data).chatId as string | undefined
},
title_updated: () => {},
tool_result: (event, context) => {
'copilot.tool.result': (event, context) => {
const data = getEventData(event)
const toolCallId = event.toolCallId || (data?.id as string | undefined)
if (!toolCallId) return
@@ -292,35 +292,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
current.error = (data?.error || resultObj.error) as string | undefined
}
},
tool_error: (event, context) => {
const data = getEventData(event)
const toolCallId = event.toolCallId || (data?.id as string | undefined)
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
current.status = data?.state ? mapServerStateToToolStatus(data.state) : 'error'
current.error = (data?.error as string | undefined) || 'Tool execution failed'
current.endTime = Date.now()
},
tool_generating: (event, context) => {
const data = getEventData(event)
const toolCallId =
event.toolCallId ||
(data?.toolCallId as string | undefined) ||
(data?.id as string | undefined)
const toolName =
event.toolName || (data?.toolName as string | undefined) || (data?.name as string | undefined)
if (!toolCallId || !toolName) return
if (!context.toolCalls.has(toolCallId)) {
context.toolCalls.set(toolCallId, {
id: toolCallId,
name: toolName,
status: data?.state ? mapServerStateToToolStatus(data.state) : 'pending',
startTime: Date.now(),
})
}
},
tool_call: async (event, context, execContext, options) => {
'copilot.tool.call': async (event, context, execContext, options) => {
const toolData = getEventData(event) || ({} as Record<string, unknown>)
const toolCallId = (toolData.id as string | undefined) || event.toolCallId
const toolName = (toolData.name as string | undefined) || event.toolName
@@ -374,7 +346,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
'run tool'
)
},
reasoning: (event, context) => {
'copilot.phase.progress': (event, context) => {
const d = asRecord(event.data)
const phase = d.phase || asRecord(d.data).phase
if (phase === 'start') {
@@ -398,7 +370,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
if (!chunk || !context.currentThinkingBlock) return
context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}`
},
content: (event, context) => {
'copilot.content': (event, context) => {
// Go backend sends content as a plain string in event.data, not wrapped in an object.
let chunk: string | undefined
if (typeof event.data === 'string') {
@@ -411,20 +383,20 @@ export const sseHandlers: Record<string, SSEHandler> = {
context.accumulatedContent += chunk
addContentBlock(context, { type: 'text', content: chunk })
},
done: (event, context) => {
'copilot.phase.completed': (event, context) => {
const d = asRecord(event.data)
if (d.responseId) {
context.conversationId = d.responseId as string
}
context.streamComplete = true
},
start: (event, context) => {
'copilot.phase.started': (event, context) => {
const d = asRecord(event.data)
if (d.responseId) {
context.conversationId = d.responseId as string
}
},
error: (event, context) => {
'copilot.error': (event, context) => {
const d = asRecord(event.data)
const message = (d.message || d.error || event.error) as string | undefined
if (message) {
@@ -435,7 +407,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
export const subAgentHandlers: Record<string, SSEHandler> = {
content: (event, context) => {
'copilot.content': (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId || !event.data) return
// Go backend sends content as a plain string in event.data
@@ -451,7 +423,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
(context.subAgentContent[parentToolCallId] || '') + chunk
addContentBlock(context, { type: 'subagent_text', content: chunk })
},
tool_call: async (event, context, execContext, options) => {
'copilot.tool.call': async (event, context, execContext, options) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = getEventData(event) || ({} as Record<string, unknown>)
@@ -500,7 +472,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
'subagent run tool'
)
},
tool_result: (event, context) => {
'copilot.tool.result': (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const data = getEventData(event)
@@ -540,8 +512,22 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
}
}
},
'copilot.phase.progress': () => {
// Subagent reasoning chunks are surfaced via copilot.content.
},
'copilot.phase.completed': () => {},
}
sseHandlers['copilot.tool.interrupt_required'] = sseHandlers['copilot.tool.call']
sseHandlers['copilot.workflow.patch'] = sseHandlers['copilot.tool.result']
sseHandlers['copilot.workflow.verify'] = sseHandlers['copilot.tool.result']
sseHandlers['copilot.tool.interrupt_resolved'] = sseHandlers['copilot.tool.result']
subAgentHandlers['copilot.tool.interrupt_required'] = subAgentHandlers['copilot.tool.call']
subAgentHandlers['copilot.workflow.patch'] = subAgentHandlers['copilot.tool.result']
subAgentHandlers['copilot.workflow.verify'] = subAgentHandlers['copilot.tool.result']
subAgentHandlers['copilot.tool.interrupt_resolved'] = subAgentHandlers['copilot.tool.result']
export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean {
if (!event.subagent) return false
if (!context.subAgentParentToolCallId) {

View File

@@ -80,7 +80,7 @@ export async function executeToolAndReport(
})
const resultEvent: SSEEvent = {
type: 'tool_result',
type: 'copilot.tool.result',
toolCallId: toolCall.id,
toolName: toolCall.name,
success: result.success,
@@ -88,6 +88,8 @@ export async function executeToolAndReport(
data: {
id: toolCall.id,
name: toolCall.name,
phase: 'completed',
state: result.success ? 'success' : 'error',
success: result.success,
result: result.output,
},
@@ -110,11 +112,16 @@ export async function executeToolAndReport(
})
const errorEvent: SSEEvent = {
type: 'tool_error',
type: 'copilot.tool.result',
toolCallId: toolCall.id,
toolName: toolCall.name,
success: false,
data: {
id: toolCall.id,
name: toolCall.name,
phase: 'completed',
state: 'error',
success: false,
error: toolCall.error,
},
}

View File

@@ -11,7 +11,7 @@ import {
describe('sse-utils', () => {
it.concurrent('normalizes tool fields from string data', () => {
const event = {
type: 'tool_result',
type: 'copilot.tool.result',
data: JSON.stringify({
id: 'tool_1',
name: 'workflow_change',
@@ -22,21 +22,62 @@ describe('sse-utils', () => {
const normalized = normalizeSseEvent(event as any)
expect(normalized.type).toBe('copilot.tool.result')
expect(normalized.toolCallId).toBe('tool_1')
expect(normalized.toolName).toBe('workflow_change')
expect(normalized.success).toBe(true)
expect(normalized.result).toEqual({ ok: true })
})
it.concurrent('dedupes tool_call events', () => {
const event = { type: 'tool_call', data: { id: 'tool_call_1', name: 'plan' } }
it.concurrent('maps copilot tool event aliases and preserves tool metadata', () => {
const event = {
type: 'copilot.tool.interrupt_required',
data: {
id: 'tool_legacy_1',
name: 'workflow_run',
state: 'pending',
ui: { showInterrupt: true },
},
}
const normalized = normalizeSseEvent(event as any)
expect(normalized.type).toBe('copilot.tool.interrupt_required')
expect(normalized.toolCallId).toBe('tool_legacy_1')
expect(normalized.toolName).toBe('workflow_run')
})
it.concurrent('keeps copilot content event type when payload is plain string', () => {
const event = {
type: 'copilot.content',
data: 'hello world',
}
const normalized = normalizeSseEvent(event as any)
expect(normalized.type).toBe('copilot.content')
expect(normalized.data).toBe('hello world')
})
it.concurrent('dedupes copilot tool call events', () => {
const event = { type: 'copilot.tool.call', data: { id: 'tool_call_1', name: 'plan' } }
expect(shouldSkipToolCallEvent(event as any)).toBe(false)
expect(shouldSkipToolCallEvent(event as any)).toBe(true)
})
it.concurrent('dedupes tool_result events', () => {
const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } }
it.concurrent('dedupes copilot tool result events', () => {
const event = { type: 'copilot.tool.result', data: { id: 'tool_result_1', name: 'plan' } }
expect(shouldSkipToolResultEvent(event as any)).toBe(false)
expect(shouldSkipToolResultEvent(event as any)).toBe(true)
})
it.concurrent('dedupes copilot workflow patch result events', () => {
const normalized = normalizeSseEvent({
type: 'copilot.workflow.patch',
data: { id: 'tool_result_aliased_1', name: 'workflow_change' },
} as any)
expect(shouldSkipToolResultEvent(normalized as any)).toBe(false)
expect(shouldSkipToolResultEvent(normalized as any)).toBe(true)
})
})

View File

@@ -101,8 +101,21 @@ export function wasToolResultSeen(toolCallId: string): boolean {
return seenToolResults.has(toolCallId)
}
function isToolCallEventType(type: string): boolean {
return type === 'copilot.tool.call' || type === 'copilot.tool.interrupt_required'
}
function isToolResultEventType(type: string): boolean {
return (
type === 'copilot.tool.result' ||
type === 'copilot.workflow.patch' ||
type === 'copilot.workflow.verify' ||
type === 'copilot.tool.interrupt_resolved'
)
}
export function shouldSkipToolCallEvent(event: SSEEvent): boolean {
if (event.type !== 'tool_call') return false
if (!isToolCallEventType(String(event.type || ''))) return false
const toolCallId = getToolCallIdFromEvent(event)
if (!toolCallId) return false
const eventData = getEventData(event)
@@ -115,7 +128,7 @@ export function shouldSkipToolCallEvent(event: SSEEvent): boolean {
}
export function shouldSkipToolResultEvent(event: SSEEvent): boolean {
if (event.type !== 'tool_result') return false
if (!isToolResultEventType(String(event.type || ''))) return false
const toolCallId = getToolCallIdFromEvent(event)
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true

View File

@@ -97,8 +97,8 @@ describe('stream-buffer', () => {
})
it.concurrent('replays events after a given event id', async () => {
await appendStreamEvent('stream-1', { type: 'content', data: 'hello' })
await appendStreamEvent('stream-1', { type: 'content', data: 'world' })
await appendStreamEvent('stream-1', { type: 'copilot.content', data: 'hello' })
await appendStreamEvent('stream-1', { type: 'copilot.content', data: 'world' })
const allEvents = await readStreamEvents('stream-1', 0)
expect(allEvents.map((entry) => entry.event.data)).toEqual(['hello', 'world'])
@@ -109,8 +109,8 @@ describe('stream-buffer', () => {
it.concurrent('flushes buffered events for resume', async () => {
const writer = createStreamEventWriter('stream-2')
await writer.write({ type: 'content', data: 'a' })
await writer.write({ type: 'content', data: 'b' })
await writer.write({ type: 'copilot.content', data: 'a' })
await writer.write({ type: 'copilot.content', data: 'b' })
await writer.flush()
const events = await readStreamEvents('stream-2', 0)

View File

@@ -127,7 +127,7 @@ export async function runStreamLoop(
}
// Standard subagent start/end handling.
if (normalizedEvent.type === 'subagent_start') {
if (normalizedEvent.type === 'copilot.subagent.started') {
const eventData = normalizedEvent.data as Record<string, unknown> | undefined
const toolCallId = eventData?.tool_call_id as string | undefined
if (toolCallId) {
@@ -138,7 +138,7 @@ export async function runStreamLoop(
continue
}
if (normalizedEvent.type === 'subagent_end') {
if (normalizedEvent.type === 'copilot.subagent.completed') {
context.subAgentParentToolCallId = undefined
continue
}

View File

@@ -74,7 +74,7 @@ export async function orchestrateSubagentStream(
}
// For direct subagent calls, events may have the subagent field set
// but no subagent_start because this IS the top-level agent.
// but no copilot.subagent.started because this IS the top-level agent.
// Skip subagent routing for events where the subagent field matches
// the current agentId - these are top-level events.
if (event.subagent === agentId && !ctx.subAgentParentToolCallId) {

View File

@@ -1,19 +1,22 @@
export type SSEEventType =
| 'chat_id'
| 'title_updated'
| 'content'
| 'reasoning'
| 'tool_call'
| 'tool_generating'
| 'tool_result'
| 'tool_error'
| 'subagent_start'
| 'subagent_end'
| 'structured_result'
| 'subagent_result'
| 'done'
| 'error'
| 'start'
| 'stream_end'
| 'copilot.phase.started'
| 'copilot.phase.progress'
| 'copilot.phase.completed'
| 'copilot.tool.call'
| 'copilot.tool.result'
| 'copilot.tool.interrupt_required'
| 'copilot.tool.interrupt_resolved'
| 'copilot.workflow.patch'
| 'copilot.workflow.verify'
| 'copilot.subagent.started'
| 'copilot.subagent.completed'
| 'copilot.content'
| 'copilot.error'
export interface SSEEvent {
type: SSEEventType

View File

@@ -21,7 +21,7 @@ import {
} from './change-store'
import { applyWorkflowOperations } from './workflow-operations/apply'
import { applyOperationsToWorkflowState } from './workflow-operations/engine'
import { preValidateCredentialInputs } from './workflow-operations/validation'
import { preValidateCredentialInputs, validateInputsForBlock } from './workflow-operations/validation'
import { workflowVerifyServerTool } from './workflow-verify'
import { hashWorkflowState, loadWorkflowStateFromDb } from './workflow-state'
@@ -351,7 +351,8 @@ function normalizePostApply(postApply?: PostApply): NormalizedPostApply {
const evaluator = postApply?.evaluator
return {
verify: postApply?.verify !== false,
// Verification is mandatory for apply mode to keep mutation semantics deterministic.
verify: true,
run: {
enabled: run?.enabled === true,
mode: run?.mode || 'full',
@@ -697,6 +698,10 @@ async function compileChangeSpec(params: {
}
userId: string
workflowId: string
schemaContext: {
contextPackProvided: boolean
loadedSchemaTypes: Set<string>
}
}): Promise<{
operations: Array<Record<string, any>>
warnings: string[]
@@ -704,7 +709,7 @@ async function compileChangeSpec(params: {
touchedBlocks: string[]
resolvedIds: Record<string, string>
}> {
const { changeSpec, workflowState, userId, workflowId } = params
const { changeSpec, workflowState, userId, workflowId, schemaContext } = params
const operations: Array<Record<string, any>> = []
const diagnostics: string[] = []
const warnings: string[] = []
@@ -717,6 +722,54 @@ async function compileChangeSpec(params: {
const connectionTouchedSources = new Set<string>()
const plannedBlockTypes = new Map<string, string>()
const isSchemaLoaded = (blockType: string | null): boolean =>
Boolean(blockType && schemaContext.loadedSchemaTypes.has(blockType))
const requireSchema = (
targetId: string,
blockType: string | null,
operationName: 'patch_block' | 'ensure_block'
): boolean => {
if (!blockType) {
diagnostics.push(`${operationName} on ${targetId} failed: unknown block type`)
return false
}
if (isSchemaLoaded(blockType)) {
return true
}
if (schemaContext.contextPackProvided) {
diagnostics.push(
`${operationName} on ${targetId} requires schema for block type "${blockType}". ` +
`Call workflow_context_expand with blockTypes:["${blockType}"] and retry dry_run.`
)
return false
}
diagnostics.push(
`${operationName} on ${targetId} requires schema-loaded context. ` +
`Call workflow_context_get, then workflow_context_expand for "${blockType}", ` +
`then retry dry_run with contextPackId.`
)
return false
}
const normalizeInputsWithSchema = (
targetId: string,
blockType: string,
inputs: Record<string, any>,
operationName: 'patch_block' | 'ensure_block'
): Record<string, any> => {
if (!requireSchema(targetId, blockType, operationName)) {
return {}
}
const validation = validateInputsForBlock(blockType, inputs, targetId)
for (const validationError of validation.errors) {
diagnostics.push(
`${operationName} on ${targetId} failed input validation: ${validationError.error}`
)
}
return validation.validInputs
}
const recordResolved = (token: string | undefined, blockId: string | null | undefined): void => {
if (!token || !blockId) return
resolvedIds[token] = blockId
@@ -800,6 +853,9 @@ async function compileChangeSpec(params: {
diagnostics.push(`attach_credential on ${targetId} failed: unknown block type`)
return
}
if (!requireSchema(targetId, blockType, 'patch_block')) {
return
}
const credentialFieldId = selectCredentialFieldId(blockType, provider)
if (!credentialFieldId) {
const msg = `No oauth input field found for block type "${blockType}" on ${targetId}`
@@ -838,6 +894,30 @@ async function compileChangeSpec(params: {
diagnostics.push(`${change.op} on ${targetId} has invalid input path "${change.path}"`)
return
}
if (!blockType) {
diagnostics.push(`patch_block on ${targetId} failed: unknown block type`)
return
}
if (!requireSchema(targetId, blockType, 'patch_block')) {
return
}
const blockConfig = getBlock(blockType)
if (!blockConfig) {
diagnostics.push(`patch_block on ${targetId} failed: unknown block type "${blockType}"`)
return
}
const knownInputIds = new Set((blockConfig.subBlocks || []).map((subBlock) => subBlock.id))
const allowsDynamicInputs = blockType === 'loop' || blockType === 'parallel'
if (!allowsDynamicInputs && !knownInputIds.has(inputKey)) {
const knownFields = [...knownInputIds].sort()
const preview = knownFields.slice(0, 12).join(', ')
const suffix = knownFields.length > 12 ? ', ...' : ''
diagnostics.push(
`Unknown input field "${inputKey}" for block type "${blockType}" on ${targetId} ` +
`at path "${change.path}".${preview ? ` Known fields: ${preview}${suffix}` : ''}`
)
return
}
const currentInputValue =
paramsOut.inputs?.[inputKey] ??
@@ -931,7 +1011,24 @@ async function compileChangeSpec(params: {
const editParams: Record<string, any> = {}
if (mutation.name) editParams.name = mutation.name
if (mutation.type) editParams.type = mutation.type
if (mutation.inputs) editParams.inputs = mutation.inputs
if (mutation.inputs) {
const targetBlockType =
String(
mutation.type ||
workingState.blocks[targetId]?.type ||
plannedBlockTypes.get(targetId) ||
''
) || ''
const validatedInputs = normalizeInputsWithSchema(
targetId,
targetBlockType,
mutation.inputs,
'ensure_block'
)
if (Object.keys(validatedInputs).length > 0) {
editParams.inputs = validatedInputs
}
}
if (mutation.triggerMode !== undefined) editParams.triggerMode = mutation.triggerMode
if (mutation.advancedMode !== undefined) editParams.advancedMode = mutation.advancedMode
if (mutation.enabled !== undefined) editParams.enabled = mutation.enabled
@@ -955,7 +1052,19 @@ async function compileChangeSpec(params: {
type: mutation.type,
name: mutation.name,
}
if (mutation.inputs) addParams.inputs = mutation.inputs
let normalizedInputs: Record<string, any> | undefined
if (mutation.inputs) {
const validatedInputs = normalizeInputsWithSchema(
targetId,
mutation.type,
mutation.inputs,
'ensure_block'
)
if (Object.keys(validatedInputs).length > 0) {
normalizedInputs = validatedInputs
addParams.inputs = validatedInputs
}
}
if (mutation.triggerMode !== undefined) addParams.triggerMode = mutation.triggerMode
if (mutation.advancedMode !== undefined) addParams.advancedMode = mutation.advancedMode
if (mutation.enabled !== undefined) addParams.enabled = mutation.enabled
@@ -969,7 +1078,7 @@ async function compileChangeSpec(params: {
type: mutation.type,
name: mutation.name,
subBlocks: Object.fromEntries(
Object.entries(mutation.inputs || {}).map(([key, value]) => [
Object.entries(normalizedInputs || {}).map(([key, value]) => [
key,
{ id: key, value, type: 'short-input' },
])
@@ -996,7 +1105,10 @@ async function compileChangeSpec(params: {
if (mutation.action === 'patch_block') {
const targetId = resolveTarget(mutation.target)
if (!targetId) {
diagnostics.push('patch_block target could not be resolved')
diagnostics.push(
'patch_block target could not be resolved. Use target.alias or target.match, ' +
'or refresh workflow_context_get after prior apply before retrying.'
)
continue
}
const blockType =
@@ -1007,7 +1119,7 @@ async function compileChangeSpec(params: {
applyPatchChange(targetId, blockType, change, editParams)
}
if (Object.keys(editParams).length === 0) {
warnings.push(`patch_block for ${targetId} had no effective changes`)
diagnostics.push(`patch_block for ${targetId} had no effective changes`)
continue
}
operations.push({
@@ -1022,7 +1134,10 @@ async function compileChangeSpec(params: {
if (mutation.action === 'remove_block') {
const targetId = resolveTarget(mutation.target)
if (!targetId) {
diagnostics.push('remove_block target could not be resolved')
diagnostics.push(
'remove_block target could not be resolved. Use target.alias or target.match, ' +
'or refresh workflow_context_get after prior apply before retrying.'
)
continue
}
operations.push({
@@ -1046,7 +1161,10 @@ async function compileChangeSpec(params: {
const from = resolveTarget(mutation.from)
const to = resolveTarget(mutation.to)
if (!from || !to) {
diagnostics.push(`${mutation.action} requires resolvable from/to targets`)
diagnostics.push(
`${mutation.action} requires resolvable from/to targets. Prefer alias/match selectors, ` +
'or refresh workflow_context_get after prior apply before retrying.'
)
continue
}
const sourceHandle = normalizeHandle(mutation.handle)
@@ -1088,7 +1206,10 @@ async function compileChangeSpec(params: {
true
)
if (!from || !to) {
diagnostics.push('link contains unresolved from/to target')
diagnostics.push(
'link contains unresolved from/to target. Prefer alias/match selectors, ' +
'or refresh workflow_context_get after prior apply before retrying.'
)
continue
}
@@ -1192,7 +1313,7 @@ async function validateAndSimulateOperations(params: {
params.workflowState
)
for (const error of preValidationErrors) {
warnings.push(error.error)
diagnostics.push(error.error)
}
const { state, validationErrors, skippedItems } = applyOperationsToWorkflowState(
@@ -1202,15 +1323,20 @@ async function validateAndSimulateOperations(params: {
)
for (const validationError of validationErrors) {
warnings.push(validationError.error)
diagnostics.push(validationError.error)
}
for (const skippedItem of skippedItems) {
warnings.push(skippedItem.reason)
diagnostics.push(skippedItem.reason)
}
if (Object.keys(state.blocks || {}).length === 0) {
diagnostics.push('Simulation produced an empty workflow state')
}
const beforeHash = hashWorkflowState(params.workflowState as unknown as Record<string, unknown>)
const afterHash = hashWorkflowState(state as unknown as Record<string, unknown>)
if (beforeHash === afterHash) {
diagnostics.push('Simulation produced no effective workflow changes')
}
return {
operationsForApply: filteredOperations as Array<Record<string, any>>,
@@ -1263,6 +1389,10 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
workflowState,
userId: context.userId,
workflowId,
schemaContext: {
contextPackProvided: Boolean(contextPack),
loadedSchemaTypes: new Set(Object.keys(contextPack?.schemasByType || {})),
},
})
const simulation = await validateAndSimulateOperations({
@@ -1345,6 +1475,15 @@ export const workflowChangeServerTool: BaseServerTool<WorkflowChangeParams, any>
if (!proposal) {
throw new Error(`Proposal not found or expired: ${proposalId}`)
}
if (Array.isArray(proposal.diagnostics) && proposal.diagnostics.length > 0) {
throw new Error(
`proposal_invalid: proposal contains diagnostics (${proposal.diagnostics.length}). ` +
`Fix dry_run diagnostics before apply.`
)
}
if (!Array.isArray(proposal.compiledOperations) || proposal.compiledOperations.length === 0) {
throw new Error('proposal_invalid: proposal contains no operations to apply')
}
const authorization = await authorizeWorkflowByWorkspacePermission({
workflowId: proposal.workflowId,

View File

@@ -1962,7 +1962,7 @@ export const useCopilotStore = create<CopilotStore>()(
}
if (!context.wasAborted && sseHandlers.stream_end) {
sseHandlers.stream_end({ type: 'done' }, context, get, set)
sseHandlers.stream_end({ type: 'copilot.phase.completed' }, context, get, set)
}
stopStreamingUpdates()