mirror of
https://github.com/simstudioai/sim.git
synced 2026-03-15 03:00:33 -04:00
Compare commits
2 Commits
improvemen
...
fix/mother
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
34283b551f | ||
|
|
e024f22aa8 |
@@ -14,6 +14,7 @@ import {
|
||||
} from '@/lib/copilot/chat-streaming'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
@@ -454,6 +455,30 @@ export async function GET(req: NextRequest) {
|
||||
return NextResponse.json({ success: false, error: 'Chat not found' }, { status: 404 })
|
||||
}
|
||||
|
||||
let streamSnapshot: {
|
||||
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
|
||||
status: string
|
||||
} | null = null
|
||||
|
||||
if (chat.conversationId) {
|
||||
try {
|
||||
const [meta, events] = await Promise.all([
|
||||
getStreamMeta(chat.conversationId),
|
||||
readStreamEvents(chat.conversationId, 0),
|
||||
])
|
||||
streamSnapshot = {
|
||||
events: events || [],
|
||||
status: meta?.status || 'unknown',
|
||||
}
|
||||
} catch (err) {
|
||||
logger.warn('Failed to read stream snapshot for chat', {
|
||||
chatId,
|
||||
conversationId: chat.conversationId,
|
||||
error: err instanceof Error ? err.message : String(err),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
const transformedChat = {
|
||||
id: chat.id,
|
||||
title: chat.title,
|
||||
@@ -466,6 +491,7 @@ export async function GET(req: NextRequest) {
|
||||
resources: Array.isArray(chat.resources) ? chat.resources : [],
|
||||
createdAt: chat.createdAt,
|
||||
updatedAt: chat.updatedAt,
|
||||
...(streamSnapshot ? { streamSnapshot } : {}),
|
||||
}
|
||||
|
||||
logger.info(`Retrieved chat ${chatId}`)
|
||||
|
||||
@@ -178,6 +178,7 @@ export function Home({ chatId }: HomeProps = {}) {
|
||||
const {
|
||||
messages,
|
||||
isSending,
|
||||
isReconnecting,
|
||||
sendMessage,
|
||||
stopGeneration,
|
||||
resolvedChatId,
|
||||
@@ -330,7 +331,7 @@ export function Home({ chatId }: HomeProps = {}) {
|
||||
return () => ro.disconnect()
|
||||
}, [hasMessages])
|
||||
|
||||
if (!hasMessages && chatId && isLoadingHistory) {
|
||||
if (chatId && (isLoadingHistory || isReconnecting)) {
|
||||
return (
|
||||
<ChatSkeleton>
|
||||
<UserInput
|
||||
|
||||
@@ -45,6 +45,7 @@ import type {
|
||||
export interface UseChatReturn {
|
||||
messages: ChatMessage[]
|
||||
isSending: boolean
|
||||
isReconnecting: boolean
|
||||
error: string | null
|
||||
resolvedChatId: string | undefined
|
||||
sendMessage: (
|
||||
@@ -250,6 +251,7 @@ export function useChat(
|
||||
const queryClient = useQueryClient()
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([])
|
||||
const [isSending, setIsSending] = useState(false)
|
||||
const [isReconnecting, setIsReconnecting] = useState(false)
|
||||
const [error, setError] = useState<string | null>(null)
|
||||
const [resolvedChatId, setResolvedChatId] = useState<string | undefined>(initialChatId)
|
||||
const [resources, setResources] = useState<MothershipResource[]>([])
|
||||
@@ -268,6 +270,10 @@ export function useChat(
|
||||
}, [messageQueue])
|
||||
|
||||
const sendMessageRef = useRef<UseChatReturn['sendMessage']>(async () => {})
|
||||
const processSSEStreamRef = useRef<
|
||||
(reader: ReadableStreamDefaultReader<Uint8Array>, assistantId: string) => Promise<void>
|
||||
>(async () => {})
|
||||
const finalizeRef = useRef<(options?: { error?: boolean }) => void>(() => {})
|
||||
|
||||
const abortControllerRef = useRef<AbortController | null>(null)
|
||||
const chatIdRef = useRef<string | undefined>(initialChatId)
|
||||
@@ -329,6 +335,7 @@ export function useChat(
|
||||
setMessages([])
|
||||
setError(null)
|
||||
setIsSending(false)
|
||||
setIsReconnecting(false)
|
||||
setResources([])
|
||||
setActiveResourceId(null)
|
||||
setMessageQueue([])
|
||||
@@ -346,6 +353,7 @@ export function useChat(
|
||||
setMessages([])
|
||||
setError(null)
|
||||
setIsSending(false)
|
||||
setIsReconnecting(false)
|
||||
setResources([])
|
||||
setActiveResourceId(null)
|
||||
setMessageQueue([])
|
||||
@@ -365,6 +373,95 @@ export function useChat(
|
||||
ensureWorkflowInRegistry(resource.id, resource.title, workspaceId)
|
||||
}
|
||||
}
|
||||
|
||||
// Kick off stream reconnection immediately if there's an active stream.
|
||||
// The stream snapshot was fetched in parallel with the chat history (same
|
||||
// API call), so there's no extra round-trip.
|
||||
const activeStreamId = chatHistory.activeStreamId
|
||||
const snapshot = chatHistory.streamSnapshot
|
||||
if (activeStreamId && !sendingRef.current) {
|
||||
const gen = ++streamGenRef.current
|
||||
const abortController = new AbortController()
|
||||
abortControllerRef.current = abortController
|
||||
streamIdRef.current = activeStreamId
|
||||
sendingRef.current = true
|
||||
setIsReconnecting(true)
|
||||
|
||||
const assistantId = crypto.randomUUID()
|
||||
|
||||
const reconnect = async () => {
|
||||
try {
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
const batchEvents = snapshot?.events ?? []
|
||||
const streamStatus = snapshot?.status ?? ''
|
||||
|
||||
if (!snapshot || (batchEvents.length === 0 && streamStatus === 'unknown')) {
|
||||
// No snapshot available — stream buffer expired. Clean up.
|
||||
const cid = chatIdRef.current
|
||||
if (cid) {
|
||||
fetch('/api/mothership/chat/stop', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({ chatId: cid, streamId: activeStreamId, content: '' }),
|
||||
}).catch(() => {})
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
setIsSending(true)
|
||||
setIsReconnecting(false)
|
||||
|
||||
const lastEventId =
|
||||
batchEvents.length > 0 ? batchEvents[batchEvents.length - 1].eventId : 0
|
||||
const isStreamDone = streamStatus === 'complete' || streamStatus === 'error'
|
||||
|
||||
const combinedStream = new ReadableStream<Uint8Array>({
|
||||
async start(controller) {
|
||||
if (batchEvents.length > 0) {
|
||||
const sseText = batchEvents
|
||||
.map((e) => `data: ${JSON.stringify(e.event)}\n`)
|
||||
.join('\n')
|
||||
controller.enqueue(encoder.encode(`${sseText}\n`))
|
||||
}
|
||||
|
||||
if (!isStreamDone) {
|
||||
try {
|
||||
const sseRes = await fetch(
|
||||
`/api/copilot/chat/stream?streamId=${activeStreamId}&from=${lastEventId}`,
|
||||
{ signal: abortController.signal }
|
||||
)
|
||||
if (sseRes.ok && sseRes.body) {
|
||||
const reader = sseRes.body.getReader()
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
controller.enqueue(value)
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if (!(err instanceof Error && err.name === 'AbortError')) {
|
||||
logger.warn('SSE tail failed during reconnect', err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
controller.close()
|
||||
},
|
||||
})
|
||||
|
||||
await processSSEStreamRef.current(combinedStream.getReader(), assistantId)
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.name === 'AbortError') return
|
||||
} finally {
|
||||
setIsReconnecting(false)
|
||||
if (streamGenRef.current === gen) {
|
||||
finalizeRef.current()
|
||||
}
|
||||
}
|
||||
}
|
||||
reconnect()
|
||||
}
|
||||
}, [chatHistory, workspaceId])
|
||||
|
||||
useEffect(() => {
|
||||
@@ -405,11 +502,14 @@ export function useChat(
|
||||
|
||||
const flush = () => {
|
||||
streamingBlocksRef.current = [...blocks]
|
||||
setMessages((prev) =>
|
||||
prev.map((m) =>
|
||||
m.id === assistantId ? { ...m, content: runningText, contentBlocks: [...blocks] } : m
|
||||
)
|
||||
)
|
||||
const snapshot = { content: runningText, contentBlocks: [...blocks] }
|
||||
setMessages((prev) => {
|
||||
const idx = prev.findIndex((m) => m.id === assistantId)
|
||||
if (idx >= 0) {
|
||||
return prev.map((m) => (m.id === assistantId ? { ...m, ...snapshot } : m))
|
||||
}
|
||||
return [...prev, { id: assistantId, role: 'assistant' as const, ...snapshot }]
|
||||
})
|
||||
}
|
||||
|
||||
while (true) {
|
||||
@@ -662,6 +762,9 @@ export function useChat(
|
||||
},
|
||||
[workspaceId, queryClient, addResource, removeResource]
|
||||
)
|
||||
useLayoutEffect(() => {
|
||||
processSSEStreamRef.current = processSSEStream
|
||||
})
|
||||
|
||||
const persistPartialResponse = useCallback(async () => {
|
||||
const chatId = chatIdRef.current
|
||||
@@ -750,50 +853,9 @@ export function useChat(
|
||||
},
|
||||
[invalidateChatQueries]
|
||||
)
|
||||
|
||||
useEffect(() => {
|
||||
const activeStreamId = chatHistory?.activeStreamId
|
||||
if (!activeStreamId || !appliedChatIdRef.current || sendingRef.current) return
|
||||
|
||||
const gen = ++streamGenRef.current
|
||||
const abortController = new AbortController()
|
||||
abortControllerRef.current = abortController
|
||||
sendingRef.current = true
|
||||
setIsSending(true)
|
||||
|
||||
const assistantId = crypto.randomUUID()
|
||||
setMessages((prev) => [
|
||||
...prev,
|
||||
{
|
||||
id: assistantId,
|
||||
role: 'assistant' as const,
|
||||
content: '',
|
||||
contentBlocks: [],
|
||||
},
|
||||
])
|
||||
|
||||
const reconnect = async () => {
|
||||
try {
|
||||
const response = await fetch(`/api/copilot/chat/stream?streamId=${activeStreamId}&from=0`, {
|
||||
signal: abortController.signal,
|
||||
})
|
||||
if (!response.ok || !response.body) return
|
||||
await processSSEStream(response.body.getReader(), assistantId)
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.name === 'AbortError') return
|
||||
} finally {
|
||||
if (streamGenRef.current === gen) {
|
||||
finalize()
|
||||
}
|
||||
}
|
||||
}
|
||||
reconnect()
|
||||
|
||||
return () => {
|
||||
abortController.abort()
|
||||
appliedChatIdRef.current = undefined
|
||||
}
|
||||
}, [chatHistory?.activeStreamId, processSSEStream, finalize])
|
||||
useLayoutEffect(() => {
|
||||
finalizeRef.current = finalize
|
||||
})
|
||||
|
||||
const sendMessage = useCallback(
|
||||
async (message: string, fileAttachments?: FileAttachmentForApi[], contexts?: ChatContext[]) => {
|
||||
@@ -937,7 +999,11 @@ export function useChat(
|
||||
if (sendingRef.current) {
|
||||
await persistPartialResponse()
|
||||
}
|
||||
const sid = streamIdRef.current
|
||||
const sid =
|
||||
streamIdRef.current ||
|
||||
queryClient.getQueryData<TaskChatHistory>(taskKeys.detail(chatIdRef.current))
|
||||
?.activeStreamId ||
|
||||
undefined
|
||||
streamGenRef.current++
|
||||
abortControllerRef.current?.abort()
|
||||
abortControllerRef.current = null
|
||||
@@ -1054,6 +1120,7 @@ export function useChat(
|
||||
return {
|
||||
messages,
|
||||
isSending,
|
||||
isReconnecting,
|
||||
error,
|
||||
resolvedChatId,
|
||||
sendMessage,
|
||||
|
||||
@@ -521,62 +521,69 @@ export async function executeWorkflowWithFullLogging(
|
||||
const data = line.substring(6).trim()
|
||||
if (data === '[DONE]') continue
|
||||
|
||||
let event: any
|
||||
try {
|
||||
const event = JSON.parse(data)
|
||||
event = JSON.parse(data)
|
||||
} catch {
|
||||
continue
|
||||
}
|
||||
|
||||
switch (event.type) {
|
||||
case 'execution:started': {
|
||||
setCurrentExecutionId(wfId, event.executionId)
|
||||
executionIdRef.current = event.executionId || executionId
|
||||
break
|
||||
}
|
||||
|
||||
case 'block:started':
|
||||
blockHandlers.onBlockStarted(event.data)
|
||||
break
|
||||
|
||||
case 'block:completed':
|
||||
blockHandlers.onBlockCompleted(event.data)
|
||||
break
|
||||
|
||||
case 'block:error':
|
||||
blockHandlers.onBlockError(event.data)
|
||||
break
|
||||
|
||||
case 'block:childWorkflowStarted':
|
||||
blockHandlers.onBlockChildWorkflowStarted(event.data)
|
||||
break
|
||||
|
||||
case 'execution:completed':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
executionResult = {
|
||||
success: event.data.success,
|
||||
output: event.data.output,
|
||||
logs: [],
|
||||
metadata: {
|
||||
duration: event.data.duration,
|
||||
startTime: event.data.startTime,
|
||||
endTime: event.data.endTime,
|
||||
},
|
||||
}
|
||||
break
|
||||
|
||||
case 'execution:cancelled':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
executionResult = {
|
||||
success: false,
|
||||
output: {},
|
||||
error: 'Execution was cancelled',
|
||||
logs: [],
|
||||
}
|
||||
break
|
||||
|
||||
case 'execution:error':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
throw new Error(event.data.error || 'Execution failed')
|
||||
switch (event.type) {
|
||||
case 'execution:started': {
|
||||
setCurrentExecutionId(wfId, event.executionId)
|
||||
executionIdRef.current = event.executionId || executionId
|
||||
break
|
||||
}
|
||||
} catch (parseError) {
|
||||
// Skip malformed SSE events
|
||||
|
||||
case 'block:started':
|
||||
blockHandlers.onBlockStarted(event.data)
|
||||
break
|
||||
|
||||
case 'block:completed':
|
||||
blockHandlers.onBlockCompleted(event.data)
|
||||
break
|
||||
|
||||
case 'block:error':
|
||||
blockHandlers.onBlockError(event.data)
|
||||
break
|
||||
|
||||
case 'block:childWorkflowStarted':
|
||||
blockHandlers.onBlockChildWorkflowStarted(event.data)
|
||||
break
|
||||
|
||||
case 'execution:completed':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
executionResult = {
|
||||
success: event.data.success,
|
||||
output: event.data.output,
|
||||
logs: [],
|
||||
metadata: {
|
||||
duration: event.data.duration,
|
||||
startTime: event.data.startTime,
|
||||
endTime: event.data.endTime,
|
||||
},
|
||||
}
|
||||
break
|
||||
|
||||
case 'execution:cancelled':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
executionResult = {
|
||||
success: false,
|
||||
output: {},
|
||||
error: 'Execution was cancelled',
|
||||
logs: [],
|
||||
}
|
||||
break
|
||||
|
||||
case 'execution:error':
|
||||
setCurrentExecutionId(wfId, null)
|
||||
executionResult = {
|
||||
success: false,
|
||||
output: {},
|
||||
error: event.data.error || 'Execution failed',
|
||||
logs: [],
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,12 +9,18 @@ export interface TaskMetadata {
|
||||
isUnread: boolean
|
||||
}
|
||||
|
||||
export interface StreamSnapshot {
|
||||
events: Array<{ eventId: number; streamId: string; event: Record<string, unknown> }>
|
||||
status: string
|
||||
}
|
||||
|
||||
export interface TaskChatHistory {
|
||||
id: string
|
||||
title: string | null
|
||||
messages: TaskStoredMessage[]
|
||||
activeStreamId: string | null
|
||||
resources: MothershipResource[]
|
||||
streamSnapshot?: StreamSnapshot | null
|
||||
}
|
||||
|
||||
export interface TaskStoredToolCall {
|
||||
@@ -135,6 +141,7 @@ async function fetchChatHistory(chatId: string, signal?: AbortSignal): Promise<T
|
||||
messages: Array.isArray(chat.messages) ? chat.messages : [],
|
||||
activeStreamId: chat.conversationId || null,
|
||||
resources: Array.isArray(chat.resources) ? chat.resources : [],
|
||||
streamSnapshot: chat.streamSnapshot || null,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -21,12 +21,6 @@ import type {
|
||||
import { executeToolAndReport, waitForToolCompletion, waitForToolDecision } from './tool-execution'
|
||||
|
||||
const logger = createLogger('CopilotSseHandlers')
|
||||
const CLIENT_WORKFLOW_TOOLS = new Set([
|
||||
'run_workflow',
|
||||
'run_workflow_until_block',
|
||||
'run_block',
|
||||
'run_from_block',
|
||||
])
|
||||
|
||||
/**
|
||||
* Extract the `ui` object from a Go SSE event. The Go backend enriches
|
||||
@@ -320,17 +314,6 @@ export const sseHandlers: Record<string, SSEHandler> = {
|
||||
}
|
||||
|
||||
if (options.interactive === false) {
|
||||
if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) {
|
||||
toolCall.status = 'executing'
|
||||
const completion = await waitForToolCompletion(
|
||||
toolCallId,
|
||||
options.timeout || STREAM_TIMEOUT_MS,
|
||||
options.abortSignal
|
||||
)
|
||||
handleClientCompletion(toolCall, toolCallId, completion)
|
||||
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
|
||||
return
|
||||
}
|
||||
if (options.autoExecuteTools !== false) {
|
||||
fireToolExecution()
|
||||
}
|
||||
@@ -580,17 +563,6 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
|
||||
}
|
||||
|
||||
if (options.interactive === false) {
|
||||
if (clientExecutable && CLIENT_WORKFLOW_TOOLS.has(toolName)) {
|
||||
toolCall.status = 'executing'
|
||||
const completion = await waitForToolCompletion(
|
||||
toolCallId,
|
||||
options.timeout || STREAM_TIMEOUT_MS,
|
||||
options.abortSignal
|
||||
)
|
||||
handleClientCompletion(toolCall, toolCallId, completion)
|
||||
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
|
||||
return
|
||||
}
|
||||
if (options.autoExecuteTools !== false) {
|
||||
fireToolExecution()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user