mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-28 03:00:29 -04:00
improvement(mothership): stream retry state machine, progressive re-rendering (#4287)
* improvement(mothership): stream retry state machine, progressive re-rendering * address comments
This commit is contained in:
committed by
GitHub
parent
04f1d015f3
commit
56044776d5
@@ -38,6 +38,7 @@ vi.mock('@/lib/copilot/request/session', () => ({
|
||||
}),
|
||||
encodeSSEEnvelope: (event: Record<string, unknown>) =>
|
||||
new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`),
|
||||
encodeSSEComment: (comment: string) => new TextEncoder().encode(`: ${comment}\n\n`),
|
||||
SSE_RESPONSE_HEADERS: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
},
|
||||
@@ -132,6 +133,7 @@ describe('copilot chat stream replay route', () => {
|
||||
)
|
||||
|
||||
const chunks = await readAllChunks(response)
|
||||
expect(chunks[0]).toBe(': accepted\n\n')
|
||||
expect(chunks.join('')).toContain(
|
||||
JSON.stringify({
|
||||
status: MothershipStreamV1CompletionStatus.cancelled,
|
||||
|
||||
@@ -19,6 +19,7 @@ import { getCopilotTracer, markSpanForError } from '@/lib/copilot/request/otel'
|
||||
import {
|
||||
checkForReplayGap,
|
||||
createEvent,
|
||||
encodeSSEComment,
|
||||
encodeSSEEnvelope,
|
||||
readEvents,
|
||||
readFilePreviewSessions,
|
||||
@@ -31,6 +32,7 @@ export const maxDuration = 3600
|
||||
|
||||
const logger = createLogger('CopilotChatStreamAPI')
|
||||
const POLL_INTERVAL_MS = 250
|
||||
const REPLAY_KEEPALIVE_INTERVAL_MS = 15_000
|
||||
const MAX_STREAM_MS = 60 * 60 * 1000
|
||||
|
||||
function extractCanonicalRequestId(value: unknown): string {
|
||||
@@ -266,6 +268,7 @@ async function handleResumeRequestBody({
|
||||
let controllerClosed = false
|
||||
let sawTerminalEvent = false
|
||||
let currentRequestId = extractRunRequestId(run)
|
||||
let lastWriteTime = Date.now()
|
||||
// Stamp the logical request id + chat id on the resume root as soon
|
||||
// as we resolve them from the run row, so TraceQL joins work on
|
||||
// resume legs the same way they do on the original POST.
|
||||
@@ -291,6 +294,19 @@ async function handleResumeRequestBody({
|
||||
if (controllerClosed) return false
|
||||
try {
|
||||
controller.enqueue(encodeSSEEnvelope(payload))
|
||||
lastWriteTime = Date.now()
|
||||
return true
|
||||
} catch {
|
||||
controllerClosed = true
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
const enqueueComment = (comment: string) => {
|
||||
if (controllerClosed) return false
|
||||
try {
|
||||
controller.enqueue(encodeSSEComment(comment))
|
||||
lastWriteTime = Date.now()
|
||||
return true
|
||||
} catch {
|
||||
controllerClosed = true
|
||||
@@ -306,7 +322,6 @@ async function handleResumeRequestBody({
|
||||
const flushEvents = async () => {
|
||||
const events = await readEvents(streamId, cursor)
|
||||
if (events.length > 0) {
|
||||
totalEventsFlushed += events.length
|
||||
logger.debug('[Resume] Flushing events', {
|
||||
streamId,
|
||||
afterCursor: cursor,
|
||||
@@ -314,14 +329,15 @@ async function handleResumeRequestBody({
|
||||
})
|
||||
}
|
||||
for (const envelope of events) {
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
totalEventsFlushed += 1
|
||||
cursor = envelope.stream.cursor ?? String(envelope.seq)
|
||||
currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId
|
||||
if (envelope.type === MothershipStreamV1EventType.complete) {
|
||||
sawTerminalEvent = true
|
||||
}
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -341,21 +357,30 @@ async function handleResumeRequestBody({
|
||||
reason: options?.reason,
|
||||
requestId: currentRequestId,
|
||||
})) {
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
cursor = envelope.stream.cursor ?? String(envelope.seq)
|
||||
if (envelope.type === MothershipStreamV1EventType.complete) {
|
||||
sawTerminalEvent = true
|
||||
}
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
enqueueComment('accepted')
|
||||
|
||||
const gap = await checkForReplayGap(streamId, afterCursor, currentRequestId)
|
||||
if (gap) {
|
||||
for (const envelope of gap.envelopes) {
|
||||
enqueueEvent(envelope)
|
||||
if (!enqueueEvent(envelope)) {
|
||||
break
|
||||
}
|
||||
cursor = envelope.stream.cursor ?? String(envelope.seq)
|
||||
currentRequestId = extractEnvelopeRequestId(envelope) || currentRequestId
|
||||
if (envelope.type === MothershipStreamV1EventType.complete) {
|
||||
sawTerminalEvent = true
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -408,6 +433,10 @@ async function handleResumeRequestBody({
|
||||
break
|
||||
}
|
||||
|
||||
if (Date.now() - lastWriteTime >= REPLAY_KEEPALIVE_INTERVAL_MS) {
|
||||
enqueueComment('keepalive')
|
||||
}
|
||||
|
||||
await sleep(POLL_INTERVAL_MS)
|
||||
}
|
||||
if (!controllerClosed && Date.now() - startTime >= MAX_STREAM_MS) {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
'use client'
|
||||
|
||||
import { useCallback, useLayoutEffect, useRef } from 'react'
|
||||
import { useCallback, useLayoutEffect, useMemo, useRef } from 'react'
|
||||
import { cn } from '@/lib/core/utils/cn'
|
||||
import { MessageActions } from '@/app/workspace/[workspaceId]/components'
|
||||
import { ChatMessageAttachments } from '@/app/workspace/[workspaceId]/home/components/chat-message-attachments'
|
||||
@@ -22,6 +22,7 @@ import type {
|
||||
QueuedMessage,
|
||||
} from '@/app/workspace/[workspaceId]/home/types'
|
||||
import { useAutoScroll } from '@/hooks/use-auto-scroll'
|
||||
import { useProgressiveList } from '@/hooks/use-progressive-list'
|
||||
import type { ChatContext } from '@/stores/panel'
|
||||
import { MothershipChatSkeleton } from './mothership-chat-skeleton'
|
||||
|
||||
@@ -104,6 +105,21 @@ export function MothershipChat({
|
||||
scrollOnMount: true,
|
||||
})
|
||||
const hasMessages = messages.length > 0
|
||||
const stagingKey = chatId ?? 'pending-chat'
|
||||
const { staged: stagedMessages, isStaging } = useProgressiveList(messages, stagingKey)
|
||||
const stagedMessageCount = stagedMessages.length
|
||||
const stagedOffset = messages.length - stagedMessages.length
|
||||
const precedingUserContentByIndex = useMemo(() => {
|
||||
const contentByIndex: Array<string | undefined> = []
|
||||
let lastUserContent: string | undefined
|
||||
for (const [index, message] of messages.entries()) {
|
||||
contentByIndex[index] = lastUserContent
|
||||
if (message.role === 'user') {
|
||||
lastUserContent = message.content
|
||||
}
|
||||
}
|
||||
return contentByIndex
|
||||
}, [messages])
|
||||
const initialScrollDoneRef = useRef(false)
|
||||
const userInputRef = useRef<UserInputHandle>(null)
|
||||
const handleSendQueuedHead = useCallback(() => {
|
||||
@@ -134,6 +150,11 @@ export function MothershipChat({
|
||||
scrollToBottom()
|
||||
}, [hasMessages, initialScrollBlocked, scrollToBottom])
|
||||
|
||||
useLayoutEffect(() => {
|
||||
if (!isStaging || initialScrollBlocked || !initialScrollDoneRef.current) return
|
||||
scrollToBottom()
|
||||
}, [isStaging, stagedMessageCount, initialScrollBlocked, scrollToBottom])
|
||||
|
||||
return (
|
||||
<div className={cn('flex h-full min-h-0 flex-col', className)}>
|
||||
<div ref={scrollContainerRef} className={styles.scrollContainer}>
|
||||
@@ -141,7 +162,8 @@ export function MothershipChat({
|
||||
<MothershipChatSkeleton layout={layout} />
|
||||
) : (
|
||||
<div className={styles.content}>
|
||||
{messages.map((msg, index) => {
|
||||
{stagedMessages.map((msg, localIndex) => {
|
||||
const index = stagedOffset + localIndex
|
||||
if (msg.role === 'user') {
|
||||
const hasAttachments = Boolean(msg.attachments?.length)
|
||||
return (
|
||||
@@ -177,10 +199,7 @@ export function MothershipChat({
|
||||
}
|
||||
|
||||
const isLastMessage = index === messages.length - 1
|
||||
const precedingUserMsg = [...messages]
|
||||
.slice(0, index)
|
||||
.reverse()
|
||||
.find((m) => m.role === 'user')
|
||||
const precedingUserContent = precedingUserContentByIndex[index]
|
||||
|
||||
return (
|
||||
<div key={msg.id} className={styles.assistantRow}>
|
||||
@@ -196,7 +215,7 @@ export function MothershipChat({
|
||||
<MessageActions
|
||||
content={msg.content}
|
||||
chatId={chatId}
|
||||
userQuery={precedingUserMsg?.content}
|
||||
userQuery={precedingUserContent}
|
||||
requestId={msg.requestId}
|
||||
/>
|
||||
</div>
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
'use client'
|
||||
|
||||
import { useEffect, useRef, useState } from 'react'
|
||||
import { useEffect, useLayoutEffect, useRef, useState } from 'react'
|
||||
|
||||
interface ProgressiveListOptions {
|
||||
/** Number of items to render in the initial batch (most recent items) */
|
||||
@@ -14,15 +14,31 @@ const DEFAULTS = {
|
||||
batchSize: 5,
|
||||
} satisfies Required<ProgressiveListOptions>
|
||||
|
||||
interface ProgressiveListState {
|
||||
key: string
|
||||
count: number
|
||||
caughtUp: boolean
|
||||
}
|
||||
|
||||
function createInitialState(
|
||||
key: string,
|
||||
itemCount: number,
|
||||
initialBatch: number
|
||||
): ProgressiveListState {
|
||||
const count = Math.min(itemCount, initialBatch)
|
||||
return {
|
||||
key,
|
||||
count,
|
||||
caughtUp: itemCount > 0 && count >= itemCount,
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Progressively renders a list of items so that first paint is fast.
|
||||
*
|
||||
* On mount (or when `key` changes), only the most recent `initialBatch`
|
||||
* items are rendered. The rest are added in `batchSize` increments via
|
||||
* `requestAnimationFrame` so the browser never blocks on a large DOM mount.
|
||||
*
|
||||
* Once staging completes for a given key it never re-stages -- new items
|
||||
* appended to the list are rendered immediately.
|
||||
* `requestAnimationFrame`.
|
||||
*
|
||||
* @param items Full list of items to render.
|
||||
* @param key A session/conversation identifier. When it changes,
|
||||
@@ -35,67 +51,83 @@ export function useProgressiveList<T>(
|
||||
key: string,
|
||||
options?: ProgressiveListOptions
|
||||
): { staged: T[]; isStaging: boolean } {
|
||||
const initialBatch = options?.initialBatch ?? DEFAULTS.initialBatch
|
||||
const batchSize = options?.batchSize ?? DEFAULTS.batchSize
|
||||
const initialBatch = Math.max(0, options?.initialBatch ?? DEFAULTS.initialBatch)
|
||||
const batchSize = Math.max(1, options?.batchSize ?? DEFAULTS.batchSize)
|
||||
const [state, setState] = useState(() => createInitialState(key, items.length, initialBatch))
|
||||
const latestItemCountRef = useRef(items.length)
|
||||
|
||||
const completedKeysRef = useRef(new Set<string>())
|
||||
const prevKeyRef = useRef(key)
|
||||
const stagingCountRef = useRef(initialBatch)
|
||||
const [count, setCount] = useState(() => {
|
||||
if (items.length <= initialBatch) return items.length
|
||||
return initialBatch
|
||||
})
|
||||
useLayoutEffect(() => {
|
||||
latestItemCountRef.current = items.length
|
||||
}, [items.length])
|
||||
|
||||
const renderState =
|
||||
state.key === key && (state.count > 0 || items.length === 0 || state.caughtUp)
|
||||
? state
|
||||
: createInitialState(key, items.length, initialBatch)
|
||||
|
||||
useEffect(() => {
|
||||
if (completedKeysRef.current.has(key)) {
|
||||
setCount(items.length)
|
||||
return
|
||||
}
|
||||
|
||||
if (items.length <= initialBatch) {
|
||||
setCount(items.length)
|
||||
completedKeysRef.current.add(key)
|
||||
return
|
||||
}
|
||||
|
||||
let current = Math.max(stagingCountRef.current, initialBatch)
|
||||
setCount(current)
|
||||
|
||||
let frame: number | undefined
|
||||
|
||||
const step = () => {
|
||||
const total = items.length
|
||||
current = Math.min(total, current + batchSize)
|
||||
stagingCountRef.current = current
|
||||
setCount(current)
|
||||
if (current >= total) {
|
||||
completedKeysRef.current.add(key)
|
||||
frame = undefined
|
||||
return
|
||||
setState((prev) => {
|
||||
if (prev.key !== key) {
|
||||
return createInitialState(key, items.length, initialBatch)
|
||||
}
|
||||
frame = requestAnimationFrame(step)
|
||||
|
||||
if (items.length === 0) {
|
||||
if (prev.count === 0 && !prev.caughtUp) {
|
||||
return prev
|
||||
}
|
||||
return { key, count: 0, caughtUp: false }
|
||||
}
|
||||
|
||||
if (prev.caughtUp) {
|
||||
if (prev.count === items.length) {
|
||||
return prev
|
||||
}
|
||||
return { key, count: items.length, caughtUp: true }
|
||||
}
|
||||
|
||||
const minimumCount = Math.min(items.length, initialBatch)
|
||||
if (prev.count >= minimumCount && prev.count <= items.length) {
|
||||
return prev
|
||||
}
|
||||
|
||||
const count = Math.min(items.length, Math.max(prev.count, minimumCount))
|
||||
return {
|
||||
key,
|
||||
count,
|
||||
caughtUp: count >= items.length,
|
||||
}
|
||||
})
|
||||
}, [key, items.length, initialBatch])
|
||||
|
||||
useEffect(() => {
|
||||
if (state.key !== key || state.caughtUp || state.count >= items.length) {
|
||||
return
|
||||
}
|
||||
|
||||
frame = requestAnimationFrame(step)
|
||||
const frame = requestAnimationFrame(() => {
|
||||
setState((prev) => {
|
||||
if (prev.key !== key || prev.caughtUp) {
|
||||
return prev
|
||||
}
|
||||
|
||||
return () => {
|
||||
if (frame !== undefined) cancelAnimationFrame(frame)
|
||||
}
|
||||
}, [key, items.length, initialBatch, batchSize])
|
||||
const itemCount = latestItemCountRef.current
|
||||
const count = Math.min(itemCount, prev.count + batchSize)
|
||||
return {
|
||||
key,
|
||||
count,
|
||||
caughtUp: count >= itemCount,
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
let effectiveCount = count
|
||||
if (prevKeyRef.current !== key) {
|
||||
effectiveCount = items.length <= initialBatch ? items.length : initialBatch
|
||||
stagingCountRef.current = initialBatch
|
||||
}
|
||||
prevKeyRef.current = key
|
||||
return () => cancelAnimationFrame(frame)
|
||||
}, [state.key, state.count, state.caughtUp, key, items.length, batchSize])
|
||||
|
||||
const isCompleted = completedKeysRef.current.has(key)
|
||||
const isStaging = !isCompleted && effectiveCount < items.length
|
||||
const staged =
|
||||
isCompleted || effectiveCount >= items.length
|
||||
? items
|
||||
: items.slice(Math.max(0, items.length - effectiveCount))
|
||||
const effectiveCount = renderState.caughtUp
|
||||
? items.length
|
||||
: Math.min(renderState.count, items.length)
|
||||
const staged = items.slice(Math.max(0, items.length - effectiveCount))
|
||||
const isStaging = effectiveCount < items.length
|
||||
|
||||
return { staged, isStaging }
|
||||
}
|
||||
|
||||
@@ -194,6 +194,64 @@ describe('copilot go stream helpers', () => {
|
||||
)
|
||||
})
|
||||
|
||||
it('does not retry transient backend statuses because stream requests are not idempotent', async () => {
|
||||
vi.mocked(fetch).mockResolvedValueOnce(new Response('bad gateway', { status: 502 }))
|
||||
|
||||
const context = createStreamingContext()
|
||||
const execContext: ExecutionContext = {
|
||||
userId: 'user-1',
|
||||
workflowId: 'workflow-1',
|
||||
}
|
||||
|
||||
await expect(
|
||||
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
|
||||
timeout: 1000,
|
||||
})
|
||||
).rejects.toMatchObject({
|
||||
name: 'CopilotBackendError',
|
||||
status: 502,
|
||||
body: 'bad gateway',
|
||||
})
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('does not retry non-transient backend statuses before the SSE stream opens', async () => {
|
||||
vi.mocked(fetch).mockResolvedValueOnce(new Response('limit reached', { status: 402 }))
|
||||
|
||||
const context = createStreamingContext()
|
||||
const execContext: ExecutionContext = {
|
||||
userId: 'user-1',
|
||||
workflowId: 'workflow-1',
|
||||
}
|
||||
|
||||
await expect(
|
||||
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
|
||||
timeout: 1000,
|
||||
})
|
||||
).rejects.toThrow('Usage limit reached')
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('does not retry network errors because Go may already be executing the request', async () => {
|
||||
vi.mocked(fetch).mockRejectedValueOnce(new TypeError('fetch failed'))
|
||||
|
||||
const context = createStreamingContext()
|
||||
const execContext: ExecutionContext = {
|
||||
userId: 'user-1',
|
||||
workflowId: 'workflow-1',
|
||||
}
|
||||
|
||||
await expect(
|
||||
runStreamLoop('https://example.com/mothership/stream', {}, context, execContext, {
|
||||
timeout: 1000,
|
||||
})
|
||||
).rejects.toThrow('fetch failed')
|
||||
|
||||
expect(fetch).toHaveBeenCalledTimes(1)
|
||||
})
|
||||
|
||||
it('fails closed when the shared stream ends before a terminal event', async () => {
|
||||
const textEvent = createEvent({
|
||||
streamId: 'stream-1',
|
||||
|
||||
@@ -134,17 +134,27 @@ export async function runStreamLoop(
|
||||
requestBodyBytes,
|
||||
})
|
||||
const fetchStart = performance.now()
|
||||
const response = await fetchGo(fetchUrl, {
|
||||
...fetchOptions,
|
||||
signal: abortSignal,
|
||||
otelContext: options.otelContext,
|
||||
spanName: `sim → go ${pathname}`,
|
||||
operation: 'stream',
|
||||
attributes: {
|
||||
[TraceAttr.CopilotStream]: true,
|
||||
...(requestBodyBytes ? { [TraceAttr.HttpRequestContentLength]: requestBodyBytes } : {}),
|
||||
},
|
||||
})
|
||||
let response: Response
|
||||
try {
|
||||
response = await fetchGo(fetchUrl, {
|
||||
...fetchOptions,
|
||||
signal: abortSignal,
|
||||
otelContext: options.otelContext,
|
||||
spanName: `sim → go ${pathname}`,
|
||||
operation: 'stream',
|
||||
attributes: {
|
||||
[TraceAttr.CopilotStream]: true,
|
||||
...(requestBodyBytes ? { [TraceAttr.HttpRequestContentLength]: requestBodyBytes } : {}),
|
||||
},
|
||||
})
|
||||
} catch (error) {
|
||||
fetchSpan.attributes = {
|
||||
...(fetchSpan.attributes ?? {}),
|
||||
headersMs: Math.round(performance.now() - fetchStart),
|
||||
}
|
||||
context.trace.endSpan(fetchSpan, abortSignal?.aborted ? 'cancelled' : 'error')
|
||||
throw error
|
||||
}
|
||||
const headersElapsedMs = Math.round(performance.now() - fetchStart)
|
||||
fetchSpan.attributes = {
|
||||
...(fetchSpan.attributes ?? {}),
|
||||
|
||||
Reference in New Issue
Block a user