Continued cleanup

This commit is contained in:
Siddharth Ganesan
2026-02-05 15:26:24 -08:00
parent ea22b1da4d
commit 54a5e06789
18 changed files with 1919 additions and 2142 deletions

View File

@@ -107,7 +107,6 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
currentChat,
selectChat,
deleteChat,
areChatsFresh,
workflowId: copilotWorkflowId,
setPlanTodos,
closePlanTodos,
@@ -142,7 +141,6 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
activeWorkflowId,
copilotWorkflowId,
loadChats,
areChatsFresh,
isSendingMessage,
}
)

View File

@@ -10,7 +10,6 @@ interface UseChatHistoryProps {
activeWorkflowId: string | null
copilotWorkflowId: string | null
loadChats: (forceRefresh: boolean) => Promise<void>
areChatsFresh: (workflowId: string) => boolean
isSendingMessage: boolean
}
@@ -21,8 +20,7 @@ interface UseChatHistoryProps {
* @returns Chat history utilities
*/
export function useChatHistory(props: UseChatHistoryProps) {
const { chats, activeWorkflowId, copilotWorkflowId, loadChats, areChatsFresh, isSendingMessage } =
props
const { chats, activeWorkflowId, copilotWorkflowId, loadChats, isSendingMessage } = props
/** Groups chats by time period (Today, Yesterday, This Week, etc.) */
const groupedChats = useMemo(() => {
@@ -80,7 +78,7 @@ export function useChatHistory(props: UseChatHistoryProps) {
/** Handles history dropdown opening and loads chats if needed (non-blocking) */
const handleHistoryDropdownOpen = useCallback(
(open: boolean) => {
if (open && activeWorkflowId && !isSendingMessage && !areChatsFresh(activeWorkflowId)) {
if (open && activeWorkflowId && !isSendingMessage) {
loadChats(false).catch((error) => {
logger.error('Failed to load chat history:', error)
})
@@ -90,7 +88,7 @@ export function useChatHistory(props: UseChatHistoryProps) {
logger.info('Chat history opened during stream - showing cached data only')
}
},
[activeWorkflowId, areChatsFresh, isSendingMessage, loadChats]
[activeWorkflowId, isSendingMessage, loadChats]
)
return {

View File

@@ -0,0 +1,149 @@
import type {
ChatContext,
CopilotMessage,
MessageFileAttachment,
} from '@/stores/panel/copilot/types'
import type { StreamingContext } from './types'
const TEXT_BLOCK_TYPE = 'text'
const THINKING_BLOCK_TYPE = 'thinking'
const CONTINUE_OPTIONS_TAG = '<options>{"1":"Continue"}</options>'
export function createUserMessage(
content: string,
fileAttachments?: MessageFileAttachment[],
contexts?: ChatContext[],
messageId?: string
): CopilotMessage {
return {
id: messageId || crypto.randomUUID(),
role: 'user',
content,
timestamp: new Date().toISOString(),
...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
...(contexts && contexts.length > 0 && { contexts }),
...(contexts &&
contexts.length > 0 && {
contentBlocks: [
{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() },
] as any,
}),
}
}
export function createStreamingMessage(): CopilotMessage {
return {
id: crypto.randomUUID(),
role: 'assistant',
content: '',
timestamp: new Date().toISOString(),
}
}
export function createErrorMessage(
messageId: string,
content: string,
errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required'
): CopilotMessage {
return {
id: messageId,
role: 'assistant',
content,
timestamp: new Date().toISOString(),
contentBlocks: [
{
type: 'text',
content,
timestamp: Date.now(),
},
],
errorType,
}
}
export function appendTextBlock(context: StreamingContext, text: string) {
if (!text) return
context.accumulatedContent += text
if (context.currentTextBlock && context.contentBlocks.length > 0) {
const lastBlock = context.contentBlocks[context.contentBlocks.length - 1]
if (lastBlock.type === TEXT_BLOCK_TYPE && lastBlock === context.currentTextBlock) {
lastBlock.content += text
return
}
}
context.currentTextBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentTextBlock.type = TEXT_BLOCK_TYPE
context.currentTextBlock.content = text
context.currentTextBlock.timestamp = Date.now()
context.contentBlocks.push(context.currentTextBlock)
}
export function appendContinueOption(content: string): string {
if (/<options>/i.test(content)) return content
const suffix = content.trim().length > 0 ? '\n\n' : ''
return `${content}${suffix}${CONTINUE_OPTIONS_TAG}`
}
export function appendContinueOptionBlock(blocks: any[]): any[] {
if (!Array.isArray(blocks)) return blocks
const hasOptions = blocks.some(
(block) =>
block?.type === TEXT_BLOCK_TYPE &&
typeof block.content === 'string' &&
/<options>/i.test(block.content)
)
if (hasOptions) return blocks
return [
...blocks,
{
type: TEXT_BLOCK_TYPE,
content: CONTINUE_OPTIONS_TAG,
timestamp: Date.now(),
},
]
}
export function stripContinueOption(content: string): string {
if (!content || !content.includes(CONTINUE_OPTIONS_TAG)) return content
const next = content.replace(CONTINUE_OPTIONS_TAG, '')
return next.replace(/\n{2,}\s*$/g, '\n').trimEnd()
}
export function stripContinueOptionFromBlocks(blocks: any[]): any[] {
if (!Array.isArray(blocks)) return blocks
return blocks.flatMap((block) => {
if (
block?.type === TEXT_BLOCK_TYPE &&
typeof block.content === 'string' &&
block.content.includes(CONTINUE_OPTIONS_TAG)
) {
const nextContent = stripContinueOption(block.content)
if (!nextContent.trim()) return []
return [{ ...block, content: nextContent }]
}
return [block]
})
}
export function beginThinkingBlock(context: StreamingContext) {
if (!context.currentThinkingBlock) {
context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentThinkingBlock.type = THINKING_BLOCK_TYPE
context.currentThinkingBlock.content = ''
context.currentThinkingBlock.timestamp = Date.now()
;(context.currentThinkingBlock as any).startTime = Date.now()
context.contentBlocks.push(context.currentThinkingBlock)
}
context.isInThinkingBlock = true
context.currentTextBlock = null
}
export function finalizeThinkingBlock(context: StreamingContext) {
if (context.currentThinkingBlock) {
context.currentThinkingBlock.duration =
Date.now() - (context.currentThinkingBlock.startTime || Date.now())
}
context.isInThinkingBlock = false
context.currentThinkingBlock = null
context.currentTextBlock = null
}

View File

@@ -0,0 +1,720 @@
import { createLogger } from '@sim/logger'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import {
appendTextBlock,
beginThinkingBlock,
finalizeThinkingBlock,
} from './content-blocks'
import type { StreamingContext } from './types'
import {
isBackgroundState,
isRejectedState,
isReviewState,
resolveToolDisplay,
} from '@/lib/copilot/store-utils'
const logger = createLogger('CopilotClientSseHandlers')
const STREAM_STORAGE_KEY = 'copilot_active_stream'
const TEXT_BLOCK_TYPE = 'text'
const MAX_BATCH_INTERVAL = 50
const MIN_BATCH_INTERVAL = 16
const MAX_QUEUE_SIZE = 5
function writeActiveStreamToStorage(info: any): void {
if (typeof window === 'undefined') return
try {
if (!info) {
window.sessionStorage.removeItem(STREAM_STORAGE_KEY)
return
}
window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info))
} catch {}
}
export type SSEHandler = (
data: any,
context: StreamingContext,
get: () => CopilotStore,
set: any
) => Promise<void> | void
const streamingUpdateQueue = new Map<string, StreamingContext>()
let streamingUpdateRAF: number | null = null
let lastBatchTime = 0
export function stopStreamingUpdates() {
if (streamingUpdateRAF !== null) {
cancelAnimationFrame(streamingUpdateRAF)
streamingUpdateRAF = null
}
streamingUpdateQueue.clear()
}
function createOptimizedContentBlocks(contentBlocks: any[]): any[] {
const result: any[] = new Array(contentBlocks.length)
for (let i = 0; i < contentBlocks.length; i++) {
const block = contentBlocks[i]
result[i] = { ...block }
}
return result
}
export function flushStreamingUpdates(set: any) {
if (streamingUpdateRAF !== null) {
cancelAnimationFrame(streamingUpdateRAF)
streamingUpdateRAF = null
}
if (streamingUpdateQueue.size === 0) return
const updates = new Map(streamingUpdateQueue)
streamingUpdateQueue.clear()
set((state: CopilotStore) => {
if (updates.size === 0) return state
return {
messages: state.messages.map((msg) => {
const update = updates.get(msg.id)
if (update) {
return {
...msg,
content: '',
contentBlocks:
update.contentBlocks.length > 0 ? createOptimizedContentBlocks(update.contentBlocks) : [],
}
}
return msg
}),
}
})
}
export function updateStreamingMessage(set: any, context: StreamingContext) {
if (context.suppressStreamingUpdates) return
const now = performance.now()
streamingUpdateQueue.set(context.messageId, context)
const timeSinceLastBatch = now - lastBatchTime
const shouldFlushImmediately =
streamingUpdateQueue.size >= MAX_QUEUE_SIZE || timeSinceLastBatch > MAX_BATCH_INTERVAL
if (streamingUpdateRAF === null) {
const scheduleUpdate = () => {
streamingUpdateRAF = requestAnimationFrame(() => {
const updates = new Map(streamingUpdateQueue)
streamingUpdateQueue.clear()
streamingUpdateRAF = null
lastBatchTime = performance.now()
set((state: CopilotStore) => {
if (updates.size === 0) return state
const messages = state.messages
const lastMessage = messages[messages.length - 1]
const lastMessageUpdate = lastMessage ? updates.get(lastMessage.id) : null
if (updates.size === 1 && lastMessageUpdate) {
const newMessages = [...messages]
newMessages[messages.length - 1] = {
...lastMessage,
content: '',
contentBlocks:
lastMessageUpdate.contentBlocks.length > 0
? createOptimizedContentBlocks(lastMessageUpdate.contentBlocks)
: [],
}
return { messages: newMessages }
}
return {
messages: messages.map((msg) => {
const update = updates.get(msg.id)
if (update) {
return {
...msg,
content: '',
contentBlocks:
update.contentBlocks.length > 0
? createOptimizedContentBlocks(update.contentBlocks)
: [],
}
}
return msg
}),
}
})
})
}
if (shouldFlushImmediately) scheduleUpdate()
else setTimeout(scheduleUpdate, Math.max(0, MIN_BATCH_INTERVAL - timeSinceLastBatch))
}
}
export function upsertToolCallBlock(context: StreamingContext, toolCall: CopilotToolCall) {
let found = false
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
if (b.type === 'tool_call' && b.toolCall?.id === toolCall.id) {
context.contentBlocks[i] = { ...b, toolCall }
found = true
break
}
}
if (!found) {
context.contentBlocks.push({ type: 'tool_call', toolCall, timestamp: Date.now() })
}
}
function stripThinkingTags(text: string): string {
return text.replace(/<\/?thinking[^>]*>/gi, '').replace(/&lt;\/?thinking[^&]*&gt;/gi, '')
}
function appendThinkingContent(context: StreamingContext, text: string) {
if (!text) return
const cleanedText = stripThinkingTags(text)
if (!cleanedText) return
if (context.currentThinkingBlock) {
context.currentThinkingBlock.content += cleanedText
} else {
context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentThinkingBlock.type = 'thinking'
context.currentThinkingBlock.content = cleanedText
context.currentThinkingBlock.timestamp = Date.now()
context.currentThinkingBlock.startTime = Date.now()
context.contentBlocks.push(context.currentThinkingBlock)
}
context.isInThinkingBlock = true
context.currentTextBlock = null
}
export const sseHandlers: Record<string, SSEHandler> = {
chat_id: async (data, context, get, set) => {
context.newChatId = data.chatId
const { currentChat, activeStream } = get()
if (!currentChat && context.newChatId) {
await get().handleNewChatCreation(context.newChatId)
}
if (activeStream && context.newChatId && !activeStream.chatId) {
const updatedStream = { ...activeStream, chatId: context.newChatId }
set({ activeStream: updatedStream })
writeActiveStreamToStorage(updatedStream)
}
},
title_updated: (_data, _context, get, set) => {
const title = _data.title
if (!title) return
const { currentChat, chats } = get()
if (currentChat) {
set({
currentChat: { ...currentChat, title },
chats: chats.map((c) => (c.id === currentChat.id ? { ...c, title } : c)),
})
}
},
tool_result: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const success: boolean | undefined = data?.success
const failedDependency: boolean = data?.failedDependency === true
const skipped: boolean = data?.result?.skipped === true
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
if (current) {
if (
isRejectedState(current.state) ||
isReviewState(current.state) ||
isBackgroundState(current.state)
) {
return
}
const targetState = success
? ClientToolCallState.success
: failedDependency || skipped
? ClientToolCallState.rejected
: ClientToolCallState.error
const updatedMap = { ...toolCallsById }
updatedMap[toolCallId] = {
...current,
state: targetState,
display: resolveToolDisplay(
current.name,
targetState,
current.id,
(current as any).params
),
}
set({ toolCallsById: updatedMap })
if (targetState === ClientToolCallState.success && current.name === 'checkoff_todo') {
try {
const result = (data?.result || data?.data?.result) ?? {}
const input = ((current as any).params || (current as any).input) ?? {}
const todoId = input.id || input.todoId || result.id || result.todoId
if (todoId) {
get().updatePlanTodoStatus(todoId, 'completed')
}
} catch {}
}
if (
targetState === ClientToolCallState.success &&
current.name === 'mark_todo_in_progress'
) {
try {
const result = (data?.result || data?.data?.result) ?? {}
const input = ((current as any).params || (current as any).input) ?? {}
const todoId = input.id || input.todoId || result.id || result.todoId
if (todoId) {
get().updatePlanTodoStatus(todoId, 'executing')
}
} catch {}
}
if (current.name === 'edit_workflow') {
try {
const resultPayload =
(data?.result || data?.data?.result || data?.data?.data || data?.data) ?? {}
const workflowState = resultPayload?.workflowState
logger.info('[SSE] edit_workflow result received', {
hasWorkflowState: !!workflowState,
blockCount: workflowState ? Object.keys(workflowState.blocks ?? {}).length : 0,
edgeCount: workflowState?.edges?.length ?? 0,
})
if (workflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore.setProposedChanges(workflowState).catch((err) => {
logger.error('[SSE] Failed to apply edit_workflow diff', {
error: err instanceof Error ? err.message : String(err),
})
})
}
} catch (err) {
logger.error('[SSE] edit_workflow result handling failed', {
error: err instanceof Error ? err.message : String(err),
})
}
}
}
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) {
if (
isRejectedState(b.toolCall?.state) ||
isReviewState(b.toolCall?.state) ||
isBackgroundState(b.toolCall?.state)
)
break
const targetState = success
? ClientToolCallState.success
: failedDependency || skipped
? ClientToolCallState.rejected
: ClientToolCallState.error
context.contentBlocks[i] = {
...b,
toolCall: {
...b.toolCall,
state: targetState,
display: resolveToolDisplay(
b.toolCall?.name,
targetState,
toolCallId,
b.toolCall?.params
),
},
}
break
}
}
updateStreamingMessage(set, context)
} catch {}
},
tool_error: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const failedDependency: boolean = data?.failedDependency === true
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
if (current) {
if (
isRejectedState(current.state) ||
isReviewState(current.state) ||
isBackgroundState(current.state)
) {
return
}
const targetState = failedDependency
? ClientToolCallState.rejected
: ClientToolCallState.error
const updatedMap = { ...toolCallsById }
updatedMap[toolCallId] = {
...current,
state: targetState,
display: resolveToolDisplay(
current.name,
targetState,
current.id,
(current as any).params
),
}
set({ toolCallsById: updatedMap })
}
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) {
if (
isRejectedState(b.toolCall?.state) ||
isReviewState(b.toolCall?.state) ||
isBackgroundState(b.toolCall?.state)
)
break
const targetState = failedDependency
? ClientToolCallState.rejected
: ClientToolCallState.error
context.contentBlocks[i] = {
...b,
toolCall: {
...b.toolCall,
state: targetState,
display: resolveToolDisplay(
b.toolCall?.name,
targetState,
toolCallId,
b.toolCall?.params
),
},
}
break
}
}
updateStreamingMessage(set, context)
} catch {}
},
tool_generating: (data, context, get, set) => {
const { toolCallId, toolName } = data
if (!toolCallId || !toolName) return
const { toolCallsById } = get()
if (!toolCallsById[toolCallId]) {
const initialState = ClientToolCallState.pending
const tc: CopilotToolCall = {
id: toolCallId,
name: toolName,
state: initialState,
display: resolveToolDisplay(toolName, initialState, toolCallId),
}
const updated = { ...toolCallsById, [toolCallId]: tc }
set({ toolCallsById: updated })
logger.info('[toolCallsById] map updated', updated)
upsertToolCallBlock(context, tc)
updateStreamingMessage(set, context)
}
},
tool_call: (data, context, get, set) => {
const toolData = data?.data ?? {}
const id: string | undefined = toolData.id || data?.toolCallId
const name: string | undefined = toolData.name || data?.toolName
if (!id) return
const args = toolData.arguments
const isPartial = toolData.partial === true
const { toolCallsById } = get()
const existing = toolCallsById[id]
const next: CopilotToolCall = existing
? {
...existing,
state: ClientToolCallState.pending,
...(args ? { params: args } : {}),
display: resolveToolDisplay(name, ClientToolCallState.pending, id, args),
}
: {
id,
name: name || 'unknown_tool',
state: ClientToolCallState.pending,
...(args ? { params: args } : {}),
display: resolveToolDisplay(name, ClientToolCallState.pending, id, args),
}
const updated = { ...toolCallsById, [id]: next }
set({ toolCallsById: updated })
logger.info('[toolCallsById] → pending', { id, name, params: args })
upsertToolCallBlock(context, next)
updateStreamingMessage(set, context)
if (isPartial) {
return
}
return
},
reasoning: (data, context, _get, set) => {
const phase = (data && (data.phase || data?.data?.phase)) as string | undefined
if (phase === 'start') {
beginThinkingBlock(context)
updateStreamingMessage(set, context)
return
}
if (phase === 'end') {
finalizeThinkingBlock(context)
updateStreamingMessage(set, context)
return
}
const chunk: string = typeof data?.data === 'string' ? data.data : data?.content || ''
if (!chunk) return
appendThinkingContent(context, chunk)
updateStreamingMessage(set, context)
},
content: (data, context, get, set) => {
if (!data.data) return
context.pendingContent += data.data
let contentToProcess = context.pendingContent
let hasProcessedContent = false
const thinkingStartRegex = /<thinking>/
const thinkingEndRegex = /<\/thinking>/
const designWorkflowStartRegex = /<design_workflow>/
const designWorkflowEndRegex = /<\/design_workflow>/
const splitTrailingPartialTag = (
text: string,
tags: string[]
): { text: string; remaining: string } => {
const partialIndex = text.lastIndexOf('<')
if (partialIndex < 0) {
return { text, remaining: '' }
}
const possibleTag = text.substring(partialIndex)
const matchesTagStart = tags.some((tag) => tag.startsWith(possibleTag))
if (!matchesTagStart) {
return { text, remaining: '' }
}
return {
text: text.substring(0, partialIndex),
remaining: possibleTag,
}
}
while (contentToProcess.length > 0) {
if (context.isInDesignWorkflowBlock) {
const endMatch = designWorkflowEndRegex.exec(contentToProcess)
if (endMatch) {
const designContent = contentToProcess.substring(0, endMatch.index)
context.designWorkflowContent += designContent
context.isInDesignWorkflowBlock = false
logger.info('[design_workflow] Tag complete, setting plan content', {
contentLength: context.designWorkflowContent.length,
})
set({ streamingPlanContent: context.designWorkflowContent })
contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length)
hasProcessedContent = true
} else {
const { text, remaining } = splitTrailingPartialTag(contentToProcess, [
'</design_workflow>',
])
context.designWorkflowContent += text
set({ streamingPlanContent: context.designWorkflowContent })
contentToProcess = remaining
hasProcessedContent = true
if (remaining) {
break
}
}
continue
}
if (!context.isInThinkingBlock && !context.isInDesignWorkflowBlock) {
const designStartMatch = designWorkflowStartRegex.exec(contentToProcess)
if (designStartMatch) {
const textBeforeDesign = contentToProcess.substring(0, designStartMatch.index)
if (textBeforeDesign) {
appendTextBlock(context, textBeforeDesign)
hasProcessedContent = true
}
context.isInDesignWorkflowBlock = true
context.designWorkflowContent = ''
contentToProcess = contentToProcess.substring(
designStartMatch.index + designStartMatch[0].length
)
hasProcessedContent = true
continue
}
const nextMarkIndex = contentToProcess.indexOf('<marktodo>')
const nextCheckIndex = contentToProcess.indexOf('<checkofftodo>')
const hasMark = nextMarkIndex >= 0
const hasCheck = nextCheckIndex >= 0
const nextTagIndex =
hasMark && hasCheck
? Math.min(nextMarkIndex, nextCheckIndex)
: hasMark
? nextMarkIndex
: hasCheck
? nextCheckIndex
: -1
if (nextTagIndex >= 0) {
const isMarkTodo = hasMark && nextMarkIndex === nextTagIndex
const tagStart = isMarkTodo ? '<marktodo>' : '<checkofftodo>'
const tagEnd = isMarkTodo ? '</marktodo>' : '</checkofftodo>'
const closingIndex = contentToProcess.indexOf(tagEnd, nextTagIndex + tagStart.length)
if (closingIndex === -1) {
break
}
const todoId = contentToProcess
.substring(nextTagIndex + tagStart.length, closingIndex)
.trim()
logger.info(
isMarkTodo ? '[TODO] Detected marktodo tag' : '[TODO] Detected checkofftodo tag',
{ todoId }
)
if (todoId) {
try {
get().updatePlanTodoStatus(todoId, isMarkTodo ? 'executing' : 'completed')
logger.info(
isMarkTodo
? '[TODO] Successfully marked todo in progress'
: '[TODO] Successfully checked off todo',
{ todoId }
)
} catch (e) {
logger.error(
isMarkTodo
? '[TODO] Failed to mark todo in progress'
: '[TODO] Failed to checkoff todo',
{ todoId, error: e }
)
}
} else {
logger.warn('[TODO] Empty todoId extracted from todo tag', { tagType: tagStart })
}
let beforeTag = contentToProcess.substring(0, nextTagIndex)
let afterTag = contentToProcess.substring(closingIndex + tagEnd.length)
const hadNewlineBefore = /(\r?\n)+$/.test(beforeTag)
const hadNewlineAfter = /^(\r?\n)+/.test(afterTag)
beforeTag = beforeTag.replace(/(\r?\n)+$/, '')
afterTag = afterTag.replace(/^(\r?\n)+/, '')
contentToProcess =
beforeTag + (hadNewlineBefore && hadNewlineAfter ? '\n' : '') + afterTag
context.currentTextBlock = null
hasProcessedContent = true
continue
}
}
if (context.isInThinkingBlock) {
const endMatch = thinkingEndRegex.exec(contentToProcess)
if (endMatch) {
const thinkingContent = contentToProcess.substring(0, endMatch.index)
appendThinkingContent(context, thinkingContent)
finalizeThinkingBlock(context)
contentToProcess = contentToProcess.substring(endMatch.index + endMatch[0].length)
hasProcessedContent = true
} else {
const { text, remaining } = splitTrailingPartialTag(contentToProcess, ['</thinking>'])
if (text) {
appendThinkingContent(context, text)
hasProcessedContent = true
}
contentToProcess = remaining
if (remaining) {
break
}
}
} else {
const startMatch = thinkingStartRegex.exec(contentToProcess)
if (startMatch) {
const textBeforeThinking = contentToProcess.substring(0, startMatch.index)
if (textBeforeThinking) {
appendTextBlock(context, textBeforeThinking)
hasProcessedContent = true
}
context.isInThinkingBlock = true
context.currentTextBlock = null
contentToProcess = contentToProcess.substring(startMatch.index + startMatch[0].length)
hasProcessedContent = true
} else {
let partialTagIndex = contentToProcess.lastIndexOf('<')
const partialMarkTodo = contentToProcess.lastIndexOf('<marktodo')
const partialCheckoffTodo = contentToProcess.lastIndexOf('<checkofftodo')
if (partialMarkTodo > partialTagIndex) {
partialTagIndex = partialMarkTodo
}
if (partialCheckoffTodo > partialTagIndex) {
partialTagIndex = partialCheckoffTodo
}
let textToAdd = contentToProcess
let remaining = ''
if (partialTagIndex >= 0 && partialTagIndex > contentToProcess.length - 50) {
textToAdd = contentToProcess.substring(0, partialTagIndex)
remaining = contentToProcess.substring(partialTagIndex)
}
if (textToAdd) {
appendTextBlock(context, textToAdd)
hasProcessedContent = true
}
contentToProcess = remaining
break
}
}
}
context.pendingContent = contentToProcess
if (hasProcessedContent) {
updateStreamingMessage(set, context)
}
},
done: (_data, context) => {
logger.info('[SSE] DONE EVENT RECEIVED', {
doneEventCount: context.doneEventCount,
data: _data,
})
context.doneEventCount++
if (context.doneEventCount >= 1) {
logger.info('[SSE] Setting streamComplete = true, stream will terminate')
context.streamComplete = true
}
},
error: (data, context, _get, set) => {
logger.error('Stream error:', data.error)
set((state: CopilotStore) => ({
messages: state.messages.map((msg) =>
msg.id === context.messageId
? {
...msg,
content: context.accumulatedContent || 'An error occurred.',
error: data.error,
}
: msg
),
}))
context.streamComplete = true
},
stream_end: (_data, context, _get, set) => {
if (context.pendingContent) {
if (context.isInThinkingBlock && context.currentThinkingBlock) {
appendThinkingContent(context, context.pendingContent)
} else if (context.pendingContent.trim()) {
appendTextBlock(context, context.pendingContent)
}
context.pendingContent = ''
}
finalizeThinkingBlock(context)
updateStreamingMessage(set, context)
},
default: () => {},
}

