Compare commits

...

2 Commits

Author SHA1 Message Date
Siddharth Ganesan
34283b551f Fix dynimp 2026-03-14 18:54:37 -07:00
Siddharth Ganesan
e024f22aa8 Fix redis queuing and run 2026-03-14 18:43:19 -07:00
6 changed files with 212 additions and 132 deletions

View File

@@ -14,6 +14,7 @@ import {
} from '@/lib/copilot/chat-streaming'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
@@ -454,6 +455,30 @@ export async function GET(req: NextRequest) {
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
}
let streamSnapshot: {
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
status: string
} | null = null
if (chat.conversationId) {
try {
const [meta, events] = await Promise.all([
getStreamMeta(chat.conversationId),
readStreamEvents(chat.conversationId, 0),
])
streamSnapshot = {
events: events || [],
status: meta?.status || 'unknown',
}
} catch (err) {
logger.warn('Failed to read stream snapshot for chat', {
chatId,
conversationId: chat.conversationId,
error: err instanceof Error ? err.message : String(err),
})
}
}
const transformedChat = {
id: chat.id,
title: chat.title,
@@ -466,6 +491,7 @@ export async function GET(req: NextRequest) {
resources: Array.isArray(chat.resources) ? chat.resources : [],
createdAt: chat.createdAt,
updatedAt: chat.updatedAt,
...(streamSnapshot ? { streamSnapshot } : {}),
}
logger.info(`Retrieved chat ${chatId}`)

View File

@@ -178,6 +178,7 @@ export function Home({ chatId }: HomeProps = {}) {
const {
messages,
isSending,
isReconnecting,
sendMessage,
stopGeneration,
resolvedChatId,
@@ -330,7 +331,7 @@ export function Home({ chatId }: HomeProps = {}) {
return () => ro.disconnect()
}, [hasMessages])
if (!hasMessages && chatId && isLoadingHistory) {
if (chatId && (isLoadingHistory || isReconnecting)) {
return (
<ChatSkeleton>
<UserInput

View File

@@ -45,6 +45,7 @@ import type {
export interface UseChatReturn {
messages: ChatMessage[]
isSending: boolean
isReconnecting: boolean
error: string | null
resolvedChatId: string | undefined
sendMessage: (
@@ -250,6 +251,7 @@ export function useChat(
const queryClient = useQueryClient()
const [messages, setMessages] = useState<ChatMessage[]>([])
const [isSending, setIsSending] = useState(false)
const [isReconnecting, setIsReconnecting] = useState(false)
const [error, setError] = useState<string | null>(null)
const [resolvedChatId, setResolvedChatId] = useState<string | undefined>(initialChatId)
const [resources, setResources] = useState<MothershipResource[]>([])
@@ -268,6 +270,10 @@ export function useChat(
}, [messageQueue])
const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
const processSSEStreamRef = useRef<
(reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => Promise<void>
>(async () => {})
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
const abortControllerRef = useRef<AbortController | null>(null)
const chatIdRef = useRef<string | undefined>(initialChatId)
@@ -329,6 +335,7 @@ export function useChat(
setMessages([])
setError(null)
setIsSending(false)
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setMessageQueue([])
@@ -346,6 +353,7 @@ export function useChat(
setMessages([])
setError(null)
setIsSending(false)
setIsReconnecting(false)
setResources([])
setActiveResourceId(null)
setMessageQueue([])
@@ -365,6 +373,95 @@ export function useChat(
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
}
}
// Kick off stream reconnection immediately if there's an active stream.
// The stream snapshot was fetched in parallel with the chat history (same
// API call), so there's no extra round-trip.
const activeStreamId = chatHistory.activeStreamId
const snapshot = chatHistory.streamSnapshot
if (activeStreamId && !sendingRef.current) {
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
streamIdRef.current = activeStreamId
sendingRef.current = true
setIsReconnecting(true)
const assistantId = crypto.randomUUID()
const reconnect = async () => {
try {
const encoder = new TextEncoder()
const batchEvents = snapshot?.events ?? []
const streamStatus = snapshot?.status ?? ''
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
// No snapshot available — stream buffer expired. Clean up.
const cid = chatIdRef.current
if (cid) {
fetch('/api/mothership/chat/stop', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }),
}).catch(() => {})
}
return
}
setIsSending(true)
setIsReconnecting(false)
const lastEventId =
batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
const isStreamDone = streamStatus === 'complete' || streamStatus === 'error'
const combinedStream = new ReadableStream<Uint8Array>({
async start(controller) {
if (batchEvents.length > 0) {
const sseText = batchEvents
.map((e) => `data: ${JSON.stringify(e.event)}\n`)
.join('\n')
controller.enqueue(encoder.encode(`${sseText}\n`))
}
if (!isStreamDone) {
try {
const sseRes = await fetch(
`/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
{ signal: abortController.signal }
)
if (sseRes.ok && sseRes.body) {
const reader = sseRes.body.getReader()
while (true) {
const { done, value } = await reader.read()
if (done) break
controller.enqueue(value)
}
}
} catch (err) {
if (!(err instanceof Error && err.name === 'AbortError')) {
logger.warn('SSE tail failed during reconnect', err)
}
}
}
controller.close()
},
})
await processSSEStreamRef.current(combinedStream.getReader(), assistantId)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
} finally {
setIsReconnecting(false)
if (streamGenRef.current === gen) {
finalizeRef.current()
}
}
}
reconnect()
}
}, [chatHistory, workspaceId])
useEffect(() => {
@@ -405,11 +502,14 @@ export function useChat(
const flush = () => {
streamingBlocksRef.current = [...blocks]
setMessages((prev) =>
prev.map((m) =>
m.id === assistantId ? { ...m, content: runningText, contentBlocks: [...blocks] } : m
)
)
const snapshot = { content: runningText, contentBlocks: [...blocks] }
setMessages((prev) => {
const idx = prev.findIndex((m) => m.id === assistantId)
if (idx >= 0) {
return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
}
return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }]
})
}
while (true) {
@@ -662,6 +762,9 @@ export function useChat(
},
[workspaceId, queryClient, addResource, removeResource]
)
useLayoutEffect(() => {
processSSEStreamRef.current = processSSEStream
})
const persistPartialResponse = useCallback(async () => {
const chatId = chatIdRef.current
@@ -750,50 +853,9 @@ export function useChat(
},
[invalidateChatQueries]
)
useEffect(() => {
const activeStreamId = chatHistory?.activeStreamId
if (!activeStreamId || !appliedChatIdRef.current || sendingRef.current) return
const gen = ++streamGenRef.current
const abortController = new AbortController()
abortControllerRef.current = abortController
sendingRef.current = true
setIsSending(true)
const assistantId = crypto.randomUUID()
setMessages((prev) => [
...prev,
{
id: assistantId,
role: 'assistant' as const,
content: '',
contentBlocks: [],
},
])
const reconnect = async () => {
try {
const response = await fetch(`/api/copilot/chat/stream?streamId=${activeStreamId}&from=0`, {
signal: abortController.signal,
})
if (!response.ok || !response.body) return
await processSSEStream(response.body.getReader(), assistantId)
} catch (err) {
if (err instanceof Error && err.name === 'AbortError') return
} finally {
if (streamGenRef.current === gen) {
finalize()
}
}
}
reconnect()
return () => {
abortController.abort()
appliedChatIdRef.current = undefined
}
}, [chatHistory?.activeStreamId, processSSEStream, finalize])
useLayoutEffect(() => {
finalizeRef.current = finalize
})
const sendMessage = useCallback(
async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => {
@@ -937,7 +999,11 @@ export function useChat(
if (sendingRef.current) {
await persistPartialResponse()
}
const sid = streamIdRef.current
const sid =
streamIdRef.current ||
queryClient.getQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current))
?.activeStreamId ||
undefined
streamGenRef.current++
abortControllerRef.current?.abort()
abortControllerRef.current = null
@@ -1054,6 +1120,7 @@ export function useChat(
return {
messages,
isSending,
isReconnecting,
error,
resolvedChatId,
sendMessage,

View File

@@ -521,62 +521,69 @@ export async function executeWorkflowWithFullLogging(
const data = line.substring(6).trim()
if (data === '[DONE]') continue
let event: any
try {
const event = JSON.parse(data)
event = JSON.parse(data)
} catch {
continue
}
switch (event.type) {
case 'execution:started': {
setCurrentExecutionId(wfId, event.executionId)
executionIdRef.current = event.executionId || executionId
break
}
case 'block:started':
blockHandlers.onBlockStarted(event.data)
break
case 'block:completed':
blockHandlers.onBlockCompleted(event.data)
break
case 'block:error':
blockHandlers.onBlockError(event.data)
break
case 'block:childWorkflowStarted':
blockHandlers.onBlockChildWorkflowStarted(event.data)
break
case 'execution:completed':
setCurrentExecutionId(wfId, null)
executionResult = {
success: event.data.success,
output: event.data.output,
logs: [],
metadata: {
duration: event.data.duration,
startTime: event.data.startTime,
endTime: event.data.endTime,
},
}
break
case 'execution:cancelled':
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: 'Execution was cancelled',
logs: [],
}
break
case 'execution:error':
setCurrentExecutionId(wfId, null)
throw new Error(event.data.error || 'Execution failed')
switch (event.type) {
case 'execution:started': {
setCurrentExecutionId(wfId, event.executionId)
executionIdRef.current = event.executionId || executionId
break
}
} catch (parseError) {
// Skip malformed SSE events
case 'block:started':
blockHandlers.onBlockStarted(event.data)
break
case 'block:completed':
blockHandlers.onBlockCompleted(event.data)
break
case 'block:error':
blockHandlers.onBlockError(event.data)
break
case 'block:childWorkflowStarted':
blockHandlers.onBlockChildWorkflowStarted(event.data)
break
case 'execution:completed':
setCurrentExecutionId(wfId, null)
executionResult = {
success: event.data.success,
output: event.data.output,
logs: [],
metadata: {
duration: event.data.duration,
startTime: event.data.startTime,
endTime: event.data.endTime,
},
}
break
case 'execution:cancelled':
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: 'Execution was cancelled',
logs: [],
}
break
case 'execution:error':
setCurrentExecutionId(wfId, null)
executionResult = {
success: false,
output: {},
error: event.data.error || 'Execution failed',
logs: [],
}
break
}
}
}

View File

@@ -9,12 +9,18 @@ export interface TaskMetadata {
isUnread: boolean
}
export interface StreamSnapshot {
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
status: string
}
export interface TaskChatHistory {
id: string
title: string | null
messages: TaskStoredMessage[]
activeStreamId: string | null
resources: MothershipResource[]
streamSnapshot?: StreamSnapshot | null
}
export interface TaskStoredToolCall {
@@ -135,6 +141,7 @@ async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise<T
messages: Array.isArray(chat.messages) ? chat.messages : [],
activeStreamId: chat.conversationId || null,
resources: Array.isArray(chat.resources) ? chat.resources : [],
streamSnapshot: chat.streamSnapshot || null,
}
}

View File

@@ -21,12 +21,6 @@ import type {
import { executeToolAndReport, waitForToolCompletion, waitForToolDecision } from './tool-execution'
const logger = createLogger('CopilotSseHandlers')
const CLIENT_WORKFLOW_TOOLS = new Set([
'run_workflow',
'run_workflow_until_block',
'run_block',
'run_from_block',
])
/**
* Extract the `ui` object from a Go SSE event. The Go backend enriches
@@ -320,17 +314,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
if (options.interactive === false) {
if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) {
toolCall.status = 'executing'
const completion = await waitForToolCompletion(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
handleClientCompletion(toolCall, toolCallId, completion)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
return
}
if (options.autoExecuteTools !== false) {
fireToolExecution()
}
@@ -580,17 +563,6 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
}
if (options.interactive === false) {
if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) {
toolCall.status = 'executing'
const completion = await waitForToolCompletion(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
handleClientCompletion(toolCall, toolCallId, completion)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
return
}
if (options.autoExecuteTools !== false) {
fireToolExecution()
}