diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts index 1887c540d..10b1b4733 100644 --- a/apps/sim/lib/copilot/client-sse/handlers.ts +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -273,7 +273,7 @@ export const sseHandlers: Record = { chatListTitle: updatedState.chats.find((c) => c.id === targetChatId)?.title || null, }) }, - tool_result: (data, context, get, set) => { + 'copilot.tool.result': (data, context, get, set) => { try { const eventData = asRecord(data?.data) const toolCallId: string | undefined = @@ -428,123 +428,7 @@ export const sseHandlers: Record = { }) } }, - tool_error: (data, context, get, set) => { - try { - const errorData = asRecord(data?.data) - const toolCallId: string | undefined = - data?.toolCallId || - (errorData.id as string | undefined) || - (errorData.callId as string | undefined) - const failedDependency: boolean = data?.failedDependency === true - if (!toolCallId) return - const { toolCallsById } = get() - const current = toolCallsById[toolCallId] - if (current) { - if ( - isRejectedState(current.state) || - isReviewState(current.state) || - isBackgroundState(current.state) - ) { - return - } - const targetState = errorData.state - ? mapServerStateToClientState(errorData.state) - : failedDependency - ? ClientToolCallState.rejected - : ClientToolCallState.error - const uiMetadata = extractToolUiMetadata(errorData) - const executionMetadata = extractToolExecutionMetadata(errorData) - const updatedMap = { ...toolCallsById } - updatedMap[toolCallId] = { - ...current, - ui: uiMetadata || current.ui, - execution: executionMetadata || current.execution, - state: targetState, - display: resolveDisplayFromServerUi( - current.name, - targetState, - current.id, - current.params, - uiMetadata || current.ui - ), - } - set({ toolCallsById: updatedMap }) - } - for (let i = 0; i < context.contentBlocks.length; i++) { - const b = context.contentBlocks[i] - if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { - if ( - isRejectedState(b.toolCall?.state) || - isReviewState(b.toolCall?.state) || - isBackgroundState(b.toolCall?.state) - ) - break - const targetState = errorData.state - ? mapServerStateToClientState(errorData.state) - : failedDependency - ? ClientToolCallState.rejected - : ClientToolCallState.error - const uiMetadata = extractToolUiMetadata(errorData) - const executionMetadata = extractToolExecutionMetadata(errorData) - context.contentBlocks[i] = { - ...b, - toolCall: { - ...b.toolCall, - ui: uiMetadata || b.toolCall?.ui, - execution: executionMetadata || b.toolCall?.execution, - state: targetState, - display: resolveDisplayFromServerUi( - b.toolCall?.name, - targetState, - toolCallId, - b.toolCall?.params, - uiMetadata || b.toolCall?.ui - ), - }, - } - break - } - } - updateStreamingMessage(set, context) - } catch (error) { - logger.warn('Failed to process tool_error SSE event', { - error: error instanceof Error ? error.message : String(error), - }) - } - }, - tool_generating: (data, context, get, set) => { - const eventData = asRecord(data?.data) - const toolCallId = - data?.toolCallId || - (eventData.id as string | undefined) || - (eventData.callId as string | undefined) - const toolName = - data?.toolName || - (eventData.name as string | undefined) || - (eventData.toolName as string | undefined) - if (!toolCallId || !toolName) return - const { toolCallsById } = get() - - if (!toolCallsById[toolCallId]) { - const initialState = ClientToolCallState.generating - const uiMetadata = extractToolUiMetadata(eventData) - const tc: CopilotToolCall = { - id: toolCallId, - name: toolName, - state: initialState, - ui: uiMetadata, - execution: extractToolExecutionMetadata(eventData), - display: resolveDisplayFromServerUi(toolName, initialState, toolCallId, undefined, uiMetadata), - } - const updated = { ...toolCallsById, [toolCallId]: tc } - set({ toolCallsById: updated }) - logger.info('[toolCallsById] map updated', updated) - - upsertToolCallBlock(context, tc) - updateStreamingMessage(set, context) - } - }, - tool_call: (data, context, get, set) => { + 'copilot.tool.call': (data, context, get, set) => { const toolData = asRecord(data?.data) const id: string | undefined = (toolData.id as string | undefined) || @@ -647,7 +531,7 @@ export const sseHandlers: Record = { return }, - reasoning: (data, context, _get, set) => { + 'copilot.phase.progress': (data, context, _get, set) => { const phase = (data && (data.phase || data?.data?.phase)) as string | undefined if (phase === 'start') { beginThinkingBlock(context) @@ -664,7 +548,7 @@ export const sseHandlers: Record = { appendThinkingContent(context, chunk) updateStreamingMessage(set, context) }, - content: (data, context, get, set) => { + 'copilot.content': (data, context, get, set) => { if (!data.data) return context.pendingContent += data.data @@ -879,7 +763,7 @@ export const sseHandlers: Record = { updateStreamingMessage(set, context) } }, - done: (_data, context) => { + 'copilot.phase.completed': (_data, context) => { logger.info('[SSE] DONE EVENT RECEIVED', { doneEventCount: context.doneEventCount, data: _data, @@ -890,7 +774,7 @@ export const sseHandlers: Record = { context.streamComplete = true } }, - error: (data, context, _get, set) => { + 'copilot.error': (data, context, _get, set) => { logger.error('Stream error:', data.error) set((state: CopilotStore) => ({ messages: state.messages.map((msg) => @@ -905,6 +789,7 @@ export const sseHandlers: Record = { })) context.streamComplete = true }, + 'copilot.phase.started': () => {}, stream_end: (_data, context, _get, set) => { if (context.pendingContent) { if (context.isInThinkingBlock && context.currentThinkingBlock) { @@ -919,3 +804,8 @@ export const sseHandlers: Record = { }, default: () => {}, } + +sseHandlers['copilot.tool.interrupt_required'] = sseHandlers['copilot.tool.call'] +sseHandlers['copilot.workflow.patch'] = sseHandlers['copilot.tool.result'] +sseHandlers['copilot.workflow.verify'] = sseHandlers['copilot.tool.result'] +sseHandlers['copilot.tool.interrupt_resolved'] = sseHandlers['copilot.tool.result'] diff --git a/apps/sim/lib/copilot/client-sse/subagent-handlers.test.ts b/apps/sim/lib/copilot/client-sse/subagent-handlers.test.ts new file mode 100644 index 000000000..16aaa2a5a --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/subagent-handlers.test.ts @@ -0,0 +1,172 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it, vi } from 'vitest' +import { applySseEvent } from '@/lib/copilot/client-sse/subagent-handlers' +import type { ClientStreamingContext } from '@/lib/copilot/client-sse/types' +import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' +import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types' + +type StoreSet = ( + partial: Partial | ((state: CopilotStore) => Partial) +) => void + +function createTestStore(initialToolCalls: Record) { + const state: Partial = { + messages: [{ id: 'assistant-msg', role: 'assistant', content: '', timestamp: new Date().toISOString() }], + toolCallsById: { ...initialToolCalls }, + currentChat: null, + chats: [], + activeStream: null, + updatePlanTodoStatus: vi.fn(), + handleNewChatCreation: vi.fn().mockResolvedValue(undefined), + } + + const get = () => state as CopilotStore + const set: StoreSet = (partial) => { + const patch = typeof partial === 'function' ? partial(get()) : partial + Object.assign(state, patch) + } + + return { get, set } +} + +function createStreamingContext(): ClientStreamingContext { + return { + messageId: 'assistant-msg', + accumulatedContent: '', + contentBlocks: [], + currentTextBlock: null, + isInThinkingBlock: false, + currentThinkingBlock: null, + isInDesignWorkflowBlock: false, + designWorkflowContent: '', + pendingContent: '', + doneEventCount: 0, + streamComplete: false, + subAgentContent: {}, + subAgentToolCalls: {}, + subAgentBlocks: {}, + suppressStreamingUpdates: true, + } +} + +describe('client SSE copilot.* stream smoke', () => { + it('processes main tool call/result events with copilot.* keys', async () => { + const { get, set } = createTestStore({}) + const context = createStreamingContext() + + await applySseEvent( + { + type: 'copilot.tool.call', + data: { id: 'main-tool-1', name: 'get_user_workflow', state: 'executing', arguments: {} }, + } as any, + context, + get, + set + ) + + await applySseEvent( + { + type: 'copilot.tool.result', + toolCallId: 'main-tool-1', + success: true, + result: { ok: true }, + data: { + id: 'main-tool-1', + name: 'get_user_workflow', + phase: 'completed', + state: 'success', + success: true, + result: { ok: true }, + }, + } as any, + context, + get, + set + ) + + expect(get().toolCallsById['main-tool-1']).toBeDefined() + expect(get().toolCallsById['main-tool-1'].state).toBe(ClientToolCallState.success) + expect( + context.contentBlocks.some( + (block) => block.type === 'tool_call' && block.toolCall?.id === 'main-tool-1' + ) + ).toBe(true) + }) + + it('processes subagent start/tool/result/end with copilot.* keys', async () => { + const parentToolCallId = 'parent-edit-tool' + const { get, set } = createTestStore({ + [parentToolCallId]: { + id: parentToolCallId, + name: 'edit', + state: ClientToolCallState.executing, + }, + }) + const context = createStreamingContext() + + await applySseEvent( + { + type: 'copilot.subagent.started', + subagent: 'edit', + data: { tool_call_id: parentToolCallId }, + } as any, + context, + get, + set + ) + + await applySseEvent( + { + type: 'copilot.tool.call', + subagent: 'edit', + data: { + id: 'sub-tool-1', + name: 'workflow_context_get', + state: 'executing', + arguments: { includeSchemas: false }, + }, + } as any, + context, + get, + set + ) + + await applySseEvent( + { + type: 'copilot.tool.result', + subagent: 'edit', + data: { + id: 'sub-tool-1', + name: 'workflow_context_get', + phase: 'completed', + state: 'success', + success: true, + result: { contextPackId: 'pack-1' }, + }, + } as any, + context, + get, + set + ) + + await applySseEvent( + { + type: 'copilot.subagent.completed', + subagent: 'edit', + data: {}, + } as any, + context, + get, + set + ) + + const parentToolCall = get().toolCallsById[parentToolCallId] + expect(parentToolCall).toBeDefined() + expect(parentToolCall.subAgentStreaming).toBe(false) + expect(parentToolCall.subAgentToolCalls?.length).toBe(1) + expect(parentToolCall.subAgentToolCalls?.[0]?.id).toBe('sub-tool-1') + expect(parentToolCall.subAgentToolCalls?.[0]?.state).toBe(ClientToolCallState.success) + }) +}) diff --git a/apps/sim/lib/copilot/client-sse/subagent-handlers.ts b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts index 9b494ac3e..57f542b5f 100644 --- a/apps/sim/lib/copilot/client-sse/subagent-handlers.ts +++ b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts @@ -124,11 +124,11 @@ export function updateToolCallWithSubAgentData( } export const subAgentSSEHandlers: Record = { - start: () => { - // Subagent start event - no action needed, parent is already tracked from subagent_start + 'copilot.phase.started': () => { + // No-op: parent subagent association is handled by copilot.subagent.started. }, - content: (data, context, get, set) => { + 'copilot.content': (data, context, get, set) => { const parentToolCallId = context.subAgentParentToolCallId const contentStr = typeof data.data === 'string' ? data.data : data.content || '' logger.info('[SubAgent] content event', { @@ -149,7 +149,7 @@ export const subAgentSSEHandlers: Record = { updateToolCallWithSubAgentData(context, get, set, parentToolCallId) }, - reasoning: (data, context, get, set) => { + 'copilot.phase.progress': (data, context, get, set) => { const parentToolCallId = context.subAgentParentToolCallId const dataObj = asRecord(data?.data) const phase = data?.phase || (dataObj.phase as string | undefined) @@ -165,11 +165,7 @@ export const subAgentSSEHandlers: Record = { updateToolCallWithSubAgentData(context, get, set, parentToolCallId) }, - tool_generating: () => { - // Tool generating event - no action needed, we'll handle the actual tool_call - }, - - tool_call: async (data, context, get, set) => { + 'copilot.tool.call': async (data, context, get, set) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return @@ -269,7 +265,7 @@ export const subAgentSSEHandlers: Record = { } }, - tool_result: (data, context, get, set) => { + 'copilot.tool.result': (data, context, get, set) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return @@ -368,7 +364,7 @@ export const subAgentSSEHandlers: Record = { updateToolCallWithSubAgentData(context, get, set, parentToolCallId) }, - done: (_data, context, get, set) => { + 'copilot.phase.completed': (_data, context, get, set) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return @@ -376,6 +372,11 @@ export const subAgentSSEHandlers: Record = { }, } +subAgentSSEHandlers['copilot.tool.interrupt_required'] = subAgentSSEHandlers['copilot.tool.call'] +subAgentSSEHandlers['copilot.workflow.patch'] = subAgentSSEHandlers['copilot.tool.result'] +subAgentSSEHandlers['copilot.workflow.verify'] = subAgentSSEHandlers['copilot.tool.result'] +subAgentSSEHandlers['copilot.tool.interrupt_resolved'] = subAgentSSEHandlers['copilot.tool.result'] + export async function applySseEvent( rawData: SSEEvent, context: ClientStreamingContext, @@ -388,7 +389,7 @@ export async function applySseEvent( } const data = normalizedEvent - if (data.type === 'subagent_start') { + if (data.type === 'copilot.subagent.started') { const startData = asRecord(data.data) const toolCallId = startData.tool_call_id as string | undefined if (toolCallId) { @@ -411,7 +412,7 @@ export async function applySseEvent( return true } - if (data.type === 'subagent_end') { + if (data.type === 'copilot.subagent.completed') { const parentToolCallId = context.subAgentParentToolCallId if (parentToolCallId) { const { toolCallsById } = get() diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts index fcc5abf43..b073aab8e 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts @@ -54,14 +54,14 @@ describe('sse-handlers tool lifecycle', () => { } }) - it('executes tool_call and emits tool_result + mark-complete', async () => { + it('executes copilot.tool.call and emits copilot.tool.result + mark-complete', async () => { executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } }) markToolComplete.mockResolvedValueOnce(true) const onEvent = vi.fn() - await sseHandlers.tool_call( + await sseHandlers['copilot.tool.call']( { - type: 'tool_call', + type: 'copilot.tool.call', data: { id: 'tool-1', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } }, } as any, context, @@ -73,7 +73,7 @@ describe('sse-handlers tool lifecycle', () => { expect(markToolComplete).toHaveBeenCalledTimes(1) expect(onEvent).toHaveBeenCalledWith( expect.objectContaining({ - type: 'tool_result', + type: 'copilot.tool.result', toolCallId: 'tool-1', success: true, }) @@ -84,17 +84,17 @@ describe('sse-handlers tool lifecycle', () => { expect(updated?.result?.output).toEqual({ ok: true }) }) - it('skips duplicate tool_call after result', async () => { + it('skips duplicate copilot.tool.call after result', async () => { executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } }) markToolComplete.mockResolvedValueOnce(true) const event = { - type: 'tool_call', + type: 'copilot.tool.call', data: { id: 'tool-dup', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } }, } - await sseHandlers.tool_call(event as any, context, execContext, { interactive: false }) - await sseHandlers.tool_call(event as any, context, execContext, { interactive: false }) + await sseHandlers['copilot.tool.call'](event as any, context, execContext, { interactive: false }) + await sseHandlers['copilot.tool.call'](event as any, context, execContext, { interactive: false }) expect(executeToolServerSide).toHaveBeenCalledTimes(1) expect(markToolComplete).toHaveBeenCalledTimes(1) diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts index 5ff335e50..0ed38942d 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts @@ -266,7 +266,7 @@ export const sseHandlers: Record = { context.chatId = asRecord(event.data).chatId as string | undefined }, title_updated: () => {}, - tool_result: (event, context) => { + 'copilot.tool.result': (event, context) => { const data = getEventData(event) const toolCallId = event.toolCallId || (data?.id as string | undefined) if (!toolCallId) return @@ -292,35 +292,7 @@ export const sseHandlers: Record = { current.error = (data?.error || resultObj.error) as string | undefined } }, - tool_error: (event, context) => { - const data = getEventData(event) - const toolCallId = event.toolCallId || (data?.id as string | undefined) - if (!toolCallId) return - const current = context.toolCalls.get(toolCallId) - if (!current) return - current.status = data?.state ? mapServerStateToToolStatus(data.state) : 'error' - current.error = (data?.error as string | undefined) || 'Tool execution failed' - current.endTime = Date.now() - }, - tool_generating: (event, context) => { - const data = getEventData(event) - const toolCallId = - event.toolCallId || - (data?.toolCallId as string | undefined) || - (data?.id as string | undefined) - const toolName = - event.toolName || (data?.toolName as string | undefined) || (data?.name as string | undefined) - if (!toolCallId || !toolName) return - if (!context.toolCalls.has(toolCallId)) { - context.toolCalls.set(toolCallId, { - id: toolCallId, - name: toolName, - status: data?.state ? mapServerStateToToolStatus(data.state) : 'pending', - startTime: Date.now(), - }) - } - }, - tool_call: async (event, context, execContext, options) => { + 'copilot.tool.call': async (event, context, execContext, options) => { const toolData = getEventData(event) || ({} as Record) const toolCallId = (toolData.id as string | undefined) || event.toolCallId const toolName = (toolData.name as string | undefined) || event.toolName @@ -374,7 +346,7 @@ export const sseHandlers: Record = { 'run tool' ) }, - reasoning: (event, context) => { + 'copilot.phase.progress': (event, context) => { const d = asRecord(event.data) const phase = d.phase || asRecord(d.data).phase if (phase === 'start') { @@ -398,7 +370,7 @@ export const sseHandlers: Record = { if (!chunk || !context.currentThinkingBlock) return context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}` }, - content: (event, context) => { + 'copilot.content': (event, context) => { // Go backend sends content as a plain string in event.data, not wrapped in an object. let chunk: string | undefined if (typeof event.data === 'string') { @@ -411,20 +383,20 @@ export const sseHandlers: Record = { context.accumulatedContent += chunk addContentBlock(context, { type: 'text', content: chunk }) }, - done: (event, context) => { + 'copilot.phase.completed': (event, context) => { const d = asRecord(event.data) if (d.responseId) { context.conversationId = d.responseId as string } context.streamComplete = true }, - start: (event, context) => { + 'copilot.phase.started': (event, context) => { const d = asRecord(event.data) if (d.responseId) { context.conversationId = d.responseId as string } }, - error: (event, context) => { + 'copilot.error': (event, context) => { const d = asRecord(event.data) const message = (d.message || d.error || event.error) as string | undefined if (message) { @@ -435,7 +407,7 @@ export const sseHandlers: Record = { } export const subAgentHandlers: Record = { - content: (event, context) => { + 'copilot.content': (event, context) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId || !event.data) return // Go backend sends content as a plain string in event.data @@ -451,7 +423,7 @@ export const subAgentHandlers: Record = { (context.subAgentContent[parentToolCallId] || '') + chunk addContentBlock(context, { type: 'subagent_text', content: chunk }) }, - tool_call: async (event, context, execContext, options) => { + 'copilot.tool.call': async (event, context, execContext, options) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return const toolData = getEventData(event) || ({} as Record) @@ -500,7 +472,7 @@ export const subAgentHandlers: Record = { 'subagent run tool' ) }, - tool_result: (event, context) => { + 'copilot.tool.result': (event, context) => { const parentToolCallId = context.subAgentParentToolCallId if (!parentToolCallId) return const data = getEventData(event) @@ -540,8 +512,22 @@ export const subAgentHandlers: Record = { } } }, + 'copilot.phase.progress': () => { + // Subagent reasoning chunks are surfaced via copilot.content. + }, + 'copilot.phase.completed': () => {}, } +sseHandlers['copilot.tool.interrupt_required'] = sseHandlers['copilot.tool.call'] +sseHandlers['copilot.workflow.patch'] = sseHandlers['copilot.tool.result'] +sseHandlers['copilot.workflow.verify'] = sseHandlers['copilot.tool.result'] +sseHandlers['copilot.tool.interrupt_resolved'] = sseHandlers['copilot.tool.result'] + +subAgentHandlers['copilot.tool.interrupt_required'] = subAgentHandlers['copilot.tool.call'] +subAgentHandlers['copilot.workflow.patch'] = subAgentHandlers['copilot.tool.result'] +subAgentHandlers['copilot.workflow.verify'] = subAgentHandlers['copilot.tool.result'] +subAgentHandlers['copilot.tool.interrupt_resolved'] = subAgentHandlers['copilot.tool.result'] + export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean { if (!event.subagent) return false if (!context.subAgentParentToolCallId) { diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers/tool-execution.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers/tool-execution.ts index 2582cc275..b7fcca185 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers/tool-execution.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers/tool-execution.ts @@ -80,7 +80,7 @@ export async function executeToolAndReport( }) const resultEvent: SSEEvent = { - type: 'tool_result', + type: 'copilot.tool.result', toolCallId: toolCall.id, toolName: toolCall.name, success: result.success, @@ -88,6 +88,8 @@ export async function executeToolAndReport( data: { id: toolCall.id, name: toolCall.name, + phase: 'completed', + state: result.success ? 'success' : 'error', success: result.success, result: result.output, }, @@ -110,11 +112,16 @@ export async function executeToolAndReport( }) const errorEvent: SSEEvent = { - type: 'tool_error', + type: 'copilot.tool.result', toolCallId: toolCall.id, + toolName: toolCall.name, + success: false, data: { id: toolCall.id, name: toolCall.name, + phase: 'completed', + state: 'error', + success: false, error: toolCall.error, }, } diff --git a/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts b/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts index 5a6796b2e..37a058fec 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts @@ -11,7 +11,7 @@ import { describe('sse-utils', () => { it.concurrent('normalizes tool fields from string data', () => { const event = { - type: 'tool_result', + type: 'copilot.tool.result', data: JSON.stringify({ id: 'tool_1', name: 'workflow_change', @@ -22,21 +22,62 @@ describe('sse-utils', () => { const normalized = normalizeSseEvent(event as any) + expect(normalized.type).toBe('copilot.tool.result') expect(normalized.toolCallId).toBe('tool_1') expect(normalized.toolName).toBe('workflow_change') expect(normalized.success).toBe(true) expect(normalized.result).toEqual({ ok: true }) }) - it.concurrent('dedupes tool_call events', () => { - const event = { type: 'tool_call', data: { id: 'tool_call_1', name: 'plan' } } + it.concurrent('maps copilot tool event aliases and preserves tool metadata', () => { + const event = { + type: 'copilot.tool.interrupt_required', + data: { + id: 'tool_legacy_1', + name: 'workflow_run', + state: 'pending', + ui: { showInterrupt: true }, + }, + } + + const normalized = normalizeSseEvent(event as any) + + expect(normalized.type).toBe('copilot.tool.interrupt_required') + expect(normalized.toolCallId).toBe('tool_legacy_1') + expect(normalized.toolName).toBe('workflow_run') + }) + + it.concurrent('keeps copilot content event type when payload is plain string', () => { + const event = { + type: 'copilot.content', + data: 'hello world', + } + + const normalized = normalizeSseEvent(event as any) + + expect(normalized.type).toBe('copilot.content') + expect(normalized.data).toBe('hello world') + }) + + it.concurrent('dedupes copilot tool call events', () => { + const event = { type: 'copilot.tool.call', data: { id: 'tool_call_1', name: 'plan' } } expect(shouldSkipToolCallEvent(event as any)).toBe(false) expect(shouldSkipToolCallEvent(event as any)).toBe(true) }) - it.concurrent('dedupes tool_result events', () => { - const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } } + it.concurrent('dedupes copilot tool result events', () => { + const event = { type: 'copilot.tool.result', data: { id: 'tool_result_1', name: 'plan' } } expect(shouldSkipToolResultEvent(event as any)).toBe(false) expect(shouldSkipToolResultEvent(event as any)).toBe(true) }) + + it.concurrent('dedupes copilot workflow patch result events', () => { + const normalized = normalizeSseEvent({ + type: 'copilot.workflow.patch', + data: { id: 'tool_result_aliased_1', name: 'workflow_change' }, + } as any) + + expect(shouldSkipToolResultEvent(normalized as any)).toBe(false) + expect(shouldSkipToolResultEvent(normalized as any)).toBe(true) + }) }) diff --git a/apps/sim/lib/copilot/orchestrator/sse-utils.ts b/apps/sim/lib/copilot/orchestrator/sse-utils.ts index afcbf2111..1769af67a 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-utils.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-utils.ts @@ -101,8 +101,21 @@ export function wasToolResultSeen(toolCallId: string): boolean { return seenToolResults.has(toolCallId) } +function isToolCallEventType(type: string): boolean { + return type === 'copilot.tool.call' || type === 'copilot.tool.interrupt_required' +} + +function isToolResultEventType(type: string): boolean { + return ( + type === 'copilot.tool.result' || + type === 'copilot.workflow.patch' || + type === 'copilot.workflow.verify' || + type === 'copilot.tool.interrupt_resolved' + ) +} + export function shouldSkipToolCallEvent(event: SSEEvent): boolean { - if (event.type !== 'tool_call') return false + if (!isToolCallEventType(String(event.type || ''))) return false const toolCallId = getToolCallIdFromEvent(event) if (!toolCallId) return false const eventData = getEventData(event) @@ -115,7 +128,7 @@ export function shouldSkipToolCallEvent(event: SSEEvent): boolean { } export function shouldSkipToolResultEvent(event: SSEEvent): boolean { - if (event.type !== 'tool_result') return false + if (!isToolResultEventType(String(event.type || ''))) return false const toolCallId = getToolCallIdFromEvent(event) if (!toolCallId) return false if (wasToolResultSeen(toolCallId)) return true diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts index 94458e452..cd7d36d08 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts @@ -97,8 +97,8 @@ describe('stream-buffer', () => { }) it.concurrent('replays events after a given event id', async () => { - await appendStreamEvent('stream-1', { type: 'content', data: 'hello' }) - await appendStreamEvent('stream-1', { type: 'content', data: 'world' }) + await appendStreamEvent('stream-1', { type: 'copilot.content', data: 'hello' }) + await appendStreamEvent('stream-1', { type: 'copilot.content', data: 'world' }) const allEvents = await readStreamEvents('stream-1', 0) expect(allEvents.map((entry) => entry.event.data)).toEqual(['hello', 'world']) @@ -109,8 +109,8 @@ describe('stream-buffer', () => { it.concurrent('flushes buffered events for resume', async () => { const writer = createStreamEventWriter('stream-2') - await writer.write({ type: 'content', data: 'a' }) - await writer.write({ type: 'content', data: 'b' }) + await writer.write({ type: 'copilot.content', data: 'a' }) + await writer.write({ type: 'copilot.content', data: 'b' }) await writer.flush() const events = await readStreamEvents('stream-2', 0) diff --git a/apps/sim/lib/copilot/orchestrator/stream-core.ts b/apps/sim/lib/copilot/orchestrator/stream-core.ts index e1dc2e2fc..b77ff47e5 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-core.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-core.ts @@ -127,7 +127,7 @@ export async function runStreamLoop( } // Standard subagent start/end handling. - if (normalizedEvent.type === 'subagent_start') { + if (normalizedEvent.type === 'copilot.subagent.started') { const eventData = normalizedEvent.data as Record | undefined const toolCallId = eventData?.tool_call_id as string | undefined if (toolCallId) { @@ -138,7 +138,7 @@ export async function runStreamLoop( continue } - if (normalizedEvent.type === 'subagent_end') { + if (normalizedEvent.type === 'copilot.subagent.completed') { context.subAgentParentToolCallId = undefined continue } diff --git a/apps/sim/lib/copilot/orchestrator/subagent.ts b/apps/sim/lib/copilot/orchestrator/subagent.ts index d997fcbb9..a0d65c4a9 100644 --- a/apps/sim/lib/copilot/orchestrator/subagent.ts +++ b/apps/sim/lib/copilot/orchestrator/subagent.ts @@ -74,7 +74,7 @@ export async function orchestrateSubagentStream( } // For direct subagent calls, events may have the subagent field set - // but no subagent_start because this IS the top-level agent. + // but no copilot.subagent.started because this IS the top-level agent. // Skip subagent routing for events where the subagent field matches // the current agentId - these are top-level events. if (event.subagent === agentId && !ctx.subAgentParentToolCallId) { diff --git a/apps/sim/lib/copilot/orchestrator/types.ts b/apps/sim/lib/copilot/orchestrator/types.ts index 3113a23b5..e107e5309 100644 --- a/apps/sim/lib/copilot/orchestrator/types.ts +++ b/apps/sim/lib/copilot/orchestrator/types.ts @@ -1,19 +1,22 @@ export type SSEEventType = | 'chat_id' | 'title_updated' - | 'content' - | 'reasoning' - | 'tool_call' - | 'tool_generating' - | 'tool_result' - | 'tool_error' - | 'subagent_start' - | 'subagent_end' | 'structured_result' | 'subagent_result' - | 'done' - | 'error' - | 'start' + | 'stream_end' + | 'copilot.phase.started' + | 'copilot.phase.progress' + | 'copilot.phase.completed' + | 'copilot.tool.call' + | 'copilot.tool.result' + | 'copilot.tool.interrupt_required' + | 'copilot.tool.interrupt_resolved' + | 'copilot.workflow.patch' + | 'copilot.workflow.verify' + | 'copilot.subagent.started' + | 'copilot.subagent.completed' + | 'copilot.content' + | 'copilot.error' export interface SSEEvent { type: SSEEventType diff --git a/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts b/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts index 6bec17732..96579e930 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/workflow-change.ts @@ -21,7 +21,7 @@ import { } from './change-store' import { applyWorkflowOperations } from './workflow-operations/apply' import { applyOperationsToWorkflowState } from './workflow-operations/engine' -import { preValidateCredentialInputs } from './workflow-operations/validation' +import { preValidateCredentialInputs, validateInputsForBlock } from './workflow-operations/validation' import { workflowVerifyServerTool } from './workflow-verify' import { hashWorkflowState, loadWorkflowStateFromDb } from './workflow-state' @@ -351,7 +351,8 @@ function normalizePostApply(postApply?: PostApply): NormalizedPostApply { const evaluator = postApply?.evaluator return { - verify: postApply?.verify !== false, + // Verification is mandatory for apply mode to keep mutation semantics deterministic. + verify: true, run: { enabled: run?.enabled === true, mode: run?.mode || 'full', @@ -697,6 +698,10 @@ async function compileChangeSpec(params: { } userId: string workflowId: string + schemaContext: { + contextPackProvided: boolean + loadedSchemaTypes: Set + } }): Promise<{ operations: Array> warnings: string[] @@ -704,7 +709,7 @@ async function compileChangeSpec(params: { touchedBlocks: string[] resolvedIds: Record }> { - const { changeSpec, workflowState, userId, workflowId } = params + const { changeSpec, workflowState, userId, workflowId, schemaContext } = params const operations: Array> = [] const diagnostics: string[] = [] const warnings: string[] = [] @@ -717,6 +722,54 @@ async function compileChangeSpec(params: { const connectionTouchedSources = new Set() const plannedBlockTypes = new Map() + const isSchemaLoaded = (blockType: string | null): boolean => + Boolean(blockType && schemaContext.loadedSchemaTypes.has(blockType)) + + const requireSchema = ( + targetId: string, + blockType: string | null, + operationName: 'patch_block' | 'ensure_block' + ): boolean => { + if (!blockType) { + diagnostics.push(`${operationName} on ${targetId} failed: unknown block type`) + return false + } + if (isSchemaLoaded(blockType)) { + return true + } + if (schemaContext.contextPackProvided) { + diagnostics.push( + `${operationName} on ${targetId} requires schema for block type "${blockType}". ` + + `Call workflow_context_expand with blockTypes:["${blockType}"] and retry dry_run.` + ) + return false + } + diagnostics.push( + `${operationName} on ${targetId} requires schema-loaded context. ` + + `Call workflow_context_get, then workflow_context_expand for "${blockType}", ` + + `then retry dry_run with contextPackId.` + ) + return false + } + + const normalizeInputsWithSchema = ( + targetId: string, + blockType: string, + inputs: Record, + operationName: 'patch_block' | 'ensure_block' + ): Record => { + if (!requireSchema(targetId, blockType, operationName)) { + return {} + } + const validation = validateInputsForBlock(blockType, inputs, targetId) + for (const validationError of validation.errors) { + diagnostics.push( + `${operationName} on ${targetId} failed input validation: ${validationError.error}` + ) + } + return validation.validInputs + } + const recordResolved = (token: string | undefined, blockId: string | null | undefined): void => { if (!token || !blockId) return resolvedIds[token] = blockId @@ -800,6 +853,9 @@ async function compileChangeSpec(params: { diagnostics.push(`attach_credential on ${targetId} failed: unknown block type`) return } + if (!requireSchema(targetId, blockType, 'patch_block')) { + return + } const credentialFieldId = selectCredentialFieldId(blockType, provider) if (!credentialFieldId) { const msg = `No oauth input field found for block type "${blockType}" on ${targetId}` @@ -838,6 +894,30 @@ async function compileChangeSpec(params: { diagnostics.push(`${change.op} on ${targetId} has invalid input path "${change.path}"`) return } + if (!blockType) { + diagnostics.push(`patch_block on ${targetId} failed: unknown block type`) + return + } + if (!requireSchema(targetId, blockType, 'patch_block')) { + return + } + const blockConfig = getBlock(blockType) + if (!blockConfig) { + diagnostics.push(`patch_block on ${targetId} failed: unknown block type "${blockType}"`) + return + } + const knownInputIds = new Set((blockConfig.subBlocks || []).map((subBlock) => subBlock.id)) + const allowsDynamicInputs = blockType === 'loop' || blockType === 'parallel' + if (!allowsDynamicInputs && !knownInputIds.has(inputKey)) { + const knownFields = [...knownInputIds].sort() + const preview = knownFields.slice(0, 12).join(', ') + const suffix = knownFields.length > 12 ? ', ...' : '' + diagnostics.push( + `Unknown input field "${inputKey}" for block type "${blockType}" on ${targetId} ` + + `at path "${change.path}".${preview ? ` Known fields: ${preview}${suffix}` : ''}` + ) + return + } const currentInputValue = paramsOut.inputs?.[inputKey] ?? @@ -931,7 +1011,24 @@ async function compileChangeSpec(params: { const editParams: Record = {} if (mutation.name) editParams.name = mutation.name if (mutation.type) editParams.type = mutation.type - if (mutation.inputs) editParams.inputs = mutation.inputs + if (mutation.inputs) { + const targetBlockType = + String( + mutation.type || + workingState.blocks[targetId]?.type || + plannedBlockTypes.get(targetId) || + '' + ) || '' + const validatedInputs = normalizeInputsWithSchema( + targetId, + targetBlockType, + mutation.inputs, + 'ensure_block' + ) + if (Object.keys(validatedInputs).length > 0) { + editParams.inputs = validatedInputs + } + } if (mutation.triggerMode !== undefined) editParams.triggerMode = mutation.triggerMode if (mutation.advancedMode !== undefined) editParams.advancedMode = mutation.advancedMode if (mutation.enabled !== undefined) editParams.enabled = mutation.enabled @@ -955,7 +1052,19 @@ async function compileChangeSpec(params: { type: mutation.type, name: mutation.name, } - if (mutation.inputs) addParams.inputs = mutation.inputs + let normalizedInputs: Record | undefined + if (mutation.inputs) { + const validatedInputs = normalizeInputsWithSchema( + targetId, + mutation.type, + mutation.inputs, + 'ensure_block' + ) + if (Object.keys(validatedInputs).length > 0) { + normalizedInputs = validatedInputs + addParams.inputs = validatedInputs + } + } if (mutation.triggerMode !== undefined) addParams.triggerMode = mutation.triggerMode if (mutation.advancedMode !== undefined) addParams.advancedMode = mutation.advancedMode if (mutation.enabled !== undefined) addParams.enabled = mutation.enabled @@ -969,7 +1078,7 @@ async function compileChangeSpec(params: { type: mutation.type, name: mutation.name, subBlocks: Object.fromEntries( - Object.entries(mutation.inputs || {}).map(([key, value]) => [ + Object.entries(normalizedInputs || {}).map(([key, value]) => [ key, { id: key, value, type: 'short-input' }, ]) @@ -996,7 +1105,10 @@ async function compileChangeSpec(params: { if (mutation.action === 'patch_block') { const targetId = resolveTarget(mutation.target) if (!targetId) { - diagnostics.push('patch_block target could not be resolved') + diagnostics.push( + 'patch_block target could not be resolved. Use target.alias or target.match, ' + + 'or refresh workflow_context_get after prior apply before retrying.' + ) continue } const blockType = @@ -1007,7 +1119,7 @@ async function compileChangeSpec(params: { applyPatchChange(targetId, blockType, change, editParams) } if (Object.keys(editParams).length === 0) { - warnings.push(`patch_block for ${targetId} had no effective changes`) + diagnostics.push(`patch_block for ${targetId} had no effective changes`) continue } operations.push({ @@ -1022,7 +1134,10 @@ async function compileChangeSpec(params: { if (mutation.action === 'remove_block') { const targetId = resolveTarget(mutation.target) if (!targetId) { - diagnostics.push('remove_block target could not be resolved') + diagnostics.push( + 'remove_block target could not be resolved. Use target.alias or target.match, ' + + 'or refresh workflow_context_get after prior apply before retrying.' + ) continue } operations.push({ @@ -1046,7 +1161,10 @@ async function compileChangeSpec(params: { const from = resolveTarget(mutation.from) const to = resolveTarget(mutation.to) if (!from || !to) { - diagnostics.push(`${mutation.action} requires resolvable from/to targets`) + diagnostics.push( + `${mutation.action} requires resolvable from/to targets. Prefer alias/match selectors, ` + + 'or refresh workflow_context_get after prior apply before retrying.' + ) continue } const sourceHandle = normalizeHandle(mutation.handle) @@ -1088,7 +1206,10 @@ async function compileChangeSpec(params: { true ) if (!from || !to) { - diagnostics.push('link contains unresolved from/to target') + diagnostics.push( + 'link contains unresolved from/to target. Prefer alias/match selectors, ' + + 'or refresh workflow_context_get after prior apply before retrying.' + ) continue } @@ -1192,7 +1313,7 @@ async function validateAndSimulateOperations(params: { params.workflowState ) for (const error of preValidationErrors) { - warnings.push(error.error) + diagnostics.push(error.error) } const { state, validationErrors, skippedItems } = applyOperationsToWorkflowState( @@ -1202,15 +1323,20 @@ async function validateAndSimulateOperations(params: { ) for (const validationError of validationErrors) { - warnings.push(validationError.error) + diagnostics.push(validationError.error) } for (const skippedItem of skippedItems) { - warnings.push(skippedItem.reason) + diagnostics.push(skippedItem.reason) } if (Object.keys(state.blocks || {}).length === 0) { diagnostics.push('Simulation produced an empty workflow state') } + const beforeHash = hashWorkflowState(params.workflowState as unknown as Record) + const afterHash = hashWorkflowState(state as unknown as Record) + if (beforeHash === afterHash) { + diagnostics.push('Simulation produced no effective workflow changes') + } return { operationsForApply: filteredOperations as Array>, @@ -1263,6 +1389,10 @@ export const workflowChangeServerTool: BaseServerTool workflowState, userId: context.userId, workflowId, + schemaContext: { + contextPackProvided: Boolean(contextPack), + loadedSchemaTypes: new Set(Object.keys(contextPack?.schemasByType || {})), + }, }) const simulation = await validateAndSimulateOperations({ @@ -1345,6 +1475,15 @@ export const workflowChangeServerTool: BaseServerTool if (!proposal) { throw new Error(`Proposal not found or expired: ${proposalId}`) } + if (Array.isArray(proposal.diagnostics) && proposal.diagnostics.length > 0) { + throw new Error( + `proposal_invalid: proposal contains diagnostics (${proposal.diagnostics.length}). ` + + `Fix dry_run diagnostics before apply.` + ) + } + if (!Array.isArray(proposal.compiledOperations) || proposal.compiledOperations.length === 0) { + throw new Error('proposal_invalid: proposal contains no operations to apply') + } const authorization = await authorizeWorkflowByWorkspacePermission({ workflowId: proposal.workflowId, diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 96ce62e41..0a427629b 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -1962,7 +1962,7 @@ export const useCopilotStore = create()( } if (!context.wasAborted && sseHandlers.stream_end) { - sseHandlers.stream_end({ type: 'done' }, context, get, set) + sseHandlers.stream_end({ type: 'copilot.phase.completed' }, context, get, set) } stopStreamingUpdates()