durable stream for files

This commit is contained in:
Vikhyath Mondreti
2026-04-10 14:58:36 -07:00
parent ac84c6224b
commit 09f06cb683
27 changed files with 2059 additions and 523 deletions

View File

@@ -3,6 +3,7 @@ import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
import {
authenticateCopilotRequestSessionOnly,
@@ -10,6 +11,9 @@ import {
createInternalServerErrorResponse,
createUnauthorizedResponse,
} from '@/lib/copilot/request/http'
import { readFilePreviewSessions } from '@/lib/copilot/request/session'
import { readEvents } from '@/lib/copilot/request/session/buffer'
import { toStreamBatchEvent } from '@/lib/copilot/request/session/types'
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
import { assertActiveWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
@@ -63,8 +67,60 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}
let streamSnapshot: {
events: ReturnType<typeof toStreamBatchEvent>[]
previewSessions: Awaited<ReturnType<typeof readFilePreviewSessions>>
status: string
} | null = null
if (chat.conversationId) {
try {
const [events, previewSessions, run] = await Promise.all([
readEvents(chat.conversationId, '0'),
readFilePreviewSessions(chat.conversationId).catch((error) => {
logger.warn('Failed to read preview sessions for copilot chat', {
chatId,
conversationId: chat.conversationId,
error: error instanceof Error ? error.message : String(error),
})
return []
}),
getLatestRunForStream(chat.conversationId, authenticatedUserId).catch((error) => {
logger.warn('Failed to fetch latest run for copilot chat snapshot', {
chatId,
conversationId: chat.conversationId,
error: error instanceof Error ? error.message : String(error),
})
return null
}),
])
streamSnapshot = {
events: events.map(toStreamBatchEvent),
previewSessions,
status:
typeof run?.status === 'string'
? run.status
: events.length > 0
? 'active'
: 'unknown',
}
} catch (error) {
logger.warn('Failed to load copilot chat stream snapshot', {
chatId,
conversationId: chat.conversationId,
error: error instanceof Error ? error.message : String(error),
})
}
}
logger.info(`Retrieved chat ${chatId}`)
return NextResponse.json({ success: true, chat: transformChat(chat) })
return NextResponse.json({
success: true,
chat: {
...transformChat(chat),
...(streamSnapshot ? { streamSnapshot } : {}),
},
})
}
if (!workflowId && !workspaceId) {

View File

@@ -12,11 +12,13 @@ import {
const {
getLatestRunForStream,
readEvents,
readFilePreviewSessions,
checkForReplayGap,
authenticateCopilotRequestSessionOnly,
} = vi.hoisted(() => ({
getLatestRunForStream: vi.fn(),
readEvents: vi.fn(),
readFilePreviewSessions: vi.fn(),
checkForReplayGap: vi.fn(),
authenticateCopilotRequestSessionOnly: vi.fn(),
}))
@@ -27,6 +29,7 @@ vi.mock('@/lib/copilot/async-runs/repository', () => ({
vi.mock('@/lib/copilot/request/session', () => ({
readEvents,
readFilePreviewSessions,
checkForReplayGap,
createEvent: (event: Record<string, unknown>) => ({
stream: {
@@ -74,9 +77,50 @@ describe('copilot chat stream replay route', () => {
isAuthenticated: true,
})
readEvents.mockResolvedValue([])
readFilePreviewSessions.mockResolvedValue([])
checkForReplayGap.mockResolvedValue(null)
})
it('returns preview sessions in batch mode', async () => {
getLatestRunForStream.mockResolvedValue({
status: 'active',
executionId: 'exec-1',
id: 'run-1',
})
readFilePreviewSessions.mockResolvedValue([
{
schemaVersion: 1,
id: 'preview-1',
streamId: 'stream-1',
toolCallId: 'preview-1',
status: 'streaming',
fileName: 'draft.md',
previewText: 'hello',
previewVersion: 2,
updatedAt: '2026-04-10T00:00:00.000Z',
},
])
const response = await GET(
new NextRequest(
'http://localhost:3000/api/copilot/chat/stream?streamId=stream-1&after=0&batch=true'
)
)
expect(response.status).toBe(200)
await expect(response.json()).resolves.toMatchObject({
success: true,
previewSessions: [
expect.objectContaining({
id: 'preview-1',
previewText: 'hello',
previewVersion: 2,
}),
],
status: 'active',
})
})
it('stops replay polling when run becomes cancelled', async () => {
getLatestRunForStream
.mockResolvedValueOnce({

View File

@@ -11,6 +11,7 @@ import {
createEvent,
encodeSSEEnvelope,
readEvents,
readFilePreviewSessions,
SSE_RESPONSE_HEADERS,
} from '@/lib/copilot/request/session'
import { toStreamBatchEvent } from '@/lib/copilot/request/session/types'
@@ -113,17 +114,28 @@ export async function GET(request: NextRequest) {
if (batchMode) {
const afterSeq = afterCursor || '0'
const events = await readEvents(streamId, afterSeq)
const [events, previewSessions] = await Promise.all([
readEvents(streamId, afterSeq),
readFilePreviewSessions(streamId).catch((error) => {
logger.warn('Failed to read preview sessions for stream batch', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
return []
}),
])
const batchEvents = events.map(toStreamBatchEvent)
logger.info('[Resume] Batch response', {
streamId,
afterCursor: afterSeq,
eventCount: batchEvents.length,
previewSessionCount: previewSessions.length,
runStatus: run.status,
})
return NextResponse.json({
success: true,
events: batchEvents,
previewSessions,
status: run.status,
})
}

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { and, eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getLatestRunForStream } from '@/lib/copilot/async-runs/repository'
import { getAccessibleCopilotChat } from '@/lib/copilot/chat/lifecycle'
import {
authenticateCopilotRequestSessionOnly,
@@ -11,7 +12,9 @@ import {
createInternalServerErrorResponse,
createUnauthorizedResponse,
} from '@/lib/copilot/request/http'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import { readEvents } from '@/lib/copilot/request/session/buffer'
import { readFilePreviewSessions } from '@/lib/copilot/request/session/file-preview-session'
import { type StreamBatchEvent, toStreamBatchEvent } from '@/lib/copilot/request/session/types'
import { taskPubSub } from '@/lib/copilot/tasks'
import { captureServerEvent } from '@/lib/posthog/server'
@@ -49,16 +52,37 @@ export async function GET(
let streamSnapshot: {
events: StreamBatchEvent[]
previewSessions: FilePreviewSession[]
status: string
} | null = null
if (chat.conversationId) {
try {
const events = await readEvents(chat.conversationId, '0')
const [events, previewSessions] = await Promise.all([
readEvents(chat.conversationId, '0'),
readFilePreviewSessions(chat.conversationId).catch((error) => {
logger.warn('Failed to read preview sessions for mothership chat', {
chatId,
conversationId: chat.conversationId,
error: error instanceof Error ? error.message : String(error),
})
return []
}),
])
const run = await getLatestRunForStream(chat.conversationId, userId).catch((error) => {
logger.warn('Failed to fetch latest run for mothership chat snapshot', {
chatId,
conversationId: chat.conversationId,
error: error instanceof Error ? error.message : String(error),
})
return null
})
streamSnapshot = {
events: events.map(toStreamBatchEvent),
status: events.length > 0 ? 'active' : 'unknown',
previewSessions,
status:
typeof run?.status === 'string' ? run.status : events.length > 0 ? 'active' : 'unknown',
}
} catch (error) {
logger.warn('Failed to read stream snapshot for mothership chat', {

View File

@@ -29,7 +29,6 @@ import {
useWorkspaceFileContent,
} from '@/hooks/queries/workspace-files'
import { useAutosave } from '@/hooks/use-autosave'
import { useStreamingText } from '@/hooks/use-streaming-text'
import { DataTable } from './data-table'
import { PreviewPanel, resolvePreviewType } from './preview-panel'
@@ -526,8 +525,7 @@ function TextEditor({
const isStreaming = isStreamInteractionLocked
const isEditorReadOnly = isStreamInteractionLocked || !canEdit
const revealedContent = useStreamingText(content, false)
const renderedContent = isStreaming ? revealedContent : content
const renderedContent = content
const gutterWidthPx = useMemo(() => {
const lineCount = renderedContent.split('\n').length
return calculateGutterWidth(lineCount)
@@ -731,7 +729,7 @@ function TextEditor({
const el = (shouldUseCodeRenderer ? codeScrollRef.current : textareaRef.current) ?? null
if (!el) return
el.scrollTop = el.scrollHeight
}, [isStreaming, revealedContent, shouldUseCodeRenderer])
}, [isStreaming, renderedContent, shouldUseCodeRenderer])
if (streamingContent === undefined) {
if (isLoading) return DOCUMENT_SKELETON

View File

@@ -1,8 +1,7 @@
'use client'
import { lazy, memo, Suspense, useCallback, useEffect, useMemo, useState } from 'react'
import { lazy, memo, Suspense, useCallback, useEffect, useMemo } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { Square } from 'lucide-react'
import { useRouter } from 'next/navigation'
import { Button, PlayOutline, Skeleton, Tooltip } from '@/components/emcn'
@@ -13,6 +12,7 @@ import {
SquareArrowUpRight,
WorkflowX,
} from '@/components/emcn/icons'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import {
cancelRunToolExecution,
markRunToolManuallyStopped,
@@ -47,11 +47,7 @@ import { useUsageLimits } from '@/app/workspace/[workspaceId]/w/[workflowId]/com
import { useWorkflowExecution } from '@/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution'
import { useFolders } from '@/hooks/queries/folders'
import { useWorkflows } from '@/hooks/queries/workflows'
import {
useWorkspaceFileContent,
useWorkspaceFiles,
workspaceFilesKeys,
} from '@/hooks/queries/workspace-files'
import { useWorkspaceFiles } from '@/hooks/queries/workspace-files'
import { useSettingsNavigation } from '@/hooks/use-settings-navigation'
import { useExecutionStore } from '@/stores/execution/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
@@ -70,15 +66,7 @@ interface ResourceContentProps {
workspaceId: string
resource: MothershipResource
previewMode?: PreviewMode
streamingFile?: {
toolCallId?: string
fileName: string
fileId?: string
targetKind?: 'new_file' | 'file_id'
operation?: string
edit?: Record<string, unknown>
content: string
} | null
previewSession?: FilePreviewSession | null
genericResourceData?: GenericResourceData
}
@@ -93,126 +81,10 @@ export const ResourceContent = memo(function ResourceContent({
workspaceId,
resource,
previewMode,
streamingFile,
previewSession,
genericResourceData,
}: ResourceContentProps) {
const queryClient = useQueryClient()
const streamFileName = streamingFile?.fileName || 'file.md'
const streamOperation = useMemo(() => {
if (!streamingFile) return undefined
return streamingFile.operation
}, [streamingFile])
const isWriteStream = streamOperation === 'create' || streamOperation === 'append'
const isPatchStream = streamOperation === 'patch'
const isUpdateStream = streamOperation === 'update'
const { data: allFiles = [] } = useWorkspaceFiles(workspaceId)
const previewFileId =
streamingFile?.fileId ?? (resource.type === 'file' ? resource.id : undefined)
const previewFileRecord = useMemo(() => {
if (!previewFileId) return undefined
return allFiles.find((f) => f.id === previewFileId)
}, [previewFileId, allFiles])
const isSourceMime =
previewFileRecord?.type === 'text/x-pptxgenjs' ||
previewFileRecord?.type === 'text/x-docxjs' ||
previewFileRecord?.type === 'text/x-pdflibjs'
const previewContentMode = isSourceMime ? 'raw' : 'text'
const { data: fetchedFileContent } = useWorkspaceFileContent(
workspaceId,
previewFileRecord?.id ?? '',
previewFileRecord?.key ?? '',
isSourceMime
)
const frozenPreviewBaseKey =
streamingFile?.toolCallId &&
previewFileRecord?.id &&
streamingFile.targetKind === 'file_id' &&
(streamOperation === 'append' || streamOperation === 'patch')
? `${streamingFile.toolCallId}:${streamOperation}:${previewFileRecord.id}`
: null
const cachedFrozenPreviewBase =
frozenPreviewBaseKey && previewFileRecord?.id
? queryClient.getQueryData<string>(
workspaceFilesKeys.content(workspaceId, previewFileRecord.id, previewContentMode)
)
: undefined
const [frozenPreviewBaseState, setFrozenPreviewBaseState] = useState<{
key: string | null
content: string | null
}>({ key: null, content: null })
useEffect(() => {
if (!frozenPreviewBaseKey || !previewFileRecord?.id) {
setFrozenPreviewBaseState((prev) => (prev.key === null ? prev : { key: null, content: null }))
return
}
setFrozenPreviewBaseState((prev) => {
if (prev.key === frozenPreviewBaseKey) {
return prev
}
return typeof cachedFrozenPreviewBase === 'string'
? { key: frozenPreviewBaseKey, content: cachedFrozenPreviewBase }
: { key: frozenPreviewBaseKey, content: null }
})
}, [cachedFrozenPreviewBase, frozenPreviewBaseKey, previewFileRecord?.id])
const frozenExistingFilePreviewBase =
frozenPreviewBaseState.key === frozenPreviewBaseKey
? (frozenPreviewBaseState.content ?? undefined)
: typeof cachedFrozenPreviewBase === 'string'
? cachedFrozenPreviewBase
: undefined
const streamingExtractedContent = useMemo(() => {
if (!streamingFile) return undefined
if (!streamOperation) return undefined
if (isPatchStream) {
if (streamingFile.targetKind === 'file_id') {
if (frozenExistingFilePreviewBase === undefined) return undefined
if (!shouldApplyPatchPreview(streamingFile)) return undefined
return extractPatchPreview(streamingFile, frozenExistingFilePreviewBase)
}
if (fetchedFileContent === undefined) return undefined
if (!shouldApplyPatchPreview(streamingFile)) return undefined
return extractPatchPreview(streamingFile, fetchedFileContent)
}
const extracted = streamingFile.content
if (isUpdateStream) return extracted
if (streamOperation === 'append') {
if (streamingFile.targetKind === 'file_id') {
if (frozenExistingFilePreviewBase === undefined) return undefined
return buildAppendPreview(frozenExistingFilePreviewBase, extracted)
}
return extracted.length > 0 ? extracted : undefined
}
if (streamOperation === 'create') {
return extracted.length > 0 ? extracted : undefined
}
if (isWriteStream) return extracted.length > 0 ? extracted : undefined
return undefined
}, [
streamingFile,
streamOperation,
isWriteStream,
isPatchStream,
isUpdateStream,
fetchedFileContent,
frozenExistingFilePreviewBase,
])
const streamFileName = previewSession?.fileName || 'file.md'
const syntheticFile = useMemo(() => {
const ext = getFileExtension(streamFileName)
const SOURCE_MIME_MAP: Record<string, string> = {
@@ -234,22 +106,21 @@ export const ResourceContent = memo(function ResourceContent({
}
}, [workspaceId, streamFileName])
// ResourceContent now reconstructs full-file preview text per operation,
// so the viewer can always treat streaming content as a whole-file replace.
const streamingFileMode: 'append' | 'replace' = 'replace'
const rawPreviewText = previewSession?.previewText
const streamingPreviewText =
typeof rawPreviewText === 'string' && rawPreviewText.length > 0 ? rawPreviewText : undefined
const embeddedStreamingContent = streamingExtractedContent
if (streamingFile && resource.id === 'streaming-file') {
if (previewSession && resource.id === 'streaming-file') {
return (
<div className='flex h-full flex-col overflow-hidden'>
{streamingExtractedContent !== undefined ? (
{streamingPreviewText !== undefined ? (
<FileViewer
file={syntheticFile}
workspaceId={workspaceId}
canEdit={false}
previewMode={previewMode ?? 'preview'}
streamingContent={streamingExtractedContent}
streamingContent={streamingPreviewText}
streamingMode={streamingFileMode}
useCodeRendererForCodeFiles
/>
@@ -273,7 +144,9 @@ export const ResourceContent = memo(function ResourceContent({
workspaceId={workspaceId}
fileId={resource.id}
previewMode={previewMode}
streamingContent={embeddedStreamingContent}
streamingContent={
previewSession?.fileId === resource.id ? streamingPreviewText : undefined
}
streamingMode={streamingFileMode}
/>
)
@@ -674,119 +547,3 @@ function EmbeddedFolder({ workspaceId, folderId }: EmbeddedFolderProps) {
</div>
)
}
function findAnchorIndex(lines: string[], anchor: string, occurrence = 1, afterIndex = -1): number {
const trimmed = anchor.trim()
let count = 0
for (let i = afterIndex + 1; i < lines.length; i++) {
if (lines[i].trim() === trimmed) {
count++
if (count === occurrence) return i
}
}
return -1
}
function extractPatchPreview(
streamingFile: {
content: string
edit?: Record<string, unknown>
},
existingContent: string
): string | undefined {
const edit = streamingFile.edit ?? {}
const strategy = typeof edit.strategy === 'string' ? edit.strategy : undefined
const lines = existingContent.split('\n')
const occurrence =
typeof edit.occurrence === 'number' && Number.isFinite(edit.occurrence) ? edit.occurrence : 1
if (strategy === 'search_replace') {
const search = typeof edit.search === 'string' ? edit.search : ''
if (!search) return undefined
const replace = streamingFile.content
if ((edit.replaceAll as boolean | undefined) === true) {
return existingContent.split(search).join(replace)
}
const firstIdx = existingContent.indexOf(search)
if (firstIdx === -1) return undefined
return (
existingContent.slice(0, firstIdx) + replace + existingContent.slice(firstIdx + search.length)
)
}
const mode = typeof edit.mode === 'string' ? edit.mode : undefined
if (!mode) return undefined
if (mode === 'replace_between') {
const beforeAnchor = typeof edit.before_anchor === 'string' ? edit.before_anchor : undefined
const afterAnchor = typeof edit.after_anchor === 'string' ? edit.after_anchor : undefined
if (!beforeAnchor || !afterAnchor) return undefined
const beforeIdx = findAnchorIndex(lines, beforeAnchor, occurrence)
const afterIdx = findAnchorIndex(lines, afterAnchor, occurrence, beforeIdx)
if (beforeIdx === -1 || afterIdx === -1 || afterIdx <= beforeIdx) return undefined
const newContent = streamingFile.content
const spliced = [
...lines.slice(0, beforeIdx + 1),
...(newContent.length > 0 ? newContent.split('\n') : []),
...lines.slice(afterIdx),
]
return spliced.join('\n')
}
if (mode === 'insert_after') {
const anchor = typeof edit.anchor === 'string' ? edit.anchor : undefined
if (!anchor) return undefined
const anchorIdx = findAnchorIndex(lines, anchor, occurrence)
if (anchorIdx === -1) return undefined
const newContent = streamingFile.content
const spliced = [
...lines.slice(0, anchorIdx + 1),
...(newContent.length > 0 ? newContent.split('\n') : []),
...lines.slice(anchorIdx + 1),
]
return spliced.join('\n')
}
if (mode === 'delete_between') {
const startAnchor = typeof edit.start_anchor === 'string' ? edit.start_anchor : undefined
const endAnchor = typeof edit.end_anchor === 'string' ? edit.end_anchor : undefined
if (!startAnchor || !endAnchor) return undefined
const startIdx = findAnchorIndex(lines, startAnchor, occurrence)
const endIdx = findAnchorIndex(lines, endAnchor, occurrence, startIdx)
if (startIdx === -1 || endIdx === -1 || endIdx <= startIdx) return undefined
const spliced = [...lines.slice(0, startIdx), ...lines.slice(endIdx)]
return spliced.join('\n')
}
return undefined
}
function shouldApplyPatchPreview(streamingFile: {
content: string
edit?: Record<string, unknown>
}): boolean {
const edit = streamingFile.edit ?? {}
const strategy = typeof edit.strategy === 'string' ? edit.strategy : undefined
const mode = typeof edit.mode === 'string' ? edit.mode : undefined
// delete_between is delete-only and can be previewed from intent metadata alone.
if (strategy === 'anchored' && mode === 'delete_between') {
return true
}
// For all other patch modes, keep the visible file unchanged until
// edit_content actually streams content into the target location.
return streamingFile.content.length > 0
}
function buildAppendPreview(existingContent: string, incomingContent: string): string {
if (incomingContent.length === 0) return existingContent
if (existingContent.length === 0) return incomingContent
return `${existingContent}\n${incomingContent}`
}

View File

@@ -1,6 +1,7 @@
'use client'
import { forwardRef, memo, useCallback, useEffect, useState } from 'react'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import { cn } from '@/lib/core/utils/cn'
import { getFileExtension } from '@/lib/uploads/utils/file-utils'
import type { PreviewMode } from '@/app/workspace/[workspaceId]/files/components/file-viewer'
@@ -25,24 +26,13 @@ const PREVIEW_CYCLE: Record<PreviewMode, PreviewMode> = {
* when the streamed fileId matches that exact resource.
*/
function shouldShowStreamingFilePanel(
streamingFile:
| {
toolCallId?: string
fileName: string
fileId?: string
targetKind?: 'new_file' | 'file_id'
operation?: string
edit?: Record<string, unknown>
content: string
}
| null
| undefined,
previewSession: FilePreviewSession | null | undefined,
active: MothershipResource | null
): boolean {
if (!streamingFile || !active) return false
if (!previewSession || previewSession.status === 'complete' || !active) return false
if (active.id === 'streaming-file') return true
if (active.type !== 'file') return false
if (active.id && streamingFile.fileId === active.id) return true
if (active.id && previewSession.fileId === active.id) return true
return false
}
@@ -58,15 +48,7 @@ interface MothershipViewProps {
onCollapse: () => void
isCollapsed: boolean
className?: string
streamingFile?: {
toolCallId?: string
fileName: string
fileId?: string
targetKind?: 'new_file' | 'file_id'
operation?: string
edit?: Record<string, unknown>
content: string
} | null
previewSession?: FilePreviewSession | null
genericResourceData?: GenericResourceData
}
@@ -84,7 +66,7 @@ export const MothershipView = memo(
onCollapse,
isCollapsed,
className,
streamingFile,
previewSession,
genericResourceData,
}: MothershipViewProps,
ref
@@ -92,9 +74,9 @@ export const MothershipView = memo(
const active = resources.find((r) => r.id === activeResourceId) ?? resources[0] ?? null
const { canEdit } = useUserPermissionsContext()
const streamingForActive =
streamingFile && active && shouldShowStreamingFilePanel(streamingFile, active)
? streamingFile
const previewForActive =
previewSession && active && shouldShowStreamingFilePanel(previewSession, active)
? previewSession
: undefined
const [previewMode, setPreviewMode] = useState<PreviewMode>('preview')
@@ -145,7 +127,7 @@ export const MothershipView = memo(
workspaceId={workspaceId}
resource={active}
previewMode={isActivePreviewable ? previewMode : undefined}
streamingFile={streamingForActive}
previewSession={previewForActive}
genericResourceData={active.type === 'generic' ? genericResourceData : undefined}
/>
) : (

View File

@@ -157,7 +157,7 @@ export function Home({ chatId }: HomeProps = {}) {
removeFromQueue,
sendNow,
editQueuedMessage,
streamingFile,
previewSession,
genericResourceData,
} = useChat(
workspaceId,
@@ -413,7 +413,7 @@ export function Home({ chatId }: HomeProps = {}) {
onReorderResources={reorderResources}
onCollapse={collapseResource}
isCollapsed={isResourceCollapsed}
streamingFile={streamingFile}
previewSession={previewSession}
genericResourceData={genericResourceData ?? undefined}
className={skipResourceTransition ? '!transition-none' : undefined}
/>

View File

@@ -56,6 +56,7 @@ import {
WorkspaceFile,
WorkspaceFileOperation,
} from '@/lib/copilot/generated/tool-catalog-v1'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
import {
extractResourcesFromToolResult,
@@ -75,6 +76,12 @@ import { generateId } from '@/lib/core/utils/uuid'
import { getNextWorkflowColor } from '@/lib/workflows/colors'
import { getQueryClient } from '@/app/_shell/providers/get-query-client'
import { invalidateResourceQueries } from '@/app/workspace/[workspaceId]/home/components/mothership-view/components/resource-registry'
import {
type FilePreviewSessionsState,
INITIAL_FILE_PREVIEW_SESSIONS_STATE,
reduceFilePreviewSessions,
useFilePreviewSessions,
} from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions'
import { deploymentKeys } from '@/hooks/queries/deployments'
import {
fetchChatHistory,
@@ -88,6 +95,7 @@ import { invalidateWorkflowSelectors } from '@/hooks/queries/utils/invalidate-wo
import { getTopInsertionSortOrder } from '@/hooks/queries/utils/top-insertion-sort-order'
import { getWorkflowById, getWorkflows } from '@/hooks/queries/utils/workflow-cache'
import { workflowKeys } from '@/hooks/queries/workflows'
import { workspaceFilesKeys } from '@/hooks/queries/workspace-files'
import { useExecutionStream } from '@/hooks/use-execution-stream'
import { useExecutionStore } from '@/stores/execution/store'
import type { ChatContext } from '@/stores/panel'
@@ -127,7 +135,7 @@ export interface UseChatReturn {
removeFromQueue: (id: string) => void
sendNow: (id: string) => Promise<void>
editQueuedMessage: (id: string) => QueuedMessage | undefined
streamingFile: StreamingFilePreview | null
previewSession: FilePreviewSession | null
genericResourceData: GenericResourceData | null
}
@@ -470,22 +478,42 @@ type StreamToolUI = {
clientExecutable?: boolean
}
type StreamingFilePreview = {
toolCallId: string
fileName: string
fileId?: string
targetKind?: 'new_file' | 'file_id'
operation?: string
edit?: Record<string, unknown>
content: string
}
type StreamBatchResponse = {
success: boolean
events: StreamBatchEvent[]
previewSessions?: FilePreviewSession[]
status: string
}
function buildChatHistoryHydrationKey(chatHistory: TaskChatHistory): string {
const resourceKey = chatHistory.resources
.map((resource) => `${resource.type}:${resource.id}:${resource.title}`)
.join('|')
const messageKey = chatHistory.messages.map((message) => message.id).join('|')
const streamSnapshot = chatHistory.streamSnapshot
const snapshotKey = streamSnapshot
? [
streamSnapshot.status,
streamSnapshot.events.length,
streamSnapshot.events[streamSnapshot.events.length - 1]?.eventId ?? '',
streamSnapshot.previewSessions
.map(
(session) =>
`${session.id}:${session.previewVersion}:${session.status}:${session.updatedAt}`
)
.join('|'),
].join('~')
: 'none'
return [
chatHistory.id,
chatHistory.activeStreamId ?? '',
messageKey,
resourceKey,
snapshotKey,
].join('::')
}
const TERMINAL_STREAM_STATUSES = new Set(['complete', 'error', 'cancelled'])
function isTerminalStreamStatus(status: string | null | undefined): boolean {
@@ -676,13 +704,80 @@ export function useChat(
const activeResourceIdRef = useRef(effectiveActiveResourceId)
activeResourceIdRef.current = effectiveActiveResourceId
const [streamingFile, setStreamingFile] = useState<StreamingFilePreview | null>(null)
const streamingFileRef = useRef(streamingFile)
streamingFileRef.current = streamingFile
const pendingStreamingFileRef = useRef<{ value: StreamingFilePreview | null } | null>(null)
const streamingFileFrameRef = useRef<number | null>(null)
const filePreviewSessionsRef = useRef<Map<string, StreamingFilePreview>>(new Map())
const activeFilePreviewToolCallIdRef = useRef<string | null>(null)
const {
previewSession,
previewSessionsById,
activePreviewSessionId,
hydratePreviewSessions,
upsertPreviewSession,
completePreviewSession,
removePreviewSession,
resetPreviewSessions,
} = useFilePreviewSessions()
const previewSessionRef = useRef(previewSession)
previewSessionRef.current = previewSession
const previewSessionsRef = useRef(previewSessionsById)
previewSessionsRef.current = previewSessionsById
const activePreviewSessionIdRef = useRef(activePreviewSessionId)
activePreviewSessionIdRef.current = activePreviewSessionId
const previewSessionsStateRef = useRef<FilePreviewSessionsState>({
activeSessionId: activePreviewSessionId,
sessions: previewSessionsById,
})
previewSessionsStateRef.current = {
activeSessionId: activePreviewSessionId,
sessions: previewSessionsById,
}
const syncPreviewSessionRefs = useCallback((nextState: FilePreviewSessionsState) => {
previewSessionsStateRef.current = nextState
previewSessionsRef.current = nextState.sessions
activePreviewSessionIdRef.current = nextState.activeSessionId
previewSessionRef.current =
nextState.activeSessionId !== null
? (nextState.sessions[nextState.activeSessionId] ?? null)
: null
}, [])
const applyPreviewSessionUpdate = useCallback(
(session: FilePreviewSession, options?: { activate?: boolean }) => {
const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, {
type: 'upsert',
session,
...(options?.activate === false ? { activate: false } : {}),
})
syncPreviewSessionRefs(nextState)
upsertPreviewSession(session, options)
return nextState
},
[syncPreviewSessionRefs, upsertPreviewSession]
)
const applyCompletedPreviewSession = useCallback(
(session: FilePreviewSession) => {
const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, {
type: 'complete',
session,
})
syncPreviewSessionRefs(nextState)
completePreviewSession(session)
return nextState
},
[completePreviewSession, syncPreviewSessionRefs]
)
const removePreviewSessionImmediate = useCallback(
(sessionId: string) => {
const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, {
type: 'remove',
sessionId,
})
syncPreviewSessionRefs(nextState)
removePreviewSession(sessionId)
return nextState
},
[removePreviewSession, syncPreviewSessionRefs]
)
const [messageQueue, setMessageQueue] = useState<QueuedMessage[]>([])
const messageQueueRef = useRef<QueuedMessage[]>([])
@@ -711,55 +806,77 @@ export function useChat(
>(async () => false)
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
const cancelQueuedStreamingFileUpdate = useCallback(() => {
if (streamingFileFrameRef.current !== null) {
cancelAnimationFrame(streamingFileFrameRef.current)
streamingFileFrameRef.current = null
}
pendingStreamingFileRef.current = null
}, [])
const setStreamingFileImmediate = useCallback(
(next: StreamingFilePreview | null) => {
cancelQueuedStreamingFileUpdate()
streamingFileRef.current = next
setStreamingFile(next)
},
[cancelQueuedStreamingFileUpdate]
)
const resetEphemeralPreviewState = useCallback(
(options?: { removeStreamingResource?: boolean }) => {
setStreamingFileImmediate(null)
filePreviewSessionsRef.current.clear()
activeFilePreviewToolCallIdRef.current = null
syncPreviewSessionRefs(INITIAL_FILE_PREVIEW_SESSIONS_STATE)
resetPreviewSessions()
if (options?.removeStreamingResource) {
setResources((current) => current.filter((resource) => resource.id !== 'streaming-file'))
}
},
[setStreamingFileImmediate]
[resetPreviewSessions, syncPreviewSessionRefs]
)
const queueStreamingFileUpdate = useCallback((next: StreamingFilePreview) => {
streamingFileRef.current = next
pendingStreamingFileRef.current = { value: next }
if (streamingFileFrameRef.current !== null) return
streamingFileFrameRef.current = requestAnimationFrame(() => {
streamingFileFrameRef.current = null
const pending = pendingStreamingFileRef.current
pendingStreamingFileRef.current = null
if (!pending) return
setStreamingFile(pending.value)
})
const syncPreviewResourceChrome = useCallback((session: FilePreviewSession) => {
if (session.targetKind === 'new_file') {
setResources((current) => {
const existing = current.find((resource) => resource.id === 'streaming-file')
if (existing) {
return current.map((resource) =>
resource.id === 'streaming-file'
? { ...resource, title: session.fileName || 'Writing file...' }
: resource
)
}
return [
...current,
{
type: 'file',
id: 'streaming-file',
title: session.fileName || 'Writing file...',
},
]
})
setActiveResourceId('streaming-file')
return
}
if (session.fileId) {
setResources((current) => current.filter((resource) => resource.id !== 'streaming-file'))
setActiveResourceId(session.fileId)
}
}, [])
const seedPreviewSessions = useCallback(
(sessions: FilePreviewSession[]) => {
if (sessions.length === 0) {
return
}
const nextState = reduceFilePreviewSessions(previewSessionsStateRef.current, {
type: 'hydrate',
sessions,
})
syncPreviewSessionRefs(nextState)
hydratePreviewSessions(sessions)
const active =
nextState.activeSessionId !== null
? (nextState.sessions[nextState.activeSessionId] ?? null)
: null
if (active) {
syncPreviewResourceChrome(active)
}
},
[hydratePreviewSessions, syncPreviewResourceChrome, syncPreviewSessionRefs]
)
const abortControllerRef = useRef<AbortController | null>(null)
const streamReaderRef = useRef<ReadableStreamDefaultReader<Uint8Array> | null>(null)
const chatIdRef = useRef<string | undefined>(initialChatId)
/** Panel/task selection — drives createNewChat + request chatId; may differ from chatIdRef while a stream is still finishing. */
const selectedChatIdRef = useRef<string | undefined>(initialChatId)
selectedChatIdRef.current = initialChatId
const appliedChatIdRef = useRef<string | undefined>(undefined)
const appliedChatHistoryKeyRef = useRef<string | undefined>(undefined)
const pendingUserMsgRef = useRef<{ id: string; content: string } | null>(null)
const streamIdRef = useRef<string | undefined>(undefined)
const lastCursorRef = useRef('0')
@@ -887,7 +1004,7 @@ export function useChat(
chatIdRef.current = initialChatId
lastCursorRef.current = '0'
setResolvedChatId(initialChatId)
appliedChatIdRef.current = undefined
appliedChatHistoryKeyRef.current = undefined
setMessages([])
setError(null)
setIsSending(false)
@@ -905,7 +1022,7 @@ export function useChat(
chatIdRef.current = undefined
lastCursorRef.current = '0'
setResolvedChatId(undefined)
appliedChatIdRef.current = undefined
appliedChatHistoryKeyRef.current = undefined
abortControllerRef.current = null
sendingRef.current = false
setMessages([])
@@ -919,10 +1036,13 @@ export function useChat(
}, [isHomePage, resetEphemeralPreviewState])
useEffect(() => {
if (!chatHistory || appliedChatIdRef.current === chatHistory.id) return
if (!chatHistory) return
const hydrationKey = buildChatHistoryHydrationKey(chatHistory)
if (appliedChatHistoryKeyRef.current === hydrationKey) return
const activeStreamId = chatHistory.activeStreamId
appliedChatIdRef.current = chatHistory.id
appliedChatHistoryKeyRef.current = hydrationKey
const mappedMessages = chatHistory.messages.map(toDisplayMessage)
const shouldPreserveActiveStreamingMessage =
sendingRef.current && Boolean(activeStreamId) && activeStreamId === streamIdRef.current
@@ -973,6 +1093,13 @@ export function useChat(
setActiveResourceId(null)
}
const snapshotPreviewSessions = Array.isArray(chatHistory.streamSnapshot?.previewSessions)
? (chatHistory.streamSnapshot.previewSessions as FilePreviewSession[])
: []
if (snapshotPreviewSessions.length > 0) {
seedPreviewSessions(snapshotPreviewSessions)
}
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
@@ -998,6 +1125,7 @@ export function useChat(
initialBatch: {
success: true,
events: snapshotEvents,
previewSessions: snapshotPreviewSessions,
status: initialSnapshot?.status ?? 'unknown',
},
afterCursor: String(snapshotEvents[snapshotEvents.length - 1]?.eventId ?? '0'),
@@ -1026,7 +1154,13 @@ export function useChat(
}
reconnect()
}
}, [chatHistory, workspaceId, queryClient, recoverPendingClientWorkflowTools])
}, [
chatHistory,
workspaceId,
queryClient,
recoverPendingClientWorkflowTools,
seedPreviewSessions,
])
const processSSEStream = useCallback(
async (
@@ -1285,126 +1419,129 @@ export function useChat(
if (!id) break
if (previewPhase) {
const sessions = filePreviewSessionsRef.current
const prevSession = sessions.get(id) ?? {
const prevSession = previewSessionsRef.current[id]
const target = asPayloadRecord(payload.target)
const targetKind =
payload.targetKind === 'new_file' || payload.targetKind === 'file_id'
? (payload.targetKind as 'new_file' | 'file_id')
: target?.kind === 'new_file' || target?.kind === 'file_id'
? (target.kind as 'new_file' | 'file_id')
: prevSession?.targetKind
const fileId =
typeof payload.fileId === 'string'
? payload.fileId
: typeof target?.fileId === 'string'
? target.fileId
: prevSession?.fileId
const fileName =
typeof payload.fileName === 'string'
? payload.fileName
: typeof target?.fileName === 'string'
? target.fileName
: (prevSession?.fileName ?? '')
const operation =
typeof payload.operation === 'string' ? payload.operation : prevSession?.operation
const edit = asPayloadRecord(payload.edit) ?? prevSession?.edit
const streamId = parsed.stream?.streamId ?? prevSession?.streamId ?? ''
const nextPreviewVersion =
typeof payload.previewVersion === 'number' &&
Number.isFinite(payload.previewVersion)
? payload.previewVersion
: (prevSession?.previewVersion ?? 0) + 1
const baseSession: FilePreviewSession = {
schemaVersion: 1,
id,
streamId,
toolCallId: id,
fileName: '',
content: '',
status: prevSession?.status ?? 'pending',
fileName,
...(fileId ? { fileId } : {}),
...(targetKind ? { targetKind } : {}),
...(operation ? { operation } : {}),
...(edit ? { edit } : {}),
previewText: prevSession?.previewText ?? '',
previewVersion: prevSession?.previewVersion ?? 0,
updatedAt: prevSession?.updatedAt ?? new Date().toISOString(),
...(prevSession?.completedAt ? { completedAt: prevSession.completedAt } : {}),
}
if (previewPhase === 'file_preview_start') {
const nextSession: StreamingFilePreview = {
...prevSession,
toolCallId: id,
...(typeof payload.operation === 'string'
? { operation: payload.operation }
: {}),
...(typeof payload.fileId === 'string'
? { fileId: payload.fileId, targetKind: 'file_id' as const }
: {}),
const nextSession: FilePreviewSession = {
...baseSession,
status: 'pending',
updatedAt: new Date().toISOString(),
}
sessions.set(id, nextSession)
activeFilePreviewToolCallIdRef.current = id
if (nextSession.fileId) {
setActiveResourceId(nextSession.fileId)
}
setStreamingFileImmediate(nextSession)
applyPreviewSessionUpdate(nextSession)
break
}
if (previewPhase === 'file_preview_target') {
const target = asPayloadRecord(payload.target)
const nextSession: StreamingFilePreview = {
...prevSession,
operation:
typeof payload.operation === 'string'
? payload.operation
: prevSession.operation,
targetKind:
target?.kind === 'new_file' || target?.kind === 'file_id'
? (target.kind as 'new_file' | 'file_id')
: prevSession.targetKind,
fileId: typeof target?.fileId === 'string' ? target.fileId : prevSession.fileId,
fileName:
typeof target?.fileName === 'string' ? target.fileName : prevSession.fileName,
const nextSession: FilePreviewSession = {
...baseSession,
updatedAt: new Date().toISOString(),
}
sessions.set(id, nextSession)
activeFilePreviewToolCallIdRef.current = id
if (nextSession.targetKind === 'new_file') {
const hasStreamingResource = resourcesRef.current.some(
(resource) => resource.id === 'streaming-file'
)
if (!hasStreamingResource) {
addResource({
type: 'file',
id: 'streaming-file',
title: nextSession.fileName || 'Writing file...',
})
setActiveResourceId('streaming-file')
}
} else if (nextSession.fileId) {
setResources((rs) => rs.filter((resource) => resource.id !== 'streaming-file'))
if (
activeResourceIdRef.current === null ||
activeResourceIdRef.current === 'streaming-file'
) {
setActiveResourceId(nextSession.fileId)
}
const nextState = applyPreviewSessionUpdate(nextSession)
const activePreview =
nextState.activeSessionId !== null
? (nextState.sessions[nextState.activeSessionId] ?? null)
: null
if (activePreview?.id === nextSession.id) {
syncPreviewResourceChrome(activePreview)
}
setStreamingFileImmediate(nextSession)
break
}
if (previewPhase === 'file_preview_edit_meta') {
const nextSession: StreamingFilePreview = {
...prevSession,
edit: asPayloadRecord(payload.edit),
const nextSession: FilePreviewSession = {
...baseSession,
status: prevSession?.status ?? 'pending',
updatedAt: new Date().toISOString(),
}
sessions.set(id, nextSession)
activeFilePreviewToolCallIdRef.current = id
setStreamingFileImmediate(nextSession)
applyPreviewSessionUpdate(nextSession)
break
}
if (previewPhase === 'file_preview_content') {
const content = typeof payload.content === 'string' ? payload.content : ''
const contentMode =
typeof payload.contentMode === 'string' ? payload.contentMode : undefined
let nextContent: string
if (contentMode === 'snapshot') {
nextContent = content
} else if (contentMode === 'delta') {
nextContent = (prevSession.content ?? '') + content
} else {
const isAppendOp = prevSession.operation === 'append'
const prevContent = streamingFileRef.current?.content ?? ''
nextContent = isAppendOp ? prevContent + content : content
const contentMode = payload.contentMode === 'delta' ? 'delta' : 'snapshot'
const nextPreviewText =
contentMode === 'delta' ? (prevSession?.previewText ?? '') + content : content
const nextSession: FilePreviewSession = {
...baseSession,
status: 'streaming',
previewText: nextPreviewText,
previewVersion: nextPreviewVersion,
updatedAt: new Date().toISOString(),
}
const nextSession: StreamingFilePreview = {
...prevSession,
content: nextContent,
}
sessions.set(id, nextSession)
activeFilePreviewToolCallIdRef.current = id
applyPreviewSessionUpdate(nextSession)
const previewToolIdx = toolMap.get(id)
if (previewToolIdx !== undefined && blocks[previewToolIdx].toolCall) {
blocks[previewToolIdx].toolCall!.status = 'executing'
}
queueStreamingFileUpdate(nextSession)
break
}
if (previewPhase === 'file_preview_complete') {
const fileId =
typeof payload.fileId === 'string' ? payload.fileId : prevSession.fileId
const resultData = asPayloadRecord(payload.output)
sessions.delete(id)
activeFilePreviewToolCallIdRef.current = null
const completedAt = new Date().toISOString()
const nextSession: FilePreviewSession = {
...baseSession,
status: 'complete',
previewVersion:
typeof payload.previewVersion === 'number' &&
Number.isFinite(payload.previewVersion)
? payload.previewVersion
: (prevSession?.previewVersion ?? 0),
updatedAt: completedAt,
completedAt,
}
const nextState = applyCompletedPreviewSession(nextSession)
if (fileId && resultData?.id) {
const fileName = (resultData.name as string) ?? prevSession.fileName ?? 'File'
const fileName = (resultData.name as string) ?? nextSession.fileName ?? 'File'
const fileResource = { type: 'file' as const, id: fileId, title: fileName }
setResources((rs) => {
const without = rs.filter((r) => r.id !== 'streaming-file')
@@ -1414,11 +1551,21 @@ export function useChat(
return [...without, fileResource]
})
setActiveResourceId(fileId)
if (nextSession.previewText) {
queryClient.setQueryData(
workspaceFilesKeys.content(workspaceId, fileId, 'text'),
nextSession.previewText
)
}
invalidateResourceQueries(queryClient, workspaceId, 'file', fileId)
}
if (!activeSubagent || activeSubagent !== FileTool.id) {
setStreamingFileImmediate(null)
} else {
const activePreview =
nextState.activeSessionId !== null
? (nextState.sessions[nextState.activeSessionId] ?? null)
: null
if (activePreview) {
syncPreviewResourceChrome(activePreview)
}
}
break
}
@@ -1544,12 +1691,8 @@ export function useChat(
(tc.name === WorkspaceFile.id || tc.name === 'edit_content') &&
!shouldKeepWorkspacePreviewOpen
) {
filePreviewSessionsRef.current.delete(id)
if (activeFilePreviewToolCallIdRef.current === id) {
activeFilePreviewToolCallIdRef.current = null
if (!activeSubagent || activeSubagent !== FileTool.id) {
setStreamingFileImmediate(null)
}
if (tc.name === WorkspaceFile.id) {
removePreviewSessionImmediate(id)
}
const fileResource = extractedResources.find((r) => r.type === 'file')
if (fileResource) {
@@ -1591,7 +1734,7 @@ export function useChat(
if (name === 'edit_content') {
const parentToolCallId =
activeFilePreviewToolCallIdRef.current ?? streamingFileRef.current?.toolCallId
activePreviewSessionIdRef.current ?? previewSessionRef.current?.toolCallId
const parentIdx =
parentToolCallId !== null && parentToolCallId !== undefined
? toolMap.get(parentToolCallId)
@@ -1767,20 +1910,24 @@ export function useChat(
blocks.push({ type: 'subagent', content: name })
}
if (name === FileTool.id && !isSameActiveSubagent) {
const emptyFile: StreamingFilePreview = {
applyPreviewSessionUpdate({
schemaVersion: 1,
id: parentToolCallId || 'file-preview',
streamId: streamIdRef.current ?? '',
toolCallId: parentToolCallId || 'file-preview',
status: 'pending',
fileName: '',
content: '',
}
setStreamingFileImmediate(emptyFile)
previewText: '',
previewVersion: 0,
updatedAt: new Date().toISOString(),
})
}
flush()
} else if (spanEvent === MothershipStreamV1SpanLifecycleEvent.end) {
if (isPendingPause) {
break
}
if (streamingFileRef.current && !activeFilePreviewToolCallIdRef.current) {
setStreamingFileImmediate(null)
if (previewSessionRef.current && !activePreviewSessionIdRef.current) {
const lastFileResource = resourcesRef.current.find(
(r) => r.type === 'file' && r.id !== 'streaming-file'
)
@@ -1820,7 +1967,16 @@ export function useChat(
}
return { sawStreamError, sawComplete: sawCompleteEvent }
},
[workspaceId, queryClient, addResource, removeResource]
[
workspaceId,
queryClient,
addResource,
removeResource,
applyPreviewSessionUpdate,
applyCompletedPreviewSession,
removePreviewSessionImmediate,
syncPreviewResourceChrome,
]
)
processSSEStreamRef.current = processSSEStream
@@ -1859,9 +2015,13 @@ export function useChat(
if (!response.ok) {
throw new Error(`Stream resume batch failed: ${response.status}`)
}
return response.json()
const batch = (await response.json()) as StreamBatchResponse
if (Array.isArray(batch.previewSessions) && batch.previewSessions.length > 0) {
seedPreviewSessions(batch.previewSessions)
}
return batch
},
[]
[seedPreviewSessions]
)
const attachToExistingStream = useCallback(
@@ -2492,13 +2652,12 @@ export function useChat(
useEffect(() => {
return () => {
cancelQueuedStreamingFileUpdate()
streamReaderRef.current = null
abortControllerRef.current = null
streamGenRef.current++
sendingRef.current = false
}
}, [cancelQueuedStreamingFileUpdate])
}, [])
return {
messages,
@@ -2518,7 +2677,7 @@ export function useChat(
removeFromQueue,
sendNow,
editQueuedMessage,
streamingFile,
previewSession,
genericResourceData,
}
}

View File

@@ -0,0 +1,147 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import {
INITIAL_FILE_PREVIEW_SESSIONS_STATE,
reduceFilePreviewSessions,
} from '@/app/workspace/[workspaceId]/home/hooks/use-file-preview-sessions'
function createSession(
overrides: Partial<FilePreviewSession> & Pick<FilePreviewSession, 'id' | 'toolCallId'>
): FilePreviewSession {
return {
schemaVersion: 1,
id: overrides.id,
streamId: overrides.streamId ?? 'stream-1',
toolCallId: overrides.toolCallId,
status: overrides.status ?? 'streaming',
fileName: overrides.fileName ?? `${overrides.id}.md`,
previewText: overrides.previewText ?? '',
previewVersion: overrides.previewVersion ?? 1,
updatedAt: overrides.updatedAt ?? '2026-04-10T00:00:00.000Z',
...(overrides.fileId ? { fileId: overrides.fileId } : {}),
...(overrides.targetKind ? { targetKind: overrides.targetKind } : {}),
...(overrides.operation ? { operation: overrides.operation } : {}),
...(overrides.edit ? { edit: overrides.edit } : {}),
...(overrides.completedAt ? { completedAt: overrides.completedAt } : {}),
}
}
describe('reduceFilePreviewSessions', () => {
it('hydrates the latest active preview session', () => {
const state = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, {
type: 'hydrate',
sessions: [
createSession({
id: 'preview-1',
toolCallId: 'preview-1',
previewVersion: 1,
updatedAt: '2026-04-10T00:00:00.000Z',
}),
createSession({
id: 'preview-2',
toolCallId: 'preview-2',
previewVersion: 2,
updatedAt: '2026-04-10T00:00:01.000Z',
previewText: 'latest',
}),
],
})
expect(state.activeSessionId).toBe('preview-2')
expect(state.sessions['preview-2']?.previewText).toBe('latest')
})
it('drops the active session when it completes and promotes the next active session', () => {
const hydratedState = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, {
type: 'hydrate',
sessions: [
createSession({
id: 'preview-1',
toolCallId: 'preview-1',
previewVersion: 1,
updatedAt: '2026-04-10T00:00:00.000Z',
}),
createSession({
id: 'preview-2',
toolCallId: 'preview-2',
previewVersion: 2,
updatedAt: '2026-04-10T00:00:01.000Z',
}),
],
})
const completedState = reduceFilePreviewSessions(hydratedState, {
type: 'complete',
session: createSession({
id: 'preview-2',
toolCallId: 'preview-2',
status: 'complete',
previewVersion: 3,
updatedAt: '2026-04-10T00:00:02.000Z',
completedAt: '2026-04-10T00:00:02.000Z',
}),
})
expect(completedState.activeSessionId).toBe('preview-1')
expect(completedState.sessions['preview-1']?.id).toBe('preview-1')
})
it('clears active session when the only session completes', () => {
const onlyStreaming = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, {
type: 'upsert',
session: createSession({
id: 'preview-1',
toolCallId: 'preview-1',
previewVersion: 2,
updatedAt: '2026-04-10T00:00:01.000Z',
previewText: 'final',
}),
})
const completed = reduceFilePreviewSessions(onlyStreaming, {
type: 'complete',
session: createSession({
id: 'preview-1',
toolCallId: 'preview-1',
status: 'complete',
previewVersion: 3,
updatedAt: '2026-04-10T00:00:02.000Z',
completedAt: '2026-04-10T00:00:02.000Z',
previewText: 'final',
}),
})
expect(completed.activeSessionId).toBeNull()
expect(completed.sessions['preview-1']?.status).toBe('complete')
})
it('ignores stale complete events for a newer active session', () => {
const activeState = reduceFilePreviewSessions(INITIAL_FILE_PREVIEW_SESSIONS_STATE, {
type: 'upsert',
session: createSession({
id: 'preview-1',
toolCallId: 'preview-1',
previewVersion: 3,
updatedAt: '2026-04-10T00:00:03.000Z',
}),
})
const staleCompleteState = reduceFilePreviewSessions(activeState, {
type: 'complete',
session: createSession({
id: 'preview-1',
toolCallId: 'preview-1',
status: 'complete',
previewVersion: 2,
updatedAt: '2026-04-10T00:00:02.000Z',
completedAt: '2026-04-10T00:00:02.000Z',
}),
})
expect(staleCompleteState.activeSessionId).toBe('preview-1')
expect(staleCompleteState.sessions['preview-1']?.status).toBe('streaming')
expect(staleCompleteState.sessions['preview-1']?.previewVersion).toBe(3)
})
})

View File

@@ -0,0 +1,187 @@
import { useCallback, useMemo, useReducer } from 'react'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
export interface FilePreviewSessionsState {
activeSessionId: string | null
sessions: Record<string, FilePreviewSession>
}
export type FilePreviewSessionsAction =
| { type: 'hydrate'; sessions: FilePreviewSession[] }
| { type: 'upsert'; session: FilePreviewSession; activate?: boolean }
| { type: 'complete'; session: FilePreviewSession }
| { type: 'remove'; sessionId: string }
| { type: 'reset' }
export const INITIAL_FILE_PREVIEW_SESSIONS_STATE: FilePreviewSessionsState = {
activeSessionId: null,
sessions: {},
}
export function shouldReplaceSession(
current: FilePreviewSession | undefined,
next: FilePreviewSession
): boolean {
if (!current) return true
if (next.previewVersion !== current.previewVersion) {
return next.previewVersion > current.previewVersion
}
return next.updatedAt >= current.updatedAt
}
export function pickActiveSessionId(
sessions: Record<string, FilePreviewSession>,
preferredId?: string | null
): string | null {
if (preferredId && sessions[preferredId]?.status !== 'complete') {
return preferredId
}
let latestActive: FilePreviewSession | null = null
for (const session of Object.values(sessions)) {
if (session.status === 'complete') continue
if (!latestActive || shouldReplaceSession(latestActive, session)) {
latestActive = session
}
}
return latestActive?.id ?? null
}
export function reduceFilePreviewSessions(
state: FilePreviewSessionsState,
action: FilePreviewSessionsAction
): FilePreviewSessionsState {
switch (action.type) {
case 'hydrate': {
if (action.sessions.length === 0) {
return state
}
const nextSessions = { ...state.sessions }
for (const session of action.sessions) {
if (shouldReplaceSession(nextSessions[session.id], session)) {
nextSessions[session.id] = session
}
}
return {
sessions: nextSessions,
activeSessionId: pickActiveSessionId(nextSessions, state.activeSessionId),
}
}
case 'upsert': {
if (!shouldReplaceSession(state.sessions[action.session.id], action.session)) {
return state
}
const nextSessions = {
...state.sessions,
[action.session.id]: action.session,
}
return {
sessions: nextSessions,
activeSessionId:
action.activate === false
? pickActiveSessionId(nextSessions, state.activeSessionId)
: action.session.status === 'complete'
? pickActiveSessionId(nextSessions, state.activeSessionId)
: action.session.id,
}
}
case 'complete': {
if (!shouldReplaceSession(state.sessions[action.session.id], action.session)) {
return state
}
const nextSessions = {
...state.sessions,
[action.session.id]: action.session,
}
return {
sessions: nextSessions,
activeSessionId:
state.activeSessionId === action.session.id
? pickActiveSessionId(nextSessions, null)
: state.activeSessionId,
}
}
case 'remove': {
if (!state.sessions[action.sessionId]) {
return state
}
const nextSessions = { ...state.sessions }
delete nextSessions[action.sessionId]
return {
sessions: nextSessions,
activeSessionId:
state.activeSessionId === action.sessionId
? pickActiveSessionId(nextSessions, null)
: state.activeSessionId,
}
}
case 'reset':
return INITIAL_FILE_PREVIEW_SESSIONS_STATE
default:
return state
}
}
export function useFilePreviewSessions() {
const [state, dispatch] = useReducer(
reduceFilePreviewSessions,
INITIAL_FILE_PREVIEW_SESSIONS_STATE
)
const previewSession = useMemo(
() => (state.activeSessionId ? (state.sessions[state.activeSessionId] ?? null) : null),
[state.activeSessionId, state.sessions]
)
const hydratePreviewSessions = useCallback((sessions: FilePreviewSession[]) => {
dispatch({ type: 'hydrate', sessions })
}, [])
const upsertPreviewSession = useCallback(
(session: FilePreviewSession, options?: { activate?: boolean }) => {
dispatch({
type: 'upsert',
session,
...(options?.activate === false ? { activate: false } : {}),
})
},
[]
)
const completePreviewSession = useCallback((session: FilePreviewSession) => {
dispatch({ type: 'complete', session })
}, [])
const removePreviewSession = useCallback((sessionId: string) => {
dispatch({ type: 'remove', sessionId })
}, [])
const resetPreviewSessions = useCallback(() => {
dispatch({ type: 'reset' })
}, [])
return {
previewSession,
previewSessionsById: state.sessions,
activePreviewSessionId: state.activeSessionId,
hydratePreviewSessions,
upsertPreviewSession,
completePreviewSession,
removePreviewSession,
resetPreviewSessions,
}
}

View File

@@ -1,6 +1,7 @@
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import type { PersistedMessage } from '@/lib/copilot/chat/persisted-message'
import { normalizeMessage } from '@/lib/copilot/chat/persisted-message'
import type { FilePreviewSession } from '@/lib/copilot/request/session'
import type { StreamBatchEvent } from '@/lib/copilot/request/session/types'
import type { MothershipResource } from '@/app/workspace/[workspaceId]/home/types'
@@ -18,7 +19,11 @@ export interface TaskChatHistory {
messages: PersistedMessage[]
activeStreamId: string | null
resources: MothershipResource[]
streamSnapshot?: { events: StreamBatchEvent[]; status: string } | null
streamSnapshot?: {
events: StreamBatchEvent[]
previewSessions: FilePreviewSession[]
status: string
} | null
}
export const taskKeys = {
@@ -112,6 +117,7 @@ export async function fetchChatHistory(
: [],
activeStreamId: chat.activeStreamId || null,
resources: Array.isArray(chat.resources) ? chat.resources : [],
streamSnapshot: chat.streamSnapshot || null,
}
}

View File

@@ -0,0 +1,52 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import {
buildPreviewContentUpdate,
decodeJsonStringPrefix,
extractEditContent,
} from '@/lib/copilot/request/go/stream'
describe('copilot go stream helpers', () => {
it('decodes complete escapes and stops at incomplete unicode escapes', () => {
expect(decodeJsonStringPrefix('hello\\nworld')).toBe('hello\nworld')
expect(decodeJsonStringPrefix('emoji \\u263A')).toBe('emoji ☺')
expect(decodeJsonStringPrefix('partial \\u26')).toBe('partial ')
})
it('extracts the streamed edit_content prefix from partial JSON', () => {
expect(extractEditContent('{"content":"hello\\nwor')).toBe('hello\nwor')
expect(extractEditContent('{"content":"tab\\tvalue"}')).toBe('tab\tvalue')
})
it('emits full snapshots for append (sidebar viewer uses replace mode; no delta merge)', () => {
expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'append')).toEqual({
content: 'hello world',
contentMode: 'snapshot',
lastSnapshotAt: 200,
})
})
it('emits deltas for update when the preview extends the previous text', () => {
expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'update')).toEqual({
content: ' world',
contentMode: 'delta',
lastSnapshotAt: 100,
})
})
it('falls back to snapshots for patches and divergent content', () => {
expect(buildPreviewContentUpdate('hello', 'goodbye', 100, 200, 'update')).toEqual({
content: 'goodbye',
contentMode: 'snapshot',
lastSnapshotAt: 200,
})
expect(buildPreviewContentUpdate('hello', 'hello world', 100, 200, 'patch')).toEqual({
content: 'hello world',
contentMode: 'snapshot',
lastSnapshotAt: 200,
})
})
})

View File

@@ -12,7 +12,14 @@ import {
sseHandlers,
subAgentHandlers,
} from '@/lib/copilot/request/handlers'
import { eventToStreamEvent, isEventRecord } from '@/lib/copilot/request/session'
import {
createFilePreviewSession,
eventToStreamEvent,
type FilePreviewContentMode,
type FilePreviewSession,
isEventRecord,
upsertFilePreviewSession,
} from '@/lib/copilot/request/session'
import { shouldSkipToolCallEvent, shouldSkipToolResultEvent } from '@/lib/copilot/request/sse-utils'
import type {
ExecutionContext,
@@ -20,7 +27,14 @@ import type {
StreamEvent,
StreamingContext,
} from '@/lib/copilot/request/types'
import { clearIntentsForWorkspace } from '@/lib/copilot/tools/server/files/file-intent-store'
import {
clearIntentsForWorkspace,
peekFileIntent,
} from '@/lib/copilot/tools/server/files/file-intent-store'
import {
buildFilePreviewText,
loadWorkspaceFileTextForPreview,
} from '@/lib/copilot/tools/server/files/file-preview'
const logger = createLogger('CopilotGoStream')
@@ -38,11 +52,20 @@ type EditContentStreamState = {
lastContentSnapshot?: string
}
type FilePreviewStreamState = {
session: FilePreviewSession
lastEmittedPreviewText: string
lastSnapshotAt: number
}
const PATCH_PREVIEW_SNAPSHOT_INTERVAL_MS = 80
const DELTA_PREVIEW_CHECKPOINT_INTERVAL_MS = 1000
/**
* Decode a prefix of a JSON-encoded string value, handling escape sequences
* that may be incomplete at the end of a streaming chunk.
*/
function decodeJsonStringPrefix(input: string): string {
export function decodeJsonStringPrefix(input: string): string {
let output = ''
for (let i = 0; i < input.length; i++) {
const ch = input[i]
@@ -109,7 +132,7 @@ function decodeJsonStringPrefix(input: string): string {
* Since edit_content has a single field `content`, the JSON is always
* `{"content":"..."}`. We find `"content":"` and decode everything after.
*/
function extractEditContent(raw: string): string {
export function extractEditContent(raw: string): string {
const marker = '"content":'
const idx = raw.indexOf(marker)
if (idx === -1) return ''
@@ -130,6 +153,88 @@ function extractEditContent(raw: string): string {
return decodeJsonStringPrefix(inner)
}
function isContentOperation(
operation: string | undefined
): operation is 'append' | 'update' | 'patch' {
return operation === 'append' || operation === 'update' || operation === 'patch'
}
function isDocFormat(fileName: string | undefined): boolean {
return /\.(pptx|docx|pdf)$/i.test(fileName ?? '')
}
function buildPreviewSessionFromIntent(
streamId: string,
intent: FileIntent,
current?: FilePreviewSession
): FilePreviewSession {
return createFilePreviewSession({
streamId,
toolCallId: intent.toolCallId,
fileName: intent.target.fileName ?? current?.fileName,
...(intent.target.fileId ? { fileId: intent.target.fileId } : {}),
...(intent.target.kind === 'new_file' || intent.target.kind === 'file_id'
? { targetKind: intent.target.kind }
: {}),
operation: intent.operation,
...(intent.edit ? { edit: intent.edit } : {}),
...(typeof current?.baseContent === 'string' ? { baseContent: current.baseContent } : {}),
previewText: current?.previewText ?? '',
previewVersion: current?.previewVersion ?? 0,
status: current?.status ?? 'pending',
completedAt: current?.completedAt,
})
}
async function persistFilePreviewSession(session: FilePreviewSession): Promise<void> {
try {
await upsertFilePreviewSession(session)
} catch (error) {
logger.warn('Failed to persist file preview session', {
streamId: session.streamId,
toolCallId: session.toolCallId,
previewVersion: session.previewVersion,
error: error instanceof Error ? error.message : String(error),
})
}
}
/**
* Chooses snapshot vs delta emission for `file_preview_content`.
*
* **Append** always uses snapshot mode: the sidebar `FileViewer` gets `streamingMode: 'replace'`
* and must receive the full composed document every tick. Delta mode saved bandwidth but required
* merging chunks with `prevSession.previewText`, which could desync and look like mid-stream overwrites.
*/
export function buildPreviewContentUpdate(
previousText: string,
nextText: string,
lastSnapshotAt: number,
now: number,
operation: string | undefined
): { content: string; contentMode: FilePreviewContentMode; lastSnapshotAt: number } {
const shouldForceSnapshot =
previousText.length === 0 ||
!nextText.startsWith(previousText) ||
operation === 'patch' ||
operation === 'append' ||
now - lastSnapshotAt >= DELTA_PREVIEW_CHECKPOINT_INTERVAL_MS
if (shouldForceSnapshot) {
return {
content: nextText,
contentMode: 'snapshot',
lastSnapshotAt: now,
}
}
return {
content: nextText.slice(previousText.length),
contentMode: 'delta',
lastSnapshotAt,
}
}
export class CopilotBackendError extends Error {
status?: number
body?: string
@@ -170,10 +275,11 @@ export interface StreamLoopOptions extends OrchestratorOptions {
* Handles: fetch -> parse -> normalize -> dedupe -> subagent routing -> handler dispatch.
* Callers provide the fetch URL/options and can intercept events via onBeforeDispatch.
*
* File preview streaming uses an intent-based approach:
* 1. workspace_file phase:call → store intent (operation, target, edit metadata)
* 2. edit_content phase:args_delta → stream content using stored intent
* 3. edit_content phase:call → consume and clear intent
* File preview streaming:
* 1. workspace_file phase:call → active intent + load base file text for append/patch (awaited here so
* it cannot race edit_content args_delta; tool executor still stores Redis intent async)
* 2. edit_content phase:args_delta → stream preview from base + streamed content
* 3. edit_content phase:result → complete preview; edit_content tool consumes Redis intent
*/
export async function runStreamLoop(
fetchUrl: string,
@@ -184,6 +290,7 @@ export async function runStreamLoop(
): Promise<void> {
const { timeout = ORCHESTRATION_TIMEOUT_MS, abortSignal } = options
const editContentState = new Map<string, EditContentStreamState>()
const filePreviewState = new Map<string, FilePreviewStreamState>()
const fetchSpan = context.trace.startSpan(
`HTTP Request → ${new URL(fetchUrl).pathname}`,
@@ -272,8 +379,7 @@ export async function runStreamLoop(
const fileId = target.fileId as string | undefined
const fileName = target.fileName as string | undefined
const isContentOp =
operation === 'append' || operation === 'update' || operation === 'patch'
const isContentOp = isContentOperation(operation)
if (context.activeFileIntent && isContentOp) {
logger.warn(
'Orphaned workspace_file intent: content-op workspace_file arrived without edit_content for prior intent',
@@ -285,7 +391,10 @@ export async function runStreamLoop(
}
)
if (execContext.workspaceId) {
const cleared = clearIntentsForWorkspace(execContext.workspaceId)
const cleared = await clearIntentsForWorkspace(execContext.workspaceId, {
chatId: execContext.chatId,
messageId: execContext.messageId,
})
if (cleared > 0) {
logger.warn('Cleared orphaned execution intents from store', {
cleared,
@@ -308,8 +417,32 @@ export async function runStreamLoop(
...(edit ? { edit } : {}),
}
const isDocFormat = /\.(pptx|docx|pdf)$/i.test(fileName ?? '')
if (!isDocFormat && isContentOp) {
if (!isDocFormat(fileName) && isContentOp) {
let previewBaseContent: string | undefined
if (
execContext.workspaceId &&
fileId &&
(operation === 'append' || operation === 'patch')
) {
previewBaseContent = await loadWorkspaceFileTextForPreview(
execContext.workspaceId,
fileId
)
}
let session = buildPreviewSessionFromIntent(
raw.stream.streamId,
context.activeFileIntent
)
if (previewBaseContent !== undefined) {
session = { ...session, baseContent: previewBaseContent }
}
filePreviewState.set(toolCallId, {
session,
lastEmittedPreviewText: '',
lastSnapshotAt: 0,
})
await persistFilePreviewSession(session)
const scope = streamEvent.scope ? { scope: streamEvent.scope } : {}
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
@@ -353,6 +486,66 @@ export async function runStreamLoop(
}
}
if (
streamEvent.type === MothershipStreamV1EventType.tool &&
streamEvent.payload.phase === MothershipStreamV1ToolPhase.result &&
streamEvent.payload.toolName === 'workspace_file' &&
context.activeFileIntent &&
isContentOperation(context.activeFileIntent.operation) &&
context.activeFileIntent.operation === 'patch' &&
context.activeFileIntent.edit?.strategy === 'anchored' &&
context.activeFileIntent.edit?.mode === 'delete_between' &&
execContext.workspaceId &&
context.activeFileIntent.target.fileId &&
!isDocFormat(context.activeFileIntent.target.fileName)
) {
const currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId)
const previewText = buildFilePreviewText({
operation: 'patch',
streamedContent: '',
existingContent: currentPreview?.session.baseContent,
edit: currentPreview?.session.edit as Record<string, unknown> | undefined,
})
if (previewText !== undefined) {
const baseSession = buildPreviewSessionFromIntent(
raw.stream.streamId,
context.activeFileIntent,
currentPreview?.session
)
const nextSession: FilePreviewSession = {
...baseSession,
status: 'streaming',
previewText,
previewVersion: (currentPreview?.session.previewVersion ?? 0) + 1,
updatedAt: new Date().toISOString(),
}
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: nextSession,
lastEmittedPreviewText: previewText,
lastSnapshotAt: Date.now(),
})
await persistFilePreviewSession(nextSession)
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
payload: {
toolCallId: nextSession.toolCallId,
toolName: 'workspace_file',
previewPhase: 'file_preview_content',
content: previewText,
contentMode: 'snapshot',
previewVersion: nextSession.previewVersion,
fileName: nextSession.fileName,
...(nextSession.fileId ? { fileId: nextSession.fileId } : {}),
...(nextSession.targetKind ? { targetKind: nextSession.targetKind } : {}),
...(nextSession.operation ? { operation: nextSession.operation } : {}),
...(nextSession.edit ? { edit: nextSession.edit } : {}),
},
...(streamEvent.scope ? { scope: streamEvent.scope } : {}),
})
}
}
// ── edit_content phase:args_delta → stream content using stored intent ──
if (
streamEvent.type === MothershipStreamV1EventType.tool &&
@@ -367,24 +560,129 @@ export async function runStreamLoop(
state.raw += delta
if (context.activeFileIntent) {
const isDocFormat = /\.(pptx|docx|pdf)$/i.test(
context.activeFileIntent.target.fileName ?? ''
)
if (!isDocFormat) {
if (!isDocFormat(context.activeFileIntent.target.fileName)) {
const streamedContent = extractEditContent(state.raw)
if (streamedContent !== (state.lastContentSnapshot ?? '')) {
state.lastContentSnapshot = streamedContent
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
payload: {
toolCallId: context.activeFileIntent.toolCallId,
toolName: 'workspace_file',
previewPhase: 'file_preview_content',
content: streamedContent,
contentMode: 'snapshot',
},
...(streamEvent.scope ? { scope: streamEvent.scope } : {}),
let currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId) ?? {
session: buildPreviewSessionFromIntent(
raw.stream.streamId,
context.activeFileIntent
),
lastEmittedPreviewText: '',
lastSnapshotAt: 0,
}
/**
* Fallback: primary base is set on `workspace_file` call via
* {@link loadWorkspaceFileTextForPreview}. This only runs when that load failed or
* hydrated sessions lack `baseContent`, once Redis intent is available.
*/
if (
currentPreview.session.baseContent === undefined &&
(context.activeFileIntent.operation === 'append' ||
context.activeFileIntent.operation === 'patch') &&
execContext.workspaceId &&
context.activeFileIntent.target.fileId
) {
const intentBase = await peekFileIntent(
execContext.workspaceId,
context.activeFileIntent.target.fileId,
{
chatId: execContext.chatId,
messageId: execContext.messageId,
}
)
if (typeof intentBase?.existingContent === 'string') {
const seededSession: FilePreviewSession = {
...currentPreview.session,
baseContent: intentBase.existingContent,
...(intentBase.edit
? { edit: intentBase.edit as Record<string, unknown> }
: {}),
}
currentPreview = {
...currentPreview,
session: seededSession,
}
filePreviewState.set(context.activeFileIntent.toolCallId, currentPreview)
await persistFilePreviewSession(seededSession)
}
}
const previewText = buildFilePreviewText({
operation: context.activeFileIntent.operation as 'append' | 'update' | 'patch',
streamedContent,
existingContent: currentPreview.session.baseContent,
edit: currentPreview.session.edit as Record<string, unknown> | undefined,
})
if (previewText !== undefined) {
const baseSession = buildPreviewSessionFromIntent(
raw.stream.streamId,
context.activeFileIntent,
currentPreview?.session
)
const now = Date.now()
const nextSession: FilePreviewSession = {
...baseSession,
status: 'streaming',
previewText,
previewVersion: (currentPreview?.session.previewVersion ?? 0) + 1,
updatedAt: new Date(now).toISOString(),
}
await persistFilePreviewSession(nextSession)
if (
nextSession.operation === 'patch' &&
now - (currentPreview?.lastSnapshotAt ?? 0) < PATCH_PREVIEW_SNAPSHOT_INTERVAL_MS
) {
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: nextSession,
lastEmittedPreviewText: currentPreview?.lastEmittedPreviewText ?? '',
lastSnapshotAt: currentPreview?.lastSnapshotAt ?? 0,
})
} else {
const previewUpdate = buildPreviewContentUpdate(
currentPreview?.lastEmittedPreviewText ?? '',
nextSession.previewText,
currentPreview?.lastSnapshotAt ?? 0,
now,
nextSession.operation
)
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: nextSession,
lastEmittedPreviewText: nextSession.previewText,
lastSnapshotAt: previewUpdate.lastSnapshotAt,
})
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
payload: {
toolCallId: nextSession.toolCallId,
toolName: 'workspace_file',
previewPhase: 'file_preview_content',
content: previewUpdate.content,
contentMode: previewUpdate.contentMode,
previewVersion: nextSession.previewVersion,
fileName: nextSession.fileName,
...(nextSession.fileId ? { fileId: nextSession.fileId } : {}),
...(nextSession.targetKind ? { targetKind: nextSession.targetKind } : {}),
...(nextSession.operation ? { operation: nextSession.operation } : {}),
...(nextSession.edit ? { edit: nextSession.edit } : {}),
},
...(streamEvent.scope ? { scope: streamEvent.scope } : {}),
})
}
} else {
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: currentPreview.session,
lastEmittedPreviewText: currentPreview.lastEmittedPreviewText,
lastSnapshotAt: currentPreview.lastSnapshotAt,
})
}
}
}
}
@@ -411,6 +709,57 @@ export async function runStreamLoop(
streamEvent.payload.toolName === 'edit_content' &&
context.activeFileIntent
) {
const currentPreview = filePreviewState.get(context.activeFileIntent.toolCallId)
const completedAt = new Date().toISOString()
if (
currentPreview &&
currentPreview.lastEmittedPreviewText !== currentPreview.session.previewText &&
currentPreview.session.previewText.length > 0
) {
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: currentPreview.session,
lastEmittedPreviewText: currentPreview.session.previewText,
lastSnapshotAt: Date.now(),
})
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
payload: {
toolCallId: currentPreview.session.toolCallId,
toolName: 'workspace_file',
previewPhase: 'file_preview_content',
content: currentPreview.session.previewText,
contentMode: 'snapshot',
previewVersion: currentPreview.session.previewVersion,
fileName: currentPreview.session.fileName,
...(currentPreview.session.fileId ? { fileId: currentPreview.session.fileId } : {}),
...(currentPreview.session.targetKind
? { targetKind: currentPreview.session.targetKind }
: {}),
...(currentPreview.session.operation
? { operation: currentPreview.session.operation }
: {}),
...(currentPreview.session.edit ? { edit: currentPreview.session.edit } : {}),
},
...(streamEvent.scope ? { scope: streamEvent.scope } : {}),
})
}
if (currentPreview) {
const completedSession: FilePreviewSession = {
...currentPreview.session,
status: 'complete',
updatedAt: completedAt,
completedAt,
}
filePreviewState.set(context.activeFileIntent.toolCallId, {
session: completedSession,
lastEmittedPreviewText: completedSession.previewText,
lastSnapshotAt: Date.now(),
})
await persistFilePreviewSession(completedSession)
}
await options.onEvent?.({
type: MothershipStreamV1EventType.tool,
payload: {
@@ -419,6 +768,7 @@ export async function runStreamLoop(
previewPhase: 'file_preview_complete',
fileId: context.activeFileIntent.target.fileId,
output: streamEvent.payload.output,
...(currentPreview ? { previewVersion: currentPreview.session.previewVersion } : {}),
},
...(streamEvent.scope ? { scope: streamEvent.scope } : {}),
})

View File

@@ -403,6 +403,8 @@ async function buildExecutionContext(
if (userTimezone) execContext.userTimezone = userTimezone
execContext.copilotToolExecution = true
if (requestMode) execContext.requestMode = requestMode
execContext.messageId =
typeof requestPayload?.messageId === 'string' ? requestPayload.messageId : undefined
execContext.executionId = executionId
execContext.runId = runId
execContext.abortSignal = abortSignal

View File

@@ -10,7 +10,9 @@ const {
createRunSegment,
updateRunStatus,
resetBuffer,
clearFilePreviewSessions,
scheduleBufferCleanup,
scheduleFilePreviewSessionCleanup,
allocateCursor,
appendEvent,
cleanupAbortMarker,
@@ -21,7 +23,9 @@ const {
createRunSegment: vi.fn(),
updateRunStatus: vi.fn(),
resetBuffer: vi.fn(),
clearFilePreviewSessions: vi.fn(),
scheduleBufferCleanup: vi.fn(),
scheduleFilePreviewSessionCleanup: vi.fn(),
allocateCursor: vi.fn(),
appendEvent: vi.fn(),
cleanupAbortMarker: vi.fn(),
@@ -42,7 +46,9 @@ let mockPublisherController: ReadableStreamDefaultController | null = null
vi.mock('@/lib/copilot/request/session', () => ({
resetBuffer,
clearFilePreviewSessions,
scheduleBufferCleanup,
scheduleFilePreviewSessionCleanup,
allocateCursor,
appendEvent,
cleanupAbortMarker,
@@ -110,7 +116,9 @@ describe('createSSEStream terminal error handling', () => {
beforeEach(() => {
vi.clearAllMocks()
resetBuffer.mockResolvedValue(undefined)
clearFilePreviewSessions.mockResolvedValue(undefined)
scheduleBufferCleanup.mockResolvedValue(undefined)
scheduleFilePreviewSessionCleanup.mockResolvedValue(undefined)
allocateCursor
.mockResolvedValueOnce({ seq: 1, cursor: '1' })
.mockResolvedValueOnce({ seq: 2, cursor: '2' })

View File

@@ -14,11 +14,13 @@ import type { CopilotLifecycleOptions } from '@/lib/copilot/request/lifecycle/ru
import { runCopilotLifecycle } from '@/lib/copilot/request/lifecycle/run'
import {
cleanupAbortMarker,
clearFilePreviewSessions,
registerActiveStream,
releasePendingChatStream,
resetBuffer,
StreamWriter,
scheduleBufferCleanup,
scheduleFilePreviewSessionCleanup,
startAbortPoller,
unregisterActiveStream,
} from '@/lib/copilot/request/session'
@@ -91,6 +93,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
| undefined
await resetBuffer(streamId)
await clearFilePreviewSessions(streamId)
if (chatId) {
createRunSegment({
@@ -207,6 +210,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await releasePendingChatStream(chatId, streamId)
}
await scheduleBufferCleanup(streamId)
await scheduleFilePreviewSessionCleanup(streamId)
await cleanupAbortMarker(streamId)
const trace = collector.build({

View File

@@ -0,0 +1,42 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import {
createFilePreviewSession,
sortFilePreviewSessions,
} from '@/lib/copilot/request/session/file-preview-session'
describe('file preview session helpers', () => {
it('preserves baseContent when creating a preview session', () => {
const session = createFilePreviewSession({
streamId: 'stream-1',
toolCallId: 'preview-1',
fileName: 'draft.md',
baseContent: 'existing content',
})
expect(session.baseContent).toBe('existing content')
})
it('sorts preview sessions by updatedAt across tool call ids', () => {
const sessions = sortFilePreviewSessions([
createFilePreviewSession({
streamId: 'stream-1',
toolCallId: 'preview-2',
fileName: 'b.md',
previewVersion: 10,
updatedAt: '2026-04-10T00:00:02.000Z',
}),
createFilePreviewSession({
streamId: 'stream-1',
toolCallId: 'preview-1',
fileName: 'a.md',
previewVersion: 1,
updatedAt: '2026-04-10T00:00:01.000Z',
}),
])
expect(sessions.map((session) => session.id)).toEqual(['preview-1', 'preview-2'])
})
})

View File

@@ -0,0 +1,211 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
import { getStreamConfig } from './buffer'
const logger = createLogger('FilePreviewSessionStore')
const STREAM_OUTBOX_PREFIX = 'mothership_stream:'
const DEFAULT_COMPLETED_TTL_SECONDS = 5 * 60
const RETRY_DELAYS_MS = [0, 50, 150] as const
export const FILE_PREVIEW_SESSION_SCHEMA_VERSION = 1 as const
export type FilePreviewTargetKind = 'new_file' | 'file_id'
export type FilePreviewStatus = 'pending' | 'streaming' | 'complete'
export type FilePreviewContentMode = 'delta' | 'snapshot'
export interface FilePreviewSession {
schemaVersion: typeof FILE_PREVIEW_SESSION_SCHEMA_VERSION
id: string
streamId: string
toolCallId: string
status: FilePreviewStatus
fileName: string
fileId?: string
targetKind?: FilePreviewTargetKind
operation?: string
edit?: Record<string, unknown>
baseContent?: string
previewText: string
previewVersion: number
updatedAt: string
completedAt?: string
}
function getPreviewSessionsKey(streamId: string): string {
return `${STREAM_OUTBOX_PREFIX}${streamId}:preview_sessions`
}
type RedisOperationMetadata = {
operation: string
streamId: string
}
async function withRedisRetry<T>(
metadata: RedisOperationMetadata,
operation: (redis: NonNullable<ReturnType<typeof getRedisClient>>) => Promise<T>
): Promise<T> {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis is required for mothership preview durability')
}
let lastError: unknown
for (let attempt = 0; attempt < RETRY_DELAYS_MS.length; attempt++) {
const delay = RETRY_DELAYS_MS[attempt]
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay))
}
try {
return await operation(redis)
} catch (error) {
lastError = error
logger.warn('Redis preview session operation failed', {
operation: metadata.operation,
streamId: metadata.streamId,
attempt: attempt + 1,
error: error instanceof Error ? error.message : String(error),
})
}
}
throw lastError instanceof Error
? lastError
: new Error(`${metadata.operation} failed for stream ${metadata.streamId}`)
}
export function createFilePreviewSession(input: {
streamId: string
toolCallId: string
fileName?: string
fileId?: string
targetKind?: FilePreviewTargetKind
operation?: string
edit?: Record<string, unknown>
baseContent?: string
previewText?: string
previewVersion?: number
status?: FilePreviewStatus
updatedAt?: string
completedAt?: string
}): FilePreviewSession {
return {
schemaVersion: FILE_PREVIEW_SESSION_SCHEMA_VERSION,
id: input.toolCallId,
streamId: input.streamId,
toolCallId: input.toolCallId,
status: input.status ?? 'pending',
fileName: input.fileName ?? '',
...(input.fileId ? { fileId: input.fileId } : {}),
...(input.targetKind ? { targetKind: input.targetKind } : {}),
...(input.operation ? { operation: input.operation } : {}),
...(input.edit ? { edit: input.edit } : {}),
...(typeof input.baseContent === 'string' ? { baseContent: input.baseContent } : {}),
previewText: input.previewText ?? '',
previewVersion: input.previewVersion ?? 0,
updatedAt: input.updatedAt ?? new Date().toISOString(),
...(input.completedAt ? { completedAt: input.completedAt } : {}),
}
}
export async function upsertFilePreviewSession(
session: FilePreviewSession
): Promise<FilePreviewSession> {
const config = getStreamConfig()
await withRedisRetry(
{ operation: 'upsert_preview_session', streamId: session.streamId },
async (redis) => {
const key = getPreviewSessionsKey(session.streamId)
const pipeline = redis.pipeline()
pipeline.hset(key, session.id, JSON.stringify(session))
pipeline.expire(key, config.ttlSeconds)
await pipeline.exec()
}
)
return session
}
function isFilePreviewSession(value: unknown): value is FilePreviewSession {
if (!value || typeof value !== 'object') {
return false
}
const record = value as Record<string, unknown>
return (
record.schemaVersion === FILE_PREVIEW_SESSION_SCHEMA_VERSION &&
typeof record.id === 'string' &&
typeof record.streamId === 'string' &&
typeof record.toolCallId === 'string' &&
typeof record.status === 'string' &&
typeof record.fileName === 'string' &&
(record.baseContent === undefined || typeof record.baseContent === 'string') &&
typeof record.previewText === 'string' &&
typeof record.previewVersion === 'number' &&
typeof record.updatedAt === 'string'
)
}
export function sortFilePreviewSessions(sessions: FilePreviewSession[]): FilePreviewSession[] {
return [...sessions].sort((a, b) => {
const updatedAtCompare = a.updatedAt.localeCompare(b.updatedAt)
if (updatedAtCompare !== 0) {
return updatedAtCompare
}
return a.id.localeCompare(b.id)
})
}
export async function readFilePreviewSessions(streamId: string): Promise<FilePreviewSession[]> {
const raw = await withRedisRetry(
{ operation: 'read_preview_sessions', streamId },
async (redis) => redis.hgetall(getPreviewSessionsKey(streamId))
)
const sessions: FilePreviewSession[] = []
const values = Object.values(raw ?? {})
for (const entry of values) {
try {
const parsed = JSON.parse(entry) as unknown
if (!isFilePreviewSession(parsed)) {
logger.warn('Skipping invalid file preview session entry', { streamId })
continue
}
sessions.push(parsed)
} catch (error) {
logger.warn('Failed to parse file preview session entry', {
streamId,
error: error instanceof Error ? error.message : String(error),
})
}
}
return sortFilePreviewSessions(sessions)
}
export async function clearFilePreviewSessions(streamId: string): Promise<void> {
await withRedisRetry({ operation: 'clear_preview_sessions', streamId }, async (redis) => {
await redis.del(getPreviewSessionsKey(streamId))
})
}
export async function scheduleFilePreviewSessionCleanup(
streamId: string,
ttlSeconds = DEFAULT_COMPLETED_TTL_SECONDS
): Promise<void> {
try {
await withRedisRetry(
{ operation: 'schedule_preview_session_cleanup', streamId },
async (redis) => {
await redis.expire(getPreviewSessionsKey(streamId), ttlSeconds)
}
)
} catch (error) {
logger.warn('Failed to shorten preview session retention', {
streamId,
ttlSeconds,
error: error instanceof Error ? error.message : String(error),
})
}
}

View File

@@ -25,6 +25,20 @@ export {
writeAbortMarker,
} from './buffer'
export { createEvent, eventToStreamEvent, isEventRecord, TOOL_CALL_STATUS } from './event'
export type {
FilePreviewContentMode,
FilePreviewSession,
FilePreviewStatus,
FilePreviewTargetKind,
} from './file-preview-session'
export {
clearFilePreviewSessions,
createFilePreviewSession,
FILE_PREVIEW_SESSION_SCHEMA_VERSION,
readFilePreviewSessions,
scheduleFilePreviewSessionCleanup,
upsertFilePreviewSession,
} from './file-preview-session'
export { checkForReplayGap, type ReplayGapResult } from './recovery'
export { encodeSSEComment, encodeSSEEnvelope, SSE_RESPONSE_HEADERS } from './sse'
export type { StreamEvent } from './types'

View File

@@ -5,6 +5,7 @@ export interface ToolExecutionContext {
workflowId: string
workspaceId?: string
chatId?: string
messageId?: string
executionId?: string
runId?: string
copilotToolExecution?: boolean

View File

@@ -18,6 +18,7 @@ export function createServerToolHandler(toolId: string): ToolHandler {
workspaceId: context.workspaceId,
userPermission: context.userPermission ?? undefined,
chatId: context.chatId,
messageId: context.messageId,
abortSignal: context.abortSignal,
})

View File

@@ -85,7 +85,10 @@ export const editContentServerTool: BaseServerTool<EditContentArgs, EditContentR
return { success: false, message: 'content is required for edit_content' }
}
const intent = consumeLatestFileIntent(workspaceId)
const intent = await consumeLatestFileIntent(workspaceId, {
chatId: context.chatId,
messageId: context.messageId,
})
if (!intent) {
return {
success: false,

View File

@@ -1,3 +1,5 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
import type { WorkspaceFileRecord } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
export type PendingFileIntent = {
@@ -5,6 +7,8 @@ export type PendingFileIntent = {
fileId: string
workspaceId: string
userId: string
chatId?: string
messageId?: string
fileRecord: WorkspaceFileRecord
existingContent?: string
edit?: {
@@ -24,67 +28,262 @@ export type PendingFileIntent = {
createdAt: number
}
const INTENT_TTL_MS = 60_000
const store = new Map<string, PendingFileIntent>()
export type FileIntentScope = {
chatId?: string
messageId?: string
}
const logger = createLogger('FileIntentStore')
const INTENT_TTL_MS = 60 * 60 * 1000
const INTENT_TTL_SECONDS = INTENT_TTL_MS / 1000
const REDIS_KEY_PREFIX = 'mothership_file_intent:'
const RETRY_DELAYS_MS = [0, 50, 150] as const
const memoryStore = new Map<string, PendingFileIntent>()
function buildKey(workspaceId: string, fileId: string): string {
return `${workspaceId}:${fileId}`
}
function getWorkspaceRedisKey(workspaceId: string): string {
return `${REDIS_KEY_PREFIX}${workspaceId}`
}
function scopeMatches(intent: PendingFileIntent, scope?: FileIntentScope): boolean {
return intent.chatId === scope?.chatId && intent.messageId === scope?.messageId
}
function buildScopedField(fileId: string, scope?: FileIntentScope): string {
return `${scope?.chatId ?? ''}:${scope?.messageId ?? ''}:${fileId}`
}
function cleanupStale(): void {
const now = Date.now()
for (const [key, intent] of store) {
for (const [key, intent] of memoryStore) {
if (now - intent.createdAt > INTENT_TTL_MS) {
store.delete(key)
memoryStore.delete(key)
}
}
}
export function storeFileIntent(
async function withRedisRetry<T>(
operation: string,
workspaceId: string,
work: (redis: NonNullable<ReturnType<typeof getRedisClient>>) => Promise<T>
): Promise<T> {
const redis = getRedisClient()
if (!redis) {
throw new Error('Redis client unavailable')
}
let lastError: unknown
for (let attempt = 0; attempt < RETRY_DELAYS_MS.length; attempt++) {
const delay = RETRY_DELAYS_MS[attempt]
if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay))
}
try {
return await work(redis)
} catch (error) {
lastError = error
logger.warn('Redis file intent operation failed', {
operation,
workspaceId,
attempt: attempt + 1,
error: error instanceof Error ? error.message : String(error),
})
}
}
throw lastError instanceof Error ? lastError : new Error(`${operation} failed`)
}
function isStale(intent: PendingFileIntent): boolean {
return Date.now() - intent.createdAt > INTENT_TTL_MS
}
function parseIntent(raw: string | null | undefined): PendingFileIntent | undefined {
if (!raw) return undefined
try {
const parsed = JSON.parse(raw) as PendingFileIntent
return isStale(parsed) ? undefined : parsed
} catch (error) {
logger.warn('Failed to parse file intent', {
error: error instanceof Error ? error.message : String(error),
})
return undefined
}
}
export async function storeFileIntent(
workspaceId: string,
fileId: string,
intent: PendingFileIntent
): void {
cleanupStale()
store.set(buildKey(workspaceId, fileId), intent)
): Promise<void> {
const redis = getRedisClient()
if (!redis) {
cleanupStale()
memoryStore.set(buildKey(workspaceId, buildScopedField(fileId, intent)), intent)
return
}
await withRedisRetry('store_file_intent', workspaceId, async (client) => {
const key = getWorkspaceRedisKey(workspaceId)
const pipeline = client.pipeline()
pipeline.hset(key, buildScopedField(fileId, intent), JSON.stringify(intent))
pipeline.expire(key, INTENT_TTL_SECONDS)
await pipeline.exec()
})
}
export function consumeFileIntent(
export async function consumeFileIntent(
workspaceId: string,
fileId: string
): PendingFileIntent | undefined {
const key = buildKey(workspaceId, fileId)
const intent = store.get(key)
if (intent) {
store.delete(key)
fileId: string,
scope?: FileIntentScope
): Promise<PendingFileIntent | undefined> {
const redis = getRedisClient()
if (!redis) {
const key = buildKey(workspaceId, buildScopedField(fileId, scope))
const intent = memoryStore.get(key)
if (intent) {
memoryStore.delete(key)
}
return intent
}
const raw = await withRedisRetry('consume_file_intent', workspaceId, async (client) => {
const key = getWorkspaceRedisKey(workspaceId)
const field = buildScopedField(fileId, scope)
const value = await client.hget(key, field)
if (value !== null) {
await client.hdel(key, field)
}
return value
})
return parseIntent(raw)
}
export async function peekFileIntent(
workspaceId: string,
fileId: string,
scope?: FileIntentScope
): Promise<PendingFileIntent | undefined> {
const redis = getRedisClient()
if (!redis) {
cleanupStale()
return memoryStore.get(buildKey(workspaceId, buildScopedField(fileId, scope)))
}
const raw = await withRedisRetry('peek_file_intent', workspaceId, async (client) => {
const key = getWorkspaceRedisKey(workspaceId)
return client.hget(key, buildScopedField(fileId, scope))
})
const intent = parseIntent(raw)
if (!intent && raw !== null) {
await withRedisRetry('clear_stale_file_intent', workspaceId, async (client) => {
await client.hdel(getWorkspaceRedisKey(workspaceId), buildScopedField(fileId, scope))
})
}
return intent
}
export function consumeLatestFileIntent(workspaceId: string): PendingFileIntent | undefined {
let latest: PendingFileIntent | undefined
let latestKey: string | undefined
for (const [key, intent] of store) {
if (intent.workspaceId === workspaceId) {
if (!latest || intent.createdAt > latest.createdAt) {
latest = intent
latestKey = key
export async function consumeLatestFileIntent(
workspaceId: string,
scope?: FileIntentScope
): Promise<PendingFileIntent | undefined> {
const redis = getRedisClient()
if (!redis) {
cleanupStale()
let latest: PendingFileIntent | undefined
let latestKey: string | undefined
for (const [key, intent] of memoryStore) {
if (intent.workspaceId === workspaceId && scopeMatches(intent, scope)) {
if (!latest || intent.createdAt > latest.createdAt) {
latest = intent
latestKey = key
}
}
}
if (latestKey) {
memoryStore.delete(latestKey)
}
return latest
}
if (latestKey) {
store.delete(latestKey)
const entries = await withRedisRetry('read_workspace_file_intents', workspaceId, async (client) =>
client.hgetall(getWorkspaceRedisKey(workspaceId))
)
let latest: PendingFileIntent | undefined
let latestField: string | undefined
const staleFields: string[] = []
for (const [field, raw] of Object.entries(entries)) {
const parsed = parseIntent(raw)
if (!parsed) {
staleFields.push(field)
continue
}
if (!scopeMatches(parsed, scope)) {
continue
}
if (!latest || parsed.createdAt > latest.createdAt) {
latest = parsed
latestField = field
}
}
const fieldsToDelete = latestField ? [...staleFields, latestField] : staleFields
if (fieldsToDelete.length > 0) {
await withRedisRetry('delete_workspace_file_intents', workspaceId, async (client) => {
await client.hdel(getWorkspaceRedisKey(workspaceId), ...fieldsToDelete)
})
}
return latest
}
export function clearIntentsForWorkspace(workspaceId: string): number {
let cleared = 0
for (const [key, intent] of store) {
if (intent.workspaceId === workspaceId) {
store.delete(key)
cleared++
export async function clearIntentsForWorkspace(
workspaceId: string,
scope?: FileIntentScope
): Promise<number> {
const redis = getRedisClient()
if (!redis) {
let cleared = 0
for (const [key, intent] of memoryStore) {
if (intent.workspaceId === workspaceId && (!scope || scopeMatches(intent, scope))) {
memoryStore.delete(key)
cleared++
}
}
return cleared
}
const key = getWorkspaceRedisKey(workspaceId)
if (!scope) {
const count = await withRedisRetry(
'count_workspace_file_intents',
workspaceId,
async (client) => client.hlen(key)
)
await withRedisRetry('clear_workspace_file_intents', workspaceId, async (client) => {
await client.del(key)
})
return count
}
const entries = await withRedisRetry('read_workspace_file_intents', workspaceId, async (client) =>
client.hgetall(key)
)
const fieldsToDelete: string[] = []
for (const [field, raw] of Object.entries(entries)) {
const parsed = parseIntent(raw)
if (parsed && scopeMatches(parsed, scope)) {
fieldsToDelete.push(field)
}
}
return cleared
if (fieldsToDelete.length > 0) {
await withRedisRetry('clear_scoped_file_intents', workspaceId, async (client) => {
await client.hdel(key, ...fieldsToDelete)
})
}
return fieldsToDelete.length
}

View File

@@ -0,0 +1,79 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import { buildFilePreviewText } from '@/lib/copilot/tools/server/files/file-preview'
describe('buildFilePreviewText', () => {
it('returns the full streamed content for update previews', () => {
expect(
buildFilePreviewText({
operation: 'update',
streamedContent: '',
})
).toBe('')
expect(
buildFilePreviewText({
operation: 'update',
streamedContent: 'updated body',
})
).toBe('updated body')
})
it('builds append previews from the existing file content', () => {
expect(
buildFilePreviewText({
operation: 'append',
existingContent: 'line one',
streamedContent: 'line two',
})
).toBe('line one\nline two')
})
it('applies anchored replace_between previews', () => {
expect(
buildFilePreviewText({
operation: 'patch',
existingContent: ['# Title', 'before', 'after', 'footer'].join('\n'),
streamedContent: 'replacement',
edit: {
strategy: 'anchored',
mode: 'replace_between',
before_anchor: '# Title',
after_anchor: 'after',
},
})
).toBe(['# Title', 'replacement', 'after', 'footer'].join('\n'))
})
it('applies delete_between previews without streamed replacement text', () => {
expect(
buildFilePreviewText({
operation: 'patch',
existingContent: ['keep', 'start', 'remove me', 'end', 'keep too'].join('\n'),
streamedContent: '',
edit: {
strategy: 'anchored',
mode: 'delete_between',
start_anchor: 'start',
end_anchor: 'end',
},
})
).toBe(['keep', 'end', 'keep too'].join('\n'))
})
it('applies search_replace previews', () => {
expect(
buildFilePreviewText({
operation: 'patch',
existingContent: 'hello world',
streamedContent: 'sim',
edit: {
strategy: 'search_replace',
search: 'world',
},
})
).toBe('hello sim')
})
})

View File

@@ -0,0 +1,192 @@
import { createLogger } from '@sim/logger'
import {
downloadWorkspaceFile,
getWorkspaceFile,
} from '@/lib/uploads/contexts/workspace/workspace-file-manager'
const logger = createLogger('CopilotFilePreview')
type FilePreviewEdit = {
strategy?: string
search?: string
replaceAll?: boolean
mode?: string
occurrence?: number
before_anchor?: string
after_anchor?: string
anchor?: string
start_anchor?: string
end_anchor?: string
}
interface BuildFilePreviewTextOptions {
operation: 'create' | 'append' | 'update' | 'patch'
streamedContent: string
existingContent?: string
edit?: Record<string, unknown>
}
function findAnchorIndex(lines: string[], anchor: string, occurrence = 1, afterIndex = -1): number {
const trimmed = anchor.trim()
let count = 0
for (let i = afterIndex + 1; i < lines.length; i++) {
if (lines[i].trim() === trimmed) {
count++
if (count === occurrence) return i
}
}
return -1
}
function extractPatchPreview(
streamedContent: string,
existingContent: string,
edit?: Record<string, unknown>
): string | undefined {
const strategy = typeof edit?.strategy === 'string' ? edit.strategy : undefined
const lines = existingContent.split('\n')
const occurrence =
typeof edit?.occurrence === 'number' && Number.isFinite(edit.occurrence) ? edit.occurrence : 1
if (strategy === 'search_replace') {
const search = typeof edit?.search === 'string' ? edit.search : ''
if (!search) return undefined
if (edit?.replaceAll === true) {
return existingContent.split(search).join(streamedContent)
}
const firstIdx = existingContent.indexOf(search)
if (firstIdx === -1) return undefined
return (
existingContent.slice(0, firstIdx) +
streamedContent +
existingContent.slice(firstIdx + search.length)
)
}
const mode = typeof edit?.mode === 'string' ? edit.mode : undefined
if (!mode) return undefined
if (mode === 'replace_between') {
const beforeAnchor = typeof edit?.before_anchor === 'string' ? edit.before_anchor : undefined
const afterAnchor = typeof edit?.after_anchor === 'string' ? edit.after_anchor : undefined
if (!beforeAnchor || !afterAnchor) return undefined
const beforeIdx = findAnchorIndex(lines, beforeAnchor, occurrence)
const afterIdx = findAnchorIndex(lines, afterAnchor, occurrence, beforeIdx)
if (beforeIdx === -1 || afterIdx === -1 || afterIdx <= beforeIdx) return undefined
const spliced = [
...lines.slice(0, beforeIdx + 1),
...(streamedContent.length > 0 ? streamedContent.split('\n') : []),
...lines.slice(afterIdx),
]
return spliced.join('\n')
}
if (mode === 'insert_after') {
const anchor = typeof edit?.anchor === 'string' ? edit.anchor : undefined
if (!anchor) return undefined
const anchorIdx = findAnchorIndex(lines, anchor, occurrence)
if (anchorIdx === -1) return undefined
const spliced = [
...lines.slice(0, anchorIdx + 1),
...(streamedContent.length > 0 ? streamedContent.split('\n') : []),
...lines.slice(anchorIdx + 1),
]
return spliced.join('\n')
}
if (mode === 'delete_between') {
const startAnchor = typeof edit?.start_anchor === 'string' ? edit.start_anchor : undefined
const endAnchor = typeof edit?.end_anchor === 'string' ? edit.end_anchor : undefined
if (!startAnchor || !endAnchor) return undefined
const startIdx = findAnchorIndex(lines, startAnchor, occurrence)
const endIdx = findAnchorIndex(lines, endAnchor, occurrence, startIdx)
if (startIdx === -1 || endIdx === -1 || endIdx <= startIdx) return undefined
const spliced = [...lines.slice(0, startIdx), ...lines.slice(endIdx)]
return spliced.join('\n')
}
return undefined
}
function shouldApplyPatchPreview(streamedContent: string, edit?: Record<string, unknown>): boolean {
const strategy = typeof edit?.strategy === 'string' ? edit.strategy : undefined
const mode = typeof edit?.mode === 'string' ? edit.mode : undefined
if (strategy === 'anchored' && mode === 'delete_between') {
return true
}
return streamedContent.length > 0
}
function buildAppendPreview(existingContent: string, incomingContent: string): string {
if (incomingContent.length === 0) return existingContent
if (existingContent.length === 0) return incomingContent
return `${existingContent}\n${incomingContent}`
}
/**
* Reads the current UTF-8 text of a workspace file for streaming previews.
*
* Preview runs in the SSE loop on `workspace_file` **call** events, which are
* processed **before** the async tool executor persists {@link storeFileIntent}.
* Loading the base here avoids a race where `edit_content` `args_delta` arrives
* before Redis holds `existingContent`, which would make append previews look like
* full-file replacement until the intent landed.
*/
export async function loadWorkspaceFileTextForPreview(
workspaceId: string,
fileId: string
): Promise<string | undefined> {
try {
const record = await getWorkspaceFile(workspaceId, fileId)
if (!record) return undefined
const buffer = await downloadWorkspaceFile(record)
return buffer.toString('utf-8')
} catch (error) {
logger.warn('Failed to load workspace file text for preview', {
workspaceId,
fileId,
error: error instanceof Error ? error.message : String(error),
})
return undefined
}
}
export function buildFilePreviewText({
operation,
streamedContent,
existingContent,
edit,
}: BuildFilePreviewTextOptions): string | undefined {
if (operation === 'update') {
return streamedContent
}
if (operation === 'create') {
return streamedContent
}
if (operation === 'append') {
if (existingContent !== undefined) {
return buildAppendPreview(existingContent, streamedContent)
}
return streamedContent
}
if (existingContent === undefined) {
return undefined
}
if (!shouldApplyPatchPreview(streamedContent, edit)) {
return undefined
}
return extractPatchPreview(streamedContent, existingContent, edit)
}

View File

@@ -264,11 +264,13 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
}
const currentBuffer = await downloadWsFile(existingFile)
storeFileIntent(workspaceId, target.fileId, {
await storeFileIntent(workspaceId, target.fileId, {
operation: 'append',
fileId: target.fileId,
workspaceId,
userId: context.userId,
chatId: context.chatId,
messageId: context.messageId,
fileRecord: existingFile,
existingContent: currentBuffer.toString('utf-8'),
contentType: normalized.contentType,
@@ -305,11 +307,13 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
}
}
storeFileIntent(workspaceId, target.fileId, {
await storeFileIntent(workspaceId, target.fileId, {
operation: 'update',
fileId: target.fileId,
workspaceId,
userId: context.userId,
chatId: context.chatId,
messageId: context.messageId,
fileRecord,
contentType: normalized.contentType,
title: normalized.title,
@@ -441,11 +445,13 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
}
}
storeFileIntent(workspaceId, target.fileId, {
await storeFileIntent(workspaceId, target.fileId, {
operation: 'patch',
fileId: target.fileId,
workspaceId,
userId: context.userId,
chatId: context.chatId,
messageId: context.messageId,
fileRecord,
existingContent,
edit: {