diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx index e52898e15..39e2a0095 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/copilot.tsx @@ -107,7 +107,6 @@ export const Copilot = forwardRef(({ panelWidth }, ref currentChat, selectChat, deleteChat, - areChatsFresh, workflowId: copilotWorkflowId, setPlanTodos, closePlanTodos, @@ -142,7 +141,6 @@ export const Copilot = forwardRef(({ panelWidth }, ref activeWorkflowId, copilotWorkflowId, loadChats, - areChatsFresh, isSendingMessage, } ) diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-chat-history.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-chat-history.ts index 04f1cb033..0978c8335 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-chat-history.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/copilot/hooks/use-chat-history.ts @@ -10,7 +10,6 @@ interface UseChatHistoryProps { activeWorkflowId: string | null copilotWorkflowId: string | null loadChats: (forceRefresh: boolean) => Promise - areChatsFresh: (workflowId: string) => boolean isSendingMessage: boolean } @@ -21,8 +20,7 @@ interface UseChatHistoryProps { * @returns Chat history utilities */ export function useChatHistory(props: UseChatHistoryProps) { - const { chats, activeWorkflowId, copilotWorkflowId, loadChats, areChatsFresh, isSendingMessage } = - props + const { chats, activeWorkflowId, copilotWorkflowId, loadChats, isSendingMessage } = props /** Groups chats by time period (Today, Yesterday, This Week, etc.) */ const groupedChats = useMemo(() => { @@ -80,7 +78,7 @@ export function useChatHistory(props: UseChatHistoryProps) { /** Handles history dropdown opening and loads chats if needed (non-blocking) */ const handleHistoryDropdownOpen = useCallback( (open: boolean) => { - if (open && activeWorkflowId && !isSendingMessage && !areChatsFresh(activeWorkflowId)) { + if (open && activeWorkflowId && !isSendingMessage) { loadChats(false).catch((error) => { logger.error('Failed to load chat history:', error) }) @@ -90,7 +88,7 @@ export function useChatHistory(props: UseChatHistoryProps) { logger.info('Chat history opened during stream - showing cached data only') } }, - [activeWorkflowId, areChatsFresh, isSendingMessage, loadChats] + [activeWorkflowId, isSendingMessage, loadChats] ) return { diff --git a/apps/sim/lib/copilot/client-sse/content-blocks.ts b/apps/sim/lib/copilot/client-sse/content-blocks.ts new file mode 100644 index 000000000..c2ee72458 --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/content-blocks.ts @@ -0,0 +1,149 @@ +import type { + ChatContext, + CopilotMessage, + MessageFileAttachment, +} from '@/stores/panel/copilot/types' +import type { StreamingContext } from './types' + +const TEXT_BLOCK_TYPE = 'text' +const THINKING_BLOCK_TYPE = 'thinking' +const CONTINUE_OPTIONS_TAG = '{"1":"Continue"}' + +export function createUserMessage( + content: string, + fileAttachments?: MessageFileAttachment[], + contexts?: ChatContext[], + messageId?: string +): CopilotMessage { + return { + id: messageId || crypto.randomUUID(), + role: 'user', + content, + timestamp: new Date().toISOString(), + ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), + ...(contexts && contexts.length > 0 && { contexts }), + ...(contexts && + contexts.length > 0 && { + contentBlocks: [ + { type: 'contexts', contexts: contexts as any, timestamp: Date.now() }, + ] as any, + }), + } +} + +export function createStreamingMessage(): CopilotMessage { + return { + id: crypto.randomUUID(), + role: 'assistant', + content: '', + timestamp: new Date().toISOString(), + } +} + +export function createErrorMessage( + messageId: string, + content: string, + errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required' +): CopilotMessage { + return { + id: messageId, + role: 'assistant', + content, + timestamp: new Date().toISOString(), + contentBlocks: [ + { + type: 'text', + content, + timestamp: Date.now(), + }, + ], + errorType, + } +} + +export function appendTextBlock(context: StreamingContext, text: string) { + if (!text) return + context.accumulatedContent += text + if (context.currentTextBlock && context.contentBlocks.length > 0) { + const lastBlock = context.contentBlocks[context.contentBlocks.length - 1] + if (lastBlock.type === TEXT_BLOCK_TYPE && lastBlock === context.currentTextBlock) { + lastBlock.content += text + return + } + } + context.currentTextBlock = { type: '', content: '', timestamp: 0, toolCall: null } + context.currentTextBlock.type = TEXT_BLOCK_TYPE + context.currentTextBlock.content = text + context.currentTextBlock.timestamp = Date.now() + context.contentBlocks.push(context.currentTextBlock) +} + +export function appendContinueOption(content: string): string { + if (//i.test(content)) return content + const suffix = content.trim().length > 0 ? '\n\n' : '' + return `${content}${suffix}${CONTINUE_OPTIONS_TAG}` +} + +export function appendContinueOptionBlock(blocks: any[]): any[] { + if (!Array.isArray(blocks)) return blocks + const hasOptions = blocks.some( + (block) => + block?.type === TEXT_BLOCK_TYPE && + typeof block.content === 'string' && + //i.test(block.content) + ) + if (hasOptions) return blocks + return [ + ...blocks, + { + type: TEXT_BLOCK_TYPE, + content: CONTINUE_OPTIONS_TAG, + timestamp: Date.now(), + }, + ] +} + +export function stripContinueOption(content: string): string { + if (!content || !content.includes(CONTINUE_OPTIONS_TAG)) return content + const next = content.replace(CONTINUE_OPTIONS_TAG, '') + return next.replace(/\n{2,}\s*$/g, '\n').trimEnd() +} + +export function stripContinueOptionFromBlocks(blocks: any[]): any[] { + if (!Array.isArray(blocks)) return blocks + return blocks.flatMap((block) => { + if ( + block?.type === TEXT_BLOCK_TYPE && + typeof block.content === 'string' && + block.content.includes(CONTINUE_OPTIONS_TAG) + ) { + const nextContent = stripContinueOption(block.content) + if (!nextContent.trim()) return [] + return [{ ...block, content: nextContent }] + } + return [block] + }) +} + +export function beginThinkingBlock(context: StreamingContext) { + if (!context.currentThinkingBlock) { + context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null } + context.currentThinkingBlock.type = THINKING_BLOCK_TYPE + context.currentThinkingBlock.content = '' + context.currentThinkingBlock.timestamp = Date.now() + ;(context.currentThinkingBlock as any).startTime = Date.now() + context.contentBlocks.push(context.currentThinkingBlock) + } + context.isInThinkingBlock = true + context.currentTextBlock = null +} + +export function finalizeThinkingBlock(context: StreamingContext) { + if (context.currentThinkingBlock) { + context.currentThinkingBlock.duration = + Date.now() - (context.currentThinkingBlock.startTime || Date.now()) + } + context.isInThinkingBlock = false + context.currentThinkingBlock = null + context.currentTextBlock = null +} diff --git a/apps/sim/lib/copilot/client-sse/handlers.ts b/apps/sim/lib/copilot/client-sse/handlers.ts new file mode 100644 index 000000000..169917578 --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/handlers.ts @@ -0,0 +1,720 @@ +import { createLogger } from '@sim/logger' +import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' +import { useWorkflowDiffStore } from '@/stores/workflow-diff/store' +import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types' +import { + appendTextBlock, + beginThinkingBlock, + finalizeThinkingBlock, +} from './content-blocks' +import type { StreamingContext } from './types' +import { + isBackgroundState, + isRejectedState, + isReviewState, + resolveToolDisplay, +} from '@/lib/copilot/store-utils' + +const logger = createLogger('CopilotClientSseHandlers') +const STREAM_STORAGE_KEY = 'copilot_active_stream' +const TEXT_BLOCK_TYPE = 'text' +const MAX_BATCH_INTERVAL = 50 +const MIN_BATCH_INTERVAL = 16 +const MAX_QUEUE_SIZE = 5 + +function writeActiveStreamToStorage(info: any): void { + if (typeof window === 'undefined') return + try { + if (!info) { + window.sessionStorage.removeItem(STREAM_STORAGE_KEY) + return + } + window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info)) + } catch {} +} + +export type SSEHandler = ( + data: any, + context: StreamingContext, + get: () => CopilotStore, + set: any +) => Promise | void + +const streamingUpdateQueue = new Map() +let streamingUpdateRAF: number | null = null +let lastBatchTime = 0 + +export function stopStreamingUpdates() { + if (streamingUpdateRAF !== null) { + cancelAnimationFrame(streamingUpdateRAF) + streamingUpdateRAF = null + } + streamingUpdateQueue.clear() +} + +function createOptimizedContentBlocks(contentBlocks: any[]): any[] { + const result: any[] = new Array(contentBlocks.length) + for (let i = 0; i < contentBlocks.length; i++) { + const block = contentBlocks[i] + result[i] = { ...block } + } + return result +} + +export function flushStreamingUpdates(set: any) { + if (streamingUpdateRAF !== null) { + cancelAnimationFrame(streamingUpdateRAF) + streamingUpdateRAF = null + } + if (streamingUpdateQueue.size === 0) return + + const updates = new Map(streamingUpdateQueue) + streamingUpdateQueue.clear() + + set((state: CopilotStore) => { + if (updates.size === 0) return state + return { + messages: state.messages.map((msg) => { + const update = updates.get(msg.id) + if (update) { + return { + ...msg, + content: '', + contentBlocks: + update.contentBlocks.length > 0 ? createOptimizedContentBlocks(update.contentBlocks) : [], + } + } + return msg + }), + } + }) +} + +export function updateStreamingMessage(set: any, context: StreamingContext) { + if (context.suppressStreamingUpdates) return + const now = performance.now() + streamingUpdateQueue.set(context.messageId, context) + const timeSinceLastBatch = now - lastBatchTime + const shouldFlushImmediately = + streamingUpdateQueue.size >= MAX_QUEUE_SIZE || timeSinceLastBatch > MAX_BATCH_INTERVAL + + if (streamingUpdateRAF === null) { + const scheduleUpdate = () => { + streamingUpdateRAF = requestAnimationFrame(() => { + const updates = new Map(streamingUpdateQueue) + streamingUpdateQueue.clear() + streamingUpdateRAF = null + lastBatchTime = performance.now() + set((state: CopilotStore) => { + if (updates.size === 0) return state + const messages = state.messages + const lastMessage = messages[messages.length - 1] + const lastMessageUpdate = lastMessage ? updates.get(lastMessage.id) : null + if (updates.size === 1 && lastMessageUpdate) { + const newMessages = [...messages] + newMessages[messages.length - 1] = { + ...lastMessage, + content: '', + contentBlocks: + lastMessageUpdate.contentBlocks.length > 0 + ? createOptimizedContentBlocks(lastMessageUpdate.contentBlocks) + : [], + } + return { messages: newMessages } + } + return { + messages: messages.map((msg) => { + const update = updates.get(msg.id) + if (update) { + return { + ...msg, + content: '', + contentBlocks: + update.contentBlocks.length > 0 + ? createOptimizedContentBlocks(update.contentBlocks) + : [], + } + } + return msg + }), + } + }) + }) + } + if (shouldFlushImmediately) scheduleUpdate() + else setTimeout(scheduleUpdate, Math.max(0, MIN_BATCH_INTERVAL - timeSinceLastBatch)) + } +} + +export function upsertToolCallBlock(context: StreamingContext, toolCall: CopilotToolCall) { + let found = false + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b.type === 'tool_call' && b.toolCall?.id === toolCall.id) { + context.contentBlocks[i] = { ...b, toolCall } + found = true + break + } + } + if (!found) { + context.contentBlocks.push({ type: 'tool_call', toolCall, timestamp: Date.now() }) + } +} + +function stripThinkingTags(text: string): string { + return text.replace(/<\/?thinking[^>]*>/gi, '').replace(/<\/?thinking[^&]*>/gi, '') +} + +function appendThinkingContent(context: StreamingContext, text: string) { + if (!text) return + const cleanedText = stripThinkingTags(text) + if (!cleanedText) return + if (context.currentThinkingBlock) { + context.currentThinkingBlock.content += cleanedText + } else { + context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null } + context.currentThinkingBlock.type = 'thinking' + context.currentThinkingBlock.content = cleanedText + context.currentThinkingBlock.timestamp = Date.now() + context.currentThinkingBlock.startTime = Date.now() + context.contentBlocks.push(context.currentThinkingBlock) + } + context.isInThinkingBlock = true + context.currentTextBlock = null +} + +export const sseHandlers: Record = { + chat_id: async (data, context, get, set) => { + context.newChatId = data.chatId + const { currentChat, activeStream } = get() + if (!currentChat && context.newChatId) { + await get().handleNewChatCreation(context.newChatId) + } + if (activeStream && context.newChatId && !activeStream.chatId) { + const updatedStream = { ...activeStream, chatId: context.newChatId } + set({ activeStream: updatedStream }) + writeActiveStreamToStorage(updatedStream) + } + }, + title_updated: (_data, _context, get, set) => { + const title = _data.title + if (!title) return + const { currentChat, chats } = get() + if (currentChat) { + set({ + currentChat: { ...currentChat, title }, + chats: chats.map((c) => (c.id === currentChat.id ? { ...c, title } : c)), + }) + } + }, + tool_result: (data, context, get, set) => { + try { + const toolCallId: string | undefined = data?.toolCallId || data?.data?.id + const success: boolean | undefined = data?.success + const failedDependency: boolean = data?.failedDependency === true + const skipped: boolean = data?.result?.skipped === true + if (!toolCallId) return + const { toolCallsById } = get() + const current = toolCallsById[toolCallId] + if (current) { + if ( + isRejectedState(current.state) || + isReviewState(current.state) || + isBackgroundState(current.state) + ) { + return + } + const targetState = success + ? ClientToolCallState.success + : failedDependency || skipped + ? ClientToolCallState.rejected + : ClientToolCallState.error + const updatedMap = { ...toolCallsById } + updatedMap[toolCallId] = { + ...current, + state: targetState, + display: resolveToolDisplay( + current.name, + targetState, + current.id, + (current as any).params + ), + } + set({ toolCallsById: updatedMap }) + + if (targetState === ClientToolCallState.success && current.name === 'checkoff_todo') { + try { + const result = (data?.result || data?.data?.result) ?? {} + const input = ((current as any).params || (current as any).input) ?? {} + const todoId = input.id || input.todoId || result.id || result.todoId + if (todoId) { + get().updatePlanTodoStatus(todoId, 'completed') + } + } catch {} + } + + if ( + targetState === ClientToolCallState.success && + current.name === 'mark_todo_in_progress' + ) { + try { + const result = (data?.result || data?.data?.result) ?? {} + const input = ((current as any).params || (current as any).input) ?? {} + const todoId = input.id || input.todoId || result.id || result.todoId + if (todoId) { + get().updatePlanTodoStatus(todoId, 'executing') + } + } catch {} + } + + if (current.name === 'edit_workflow') { + try { + const resultPayload = + (data?.result || data?.data?.result || data?.data?.data || data?.data) ?? {} + const workflowState = resultPayload?.workflowState + logger.info('[SSE] edit_workflow result received', { + hasWorkflowState: !!workflowState, + blockCount: workflowState ? Object.keys(workflowState.blocks ?? {}).length : 0, + edgeCount: workflowState?.edges?.length ?? 0, + }) + if (workflowState) { + const diffStore = useWorkflowDiffStore.getState() + diffStore.setProposedChanges(workflowState).catch((err) => { + logger.error('[SSE] Failed to apply edit_workflow diff', { + error: err instanceof Error ? err.message : String(err), + }) + }) + } + } catch (err) { + logger.error('[SSE] edit_workflow result handling failed', { + error: err instanceof Error ? err.message : String(err), + }) + } + } + } + + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { + if ( + isRejectedState(b.toolCall?.state) || + isReviewState(b.toolCall?.state) || + isBackgroundState(b.toolCall?.state) + ) + break + const targetState = success + ? ClientToolCallState.success + : failedDependency || skipped + ? ClientToolCallState.rejected + : ClientToolCallState.error + context.contentBlocks[i] = { + ...b, + toolCall: { + ...b.toolCall, + state: targetState, + display: resolveToolDisplay( + b.toolCall?.name, + targetState, + toolCallId, + b.toolCall?.params + ), + }, + } + break + } + } + updateStreamingMessage(set, context) + } catch {} + }, + tool_error: (data, context, get, set) => { + try { + const toolCallId: string | undefined = data?.toolCallId || data?.data?.id + const failedDependency: boolean = data?.failedDependency === true + if (!toolCallId) return + const { toolCallsById } = get() + const current = toolCallsById[toolCallId] + if (current) { + if ( + isRejectedState(current.state) || + isReviewState(current.state) || + isBackgroundState(current.state) + ) { + return + } + const targetState = failedDependency + ? ClientToolCallState.rejected + : ClientToolCallState.error + const updatedMap = { ...toolCallsById } + updatedMap[toolCallId] = { + ...current, + state: targetState, + display: resolveToolDisplay( + current.name, + targetState, + current.id, + (current as any).params + ), + } + set({ toolCallsById: updatedMap }) + } + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { + if ( + isRejectedState(b.toolCall?.state) || + isReviewState(b.toolCall?.state) || + isBackgroundState(b.toolCall?.state) + ) + break + const targetState = failedDependency + ? ClientToolCallState.rejected + : ClientToolCallState.error + context.contentBlocks[i] = { + ...b, + toolCall: { + ...b.toolCall, + state: targetState, + display: resolveToolDisplay( + b.toolCall?.name, + targetState, + toolCallId, + b.toolCall?.params + ), + }, + } + break + } + } + updateStreamingMessage(set, context) + } catch {} + }, + tool_generating: (data, context, get, set) => { + const { toolCallId, toolName } = data + if (!toolCallId || !toolName) return + const { toolCallsById } = get() + + if (!toolCallsById[toolCallId]) { + const initialState = ClientToolCallState.pending + const tc: CopilotToolCall = { + id: toolCallId, + name: toolName, + state: initialState, + display: resolveToolDisplay(toolName, initialState, toolCallId), + } + const updated = { ...toolCallsById, [toolCallId]: tc } + set({ toolCallsById: updated }) + logger.info('[toolCallsById] map updated', updated) + + upsertToolCallBlock(context, tc) + updateStreamingMessage(set, context) + } + }, + tool_call: (data, context, get, set) => { + const toolData = data?.data ?? {} + const id: string | undefined = toolData.id || data?.toolCallId + const name: string | undefined = toolData.name || data?.toolName + if (!id) return + const args = toolData.arguments + const isPartial = toolData.partial === true + const { toolCallsById } = get() + + const existing = toolCallsById[id] + const next: CopilotToolCall = existing + ? { + ...existing, + state: ClientToolCallState.pending, + ...(args ? { params: args } : {}), + display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), + } + : { + id, + name: name || 'unknown_tool', + state: ClientToolCallState.pending, + ...(args ? { params: args } : {}), + display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), + } + const updated = { ...toolCallsById, [id]: next } + set({ toolCallsById: updated }) + logger.info('[toolCallsById] → pending', { id, name, params: args }) + + upsertToolCallBlock(context, next) + updateStreamingMessage(set, context) + + if (isPartial) { + return + } + + return + }, + reasoning: (data, context, _get, set) => { + const phase = (data && (data.phase || data?.data?.phase)) as string | undefined + if (phase === 'start') { + beginThinkingBlock(context) + updateStreamingMessage(set, context) + return + } + if (phase === 'end') { + finalizeThinkingBlock(context) + updateStreamingMessage(set, context) + return + } + const chunk: string = typeof data?.data === 'string' ? data.data : data?.content || '' + if (!chunk) return + appendThinkingContent(context, chunk) + updateStreamingMessage(set, context) + }, + content: (data, context, get, set) => { + if (!data.data) return + context.pendingContent += data.data + + let contentToProcess = context.pendingContent + let hasProcessedContent = false + + const thinkingStartRegex = // + const thinkingEndRegex = /<\/thinking>/ + const designWorkflowStartRegex = // + const designWorkflowEndRegex = /<\/design_workflow>/ + + const splitTrailingPartialTag = ( + text: string, + tags: string[] + ): { text: string; remaining: string } => { + const partialIndex = text.lastIndexOf('<') + if (partialIndex < 0) { + return { text, remaining: '' } + } + const possibleTag = text.substring(partialIndex) + const matchesTagStart = tags.some((tag) => tag.startsWith(possibleTag)) + if (!matchesTagStart) { + return { text, remaining: '' } + } + return { + text: text.substring(0, partialIndex), + remaining: possibleTag, + } + } + + while (contentToProcess.length > 0) { + if (context.isInDesignWorkflowBlock) { + const endMatch = designWorkflowEndRegex.exec(contentToProcess) + if (endMatch) { + const designContent = contentToProcess.substring(0, endMatch.index) + context.designWorkflowContent += designContent + context.isInDesignWorkflowBlock = false + + logger.info('[design_workflow] Tag complete, setting plan content', { + contentLength: context.designWorkflowContent.length, + }) + set({ streamingPlanContent: context.designWorkflowContent }) + + contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length) + hasProcessedContent = true + } else { + const { text, remaining } = splitTrailingPartialTag(contentToProcess, [ + '', + ]) + context.designWorkflowContent += text + + set({ streamingPlanContent: context.designWorkflowContent }) + + contentToProcess = remaining + hasProcessedContent = true + if (remaining) { + break + } + } + continue + } + + if (!context.isInThinkingBlock && !context.isInDesignWorkflowBlock) { + const designStartMatch = designWorkflowStartRegex.exec(contentToProcess) + if (designStartMatch) { + const textBeforeDesign = contentToProcess.substring(0, designStartMatch.index) + if (textBeforeDesign) { + appendTextBlock(context, textBeforeDesign) + hasProcessedContent = true + } + context.isInDesignWorkflowBlock = true + context.designWorkflowContent = '' + contentToProcess = contentToProcess.substring( + designStartMatch.index + designStartMatch[0].length + ) + hasProcessedContent = true + continue + } + + const nextMarkIndex = contentToProcess.indexOf('') + const nextCheckIndex = contentToProcess.indexOf('') + const hasMark = nextMarkIndex >= 0 + const hasCheck = nextCheckIndex >= 0 + + const nextTagIndex = + hasMark && hasCheck + ? Math.min(nextMarkIndex, nextCheckIndex) + : hasMark + ? nextMarkIndex + : hasCheck + ? nextCheckIndex + : -1 + + if (nextTagIndex >= 0) { + const isMarkTodo = hasMark && nextMarkIndex === nextTagIndex + const tagStart = isMarkTodo ? '' : '' + const tagEnd = isMarkTodo ? '' : '' + const closingIndex = contentToProcess.indexOf(tagEnd, nextTagIndex + tagStart.length) + + if (closingIndex === -1) { + break + } + + const todoId = contentToProcess + .substring(nextTagIndex + tagStart.length, closingIndex) + .trim() + logger.info( + isMarkTodo ? '[TODO] Detected marktodo tag' : '[TODO] Detected checkofftodo tag', + { todoId } + ) + + if (todoId) { + try { + get().updatePlanTodoStatus(todoId, isMarkTodo ? 'executing' : 'completed') + logger.info( + isMarkTodo + ? '[TODO] Successfully marked todo in progress' + : '[TODO] Successfully checked off todo', + { todoId } + ) + } catch (e) { + logger.error( + isMarkTodo + ? '[TODO] Failed to mark todo in progress' + : '[TODO] Failed to checkoff todo', + { todoId, error: e } + ) + } + } else { + logger.warn('[TODO] Empty todoId extracted from todo tag', { tagType: tagStart }) + } + + let beforeTag = contentToProcess.substring(0, nextTagIndex) + let afterTag = contentToProcess.substring(closingIndex + tagEnd.length) + + const hadNewlineBefore = /(\r?\n)+$/.test(beforeTag) + const hadNewlineAfter = /^(\r?\n)+/.test(afterTag) + + beforeTag = beforeTag.replace(/(\r?\n)+$/, '') + afterTag = afterTag.replace(/^(\r?\n)+/, '') + + contentToProcess = + beforeTag + (hadNewlineBefore && hadNewlineAfter ? '\n' : '') + afterTag + context.currentTextBlock = null + hasProcessedContent = true + continue + } + } + + if (context.isInThinkingBlock) { + const endMatch = thinkingEndRegex.exec(contentToProcess) + if (endMatch) { + const thinkingContent = contentToProcess.substring(0, endMatch.index) + appendThinkingContent(context, thinkingContent) + finalizeThinkingBlock(context) + contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length) + hasProcessedContent = true + } else { + const { text, remaining } = splitTrailingPartialTag(contentToProcess, ['']) + if (text) { + appendThinkingContent(context, text) + hasProcessedContent = true + } + contentToProcess = remaining + if (remaining) { + break + } + } + } else { + const startMatch = thinkingStartRegex.exec(contentToProcess) + if (startMatch) { + const textBeforeThinking = contentToProcess.substring(0, startMatch.index) + if (textBeforeThinking) { + appendTextBlock(context, textBeforeThinking) + hasProcessedContent = true + } + context.isInThinkingBlock = true + context.currentTextBlock = null + contentToProcess = contentToProcess.substring(startMatch.index + startMatch[0].length) + hasProcessedContent = true + } else { + let partialTagIndex = contentToProcess.lastIndexOf('<') + + const partialMarkTodo = contentToProcess.lastIndexOf(' partialTagIndex) { + partialTagIndex = partialMarkTodo + } + if (partialCheckoffTodo > partialTagIndex) { + partialTagIndex = partialCheckoffTodo + } + + let textToAdd = contentToProcess + let remaining = '' + if (partialTagIndex >= 0 && partialTagIndex > contentToProcess.length - 50) { + textToAdd = contentToProcess.substring(0, partialTagIndex) + remaining = contentToProcess.substring(partialTagIndex) + } + if (textToAdd) { + appendTextBlock(context, textToAdd) + hasProcessedContent = true + } + contentToProcess = remaining + break + } + } + } + + context.pendingContent = contentToProcess + if (hasProcessedContent) { + updateStreamingMessage(set, context) + } + }, + done: (_data, context) => { + logger.info('[SSE] DONE EVENT RECEIVED', { + doneEventCount: context.doneEventCount, + data: _data, + }) + context.doneEventCount++ + if (context.doneEventCount >= 1) { + logger.info('[SSE] Setting streamComplete = true, stream will terminate') + context.streamComplete = true + } + }, + error: (data, context, _get, set) => { + logger.error('Stream error:', data.error) + set((state: CopilotStore) => ({ + messages: state.messages.map((msg) => + msg.id === context.messageId + ? { + ...msg, + content: context.accumulatedContent || 'An error occurred.', + error: data.error, + } + : msg + ), + })) + context.streamComplete = true + }, + stream_end: (_data, context, _get, set) => { + if (context.pendingContent) { + if (context.isInThinkingBlock && context.currentThinkingBlock) { + appendThinkingContent(context, context.pendingContent) + } else if (context.pendingContent.trim()) { + appendTextBlock(context, context.pendingContent) + } + context.pendingContent = '' + } + finalizeThinkingBlock(context) + updateStreamingMessage(set, context) + }, + default: () => {}, +} diff --git a/apps/sim/lib/copilot/client-sse/index.ts b/apps/sim/lib/copilot/client-sse/index.ts new file mode 100644 index 000000000..a08f89593 --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/index.ts @@ -0,0 +1,3 @@ +export { sseHandlers } from './handlers' +export { subAgentSSEHandlers, applySseEvent } from './subagent-handlers' +export type { SSEHandler } from './handlers' diff --git a/apps/sim/lib/copilot/client-sse/subagent-handlers.ts b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts new file mode 100644 index 000000000..fa2fc2e1c --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/subagent-handlers.ts @@ -0,0 +1,360 @@ +import { createLogger } from '@sim/logger' +import { + normalizeSseEvent, + shouldSkipToolCallEvent, + shouldSkipToolResultEvent, +} from '@/lib/copilot/orchestrator/sse-utils' +import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' +import { resolveToolDisplay } from '@/lib/copilot/store-utils' +import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types' +import type { StreamingContext } from './types' +import { sseHandlers, type SSEHandler, updateStreamingMessage } from './handlers' + +const logger = createLogger('CopilotClientSubagentHandlers') + +export function appendSubAgentContent( + context: StreamingContext, + parentToolCallId: string, + text: string +) { + if (!context.subAgentContent[parentToolCallId]) { + context.subAgentContent[parentToolCallId] = '' + } + if (!context.subAgentBlocks[parentToolCallId]) { + context.subAgentBlocks[parentToolCallId] = [] + } + context.subAgentContent[parentToolCallId] += text + const blocks = context.subAgentBlocks[parentToolCallId] + const lastBlock = blocks[blocks.length - 1] + if (lastBlock && lastBlock.type === 'subagent_text') { + lastBlock.content = (lastBlock.content || '') + text + } else { + blocks.push({ + type: 'subagent_text', + content: text, + timestamp: Date.now(), + }) + } +} + +export function updateToolCallWithSubAgentData( + context: StreamingContext, + get: () => CopilotStore, + set: any, + parentToolCallId: string +) { + const { toolCallsById } = get() + const parentToolCall = toolCallsById[parentToolCallId] + if (!parentToolCall) { + logger.warn('[SubAgent] updateToolCallWithSubAgentData: parent tool call not found', { + parentToolCallId, + availableToolCallIds: Object.keys(toolCallsById), + }) + return + } + + const blocks = context.subAgentBlocks[parentToolCallId] ?? [] + + const updatedToolCall: CopilotToolCall = { + ...parentToolCall, + subAgentContent: context.subAgentContent[parentToolCallId] || '', + subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] ?? [], + subAgentBlocks: blocks, + subAgentStreaming: true, + } + + logger.info('[SubAgent] Updating tool call with subagent data', { + parentToolCallId, + parentToolName: parentToolCall.name, + subAgentContentLength: updatedToolCall.subAgentContent?.length, + subAgentBlocksCount: updatedToolCall.subAgentBlocks?.length, + subAgentToolCallsCount: updatedToolCall.subAgentToolCalls?.length, + }) + + const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } + set({ toolCallsById: updatedMap }) + + let foundInContentBlocks = false + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b.type === 'tool_call' && b.toolCall?.id === parentToolCallId) { + context.contentBlocks[i] = { ...b, toolCall: updatedToolCall } + foundInContentBlocks = true + break + } + } + + if (!foundInContentBlocks) { + logger.warn('[SubAgent] Parent tool call not found in contentBlocks', { + parentToolCallId, + contentBlocksCount: context.contentBlocks.length, + toolCallBlockIds: context.contentBlocks + .filter((b: any) => b.type === 'tool_call') + .map((b: any) => b.toolCall?.id), + }) + } + + updateStreamingMessage(set, context) +} + +export const subAgentSSEHandlers: Record = { + start: () => { + // Subagent start event - no action needed, parent is already tracked from subagent_start + }, + + content: (data, context, get, set) => { + const parentToolCallId = context.subAgentParentToolCallId + logger.info('[SubAgent] content event', { + parentToolCallId, + hasData: !!data.data, + dataPreview: typeof data.data === 'string' ? data.data.substring(0, 50) : null, + }) + if (!parentToolCallId || !data.data) { + logger.warn('[SubAgent] content missing parentToolCallId or data', { + parentToolCallId, + hasData: !!data.data, + }) + return + } + + appendSubAgentContent(context, parentToolCallId, data.data) + + updateToolCallWithSubAgentData(context, get, set, parentToolCallId) + }, + + reasoning: (data, context, get, set) => { + const parentToolCallId = context.subAgentParentToolCallId + const phase = data?.phase || data?.data?.phase + if (!parentToolCallId) return + + if (phase === 'start' || phase === 'end') return + + const chunk = typeof data?.data === 'string' ? data.data : data?.content || '' + if (!chunk) return + + appendSubAgentContent(context, parentToolCallId, chunk) + + updateToolCallWithSubAgentData(context, get, set, parentToolCallId) + }, + + tool_generating: () => { + // Tool generating event - no action needed, we'll handle the actual tool_call + }, + + tool_call: async (data, context, get, set) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) return + + const toolData = data?.data ?? {} + const id: string | undefined = toolData.id || data?.toolCallId + const name: string | undefined = toolData.name || data?.toolName + if (!id || !name) return + const isPartial = toolData.partial === true + + let args = toolData.arguments || toolData.input || data?.arguments || data?.input + + if (typeof args === 'string') { + try { + args = JSON.parse(args) + } catch { + logger.warn('[SubAgent] Failed to parse arguments string', { args }) + } + } + + logger.info('[SubAgent] tool_call received', { + id, + name, + hasArgs: !!args, + argsKeys: args ? Object.keys(args) : [], + toolDataKeys: Object.keys(toolData), + dataKeys: Object.keys(data ?? {}), + }) + + if (!context.subAgentToolCalls[parentToolCallId]) { + context.subAgentToolCalls[parentToolCallId] = [] + } + if (!context.subAgentBlocks[parentToolCallId]) { + context.subAgentBlocks[parentToolCallId] = [] + } + + const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex((tc) => tc.id === id) + const subAgentToolCall: CopilotToolCall = { + id, + name, + state: ClientToolCallState.pending, + ...(args ? { params: args } : {}), + display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), + } + + if (existingIndex >= 0) { + context.subAgentToolCalls[parentToolCallId][existingIndex] = subAgentToolCall + } else { + context.subAgentToolCalls[parentToolCallId].push(subAgentToolCall) + + context.subAgentBlocks[parentToolCallId].push({ + type: 'subagent_tool_call', + toolCall: subAgentToolCall, + timestamp: Date.now(), + }) + } + + const { toolCallsById } = get() + const updated = { ...toolCallsById, [id]: subAgentToolCall } + set({ toolCallsById: updated }) + + updateToolCallWithSubAgentData(context, get, set, parentToolCallId) + + if (isPartial) { + return + } + }, + + tool_result: (data, context, get, set) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) return + + const toolCallId: string | undefined = data?.toolCallId || data?.data?.id + const success: boolean | undefined = data?.success !== false + if (!toolCallId) return + + if (!context.subAgentToolCalls[parentToolCallId]) return + if (!context.subAgentBlocks[parentToolCallId]) return + + const targetState = success ? ClientToolCallState.success : ClientToolCallState.error + const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex( + (tc) => tc.id === toolCallId + ) + + if (existingIndex >= 0) { + const existing = context.subAgentToolCalls[parentToolCallId][existingIndex] + const updatedSubAgentToolCall = { + ...existing, + state: targetState, + display: resolveToolDisplay(existing.name, targetState, toolCallId, existing.params), + } + context.subAgentToolCalls[parentToolCallId][existingIndex] = updatedSubAgentToolCall + + for (const block of context.subAgentBlocks[parentToolCallId]) { + if (block.type === 'subagent_tool_call' && block.toolCall?.id === toolCallId) { + block.toolCall = updatedSubAgentToolCall + break + } + } + + const { toolCallsById } = get() + if (toolCallsById[toolCallId]) { + const updatedMap = { + ...toolCallsById, + [toolCallId]: updatedSubAgentToolCall, + } + set({ toolCallsById: updatedMap }) + logger.info('[SubAgent] Updated subagent tool call state in toolCallsById', { + toolCallId, + name: existing.name, + state: targetState, + }) + } + } + + updateToolCallWithSubAgentData(context, get, set, parentToolCallId) + }, + + done: (_data, context, get, set) => { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) return + + updateToolCallWithSubAgentData(context, get, set, parentToolCallId) + }, +} + +export async function applySseEvent( + data: any, + context: StreamingContext, + get: () => CopilotStore, + set: (next: Partial | ((state: CopilotStore) => Partial)) => void +): Promise { + const normalizedEvent = normalizeSseEvent(data) + if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) { + return true + } + data = normalizedEvent + + if (data.type === 'subagent_start') { + const toolCallId = data.data?.tool_call_id + if (toolCallId) { + context.subAgentParentToolCallId = toolCallId + const { toolCallsById } = get() + const parentToolCall = toolCallsById[toolCallId] + if (parentToolCall) { + const updatedToolCall: CopilotToolCall = { + ...parentToolCall, + subAgentStreaming: true, + } + const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall } + set({ toolCallsById: updatedMap }) + } + logger.info('[SSE] Subagent session started', { + subagent: data.subagent, + parentToolCallId: toolCallId, + }) + } + return true + } + + if (data.type === 'subagent_end') { + const parentToolCallId = context.subAgentParentToolCallId + if (parentToolCallId) { + const { toolCallsById } = get() + const parentToolCall = toolCallsById[parentToolCallId] + if (parentToolCall) { + const updatedToolCall: CopilotToolCall = { + ...parentToolCall, + subAgentContent: context.subAgentContent[parentToolCallId] || '', + subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] ?? [], + subAgentBlocks: context.subAgentBlocks[parentToolCallId] ?? [], + subAgentStreaming: false, + } + const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } + set({ toolCallsById: updatedMap }) + logger.info('[SSE] Subagent session ended', { + subagent: data.subagent, + parentToolCallId, + contentLength: context.subAgentContent[parentToolCallId]?.length || 0, + toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0, + }) + } + } + context.subAgentParentToolCallId = undefined + return true + } + + if (data.subagent) { + const parentToolCallId = context.subAgentParentToolCallId + if (!parentToolCallId) { + logger.warn('[SSE] Subagent event without parent tool call ID', { + type: data.type, + subagent: data.subagent, + }) + return true + } + + logger.info('[SSE] Processing subagent event', { + type: data.type, + subagent: data.subagent, + parentToolCallId, + hasHandler: !!subAgentSSEHandlers[data.type], + }) + + const subAgentHandler = subAgentSSEHandlers[data.type] + if (subAgentHandler) { + await subAgentHandler(data, context, get, set) + } else { + logger.warn('[SSE] No handler for subagent event type', { type: data.type }) + } + return !context.streamComplete + } + + const handler = sseHandlers[data.type] || sseHandlers.default + await handler(data, context, get, set) + return !context.streamComplete +} diff --git a/apps/sim/lib/copilot/client-sse/types.ts b/apps/sim/lib/copilot/client-sse/types.ts new file mode 100644 index 000000000..82e5b99be --- /dev/null +++ b/apps/sim/lib/copilot/client-sse/types.ts @@ -0,0 +1,23 @@ +import type { CopilotToolCall } from '@/stores/panel/copilot/types' + +export interface StreamingContext { + messageId: string + accumulatedContent: string + contentBlocks: any[] + currentTextBlock: any | null + isInThinkingBlock: boolean + currentThinkingBlock: any | null + isInDesignWorkflowBlock: boolean + designWorkflowContent: string + pendingContent: string + newChatId?: string + doneEventCount: number + streamComplete?: boolean + wasAborted?: boolean + suppressContinueOption?: boolean + subAgentParentToolCallId?: string + subAgentContent: Record + subAgentToolCalls: Record + subAgentBlocks: Record + suppressStreamingUpdates?: boolean +} diff --git a/apps/sim/lib/copilot/messages/checkpoints.ts b/apps/sim/lib/copilot/messages/checkpoints.ts new file mode 100644 index 000000000..1a4847d6e --- /dev/null +++ b/apps/sim/lib/copilot/messages/checkpoints.ts @@ -0,0 +1,128 @@ +import { createLogger } from '@sim/logger' +import { mergeSubblockState } from '@/stores/workflows/utils' +import { useWorkflowStore } from '@/stores/workflows/workflow/store' +import type { WorkflowState } from '@/stores/workflows/workflow/types' +import type { CopilotMessage, CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types' + +const logger = createLogger('CopilotMessageCheckpoints') + +export function buildCheckpointWorkflowState(workflowId: string): WorkflowState | null { + const rawState = useWorkflowStore.getState().getWorkflowState() + if (!rawState) return null + + const blocksWithSubblockValues = mergeSubblockState(rawState.blocks, workflowId) + + const filteredBlocks = Object.entries(blocksWithSubblockValues).reduce( + (acc, [blockId, block]) => { + if (block?.type && block?.name) { + acc[blockId] = { + ...block, + id: block.id || blockId, + enabled: block.enabled !== undefined ? block.enabled : true, + horizontalHandles: block.horizontalHandles !== undefined ? block.horizontalHandles : true, + height: block.height !== undefined ? block.height : 90, + subBlocks: block.subBlocks ?? {}, + outputs: block.outputs ?? {}, + data: block.data ?? {}, + position: block.position || { x: 0, y: 0 }, + } + } + return acc + }, + {} as WorkflowState['blocks'] + ) + + return { + blocks: filteredBlocks, + edges: rawState.edges ?? [], + loops: rawState.loops ?? {}, + parallels: rawState.parallels ?? {}, + lastSaved: rawState.lastSaved || Date.now(), + deploymentStatuses: rawState.deploymentStatuses ?? {}, + } +} + +export async function saveMessageCheckpoint( + messageId: string, + get: () => CopilotStore, + set: (partial: Partial | ((state: CopilotStore) => Partial)) => void +): Promise { + const { workflowId, currentChat, messageSnapshots, messageCheckpoints } = get() + if (!workflowId || !currentChat?.id) return false + + const snapshot = messageSnapshots[messageId] + if (!snapshot) return false + + const nextSnapshots = { ...messageSnapshots } + delete nextSnapshots[messageId] + set({ messageSnapshots: nextSnapshots }) + + try { + const response = await fetch('/api/copilot/checkpoints', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + workflowId, + chatId: currentChat.id, + messageId, + workflowState: JSON.stringify(snapshot), + }), + }) + + if (!response.ok) { + throw new Error(`Failed to create checkpoint: ${response.statusText}`) + } + + const result = await response.json() + const newCheckpoint = result.checkpoint + if (newCheckpoint) { + const existingCheckpoints = messageCheckpoints[messageId] ?? [] + const updatedCheckpoints = { + ...messageCheckpoints, + [messageId]: [newCheckpoint, ...existingCheckpoints], + } + set({ messageCheckpoints: updatedCheckpoints }) + } + + return true + } catch (error) { + logger.error('Failed to create checkpoint from snapshot:', error) + return false + } +} + +export function extractToolCallsRecursively( + toolCall: CopilotToolCall, + map: Record +): void { + if (!toolCall?.id) return + map[toolCall.id] = toolCall + + if (Array.isArray(toolCall.subAgentBlocks)) { + for (const block of toolCall.subAgentBlocks) { + if (block?.type === 'subagent_tool_call' && block.toolCall?.id) { + extractToolCallsRecursively(block.toolCall, map) + } + } + } + + if (Array.isArray(toolCall.subAgentToolCalls)) { + for (const subTc of toolCall.subAgentToolCalls) { + extractToolCallsRecursively(subTc, map) + } + } +} + +export function buildToolCallsById(messages: CopilotMessage[]): Record { + const toolCallsById: Record = {} + for (const msg of messages) { + if (msg.contentBlocks) { + for (const block of msg.contentBlocks as any[]) { + if (block?.type === 'tool_call' && block.toolCall?.id) { + extractToolCallsRecursively(block.toolCall, toolCallsById) + } + } + } + } + return toolCallsById +} diff --git a/apps/sim/lib/copilot/messages/credential-masking.ts b/apps/sim/lib/copilot/messages/credential-masking.ts new file mode 100644 index 000000000..f0e64eef8 --- /dev/null +++ b/apps/sim/lib/copilot/messages/credential-masking.ts @@ -0,0 +1,28 @@ +export function maskCredentialIdsInValue(value: any, credentialIds: Set): any { + if (!value || credentialIds.size === 0) return value + + if (typeof value === 'string') { + let masked = value + const sortedIds = Array.from(credentialIds).sort((a, b) => b.length - a.length) + for (const id of sortedIds) { + if (id && masked.includes(id)) { + masked = masked.split(id).join('••••••••') + } + } + return masked + } + + if (Array.isArray(value)) { + return value.map((item) => maskCredentialIdsInValue(item, credentialIds)) + } + + if (typeof value === 'object') { + const masked: any = {} + for (const key of Object.keys(value)) { + masked[key] = maskCredentialIdsInValue(value[key], credentialIds) + } + return masked + } + + return value +} diff --git a/apps/sim/lib/copilot/messages/index.ts b/apps/sim/lib/copilot/messages/index.ts new file mode 100644 index 000000000..4525fcdd8 --- /dev/null +++ b/apps/sim/lib/copilot/messages/index.ts @@ -0,0 +1,3 @@ +export * from './credential-masking' +export * from './serialization' +export * from './checkpoints' diff --git a/apps/sim/lib/copilot/messages/serialization.ts b/apps/sim/lib/copilot/messages/serialization.ts new file mode 100644 index 000000000..e69bae218 --- /dev/null +++ b/apps/sim/lib/copilot/messages/serialization.ts @@ -0,0 +1,169 @@ +import { createLogger } from '@sim/logger' +import type { CopilotMessage } from '@/stores/panel/copilot/types' +import { maskCredentialIdsInValue } from './credential-masking' + +const logger = createLogger('CopilotMessageSerialization') + +export function clearStreamingFlags(toolCall: any): void { + if (!toolCall) return + + toolCall.subAgentStreaming = false + + if (Array.isArray(toolCall.subAgentBlocks)) { + for (const block of toolCall.subAgentBlocks) { + if (block?.type === 'subagent_tool_call' && block.toolCall) { + clearStreamingFlags(block.toolCall) + } + } + } + if (Array.isArray(toolCall.subAgentToolCalls)) { + for (const subTc of toolCall.subAgentToolCalls) { + clearStreamingFlags(subTc) + } + } +} + +export function normalizeMessagesForUI(messages: CopilotMessage[]): CopilotMessage[] { + try { + for (const message of messages) { + if (message.role === 'assistant') { + logger.info('[normalizeMessagesForUI] Loading assistant message', { + id: message.id, + hasContent: !!message.content?.trim(), + contentBlockCount: message.contentBlocks?.length || 0, + contentBlockTypes: (message.contentBlocks as any[])?.map((b) => b?.type) ?? [], + }) + } + } + + for (const message of messages) { + if (message.contentBlocks) { + for (const block of message.contentBlocks as any[]) { + if (block?.type === 'tool_call' && block.toolCall) { + clearStreamingFlags(block.toolCall) + } + } + } + if (message.toolCalls) { + for (const toolCall of message.toolCalls) { + clearStreamingFlags(toolCall) + } + } + } + return messages + } catch { + return messages + } +} + +export function deepClone(obj: T): T { + try { + const json = JSON.stringify(obj) + if (!json || json === 'undefined') { + logger.warn('[deepClone] JSON.stringify returned empty for object', { + type: typeof obj, + isArray: Array.isArray(obj), + length: Array.isArray(obj) ? obj.length : undefined, + }) + return obj + } + const parsed = JSON.parse(json) + if (Array.isArray(obj) && (!Array.isArray(parsed) || parsed.length !== obj.length)) { + logger.warn('[deepClone] Array clone mismatch', { + originalLength: obj.length, + clonedLength: Array.isArray(parsed) ? parsed.length : 'not array', + }) + } + return parsed + } catch (err) { + logger.error('[deepClone] Failed to clone object', { + error: String(err), + type: typeof obj, + isArray: Array.isArray(obj), + }) + return obj + } +} + +export function serializeMessagesForDB( + messages: CopilotMessage[], + credentialIds: Set +): any[] { + const result = messages + .map((msg) => { + let timestamp: string = msg.timestamp + if (typeof timestamp !== 'string') { + const ts = timestamp as any + timestamp = ts instanceof Date ? ts.toISOString() : new Date().toISOString() + } + + const serialized: any = { + id: msg.id, + role: msg.role, + content: msg.content || '', + timestamp, + } + + if (Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0) { + serialized.contentBlocks = deepClone(msg.contentBlocks) + } + + if (Array.isArray((msg as any).toolCalls) && (msg as any).toolCalls.length > 0) { + serialized.toolCalls = deepClone((msg as any).toolCalls) + } + + if (Array.isArray(msg.fileAttachments) && msg.fileAttachments.length > 0) { + serialized.fileAttachments = deepClone(msg.fileAttachments) + } + + if (Array.isArray((msg as any).contexts) && (msg as any).contexts.length > 0) { + serialized.contexts = deepClone((msg as any).contexts) + } + + if (Array.isArray(msg.citations) && msg.citations.length > 0) { + serialized.citations = deepClone(msg.citations) + } + + if (msg.errorType) { + serialized.errorType = msg.errorType + } + + return maskCredentialIdsInValue(serialized, credentialIds) + }) + .filter((msg) => { + if (msg.role === 'assistant') { + const hasContent = typeof msg.content === 'string' && msg.content.trim().length > 0 + const hasTools = Array.isArray(msg.toolCalls) && msg.toolCalls.length > 0 + const hasBlocks = Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0 + return hasContent || hasTools || hasBlocks + } + return true + }) + + for (const msg of messages) { + if (msg.role === 'assistant') { + logger.info('[serializeMessagesForDB] Input assistant message', { + id: msg.id, + hasContent: !!msg.content?.trim(), + contentBlockCount: msg.contentBlocks?.length || 0, + contentBlockTypes: (msg.contentBlocks as any[])?.map((b) => b?.type) ?? [], + }) + } + } + + logger.info('[serializeMessagesForDB] Serialized messages', { + inputCount: messages.length, + outputCount: result.length, + sample: + result.length > 0 + ? { + role: result[result.length - 1].role, + hasContent: !!result[result.length - 1].content, + contentBlockCount: result[result.length - 1].contentBlocks?.length || 0, + toolCallCount: result[result.length - 1].toolCalls?.length || 0, + } + : null, + }) + + return result +} diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts index abbd2c32c..d885e9876 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers/handlers.ts @@ -21,6 +21,19 @@ const logger = createLogger('CopilotSseHandlers') // Normalization + dedupe helpers live in sse-utils to keep server/client in sync. +function inferToolSuccess(data: Record | undefined): { + success: boolean + hasResultData: boolean + hasError: boolean +} { + const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined + const explicitSuccess = data?.success ?? data?.result?.success + const hasResultData = data?.result !== undefined || data?.data !== undefined + const hasError = !!data?.error || !!data?.result?.error + const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError + return { success, hasResultData, hasError } +} + export type SSEHandler = ( event: SSEEvent, context: StreamingContext, @@ -47,14 +60,7 @@ export const sseHandlers: Record = { const current = context.toolCalls.get(toolCallId) if (!current) return - // Determine success: explicit success field, or if there's result data without explicit failure. - const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined - const explicitSuccess = data?.success ?? data?.result?.success - const hasResultData = data?.result !== undefined || data?.data !== undefined - const hasError = !!data?.error || !!data?.result?.error - - // If explicitly set, use that; otherwise infer from data presence. - const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError + const { success, hasResultData, hasError } = inferToolSuccess(data) current.status = success ? 'success' : 'error' current.endTime = Date.now() @@ -344,12 +350,7 @@ export const subAgentHandlers: Record = { // Also update in main toolCalls (where we added it for execution). const mainToolCall = context.toolCalls.get(toolCallId) - // Use same success inference logic as main handler. - const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined - const explicitSuccess = data?.success ?? data?.result?.success - const hasResultData = data?.result !== undefined || data?.data !== undefined - const hasError = !!data?.error || !!data?.result?.error - const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError + const { success, hasResultData, hasError } = inferToolSuccess(data) const status = success ? 'success' : 'error' const endTime = Date.now() diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts index 41610306a..2882a8bbf 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/index.ts @@ -74,27 +74,34 @@ const SERVER_TOOLS = new Set([ 'knowledge_base', ]) -const SIM_WORKFLOW_TOOLS = new Set([ - 'get_user_workflow', - 'get_workflow_from_name', - 'list_user_workflows', - 'list_user_workspaces', - 'list_folders', - 'create_workflow', - 'create_folder', - 'get_workflow_data', - 'get_block_outputs', - 'get_block_upstream_references', - 'run_workflow', - 'set_global_workflow_variables', - 'deploy_api', - 'deploy_chat', - 'deploy_mcp', - 'redeploy', - 'check_deployment_status', - 'list_workspace_mcp_servers', - 'create_workspace_mcp_server', -]) +const SIM_WORKFLOW_TOOL_HANDLERS: Record< + string, + (params: Record, context: ExecutionContext) => Promise +> = { + get_user_workflow: (p, c) => executeGetUserWorkflow(p as GetUserWorkflowParams, c), + get_workflow_from_name: (p, c) => executeGetWorkflowFromName(p as GetWorkflowFromNameParams, c), + list_user_workflows: (p, c) => executeListUserWorkflows(p as ListUserWorkflowsParams, c), + list_user_workspaces: (_p, c) => executeListUserWorkspaces(c), + list_folders: (p, c) => executeListFolders(p as ListFoldersParams, c), + create_workflow: (p, c) => executeCreateWorkflow(p as CreateWorkflowParams, c), + create_folder: (p, c) => executeCreateFolder(p as CreateFolderParams, c), + get_workflow_data: (p, c) => executeGetWorkflowData(p as GetWorkflowDataParams, c), + get_block_outputs: (p, c) => executeGetBlockOutputs(p as GetBlockOutputsParams, c), + get_block_upstream_references: (p, c) => + executeGetBlockUpstreamReferences(p as unknown as GetBlockUpstreamReferencesParams, c), + run_workflow: (p, c) => executeRunWorkflow(p as RunWorkflowParams, c), + set_global_workflow_variables: (p, c) => + executeSetGlobalWorkflowVariables(p as SetGlobalWorkflowVariablesParams, c), + deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c), + deploy_chat: (p, c) => executeDeployChat(p as DeployChatParams, c), + deploy_mcp: (p, c) => executeDeployMcp(p as DeployMcpParams, c), + redeploy: (_p, c) => executeRedeploy(c), + check_deployment_status: (p, c) => executeCheckDeploymentStatus(p as CheckDeploymentStatusParams, c), + list_workspace_mcp_servers: (p, c) => + executeListWorkspaceMcpServers(p as ListWorkspaceMcpServersParams, c), + create_workspace_mcp_server: (p, c) => + executeCreateWorkspaceMcpServer(p as CreateWorkspaceMcpServerParams, c), +} /** * Execute a tool server-side without calling internal routes. @@ -110,7 +117,7 @@ export async function executeToolServerSide( return executeServerToolDirect(toolName, toolCall.params || {}, context) } - if (SIM_WORKFLOW_TOOLS.has(toolName)) { + if (toolName in SIM_WORKFLOW_TOOL_HANDLERS) { return executeSimWorkflowTool(toolName, toolCall.params || {}, context) } @@ -158,51 +165,12 @@ async function executeServerToolDirect( async function executeSimWorkflowTool( toolName: string, - params: Record, + params: Record, context: ExecutionContext ): Promise { - switch (toolName) { - case 'get_user_workflow': - return executeGetUserWorkflow(params as GetUserWorkflowParams, context) - case 'get_workflow_from_name': - return executeGetWorkflowFromName(params as GetWorkflowFromNameParams, context) - case 'list_user_workflows': - return executeListUserWorkflows(params as ListUserWorkflowsParams, context) - case 'list_user_workspaces': - return executeListUserWorkspaces(context) - case 'list_folders': - return executeListFolders(params as ListFoldersParams, context) - case 'create_workflow': - return executeCreateWorkflow(params as CreateWorkflowParams, context) - case 'create_folder': - return executeCreateFolder(params as CreateFolderParams, context) - case 'get_workflow_data': - return executeGetWorkflowData(params as GetWorkflowDataParams, context) - case 'get_block_outputs': - return executeGetBlockOutputs(params as GetBlockOutputsParams, context) - case 'get_block_upstream_references': - return executeGetBlockUpstreamReferences(params as GetBlockUpstreamReferencesParams, context) - case 'run_workflow': - return executeRunWorkflow(params as RunWorkflowParams, context) - case 'set_global_workflow_variables': - return executeSetGlobalWorkflowVariables(params as SetGlobalWorkflowVariablesParams, context) - case 'deploy_api': - return executeDeployApi(params as DeployApiParams, context) - case 'deploy_chat': - return executeDeployChat(params as DeployChatParams, context) - case 'deploy_mcp': - return executeDeployMcp(params as DeployMcpParams, context) - case 'redeploy': - return executeRedeploy(context) - case 'check_deployment_status': - return executeCheckDeploymentStatus(params as CheckDeploymentStatusParams, context) - case 'list_workspace_mcp_servers': - return executeListWorkspaceMcpServers(params as ListWorkspaceMcpServersParams, context) - case 'create_workspace_mcp_server': - return executeCreateWorkspaceMcpServer(params as CreateWorkspaceMcpServerParams, context) - default: - return { success: false, error: `Unsupported workflow tool: ${toolName}` } - } + const handler = SIM_WORKFLOW_TOOL_HANDLERS[toolName] + if (!handler) return { success: false, error: `Unsupported workflow tool: ${toolName}` } + return handler(params, context) } /** diff --git a/apps/sim/lib/copilot/store-utils.ts b/apps/sim/lib/copilot/store-utils.ts new file mode 100644 index 000000000..6c2cfcc4b --- /dev/null +++ b/apps/sim/lib/copilot/store-utils.ts @@ -0,0 +1,162 @@ +import { Loader2 } from 'lucide-react' +import { + ClientToolCallState, + type ClientToolDisplay, + TOOL_DISPLAY_REGISTRY, +} from '@/lib/copilot/tools/client/tool-display-registry' +import type { CopilotStore } from '@/stores/panel/copilot/types' + +export function resolveToolDisplay( + toolName: string | undefined, + state: ClientToolCallState, + _toolCallId?: string, + params?: Record +): ClientToolDisplay | undefined { + if (!toolName) return undefined + const entry = TOOL_DISPLAY_REGISTRY[toolName] + if (!entry) return humanizedFallback(toolName, state) + + if (entry.uiConfig?.dynamicText && params) { + const dynamicText = entry.uiConfig.dynamicText(params, state) + const stateDisplay = entry.displayNames[state] + if (dynamicText && stateDisplay?.icon) { + return { text: dynamicText, icon: stateDisplay.icon } + } + } + + const display = entry.displayNames[state] + if (display?.text || display?.icon) return display + + const fallbackOrder = [ + ClientToolCallState.generating, + ClientToolCallState.executing, + ClientToolCallState.success, + ] + for (const fallbackState of fallbackOrder) { + const fallback = entry.displayNames[fallbackState] + if (fallback?.text || fallback?.icon) return fallback + } + + return humanizedFallback(toolName, state) +} + +export function humanizedFallback( + toolName: string, + state: ClientToolCallState +): ClientToolDisplay | undefined { + const formattedName = toolName.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()) + const stateVerb = + state === ClientToolCallState.success + ? 'Executed' + : state === ClientToolCallState.error + ? 'Failed' + : state === ClientToolCallState.rejected || state === ClientToolCallState.aborted + ? 'Skipped' + : 'Executing' + return { text: `${stateVerb} ${formattedName}`, icon: Loader2 } +} + +export function isRejectedState(state: string): boolean { + return state === 'rejected' +} + +export function isReviewState(state: string): boolean { + return state === 'review' +} + +export function isBackgroundState(state: string): boolean { + return state === 'background' +} + +export function isTerminalState(state: string): boolean { + return ( + state === ClientToolCallState.success || + state === ClientToolCallState.error || + state === ClientToolCallState.rejected || + state === ClientToolCallState.aborted || + isReviewState(state) || + isBackgroundState(state) + ) +} + +export function abortAllInProgressTools( + set: any, + get: () => CopilotStore +) { + try { + const { toolCallsById, messages } = get() + const updatedMap = { ...toolCallsById } + const abortedIds = new Set() + let hasUpdates = false + for (const [id, tc] of Object.entries(toolCallsById)) { + const st = tc.state as any + const isTerminal = + st === ClientToolCallState.success || + st === ClientToolCallState.error || + st === ClientToolCallState.rejected || + st === ClientToolCallState.aborted + if (!isTerminal || isReviewState(st)) { + abortedIds.add(id) + updatedMap[id] = { + ...tc, + state: ClientToolCallState.aborted, + subAgentStreaming: false, + display: resolveToolDisplay(tc.name, ClientToolCallState.aborted, id, (tc as any).params), + } + hasUpdates = true + } else if (tc.subAgentStreaming) { + updatedMap[id] = { + ...tc, + subAgentStreaming: false, + } + hasUpdates = true + } + } + if (abortedIds.size > 0 || hasUpdates) { + set({ toolCallsById: updatedMap }) + set((s: CopilotStore) => { + const msgs = [...s.messages] + for (let mi = msgs.length - 1; mi >= 0; mi--) { + const m = msgs[mi] as any + if (m.role !== 'assistant' || !Array.isArray(m.contentBlocks)) continue + let changed = false + const blocks = m.contentBlocks.map((b: any) => { + if (b?.type === 'tool_call' && b.toolCall?.id && abortedIds.has(b.toolCall.id)) { + changed = true + const prev = b.toolCall + return { + ...b, + toolCall: { + ...prev, + state: ClientToolCallState.aborted, + display: resolveToolDisplay( + prev?.name, + ClientToolCallState.aborted, + prev?.id, + prev?.params + ), + }, + } + } + return b + }) + if (changed) { + msgs[mi] = { ...m, contentBlocks: blocks } + break + } + } + return { messages: msgs } + }) + } + } catch {} +} + +export function stripTodoTags(text: string): string { + if (!text) return text + return text + .replace(/[\s\S]*?<\/marktodo>/g, '') + .replace(/[\s\S]*?<\/checkofftodo>/g, '') + .replace(/[\s\S]*?<\/design_workflow>/g, '') + .replace(/[ \t]+\n/g, '\n') + .replace(/\n{2,}/g, '\n') +} diff --git a/apps/sim/lib/copilot/tools/client/tool-display-registry.ts b/apps/sim/lib/copilot/tools/client/tool-display-registry.ts index 42633ab37..45eb2f0f5 100644 --- a/apps/sim/lib/copilot/tools/client/tool-display-registry.ts +++ b/apps/sim/lib/copilot/tools/client/tool-display-registry.ts @@ -1,4 +1,3 @@ -// @ts-nocheck import type { LucideIcon } from 'lucide-react' import { Blocks, @@ -70,7 +69,7 @@ export interface ClientToolDisplay { } export type DynamicTextFormatter = ( - params: Record, + params: Record, state: ClientToolCallState ) => string | undefined @@ -101,6 +100,9 @@ interface ToolMetadata { subagent?: { streamingLabel?: string completedLabel?: string + shouldCollapse?: boolean + outputArtifacts?: string[] + hideThinkingText?: boolean } interrupt?: any customRenderer?: string @@ -115,6 +117,21 @@ interface ToolDisplayEntry { uiConfig?: ToolUIConfig } +type WorkflowDataType = 'global_variables' | 'custom_tools' | 'mcp_tools' | 'files' + +type NavigationDestination = 'workflow' | 'logs' | 'templates' | 'vector_db' | 'settings' + +function formatDuration(seconds: number): string { + if (seconds < 60) return `${Math.round(seconds)}s` + const mins = Math.floor(seconds / 60) + const secs = Math.round(seconds % 60) + if (mins < 60) return secs > 0 ? `${mins}m ${secs}s` : `${mins}m` + const hours = Math.floor(mins / 60) + const remMins = mins % 60 + if (remMins > 0) return `${hours}h ${remMins}m` + return `${hours}h` +} + function toUiConfig(metadata?: ToolMetadata): ToolUIConfig | undefined { const legacy = metadata?.uiConfig const subagent = legacy?.subagent @@ -1197,7 +1214,7 @@ const META_make_api_request: ToolMetadata = { { key: 'method', label: 'Method', width: '26%', editable: true, mono: true }, { key: 'url', label: 'Endpoint', width: '74%', editable: true, mono: true }, ], - extractRows: (params) => { + extractRows: (params: Record): Array<[string, ...any[]]> => { return [['request', (params.method || 'GET').toUpperCase(), params.url || '']] }, }, @@ -1665,7 +1682,7 @@ const META_run_workflow: ToolMetadata = { { key: 'input', label: 'Input', width: '36%' }, { key: 'value', label: 'Value', width: '64%', editable: true, mono: true }, ], - extractRows: (params) => { + extractRows: (params: Record): Array<[string, ...any[]]> => { let inputs = params.input || params.inputs || params.workflow_input if (typeof inputs === 'string') { try { @@ -1952,7 +1969,7 @@ const META_set_environment_variables: ToolMetadata = { { key: 'name', label: 'Variable', width: '36%', editable: true }, { key: 'value', label: 'Value', width: '64%', editable: true, mono: true }, ], - extractRows: (params) => { + extractRows: (params: Record): Array<[string, ...any[]]> => { const variables = params.variables || {} const entries = Array.isArray(variables) ? variables.map((v: any, i: number) => [String(i), v.name || `var_${i}`, v.value || '']) @@ -2021,7 +2038,7 @@ const META_set_global_workflow_variables: ToolMetadata = { { key: 'name', label: 'Name', width: '40%', editable: true, mono: true }, { key: 'value', label: 'Value', width: '60%', editable: true, mono: true }, ], - extractRows: (params) => { + extractRows: (params: Record): Array<[string, ...any[]]> => { const operations = params.operations || [] return operations.map((op: any, idx: number) => [ String(idx), diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 4bf168593..694123c5f 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -4,17 +4,36 @@ import { createLogger } from '@sim/logger' import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api' +import { applySseEvent, sseHandlers } from '@/lib/copilot/client-sse' +import { + appendContinueOption, + appendContinueOptionBlock, + createErrorMessage, + createStreamingMessage, + createUserMessage, + finalizeThinkingBlock, + stripContinueOption, + stripContinueOptionFromBlocks, +} from '@/lib/copilot/client-sse/content-blocks' +import { flushStreamingUpdates, stopStreamingUpdates } from '@/lib/copilot/client-sse/handlers' +import type { StreamingContext } from '@/lib/copilot/client-sse/types' +import { + buildCheckpointWorkflowState, + buildToolCallsById, + normalizeMessagesForUI, + saveMessageCheckpoint, + serializeMessagesForDB, +} from '@/lib/copilot/messages' import type { CopilotTransportMode } from '@/lib/copilot/models' +import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser' +import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry' import { - normalizeSseEvent, - shouldSkipToolCallEvent, - shouldSkipToolResultEvent, -} from '@/lib/copilot/orchestrator/sse-utils' -import { - ClientToolCallState, - type ClientToolDisplay, - TOOL_DISPLAY_REGISTRY, -} from '@/lib/copilot/tools/client/tool-display-registry' + abortAllInProgressTools, + isRejectedState, + isTerminalState, + resolveToolDisplay, + stripTodoTags, +} from '@/lib/copilot/store-utils' import { getQueryClient } from '@/app/_shell/providers/query-provider' import { subscriptionKeys } from '@/hooks/queries/subscription' import type { @@ -27,7 +46,6 @@ import type { } from '@/stores/panel/copilot/types' import { useWorkflowDiffStore } from '@/stores/workflow-diff/store' import { useSubBlockStore } from '@/stores/workflows/subblock/store' -import { mergeSubblockState } from '@/stores/workflows/utils' import { useWorkflowStore } from '@/stores/workflows/workflow/store' import type { WorkflowState } from '@/stores/workflows/workflow/types' @@ -110,1823 +128,9 @@ try { } } catch {} -// Constants const TEXT_BLOCK_TYPE = 'text' -const THINKING_BLOCK_TYPE = 'thinking' -const DATA_PREFIX = 'data: ' -const DATA_PREFIX_LENGTH = 6 const CONTINUE_OPTIONS_TAG = '{"1":"Continue"}' -// Resolve display text/icon for a tool based on its state -function resolveToolDisplay( - toolName: string | undefined, - state: ClientToolCallState, - _toolCallId?: string, - params?: Record -): ClientToolDisplay | undefined { - if (!toolName) return undefined - const entry = TOOL_DISPLAY_REGISTRY[toolName] - if (!entry) return humanizedFallback(toolName, state) - - // Check dynamic text first - if (entry.uiConfig?.dynamicText && params) { - const dynamicText = entry.uiConfig.dynamicText(params, state) - const stateDisplay = entry.displayNames[state] - if (dynamicText && stateDisplay?.icon) { - return { text: dynamicText, icon: stateDisplay.icon } - } - } - - // Exact state match - const display = entry.displayNames[state] - if (display?.text || display?.icon) return display - - // Fallback through states - const fallbackOrder = [ - ClientToolCallState.generating, - ClientToolCallState.executing, - ClientToolCallState.success, - ] - for (const fallbackState of fallbackOrder) { - const fallback = entry.displayNames[fallbackState] - if (fallback?.text || fallback?.icon) return fallback - } - - return humanizedFallback(toolName, state) -} - -function humanizedFallback( - toolName: string, - state: ClientToolCallState -): ClientToolDisplay | undefined { - const formattedName = toolName.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase()) - const stateVerb = - state === ClientToolCallState.success - ? 'Executed' - : state === ClientToolCallState.error - ? 'Failed' - : state === ClientToolCallState.rejected || state === ClientToolCallState.aborted - ? 'Skipped' - : 'Executing' - return { text: `${stateVerb} ${formattedName}`, icon: undefined as any } -} - -// Helper: check if a tool state is rejected -function isRejectedState(state: any): boolean { - try { - return state === 'rejected' || state === (ClientToolCallState as any).rejected - } catch { - return state === 'rejected' - } -} - -// Helper: check if a tool state is review (terminal for build/edit preview) -function isReviewState(state: any): boolean { - try { - return state === 'review' || state === (ClientToolCallState as any).review - } catch { - return state === 'review' - } -} - -// Helper: check if a tool state is background (terminal) -function isBackgroundState(state: any): boolean { - try { - return state === 'background' || state === (ClientToolCallState as any).background - } catch { - return state === 'background' - } -} - -/** - * Checks if a tool call state is terminal (success, error, rejected, aborted, review, or background) - */ -function isTerminalState(state: any): boolean { - return ( - state === ClientToolCallState.success || - state === ClientToolCallState.error || - state === ClientToolCallState.rejected || - state === ClientToolCallState.aborted || - isReviewState(state) || - isBackgroundState(state) - ) -} - -// Helper: abort all in-progress client tools and update inline blocks -function abortAllInProgressTools(set: any, get: () => CopilotStore) { - try { - const { toolCallsById, messages } = get() - const updatedMap = { ...toolCallsById } - const abortedIds = new Set() - let hasUpdates = false - for (const [id, tc] of Object.entries(toolCallsById)) { - const st = tc.state as any - // Abort anything not already terminal success/error/rejected/aborted - const isTerminal = - st === ClientToolCallState.success || - st === ClientToolCallState.error || - st === ClientToolCallState.rejected || - st === ClientToolCallState.aborted - if (!isTerminal || isReviewState(st)) { - abortedIds.add(id) - updatedMap[id] = { - ...tc, - state: ClientToolCallState.aborted, - subAgentStreaming: false, - display: resolveToolDisplay(tc.name, ClientToolCallState.aborted, id, (tc as any).params), - } - hasUpdates = true - } else if (tc.subAgentStreaming) { - updatedMap[id] = { - ...tc, - subAgentStreaming: false, - } - hasUpdates = true - } - } - if (abortedIds.size > 0 || hasUpdates) { - set({ toolCallsById: updatedMap }) - // Update inline blocks in-place for the latest assistant message only (most relevant) - set((s: CopilotStore) => { - const msgs = [...s.messages] - for (let mi = msgs.length - 1; mi >= 0; mi--) { - const m = msgs[mi] as any - if (m.role !== 'assistant' || !Array.isArray(m.contentBlocks)) continue - let changed = false - const blocks = m.contentBlocks.map((b: any) => { - if (b?.type === 'tool_call' && b.toolCall?.id && abortedIds.has(b.toolCall.id)) { - changed = true - const prev = b.toolCall - return { - ...b, - toolCall: { - ...prev, - state: ClientToolCallState.aborted, - display: resolveToolDisplay( - prev?.name, - ClientToolCallState.aborted, - prev?.id, - prev?.params - ), - }, - } - } - return b - }) - if (changed) { - msgs[mi] = { ...m, contentBlocks: blocks } - break - } - } - return { messages: msgs } - }) - } - } catch {} -} - -// Normalize loaded messages so assistant messages render correctly from DB -/** - * Loads messages from DB for UI rendering. - * Messages are stored exactly as they render, so we just need to: - * 1. Clear any streaming flags (messages loaded from DB are never actively streaming) - * 2. Return the messages - */ -function normalizeMessagesForUI(messages: CopilotMessage[]): CopilotMessage[] { - try { - // Log what we're loading - for (const message of messages) { - if (message.role === 'assistant') { - logger.info('[normalizeMessagesForUI] Loading assistant message', { - id: message.id, - hasContent: !!message.content?.trim(), - contentBlockCount: message.contentBlocks?.length || 0, - contentBlockTypes: (message.contentBlocks as any[])?.map((b) => b?.type) || [], - }) - } - } - - // Clear streaming flags for all tool calls - for (const message of messages) { - if (message.contentBlocks) { - for (const block of message.contentBlocks as any[]) { - if (block?.type === 'tool_call' && block.toolCall) { - clearStreamingFlags(block.toolCall) - } - } - } - // Also clear from toolCalls array (legacy format) - if (message.toolCalls) { - for (const toolCall of message.toolCalls) { - clearStreamingFlags(toolCall) - } - } - } - return messages - } catch { - return messages - } -} - -/** - * Recursively clears streaming flags from a tool call and its nested subagent tool calls. - * This ensures messages loaded from DB don't appear to be streaming. - */ -function clearStreamingFlags(toolCall: any): void { - if (!toolCall) return - - // Always set subAgentStreaming to false - messages loaded from DB are never streaming - toolCall.subAgentStreaming = false - - // Clear nested subagent tool calls - if (Array.isArray(toolCall.subAgentBlocks)) { - for (const block of toolCall.subAgentBlocks) { - if (block?.type === 'subagent_tool_call' && block.toolCall) { - clearStreamingFlags(block.toolCall) - } - } - } - if (Array.isArray(toolCall.subAgentToolCalls)) { - for (const subTc of toolCall.subAgentToolCalls) { - clearStreamingFlags(subTc) - } - } -} - -// Simple object pool for content blocks -class ObjectPool { - private pool: T[] = [] - private createFn: () => T - private resetFn: (obj: T) => void - - constructor(createFn: () => T, resetFn: (obj: T) => void, initialSize = 5) { - this.createFn = createFn - this.resetFn = resetFn - for (let i = 0; i < initialSize; i++) this.pool.push(createFn()) - } - get(): T { - const obj = this.pool.pop() - if (obj) { - this.resetFn(obj) - return obj - } - return this.createFn() - } - release(obj: T): void { - if (this.pool.length < 20) this.pool.push(obj) - } -} - -const contentBlockPool = new ObjectPool( - () => ({ type: '', content: '', timestamp: 0, toolCall: null as any }), - (obj) => { - obj.type = '' - obj.content = '' - obj.timestamp = 0 - ;(obj as any).toolCall = null - ;(obj as any).startTime = undefined - ;(obj as any).duration = undefined - } -) - -// Efficient string builder -class StringBuilder { - private parts: string[] = [] - private length = 0 - append(str: string): void { - this.parts.push(str) - this.length += str.length - } - toString(): string { - const result = this.parts.join('') - this.clear() - return result - } - clear(): void { - this.parts.length = 0 - this.length = 0 - } - get size(): number { - return this.length - } -} - -// Helpers -function createUserMessage( - content: string, - fileAttachments?: MessageFileAttachment[], - contexts?: ChatContext[], - messageId?: string -): CopilotMessage { - return { - id: messageId || crypto.randomUUID(), - role: 'user', - content, - timestamp: new Date().toISOString(), - ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), - ...(contexts && contexts.length > 0 && { contexts }), - ...(contexts && - contexts.length > 0 && { - contentBlocks: [ - { type: 'contexts', contexts: contexts as any, timestamp: Date.now() }, - ] as any, - }), - } -} - -function createStreamingMessage(): CopilotMessage { - return { - id: crypto.randomUUID(), - role: 'assistant', - content: '', - timestamp: new Date().toISOString(), - } -} - -function createErrorMessage( - messageId: string, - content: string, - errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required' -): CopilotMessage { - return { - id: messageId, - role: 'assistant', - content, - timestamp: new Date().toISOString(), - contentBlocks: [ - { - type: 'text', - content, - timestamp: Date.now(), - }, - ], - errorType, - } -} - -/** - * Builds a workflow snapshot suitable for checkpoint persistence. - */ -function buildCheckpointWorkflowState(workflowId: string): WorkflowState | null { - const rawState = useWorkflowStore.getState().getWorkflowState() - if (!rawState) return null - - const blocksWithSubblockValues = mergeSubblockState(rawState.blocks, workflowId) - - const filteredBlocks = Object.entries(blocksWithSubblockValues).reduce( - (acc, [blockId, block]) => { - if (block?.type && block?.name) { - acc[blockId] = { - ...block, - id: block.id || blockId, - enabled: block.enabled !== undefined ? block.enabled : true, - horizontalHandles: block.horizontalHandles !== undefined ? block.horizontalHandles : true, - height: block.height !== undefined ? block.height : 90, - subBlocks: block.subBlocks || {}, - outputs: block.outputs || {}, - data: block.data || {}, - position: block.position || { x: 0, y: 0 }, - } - } - return acc - }, - {} as WorkflowState['blocks'] - ) - - return { - blocks: filteredBlocks, - edges: rawState.edges || [], - loops: rawState.loops || {}, - parallels: rawState.parallels || {}, - lastSaved: rawState.lastSaved || Date.now(), - deploymentStatuses: rawState.deploymentStatuses || {}, - } -} - -/** - * Persists a previously captured snapshot as a workflow checkpoint. - */ -async function saveMessageCheckpoint( - messageId: string, - get: () => CopilotStore, - set: (partial: Partial | ((state: CopilotStore) => Partial)) => void -): Promise { - const { workflowId, currentChat, messageSnapshots, messageCheckpoints } = get() - if (!workflowId || !currentChat?.id) return false - - const snapshot = messageSnapshots[messageId] - if (!snapshot) return false - - const nextSnapshots = { ...messageSnapshots } - delete nextSnapshots[messageId] - set({ messageSnapshots: nextSnapshots }) - - try { - const response = await fetch('/api/copilot/checkpoints', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - workflowId, - chatId: currentChat.id, - messageId, - workflowState: JSON.stringify(snapshot), - }), - }) - - if (!response.ok) { - throw new Error(`Failed to create checkpoint: ${response.statusText}`) - } - - const result = await response.json() - const newCheckpoint = result.checkpoint - if (newCheckpoint) { - const existingCheckpoints = messageCheckpoints[messageId] || [] - const updatedCheckpoints = { - ...messageCheckpoints, - [messageId]: [newCheckpoint, ...existingCheckpoints], - } - set({ messageCheckpoints: updatedCheckpoints }) - } - - return true - } catch (error) { - logger.error('Failed to create checkpoint from snapshot:', error) - return false - } -} - -function stripTodoTags(text: string): string { - if (!text) return text - return text - .replace(/[\s\S]*?<\/marktodo>/g, '') - .replace(/[\s\S]*?<\/checkofftodo>/g, '') - .replace(/[\s\S]*?<\/design_workflow>/g, '') - .replace(/[ \t]+\n/g, '\n') - .replace(/\n{2,}/g, '\n') -} - -/** - * Deep clones an object using JSON serialization. - * This ensures we strip any non-serializable data (functions, circular refs). - */ -function deepClone(obj: T): T { - try { - const json = JSON.stringify(obj) - if (!json || json === 'undefined') { - logger.warn('[deepClone] JSON.stringify returned empty for object', { - type: typeof obj, - isArray: Array.isArray(obj), - length: Array.isArray(obj) ? obj.length : undefined, - }) - return obj - } - const parsed = JSON.parse(json) - // Verify the clone worked - if (Array.isArray(obj) && (!Array.isArray(parsed) || parsed.length !== obj.length)) { - logger.warn('[deepClone] Array clone mismatch', { - originalLength: obj.length, - clonedLength: Array.isArray(parsed) ? parsed.length : 'not array', - }) - } - return parsed - } catch (err) { - logger.error('[deepClone] Failed to clone object', { - error: String(err), - type: typeof obj, - isArray: Array.isArray(obj), - }) - return obj - } -} - -/** - * Recursively masks credential IDs in any value (string, object, or array). - * Used during serialization to ensure sensitive IDs are never persisted. - */ -function maskCredentialIdsInValue(value: any, credentialIds: Set): any { - if (!value || credentialIds.size === 0) return value - - if (typeof value === 'string') { - let masked = value - // Sort by length descending to mask longer IDs first - const sortedIds = Array.from(credentialIds).sort((a, b) => b.length - a.length) - for (const id of sortedIds) { - if (id && masked.includes(id)) { - masked = masked.split(id).join('••••••••') - } - } - return masked - } - - if (Array.isArray(value)) { - return value.map((item) => maskCredentialIdsInValue(item, credentialIds)) - } - - if (typeof value === 'object') { - const masked: any = {} - for (const key of Object.keys(value)) { - masked[key] = maskCredentialIdsInValue(value[key], credentialIds) - } - return masked - } - - return value -} - -/** - * Serializes messages for database storage. - * Deep clones all fields to ensure proper JSON serialization. - * Masks sensitive credential IDs before persisting. - * This ensures they render identically when loaded back. - */ -function serializeMessagesForDB(messages: CopilotMessage[]): any[] { - // Get credential IDs to mask - const credentialIds = useCopilotStore.getState().sensitiveCredentialIds - - const result = messages - .map((msg) => { - // Deep clone the entire message to ensure all nested data is serializable - // Ensure timestamp is always a string (Zod schema requires it) - let timestamp: string = msg.timestamp - if (typeof timestamp !== 'string') { - const ts = timestamp as any - timestamp = ts instanceof Date ? ts.toISOString() : new Date().toISOString() - } - - const serialized: any = { - id: msg.id, - role: msg.role, - content: msg.content || '', - timestamp, - } - - // Deep clone contentBlocks (the main rendering data) - if (Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0) { - serialized.contentBlocks = deepClone(msg.contentBlocks) - } - - // Deep clone toolCalls - if (Array.isArray((msg as any).toolCalls) && (msg as any).toolCalls.length > 0) { - serialized.toolCalls = deepClone((msg as any).toolCalls) - } - - // Deep clone file attachments - if (Array.isArray(msg.fileAttachments) && msg.fileAttachments.length > 0) { - serialized.fileAttachments = deepClone(msg.fileAttachments) - } - - // Deep clone contexts - if (Array.isArray((msg as any).contexts) && (msg as any).contexts.length > 0) { - serialized.contexts = deepClone((msg as any).contexts) - } - - // Deep clone citations - if (Array.isArray(msg.citations) && msg.citations.length > 0) { - serialized.citations = deepClone(msg.citations) - } - - // Copy error type - if (msg.errorType) { - serialized.errorType = msg.errorType - } - - // Mask credential IDs in the serialized message before persisting - return maskCredentialIdsInValue(serialized, credentialIds) - }) - .filter((msg) => { - // Filter out empty assistant messages - if (msg.role === 'assistant') { - const hasContent = typeof msg.content === 'string' && msg.content.trim().length > 0 - const hasTools = Array.isArray(msg.toolCalls) && msg.toolCalls.length > 0 - const hasBlocks = Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0 - return hasContent || hasTools || hasBlocks - } - return true - }) - - // Log what we're serializing - for (const msg of messages) { - if (msg.role === 'assistant') { - logger.info('[serializeMessagesForDB] Input assistant message', { - id: msg.id, - hasContent: !!msg.content?.trim(), - contentBlockCount: msg.contentBlocks?.length || 0, - contentBlockTypes: (msg.contentBlocks as any[])?.map((b) => b?.type) || [], - }) - } - } - - logger.info('[serializeMessagesForDB] Serialized messages', { - inputCount: messages.length, - outputCount: result.length, - sample: - result.length > 0 - ? { - role: result[result.length - 1].role, - hasContent: !!result[result.length - 1].content, - contentBlockCount: result[result.length - 1].contentBlocks?.length || 0, - toolCallCount: result[result.length - 1].toolCalls?.length || 0, - } - : null, - }) - - return result -} - -/** - * @deprecated Use serializeMessagesForDB instead. - */ -function validateMessagesForLLM(messages: CopilotMessage[]): any[] { - return serializeMessagesForDB(messages) -} - -/** - * Extracts all tool calls from a toolCall object, including nested subAgentBlocks. - * Adds them to the provided map. - */ -function extractToolCallsRecursively( - toolCall: CopilotToolCall, - map: Record -): void { - if (!toolCall?.id) return - map[toolCall.id] = toolCall - - // Extract nested tool calls from subAgentBlocks - if (Array.isArray(toolCall.subAgentBlocks)) { - for (const block of toolCall.subAgentBlocks) { - if (block?.type === 'subagent_tool_call' && block.toolCall?.id) { - extractToolCallsRecursively(block.toolCall, map) - } - } - } - - // Extract from subAgentToolCalls as well - if (Array.isArray(toolCall.subAgentToolCalls)) { - for (const subTc of toolCall.subAgentToolCalls) { - extractToolCallsRecursively(subTc, map) - } - } -} - -/** - * Builds a complete toolCallsById map from normalized messages. - * Extracts all tool calls including nested subagent tool calls. - */ -function buildToolCallsById(messages: CopilotMessage[]): Record { - const toolCallsById: Record = {} - for (const msg of messages) { - if (msg.contentBlocks) { - for (const block of msg.contentBlocks as any[]) { - if (block?.type === 'tool_call' && block.toolCall?.id) { - extractToolCallsRecursively(block.toolCall, toolCallsById) - } - } - } - } - return toolCallsById -} - -// Streaming context and SSE parsing -interface StreamingContext { - messageId: string - accumulatedContent: StringBuilder - contentBlocks: any[] - currentTextBlock: any | null - isInThinkingBlock: boolean - currentThinkingBlock: any | null - isInDesignWorkflowBlock: boolean - designWorkflowContent: string - pendingContent: string - newChatId?: string - doneEventCount: number - streamComplete?: boolean - wasAborted?: boolean - suppressContinueOption?: boolean - /** Track active subagent sessions by parent tool call ID */ - subAgentParentToolCallId?: string - /** Track subagent content per parent tool call */ - subAgentContent: Record - /** Track subagent tool calls per parent tool call */ - subAgentToolCalls: Record - /** Track subagent streaming blocks per parent tool call */ - subAgentBlocks: Record - suppressStreamingUpdates?: boolean -} - -type SSEHandler = ( - data: any, - context: StreamingContext, - get: () => CopilotStore, - set: any -) => Promise | void - -function appendTextBlock(context: StreamingContext, text: string) { - if (!text) return - context.accumulatedContent.append(text) - if (context.currentTextBlock && context.contentBlocks.length > 0) { - const lastBlock = context.contentBlocks[context.contentBlocks.length - 1] - if (lastBlock.type === TEXT_BLOCK_TYPE && lastBlock === context.currentTextBlock) { - lastBlock.content += text - return - } - } - context.currentTextBlock = contentBlockPool.get() - context.currentTextBlock.type = TEXT_BLOCK_TYPE - context.currentTextBlock.content = text - context.currentTextBlock.timestamp = Date.now() - context.contentBlocks.push(context.currentTextBlock) -} - -function appendContinueOption(content: string): string { - if (//i.test(content)) return content - const suffix = content.trim().length > 0 ? '\n\n' : '' - return `${content}${suffix}${CONTINUE_OPTIONS_TAG}` -} - -function appendContinueOptionBlock(blocks: any[]): any[] { - if (!Array.isArray(blocks)) return blocks - const hasOptions = blocks.some( - (block) => - block?.type === TEXT_BLOCK_TYPE && - typeof block.content === 'string' && - //i.test(block.content) - ) - if (hasOptions) return blocks - return [ - ...blocks, - { - type: TEXT_BLOCK_TYPE, - content: CONTINUE_OPTIONS_TAG, - timestamp: Date.now(), - }, - ] -} - -function stripContinueOption(content: string): string { - if (!content || !content.includes(CONTINUE_OPTIONS_TAG)) return content - const next = content.replace(CONTINUE_OPTIONS_TAG, '') - return next.replace(/\n{2,}\s*$/g, '\n').trimEnd() -} - -function stripContinueOptionFromBlocks(blocks: any[]): any[] { - if (!Array.isArray(blocks)) return blocks - return blocks.flatMap((block) => { - if ( - block?.type === TEXT_BLOCK_TYPE && - typeof block.content === 'string' && - block.content.includes(CONTINUE_OPTIONS_TAG) - ) { - const nextContent = stripContinueOption(block.content) - if (!nextContent.trim()) return [] - return [{ ...block, content: nextContent }] - } - return [block] - }) -} - -function beginThinkingBlock(context: StreamingContext) { - if (!context.currentThinkingBlock) { - context.currentThinkingBlock = contentBlockPool.get() - context.currentThinkingBlock.type = THINKING_BLOCK_TYPE - context.currentThinkingBlock.content = '' - context.currentThinkingBlock.timestamp = Date.now() - ;(context.currentThinkingBlock as any).startTime = Date.now() - context.contentBlocks.push(context.currentThinkingBlock) - } - context.isInThinkingBlock = true - context.currentTextBlock = null -} - -/** - * Removes thinking tags (raw or escaped) from streamed content. - */ -function stripThinkingTags(text: string): string { - return text.replace(/<\/?thinking[^>]*>/gi, '').replace(/<\/?thinking[^&]*>/gi, '') -} - -function appendThinkingContent(context: StreamingContext, text: string) { - if (!text) return - const cleanedText = stripThinkingTags(text) - if (!cleanedText) return - if (context.currentThinkingBlock) { - context.currentThinkingBlock.content += cleanedText - } else { - context.currentThinkingBlock = contentBlockPool.get() - context.currentThinkingBlock.type = THINKING_BLOCK_TYPE - context.currentThinkingBlock.content = cleanedText - context.currentThinkingBlock.timestamp = Date.now() - context.currentThinkingBlock.startTime = Date.now() - context.contentBlocks.push(context.currentThinkingBlock) - } - context.isInThinkingBlock = true - context.currentTextBlock = null -} - -function finalizeThinkingBlock(context: StreamingContext) { - if (context.currentThinkingBlock) { - context.currentThinkingBlock.duration = - Date.now() - (context.currentThinkingBlock.startTime || Date.now()) - } - context.isInThinkingBlock = false - context.currentThinkingBlock = null - context.currentTextBlock = null -} - -function upsertToolCallBlock(context: StreamingContext, toolCall: CopilotToolCall) { - let found = false - for (let i = 0; i < context.contentBlocks.length; i++) { - const b = context.contentBlocks[i] as any - if (b.type === 'tool_call' && b.toolCall?.id === toolCall.id) { - context.contentBlocks[i] = { ...b, toolCall } - found = true - break - } - } - if (!found) { - context.contentBlocks.push({ type: 'tool_call', toolCall, timestamp: Date.now() }) - } -} - -function appendSubAgentText(context: StreamingContext, parentToolCallId: string, text: string) { - if (!context.subAgentContent[parentToolCallId]) { - context.subAgentContent[parentToolCallId] = '' - } - if (!context.subAgentBlocks[parentToolCallId]) { - context.subAgentBlocks[parentToolCallId] = [] - } - context.subAgentContent[parentToolCallId] += text - const blocks = context.subAgentBlocks[parentToolCallId] - const lastBlock = blocks[blocks.length - 1] - if (lastBlock && lastBlock.type === 'subagent_text') { - lastBlock.content = (lastBlock.content || '') + text - } else { - blocks.push({ - type: 'subagent_text', - content: text, - timestamp: Date.now(), - }) - } -} - -const sseHandlers: Record = { - chat_id: async (data, context, get, set) => { - context.newChatId = data.chatId - const { currentChat, activeStream } = get() - if (!currentChat && context.newChatId) { - await get().handleNewChatCreation(context.newChatId) - } - // Update activeStream with chatId for resume purposes - if (activeStream && context.newChatId && !activeStream.chatId) { - const updatedStream = { ...activeStream, chatId: context.newChatId } - set({ activeStream: updatedStream }) - writeActiveStreamToStorage(updatedStream) - } - }, - title_updated: (_data, _context, get, set) => { - const title = _data.title - if (!title) return - const { currentChat, chats } = get() - if (currentChat) { - set({ - currentChat: { ...currentChat, title }, - chats: chats.map((c) => (c.id === currentChat.id ? { ...c, title } : c)), - }) - } - }, - tool_result: (data, context, get, set) => { - try { - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const success: boolean | undefined = data?.success - const failedDependency: boolean = data?.failedDependency === true - const skipped: boolean = data?.result?.skipped === true - if (!toolCallId) return - const { toolCallsById } = get() - const current = toolCallsById[toolCallId] - if (current) { - if ( - isRejectedState(current.state) || - isReviewState(current.state) || - isBackgroundState(current.state) - ) { - // Preserve terminal review/rejected state; do not override - return - } - const targetState = success - ? ClientToolCallState.success - : failedDependency || skipped - ? ClientToolCallState.rejected - : ClientToolCallState.error - const updatedMap = { ...toolCallsById } - updatedMap[toolCallId] = { - ...current, - state: targetState, - display: resolveToolDisplay( - current.name, - targetState, - current.id, - (current as any).params - ), - } - set({ toolCallsById: updatedMap }) - - // If checkoff_todo succeeded, mark todo as completed in planTodos - if (targetState === ClientToolCallState.success && current.name === 'checkoff_todo') { - try { - const result = data?.result || data?.data?.result || {} - const input = (current as any).params || (current as any).input || {} - const todoId = input.id || input.todoId || result.id || result.todoId - if (todoId) { - get().updatePlanTodoStatus(todoId, 'completed') - } - } catch {} - } - - // If mark_todo_in_progress succeeded, set todo executing in planTodos - if ( - targetState === ClientToolCallState.success && - current.name === 'mark_todo_in_progress' - ) { - try { - const result = data?.result || data?.data?.result || {} - const input = (current as any).params || (current as any).input || {} - const todoId = input.id || input.todoId || result.id || result.todoId - if (todoId) { - get().updatePlanTodoStatus(todoId, 'executing') - } - } catch {} - } - - if (current.name === 'edit_workflow') { - try { - const resultPayload = - data?.result || data?.data?.result || data?.data?.data || data?.data || {} - const workflowState = resultPayload?.workflowState - logger.info('[SSE] edit_workflow result received', { - hasWorkflowState: !!workflowState, - blockCount: workflowState ? Object.keys(workflowState.blocks || {}).length : 0, - edgeCount: workflowState?.edges?.length ?? 0, - }) - if (workflowState) { - const diffStore = useWorkflowDiffStore.getState() - // Await the diff application to catch any errors - diffStore.setProposedChanges(workflowState).catch((err) => { - logger.error('[SSE] Failed to apply edit_workflow diff', { - error: err instanceof Error ? err.message : String(err), - }) - }) - } - } catch (err) { - logger.error('[SSE] edit_workflow result handling failed', { - error: err instanceof Error ? err.message : String(err), - }) - } - } - } - - // Update inline content block state - for (let i = 0; i < context.contentBlocks.length; i++) { - const b = context.contentBlocks[i] as any - if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { - if ( - isRejectedState(b.toolCall?.state) || - isReviewState(b.toolCall?.state) || - isBackgroundState(b.toolCall?.state) - ) - break - const targetState = success - ? ClientToolCallState.success - : failedDependency || skipped - ? ClientToolCallState.rejected - : ClientToolCallState.error - context.contentBlocks[i] = { - ...b, - toolCall: { - ...b.toolCall, - state: targetState, - display: resolveToolDisplay( - b.toolCall?.name, - targetState, - toolCallId, - b.toolCall?.params - ), - }, - } - break - } - } - updateStreamingMessage(set, context) - } catch {} - }, - tool_error: (data, context, get, set) => { - try { - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const failedDependency: boolean = data?.failedDependency === true - if (!toolCallId) return - const { toolCallsById } = get() - const current = toolCallsById[toolCallId] - if (current) { - if ( - isRejectedState(current.state) || - isReviewState(current.state) || - isBackgroundState(current.state) - ) { - return - } - const targetState = failedDependency - ? ClientToolCallState.rejected - : ClientToolCallState.error - const updatedMap = { ...toolCallsById } - updatedMap[toolCallId] = { - ...current, - state: targetState, - display: resolveToolDisplay( - current.name, - targetState, - current.id, - (current as any).params - ), - } - set({ toolCallsById: updatedMap }) - } - for (let i = 0; i < context.contentBlocks.length; i++) { - const b = context.contentBlocks[i] as any - if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { - if ( - isRejectedState(b.toolCall?.state) || - isReviewState(b.toolCall?.state) || - isBackgroundState(b.toolCall?.state) - ) - break - const targetState = failedDependency - ? ClientToolCallState.rejected - : ClientToolCallState.error - context.contentBlocks[i] = { - ...b, - toolCall: { - ...b.toolCall, - state: targetState, - display: resolveToolDisplay( - b.toolCall?.name, - targetState, - toolCallId, - b.toolCall?.params - ), - }, - } - break - } - } - updateStreamingMessage(set, context) - } catch {} - }, - tool_generating: (data, context, get, set) => { - const { toolCallId, toolName } = data - if (!toolCallId || !toolName) return - const { toolCallsById } = get() - - if (!toolCallsById[toolCallId]) { - // Show as pending until we receive full tool_call (with arguments) to decide execution - const initialState = ClientToolCallState.pending - const tc: CopilotToolCall = { - id: toolCallId, - name: toolName, - state: initialState, - display: resolveToolDisplay(toolName, initialState, toolCallId), - } - const updated = { ...toolCallsById, [toolCallId]: tc } - set({ toolCallsById: updated }) - logger.info('[toolCallsById] map updated', updated) - - // Add/refresh inline content block - upsertToolCallBlock(context, tc) - updateStreamingMessage(set, context) - } - }, - tool_call: (data, context, get, set) => { - const toolData = data?.data || {} - const id: string | undefined = toolData.id || data?.toolCallId - const name: string | undefined = toolData.name || data?.toolName - if (!id) return - const args = toolData.arguments - const isPartial = toolData.partial === true - const { toolCallsById } = get() - - const existing = toolCallsById[id] - const next: CopilotToolCall = existing - ? { - ...existing, - state: ClientToolCallState.pending, - ...(args ? { params: args } : {}), - display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), - } - : { - id, - name: name || 'unknown_tool', - state: ClientToolCallState.pending, - ...(args ? { params: args } : {}), - display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), - } - const updated = { ...toolCallsById, [id]: next } - set({ toolCallsById: updated }) - logger.info('[toolCallsById] → pending', { id, name, params: args }) - - // Ensure an inline content block exists/updated for this tool call - upsertToolCallBlock(context, next) - updateStreamingMessage(set, context) - - // Do not execute on partial tool_call frames - if (isPartial) { - return - } - - return - }, - reasoning: (data, context, _get, set) => { - const phase = (data && (data.phase || data?.data?.phase)) as string | undefined - if (phase === 'start') { - beginThinkingBlock(context) - updateStreamingMessage(set, context) - return - } - if (phase === 'end') { - finalizeThinkingBlock(context) - updateStreamingMessage(set, context) - return - } - const chunk: string = typeof data?.data === 'string' ? data.data : data?.content || '' - if (!chunk) return - appendThinkingContent(context, chunk) - updateStreamingMessage(set, context) - }, - content: (data, context, get, set) => { - if (!data.data) return - context.pendingContent += data.data - - let contentToProcess = context.pendingContent - let hasProcessedContent = false - - const thinkingStartRegex = // - const thinkingEndRegex = /<\/thinking>/ - const designWorkflowStartRegex = // - const designWorkflowEndRegex = /<\/design_workflow>/ - - const splitTrailingPartialTag = ( - text: string, - tags: string[] - ): { text: string; remaining: string } => { - const partialIndex = text.lastIndexOf('<') - if (partialIndex < 0) { - return { text, remaining: '' } - } - const possibleTag = text.substring(partialIndex) - const matchesTagStart = tags.some((tag) => tag.startsWith(possibleTag)) - if (!matchesTagStart) { - return { text, remaining: '' } - } - return { - text: text.substring(0, partialIndex), - remaining: possibleTag, - } - } - - while (contentToProcess.length > 0) { - // Handle design_workflow tags (takes priority over other content processing) - if (context.isInDesignWorkflowBlock) { - const endMatch = designWorkflowEndRegex.exec(contentToProcess) - if (endMatch) { - const designContent = contentToProcess.substring(0, endMatch.index) - context.designWorkflowContent += designContent - context.isInDesignWorkflowBlock = false - - // Update store with complete design workflow content (available in all modes) - logger.info('[design_workflow] Tag complete, setting plan content', { - contentLength: context.designWorkflowContent.length, - }) - set({ streamingPlanContent: context.designWorkflowContent }) - - contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length) - hasProcessedContent = true - } else { - // Still in design_workflow block, accumulate content - const { text, remaining } = splitTrailingPartialTag(contentToProcess, [ - '', - ]) - context.designWorkflowContent += text - - // Update store with partial content for streaming effect (available in all modes) - set({ streamingPlanContent: context.designWorkflowContent }) - - contentToProcess = remaining - hasProcessedContent = true - if (remaining) { - break - } - } - continue - } - - if (!context.isInThinkingBlock && !context.isInDesignWorkflowBlock) { - // Check for design_workflow start tag first - const designStartMatch = designWorkflowStartRegex.exec(contentToProcess) - if (designStartMatch) { - const textBeforeDesign = contentToProcess.substring(0, designStartMatch.index) - if (textBeforeDesign) { - appendTextBlock(context, textBeforeDesign) - hasProcessedContent = true - } - context.isInDesignWorkflowBlock = true - context.designWorkflowContent = '' - contentToProcess = contentToProcess.substring( - designStartMatch.index + designStartMatch[0].length - ) - hasProcessedContent = true - continue - } - - const nextMarkIndex = contentToProcess.indexOf('') - const nextCheckIndex = contentToProcess.indexOf('') - const hasMark = nextMarkIndex >= 0 - const hasCheck = nextCheckIndex >= 0 - - const nextTagIndex = - hasMark && hasCheck - ? Math.min(nextMarkIndex, nextCheckIndex) - : hasMark - ? nextMarkIndex - : hasCheck - ? nextCheckIndex - : -1 - - if (nextTagIndex >= 0) { - const isMarkTodo = hasMark && nextMarkIndex === nextTagIndex - const tagStart = isMarkTodo ? '' : '' - const tagEnd = isMarkTodo ? '' : '' - const closingIndex = contentToProcess.indexOf(tagEnd, nextTagIndex + tagStart.length) - - if (closingIndex === -1) { - // Partial tag; wait for additional content - break - } - - const todoId = contentToProcess - .substring(nextTagIndex + tagStart.length, closingIndex) - .trim() - logger.info( - isMarkTodo ? '[TODO] Detected marktodo tag' : '[TODO] Detected checkofftodo tag', - { todoId } - ) - - if (todoId) { - try { - get().updatePlanTodoStatus(todoId, isMarkTodo ? 'executing' : 'completed') - logger.info( - isMarkTodo - ? '[TODO] Successfully marked todo in progress' - : '[TODO] Successfully checked off todo', - { todoId } - ) - } catch (e) { - logger.error( - isMarkTodo - ? '[TODO] Failed to mark todo in progress' - : '[TODO] Failed to checkoff todo', - { todoId, error: e } - ) - } - } else { - logger.warn('[TODO] Empty todoId extracted from todo tag', { tagType: tagStart }) - } - - // Remove the tag AND newlines around it, but preserve ONE newline if both sides had them - let beforeTag = contentToProcess.substring(0, nextTagIndex) - let afterTag = contentToProcess.substring(closingIndex + tagEnd.length) - - const hadNewlineBefore = /(\r?\n)+$/.test(beforeTag) - const hadNewlineAfter = /^(\r?\n)+/.test(afterTag) - - // Strip trailing newlines before the tag - beforeTag = beforeTag.replace(/(\r?\n)+$/, '') - // Strip leading newlines after the tag - afterTag = afterTag.replace(/^(\r?\n)+/, '') - - // If there were newlines on both sides, add back ONE to preserve paragraph breaks - contentToProcess = - beforeTag + (hadNewlineBefore && hadNewlineAfter ? '\n' : '') + afterTag - context.currentTextBlock = null - hasProcessedContent = true - continue - } - } - - if (context.isInThinkingBlock) { - const endMatch = thinkingEndRegex.exec(contentToProcess) - if (endMatch) { - const thinkingContent = contentToProcess.substring(0, endMatch.index) - appendThinkingContent(context, thinkingContent) - finalizeThinkingBlock(context) - contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length) - hasProcessedContent = true - } else { - const { text, remaining } = splitTrailingPartialTag(contentToProcess, ['']) - if (text) { - appendThinkingContent(context, text) - hasProcessedContent = true - } - contentToProcess = remaining - if (remaining) { - break - } - } - } else { - const startMatch = thinkingStartRegex.exec(contentToProcess) - if (startMatch) { - const textBeforeThinking = contentToProcess.substring(0, startMatch.index) - if (textBeforeThinking) { - appendTextBlock(context, textBeforeThinking) - hasProcessedContent = true - } - context.isInThinkingBlock = true - context.currentTextBlock = null - contentToProcess = contentToProcess.substring(startMatch.index + startMatch[0].length) - hasProcessedContent = true - } else { - // Check if content might contain partial todo tags and hold them back - let partialTagIndex = contentToProcess.lastIndexOf('<') - - // Also check for partial marktodo or checkofftodo tags - const partialMarkTodo = contentToProcess.lastIndexOf(' partialTagIndex) { - partialTagIndex = partialMarkTodo - } - if (partialCheckoffTodo > partialTagIndex) { - partialTagIndex = partialCheckoffTodo - } - - let textToAdd = contentToProcess - let remaining = '' - if (partialTagIndex >= 0 && partialTagIndex > contentToProcess.length - 50) { - textToAdd = contentToProcess.substring(0, partialTagIndex) - remaining = contentToProcess.substring(partialTagIndex) - } - if (textToAdd) { - appendTextBlock(context, textToAdd) - hasProcessedContent = true - } - contentToProcess = remaining - break - } - } - } - - context.pendingContent = contentToProcess - if (hasProcessedContent) { - updateStreamingMessage(set, context) - } - }, - done: (_data, context) => { - logger.info('[SSE] DONE EVENT RECEIVED', { - doneEventCount: context.doneEventCount, - data: _data, - }) - context.doneEventCount++ - if (context.doneEventCount >= 1) { - logger.info('[SSE] Setting streamComplete = true, stream will terminate') - context.streamComplete = true - } - }, - error: (data, context, _get, set) => { - logger.error('Stream error:', data.error) - set((state: CopilotStore) => ({ - messages: state.messages.map((msg) => - msg.id === context.messageId - ? { - ...msg, - content: context.accumulatedContent || 'An error occurred.', - error: data.error, - } - : msg - ), - })) - context.streamComplete = true - }, - stream_end: (_data, context, _get, set) => { - if (context.pendingContent) { - if (context.isInThinkingBlock && context.currentThinkingBlock) { - appendThinkingContent(context, context.pendingContent) - } else if (context.pendingContent.trim()) { - appendTextBlock(context, context.pendingContent) - } - context.pendingContent = '' - } - finalizeThinkingBlock(context) - updateStreamingMessage(set, context) - }, - default: () => {}, -} - -/** - * Helper to update a tool call with subagent data in both toolCallsById and contentBlocks - */ -function updateToolCallWithSubAgentData( - context: StreamingContext, - get: () => CopilotStore, - set: any, - parentToolCallId: string -) { - const { toolCallsById } = get() - const parentToolCall = toolCallsById[parentToolCallId] - if (!parentToolCall) { - logger.warn('[SubAgent] updateToolCallWithSubAgentData: parent tool call not found', { - parentToolCallId, - availableToolCallIds: Object.keys(toolCallsById), - }) - return - } - - // Prepare subagent blocks array for ordered display - const blocks = context.subAgentBlocks[parentToolCallId] || [] - - const updatedToolCall: CopilotToolCall = { - ...parentToolCall, - subAgentContent: context.subAgentContent[parentToolCallId] || '', - subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [], - subAgentBlocks: blocks, - subAgentStreaming: true, - } - - logger.info('[SubAgent] Updating tool call with subagent data', { - parentToolCallId, - parentToolName: parentToolCall.name, - subAgentContentLength: updatedToolCall.subAgentContent?.length, - subAgentBlocksCount: updatedToolCall.subAgentBlocks?.length, - subAgentToolCallsCount: updatedToolCall.subAgentToolCalls?.length, - }) - - // Update in toolCallsById - const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } - set({ toolCallsById: updatedMap }) - - // Update in contentBlocks - let foundInContentBlocks = false - for (let i = 0; i < context.contentBlocks.length; i++) { - const b = context.contentBlocks[i] as any - if (b.type === 'tool_call' && b.toolCall?.id === parentToolCallId) { - context.contentBlocks[i] = { ...b, toolCall: updatedToolCall } - foundInContentBlocks = true - break - } - } - - if (!foundInContentBlocks) { - logger.warn('[SubAgent] Parent tool call not found in contentBlocks', { - parentToolCallId, - contentBlocksCount: context.contentBlocks.length, - toolCallBlockIds: context.contentBlocks - .filter((b: any) => b.type === 'tool_call') - .map((b: any) => b.toolCall?.id), - }) - } - - updateStreamingMessage(set, context) -} - -/** - * SSE handlers for subagent events (events with subagent field set) - * These handle content and tool calls from subagents like debug - */ -const subAgentSSEHandlers: Record = { - // Handle subagent response start (ignore - just a marker) - start: () => { - // Subagent start event - no action needed, parent is already tracked from subagent_start - }, - - // Handle subagent text content (reasoning/thinking) - content: (data, context, get, set) => { - const parentToolCallId = context.subAgentParentToolCallId - logger.info('[SubAgent] content event', { - parentToolCallId, - hasData: !!data.data, - dataPreview: typeof data.data === 'string' ? data.data.substring(0, 50) : null, - }) - if (!parentToolCallId || !data.data) { - logger.warn('[SubAgent] content missing parentToolCallId or data', { - parentToolCallId, - hasData: !!data.data, - }) - return - } - - appendSubAgentText(context, parentToolCallId, data.data) - - updateToolCallWithSubAgentData(context, get, set, parentToolCallId) - }, - - // Handle subagent reasoning (same as content for subagent display purposes) - reasoning: (data, context, get, set) => { - const parentToolCallId = context.subAgentParentToolCallId - const phase = data?.phase || data?.data?.phase - if (!parentToolCallId) return - - // For reasoning, we just append the content (treating start/end as markers) - if (phase === 'start' || phase === 'end') return - - const chunk = typeof data?.data === 'string' ? data.data : data?.content || '' - if (!chunk) return - - appendSubAgentText(context, parentToolCallId, chunk) - - updateToolCallWithSubAgentData(context, get, set, parentToolCallId) - }, - - // Handle subagent tool_generating (tool is being generated) - tool_generating: () => { - // Tool generating event - no action needed, we'll handle the actual tool_call - }, - - // Handle subagent tool calls - also execute client tools - tool_call: async (data, context, get, set) => { - const parentToolCallId = context.subAgentParentToolCallId - if (!parentToolCallId) return - - const toolData = data?.data || {} - const id: string | undefined = toolData.id || data?.toolCallId - const name: string | undefined = toolData.name || data?.toolName - if (!id || !name) return - const isPartial = toolData.partial === true - - // Arguments can come in different locations depending on SSE format - // Check multiple possible locations - let args = toolData.arguments || toolData.input || data?.arguments || data?.input - - // If arguments is a string, try to parse it as JSON - if (typeof args === 'string') { - try { - args = JSON.parse(args) - } catch { - logger.warn('[SubAgent] Failed to parse arguments string', { args }) - } - } - - logger.info('[SubAgent] tool_call received', { - id, - name, - hasArgs: !!args, - argsKeys: args ? Object.keys(args) : [], - toolDataKeys: Object.keys(toolData), - dataKeys: Object.keys(data || {}), - }) - - // Initialize if needed - if (!context.subAgentToolCalls[parentToolCallId]) { - context.subAgentToolCalls[parentToolCallId] = [] - } - if (!context.subAgentBlocks[parentToolCallId]) { - context.subAgentBlocks[parentToolCallId] = [] - } - - // Create or update the subagent tool call - const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex( - (tc) => tc.id === id - ) - const subAgentToolCall: CopilotToolCall = { - id, - name, - state: ClientToolCallState.pending, - ...(args ? { params: args } : {}), - display: resolveToolDisplay(name, ClientToolCallState.pending, id, args), - } - - if (existingIndex >= 0) { - context.subAgentToolCalls[parentToolCallId][existingIndex] = subAgentToolCall - } else { - context.subAgentToolCalls[parentToolCallId].push(subAgentToolCall) - - // Also add to ordered blocks - context.subAgentBlocks[parentToolCallId].push({ - type: 'subagent_tool_call', - toolCall: subAgentToolCall, - timestamp: Date.now(), - }) - } - - // Also add to main toolCallsById for proper tool execution - const { toolCallsById } = get() - const updated = { ...toolCallsById, [id]: subAgentToolCall } - set({ toolCallsById: updated }) - - updateToolCallWithSubAgentData(context, get, set, parentToolCallId) - - if (isPartial) { - return - } - }, - - // Handle subagent tool results - tool_result: (data, context, get, set) => { - const parentToolCallId = context.subAgentParentToolCallId - if (!parentToolCallId) return - - const toolCallId: string | undefined = data?.toolCallId || data?.data?.id - const success: boolean | undefined = data?.success !== false // Default to true if not specified - if (!toolCallId) return - - // Initialize if needed - if (!context.subAgentToolCalls[parentToolCallId]) return - if (!context.subAgentBlocks[parentToolCallId]) return - - // Update the subagent tool call state - const targetState = success ? ClientToolCallState.success : ClientToolCallState.error - const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex( - (tc) => tc.id === toolCallId - ) - - if (existingIndex >= 0) { - const existing = context.subAgentToolCalls[parentToolCallId][existingIndex] - const updatedSubAgentToolCall = { - ...existing, - state: targetState, - display: resolveToolDisplay(existing.name, targetState, toolCallId, existing.params), - } - context.subAgentToolCalls[parentToolCallId][existingIndex] = updatedSubAgentToolCall - - // Also update in ordered blocks - for (const block of context.subAgentBlocks[parentToolCallId]) { - if (block.type === 'subagent_tool_call' && block.toolCall?.id === toolCallId) { - block.toolCall = updatedSubAgentToolCall - break - } - } - - // Update the individual tool call in toolCallsById so ToolCall component gets latest state - const { toolCallsById } = get() - if (toolCallsById[toolCallId]) { - const updatedMap = { - ...toolCallsById, - [toolCallId]: updatedSubAgentToolCall, - } - set({ toolCallsById: updatedMap }) - logger.info('[SubAgent] Updated subagent tool call state in toolCallsById', { - toolCallId, - name: existing.name, - state: targetState, - }) - } - } - - updateToolCallWithSubAgentData(context, get, set, parentToolCallId) - }, - - // Handle subagent stream done - just update the streaming state - done: (data, context, get, set) => { - const parentToolCallId = context.subAgentParentToolCallId - if (!parentToolCallId) return - - // Update the tool call with final content but keep streaming true until subagent_end - updateToolCallWithSubAgentData(context, get, set, parentToolCallId) - }, -} - -async function applySseEvent( - data: any, - context: StreamingContext, - get: () => CopilotStore, - set: (next: Partial | ((state: CopilotStore) => Partial)) => void -): Promise { - const normalizedEvent = normalizeSseEvent(data) - if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) { - return true - } - data = normalizedEvent - - if (data.type === 'subagent_start') { - const toolCallId = data.data?.tool_call_id - if (toolCallId) { - context.subAgentParentToolCallId = toolCallId - const { toolCallsById } = get() - const parentToolCall = toolCallsById[toolCallId] - if (parentToolCall) { - const updatedToolCall: CopilotToolCall = { - ...parentToolCall, - subAgentStreaming: true, - } - const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall } - set({ toolCallsById: updatedMap }) - } - logger.info('[SSE] Subagent session started', { - subagent: data.subagent, - parentToolCallId: toolCallId, - }) - } - return true - } - - if (data.type === 'subagent_end') { - const parentToolCallId = context.subAgentParentToolCallId - if (parentToolCallId) { - const { toolCallsById } = get() - const parentToolCall = toolCallsById[parentToolCallId] - if (parentToolCall) { - const updatedToolCall: CopilotToolCall = { - ...parentToolCall, - subAgentContent: context.subAgentContent[parentToolCallId] || '', - subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] || [], - subAgentBlocks: context.subAgentBlocks[parentToolCallId] || [], - subAgentStreaming: false, - } - const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall } - set({ toolCallsById: updatedMap }) - logger.info('[SSE] Subagent session ended', { - subagent: data.subagent, - parentToolCallId, - contentLength: context.subAgentContent[parentToolCallId]?.length || 0, - toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0, - }) - } - } - context.subAgentParentToolCallId = undefined - return true - } - - if (data.subagent) { - const parentToolCallId = context.subAgentParentToolCallId - if (!parentToolCallId) { - logger.warn('[SSE] Subagent event without parent tool call ID', { - type: data.type, - subagent: data.subagent, - }) - return true - } - - logger.info('[SSE] Processing subagent event', { - type: data.type, - subagent: data.subagent, - parentToolCallId, - hasHandler: !!subAgentSSEHandlers[data.type], - }) - - const subAgentHandler = subAgentSSEHandlers[data.type] - if (subAgentHandler) { - await subAgentHandler(data, context, get, set) - } else { - logger.warn('[SSE] No handler for subagent event type', { type: data.type }) - } - return !context.streamComplete - } - - const handler = sseHandlers[data.type] || sseHandlers.default - await handler(data, context, get, set) - return !context.streamComplete -} - -// Debounced UI update queue for smoother streaming -const streamingUpdateQueue = new Map() -let streamingUpdateRAF: number | null = null -let lastBatchTime = 0 -const MIN_BATCH_INTERVAL = 16 -const MAX_BATCH_INTERVAL = 50 -const MAX_QUEUE_SIZE = 5 - -function stopStreamingUpdates() { - if (streamingUpdateRAF !== null) { - cancelAnimationFrame(streamingUpdateRAF) - streamingUpdateRAF = null - } - streamingUpdateQueue.clear() -} - -/** Flush pending streaming updates immediately (apply them to state before clearing). */ -function flushStreamingUpdates(set: any) { - if (streamingUpdateRAF !== null) { - cancelAnimationFrame(streamingUpdateRAF) - streamingUpdateRAF = null - } - if (streamingUpdateQueue.size === 0) return - - const updates = new Map(streamingUpdateQueue) - streamingUpdateQueue.clear() - - set((state: CopilotStore) => { - if (updates.size === 0) return state - return { - messages: state.messages.map((msg) => { - const update = updates.get(msg.id) - if (update) { - return { - ...msg, - content: '', - contentBlocks: - update.contentBlocks.length > 0 - ? createOptimizedContentBlocks(update.contentBlocks) - : [], - } - } - return msg - }), - } - }) -} - function cloneContentBlocks(blocks: any[]): any[] { if (!Array.isArray(blocks)) return [] return blocks.map((block) => (block ? { ...block } : block)) @@ -1963,7 +167,7 @@ function findLastTextBlock(blocks: any[]): any | null { function replaceTextBlocks(blocks: any[], text: string): any[] { const next: any[] = [] let inserted = false - for (const block of blocks || []) { + for (const block of blocks ?? []) { if (block?.type === TEXT_BLOCK_TYPE) { if (!inserted && text) { next.push({ type: TEXT_BLOCK_TYPE, content: text, timestamp: Date.now() }) @@ -1982,7 +186,7 @@ function replaceTextBlocks(blocks: any[], text: string): any[] { function createStreamingContext(messageId: string): StreamingContext { return { messageId, - accumulatedContent: new StringBuilder(), + accumulatedContent: '', contentBlocks: [], currentTextBlock: null, isInThinkingBlock: false, @@ -1997,102 +201,6 @@ function createStreamingContext(messageId: string): StreamingContext { } } -function createOptimizedContentBlocks(contentBlocks: any[]): any[] { - const result: any[] = new Array(contentBlocks.length) - for (let i = 0; i < contentBlocks.length; i++) { - const block = contentBlocks[i] - result[i] = { ...block } - } - return result -} -;`` -function updateStreamingMessage(set: any, context: StreamingContext) { - if (context.suppressStreamingUpdates) return - const now = performance.now() - streamingUpdateQueue.set(context.messageId, context) - const timeSinceLastBatch = now - lastBatchTime - const shouldFlushImmediately = - streamingUpdateQueue.size >= MAX_QUEUE_SIZE || timeSinceLastBatch > MAX_BATCH_INTERVAL - - if (streamingUpdateRAF === null) { - const scheduleUpdate = () => { - streamingUpdateRAF = requestAnimationFrame(() => { - const updates = new Map(streamingUpdateQueue) - streamingUpdateQueue.clear() - streamingUpdateRAF = null - lastBatchTime = performance.now() - set((state: CopilotStore) => { - if (updates.size === 0) return state - const messages = state.messages - const lastMessage = messages[messages.length - 1] - const lastMessageUpdate = lastMessage ? updates.get(lastMessage.id) : null - if (updates.size === 1 && lastMessageUpdate) { - const newMessages = [...messages] - newMessages[messages.length - 1] = { - ...lastMessage, - content: '', - contentBlocks: - lastMessageUpdate.contentBlocks.length > 0 - ? createOptimizedContentBlocks(lastMessageUpdate.contentBlocks) - : [], - } - return { messages: newMessages } - } - return { - messages: messages.map((msg) => { - const update = updates.get(msg.id) - if (update) { - return { - ...msg, - content: '', - contentBlocks: - update.contentBlocks.length > 0 - ? createOptimizedContentBlocks(update.contentBlocks) - : [], - } - } - return msg - }), - } - }) - }) - } - if (shouldFlushImmediately) scheduleUpdate() - else setTimeout(scheduleUpdate, Math.max(0, MIN_BATCH_INTERVAL - timeSinceLastBatch)) - } -} - -async function* parseSSEStream( - reader: ReadableStreamDefaultReader, - decoder: TextDecoder -) { - let buffer = '' - while (true) { - const { done, value } = await reader.read() - if (done) break - const chunk = decoder.decode(value, { stream: true }) - buffer += chunk - const lastNewlineIndex = buffer.lastIndexOf('\n') - if (lastNewlineIndex !== -1) { - const linesToProcess = buffer.substring(0, lastNewlineIndex) - buffer = buffer.substring(lastNewlineIndex + 1) - const lines = linesToProcess.split('\n') - for (let i = 0; i < lines.length; i++) { - const line = lines[i] - if (line.length === 0) continue - if (line.charCodeAt(0) === 100 && line.startsWith(DATA_PREFIX)) { - try { - const jsonStr = line.substring(DATA_PREFIX_LENGTH) - yield JSON.parse(jsonStr) - } catch (error) { - logger.warn('Failed to parse SSE data:', error) - } - } - } - } - } -} - // Initial state (subset required for UI/streaming) const initialState = { mode: 'build' as const, @@ -2103,7 +211,6 @@ const initialState = { currentChat: null as CopilotChat | null, chats: [] as CopilotChat[], messages: [] as CopilotMessage[], - checkpoints: [] as any[], messageCheckpoints: {} as Record, messageSnapshots: {} as Record, isLoading: false, @@ -2193,7 +300,7 @@ export const useCopilotStore = create()( // Restore plan content and config (mode/model) from selected chat const planArtifact = chat.planArtifact || '' - const chatConfig = chat.config || {} + const chatConfig = chat.config ?? {} const chatMode = chatConfig.mode || get().mode const chatModel = chatConfig.model || get().selectedModel @@ -2211,7 +318,7 @@ export const useCopilotStore = create()( const previousModel = get().selectedModel // Optimistically set selected chat and normalize messages for UI - const normalizedMessages = normalizeMessagesForUI(chat.messages || []) + const normalizedMessages = normalizeMessagesForUI(chat.messages ?? []) const toolCallsById = buildToolCallsById(normalizedMessages) set({ @@ -2229,7 +336,7 @@ export const useCopilotStore = create()( // Background-save the previous chat's latest messages, plan artifact, and config before switching (optimistic) try { if (previousChat && previousChat.id !== chat.id) { - const dbMessages = validateMessagesForLLM(previousMessages) + const dbMessages = serializeMessagesForDB(previousMessages, get().sensitiveCredentialIds) const previousPlanArtifact = get().streamingPlanContent fetch('/api/copilot/chat/update-messages', { method: 'POST', @@ -2255,13 +362,13 @@ export const useCopilotStore = create()( if (data.success && Array.isArray(data.chats)) { const latestChat = data.chats.find((c: CopilotChat) => c.id === chat.id) if (latestChat) { - const normalizedMessages = normalizeMessagesForUI(latestChat.messages || []) + const normalizedMessages = normalizeMessagesForUI(latestChat.messages ?? []) const toolCallsById = buildToolCallsById(normalizedMessages) set({ currentChat: latestChat, messages: normalizedMessages, - chats: (get().chats || []).map((c: CopilotChat) => + chats: (get().chats ?? []).map((c: CopilotChat) => c.id === chat.id ? latestChat : c ), toolCallsById, @@ -2289,7 +396,7 @@ export const useCopilotStore = create()( const { currentChat, streamingPlanContent, mode, selectedModel } = get() if (currentChat) { const currentMessages = get().messages - const dbMessages = validateMessagesForLLM(currentMessages) + const dbMessages = serializeMessagesForDB(currentMessages, get().sensitiveCredentialIds) fetch('/api/copilot/chat/update-messages', { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -2345,8 +452,6 @@ export const useCopilotStore = create()( } }, - areChatsFresh: (_workflowId: string) => false, - loadChats: async (_forceRefresh = false) => { const { workflowId } = get() @@ -2385,11 +490,11 @@ export const useCopilotStore = create()( if (isSendingMessage) { set({ currentChat: { ...updatedCurrentChat, messages: get().messages } }) } else { - const normalizedMessages = normalizeMessagesForUI(updatedCurrentChat.messages || []) + const normalizedMessages = normalizeMessagesForUI(updatedCurrentChat.messages ?? []) // Restore plan artifact and config from refreshed chat const refreshedPlanArtifact = updatedCurrentChat.planArtifact || '' - const refreshedConfig = updatedCurrentChat.config || {} + const refreshedConfig = updatedCurrentChat.config ?? {} const refreshedMode = refreshedConfig.mode || get().mode const refreshedModel = refreshedConfig.model || get().selectedModel const toolCallsById = buildToolCallsById(normalizedMessages) @@ -2408,11 +513,11 @@ export const useCopilotStore = create()( } catch {} } else if (!isSendingMessage && !suppressAutoSelect) { const mostRecentChat: CopilotChat = data.chats[0] - const normalizedMessages = normalizeMessagesForUI(mostRecentChat.messages || []) + const normalizedMessages = normalizeMessagesForUI(mostRecentChat.messages ?? []) // Restore plan artifact and config from most recent chat const planArtifact = mostRecentChat.planArtifact || '' - const chatConfig = mostRecentChat.config || {} + const chatConfig = mostRecentChat.config ?? {} const chatMode = chatConfig.mode || get().mode const chatModel = chatConfig.model || get().selectedModel @@ -2762,7 +867,7 @@ export const useCopilotStore = create()( if (response.ok) { const data = await response.json() if (data.success && data.chat) { - const normalizedMessages = normalizeMessagesForUI(data.chat.messages || []) + const normalizedMessages = normalizeMessagesForUI(data.chat.messages ?? []) const toolCallsById = buildToolCallsById(normalizedMessages) set({ currentChat: data.chat, @@ -2809,7 +914,7 @@ export const useCopilotStore = create()( resumeFromEventId = entry.eventId } } - bufferedContent = replayContext.accumulatedContent.toString() + bufferedContent = replayContext.accumulatedContent replayBlocks = replayContext.contentBlocks logger.info('[Copilot] Loaded buffered content instantly', { eventCount: batchData.events.length, @@ -2855,7 +960,7 @@ export const useCopilotStore = create()( return { ...m, content: stripContinueOption(m.content || ''), - contentBlocks: stripContinueOptionFromBlocks(m.contentBlocks || []), + contentBlocks: stripContinueOptionFromBlocks(m.contentBlocks ?? []), } }) @@ -2905,7 +1010,7 @@ export const useCopilotStore = create()( return { ...m, content: bufferedContent, - contentBlocks: nextBlocks || [], + contentBlocks: nextBlocks ?? [], } }) } @@ -3040,7 +1145,7 @@ export const useCopilotStore = create()( if (currentChat) { try { const currentMessages = get().messages - const dbMessages = validateMessagesForLLM(currentMessages) + const dbMessages = serializeMessagesForDB(currentMessages, get().sensitiveCredentialIds) fetch('/api/copilot/chat/update-messages', { method: 'POST', headers: { 'Content-Type': 'application/json' }, @@ -3242,7 +1347,7 @@ export const useCopilotStore = create()( const blocks = m.contentBlocks.map((b: any) => { if (b.type === 'tool_call' && b.toolCall?.id === id) { changed = true - const prev = b.toolCall || {} + const prev = b.toolCall ?? {} return { ...b, toolCall: { @@ -3277,14 +1382,6 @@ export const useCopilotStore = create()( } catch {} }, - sendDocsMessage: async (query: string) => { - await get().sendMessage(query) - }, - - saveChatMessages: async (_chatId: string) => {}, - - loadCheckpoints: async (_chatId: string) => set({ checkpoints: [] }), - loadMessageCheckpoints: async (chatId: string) => { const { workflowId } = get() if (!workflowId) return @@ -3296,7 +1393,7 @@ export const useCopilotStore = create()( if (data.success && Array.isArray(data.checkpoints)) { const grouped = data.checkpoints.reduce((acc: Record, cp: any) => { const key = cp.messageId || '__no_message__' - acc[key] = acc[key] || [] + acc[key] = acc[key] ?? [] acc[key].push(cp) return acc }, {}) @@ -3320,7 +1417,7 @@ export const useCopilotStore = create()( try { const { messageCheckpoints } = get() const checkpointMessageId = Object.entries(messageCheckpoints).find(([, cps]) => - (cps || []).some((cp: any) => cp?.id === checkpointId) + (cps ?? []).some((cp: any) => cp?.id === checkpointId) )?.[0] const response = await fetch('/api/copilot/checkpoints/revert', { method: 'POST', @@ -3341,19 +1438,19 @@ export const useCopilotStore = create()( // Apply to main workflow store useWorkflowStore.setState({ - blocks: reverted.blocks || {}, - edges: reverted.edges || [], - loops: reverted.loops || {}, - parallels: reverted.parallels || {}, + blocks: reverted.blocks ?? {}, + edges: reverted.edges ?? [], + loops: reverted.loops ?? {}, + parallels: reverted.parallels ?? {}, lastSaved: reverted.lastSaved || Date.now(), - deploymentStatuses: reverted.deploymentStatuses || {}, + deploymentStatuses: reverted.deploymentStatuses ?? {}, }) // Extract and apply subblock values const values: Record> = {} - Object.entries(reverted.blocks || {}).forEach(([blockId, block]: [string, any]) => { + Object.entries(reverted.blocks ?? {}).forEach(([blockId, block]: [string, any]) => { values[blockId] = {} - Object.entries((block as any).subBlocks || {}).forEach( + Object.entries((block as any).subBlocks ?? {}).forEach( ([subId, sub]: [string, any]) => { values[blockId][subId] = (sub as any)?.value } @@ -3383,7 +1480,7 @@ export const useCopilotStore = create()( }, getCheckpointsForMessage: (messageId: string) => { const { messageCheckpoints } = get() - return messageCheckpoints[messageId] || [] + return messageCheckpoints[messageId] ?? [] }, saveMessageCheckpoint: async (messageId: string) => { if (!messageId) return false @@ -3424,19 +1521,19 @@ export const useCopilotStore = create()( if (existingBlocks.length > 0) { const existingText = extractTextFromBlocks(existingBlocks) if (existingText) { - context.accumulatedContent.append(existingText) + context.accumulatedContent += existingText } const clonedBlocks = cloneContentBlocks(existingBlocks) context.contentBlocks = clonedBlocks context.currentTextBlock = findLastTextBlock(clonedBlocks) } else if (existingMessage.content) { - const textBlock = contentBlockPool.get() + const textBlock = { type: '', content: '', timestamp: 0, toolCall: null } textBlock.type = TEXT_BLOCK_TYPE textBlock.content = existingMessage.content textBlock.timestamp = Date.now() context.contentBlocks = [textBlock] context.currentTextBlock = textBlock - context.accumulatedContent.append(existingMessage.content) + context.accumulatedContent += existingMessage.content } } } @@ -3447,7 +1544,7 @@ export const useCopilotStore = create()( }, 600000) try { - for await (const data of parseSSEStream(reader, decoder)) { + for await (const data of parseSSEStream(reader, decoder, abortSignal)) { if (abortSignal?.aborted) { context.wasAborted = true const { suppressAbortContinueOption } = get() @@ -3462,8 +1559,9 @@ export const useCopilotStore = create()( break } - const eventId = typeof data?.eventId === 'number' ? data.eventId : undefined - const streamId = typeof data?.streamId === 'string' ? data.streamId : undefined + const eventMeta = data as { eventId?: unknown; streamId?: unknown } + const eventId = typeof eventMeta.eventId === 'number' ? eventMeta.eventId : undefined + const streamId = typeof eventMeta.streamId === 'string' ? eventMeta.streamId : undefined if (expectedStreamId && streamId && streamId !== expectedStreamId) { logger.warn('[SSE] Ignoring event for mismatched stream', { expectedStreamId, @@ -3495,15 +1593,11 @@ export const useCopilotStore = create()( sseHandlers.stream_end({}, context, get, set) } - if (streamingUpdateRAF !== null) { - cancelAnimationFrame(streamingUpdateRAF) - streamingUpdateRAF = null - } - streamingUpdateQueue.clear() + stopStreamingUpdates() let sanitizedContentBlocks: any[] = [] if (context.contentBlocks && context.contentBlocks.length > 0) { - const optimizedBlocks = createOptimizedContentBlocks(context.contentBlocks) + const optimizedBlocks = context.contentBlocks.map((block: any) => ({ ...block })) sanitizedContentBlocks = optimizedBlocks.map((block: any) => block.type === TEXT_BLOCK_TYPE && typeof block.content === 'string' ? { ...block, content: stripTodoTags(block.content) } @@ -3524,15 +1618,7 @@ export const useCopilotStore = create()( } } - if (context.contentBlocks) { - context.contentBlocks.forEach((block) => { - if (block.type === TEXT_BLOCK_TYPE || block.type === THINKING_BLOCK_TYPE) { - contentBlockPool.release(block) - } - }) - } - - const finalContent = stripTodoTags(context.accumulatedContent.toString()) + const finalContent = stripTodoTags(context.accumulatedContent) const finalContentStripped = isContinuation ? stripContinueOption(finalContent) : finalContent @@ -3616,10 +1702,10 @@ export const useCopilotStore = create()( contentLength: lastMsg.content?.length || 0, hasContentBlocks: !!lastMsg.contentBlocks, contentBlockCount: lastMsg.contentBlocks?.length || 0, - contentBlockTypes: (lastMsg.contentBlocks as any[])?.map((b) => b?.type) || [], + contentBlockTypes: (lastMsg.contentBlocks as any[])?.map((b) => b?.type) ?? [], }) } - const dbMessages = validateMessagesForLLM(currentMessages) + const dbMessages = serializeMessagesForDB(currentMessages, get().sensitiveCredentialIds) const config = { mode, model: selectedModel, @@ -3701,7 +1787,7 @@ export const useCopilotStore = create()( set({ currentChat: newChat, - chats: [newChat, ...(get().chats || [])], + chats: [newChat, ...(get().chats ?? [])], chatsLastLoadedAt: null, chatsLoadedForWorkflow: null, planTodos: [], @@ -3714,16 +1800,10 @@ export const useCopilotStore = create()( clearError: () => set({ error: null }), clearSaveError: () => set({ saveError: null }), clearCheckpointError: () => set({ checkpointError: null }), - retrySave: async (_chatId: string) => {}, - cleanup: () => { const { isSendingMessage } = get() if (isSendingMessage) get().abortMessage() - if (streamingUpdateRAF !== null) { - cancelAnimationFrame(streamingUpdateRAF) - streamingUpdateRAF = null - } - streamingUpdateQueue.clear() + stopStreamingUpdates() // Clear any diff on cleanup try { useWorkflowDiffStore.getState().clearDiff() @@ -3765,7 +1845,7 @@ export const useCopilotStore = create()( if (currentChat) { try { const currentMessages = get().messages - const dbMessages = validateMessagesForLLM(currentMessages) + const dbMessages = serializeMessagesForDB(currentMessages, get().sensitiveCredentialIds) const { mode, selectedModel } = get() await fetch('/api/copilot/chat/update-messages', { @@ -3807,7 +1887,7 @@ export const useCopilotStore = create()( if (currentChat) { try { const currentMessages = get().messages - const dbMessages = validateMessagesForLLM(currentMessages) + const dbMessages = serializeMessagesForDB(currentMessages, get().sensitiveCredentialIds) const { mode, selectedModel } = get() await fetch('/api/copilot/chat/update-messages', { @@ -3855,7 +1935,7 @@ export const useCopilotStore = create()( logger.info('[AutoAllowedTools] Load response', { status: res.status, ok: res.ok }) if (res.ok) { const data = await res.json() - const tools = data.autoAllowedTools || [] + const tools = data.autoAllowedTools ?? [] set({ autoAllowedTools: tools }) logger.info('[AutoAllowedTools] Loaded successfully', { count: tools.length, tools }) } else { @@ -3878,7 +1958,7 @@ export const useCopilotStore = create()( if (res.ok) { const data = await res.json() logger.info('[AutoAllowedTools] API returned', { toolId, tools: data.autoAllowedTools }) - set({ autoAllowedTools: data.autoAllowedTools || [] }) + set({ autoAllowedTools: data.autoAllowedTools ?? [] }) logger.info('[AutoAllowedTools] Added tool to store', { toolId }) } } catch (err) { @@ -3896,7 +1976,7 @@ export const useCopilotStore = create()( ) if (res.ok) { const data = await res.json() - set({ autoAllowedTools: data.autoAllowedTools || [] }) + set({ autoAllowedTools: data.autoAllowedTools ?? [] }) logger.info('[AutoAllowedTools] Removed tool', { toolId }) } } catch (err) { @@ -3923,7 +2003,7 @@ export const useCopilotStore = create()( } const json = await res.json() // Credentials are at result.oauth.connected.credentials - const credentials = json?.result?.oauth?.connected?.credentials || [] + const credentials = json?.result?.oauth?.connected?.credentials ?? [] logger.info('[loadSensitiveCredentialIds] Response', { hasResult: !!json?.result, credentialCount: credentials.length, diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index 9357ebd6b..07e77ea60 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -122,7 +122,6 @@ export interface CopilotState { messages: CopilotMessage[] workflowId: string | null - checkpoints: any[] messageCheckpoints: Record messageSnapshots: Record @@ -187,7 +186,6 @@ export interface CopilotActions { setWorkflowId: (workflowId: string | null) => Promise validateCurrentChat: () => boolean loadChats: (forceRefresh?: boolean) => Promise - areChatsFresh: (workflowId: string) => boolean selectChat: (chat: CopilotChat) => Promise createNewChat: () => Promise deleteChat: (chatId: string) => Promise @@ -214,10 +212,6 @@ export interface CopilotActions { resumeActiveStream: () => Promise setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void updateToolCallParams: (toolCallId: string, params: Record) => void - sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise - saveChatMessages: (chatId: string) => Promise - - loadCheckpoints: (chatId: string) => Promise loadMessageCheckpoints: (chatId: string) => Promise revertToCheckpoint: (checkpointId: string) => Promise getCheckpointsForMessage: (messageId: string) => any[] @@ -227,7 +221,6 @@ export interface CopilotActions { clearError: () => void clearSaveError: () => void clearCheckpointError: () => void - retrySave: (chatId: string) => Promise cleanup: () => void reset: () => void diff --git a/apps/sim/stores/workflow-diff/store.ts b/apps/sim/stores/workflow-diff/store.ts index 5b9ba8b6b..116fa83d7 100644 --- a/apps/sim/stores/workflow-diff/store.ts +++ b/apps/sim/stores/workflow-diff/store.ts @@ -1,4 +1,10 @@ import { createLogger } from '@sim/logger' + +declare global { + interface Window { + __skipDiffRecording?: boolean + } +} import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { stripWorkflowDiffMarkers, WorkflowDiffEngine } from '@/lib/workflows/diff' @@ -21,6 +27,17 @@ import { const logger = createLogger('WorkflowDiffStore') const diffEngine = new WorkflowDiffEngine() +const RESET_DIFF_STATE = { + hasActiveDiff: false, + isShowingDiff: false, + isDiffReady: false, + baselineWorkflow: null, + baselineWorkflowId: null, + diffAnalysis: null, + diffMetadata: null, + diffError: null, + _triggerMessageId: null, +} /** * Detects when a diff contains no meaningful changes. @@ -104,17 +121,7 @@ export const useWorkflowDiffStore = create { @@ -301,17 +298,7 @@ export const useWorkflowDiffStore = create