mirror of
https://github.com/simstudioai/sim.git
synced 2026-04-06 03:00:16 -04:00
Remove dead code
This commit is contained in:
@@ -5,17 +5,11 @@ import { and, desc, eq } from 'drizzle-orm'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { buildConversationHistory } from '@/lib/copilot/chat-context'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
|
||||
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import { createSSEStream, requestChatTitle, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import {
|
||||
createStreamEventWriter,
|
||||
resetStreamBuffer,
|
||||
setStreamMeta,
|
||||
} from '@/lib/copilot/orchestrator/stream-buffer'
|
||||
import {
|
||||
authenticateCopilotRequestSessionOnly,
|
||||
createBadRequestResponse,
|
||||
@@ -23,54 +17,10 @@ import {
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
||||
|
||||
const logger = createLogger('CopilotChatAPI')
|
||||
|
||||
async function requestChatTitleFromCopilot(params: {
|
||||
message: string
|
||||
model: string
|
||||
provider?: string
|
||||
}): Promise<string | null> {
|
||||
const { message, model, provider } = params
|
||||
if (!message || !model) return null
|
||||
|
||||
const headers: Record<string, string> = {
|
||||
'Content-Type': 'application/json',
|
||||
}
|
||||
if (env.COPILOT_API_KEY) {
|
||||
headers['x-api-key'] = env.COPILOT_API_KEY
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(`${SIM_AGENT_API_URL}/api/generate-chat-title`, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({
|
||||
message,
|
||||
model,
|
||||
...(provider ? { provider } : {}),
|
||||
}),
|
||||
})
|
||||
|
||||
const payload = await response.json().catch(() => ({}))
|
||||
if (!response.ok) {
|
||||
logger.warn('Failed to generate chat title via copilot backend', {
|
||||
status: response.status,
|
||||
error: payload,
|
||||
})
|
||||
return null
|
||||
}
|
||||
|
||||
const title = typeof payload?.title === 'string' ? payload.title.trim() : ''
|
||||
return title || null
|
||||
} catch (error) {
|
||||
logger.error('Error generating chat title:', error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
const FileAttachmentSchema = z.object({
|
||||
id: z.string(),
|
||||
key: z.string(),
|
||||
@@ -247,8 +197,9 @@ export async function POST(req: NextRequest) {
|
||||
})
|
||||
currentChat = chatResult.chat
|
||||
actualChatId = chatResult.chatId || chatId
|
||||
const history = buildConversationHistory(chatResult.conversationHistory)
|
||||
conversationHistory = history.history
|
||||
conversationHistory = Array.isArray(chatResult.conversationHistory)
|
||||
? chatResult.conversationHistory
|
||||
: []
|
||||
}
|
||||
|
||||
const effectiveMode = mode === 'agent' ? 'build' : mode
|
||||
@@ -294,126 +245,28 @@ export async function POST(req: NextRequest) {
|
||||
} catch {}
|
||||
|
||||
if (stream) {
|
||||
const streamId = userMessageIdToUse
|
||||
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
|
||||
let clientDisconnected = false
|
||||
const transformedStream = new ReadableStream({
|
||||
async start(controller) {
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
await resetStreamBuffer(streamId)
|
||||
await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId })
|
||||
eventWriter = createStreamEventWriter(streamId)
|
||||
|
||||
const shouldFlushEvent = (event: Record<string, any>) =>
|
||||
event.type === 'tool_call' ||
|
||||
event.type === 'tool_result' ||
|
||||
event.type === 'tool_error' ||
|
||||
event.type === 'subagent_end' ||
|
||||
event.type === 'structured_result' ||
|
||||
event.type === 'subagent_result' ||
|
||||
event.type === 'done' ||
|
||||
event.type === 'error'
|
||||
|
||||
const pushEvent = async (event: Record<string, any>) => {
|
||||
if (!eventWriter) return
|
||||
const entry = await eventWriter.write(event)
|
||||
if (shouldFlushEvent(event)) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
const payload = {
|
||||
...event,
|
||||
eventId: entry.eventId,
|
||||
streamId,
|
||||
}
|
||||
try {
|
||||
if (!clientDisconnected) {
|
||||
controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
|
||||
}
|
||||
} catch {
|
||||
clientDisconnected = true
|
||||
await eventWriter.flush()
|
||||
}
|
||||
}
|
||||
|
||||
if (actualChatId) {
|
||||
await pushEvent({ type: 'chat_id', chatId: actualChatId })
|
||||
}
|
||||
|
||||
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
|
||||
requestChatTitleFromCopilot({ message, model: selectedModel, provider })
|
||||
.then(async (title) => {
|
||||
if (title) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({
|
||||
title,
|
||||
updatedAt: new Date(),
|
||||
})
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
await pushEvent({ type: 'title_updated', title })
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await orchestrateCopilotStream(requestPayload, {
|
||||
userId: authenticatedUserId,
|
||||
workflowId,
|
||||
chatId: actualChatId,
|
||||
goRoute: '/api/copilot',
|
||||
autoExecuteTools: true,
|
||||
interactive: true,
|
||||
onEvent: async (event) => {
|
||||
await pushEvent(event)
|
||||
},
|
||||
})
|
||||
|
||||
if (currentChat) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ updatedAt: new Date() })
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
}
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId })
|
||||
} catch (error) {
|
||||
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, {
|
||||
status: 'error',
|
||||
userId: authenticatedUserId,
|
||||
error: error instanceof Error ? error.message : 'Stream error',
|
||||
})
|
||||
await pushEvent({
|
||||
type: 'error',
|
||||
data: {
|
||||
displayMessage: 'An unexpected error occurred while processing the response.',
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
controller.close()
|
||||
}
|
||||
},
|
||||
async cancel() {
|
||||
clientDisconnected = true
|
||||
if (eventWriter) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
const sseStream = createSSEStream({
|
||||
requestPayload,
|
||||
userId: authenticatedUserId,
|
||||
streamId: userMessageIdToUse,
|
||||
chatId: actualChatId,
|
||||
currentChat,
|
||||
conversationHistory,
|
||||
message,
|
||||
titleModel: selectedModel,
|
||||
titleProvider: provider,
|
||||
requestId: tracker.requestId,
|
||||
orchestrateOptions: {
|
||||
userId: authenticatedUserId,
|
||||
workflowId,
|
||||
chatId: actualChatId,
|
||||
goRoute: '/api/copilot',
|
||||
autoExecuteTools: true,
|
||||
interactive: true,
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(transformedStream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
},
|
||||
})
|
||||
return new Response(sseStream, { headers: SSE_RESPONSE_HEADERS })
|
||||
}
|
||||
|
||||
const nonStreamingResult = await orchestrateCopilotStream(requestPayload, {
|
||||
@@ -472,7 +325,7 @@ export async function POST(req: NextRequest) {
|
||||
// Start title generation in parallel if this is first message (non-streaming)
|
||||
if (actualChatId && !currentChat.title && conversationHistory.length === 0) {
|
||||
logger.info(`[${tracker.requestId}] Starting title generation for non-streaming response`)
|
||||
requestChatTitleFromCopilot({ message, model: selectedModel, provider })
|
||||
requestChatTitle({ message, model: selectedModel, provider })
|
||||
.then(async (title) => {
|
||||
if (title) {
|
||||
await db
|
||||
|
||||
@@ -1,66 +1,18 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { z } from 'zod'
|
||||
import { getSession } from '@/lib/auth'
|
||||
import { buildConversationHistory } from '@/lib/copilot/chat-context'
|
||||
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
|
||||
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
|
||||
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import {
|
||||
createStreamEventWriter,
|
||||
resetStreamBuffer,
|
||||
setStreamMeta,
|
||||
} from '@/lib/copilot/orchestrator/stream-buffer'
|
||||
import { createSSEStream, SSE_RESPONSE_HEADERS } from '@/lib/copilot/chat-streaming'
|
||||
import {
|
||||
createBadRequestResponse,
|
||||
createRequestTracker,
|
||||
createUnauthorizedResponse,
|
||||
} from '@/lib/copilot/request-helpers'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('MothershipChatAPI')
|
||||
|
||||
async function requestChatTitleFromCopilot(params: {
|
||||
message: string
|
||||
model: string
|
||||
provider?: string
|
||||
}): Promise<string | null> {
|
||||
const { message, model, provider } = params
|
||||
if (!message || !model) return null
|
||||
|
||||
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
|
||||
if (env.COPILOT_API_KEY) {
|
||||
headers['x-api-key'] = env.COPILOT_API_KEY
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(`${SIM_AGENT_API_URL}/api/generate-chat-title`, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ message, model, ...(provider ? { provider } : {}) }),
|
||||
})
|
||||
|
||||
const payload = await response.json().catch(() => ({}))
|
||||
if (!response.ok) {
|
||||
logger.warn('Failed to generate chat title via copilot backend', {
|
||||
status: response.status,
|
||||
error: payload,
|
||||
})
|
||||
return null
|
||||
}
|
||||
|
||||
const title = typeof payload?.title === 'string' ? payload.title.trim() : ''
|
||||
return title || null
|
||||
} catch (error) {
|
||||
logger.error('Error generating chat title:', error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
const FileAttachmentSchema = z.object({
|
||||
id: z.string(),
|
||||
key: z.string(),
|
||||
@@ -157,8 +109,9 @@ export async function POST(req: NextRequest) {
|
||||
})
|
||||
currentChat = chatResult.chat
|
||||
actualChatId = chatResult.chatId || chatId
|
||||
const history = buildConversationHistory(chatResult.conversationHistory)
|
||||
conversationHistory = history.history
|
||||
conversationHistory = Array.isArray(chatResult.conversationHistory)
|
||||
? chatResult.conversationHistory
|
||||
: []
|
||||
}
|
||||
|
||||
const requestPayload = await buildCopilotRequestPayload(
|
||||
@@ -176,124 +129,27 @@ export async function POST(req: NextRequest) {
|
||||
{ selectedModel: '' }
|
||||
)
|
||||
|
||||
const streamId = userMessageId
|
||||
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
|
||||
let clientDisconnected = false
|
||||
|
||||
const transformedStream = new ReadableStream({
|
||||
async start(controller) {
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
await resetStreamBuffer(streamId)
|
||||
await setStreamMeta(streamId, { status: 'active', userId: authenticatedUserId })
|
||||
eventWriter = createStreamEventWriter(streamId)
|
||||
|
||||
const shouldFlushEvent = (event: Record<string, any>) =>
|
||||
event.type === 'tool_call' ||
|
||||
event.type === 'tool_result' ||
|
||||
event.type === 'tool_error' ||
|
||||
event.type === 'subagent_end' ||
|
||||
event.type === 'structured_result' ||
|
||||
event.type === 'subagent_result' ||
|
||||
event.type === 'done' ||
|
||||
event.type === 'error'
|
||||
|
||||
const pushEvent = async (event: Record<string, any>) => {
|
||||
if (!eventWriter) return
|
||||
const entry = await eventWriter.write(event)
|
||||
if (shouldFlushEvent(event)) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
const payload = {
|
||||
...event,
|
||||
eventId: entry.eventId,
|
||||
streamId,
|
||||
}
|
||||
try {
|
||||
if (!clientDisconnected) {
|
||||
controller.enqueue(encoder.encode(`data: ${JSON.stringify(payload)}\n\n`))
|
||||
}
|
||||
} catch {
|
||||
clientDisconnected = true
|
||||
await eventWriter.flush()
|
||||
}
|
||||
}
|
||||
|
||||
if (actualChatId) {
|
||||
await pushEvent({ type: 'chat_id', chatId: actualChatId })
|
||||
}
|
||||
|
||||
if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
|
||||
requestChatTitleFromCopilot({ message, model: 'claude-opus-4-5' })
|
||||
.then(async (title) => {
|
||||
if (title) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ title, updatedAt: new Date() })
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
await pushEvent({ type: 'title_updated', title })
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
const result = await orchestrateCopilotStream(requestPayload, {
|
||||
userId: authenticatedUserId,
|
||||
workspaceId,
|
||||
chatId: actualChatId,
|
||||
goRoute: '/api/mothership',
|
||||
autoExecuteTools: true,
|
||||
interactive: false,
|
||||
onEvent: async (event) => {
|
||||
await pushEvent(event)
|
||||
},
|
||||
})
|
||||
|
||||
if (currentChat) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ updatedAt: new Date() })
|
||||
.where(eq(copilotChats.id, actualChatId!))
|
||||
}
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, { status: 'complete', userId: authenticatedUserId })
|
||||
} catch (error) {
|
||||
logger.error(`[${tracker.requestId}] Orchestration error:`, error)
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, {
|
||||
status: 'error',
|
||||
userId: authenticatedUserId,
|
||||
error: error instanceof Error ? error.message : 'Stream error',
|
||||
})
|
||||
await pushEvent({
|
||||
type: 'error',
|
||||
data: {
|
||||
displayMessage: 'An unexpected error occurred while processing the response.',
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
controller.close()
|
||||
}
|
||||
},
|
||||
async cancel() {
|
||||
clientDisconnected = true
|
||||
if (eventWriter) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(transformedStream, {
|
||||
headers: {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
const stream = createSSEStream({
|
||||
requestPayload,
|
||||
userId: authenticatedUserId,
|
||||
streamId: userMessageId,
|
||||
chatId: actualChatId,
|
||||
currentChat,
|
||||
conversationHistory,
|
||||
message,
|
||||
titleModel: 'claude-opus-4-5',
|
||||
requestId: tracker.requestId,
|
||||
orchestrateOptions: {
|
||||
userId: authenticatedUserId,
|
||||
workspaceId,
|
||||
chatId: actualChatId,
|
||||
goRoute: '/api/mothership',
|
||||
autoExecuteTools: true,
|
||||
interactive: false,
|
||||
},
|
||||
})
|
||||
|
||||
return new Response(stream, { headers: SSE_RESPONSE_HEADERS })
|
||||
} catch (error) {
|
||||
if (error instanceof z.ZodError) {
|
||||
return NextResponse.json(
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { type NextRequest, NextResponse } from 'next/server'
|
||||
import { z } from 'zod'
|
||||
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
|
||||
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
|
||||
@@ -75,8 +74,6 @@ export async function POST(req: NextRequest) {
|
||||
model: selectedModel,
|
||||
mode: transportMode,
|
||||
messageId: crypto.randomUUID(),
|
||||
version: SIM_AGENT_VERSION,
|
||||
headless: true,
|
||||
chatId,
|
||||
}
|
||||
|
||||
@@ -84,6 +81,7 @@ export async function POST(req: NextRequest) {
|
||||
userId: auth.userId,
|
||||
workflowId: resolved.workflowId,
|
||||
chatId,
|
||||
goRoute: '/api/mcp',
|
||||
autoExecuteTools: parsed.autoExecuteTools,
|
||||
timeout: parsed.timeout,
|
||||
interactive: false,
|
||||
|
||||
@@ -1,340 +0,0 @@
|
||||
'use client'
|
||||
|
||||
import { useCallback, useRef, useState } from 'react'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { MOTHERSHIP_CHAT_API_PATH } from '@/lib/copilot/constants'
|
||||
|
||||
const logger = createLogger('useWorkspaceChat')
|
||||
|
||||
export type ToolCallStatus = 'executing' | 'success' | 'error'
|
||||
|
||||
export interface ToolCallInfo {
|
||||
id: string
|
||||
name: string
|
||||
status: ToolCallStatus
|
||||
displayTitle?: string
|
||||
}
|
||||
|
||||
export type ContentBlockType = 'text' | 'tool_call' | 'subagent'
|
||||
|
||||
export interface ContentBlock {
|
||||
type: ContentBlockType
|
||||
content?: string
|
||||
toolCall?: ToolCallInfo
|
||||
}
|
||||
|
||||
export interface ChatMessage {
|
||||
id: string
|
||||
role: 'user' | 'assistant'
|
||||
content: string
|
||||
timestamp: string
|
||||
contentBlocks?: ContentBlock[]
|
||||
activeSubagent?: string | null
|
||||
}
|
||||
|
||||
interface UseWorkspaceChatProps {
|
||||
workspaceId: string
|
||||
}
|
||||
|
||||
interface UseWorkspaceChatReturn {
|
||||
messages: ChatMessage[]
|
||||
isSending: boolean
|
||||
error: string | null
|
||||
sendMessage: (message: string) => Promise<void>
|
||||
abortMessage: () => void
|
||||
clearMessages: () => void
|
||||
}
|
||||
|
||||
const SUBAGENT_LABELS: Record<string, string> = {
|
||||
build: 'Building',
|
||||
deploy: 'Deploying',
|
||||
auth: 'Connecting credentials',
|
||||
research: 'Researching',
|
||||
knowledge: 'Managing knowledge base',
|
||||
table: 'Managing tables',
|
||||
custom_tool: 'Creating tool',
|
||||
superagent: 'Executing action',
|
||||
plan: 'Planning',
|
||||
debug: 'Debugging',
|
||||
edit: 'Editing workflow',
|
||||
}
|
||||
|
||||
export function useWorkspaceChat({ workspaceId }: UseWorkspaceChatProps): UseWorkspaceChatReturn {
|
||||
const [messages, setMessages] = useState<ChatMessage[]>([])
|
||||
const [isSending, setIsSending] = useState(false)
|
||||
const [error, setError] = useState<string | null>(null)
|
||||
const abortControllerRef = useRef<AbortController | null>(null)
|
||||
const chatIdRef = useRef<string | undefined>(undefined)
|
||||
|
||||
const sendMessage = useCallback(
|
||||
async (message: string) => {
|
||||
if (!message.trim() || !workspaceId) return
|
||||
|
||||
setError(null)
|
||||
setIsSending(true)
|
||||
|
||||
const userMessage: ChatMessage = {
|
||||
id: crypto.randomUUID(),
|
||||
role: 'user',
|
||||
content: message,
|
||||
timestamp: new Date().toISOString(),
|
||||
}
|
||||
|
||||
const assistantMessage: ChatMessage = {
|
||||
id: crypto.randomUUID(),
|
||||
role: 'assistant',
|
||||
content: '',
|
||||
timestamp: new Date().toISOString(),
|
||||
contentBlocks: [],
|
||||
activeSubagent: null,
|
||||
}
|
||||
|
||||
setMessages((prev) => [...prev, userMessage, assistantMessage])
|
||||
|
||||
const abortController = new AbortController()
|
||||
abortControllerRef.current = abortController
|
||||
|
||||
const blocksRef: ContentBlock[] = []
|
||||
const toolCallMapRef = new Map<string, number>()
|
||||
|
||||
const ensureTextBlock = (): ContentBlock => {
|
||||
const last = blocksRef[blocksRef.length - 1]
|
||||
if (last && last.type === 'text') return last
|
||||
const newBlock: ContentBlock = { type: 'text', content: '' }
|
||||
blocksRef.push(newBlock)
|
||||
return newBlock
|
||||
}
|
||||
|
||||
const flushBlocks = (extra?: Partial<ChatMessage>) => {
|
||||
const fullText = blocksRef
|
||||
.filter((b) => b.type === 'text')
|
||||
.map((b) => b.content ?? '')
|
||||
.join('')
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === assistantMessage.id
|
||||
? {
|
||||
...msg,
|
||||
content: fullText,
|
||||
contentBlocks: [...blocksRef],
|
||||
...extra,
|
||||
}
|
||||
: msg
|
||||
)
|
||||
)
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(MOTHERSHIP_CHAT_API_PATH, {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
message,
|
||||
workspaceId,
|
||||
userMessageId: userMessage.id,
|
||||
createNewChat: !chatIdRef.current,
|
||||
...(chatIdRef.current ? { chatId: chatIdRef.current } : {}),
|
||||
}),
|
||||
signal: abortController.signal,
|
||||
})
|
||||
|
||||
if (!response.ok) {
|
||||
const errorData = await response.json().catch(() => ({}))
|
||||
throw new Error(errorData.error || `Request failed: ${response.status}`)
|
||||
}
|
||||
|
||||
if (!response.body) {
|
||||
throw new Error('No response body')
|
||||
}
|
||||
|
||||
const reader = response.body.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let buffer = ''
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
|
||||
const lines = buffer.split('\n')
|
||||
buffer = lines.pop() || ''
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith('data: ')) continue
|
||||
|
||||
try {
|
||||
const event = JSON.parse(line.slice(6))
|
||||
|
||||
switch (event.type) {
|
||||
case 'chat_id': {
|
||||
if (event.chatId) {
|
||||
chatIdRef.current = event.chatId
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'content': {
|
||||
if (event.content || event.data) {
|
||||
const chunk =
|
||||
typeof event.data === 'string' ? event.data : event.content || ''
|
||||
if (chunk) {
|
||||
const textBlock = ensureTextBlock()
|
||||
textBlock.content = (textBlock.content ?? '') + chunk
|
||||
flushBlocks()
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'tool_generating':
|
||||
case 'tool_call': {
|
||||
const toolCallId = event.toolCallId
|
||||
const toolName = event.toolName || event.data?.name || 'unknown'
|
||||
if (!toolCallId) break
|
||||
|
||||
const ui = event.ui || event.data?.ui
|
||||
const hidden = ui?.hidden
|
||||
if (hidden) break
|
||||
|
||||
const displayTitle = ui?.title || ui?.phaseLabel
|
||||
|
||||
if (!toolCallMapRef.has(toolCallId)) {
|
||||
const toolBlock: ContentBlock = {
|
||||
type: 'tool_call',
|
||||
toolCall: {
|
||||
id: toolCallId,
|
||||
name: toolName,
|
||||
status: 'executing',
|
||||
displayTitle,
|
||||
},
|
||||
}
|
||||
toolCallMapRef.set(toolCallId, blocksRef.length)
|
||||
blocksRef.push(toolBlock)
|
||||
} else {
|
||||
const idx = toolCallMapRef.get(toolCallId)!
|
||||
const existing = blocksRef[idx]
|
||||
if (existing.toolCall) {
|
||||
existing.toolCall.name = toolName
|
||||
if (displayTitle) existing.toolCall.displayTitle = displayTitle
|
||||
}
|
||||
}
|
||||
flushBlocks()
|
||||
break
|
||||
}
|
||||
|
||||
case 'tool_result': {
|
||||
const toolCallId = event.toolCallId || event.data?.id
|
||||
if (!toolCallId) break
|
||||
const idx = toolCallMapRef.get(toolCallId)
|
||||
if (idx !== undefined) {
|
||||
const block = blocksRef[idx]
|
||||
if (block.toolCall) {
|
||||
block.toolCall.status = event.success ? 'success' : 'error'
|
||||
}
|
||||
flushBlocks()
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'tool_error': {
|
||||
const toolCallId = event.toolCallId || event.data?.id
|
||||
if (!toolCallId) break
|
||||
const idx = toolCallMapRef.get(toolCallId)
|
||||
if (idx !== undefined) {
|
||||
const block = blocksRef[idx]
|
||||
if (block.toolCall) {
|
||||
block.toolCall.status = 'error'
|
||||
}
|
||||
flushBlocks()
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'subagent_start': {
|
||||
const subagentName = event.subagent || event.data?.agent
|
||||
if (subagentName) {
|
||||
const label = SUBAGENT_LABELS[subagentName] || subagentName
|
||||
const subBlock: ContentBlock = {
|
||||
type: 'subagent',
|
||||
content: label,
|
||||
}
|
||||
blocksRef.push(subBlock)
|
||||
flushBlocks({ activeSubagent: label })
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
case 'subagent_end': {
|
||||
flushBlocks({ activeSubagent: null })
|
||||
break
|
||||
}
|
||||
|
||||
case 'error': {
|
||||
setError(event.error || 'An error occurred')
|
||||
break
|
||||
}
|
||||
|
||||
case 'done': {
|
||||
if (event.content && typeof event.content === 'string') {
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === assistantMessage.id && !msg.content
|
||||
? { ...msg, content: event.content }
|
||||
: msg
|
||||
)
|
||||
)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
} catch {
|
||||
// Skip malformed SSE lines
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
if (err instanceof Error && err.name === 'AbortError') {
|
||||
logger.info('Message aborted by user')
|
||||
return
|
||||
}
|
||||
|
||||
const errorMessage = err instanceof Error ? err.message : 'Failed to send message'
|
||||
logger.error('Failed to send workspace chat message', { error: errorMessage })
|
||||
setError(errorMessage)
|
||||
|
||||
setMessages((prev) =>
|
||||
prev.map((msg) =>
|
||||
msg.id === assistantMessage.id && !msg.content
|
||||
? { ...msg, content: 'Sorry, something went wrong. Please try again.' }
|
||||
: msg
|
||||
)
|
||||
)
|
||||
} finally {
|
||||
setIsSending(false)
|
||||
abortControllerRef.current = null
|
||||
}
|
||||
},
|
||||
[workspaceId]
|
||||
)
|
||||
|
||||
const abortMessage = useCallback(() => {
|
||||
abortControllerRef.current?.abort()
|
||||
setIsSending(false)
|
||||
}, [])
|
||||
|
||||
const clearMessages = useCallback(() => {
|
||||
setMessages([])
|
||||
setError(null)
|
||||
chatIdRef.current = undefined
|
||||
}, [])
|
||||
|
||||
return {
|
||||
messages,
|
||||
isSending,
|
||||
error,
|
||||
sendMessage,
|
||||
abortMessage,
|
||||
clearMessages,
|
||||
}
|
||||
}
|
||||
@@ -1,341 +0,0 @@
|
||||
# Copilot SSE Client Integration Guide
|
||||
|
||||
How to consume the copilot SSE stream from any client UI.
|
||||
|
||||
## Endpoint
|
||||
|
||||
```
|
||||
POST /api/copilot/chat
|
||||
Content-Type: application/json
|
||||
```
|
||||
|
||||
### Request body
|
||||
|
||||
| Field | Type | Required | Description |
|
||||
|---------------|----------|----------|-------------|
|
||||
| `message` | string | yes | User message |
|
||||
| `workspaceId` | string | yes* | Workspace scope (required when no `workflowId`) |
|
||||
| `workflowId` | string | no | Workflow scope — when set, copilot operates on this workflow |
|
||||
| `chatId` | string | no | Continue an existing conversation |
|
||||
| `createNewChat` | boolean | no | Create a new persisted chat session |
|
||||
| `stream` | boolean | no | Default `true`. Set to get SSE stream |
|
||||
| `model` | string | no | Model ID (default: `claude-opus-4-5`) |
|
||||
| `mode` | string | no | `agent` / `ask` / `plan` |
|
||||
| `headless` | boolean | no | Skip interactive confirmation for all tools |
|
||||
|
||||
*Either `workflowId` or `workspaceId` must be provided. When only `workspaceId` is sent, the copilot runs in workspace mode (no workflow context).
|
||||
|
||||
### Response
|
||||
|
||||
`Content-Type: text/event-stream` — each line is `data: <JSON>\n\n`.
|
||||
|
||||
---
|
||||
|
||||
## SSE Event Types
|
||||
|
||||
Every event has a `type` field. The `state` field (when present) is the authoritative tool call lifecycle state set by the Go backend — clients should use it directly without deriving state from other fields.
|
||||
|
||||
### Session events
|
||||
|
||||
#### `chat_id`
|
||||
Emitted once at the start. Store this to continue the conversation.
|
||||
```json
|
||||
{ "type": "chat_id", "chatId": "uuid" }
|
||||
```
|
||||
|
||||
#### `title_updated`
|
||||
Chat title was generated asynchronously.
|
||||
```json
|
||||
{ "type": "title_updated", "title": "My chat title" }
|
||||
```
|
||||
|
||||
### Content events
|
||||
|
||||
#### `content`
|
||||
Streamed text chunks from the assistant. Append to the current text block.
|
||||
```json
|
||||
{ "type": "content", "data": "Hello, " }
|
||||
```
|
||||
May also appear as `{ "type": "content", "content": "Hello, " }`. Check `data` first, fall back to `content`.
|
||||
|
||||
#### `reasoning`
|
||||
Model thinking/reasoning content (if the model supports it). Render in a collapsible "thinking" block.
|
||||
```json
|
||||
{ "type": "reasoning", "content": "Let me think about...", "phase": "thinking" }
|
||||
```
|
||||
|
||||
### Tool call lifecycle
|
||||
|
||||
Tools follow this lifecycle: `generating → pending|executing → success|error|rejected`.
|
||||
|
||||
The `state` field on each event tells you exactly what to render.
|
||||
|
||||
#### `tool_generating`
|
||||
The model is streaming the tool call arguments. Create a placeholder block.
|
||||
```json
|
||||
{
|
||||
"type": "tool_generating",
|
||||
"state": "generating",
|
||||
"toolCallId": "toolu_abc123",
|
||||
"toolName": "edit_workflow",
|
||||
"ui": {
|
||||
"title": "Editing workflow",
|
||||
"icon": "pencil",
|
||||
"phaseLabel": "Build"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
#### `tool_call`
|
||||
Arguments are finalized. The `state` tells you what to render:
|
||||
- `"pending"` — user approval required. Show Allow/Skip buttons.
|
||||
- `"executing"` — tool is running. Show spinner.
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "tool_call",
|
||||
"state": "pending",
|
||||
"toolCallId": "toolu_abc123",
|
||||
"toolName": "deploy_api",
|
||||
"data": { "id": "toolu_abc123", "name": "deploy_api", "arguments": { ... } },
|
||||
"ui": {
|
||||
"title": "Deploying API",
|
||||
"icon": "rocket",
|
||||
"requiresConfirmation": true,
|
||||
"clientExecutable": false,
|
||||
"hidden": false,
|
||||
"internal": false
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**Partial tool calls** (argument streaming): `tool_call` events with `data.partial: true` have no `state` field. Keep the current state, just update arguments for display.
|
||||
|
||||
#### `tool_result`
|
||||
Tool execution completed.
|
||||
```json
|
||||
{
|
||||
"type": "tool_result",
|
||||
"state": "success",
|
||||
"toolCallId": "toolu_abc123",
|
||||
"success": true,
|
||||
"result": { ... }
|
||||
}
|
||||
```
|
||||
`state` will be `"success"`, `"error"`, or `"rejected"`.
|
||||
|
||||
#### `tool_error`
|
||||
Tool execution failed (error on the Sim server side, not from Go).
|
||||
```json
|
||||
{
|
||||
"type": "tool_error",
|
||||
"state": "error",
|
||||
"toolCallId": "toolu_abc123",
|
||||
"error": "Connection timeout"
|
||||
}
|
||||
```
|
||||
|
||||
### Subagent events
|
||||
|
||||
Subagents are specialized agents (build, deploy, auth, research, knowledge, table, etc.) that handle complex tasks. Their events are scoped by a parent tool call.
|
||||
|
||||
#### `subagent_start`
|
||||
A subagent session started. All subsequent events with `"subagent": "<name>"` belong to this session.
|
||||
```json
|
||||
{
|
||||
"type": "subagent_start",
|
||||
"subagent": "build",
|
||||
"data": { "tool_call_id": "toolu_parent123" }
|
||||
}
|
||||
```
|
||||
Render a label like "Building..." under the parent tool call.
|
||||
|
||||
#### `subagent_end`
|
||||
Subagent session completed.
|
||||
```json
|
||||
{ "type": "subagent_end", "subagent": "build" }
|
||||
```
|
||||
|
||||
#### Nested events
|
||||
While a subagent is active, you'll receive `content`, `tool_generating`, `tool_call`, `tool_result`, etc. with `"subagent": "build"` on them. These are the subagent's own tool calls and text, nested under the parent.
|
||||
|
||||
### Terminal events
|
||||
|
||||
#### `done`
|
||||
Stream completed. May include final content.
|
||||
```json
|
||||
{ "type": "done", "success": true, "content": "..." }
|
||||
```
|
||||
|
||||
#### `error`
|
||||
Fatal stream error.
|
||||
```json
|
||||
{ "type": "error", "error": "An unexpected error occurred" }
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## UI Metadata (`ui` field)
|
||||
|
||||
Present on `tool_generating` and `tool_call` events. Use for rendering:
|
||||
|
||||
| Field | Type | Description |
|
||||
|------------------------|---------|-------------|
|
||||
| `title` | string | Human-readable title (e.g., "Editing workflow") |
|
||||
| `phaseLabel` | string | Category label (e.g., "Build", "Deploy") |
|
||||
| `icon` | string | Icon identifier |
|
||||
| `requiresConfirmation` | boolean | If `true`, show approval UI (Allow/Skip) |
|
||||
| `clientExecutable` | boolean | If `true`, tool should execute on the client (e.g., `run_workflow`) |
|
||||
| `internal` | boolean | If `true`, this is an internal tool (subagent trigger). Skip rendering |
|
||||
| `hidden` | boolean | If `true`, don't render this tool call at all |
|
||||
|
||||
---
|
||||
|
||||
## Tool Call State Machine
|
||||
|
||||
```
|
||||
tool_generating → state: "generating" → Show placeholder with spinner
|
||||
tool_call → state: "pending" → Show Allow/Skip buttons
|
||||
tool_call → state: "executing" → Show spinner
|
||||
tool_result → state: "success" → Show checkmark
|
||||
tool_result → state: "error" → Show error icon
|
||||
tool_result → state: "rejected" → Show skipped/rejected
|
||||
tool_error → state: "error" → Show error icon
|
||||
```
|
||||
|
||||
Client-only states (not from SSE, managed locally):
|
||||
- `background` — tool running in background (client UX decision)
|
||||
- `aborted` — user aborted the stream
|
||||
- `review` — client wants user to review result
|
||||
|
||||
---
|
||||
|
||||
## Handling User Confirmation
|
||||
|
||||
When a tool arrives with `state: "pending"`:
|
||||
|
||||
1. Render Allow/Skip buttons
|
||||
2. On Allow: `POST /api/copilot/confirm` with `{ toolCallId, status: "accepted" }`
|
||||
3. On Skip: `POST /api/copilot/confirm` with `{ toolCallId, status: "rejected" }`
|
||||
4. Optimistically update to `executing` / `rejected`
|
||||
5. The next SSE event (`tool_result`) will confirm the final state
|
||||
|
||||
For `clientExecutable` tools (e.g., `run_workflow`): after accepting, the client must execute the tool locally and report the result via `POST /api/copilot/confirm` with `{ toolCallId, status: "success"|"error", data: { ... } }`.
|
||||
|
||||
---
|
||||
|
||||
## Identifying Tool Categories
|
||||
|
||||
Use the `toolName` and `ui` metadata to determine what the tool does. Common patterns:
|
||||
|
||||
| Tool name pattern | Category | What to render |
|
||||
|--------------------------|-------------------|----------------|
|
||||
| `edit_workflow` | Workflow editing | Diff preview, block changes |
|
||||
| `deploy_*`, `redeploy` | Deployment | Deploy status |
|
||||
| `user_table` | Table management | Table creation/query results |
|
||||
| `knowledge_base` | Knowledge bases | KB operations |
|
||||
| `run_workflow`, `run_block` | Execution | Execution results (client-executable) |
|
||||
| `read`, `glob`, `grep` | VFS | File browser (often `hidden`) |
|
||||
| `search_documentation` | Research | Doc search results |
|
||||
| `navigate_ui` | Navigation | UI navigation command |
|
||||
|
||||
### Structured results
|
||||
|
||||
The `structured_result` event carries rich data that tools return. The `subagent_result` event similarly carries subagent completion data. Parse `result` / `data` to render tables, KB entries, deployment URLs, etc.
|
||||
|
||||
```json
|
||||
{
|
||||
"type": "structured_result",
|
||||
"data": {
|
||||
"action": "table_created",
|
||||
"tables": [{ "id": "tbl_...", "name": "tasks" }],
|
||||
"success": true
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Minimal Client Example
|
||||
|
||||
```typescript
|
||||
const response = await fetch('/api/copilot/chat', {
|
||||
method: 'POST',
|
||||
headers: { 'Content-Type': 'application/json' },
|
||||
body: JSON.stringify({
|
||||
message: 'Create a tasks table',
|
||||
workspaceId: 'ws_123',
|
||||
stream: true,
|
||||
createNewChat: true,
|
||||
}),
|
||||
})
|
||||
|
||||
const reader = response.body.getReader()
|
||||
const decoder = new TextDecoder()
|
||||
let buffer = ''
|
||||
|
||||
while (true) {
|
||||
const { done, value } = await reader.read()
|
||||
if (done) break
|
||||
|
||||
buffer += decoder.decode(value, { stream: true })
|
||||
const lines = buffer.split('\n')
|
||||
buffer = lines.pop() || ''
|
||||
|
||||
for (const line of lines) {
|
||||
if (!line.startsWith('data: ')) continue
|
||||
const event = JSON.parse(line.slice(6))
|
||||
|
||||
switch (event.type) {
|
||||
case 'chat_id':
|
||||
// Store event.chatId for follow-up messages
|
||||
break
|
||||
case 'content':
|
||||
// Append event.data || event.content to text
|
||||
break
|
||||
case 'tool_generating':
|
||||
case 'tool_call':
|
||||
if (event.ui?.hidden) break
|
||||
// Create/update tool block using event.state
|
||||
// If event.state === 'pending', show approval buttons
|
||||
break
|
||||
case 'tool_result':
|
||||
case 'tool_error':
|
||||
// Update tool block with event.state
|
||||
break
|
||||
case 'subagent_start':
|
||||
// Show subagent activity label
|
||||
break
|
||||
case 'subagent_end':
|
||||
// Clear subagent label
|
||||
break
|
||||
case 'done':
|
||||
// Stream complete
|
||||
break
|
||||
case 'error':
|
||||
// Show error
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Subagent Labels
|
||||
|
||||
Map subagent IDs to display labels:
|
||||
|
||||
| Subagent ID | Display label |
|
||||
|----------------|---------------|
|
||||
| `build` | Building |
|
||||
| `deploy` | Deploying |
|
||||
| `auth` | Connecting credentials |
|
||||
| `research` | Researching |
|
||||
| `knowledge` | Managing knowledge base |
|
||||
| `table` | Managing tables |
|
||||
| `custom_tool` | Creating tool |
|
||||
| `superagent` | Executing action |
|
||||
| `plan` | Planning |
|
||||
| `debug` | Debugging |
|
||||
| `edit` | Editing workflow |
|
||||
@@ -4,13 +4,6 @@ import { createFileContent } from '@/lib/uploads/utils/file-utils'
|
||||
|
||||
const logger = createLogger('CopilotChatContext')
|
||||
|
||||
/**
|
||||
* Build conversation history from stored chat messages.
|
||||
*/
|
||||
export function buildConversationHistory(messages: unknown[]): { history: unknown[] } {
|
||||
return { history: Array.isArray(messages) ? messages : [] }
|
||||
}
|
||||
|
||||
export interface FileAttachmentInput {
|
||||
id: string
|
||||
key: string
|
||||
|
||||
193
apps/sim/lib/copilot/chat-streaming.ts
Normal file
193
apps/sim/lib/copilot/chat-streaming.ts
Normal file
@@ -0,0 +1,193 @@
|
||||
import { db } from '@sim/db'
|
||||
import { copilotChats } from '@sim/db/schema'
|
||||
import { createLogger } from '@sim/logger'
|
||||
import { eq } from 'drizzle-orm'
|
||||
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
|
||||
import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator'
|
||||
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
|
||||
import {
|
||||
createStreamEventWriter,
|
||||
resetStreamBuffer,
|
||||
setStreamMeta,
|
||||
} from '@/lib/copilot/orchestrator/stream-buffer'
|
||||
import { env } from '@/lib/core/config/env'
|
||||
|
||||
const logger = createLogger('CopilotChatStreaming')
|
||||
|
||||
const FLUSH_EVENT_TYPES = new Set([
|
||||
'tool_call',
|
||||
'tool_result',
|
||||
'tool_error',
|
||||
'subagent_end',
|
||||
'structured_result',
|
||||
'subagent_result',
|
||||
'done',
|
||||
'error',
|
||||
])
|
||||
|
||||
export async function requestChatTitle(params: {
|
||||
message: string
|
||||
model: string
|
||||
provider?: string
|
||||
}): Promise<string | null> {
|
||||
const { message, model, provider } = params
|
||||
if (!message || !model) return null
|
||||
|
||||
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
|
||||
if (env.COPILOT_API_KEY) {
|
||||
headers['x-api-key'] = env.COPILOT_API_KEY
|
||||
}
|
||||
|
||||
try {
|
||||
const response = await fetch(`${SIM_AGENT_API_URL}/api/generate-chat-title`, {
|
||||
method: 'POST',
|
||||
headers,
|
||||
body: JSON.stringify({ message, model, ...(provider ? { provider } : {}) }),
|
||||
})
|
||||
|
||||
const payload = await response.json().catch(() => ({}))
|
||||
if (!response.ok) {
|
||||
logger.warn('Failed to generate chat title via copilot backend', {
|
||||
status: response.status,
|
||||
error: payload,
|
||||
})
|
||||
return null
|
||||
}
|
||||
|
||||
const title = typeof payload?.title === 'string' ? payload.title.trim() : ''
|
||||
return title || null
|
||||
} catch (error) {
|
||||
logger.error('Error generating chat title:', error)
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
export interface StreamingOrchestrationParams {
|
||||
requestPayload: Record<string, unknown>
|
||||
userId: string
|
||||
streamId: string
|
||||
chatId?: string
|
||||
currentChat: any
|
||||
conversationHistory: unknown[]
|
||||
message: string
|
||||
titleModel: string
|
||||
titleProvider?: string
|
||||
requestId: string
|
||||
orchestrateOptions: Omit<OrchestrateStreamOptions, 'onEvent'>
|
||||
}
|
||||
|
||||
export function createSSEStream(params: StreamingOrchestrationParams): ReadableStream {
|
||||
const {
|
||||
requestPayload,
|
||||
userId,
|
||||
streamId,
|
||||
chatId,
|
||||
currentChat,
|
||||
conversationHistory,
|
||||
message,
|
||||
titleModel,
|
||||
titleProvider,
|
||||
requestId,
|
||||
orchestrateOptions,
|
||||
} = params
|
||||
|
||||
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
|
||||
let clientDisconnected = false
|
||||
|
||||
return new ReadableStream({
|
||||
async start(controller) {
|
||||
const encoder = new TextEncoder()
|
||||
|
||||
await resetStreamBuffer(streamId)
|
||||
await setStreamMeta(streamId, { status: 'active', userId })
|
||||
eventWriter = createStreamEventWriter(streamId)
|
||||
|
||||
const pushEvent = async (event: Record<string, any>) => {
|
||||
if (!eventWriter) return
|
||||
const entry = await eventWriter.write(event)
|
||||
if (FLUSH_EVENT_TYPES.has(event.type)) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
try {
|
||||
if (!clientDisconnected) {
|
||||
controller.enqueue(
|
||||
encoder.encode(
|
||||
`data: ${JSON.stringify({ ...event, eventId: entry.eventId, streamId })}\n\n`
|
||||
)
|
||||
)
|
||||
}
|
||||
} catch {
|
||||
clientDisconnected = true
|
||||
await eventWriter.flush()
|
||||
}
|
||||
}
|
||||
|
||||
if (chatId) {
|
||||
await pushEvent({ type: 'chat_id', chatId })
|
||||
}
|
||||
|
||||
if (chatId && !currentChat?.title && conversationHistory.length === 0) {
|
||||
requestChatTitle({ message, model: titleModel, provider: titleProvider })
|
||||
.then(async (title) => {
|
||||
if (title) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ title, updatedAt: new Date() })
|
||||
.where(eq(copilotChats.id, chatId!))
|
||||
await pushEvent({ type: 'title_updated', title })
|
||||
}
|
||||
})
|
||||
.catch((error) => {
|
||||
logger.error(`[${requestId}] Title generation failed:`, error)
|
||||
})
|
||||
}
|
||||
|
||||
try {
|
||||
await orchestrateCopilotStream(requestPayload, {
|
||||
...orchestrateOptions,
|
||||
onEvent: async (event) => {
|
||||
await pushEvent(event)
|
||||
},
|
||||
})
|
||||
|
||||
if (currentChat) {
|
||||
await db
|
||||
.update(copilotChats)
|
||||
.set({ updatedAt: new Date() })
|
||||
.where(eq(copilotChats.id, chatId!))
|
||||
}
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, { status: 'complete', userId })
|
||||
} catch (error) {
|
||||
logger.error(`[${requestId}] Orchestration error:`, error)
|
||||
await eventWriter.close()
|
||||
await setStreamMeta(streamId, {
|
||||
status: 'error',
|
||||
userId,
|
||||
error: error instanceof Error ? error.message : 'Stream error',
|
||||
})
|
||||
await pushEvent({
|
||||
type: 'error',
|
||||
data: {
|
||||
displayMessage: 'An unexpected error occurred while processing the response.',
|
||||
},
|
||||
})
|
||||
} finally {
|
||||
controller.close()
|
||||
}
|
||||
},
|
||||
async cancel() {
|
||||
clientDisconnected = true
|
||||
if (eventWriter) {
|
||||
await eventWriter.flush()
|
||||
}
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
export const SSE_RESPONSE_HEADERS = {
|
||||
'Content-Type': 'text/event-stream',
|
||||
'Cache-Control': 'no-cache',
|
||||
Connection: 'keep-alive',
|
||||
'X-Accel-Buffering': 'no',
|
||||
} as const
|
||||
Reference in New Issue
Block a user