mirror of
https://github.com/simstudioai/sim.git
synced 2026-02-11 07:04:58 -05:00
* v0 * v1 * Basic ss tes * Ss tests * Stuff * Add mcp * mcp v1 * Improvement * Fix * BROKEN * Checkpoint * Streaming * Fix abort * Things are broken * Streaming seems to work but copilot is dumb * Fix edge issue * LUAAAA * Fix stream buffer * Fix lint * Checkpoint * Initial temp state, in the middle of a refactor * Initial test shows diff store still working * Tool refactor * First cleanup pass complete - untested * Continued cleanup * Refactor * Refactor complete - no testing yet * Fix - cursor makes me sad * Fix mcp * Clean up mcp * Updated mcp * Add respond to subagents * Fix definitions * Add tools * Add tools * Add copilot mcp tracking * Fix lint * Fix mcp * Fix * Updates * Clean up mcp * Fix copilot mcp tool names to be sim prefixed * Add opus 4.6 * Fix discovery tool * Fix * Remove logs * Fix go side tool rendering * Update docs * Fix hydration * Fix tool call resolution * Fix * Fix lint * Fix superagent and autoallow integrations * Fix always allow * Update block * Remove plan docs * Fix hardcoded ff * Fix dropped provider * Fix lint * Fix tests * Fix dead messages array * Fix discovery * Fix run workflow * Fix run block * Fix run from block in copilot * Fix lint * Fix skip and mtb * Fix typing * Fix tool call * Bump api version * Fix bun lock * Nuke bad files
131 lines
3.9 KiB
TypeScript
131 lines
3.9 KiB
TypeScript
import { createLogger } from '@sim/logger'
|
|
import { type NextRequest, NextResponse } from 'next/server'
|
|
import {
|
|
getStreamMeta,
|
|
readStreamEvents,
|
|
type StreamMeta,
|
|
} from '@/lib/copilot/orchestrator/stream-buffer'
|
|
import { authenticateCopilotRequestSessionOnly } from '@/lib/copilot/request-helpers'
|
|
import { SSE_HEADERS } from '@/lib/core/utils/sse'
|
|
|
|
/** Logger used by this route; entries are tagged `CopilotChatStreamAPI`. */
const logger = createLogger('CopilotChatStreamAPI')

/** Delay, in milliseconds, between successive polls of the stream buffer while replaying over SSE. */
const POLL_INTERVAL_MS = 250

/** Maximum time, in milliseconds, a single SSE replay response keeps polling (10 minutes). */
const MAX_STREAM_MS = 10 * 60 * 1000
|
|
|
|
/**
 * Serializes an event as a single Server-Sent Events `data:` frame.
 *
 * @param event - JSON-serializable event payload.
 * @returns UTF-8 bytes of `data: <json>` followed by the blank line that terminates an SSE frame.
 */
function encodeEvent(event: Record<string, unknown>): Uint8Array {
  return new TextEncoder().encode(`data: ${JSON.stringify(event)}\n\n`)
}
|
|
|
|
/**
 * Parses a numeric event-id query parameter.
 *
 * @param raw - Raw query-string value, or `null` when the parameter is absent.
 * @param fallback - Value returned when `raw` is missing, blank, or not a finite number.
 * @returns The parsed number, or `fallback`.
 */
function parseEventIdParam<T>(raw: string | null, fallback: T): number | T {
  if (raw === null || raw.trim() === '') return fallback
  const parsed = Number(raw)
  return Number.isFinite(parsed) ? parsed : fallback
}

/**
 * Replays buffered copilot stream events for a given stream.
 *
 * Query parameters:
 * - `streamId` (required): identifier of the stream to replay.
 * - `from`: event id passed to `readStreamEvents` as the starting cursor; defaults to 0
 *   when missing or not a finite number.
 * - `batch`: when exactly `'true'`, buffered events are returned once as JSON instead of SSE.
 * - `to`: batch mode only; events with `eventId` greater than this are dropped. Ignored
 *   when missing or not a finite number.
 *
 * Responses:
 * - 401 when the session is not authenticated.
 * - 400 when `streamId` is missing.
 * - 404 when no metadata exists for the stream.
 * - 403 when the stream metadata names a different user.
 * - Batch mode: JSON `{ success, events, status }`.
 * - Otherwise: an SSE response that polls the buffer every `POLL_INTERVAL_MS` until the
 *   stream status is `complete` or `error`, the metadata disappears, the client goes away,
 *   or `MAX_STREAM_MS` elapses.
 */
