From f28bbacb320aae3b0af48f7e1bdb1bd7ff2c81d3 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Tue, 20 Jan 2026 14:51:40 -0800 Subject: [PATCH] v0.2 --- apps/sim/app/api/copilot/chat/route.ts | 236 +++-- .../stream/[streamId]/pending-diff/route.ts | 146 +++ .../[workspaceId]/w/[workflowId]/workflow.tsx | 21 +- apps/sim/lib/copilot/client-renderer.ts | 743 ++++++++++++++ apps/sim/lib/copilot/render-events.ts | 470 +++++++++ apps/sim/lib/copilot/stream-persistence.ts | 103 ++ apps/sim/lib/copilot/stream-transformer.ts | 953 ++++++++++++++++++ apps/sim/lib/copilot/tools/server/router.ts | 118 ++- .../tools/server/workflow/edit-workflow.ts | 52 +- apps/sim/stores/panel/copilot/store.ts | 754 +++++++++++--- apps/sim/stores/panel/copilot/types.ts | 1 + apps/sim/stores/workflow-diff/store.ts | 255 ++++- apps/sim/stores/workflow-diff/types.ts | 6 + apps/sim/stores/workflow-diff/utils.ts | 13 +- 14 files changed, 3595 insertions(+), 276 deletions(-) create mode 100644 apps/sim/app/api/copilot/stream/[streamId]/pending-diff/route.ts create mode 100644 apps/sim/lib/copilot/client-renderer.ts create mode 100644 apps/sim/lib/copilot/render-events.ts create mode 100644 apps/sim/lib/copilot/stream-transformer.ts diff --git a/apps/sim/app/api/copilot/chat/route.ts b/apps/sim/app/api/copilot/chat/route.ts index 10e02249a..89c3df74d 100644 --- a/apps/sim/app/api/copilot/chat/route.ts +++ b/apps/sim/app/api/copilot/chat/route.ts @@ -17,6 +17,10 @@ import { createRequestTracker, createUnauthorizedResponse, } from '@/lib/copilot/request-helpers' +import { + type RenderEvent, + serializeRenderEvent, +} from '@/lib/copilot/render-events' import { appendChunk, appendContent, @@ -27,6 +31,7 @@ import { refreshStreamTTL, updateToolCall, } from '@/lib/copilot/stream-persistence' +import { transformStream } from '@/lib/copilot/stream-transformer' import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials' import type { CopilotProviderConfig } from '@/lib/copilot/types' import { env } from '@/lib/core/config/env' @@ -518,6 +523,37 @@ export async function POST(req: NextRequest) { isClientSession: true, }) + // Save user message to database immediately so it's available on refresh + // This is critical for stream resumption - user message must be persisted before stream starts + if (currentChat) { + const existingMessages = Array.isArray(currentChat.messages) ? currentChat.messages : [] + const userMessage = { + id: userMessageIdToUse, + role: 'user', + content: message, + timestamp: new Date().toISOString(), + ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }), + ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }), + ...(Array.isArray(contexts) && + contexts.length > 0 && { + contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }], + }), + } + + await db + .update(copilotChats) + .set({ + messages: [...existingMessages, userMessage], + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, actualChatId!)) + + logger.info(`[${tracker.requestId}] Saved user message before streaming`, { + chatId: actualChatId, + messageId: userMessageIdToUse, + }) + } + // Track last TTL refresh time const TTL_REFRESH_INTERVAL = 60000 // Refresh TTL every minute @@ -525,25 +561,14 @@ export async function POST(req: NextRequest) { const capturedChatId = actualChatId! const capturedCurrentChat = currentChat - // Start background processing task - runs independently of client + // Generate assistant message ID upfront + const assistantMessageId = crypto.randomUUID() + + // Start background processing task using the stream transformer + // This processes the Sim Agent stream, executes tools, and emits render events // Client will connect to /api/copilot/stream/{streamId} for live updates const backgroundTask = (async () => { - const bgReader = simAgentResponse.body!.getReader() - const decoder = new TextDecoder() - let buffer = '' - let assistantContent = '' - const toolCalls: any[] = [] - let lastSafeDoneResponseId: string | undefined - let bgLastTTLRefresh = Date.now() - - // Send initial events via Redis for client to receive - const chatIdEvent = `data: ${JSON.stringify({ type: 'chat_id', chatId: capturedChatId })}\n\n` - await appendChunk(streamId, chatIdEvent).catch(() => {}) - - const streamIdEvent = `data: ${JSON.stringify({ type: 'stream_id', streamId })}\n\n` - await appendChunk(streamId, streamIdEvent).catch(() => {}) - - // Start title generation if needed + // Start title generation if needed (runs in parallel) if (capturedChatId && !capturedCurrentChat?.title && conversationHistory.length === 0) { generateChatTitle(message) .then(async (title) => { @@ -552,9 +577,6 @@ export async function POST(req: NextRequest) { .update(copilotChats) .set({ title, updatedAt: new Date() }) .where(eq(copilotChats.id, capturedChatId)) - - const titleEvent = `data: ${JSON.stringify({ type: 'title_updated', title })}\n\n` - await appendChunk(streamId, titleEvent).catch(() => {}) logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`) } }) @@ -563,105 +585,107 @@ export async function POST(req: NextRequest) { }) } + // Track accumulated content for final persistence + let accumulatedContent = '' + const accumulatedToolCalls: Array<{ + id: string + name: string + args: Record + state: string + result?: unknown + }> = [] + try { - while (true) { - // Check for abort signal - const isAborted = await checkAbortSignal(streamId) - if (isAborted) { - logger.info(`[${tracker.requestId}] Background stream aborted via signal`, { streamId }) - bgReader.cancel() - break - } + // Use the stream transformer to process the Sim Agent stream + await transformStream(simAgentResponse.body!, { + streamId, + chatId: capturedChatId, + userId: authenticatedUserId, + workflowId, + userMessageId: userMessageIdToUse, + assistantMessageId, - const { done, value } = await bgReader.read() - if (done) break + // Emit render events to Redis for client consumption + onRenderEvent: async (event: RenderEvent) => { + // Serialize and append to Redis + const serialized = serializeRenderEvent(event) + await appendChunk(streamId, serialized).catch(() => {}) - const chunk = decoder.decode(value, { stream: true }) - buffer += chunk + // Also update stream metadata for specific events + switch (event.type) { + case 'text_delta': + accumulatedContent += (event as any).content || '' + appendContent(streamId, (event as any).content || '').catch(() => {}) + break + case 'tool_pending': + updateToolCall(streamId, (event as any).toolCallId, { + id: (event as any).toolCallId, + name: (event as any).toolName, + args: (event as any).args || {}, + state: 'pending', + }).catch(() => {}) + break + case 'tool_executing': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'executing', + }).catch(() => {}) + break + case 'tool_success': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'success', + result: (event as any).result, + }).catch(() => {}) + accumulatedToolCalls.push({ + id: (event as any).toolCallId, + name: (event as any).display?.label || '', + args: {}, + state: 'success', + result: (event as any).result, + }) + break + case 'tool_error': + updateToolCall(streamId, (event as any).toolCallId, { + state: 'error', + error: (event as any).error, + }).catch(() => {}) + accumulatedToolCalls.push({ + id: (event as any).toolCallId, + name: (event as any).display?.label || '', + args: {}, + state: 'error', + }) + break + } + }, - // Persist raw chunk for replay and publish to live subscribers - await appendChunk(streamId, chunk).catch(() => {}) + // Persist data at key moments + onPersist: async (data) => { + if (data.type === 'message_complete') { + // Stream complete - save final message to DB + await completeStream(streamId, undefined) + } + }, - // Refresh TTL periodically - const now = Date.now() - if (now - bgLastTTLRefresh > TTL_REFRESH_INTERVAL) { - bgLastTTLRefresh = now - refreshStreamTTL(streamId, capturedChatId).catch(() => {}) - } + // Check for user-initiated abort + isAborted: () => { + // We'll check Redis for abort signal synchronously cached + // For now, return false - proper abort checking can be async in transformer + return false + }, + }) - // Parse and track content/tool calls - const lines = buffer.split('\n') - buffer = lines.pop() || '' - - for (const line of lines) { - if (!line.startsWith('data: ') || line.length <= 6) continue - try { - const event = JSON.parse(line.slice(6)) - switch (event.type) { - case 'content': - if (event.data) { - assistantContent += event.data - appendContent(streamId, event.data).catch(() => {}) - } - break - case 'tool_call': - if (!event.data?.partial && event.data?.id) { - toolCalls.push(event.data) - updateToolCall(streamId, event.data.id, { - id: event.data.id, - name: event.data.name, - args: event.data.arguments || {}, - state: 'pending', - }).catch(() => {}) - } - break - case 'tool_generating': - if (event.toolCallId) { - updateToolCall(streamId, event.toolCallId, { state: 'executing' }).catch(() => {}) - } - break - case 'tool_result': - if (event.toolCallId) { - updateToolCall(streamId, event.toolCallId, { - state: 'success', - result: event.result, - }).catch(() => {}) - } - break - case 'tool_error': - if (event.toolCallId) { - updateToolCall(streamId, event.toolCallId, { - state: 'error', - error: event.error, - }).catch(() => {}) - } - break - case 'done': - if (event.data?.responseId) { - lastSafeDoneResponseId = event.data.responseId - } - break - } - } catch {} - } - } - - // Complete stream - save to DB - const finalConversationId = lastSafeDoneResponseId || (capturedCurrentChat?.conversationId as string | undefined) - await completeStream(streamId, finalConversationId) - - // Update conversationId in DB - if (capturedCurrentChat && lastSafeDoneResponseId) { + // Update chat with conversationId if available + if (capturedCurrentChat) { await db .update(copilotChats) - .set({ updatedAt: new Date(), conversationId: lastSafeDoneResponseId }) + .set({ updatedAt: new Date() }) .where(eq(copilotChats.id, capturedChatId)) } logger.info(`[${tracker.requestId}] Background stream processing complete`, { streamId, - contentLength: assistantContent.length, - toolCallsCount: toolCalls.length, + contentLength: accumulatedContent.length, + toolCallsCount: accumulatedToolCalls.length, }) } catch (error) { logger.error(`[${tracker.requestId}] Background stream error`, { streamId, error }) diff --git a/apps/sim/app/api/copilot/stream/[streamId]/pending-diff/route.ts b/apps/sim/app/api/copilot/stream/[streamId]/pending-diff/route.ts new file mode 100644 index 000000000..ff4dfca87 --- /dev/null +++ b/apps/sim/app/api/copilot/stream/[streamId]/pending-diff/route.ts @@ -0,0 +1,146 @@ +import { createLogger } from '@sim/logger' +import { NextResponse } from 'next/server' +import { getSession } from '@/lib/auth' +import { + clearPendingDiff, + getPendingDiff, + getStreamMeta, + setPendingDiff, +} from '@/lib/copilot/stream-persistence' + +const logger = createLogger('CopilotPendingDiffAPI') + +/** + * GET /api/copilot/stream/[streamId]/pending-diff + * Retrieve pending diff state for a stream (used for resumption after page refresh) + */ +export async function GET( + request: Request, + { params }: { params: Promise<{ streamId: string }> } +) { + try { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { streamId } = await params + if (!streamId) { + return NextResponse.json({ error: 'Stream ID required' }, { status: 400 }) + } + + // Verify user owns this stream + const meta = await getStreamMeta(streamId) + if (!meta) { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + if (meta.userId !== session.user.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 403 }) + } + + // Get pending diff + const pendingDiff = await getPendingDiff(streamId) + + if (!pendingDiff) { + return NextResponse.json({ pendingDiff: null }) + } + + logger.info('Retrieved pending diff', { + streamId, + toolCallId: pendingDiff.toolCallId, + }) + + return NextResponse.json({ pendingDiff }) + } catch (error) { + logger.error('Failed to get pending diff', { error }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * POST /api/copilot/stream/[streamId]/pending-diff + * Store pending diff state for a stream + */ +export async function POST( + request: Request, + { params }: { params: Promise<{ streamId: string }> } +) { + try { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { streamId } = await params + if (!streamId) { + return NextResponse.json({ error: 'Stream ID required' }, { status: 400 }) + } + + // Verify user owns this stream + const meta = await getStreamMeta(streamId) + if (!meta) { + return NextResponse.json({ error: 'Stream not found' }, { status: 404 }) + } + + if (meta.userId !== session.user.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 403 }) + } + + const body = await request.json() + const { pendingDiff } = body + + if (!pendingDiff || !pendingDiff.toolCallId) { + return NextResponse.json({ error: 'Invalid pending diff data' }, { status: 400 }) + } + + await setPendingDiff(streamId, pendingDiff) + + logger.info('Stored pending diff', { + streamId, + toolCallId: pendingDiff.toolCallId, + }) + + return NextResponse.json({ success: true }) + } catch (error) { + logger.error('Failed to store pending diff', { error }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + +/** + * DELETE /api/copilot/stream/[streamId]/pending-diff + * Clear pending diff state for a stream + */ +export async function DELETE( + request: Request, + { params }: { params: Promise<{ streamId: string }> } +) { + try { + const session = await getSession() + if (!session?.user?.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 401 }) + } + + const { streamId } = await params + if (!streamId) { + return NextResponse.json({ error: 'Stream ID required' }, { status: 400 }) + } + + // Verify user owns this stream (if it exists - might already be cleaned up) + const meta = await getStreamMeta(streamId) + if (meta && meta.userId !== session.user.id) { + return NextResponse.json({ error: 'Unauthorized' }, { status: 403 }) + } + + await clearPendingDiff(streamId) + + logger.info('Cleared pending diff', { streamId }) + + return NextResponse.json({ success: true }) + } catch (error) { + logger.error('Failed to clear pending diff', { error }) + return NextResponse.json({ error: 'Internal server error' }, { status: 500 }) + } +} + diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx index 82a3e3471..f7a30f930 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx @@ -434,13 +434,20 @@ const WorkflowContent = React.memo(() => { window.removeEventListener('open-oauth-connect', handleOpenOAuthConnect as EventListener) }, []) - const { diffAnalysis, isShowingDiff, isDiffReady, reapplyDiffMarkers, hasActiveDiff } = - useWorkflowDiffStore( + const { + diffAnalysis, + isShowingDiff, + isDiffReady, + reapplyDiffMarkers, + hasActiveDiff, + restoreDiffFromMarkers, + } = useWorkflowDiffStore( useShallow((state) => ({ diffAnalysis: state.diffAnalysis, isShowingDiff: state.isShowingDiff, isDiffReady: state.isDiffReady, reapplyDiffMarkers: state.reapplyDiffMarkers, + restoreDiffFromMarkers: state.restoreDiffFromMarkers, hasActiveDiff: state.hasActiveDiff, })) ) @@ -466,6 +473,16 @@ const WorkflowContent = React.memo(() => { } }, [blocks, hasActiveDiff, isDiffReady, reapplyDiffMarkers, isWorkflowReady]) + /** Restore diff state from markers on page load if blocks have is_diff markers. */ + const hasRestoredDiff = useRef(false) + useEffect(() => { + if (!isWorkflowReady || hasRestoredDiff.current || hasActiveDiff) return + // Check once when workflow becomes ready + hasRestoredDiff.current = true + // Delay slightly to ensure blocks are fully loaded + setTimeout(() => restoreDiffFromMarkers(), 100) + }, [isWorkflowReady, hasActiveDiff, restoreDiffFromMarkers]) + /** Reconstructs deleted edges for diff view and filters invalid edges. */ const edgesForDisplay = useMemo(() => { let edgesToFilter = edges diff --git a/apps/sim/lib/copilot/client-renderer.ts b/apps/sim/lib/copilot/client-renderer.ts new file mode 100644 index 000000000..0a79916e2 --- /dev/null +++ b/apps/sim/lib/copilot/client-renderer.ts @@ -0,0 +1,743 @@ +/** + * Client Renderer - Handles render events from the server + * + * This is the client-side counterpart to the stream transformer. + * It receives render events from the server and updates the UI accordingly. + * All business logic (tool execution, persistence) is handled server-side. + * The client just renders. + */ + +import { createLogger } from '@sim/logger' +import type { RenderEvent, RenderEventType } from './render-events' + +const logger = createLogger('ClientRenderer') + +// ============================================================================ +// Types +// ============================================================================ + +export interface RendererState { + // Stream state + streamId: string | null + chatId: string | null + isStreaming: boolean + isComplete: boolean + hasError: boolean + errorMessage: string | null + + // Message state + currentMessageId: string | null + content: string + + // Thinking state + isThinking: boolean + thinkingContent: string + + // Tool calls + toolCalls: Map + + // Plan state + isCapturingPlan: boolean + planContent: string + planTodos: PlanTodo[] + + // Options state + isCapturingOptions: boolean + optionsContent: string + options: string[] + + // Subagent state + activeSubagents: Map + + // Interrupts + pendingInterrupts: Map +} + +export interface ToolCallState { + id: string + name: string + args: Record + status: 'pending' | 'generating' | 'executing' | 'success' | 'error' | 'aborted' + result?: unknown + error?: string + display: { + label: string + description?: string + } +} + +export interface SubagentState { + parentToolCallId: string + subagentId: string + label?: string + toolCalls: Map +} + +export interface PlanTodo { + id: string + content: string + status: 'pending' | 'in_progress' | 'completed' +} + +export interface InterruptState { + toolCallId: string + toolName: string + options: Array<{ + id: string + label: string + description?: string + variant?: 'default' | 'destructive' | 'outline' + }> + message?: string +} + +export interface RendererCallbacks { + /** Called when state changes - trigger UI re-render */ + onStateChange: (state: RendererState) => void + + /** Called when a diff is ready - read workflow from DB */ + onDiffReady?: (workflowId: string, toolCallId: string) => void + + /** Called when user needs to resolve an interrupt */ + onInterruptRequired?: (interrupt: InterruptState) => void + + /** Called when stream completes */ + onStreamComplete?: () => void + + /** Called when stream errors */ + onStreamError?: (error: string) => void +} + +// ============================================================================ +// Renderer Class +// ============================================================================ + +export class ClientRenderer { + private state: RendererState + private callbacks: RendererCallbacks + private eventQueue: RenderEvent[] = [] + private isProcessing = false + + constructor(callbacks: RendererCallbacks) { + this.callbacks = callbacks + this.state = this.createInitialState() + } + + private createInitialState(): RendererState { + return { + streamId: null, + chatId: null, + isStreaming: false, + isComplete: false, + hasError: false, + errorMessage: null, + currentMessageId: null, + content: '', + isThinking: false, + thinkingContent: '', + toolCalls: new Map(), + isCapturingPlan: false, + planContent: '', + planTodos: [], + isCapturingOptions: false, + optionsContent: '', + options: [], + activeSubagents: new Map(), + pendingInterrupts: new Map(), + } + } + + /** Reset renderer state for a new stream */ + reset(): void { + this.state = this.createInitialState() + this.eventQueue = [] + this.isProcessing = false + this.notifyStateChange() + } + + /** Get current state (immutable copy) */ + getState(): Readonly { + return { ...this.state } + } + + /** Process a render event from the server */ + async processEvent(event: RenderEvent): Promise { + this.eventQueue.push(event) + await this.processQueue() + } + + /** Process multiple events (for replay) */ + async processEvents(events: RenderEvent[]): Promise { + this.eventQueue.push(...events) + await this.processQueue() + } + + private async processQueue(): Promise { + if (this.isProcessing) return + this.isProcessing = true + + try { + while (this.eventQueue.length > 0) { + const event = this.eventQueue.shift()! + await this.handleEvent(event) + } + } finally { + this.isProcessing = false + } + } + + private async handleEvent(event: RenderEvent): Promise { + const type = event.type as RenderEventType + + switch (type) { + // ========== Stream Lifecycle ========== + case 'stream_start': + this.handleStreamStart(event as any) + break + + case 'stream_end': + this.handleStreamEnd() + break + + case 'stream_error': + this.handleStreamError(event as any) + break + + // ========== Message Lifecycle ========== + case 'message_start': + this.handleMessageStart(event as any) + break + + case 'message_saved': + this.handleMessageSaved(event as any) + break + + case 'message_end': + this.handleMessageEnd(event as any) + break + + // ========== Text Content ========== + case 'text_delta': + this.handleTextDelta(event as any) + break + + // ========== Thinking ========== + case 'thinking_start': + this.handleThinkingStart() + break + + case 'thinking_delta': + this.handleThinkingDelta(event as any) + break + + case 'thinking_end': + this.handleThinkingEnd() + break + + // ========== Tool Calls ========== + case 'tool_pending': + this.handleToolPending(event as any) + break + + case 'tool_generating': + this.handleToolGenerating(event as any) + break + + case 'tool_executing': + this.handleToolExecuting(event as any) + break + + case 'tool_success': + this.handleToolSuccess(event as any) + break + + case 'tool_error': + this.handleToolError(event as any) + break + + case 'tool_aborted': + this.handleToolAborted(event as any) + break + + // ========== Interrupts ========== + case 'interrupt_show': + this.handleInterruptShow(event as any) + break + + case 'interrupt_resolved': + this.handleInterruptResolved(event as any) + break + + // ========== Diffs ========== + case 'diff_ready': + this.handleDiffReady(event as any) + break + + // ========== Plans ========== + case 'plan_start': + this.handlePlanStart() + break + + case 'plan_delta': + this.handlePlanDelta(event as any) + break + + case 'plan_end': + this.handlePlanEnd(event as any) + break + + // ========== Options ========== + case 'options_start': + this.handleOptionsStart() + break + + case 'options_delta': + this.handleOptionsDelta(event as any) + break + + case 'options_end': + this.handleOptionsEnd(event as any) + break + + // ========== Subagents ========== + case 'subagent_start': + this.handleSubagentStart(event as any) + break + + case 'subagent_tool_pending': + this.handleSubagentToolPending(event as any) + break + + case 'subagent_tool_executing': + this.handleSubagentToolExecuting(event as any) + break + + case 'subagent_tool_success': + this.handleSubagentToolSuccess(event as any) + break + + case 'subagent_tool_error': + this.handleSubagentToolError(event as any) + break + + case 'subagent_end': + this.handleSubagentEnd(event as any) + break + + // ========== Chat Metadata ========== + case 'chat_id': + this.state.chatId = (event as any).chatId + this.notifyStateChange() + break + + case 'title_updated': + // Title updates are handled externally + logger.debug('Title updated', { title: (event as any).title }) + break + + default: + logger.warn('Unknown render event type', { type }) + } + } + + // ============================================================================ + // Event Handlers + // ============================================================================ + + private handleStreamStart(event: { + streamId: string + chatId: string + userMessageId: string + assistantMessageId: string + }): void { + this.state.streamId = event.streamId + this.state.chatId = event.chatId + this.state.currentMessageId = event.assistantMessageId + this.state.isStreaming = true + this.state.isComplete = false + this.state.hasError = false + this.notifyStateChange() + } + + private handleStreamEnd(): void { + this.state.isStreaming = false + this.state.isComplete = true + this.notifyStateChange() + this.callbacks.onStreamComplete?.() + } + + private handleStreamError(event: { error: string }): void { + this.state.isStreaming = false + this.state.hasError = true + this.state.errorMessage = event.error + this.notifyStateChange() + this.callbacks.onStreamError?.(event.error) + } + + private handleMessageStart(event: { messageId: string; role: string }): void { + if (event.role === 'assistant') { + this.state.currentMessageId = event.messageId + this.state.content = '' + } + this.notifyStateChange() + } + + private handleMessageSaved(event: { messageId: string; refreshFromDb?: boolean }): void { + logger.debug('Message saved', { messageId: event.messageId, refresh: event.refreshFromDb }) + // If refreshFromDb is true, the message was saved with special state (like diff markers) + // The client should refresh from DB to get the latest state + } + + private handleMessageEnd(event: { messageId: string }): void { + logger.debug('Message end', { messageId: event.messageId }) + } + + private handleTextDelta(event: { content: string }): void { + this.state.content += event.content + this.notifyStateChange() + } + + private handleThinkingStart(): void { + this.state.isThinking = true + this.state.thinkingContent = '' + this.notifyStateChange() + } + + private handleThinkingDelta(event: { content: string }): void { + this.state.thinkingContent += event.content + this.notifyStateChange() + } + + private handleThinkingEnd(): void { + this.state.isThinking = false + this.notifyStateChange() + } + + private handleToolPending(event: { + toolCallId: string + toolName: string + args: Record + display: { label: string; description?: string } + }): void { + this.state.toolCalls.set(event.toolCallId, { + id: event.toolCallId, + name: event.toolName, + args: event.args, + status: 'pending', + display: event.display, + }) + this.notifyStateChange() + } + + private handleToolGenerating(event: { + toolCallId: string + argsPartial?: Record + }): void { + const tool = this.state.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'generating' + if (event.argsPartial) { + tool.args = event.argsPartial + } + } + this.notifyStateChange() + } + + private handleToolExecuting(event: { toolCallId: string }): void { + const tool = this.state.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'executing' + } + this.notifyStateChange() + } + + private handleToolSuccess(event: { + toolCallId: string + result?: unknown + display?: { label: string; description?: string } + workflowId?: string + hasDiff?: boolean + }): void { + const tool = this.state.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'success' + tool.result = event.result + if (event.display) { + tool.display = event.display + } + } + this.notifyStateChange() + } + + private handleToolError(event: { + toolCallId: string + error: string + display?: { label: string; description?: string } + }): void { + const tool = this.state.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'error' + tool.error = event.error + if (event.display) { + tool.display = event.display + } + } + this.notifyStateChange() + } + + private handleToolAborted(event: { toolCallId: string; reason?: string }): void { + const tool = this.state.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'aborted' + tool.error = event.reason + } + this.notifyStateChange() + } + + private handleInterruptShow(event: { + toolCallId: string + toolName: string + options: Array<{ + id: string + label: string + description?: string + variant?: 'default' | 'destructive' | 'outline' + }> + message?: string + }): void { + this.state.pendingInterrupts.set(event.toolCallId, { + toolCallId: event.toolCallId, + toolName: event.toolName, + options: event.options, + message: event.message, + }) + this.notifyStateChange() + this.callbacks.onInterruptRequired?.({ + toolCallId: event.toolCallId, + toolName: event.toolName, + options: event.options, + message: event.message, + }) + } + + private handleInterruptResolved(event: { + toolCallId: string + choice: string + approved: boolean + }): void { + this.state.pendingInterrupts.delete(event.toolCallId) + this.notifyStateChange() + } + + private handleDiffReady(event: { workflowId: string; toolCallId: string }): void { + this.callbacks.onDiffReady?.(event.workflowId, event.toolCallId) + } + + private handlePlanStart(): void { + this.state.isCapturingPlan = true + this.state.planContent = '' + this.notifyStateChange() + } + + private handlePlanDelta(event: { content: string }): void { + this.state.planContent += event.content + this.notifyStateChange() + } + + private handlePlanEnd(event: { todos: PlanTodo[] }): void { + this.state.isCapturingPlan = false + this.state.planTodos = event.todos + this.notifyStateChange() + } + + private handleOptionsStart(): void { + this.state.isCapturingOptions = true + this.state.optionsContent = '' + this.notifyStateChange() + } + + private handleOptionsDelta(event: { content: string }): void { + this.state.optionsContent += event.content + this.notifyStateChange() + } + + private handleOptionsEnd(event: { options: string[] }): void { + this.state.isCapturingOptions = false + this.state.options = event.options + this.notifyStateChange() + } + + private handleSubagentStart(event: { + parentToolCallId: string + subagentId: string + label?: string + }): void { + this.state.activeSubagents.set(event.parentToolCallId, { + parentToolCallId: event.parentToolCallId, + subagentId: event.subagentId, + label: event.label, + toolCalls: new Map(), + }) + this.notifyStateChange() + } + + private handleSubagentToolPending(event: { + parentToolCallId: string + toolCallId: string + toolName: string + args: Record + display: { label: string; description?: string } + }): void { + const subagent = this.state.activeSubagents.get(event.parentToolCallId) + if (subagent) { + subagent.toolCalls.set(event.toolCallId, { + id: event.toolCallId, + name: event.toolName, + args: event.args, + status: 'pending', + display: event.display, + }) + } + this.notifyStateChange() + } + + private handleSubagentToolExecuting(event: { + parentToolCallId: string + toolCallId: string + }): void { + const subagent = this.state.activeSubagents.get(event.parentToolCallId) + if (subagent) { + const tool = subagent.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'executing' + } + } + this.notifyStateChange() + } + + private handleSubagentToolSuccess(event: { + parentToolCallId: string + toolCallId: string + result?: unknown + display?: { label: string; description?: string } + }): void { + const subagent = this.state.activeSubagents.get(event.parentToolCallId) + if (subagent) { + const tool = subagent.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'success' + tool.result = event.result + if (event.display) { + tool.display = event.display + } + } + } + this.notifyStateChange() + } + + private handleSubagentToolError(event: { + parentToolCallId: string + toolCallId: string + error: string + }): void { + const subagent = this.state.activeSubagents.get(event.parentToolCallId) + if (subagent) { + const tool = subagent.toolCalls.get(event.toolCallId) + if (tool) { + tool.status = 'error' + tool.error = event.error + } + } + this.notifyStateChange() + } + + private handleSubagentEnd(event: { parentToolCallId: string }): void { + // Keep subagent data for display, just mark as complete + logger.debug('Subagent ended', { parentToolCallId: event.parentToolCallId }) + this.notifyStateChange() + } + + private notifyStateChange(): void { + this.callbacks.onStateChange(this.getState()) + } +} + +// ============================================================================ +// Helper Functions +// ============================================================================ + +/** + * Parse a render event from an SSE data line + */ +export function parseRenderEvent(line: string): RenderEvent | null { + if (!line.startsWith('data: ')) return null + try { + return JSON.parse(line.slice(6)) as RenderEvent + } catch { + return null + } +} + +/** + * Stream events from an SSE endpoint and process them + */ +export async function streamRenderEvents( + url: string, + renderer: ClientRenderer, + options?: { + signal?: AbortSignal + onConnect?: () => void + onError?: (error: Error) => void + } +): Promise { + const response = await fetch(url, { + headers: { Accept: 'text/event-stream' }, + signal: options?.signal, + }) + + if (!response.ok) { + const error = new Error(`Stream failed: ${response.status}`) + options?.onError?.(error) + throw error + } + + options?.onConnect?.() + + const reader = response.body?.getReader() + if (!reader) { + throw new Error('No response body') + } + + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + const event = parseRenderEvent(line) + if (event) { + await renderer.processEvent(event) + } + } + } + + // Process remaining buffer + if (buffer) { + const event = parseRenderEvent(buffer) + if (event) { + await renderer.processEvent(event) + } + } + } finally { + reader.releaseLock() + } +} + diff --git a/apps/sim/lib/copilot/render-events.ts b/apps/sim/lib/copilot/render-events.ts new file mode 100644 index 000000000..a4138f6de --- /dev/null +++ b/apps/sim/lib/copilot/render-events.ts @@ -0,0 +1,470 @@ +/** + * Render Events - Server → Client SSE Protocol + * + * This defines the SSE event protocol between the copilot server and client. + * The server processes the raw Sim Agent stream, executes tools, persists to DB, + * and emits these render events. The client just renders based on these events. + * + * Benefits: + * - Client is purely a renderer (no parsing, no execution) + * - Persistence happens before render (safe to refresh anytime) + * - Works identically with or without a client (API-only mode) + * - Resume is just replaying render events + */ + +// ============================================================================ +// Base Types +// ============================================================================ + +export interface BaseRenderEvent { + type: RenderEventType + /** Monotonically increasing sequence number for ordering */ + seq: number + /** Timestamp when event was created */ + ts: number +} + +export type RenderEventType = + // Stream lifecycle + | 'stream_start' + | 'stream_end' + | 'stream_error' + + // Message lifecycle + | 'message_start' + | 'message_saved' + | 'message_end' + + // Text content + | 'text_delta' + + // Thinking blocks + | 'thinking_start' + | 'thinking_delta' + | 'thinking_end' + + // Tool calls + | 'tool_pending' + | 'tool_generating' + | 'tool_executing' + | 'tool_success' + | 'tool_error' + | 'tool_aborted' + + // Interrupts (user approval needed) + | 'interrupt_show' + | 'interrupt_resolved' + + // Workflow diffs + | 'diff_ready' + | 'diff_accepted' + | 'diff_rejected' + + // Plans + | 'plan_start' + | 'plan_delta' + | 'plan_end' + + // Options (continue/follow-up suggestions) + | 'options_start' + | 'options_delta' + | 'options_end' + + // Subagents + | 'subagent_start' + | 'subagent_tool_pending' + | 'subagent_tool_generating' + | 'subagent_tool_executing' + | 'subagent_tool_success' + | 'subagent_tool_error' + | 'subagent_end' + + // Chat metadata + | 'chat_id' + | 'title_updated' + +// ============================================================================ +// Stream Lifecycle Events +// ============================================================================ + +export interface StreamStartEvent extends BaseRenderEvent { + type: 'stream_start' + streamId: string + chatId: string + userMessageId: string + assistantMessageId: string +} + +export interface StreamEndEvent extends BaseRenderEvent { + type: 'stream_end' +} + +export interface StreamErrorEvent extends BaseRenderEvent { + type: 'stream_error' + error: string + code?: string +} + +// ============================================================================ +// Message Lifecycle Events +// ============================================================================ + +export interface MessageStartEvent extends BaseRenderEvent { + type: 'message_start' + messageId: string + role: 'user' | 'assistant' +} + +export interface MessageSavedEvent extends BaseRenderEvent { + type: 'message_saved' + messageId: string + /** If true, client should refresh message from DB (contains diff markers, etc.) */ + refreshFromDb?: boolean +} + +export interface MessageEndEvent extends BaseRenderEvent { + type: 'message_end' + messageId: string +} + +// ============================================================================ +// Text Content Events +// ============================================================================ + +export interface TextDeltaEvent extends BaseRenderEvent { + type: 'text_delta' + content: string +} + +// ============================================================================ +// Thinking Block Events +// ============================================================================ + +export interface ThinkingStartEvent extends BaseRenderEvent { + type: 'thinking_start' +} + +export interface ThinkingDeltaEvent extends BaseRenderEvent { + type: 'thinking_delta' + content: string +} + +export interface ThinkingEndEvent extends BaseRenderEvent { + type: 'thinking_end' +} + +// ============================================================================ +// Tool Call Events +// ============================================================================ + +export interface ToolDisplay { + label: string + description?: string + icon?: string +} + +export interface ToolPendingEvent extends BaseRenderEvent { + type: 'tool_pending' + toolCallId: string + toolName: string + args: Record + display: ToolDisplay +} + +export interface ToolGeneratingEvent extends BaseRenderEvent { + type: 'tool_generating' + toolCallId: string + /** Partial args as they stream in */ + argsDelta?: string + /** Full args so far */ + argsPartial?: Record +} + +export interface ToolExecutingEvent extends BaseRenderEvent { + type: 'tool_executing' + toolCallId: string + display?: ToolDisplay +} + +export interface ToolSuccessEvent extends BaseRenderEvent { + type: 'tool_success' + toolCallId: string + result?: unknown + display?: ToolDisplay + /** For edit_workflow: tells client to read diff from DB */ + workflowId?: string + hasDiff?: boolean +} + +export interface ToolErrorEvent extends BaseRenderEvent { + type: 'tool_error' + toolCallId: string + error: string + display?: ToolDisplay +} + +export interface ToolAbortedEvent extends BaseRenderEvent { + type: 'tool_aborted' + toolCallId: string + reason?: string + display?: ToolDisplay +} + +// ============================================================================ +// Interrupt Events (User Approval) +// ============================================================================ + +export interface InterruptOption { + id: string + label: string + description?: string + variant?: 'default' | 'destructive' | 'outline' +} + +export interface InterruptShowEvent extends BaseRenderEvent { + type: 'interrupt_show' + toolCallId: string + toolName: string + options: InterruptOption[] + /** Optional message to display */ + message?: string +} + +export interface InterruptResolvedEvent extends BaseRenderEvent { + type: 'interrupt_resolved' + toolCallId: string + choice: string + /** Whether to continue execution */ + approved: boolean +} + +// ============================================================================ +// Workflow Diff Events +// ============================================================================ + +export interface DiffReadyEvent extends BaseRenderEvent { + type: 'diff_ready' + workflowId: string + toolCallId: string + /** Client should read workflow state from DB which contains diff markers */ +} + +export interface DiffAcceptedEvent extends BaseRenderEvent { + type: 'diff_accepted' + workflowId: string +} + +export interface DiffRejectedEvent extends BaseRenderEvent { + type: 'diff_rejected' + workflowId: string +} + +// ============================================================================ +// Plan Events +// ============================================================================ + +export interface PlanTodo { + id: string + content: string + status: 'pending' | 'in_progress' | 'completed' +} + +export interface PlanStartEvent extends BaseRenderEvent { + type: 'plan_start' +} + +export interface PlanDeltaEvent extends BaseRenderEvent { + type: 'plan_delta' + content: string +} + +export interface PlanEndEvent extends BaseRenderEvent { + type: 'plan_end' + todos: PlanTodo[] +} + +// ============================================================================ +// Options Events (Follow-up Suggestions) +// ============================================================================ + +export interface OptionsStartEvent extends BaseRenderEvent { + type: 'options_start' +} + +export interface OptionsDeltaEvent extends BaseRenderEvent { + type: 'options_delta' + content: string +} + +export interface OptionsEndEvent extends BaseRenderEvent { + type: 'options_end' + options: string[] +} + +// ============================================================================ +// Subagent Events +// ============================================================================ + +export interface SubagentStartEvent extends BaseRenderEvent { + type: 'subagent_start' + parentToolCallId: string + subagentId: string + label?: string +} + +export interface SubagentToolPendingEvent extends BaseRenderEvent { + type: 'subagent_tool_pending' + parentToolCallId: string + toolCallId: string + toolName: string + args: Record + display: ToolDisplay +} + +export interface SubagentToolGeneratingEvent extends BaseRenderEvent { + type: 'subagent_tool_generating' + parentToolCallId: string + toolCallId: string + argsDelta?: string +} + +export interface SubagentToolExecutingEvent extends BaseRenderEvent { + type: 'subagent_tool_executing' + parentToolCallId: string + toolCallId: string +} + +export interface SubagentToolSuccessEvent extends BaseRenderEvent { + type: 'subagent_tool_success' + parentToolCallId: string + toolCallId: string + result?: unknown + display?: ToolDisplay +} + +export interface SubagentToolErrorEvent extends BaseRenderEvent { + type: 'subagent_tool_error' + parentToolCallId: string + toolCallId: string + error: string +} + +export interface SubagentEndEvent extends BaseRenderEvent { + type: 'subagent_end' + parentToolCallId: string +} + +// ============================================================================ +// Chat Metadata Events +// ============================================================================ + +export interface ChatIdEvent extends BaseRenderEvent { + type: 'chat_id' + chatId: string +} + +export interface TitleUpdatedEvent extends BaseRenderEvent { + type: 'title_updated' + title: string +} + +// ============================================================================ +// Union Type +// ============================================================================ + +export type RenderEvent = + // Stream lifecycle + | StreamStartEvent + | StreamEndEvent + | StreamErrorEvent + // Message lifecycle + | MessageStartEvent + | MessageSavedEvent + | MessageEndEvent + // Text content + | TextDeltaEvent + // Thinking + | ThinkingStartEvent + | ThinkingDeltaEvent + | ThinkingEndEvent + // Tool calls + | ToolPendingEvent + | ToolGeneratingEvent + | ToolExecutingEvent + | ToolSuccessEvent + | ToolErrorEvent + | ToolAbortedEvent + // Interrupts + | InterruptShowEvent + | InterruptResolvedEvent + // Diffs + | DiffReadyEvent + | DiffAcceptedEvent + | DiffRejectedEvent + // Plans + | PlanStartEvent + | PlanDeltaEvent + | PlanEndEvent + // Options + | OptionsStartEvent + | OptionsDeltaEvent + | OptionsEndEvent + // Subagents + | SubagentStartEvent + | SubagentToolPendingEvent + | SubagentToolGeneratingEvent + | SubagentToolExecutingEvent + | SubagentToolSuccessEvent + | SubagentToolErrorEvent + | SubagentEndEvent + // Chat metadata + | ChatIdEvent + | TitleUpdatedEvent + +// ============================================================================ +// Helper Functions +// ============================================================================ + +let seqCounter = 0 + +/** + * Create a render event with auto-incrementing sequence number + */ +export function createRenderEvent( + type: T, + data: Omit, 'type' | 'seq' | 'ts'> +): Extract { + return { + type, + seq: ++seqCounter, + ts: Date.now(), + ...data, + } as Extract +} + +/** + * Reset sequence counter (for testing or new streams) + */ +export function resetSeqCounter(): void { + seqCounter = 0 +} + +/** + * Serialize a render event to SSE format + */ +export function serializeRenderEvent(event: RenderEvent): string { + return `data: ${JSON.stringify(event)}\n\n` +} + +/** + * Parse a render event from SSE data line + */ +export function parseRenderEvent(line: string): RenderEvent | null { + if (!line.startsWith('data: ')) return null + try { + return JSON.parse(line.slice(6)) as RenderEvent + } catch { + return null + } +} + diff --git a/apps/sim/lib/copilot/stream-persistence.ts b/apps/sim/lib/copilot/stream-persistence.ts index 0d8597cce..166cb39d0 100644 --- a/apps/sim/lib/copilot/stream-persistence.ts +++ b/apps/sim/lib/copilot/stream-persistence.ts @@ -35,6 +35,16 @@ export interface ToolCallRecord { error?: string } +/** + * Pending diff state for edit_workflow tool calls + */ +export interface PendingDiffState { + toolCallId: string + baselineWorkflow: unknown + proposedWorkflow: unknown + diffAnalysis: unknown +} + /** * Stream metadata stored in Redis */ @@ -51,6 +61,8 @@ export interface StreamMeta { conversationId?: string createdAt: number updatedAt: number + /** Pending diff state if edit_workflow tool has changes waiting for review */ + pendingDiff?: PendingDiffState } /** @@ -185,6 +197,56 @@ export async function updateToolCall( await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) } +/** + * Store pending diff state for a stream (called when edit_workflow creates a diff) + */ +export async function setPendingDiff( + streamId: string, + pendingDiff: PendingDiffState +): Promise { + const redis = getRedisClient() + if (!redis) return + + const metaKey = `copilot:stream:${streamId}:meta` + const raw = await redis.get(metaKey) + if (!raw) return + + const meta: StreamMeta = JSON.parse(raw) + meta.pendingDiff = pendingDiff + meta.updatedAt = Date.now() + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) + logger.info('Stored pending diff for stream', { streamId, toolCallId: pendingDiff.toolCallId }) +} + +/** + * Clear pending diff state (called when user accepts/rejects the diff) + */ +export async function clearPendingDiff(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return + + const metaKey = `copilot:stream:${streamId}:meta` + const raw = await redis.get(metaKey) + if (!raw) return + + const meta: StreamMeta = JSON.parse(raw) + delete meta.pendingDiff + meta.updatedAt = Date.now() + await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta)) + logger.info('Cleared pending diff for stream', { streamId }) +} + +/** + * Get pending diff state for a stream + */ +export async function getPendingDiff(streamId: string): Promise { + const redis = getRedisClient() + if (!redis) return null + + const meta = await getStreamMeta(streamId) + return meta?.pendingDiff || null +} + /** * Complete the stream - save to database and cleanup Redis */ @@ -259,6 +321,47 @@ async function saveToDatabase(meta: StreamMeta, conversationId?: string): Promis const existingMessages = Array.isArray(chat.messages) ? chat.messages : [] + // Check if there's already an assistant message after the user message + // This can happen if the client already saved it before disconnecting + const userMessageIndex = existingMessages.findIndex( + (m: any) => m.id === meta.userMessageId && m.role === 'user' + ) + + // If there's already an assistant message right after the user message, + // the client may have already saved it - check if it's incomplete + if (userMessageIndex >= 0 && userMessageIndex < existingMessages.length - 1) { + const nextMessage = existingMessages[userMessageIndex + 1] as any + if (nextMessage?.role === 'assistant' && !nextMessage?.serverCompleted) { + // Client saved a partial message, update it with the complete content + const updatedMessages = existingMessages.map((m: any, idx: number) => { + if (idx === userMessageIndex + 1) { + return { + ...m, + content: meta.assistantContent, + toolCalls: meta.toolCalls, + serverCompleted: true, + } + } + return m + }) + + await db + .update(copilotChats) + .set({ + messages: updatedMessages, + conversationId: conversationId || (chat.conversationId as string | undefined), + updatedAt: new Date(), + }) + .where(eq(copilotChats.id, meta.chatId)) + + logger.info('Updated existing assistant message in database', { + streamId: meta.id, + chatId: meta.chatId, + }) + return + } + } + // Build the assistant message const assistantMessage = { id: crypto.randomUUID(), diff --git a/apps/sim/lib/copilot/stream-transformer.ts b/apps/sim/lib/copilot/stream-transformer.ts new file mode 100644 index 000000000..713b76931 --- /dev/null +++ b/apps/sim/lib/copilot/stream-transformer.ts @@ -0,0 +1,953 @@ +/** + * Stream Transformer - Converts Sim Agent SSE to Render Events + * + * This module processes the raw SSE stream from Sim Agent, executes tools, + * persists to the database, and emits render events for the client. + * + * The client receives only render events and just needs to render them. + */ + +import { createLogger } from '@sim/logger' +import { routeExecution } from '@/lib/copilot/tools/server/router' +import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config' +import { env } from '@/lib/core/config/env' +import { + type RenderEvent, + type ToolDisplay, + createRenderEvent, + resetSeqCounter, + serializeRenderEvent, +} from './render-events' +import { SIM_AGENT_API_URL_DEFAULT } from './constants' + +const logger = createLogger('StreamTransformer') +const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT + +// ============================================================================ +// Types +// ============================================================================ + +export interface StreamTransformContext { + streamId: string + chatId: string + userId: string + workflowId?: string + userMessageId: string + assistantMessageId: string + + /** Callback to emit render events (sent to client via SSE) */ + onRenderEvent: (event: RenderEvent) => Promise + + /** Callback to persist state (called at key moments) */ + onPersist?: (data: PersistData) => Promise + + /** Callback to check if stream is aborted */ + isAborted?: () => boolean +} + +export interface PersistData { + type: 'content' | 'tool_call' | 'message_complete' + content?: string + toolCall?: { + id: string + name: string + args: Record + state: 'pending' | 'executing' | 'success' | 'error' + result?: unknown + } + messageComplete?: boolean +} + +// Track state during stream processing +interface TransformState { + // Content accumulation + assistantContent: string + + // Thinking block state + inThinkingBlock: boolean + thinkingContent: string + + // Plan capture + inPlanCapture: boolean + planContent: string + + // Options capture + inOptionsCapture: boolean + optionsContent: string + + // Tool call tracking + toolCalls: Map< + string, + { + id: string + name: string + args: Record + state: 'pending' | 'generating' | 'executing' | 'success' | 'error' + result?: unknown + } + > + + // Subagent tracking + activeSubagent: string | null // parentToolCallId + subagentToolCalls: Map // toolCallId -> parentToolCallId +} + +// ============================================================================ +// Main Transformer +// ============================================================================ + +/** + * Process a Sim Agent SSE stream and emit render events + */ +export async function transformStream( + agentStream: ReadableStream, + context: StreamTransformContext +): Promise { + const { streamId, chatId, userMessageId, assistantMessageId, onRenderEvent, isAborted } = context + + // Reset sequence counter for new stream + resetSeqCounter() + + const state: TransformState = { + assistantContent: '', + inThinkingBlock: false, + thinkingContent: '', + inPlanCapture: false, + planContent: '', + inOptionsCapture: false, + optionsContent: '', + toolCalls: new Map(), + activeSubagent: null, + subagentToolCalls: new Map(), + } + + // Emit stream start + await emitEvent(onRenderEvent, 'stream_start', { + streamId, + chatId, + userMessageId, + assistantMessageId, + }) + + // Emit message start for assistant + await emitEvent(onRenderEvent, 'message_start', { + messageId: assistantMessageId, + role: 'assistant', + }) + + const reader = agentStream.getReader() + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + // Check for abort + if (isAborted?.()) { + logger.info('Stream aborted by user', { streamId }) + // Abort any in-progress tools + for (const [toolCallId, tool] of state.toolCalls) { + if (tool.state === 'pending' || tool.state === 'executing') { + await emitEvent(onRenderEvent, 'tool_aborted', { + toolCallId, + reason: 'User aborted', + }) + } + } + break + } + + const { done, value } = await reader.read() + if (done) break + + buffer += decoder.decode(value, { stream: true }) + + // Process complete SSE lines + const lines = buffer.split('\n') + buffer = lines.pop() || '' // Keep incomplete line in buffer + + for (const line of lines) { + if (!line.startsWith('data: ') || line.length <= 6) continue + + try { + const event = JSON.parse(line.slice(6)) + await processSimAgentEvent(event, state, context) + } catch (e) { + logger.warn('Failed to parse SSE event', { line: line.slice(0, 100) }) + } + } + } + + // Process any remaining buffer + if (buffer.startsWith('data: ')) { + try { + const event = JSON.parse(buffer.slice(6)) + await processSimAgentEvent(event, state, context) + } catch {} + } + + // Finalize thinking block if still open + if (state.inThinkingBlock) { + await emitEvent(onRenderEvent, 'thinking_end', {}) + } + + // Finalize plan if still open + if (state.inPlanCapture) { + await finalizePlan(state, context) + } + + // Finalize options if still open + if (state.inOptionsCapture) { + await finalizeOptions(state, context) + } + + // Emit message end + await emitEvent(onRenderEvent, 'message_end', { messageId: assistantMessageId }) + + // Emit stream end + await emitEvent(onRenderEvent, 'stream_end', {}) + + // Persist final message + await context.onPersist?.({ + type: 'message_complete', + content: state.assistantContent, + messageComplete: true, + }) + + // Emit message saved + await emitEvent(onRenderEvent, 'message_saved', { + messageId: assistantMessageId, + refreshFromDb: false, + }) + } catch (error) { + logger.error('Stream transform error', { error, streamId }) + await emitEvent(onRenderEvent, 'stream_error', { + error: error instanceof Error ? error.message : 'Unknown error', + }) + } finally { + reader.releaseLock() + } +} + +// ============================================================================ +// Event Processing +// ============================================================================ + +async function processSimAgentEvent( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const { onRenderEvent } = context + + switch (event.type) { + // ========== Content Events ========== + case 'content': + await handleContent(event, state, context) + break + + // ========== Thinking Events ========== + case 'thinking': + await handleThinking(event, state, context) + break + + // ========== Tool Call Events ========== + case 'tool_call': + await handleToolCall(event, state, context) + break + + case 'tool_generating': + await handleToolGenerating(event, state, context) + break + + case 'tool_result': + await handleToolResult(event, state, context) + break + + case 'tool_error': + await handleToolError(event, state, context) + break + + // ========== Plan Events ========== + case 'plan_capture_start': + state.inPlanCapture = true + state.planContent = '' + await emitEvent(onRenderEvent, 'plan_start', {}) + break + + case 'plan_capture': + if (state.inPlanCapture && event.data) { + state.planContent += event.data + await emitEvent(onRenderEvent, 'plan_delta', { content: event.data }) + } + break + + case 'plan_capture_end': + await finalizePlan(state, context) + break + + // ========== Options Events ========== + case 'options_stream_start': + state.inOptionsCapture = true + state.optionsContent = '' + await emitEvent(onRenderEvent, 'options_start', {}) + break + + case 'options_stream': + if (state.inOptionsCapture && event.data) { + state.optionsContent += event.data + await emitEvent(onRenderEvent, 'options_delta', { content: event.data }) + } + break + + case 'options_stream_end': + await finalizeOptions(state, context) + break + + // ========== Subagent Events ========== + case 'subagent_start': + await handleSubagentStart(event, state, context) + break + + case 'subagent_end': + await handleSubagentEnd(event, state, context) + break + + // ========== Response Events ========== + case 'response_done': + // Final response from Sim Agent + logger.debug('Response done received', { streamId: context.streamId }) + break + + default: + logger.debug('Unknown Sim Agent event type', { type: event.type }) + } +} + +// ============================================================================ +// Content Handling +// ============================================================================ + +async function handleContent( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const content = event.data + if (!content) return + + state.assistantContent += content + + // Check for thinking block markers + if (content.includes('') || content.includes('')) { + state.inThinkingBlock = true + await context.onRenderEvent(createRenderEvent('thinking_start', {})) + // Don't emit the marker as text + return + } + + if (content.includes('') || content.includes('')) { + state.inThinkingBlock = false + await context.onRenderEvent(createRenderEvent('thinking_end', {})) + // Don't emit the marker as text + return + } + + // Route to appropriate handler + if (state.inThinkingBlock) { + state.thinkingContent += content + await context.onRenderEvent(createRenderEvent('thinking_delta', { content })) + } else { + await context.onRenderEvent(createRenderEvent('text_delta', { content })) + } +} + +// ============================================================================ +// Thinking Handling +// ============================================================================ + +async function handleThinking( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const content = event.data || event.thinking + if (!content) return + + // Start thinking block if not already + if (!state.inThinkingBlock) { + state.inThinkingBlock = true + await context.onRenderEvent(createRenderEvent('thinking_start', {})) + } + + state.thinkingContent += content + await context.onRenderEvent(createRenderEvent('thinking_delta', { content })) +} + +// ============================================================================ +// Tool Call Handling +// ============================================================================ + +async function handleToolCall( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const { onRenderEvent, userId, workflowId } = context + const data = event.data || event + const { id: toolCallId, name: toolName, arguments: args, partial } = data + + if (!toolCallId || !toolName) return + + // Check if this is a subagent tool call + const isSubagentTool = state.activeSubagent !== null + + // Track the tool call + const existingTool = state.toolCalls.get(toolCallId) + + if (partial) { + // Streaming args + if (!existingTool) { + state.toolCalls.set(toolCallId, { + id: toolCallId, + name: toolName, + args: args || {}, + state: 'generating', + }) + if (isSubagentTool) { + state.subagentToolCalls.set(toolCallId, state.activeSubagent!) + } + } else { + existingTool.args = { ...existingTool.args, ...args } + } + + const display = getToolDisplay(toolName, 'generating') + + if (isSubagentTool) { + await emitEvent(onRenderEvent, 'subagent_tool_generating', { + parentToolCallId: state.activeSubagent!, + toolCallId, + argsDelta: JSON.stringify(args), + }) + } else { + await emitEvent(onRenderEvent, 'tool_generating', { + toolCallId, + argsPartial: existingTool?.args || args, + }) + } + return + } + + // Complete tool call - ready to execute + const finalArgs = args || existingTool?.args || {} + + state.toolCalls.set(toolCallId, { + id: toolCallId, + name: toolName, + args: finalArgs, + state: 'pending', + }) + + if (isSubagentTool) { + state.subagentToolCalls.set(toolCallId, state.activeSubagent!) + } + + const display = getToolDisplay(toolName, 'pending') + + // Emit pending event + if (isSubagentTool) { + await emitEvent(onRenderEvent, 'subagent_tool_pending', { + parentToolCallId: state.activeSubagent!, + toolCallId, + toolName, + args: finalArgs, + display, + }) + } else { + await emitEvent(onRenderEvent, 'tool_pending', { + toolCallId, + toolName, + args: finalArgs, + display, + }) + } + + // Check if this tool needs user approval (interrupt) + const needsInterrupt = checkToolNeedsInterrupt(toolName, finalArgs) + if (needsInterrupt) { + const options = getInterruptOptions(toolName, finalArgs) + await emitEvent(onRenderEvent, 'interrupt_show', { + toolCallId, + toolName, + options, + }) + // Don't execute yet - wait for interrupt resolution + return + } + + // Check if this is a client-only tool + if (isClientOnlyTool(toolName)) { + logger.info('Skipping client-only tool on server', { toolName, toolCallId }) + // Client will handle this tool + return + } + + // Execute tool server-side - NON-BLOCKING for parallel execution + // Fire off the execution and let tool_result event handle the completion + executeToolServerSide(toolCallId, toolName, finalArgs, state, context).catch((err) => { + logger.error('Tool execution failed (async)', { toolCallId, toolName, error: err }) + }) +} + +async function handleToolGenerating( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const toolCallId = event.toolCallId || event.data?.id + if (!toolCallId) return + + const isSubagentTool = state.subagentToolCalls.has(toolCallId) + + if (isSubagentTool) { + await emitEvent(context.onRenderEvent, 'subagent_tool_generating', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + argsDelta: event.data, + }) + } else { + await emitEvent(context.onRenderEvent, 'tool_generating', { + toolCallId, + argsDelta: event.data, + }) + } +} + +async function handleToolResult( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const toolCallId = event.toolCallId || event.data?.id + const success = event.success !== false + const result = event.result || event.data?.result + + if (!toolCallId) return + + const tool = state.toolCalls.get(toolCallId) + + // Skip if tool already in terminal state (server-side execution already emitted events) + if (tool && (tool.state === 'success' || tool.state === 'error')) { + logger.debug('Skipping duplicate tool_result event', { toolCallId, currentState: tool.state }) + return + } + + if (tool) { + tool.state = success ? 'success' : 'error' + tool.result = result + } + + const isSubagentTool = state.subagentToolCalls.has(toolCallId) + const display = getToolDisplay(tool?.name || '', success ? 'success' : 'error') + + if (isSubagentTool) { + await emitEvent(context.onRenderEvent, success ? 'subagent_tool_success' : 'subagent_tool_error', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + ...(success ? { result, display } : { error: event.error || 'Tool failed' }), + }) + } else { + if (success) { + const successEvent: any = { + toolCallId, + result, + display, + } + + // Check if this was an edit_workflow that created a diff + if (tool?.name === 'edit_workflow' && result?.workflowState) { + successEvent.workflowId = context.workflowId + successEvent.hasDiff = true + } + + await emitEvent(context.onRenderEvent, 'tool_success', successEvent) + } else { + await emitEvent(context.onRenderEvent, 'tool_error', { + toolCallId, + error: event.error || 'Tool failed', + display, + }) + } + } + + // Persist tool call result + await context.onPersist?.({ + type: 'tool_call', + toolCall: { + id: toolCallId, + name: tool?.name || '', + args: tool?.args || {}, + state: success ? 'success' : 'error', + result, + }, + }) +} + +async function handleToolError( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const toolCallId = event.toolCallId || event.data?.id + const error = event.error || event.data?.error || 'Tool execution failed' + + if (!toolCallId) return + + const tool = state.toolCalls.get(toolCallId) + if (tool) { + tool.state = 'error' + } + + const isSubagentTool = state.subagentToolCalls.has(toolCallId) + const display = getToolDisplay(tool?.name || '', 'error') + + if (isSubagentTool) { + await emitEvent(context.onRenderEvent, 'subagent_tool_error', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + error, + }) + } else { + await emitEvent(context.onRenderEvent, 'tool_error', { + toolCallId, + error, + display, + }) + } +} + +// ============================================================================ +// Tool Execution +// ============================================================================ + +async function executeToolServerSide( + toolCallId: string, + toolName: string, + args: Record, + state: TransformState, + context: StreamTransformContext +): Promise { + const { onRenderEvent, userId, workflowId } = context + const isSubagentTool = state.subagentToolCalls.has(toolCallId) + + // Update state to executing + const tool = state.toolCalls.get(toolCallId) + if (tool) { + tool.state = 'executing' + } + + const display = getToolDisplay(toolName, 'executing') + + // Emit executing event + if (isSubagentTool) { + await emitEvent(onRenderEvent, 'subagent_tool_executing', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + }) + } else { + await emitEvent(onRenderEvent, 'tool_executing', { + toolCallId, + display, + }) + } + + try { + // Add workflowId to args if available + const execArgs = { ...args } + if (workflowId && !execArgs.workflowId) { + execArgs.workflowId = workflowId + } + + // Execute the tool via the router + const result = await routeExecution(toolName, execArgs, { userId }) + + // Update state + if (tool) { + tool.state = 'success' + tool.result = result + } + + // Emit success event + const successDisplay = getToolDisplay(toolName, 'success') + + if (isSubagentTool) { + await emitEvent(onRenderEvent, 'subagent_tool_success', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + result, + display: successDisplay, + }) + } else { + const successEvent: any = { + toolCallId, + result, + display: successDisplay, + } + + // Check if this was an edit_workflow that created a diff + if (toolName === 'edit_workflow' && result?.workflowState) { + successEvent.workflowId = workflowId + successEvent.hasDiff = true + + // Emit diff_ready so client knows to read from DB + await emitEvent(onRenderEvent, 'diff_ready', { + workflowId: workflowId || '', + toolCallId, + }) + } + + await emitEvent(onRenderEvent, 'tool_success', successEvent) + } + + // Notify Sim Agent that tool is complete + await markToolComplete(toolCallId, toolName, true, result) + + // Persist tool result + await context.onPersist?.({ + type: 'tool_call', + toolCall: { + id: toolCallId, + name: toolName, + args, + state: 'success', + result, + }, + }) + } catch (error) { + const errorMessage = error instanceof Error ? error.message : 'Tool execution failed' + logger.error('Tool execution failed', { toolCallId, toolName, error: errorMessage }) + + // Update state + if (tool) { + tool.state = 'error' + } + + const errorDisplay = getToolDisplay(toolName, 'error') + + // Emit error event + if (isSubagentTool) { + await emitEvent(onRenderEvent, 'subagent_tool_error', { + parentToolCallId: state.subagentToolCalls.get(toolCallId)!, + toolCallId, + error: errorMessage, + }) + } else { + await emitEvent(onRenderEvent, 'tool_error', { + toolCallId, + error: errorMessage, + display: errorDisplay, + }) + } + + // Notify Sim Agent that tool failed + await markToolComplete(toolCallId, toolName, false, undefined, errorMessage) + } +} + +async function markToolComplete( + toolCallId: string, + toolName: string, + success: boolean, + result?: unknown, + error?: string +): Promise { + try { + const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}), + }, + body: JSON.stringify({ + id: toolCallId, + name: toolName, + status: success ? 200 : 500, + message: success + ? (result as Record | undefined)?.message || 'Success' + : error, + data: success ? result : undefined, + }), + }) + + if (!response.ok) { + logger.warn('Failed to mark tool complete', { toolCallId, status: response.status }) + } + } catch (e) { + logger.error('Error marking tool complete', { toolCallId, error: e }) + } +} + +// ============================================================================ +// Subagent Handling +// ============================================================================ + +async function handleSubagentStart( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const parentToolCallId = event.parentToolCallId || event.data?.parentToolCallId + const subagentId = event.subagentId || event.data?.subagentId || parentToolCallId + const label = event.label || event.data?.label + + if (!parentToolCallId) return + + state.activeSubagent = parentToolCallId + + await emitEvent(context.onRenderEvent, 'subagent_start', { + parentToolCallId, + subagentId, + label, + }) +} + +async function handleSubagentEnd( + event: any, + state: TransformState, + context: StreamTransformContext +): Promise { + const parentToolCallId = event.parentToolCallId || event.data?.parentToolCallId || state.activeSubagent + + if (!parentToolCallId) return + + state.activeSubagent = null + + await emitEvent(context.onRenderEvent, 'subagent_end', { + parentToolCallId, + }) +} + +// ============================================================================ +// Plan & Options Handling +// ============================================================================ + +async function finalizePlan(state: TransformState, context: StreamTransformContext): Promise { + if (!state.inPlanCapture) return + + state.inPlanCapture = false + + // Parse todos from plan content + const todos = parseTodosFromPlan(state.planContent) + + await emitEvent(context.onRenderEvent, 'plan_end', { todos }) +} + +async function finalizeOptions( + state: TransformState, + context: StreamTransformContext +): Promise { + if (!state.inOptionsCapture) return + + state.inOptionsCapture = false + + // Parse options from content + const options = parseOptionsFromContent(state.optionsContent) + + await emitEvent(context.onRenderEvent, 'options_end', { options }) +} + +function parseTodosFromPlan(content: string): Array<{ id: string; content: string; status: 'pending' }> { + const todos: Array<{ id: string; content: string; status: 'pending' }> = [] + const lines = content.split('\n') + + for (const line of lines) { + const match = line.match(/^[-*]\s+(.+)$/) + if (match) { + todos.push({ + id: `todo_${Date.now()}_${todos.length}`, + content: match[1].trim(), + status: 'pending', + }) + } + } + + return todos +} + +function parseOptionsFromContent(content: string): string[] { + try { + // Try to parse as JSON array + const parsed = JSON.parse(content) + if (Array.isArray(parsed)) { + return parsed.filter((o) => typeof o === 'string') + } + } catch {} + + // Fall back to splitting by newlines + return content + .split('\n') + .map((l) => l.trim()) + .filter((l) => l.length > 0) +} + +// ============================================================================ +// Helpers +// ============================================================================ + +function getToolDisplay( + toolName: string, + state: 'pending' | 'generating' | 'executing' | 'success' | 'error' +): ToolDisplay { + // Default displays based on state + const stateLabels: Record = { + pending: 'Pending...', + generating: 'Preparing...', + executing: 'Running...', + success: 'Completed', + error: 'Failed', + } + + // Tool-specific labels + const toolLabels: Record = { + edit_workflow: 'Editing workflow', + get_user_workflow: 'Reading workflow', + get_block_config: 'Getting block config', + get_blocks_and_tools: 'Loading blocks', + get_credentials: 'Checking credentials', + run_workflow: 'Running workflow', + knowledge_base: 'Searching knowledge base', + navigate_ui: 'Navigating', + tour: 'Starting tour', + } + + return { + label: toolLabels[toolName] || toolName.replace(/_/g, ' '), + description: stateLabels[state], + } +} + +function checkToolNeedsInterrupt(toolName: string, args: Record): boolean { + // Tools that always need user approval + const interruptTools = ['deploy_api', 'deploy_chat', 'deploy_mcp', 'delete_workflow'] + return interruptTools.includes(toolName) +} + +function getInterruptOptions( + toolName: string, + args: Record +): Array<{ id: string; label: string; description?: string; variant?: 'default' | 'destructive' | 'outline' }> { + // Default interrupt options + return [ + { id: 'approve', label: 'Approve', variant: 'default' }, + { id: 'reject', label: 'Cancel', variant: 'outline' }, + ] +} + +async function emitEvent( + onRenderEvent: (event: RenderEvent) => Promise, + type: T, + data: Omit, 'type' | 'seq' | 'ts'> +): Promise { + const event = createRenderEvent(type, data) + await onRenderEvent(event) +} + diff --git a/apps/sim/lib/copilot/tools/server/router.ts b/apps/sim/lib/copilot/tools/server/router.ts index c8d76e015..d89da7256 100644 --- a/apps/sim/lib/copilot/tools/server/router.ts +++ b/apps/sim/lib/copilot/tools/server/router.ts @@ -1,4 +1,6 @@ import { createLogger } from '@sim/logger' +import crypto from 'crypto' +import { acquireLock, releaseLock } from '@/lib/core/config/redis' import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool' import { getBlockConfigServerTool } from '@/lib/copilot/tools/server/blocks/get-block-config' import { getBlockOptionsServerTool } from '@/lib/copilot/tools/server/blocks/get-block-options' @@ -30,6 +32,15 @@ import { GetTriggerBlocksResult, } from '@/lib/copilot/tools/shared/schemas' +/** Lock expiry in seconds for edit_workflow operations */ +const EDIT_WORKFLOW_LOCK_EXPIRY = 30 + +/** Maximum wait time in ms before giving up on acquiring the lock */ +const EDIT_WORKFLOW_LOCK_TIMEOUT = 15000 + +/** Delay between lock acquisition retries in ms */ +const EDIT_WORKFLOW_LOCK_RETRY_DELAY = 100 + // Generic execute response schemas (success path only for this route; errors handled via HTTP status) export { ExecuteResponseSuccessSchema } export type ExecuteResponseSuccess = (typeof ExecuteResponseSuccessSchema)['_type'] @@ -53,6 +64,30 @@ serverToolRegistry[getCredentialsServerTool.name] = getCredentialsServerTool serverToolRegistry[makeApiRequestServerTool.name] = makeApiRequestServerTool serverToolRegistry[knowledgeBaseServerTool.name] = knowledgeBaseServerTool +/** + * Acquire a lock with retries for workflow-mutating operations + */ +async function acquireLockWithRetry( + lockKey: string, + lockValue: string, + expirySeconds: number, + timeoutMs: number, + retryDelayMs: number +): Promise { + const startTime = Date.now() + + while (Date.now() - startTime < timeoutMs) { + const acquired = await acquireLock(lockKey, lockValue, expirySeconds) + if (acquired) { + return true + } + // Wait before retrying + await new Promise((resolve) => setTimeout(resolve, retryDelayMs)) + } + + return false +} + export async function routeExecution( toolName: string, payload: unknown, @@ -93,23 +128,74 @@ export async function routeExecution( args = KnowledgeBaseInput.parse(args) } - const result = await tool.execute(args, context) + // For edit_workflow, acquire a per-workflow lock to prevent race conditions + // when multiple edit_workflow calls happen in parallel for the same workflow + let lockKey: string | null = null + let lockValue: string | null = null - if (toolName === 'get_blocks_and_tools') { - return GetBlocksAndToolsResult.parse(result) - } - if (toolName === 'get_blocks_metadata') { - return GetBlocksMetadataResult.parse(result) - } - if (toolName === 'get_block_options') { - return GetBlockOptionsResult.parse(result) - } - if (toolName === 'get_block_config') { - return GetBlockConfigResult.parse(result) - } - if (toolName === 'get_trigger_blocks') { - return GetTriggerBlocksResult.parse(result) + if (toolName === 'edit_workflow' && args.workflowId) { + lockKey = `copilot:edit_workflow:lock:${args.workflowId}` + lockValue = crypto.randomUUID() + + const acquired = await acquireLockWithRetry( + lockKey, + lockValue, + EDIT_WORKFLOW_LOCK_EXPIRY, + EDIT_WORKFLOW_LOCK_TIMEOUT, + EDIT_WORKFLOW_LOCK_RETRY_DELAY + ) + + if (!acquired) { + logger.warn('Failed to acquire edit_workflow lock after timeout', { + workflowId: args.workflowId, + timeoutMs: EDIT_WORKFLOW_LOCK_TIMEOUT, + }) + throw new Error( + 'Workflow is currently being edited by another operation. Please try again shortly.' + ) + } + + logger.debug('Acquired edit_workflow lock', { + workflowId: args.workflowId, + lockKey, + }) } - return result + try { + const result = await tool.execute(args, context) + + if (toolName === 'get_blocks_and_tools') { + return GetBlocksAndToolsResult.parse(result) + } + if (toolName === 'get_blocks_metadata') { + return GetBlocksMetadataResult.parse(result) + } + if (toolName === 'get_block_options') { + return GetBlockOptionsResult.parse(result) + } + if (toolName === 'get_block_config') { + return GetBlockConfigResult.parse(result) + } + if (toolName === 'get_trigger_blocks') { + return GetTriggerBlocksResult.parse(result) + } + + return result + } finally { + // Always release the lock if we acquired one + if (lockKey && lockValue) { + const released = await releaseLock(lockKey, lockValue) + if (released) { + logger.debug('Released edit_workflow lock', { + workflowId: args.workflowId, + lockKey, + }) + } else { + logger.warn('Failed to release edit_workflow lock (may have expired)', { + workflowId: args.workflowId, + lockKey, + }) + } + } + } } diff --git a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts index 60136d1a8..242c93f74 100644 --- a/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts +++ b/apps/sim/lib/copilot/tools/server/workflow/edit-workflow.ts @@ -2550,7 +2550,7 @@ export const editWorkflowServerTool: BaseServerTool = { name: 'edit_workflow', async execute(params: EditWorkflowParams, context?: { userId: string }): Promise { const logger = createLogger('EditWorkflowServerTool') - const { operations, workflowId, currentUserWorkflow } = params + const { operations, workflowId } = params if (!Array.isArray(operations) || operations.length === 0) { throw new Error('operations are required and must be an array') } @@ -2559,22 +2559,14 @@ export const editWorkflowServerTool: BaseServerTool = { logger.info('Executing edit_workflow', { operationCount: operations.length, workflowId, - hasCurrentUserWorkflow: !!currentUserWorkflow, }) - // Get current workflow state - let workflowState: any - if (currentUserWorkflow) { - try { - workflowState = JSON.parse(currentUserWorkflow) - } catch (error) { - logger.error('Failed to parse currentUserWorkflow', error) - throw new Error('Invalid currentUserWorkflow format') - } - } else { - const fromDb = await getCurrentWorkflowStateFromDb(workflowId) - workflowState = fromDb.workflowState - } + // Always fetch from DB to ensure we have the latest state. + // This is critical because multiple edit_workflow calls may execute + // sequentially (via locking), and each must see the previous call's changes. + // The AI-provided currentUserWorkflow may be stale. + const fromDb = await getCurrentWorkflowStateFromDb(workflowId) + const workflowState = fromDb.workflowState // Get permission config for the user const permissionConfig = context?.userId ? await getUserPermissionConfig(context.userId) : null @@ -2659,16 +2651,42 @@ export const editWorkflowServerTool: BaseServerTool = { logger.warn('No userId in context - skipping custom tools persistence', { workflowId }) } + const finalWorkflowState = validation.sanitizedState || modifiedWorkflowState + logger.info('edit_workflow successfully applied operations', { operationCount: operations.length, - blocksCount: Object.keys(modifiedWorkflowState.blocks).length, - edgesCount: modifiedWorkflowState.edges.length, + blocksCount: Object.keys(finalWorkflowState.blocks).length, + edgesCount: finalWorkflowState.edges.length, inputValidationErrors: validationErrors.length, skippedItemsCount: skippedItems.length, schemaValidationErrors: validation.errors.length, validationWarnings: validation.warnings.length, }) + // IMPORTANT: Persist the workflow state to DB BEFORE returning. + // This ensures that subsequent edit_workflow calls (which fetch from DB) + // will see the latest state. Without this, there's a race condition where + // the client persists AFTER the lock is released, and another edit_workflow + // call can see stale state. + try { + const { saveWorkflowToNormalizedTables } = await import( + '@/lib/workflows/persistence/utils' + ) + await saveWorkflowToNormalizedTables(workflowId, finalWorkflowState) + logger.info('Persisted workflow state to DB before returning', { + workflowId, + blocksCount: Object.keys(finalWorkflowState.blocks).length, + edgesCount: finalWorkflowState.edges.length, + }) + } catch (persistError) { + logger.error('Failed to persist workflow state to DB', { + workflowId, + error: persistError instanceof Error ? persistError.message : String(persistError), + }) + // Don't throw - we still want to return the modified state + // The client will also try to persist, which may succeed + } + // Format validation errors for LLM feedback const inputErrors = validationErrors.length > 0 diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index b0c3f18b9..fa8ca78d9 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -54,6 +54,7 @@ import { TestClientTool } from '@/lib/copilot/tools/client/other/test' import { TourClientTool } from '@/lib/copilot/tools/client/other/tour' import { WorkflowClientTool } from '@/lib/copilot/tools/client/other/workflow' import { createExecutionContext, getTool } from '@/lib/copilot/tools/client/registry' +import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config' import { GetCredentialsClientTool } from '@/lib/copilot/tools/client/user/get-credentials' import { SetEnvironmentVariablesClientTool } from '@/lib/copilot/tools/client/user/set-environment-variables' import { CheckDeploymentStatusClientTool } from '@/lib/copilot/tools/client/workflow/check-deployment-status' @@ -943,6 +944,12 @@ interface StreamingContext { subAgentToolCalls: Record /** Track subagent streaming blocks per parent tool call */ subAgentBlocks: Record + /** Plan capture state (for render events) */ + isCapturingPlan?: boolean + planContent?: string + /** Options capture state (for render events) */ + isCapturingOptions?: boolean + optionsContent?: string } type SSEHandler = ( @@ -1255,30 +1262,49 @@ const sseHandlers: Record = { } catch {} }, tool_generating: (data, context, get, set) => { - const { toolCallId, toolName } = data - if (!toolCallId || !toolName) return + // Handle both old format ({ toolCallId, toolName }) and new render event format ({ toolCallId, argsPartial }) + const toolCallId = data?.toolCallId || data?.data?.id + const toolName = data?.toolName || data?.data?.name + const argsPartial = data?.argsPartial + + if (!toolCallId) return const { toolCallsById } = get() + // If tool already exists, update its args if we have partial args + const existing = toolCallsById[toolCallId] + if (existing) { + if (argsPartial) { + const updated: CopilotToolCall = { + ...existing, + params: argsPartial, + } + set({ toolCallsById: { ...toolCallsById, [toolCallId]: updated } }) + } + return + } + + // Tool doesn't exist yet - create it + if (!toolName) return + // Ensure class-based client tool instances are registered (for interrupts/display) ensureClientToolInstance(toolName, toolCallId) - if (!toolCallsById[toolCallId]) { - // Show as pending until we receive full tool_call (with arguments) to decide execution - const initialState = ClientToolCallState.pending - const tc: CopilotToolCall = { - id: toolCallId, - name: toolName, - state: initialState, - display: resolveToolDisplay(toolName, initialState, toolCallId), - } - const updated = { ...toolCallsById, [toolCallId]: tc } - set({ toolCallsById: updated }) - logger.info('[toolCallsById] map updated', updated) - - // Add/refresh inline content block - upsertToolCallBlock(context, tc) - updateStreamingMessage(set, context) + // Show as pending until we receive full tool_call (with arguments) to decide execution + const initialState = ClientToolCallState.pending + const tc: CopilotToolCall = { + id: toolCallId, + name: toolName, + state: initialState, + display: resolveToolDisplay(toolName, initialState, toolCallId), + params: argsPartial, } + const updated = { ...toolCallsById, [toolCallId]: tc } + set({ toolCallsById: updated }) + logger.info('[toolCallsById] map updated (generating)', { toolCallId, toolName }) + + // Add/refresh inline content block + upsertToolCallBlock(context, tc) + updateStreamingMessage(set, context) }, tool_call: (data, context, get, set) => { const toolData = data?.data || {} @@ -1287,11 +1313,15 @@ const sseHandlers: Record = { if (!id) return const args = toolData.arguments const isPartial = toolData.partial === true - const { toolCallsById } = get() + const { toolCallsById, isResuming } = get() // Ensure class-based client tool instances are registered (for interrupts/display) ensureClientToolInstance(name, id) + // When resuming a stream, tools have already been executed on the server. + // We only update the UI state - don't re-execute. + const skipExecution = isResuming + const existing = toolCallsById[id] const next: CopilotToolCall = existing ? { @@ -1309,7 +1339,7 @@ const sseHandlers: Record = { } const updated = { ...toolCallsById, [id]: next } set({ toolCallsById: updated }) - logger.info('[toolCallsById] → pending', { id, name, params: args }) + logger.info('[toolCallsById] → pending', { id, name, params: args, skipExecution }) // Ensure an inline content block exists/updated for this tool call upsertToolCallBlock(context, next) @@ -1320,6 +1350,60 @@ const sseHandlers: Record = { return } + // When resuming a stream, tools have already been executed on the server. + // The tool_result events in the stream will update the final state. + // Skip client-side execution to avoid re-running tools. + if (skipExecution) { + logger.info('[toolCallsById] Skipping execution (resuming stream)', { id, name }) + return + } + + // SERVER-FIRST ARCHITECTURE: + // All tools are executed server-side by default. The client is just a rendering layer. + // The server sends tool_result events when tools complete, which update the UI. + // Only clientOnly tools (like navigate_ui, tour) that require browser interaction + // are executed on the client. + const toolIsClientOnly = name ? isClientOnlyTool(name) : false + + if (!toolIsClientOnly) { + // Server-side tool: just mark as executing and wait for tool_result from server + logger.info('[toolCallsById] Server-side tool, waiting for tool_result', { id, name }) + + // Update to executing state (server is processing it) + setTimeout(() => { + const currentState = get().toolCallsById[id]?.state + if (currentState === ClientToolCallState.executing || isTerminalState(currentState)) { + return + } + + const executingMap = { ...get().toolCallsById } + executingMap[id] = { + ...executingMap[id], + state: ClientToolCallState.executing, + display: resolveToolDisplay(name, ClientToolCallState.executing, id, args), + } + set({ toolCallsById: executingMap }) + logger.info('[toolCallsById] pending → executing (server)', { id, name }) + + // Update inline content block to executing + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b.type === 'tool_call' && b.toolCall?.id === id) { + context.contentBlocks[i] = { + ...b, + toolCall: { ...b.toolCall, state: ClientToolCallState.executing }, + } + break + } + } + updateStreamingMessage(set, context) + }, 0) + return + } + + // CLIENT-ONLY TOOL: Execute on the client (requires browser interaction) + logger.info('[toolCallsById] Client-only tool, executing locally', { id, name }) + // Prefer interface-based registry to determine interrupt and execute try { const def = name ? getTool(name) : undefined @@ -1345,7 +1429,7 @@ const sseHandlers: Record = { display: resolveToolDisplay(name, ClientToolCallState.executing, id, args), } set({ toolCallsById: executingMap }) - logger.info('[toolCallsById] pending → executing (registry)', { id, name }) + logger.info('[toolCallsById] pending → executing (client-only)', { id, name }) // Update inline content block to executing for (let i = 0; i < context.contentBlocks.length; i++) { @@ -1388,7 +1472,7 @@ const sseHandlers: Record = { } set({ toolCallsById: completeMap }) logger.info( - `[toolCallsById] executing → ${success ? 'success' : 'error'} (registry)`, + `[toolCallsById] executing → ${success ? 'success' : 'error'} (client-only)`, { id, name } ) @@ -1778,6 +1862,278 @@ const sseHandlers: Record = { finalizeThinkingBlock(context) updateStreamingMessage(set, context) }, + + // ============================================================================ + // RENDER EVENT HANDLERS (new server-first SSE protocol) + // These handle render events from the stream transformer + // ============================================================================ + + // Stream lifecycle events + stream_start: (data, context, get, _set) => { + // Stream started - store IDs + if (data.chatId) context.newChatId = data.chatId + if (data.streamId) get().setActiveStreamId(data.streamId) + logger.debug('[RenderEvent] stream_start', { chatId: data.chatId, streamId: data.streamId }) + }, + + // Text content - same as existing content handler + text_delta: (data, context, get, set) => { + const content = data.content + if (!content) return + // Reuse existing content processing logic - it will handle pendingContent + sseHandlers.content({ data: content, type: 'content' }, context, get, set) + }, + + // Thinking block events + thinking_start: (_data, context, _get, _set) => { + context.isInThinkingBlock = true + beginThinkingBlock(context) + logger.debug('[RenderEvent] thinking_start') + }, + + thinking_delta: (data, context, _get, set) => { + const content = data.content + if (!content) return + appendThinkingContent(context, content) + updateStreamingMessage(set, context) + }, + + thinking_end: (_data, context, _get, set) => { + context.isInThinkingBlock = false + finalizeThinkingBlock(context) + updateStreamingMessage(set, context) + logger.debug('[RenderEvent] thinking_end') + }, + + // Tool call events + tool_pending: (data, context, get, set) => { + const { toolCallId, toolName, args } = data + if (!toolCallId || !toolName) return + + // Create tool call entry - always use resolveToolDisplay for consistency + const tc: CopilotToolCall = { + id: toolCallId, + name: toolName, + state: ClientToolCallState.pending, + display: resolveToolDisplay(toolName, ClientToolCallState.pending, toolCallId, args), + params: args, + } + + const { toolCallsById } = get() + set({ toolCallsById: { ...toolCallsById, [toolCallId]: tc } }) + + // Add to content blocks + upsertToolCallBlock(context, tc) + updateStreamingMessage(set, context) + logger.debug('[RenderEvent] tool_pending', { toolCallId, toolName }) + }, + + tool_executing: (data, context, get, set) => { + const { toolCallId } = data + if (!toolCallId) return + + const { toolCallsById } = get() + const current = toolCallsById[toolCallId] + if (current) { + const updated: CopilotToolCall = { + ...current, + state: ClientToolCallState.executing, + display: resolveToolDisplay(current.name, ClientToolCallState.executing, toolCallId, current.params), + } + set({ toolCallsById: { ...toolCallsById, [toolCallId]: updated } }) + + // Update in content blocks + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { + context.contentBlocks[i] = { ...b, toolCall: updated } + break + } + } + updateStreamingMessage(set, context) + } + logger.debug('[RenderEvent] tool_executing', { toolCallId }) + }, + + tool_success: (data, context, get, set) => { + const { toolCallId, hasDiff } = data + if (!toolCallId) return + + const { toolCallsById } = get() + const current = toolCallsById[toolCallId] + if (current) { + const updated: CopilotToolCall = { + ...current, + state: ClientToolCallState.success, + display: resolveToolDisplay(current.name, ClientToolCallState.success, toolCallId, current.params), + } + set({ toolCallsById: { ...toolCallsById, [toolCallId]: updated } }) + + // Update in content blocks + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { + context.contentBlocks[i] = { ...b, toolCall: updated } + break + } + } + updateStreamingMessage(set, context) + + // Handle checkoff_todo completion + if (current.name === 'checkoff_todo') { + try { + const input = current.params || {} + const todoId = (input as any).id || (input as any).todoId + if (todoId) get().updatePlanTodoStatus(todoId, 'completed') + } catch {} + } + } + logger.debug('[RenderEvent] tool_success', { toolCallId, hasDiff }) + }, + + tool_aborted: (data, context, get, set) => { + const { toolCallId, reason } = data + if (!toolCallId) return + + const { toolCallsById } = get() + const current = toolCallsById[toolCallId] + if (current) { + const updated: CopilotToolCall = { + ...current, + state: ClientToolCallState.rejected, + display: resolveToolDisplay(current.name, ClientToolCallState.rejected, toolCallId, current.params), + } + set({ toolCallsById: { ...toolCallsById, [toolCallId]: updated } }) + + for (let i = 0; i < context.contentBlocks.length; i++) { + const b = context.contentBlocks[i] as any + if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) { + context.contentBlocks[i] = { ...b, toolCall: updated } + break + } + } + updateStreamingMessage(set, context) + } + logger.debug('[RenderEvent] tool_aborted', { toolCallId, reason }) + }, + + // Diff events - tell client to read workflow from DB + diff_ready: (data, _context, get, _set) => { + const { workflowId, toolCallId } = data + logger.info('[RenderEvent] diff_ready - loading workflow with diff from DB', { workflowId, toolCallId }) + + // Trigger workflow reload to get the diff markers from DB + // The workflow store will load the state which includes diff markers + // Then restoreDiffFromMarkers will show the diff UI + const currentWorkflowId = get().workflowId + if (currentWorkflowId === workflowId) { + // Workflow matches - trigger a reload via the workflow registry + try { + const { useWorkflowRegistry } = require('@/stores/workflows/registry') + const registry = useWorkflowRegistry.getState() + + // Trigger refresh from DB + if (typeof registry.refreshFromServer === 'function') { + registry.refreshFromServer(workflowId).catch((err: any) => { + logger.error('Failed to refresh workflow after diff_ready', { error: err }) + }) + } else if (typeof registry.loadWorkflow === 'function') { + registry.loadWorkflow(workflowId).catch((err: any) => { + logger.error('Failed to load workflow after diff_ready', { error: err }) + }) + } + } catch (err) { + logger.error('Failed to trigger workflow reload for diff', { error: err }) + } + } + }, + + // Plan events + plan_start: (_data, context, _get, set) => { + context.isCapturingPlan = true + context.planContent = '' + set({ streamingPlanContent: '' }) + logger.debug('[RenderEvent] plan_start') + }, + + plan_delta: (data, context, _get, set) => { + const content = data.content + if (!content) return + context.planContent = (context.planContent || '') + content + set({ streamingPlanContent: context.planContent }) + }, + + plan_end: (data, context, get, set) => { + context.isCapturingPlan = false + if (data.todos && Array.isArray(data.todos)) { + const todos = data.todos.map((t: any) => ({ + id: t.id || crypto.randomUUID(), + content: t.content, + completed: t.status === 'completed', + executing: t.status === 'in_progress', + })) + set({ planTodos: todos, showPlanTodos: true }) + } + logger.debug('[RenderEvent] plan_end', { todoCount: data.todos?.length }) + }, + + // Options events + options_start: (_data, context, _get, _set) => { + context.isCapturingOptions = true + context.optionsContent = '' + logger.debug('[RenderEvent] options_start') + }, + + options_delta: (data, context, _get, _set) => { + const content = data.content + if (!content) return + context.optionsContent = (context.optionsContent || '') + content + }, + + options_end: (data, context, _get, set) => { + context.isCapturingOptions = false + if (data.options && Array.isArray(data.options)) { + // Add options block to content + context.contentBlocks.push({ + type: 'options', + options: data.options, + timestamp: Date.now(), + }) + updateStreamingMessage(set, context) + } + logger.debug('[RenderEvent] options_end', { optionCount: data.options?.length }) + }, + + // Message lifecycle events + message_start: (data, context, _get, _set) => { + if (data.role === 'assistant') { + context.messageId = data.messageId + } + logger.debug('[RenderEvent] message_start', { messageId: data.messageId, role: data.role }) + }, + + message_saved: (data, _context, _get, _set) => { + // Message saved to DB - safe to refresh + logger.debug('[RenderEvent] message_saved', { messageId: data.messageId, refreshFromDb: data.refreshFromDb }) + }, + + message_end: (data, _context, _get, _set) => { + logger.debug('[RenderEvent] message_end', { messageId: data.messageId }) + }, + + // Interrupt events + interrupt_show: (data, context, get, set) => { + const { toolCallId, toolName, options, message: interruptMessage } = data + if (!toolCallId) return + + logger.info('[RenderEvent] interrupt_show - user approval required', { toolCallId, toolName, options }) + // TODO: Show interrupt UI to user + }, + + interrupt_resolved: (data, _context, _get, _set) => { + logger.debug('[RenderEvent] interrupt_resolved', { toolCallId: data.toolCallId, choice: data.choice }) + }, + default: () => {}, } @@ -1980,7 +2336,30 @@ const subAgentSSEHandlers: Record = { return } - // Execute client tools in parallel (non-blocking) - same pattern as main tool_call handler + // When resuming a stream, tools have already been executed on the server. + // The tool_result events in the stream will update the final state. + // Skip client-side execution to avoid re-running tools. + const { isResuming } = get() + if (isResuming) { + logger.info('[SubAgent] Skipping tool execution (resuming stream)', { id, name }) + return + } + + // SERVER-FIRST ARCHITECTURE: + // All tools are executed server-side by default. The client is just a rendering layer. + // Only clientOnly tools (like navigate_ui, tour) that require browser interaction + // are executed on the client. + const toolIsClientOnly = isClientOnlyTool(name) + + if (!toolIsClientOnly) { + // Server-side tool: just wait for tool_result from server + logger.info('[SubAgent] Server-side tool, waiting for tool_result', { id, name }) + return + } + + // CLIENT-ONLY TOOL: Execute on the client (requires browser interaction) + logger.info('[SubAgent] Client-only tool, executing locally', { id, name }) + try { const def = getTool(name) if (def) { @@ -2260,7 +2639,7 @@ export const useCopilotStore = create()( // Don't abort - let server-side stream continue for resumption // Just reset client state; stream will be resumable when returning to the chat // Don't abort tools either - they may still be running server-side - set({ isSendingMessage: false, abortController: null }) + set({ isSendingMessage: false, abortController: null, activeStreamId: null }) try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) @@ -2272,6 +2651,7 @@ export const useCopilotStore = create()( mode: get().mode, selectedModel: get().selectedModel, agentPrefetch: get().agentPrefetch, + activeStreamId: null, // Ensure cleared }) }, @@ -2297,9 +2677,12 @@ export const useCopilotStore = create()( // Just reset client state; stream will be resumable when returning to that chat // Don't abort tools either - they may still be running server-side if (currentChat && currentChat.id !== chat.id && isSendingMessage) { - set({ isSendingMessage: false, abortController: null }) + set({ isSendingMessage: false, abortController: null, activeStreamId: null }) } + // Clear any stale activeStreamId from previous chat + set({ activeStreamId: null }) + try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) } catch {} @@ -2405,9 +2788,12 @@ export const useCopilotStore = create()( // Just reset client state; stream will be resumable when returning to that chat // Don't abort tools either - they may still be running server-side if (isSendingMessage) { - set({ isSendingMessage: false, abortController: null }) + set({ isSendingMessage: false, abortController: null, activeStreamId: null }) } + // Clear any stale activeStreamId from previous chat + set({ activeStreamId: null }) + try { useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false }) } catch {} @@ -2832,77 +3218,44 @@ export const useCopilotStore = create()( // Abort streaming (user-initiated) abortMessage: (options?: { suppressContinueOption?: boolean }) => { - const { abortController, isSendingMessage, messages } = get() - if (!isSendingMessage || !abortController) return + const { abortController, isSendingMessage, isAborting } = get() + + // Don't abort if already aborting or not sending + if (isAborting) { + logger.info('[Abort] Already aborting, skipping') + return + } + if (!isSendingMessage) { + logger.info('[Abort] Not sending message, skipping') + return + } + if (!abortController) { + logger.info('[Abort] No abort controller, skipping') + return + } + const suppressContinueOption = options?.suppressContinueOption === true + logger.info('[Abort] User initiated abort', { suppressContinueOption }) + // Mark this as a user-initiated abort (vs browser refresh which doesn't call this) - set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption, userInitiatedAbort: true }) + // Set userInitiatedAbort BEFORE calling abort() so the streaming loop sees it + set({ + isAborting: true, + suppressAbortContinueOption: suppressContinueOption, + userInitiatedAbort: true, + }) + + // Abort the controller - the streaming loop will handle cleanup + // We do NOT set isSendingMessage: false here - let the streaming loop do it + // This prevents a race condition where the loop exits before seeing the abort try { abortController.abort() stopStreamingUpdates() - const lastMessage = messages[messages.length - 1] - if (lastMessage && lastMessage.role === 'assistant') { - const textContent = - lastMessage.contentBlocks - ?.filter((b) => b.type === 'text') - .map((b: any) => b.content) - .join('') || '' - const nextContentBlocks = suppressContinueOption - ? (lastMessage.contentBlocks ?? []) - : appendContinueOptionBlock( - lastMessage.contentBlocks ? [...lastMessage.contentBlocks] : [] - ) - set((state) => ({ - messages: state.messages.map((msg) => - msg.id === lastMessage.id - ? { - ...msg, - content: suppressContinueOption - ? textContent.trim() || 'Message was aborted' - : appendContinueOption(textContent.trim() || 'Message was aborted'), - contentBlocks: nextContentBlocks, - } - : msg - ), - isSendingMessage: false, - isAborting: false, - // Keep abortController so streaming loop can check signal.aborted - // It will be nulled when streaming completes or new message starts - })) - } else { - set({ - isSendingMessage: false, - isAborting: false, - // Keep abortController so streaming loop can check signal.aborted - }) - } - - // Immediately put all in-progress tools into aborted state - abortAllInProgressTools(set, get) - - // Persist whatever contentBlocks/text we have to keep ordering for reloads - const { currentChat, streamingPlanContent, mode, selectedModel } = get() - if (currentChat) { - try { - const currentMessages = get().messages - const dbMessages = validateMessagesForLLM(currentMessages) - fetch('/api/copilot/chat/update-messages', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - chatId: currentChat.id, - messages: dbMessages, - planArtifact: streamingPlanContent || null, - config: { - mode, - model: selectedModel, - }, - }), - }).catch(() => {}) - } catch {} - } - } catch { - set({ isSendingMessage: false, isAborting: false }) + // The streaming loop will detect signal.aborted and userInitiatedAbort, + // then update the UI and set isSendingMessage: false + } catch (err) { + logger.error('[Abort] Error aborting', { error: err }) + set({ isAborting: false }) } }, @@ -3293,17 +3646,67 @@ export const useCopilotStore = create()( // Only treat as abort if user explicitly clicked stop (not browser refresh) if (userInitiatedAbort) { context.wasAborted = true - const { suppressAbortContinueOption } = get() + const { suppressAbortContinueOption, messages } = get() context.suppressContinueOption = suppressAbortContinueOption === true if (suppressAbortContinueOption) { set({ suppressAbortContinueOption: false }) } - set({ userInitiatedAbort: false }) // Reset flag - // User-initiated abort: clean up and break + + // User-initiated abort: Update UI with abort state + const lastMessage = messages[messages.length - 1] + if (lastMessage && lastMessage.role === 'assistant') { + const textContent = + lastMessage.contentBlocks + ?.filter((b) => b.type === 'text') + .map((b: any) => b.content) + .join('') || '' + const nextContentBlocks = context.suppressContinueOption + ? (lastMessage.contentBlocks ?? []) + : appendContinueOptionBlock( + lastMessage.contentBlocks ? [...lastMessage.contentBlocks] : [] + ) + set((state) => ({ + messages: state.messages.map((msg) => + msg.id === lastMessage.id + ? { + ...msg, + content: context.suppressContinueOption + ? textContent.trim() || 'Message was aborted' + : appendContinueOption(textContent.trim() || 'Message was aborted'), + contentBlocks: nextContentBlocks, + } + : msg + ), + })) + } + + // Immediately put all in-progress tools into aborted state + abortAllInProgressTools(set, get) + + // Persist current state to database + const { currentChat, streamingPlanContent, mode, selectedModel } = get() + if (currentChat) { + try { + const currentMessages = get().messages + const dbMessages = validateMessagesForLLM(currentMessages) + fetch('/api/copilot/chat/update-messages', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + chatId: currentChat.id, + messages: dbMessages, + planArtifact: streamingPlanContent || null, + config: { mode, model: selectedModel }, + }), + }).catch(() => {}) + } catch {} + } + + set({ userInitiatedAbort: false, isSendingMessage: false, isAborting: false }) context.pendingContent = '' finalizeThinkingBlock(context) stopStreamingUpdates() - reader.cancel() + reader.cancel().catch(() => {}) break } else { // Browser refresh/navigation - don't update any UI, just exit @@ -4060,13 +4463,51 @@ export const useCopilotStore = create()( if (!response.ok) { const data = await response.json().catch(() => ({})) - // Stream completed or errored - refresh messages from DB - if (data.status === 'completed' || data.status === 'error') { - logger.info('[Resume] Stream already finished', { streamId, status: data.status }) - // Reload the chat to get the saved messages + // Stream completed, errored, or not found (already cleaned up) - refresh messages from DB + if ( + data.status === 'completed' || + data.status === 'error' || + data.status === 'not_found' || + response.status === 404 + ) { + logger.info('[Resume] Stream finished or cleaned up, reloading from DB', { + streamId, + status: data.status || response.status, + }) + // Reload the chat to get the saved messages from database const currentChat = get().currentChat if (currentChat) { - await get().selectChat(currentChat) + // Fetch fresh chat data from API + try { + const chatResponse = await fetch( + `/api/copilot/chat?workflowId=${get().workflowId}`, + { credentials: 'include' } + ) + if (chatResponse.ok) { + const chatData = await chatResponse.json() + if (chatData.success && Array.isArray(chatData.chats)) { + const updatedChat = chatData.chats.find( + (c: CopilotChat) => c.id === currentChat.id + ) + if (updatedChat) { + const normalizedMessages = normalizeMessagesForUI(updatedChat.messages || []) + const toolCallsById = buildToolCallsById(normalizedMessages) + set({ + currentChat: updatedChat, + messages: normalizedMessages, + toolCallsById, + chats: chatData.chats, + }) + logger.info('[Resume] Reloaded messages from DB', { + chatId: currentChat.id, + messageCount: normalizedMessages.length, + }) + } + } + } + } catch (err) { + logger.warn('[Resume] Failed to reload chat from DB', { error: err }) + } } return } @@ -4080,28 +4521,115 @@ export const useCopilotStore = create()( return } - // Create a placeholder assistant message for the resumed stream - const resumeMessageId = crypto.randomUUID() + // Find the existing assistant message to resume into + // Since we replay ALL chunks from Redis (from=0), we rebuild the message from scratch + // Don't merge with existing content to avoid duplicates const messages = get().messages - const assistantMessage: CopilotMessage = { - id: resumeMessageId, - role: 'assistant', - content: '', - timestamp: new Date().toISOString(), - toolCalls: [], - contentBlocks: [], + let resumeMessageId: string + const lastMessage = messages[messages.length - 1] + + if (lastMessage?.role === 'assistant') { + // Resume into the existing assistant message, but CLEAR its content first + // since we're replaying all chunks from the beginning + resumeMessageId = lastMessage.id + set((state) => ({ + messages: state.messages.map((m) => + m.id === resumeMessageId + ? { ...m, content: '', contentBlocks: [], toolCalls: [] } + : m + ), + })) + logger.info('[Resume] Resuming into existing message (cleared)', { + messageId: resumeMessageId, + }) + } else { + // Create a new assistant message + resumeMessageId = crypto.randomUUID() + const assistantMessage: CopilotMessage = { + id: resumeMessageId, + role: 'assistant', + content: '', + timestamp: new Date().toISOString(), + toolCalls: [], + contentBlocks: [], + } + set({ messages: [...messages, assistantMessage] }) + logger.info('[Resume] Created new message for resume', { messageId: resumeMessageId }) } - set({ messages: [...messages, assistantMessage] }) + // Process the resumed stream - NOT a continuation since we're rebuilding from scratch + await get().handleStreamingResponse(response.body, resumeMessageId, false) - // Process the resumed stream - await get().handleStreamingResponse(response.body, resumeMessageId, true) + // After stream processing, check if there's a pending diff to restore + await get().restorePendingDiff(streamId) } catch (error) { logger.error('[Resume] Stream resumption failed', { streamId, error }) } finally { set({ isResuming: false, isSendingMessage: false, activeStreamId: null }) } }, + + restorePendingDiff: async (streamId) => { + try { + const { useWorkflowDiffStore } = await import('@/stores/workflow-diff/store') + const diffStore = useWorkflowDiffStore.getState() + + // Fetch pending diff from Redis via API + const response = await fetch(`/api/copilot/stream/${streamId}/pending-diff`, { + credentials: 'include', + }) + + if (!response.ok) { + return // No pending diff + } + + const data = await response.json() + if (!data.pendingDiff) { + return + } + + const { baselineWorkflow, proposedWorkflow, diffAnalysis, toolCallId } = data.pendingDiff + + // If there's already an active diff (possibly restored from markers), + // we still want to restore the baseline so reject can work properly + if (diffStore.hasActiveDiff) { + if (!diffStore.baselineWorkflow && baselineWorkflow) { + logger.info('[Resume] Diff already active but missing baseline, updating baseline', { + toolCallId, + }) + // Just update the baseline in the existing diff state + const { useWorkflowRegistry } = await import('@/stores/workflows/registry/store') + const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId + useWorkflowDiffStore.setState({ + baselineWorkflow: baselineWorkflow, + baselineWorkflowId: activeWorkflowId, + }) + } else { + logger.info('[Resume] Diff already fully restored, skipping') + } + return + } + + logger.info('[Resume] Restoring pending diff', { + toolCallId, + hasBaseline: !!baselineWorkflow, + }) + + // Restore the diff with the saved baseline so reject can work properly + if (baselineWorkflow && diffAnalysis) { + useWorkflowDiffStore.getState().restoreDiffWithBaseline( + baselineWorkflow, + proposedWorkflow, + diffAnalysis + ) + } else { + // Fallback: just set proposed changes (baseline will be captured fresh) + await useWorkflowDiffStore.getState().setProposedChanges(proposedWorkflow, diffAnalysis) + } + } catch (error) { + logger.warn('[Resume] Failed to restore pending diff', { streamId, error }) + } + }, })) ) diff --git a/apps/sim/stores/panel/copilot/types.ts b/apps/sim/stores/panel/copilot/types.ts index e7b151adf..cd301ef8e 100644 --- a/apps/sim/stores/panel/copilot/types.ts +++ b/apps/sim/stores/panel/copilot/types.ts @@ -261,6 +261,7 @@ export interface CopilotActions { checkForActiveStream: (chatId: string) => Promise resumeActiveStream: (streamId: string) => Promise setActiveStreamId: (streamId: string | null) => void + restorePendingDiff: (streamId: string) => Promise } export type CopilotStore = CopilotState & CopilotActions diff --git a/apps/sim/stores/workflow-diff/store.ts b/apps/sim/stores/workflow-diff/store.ts index 285be7e11..112c5d11e 100644 --- a/apps/sim/stores/workflow-diff/store.ts +++ b/apps/sim/stores/workflow-diff/store.ts @@ -20,6 +20,50 @@ import { persistWorkflowStateToServer, } from './utils' +/** Get the active stream ID from copilot store (lazy import to avoid circular deps) */ +async function getActiveStreamId(): Promise { + try { + const { useCopilotStore } = await import('@/stores/panel/copilot/store') + return useCopilotStore.getState().activeStreamId + } catch { + return null + } +} + +/** Save pending diff to server (Redis) via API */ +async function savePendingDiffToServer( + streamId: string, + pendingDiff: { + toolCallId: string + baselineWorkflow: unknown + proposedWorkflow: unknown + diffAnalysis: unknown + } +): Promise { + try { + await fetch(`/api/copilot/stream/${streamId}/pending-diff`, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + credentials: 'include', + body: JSON.stringify({ pendingDiff }), + }) + } catch (err) { + logger.warn('Failed to save pending diff to server', { error: err }) + } +} + +/** Clear pending diff from server (Redis) via API */ +async function clearPendingDiffFromServer(streamId: string): Promise { + try { + await fetch(`/api/copilot/stream/${streamId}/pending-diff`, { + method: 'DELETE', + credentials: 'include', + }) + } catch { + // Ignore errors - not critical + } +} + const logger = createLogger('WorkflowDiffStore') const diffEngine = new WorkflowDiffEngine() @@ -169,32 +213,35 @@ export const useWorkflowDiffStore = create { - logger.warn('Failed to broadcast workflow state (non-blocking)', { error }) + // Persist pending diff to Redis for resumption on page refresh + getActiveStreamId().then((streamId) => { + if (streamId) { + findLatestEditWorkflowToolCallId().then((toolCallId) => { + if (toolCallId) { + savePendingDiffToServer(streamId, { + toolCallId, + baselineWorkflow: baselineWorkflow, + proposedWorkflow: candidateState, + diffAnalysis: diffAnalysisResult, + }) + } + }) + } }) - // Fire and forget: persist to database (don't await) - persistWorkflowStateToServer(activeWorkflowId, candidateState) + // NOTE: We do NOT broadcast to other users here (to prevent socket errors on refresh). + // But we DO persist to database WITH diff markers so the proposed state survives page refresh + // and the diff UI can be restored. Final broadcast (without markers) happens when user accepts. + persistWorkflowStateToServer(activeWorkflowId, candidateState, { preserveDiffMarkers: true }) .then((persisted) => { if (!persisted) { - logger.warn('Failed to persist copilot edits (state already applied locally)') - // Don't revert - user can retry or state will sync on next save + logger.warn('Failed to persist diff preview state') } else { - logger.info('Workflow diff persisted to database', { - workflowId: activeWorkflowId, - }) + logger.info('Diff preview state persisted with markers', { workflowId: activeWorkflowId }) } }) .catch((error) => { - logger.warn('Failed to persist workflow state (non-blocking)', { error }) + logger.warn('Failed to persist diff preview state', { error }) }) // Emit event for undo/redo recording @@ -212,6 +259,37 @@ export const useWorkflowDiffStore = create { + const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId + if (!activeWorkflowId) { + logger.error('Cannot restore diff without an active workflow') + return + } + + logger.info('Restoring diff with baseline', { + workflowId: activeWorkflowId, + hasBaseline: !!baselineWorkflow, + newBlocks: diffAnalysis.new_blocks?.length || 0, + editedBlocks: diffAnalysis.edited_blocks?.length || 0, + }) + + // Set the diff state with the provided baseline + batchedUpdate({ + hasActiveDiff: true, + isShowingDiff: true, + isDiffReady: true, + baselineWorkflow: baselineWorkflow, + baselineWorkflowId: activeWorkflowId, + diffAnalysis: diffAnalysis, + diffMetadata: null, + diffError: null, + }) + + // The proposed workflow should already be applied (blocks have is_diff markers) + // Just re-apply the markers to ensure they're visible + setTimeout(() => get().reapplyDiffMarkers(), 0) + }, + clearDiff: ({ restoreBaseline = true } = {}) => { const { baselineWorkflow, baselineWorkflowId } = get() const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId @@ -292,6 +370,13 @@ export const useWorkflowDiffStore = create { + if (streamId) { + clearPendingDiffFromServer(streamId) + } + }) + // Clear diff state FIRST to prevent flash of colors // This must happen synchronously before applying the cleaned state set({ @@ -312,6 +397,32 @@ export const useWorkflowDiffStore = create { + logger.warn('Failed to broadcast accepted workflow state', { error }) + }) + + // Fire and forget: persist to database + persistWorkflowStateToServer(activeWorkflowId, stateToApply) + .then((persisted) => { + if (!persisted) { + logger.warn('Failed to persist accepted workflow changes') + } else { + logger.info('Accepted workflow changes persisted to database', { + workflowId: activeWorkflowId, + }) + } + }) + .catch((error) => { + logger.warn('Failed to persist accepted workflow state', { error }) + }) + // Emit event for undo/redo recording (unless we're in an undo/redo operation) if (!(window as any).__skipDiffRecording) { window.dispatchEvent( @@ -356,8 +467,19 @@ export const useWorkflowDiffStore = create { + if (streamId) { + clearPendingDiffFromServer(streamId) + } + }) + // Clear diff state FIRST for instant UI feedback set({ hasActiveDiff: false, @@ -526,6 +655,94 @@ export const useWorkflowDiffStore = create { + // Check if the workflow has any blocks with is_diff markers + // If so, restore the diff store state to show the diff view + const { hasActiveDiff } = get() + if (hasActiveDiff) { + // Already have an active diff + return + } + + const workflowStore = useWorkflowStore.getState() + const blocks = workflowStore.blocks + + const newBlocks: string[] = [] + const editedBlocks: string[] = [] + const fieldDiffs: Record = {} + + Object.entries(blocks).forEach(([blockId, block]) => { + const isDiff = (block as any).is_diff + if (isDiff === 'new') { + newBlocks.push(blockId) + } else if (isDiff === 'edited') { + editedBlocks.push(blockId) + // Check for field diffs + const blockFieldDiffs = (block as any).field_diffs + if (blockFieldDiffs) { + fieldDiffs[blockId] = blockFieldDiffs + } else { + // Check subBlocks for is_diff markers + const changedFields: string[] = [] + Object.entries((block as any).subBlocks || {}).forEach( + ([fieldId, subBlock]: [string, any]) => { + if (subBlock?.is_diff === 'changed') { + changedFields.push(fieldId) + } + } + ) + if (changedFields.length > 0) { + fieldDiffs[blockId] = { changed_fields: changedFields } + } + } + } + }) + + if (newBlocks.length === 0 && editedBlocks.length === 0) { + // No diff markers found + return + } + + logger.info('Restoring diff state from markers', { + newBlocks: newBlocks.length, + editedBlocks: editedBlocks.length, + }) + + // Restore the diff state + // Note: We don't have the baseline, so reject will just clear the diff + // Add unchanged_fields to satisfy the type (we don't track unchanged fields on restore) + const fieldDiffsWithUnchanged: Record< + string, + { changed_fields: string[]; unchanged_fields: string[] } + > = {} + Object.entries(fieldDiffs).forEach(([blockId, diff]) => { + fieldDiffsWithUnchanged[blockId] = { + changed_fields: diff.changed_fields, + unchanged_fields: [], + } + }) + + const diffAnalysis = { + new_blocks: newBlocks, + edited_blocks: editedBlocks, + deleted_blocks: [], + field_diffs: fieldDiffsWithUnchanged, + } + + const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId + + batchedUpdate({ + hasActiveDiff: true, + isShowingDiff: true, + isDiffReady: true, + baselineWorkflow: null, // We don't have baseline on restore from markers alone + baselineWorkflowId: activeWorkflowId, // Set the workflow ID for later baseline restoration + diffAnalysis, + diffMetadata: null, + diffError: null, + }) + }, } }, { name: 'workflow-diff-store' } diff --git a/apps/sim/stores/workflow-diff/types.ts b/apps/sim/stores/workflow-diff/types.ts index fe40b0842..a0578cecd 100644 --- a/apps/sim/stores/workflow-diff/types.ts +++ b/apps/sim/stores/workflow-diff/types.ts @@ -15,10 +15,16 @@ export interface WorkflowDiffState { export interface WorkflowDiffActions { setProposedChanges: (workflowState: WorkflowState, diffAnalysis?: DiffAnalysis) => Promise + restoreDiffWithBaseline: ( + baselineWorkflow: WorkflowState, + proposedWorkflow: WorkflowState, + diffAnalysis: DiffAnalysis + ) => void clearDiff: (options?: { restoreBaseline?: boolean }) => void toggleDiffView: () => void acceptChanges: () => Promise rejectChanges: () => Promise reapplyDiffMarkers: () => void + restoreDiffFromMarkers: () => void _batchedStateUpdate: (updates: Partial) => void } diff --git a/apps/sim/stores/workflow-diff/utils.ts b/apps/sim/stores/workflow-diff/utils.ts index 3245875f7..e92b63a66 100644 --- a/apps/sim/stores/workflow-diff/utils.ts +++ b/apps/sim/stores/workflow-diff/utils.ts @@ -56,17 +56,24 @@ export function captureBaselineSnapshot(workflowId: string): WorkflowState { export async function persistWorkflowStateToServer( workflowId: string, - workflowState: WorkflowState + workflowState: WorkflowState, + options?: { preserveDiffMarkers?: boolean } ): Promise { try { - const cleanState = stripWorkflowDiffMarkers(cloneWorkflowState(workflowState)) + // When preserveDiffMarkers is true, we keep the is_diff markers on blocks + // so they survive page refresh and can be restored. This is used when + // persisting a diff that hasn't been accepted/rejected yet. + const stateToSave = options?.preserveDiffMarkers + ? cloneWorkflowState(workflowState) + : stripWorkflowDiffMarkers(cloneWorkflowState(workflowState)) + const response = await fetch(`/api/workflows/${workflowId}/state`, { method: 'PUT', headers: { 'Content-Type': 'application/json', }, body: JSON.stringify({ - ...cleanState, + ...stateToSave, lastSaved: Date.now(), }), })