From 09f06cb683b01af31851d2d0ffd28a721ea334d1 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Fri, 10 Apr 2026 14:58:36 -0700 Subject: [PATCH] durable stream for files --- apps/sim/app/api/copilot/chat/queries.ts | 58 ++- .../app/api/copilot/chat/stream/route.test.ts | 44 ++ apps/sim/app/api/copilot/chat/stream/route.ts | 14 +- .../api/mothership/chats/[chatId]/route.ts | 28 +- .../components/file-viewer/file-viewer.tsx | 6 +- .../resource-content/resource-content.tsx | 273 +--------- .../mothership-view/mothership-view.tsx | 38 +- .../app/workspace/[workspaceId]/home/home.tsx | 4 +- .../[workspaceId]/home/hooks/use-chat.ts | 485 ++++++++++++------ .../hooks/use-file-preview-sessions.test.tsx | 147 ++++++ .../home/hooks/use-file-preview-sessions.ts | 187 +++++++ apps/sim/hooks/queries/tasks.ts | 8 +- .../sim/lib/copilot/request/go/stream.test.ts | 52 ++ apps/sim/lib/copilot/request/go/stream.ts | 404 ++++++++++++++- apps/sim/lib/copilot/request/lifecycle/run.ts | 2 + .../copilot/request/lifecycle/start.test.ts | 8 + .../lib/copilot/request/lifecycle/start.ts | 4 + .../session/file-preview-session.test.ts | 42 ++ .../request/session/file-preview-session.ts | 211 ++++++++ apps/sim/lib/copilot/request/session/index.ts | 14 + apps/sim/lib/copilot/tool-executor/types.ts | 1 + .../tools/registry/server-tool-adapter.ts | 1 + .../tools/server/files/edit-content.ts | 5 +- .../tools/server/files/file-intent-store.ts | 263 ++++++++-- .../tools/server/files/file-preview.test.ts | 79 +++ .../tools/server/files/file-preview.ts | 192 +++++++ .../tools/server/files/workspace-file.ts | 12 +- 27 files changed, 2059 insertions(+), 523 deletions(-) create mode 100644 apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx create mode 100644 apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts create mode 100644 apps/sim/lib/copilot/request/go/stream.test.ts create mode 100644 apps/sim/lib/copilot/request/session/file-preview-session.test.ts create mode 100644 apps/sim/lib/copilot/request/session/file-preview-session.ts create mode 100644 apps/sim/lib/copilot/tools/server/files/file-preview.test.ts create mode 100644 apps/sim/lib/copilot/tools/server/files/file-preview.ts diff --git a/apps/sim/app/api/copilot/chat/queries.ts b/apps/sim/app/api/copilot/chat/queries.ts index 6f1f548a09..4828a15aa7 100644 --- a/apps/sim/app/api/copilot/chat/queries.ts +++ b/apps/sim/app/api/copilot/chat/queries.ts @@ -3,6 +3,7 @@ import { copilotChats } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, desc, eq } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' +import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, @@ -10,6 +11,9 @@ import { createInternalServerErrorResponse, createUnauthorizedResponse, } from '@/lib/copilot/request/http' +import { readFilePreviewSessions } from '@/lib/copilot/request/session' +import { readEvents } from '@/lib/copilot/request/session/buffer' +import { toStreamBatchEvent } from '@/lib/copilot/request/session/types' import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils' import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils' @@ -63,8 +67,60 @@ export async function GET(req: NextRequest) { return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 }) } + let streamSnapshot: { + events: ReturnType[] + previewSessions: Awaited> + status: string + } | null = null + if (chat.conversationId) { + try { + const [events, previewSessions, run] = await Promise.all([ + readEvents(chat.conversationId, '0'), + readFilePreviewSessions(chat.conversationId).catch((error) => { + logger.warn('Failed to read preview sessions for copilot chat', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) + return [] + }), + getLatestRunForStream(chat.conversationId, authenticatedUserId).catch((error) => { + logger.warn('Failed to fetch latest run for copilot chat snapshot', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) + return null + }), + ]) + + streamSnapshot = { + events: events.map(toStreamBatchEvent), + previewSessions, + status: + typeof run?.status === 'string' + ? run.status + : events.length > 0 + ? 'active' + : 'unknown', + } + } catch (error) { + logger.warn('Failed to load copilot chat stream snapshot', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) + } + } + logger.info(`Retrieved chat ${chatId}`) - return NextResponse.json({ success: true, chat: transformChat(chat) }) + return NextResponse.json({ + success: true, + chat: { + ...transformChat(chat), + ...(streamSnapshot ? { streamSnapshot } : {}), + }, + }) } if (!workflowId && !workspaceId) { diff --git a/apps/sim/app/api/copilot/chat/stream/route.test.ts b/apps/sim/app/api/copilot/chat/stream/route.test.ts index ff5115e637..3105f9216c 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.test.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.test.ts @@ -12,11 +12,13 @@ import { const { getLatestRunForStream, readEvents, + readFilePreviewSessions, checkForReplayGap, authenticateCopilotRequestSessionOnly, } = vi.hoisted(() => ({ getLatestRunForStream: vi.fn(), readEvents: vi.fn(), + readFilePreviewSessions: vi.fn(), checkForReplayGap: vi.fn(), authenticateCopilotRequestSessionOnly: vi.fn(), })) @@ -27,6 +29,7 @@ vi.mock('@/lib/copilot/async-runs/repository', () => ({ vi.mock('@/lib/copilot/request/session', () => ({ readEvents, + readFilePreviewSessions, checkForReplayGap, createEvent: (event: Record) => ({ stream: { @@ -74,9 +77,50 @@ describe('copilot chat stream replay route', () => { isAuthenticated: true, }) readEvents.mockResolvedValue([]) + readFilePreviewSessions.mockResolvedValue([]) checkForReplayGap.mockResolvedValue(null) }) + it('returns preview sessions in batch mode', async () => { + getLatestRunForStream.mockResolvedValue({ + status: 'active', + executionId: 'exec-1', + id: 'run-1', + }) + readFilePreviewSessions.mockResolvedValue([ + { + schemaVersion: 1, + id: 'preview-1', + streamId: 'stream-1', + toolCallId: 'preview-1', + status: 'streaming', + fileName: 'draft.md', + previewText: 'hello', + previewVersion: 2, + updatedAt: '2026-04-10T00:00:00.000Z', + }, + ]) + + const response = await GET( + new NextRequest( + 'http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0&batch=true' + ) + ) + + expect(response.status).toBe(200) + await expect(response.json()).resolves.toMatchObject({ + success: true, + previewSessions: [ + expect.objectContaining({ + id: 'preview-1', + previewText: 'hello', + previewVersion: 2, + }), + ], + status: 'active', + }) + }) + it('stops replay polling when run becomes cancelled', async () => { getLatestRunForStream .mockResolvedValueOnce({ diff --git a/apps/sim/app/api/copilot/chat/stream/route.ts b/apps/sim/app/api/copilot/chat/stream/route.ts index 9f1323a215..5028ecf7e5 100644 --- a/apps/sim/app/api/copilot/chat/stream/route.ts +++ b/apps/sim/app/api/copilot/chat/stream/route.ts @@ -11,6 +11,7 @@ import { createEvent, encodeSSEEnvelope, readEvents, + readFilePreviewSessions, SSE_RESPONSE_HEADERS, } from '@/lib/copilot/request/session' import { toStreamBatchEvent } from '@/lib/copilot/request/session/types' @@ -113,17 +114,28 @@ export async function GET(request: NextRequest) { if (batchMode) { const afterSeq = afterCursor || '0' - const events = await readEvents(streamId, afterSeq) + const [events, previewSessions] = await Promise.all([ + readEvents(streamId, afterSeq), + readFilePreviewSessions(streamId).catch((error) => { + logger.warn('Failed to read preview sessions for stream batch', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + return [] + }), + ]) const batchEvents = events.map(toStreamBatchEvent) logger.info('[Resume] Batch response', { streamId, afterCursor: afterSeq, eventCount: batchEvents.length, + previewSessionCount: previewSessions.length, runStatus: run.status, }) return NextResponse.json({ success: true, events: batchEvents, + previewSessions, status: run.status, }) } diff --git a/apps/sim/app/api/mothership/chats/[chatId]/route.ts b/apps/sim/app/api/mothership/chats/[chatId]/route.ts index 39c1a7465d..e5fc73f301 100644 --- a/apps/sim/app/api/mothership/chats/[chatId]/route.ts +++ b/apps/sim/app/api/mothership/chats/[chatId]/route.ts @@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger' import { and, eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' import { z } from 'zod' +import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository' import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle' import { authenticateCopilotRequestSessionOnly, @@ -11,7 +12,9 @@ import { createInternalServerErrorResponse, createUnauthorizedResponse, } from '@/lib/copilot/request/http' +import type { FilePreviewSession } from '@/lib/copilot/request/session' import { readEvents } from '@/lib/copilot/request/session/buffer' +import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session' import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types' import { taskPubSub } from '@/lib/copilot/tasks' import { captureServerEvent } from '@/lib/posthog/server' @@ -49,16 +52,37 @@ export async function GET( let streamSnapshot: { events: StreamBatchEvent[] + previewSessions: FilePreviewSession[] status: string } | null = null if (chat.conversationId) { try { - const events = await readEvents(chat.conversationId, '0') + const [events, previewSessions] = await Promise.all([ + readEvents(chat.conversationId, '0'), + readFilePreviewSessions(chat.conversationId).catch((error) => { + logger.warn('Failed to read preview sessions for mothership chat', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) + return [] + }), + ]) + const run = await getLatestRunForStream(chat.conversationId, userId).catch((error) => { + logger.warn('Failed to fetch latest run for mothership chat snapshot', { + chatId, + conversationId: chat.conversationId, + error: error instanceof Error ? error.message : String(error), + }) + return null + }) streamSnapshot = { events: events.map(toStreamBatchEvent), - status: events.length > 0 ? 'active' : 'unknown', + previewSessions, + status: + typeof run?.status === 'string' ? run.status : events.length > 0 ? 'active' : 'unknown', } } catch (error) { logger.warn('Failed to read stream snapshot for mothership chat', { diff --git a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx index 9194a38eef..a9e4a8ca46 100644 --- a/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx +++ b/apps/sim/app/workspace/[workspaceId]/files/components/file-viewer/file-viewer.tsx @@ -29,7 +29,6 @@ import { useWorkspaceFileContent, } from '@/hooks/queries/workspace-files' import { useAutosave } from '@/hooks/use-autosave' -import { useStreamingText } from '@/hooks/use-streaming-text' import { DataTable } from './data-table' import { PreviewPanel, resolvePreviewType } from './preview-panel' @@ -526,8 +525,7 @@ function TextEditor({ const isStreaming = isStreamInteractionLocked const isEditorReadOnly = isStreamInteractionLocked || !canEdit - const revealedContent = useStreamingText(content, false) - const renderedContent = isStreaming ? revealedContent : content + const renderedContent = content const gutterWidthPx = useMemo(() => { const lineCount = renderedContent.split('\n').length return calculateGutterWidth(lineCount) @@ -731,7 +729,7 @@ function TextEditor({ const el = (shouldUseCodeRenderer ? codeScrollRef.current : textareaRef.current) ?? null if (!el) return el.scrollTop = el.scrollHeight - }, [isStreaming, revealedContent, shouldUseCodeRenderer]) + }, [isStreaming, renderedContent, shouldUseCodeRenderer]) if (streamingContent === undefined) { if (isLoading) return DOCUMENT_SKELETON diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx index 36ab4ecf15..3ef1bab6da 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-content/resource-content.tsx @@ -1,8 +1,7 @@ 'use client' -import { lazy, memo, Suspense, useCallback, useEffect, useMemo, useState } from 'react' +import { lazy, memo, Suspense, useCallback, useEffect, useMemo } from 'react' import { createLogger } from '@sim/logger' -import { useQueryClient } from '@tanstack/react-query' import { Square } from 'lucide-react' import { useRouter } from 'next/navigation' import { Button, PlayOutline, Skeleton, Tooltip } from '@/components/emcn' @@ -13,6 +12,7 @@ import { SquareArrowUpRight, WorkflowX, } from '@/components/emcn/icons' +import type { FilePreviewSession } from '@/lib/copilot/request/session' import { cancelRunToolExecution, markRunToolManuallyStopped, @@ -47,11 +47,7 @@ import { useUsageLimits } from '@/app/workspace/[workspaceId]/w/[workflowId]/com import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution' import { useFolders } from '@/hooks/queries/folders' import { useWorkflows } from '@/hooks/queries/workflows' -import { - useWorkspaceFileContent, - useWorkspaceFiles, - workspaceFilesKeys, -} from '@/hooks/queries/workspace-files' +import { useWorkspaceFiles } from '@/hooks/queries/workspace-files' import { useSettingsNavigation } from '@/hooks/use-settings-navigation' import { useExecutionStore } from '@/stores/execution/store' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' @@ -70,15 +66,7 @@ interface ResourceContentProps { workspaceId: string resource: MothershipResource previewMode?: PreviewMode - streamingFile?: { - toolCallId?: string - fileName: string - fileId?: string - targetKind?: 'new_file' | 'file_id' - operation?: string - edit?: Record - content: string - } | null + previewSession?: FilePreviewSession | null genericResourceData?: GenericResourceData } @@ -93,126 +81,10 @@ export const ResourceContent = memo(function ResourceContent({ workspaceId, resource, previewMode, - streamingFile, + previewSession, genericResourceData, }: ResourceContentProps) { - const queryClient = useQueryClient() - const streamFileName = streamingFile?.fileName || 'file.md' - - const streamOperation = useMemo(() => { - if (!streamingFile) return undefined - return streamingFile.operation - }, [streamingFile]) - - const isWriteStream = streamOperation === 'create' || streamOperation === 'append' - const isPatchStream = streamOperation === 'patch' - const isUpdateStream = streamOperation === 'update' - - const { data: allFiles = [] } = useWorkspaceFiles(workspaceId) - const previewFileId = - streamingFile?.fileId ?? (resource.type === 'file' ? resource.id : undefined) - const previewFileRecord = useMemo(() => { - if (!previewFileId) return undefined - return allFiles.find((f) => f.id === previewFileId) - }, [previewFileId, allFiles]) - - const isSourceMime = - previewFileRecord?.type === 'text/x-pptxgenjs' || - previewFileRecord?.type === 'text/x-docxjs' || - previewFileRecord?.type === 'text/x-pdflibjs' - const previewContentMode = isSourceMime ? 'raw' : 'text' - - const { data: fetchedFileContent } = useWorkspaceFileContent( - workspaceId, - previewFileRecord?.id ?? '', - previewFileRecord?.key ?? '', - isSourceMime - ) - - const frozenPreviewBaseKey = - streamingFile?.toolCallId && - previewFileRecord?.id && - streamingFile.targetKind === 'file_id' && - (streamOperation === 'append' || streamOperation === 'patch') - ? `${streamingFile.toolCallId}:${streamOperation}:${previewFileRecord.id}` - : null - const cachedFrozenPreviewBase = - frozenPreviewBaseKey && previewFileRecord?.id - ? queryClient.getQueryData( - workspaceFilesKeys.content(workspaceId, previewFileRecord.id, previewContentMode) - ) - : undefined - const [frozenPreviewBaseState, setFrozenPreviewBaseState] = useState<{ - key: string | null - content: string | null - }>({ key: null, content: null }) - - useEffect(() => { - if (!frozenPreviewBaseKey || !previewFileRecord?.id) { - setFrozenPreviewBaseState((prev) => (prev.key === null ? prev : { key: null, content: null })) - return - } - - setFrozenPreviewBaseState((prev) => { - if (prev.key === frozenPreviewBaseKey) { - return prev - } - return typeof cachedFrozenPreviewBase === 'string' - ? { key: frozenPreviewBaseKey, content: cachedFrozenPreviewBase } - : { key: frozenPreviewBaseKey, content: null } - }) - }, [cachedFrozenPreviewBase, frozenPreviewBaseKey, previewFileRecord?.id]) - - const frozenExistingFilePreviewBase = - frozenPreviewBaseState.key === frozenPreviewBaseKey - ? (frozenPreviewBaseState.content ?? undefined) - : typeof cachedFrozenPreviewBase === 'string' - ? cachedFrozenPreviewBase - : undefined - - const streamingExtractedContent = useMemo(() => { - if (!streamingFile) return undefined - if (!streamOperation) return undefined - - if (isPatchStream) { - if (streamingFile.targetKind === 'file_id') { - if (frozenExistingFilePreviewBase === undefined) return undefined - if (!shouldApplyPatchPreview(streamingFile)) return undefined - return extractPatchPreview(streamingFile, frozenExistingFilePreviewBase) - } - if (fetchedFileContent === undefined) return undefined - if (!shouldApplyPatchPreview(streamingFile)) return undefined - return extractPatchPreview(streamingFile, fetchedFileContent) - } - - const extracted = streamingFile.content - - if (isUpdateStream) return extracted - - if (streamOperation === 'append') { - if (streamingFile.targetKind === 'file_id') { - if (frozenExistingFilePreviewBase === undefined) return undefined - return buildAppendPreview(frozenExistingFilePreviewBase, extracted) - } - return extracted.length > 0 ? extracted : undefined - } - - if (streamOperation === 'create') { - return extracted.length > 0 ? extracted : undefined - } - - if (isWriteStream) return extracted.length > 0 ? extracted : undefined - - return undefined - }, [ - streamingFile, - streamOperation, - isWriteStream, - isPatchStream, - isUpdateStream, - fetchedFileContent, - frozenExistingFilePreviewBase, - ]) + const streamFileName = previewSession?.fileName || 'file.md' const syntheticFile = useMemo(() => { const ext = getFileExtension(streamFileName) const SOURCE_MIME_MAP: Record = { @@ -234,22 +106,21 @@ export const ResourceContent = memo(function ResourceContent({ } }, [workspaceId, streamFileName]) - // ResourceContent now reconstructs full-file preview text per operation, - // so the viewer can always treat streaming content as a whole-file replace. const streamingFileMode: 'append' | 'replace' = 'replace' + const rawPreviewText = previewSession?.previewText + const streamingPreviewText = + typeof rawPreviewText === 'string' && rawPreviewText.length > 0 ? rawPreviewText : undefined - const embeddedStreamingContent = streamingExtractedContent - - if (streamingFile && resource.id === 'streaming-file') { + if (previewSession && resource.id === 'streaming-file') { return (
- {streamingExtractedContent !== undefined ? ( + {streamingPreviewText !== undefined ? ( @@ -273,7 +144,9 @@ export const ResourceContent = memo(function ResourceContent({ workspaceId={workspaceId} fileId={resource.id} previewMode={previewMode} - streamingContent={embeddedStreamingContent} + streamingContent={ + previewSession?.fileId === resource.id ? streamingPreviewText : undefined + } streamingMode={streamingFileMode} /> ) @@ -674,119 +547,3 @@ function EmbeddedFolder({ workspaceId, folderId }: EmbeddedFolderProps) {
) } - -function findAnchorIndex(lines: string[], anchor: string, occurrence = 1, afterIndex = -1): number { - const trimmed = anchor.trim() - let count = 0 - for (let i = afterIndex + 1; i < lines.length; i++) { - if (lines[i].trim() === trimmed) { - count++ - if (count === occurrence) return i - } - } - return -1 -} - -function extractPatchPreview( - streamingFile: { - content: string - edit?: Record - }, - existingContent: string -): string | undefined { - const edit = streamingFile.edit ?? {} - const strategy = typeof edit.strategy === 'string' ? edit.strategy : undefined - const lines = existingContent.split('\n') - const occurrence = - typeof edit.occurrence === 'number' && Number.isFinite(edit.occurrence) ? edit.occurrence : 1 - - if (strategy === 'search_replace') { - const search = typeof edit.search === 'string' ? edit.search : '' - if (!search) return undefined - const replace = streamingFile.content - if ((edit.replaceAll as boolean | undefined) === true) { - return existingContent.split(search).join(replace) - } - const firstIdx = existingContent.indexOf(search) - if (firstIdx === -1) return undefined - return ( - existingContent.slice(0, firstIdx) + replace + existingContent.slice(firstIdx + search.length) - ) - } - - const mode = typeof edit.mode === 'string' ? edit.mode : undefined - if (!mode) return undefined - - if (mode === 'replace_between') { - const beforeAnchor = typeof edit.before_anchor === 'string' ? edit.before_anchor : undefined - const afterAnchor = typeof edit.after_anchor === 'string' ? edit.after_anchor : undefined - if (!beforeAnchor || !afterAnchor) return undefined - - const beforeIdx = findAnchorIndex(lines, beforeAnchor, occurrence) - const afterIdx = findAnchorIndex(lines, afterAnchor, occurrence, beforeIdx) - if (beforeIdx === -1 || afterIdx === -1 || afterIdx <= beforeIdx) return undefined - - const newContent = streamingFile.content - const spliced = [ - ...lines.slice(0, beforeIdx + 1), - ...(newContent.length > 0 ? newContent.split('\n') : []), - ...lines.slice(afterIdx), - ] - return spliced.join('\n') - } - - if (mode === 'insert_after') { - const anchor = typeof edit.anchor === 'string' ? edit.anchor : undefined - if (!anchor) return undefined - - const anchorIdx = findAnchorIndex(lines, anchor, occurrence) - if (anchorIdx === -1) return undefined - - const newContent = streamingFile.content - const spliced = [ - ...lines.slice(0, anchorIdx + 1), - ...(newContent.length > 0 ? newContent.split('\n') : []), - ...lines.slice(anchorIdx + 1), - ] - return spliced.join('\n') - } - - if (mode === 'delete_between') { - const startAnchor = typeof edit.start_anchor === 'string' ? edit.start_anchor : undefined - const endAnchor = typeof edit.end_anchor === 'string' ? edit.end_anchor : undefined - if (!startAnchor || !endAnchor) return undefined - - const startIdx = findAnchorIndex(lines, startAnchor, occurrence) - const endIdx = findAnchorIndex(lines, endAnchor, occurrence, startIdx) - if (startIdx === -1 || endIdx === -1 || endIdx <= startIdx) return undefined - - const spliced = [...lines.slice(0, startIdx), ...lines.slice(endIdx)] - return spliced.join('\n') - } - - return undefined -} - -function shouldApplyPatchPreview(streamingFile: { - content: string - edit?: Record -}): boolean { - const edit = streamingFile.edit ?? {} - const strategy = typeof edit.strategy === 'string' ? edit.strategy : undefined - const mode = typeof edit.mode === 'string' ? edit.mode : undefined - - // delete_between is delete-only and can be previewed from intent metadata alone. - if (strategy === 'anchored' && mode === 'delete_between') { - return true - } - - // For all other patch modes, keep the visible file unchanged until - // edit_content actually streams content into the target location. - return streamingFile.content.length > 0 -} - -function buildAppendPreview(existingContent: string, incomingContent: string): string { - if (incomingContent.length === 0) return existingContent - if (existingContent.length === 0) return incomingContent - return `${existingContent}\n${incomingContent}` -} diff --git a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx index 6c1062416e..1ce4099064 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/components/mothership-view/mothership-view.tsx @@ -1,6 +1,7 @@ 'use client' import { forwardRef, memo, useCallback, useEffect, useState } from 'react' +import type { FilePreviewSession } from '@/lib/copilot/request/session' import { cn } from '@/lib/core/utils/cn' import { getFileExtension } from '@/lib/uploads/utils/file-utils' import type { PreviewMode } from '@/app/workspace/[workspaceId]/files/components/file-viewer' @@ -25,24 +26,13 @@ const PREVIEW_CYCLE: Record = { * when the streamed fileId matches that exact resource. */ function shouldShowStreamingFilePanel( - streamingFile: - | { - toolCallId?: string - fileName: string - fileId?: string - targetKind?: 'new_file' | 'file_id' - operation?: string - edit?: Record - content: string - } - | null - | undefined, + previewSession: FilePreviewSession | null | undefined, active: MothershipResource | null ): boolean { - if (!streamingFile || !active) return false + if (!previewSession || previewSession.status === 'complete' || !active) return false if (active.id === 'streaming-file') return true if (active.type !== 'file') return false - if (active.id && streamingFile.fileId === active.id) return true + if (active.id && previewSession.fileId === active.id) return true return false } @@ -58,15 +48,7 @@ interface MothershipViewProps { onCollapse: () => void isCollapsed: boolean className?: string - streamingFile?: { - toolCallId?: string - fileName: string - fileId?: string - targetKind?: 'new_file' | 'file_id' - operation?: string - edit?: Record - content: string - } | null + previewSession?: FilePreviewSession | null genericResourceData?: GenericResourceData } @@ -84,7 +66,7 @@ export const MothershipView = memo( onCollapse, isCollapsed, className, - streamingFile, + previewSession, genericResourceData, }: MothershipViewProps, ref @@ -92,9 +74,9 @@ export const MothershipView = memo( const active = resources.find((r) => r.id === activeResourceId) ?? resources[0] ?? null const { canEdit } = useUserPermissionsContext() - const streamingForActive = - streamingFile && active && shouldShowStreamingFilePanel(streamingFile, active) - ? streamingFile + const previewForActive = + previewSession && active && shouldShowStreamingFilePanel(previewSession, active) + ? previewSession : undefined const [previewMode, setPreviewMode] = useState('preview') @@ -145,7 +127,7 @@ export const MothershipView = memo( workspaceId={workspaceId} resource={active} previewMode={isActivePreviewable ? previewMode : undefined} - streamingFile={streamingForActive} + previewSession={previewForActive} genericResourceData={active.type === 'generic' ? genericResourceData : undefined} /> ) : ( diff --git a/apps/sim/app/workspace/[workspaceId]/home/home.tsx b/apps/sim/app/workspace/[workspaceId]/home/home.tsx index 520dc2dee9..cd19028e2f 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/home.tsx +++ b/apps/sim/app/workspace/[workspaceId]/home/home.tsx @@ -157,7 +157,7 @@ export function Home({ chatId }: HomeProps = {}) { removeFromQueue, sendNow, editQueuedMessage, - streamingFile, + previewSession, genericResourceData, } = useChat( workspaceId, @@ -413,7 +413,7 @@ export function Home({ chatId }: HomeProps = {}) { onReorderResources={reorderResources} onCollapse={collapseResource} isCollapsed={isResourceCollapsed} - streamingFile={streamingFile} + previewSession={previewSession} genericResourceData={genericResourceData ?? undefined} className={skipResourceTransition ? '!transition-none' : undefined} /> diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts index 7749bcb353..790be10175 100644 --- a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts @@ -56,6 +56,7 @@ import { WorkspaceFile, WorkspaceFileOperation, } from '@/lib/copilot/generated/tool-catalog-v1' +import type { FilePreviewSession } from '@/lib/copilot/request/session' import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' import { extractResourcesFromToolResult, @@ -75,6 +76,12 @@ import { generateId } from '@/lib/core/utils/uuid' import { getNextWorkflowColor } from '@/lib/workflows/colors' import { getQueryClient } from '@/app/_shell/providers/get-query-client' import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry' +import { + type FilePreviewSessionsState, + INITIAL_FILE_PREVIEW_SESSIONS_STATE, + reduceFilePreviewSessions, + useFilePreviewSessions, +} from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions' import { deploymentKeys } from '@/hooks/queries/deployments' import { fetchChatHistory, @@ -88,6 +95,7 @@ import { invalidateWorkflowSelectors } from '@/hooks/queries/utils/invalidate-wo import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order' import { getWorkflowById, getWorkflows } from '@/hooks/queries/utils/workflow-cache' import { workflowKeys } from '@/hooks/queries/workflows' +import { workspaceFilesKeys } from '@/hooks/queries/workspace-files' import { useExecutionStream } from '@/hooks/use-execution-stream' import { useExecutionStore } from '@/stores/execution/store' import type { ChatContext } from '@/stores/panel' @@ -127,7 +135,7 @@ export interface UseChatReturn { removeFromQueue: (id: string) => void sendNow: (id: string) => Promise editQueuedMessage: (id: string) => QueuedMessage | undefined - streamingFile: StreamingFilePreview | null + previewSession: FilePreviewSession | null genericResourceData: GenericResourceData | null } @@ -470,22 +478,42 @@ type StreamToolUI = { clientExecutable?: boolean } -type StreamingFilePreview = { - toolCallId: string - fileName: string - fileId?: string - targetKind?: 'new_file' | 'file_id' - operation?: string - edit?: Record - content: string -} - type StreamBatchResponse = { success: boolean events: StreamBatchEvent[] + previewSessions?: FilePreviewSession[] status: string } +function buildChatHistoryHydrationKey(chatHistory: TaskChatHistory): string { + const resourceKey = chatHistory.resources + .map((resource) => `${resource.type}:${resource.id}:${resource.title}`) + .join('|') + const messageKey = chatHistory.messages.map((message) => message.id).join('|') + const streamSnapshot = chatHistory.streamSnapshot + const snapshotKey = streamSnapshot + ? [ + streamSnapshot.status, + streamSnapshot.events.length, + streamSnapshot.events[streamSnapshot.events.length - 1]?.eventId ?? '', + streamSnapshot.previewSessions + .map( + (session) => + `${session.id}:${session.previewVersion}:${session.status}:${session.updatedAt}` + ) + .join('|'), + ].join('~') + : 'none' + + return [ + chatHistory.id, + chatHistory.activeStreamId ?? '', + messageKey, + resourceKey, + snapshotKey, + ].join('::') +} + const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled']) function isTerminalStreamStatus(status: string | null | undefined): boolean { @@ -676,13 +704,80 @@ export function useChat( const activeResourceIdRef = useRef(effectiveActiveResourceId) activeResourceIdRef.current = effectiveActiveResourceId - const [streamingFile, setStreamingFile] = useState(null) - const streamingFileRef = useRef(streamingFile) - streamingFileRef.current = streamingFile - const pendingStreamingFileRef = useRef<{ value: StreamingFilePreview | null } | null>(null) - const streamingFileFrameRef = useRef(null) - const filePreviewSessionsRef = useRef>(new Map()) - const activeFilePreviewToolCallIdRef = useRef(null) + const { + previewSession, + previewSessionsById, + activePreviewSessionId, + hydratePreviewSessions, + upsertPreviewSession, + completePreviewSession, + removePreviewSession, + resetPreviewSessions, + } = useFilePreviewSessions() + const previewSessionRef = useRef(previewSession) + previewSessionRef.current = previewSession + const previewSessionsRef = useRef(previewSessionsById) + previewSessionsRef.current = previewSessionsById + const activePreviewSessionIdRef = useRef(activePreviewSessionId) + activePreviewSessionIdRef.current = activePreviewSessionId + const previewSessionsStateRef = useRef({ + activeSessionId: activePreviewSessionId, + sessions: previewSessionsById, + }) + previewSessionsStateRef.current = { + activeSessionId: activePreviewSessionId, + sessions: previewSessionsById, + } + + const syncPreviewSessionRefs = useCallback((nextState: FilePreviewSessionsState) => { + previewSessionsStateRef.current = nextState + previewSessionsRef.current = nextState.sessions + activePreviewSessionIdRef.current = nextState.activeSessionId + previewSessionRef.current = + nextState.activeSessionId !== null + ? (nextState.sessions[nextState.activeSessionId] ?? null) + : null + }, []) + + const applyPreviewSessionUpdate = useCallback( + (session: FilePreviewSession, options?: { activate?: boolean }) => { + const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, { + type: 'upsert', + session, + ...(options?.activate === false ? { activate: false } : {}), + }) + syncPreviewSessionRefs(nextState) + upsertPreviewSession(session, options) + return nextState + }, + [syncPreviewSessionRefs, upsertPreviewSession] + ) + + const applyCompletedPreviewSession = useCallback( + (session: FilePreviewSession) => { + const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, { + type: 'complete', + session, + }) + syncPreviewSessionRefs(nextState) + completePreviewSession(session) + return nextState + }, + [completePreviewSession, syncPreviewSessionRefs] + ) + + const removePreviewSessionImmediate = useCallback( + (sessionId: string) => { + const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, { + type: 'remove', + sessionId, + }) + syncPreviewSessionRefs(nextState) + removePreviewSession(sessionId) + return nextState + }, + [removePreviewSession, syncPreviewSessionRefs] + ) const [messageQueue, setMessageQueue] = useState([]) const messageQueueRef = useRef([]) @@ -711,55 +806,77 @@ export function useChat( >(async () => false) const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {}) - const cancelQueuedStreamingFileUpdate = useCallback(() => { - if (streamingFileFrameRef.current !== null) { - cancelAnimationFrame(streamingFileFrameRef.current) - streamingFileFrameRef.current = null - } - pendingStreamingFileRef.current = null - }, []) - - const setStreamingFileImmediate = useCallback( - (next: StreamingFilePreview | null) => { - cancelQueuedStreamingFileUpdate() - streamingFileRef.current = next - setStreamingFile(next) - }, - [cancelQueuedStreamingFileUpdate] - ) - const resetEphemeralPreviewState = useCallback( (options?: { removeStreamingResource?: boolean }) => { - setStreamingFileImmediate(null) - filePreviewSessionsRef.current.clear() - activeFilePreviewToolCallIdRef.current = null + syncPreviewSessionRefs(INITIAL_FILE_PREVIEW_SESSIONS_STATE) + resetPreviewSessions() if (options?.removeStreamingResource) { setResources((current) => current.filter((resource) => resource.id !== 'streaming-file')) } }, - [setStreamingFileImmediate] + [resetPreviewSessions, syncPreviewSessionRefs] ) - const queueStreamingFileUpdate = useCallback((next: StreamingFilePreview) => { - streamingFileRef.current = next - pendingStreamingFileRef.current = { value: next } - if (streamingFileFrameRef.current !== null) return - streamingFileFrameRef.current = requestAnimationFrame(() => { - streamingFileFrameRef.current = null - const pending = pendingStreamingFileRef.current - pendingStreamingFileRef.current = null - if (!pending) return - setStreamingFile(pending.value) - }) + const syncPreviewResourceChrome = useCallback((session: FilePreviewSession) => { + if (session.targetKind === 'new_file') { + setResources((current) => { + const existing = current.find((resource) => resource.id === 'streaming-file') + if (existing) { + return current.map((resource) => + resource.id === 'streaming-file' + ? { ...resource, title: session.fileName || 'Writing file...' } + : resource + ) + } + return [ + ...current, + { + type: 'file', + id: 'streaming-file', + title: session.fileName || 'Writing file...', + }, + ] + }) + setActiveResourceId('streaming-file') + return + } + + if (session.fileId) { + setResources((current) => current.filter((resource) => resource.id !== 'streaming-file')) + setActiveResourceId(session.fileId) + } }, []) + const seedPreviewSessions = useCallback( + (sessions: FilePreviewSession[]) => { + if (sessions.length === 0) { + return + } + + const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, { + type: 'hydrate', + sessions, + }) + syncPreviewSessionRefs(nextState) + hydratePreviewSessions(sessions) + const active = + nextState.activeSessionId !== null + ? (nextState.sessions[nextState.activeSessionId] ?? null) + : null + if (active) { + syncPreviewResourceChrome(active) + } + }, + [hydratePreviewSessions, syncPreviewResourceChrome, syncPreviewSessionRefs] + ) + const abortControllerRef = useRef(null) const streamReaderRef = useRef | null>(null) const chatIdRef = useRef(initialChatId) /** Panel/task selection — drives createNewChat + request chatId; may differ from chatIdRef while a stream is still finishing. */ const selectedChatIdRef = useRef(initialChatId) selectedChatIdRef.current = initialChatId - const appliedChatIdRef = useRef(undefined) + const appliedChatHistoryKeyRef = useRef(undefined) const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null) const streamIdRef = useRef(undefined) const lastCursorRef = useRef('0') @@ -887,7 +1004,7 @@ export function useChat( chatIdRef.current = initialChatId lastCursorRef.current = '0' setResolvedChatId(initialChatId) - appliedChatIdRef.current = undefined + appliedChatHistoryKeyRef.current = undefined setMessages([]) setError(null) setIsSending(false) @@ -905,7 +1022,7 @@ export function useChat( chatIdRef.current = undefined lastCursorRef.current = '0' setResolvedChatId(undefined) - appliedChatIdRef.current = undefined + appliedChatHistoryKeyRef.current = undefined abortControllerRef.current = null sendingRef.current = false setMessages([]) @@ -919,10 +1036,13 @@ export function useChat( }, [isHomePage, resetEphemeralPreviewState]) useEffect(() => { - if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return + if (!chatHistory) return + + const hydrationKey = buildChatHistoryHydrationKey(chatHistory) + if (appliedChatHistoryKeyRef.current === hydrationKey) return const activeStreamId = chatHistory.activeStreamId - appliedChatIdRef.current = chatHistory.id + appliedChatHistoryKeyRef.current = hydrationKey const mappedMessages = chatHistory.messages.map(toDisplayMessage) const shouldPreserveActiveStreamingMessage = sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current @@ -973,6 +1093,13 @@ export function useChat( setActiveResourceId(null) } + const snapshotPreviewSessions = Array.isArray(chatHistory.streamSnapshot?.previewSessions) + ? (chatHistory.streamSnapshot.previewSessions as FilePreviewSession[]) + : [] + if (snapshotPreviewSessions.length > 0) { + seedPreviewSessions(snapshotPreviewSessions) + } + if (activeStreamId && !sendingRef.current) { const gen = ++streamGenRef.current const abortController = new AbortController() @@ -998,6 +1125,7 @@ export function useChat( initialBatch: { success: true, events: snapshotEvents, + previewSessions: snapshotPreviewSessions, status: initialSnapshot?.status ?? 'unknown', }, afterCursor: String(snapshotEvents[snapshotEvents.length - 1]?.eventId ?? '0'), @@ -1026,7 +1154,13 @@ export function useChat( } reconnect() } - }, [chatHistory, workspaceId, queryClient, recoverPendingClientWorkflowTools]) + }, [ + chatHistory, + workspaceId, + queryClient, + recoverPendingClientWorkflowTools, + seedPreviewSessions, + ]) const processSSEStream = useCallback( async ( @@ -1285,126 +1419,129 @@ export function useChat( if (!id) break if (previewPhase) { - const sessions = filePreviewSessionsRef.current - const prevSession = sessions.get(id) ?? { + const prevSession = previewSessionsRef.current[id] + const target = asPayloadRecord(payload.target) + const targetKind = + payload.targetKind === 'new_file' || payload.targetKind === 'file_id' + ? (payload.targetKind as 'new_file' | 'file_id') + : target?.kind === 'new_file' || target?.kind === 'file_id' + ? (target.kind as 'new_file' | 'file_id') + : prevSession?.targetKind + const fileId = + typeof payload.fileId === 'string' + ? payload.fileId + : typeof target?.fileId === 'string' + ? target.fileId + : prevSession?.fileId + const fileName = + typeof payload.fileName === 'string' + ? payload.fileName + : typeof target?.fileName === 'string' + ? target.fileName + : (prevSession?.fileName ?? '') + const operation = + typeof payload.operation === 'string' ? payload.operation : prevSession?.operation + const edit = asPayloadRecord(payload.edit) ?? prevSession?.edit + const streamId = parsed.stream?.streamId ?? prevSession?.streamId ?? '' + const nextPreviewVersion = + typeof payload.previewVersion === 'number' && + Number.isFinite(payload.previewVersion) + ? payload.previewVersion + : (prevSession?.previewVersion ?? 0) + 1 + const baseSession: FilePreviewSession = { + schemaVersion: 1, + id, + streamId, toolCallId: id, - fileName: '', - content: '', + status: prevSession?.status ?? 'pending', + fileName, + ...(fileId ? { fileId } : {}), + ...(targetKind ? { targetKind } : {}), + ...(operation ? { operation } : {}), + ...(edit ? { edit } : {}), + previewText: prevSession?.previewText ?? '', + previewVersion: prevSession?.previewVersion ?? 0, + updatedAt: prevSession?.updatedAt ?? new Date().toISOString(), + ...(prevSession?.completedAt ? { completedAt: prevSession.completedAt } : {}), } if (previewPhase === 'file_preview_start') { - const nextSession: StreamingFilePreview = { - ...prevSession, - toolCallId: id, - ...(typeof payload.operation === 'string' - ? { operation: payload.operation } - : {}), - ...(typeof payload.fileId === 'string' - ? { fileId: payload.fileId, targetKind: 'file_id' as const } - : {}), + const nextSession: FilePreviewSession = { + ...baseSession, + status: 'pending', + updatedAt: new Date().toISOString(), } - sessions.set(id, nextSession) - activeFilePreviewToolCallIdRef.current = id if (nextSession.fileId) { setActiveResourceId(nextSession.fileId) } - setStreamingFileImmediate(nextSession) + applyPreviewSessionUpdate(nextSession) break } if (previewPhase === 'file_preview_target') { - const target = asPayloadRecord(payload.target) - const nextSession: StreamingFilePreview = { - ...prevSession, - operation: - typeof payload.operation === 'string' - ? payload.operation - : prevSession.operation, - targetKind: - target?.kind === 'new_file' || target?.kind === 'file_id' - ? (target.kind as 'new_file' | 'file_id') - : prevSession.targetKind, - fileId: typeof target?.fileId === 'string' ? target.fileId : prevSession.fileId, - fileName: - typeof target?.fileName === 'string' ? target.fileName : prevSession.fileName, + const nextSession: FilePreviewSession = { + ...baseSession, + updatedAt: new Date().toISOString(), } - sessions.set(id, nextSession) - activeFilePreviewToolCallIdRef.current = id - - if (nextSession.targetKind === 'new_file') { - const hasStreamingResource = resourcesRef.current.some( - (resource) => resource.id === 'streaming-file' - ) - if (!hasStreamingResource) { - addResource({ - type: 'file', - id: 'streaming-file', - title: nextSession.fileName || 'Writing file...', - }) - setActiveResourceId('streaming-file') - } - } else if (nextSession.fileId) { - setResources((rs) => rs.filter((resource) => resource.id !== 'streaming-file')) - if ( - activeResourceIdRef.current === null || - activeResourceIdRef.current === 'streaming-file' - ) { - setActiveResourceId(nextSession.fileId) - } + const nextState = applyPreviewSessionUpdate(nextSession) + const activePreview = + nextState.activeSessionId !== null + ? (nextState.sessions[nextState.activeSessionId] ?? null) + : null + if (activePreview?.id === nextSession.id) { + syncPreviewResourceChrome(activePreview) } - - setStreamingFileImmediate(nextSession) break } if (previewPhase === 'file_preview_edit_meta') { - const nextSession: StreamingFilePreview = { - ...prevSession, - edit: asPayloadRecord(payload.edit), + const nextSession: FilePreviewSession = { + ...baseSession, + status: prevSession?.status ?? 'pending', + updatedAt: new Date().toISOString(), } - sessions.set(id, nextSession) - activeFilePreviewToolCallIdRef.current = id - setStreamingFileImmediate(nextSession) + applyPreviewSessionUpdate(nextSession) break } if (previewPhase === 'file_preview_content') { const content = typeof payload.content === 'string' ? payload.content : '' - const contentMode = - typeof payload.contentMode === 'string' ? payload.contentMode : undefined - let nextContent: string - if (contentMode === 'snapshot') { - nextContent = content - } else if (contentMode === 'delta') { - nextContent = (prevSession.content ?? '') + content - } else { - const isAppendOp = prevSession.operation === 'append' - const prevContent = streamingFileRef.current?.content ?? '' - nextContent = isAppendOp ? prevContent + content : content + const contentMode = payload.contentMode === 'delta' ? 'delta' : 'snapshot' + const nextPreviewText = + contentMode === 'delta' ? (prevSession?.previewText ?? '') + content : content + const nextSession: FilePreviewSession = { + ...baseSession, + status: 'streaming', + previewText: nextPreviewText, + previewVersion: nextPreviewVersion, + updatedAt: new Date().toISOString(), } - const nextSession: StreamingFilePreview = { - ...prevSession, - content: nextContent, - } - sessions.set(id, nextSession) - activeFilePreviewToolCallIdRef.current = id + applyPreviewSessionUpdate(nextSession) const previewToolIdx = toolMap.get(id) if (previewToolIdx !== undefined && blocks[previewToolIdx].toolCall) { blocks[previewToolIdx].toolCall!.status = 'executing' } - queueStreamingFileUpdate(nextSession) break } if (previewPhase === 'file_preview_complete') { - const fileId = - typeof payload.fileId === 'string' ? payload.fileId : prevSession.fileId const resultData = asPayloadRecord(payload.output) - sessions.delete(id) - activeFilePreviewToolCallIdRef.current = null + const completedAt = new Date().toISOString() + const nextSession: FilePreviewSession = { + ...baseSession, + status: 'complete', + previewVersion: + typeof payload.previewVersion === 'number' && + Number.isFinite(payload.previewVersion) + ? payload.previewVersion + : (prevSession?.previewVersion ?? 0), + updatedAt: completedAt, + completedAt, + } + const nextState = applyCompletedPreviewSession(nextSession) if (fileId && resultData?.id) { - const fileName = (resultData.name as string) ?? prevSession.fileName ?? 'File' + const fileName = (resultData.name as string) ?? nextSession.fileName ?? 'File' const fileResource = { type: 'file' as const, id: fileId, title: fileName } setResources((rs) => { const without = rs.filter((r) => r.id !== 'streaming-file') @@ -1414,11 +1551,21 @@ export function useChat( return [...without, fileResource] }) setActiveResourceId(fileId) + if (nextSession.previewText) { + queryClient.setQueryData( + workspaceFilesKeys.content(workspaceId, fileId, 'text'), + nextSession.previewText + ) + } invalidateResourceQueries(queryClient, workspaceId, 'file', fileId) - } - - if (!activeSubagent || activeSubagent !== FileTool.id) { - setStreamingFileImmediate(null) + } else { + const activePreview = + nextState.activeSessionId !== null + ? (nextState.sessions[nextState.activeSessionId] ?? null) + : null + if (activePreview) { + syncPreviewResourceChrome(activePreview) + } } break } @@ -1544,12 +1691,8 @@ export function useChat( (tc.name === WorkspaceFile.id || tc.name === 'edit_content') && !shouldKeepWorkspacePreviewOpen ) { - filePreviewSessionsRef.current.delete(id) - if (activeFilePreviewToolCallIdRef.current === id) { - activeFilePreviewToolCallIdRef.current = null - if (!activeSubagent || activeSubagent !== FileTool.id) { - setStreamingFileImmediate(null) - } + if (tc.name === WorkspaceFile.id) { + removePreviewSessionImmediate(id) } const fileResource = extractedResources.find((r) => r.type === 'file') if (fileResource) { @@ -1591,7 +1734,7 @@ export function useChat( if (name === 'edit_content') { const parentToolCallId = - activeFilePreviewToolCallIdRef.current ?? streamingFileRef.current?.toolCallId + activePreviewSessionIdRef.current ?? previewSessionRef.current?.toolCallId const parentIdx = parentToolCallId !== null && parentToolCallId !== undefined ? toolMap.get(parentToolCallId) @@ -1767,20 +1910,24 @@ export function useChat( blocks.push({ type: 'subagent', content: name }) } if (name === FileTool.id && !isSameActiveSubagent) { - const emptyFile: StreamingFilePreview = { + applyPreviewSessionUpdate({ + schemaVersion: 1, + id: parentToolCallId || 'file-preview', + streamId: streamIdRef.current ?? '', toolCallId: parentToolCallId || 'file-preview', + status: 'pending', fileName: '', - content: '', - } - setStreamingFileImmediate(emptyFile) + previewText: '', + previewVersion: 0, + updatedAt: new Date().toISOString(), + }) } flush() } else if (spanEvent === MothershipStreamV1SpanLifecycleEvent.end) { if (isPendingPause) { break } - if (streamingFileRef.current && !activeFilePreviewToolCallIdRef.current) { - setStreamingFileImmediate(null) + if (previewSessionRef.current && !activePreviewSessionIdRef.current) { const lastFileResource = resourcesRef.current.find( (r) => r.type === 'file' && r.id !== 'streaming-file' ) @@ -1820,7 +1967,16 @@ export function useChat( } return { sawStreamError, sawComplete: sawCompleteEvent } }, - [workspaceId, queryClient, addResource, removeResource] + [ + workspaceId, + queryClient, + addResource, + removeResource, + applyPreviewSessionUpdate, + applyCompletedPreviewSession, + removePreviewSessionImmediate, + syncPreviewResourceChrome, + ] ) processSSEStreamRef.current = processSSEStream @@ -1859,9 +2015,13 @@ export function useChat( if (!response.ok) { throw new Error(`Stream resume batch failed: ${response.status}`) } - return response.json() + const batch = (await response.json()) as StreamBatchResponse + if (Array.isArray(batch.previewSessions) && batch.previewSessions.length > 0) { + seedPreviewSessions(batch.previewSessions) + } + return batch }, - [] + [seedPreviewSessions] ) const attachToExistingStream = useCallback( @@ -2492,13 +2652,12 @@ export function useChat( useEffect(() => { return () => { - cancelQueuedStreamingFileUpdate() streamReaderRef.current = null abortControllerRef.current = null streamGenRef.current++ sendingRef.current = false } - }, [cancelQueuedStreamingFileUpdate]) + }, []) return { messages, @@ -2518,7 +2677,7 @@ export function useChat( removeFromQueue, sendNow, editQueuedMessage, - streamingFile, + previewSession, genericResourceData, } } diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx new file mode 100644 index 0000000000..70209af163 --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.test.tsx @@ -0,0 +1,147 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import type { FilePreviewSession } from '@/lib/copilot/request/session' +import { + INITIAL_FILE_PREVIEW_SESSIONS_STATE, + reduceFilePreviewSessions, +} from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions' + +function createSession( + overrides: Partial & Pick +): FilePreviewSession { + return { + schemaVersion: 1, + id: overrides.id, + streamId: overrides.streamId ?? 'stream-1', + toolCallId: overrides.toolCallId, + status: overrides.status ?? 'streaming', + fileName: overrides.fileName ?? `${overrides.id}.md`, + previewText: overrides.previewText ?? '', + previewVersion: overrides.previewVersion ?? 1, + updatedAt: overrides.updatedAt ?? '2026-04-10T00:00:00.000Z', + ...(overrides.fileId ? { fileId: overrides.fileId } : {}), + ...(overrides.targetKind ? { targetKind: overrides.targetKind } : {}), + ...(overrides.operation ? { operation: overrides.operation } : {}), + ...(overrides.edit ? { edit: overrides.edit } : {}), + ...(overrides.completedAt ? { completedAt: overrides.completedAt } : {}), + } +} + +describe('reduceFilePreviewSessions', () => { + it('hydrates the latest active preview session', () => { + const state = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, { + type: 'hydrate', + sessions: [ + createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + previewVersion: 1, + updatedAt: '2026-04-10T00:00:00.000Z', + }), + createSession({ + id: 'preview-2', + toolCallId: 'preview-2', + previewVersion: 2, + updatedAt: '2026-04-10T00:00:01.000Z', + previewText: 'latest', + }), + ], + }) + + expect(state.activeSessionId).toBe('preview-2') + expect(state.sessions['preview-2']?.previewText).toBe('latest') + }) + + it('drops the active session when it completes and promotes the next active session', () => { + const hydratedState = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, { + type: 'hydrate', + sessions: [ + createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + previewVersion: 1, + updatedAt: '2026-04-10T00:00:00.000Z', + }), + createSession({ + id: 'preview-2', + toolCallId: 'preview-2', + previewVersion: 2, + updatedAt: '2026-04-10T00:00:01.000Z', + }), + ], + }) + const completedState = reduceFilePreviewSessions(hydratedState, { + type: 'complete', + session: createSession({ + id: 'preview-2', + toolCallId: 'preview-2', + status: 'complete', + previewVersion: 3, + updatedAt: '2026-04-10T00:00:02.000Z', + completedAt: '2026-04-10T00:00:02.000Z', + }), + }) + + expect(completedState.activeSessionId).toBe('preview-1') + expect(completedState.sessions['preview-1']?.id).toBe('preview-1') + }) + + it('clears active session when the only session completes', () => { + const onlyStreaming = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, { + type: 'upsert', + session: createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + previewVersion: 2, + updatedAt: '2026-04-10T00:00:01.000Z', + previewText: 'final', + }), + }) + + const completed = reduceFilePreviewSessions(onlyStreaming, { + type: 'complete', + session: createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + status: 'complete', + previewVersion: 3, + updatedAt: '2026-04-10T00:00:02.000Z', + completedAt: '2026-04-10T00:00:02.000Z', + previewText: 'final', + }), + }) + + expect(completed.activeSessionId).toBeNull() + expect(completed.sessions['preview-1']?.status).toBe('complete') + }) + + it('ignores stale complete events for a newer active session', () => { + const activeState = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, { + type: 'upsert', + session: createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + previewVersion: 3, + updatedAt: '2026-04-10T00:00:03.000Z', + }), + }) + + const staleCompleteState = reduceFilePreviewSessions(activeState, { + type: 'complete', + session: createSession({ + id: 'preview-1', + toolCallId: 'preview-1', + status: 'complete', + previewVersion: 2, + updatedAt: '2026-04-10T00:00:02.000Z', + completedAt: '2026-04-10T00:00:02.000Z', + }), + }) + + expect(staleCompleteState.activeSessionId).toBe('preview-1') + expect(staleCompleteState.sessions['preview-1']?.status).toBe('streaming') + expect(staleCompleteState.sessions['preview-1']?.previewVersion).toBe(3) + }) +}) diff --git a/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts new file mode 100644 index 0000000000..075b6c549b --- /dev/null +++ b/apps/sim/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions.ts @@ -0,0 +1,187 @@ +import { useCallback, useMemo, useReducer } from 'react' +import type { FilePreviewSession } from '@/lib/copilot/request/session' + +export interface FilePreviewSessionsState { + activeSessionId: string | null + sessions: Record +} + +export type FilePreviewSessionsAction = + | { type: 'hydrate'; sessions: FilePreviewSession[] } + | { type: 'upsert'; session: FilePreviewSession; activate?: boolean } + | { type: 'complete'; session: FilePreviewSession } + | { type: 'remove'; sessionId: string } + | { type: 'reset' } + +export const INITIAL_FILE_PREVIEW_SESSIONS_STATE: FilePreviewSessionsState = { + activeSessionId: null, + sessions: {}, +} + +export function shouldReplaceSession( + current: FilePreviewSession | undefined, + next: FilePreviewSession +): boolean { + if (!current) return true + if (next.previewVersion !== current.previewVersion) { + return next.previewVersion > current.previewVersion + } + return next.updatedAt >= current.updatedAt +} + +export function pickActiveSessionId( + sessions: Record, + preferredId?: string | null +): string | null { + if (preferredId && sessions[preferredId]?.status !== 'complete') { + return preferredId + } + + let latestActive: FilePreviewSession | null = null + for (const session of Object.values(sessions)) { + if (session.status === 'complete') continue + if (!latestActive || shouldReplaceSession(latestActive, session)) { + latestActive = session + } + } + + return latestActive?.id ?? null +} + +export function reduceFilePreviewSessions( + state: FilePreviewSessionsState, + action: FilePreviewSessionsAction +): FilePreviewSessionsState { + switch (action.type) { + case 'hydrate': { + if (action.sessions.length === 0) { + return state + } + + const nextSessions = { ...state.sessions } + for (const session of action.sessions) { + if (shouldReplaceSession(nextSessions[session.id], session)) { + nextSessions[session.id] = session + } + } + + return { + sessions: nextSessions, + activeSessionId: pickActiveSessionId(nextSessions, state.activeSessionId), + } + } + + case 'upsert': { + if (!shouldReplaceSession(state.sessions[action.session.id], action.session)) { + return state + } + + const nextSessions = { + ...state.sessions, + [action.session.id]: action.session, + } + + return { + sessions: nextSessions, + activeSessionId: + action.activate === false + ? pickActiveSessionId(nextSessions, state.activeSessionId) + : action.session.status === 'complete' + ? pickActiveSessionId(nextSessions, state.activeSessionId) + : action.session.id, + } + } + + case 'complete': { + if (!shouldReplaceSession(state.sessions[action.session.id], action.session)) { + return state + } + + const nextSessions = { + ...state.sessions, + [action.session.id]: action.session, + } + + return { + sessions: nextSessions, + activeSessionId: + state.activeSessionId === action.session.id + ? pickActiveSessionId(nextSessions, null) + : state.activeSessionId, + } + } + + case 'remove': { + if (!state.sessions[action.sessionId]) { + return state + } + + const nextSessions = { ...state.sessions } + delete nextSessions[action.sessionId] + + return { + sessions: nextSessions, + activeSessionId: + state.activeSessionId === action.sessionId + ? pickActiveSessionId(nextSessions, null) + : state.activeSessionId, + } + } + + case 'reset': + return INITIAL_FILE_PREVIEW_SESSIONS_STATE + + default: + return state + } +} + +export function useFilePreviewSessions() { + const [state, dispatch] = useReducer( + reduceFilePreviewSessions, + INITIAL_FILE_PREVIEW_SESSIONS_STATE + ) + + const previewSession = useMemo( + () => (state.activeSessionId ? (state.sessions[state.activeSessionId] ?? null) : null), + [state.activeSessionId, state.sessions] + ) + + const hydratePreviewSessions = useCallback((sessions: FilePreviewSession[]) => { + dispatch({ type: 'hydrate', sessions }) + }, []) + + const upsertPreviewSession = useCallback( + (session: FilePreviewSession, options?: { activate?: boolean }) => { + dispatch({ + type: 'upsert', + session, + ...(options?.activate === false ? { activate: false } : {}), + }) + }, + [] + ) + + const completePreviewSession = useCallback((session: FilePreviewSession) => { + dispatch({ type: 'complete', session }) + }, []) + + const removePreviewSession = useCallback((sessionId: string) => { + dispatch({ type: 'remove', sessionId }) + }, []) + + const resetPreviewSessions = useCallback(() => { + dispatch({ type: 'reset' }) + }, []) + + return { + previewSession, + previewSessionsById: state.sessions, + activePreviewSessionId: state.activeSessionId, + hydratePreviewSessions, + upsertPreviewSession, + completePreviewSession, + removePreviewSession, + resetPreviewSessions, + } +} diff --git a/apps/sim/hooks/queries/tasks.ts b/apps/sim/hooks/queries/tasks.ts index 4b84635414..1980b7fd39 100644 --- a/apps/sim/hooks/queries/tasks.ts +++ b/apps/sim/hooks/queries/tasks.ts @@ -1,6 +1,7 @@ import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query' import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message' import { normalizeMessage } from '@/lib/copilot/chat/persisted-message' +import type { FilePreviewSession } from '@/lib/copilot/request/session' import type { StreamBatchEvent } from '@/lib/copilot/request/session/types' import type { MothershipResource } from '@/app/workspace/[workspaceId]/home/types' @@ -18,7 +19,11 @@ export interface TaskChatHistory { messages: PersistedMessage[] activeStreamId: string | null resources: MothershipResource[] - streamSnapshot?: { events: StreamBatchEvent[]; status: string } | null + streamSnapshot?: { + events: StreamBatchEvent[] + previewSessions: FilePreviewSession[] + status: string + } | null } export const taskKeys = { @@ -112,6 +117,7 @@ export async function fetchChatHistory( : [], activeStreamId: chat.activeStreamId || null, resources: Array.isArray(chat.resources) ? chat.resources : [], + streamSnapshot: chat.streamSnapshot || null, } } diff --git a/apps/sim/lib/copilot/request/go/stream.test.ts b/apps/sim/lib/copilot/request/go/stream.test.ts new file mode 100644 index 0000000000..2e3fb54973 --- /dev/null +++ b/apps/sim/lib/copilot/request/go/stream.test.ts @@ -0,0 +1,52 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { + buildPreviewContentUpdate, + decodeJsonStringPrefix, + extractEditContent, +} from '@/lib/copilot/request/go/stream' + +describe('copilot go stream helpers', () => { + it('decodes complete escapes and stops at incomplete unicode escapes', () => { + expect(decodeJsonStringPrefix('hello\\nworld')).toBe('hello\nworld') + expect(decodeJsonStringPrefix('emoji \\u263A')).toBe('emoji ☺') + expect(decodeJsonStringPrefix('partial \\u26')).toBe('partial ') + }) + + it('extracts the streamed edit_content prefix from partial JSON', () => { + expect(extractEditContent('{"content":"hello\\nwor')).toBe('hello\nwor') + expect(extractEditContent('{"content":"tab\\tvalue"}')).toBe('tab\tvalue') + }) + + it('emits full snapshots for append (sidebar viewer uses replace mode; no delta merge)', () => { + expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'append')).toEqual({ + content: 'hello world', + contentMode: 'snapshot', + lastSnapshotAt: 200, + }) + }) + + it('emits deltas for update when the preview extends the previous text', () => { + expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'update')).toEqual({ + content: ' world', + contentMode: 'delta', + lastSnapshotAt: 100, + }) + }) + + it('falls back to snapshots for patches and divergent content', () => { + expect(buildPreviewContentUpdate('hello', 'goodbye', 100, 200, 'update')).toEqual({ + content: 'goodbye', + contentMode: 'snapshot', + lastSnapshotAt: 200, + }) + + expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'patch')).toEqual({ + content: 'hello world', + contentMode: 'snapshot', + lastSnapshotAt: 200, + }) + }) +}) diff --git a/apps/sim/lib/copilot/request/go/stream.ts b/apps/sim/lib/copilot/request/go/stream.ts index fecd6e787d..0a80430508 100644 --- a/apps/sim/lib/copilot/request/go/stream.ts +++ b/apps/sim/lib/copilot/request/go/stream.ts @@ -12,7 +12,14 @@ import { sseHandlers, subAgentHandlers, } from '@/lib/copilot/request/handlers' -import { eventToStreamEvent, isEventRecord } from '@/lib/copilot/request/session' +import { + createFilePreviewSession, + eventToStreamEvent, + type FilePreviewContentMode, + type FilePreviewSession, + isEventRecord, + upsertFilePreviewSession, +} from '@/lib/copilot/request/session' import { shouldSkipToolCallEvent, shouldSkipToolResultEvent } from '@/lib/copilot/request/sse-utils' import type { ExecutionContext, @@ -20,7 +27,14 @@ import type { StreamEvent, StreamingContext, } from '@/lib/copilot/request/types' -import { clearIntentsForWorkspace } from '@/lib/copilot/tools/server/files/file-intent-store' +import { + clearIntentsForWorkspace, + peekFileIntent, +} from '@/lib/copilot/tools/server/files/file-intent-store' +import { + buildFilePreviewText, + loadWorkspaceFileTextForPreview, +} from '@/lib/copilot/tools/server/files/file-preview' const logger = createLogger('CopilotGoStream') @@ -38,11 +52,20 @@ type EditContentStreamState = { lastContentSnapshot?: string } +type FilePreviewStreamState = { + session: FilePreviewSession + lastEmittedPreviewText: string + lastSnapshotAt: number +} + +const PATCH_PREVIEW_SNAPSHOT_INTERVAL_MS = 80 +const DELTA_PREVIEW_CHECKPOINT_INTERVAL_MS = 1000 + /** * Decode a prefix of a JSON-encoded string value, handling escape sequences * that may be incomplete at the end of a streaming chunk. */ -function decodeJsonStringPrefix(input: string): string { +export function decodeJsonStringPrefix(input: string): string { let output = '' for (let i = 0; i < input.length; i++) { const ch = input[i] @@ -109,7 +132,7 @@ function decodeJsonStringPrefix(input: string): string { * Since edit_content has a single field `content`, the JSON is always * `{"content":"..."}`. We find `"content":"` and decode everything after. */ -function extractEditContent(raw: string): string { +export function extractEditContent(raw: string): string { const marker = '"content":' const idx = raw.indexOf(marker) if (idx === -1) return '' @@ -130,6 +153,88 @@ function extractEditContent(raw: string): string { return decodeJsonStringPrefix(inner) } +function isContentOperation( + operation: string | undefined +): operation is 'append' | 'update' | 'patch' { + return operation === 'append' || operation === 'update' || operation === 'patch' +} + +function isDocFormat(fileName: string | undefined): boolean { + return /\.(pptx|docx|pdf)$/i.test(fileName ?? '') +} + +function buildPreviewSessionFromIntent( + streamId: string, + intent: FileIntent, + current?: FilePreviewSession +): FilePreviewSession { + return createFilePreviewSession({ + streamId, + toolCallId: intent.toolCallId, + fileName: intent.target.fileName ?? current?.fileName, + ...(intent.target.fileId ? { fileId: intent.target.fileId } : {}), + ...(intent.target.kind === 'new_file' || intent.target.kind === 'file_id' + ? { targetKind: intent.target.kind } + : {}), + operation: intent.operation, + ...(intent.edit ? { edit: intent.edit } : {}), + ...(typeof current?.baseContent === 'string' ? { baseContent: current.baseContent } : {}), + previewText: current?.previewText ?? '', + previewVersion: current?.previewVersion ?? 0, + status: current?.status ?? 'pending', + completedAt: current?.completedAt, + }) +} + +async function persistFilePreviewSession(session: FilePreviewSession): Promise { + try { + await upsertFilePreviewSession(session) + } catch (error) { + logger.warn('Failed to persist file preview session', { + streamId: session.streamId, + toolCallId: session.toolCallId, + previewVersion: session.previewVersion, + error: error instanceof Error ? error.message : String(error), + }) + } +} + +/** + * Chooses snapshot vs delta emission for `file_preview_content`. + * + * **Append** always uses snapshot mode: the sidebar `FileViewer` gets `streamingMode: 'replace'` + * and must receive the full composed document every tick. Delta mode saved bandwidth but required + * merging chunks with `prevSession.previewText`, which could desync and look like mid-stream overwrites. + */ +export function buildPreviewContentUpdate( + previousText: string, + nextText: string, + lastSnapshotAt: number, + now: number, + operation: string | undefined +): { content: string; contentMode: FilePreviewContentMode; lastSnapshotAt: number } { + const shouldForceSnapshot = + previousText.length === 0 || + !nextText.startsWith(previousText) || + operation === 'patch' || + operation === 'append' || + now - lastSnapshotAt >= DELTA_PREVIEW_CHECKPOINT_INTERVAL_MS + + if (shouldForceSnapshot) { + return { + content: nextText, + contentMode: 'snapshot', + lastSnapshotAt: now, + } + } + + return { + content: nextText.slice(previousText.length), + contentMode: 'delta', + lastSnapshotAt, + } +} + export class CopilotBackendError extends Error { status?: number body?: string @@ -170,10 +275,11 @@ export interface StreamLoopOptions extends OrchestratorOptions { * Handles: fetch -> parse -> normalize -> dedupe -> subagent routing -> handler dispatch. * Callers provide the fetch URL/options and can intercept events via onBeforeDispatch. * - * File preview streaming uses an intent-based approach: - * 1. workspace_file phase:call → store intent (operation, target, edit metadata) - * 2. edit_content phase:args_delta → stream content using stored intent - * 3. edit_content phase:call → consume and clear intent + * File preview streaming: + * 1. workspace_file phase:call → active intent + load base file text for append/patch (awaited here so + * it cannot race edit_content args_delta; tool executor still stores Redis intent async) + * 2. edit_content phase:args_delta → stream preview from base + streamed content + * 3. edit_content phase:result → complete preview; edit_content tool consumes Redis intent */ export async function runStreamLoop( fetchUrl: string, @@ -184,6 +290,7 @@ export async function runStreamLoop( ): Promise { const { timeout = ORCHESTRATION_TIMEOUT_MS, abortSignal } = options const editContentState = new Map() + const filePreviewState = new Map() const fetchSpan = context.trace.startSpan( `HTTP Request → ${new URL(fetchUrl).pathname}`, @@ -272,8 +379,7 @@ export async function runStreamLoop( const fileId = target.fileId as string | undefined const fileName = target.fileName as string | undefined - const isContentOp = - operation === 'append' || operation === 'update' || operation === 'patch' + const isContentOp = isContentOperation(operation) if (context.activeFileIntent && isContentOp) { logger.warn( 'Orphaned workspace_file intent: content-op workspace_file arrived without edit_content for prior intent', @@ -285,7 +391,10 @@ export async function runStreamLoop( } ) if (execContext.workspaceId) { - const cleared = clearIntentsForWorkspace(execContext.workspaceId) + const cleared = await clearIntentsForWorkspace(execContext.workspaceId, { + chatId: execContext.chatId, + messageId: execContext.messageId, + }) if (cleared > 0) { logger.warn('Cleared orphaned execution intents from store', { cleared, @@ -308,8 +417,32 @@ export async function runStreamLoop( ...(edit ? { edit } : {}), } - const isDocFormat = /\.(pptx|docx|pdf)$/i.test(fileName ?? '') - if (!isDocFormat && isContentOp) { + if (!isDocFormat(fileName) && isContentOp) { + let previewBaseContent: string | undefined + if ( + execContext.workspaceId && + fileId && + (operation === 'append' || operation === 'patch') + ) { + previewBaseContent = await loadWorkspaceFileTextForPreview( + execContext.workspaceId, + fileId + ) + } + + let session = buildPreviewSessionFromIntent( + raw.stream.streamId, + context.activeFileIntent + ) + if (previewBaseContent !== undefined) { + session = { ...session, baseContent: previewBaseContent } + } + filePreviewState.set(toolCallId, { + session, + lastEmittedPreviewText: '', + lastSnapshotAt: 0, + }) + await persistFilePreviewSession(session) const scope = streamEvent.scope ? { scope: streamEvent.scope } : {} await options.onEvent?.({ type: MothershipStreamV1EventType.tool, @@ -353,6 +486,66 @@ export async function runStreamLoop( } } + if ( + streamEvent.type === MothershipStreamV1EventType.tool && + streamEvent.payload.phase === MothershipStreamV1ToolPhase.result && + streamEvent.payload.toolName === 'workspace_file' && + context.activeFileIntent && + isContentOperation(context.activeFileIntent.operation) && + context.activeFileIntent.operation === 'patch' && + context.activeFileIntent.edit?.strategy === 'anchored' && + context.activeFileIntent.edit?.mode === 'delete_between' && + execContext.workspaceId && + context.activeFileIntent.target.fileId && + !isDocFormat(context.activeFileIntent.target.fileName) + ) { + const currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId) + const previewText = buildFilePreviewText({ + operation: 'patch', + streamedContent: '', + existingContent: currentPreview?.session.baseContent, + edit: currentPreview?.session.edit as Record | undefined, + }) + + if (previewText !== undefined) { + const baseSession = buildPreviewSessionFromIntent( + raw.stream.streamId, + context.activeFileIntent, + currentPreview?.session + ) + const nextSession: FilePreviewSession = { + ...baseSession, + status: 'streaming', + previewText, + previewVersion: (currentPreview?.session.previewVersion ?? 0) + 1, + updatedAt: new Date().toISOString(), + } + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: nextSession, + lastEmittedPreviewText: previewText, + lastSnapshotAt: Date.now(), + }) + await persistFilePreviewSession(nextSession) + await options.onEvent?.({ + type: MothershipStreamV1EventType.tool, + payload: { + toolCallId: nextSession.toolCallId, + toolName: 'workspace_file', + previewPhase: 'file_preview_content', + content: previewText, + contentMode: 'snapshot', + previewVersion: nextSession.previewVersion, + fileName: nextSession.fileName, + ...(nextSession.fileId ? { fileId: nextSession.fileId } : {}), + ...(nextSession.targetKind ? { targetKind: nextSession.targetKind } : {}), + ...(nextSession.operation ? { operation: nextSession.operation } : {}), + ...(nextSession.edit ? { edit: nextSession.edit } : {}), + }, + ...(streamEvent.scope ? { scope: streamEvent.scope } : {}), + }) + } + } + // ── edit_content phase:args_delta → stream content using stored intent ── if ( streamEvent.type === MothershipStreamV1EventType.tool && @@ -367,24 +560,129 @@ export async function runStreamLoop( state.raw += delta if (context.activeFileIntent) { - const isDocFormat = /\.(pptx|docx|pdf)$/i.test( - context.activeFileIntent.target.fileName ?? '' - ) - if (!isDocFormat) { + if (!isDocFormat(context.activeFileIntent.target.fileName)) { const streamedContent = extractEditContent(state.raw) if (streamedContent !== (state.lastContentSnapshot ?? '')) { state.lastContentSnapshot = streamedContent - await options.onEvent?.({ - type: MothershipStreamV1EventType.tool, - payload: { - toolCallId: context.activeFileIntent.toolCallId, - toolName: 'workspace_file', - previewPhase: 'file_preview_content', - content: streamedContent, - contentMode: 'snapshot', - }, - ...(streamEvent.scope ? { scope: streamEvent.scope } : {}), + let currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId) ?? { + session: buildPreviewSessionFromIntent( + raw.stream.streamId, + context.activeFileIntent + ), + lastEmittedPreviewText: '', + lastSnapshotAt: 0, + } + + /** + * Fallback: primary base is set on `workspace_file` call via + * {@link loadWorkspaceFileTextForPreview}. This only runs when that load failed or + * hydrated sessions lack `baseContent`, once Redis intent is available. + */ + if ( + currentPreview.session.baseContent === undefined && + (context.activeFileIntent.operation === 'append' || + context.activeFileIntent.operation === 'patch') && + execContext.workspaceId && + context.activeFileIntent.target.fileId + ) { + const intentBase = await peekFileIntent( + execContext.workspaceId, + context.activeFileIntent.target.fileId, + { + chatId: execContext.chatId, + messageId: execContext.messageId, + } + ) + if (typeof intentBase?.existingContent === 'string') { + const seededSession: FilePreviewSession = { + ...currentPreview.session, + baseContent: intentBase.existingContent, + ...(intentBase.edit + ? { edit: intentBase.edit as Record } + : {}), + } + currentPreview = { + ...currentPreview, + session: seededSession, + } + filePreviewState.set(context.activeFileIntent.toolCallId, currentPreview) + await persistFilePreviewSession(seededSession) + } + } + + const previewText = buildFilePreviewText({ + operation: context.activeFileIntent.operation as 'append' | 'update' | 'patch', + streamedContent, + existingContent: currentPreview.session.baseContent, + edit: currentPreview.session.edit as Record | undefined, }) + + if (previewText !== undefined) { + const baseSession = buildPreviewSessionFromIntent( + raw.stream.streamId, + context.activeFileIntent, + currentPreview?.session + ) + const now = Date.now() + const nextSession: FilePreviewSession = { + ...baseSession, + status: 'streaming', + previewText, + previewVersion: (currentPreview?.session.previewVersion ?? 0) + 1, + updatedAt: new Date(now).toISOString(), + } + + await persistFilePreviewSession(nextSession) + + if ( + nextSession.operation === 'patch' && + now - (currentPreview?.lastSnapshotAt ?? 0) < PATCH_PREVIEW_SNAPSHOT_INTERVAL_MS + ) { + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: nextSession, + lastEmittedPreviewText: currentPreview?.lastEmittedPreviewText ?? '', + lastSnapshotAt: currentPreview?.lastSnapshotAt ?? 0, + }) + } else { + const previewUpdate = buildPreviewContentUpdate( + currentPreview?.lastEmittedPreviewText ?? '', + nextSession.previewText, + currentPreview?.lastSnapshotAt ?? 0, + now, + nextSession.operation + ) + + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: nextSession, + lastEmittedPreviewText: nextSession.previewText, + lastSnapshotAt: previewUpdate.lastSnapshotAt, + }) + + await options.onEvent?.({ + type: MothershipStreamV1EventType.tool, + payload: { + toolCallId: nextSession.toolCallId, + toolName: 'workspace_file', + previewPhase: 'file_preview_content', + content: previewUpdate.content, + contentMode: previewUpdate.contentMode, + previewVersion: nextSession.previewVersion, + fileName: nextSession.fileName, + ...(nextSession.fileId ? { fileId: nextSession.fileId } : {}), + ...(nextSession.targetKind ? { targetKind: nextSession.targetKind } : {}), + ...(nextSession.operation ? { operation: nextSession.operation } : {}), + ...(nextSession.edit ? { edit: nextSession.edit } : {}), + }, + ...(streamEvent.scope ? { scope: streamEvent.scope } : {}), + }) + } + } else { + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: currentPreview.session, + lastEmittedPreviewText: currentPreview.lastEmittedPreviewText, + lastSnapshotAt: currentPreview.lastSnapshotAt, + }) + } } } } @@ -411,6 +709,57 @@ export async function runStreamLoop( streamEvent.payload.toolName === 'edit_content' && context.activeFileIntent ) { + const currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId) + const completedAt = new Date().toISOString() + + if ( + currentPreview && + currentPreview.lastEmittedPreviewText !== currentPreview.session.previewText && + currentPreview.session.previewText.length > 0 + ) { + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: currentPreview.session, + lastEmittedPreviewText: currentPreview.session.previewText, + lastSnapshotAt: Date.now(), + }) + await options.onEvent?.({ + type: MothershipStreamV1EventType.tool, + payload: { + toolCallId: currentPreview.session.toolCallId, + toolName: 'workspace_file', + previewPhase: 'file_preview_content', + content: currentPreview.session.previewText, + contentMode: 'snapshot', + previewVersion: currentPreview.session.previewVersion, + fileName: currentPreview.session.fileName, + ...(currentPreview.session.fileId ? { fileId: currentPreview.session.fileId } : {}), + ...(currentPreview.session.targetKind + ? { targetKind: currentPreview.session.targetKind } + : {}), + ...(currentPreview.session.operation + ? { operation: currentPreview.session.operation } + : {}), + ...(currentPreview.session.edit ? { edit: currentPreview.session.edit } : {}), + }, + ...(streamEvent.scope ? { scope: streamEvent.scope } : {}), + }) + } + + if (currentPreview) { + const completedSession: FilePreviewSession = { + ...currentPreview.session, + status: 'complete', + updatedAt: completedAt, + completedAt, + } + filePreviewState.set(context.activeFileIntent.toolCallId, { + session: completedSession, + lastEmittedPreviewText: completedSession.previewText, + lastSnapshotAt: Date.now(), + }) + await persistFilePreviewSession(completedSession) + } + await options.onEvent?.({ type: MothershipStreamV1EventType.tool, payload: { @@ -419,6 +768,7 @@ export async function runStreamLoop( previewPhase: 'file_preview_complete', fileId: context.activeFileIntent.target.fileId, output: streamEvent.payload.output, + ...(currentPreview ? { previewVersion: currentPreview.session.previewVersion } : {}), }, ...(streamEvent.scope ? { scope: streamEvent.scope } : {}), }) diff --git a/apps/sim/lib/copilot/request/lifecycle/run.ts b/apps/sim/lib/copilot/request/lifecycle/run.ts index 00f2dddb0b..0dd9c9567a 100644 --- a/apps/sim/lib/copilot/request/lifecycle/run.ts +++ b/apps/sim/lib/copilot/request/lifecycle/run.ts @@ -403,6 +403,8 @@ async function buildExecutionContext( if (userTimezone) execContext.userTimezone = userTimezone execContext.copilotToolExecution = true if (requestMode) execContext.requestMode = requestMode + execContext.messageId = + typeof requestPayload?.messageId === 'string' ? requestPayload.messageId : undefined execContext.executionId = executionId execContext.runId = runId execContext.abortSignal = abortSignal diff --git a/apps/sim/lib/copilot/request/lifecycle/start.test.ts b/apps/sim/lib/copilot/request/lifecycle/start.test.ts index d8e992381e..9c5ee7adac 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.test.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.test.ts @@ -10,7 +10,9 @@ const { createRunSegment, updateRunStatus, resetBuffer, + clearFilePreviewSessions, scheduleBufferCleanup, + scheduleFilePreviewSessionCleanup, allocateCursor, appendEvent, cleanupAbortMarker, @@ -21,7 +23,9 @@ const { createRunSegment: vi.fn(), updateRunStatus: vi.fn(), resetBuffer: vi.fn(), + clearFilePreviewSessions: vi.fn(), scheduleBufferCleanup: vi.fn(), + scheduleFilePreviewSessionCleanup: vi.fn(), allocateCursor: vi.fn(), appendEvent: vi.fn(), cleanupAbortMarker: vi.fn(), @@ -42,7 +46,9 @@ let mockPublisherController: ReadableStreamDefaultController | null = null vi.mock('@/lib/copilot/request/session', () => ({ resetBuffer, + clearFilePreviewSessions, scheduleBufferCleanup, + scheduleFilePreviewSessionCleanup, allocateCursor, appendEvent, cleanupAbortMarker, @@ -110,7 +116,9 @@ describe('createSSEStream terminal error handling', () => { beforeEach(() => { vi.clearAllMocks() resetBuffer.mockResolvedValue(undefined) + clearFilePreviewSessions.mockResolvedValue(undefined) scheduleBufferCleanup.mockResolvedValue(undefined) + scheduleFilePreviewSessionCleanup.mockResolvedValue(undefined) allocateCursor .mockResolvedValueOnce({ seq: 1, cursor: '1' }) .mockResolvedValueOnce({ seq: 2, cursor: '2' }) diff --git a/apps/sim/lib/copilot/request/lifecycle/start.ts b/apps/sim/lib/copilot/request/lifecycle/start.ts index 34454db99c..da5d536c51 100644 --- a/apps/sim/lib/copilot/request/lifecycle/start.ts +++ b/apps/sim/lib/copilot/request/lifecycle/start.ts @@ -14,11 +14,13 @@ import type { CopilotLifecycleOptions } from '@/lib/copilot/request/lifecycle/ru import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run' import { cleanupAbortMarker, + clearFilePreviewSessions, registerActiveStream, releasePendingChatStream, resetBuffer, StreamWriter, scheduleBufferCleanup, + scheduleFilePreviewSessionCleanup, startAbortPoller, unregisterActiveStream, } from '@/lib/copilot/request/session' @@ -91,6 +93,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS | undefined await resetBuffer(streamId) + await clearFilePreviewSessions(streamId) if (chatId) { createRunSegment({ @@ -207,6 +210,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS await releasePendingChatStream(chatId, streamId) } await scheduleBufferCleanup(streamId) + await scheduleFilePreviewSessionCleanup(streamId) await cleanupAbortMarker(streamId) const trace = collector.build({ diff --git a/apps/sim/lib/copilot/request/session/file-preview-session.test.ts b/apps/sim/lib/copilot/request/session/file-preview-session.test.ts new file mode 100644 index 0000000000..c994dae0b6 --- /dev/null +++ b/apps/sim/lib/copilot/request/session/file-preview-session.test.ts @@ -0,0 +1,42 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { + createFilePreviewSession, + sortFilePreviewSessions, +} from '@/lib/copilot/request/session/file-preview-session' + +describe('file preview session helpers', () => { + it('preserves baseContent when creating a preview session', () => { + const session = createFilePreviewSession({ + streamId: 'stream-1', + toolCallId: 'preview-1', + fileName: 'draft.md', + baseContent: 'existing content', + }) + + expect(session.baseContent).toBe('existing content') + }) + + it('sorts preview sessions by updatedAt across tool call ids', () => { + const sessions = sortFilePreviewSessions([ + createFilePreviewSession({ + streamId: 'stream-1', + toolCallId: 'preview-2', + fileName: 'b.md', + previewVersion: 10, + updatedAt: '2026-04-10T00:00:02.000Z', + }), + createFilePreviewSession({ + streamId: 'stream-1', + toolCallId: 'preview-1', + fileName: 'a.md', + previewVersion: 1, + updatedAt: '2026-04-10T00:00:01.000Z', + }), + ]) + + expect(sessions.map((session) => session.id)).toEqual(['preview-1', 'preview-2']) + }) +}) diff --git a/apps/sim/lib/copilot/request/session/file-preview-session.ts b/apps/sim/lib/copilot/request/session/file-preview-session.ts new file mode 100644 index 0000000000..31a793f4ac --- /dev/null +++ b/apps/sim/lib/copilot/request/session/file-preview-session.ts @@ -0,0 +1,211 @@ +import { createLogger } from '@sim/logger' +import { getRedisClient } from '@/lib/core/config/redis' +import { getStreamConfig } from './buffer' + +const logger = createLogger('FilePreviewSessionStore') + +const STREAM_OUTBOX_PREFIX = 'mothership_stream:' +const DEFAULT_COMPLETED_TTL_SECONDS = 5 * 60 +const RETRY_DELAYS_MS = [0, 50, 150] as const + +export const FILE_PREVIEW_SESSION_SCHEMA_VERSION = 1 as const + +export type FilePreviewTargetKind = 'new_file' | 'file_id' +export type FilePreviewStatus = 'pending' | 'streaming' | 'complete' +export type FilePreviewContentMode = 'delta' | 'snapshot' + +export interface FilePreviewSession { + schemaVersion: typeof FILE_PREVIEW_SESSION_SCHEMA_VERSION + id: string + streamId: string + toolCallId: string + status: FilePreviewStatus + fileName: string + fileId?: string + targetKind?: FilePreviewTargetKind + operation?: string + edit?: Record + baseContent?: string + previewText: string + previewVersion: number + updatedAt: string + completedAt?: string +} + +function getPreviewSessionsKey(streamId: string): string { + return `${STREAM_OUTBOX_PREFIX}${streamId}:preview_sessions` +} + +type RedisOperationMetadata = { + operation: string + streamId: string +} + +async function withRedisRetry( + metadata: RedisOperationMetadata, + operation: (redis: NonNullable>) => Promise +): Promise { + const redis = getRedisClient() + if (!redis) { + throw new Error('Redis is required for mothership preview durability') + } + + let lastError: unknown + + for (let attempt = 0; attempt < RETRY_DELAYS_MS.length; attempt++) { + const delay = RETRY_DELAYS_MS[attempt] + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)) + } + + try { + return await operation(redis) + } catch (error) { + lastError = error + logger.warn('Redis preview session operation failed', { + operation: metadata.operation, + streamId: metadata.streamId, + attempt: attempt + 1, + error: error instanceof Error ? error.message : String(error), + }) + } + } + + throw lastError instanceof Error + ? lastError + : new Error(`${metadata.operation} failed for stream ${metadata.streamId}`) +} + +export function createFilePreviewSession(input: { + streamId: string + toolCallId: string + fileName?: string + fileId?: string + targetKind?: FilePreviewTargetKind + operation?: string + edit?: Record + baseContent?: string + previewText?: string + previewVersion?: number + status?: FilePreviewStatus + updatedAt?: string + completedAt?: string +}): FilePreviewSession { + return { + schemaVersion: FILE_PREVIEW_SESSION_SCHEMA_VERSION, + id: input.toolCallId, + streamId: input.streamId, + toolCallId: input.toolCallId, + status: input.status ?? 'pending', + fileName: input.fileName ?? '', + ...(input.fileId ? { fileId: input.fileId } : {}), + ...(input.targetKind ? { targetKind: input.targetKind } : {}), + ...(input.operation ? { operation: input.operation } : {}), + ...(input.edit ? { edit: input.edit } : {}), + ...(typeof input.baseContent === 'string' ? { baseContent: input.baseContent } : {}), + previewText: input.previewText ?? '', + previewVersion: input.previewVersion ?? 0, + updatedAt: input.updatedAt ?? new Date().toISOString(), + ...(input.completedAt ? { completedAt: input.completedAt } : {}), + } +} + +export async function upsertFilePreviewSession( + session: FilePreviewSession +): Promise { + const config = getStreamConfig() + await withRedisRetry( + { operation: 'upsert_preview_session', streamId: session.streamId }, + async (redis) => { + const key = getPreviewSessionsKey(session.streamId) + const pipeline = redis.pipeline() + pipeline.hset(key, session.id, JSON.stringify(session)) + pipeline.expire(key, config.ttlSeconds) + await pipeline.exec() + } + ) + return session +} + +function isFilePreviewSession(value: unknown): value is FilePreviewSession { + if (!value || typeof value !== 'object') { + return false + } + + const record = value as Record + return ( + record.schemaVersion === FILE_PREVIEW_SESSION_SCHEMA_VERSION && + typeof record.id === 'string' && + typeof record.streamId === 'string' && + typeof record.toolCallId === 'string' && + typeof record.status === 'string' && + typeof record.fileName === 'string' && + (record.baseContent === undefined || typeof record.baseContent === 'string') && + typeof record.previewText === 'string' && + typeof record.previewVersion === 'number' && + typeof record.updatedAt === 'string' + ) +} + +export function sortFilePreviewSessions(sessions: FilePreviewSession[]): FilePreviewSession[] { + return [...sessions].sort((a, b) => { + const updatedAtCompare = a.updatedAt.localeCompare(b.updatedAt) + if (updatedAtCompare !== 0) { + return updatedAtCompare + } + return a.id.localeCompare(b.id) + }) +} + +export async function readFilePreviewSessions(streamId: string): Promise { + const raw = await withRedisRetry( + { operation: 'read_preview_sessions', streamId }, + async (redis) => redis.hgetall(getPreviewSessionsKey(streamId)) + ) + + const sessions: FilePreviewSession[] = [] + const values = Object.values(raw ?? {}) + for (const entry of values) { + try { + const parsed = JSON.parse(entry) as unknown + if (!isFilePreviewSession(parsed)) { + logger.warn('Skipping invalid file preview session entry', { streamId }) + continue + } + sessions.push(parsed) + } catch (error) { + logger.warn('Failed to parse file preview session entry', { + streamId, + error: error instanceof Error ? error.message : String(error), + }) + } + } + + return sortFilePreviewSessions(sessions) +} + +export async function clearFilePreviewSessions(streamId: string): Promise { + await withRedisRetry({ operation: 'clear_preview_sessions', streamId }, async (redis) => { + await redis.del(getPreviewSessionsKey(streamId)) + }) +} + +export async function scheduleFilePreviewSessionCleanup( + streamId: string, + ttlSeconds = DEFAULT_COMPLETED_TTL_SECONDS +): Promise { + try { + await withRedisRetry( + { operation: 'schedule_preview_session_cleanup', streamId }, + async (redis) => { + await redis.expire(getPreviewSessionsKey(streamId), ttlSeconds) + } + ) + } catch (error) { + logger.warn('Failed to shorten preview session retention', { + streamId, + ttlSeconds, + error: error instanceof Error ? error.message : String(error), + }) + } +} diff --git a/apps/sim/lib/copilot/request/session/index.ts b/apps/sim/lib/copilot/request/session/index.ts index 61502be65b..ac8986795d 100644 --- a/apps/sim/lib/copilot/request/session/index.ts +++ b/apps/sim/lib/copilot/request/session/index.ts @@ -25,6 +25,20 @@ export { writeAbortMarker, } from './buffer' export { createEvent, eventToStreamEvent, isEventRecord, TOOL_CALL_STATUS } from './event' +export type { + FilePreviewContentMode, + FilePreviewSession, + FilePreviewStatus, + FilePreviewTargetKind, +} from './file-preview-session' +export { + clearFilePreviewSessions, + createFilePreviewSession, + FILE_PREVIEW_SESSION_SCHEMA_VERSION, + readFilePreviewSessions, + scheduleFilePreviewSessionCleanup, + upsertFilePreviewSession, +} from './file-preview-session' export { checkForReplayGap, type ReplayGapResult } from './recovery' export { encodeSSEComment, encodeSSEEnvelope, SSE_RESPONSE_HEADERS } from './sse' export type { StreamEvent } from './types' diff --git a/apps/sim/lib/copilot/tool-executor/types.ts b/apps/sim/lib/copilot/tool-executor/types.ts index 53f43634cc..9087f38f63 100644 --- a/apps/sim/lib/copilot/tool-executor/types.ts +++ b/apps/sim/lib/copilot/tool-executor/types.ts @@ -5,6 +5,7 @@ export interface ToolExecutionContext { workflowId: string workspaceId?: string chatId?: string + messageId?: string executionId?: string runId?: string copilotToolExecution?: boolean diff --git a/apps/sim/lib/copilot/tools/registry/server-tool-adapter.ts b/apps/sim/lib/copilot/tools/registry/server-tool-adapter.ts index efac336d80..d9defa43e1 100644 --- a/apps/sim/lib/copilot/tools/registry/server-tool-adapter.ts +++ b/apps/sim/lib/copilot/tools/registry/server-tool-adapter.ts @@ -18,6 +18,7 @@ export function createServerToolHandler(toolId: string): ToolHandler { workspaceId: context.workspaceId, userPermission: context.userPermission ?? undefined, chatId: context.chatId, + messageId: context.messageId, abortSignal: context.abortSignal, }) diff --git a/apps/sim/lib/copilot/tools/server/files/edit-content.ts b/apps/sim/lib/copilot/tools/server/files/edit-content.ts index 0e5eb93f37..114c0ae7ef 100644 --- a/apps/sim/lib/copilot/tools/server/files/edit-content.ts +++ b/apps/sim/lib/copilot/tools/server/files/edit-content.ts @@ -85,7 +85,10 @@ export const editContentServerTool: BaseServerTool() +export type FileIntentScope = { + chatId?: string + messageId?: string +} + +const logger = createLogger('FileIntentStore') + +const INTENT_TTL_MS = 60 * 60 * 1000 +const INTENT_TTL_SECONDS = INTENT_TTL_MS / 1000 +const REDIS_KEY_PREFIX = 'mothership_file_intent:' +const RETRY_DELAYS_MS = [0, 50, 150] as const +const memoryStore = new Map() function buildKey(workspaceId: string, fileId: string): string { return `${workspaceId}:${fileId}` } +function getWorkspaceRedisKey(workspaceId: string): string { + return `${REDIS_KEY_PREFIX}${workspaceId}` +} + +function scopeMatches(intent: PendingFileIntent, scope?: FileIntentScope): boolean { + return intent.chatId === scope?.chatId && intent.messageId === scope?.messageId +} + +function buildScopedField(fileId: string, scope?: FileIntentScope): string { + return `${scope?.chatId ?? ''}:${scope?.messageId ?? ''}:${fileId}` +} + function cleanupStale(): void { const now = Date.now() - for (const [key, intent] of store) { + for (const [key, intent] of memoryStore) { if (now - intent.createdAt > INTENT_TTL_MS) { - store.delete(key) + memoryStore.delete(key) } } } -export function storeFileIntent( +async function withRedisRetry( + operation: string, + workspaceId: string, + work: (redis: NonNullable>) => Promise +): Promise { + const redis = getRedisClient() + if (!redis) { + throw new Error('Redis client unavailable') + } + + let lastError: unknown + for (let attempt = 0; attempt < RETRY_DELAYS_MS.length; attempt++) { + const delay = RETRY_DELAYS_MS[attempt] + if (delay > 0) { + await new Promise((resolve) => setTimeout(resolve, delay)) + } + + try { + return await work(redis) + } catch (error) { + lastError = error + logger.warn('Redis file intent operation failed', { + operation, + workspaceId, + attempt: attempt + 1, + error: error instanceof Error ? error.message : String(error), + }) + } + } + + throw lastError instanceof Error ? lastError : new Error(`${operation} failed`) +} + +function isStale(intent: PendingFileIntent): boolean { + return Date.now() - intent.createdAt > INTENT_TTL_MS +} + +function parseIntent(raw: string | null | undefined): PendingFileIntent | undefined { + if (!raw) return undefined + try { + const parsed = JSON.parse(raw) as PendingFileIntent + return isStale(parsed) ? undefined : parsed + } catch (error) { + logger.warn('Failed to parse file intent', { + error: error instanceof Error ? error.message : String(error), + }) + return undefined + } +} + +export async function storeFileIntent( workspaceId: string, fileId: string, intent: PendingFileIntent -): void { - cleanupStale() - store.set(buildKey(workspaceId, fileId), intent) +): Promise { + const redis = getRedisClient() + if (!redis) { + cleanupStale() + memoryStore.set(buildKey(workspaceId, buildScopedField(fileId, intent)), intent) + return + } + + await withRedisRetry('store_file_intent', workspaceId, async (client) => { + const key = getWorkspaceRedisKey(workspaceId) + const pipeline = client.pipeline() + pipeline.hset(key, buildScopedField(fileId, intent), JSON.stringify(intent)) + pipeline.expire(key, INTENT_TTL_SECONDS) + await pipeline.exec() + }) } -export function consumeFileIntent( +export async function consumeFileIntent( workspaceId: string, - fileId: string -): PendingFileIntent | undefined { - const key = buildKey(workspaceId, fileId) - const intent = store.get(key) - if (intent) { - store.delete(key) + fileId: string, + scope?: FileIntentScope +): Promise { + const redis = getRedisClient() + if (!redis) { + const key = buildKey(workspaceId, buildScopedField(fileId, scope)) + const intent = memoryStore.get(key) + if (intent) { + memoryStore.delete(key) + } + return intent + } + + const raw = await withRedisRetry('consume_file_intent', workspaceId, async (client) => { + const key = getWorkspaceRedisKey(workspaceId) + const field = buildScopedField(fileId, scope) + const value = await client.hget(key, field) + if (value !== null) { + await client.hdel(key, field) + } + return value + }) + return parseIntent(raw) +} + +export async function peekFileIntent( + workspaceId: string, + fileId: string, + scope?: FileIntentScope +): Promise { + const redis = getRedisClient() + if (!redis) { + cleanupStale() + return memoryStore.get(buildKey(workspaceId, buildScopedField(fileId, scope))) + } + + const raw = await withRedisRetry('peek_file_intent', workspaceId, async (client) => { + const key = getWorkspaceRedisKey(workspaceId) + return client.hget(key, buildScopedField(fileId, scope)) + }) + const intent = parseIntent(raw) + if (!intent && raw !== null) { + await withRedisRetry('clear_stale_file_intent', workspaceId, async (client) => { + await client.hdel(getWorkspaceRedisKey(workspaceId), buildScopedField(fileId, scope)) + }) } return intent } -export function consumeLatestFileIntent(workspaceId: string): PendingFileIntent | undefined { - let latest: PendingFileIntent | undefined - let latestKey: string | undefined - for (const [key, intent] of store) { - if (intent.workspaceId === workspaceId) { - if (!latest || intent.createdAt > latest.createdAt) { - latest = intent - latestKey = key +export async function consumeLatestFileIntent( + workspaceId: string, + scope?: FileIntentScope +): Promise { + const redis = getRedisClient() + if (!redis) { + cleanupStale() + let latest: PendingFileIntent | undefined + let latestKey: string | undefined + for (const [key, intent] of memoryStore) { + if (intent.workspaceId === workspaceId && scopeMatches(intent, scope)) { + if (!latest || intent.createdAt > latest.createdAt) { + latest = intent + latestKey = key + } } } + if (latestKey) { + memoryStore.delete(latestKey) + } + return latest } - if (latestKey) { - store.delete(latestKey) + + const entries = await withRedisRetry('read_workspace_file_intents', workspaceId, async (client) => + client.hgetall(getWorkspaceRedisKey(workspaceId)) + ) + let latest: PendingFileIntent | undefined + let latestField: string | undefined + const staleFields: string[] = [] + for (const [field, raw] of Object.entries(entries)) { + const parsed = parseIntent(raw) + if (!parsed) { + staleFields.push(field) + continue + } + if (!scopeMatches(parsed, scope)) { + continue + } + if (!latest || parsed.createdAt > latest.createdAt) { + latest = parsed + latestField = field + } + } + + const fieldsToDelete = latestField ? [...staleFields, latestField] : staleFields + if (fieldsToDelete.length > 0) { + await withRedisRetry('delete_workspace_file_intents', workspaceId, async (client) => { + await client.hdel(getWorkspaceRedisKey(workspaceId), ...fieldsToDelete) + }) } return latest } -export function clearIntentsForWorkspace(workspaceId: string): number { - let cleared = 0 - for (const [key, intent] of store) { - if (intent.workspaceId === workspaceId) { - store.delete(key) - cleared++ +export async function clearIntentsForWorkspace( + workspaceId: string, + scope?: FileIntentScope +): Promise { + const redis = getRedisClient() + if (!redis) { + let cleared = 0 + for (const [key, intent] of memoryStore) { + if (intent.workspaceId === workspaceId && (!scope || scopeMatches(intent, scope))) { + memoryStore.delete(key) + cleared++ + } + } + return cleared + } + + const key = getWorkspaceRedisKey(workspaceId) + if (!scope) { + const count = await withRedisRetry( + 'count_workspace_file_intents', + workspaceId, + async (client) => client.hlen(key) + ) + await withRedisRetry('clear_workspace_file_intents', workspaceId, async (client) => { + await client.del(key) + }) + return count + } + + const entries = await withRedisRetry('read_workspace_file_intents', workspaceId, async (client) => + client.hgetall(key) + ) + const fieldsToDelete: string[] = [] + for (const [field, raw] of Object.entries(entries)) { + const parsed = parseIntent(raw) + if (parsed && scopeMatches(parsed, scope)) { + fieldsToDelete.push(field) } } - return cleared + if (fieldsToDelete.length > 0) { + await withRedisRetry('clear_scoped_file_intents', workspaceId, async (client) => { + await client.hdel(key, ...fieldsToDelete) + }) + } + return fieldsToDelete.length } diff --git a/apps/sim/lib/copilot/tools/server/files/file-preview.test.ts b/apps/sim/lib/copilot/tools/server/files/file-preview.test.ts new file mode 100644 index 0000000000..52a033c1b6 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/files/file-preview.test.ts @@ -0,0 +1,79 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { buildFilePreviewText } from '@/lib/copilot/tools/server/files/file-preview' + +describe('buildFilePreviewText', () => { + it('returns the full streamed content for update previews', () => { + expect( + buildFilePreviewText({ + operation: 'update', + streamedContent: '', + }) + ).toBe('') + + expect( + buildFilePreviewText({ + operation: 'update', + streamedContent: 'updated body', + }) + ).toBe('updated body') + }) + + it('builds append previews from the existing file content', () => { + expect( + buildFilePreviewText({ + operation: 'append', + existingContent: 'line one', + streamedContent: 'line two', + }) + ).toBe('line one\nline two') + }) + + it('applies anchored replace_between previews', () => { + expect( + buildFilePreviewText({ + operation: 'patch', + existingContent: ['# Title', 'before', 'after', 'footer'].join('\n'), + streamedContent: 'replacement', + edit: { + strategy: 'anchored', + mode: 'replace_between', + before_anchor: '# Title', + after_anchor: 'after', + }, + }) + ).toBe(['# Title', 'replacement', 'after', 'footer'].join('\n')) + }) + + it('applies delete_between previews without streamed replacement text', () => { + expect( + buildFilePreviewText({ + operation: 'patch', + existingContent: ['keep', 'start', 'remove me', 'end', 'keep too'].join('\n'), + streamedContent: '', + edit: { + strategy: 'anchored', + mode: 'delete_between', + start_anchor: 'start', + end_anchor: 'end', + }, + }) + ).toBe(['keep', 'end', 'keep too'].join('\n')) + }) + + it('applies search_replace previews', () => { + expect( + buildFilePreviewText({ + operation: 'patch', + existingContent: 'hello world', + streamedContent: 'sim', + edit: { + strategy: 'search_replace', + search: 'world', + }, + }) + ).toBe('hello sim') + }) +}) diff --git a/apps/sim/lib/copilot/tools/server/files/file-preview.ts b/apps/sim/lib/copilot/tools/server/files/file-preview.ts new file mode 100644 index 0000000000..fd783321b3 --- /dev/null +++ b/apps/sim/lib/copilot/tools/server/files/file-preview.ts @@ -0,0 +1,192 @@ +import { createLogger } from '@sim/logger' +import { + downloadWorkspaceFile, + getWorkspaceFile, +} from '@/lib/uploads/contexts/workspace/workspace-file-manager' + +const logger = createLogger('CopilotFilePreview') + +type FilePreviewEdit = { + strategy?: string + search?: string + replaceAll?: boolean + mode?: string + occurrence?: number + before_anchor?: string + after_anchor?: string + anchor?: string + start_anchor?: string + end_anchor?: string +} + +interface BuildFilePreviewTextOptions { + operation: 'create' | 'append' | 'update' | 'patch' + streamedContent: string + existingContent?: string + edit?: Record +} + +function findAnchorIndex(lines: string[], anchor: string, occurrence = 1, afterIndex = -1): number { + const trimmed = anchor.trim() + let count = 0 + for (let i = afterIndex + 1; i < lines.length; i++) { + if (lines[i].trim() === trimmed) { + count++ + if (count === occurrence) return i + } + } + return -1 +} + +function extractPatchPreview( + streamedContent: string, + existingContent: string, + edit?: Record +): string | undefined { + const strategy = typeof edit?.strategy === 'string' ? edit.strategy : undefined + const lines = existingContent.split('\n') + const occurrence = + typeof edit?.occurrence === 'number' && Number.isFinite(edit.occurrence) ? edit.occurrence : 1 + + if (strategy === 'search_replace') { + const search = typeof edit?.search === 'string' ? edit.search : '' + if (!search) return undefined + if (edit?.replaceAll === true) { + return existingContent.split(search).join(streamedContent) + } + const firstIdx = existingContent.indexOf(search) + if (firstIdx === -1) return undefined + return ( + existingContent.slice(0, firstIdx) + + streamedContent + + existingContent.slice(firstIdx + search.length) + ) + } + + const mode = typeof edit?.mode === 'string' ? edit.mode : undefined + if (!mode) return undefined + + if (mode === 'replace_between') { + const beforeAnchor = typeof edit?.before_anchor === 'string' ? edit.before_anchor : undefined + const afterAnchor = typeof edit?.after_anchor === 'string' ? edit.after_anchor : undefined + if (!beforeAnchor || !afterAnchor) return undefined + + const beforeIdx = findAnchorIndex(lines, beforeAnchor, occurrence) + const afterIdx = findAnchorIndex(lines, afterAnchor, occurrence, beforeIdx) + if (beforeIdx === -1 || afterIdx === -1 || afterIdx <= beforeIdx) return undefined + + const spliced = [ + ...lines.slice(0, beforeIdx + 1), + ...(streamedContent.length > 0 ? streamedContent.split('\n') : []), + ...lines.slice(afterIdx), + ] + return spliced.join('\n') + } + + if (mode === 'insert_after') { + const anchor = typeof edit?.anchor === 'string' ? edit.anchor : undefined + if (!anchor) return undefined + + const anchorIdx = findAnchorIndex(lines, anchor, occurrence) + if (anchorIdx === -1) return undefined + + const spliced = [ + ...lines.slice(0, anchorIdx + 1), + ...(streamedContent.length > 0 ? streamedContent.split('\n') : []), + ...lines.slice(anchorIdx + 1), + ] + return spliced.join('\n') + } + + if (mode === 'delete_between') { + const startAnchor = typeof edit?.start_anchor === 'string' ? edit.start_anchor : undefined + const endAnchor = typeof edit?.end_anchor === 'string' ? edit.end_anchor : undefined + if (!startAnchor || !endAnchor) return undefined + + const startIdx = findAnchorIndex(lines, startAnchor, occurrence) + const endIdx = findAnchorIndex(lines, endAnchor, occurrence, startIdx) + if (startIdx === -1 || endIdx === -1 || endIdx <= startIdx) return undefined + + const spliced = [...lines.slice(0, startIdx), ...lines.slice(endIdx)] + return spliced.join('\n') + } + + return undefined +} + +function shouldApplyPatchPreview(streamedContent: string, edit?: Record): boolean { + const strategy = typeof edit?.strategy === 'string' ? edit.strategy : undefined + const mode = typeof edit?.mode === 'string' ? edit.mode : undefined + + if (strategy === 'anchored' && mode === 'delete_between') { + return true + } + + return streamedContent.length > 0 +} + +function buildAppendPreview(existingContent: string, incomingContent: string): string { + if (incomingContent.length === 0) return existingContent + if (existingContent.length === 0) return incomingContent + return `${existingContent}\n${incomingContent}` +} + +/** + * Reads the current UTF-8 text of a workspace file for streaming previews. + * + * Preview runs in the SSE loop on `workspace_file` **call** events, which are + * processed **before** the async tool executor persists {@link storeFileIntent}. + * Loading the base here avoids a race where `edit_content` `args_delta` arrives + * before Redis holds `existingContent`, which would make append previews look like + * full-file replacement until the intent landed. + */ +export async function loadWorkspaceFileTextForPreview( + workspaceId: string, + fileId: string +): Promise { + try { + const record = await getWorkspaceFile(workspaceId, fileId) + if (!record) return undefined + const buffer = await downloadWorkspaceFile(record) + return buffer.toString('utf-8') + } catch (error) { + logger.warn('Failed to load workspace file text for preview', { + workspaceId, + fileId, + error: error instanceof Error ? error.message : String(error), + }) + return undefined + } +} + +export function buildFilePreviewText({ + operation, + streamedContent, + existingContent, + edit, +}: BuildFilePreviewTextOptions): string | undefined { + if (operation === 'update') { + return streamedContent + } + + if (operation === 'create') { + return streamedContent + } + + if (operation === 'append') { + if (existingContent !== undefined) { + return buildAppendPreview(existingContent, streamedContent) + } + return streamedContent + } + + if (existingContent === undefined) { + return undefined + } + + if (!shouldApplyPatchPreview(streamedContent, edit)) { + return undefined + } + + return extractPatchPreview(streamedContent, existingContent, edit) +} diff --git a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts index dd23523083..a9ae344192 100644 --- a/apps/sim/lib/copilot/tools/server/files/workspace-file.ts +++ b/apps/sim/lib/copilot/tools/server/files/workspace-file.ts @@ -264,11 +264,13 @@ export const workspaceFileServerTool: BaseServerTool