export async function GET(request: NextRequest) {
  const { userId: authenticatedUserId, isAuthenticated } =
    await authenticateCopilotRequestSessionOnly()

  if (!isAuthenticated || !authenticatedUserId) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { searchParams } = new URL(request.url)
  const streamId = searchParams.get('streamId') || ''
  // Normalize once so batch and SSE modes agree; a malformed `from` falls back to 0
  // instead of leaking NaN into readStreamEvents.
  const fromEventId = parseEventIdParam(searchParams.get('from'), 0)
  // If batch=true, return buffered events as JSON instead of SSE
  const batchMode = searchParams.get('batch') === 'true'
  const toEventId = parseEventIdParam(searchParams.get('to'), undefined)

  if (!streamId) {
    return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
  }

  const meta = (await getStreamMeta(streamId)) as StreamMeta | null
  logger.info('[Resume] Stream lookup', {
    streamId,
    fromEventId,
    toEventId,
    batchMode,
    hasMeta: !!meta,
    metaStatus: meta?.status,
  })
  if (!meta) {
    return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
  }
  if (meta.userId && meta.userId !== authenticatedUserId) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 403 })
  }

  // Batch mode: return all buffered events as JSON
  if (batchMode) {
    const events = await readStreamEvents(streamId, fromEventId)
    // Compare against undefined explicitly: `to=0` is a valid upper bound and must not
    // be treated as "no bound" by a truthiness check.
    const filteredEvents =
      toEventId !== undefined ? events.filter((e) => e.eventId <= toEventId) : events
    logger.info('[Resume] Batch response', {
      streamId,
      fromEventId,
      toEventId,
      eventCount: filteredEvents.length,
    })
    return NextResponse.json({
      success: true,
      events: filteredEvents,
      status: meta.status,
    })
  }

  const startTime = Date.now()
  // Set by the stream's cancel() hook when the consumer stops reading.
  let consumerCancelled = false

  const stream = new ReadableStream<Uint8Array>({
    async start(controller) {
      let lastEventId = fromEventId
      const shouldStop = () => consumerCancelled || request.signal.aborted

      // Reads everything buffered after lastEventId and forwards it to the client,
      // advancing the cursor as each event is enqueued.
      const flushEvents = async () => {
        const events = await readStreamEvents(streamId, lastEventId)
        if (events.length > 0) {
          logger.info('[Resume] Flushing events', {
            streamId,
            fromEventId: lastEventId,
            eventCount: events.length,
          })
        }
        for (const entry of events) {
          // Enqueueing on a cancelled stream throws; stop quietly instead.
          if (shouldStop()) return
          lastEventId = entry.eventId
          controller.enqueue(
            encodeEvent({
              ...entry.event,
              eventId: entry.eventId,
              streamId: entry.streamId,
            })
          )
        }
      }

      try {
        await flushEvents()

        while (!shouldStop() && Date.now() - startTime < MAX_STREAM_MS) {
          // Read the status BEFORE flushing so that a terminal status is always
          // followed by one final flush of the events written ahead of it.
          const currentMeta = await getStreamMeta(streamId)
          if (!currentMeta) break

          await flushEvents()

          if (currentMeta.status === 'complete' || currentMeta.status === 'error') {
            break
          }

          if (shouldStop()) {
            break
          }

          await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL_MS))
        }
      } catch (error) {
        logger.warn('Stream replay failed', {
          streamId,
          error: error instanceof Error ? error.message : String(error),
        })
      } finally {
        // close() throws if the stream was already cancelled or closed; the response
        // is finished either way, so there is nothing further to report.
        if (!consumerCancelled) {
          try {
            controller.close()
          } catch {
            // Stream already closed or errored.
          }
        }
      }
    },
    cancel() {
      consumerCancelled = true
    },
  })

  return new Response(stream, { headers: SSE_HEADERS })
}
|