View File

@@ -0,0 +1,3 @@
export { sseHandlers } from './handlers'
export { subAgentSSEHandlers, applySseEvent } from './subagent-handlers'
export type { SSEHandler } from './handlers'

View File

@@ -0,0 +1,360 @@
import { createLogger } from '@sim/logger'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import { resolveToolDisplay } from '@/lib/copilot/store-utils'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import type { StreamingContext } from './types'
import { sseHandlers, type SSEHandler, updateStreamingMessage } from './handlers'
const logger = createLogger('CopilotClientSubagentHandlers')
export function appendSubAgentContent(
context: StreamingContext,
parentToolCallId: string,
text: string
) {
if (!context.subAgentContent[parentToolCallId]) {
context.subAgentContent[parentToolCallId] = ''
}
if (!context.subAgentBlocks[parentToolCallId]) {
context.subAgentBlocks[parentToolCallId] = []
}
context.subAgentContent[parentToolCallId] += text
const blocks = context.subAgentBlocks[parentToolCallId]
const lastBlock = blocks[blocks.length - 1]
if (lastBlock && lastBlock.type === 'subagent_text') {
lastBlock.content = (lastBlock.content || '') + text
} else {
blocks.push({
type: 'subagent_text',
content: text,
timestamp: Date.now(),
})
}
}
export function updateToolCallWithSubAgentData(
context: StreamingContext,
get: () => CopilotStore,
set: any,
parentToolCallId: string
) {
const { toolCallsById } = get()
const parentToolCall = toolCallsById[parentToolCallId]
if (!parentToolCall) {
logger.warn('[SubAgent] updateToolCallWithSubAgentData: parent tool call not found', {
parentToolCallId,
availableToolCallIds: Object.keys(toolCallsById),
})
return
}
const blocks = context.subAgentBlocks[parentToolCallId] ?? []
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentContent: context.subAgentContent[parentToolCallId] || '',
subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] ?? [],
subAgentBlocks: blocks,
subAgentStreaming: true,
}
logger.info('[SubAgent] Updating tool call with subagent data', {
parentToolCallId,
parentToolName: parentToolCall.name,
subAgentContentLength: updatedToolCall.subAgentContent?.length,
subAgentBlocksCount: updatedToolCall.subAgentBlocks?.length,
subAgentToolCallsCount: updatedToolCall.subAgentToolCalls?.length,
})
const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
let foundInContentBlocks = false
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
if (b.type === 'tool_call' && b.toolCall?.id === parentToolCallId) {
context.contentBlocks[i] = { ...b, toolCall: updatedToolCall }
foundInContentBlocks = true
break
}
}
if (!foundInContentBlocks) {
logger.warn('[SubAgent] Parent tool call not found in contentBlocks', {
parentToolCallId,
contentBlocksCount: context.contentBlocks.length,
toolCallBlockIds: context.contentBlocks
.filter((b: any) => b.type === 'tool_call')
.map((b: any) => b.toolCall?.id),
})
}
updateStreamingMessage(set, context)
}
export const subAgentSSEHandlers: Record<string, SSEHandler> = {
start: () => {
// Subagent start event - no action needed, parent is already tracked from subagent_start
},
content: (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
logger.info('[SubAgent] content event', {
parentToolCallId,
hasData: !!data.data,
dataPreview: typeof data.data === 'string' ? data.data.substring(0, 50) : null,
})
if (!parentToolCallId || !data.data) {
logger.warn('[SubAgent] content missing parentToolCallId or data', {
parentToolCallId,
hasData: !!data.data,
})
return
}
appendSubAgentContent(context, parentToolCallId, data.data)
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
reasoning: (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
const phase = data?.phase || data?.data?.phase
if (!parentToolCallId) return
if (phase === 'start' || phase === 'end') return
const chunk = typeof data?.data === 'string' ? data.data : data?.content || ''
if (!chunk) return
appendSubAgentContent(context, parentToolCallId, chunk)
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
tool_generating: () => {
// Tool generating event - no action needed, we'll handle the actual tool_call
},
tool_call: async (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = data?.data ?? {}
const id: string | undefined = toolData.id || data?.toolCallId
const name: string | undefined = toolData.name || data?.toolName
if (!id || !name) return
const isPartial = toolData.partial === true
let args = toolData.arguments || toolData.input || data?.arguments || data?.input
if (typeof args === 'string') {
try {
args = JSON.parse(args)
} catch {
logger.warn('[SubAgent] Failed to parse arguments string', { args })
}
}
logger.info('[SubAgent] tool_call received', {
id,
name,
hasArgs: !!args,
argsKeys: args ? Object.keys(args) : [],
toolDataKeys: Object.keys(toolData),
dataKeys: Object.keys(data ?? {}),
})
if (!context.subAgentToolCalls[parentToolCallId]) {
context.subAgentToolCalls[parentToolCallId] = []
}
if (!context.subAgentBlocks[parentToolCallId]) {
context.subAgentBlocks[parentToolCallId] = []
}
const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex((tc) => tc.id === id)
const subAgentToolCall: CopilotToolCall = {
id,
name,
state: ClientToolCallState.pending,
...(args ? { params: args } : {}),
display: resolveToolDisplay(name, ClientToolCallState.pending, id, args),
}
if (existingIndex >= 0) {
context.subAgentToolCalls[parentToolCallId][existingIndex] = subAgentToolCall
} else {
context.subAgentToolCalls[parentToolCallId].push(subAgentToolCall)
context.subAgentBlocks[parentToolCallId].push({
type: 'subagent_tool_call',
toolCall: subAgentToolCall,
timestamp: Date.now(),
})
}
const { toolCallsById } = get()
const updated = { ...toolCallsById, [id]: subAgentToolCall }
set({ toolCallsById: updated })
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
if (isPartial) {
return
}
},
tool_result: (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const success: boolean | undefined = data?.success !== false
if (!toolCallId) return
if (!context.subAgentToolCalls[parentToolCallId]) return
if (!context.subAgentBlocks[parentToolCallId]) return
const targetState = success ? ClientToolCallState.success : ClientToolCallState.error
const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex(
(tc) => tc.id === toolCallId
)
if (existingIndex >= 0) {
const existing = context.subAgentToolCalls[parentToolCallId][existingIndex]
const updatedSubAgentToolCall = {
...existing,
state: targetState,
display: resolveToolDisplay(existing.name, targetState, toolCallId, existing.params),
}
context.subAgentToolCalls[parentToolCallId][existingIndex] = updatedSubAgentToolCall
for (const block of context.subAgentBlocks[parentToolCallId]) {
if (block.type === 'subagent_tool_call' && block.toolCall?.id === toolCallId) {
block.toolCall = updatedSubAgentToolCall
break
}
}
const { toolCallsById } = get()
if (toolCallsById[toolCallId]) {
const updatedMap = {
...toolCallsById,
[toolCallId]: updatedSubAgentToolCall,
}
set({ toolCallsById: updatedMap })
logger.info('[SubAgent] Updated subagent tool call state in toolCallsById', {
toolCallId,
name: existing.name,
state: targetState,
})
}
}
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
done: (_data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
}
export async function applySseEvent(
data: any,
context: StreamingContext,
get: () => CopilotStore,
set: (next: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
const normalizedEvent = normalizeSseEvent(data)
if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) {
return true
}
data = normalizedEvent
if (data.type === 'subagent_start') {
const toolCallId = data.data?.tool_call_id
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
const { toolCallsById } = get()
const parentToolCall = toolCallsById[toolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentStreaming: true,
}
const updatedMap = { ...toolCallsById, [toolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
}
logger.info('[SSE] Subagent session started', {
subagent: data.subagent,
parentToolCallId: toolCallId,
})
}
return true
}
if (data.type === 'subagent_end') {
const parentToolCallId = context.subAgentParentToolCallId
if (parentToolCallId) {
const { toolCallsById } = get()
const parentToolCall = toolCallsById[parentToolCallId]
if (parentToolCall) {
const updatedToolCall: CopilotToolCall = {
...parentToolCall,
subAgentContent: context.subAgentContent[parentToolCallId] || '',
subAgentToolCalls: context.subAgentToolCalls[parentToolCallId] ?? [],
subAgentBlocks: context.subAgentBlocks[parentToolCallId] ?? [],
subAgentStreaming: false,
}
const updatedMap = { ...toolCallsById, [parentToolCallId]: updatedToolCall }
set({ toolCallsById: updatedMap })
logger.info('[SSE] Subagent session ended', {
subagent: data.subagent,
parentToolCallId,
contentLength: context.subAgentContent[parentToolCallId]?.length || 0,
toolCallCount: context.subAgentToolCalls[parentToolCallId]?.length || 0,
})
}
}
context.subAgentParentToolCallId = undefined
return true
}
if (data.subagent) {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) {
logger.warn('[SSE] Subagent event without parent tool call ID', {
type: data.type,
subagent: data.subagent,
})
return true
}
logger.info('[SSE] Processing subagent event', {
type: data.type,
subagent: data.subagent,
parentToolCallId,
hasHandler: !!subAgentSSEHandlers[data.type],
})
const subAgentHandler = subAgentSSEHandlers[data.type]
if (subAgentHandler) {
await subAgentHandler(data, context, get, set)
} else {
logger.warn('[SSE] No handler for subagent event type', { type: data.type })
}
return !context.streamComplete
}
const handler = sseHandlers[data.type] || sseHandlers.default
await handler(data, context, get, set)
return !context.streamComplete
}

View File

@@ -0,0 +1,23 @@
import type { CopilotToolCall } from '@/stores/panel/copilot/types'
export interface StreamingContext {
messageId: string
accumulatedContent: string
contentBlocks: any[]
currentTextBlock: any | null
isInThinkingBlock: boolean
currentThinkingBlock: any | null
isInDesignWorkflowBlock: boolean
designWorkflowContent: string
pendingContent: string
newChatId?: string
doneEventCount: number
streamComplete?: boolean
wasAborted?: boolean
suppressContinueOption?: boolean
subAgentParentToolCallId?: string
subAgentContent: Record<string, string>
subAgentToolCalls: Record<string, CopilotToolCall[]>
subAgentBlocks: Record<string, any[]>
suppressStreamingUpdates?: boolean
}

View File

@@ -0,0 +1,128 @@
import { createLogger } from '@sim/logger'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import type { CopilotMessage, CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
const logger = createLogger('CopilotMessageCheckpoints')
export function buildCheckpointWorkflowState(workflowId: string): WorkflowState | null {
const rawState = useWorkflowStore.getState().getWorkflowState()
if (!rawState) return null
const blocksWithSubblockValues = mergeSubblockState(rawState.blocks, workflowId)
const filteredBlocks = Object.entries(blocksWithSubblockValues).reduce(
(acc, [blockId, block]) => {
if (block?.type && block?.name) {
acc[blockId] = {
...block,
id: block.id || blockId,
enabled: block.enabled !== undefined ? block.enabled : true,
horizontalHandles: block.horizontalHandles !== undefined ? block.horizontalHandles : true,
height: block.height !== undefined ? block.height : 90,
subBlocks: block.subBlocks ?? {},
outputs: block.outputs ?? {},
data: block.data ?? {},
position: block.position || { x: 0, y: 0 },
}
}
return acc
},
{} as WorkflowState['blocks']
)
return {
blocks: filteredBlocks,
edges: rawState.edges ?? [],
loops: rawState.loops ?? {},
parallels: rawState.parallels ?? {},
lastSaved: rawState.lastSaved || Date.now(),
deploymentStatuses: rawState.deploymentStatuses ?? {},
}
}
export async function saveMessageCheckpoint(
messageId: string,
get: () => CopilotStore,
set: (partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
const { workflowId, currentChat, messageSnapshots, messageCheckpoints } = get()
if (!workflowId || !currentChat?.id) return false
const snapshot = messageSnapshots[messageId]
if (!snapshot) return false
const nextSnapshots = { ...messageSnapshots }
delete nextSnapshots[messageId]
set({ messageSnapshots: nextSnapshots })
try {
const response = await fetch('/api/copilot/checkpoints', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
workflowId,
chatId: currentChat.id,
messageId,
workflowState: JSON.stringify(snapshot),
}),
})
if (!response.ok) {
throw new Error(`Failed to create checkpoint: ${response.statusText}`)
}
const result = await response.json()
const newCheckpoint = result.checkpoint
if (newCheckpoint) {
const existingCheckpoints = messageCheckpoints[messageId] ?? []
const updatedCheckpoints = {
...messageCheckpoints,
[messageId]: [newCheckpoint, ...existingCheckpoints],
}
set({ messageCheckpoints: updatedCheckpoints })
}
return true
} catch (error) {
logger.error('Failed to create checkpoint from snapshot:', error)
return false
}
}
export function extractToolCallsRecursively(
toolCall: CopilotToolCall,
map: Record<string, CopilotToolCall>
): void {
if (!toolCall?.id) return
map[toolCall.id] = toolCall
if (Array.isArray(toolCall.subAgentBlocks)) {
for (const block of toolCall.subAgentBlocks) {
if (block?.type === 'subagent_tool_call' && block.toolCall?.id) {
extractToolCallsRecursively(block.toolCall, map)
}
}
}
if (Array.isArray(toolCall.subAgentToolCalls)) {
for (const subTc of toolCall.subAgentToolCalls) {
extractToolCallsRecursively(subTc, map)
}
}
}
export function buildToolCallsById(messages: CopilotMessage[]): Record<string, CopilotToolCall> {
const toolCallsById: Record<string, CopilotToolCall> = {}
for (const msg of messages) {
if (msg.contentBlocks) {
for (const block of msg.contentBlocks as any[]) {
if (block?.type === 'tool_call' && block.toolCall?.id) {
extractToolCallsRecursively(block.toolCall, toolCallsById)
}
}
}
}
return toolCallsById
}

View File

@@ -0,0 +1,28 @@
export function maskCredentialIdsInValue(value: any, credentialIds: Set<string>): any {
if (!value || credentialIds.size === 0) return value
if (typeof value === 'string') {
let masked = value
const sortedIds = Array.from(credentialIds).sort((a, b) => b.length - a.length)
for (const id of sortedIds) {
if (id && masked.includes(id)) {
masked = masked.split(id).join('••••••••')
}
}
return masked
}
if (Array.isArray(value)) {
return value.map((item) => maskCredentialIdsInValue(item, credentialIds))
}
if (typeof value === 'object') {
const masked: any = {}
for (const key of Object.keys(value)) {
masked[key] = maskCredentialIdsInValue(value[key], credentialIds)
}
return masked
}
return value
}

View File

@@ -0,0 +1,3 @@
export * from './credential-masking'
export * from './serialization'
export * from './checkpoints'

View File

@@ -0,0 +1,169 @@
import { createLogger } from '@sim/logger'
import type { CopilotMessage } from '@/stores/panel/copilot/types'
import { maskCredentialIdsInValue } from './credential-masking'
const logger = createLogger('CopilotMessageSerialization')
export function clearStreamingFlags(toolCall: any): void {
if (!toolCall) return
toolCall.subAgentStreaming = false
if (Array.isArray(toolCall.subAgentBlocks)) {
for (const block of toolCall.subAgentBlocks) {
if (block?.type === 'subagent_tool_call' && block.toolCall) {
clearStreamingFlags(block.toolCall)
}
}
}
if (Array.isArray(toolCall.subAgentToolCalls)) {
for (const subTc of toolCall.subAgentToolCalls) {
clearStreamingFlags(subTc)
}
}
}
export function normalizeMessagesForUI(messages: CopilotMessage[]): CopilotMessage[] {
try {
for (const message of messages) {
if (message.role === 'assistant') {
logger.info('[normalizeMessagesForUI] Loading assistant message', {
id: message.id,
hasContent: !!message.content?.trim(),
contentBlockCount: message.contentBlocks?.length || 0,
contentBlockTypes: (message.contentBlocks as any[])?.map((b) => b?.type) ?? [],
})
}
}
for (const message of messages) {
if (message.contentBlocks) {
for (const block of message.contentBlocks as any[]) {
if (block?.type === 'tool_call' && block.toolCall) {
clearStreamingFlags(block.toolCall)
}
}
}
if (message.toolCalls) {
for (const toolCall of message.toolCalls) {
clearStreamingFlags(toolCall)
}
}
}
return messages
} catch {
return messages
}
}
export function deepClone<T>(obj: T): T {
try {
const json = JSON.stringify(obj)
if (!json || json === 'undefined') {
logger.warn('[deepClone] JSON.stringify returned empty for object', {
type: typeof obj,
isArray: Array.isArray(obj),
length: Array.isArray(obj) ? obj.length : undefined,
})
return obj
}
const parsed = JSON.parse(json)
if (Array.isArray(obj) && (!Array.isArray(parsed) || parsed.length !== obj.length)) {
logger.warn('[deepClone] Array clone mismatch', {
originalLength: obj.length,
clonedLength: Array.isArray(parsed) ? parsed.length : 'not array',
})
}
return parsed
} catch (err) {
logger.error('[deepClone] Failed to clone object', {
error: String(err),
type: typeof obj,
isArray: Array.isArray(obj),
})
return obj
}
}
export function serializeMessagesForDB(
messages: CopilotMessage[],
credentialIds: Set<string>
): any[] {
const result = messages
.map((msg) => {
let timestamp: string = msg.timestamp
if (typeof timestamp !== 'string') {
const ts = timestamp as any
timestamp = ts instanceof Date ? ts.toISOString() : new Date().toISOString()
}
const serialized: any = {
id: msg.id,
role: msg.role,
content: msg.content || '',
timestamp,
}
if (Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0) {
serialized.contentBlocks = deepClone(msg.contentBlocks)
}
if (Array.isArray((msg as any).toolCalls) && (msg as any).toolCalls.length > 0) {
serialized.toolCalls = deepClone((msg as any).toolCalls)
}
if (Array.isArray(msg.fileAttachments) && msg.fileAttachments.length > 0) {
serialized.fileAttachments = deepClone(msg.fileAttachments)
}
if (Array.isArray((msg as any).contexts) && (msg as any).contexts.length > 0) {
serialized.contexts = deepClone((msg as any).contexts)
}
if (Array.isArray(msg.citations) && msg.citations.length > 0) {
serialized.citations = deepClone(msg.citations)
}
if (msg.errorType) {
serialized.errorType = msg.errorType
}
return maskCredentialIdsInValue(serialized, credentialIds)
})
.filter((msg) => {
if (msg.role === 'assistant') {
const hasContent = typeof msg.content === 'string' && msg.content.trim().length > 0
const hasTools = Array.isArray(msg.toolCalls) && msg.toolCalls.length > 0
const hasBlocks = Array.isArray(msg.contentBlocks) && msg.contentBlocks.length > 0
return hasContent || hasTools || hasBlocks
}
return true
})
for (const msg of messages) {
if (msg.role === 'assistant') {
logger.info('[serializeMessagesForDB] Input assistant message', {
id: msg.id,
hasContent: !!msg.content?.trim(),
contentBlockCount: msg.contentBlocks?.length || 0,
contentBlockTypes: (msg.contentBlocks as any[])?.map((b) => b?.type) ?? [],
})
}
}
logger.info('[serializeMessagesForDB] Serialized messages', {
inputCount: messages.length,
outputCount: result.length,
sample:
result.length > 0
? {
role: result[result.length - 1].role,
hasContent: !!result[result.length - 1].content,
contentBlockCount: result[result.length - 1].contentBlocks?.length || 0,
toolCallCount: result[result.length - 1].toolCalls?.length || 0,
}
: null,
})
return result
}

View File

@@ -21,6 +21,19 @@ const logger = createLogger('CopilotSseHandlers')
// Normalization + dedupe helpers live in sse-utils to keep server/client in sync.
function inferToolSuccess(data: Record<string, any> | undefined): {
success: boolean
hasResultData: boolean
hasError: boolean
} {
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const hasResultData = data?.result !== undefined || data?.data !== undefined
const hasError = !!data?.error || !!data?.result?.error
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
return { success, hasResultData, hasError }
}
export type SSEHandler = (
event: SSEEvent,
context: StreamingContext,
@@ -47,14 +60,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
const current = context.toolCalls.get(toolCallId)
if (!current) return
// Determine success: explicit success field, or if there's result data without explicit failure.
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const hasResultData = data?.result !== undefined || data?.data !== undefined
const hasError = !!data?.error || !!data?.result?.error
// If explicitly set, use that; otherwise infer from data presence.
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
const { success, hasResultData, hasError } = inferToolSuccess(data)
current.status = success ? 'success' : 'error'
current.endTime = Date.now()
@@ -344,12 +350,7 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
// Also update in main toolCalls (where we added it for execution).
const mainToolCall = context.toolCalls.get(toolCallId)
// Use same success inference logic as main handler.
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const hasResultData = data?.result !== undefined || data?.data !== undefined
const hasError = !!data?.error || !!data?.result?.error
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
const { success, hasResultData, hasError } = inferToolSuccess(data)
const status = success ? 'success' : 'error'
const endTime = Date.now()

View File

@@ -74,27 +74,34 @@ const SERVER_TOOLS = new Set<string>([
'knowledge_base',
])
const SIM_WORKFLOW_TOOLS = new Set<string>([
'get_user_workflow',
'get_workflow_from_name',
'list_user_workflows',
'list_user_workspaces',
'list_folders',
'create_workflow',
'create_folder',
'get_workflow_data',
'get_block_outputs',
'get_block_upstream_references',
'run_workflow',
'set_global_workflow_variables',
'deploy_api',
'deploy_chat',
'deploy_mcp',
'redeploy',
'check_deployment_status',
'list_workspace_mcp_servers',
'create_workspace_mcp_server',
])
const SIM_WORKFLOW_TOOL_HANDLERS: Record<
string,
(params: Record<string, unknown>, context: ExecutionContext) => Promise<ToolCallResult>
> = {
get_user_workflow: (p, c) => executeGetUserWorkflow(p as GetUserWorkflowParams, c),
get_workflow_from_name: (p, c) => executeGetWorkflowFromName(p as GetWorkflowFromNameParams, c),
list_user_workflows: (p, c) => executeListUserWorkflows(p as ListUserWorkflowsParams, c),
list_user_workspaces: (_p, c) => executeListUserWorkspaces(c),
list_folders: (p, c) => executeListFolders(p as ListFoldersParams, c),
create_workflow: (p, c) => executeCreateWorkflow(p as CreateWorkflowParams, c),
create_folder: (p, c) => executeCreateFolder(p as CreateFolderParams, c),
get_workflow_data: (p, c) => executeGetWorkflowData(p as GetWorkflowDataParams, c),
get_block_outputs: (p, c) => executeGetBlockOutputs(p as GetBlockOutputsParams, c),
get_block_upstream_references: (p, c) =>
executeGetBlockUpstreamReferences(p as unknown as GetBlockUpstreamReferencesParams, c),
run_workflow: (p, c) => executeRunWorkflow(p as RunWorkflowParams, c),
set_global_workflow_variables: (p, c) =>
executeSetGlobalWorkflowVariables(p as SetGlobalWorkflowVariablesParams, c),
deploy_api: (p, c) => executeDeployApi(p as DeployApiParams, c),
deploy_chat: (p, c) => executeDeployChat(p as DeployChatParams, c),
deploy_mcp: (p, c) => executeDeployMcp(p as DeployMcpParams, c),
redeploy: (_p, c) => executeRedeploy(c),
check_deployment_status: (p, c) => executeCheckDeploymentStatus(p as CheckDeploymentStatusParams, c),
list_workspace_mcp_servers: (p, c) =>
executeListWorkspaceMcpServers(p as ListWorkspaceMcpServersParams, c),
create_workspace_mcp_server: (p, c) =>
executeCreateWorkspaceMcpServer(p as CreateWorkspaceMcpServerParams, c),
}
/**
* Execute a tool server-side without calling internal routes.
@@ -110,7 +117,7 @@ export async function executeToolServerSide(
return executeServerToolDirect(toolName, toolCall.params || {}, context)
}
if (SIM_WORKFLOW_TOOLS.has(toolName)) {
if (toolName in SIM_WORKFLOW_TOOL_HANDLERS) {
return executeSimWorkflowTool(toolName, toolCall.params || {}, context)
}
@@ -158,51 +165,12 @@ async function executeServerToolDirect(
async function executeSimWorkflowTool(
toolName: string,
params: Record<string, any>,
params: Record<string, unknown>,
context: ExecutionContext
): Promise<ToolCallResult> {
switch (toolName) {
case 'get_user_workflow':
return executeGetUserWorkflow(params as GetUserWorkflowParams, context)
case 'get_workflow_from_name':
return executeGetWorkflowFromName(params as GetWorkflowFromNameParams, context)
case 'list_user_workflows':
return executeListUserWorkflows(params as ListUserWorkflowsParams, context)
case 'list_user_workspaces':
return executeListUserWorkspaces(context)
case 'list_folders':
return executeListFolders(params as ListFoldersParams, context)
case 'create_workflow':
return executeCreateWorkflow(params as CreateWorkflowParams, context)
case 'create_folder':
return executeCreateFolder(params as CreateFolderParams, context)
case 'get_workflow_data':
return executeGetWorkflowData(params as GetWorkflowDataParams, context)
case 'get_block_outputs':
return executeGetBlockOutputs(params as GetBlockOutputsParams, context)
case 'get_block_upstream_references':
return executeGetBlockUpstreamReferences(params as GetBlockUpstreamReferencesParams, context)
case 'run_workflow':
return executeRunWorkflow(params as RunWorkflowParams, context)
case 'set_global_workflow_variables':
return executeSetGlobalWorkflowVariables(params as SetGlobalWorkflowVariablesParams, context)
case 'deploy_api':
return executeDeployApi(params as DeployApiParams, context)
case 'deploy_chat':
return executeDeployChat(params as DeployChatParams, context)
case 'deploy_mcp':
return executeDeployMcp(params as DeployMcpParams, context)
case 'redeploy':
return executeRedeploy(context)
case 'check_deployment_status':
return executeCheckDeploymentStatus(params as CheckDeploymentStatusParams, context)
case 'list_workspace_mcp_servers':
return executeListWorkspaceMcpServers(params as ListWorkspaceMcpServersParams, context)
case 'create_workspace_mcp_server':
return executeCreateWorkspaceMcpServer(params as CreateWorkspaceMcpServerParams, context)
default:
return { success: false, error: `Unsupported workflow tool: ${toolName}` }
}
const handler = SIM_WORKFLOW_TOOL_HANDLERS[toolName]
if (!handler) return { success: false, error: `Unsupported workflow tool: ${toolName}` }
return handler(params, context)
}
/**

View File

@@ -0,0 +1,162 @@
import { Loader2 } from 'lucide-react'
import {
ClientToolCallState,
type ClientToolDisplay,
TOOL_DISPLAY_REGISTRY,
} from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore } from '@/stores/panel/copilot/types'
export function resolveToolDisplay(
toolName: string | undefined,
state: ClientToolCallState,
_toolCallId?: string,
params?: Record<string, any>
): ClientToolDisplay | undefined {
if (!toolName) return undefined
const entry = TOOL_DISPLAY_REGISTRY[toolName]
if (!entry) return humanizedFallback(toolName, state)
if (entry.uiConfig?.dynamicText && params) {
const dynamicText = entry.uiConfig.dynamicText(params, state)
const stateDisplay = entry.displayNames[state]
if (dynamicText && stateDisplay?.icon) {
return { text: dynamicText, icon: stateDisplay.icon }
}
}
const display = entry.displayNames[state]
if (display?.text || display?.icon) return display
const fallbackOrder = [
ClientToolCallState.generating,
ClientToolCallState.executing,
ClientToolCallState.success,
]
for (const fallbackState of fallbackOrder) {
const fallback = entry.displayNames[fallbackState]
if (fallback?.text || fallback?.icon) return fallback
}
return humanizedFallback(toolName, state)
}
export function humanizedFallback(
toolName: string,
state: ClientToolCallState
): ClientToolDisplay | undefined {
const formattedName = toolName.replace(/_/g, ' ').replace(/\b\w/g, (c) => c.toUpperCase())
const stateVerb =
state === ClientToolCallState.success
? 'Executed'
: state === ClientToolCallState.error
? 'Failed'
: state === ClientToolCallState.rejected || state === ClientToolCallState.aborted
? 'Skipped'
: 'Executing'
return { text: `${stateVerb} ${formattedName}`, icon: Loader2 }
}
export function isRejectedState(state: string): boolean {
return state === 'rejected'
}
export function isReviewState(state: string): boolean {
return state === 'review'
}
export function isBackgroundState(state: string): boolean {
return state === 'background'
}
export function isTerminalState(state: string): boolean {
return (
state === ClientToolCallState.success ||
state === ClientToolCallState.error ||
state === ClientToolCallState.rejected ||
state === ClientToolCallState.aborted ||
isReviewState(state) ||
isBackgroundState(state)
)
}
export function abortAllInProgressTools(
set: any,
get: () => CopilotStore
) {
try {
const { toolCallsById, messages } = get()
const updatedMap = { ...toolCallsById }
const abortedIds = new Set<string>()
let hasUpdates = false
for (const [id, tc] of Object.entries(toolCallsById)) {
const st = tc.state as any
const isTerminal =
st === ClientToolCallState.success ||
st === ClientToolCallState.error ||
st === ClientToolCallState.rejected ||
st === ClientToolCallState.aborted
if (!isTerminal || isReviewState(st)) {
abortedIds.add(id)
updatedMap[id] = {
...tc,
state: ClientToolCallState.aborted,
subAgentStreaming: false,
display: resolveToolDisplay(tc.name, ClientToolCallState.aborted, id, (tc as any).params),
}
hasUpdates = true
} else if (tc.subAgentStreaming) {
updatedMap[id] = {
...tc,
subAgentStreaming: false,
}
hasUpdates = true
}
}
if (abortedIds.size > 0 || hasUpdates) {
set({ toolCallsById: updatedMap })
set((s: CopilotStore) => {
const msgs = [...s.messages]
for (let mi = msgs.length - 1; mi >= 0; mi--) {
const m = msgs[mi] as any
if (m.role !== 'assistant' || !Array.isArray(m.contentBlocks)) continue
let changed = false
const blocks = m.contentBlocks.map((b: any) => {
if (b?.type === 'tool_call' && b.toolCall?.id && abortedIds.has(b.toolCall.id)) {
changed = true
const prev = b.toolCall
return {
...b,
toolCall: {
...prev,
state: ClientToolCallState.aborted,
display: resolveToolDisplay(
prev?.name,
ClientToolCallState.aborted,
prev?.id,
prev?.params
),
},
}
}
return b
})
if (changed) {
msgs[mi] = { ...m, contentBlocks: blocks }
break
}
}
return { messages: msgs }
})
}
} catch {}
}
export function stripTodoTags(text: string): string {
if (!text) return text
return text
.replace(/<marktodo>[\s\S]*?<\/marktodo>/g, '')
.replace(/<checkofftodo>[\s\S]*?<\/checkofftodo>/g, '')
.replace(/<design_workflow>[\s\S]*?<\/design_workflow>/g, '')
.replace(/[ \t]+\n/g, '\n')
.replace(/\n{2,}/g, '\n')
}

View File

@@ -1,4 +1,3 @@
// @ts-nocheck
import type { LucideIcon } from 'lucide-react'
import {
Blocks,
@@ -70,7 +69,7 @@ export interface ClientToolDisplay {
}
export type DynamicTextFormatter = (
params: Record<string, unknown>,
params: Record<string, any>,
state: ClientToolCallState
) => string | undefined
@@ -101,6 +100,9 @@ interface ToolMetadata {
subagent?: {
streamingLabel?: string
completedLabel?: string
shouldCollapse?: boolean
outputArtifacts?: string[]
hideThinkingText?: boolean
}
interrupt?: any
customRenderer?: string
@@ -115,6 +117,21 @@ interface ToolDisplayEntry {
uiConfig?: ToolUIConfig
}
type WorkflowDataType = 'global_variables' | 'custom_tools' | 'mcp_tools' | 'files'
type NavigationDestination = 'workflow' | 'logs' | 'templates' | 'vector_db' | 'settings'
function formatDuration(seconds: number): string {
if (seconds < 60) return `${Math.round(seconds)}s`
const mins = Math.floor(seconds / 60)
const secs = Math.round(seconds % 60)
if (mins < 60) return secs > 0 ? `${mins}m ${secs}s` : `${mins}m`
const hours = Math.floor(mins / 60)
const remMins = mins % 60
if (remMins > 0) return `${hours}h ${remMins}m`
return `${hours}h`
}
function toUiConfig(metadata?: ToolMetadata): ToolUIConfig | undefined {
const legacy = metadata?.uiConfig
const subagent = legacy?.subagent
@@ -1197,7 +1214,7 @@ const META_make_api_request: ToolMetadata = {
{ key: 'method', label: 'Method', width: '26%', editable: true, mono: true },
{ key: 'url', label: 'Endpoint', width: '74%', editable: true, mono: true },
],
extractRows: (params) => {
extractRows: (params: Record<string, any>): Array<[string, ...any[]]> => {
return [['request', (params.method || 'GET').toUpperCase(), params.url || '']]
},
},
@@ -1665,7 +1682,7 @@ const META_run_workflow: ToolMetadata = {
{ key: 'input', label: 'Input', width: '36%' },
{ key: 'value', label: 'Value', width: '64%', editable: true, mono: true },
],
extractRows: (params) => {
extractRows: (params: Record<string, any>): Array<[string, ...any[]]> => {
let inputs = params.input || params.inputs || params.workflow_input
if (typeof inputs === 'string') {
try {
@@ -1952,7 +1969,7 @@ const META_set_environment_variables: ToolMetadata = {
{ key: 'name', label: 'Variable', width: '36%', editable: true },
{ key: 'value', label: 'Value', width: '64%', editable: true, mono: true },
],
extractRows: (params) => {
extractRows: (params: Record<string, any>): Array<[string, ...any[]]> => {
const variables = params.variables || {}
const entries = Array.isArray(variables)
? variables.map((v: any, i: number) => [String(i), v.name || `var_${i}`, v.value || ''])
@@ -2021,7 +2038,7 @@ const META_set_global_workflow_variables: ToolMetadata = {
{ key: 'name', label: 'Name', width: '40%', editable: true, mono: true },
{ key: 'value', label: 'Value', width: '60%', editable: true, mono: true },
],
extractRows: (params) => {
extractRows: (params: Record<string, any>): Array<[string, ...any[]]> => {
const operations = params.operations || []
return operations.map((op: any, idx: number) => [
String(idx),

File diff suppressed because it is too large Load Diff

View File

@@ -122,7 +122,6 @@ export interface CopilotState {
messages: CopilotMessage[]
workflowId: string | null
checkpoints: any[]
messageCheckpoints: Record<string, any[]>
messageSnapshots: Record<string, WorkflowState>
@@ -187,7 +186,6 @@ export interface CopilotActions {
setWorkflowId: (workflowId: string | null) => Promise<void>
validateCurrentChat: () => boolean
loadChats: (forceRefresh?: boolean) => Promise<void>
areChatsFresh: (workflowId: string) => boolean
selectChat: (chat: CopilotChat) => Promise<void>
createNewChat: () => Promise<void>
deleteChat: (chatId: string) => Promise<void>
@@ -214,10 +212,6 @@ export interface CopilotActions {
resumeActiveStream: () => Promise<boolean>
setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void
updateToolCallParams: (toolCallId: string, params: Record<string, any>) => void
sendDocsMessage: (query: string, options?: { stream?: boolean; topK?: number }) => Promise<void>
saveChatMessages: (chatId: string) => Promise<void>
loadCheckpoints: (chatId: string) => Promise<void>
loadMessageCheckpoints: (chatId: string) => Promise<void>
revertToCheckpoint: (checkpointId: string) => Promise<void>
getCheckpointsForMessage: (messageId: string) => any[]
@@ -227,7 +221,6 @@ export interface CopilotActions {
clearError: () => void
clearSaveError: () => void
clearCheckpointError: () => void
retrySave: (chatId: string) => Promise<void>
cleanup: () => void
reset: () => void

View File

@@ -1,4 +1,10 @@
import { createLogger } from '@sim/logger'
declare global {
interface Window {
__skipDiffRecording?: boolean
}
}
import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { stripWorkflowDiffMarkers, WorkflowDiffEngine } from '@/lib/workflows/diff'
@@ -21,6 +27,17 @@ import {
const logger = createLogger('WorkflowDiffStore')
const diffEngine = new WorkflowDiffEngine()
const RESET_DIFF_STATE = {
hasActiveDiff: false,
isShowingDiff: false,
isDiffReady: false,
baselineWorkflow: null,
baselineWorkflowId: null,
diffAnalysis: null,
diffMetadata: null,
diffError: null,
_triggerMessageId: null,
}
/**
* Detects when a diff contains no meaningful changes.
@@ -104,17 +121,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
if (isEmptyDiffAnalysis(diffAnalysisResult)) {
logger.info('No workflow diff detected; skipping diff view')
diffEngine.clearDiff()
batchedUpdate({
hasActiveDiff: false,
isShowingDiff: false,
isDiffReady: false,
baselineWorkflow: null,
baselineWorkflowId: null,
diffAnalysis: null,
diffMetadata: null,
diffError: null,
_triggerMessageId: null,
})
batchedUpdate(RESET_DIFF_STATE)
return
}
@@ -205,7 +212,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
// Emit event for undo/redo recording
if (!(window as any).__skipDiffRecording) {
if (!window.__skipDiffRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
@@ -234,17 +241,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
diffEngine.clearDiff()
batchedUpdate({
hasActiveDiff: false,
isShowingDiff: false,
isDiffReady: false,
baselineWorkflow: null,
baselineWorkflowId: null,
diffAnalysis: null,
diffMetadata: null,
diffError: null,
_triggerMessageId: null,
})
batchedUpdate(RESET_DIFF_STATE)
},
toggleDiffView: () => {
@@ -301,17 +298,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
// Clear diff state FIRST to prevent flash of colors
// This must happen synchronously before applying the cleaned state
set({
hasActiveDiff: false,
isShowingDiff: false,
isDiffReady: false,
baselineWorkflow: null,
baselineWorkflowId: null,
diffAnalysis: null,
diffMetadata: null,
diffError: null,
_triggerMessageId: null,
})
set(RESET_DIFF_STATE)
// Clear the diff engine
diffEngine.clearDiff()
@@ -320,7 +307,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
applyWorkflowStateToStores(activeWorkflowId, stateToApply)
// Emit event for undo/redo recording (unless we're in an undo/redo operation)
if (!(window as any).__skipDiffRecording) {
if (!window.__skipDiffRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
@@ -393,17 +380,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
const afterReject = cloneWorkflowState(baselineWorkflow)
// Clear diff state FIRST for instant UI feedback
set({
hasActiveDiff: false,
isShowingDiff: false,
isDiffReady: false,
baselineWorkflow: null,
baselineWorkflowId: null,
diffAnalysis: null,
diffMetadata: null,
diffError: null,
_triggerMessageId: null,
})
set(RESET_DIFF_STATE)
// Clear the diff engine
diffEngine.clearDiff()
@@ -412,7 +389,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
applyWorkflowStateToStores(baselineWorkflowId, baselineWorkflow)
// Emit event for undo/redo recording synchronously
if (!(window as any).__skipDiffRecording) {
if (!window.__skipDiffRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {