mirror of https://github.com/simstudioai/sim.git
synced 2026-01-20 04:17:57 -05:00

Compare commits: feat/super...feat/copil (5 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 5da1dfb5e4 |  |
|  | 69614d2d93 |  |
|  | 6cbadd7110 |  |
|  | 9efd3d5b4c |  |
|  | e575ba2965 |  |
apps/sim/app/api/copilot/chat/[chatId]/active-stream/route.ts (new file, 86 lines)
@@ -0,0 +1,86 @@
/**
 * GET /api/copilot/chat/[chatId]/active-stream
 *
 * Check if a chat has an active stream that can be resumed.
 * Used by the client on page load to detect if there's an in-progress stream.
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
  getActiveStreamForChat,
  getChunkCount,
  getStreamMeta,
} from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotActiveStreamAPI')

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ chatId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { chatId } = await params

  logger.info('Active stream check', { chatId, userId: session.user.id })

  // Look up active stream ID from Redis
  const streamId = await getActiveStreamForChat(chatId)

  if (!streamId) {
    logger.debug('No active stream found', { chatId })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Get stream metadata
  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.debug('Stream metadata not found', { streamId, chatId })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Verify the stream is still active
  if (meta.status !== 'streaming') {
    logger.debug('Stream not active', { streamId, chatId, status: meta.status })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Stream belongs to different user', {
      streamId,
      chatId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ hasActiveStream: false })
  }

  // Get current chunk count for client to track progress
  const chunkCount = await getChunkCount(streamId)

  logger.info('Active stream found', {
    streamId,
    chatId,
    chunkCount,
    toolCallsCount: meta.toolCalls.length,
  })

  return NextResponse.json({
    hasActiveStream: true,
    streamId,
    chunkCount,
    toolCalls: meta.toolCalls.filter(
      (tc) => tc.state === 'pending' || tc.state === 'executing'
    ),
    createdAt: meta.createdAt,
    updatedAt: meta.updatedAt,
  })
}
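A client of this endpoint would call it once on page load and, when a live stream is reported, reconnect through the resume endpoint added later in this diff. A minimal sketch (not part of the commit; the response shape mirrors the handler above):

// Sketch: check for a resumable stream on page load.
async function checkForActiveStream(chatId: string): Promise<string | null> {
  const res = await fetch(`/api/copilot/chat/${chatId}/active-stream`)
  if (!res.ok) return null
  const body = await res.json()
  // body also carries chunkCount, pending/executing toolCalls, and timestamps
  return body.hasActiveStream ? body.streamId : null
}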
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
 import { copilotChats } from '@sim/db/schema'
 import { createLogger } from '@sim/logger'
 import { and, desc, eq } from 'drizzle-orm'
+import { after } from 'next/server'
 import { type NextRequest, NextResponse } from 'next/server'
 import { z } from 'zod'
 import { getSession } from '@/lib/auth'
@@ -16,6 +17,16 @@ import {
   createRequestTracker,
   createUnauthorizedResponse,
 } from '@/lib/copilot/request-helpers'
+import {
+  appendChunk,
+  appendContent,
+  checkAbortSignal,
+  completeStream,
+  createStream,
+  errorStream,
+  refreshStreamTTL,
+  updateToolCall,
+} from '@/lib/copilot/stream-persistence'
 import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
 import type { CopilotProviderConfig } from '@/lib/copilot/types'
 import { env } from '@/lib/core/config/env'
@@ -492,385 +503,186 @@ export async function POST(req: NextRequest) {
      )
    }

    // If streaming is requested, forward the stream and update chat later
    // If streaming is requested, start background processing and return streamId immediately
    if (stream && simAgentResponse.body) {
      // Create user message to save
      const userMessage = {
        id: userMessageIdToUse, // Consistent ID used for request and persistence
        role: 'user',
        content: message,
        timestamp: new Date().toISOString(),
        ...(fileAttachments && fileAttachments.length > 0 && { fileAttachments }),
        ...(Array.isArray(contexts) && contexts.length > 0 && { contexts }),
        ...(Array.isArray(contexts) &&
          contexts.length > 0 && {
            contentBlocks: [{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() }],
          }),
      }
      // Create stream ID for persistence and resumption
      const streamId = crypto.randomUUID()

      // Create a pass-through stream that captures the response
      const transformedStream = new ReadableStream({
        async start(controller) {
          const encoder = new TextEncoder()
          let assistantContent = ''
          const toolCalls: any[] = []
          let buffer = ''
          const isFirstDone = true
          let responseIdFromStart: string | undefined
          let responseIdFromDone: string | undefined
          // Track tool call progress to identify a safe done event
          const announcedToolCallIds = new Set<string>()
          const startedToolExecutionIds = new Set<string>()
          const completedToolExecutionIds = new Set<string>()
          let lastDoneResponseId: string | undefined
          let lastSafeDoneResponseId: string | undefined
      // Initialize stream state in Redis
      await createStream({
        streamId,
        chatId: actualChatId!,
        userId: authenticatedUserId,
        workflowId,
        userMessageId: userMessageIdToUse,
        isClientSession: true,
      })

          // Send chatId as first event
          if (actualChatId) {
            const chatIdEvent = `data: ${JSON.stringify({
              type: 'chat_id',
              chatId: actualChatId,
            })}\n\n`
            controller.enqueue(encoder.encode(chatIdEvent))
            logger.debug(`[${tracker.requestId}] Sent initial chatId event to client`)
          }
      // Track last TTL refresh time
      const TTL_REFRESH_INTERVAL = 60000 // Refresh TTL every minute

          // Start title generation in parallel if needed
          if (actualChatId && !currentChat?.title && conversationHistory.length === 0) {
            generateChatTitle(message)
              .then(async (title) => {
                if (title) {
                  await db
                    .update(copilotChats)
                    .set({
                      title,
                      updatedAt: new Date(),
                    })
                    .where(eq(copilotChats.id, actualChatId!))
      // Capture needed values for background task
      const capturedChatId = actualChatId!
      const capturedCurrentChat = currentChat

                  const titleEvent = `data: ${JSON.stringify({
                    type: 'title_updated',
                    title: title,
                  })}\n\n`
                  controller.enqueue(encoder.encode(titleEvent))
                  logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
                }
              })
              .catch((error) => {
                logger.error(`[${tracker.requestId}] Title generation failed:`, error)
              })
          } else {
            logger.debug(`[${tracker.requestId}] Skipping title generation`)
          }
      // Start background processing task - runs independently of client
      // Client will connect to /api/copilot/stream/{streamId} for live updates
      const backgroundTask = (async () => {
        const bgReader = simAgentResponse.body!.getReader()
        const decoder = new TextDecoder()
        let buffer = ''
        let assistantContent = ''
        const toolCalls: any[] = []
        let lastSafeDoneResponseId: string | undefined
        let bgLastTTLRefresh = Date.now()

          // Forward the sim agent stream and capture assistant response
          const reader = simAgentResponse.body!.getReader()
          const decoder = new TextDecoder()
        // Send initial events via Redis for client to receive
        const chatIdEvent = `data: ${JSON.stringify({ type: 'chat_id', chatId: capturedChatId })}\n\n`
        await appendChunk(streamId, chatIdEvent).catch(() => {})

          try {
            while (true) {
              const { done, value } = await reader.read()
              if (done) {
                break
              }
        const streamIdEvent = `data: ${JSON.stringify({ type: 'stream_id', streamId })}\n\n`
        await appendChunk(streamId, streamIdEvent).catch(() => {})

              // Decode and parse SSE events for logging and capturing content
              const decodedChunk = decoder.decode(value, { stream: true })
              buffer += decodedChunk

              const lines = buffer.split('\n')
              buffer = lines.pop() || '' // Keep incomplete line in buffer

              for (const line of lines) {
                if (line.trim() === '') continue // Skip empty lines

                if (line.startsWith('data: ') && line.length > 6) {
                  try {
                    const jsonStr = line.slice(6)

                    // Check if the JSON string is unusually large (potential streaming issue)
                    if (jsonStr.length > 50000) {
                      // 50KB limit
                      logger.warn(`[${tracker.requestId}] Large SSE event detected`, {
                        size: jsonStr.length,
                        preview: `${jsonStr.substring(0, 100)}...`,
                      })
                    }

                    const event = JSON.parse(jsonStr)

                    // Log different event types comprehensively
                    switch (event.type) {
                      case 'content':
                        if (event.data) {
                          assistantContent += event.data
                        }
                        break

                      case 'reasoning':
                        logger.debug(
                          `[${tracker.requestId}] Reasoning chunk received (${(event.data || event.content || '').length} chars)`
                        )
                        break

                      case 'tool_call':
                        if (!event.data?.partial) {
                          toolCalls.push(event.data)
                          if (event.data?.id) {
                            announcedToolCallIds.add(event.data.id)
                          }
                        }
                        break

                      case 'tool_generating':
                        if (event.toolCallId) {
                          startedToolExecutionIds.add(event.toolCallId)
                        }
                        break

                      case 'tool_result':
                        if (event.toolCallId) {
                          completedToolExecutionIds.add(event.toolCallId)
                        }
                        break

                      case 'tool_error':
                        logger.error(`[${tracker.requestId}] Tool error:`, {
                          toolCallId: event.toolCallId,
                          toolName: event.toolName,
                          error: event.error,
                          success: event.success,
                        })
                        if (event.toolCallId) {
                          completedToolExecutionIds.add(event.toolCallId)
                        }
                        break

                      case 'start':
                        if (event.data?.responseId) {
                          responseIdFromStart = event.data.responseId
                        }
                        break

                      case 'done':
                        if (event.data?.responseId) {
                          responseIdFromDone = event.data.responseId
                          lastDoneResponseId = responseIdFromDone

                          // Mark this done as safe only if no tool call is currently in progress or pending
                          const announced = announcedToolCallIds.size
                          const completed = completedToolExecutionIds.size
                          const started = startedToolExecutionIds.size
                          const hasToolInProgress = announced > completed || started > completed
                          if (!hasToolInProgress) {
                            lastSafeDoneResponseId = responseIdFromDone
                          }
                        }
                        break

                      case 'error':
                        break

                      default:
                    }

                    // Emit to client: rewrite 'error' events into user-friendly assistant message
                    if (event?.type === 'error') {
                      try {
                        const displayMessage: string =
                          (event?.data && (event.data.displayMessage as string)) ||
                          'Sorry, I encountered an error. Please try again.'
                        const formatted = `_${displayMessage}_`
                        // Accumulate so it persists to DB as assistant content
                        assistantContent += formatted
                        // Send as content chunk
                        try {
                          controller.enqueue(
                            encoder.encode(
                              `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
                            )
                          )
                        } catch (enqueueErr) {
                          reader.cancel()
                          break
                        }
                        // Then close this response cleanly for the client
                        try {
                          controller.enqueue(
                            encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
                          )
                        } catch (enqueueErr) {
                          reader.cancel()
                          break
                        }
                      } catch {}
                      // Do not forward the original error event
                    } else {
                      // Forward original event to client
                      try {
                        controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
                      } catch (enqueueErr) {
                        reader.cancel()
                        break
                      }
                    }
                  } catch (e) {
                    // Enhanced error handling for large payloads and parsing issues
                    const lineLength = line.length
                    const isLargePayload = lineLength > 10000

                    if (isLargePayload) {
                      logger.error(
                        `[${tracker.requestId}] Failed to parse large SSE event (${lineLength} chars)`,
                        {
                          error: e,
                          preview: `${line.substring(0, 200)}...`,
                          size: lineLength,
                        }
                      )
                    } else {
                      logger.warn(
                        `[${tracker.requestId}] Failed to parse SSE event: "${line.substring(0, 200)}..."`,
                        e
                      )
                    }
                  }
                } else if (line.trim() && line !== 'data: [DONE]') {
                  logger.debug(`[${tracker.requestId}] Non-SSE line from sim agent: "${line}"`)
                }
              }
            }

            // Process any remaining buffer
            if (buffer.trim()) {
              logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`)
              if (buffer.startsWith('data: ')) {
                try {
                  const jsonStr = buffer.slice(6)
                  const event = JSON.parse(jsonStr)
                  if (event.type === 'content' && event.data) {
                    assistantContent += event.data
                  }
                  // Forward remaining event, applying same error rewrite behavior
                  if (event?.type === 'error') {
                    const displayMessage: string =
                      (event?.data && (event.data.displayMessage as string)) ||
                      'Sorry, I encountered an error. Please try again.'
                    const formatted = `_${displayMessage}_`
                    assistantContent += formatted
                    try {
                      controller.enqueue(
                        encoder.encode(
                          `data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
                        )
                      )
                      controller.enqueue(
                        encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
                      )
                    } catch (enqueueErr) {
                      reader.cancel()
                    }
                  } else {
                    try {
                      controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
                    } catch (enqueueErr) {
                      reader.cancel()
                    }
                  }
                } catch (e) {
                  logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`)
                }
              }
            }

            // Log final streaming summary
            logger.info(`[${tracker.requestId}] Streaming complete summary:`, {
              totalContentLength: assistantContent.length,
              toolCallsCount: toolCalls.length,
              hasContent: assistantContent.length > 0,
              toolNames: toolCalls.map((tc) => tc?.name).filter(Boolean),
            })

            // NOTE: Messages are saved by the client via update-messages endpoint with full contentBlocks.
            // Server only updates conversationId here to avoid overwriting client's richer save.
            if (currentChat) {
              // Persist only a safe conversationId to avoid continuing from a state that expects tool outputs
              const previousConversationId = currentChat?.conversationId as string | undefined
              const responseId = lastSafeDoneResponseId || previousConversationId || undefined

              if (responseId) {
        // Start title generation if needed
        if (capturedChatId && !capturedCurrentChat?.title && conversationHistory.length === 0) {
          generateChatTitle(message)
            .then(async (title) => {
              if (title) {
                await db
                  .update(copilotChats)
                  .set({
                    updatedAt: new Date(),
                    conversationId: responseId,
                  })
                  .where(eq(copilotChats.id, actualChatId!))
                  .set({ title, updatedAt: new Date() })
                  .where(eq(copilotChats.id, capturedChatId))

                logger.info(
                  `[${tracker.requestId}] Updated conversationId for chat ${actualChatId}`,
                  {
                    updatedConversationId: responseId,
                  }
                )
                const titleEvent = `data: ${JSON.stringify({ type: 'title_updated', title })}\n\n`
                await appendChunk(streamId, titleEvent).catch(() => {})
                logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
              }
            })
          } catch (error) {
            logger.error(`[${tracker.requestId}] Error processing stream:`, error)
            })
            .catch((error) => {
              logger.error(`[${tracker.requestId}] Title generation failed:`, error)
            })
        }

            // Send an error event to the client before closing so it knows what happened
            try {
              const errorMessage =
                error instanceof Error && error.message === 'terminated'
                  ? 'Connection to AI service was interrupted. Please try again.'
                  : 'An unexpected error occurred while processing the response.'
              const encoder = new TextEncoder()

              // Send error as content so it shows in the chat
              controller.enqueue(
                encoder.encode(
                  `data: ${JSON.stringify({ type: 'content', data: `\n\n_${errorMessage}_` })}\n\n`
                )
              )
              // Send done event to properly close the stream on client
              controller.enqueue(encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`))
            } catch (enqueueError) {
              // Stream might already be closed, that's ok
              logger.warn(
                `[${tracker.requestId}] Could not send error event to client:`,
                enqueueError
              )
        try {
          while (true) {
            // Check for abort signal
            const isAborted = await checkAbortSignal(streamId)
            if (isAborted) {
              logger.info(`[${tracker.requestId}] Background stream aborted via signal`, { streamId })
              bgReader.cancel()
              break
            }
          } finally {
            try {
              controller.close()
            } catch {
              // Controller might already be closed

            const { done, value } = await bgReader.read()
            if (done) break

            const chunk = decoder.decode(value, { stream: true })
            buffer += chunk

            // Persist raw chunk for replay and publish to live subscribers
            await appendChunk(streamId, chunk).catch(() => {})

            // Refresh TTL periodically
            const now = Date.now()
            if (now - bgLastTTLRefresh > TTL_REFRESH_INTERVAL) {
              bgLastTTLRefresh = now
              refreshStreamTTL(streamId, capturedChatId).catch(() => {})
            }

            // Parse and track content/tool calls
            const lines = buffer.split('\n')
            buffer = lines.pop() || ''

            for (const line of lines) {
              if (!line.startsWith('data: ') || line.length <= 6) continue
              try {
                const event = JSON.parse(line.slice(6))
                switch (event.type) {
                  case 'content':
                    if (event.data) {
                      assistantContent += event.data
                      appendContent(streamId, event.data).catch(() => {})
                    }
                    break
                  case 'tool_call':
                    if (!event.data?.partial && event.data?.id) {
                      toolCalls.push(event.data)
                      updateToolCall(streamId, event.data.id, {
                        id: event.data.id,
                        name: event.data.name,
                        args: event.data.arguments || {},
                        state: 'pending',
                      }).catch(() => {})
                    }
                    break
                  case 'tool_generating':
                    if (event.toolCallId) {
                      updateToolCall(streamId, event.toolCallId, { state: 'executing' }).catch(() => {})
                    }
                    break
                  case 'tool_result':
                    if (event.toolCallId) {
                      updateToolCall(streamId, event.toolCallId, {
                        state: 'success',
                        result: event.result,
                      }).catch(() => {})
                    }
                    break
                  case 'tool_error':
                    if (event.toolCallId) {
                      updateToolCall(streamId, event.toolCallId, {
                        state: 'error',
                        error: event.error,
                      }).catch(() => {})
                    }
                    break
                  case 'done':
                    if (event.data?.responseId) {
                      lastSafeDoneResponseId = event.data.responseId
                    }
                    break
                }
              } catch {}
            }
          }
        },

          // Complete stream - save to DB
          const finalConversationId = lastSafeDoneResponseId || (capturedCurrentChat?.conversationId as string | undefined)
          await completeStream(streamId, finalConversationId)

          // Update conversationId in DB
          if (capturedCurrentChat && lastSafeDoneResponseId) {
            await db
              .update(copilotChats)
              .set({ updatedAt: new Date(), conversationId: lastSafeDoneResponseId })
              .where(eq(copilotChats.id, capturedChatId))
          }

          logger.info(`[${tracker.requestId}] Background stream processing complete`, {
            streamId,
            contentLength: assistantContent.length,
            toolCallsCount: toolCalls.length,
          })
        } catch (error) {
          logger.error(`[${tracker.requestId}] Background stream error`, { streamId, error })
          await errorStream(streamId, error instanceof Error ? error.message : 'Unknown error')
        }
      })()

      // Use after() to ensure background task completes even after response is sent
      after(() => backgroundTask)

      // Return streamId immediately - client will connect to stream endpoint
      logger.info(`[${tracker.requestId}] Returning streamId for client to connect`, {
        streamId,
        chatId: capturedChatId,
      })

      const response = new Response(transformedStream, {
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          Connection: 'keep-alive',
          'X-Accel-Buffering': 'no',
        },
      return NextResponse.json({
        success: true,
        streamId,
        chatId: capturedChatId,
      })

      logger.info(`[${tracker.requestId}] Returning streaming response to client`, {
        duration: tracker.getDuration(),
        chatId: actualChatId,
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          Connection: 'keep-alive',
        },
      })

      return response
    }

    // For non-streaming responses
@@ -899,7 +711,7 @@ export async function POST(req: NextRequest) {
     // Save messages if we have a chat
     if (currentChat && responseData.content) {
       const userMessage = {
-        id: userMessageIdToUse, // Consistent ID used for request and persistence
+        id: userMessageIdToUse,
         role: 'user',
         content: message,
         timestamp: new Date().toISOString(),
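The net effect of this hunk: the streaming branch no longer returns an SSE body. It persists chunks to Redis from a background task kept alive with after(), and answers the POST with a small JSON envelope. A hedged sketch of the client side (the POST path is an assumption; the body fields match the NextResponse.json call above):

// Sketch: send a message, then attach to the stream endpoint for live chunks.
async function sendMessage(chatId: string, message: string): Promise<string> {
  const res = await fetch('/api/copilot/chat', { // assumed route path
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ chatId, message, stream: true }),
  })
  const { streamId } = await res.json()
  return streamId // feed this to GET /api/copilot/stream/{streamId}
}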
apps/sim/app/api/copilot/stream/[streamId]/abort/route.ts (new file, 64 lines)
@@ -0,0 +1,64 @@
/**
 * POST /api/copilot/stream/[streamId]/abort
 *
 * Signal the server to abort an active stream.
 * The original request handler will check for this signal and cancel the stream.
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import { getStreamMeta, setAbortSignal } from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotStreamAbortAPI')

export async function POST(
  req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { streamId } = await params

  logger.info('Stream abort request', { streamId, userId: session.user.id })

  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.info('Stream not found for abort', { streamId })
    return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Unauthorized abort attempt', {
      streamId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
  }

  // Stream already finished
  if (meta.status !== 'streaming') {
    logger.info('Stream already finished, nothing to abort', {
      streamId,
      status: meta.status,
    })
    return NextResponse.json({
      success: true,
      message: 'Stream already finished',
    })
  }

  // Set abort signal in Redis
  await setAbortSignal(streamId)

  logger.info('Abort signal set for stream', { streamId })

  return NextResponse.json({ success: true })
}
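Client-side, aborting is a fire-and-forget POST; the background task in the chat route checks checkAbortSignal() on each loop iteration and cancels the upstream reader when the flag appears. A sketch:

// Sketch: request cancellation of a running stream.
async function abortStream(streamId: string): Promise<boolean> {
  const res = await fetch(`/api/copilot/stream/${streamId}/abort`, { method: 'POST' })
  return res.ok // 404 = stream already expired in Redis, 403 = not the owner
}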
apps/sim/app/api/copilot/stream/[streamId]/route.ts (new file, 160 lines)
@@ -0,0 +1,160 @@
/**
 * GET /api/copilot/stream/[streamId]
 *
 * Resume an active copilot stream.
 * - If stream is still active: returns SSE with replay of missed chunks + live updates via Redis Pub/Sub
 * - If stream is completed: returns JSON indicating to load from database
 * - If stream not found: returns 404
 */

import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
  getChunks,
  getStreamMeta,
  subscribeToStream,
} from '@/lib/copilot/stream-persistence'

const logger = createLogger('CopilotStreamResumeAPI')

const SSE_HEADERS = {
  'Content-Type': 'text/event-stream',
  'Cache-Control': 'no-cache',
  Connection: 'keep-alive',
  'X-Accel-Buffering': 'no',
}

export async function GET(
  req: NextRequest,
  { params }: { params: Promise<{ streamId: string }> }
) {
  const session = await getSession()
  if (!session?.user?.id) {
    return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
  }

  const { streamId } = await params
  const fromChunk = parseInt(req.nextUrl.searchParams.get('from') || '0')

  logger.info('Stream resume request', { streamId, fromChunk, userId: session.user.id })

  const meta = await getStreamMeta(streamId)

  if (!meta) {
    logger.info('Stream not found or expired', { streamId })
    return NextResponse.json(
      {
        status: 'not_found',
        message: 'Stream not found or expired. Reload chat from database.',
      },
      { status: 404 }
    )
  }

  // Verify ownership
  if (meta.userId !== session.user.id) {
    logger.warn('Unauthorized stream access attempt', {
      streamId,
      requesterId: session.user.id,
      ownerId: meta.userId,
    })
    return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
  }

  // Stream completed - tell client to load from database
  if (meta.status === 'completed') {
    logger.info('Stream already completed', { streamId, chatId: meta.chatId })
    return NextResponse.json({
      status: 'completed',
      chatId: meta.chatId,
      message: 'Stream completed. Messages saved to database.',
    })
  }

  // Stream errored
  if (meta.status === 'error') {
    logger.info('Stream encountered error', { streamId, chatId: meta.chatId })
    return NextResponse.json({
      status: 'error',
      chatId: meta.chatId,
      message: 'Stream encountered an error.',
    })
  }

  // Stream still active - return SSE with replay + live updates
  logger.info('Resuming active stream', { streamId, chatId: meta.chatId })

  const encoder = new TextEncoder()
  const abortController = new AbortController()

  // Handle client disconnect
  req.signal.addEventListener('abort', () => {
    logger.info('Client disconnected from resumed stream', { streamId })
    abortController.abort()
  })

  const responseStream = new ReadableStream({
    async start(controller) {
      try {
        // 1. Replay missed chunks (single read from Redis LIST)
        const missedChunks = await getChunks(streamId, fromChunk)
        logger.info('Replaying missed chunks', {
          streamId,
          fromChunk,
          missedChunkCount: missedChunks.length,
        })

        for (const chunk of missedChunks) {
          // Chunks are already in SSE format, just re-encode
          controller.enqueue(encoder.encode(chunk))
        }

        // 2. Subscribe to live chunks via Redis Pub/Sub (blocking, no polling)
        await subscribeToStream(
          streamId,
          (chunk) => {
            try {
              controller.enqueue(encoder.encode(chunk))
            } catch {
              // Client disconnected
              abortController.abort()
            }
          },
          () => {
            // Stream complete - close connection
            logger.info('Stream completed during resume', { streamId })
            try {
              controller.close()
            } catch {
              // Already closed
            }
          },
          abortController.signal
        )
      } catch (error) {
        logger.error('Error in stream resume', {
          streamId,
          error: error instanceof Error ? error.message : String(error),
        })
        try {
          controller.close()
        } catch {
          // Already closed
        }
      }
    },
    cancel() {
      abortController.abort()
    },
  })

  return new Response(responseStream, {
    headers: {
      ...SSE_HEADERS,
      'X-Stream-Id': streamId,
      'X-Chat-Id': meta.chatId,
    },
  })
}
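Because the endpoint answers with SSE only while the stream is live (and plain JSON otherwise), a resuming client can branch on the Content-Type. A sketch, assuming the chunk counter from the active-stream check is passed as `from`:

// Sketch: replay missed chunks, then consume live chunks until the server closes.
async function resumeStream(
  streamId: string,
  fromChunk: number,
  onChunk: (sse: string) => void
) {
  const res = await fetch(`/api/copilot/stream/${streamId}?from=${fromChunk}`)
  if (res.headers.get('Content-Type')?.includes('application/json')) {
    return res.json() // { status: 'completed' | 'error' | 'not_found', ... }
  }
  const reader = res.body!.getReader()
  const decoder = new TextDecoder()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    onChunk(decoder.decode(value, { stream: true }))
  }
  return null
}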
@@ -550,6 +550,8 @@ export interface AdminUserBilling {
   totalWebhookTriggers: number
   totalScheduledExecutions: number
   totalChatExecutions: number
+  totalMcpExecutions: number
+  totalA2aExecutions: number
   totalTokensUsed: number
   totalCost: string
   currentUsageLimit: string | null
@@ -97,6 +97,8 @@ export const GET = withAdminAuthParams<RouteParams>(async (_, context) => {
   totalWebhookTriggers: stats?.totalWebhookTriggers ?? 0,
   totalScheduledExecutions: stats?.totalScheduledExecutions ?? 0,
   totalChatExecutions: stats?.totalChatExecutions ?? 0,
+  totalMcpExecutions: stats?.totalMcpExecutions ?? 0,
+  totalA2aExecutions: stats?.totalA2aExecutions ?? 0,
   totalTokensUsed: stats?.totalTokensUsed ?? 0,
   totalCost: stats?.totalCost ?? '0',
   currentUsageLimit: stats?.currentUsageLimit ?? null,
@@ -19,7 +19,7 @@ export interface RateLimitResult {

 export async function checkRateLimit(
   request: NextRequest,
-  endpoint: 'logs' | 'logs-detail' = 'logs'
+  endpoint: 'logs' | 'logs-detail' | 'workflows' | 'workflow-detail' = 'logs'
 ): Promise<RateLimitResult> {
   try {
     const auth = await authenticateV1Request(request)
apps/sim/app/api/v1/workflows/[id]/route.ts (new file, 102 lines)
@@ -0,0 +1,102 @@
import { db } from '@sim/db'
import { permissions, workflow, workflowBlocks } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { extractInputFieldsFromBlocks } from '@/lib/workflows/input-format'
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'

const logger = createLogger('V1WorkflowDetailsAPI')

export const revalidate = 0

export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = crypto.randomUUID().slice(0, 8)

  try {
    const rateLimit = await checkRateLimit(request, 'workflow-detail')
    if (!rateLimit.allowed) {
      return createRateLimitResponse(rateLimit)
    }

    const userId = rateLimit.userId!
    const { id } = await params

    logger.info(`[${requestId}] Fetching workflow details for ${id}`, { userId })

    const rows = await db
      .select({
        id: workflow.id,
        name: workflow.name,
        description: workflow.description,
        color: workflow.color,
        folderId: workflow.folderId,
        workspaceId: workflow.workspaceId,
        isDeployed: workflow.isDeployed,
        deployedAt: workflow.deployedAt,
        runCount: workflow.runCount,
        lastRunAt: workflow.lastRunAt,
        variables: workflow.variables,
        createdAt: workflow.createdAt,
        updatedAt: workflow.updatedAt,
      })
      .from(workflow)
      .innerJoin(
        permissions,
        and(
          eq(permissions.entityType, 'workspace'),
          eq(permissions.entityId, workflow.workspaceId),
          eq(permissions.userId, userId)
        )
      )
      .where(eq(workflow.id, id))
      .limit(1)

    const workflowData = rows[0]
    if (!workflowData) {
      return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
    }

    const blockRows = await db
      .select({
        id: workflowBlocks.id,
        type: workflowBlocks.type,
        subBlocks: workflowBlocks.subBlocks,
      })
      .from(workflowBlocks)
      .where(eq(workflowBlocks.workflowId, id))

    const blocksRecord = Object.fromEntries(
      blockRows.map((block) => [block.id, { type: block.type, subBlocks: block.subBlocks }])
    )
    const inputs = extractInputFieldsFromBlocks(blocksRecord)

    const response = {
      id: workflowData.id,
      name: workflowData.name,
      description: workflowData.description,
      color: workflowData.color,
      folderId: workflowData.folderId,
      workspaceId: workflowData.workspaceId,
      isDeployed: workflowData.isDeployed,
      deployedAt: workflowData.deployedAt?.toISOString() || null,
      runCount: workflowData.runCount,
      lastRunAt: workflowData.lastRunAt?.toISOString() || null,
      variables: workflowData.variables || {},
      inputs,
      createdAt: workflowData.createdAt.toISOString(),
      updatedAt: workflowData.updatedAt.toISOString(),
    }

    const limits = await getUserLimits(userId)

    const apiResponse = createApiResponse({ data: response }, limits, rateLimit)

    return NextResponse.json(apiResponse.body, { headers: apiResponse.headers })
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Workflow details fetch error`, { error: message })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}
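Hypothetical usage (the auth header name is whatever authenticateV1Request expects; it is not shown in this diff):

// Sketch: fetch one workflow; `inputs` carries the fields extracted from its blocks.
async function getWorkflow(id: string, apiKey: string) {
  const res = await fetch(`/api/v1/workflows/${id}`, {
    headers: { 'x-api-key': apiKey }, // assumed header name
  })
  if (!res.ok) throw new Error(`workflow fetch failed: ${res.status}`)
  const body = await res.json() // assumes createApiResponse keeps `data` top-level
  return body.data
}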
apps/sim/app/api/v1/workflows/route.ts (new file, 184 lines)
@@ -0,0 +1,184 @@
import { db } from '@sim/db'
import { permissions, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, asc, eq, gt, or } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { createApiResponse, getUserLimits } from '@/app/api/v1/logs/meta'
import { checkRateLimit, createRateLimitResponse } from '@/app/api/v1/middleware'

const logger = createLogger('V1WorkflowsAPI')

export const dynamic = 'force-dynamic'
export const revalidate = 0

const QueryParamsSchema = z.object({
  workspaceId: z.string(),
  folderId: z.string().optional(),
  deployedOnly: z.coerce.boolean().optional().default(false),
  limit: z.coerce.number().min(1).max(100).optional().default(50),
  cursor: z.string().optional(),
})

interface CursorData {
  sortOrder: number
  createdAt: string
  id: string
}

function encodeCursor(data: CursorData): string {
  return Buffer.from(JSON.stringify(data)).toString('base64')
}

function decodeCursor(cursor: string): CursorData | null {
  try {
    return JSON.parse(Buffer.from(cursor, 'base64').toString())
  } catch {
    return null
  }
}

export async function GET(request: NextRequest) {
  const requestId = crypto.randomUUID().slice(0, 8)

  try {
    const rateLimit = await checkRateLimit(request, 'workflows')
    if (!rateLimit.allowed) {
      return createRateLimitResponse(rateLimit)
    }

    const userId = rateLimit.userId!
    const { searchParams } = new URL(request.url)
    const rawParams = Object.fromEntries(searchParams.entries())

    const validationResult = QueryParamsSchema.safeParse(rawParams)
    if (!validationResult.success) {
      return NextResponse.json(
        { error: 'Invalid parameters', details: validationResult.error.errors },
        { status: 400 }
      )
    }

    const params = validationResult.data

    logger.info(`[${requestId}] Fetching workflows for workspace ${params.workspaceId}`, {
      userId,
      filters: {
        folderId: params.folderId,
        deployedOnly: params.deployedOnly,
      },
    })

    const conditions = [
      eq(workflow.workspaceId, params.workspaceId),
      eq(permissions.entityType, 'workspace'),
      eq(permissions.entityId, params.workspaceId),
      eq(permissions.userId, userId),
    ]

    if (params.folderId) {
      conditions.push(eq(workflow.folderId, params.folderId))
    }

    if (params.deployedOnly) {
      conditions.push(eq(workflow.isDeployed, true))
    }

    if (params.cursor) {
      const cursorData = decodeCursor(params.cursor)
      if (cursorData) {
        const cursorCondition = or(
          gt(workflow.sortOrder, cursorData.sortOrder),
          and(
            eq(workflow.sortOrder, cursorData.sortOrder),
            gt(workflow.createdAt, new Date(cursorData.createdAt))
          ),
          and(
            eq(workflow.sortOrder, cursorData.sortOrder),
            eq(workflow.createdAt, new Date(cursorData.createdAt)),
            gt(workflow.id, cursorData.id)
          )
        )
        if (cursorCondition) {
          conditions.push(cursorCondition)
        }
      }
    }

    const orderByClause = [asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id)]

    const rows = await db
      .select({
        id: workflow.id,
        name: workflow.name,
        description: workflow.description,
        color: workflow.color,
        folderId: workflow.folderId,
        workspaceId: workflow.workspaceId,
        isDeployed: workflow.isDeployed,
        deployedAt: workflow.deployedAt,
        runCount: workflow.runCount,
        lastRunAt: workflow.lastRunAt,
        sortOrder: workflow.sortOrder,
        createdAt: workflow.createdAt,
        updatedAt: workflow.updatedAt,
      })
      .from(workflow)
      .innerJoin(
        permissions,
        and(
          eq(permissions.entityType, 'workspace'),
          eq(permissions.entityId, params.workspaceId),
          eq(permissions.userId, userId)
        )
      )
      .where(and(...conditions))
      .orderBy(...orderByClause)
      .limit(params.limit + 1)

    const hasMore = rows.length > params.limit
    const data = rows.slice(0, params.limit)

    let nextCursor: string | undefined
    if (hasMore && data.length > 0) {
      const lastWorkflow = data[data.length - 1]
      nextCursor = encodeCursor({
        sortOrder: lastWorkflow.sortOrder,
        createdAt: lastWorkflow.createdAt.toISOString(),
        id: lastWorkflow.id,
      })
    }

    const formattedWorkflows = data.map((w) => ({
      id: w.id,
      name: w.name,
      description: w.description,
      color: w.color,
      folderId: w.folderId,
      workspaceId: w.workspaceId,
      isDeployed: w.isDeployed,
      deployedAt: w.deployedAt?.toISOString() || null,
      runCount: w.runCount,
      lastRunAt: w.lastRunAt?.toISOString() || null,
      createdAt: w.createdAt.toISOString(),
      updatedAt: w.updatedAt.toISOString(),
    }))

    const limits = await getUserLimits(userId)

    const response = createApiResponse(
      {
        data: formattedWorkflows,
        nextCursor,
      },
      limits,
      rateLimit
    )

    return NextResponse.json(response.body, { headers: response.headers })
  } catch (error: unknown) {
    const message = error instanceof Error ? error.message : 'Unknown error'
    logger.error(`[${requestId}] Workflows fetch error`, { error: message })
    return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
  }
}
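The (sortOrder, createdAt, id) keyset cursor makes pagination stable under concurrent inserts: each page resumes strictly after the last row of the previous one instead of using OFFSET. A hedged client-side sketch (same auth caveat as above):

// Sketch: walk every page of a workspace's workflows via the opaque cursor.
async function fetchAllWorkflows(workspaceId: string, apiKey: string) {
  const all: unknown[] = []
  let cursor: string | undefined
  do {
    const qs = new URLSearchParams({ workspaceId, limit: '100' })
    if (cursor) qs.set('cursor', cursor)
    const res = await fetch(`/api/v1/workflows?${qs}`, {
      headers: { 'x-api-key': apiKey }, // assumed header name
    })
    if (!res.ok) throw new Error(`workflows fetch failed: ${res.status}`)
    const body = await res.json() // assumes data/nextCursor stay top-level in the body
    all.push(...body.data)
    cursor = body.nextCursor
  } while (cursor)
  return all
}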
@@ -2,7 +2,6 @@

 import { useRef, useState } from 'react'
 import { createLogger } from '@sim/logger'
-import { useQueryClient } from '@tanstack/react-query'
 import {
   Button,
   Label,
@@ -14,7 +13,7 @@ import {
   Textarea,
 } from '@/components/emcn'
 import type { DocumentData } from '@/lib/knowledge/types'
-import { knowledgeKeys } from '@/hooks/queries/knowledge'
+import { useCreateChunk } from '@/hooks/queries/knowledge'

 const logger = createLogger('CreateChunkModal')

@@ -31,16 +30,20 @@ export function CreateChunkModal({
   document,
   knowledgeBaseId,
 }: CreateChunkModalProps) {
-  const queryClient = useQueryClient()
+  const {
+    mutate: createChunk,
+    isPending: isCreating,
+    error: mutationError,
+    reset: resetMutation,
+  } = useCreateChunk()
   const [content, setContent] = useState('')
-  const [isCreating, setIsCreating] = useState(false)
-  const [error, setError] = useState<string | null>(null)
   const [showUnsavedChangesAlert, setShowUnsavedChangesAlert] = useState(false)
   const isProcessingRef = useRef(false)

+  const error = mutationError?.message ?? null
   const hasUnsavedChanges = content.trim().length > 0

-  const handleCreateChunk = async () => {
+  const handleCreateChunk = () => {
     if (!document || content.trim().length === 0 || isProcessingRef.current) {
       if (isProcessingRef.current) {
         logger.warn('Chunk creation already in progress, ignoring duplicate request')
@@ -48,57 +51,32 @@ export function CreateChunkModal({
       return
     }

-    try {
-      isProcessingRef.current = true
-      setIsCreating(true)
-      setError(null)
+    isProcessingRef.current = true

-      const response = await fetch(
-        `/api/knowledge/${knowledgeBaseId}/documents/${document.id}/chunks`,
-        {
-          method: 'POST',
-          headers: {
-            'Content-Type': 'application/json',
-          },
-          body: JSON.stringify({
-            content: content.trim(),
-            enabled: true,
-          }),
-        }
-      )
-
-      if (!response.ok) {
-        const result = await response.json()
-        throw new Error(result.error || 'Failed to create chunk')
+    createChunk(
+      {
+        knowledgeBaseId,
+        documentId: document.id,
+        content: content.trim(),
+        enabled: true,
+      },
+      {
+        onSuccess: () => {
+          isProcessingRef.current = false
+          onClose()
+        },
+        onError: () => {
+          isProcessingRef.current = false
+        },
       }
-
-      const result = await response.json()
-
-      if (result.success && result.data) {
-        logger.info('Chunk created successfully:', result.data.id)
-
-        await queryClient.invalidateQueries({
-          queryKey: knowledgeKeys.detail(knowledgeBaseId),
-        })
-
-        onClose()
-      } else {
-        throw new Error(result.error || 'Failed to create chunk')
-      }
-    } catch (err) {
-      logger.error('Error creating chunk:', err)
-      setError(err instanceof Error ? err.message : 'An error occurred')
-    } finally {
-      isProcessingRef.current = false
-      setIsCreating(false)
-    }
+    )
   }

   const onClose = () => {
     onOpenChange(false)
     setContent('')
-    setError(null)
     setShowUnsavedChangesAlert(false)
+    resetMutation()
   }

   const handleCloseAttempt = () => {
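The hook implementations are not part of this diff. A plausible shape for useCreateChunk, assuming a standard TanStack Query mutation that POSTs the chunk and invalidates the knowledge-base detail query (mirroring what the old inline code did); all names below are hypothetical except the endpoint, which is taken from the removed fetch call:

// Sketch only: hedged guess at the hook's shape, not the repo's actual code.
import { useMutation, useQueryClient } from '@tanstack/react-query'

interface CreateChunkInput {
  knowledgeBaseId: string
  documentId: string
  content: string
  enabled: boolean
}

export function useCreateChunk() {
  const queryClient = useQueryClient()
  return useMutation({
    mutationFn: async ({ knowledgeBaseId, documentId, content, enabled }: CreateChunkInput) => {
      const res = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ content, enabled }),
      })
      const result = await res.json()
      if (!res.ok || !result.success) throw new Error(result.error || 'Failed to create chunk')
      return result.data
    },
    onSuccess: (_data, { knowledgeBaseId }) => {
      // Stand-in for knowledgeKeys.detail(knowledgeBaseId); exact key factory not shown here.
      queryClient.invalidateQueries({ queryKey: ['knowledge', knowledgeBaseId] })
    },
  })
}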
@@ -1,13 +1,8 @@
 'use client'

-import { useState } from 'react'
-import { createLogger } from '@sim/logger'
-import { useQueryClient } from '@tanstack/react-query'
 import { Button, Modal, ModalBody, ModalContent, ModalFooter, ModalHeader } from '@/components/emcn'
 import type { ChunkData } from '@/lib/knowledge/types'
-import { knowledgeKeys } from '@/hooks/queries/knowledge'
-
-const logger = createLogger('DeleteChunkModal')
+import { useDeleteChunk } from '@/hooks/queries/knowledge'

 interface DeleteChunkModalProps {
   chunk: ChunkData | null
@@ -24,44 +19,12 @@ export function DeleteChunkModal({
   isOpen,
   onClose,
 }: DeleteChunkModalProps) {
-  const queryClient = useQueryClient()
-  const [isDeleting, setIsDeleting] = useState(false)
+  const { mutate: deleteChunk, isPending: isDeleting } = useDeleteChunk()

-  const handleDeleteChunk = async () => {
+  const handleDeleteChunk = () => {
     if (!chunk || isDeleting) return

-    try {
-      setIsDeleting(true)
-
-      const response = await fetch(
-        `/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks/${chunk.id}`,
-        {
-          method: 'DELETE',
-        }
-      )
-
-      if (!response.ok) {
-        throw new Error('Failed to delete chunk')
-      }
-
-      const result = await response.json()
-
-      if (result.success) {
-        logger.info('Chunk deleted successfully:', chunk.id)
-
-        await queryClient.invalidateQueries({
-          queryKey: knowledgeKeys.detail(knowledgeBaseId),
-        })
-
-        onClose()
-      } else {
-        throw new Error(result.error || 'Failed to delete chunk')
-      }
-    } catch (err) {
-      logger.error('Error deleting chunk:', err)
-    } finally {
-      setIsDeleting(false)
-    }
+    deleteChunk({ knowledgeBaseId, documentId, chunkId: chunk.id }, { onSuccess: onClose })
   }

   if (!chunk) return null
@@ -25,6 +25,7 @@ import {
 } from '@/hooks/kb/use-knowledge-base-tag-definitions'
 import { useNextAvailableSlot } from '@/hooks/kb/use-next-available-slot'
 import { type TagDefinitionInput, useTagDefinitions } from '@/hooks/kb/use-tag-definitions'
+import { useUpdateDocumentTags } from '@/hooks/queries/knowledge'

 const logger = createLogger('DocumentTagsModal')

@@ -58,8 +59,6 @@ function formatValueForDisplay(value: string, fieldType: string): string {
   try {
     const date = new Date(value)
     if (Number.isNaN(date.getTime())) return value
-    // For UTC dates, display the UTC date to prevent timezone shifts
-    // e.g., 2002-05-16T00:00:00.000Z should show as "May 16, 2002" not "May 15, 2002"
     if (typeof value === 'string' && (value.endsWith('Z') || /[+-]\d{2}:\d{2}$/.test(value))) {
       return new Date(
         date.getUTCFullYear(),
@@ -96,6 +95,7 @@ export function DocumentTagsModal({
   const documentTagHook = useTagDefinitions(knowledgeBaseId, documentId)
   const kbTagHook = useKnowledgeBaseTagDefinitions(knowledgeBaseId)
   const { getNextAvailableSlot: getServerNextSlot } = useNextAvailableSlot(knowledgeBaseId)
+  const { mutateAsync: updateDocumentTags } = useUpdateDocumentTags()

   const { saveTagDefinitions, tagDefinitions, fetchTagDefinitions } = documentTagHook
   const { tagDefinitions: kbTagDefinitions, fetchTagDefinitions: refreshTagDefinitions } = kbTagHook
@@ -118,7 +118,6 @@
       const definition = definitions.find((def) => def.tagSlot === slot)

       if (rawValue !== null && rawValue !== undefined && definition) {
-        // Convert value to string for storage
         const stringValue = String(rawValue).trim()
         if (stringValue) {
           tags.push({
@@ -142,41 +141,34 @@
     async (tagsToSave: DocumentTag[]) => {
       if (!documentData) return

-      try {
-        const tagData: Record<string, string> = {}
+      const tagData: Record<string, string> = {}

-        // Only include tags that have values (omit empty ones)
-        // Use empty string for slots that should be cleared
-        ALL_TAG_SLOTS.forEach((slot) => {
-          const tag = tagsToSave.find((t) => t.slot === slot)
-          if (tag?.value.trim()) {
-            tagData[slot] = tag.value.trim()
-          } else {
-            // Use empty string to clear a tag (API schema expects string, not null)
-            tagData[slot] = ''
-          }
-        })
-
-        const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}`, {
-          method: 'PUT',
-          headers: {
-            'Content-Type': 'application/json',
-          },
-          body: JSON.stringify(tagData),
-        })
-
-        if (!response.ok) {
-          throw new Error('Failed to update document tags')
+      ALL_TAG_SLOTS.forEach((slot) => {
+        const tag = tagsToSave.find((t) => t.slot === slot)
+        if (tag?.value.trim()) {
+          tagData[slot] = tag.value.trim()
+        } else {
+          tagData[slot] = ''
         }
+      })

-        onDocumentUpdate?.(tagData as Record<string, string>)
-        await fetchTagDefinitions()
-      } catch (error) {
-        logger.error('Error updating document tags:', error)
-        throw error
-      }
+      await updateDocumentTags({
+        knowledgeBaseId,
+        documentId,
+        tags: tagData,
+      })
+
+      onDocumentUpdate?.(tagData)
+      await fetchTagDefinitions()
     },
-    [documentData, knowledgeBaseId, documentId, fetchTagDefinitions, onDocumentUpdate]
+    [
+      documentData,
+      knowledgeBaseId,
+      documentId,
+      updateDocumentTags,
+      fetchTagDefinitions,
+      onDocumentUpdate,
+    ]
   )

   const handleRemoveTag = async (index: number) => {
@@ -2,7 +2,6 @@

 import { useEffect, useMemo, useRef, useState } from 'react'
 import { createLogger } from '@sim/logger'
-import { useQueryClient } from '@tanstack/react-query'
 import { ChevronDown, ChevronUp } from 'lucide-react'
 import {
   Button,
@@ -19,7 +18,7 @@ import {
 import type { ChunkData, DocumentData } from '@/lib/knowledge/types'
 import { getAccurateTokenCount, getTokenStrings } from '@/lib/tokenization/estimators'
 import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
-import { knowledgeKeys } from '@/hooks/queries/knowledge'
+import { useUpdateChunk } from '@/hooks/queries/knowledge'

 const logger = createLogger('EditChunkModal')

@@ -50,17 +49,22 @@ export function EditChunkModal({
   onNavigateToPage,
   maxChunkSize,
 }: EditChunkModalProps) {
-  const queryClient = useQueryClient()
   const userPermissions = useUserPermissionsContext()
+  const {
+    mutate: updateChunk,
+    isPending: isSaving,
+    error: mutationError,
+    reset: resetMutation,
+  } = useUpdateChunk()
   const [editedContent, setEditedContent] = useState(chunk?.content || '')
-  const [isSaving, setIsSaving] = useState(false)
   const [isNavigating, setIsNavigating] = useState(false)
-  const [error, setError] = useState<string | null>(null)
   const [showUnsavedChangesAlert, setShowUnsavedChangesAlert] = useState(false)
   const [pendingNavigation, setPendingNavigation] = useState<(() => void) | null>(null)
   const [tokenizerOn, setTokenizerOn] = useState(false)
   const textareaRef = useRef<HTMLTextAreaElement>(null)

+  const error = mutationError?.message ?? null
+
   const hasUnsavedChanges = editedContent !== (chunk?.content || '')

   const tokenStrings = useMemo(() => {
@@ -102,44 +106,15 @@ export function EditChunkModal({
   const canNavigatePrev = currentChunkIndex > 0 || currentPage > 1
   const canNavigateNext = currentChunkIndex < allChunks.length - 1 || currentPage < totalPages

-  const handleSaveContent = async () => {
+  const handleSaveContent = () => {
     if (!chunk || !document) return

-    try {
-      setIsSaving(true)
-      setError(null)
-
-      const response = await fetch(
-        `/api/knowledge/${knowledgeBaseId}/documents/${document.id}/chunks/${chunk.id}`,
-        {
-          method: 'PUT',
-          headers: {
-            'Content-Type': 'application/json',
-          },
-          body: JSON.stringify({
-            content: editedContent,
-          }),
-        }
-      )
-
-      if (!response.ok) {
-        const result = await response.json()
-        throw new Error(result.error || 'Failed to update chunk')
-      }
-
-      const result = await response.json()
-
-      if (result.success) {
-        await queryClient.invalidateQueries({
-          queryKey: knowledgeKeys.detail(knowledgeBaseId),
-        })
-      }
-    } catch (err) {
-      logger.error('Error updating chunk:', err)
-      setError(err instanceof Error ? err.message : 'An error occurred')
-    } finally {
-      setIsSaving(false)
-    }
+    updateChunk({
+      knowledgeBaseId,
+      documentId: document.id,
+      chunkId: chunk.id,
+      content: editedContent,
+    })
   }

   const navigateToChunk = async (direction: 'prev' | 'next') => {
@@ -165,7 +140,6 @@ export function EditChunkModal({
       }
     } catch (err) {
       logger.error(`Error navigating ${direction}:`, err)
-      setError(`Failed to navigate to ${direction === 'prev' ? 'previous' : 'next'} chunk`)
     } finally {
       setIsNavigating(false)
     }
@@ -185,6 +159,7 @@ export function EditChunkModal({
       setPendingNavigation(null)
       setShowUnsavedChangesAlert(true)
     } else {
+      resetMutation()
       onClose()
     }
   }
@@ -195,6 +170,7 @@ export function EditChunkModal({
       void pendingNavigation()
       setPendingNavigation(null)
     } else {
+      resetMutation()
      onClose()
     }
   }
@@ -48,7 +48,13 @@ import { ActionBar } from '@/app/workspace/[workspaceId]/knowledge/[id]/componen
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useContextMenu } from '@/app/workspace/[workspaceId]/w/components/sidebar/hooks'
import { useDocument, useDocumentChunks, useKnowledgeBase } from '@/hooks/kb/use-knowledge'
import { knowledgeKeys, useDocumentChunkSearchQuery } from '@/hooks/queries/knowledge'
import {
knowledgeKeys,
useBulkChunkOperation,
useDeleteDocument,
useDocumentChunkSearchQuery,
useUpdateChunk,
} from '@/hooks/queries/knowledge'

const logger = createLogger('Document')

@@ -403,11 +409,13 @@ export function Document({
const [isCreateChunkModalOpen, setIsCreateChunkModalOpen] = useState(false)
const [chunkToDelete, setChunkToDelete] = useState<ChunkData | null>(null)
const [isDeleteModalOpen, setIsDeleteModalOpen] = useState(false)
const [isBulkOperating, setIsBulkOperating] = useState(false)
const [showDeleteDocumentDialog, setShowDeleteDocumentDialog] = useState(false)
const [isDeletingDocument, setIsDeletingDocument] = useState(false)
const [contextMenuChunk, setContextMenuChunk] = useState<ChunkData | null>(null)

const { mutate: updateChunkMutation } = useUpdateChunk()
const { mutate: deleteDocumentMutation, isPending: isDeletingDocument } = useDeleteDocument()
const { mutate: bulkChunkMutation, isPending: isBulkOperating } = useBulkChunkOperation()

const {
isOpen: isContextMenuOpen,
position: contextMenuPosition,
@@ -440,36 +448,23 @@ export function Document({
setSelectedChunk(null)
}

const handleToggleEnabled = async (chunkId: string) => {
const handleToggleEnabled = (chunkId: string) => {
const chunk = displayChunks.find((c) => c.id === chunkId)
if (!chunk) return

try {
const response = await fetch(
`/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks/${chunkId}`,
{
method: 'PUT',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
enabled: !chunk.enabled,
}),
}
)

if (!response.ok) {
throw new Error('Failed to update chunk')
updateChunkMutation(
{
knowledgeBaseId,
documentId,
chunkId,
enabled: !chunk.enabled,
},
{
onSuccess: () => {
updateChunk(chunkId, { enabled: !chunk.enabled })
},
}

const result = await response.json()

if (result.success) {
updateChunk(chunkId, { enabled: !chunk.enabled })
}
} catch (err) {
logger.error('Error updating chunk:', err)
}
)
}

const handleDeleteChunk = (chunkId: string) => {
@@ -515,107 +510,69 @@ export function Document({
/**
* Handles deleting the document
*/
const handleDeleteDocument = async () => {
const handleDeleteDocument = () => {
if (!documentData) return

try {
setIsDeletingDocument(true)

const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}`, {
method: 'DELETE',
})

if (!response.ok) {
throw new Error('Failed to delete document')
deleteDocumentMutation(
{ knowledgeBaseId, documentId },
{
onSuccess: () => {
router.push(`/workspace/${workspaceId}/knowledge/${knowledgeBaseId}`)
},
}

const result = await response.json()

if (result.success) {
await queryClient.invalidateQueries({
queryKey: knowledgeKeys.detail(knowledgeBaseId),
})

router.push(`/workspace/${workspaceId}/knowledge/${knowledgeBaseId}`)
} else {
throw new Error(result.error || 'Failed to delete document')
}
} catch (err) {
logger.error('Error deleting document:', err)
setIsDeletingDocument(false)
}
)
}

const performBulkChunkOperation = async (
const performBulkChunkOperation = (
operation: 'enable' | 'disable' | 'delete',
chunks: ChunkData[]
) => {
if (chunks.length === 0) return

try {
setIsBulkOperating(true)

const response = await fetch(
`/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks`,
{
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
operation,
chunkIds: chunks.map((chunk) => chunk.id),
}),
}
)

if (!response.ok) {
throw new Error(`Failed to ${operation} chunks`)
bulkChunkMutation(
{
knowledgeBaseId,
documentId,
operation,
chunkIds: chunks.map((chunk) => chunk.id),
},
{
onSuccess: (result) => {
if (operation === 'delete') {
refreshChunks()
} else {
result.results.forEach((opResult) => {
if (opResult.operation === operation) {
opResult.chunkIds.forEach((chunkId: string) => {
updateChunk(chunkId, { enabled: operation === 'enable' })
})
}
})
}
logger.info(`Successfully ${operation}d ${result.successCount} chunks`)
setSelectedChunks(new Set())
},
}

const result = await response.json()

if (result.success) {
if (operation === 'delete') {
await refreshChunks()
} else {
result.data.results.forEach((opResult: any) => {
if (opResult.operation === operation) {
opResult.chunkIds.forEach((chunkId: string) => {
updateChunk(chunkId, { enabled: operation === 'enable' })
})
}
})
}

logger.info(`Successfully ${operation}d ${result.data.successCount} chunks`)
}

setSelectedChunks(new Set())
} catch (err) {
logger.error(`Error ${operation}ing chunks:`, err)
} finally {
setIsBulkOperating(false)
}
)
}

const handleBulkEnable = async () => {
const handleBulkEnable = () => {
const chunksToEnable = displayChunks.filter(
(chunk) => selectedChunks.has(chunk.id) && !chunk.enabled
)
await performBulkChunkOperation('enable', chunksToEnable)
performBulkChunkOperation('enable', chunksToEnable)
}

const handleBulkDisable = async () => {
const handleBulkDisable = () => {
const chunksToDisable = displayChunks.filter(
(chunk) => selectedChunks.has(chunk.id) && chunk.enabled
)
await performBulkChunkOperation('disable', chunksToDisable)
performBulkChunkOperation('disable', chunksToDisable)
}

const handleBulkDelete = async () => {
const handleBulkDelete = () => {
const chunksToDelete = displayChunks.filter((chunk) => selectedChunks.has(chunk.id))
await performBulkChunkOperation('delete', chunksToDelete)
performBulkChunkOperation('delete', chunksToDelete)
}

const selectedChunksList = displayChunks.filter((chunk) => selectedChunks.has(chunk.id))
@@ -2,7 +2,6 @@

import { useCallback, useEffect, useRef, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { format } from 'date-fns'
import {
AlertCircle,
@@ -62,7 +61,12 @@ import {
type TagDefinition,
useKnowledgeBaseTagDefinitions,
} from '@/hooks/kb/use-knowledge-base-tag-definitions'
import { knowledgeKeys } from '@/hooks/queries/knowledge'
import {
useBulkDocumentOperation,
useDeleteDocument,
useDeleteKnowledgeBase,
useUpdateDocument,
} from '@/hooks/queries/knowledge'

const logger = createLogger('KnowledgeBase')

@@ -407,12 +411,17 @@ export function KnowledgeBase({
id,
knowledgeBaseName: passedKnowledgeBaseName,
}: KnowledgeBaseProps) {
const queryClient = useQueryClient()
const params = useParams()
const workspaceId = params.workspaceId as string
const { removeKnowledgeBase } = useKnowledgeBasesList(workspaceId, { enabled: false })
const userPermissions = useUserPermissionsContext()

const { mutate: updateDocumentMutation } = useUpdateDocument()
const { mutate: deleteDocumentMutation } = useDeleteDocument()
const { mutate: deleteKnowledgeBaseMutation, isPending: isDeleting } =
useDeleteKnowledgeBase(workspaceId)
const { mutate: bulkDocumentMutation, isPending: isBulkOperating } = useBulkDocumentOperation()

const [searchQuery, setSearchQuery] = useState('')
const [showTagsModal, setShowTagsModal] = useState(false)

@@ -427,8 +436,6 @@ export function KnowledgeBase({
const [selectedDocuments, setSelectedDocuments] = useState<Set<string>>(new Set())
const [showDeleteDialog, setShowDeleteDialog] = useState(false)
const [showAddDocumentsModal, setShowAddDocumentsModal] = useState(false)
const [isDeleting, setIsDeleting] = useState(false)
const [isBulkOperating, setIsBulkOperating] = useState(false)
const [showDeleteDocumentModal, setShowDeleteDocumentModal] = useState(false)
const [documentToDelete, setDocumentToDelete] = useState<string | null>(null)
const [showBulkDeleteModal, setShowBulkDeleteModal] = useState(false)
@@ -550,7 +557,7 @@ export function KnowledgeBase({
/**
* Checks for documents with stale processing states and marks them as failed
*/
const checkForDeadProcesses = async () => {
const checkForDeadProcesses = () => {
const now = new Date()
const DEAD_PROCESS_THRESHOLD_MS = 600 * 1000 // 10 minutes

@@ -567,116 +574,79 @@ export function KnowledgeBase({

logger.warn(`Found ${staleDocuments.length} documents with dead processes`)

const markFailedPromises = staleDocuments.map(async (doc) => {
try {
const response = await fetch(`/api/knowledge/${id}/documents/${doc.id}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
staleDocuments.forEach((doc) => {
updateDocumentMutation(
{
knowledgeBaseId: id,
documentId: doc.id,
updates: { markFailedDueToTimeout: true },
},
{
onSuccess: () => {
logger.info(`Successfully marked dead process as failed for document: ${doc.filename}`)
},
body: JSON.stringify({
markFailedDueToTimeout: true,
}),
})

if (!response.ok) {
const errorData = await response.json().catch(() => ({ error: 'Unknown error' }))
logger.error(`Failed to mark document ${doc.id} as failed: ${errorData.error}`)
return
}

const result = await response.json()
if (result.success) {
logger.info(`Successfully marked dead process as failed for document: ${doc.filename}`)
}
} catch (error) {
logger.error(`Error marking document ${doc.id} as failed:`, error)
}
)
})

await Promise.allSettled(markFailedPromises)
}

const handleToggleEnabled = async (docId: string) => {
const handleToggleEnabled = (docId: string) => {
const document = documents.find((doc) => doc.id === docId)
if (!document) return

const newEnabled = !document.enabled

// Optimistic update
updateDocument(docId, { enabled: newEnabled })

try {
const response = await fetch(`/api/knowledge/${id}/documents/${docId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
updateDocumentMutation(
{
knowledgeBaseId: id,
documentId: docId,
updates: { enabled: newEnabled },
},
{
onError: () => {
// Rollback on error
updateDocument(docId, { enabled: !newEnabled })
},
body: JSON.stringify({
enabled: newEnabled,
}),
})

if (!response.ok) {
throw new Error('Failed to update document')
}

const result = await response.json()

if (!result.success) {
updateDocument(docId, { enabled: !newEnabled })
}
} catch (err) {
updateDocument(docId, { enabled: !newEnabled })
logger.error('Error updating document:', err)
}
)
}

/**
* Handles retrying a failed document processing
*/
const handleRetryDocument = async (docId: string) => {
try {
updateDocument(docId, {
processingStatus: 'pending',
processingError: null,
processingStartedAt: null,
processingCompletedAt: null,
})
const handleRetryDocument = (docId: string) => {
// Optimistic update
updateDocument(docId, {
processingStatus: 'pending',
processingError: null,
processingStartedAt: null,
processingCompletedAt: null,
})

const response = await fetch(`/api/knowledge/${id}/documents/${docId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
updateDocumentMutation(
{
knowledgeBaseId: id,
documentId: docId,
updates: { retryProcessing: true },
},
{
onSuccess: () => {
refreshDocuments()
logger.info(`Document retry initiated successfully for: ${docId}`)
},
onError: (err) => {
logger.error('Error retrying document:', err)
updateDocument(docId, {
processingStatus: 'failed',
processingError:
err instanceof Error ? err.message : 'Failed to retry document processing',
})
},
body: JSON.stringify({
retryProcessing: true,
}),
})

if (!response.ok) {
throw new Error('Failed to retry document processing')
}

const result = await response.json()

if (!result.success) {
throw new Error(result.error || 'Failed to retry document processing')
}

await refreshDocuments()

logger.info(`Document retry initiated successfully for: ${docId}`)
} catch (err) {
logger.error('Error retrying document:', err)
const currentDoc = documents.find((doc) => doc.id === docId)
if (currentDoc) {
updateDocument(docId, {
processingStatus: 'failed',
processingError:
err instanceof Error ? err.message : 'Failed to retry document processing',
})
}
}
)
}

/**
@@ -694,43 +664,32 @@ export function KnowledgeBase({
const currentDoc = documents.find((doc) => doc.id === documentId)
const previousName = currentDoc?.filename

// Optimistic update
updateDocument(documentId, { filename: newName })
queryClient.setQueryData<DocumentData>(knowledgeKeys.document(id, documentId), (previous) =>
previous ? { ...previous, filename: newName } : previous
)

try {
const response = await fetch(`/api/knowledge/${id}/documents/${documentId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
return new Promise<void>((resolve, reject) => {
updateDocumentMutation(
{
knowledgeBaseId: id,
documentId,
updates: { filename: newName },
},
body: JSON.stringify({ filename: newName }),
})

if (!response.ok) {
const result = await response.json()
throw new Error(result.error || 'Failed to rename document')
}

const result = await response.json()

if (!result.success) {
throw new Error(result.error || 'Failed to rename document')
}

logger.info(`Document renamed: ${documentId}`)
} catch (err) {
if (previousName !== undefined) {
updateDocument(documentId, { filename: previousName })
queryClient.setQueryData<DocumentData>(
knowledgeKeys.document(id, documentId),
(previous) => (previous ? { ...previous, filename: previousName } : previous)
)
}
logger.error('Error renaming document:', err)
throw err
}
{
onSuccess: () => {
logger.info(`Document renamed: ${documentId}`)
resolve()
},
onError: (err) => {
// Rollback on error
if (previousName !== undefined) {
updateDocument(documentId, { filename: previousName })
}
logger.error('Error renaming document:', err)
reject(err)
},
}
)
})
}

/**
@@ -744,35 +703,26 @@ export function KnowledgeBase({
/**
* Confirms and executes the deletion of a single document
*/
const confirmDeleteDocument = async () => {
const confirmDeleteDocument = () => {
if (!documentToDelete) return

try {
const response = await fetch(`/api/knowledge/${id}/documents/${documentToDelete}`, {
method: 'DELETE',
})

if (!response.ok) {
throw new Error('Failed to delete document')
deleteDocumentMutation(
{ knowledgeBaseId: id, documentId: documentToDelete },
{
onSuccess: () => {
refreshDocuments()
setSelectedDocuments((prev) => {
const newSet = new Set(prev)
newSet.delete(documentToDelete)
return newSet
})
},
onSettled: () => {
setShowDeleteDocumentModal(false)
setDocumentToDelete(null)
},
}

const result = await response.json()

if (result.success) {
refreshDocuments()

setSelectedDocuments((prev) => {
const newSet = new Set(prev)
newSet.delete(documentToDelete)
return newSet
})
}
} catch (err) {
logger.error('Error deleting document:', err)
} finally {
setShowDeleteDocumentModal(false)
setDocumentToDelete(null)
}
)
}

/**
@@ -818,32 +768,18 @@ export function KnowledgeBase({
/**
* Handles deleting the entire knowledge base
*/
const handleDeleteKnowledgeBase = async () => {
const handleDeleteKnowledgeBase = () => {
if (!knowledgeBase) return

try {
setIsDeleting(true)

const response = await fetch(`/api/knowledge/${id}`, {
method: 'DELETE',
})

if (!response.ok) {
throw new Error('Failed to delete knowledge base')
deleteKnowledgeBaseMutation(
{ knowledgeBaseId: id },
{
onSuccess: () => {
removeKnowledgeBase(id)
router.push(`/workspace/${workspaceId}/knowledge`)
},
}

const result = await response.json()

if (result.success) {
removeKnowledgeBase(id)
router.push(`/workspace/${workspaceId}/knowledge`)
} else {
throw new Error(result.error || 'Failed to delete knowledge base')
}
} catch (err) {
logger.error('Error deleting knowledge base:', err)
setIsDeleting(false)
}
)
}

/**
@@ -856,93 +792,57 @@ export function KnowledgeBase({
/**
* Handles bulk enabling of selected documents
*/
const handleBulkEnable = async () => {
const handleBulkEnable = () => {
const documentsToEnable = documents.filter(
(doc) => selectedDocuments.has(doc.id) && !doc.enabled
)

if (documentsToEnable.length === 0) return

try {
setIsBulkOperating(true)

const response = await fetch(`/api/knowledge/${id}/documents`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
bulkDocumentMutation(
{
knowledgeBaseId: id,
operation: 'enable',
documentIds: documentsToEnable.map((doc) => doc.id),
},
{
onSuccess: (result) => {
result.updatedDocuments?.forEach((updatedDoc) => {
updateDocument(updatedDoc.id, { enabled: updatedDoc.enabled })
})
logger.info(`Successfully enabled ${result.successCount} documents`)
setSelectedDocuments(new Set())
},
body: JSON.stringify({
operation: 'enable',
documentIds: documentsToEnable.map((doc) => doc.id),
}),
})

if (!response.ok) {
throw new Error('Failed to enable documents')
}

const result = await response.json()

if (result.success) {
result.data.updatedDocuments.forEach((updatedDoc: { id: string; enabled: boolean }) => {
updateDocument(updatedDoc.id, { enabled: updatedDoc.enabled })
})

logger.info(`Successfully enabled ${result.data.successCount} documents`)
}

setSelectedDocuments(new Set())
} catch (err) {
logger.error('Error enabling documents:', err)
} finally {
setIsBulkOperating(false)
}
)
}

/**
* Handles bulk disabling of selected documents
*/
const handleBulkDisable = async () => {
const handleBulkDisable = () => {
const documentsToDisable = documents.filter(
(doc) => selectedDocuments.has(doc.id) && doc.enabled
)

if (documentsToDisable.length === 0) return

try {
setIsBulkOperating(true)

const response = await fetch(`/api/knowledge/${id}/documents`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
bulkDocumentMutation(
{
knowledgeBaseId: id,
operation: 'disable',
documentIds: documentsToDisable.map((doc) => doc.id),
},
{
onSuccess: (result) => {
result.updatedDocuments?.forEach((updatedDoc) => {
updateDocument(updatedDoc.id, { enabled: updatedDoc.enabled })
})
logger.info(`Successfully disabled ${result.successCount} documents`)
setSelectedDocuments(new Set())
},
body: JSON.stringify({
operation: 'disable',
documentIds: documentsToDisable.map((doc) => doc.id),
}),
})

if (!response.ok) {
throw new Error('Failed to disable documents')
}

const result = await response.json()

if (result.success) {
result.data.updatedDocuments.forEach((updatedDoc: { id: string; enabled: boolean }) => {
updateDocument(updatedDoc.id, { enabled: updatedDoc.enabled })
})

logger.info(`Successfully disabled ${result.data.successCount} documents`)
}

setSelectedDocuments(new Set())
} catch (err) {
logger.error('Error disabling documents:', err)
} finally {
setIsBulkOperating(false)
}
)
}

/**
@@ -956,44 +856,28 @@ export function KnowledgeBase({
/**
* Confirms and executes the bulk deletion of selected documents
*/
const confirmBulkDelete = async () => {
const confirmBulkDelete = () => {
const documentsToDelete = documents.filter((doc) => selectedDocuments.has(doc.id))

if (documentsToDelete.length === 0) return

try {
setIsBulkOperating(true)

const response = await fetch(`/api/knowledge/${id}/documents`, {
method: 'PATCH',
headers: {
'Content-Type': 'application/json',
bulkDocumentMutation(
{
knowledgeBaseId: id,
operation: 'delete',
documentIds: documentsToDelete.map((doc) => doc.id),
},
{
onSuccess: (result) => {
logger.info(`Successfully deleted ${result.successCount} documents`)
refreshDocuments()
setSelectedDocuments(new Set())
},
onSettled: () => {
setShowBulkDeleteModal(false)
},
body: JSON.stringify({
operation: 'delete',
documentIds: documentsToDelete.map((doc) => doc.id),
}),
})

if (!response.ok) {
throw new Error('Failed to delete documents')
}

const result = await response.json()

if (result.success) {
logger.info(`Successfully deleted ${result.data.successCount} documents`)
}

await refreshDocuments()

setSelectedDocuments(new Set())
} catch (err) {
logger.error('Error deleting documents:', err)
} finally {
setIsBulkOperating(false)
setShowBulkDeleteModal(false)
}
)
}

const selectedDocumentsList = documents.filter((doc) => selectedDocuments.has(doc.id))
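One detail worth noting in the rename hunk above: the handler keeps its promise-based contract by wrapping the callback-style mutate in new Promise<void>. TanStack Query's mutateAsync provides that contract directly; a sketch of the equivalent shape, with the optimistic update and rollback elided:

// Sketch: mutateAsync returns a promise that rejects on error, so the
// manual Promise wrapper is only needed when the callback-style mutate
// is preferred for its onSuccess/onError/onSettled options.
const { mutateAsync: updateDocumentAsync } = useUpdateDocument()

const handleRenameDocument = async (documentId: string, newName: string) => {
  await updateDocumentAsync({
    knowledgeBaseId: id, // `id` is the knowledge base id from component scope
    documentId,
    updates: { filename: newName },
  })
  logger.info(`Document renamed: ${documentId}`)
}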
@@ -22,10 +22,10 @@ import {
type TagDefinition,
useKnowledgeBaseTagDefinitions,
} from '@/hooks/kb/use-knowledge-base-tag-definitions'
import { useCreateTagDefinition, useDeleteTagDefinition } from '@/hooks/queries/knowledge'

const logger = createLogger('BaseTagsModal')

/** Field type display labels */
const FIELD_TYPE_LABELS: Record<string, string> = {
text: 'Text',
number: 'Number',
@@ -45,7 +45,6 @@ interface DocumentListProps {
totalCount: number
}

/** Displays a list of documents affected by tag operations */
function DocumentList({ documents, totalCount }: DocumentListProps) {
const displayLimit = 5
const hasMore = totalCount > displayLimit
@@ -95,13 +94,14 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
const { tagDefinitions: kbTagDefinitions, fetchTagDefinitions: refreshTagDefinitions } =
useKnowledgeBaseTagDefinitions(knowledgeBaseId)

const createTagMutation = useCreateTagDefinition()
const deleteTagMutation = useDeleteTagDefinition()

const [deleteTagDialogOpen, setDeleteTagDialogOpen] = useState(false)
const [selectedTag, setSelectedTag] = useState<TagDefinition | null>(null)
const [viewDocumentsDialogOpen, setViewDocumentsDialogOpen] = useState(false)
const [isDeletingTag, setIsDeletingTag] = useState(false)
const [tagUsageData, setTagUsageData] = useState<TagUsageData[]>([])
const [isCreatingTag, setIsCreatingTag] = useState(false)
const [isSavingTag, setIsSavingTag] = useState(false)
const [createTagForm, setCreateTagForm] = useState({
displayName: '',
fieldType: 'text',
@@ -177,13 +177,12 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
}

const tagNameConflict =
isCreatingTag && !isSavingTag && hasTagNameConflict(createTagForm.displayName)
isCreatingTag && !createTagMutation.isPending && hasTagNameConflict(createTagForm.displayName)

const canSaveTag = () => {
return createTagForm.displayName.trim() && !hasTagNameConflict(createTagForm.displayName)
}

/** Get slot usage counts per field type */
const getSlotUsageByFieldType = (fieldType: string): { used: number; max: number } => {
const config = TAG_SLOT_CONFIG[fieldType as keyof typeof TAG_SLOT_CONFIG]
if (!config) return { used: 0, max: 0 }
@@ -191,13 +190,11 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
return { used, max: config.maxSlots }
}

/** Check if a field type has available slots */
const hasAvailableSlots = (fieldType: string): boolean => {
const { used, max } = getSlotUsageByFieldType(fieldType)
return used < max
}

/** Field type options for Combobox */
const fieldTypeOptions: ComboboxOption[] = useMemo(() => {
return SUPPORTED_FIELD_TYPES.filter((type) => hasAvailableSlots(type)).map((type) => {
const { used, max } = getSlotUsageByFieldType(type)
@@ -211,43 +208,17 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
const saveTagDefinition = async () => {
if (!canSaveTag()) return

setIsSavingTag(true)
try {
// Check if selected field type has available slots
if (!hasAvailableSlots(createTagForm.fieldType)) {
throw new Error(`No available slots for ${createTagForm.fieldType} type`)
}

// Get the next available slot from the API
const slotResponse = await fetch(
`/api/knowledge/${knowledgeBaseId}/next-available-slot?fieldType=${createTagForm.fieldType}`
)
if (!slotResponse.ok) {
throw new Error('Failed to get available slot')
}
const slotResult = await slotResponse.json()
if (!slotResult.success || !slotResult.data?.nextAvailableSlot) {
throw new Error('No available tag slots for this field type')
}

const newTagDefinition = {
tagSlot: slotResult.data.nextAvailableSlot,
await createTagMutation.mutateAsync({
knowledgeBaseId,
displayName: createTagForm.displayName.trim(),
fieldType: createTagForm.fieldType,
}

const response = await fetch(`/api/knowledge/${knowledgeBaseId}/tag-definitions`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(newTagDefinition),
})

if (!response.ok) {
throw new Error('Failed to create tag definition')
}

await Promise.all([refreshTagDefinitions(), fetchTagUsage()])

setCreateTagForm({
@@ -257,27 +228,17 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
setIsCreatingTag(false)
} catch (error) {
logger.error('Error creating tag definition:', error)
} finally {
setIsSavingTag(false)
}
}

const confirmDeleteTag = async () => {
if (!selectedTag) return

setIsDeletingTag(true)
try {
const response = await fetch(
`/api/knowledge/${knowledgeBaseId}/tag-definitions/${selectedTag.id}`,
{
method: 'DELETE',
}
)

if (!response.ok) {
const errorText = await response.text()
throw new Error(`Failed to delete tag definition: ${response.status} ${errorText}`)
}
await deleteTagMutation.mutateAsync({
knowledgeBaseId,
tagDefinitionId: selectedTag.id,
})

await Promise.all([refreshTagDefinitions(), fetchTagUsage()])

@@ -285,8 +246,6 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
setSelectedTag(null)
} catch (error) {
logger.error('Error deleting tag definition:', error)
} finally {
setIsDeletingTag(false)
}
}

@@ -433,11 +392,11 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
className='flex-1'
disabled={
!canSaveTag() ||
isSavingTag ||
createTagMutation.isPending ||
!hasAvailableSlots(createTagForm.fieldType)
}
>
{isSavingTag ? 'Creating...' : 'Create Tag'}
{createTagMutation.isPending ? 'Creating...' : 'Create Tag'}
</Button>
</div>
</div>
@@ -481,13 +440,17 @@ export function BaseTagsModal({ open, onOpenChange, knowledgeBaseId }: BaseTagsM
<ModalFooter>
<Button
variant='default'
disabled={isDeletingTag}
disabled={deleteTagMutation.isPending}
onClick={() => setDeleteTagDialogOpen(false)}
>
Cancel
</Button>
<Button variant='destructive' onClick={confirmDeleteTag} disabled={isDeletingTag}>
{isDeletingTag ? <>Deleting...</> : 'Delete Tag'}
<Button
variant='destructive'
onClick={confirmDeleteTag}
disabled={deleteTagMutation.isPending}
>
{deleteTagMutation.isPending ? 'Deleting...' : 'Delete Tag'}
</Button>
</ModalFooter>
</ModalContent>
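A recurring simplification across these hunks: local isSavingTag/isDeletingTag state is dropped in favor of the mutation's own isPending flag, which TanStack Query keeps in sync for the lifetime of the request. In isolation the pattern looks like this sketch (CreateTagParams is a placeholder name):

// Sketch: the mutation object tracks its in-flight state, so no
// useState bookkeeping is needed to disable the button.
function CreateTagButton({ params }: { params: CreateTagParams }) {
  const createTagMutation = useCreateTagDefinition()
  return (
    <Button
      disabled={createTagMutation.isPending}
      onClick={() => createTagMutation.mutate(params)}
    >
      {createTagMutation.isPending ? 'Creating...' : 'Create Tag'}
    </Button>
  )
}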
@@ -3,7 +3,6 @@
import { useEffect, useRef, useState } from 'react'
import { zodResolver } from '@hookform/resolvers/zod'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { Loader2, RotateCcw, X } from 'lucide-react'
import { useParams } from 'next/navigation'
import { useForm } from 'react-hook-form'
@@ -23,7 +22,7 @@ import { cn } from '@/lib/core/utils/cn'
import { formatFileSize, validateKnowledgeBaseFile } from '@/lib/uploads/utils/file-utils'
import { ACCEPT_ATTRIBUTE } from '@/lib/uploads/utils/validation'
import { useKnowledgeUpload } from '@/app/workspace/[workspaceId]/knowledge/hooks/use-knowledge-upload'
import { knowledgeKeys } from '@/hooks/queries/knowledge'
import { useCreateKnowledgeBase, useDeleteKnowledgeBase } from '@/hooks/queries/knowledge'

const logger = createLogger('CreateBaseModal')

@@ -82,10 +81,11 @@ interface SubmitStatus {
export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {
const params = useParams()
const workspaceId = params.workspaceId as string
const queryClient = useQueryClient()

const createKnowledgeBaseMutation = useCreateKnowledgeBase(workspaceId)
const deleteKnowledgeBaseMutation = useDeleteKnowledgeBase(workspaceId)

const fileInputRef = useRef<HTMLInputElement>(null)
const [isSubmitting, setIsSubmitting] = useState(false)
const [submitStatus, setSubmitStatus] = useState<SubmitStatus | null>(null)
const [files, setFiles] = useState<FileWithPreview[]>([])
const [fileError, setFileError] = useState<string | null>(null)
@@ -245,12 +245,14 @@ export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {
})
}

const isSubmitting =
createKnowledgeBaseMutation.isPending || deleteKnowledgeBaseMutation.isPending || isUploading

const onSubmit = async (data: FormValues) => {
setIsSubmitting(true)
setSubmitStatus(null)

try {
const knowledgeBasePayload = {
const newKnowledgeBase = await createKnowledgeBaseMutation.mutateAsync({
name: data.name,
description: data.description || undefined,
workspaceId: workspaceId,
@@ -259,29 +261,8 @@ export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {
minSize: data.minChunkSize,
overlap: data.overlapSize,
},
}

const response = await fetch('/api/knowledge', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(knowledgeBasePayload),
})

if (!response.ok) {
const errorData = await response.json()
throw new Error(errorData.error || 'Failed to create knowledge base')
}

const result = await response.json()

if (!result.success) {
throw new Error(result.error || 'Failed to create knowledge base')
}

const newKnowledgeBase = result.data

if (files.length > 0) {
try {
const uploadedFiles = await uploadFiles(files, newKnowledgeBase.id, {
@@ -293,15 +274,11 @@ export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {

logger.info(`Successfully uploaded ${uploadedFiles.length} files`)
logger.info(`Started processing ${uploadedFiles.length} documents in the background`)

await queryClient.invalidateQueries({
queryKey: knowledgeKeys.list(workspaceId),
})
} catch (uploadError) {
logger.error('File upload failed, deleting knowledge base:', uploadError)
try {
await fetch(`/api/knowledge/${newKnowledgeBase.id}`, {
method: 'DELETE',
await deleteKnowledgeBaseMutation.mutateAsync({
knowledgeBaseId: newKnowledgeBase.id,
})
logger.info(`Deleted orphaned knowledge base: ${newKnowledgeBase.id}`)
} catch (deleteError) {
@@ -309,10 +286,6 @@ export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {
}
throw uploadError
}
} else {
await queryClient.invalidateQueries({
queryKey: knowledgeKeys.list(workspaceId),
})
}

files.forEach((file) => URL.revokeObjectURL(file.preview))
@@ -325,8 +298,6 @@ export function CreateBaseModal({ open, onOpenChange }: CreateBaseModalProps) {
type: 'error',
message: error instanceof Error ? error.message : 'An unknown error occurred',
})
} finally {
setIsSubmitting(false)
}
}
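The submit flow above is a create-then-compensate sequence: the knowledge base is created first, and if the follow-up file upload fails, the freshly created base is deleted so no orphan is left behind. The pattern in isolation, as a sketch using the mutation names from the hunk (CreateKbPayload is a placeholder type):

// Sketch of the compensating-delete flow from the hunk above.
async function createWithFiles(payload: CreateKbPayload, files: FileWithPreview[]) {
  const newKnowledgeBase = await createKnowledgeBaseMutation.mutateAsync(payload)
  try {
    await uploadFiles(files, newKnowledgeBase.id, { /* upload options elided */ })
  } catch (uploadError) {
    // Compensate: remove the orphaned knowledge base before rethrowing.
    await deleteKnowledgeBaseMutation.mutateAsync({
      knowledgeBaseId: newKnowledgeBase.id,
    })
    throw uploadError
  }
  return newKnowledgeBase
}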
@@ -2,7 +2,6 @@

import { useEffect, useState } from 'react'
import { createLogger } from '@sim/logger'
import { useQueryClient } from '@tanstack/react-query'
import { AlertTriangle, ChevronDown, LibraryBig, MoreHorizontal } from 'lucide-react'
import Link from 'next/link'
import {
@@ -15,7 +14,7 @@ import {
} from '@/components/emcn'
import { Trash } from '@/components/emcn/icons/trash'
import { filterButtonClass } from '@/app/workspace/[workspaceId]/knowledge/components/constants'
import { knowledgeKeys } from '@/hooks/queries/knowledge'
import { useUpdateKnowledgeBase } from '@/hooks/queries/knowledge'

const logger = createLogger('KnowledgeHeader')

@@ -54,14 +53,13 @@ interface Workspace {
}

export function KnowledgeHeader({ breadcrumbs, options }: KnowledgeHeaderProps) {
const queryClient = useQueryClient()
const [isActionsPopoverOpen, setIsActionsPopoverOpen] = useState(false)
const [isWorkspacePopoverOpen, setIsWorkspacePopoverOpen] = useState(false)
const [workspaces, setWorkspaces] = useState<Workspace[]>([])
const [isLoadingWorkspaces, setIsLoadingWorkspaces] = useState(false)
const [isUpdatingWorkspace, setIsUpdatingWorkspace] = useState(false)

// Fetch available workspaces
const updateKnowledgeBase = useUpdateKnowledgeBase()

useEffect(() => {
if (!options?.knowledgeBaseId) return

@@ -76,7 +74,6 @@ export function KnowledgeHeader({ breadcrumbs, options }: KnowledgeHeaderProps)

const data = await response.json()

// Filter workspaces where user has write/admin permissions
const availableWorkspaces = data.workspaces
.filter((ws: any) => ws.permissions === 'write' || ws.permissions === 'admin')
.map((ws: any) => ({
@@ -97,47 +94,27 @@ export function KnowledgeHeader({ breadcrumbs, options }: KnowledgeHeaderProps)
}, [options?.knowledgeBaseId])

const handleWorkspaceChange = async (workspaceId: string | null) => {
if (isUpdatingWorkspace || !options?.knowledgeBaseId) return
if (updateKnowledgeBase.isPending || !options?.knowledgeBaseId) return

try {
setIsUpdatingWorkspace(true)
setIsWorkspacePopoverOpen(false)
setIsWorkspacePopoverOpen(false)

const response = await fetch(`/api/knowledge/${options.knowledgeBaseId}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
updateKnowledgeBase.mutate(
{
knowledgeBaseId: options.knowledgeBaseId,
updates: { workspaceId },
},
{
onSuccess: () => {
logger.info(
`Knowledge base workspace updated: ${options.knowledgeBaseId} -> ${workspaceId}`
)
options.onWorkspaceChange?.(workspaceId)
},
onError: (err) => {
logger.error('Error updating workspace:', err)
},
body: JSON.stringify({
workspaceId,
}),
})

if (!response.ok) {
const result = await response.json()
throw new Error(result.error || 'Failed to update workspace')
}

const result = await response.json()

if (result.success) {
logger.info(
`Knowledge base workspace updated: ${options.knowledgeBaseId} -> ${workspaceId}`
)

await queryClient.invalidateQueries({
queryKey: knowledgeKeys.detail(options.knowledgeBaseId),
})

await options.onWorkspaceChange?.(workspaceId)
} else {
throw new Error(result.error || 'Failed to update workspace')
}
} catch (err) {
logger.error('Error updating workspace:', err)
} finally {
setIsUpdatingWorkspace(false)
}
)
}

const currentWorkspace = workspaces.find((ws) => ws.id === options?.currentWorkspaceId)
@@ -147,7 +124,6 @@ export function KnowledgeHeader({ breadcrumbs, options }: KnowledgeHeaderProps)
<div className={HEADER_STYLES.container}>
<div className={HEADER_STYLES.breadcrumbs}>
{breadcrumbs.map((breadcrumb, index) => {
// Use unique identifier when available, fallback to content-based key
const key = breadcrumb.id || `${breadcrumb.label}-${breadcrumb.href || index}`

return (
@@ -189,13 +165,13 @@ export function KnowledgeHeader({ breadcrumbs, options }: KnowledgeHeaderProps)
<PopoverTrigger asChild>
<Button
variant='outline'
disabled={isLoadingWorkspaces || isUpdatingWorkspace}
disabled={isLoadingWorkspaces || updateKnowledgeBase.isPending}
className={filterButtonClass}
>
<span className='truncate'>
{isLoadingWorkspaces
? 'Loading...'
: isUpdatingWorkspace
: updateKnowledgeBase.isPending
? 'Updating...'
: currentWorkspace?.name || 'No workspace'}
</span>
@@ -32,6 +32,7 @@ import {
import { useUserPermissionsContext } from '@/app/workspace/[workspaceId]/providers/workspace-permissions-provider'
import { useContextMenu } from '@/app/workspace/[workspaceId]/w/components/sidebar/hooks'
import { useKnowledgeBasesList } from '@/hooks/kb/use-knowledge'
import { useDeleteKnowledgeBase, useUpdateKnowledgeBase } from '@/hooks/queries/knowledge'
import { useDebounce } from '@/hooks/use-debounce'

const logger = createLogger('Knowledge')
@@ -51,10 +52,12 @@ export function Knowledge() {
const params = useParams()
const workspaceId = params.workspaceId as string

const { knowledgeBases, isLoading, error, removeKnowledgeBase, updateKnowledgeBase } =
useKnowledgeBasesList(workspaceId)
const { knowledgeBases, isLoading, error } = useKnowledgeBasesList(workspaceId)
const userPermissions = useUserPermissionsContext()

const { mutateAsync: updateKnowledgeBaseMutation } = useUpdateKnowledgeBase(workspaceId)
const { mutateAsync: deleteKnowledgeBaseMutation } = useDeleteKnowledgeBase(workspaceId)

const [searchQuery, setSearchQuery] = useState('')
const debouncedSearchQuery = useDebounce(searchQuery, 300)
const [isCreateModalOpen, setIsCreateModalOpen] = useState(false)
@@ -112,29 +115,13 @@ export function Knowledge() {
*/
const handleUpdateKnowledgeBase = useCallback(
async (id: string, name: string, description: string) => {
const response = await fetch(`/api/knowledge/${id}`, {
method: 'PUT',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ name, description }),
await updateKnowledgeBaseMutation({
knowledgeBaseId: id,
updates: { name, description },
})

if (!response.ok) {
const result = await response.json()
throw new Error(result.error || 'Failed to update knowledge base')
}

const result = await response.json()

if (result.success) {
logger.info(`Knowledge base updated: ${id}`)
updateKnowledgeBase(id, { name, description })
} else {
throw new Error(result.error || 'Failed to update knowledge base')
}
logger.info(`Knowledge base updated: ${id}`)
},
[updateKnowledgeBase]
[updateKnowledgeBaseMutation]
)

/**
@@ -142,25 +129,10 @@ export function Knowledge() {
*/
const handleDeleteKnowledgeBase = useCallback(
async (id: string) => {
const response = await fetch(`/api/knowledge/${id}`, {
method: 'DELETE',
})

if (!response.ok) {
const result = await response.json()
throw new Error(result.error || 'Failed to delete knowledge base')
}

const result = await response.json()

if (result.success) {
logger.info(`Knowledge base deleted: ${id}`)
removeKnowledgeBase(id)
} else {
throw new Error(result.error || 'Failed to delete knowledge base')
}
await deleteKnowledgeBaseMutation({ knowledgeBaseId: id })
logger.info(`Knowledge base deleted: ${id}`)
},
[removeKnowledgeBase]
[deleteKnowledgeBaseMutation]
)

/**
@@ -191,26 +191,10 @@ export const Copilot = forwardRef<CopilotRef, CopilotProps>(({ panelWidth }, ref
}, [isInitialized, messages.length, scrollToBottom])

/**
* Cleanup on component unmount (page refresh, navigation, etc.)
* Uses a ref to track sending state to avoid stale closure issues
* Note: Parent workflow.tsx also has useStreamCleanup for page-level cleanup
* Note: We intentionally do NOT abort on component unmount.
* Streams continue server-side and can be resumed when user returns.
* The server persists chunks to Redis for resumption.
*/
const isSendingRef = useRef(isSendingMessage)
isSendingRef.current = isSendingMessage
const abortMessageRef = useRef(abortMessage)
abortMessageRef.current = abortMessage

useEffect(() => {
return () => {
// Use refs to check current values, not stale closure values
if (isSendingRef.current) {
abortMessageRef.current()
logger.info('Aborted active message streaming due to component unmount')
}
}
// Empty deps - only run cleanup on actual unmount, not on re-renders
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [])

/**
* Container-level click capture to cancel edit mode when clicking outside the current edit area
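The new comment above describes the resumption model: rather than aborting on unmount, the client can check on mount whether the chat still has a live server-side stream and re-attach to it. A hypothetical sketch of that check; getResumableStream and resumeStream are illustrative helper names, not confirmed APIs:

// Hypothetical sketch of a resume-on-mount check. The helper names and
// the response shape are assumptions for illustration only.
useEffect(() => {
  let cancelled = false
  getResumableStream(chatId).then((active) => {
    if (cancelled || !active) return
    // Re-attach from the last chunk the client has already rendered.
    resumeStream(active.streamId, active.chunkCount)
  })
  return () => {
    cancelled = true
  }
}, [chatId])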
@@ -452,39 +452,6 @@ console.log(limits);`
</div>
)}

{/* <div>
<div className='mb-[6.5px] flex items-center justify-between'>
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
URL
</Label>
<Tooltip.Root>
<Tooltip.Trigger asChild>
<Button
variant='ghost'
onClick={() => handleCopy('endpoint', info.endpoint)}
aria-label='Copy endpoint'
className='!p-1.5 -my-1.5'
>
{copied.endpoint ? (
<Check className='h-3 w-3' />
) : (
<Clipboard className='h-3 w-3' />
)}
</Button>
</Tooltip.Trigger>
<Tooltip.Content>
<span>{copied.endpoint ? 'Copied' : 'Copy'}</span>
</Tooltip.Content>
</Tooltip.Root>
</div>
<Code.Viewer
code={info.endpoint}
language='javascript'
wrapText
className='!min-h-0 rounded-[4px] border border-[var(--border-1)]'
/>
</div> */}

<div>
<div className='mb-[6.5px] flex items-center justify-between'>
<Label className='block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
@@ -0,0 +1,260 @@
'use client'

import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import {
Badge,
Button,
Input,
Label,
Modal,
ModalBody,
ModalContent,
ModalFooter,
ModalHeader,
Textarea,
} from '@/components/emcn'
import { normalizeInputFormatValue } from '@/lib/workflows/input-format'
import { isValidStartBlockType } from '@/lib/workflows/triggers/start-block-types'
import type { InputFormatField } from '@/lib/workflows/types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'

type NormalizedField = InputFormatField & { name: string }

interface ApiInfoModalProps {
open: boolean
onOpenChange: (open: boolean) => void
workflowId: string
}

export function ApiInfoModal({ open, onOpenChange, workflowId }: ApiInfoModalProps) {
const blocks = useWorkflowStore((state) => state.blocks)
const setValue = useSubBlockStore((state) => state.setValue)
const subBlockValues = useSubBlockStore((state) =>
workflowId ? (state.workflowValues[workflowId] ?? {}) : {}
)

const workflowMetadata = useWorkflowRegistry((state) =>
workflowId ? state.workflows[workflowId] : undefined
)
const updateWorkflow = useWorkflowRegistry((state) => state.updateWorkflow)

const [description, setDescription] = useState('')
const [paramDescriptions, setParamDescriptions] = useState<Record<string, string>>({})
const [isSaving, setIsSaving] = useState(false)
const [showUnsavedChangesAlert, setShowUnsavedChangesAlert] = useState(false)

const initialDescriptionRef = useRef('')
const initialParamDescriptionsRef = useRef<Record<string, string>>({})

const starterBlockId = useMemo(() => {
for (const [blockId, block] of Object.entries(blocks)) {
if (!block || typeof block !== 'object') continue
const blockType = (block as { type?: string }).type
if (blockType && isValidStartBlockType(blockType)) {
return blockId
}
}
return null
}, [blocks])

const inputFormat = useMemo((): NormalizedField[] => {
if (!starterBlockId) return []

const storeValue = subBlockValues[starterBlockId]?.inputFormat
const normalized = normalizeInputFormatValue(storeValue) as NormalizedField[]
if (normalized.length > 0) return normalized

const startBlock = blocks[starterBlockId]
const blockValue = startBlock?.subBlocks?.inputFormat?.value
return normalizeInputFormatValue(blockValue) as NormalizedField[]
}, [starterBlockId, subBlockValues, blocks])

useEffect(() => {
if (open) {
const normalizedDesc = workflowMetadata?.description?.toLowerCase().trim()
const isDefaultDescription =
!workflowMetadata?.description ||
workflowMetadata.description === workflowMetadata.name ||
normalizedDesc === 'new workflow' ||
normalizedDesc === 'your first workflow - start building here!'

const initialDescription = isDefaultDescription ? '' : workflowMetadata?.description || ''
setDescription(initialDescription)
initialDescriptionRef.current = initialDescription

const descriptions: Record<string, string> = {}
for (const field of inputFormat) {
if (field.description) {
descriptions[field.name] = field.description
}
}
setParamDescriptions(descriptions)
initialParamDescriptionsRef.current = { ...descriptions }
}
}, [open, workflowMetadata, inputFormat])

const hasChanges = useMemo(() => {
if (description.trim() !== initialDescriptionRef.current.trim()) return true

for (const field of inputFormat) {
const currentValue = (paramDescriptions[field.name] || '').trim()
const initialValue = (initialParamDescriptionsRef.current[field.name] || '').trim()
if (currentValue !== initialValue) return true
}

return false
}, [description, paramDescriptions, inputFormat])

const handleParamDescriptionChange = (fieldName: string, value: string) => {
setParamDescriptions((prev) => ({
...prev,
[fieldName]: value,
}))
}

const handleCloseAttempt = useCallback(() => {
if (hasChanges && !isSaving) {
setShowUnsavedChangesAlert(true)
} else {
onOpenChange(false)
}
}, [hasChanges, isSaving, onOpenChange])

const handleDiscardChanges = useCallback(() => {
setShowUnsavedChangesAlert(false)
setDescription(initialDescriptionRef.current)
setParamDescriptions({ ...initialParamDescriptionsRef.current })
onOpenChange(false)
}, [onOpenChange])

const handleSave = useCallback(async () => {
if (!workflowId) return

const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (activeWorkflowId !== workflowId) {
return
}

setIsSaving(true)
try {
if (description.trim() !== (workflowMetadata?.description || '')) {
updateWorkflow(workflowId, { description: description.trim() || 'New workflow' })
}

if (starterBlockId) {
const updatedValue = inputFormat.map((field) => ({
...field,
description: paramDescriptions[field.name]?.trim() || undefined,
}))
setValue(starterBlockId, 'inputFormat', updatedValue)
}

onOpenChange(false)
} finally {
setIsSaving(false)
}
}, [
workflowId,
description,
workflowMetadata,
updateWorkflow,
starterBlockId,
inputFormat,
paramDescriptions,
setValue,
onOpenChange,
])

return (
<>
<Modal open={open} onOpenChange={(openState) => !openState && handleCloseAttempt()}>
<ModalContent className='max-w-[480px]'>
<ModalHeader>
<span>Edit API Info</span>
</ModalHeader>
<ModalBody className='space-y-[12px]'>
<div>
<Label className='mb-[6.5px] block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
Description
</Label>
<Textarea
placeholder='Describe what this workflow API does...'
className='min-h-[80px] resize-none'
value={description}
onChange={(e) => setDescription(e.target.value)}
/>
</div>

{inputFormat.length > 0 && (
<div>
<Label className='mb-[6.5px] block pl-[2px] font-medium text-[13px] text-[var(--text-primary)]'>
Parameters ({inputFormat.length})
</Label>
<div className='flex flex-col gap-[8px]'>
{inputFormat.map((field) => (
<div
key={field.name}
className='overflow-hidden rounded-[4px] border border-[var(--border-1)]'
>
<div className='flex items-center justify-between bg-[var(--surface-4)] px-[10px] py-[5px]'>
<div className='flex min-w-0 flex-1 items-center gap-[8px]'>
<span className='block truncate font-medium text-[14px] text-[var(--text-tertiary)]'>
{field.name}
</span>
<Badge size='sm'>{field.type || 'string'}</Badge>
</div>
</div>
<div className='border-[var(--border-1)] border-t px-[10px] pt-[6px] pb-[10px]'>
<div className='flex flex-col gap-[6px]'>
<Label className='text-[13px]'>Description</Label>
<Input
value={paramDescriptions[field.name] || ''}
onChange={(e) =>
handleParamDescriptionChange(field.name, e.target.value)
}
placeholder={`Enter description for ${field.name}`}
/>
</div>
</div>
</div>
))}
</div>
</div>
)}
</ModalBody>
<ModalFooter>
<Button variant='default' onClick={handleCloseAttempt} disabled={isSaving}>
Cancel
</Button>
<Button variant='tertiary' onClick={handleSave} disabled={isSaving || !hasChanges}>
{isSaving ? 'Saving...' : 'Save'}
</Button>
</ModalFooter>
</ModalContent>
</Modal>

<Modal open={showUnsavedChangesAlert} onOpenChange={setShowUnsavedChangesAlert}>
<ModalContent className='max-w-[400px]'>
<ModalHeader>
<span>Unsaved Changes</span>
</ModalHeader>
<ModalBody>
<p className='text-[14px] text-[var(--text-secondary)]'>
You have unsaved changes. Are you sure you want to discard them?
</p>
</ModalBody>
<ModalFooter>
<Button variant='default' onClick={() => setShowUnsavedChangesAlert(false)}>
Keep Editing
</Button>
<Button variant='destructive' onClick={handleDiscardChanges}>
Discard Changes
</Button>
</ModalFooter>
</ModalContent>
</Modal>
</>
)
}
@@ -43,6 +43,7 @@ import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { A2aDeploy } from './components/a2a/a2a'
import { ApiDeploy } from './components/api/api'
import { ChatDeploy, type ExistingChat } from './components/chat/chat'
import { ApiInfoModal } from './components/general/components/api-info-modal'
import { GeneralDeploy } from './components/general/general'
import { McpDeploy } from './components/mcp/mcp'
import { TemplateDeploy } from './components/template/template'
@@ -110,6 +111,7 @@ export function DeployModal({
  const [chatSuccess, setChatSuccess] = useState(false)

  const [isCreateKeyModalOpen, setIsCreateKeyModalOpen] = useState(false)
  const [isApiInfoModalOpen, setIsApiInfoModalOpen] = useState(false)
  const userPermissions = useUserPermissionsContext()
  const canManageWorkspaceKeys = userPermissions.canAdmin
  const { config: permissionConfig } = usePermissionConfig()
@@ -389,11 +391,6 @@ export function DeployModal({
    form?.requestSubmit()
  }, [])

  const handleA2aFormSubmit = useCallback(() => {
    const form = document.getElementById('a2a-deploy-form') as HTMLFormElement
    form?.requestSubmit()
  }, [])

  const handleA2aPublish = useCallback(() => {
    const form = document.getElementById('a2a-deploy-form')
    const publishTrigger = form?.querySelector('[data-a2a-publish-trigger]') as HTMLButtonElement
@@ -594,7 +591,11 @@ export function DeployModal({
        )}
        {activeTab === 'api' && (
          <ModalFooter className='items-center justify-between'>
            <div />
            <div>
              <Button variant='default' onClick={() => setIsApiInfoModalOpen(true)}>
                Edit API Info
              </Button>
            </div>
            <div className='flex items-center gap-2'>
              <Button
                variant='tertiary'
@@ -880,6 +881,14 @@ export function DeployModal({
        canManageWorkspaceKeys={canManageWorkspaceKeys}
        defaultKeyType={defaultKeyType}
      />

      {workflowId && (
        <ApiInfoModal
          open={isApiInfoModalOpen}
          onOpenChange={setIsApiInfoModalOpen}
          workflowId={workflowId}
        />
      )}
    </>
  )
}

@@ -1,4 +1,4 @@
import { keepPreviousData, useQuery } from '@tanstack/react-query'
import { keepPreviousData, useMutation, useQuery, useQueryClient } from '@tanstack/react-query'
import type {
  ChunkData,
  ChunksPagination,
@@ -332,3 +332,629 @@ export function useDocumentChunkSearchQuery(
    placeholderData: keepPreviousData,
  })
}

export interface UpdateChunkParams {
  knowledgeBaseId: string
  documentId: string
  chunkId: string
  content?: string
  enabled?: boolean
}

export async function updateChunk({
  knowledgeBaseId,
  documentId,
  chunkId,
  content,
  enabled,
}: UpdateChunkParams): Promise<ChunkData> {
  const body: Record<string, unknown> = {}
  if (content !== undefined) body.content = content
  if (enabled !== undefined) body.enabled = enabled

  const response = await fetch(
    `/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks/${chunkId}`,
    {
      method: 'PUT',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(body),
    }
  )

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to update chunk')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to update chunk')
  }

  return result.data
}

export function useUpdateChunk() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: updateChunk,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface DeleteChunkParams {
  knowledgeBaseId: string
  documentId: string
  chunkId: string
}

export async function deleteChunk({
  knowledgeBaseId,
  documentId,
  chunkId,
}: DeleteChunkParams): Promise<void> {
  const response = await fetch(
    `/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks/${chunkId}`,
    { method: 'DELETE' }
  )

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to delete chunk')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to delete chunk')
  }
}

export function useDeleteChunk() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: deleteChunk,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface CreateChunkParams {
  knowledgeBaseId: string
  documentId: string
  content: string
  enabled?: boolean
}

export async function createChunk({
  knowledgeBaseId,
  documentId,
  content,
  enabled = true,
}: CreateChunkParams): Promise<ChunkData> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ content, enabled }),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to create chunk')
  }

  const result = await response.json()
  if (!result?.success || !result?.data) {
    throw new Error(result?.error || 'Failed to create chunk')
  }

  return result.data
}

export function useCreateChunk() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: createChunk,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface UpdateDocumentParams {
  knowledgeBaseId: string
  documentId: string
  updates: {
    enabled?: boolean
    filename?: string
    retryProcessing?: boolean
    markFailedDueToTimeout?: boolean
  }
}

export async function updateDocument({
  knowledgeBaseId,
  documentId,
  updates,
}: UpdateDocumentParams): Promise<DocumentData> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(updates),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to update document')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to update document')
  }

  return result.data
}

export function useUpdateDocument() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: updateDocument,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface DeleteDocumentParams {
  knowledgeBaseId: string
  documentId: string
}

export async function deleteDocument({
  knowledgeBaseId,
  documentId,
}: DeleteDocumentParams): Promise<void> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}`, {
    method: 'DELETE',
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to delete document')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to delete document')
  }
}

export function useDeleteDocument() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: deleteDocument,
    onSuccess: (_, { knowledgeBaseId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
    },
  })
}

export interface BulkDocumentOperationParams {
  knowledgeBaseId: string
  operation: 'enable' | 'disable' | 'delete'
  documentIds: string[]
}

export interface BulkDocumentOperationResult {
  successCount: number
  failedCount: number
  updatedDocuments?: Array<{ id: string; enabled: boolean }>
}

export async function bulkDocumentOperation({
  knowledgeBaseId,
  operation,
  documentIds,
}: BulkDocumentOperationParams): Promise<BulkDocumentOperationResult> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents`, {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ operation, documentIds }),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || `Failed to ${operation} documents`)
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || `Failed to ${operation} documents`)
  }

  return result.data
}

export function useBulkDocumentOperation() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: bulkDocumentOperation,
    onSuccess: (_, { knowledgeBaseId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
    },
  })
}

export interface CreateKnowledgeBaseParams {
  name: string
  description?: string
  workspaceId: string
  chunkingConfig: {
    maxSize: number
    minSize: number
    overlap: number
  }
}

export async function createKnowledgeBase(
  params: CreateKnowledgeBaseParams
): Promise<KnowledgeBaseData> {
  const response = await fetch('/api/knowledge', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(params),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to create knowledge base')
  }

  const result = await response.json()
  if (!result?.success || !result?.data) {
    throw new Error(result?.error || 'Failed to create knowledge base')
  }

  return result.data
}

export function useCreateKnowledgeBase(workspaceId?: string) {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: createKnowledgeBase,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.list(workspaceId),
      })
    },
  })
}

export interface UpdateKnowledgeBaseParams {
  knowledgeBaseId: string
  updates: {
    name?: string
    description?: string
    workspaceId?: string | null
  }
}

export async function updateKnowledgeBase({
  knowledgeBaseId,
  updates,
}: UpdateKnowledgeBaseParams): Promise<KnowledgeBaseData> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(updates),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to update knowledge base')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to update knowledge base')
  }

  return result.data
}

export function useUpdateKnowledgeBase(workspaceId?: string) {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: updateKnowledgeBase,
    onSuccess: (_, { knowledgeBaseId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.list(workspaceId),
      })
    },
  })
}

export interface DeleteKnowledgeBaseParams {
  knowledgeBaseId: string
}

export async function deleteKnowledgeBase({
  knowledgeBaseId,
}: DeleteKnowledgeBaseParams): Promise<void> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}`, {
    method: 'DELETE',
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to delete knowledge base')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to delete knowledge base')
  }
}

export function useDeleteKnowledgeBase(workspaceId?: string) {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: deleteKnowledgeBase,
    onSuccess: () => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.list(workspaceId),
      })
    },
  })
}

export interface BulkChunkOperationParams {
  knowledgeBaseId: string
  documentId: string
  operation: 'enable' | 'disable' | 'delete'
  chunkIds: string[]
}

export interface BulkChunkOperationResult {
  successCount: number
  failedCount: number
  results: Array<{
    operation: string
    chunkIds: string[]
  }>
}

export async function bulkChunkOperation({
  knowledgeBaseId,
  documentId,
  operation,
  chunkIds,
}: BulkChunkOperationParams): Promise<BulkChunkOperationResult> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}/chunks`, {
    method: 'PATCH',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ operation, chunkIds }),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || `Failed to ${operation} chunks`)
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || `Failed to ${operation} chunks`)
  }

  return result.data
}

export function useBulkChunkOperation() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: bulkChunkOperation,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface UpdateDocumentTagsParams {
  knowledgeBaseId: string
  documentId: string
  tags: Record<string, string>
}

export async function updateDocumentTags({
  knowledgeBaseId,
  documentId,
  tags,
}: UpdateDocumentTagsParams): Promise<DocumentData> {
  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/documents/${documentId}`, {
    method: 'PUT',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(tags),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to update document tags')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to update document tags')
  }

  return result.data
}

export function useUpdateDocumentTags() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: updateDocumentTags,
    onSuccess: (_, { knowledgeBaseId, documentId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.document(knowledgeBaseId, documentId),
      })
    },
  })
}

export interface TagDefinitionData {
  id: string
  tagSlot: string
  displayName: string
  fieldType: string
  createdAt: string
  updatedAt: string
}

export interface CreateTagDefinitionParams {
  knowledgeBaseId: string
  displayName: string
  fieldType: string
}

async function fetchNextAvailableSlot(knowledgeBaseId: string, fieldType: string): Promise<string> {
  const response = await fetch(
    `/api/knowledge/${knowledgeBaseId}/next-available-slot?fieldType=${fieldType}`
  )

  if (!response.ok) {
    throw new Error('Failed to get available slot')
  }

  const result = await response.json()
  if (!result.success || !result.data?.nextAvailableSlot) {
    throw new Error('No available tag slots for this field type')
  }

  return result.data.nextAvailableSlot
}

export async function createTagDefinition({
  knowledgeBaseId,
  displayName,
  fieldType,
}: CreateTagDefinitionParams): Promise<TagDefinitionData> {
  const tagSlot = await fetchNextAvailableSlot(knowledgeBaseId, fieldType)

  const response = await fetch(`/api/knowledge/${knowledgeBaseId}/tag-definitions`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ tagSlot, displayName, fieldType }),
  })

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to create tag definition')
  }

  const result = await response.json()
  if (!result?.success || !result?.data) {
    throw new Error(result?.error || 'Failed to create tag definition')
  }

  return result.data
}

export function useCreateTagDefinition() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: createTagDefinition,
    onSuccess: (_, { knowledgeBaseId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
    },
  })
}

export interface DeleteTagDefinitionParams {
  knowledgeBaseId: string
  tagDefinitionId: string
}

export async function deleteTagDefinition({
  knowledgeBaseId,
  tagDefinitionId,
}: DeleteTagDefinitionParams): Promise<void> {
  const response = await fetch(
    `/api/knowledge/${knowledgeBaseId}/tag-definitions/${tagDefinitionId}`,
    { method: 'DELETE' }
  )

  if (!response.ok) {
    const result = await response.json()
    throw new Error(result.error || 'Failed to delete tag definition')
  }

  const result = await response.json()
  if (!result?.success) {
    throw new Error(result?.error || 'Failed to delete tag definition')
  }
}

export function useDeleteTagDefinition() {
  const queryClient = useQueryClient()

  return useMutation({
    mutationFn: deleteTagDefinition,
    onSuccess: (_, { knowledgeBaseId }) => {
      queryClient.invalidateQueries({
        queryKey: knowledgeKeys.detail(knowledgeBaseId),
      })
    },
  })
}
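
Every mutation above follows the same shape: a thin fetch wrapper that checks both the HTTP status and the `success` envelope, plus a `useMutation` hook that invalidates the affected `knowledgeKeys` caches on success. A minimal consumer sketch, assuming a hypothetical `ChunkRow` component that is not part of this diff:

// Hypothetical consumer of the hooks above; ChunkRow and its props are illustrative only.
function ChunkRow(props: { kbId: string; docId: string; chunk: { id: string; enabled: boolean } }) {
  const updateChunk = useUpdateChunk()
  const deleteChunk = useDeleteChunk()

  const toggle = () =>
    updateChunk.mutate({
      knowledgeBaseId: props.kbId,
      documentId: props.docId,
      chunkId: props.chunk.id,
      enabled: !props.chunk.enabled, // detail + document queries refetch on success
    })

  const remove = () =>
    deleteChunk.mutate({ knowledgeBaseId: props.kbId, documentId: props.docId, chunkId: props.chunk.id })

  return null // render toggle/remove controls here
}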
@@ -3,6 +3,15 @@ import type { CopilotMode, CopilotModelId, CopilotTransportMode } from '@/lib/co

const logger = createLogger('CopilotAPI')

/**
 * Response from chat initiation endpoint
 */
export interface ChatInitResponse {
  success: boolean
  streamId: string
  chatId: string
}

/**
 * Citation interface for documentation references
 */
@@ -115,10 +124,16 @@ async function handleApiError(response: Response, defaultMessage: string): Promi
/**
 * Send a streaming message to the copilot chat API
 * This is the main API endpoint that handles all chat operations
 *
 * Server-first architecture:
 * 1. POST to /api/copilot/chat - starts background processing, returns { streamId, chatId }
 * 2. Connect to /api/copilot/stream/{streamId} for SSE stream
 *
 * This ensures stream continues server-side even if client disconnects
 */
export async function sendStreamingMessage(
  request: SendMessageRequest
): Promise<StreamingResponse> {
): Promise<StreamingResponse & { streamId?: string; chatId?: string }> {
  try {
    const { abortSignal, ...requestBody } = request
    try {
@@ -138,34 +153,83 @@ export async function sendStreamingMessage(
        contextsPreview: preview,
      })
    } catch {}
    const response = await fetch('/api/copilot/chat', {

    // Step 1: Initiate chat - server starts background processing
    const initResponse = await fetch('/api/copilot/chat', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ ...requestBody, stream: true }),
      signal: abortSignal,
      credentials: 'include', // Include cookies for session authentication
      credentials: 'include',
    })

    if (!response.ok) {
      const errorMessage = await handleApiError(response, 'Failed to send streaming message')
    if (!initResponse.ok) {
      const errorMessage = await handleApiError(initResponse, 'Failed to initiate chat')
      return {
        success: false,
        error: errorMessage,
        status: response.status,
        status: initResponse.status,
      }
    }

    if (!response.body) {
    const initData: ChatInitResponse = await initResponse.json()
    if (!initData.success || !initData.streamId) {
      return {
        success: false,
        error: 'No response body received',
        error: 'Failed to get stream ID from server',
        status: 500,
      }
    }

    logger.info('Chat initiated, connecting to stream', {
      streamId: initData.streamId,
      chatId: initData.chatId,
    })

    // Step 2: Connect to stream endpoint for SSE
    const streamResponse = await fetch(`/api/copilot/stream/${initData.streamId}`, {
      method: 'GET',
      headers: { Accept: 'text/event-stream' },
      signal: abortSignal,
      credentials: 'include',
    })

    if (!streamResponse.ok) {
      // Handle completed/not found cases
      if (streamResponse.status === 404) {
        return {
          success: false,
          error: 'Stream not found or expired',
          status: 404,
          streamId: initData.streamId,
          chatId: initData.chatId,
        }
      }

      const errorMessage = await handleApiError(streamResponse, 'Failed to connect to stream')
      return {
        success: false,
        error: errorMessage,
        status: streamResponse.status,
        streamId: initData.streamId,
        chatId: initData.chatId,
      }
    }

    if (!streamResponse.body) {
      return {
        success: false,
        error: 'No stream body received',
        status: 500,
        streamId: initData.streamId,
        chatId: initData.chatId,
      }
    }

    return {
      success: true,
      stream: response.body,
      stream: streamResponse.body,
      streamId: initData.streamId,
      chatId: initData.chatId,
    }
  } catch (error) {
    // Handle AbortError gracefully - this is expected when user aborts

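The two-step flow above means a successful result now carries `stream`, `streamId`, and `chatId` together. A minimal reader sketch, assuming a `SendMessageRequest` value `req` already in scope; the decoding loop is illustrative, not the store's real SSE parser:

// Sketch: consume the SSE body returned by sendStreamingMessage.
const res = await sendStreamingMessage(req)
if (res.success && res.stream) {
  const reader = res.stream.getReader()
  const decoder = new TextDecoder()
  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    // raw SSE frames from /api/copilot/stream/{streamId}
    console.log(decoder.decode(value))
  }
  // res.streamId can be kept around to reconnect to the same stream endpoint
}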
apps/sim/lib/copilot/server-tool-executor.ts (new file, 438 lines)
@@ -0,0 +1,438 @@
/**
 * Server-Side Tool Executor for Copilot
 *
 * Executes copilot tools server-side when no client session is present.
 * Handles routing to appropriate server implementations and marking tools complete.
 */

import { db } from '@sim/db'
import { account, workflow } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config'
import { routeExecution } from '@/lib/copilot/tools/server/router'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { generateRequestId } from '@/lib/core/utils/request'
import { getBaseUrl } from '@/lib/core/utils/urls'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { executeTool } from '@/tools'
import { getTool, resolveToolId } from '@/tools/utils'

const logger = createLogger('ServerToolExecutor')

const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT

/**
 * Context for tool execution
 */
export interface ToolExecutionContext {
  userId: string
  workflowId: string
  chatId: string
  streamId: string
  workspaceId?: string
}

/**
 * Result of tool execution
 */
export interface ToolExecutionResult {
  success: boolean
  status: number
  message?: string
  data?: unknown
}

/**
 * Tools that have dedicated server implementations in the router
 */
const SERVER_ROUTED_TOOLS = [
  'edit_workflow',
  'get_workflow_data',
  'get_workflow_console',
  'get_blocks_and_tools',
  'get_blocks_metadata',
  'get_block_options',
  'get_block_config',
  'get_trigger_blocks',
  'knowledge_base',
  'set_environment_variables',
  'get_credentials',
  'search_documentation',
  'make_api_request',
  'search_online',
]

/**
 * Tools that execute workflows
 */
const WORKFLOW_EXECUTION_TOOLS = ['run_workflow']

/**
 * Tools that handle deployments
 */
const DEPLOYMENT_TOOLS = ['deploy_api', 'deploy_chat', 'deploy_mcp', 'redeploy']

/**
 * Execute a tool server-side.
 * Returns result to be sent to Sim Agent via mark-complete.
 */
export async function executeToolServerSide(
  toolName: string,
  toolCallId: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  logger.info('Executing tool server-side', {
    toolName,
    toolCallId,
    userId: context.userId,
    workflowId: context.workflowId,
  })

  // 1. Check if tool is client-only
  if (isClientOnlyTool(toolName)) {
    logger.info('Skipping client-only tool', { toolName, toolCallId })
    return {
      success: true,
      status: 200,
      message: `Tool "${toolName}" requires a browser session and was skipped in API mode.`,
      data: { skipped: true, reason: 'client_only' },
    }
  }

  try {
    // 2. Route to appropriate executor
    if (SERVER_ROUTED_TOOLS.includes(toolName)) {
      return executeServerRoutedTool(toolName, args, context)
    }

    if (WORKFLOW_EXECUTION_TOOLS.includes(toolName)) {
      return executeRunWorkflow(args, context)
    }

    if (DEPLOYMENT_TOOLS.includes(toolName)) {
      return executeDeploymentTool(toolName, args, context)
    }

    // 3. Try integration tool execution (Slack, Gmail, etc.)
    return executeIntegrationTool(toolName, toolCallId, args, context)
  } catch (error) {
    logger.error('Tool execution failed', {
      toolName,
      toolCallId,
      error: error instanceof Error ? error.message : String(error),
    })
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Tool execution failed',
    }
  }
}

/**
 * Execute a tool that has a dedicated server implementation
 */
async function executeServerRoutedTool(
  toolName: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  try {
    const result = await routeExecution(toolName, args, { userId: context.userId })
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Server tool execution failed',
    }
  }
}

/**
 * Execute the run_workflow tool
 */
async function executeRunWorkflow(
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  const workflowId = (args.workflowId as string) || context.workflowId
  const input = (args.input as Record<string, unknown>) || {}

  logger.info('Executing run_workflow', { workflowId, inputKeys: Object.keys(input) })

  try {
    const response = await fetch(`${getBaseUrl()}/api/workflows/${workflowId}/execute`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${await generateInternalToken()}`,
      },
      body: JSON.stringify({
        input,
        triggerType: 'copilot',
        workflowId, // For internal auth
      }),
    })

    if (!response.ok) {
      const errorText = await response.text()
      return {
        success: false,
        status: response.status,
        message: `Workflow execution failed: ${errorText}`,
      }
    }

    const result = await response.json()
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    return {
      success: false,
      status: 500,
      message: error instanceof Error ? error.message : 'Workflow execution failed',
    }
  }
}

/**
 * Execute a deployment tool
 */
async function executeDeploymentTool(
  toolName: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  // Deployment tools modify workflow state and create deployments
  // These can be executed server-side via the server router
  try {
    const result = await routeExecution(toolName, args, { userId: context.userId })
    return {
      success: true,
      status: 200,
      data: result,
    }
  } catch (error) {
    // If the tool isn't in the router, it might need to be added
    // For now, return a skip result
    logger.warn('Deployment tool not available server-side', { toolName })
    return {
      success: true,
      status: 200,
      message: `Deployment tool "${toolName}" executed with limited functionality in API mode.`,
      data: { skipped: true, reason: 'limited_api_support' },
    }
  }
}

/**
 * Execute an integration tool (Slack, Gmail, etc.)
 * Uses the same logic as /api/copilot/execute-tool
 */
async function executeIntegrationTool(
  toolName: string,
  toolCallId: string,
  args: Record<string, unknown>,
  context: ToolExecutionContext
): Promise<ToolExecutionResult> {
  const resolvedToolName = resolveToolId(toolName)
  const toolConfig = getTool(resolvedToolName)

  if (!toolConfig) {
    // Tool not found - try server router as fallback
    try {
      const result = await routeExecution(toolName, args, { userId: context.userId })
      return {
        success: true,
        status: 200,
        data: result,
      }
    } catch {
      logger.warn('Tool not found', { toolName, resolvedToolName })
      return {
        success: true,
        status: 200,
        message: `Tool "${toolName}" not found. Skipped.`,
        data: { skipped: true, reason: 'not_found' },
      }
    }
  }

  // Get workspaceId for env vars
  let workspaceId = context.workspaceId
  if (!workspaceId && context.workflowId) {
    const workflowResult = await db
      .select({ workspaceId: workflow.workspaceId })
      .from(workflow)
      .where(eq(workflow.id, context.workflowId))
      .limit(1)
    workspaceId = workflowResult[0]?.workspaceId ?? undefined
  }

  // Get decrypted environment variables
  const decryptedEnvVars = await getEffectiveDecryptedEnv(context.userId, workspaceId)

  // Resolve env var references in arguments
  const executionParams: Record<string, unknown> = resolveEnvVarReferences(
    args,
    decryptedEnvVars,
    {
      resolveExactMatch: true,
      allowEmbedded: true,
      trimKeys: true,
      onMissing: 'keep',
      deep: true,
    }
  ) as Record<string, unknown>

  // Resolve OAuth access token if required
  if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
    const provider = toolConfig.oauth.provider

    try {
      const accounts = await db
        .select()
        .from(account)
        .where(and(eq(account.providerId, provider), eq(account.userId, context.userId)))
        .limit(1)

      if (accounts.length > 0) {
        const acc = accounts[0]
        const requestId = generateRequestId()
        const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)

        if (accessToken) {
          executionParams.accessToken = accessToken
        } else {
          return {
            success: false,
            status: 400,
            message: `OAuth token not available for ${provider}. Please reconnect your account.`,
          }
        }
      } else {
        return {
          success: false,
          status: 400,
          message: `No ${provider} account connected. Please connect your account first.`,
        }
      }
    } catch (error) {
      return {
        success: false,
        status: 500,
        message: `Failed to get OAuth token for ${toolConfig.oauth.provider}`,
      }
    }
  }

  // Check if tool requires an API key
  const needsApiKey = toolConfig.params?.apiKey?.required
  if (needsApiKey && !executionParams.apiKey) {
    return {
      success: false,
      status: 400,
      message: `API key not provided for ${toolName}.`,
    }
  }

  // Add execution context
  executionParams._context = {
    workflowId: context.workflowId,
    userId: context.userId,
  }

  // Special handling for function_execute
  if (toolName === 'function_execute') {
    executionParams.envVars = decryptedEnvVars
    executionParams.workflowVariables = {}
    executionParams.blockData = {}
    executionParams.blockNameMapping = {}
    executionParams.language = executionParams.language || 'javascript'
    executionParams.timeout = executionParams.timeout || 30000
  }

  // Execute the tool
  const result = await executeTool(resolvedToolName, executionParams, true)

  logger.info('Integration tool execution complete', {
    toolName,
    success: result.success,
  })

  return {
    success: result.success,
    status: result.success ? 200 : 500,
    message: result.error,
    data: result.output,
  }
}

/**
 * Mark a tool as complete with Sim Agent
 */
export async function markToolComplete(
  toolCallId: string,
  toolName: string,
  result: ToolExecutionResult
): Promise<boolean> {
  logger.info('Marking tool complete', {
    toolCallId,
    toolName,
    success: result.success,
    status: result.status,
  })

  try {
    const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        ...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
      },
      body: JSON.stringify({
        id: toolCallId,
        name: toolName,
        status: result.status,
        message: result.message,
        data: result.data,
      }),
    })

    if (!response.ok) {
      logger.error('Mark complete failed', { toolCallId, status: response.status })
      return false
    }

    return true
  } catch (error) {
    logger.error('Mark complete error', {
      toolCallId,
      error: error instanceof Error ? error.message : String(error),
    })
    return false
  }
}

/**
 * Generate an internal authentication token for server-to-server calls
 */
async function generateInternalToken(): Promise<string> {
  // Use the same pattern as A2A for internal auth
  const { generateInternalToken: genToken } = await import('@/app/api/a2a/serve/[agentId]/utils')
  return genToken()
}
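
Taken together, handling one tool call server-side reduces to execute-then-report. A hedged sketch of the surrounding dispatch (the `toolCall` shape and any retry policy are assumptions, not shown in this diff):

// Sketch: execute one tool call server-side and report the result back to Sim Agent.
async function handleToolCall(
  toolCall: { id: string; name: string; arguments: Record<string, unknown> },
  ctx: ToolExecutionContext
): Promise<void> {
  const result = await executeToolServerSide(toolCall.name, toolCall.id, toolCall.arguments, ctx)
  const acked = await markToolComplete(toolCall.id, toolCall.name, result)
  if (!acked) {
    // markToolComplete already logs failures; a retry/backoff policy could go here
  }
}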
apps/sim/lib/copilot/stream-persistence.ts (new file, 453 lines)
@@ -0,0 +1,453 @@
/**
 * Stream Persistence Service for Copilot
 *
 * Handles persisting copilot stream state to Redis (ephemeral) and database (permanent).
 * Uses Redis LIST for chunk history and Pub/Sub for live updates (no polling).
 *
 * Redis Key Structure:
 * - copilot:stream:{streamId}:meta → StreamMeta JSON (TTL: 10 min)
 * - copilot:stream:{streamId}:chunks → LIST of chunks (for replay)
 * - copilot:stream:{streamId} → Pub/Sub CHANNEL (for live updates)
 * - copilot:active:{chatId} → streamId lookup
 * - copilot:abort:{streamId} → abort signal flag
 */

import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import type Redis from 'ioredis'
import { getRedisClient } from '@/lib/core/config/redis'

const logger = createLogger('CopilotStreamPersistence')

const STREAM_TTL = 60 * 10 // 10 minutes

/**
 * Tool call record stored in stream state
 */
export interface ToolCallRecord {
  id: string
  name: string
  args: Record<string, unknown>
  state: 'pending' | 'executing' | 'success' | 'error' | 'skipped'
  result?: unknown
  error?: string
}

/**
 * Stream metadata stored in Redis
 */
export interface StreamMeta {
  id: string
  status: 'streaming' | 'completed' | 'error'
  chatId: string
  userId: string
  workflowId: string
  userMessageId: string
  isClientSession: boolean
  toolCalls: ToolCallRecord[]
  assistantContent: string
  conversationId?: string
  createdAt: number
  updatedAt: number
}

/**
 * Parameters for creating a new stream
 */
export interface CreateStreamParams {
  streamId: string
  chatId: string
  userId: string
  workflowId: string
  userMessageId: string
  isClientSession: boolean
}

// ============ WRITE OPERATIONS (used by original request handler) ============

/**
 * Create a new stream state in Redis
 */
export async function createStream(params: CreateStreamParams): Promise<void> {
  const redis = getRedisClient()
  if (!redis) {
    logger.warn('Redis not available, stream persistence disabled')
    return
  }

  const meta: StreamMeta = {
    id: params.streamId,
    status: 'streaming',
    chatId: params.chatId,
    userId: params.userId,
    workflowId: params.workflowId,
    userMessageId: params.userMessageId,
    isClientSession: params.isClientSession,
    toolCalls: [],
    assistantContent: '',
    createdAt: Date.now(),
    updatedAt: Date.now(),
  }

  const metaKey = `copilot:stream:${params.streamId}:meta`
  const activeKey = `copilot:active:${params.chatId}`

  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
  await redis.setex(activeKey, STREAM_TTL, params.streamId)

  logger.info('Created stream state', { streamId: params.streamId, chatId: params.chatId })
}

/**
 * Append a chunk to the stream buffer and publish for live subscribers
 */
export async function appendChunk(streamId: string, chunk: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const listKey = `copilot:stream:${streamId}:chunks`
  const channel = `copilot:stream:${streamId}`

  // Push to list for replay, publish for live subscribers
  await redis.rpush(listKey, chunk)
  await redis.expire(listKey, STREAM_TTL)
  await redis.publish(channel, chunk)
}

/**
 * Append content to the accumulated assistant content
 */
export async function appendContent(streamId: string, content: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  meta.assistantContent += content
  meta.updatedAt = Date.now()

  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Update stream metadata
 */
export async function updateMeta(streamId: string, update: Partial<StreamMeta>): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = { ...JSON.parse(raw), ...update, updatedAt: Date.now() }
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Update a specific tool call in the stream state
 */
export async function updateToolCall(
  streamId: string,
  toolCallId: string,
  update: Partial<ToolCallRecord>
): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const metaKey = `copilot:stream:${streamId}:meta`
  const raw = await redis.get(metaKey)
  if (!raw) return

  const meta: StreamMeta = JSON.parse(raw)
  const toolCallIndex = meta.toolCalls.findIndex((tc) => tc.id === toolCallId)

  if (toolCallIndex >= 0) {
    meta.toolCalls[toolCallIndex] = { ...meta.toolCalls[toolCallIndex], ...update }
  } else {
    // Add new tool call
    meta.toolCalls.push({
      id: toolCallId,
      name: update.name || 'unknown',
      args: update.args || {},
      state: update.state || 'pending',
      result: update.result,
      error: update.error,
    })
  }

  meta.updatedAt = Date.now()
  await redis.setex(metaKey, STREAM_TTL, JSON.stringify(meta))
}

/**
 * Complete the stream - save to database and cleanup Redis
 */
export async function completeStream(streamId: string, conversationId?: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const meta = await getStreamMeta(streamId)
  if (!meta) return

  // Publish completion event for subscribers
  await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'stream_complete' }))

  // Save to database
  await saveToDatabase(meta, conversationId)

  // Cleanup Redis
  await redis.del(`copilot:stream:${streamId}:meta`)
  await redis.del(`copilot:stream:${streamId}:chunks`)
  await redis.del(`copilot:active:${meta.chatId}`)
  await redis.del(`copilot:abort:${streamId}`)

  logger.info('Completed stream', { streamId, chatId: meta.chatId })
}

/**
 * Mark stream as errored and save partial content
 */
export async function errorStream(streamId: string, error: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  const meta = await getStreamMeta(streamId)
  if (!meta) return

  // Update status
  meta.status = 'error'

  // Publish error event for subscribers
  await redis.publish(
    `copilot:stream:${streamId}`,
    JSON.stringify({ type: 'stream_error', error })
  )

  // Still save what we have to database
  await saveToDatabase(meta)

  // Cleanup Redis
  await redis.del(`copilot:stream:${streamId}:meta`)
  await redis.del(`copilot:stream:${streamId}:chunks`)
  await redis.del(`copilot:active:${meta.chatId}`)
  await redis.del(`copilot:abort:${streamId}`)

  logger.info('Errored stream', { streamId, error })
}

/**
 * Save stream content to database as assistant message
 */
async function saveToDatabase(meta: StreamMeta, conversationId?: string): Promise<void> {
  try {
    const [chat] = await db
      .select()
      .from(copilotChats)
      .where(eq(copilotChats.id, meta.chatId))
      .limit(1)

    if (!chat) {
      logger.warn('Chat not found for stream save', { chatId: meta.chatId })
      return
    }

    const existingMessages = Array.isArray(chat.messages) ? chat.messages : []

    // Build the assistant message
    const assistantMessage = {
      id: crypto.randomUUID(),
      role: 'assistant',
      content: meta.assistantContent,
      toolCalls: meta.toolCalls,
      timestamp: new Date().toISOString(),
      serverCompleted: true, // Mark that this was completed server-side
    }

    const updatedMessages = [...existingMessages, assistantMessage]

    await db
      .update(copilotChats)
      .set({
        messages: updatedMessages,
        conversationId: conversationId || (chat.conversationId as string | undefined),
        updatedAt: new Date(),
      })
      .where(eq(copilotChats.id, meta.chatId))

    logger.info('Saved stream to database', {
      streamId: meta.id,
      chatId: meta.chatId,
      contentLength: meta.assistantContent.length,
      toolCallsCount: meta.toolCalls.length,
    })
  } catch (error) {
    logger.error('Failed to save stream to database', { streamId: meta.id, error })
  }
}

// ============ READ OPERATIONS (used by resume handler) ============

/**
 * Get stream metadata
 */
export async function getStreamMeta(streamId: string): Promise<StreamMeta | null> {
  const redis = getRedisClient()
  if (!redis) return null

  const raw = await redis.get(`copilot:stream:${streamId}:meta`)
  return raw ? JSON.parse(raw) : null
}

/**
 * Get chunks from stream history (for replay)
 */
export async function getChunks(streamId: string, fromIndex: number = 0): Promise<string[]> {
  const redis = getRedisClient()
  if (!redis) return []

  const listKey = `copilot:stream:${streamId}:chunks`
  return redis.lrange(listKey, fromIndex, -1)
}

/**
 * Get the number of chunks in the stream
 */
export async function getChunkCount(streamId: string): Promise<number> {
  const redis = getRedisClient()
  if (!redis) return 0

  const listKey = `copilot:stream:${streamId}:chunks`
  return redis.llen(listKey)
}

/**
 * Get active stream ID for a chat (if any)
 */
export async function getActiveStreamForChat(chatId: string): Promise<string | null> {
  const redis = getRedisClient()
  if (!redis) return null

  return redis.get(`copilot:active:${chatId}`)
}

// ============ SUBSCRIPTION (for resume handler) ============

/**
 * Subscribe to live stream updates.
 * Uses Redis Pub/Sub - no polling, fully event-driven.
 *
 * @param streamId - Stream to subscribe to
 * @param onChunk - Callback for each new chunk
 * @param onComplete - Callback when stream completes
 * @param signal - Optional AbortSignal to cancel subscription
 */
export async function subscribeToStream(
  streamId: string,
  onChunk: (chunk: string) => void,
  onComplete: () => void,
  signal?: AbortSignal
): Promise<void> {
  const redis = getRedisClient()
  if (!redis) {
    onComplete()
    return
  }

  // Create a separate Redis connection for subscription
  const subscriber = redis.duplicate()
  const channel = `copilot:stream:${streamId}`

  let isComplete = false

  const cleanup = () => {
    if (!isComplete) {
      isComplete = true
      subscriber.unsubscribe(channel).catch(() => {})
      subscriber.quit().catch(() => {})
    }
  }

  signal?.addEventListener('abort', cleanup)

  await subscriber.subscribe(channel)

  subscriber.on('message', (ch, message) => {
    if (ch !== channel) return

    try {
      const parsed = JSON.parse(message)
      if (parsed.type === 'stream_complete' || parsed.type === 'stream_error') {
        cleanup()
        onComplete()
        return
      }
    } catch {
      // Not a control message, just a chunk
    }

    onChunk(message)
  })

  subscriber.on('error', (err) => {
    logger.error('Subscriber error', { streamId, error: err })
    cleanup()
    onComplete()
  })
}

// ============ ABORT HANDLING ============

/**
 * Set abort signal for a stream.
 * The original request handler should check this and cancel if set.
 */
export async function setAbortSignal(streamId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.setex(`copilot:abort:${streamId}`, 60, '1')
  // Also publish to channel so handler sees it immediately
  await redis.publish(`copilot:stream:${streamId}`, JSON.stringify({ type: 'abort' }))

  logger.info('Set abort signal', { streamId })
}

/**
 * Check if abort signal is set for a stream
 */
export async function checkAbortSignal(streamId: string): Promise<boolean> {
  const redis = getRedisClient()
  if (!redis) return false

  const val = await redis.get(`copilot:abort:${streamId}`)
  return val === '1'
}

/**
 * Clear abort signal for a stream
 */
export async function clearAbortSignal(streamId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.del(`copilot:abort:${streamId}`)
}

/**
 * Refresh TTL on all stream keys (call periodically during long streams)
 */
export async function refreshStreamTTL(streamId: string, chatId: string): Promise<void> {
  const redis = getRedisClient()
  if (!redis) return

  await redis.expire(`copilot:stream:${streamId}:meta`, STREAM_TTL)
  await redis.expire(`copilot:stream:${streamId}:chunks`, STREAM_TTL)
  await redis.expire(`copilot:active:${chatId}`, STREAM_TTL)
}

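A resume handler built on this module would replay the buffered LIST first, then follow the Pub/Sub channel until completion. A minimal sketch under that assumption (the handler itself is not part of this file, and the replay/subscribe race is noted but not closed here):

// Sketch: replay history, then follow live updates until the stream ends.
async function resumeStream(streamId: string, emit: (chunk: string) => void): Promise<void> {
  const history = await getChunks(streamId) // everything buffered so far
  for (const chunk of history) emit(chunk)

  // Chunks published between lrange and subscribe could be missed; a real
  // handler would re-check getChunkCount after subscribing to close that gap.
  await new Promise<void>((resolve) => {
    void subscribeToStream(streamId, emit, resolve)
  })
}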
@@ -5,6 +5,9 @@
 * Import this module early in the app to ensure all tool configs are available.
 */

// Navigation tools
import './navigation/navigate-ui'

// Other tools (subagents)
import './other/auth'
import './other/custom-tool'
@@ -41,6 +44,7 @@ export {
  getToolUIConfig,
  hasInterrupt,
  type InterruptConfig,
  isClientOnlyTool,
  isSpecialTool,
  isSubagentTool,
  type ParamsTableConfig,

@@ -5,6 +5,7 @@ import {
  type BaseClientToolMetadata,
  ClientToolCallState,
} from '@/lib/copilot/tools/client/base-tool'
import { registerToolUIConfig } from '@/lib/copilot/tools/client/ui-config'
import { useCopilotStore } from '@/stores/panel/copilot/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'

@@ -239,3 +240,12 @@ export class NavigateUIClientTool extends BaseClientTool {
    await this.handleAccept(args)
  }
}

// Register UI config at module load - clientOnly because this requires browser navigation
registerToolUIConfig(NavigateUIClientTool.id, {
  clientOnly: true,
  interrupt: {
    accept: { text: 'Open', icon: Navigation },
    reject: { text: 'Skip', icon: XCircle },
  },
})

@@ -33,6 +33,7 @@ export class TourClientTool extends BaseClientTool {
    [ClientToolCallState.aborted]: { text: 'Aborted tour', icon: XCircle },
  },
  uiConfig: {
    clientOnly: true, // Tour requires browser UI to guide the user
    subagent: {
      streamingLabel: 'Touring',
      completedLabel: 'Tour complete',

@@ -172,6 +172,13 @@ export interface ToolUIConfig {
   * The tool-call component will use this to render specialized content.
   */
  customRenderer?: 'code' | 'edit_summary' | 'none'

  /**
   * Whether this tool requires a client/browser session to execute.
   * Client-only tools (like navigate_ui, tour) cannot run in headless/API mode.
   * In API-only mode, these tools will be skipped with a message.
   */
  clientOnly?: boolean
}

/**
@@ -215,6 +222,14 @@ export function hasInterrupt(toolName: string): boolean {
  return !!toolUIConfigs[toolName]?.interrupt
}

/**
 * Check if a tool is client-only (requires browser session).
 * Client-only tools cannot execute in headless/API mode.
 */
export function isClientOnlyTool(toolName: string): boolean {
  return !!toolUIConfigs[toolName]?.clientOnly
}

/**
 * Get subagent labels for a tool
 */
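
The `clientOnly` flag only matters if execution paths actually consult it. A sketch of how a headless executor might gate on `isClientOnlyTool` — the dispatcher shape and skip-result format are assumptions for illustration, and the import path is inferred from the `ui-config` module above:

```ts
import { isClientOnlyTool } from '@/lib/copilot/tools/client/ui-config'

// Hypothetical headless dispatch: skip browser-bound tools with a message
// instead of attempting to run them where no client session exists.
async function dispatchHeadless(
  toolName: string,
  run: () => Promise<unknown>
): Promise<{ skipped: boolean; result?: unknown; message?: string }> {
  if (isClientOnlyTool(toolName)) {
    return {
      skipped: true,
      message: `${toolName} requires a browser session and was skipped in API-only mode`,
    }
  }
  return { skipped: false, result: await run() }
}
```
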
@@ -543,6 +543,12 @@ export class ExecutionLogger implements IExecutionLoggerService {
      case 'chat':
        updateFields.totalChatExecutions = sql`total_chat_executions + 1`
        break
      case 'mcp':
        updateFields.totalMcpExecutions = sql`total_mcp_executions + 1`
        break
      case 'a2a':
        updateFields.totalA2aExecutions = sql`total_a2a_executions + 1`
        break
    }

    await db.update(userStats).set(updateFields).where(eq(userStats.userId, userId))
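
Worth noting: the counters are bumped with a raw SQL fragment rather than a read-modify-write, so concurrent executions cannot lose increments. A sketch of the difference, using Drizzle's `sql` template as above (column name taken from the schema diff below):

```ts
import { sql } from 'drizzle-orm'

// Racy pattern (avoided here): read the row, add 1 in JS, write it back.
// Two concurrent executions can both read n and both write n + 1.
//
// Atomic pattern (used here): the increment happens inside the database,
// so concurrent updates serialize correctly.
const updateFields = {
  totalMcpExecutions: sql`total_mcp_executions + 1`,
}
```
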
@@ -7,6 +7,7 @@ import type { InputFormatField } from '@/lib/workflows/types'
export interface WorkflowInputField {
  name: string
  type: string
  description?: string
}

/**
@@ -37,7 +38,7 @@ export function extractInputFieldsFromBlocks(
  if (Array.isArray(inputFormat)) {
    return inputFormat
      .filter(
-        (field: unknown): field is { name: string; type?: string } =>
+        (field: unknown): field is { name: string; type?: string; description?: string } =>
          typeof field === 'object' &&
          field !== null &&
          'name' in field &&
@@ -47,6 +48,7 @@ export function extractInputFieldsFromBlocks(
      .map((field) => ({
        name: field.name,
        type: field.type || 'string',
        ...(field.description && { description: field.description }),
      }))
  }

@@ -57,7 +59,7 @@ export function extractInputFieldsFromBlocks(
  if (Array.isArray(legacyFormat)) {
    return legacyFormat
      .filter(
-        (field: unknown): field is { name: string; type?: string } =>
+        (field: unknown): field is { name: string; type?: string; description?: string } =>
          typeof field === 'object' &&
          field !== null &&
          'name' in field &&
@@ -67,6 +69,7 @@ export function extractInputFieldsFromBlocks(
      .map((field) => ({
        name: field.name,
        type: field.type || 'string',
        ...(field.description && { description: field.description }),
      }))
  }
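
In effect, the change threads an optional `description` from the raw input format through to `WorkflowInputField`. A small illustration of the expected behavior (the sample data is invented):

```ts
// Hypothetical input format as it might appear on a start block:
const inputFormat = [
  { name: 'query', type: 'string', description: 'Search terms' },
  { name: 'limit', type: 'number' }, // no description, so none is emitted
  'not-an-object', // fails the type guard and is filtered out
]

// After the filter/map above, the extracted fields would be:
// [
//   { name: 'query', type: 'string', description: 'Search terms' },
//   { name: 'limit', type: 'number' },
// ]
```
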
@@ -1086,6 +1086,14 @@ const sseHandlers: Record<string, SSEHandler> = {
      await get().handleNewChatCreation(context.newChatId)
    }
  },
  stream_id: (_data, _context, get) => {
    // Store stream ID for potential resumption
    const streamId = _data?.streamId
    if (streamId) {
      get().setActiveStreamId(streamId)
      logger.debug('[SSE] Received stream ID', { streamId })
    }
  },
  tool_result: (data, context, get, set) => {
    try {
      const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
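
The handler is keyed by event type, so the server presumably emits a dedicated `stream_id` event early in the response. The exact wire payload is not shown in this diff; inferring from the handler, it would look roughly like:

```ts
// Inferred shape of the stream_id SSE event (an assumption, not shown in the diff):
interface StreamIdEvent {
  type: 'stream_id'
  streamId: string
}

// On the wire, one SSE frame might read:
//   data: {"type":"stream_id","streamId":"4f7c9a1e-..."}
```
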
@@ -1735,10 +1743,12 @@ const sseHandlers: Record<string, SSEHandler> = {
      updateStreamingMessage(set, context)
    }
  },
-  done: (_data, context) => {
+  done: (_data, context, get) => {
    context.doneEventCount++
    if (context.doneEventCount >= 1) {
      context.streamComplete = true
      // Clear active stream ID when stream completes
      get().setActiveStreamId(null)
    }
  },
  error: (data, context, _get, set) => {

@@ -2227,6 +2237,9 @@ const initialState = {
  autoAllowedTools: [] as string[],
  messageQueue: [] as import('./types').QueuedMessage[],
  suppressAbortContinueOption: false,
  activeStreamId: null as string | null,
  isResuming: false,
  userInitiatedAbort: false, // Track if abort was user-initiated vs browser refresh
}

export const useCopilotStore = create<CopilotStore>()(

@@ -2243,11 +2256,12 @@ export const useCopilotStore = create<CopilotStore>()(
    setWorkflowId: async (workflowId: string | null) => {
      const currentWorkflowId = get().workflowId
      if (currentWorkflowId === workflowId) return
-      const { isSendingMessage } = get()
-      if (isSendingMessage) get().abortMessage()
-
-      // Abort all in-progress tools and clear any diff preview
-      abortAllInProgressTools(set, get)
+      // Don't abort - let server-side stream continue for resumption
+      // Just reset client state; stream will be resumable when returning to the chat
+      // Don't abort tools either - they may still be running server-side
+      set({ isSendingMessage: false, abortController: null })

      try {
        useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false })
      } catch {}

@@ -2278,10 +2292,14 @@ export const useCopilotStore = create<CopilotStore>()(
      if (!workflowId) {
        return
      }
-      if (currentChat && currentChat.id !== chat.id && isSendingMessage) get().abortMessage()
-
-      // Abort in-progress tools and clear diff when changing chats
-      abortAllInProgressTools(set, get)
+      // Don't abort when switching chats - let server-side stream continue for resumption
+      // Just reset client state; stream will be resumable when returning to that chat
+      // Don't abort tools either - they may still be running server-side
+      if (currentChat && currentChat.id !== chat.id && isSendingMessage) {
+        set({ isSendingMessage: false, abortController: null })
+      }

      try {
        useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false })
      } catch {}

@@ -2367,14 +2385,29 @@ export const useCopilotStore = create<CopilotStore>()(
          }
        }
      } catch {}

      // Check for active stream that can be resumed
      try {
        const hasActiveStream = await get().checkForActiveStream(chat.id)
        if (hasActiveStream && get().activeStreamId) {
          logger.info('[Chat] Resuming active stream on chat select', { chatId: chat.id })
          await get().resumeActiveStream(get().activeStreamId!)
        }
      } catch (err) {
        logger.warn('[Chat] Failed to check/resume active stream', { error: err })
      }
    },

    createNewChat: async () => {
      const { isSendingMessage } = get()
-      if (isSendingMessage) get().abortMessage()
-
-      // Abort in-progress tools and clear diff on new chat
-      abortAllInProgressTools(set, get)
+      // Don't abort when creating new chat - let server-side stream continue for resumption
+      // Just reset client state; stream will be resumable when returning to that chat
+      // Don't abort tools either - they may still be running server-side
+      if (isSendingMessage) {
+        set({ isSendingMessage: false, abortController: null })
+      }

      try {
        useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false })
      } catch {}

@@ -2497,6 +2530,21 @@ export const useCopilotStore = create<CopilotStore>()(
            mode: refreshedMode,
            selectedModel: refreshedModel as CopilotStore['selectedModel'],
          })

          // Check for active stream that can be resumed (e.g., after page refresh)
          try {
            const hasActiveStream = await get().checkForActiveStream(updatedCurrentChat.id)
            if (hasActiveStream && get().activeStreamId) {
              logger.info('[Chat] Resuming active stream on refresh', {
                chatId: updatedCurrentChat.id,
              })
              await get().resumeActiveStream(get().activeStreamId!)
            }
          } catch (err) {
            logger.warn('[Chat] Failed to check/resume active stream on refresh', {
              error: err,
            })
          }
        }
        try {
          await get().loadMessageCheckpoints(updatedCurrentChat.id)

@@ -2531,6 +2579,21 @@ export const useCopilotStore = create<CopilotStore>()(
          try {
            await get().loadMessageCheckpoints(mostRecentChat.id)
          } catch {}

          // Check for active stream that can be resumed (e.g., after page refresh)
          try {
            const hasActiveStream = await get().checkForActiveStream(mostRecentChat.id)
            if (hasActiveStream && get().activeStreamId) {
              logger.info('[Chat] Resuming active stream on auto-select', {
                chatId: mostRecentChat.id,
              })
              await get().resumeActiveStream(get().activeStreamId!)
            }
          } catch (err) {
            logger.warn('[Chat] Failed to check/resume active stream on auto-select', {
              error: err,
            })
          }
        }
      } else {
        set({ currentChat: null, messages: [] })

@@ -2697,13 +2760,18 @@ export const useCopilotStore = create<CopilotStore>()(
        })

        if (result.success && result.stream) {
+          // Store streamId for resumption if client disconnects
+          if (result.streamId) {
+            set({ activeStreamId: result.streamId })
+          }
          await get().handleStreamingResponse(
            result.stream,
            streamingMessage.id,
            false,
            userMessage.id
          )
-          set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null })
+          // Clear stream ID on completion
+          set({ activeStreamId: null, chatsLastLoadedAt: null, chatsLoadedForWorkflow: null })
        } else {
          if (result.error === 'Request was aborted') {
            return

@@ -2762,12 +2830,13 @@ export const useCopilotStore = create<CopilotStore>()(
      }
    },

-    // Abort streaming
+    // Abort streaming (user-initiated)
    abortMessage: (options?: { suppressContinueOption?: boolean }) => {
      const { abortController, isSendingMessage, messages } = get()
      if (!isSendingMessage || !abortController) return
      const suppressContinueOption = options?.suppressContinueOption === true
-      set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption })
+      // Mark this as a user-initiated abort (vs browser refresh which doesn't call this)
+      set({ isAborting: true, suppressAbortContinueOption: suppressContinueOption, userInitiatedAbort: true })
      try {
        abortController.abort()
        stopStreamingUpdates()

@@ -2861,7 +2930,11 @@ export const useCopilotStore = create<CopilotStore>()(
          abortSignal: abortController.signal,
        })
        if (result.success && result.stream) {
+          if (result.streamId) {
+            set({ activeStreamId: result.streamId })
+          }
          await get().handleStreamingResponse(result.stream, newAssistantMessage.id, false)
+          set({ activeStreamId: null })
        } else {
          if (result.error === 'Request was aborted') return
          const errorMessage = createErrorMessage(

@@ -3206,16 +3279,30 @@ export const useCopilotStore = create<CopilotStore>()(
        reader.cancel()
      }, 600000)

+      // Track if this is a browser-initiated abort (not user clicking stop)
+      let browserAbort = false

      try {
        for await (const data of parseSSEStream(reader, decoder)) {
-          const { abortController } = get()
+          const { abortController, userInitiatedAbort } = get()
          if (abortController?.signal.aborted) {
-            context.wasAborted = true
-            const { suppressAbortContinueOption } = get()
-            context.suppressContinueOption = suppressAbortContinueOption === true
-            if (suppressAbortContinueOption) {
-              set({ suppressAbortContinueOption: false })
+            // Only treat as abort if user explicitly clicked stop (not browser refresh)
+            if (userInitiatedAbort) {
+              context.wasAborted = true
+              const { suppressAbortContinueOption } = get()
+              context.suppressContinueOption = suppressAbortContinueOption === true
+              if (suppressAbortContinueOption) {
+                set({ suppressAbortContinueOption: false })
+              }
+              set({ userInitiatedAbort: false }) // Reset flag
+            } else {
+              // Browser refresh/navigation - don't update any UI, just exit
+              // The page is about to reload anyway
+              browserAbort = true
+              reader.cancel().catch(() => {})
+              return // Exit immediately, skip all finalization
            }
+            // User-initiated abort: clean up and break
            context.pendingContent = ''
            finalizeThinkingBlock(context)
            stopStreamingUpdates()

@@ -3503,8 +3590,7 @@ export const useCopilotStore = create<CopilotStore>()(
        createdAt: new Date(),
        updatedAt: new Date(),
      }
-      // Abort any in-progress tools and clear diff on new chat creation
-      abortAllInProgressTools(set, get)
+      // Don't abort tools during streaming - just clear diff
      try {
        useWorkflowDiffStore.getState().clearDiff()
      } catch {}

@@ -3527,8 +3613,10 @@ export const useCopilotStore = create<CopilotStore>()(
    retrySave: async (_chatId: string) => {},

    cleanup: () => {
-      const { isSendingMessage } = get()
-      if (isSendingMessage) get().abortMessage()
+      // Don't abort on cleanup - let server-side stream continue for resumption
+      // Just reset client state; stream will be resumable on page reload
+      set({ isSendingMessage: false, abortController: null })

      if (streamingUpdateRAF !== null) {
        cancelAnimationFrame(streamingUpdateRAF)
        streamingUpdateRAF = null

@@ -3912,6 +4000,105 @@ export const useCopilotStore = create<CopilotStore>()(
      set({ messageQueue: [] })
      logger.info('[Queue] Queue cleared')
    },

    // =====================
    // Stream Resumption
    // =====================

    setActiveStreamId: (streamId) => {
      set({ activeStreamId: streamId })
    },

    checkForActiveStream: async (chatId) => {
      try {
        const response = await fetch(`/api/copilot/chat/${chatId}/active-stream`, {
          credentials: 'include',
        })

        if (!response.ok) {
          return false
        }

        const data = await response.json()

        if (data.hasActiveStream && data.streamId) {
          logger.info('[Resume] Found active stream', {
            chatId,
            streamId: data.streamId,
            chunkCount: data.chunkCount,
          })
          set({ activeStreamId: data.streamId })
          return true
        }

        return false
      } catch (error) {
        logger.warn('[Resume] Failed to check for active stream', { chatId, error })
        return false
      }
    },

    resumeActiveStream: async (streamId) => {
      const state = get()

      if (state.isResuming) {
        logger.warn('[Resume] Already resuming a stream')
        return
      }

      logger.info('[Resume] Resuming stream', { streamId })
      set({ isResuming: true, isSendingMessage: true })

      try {
        const response = await fetch(`/api/copilot/stream/${streamId}?from=0`, {
          credentials: 'include',
        })

        if (!response.ok) {
          const data = await response.json().catch(() => ({}))

          // Stream completed or errored - refresh messages from DB
          if (data.status === 'completed' || data.status === 'error') {
            logger.info('[Resume] Stream already finished', { streamId, status: data.status })
            // Reload the chat to get the saved messages
            const currentChat = get().currentChat
            if (currentChat) {
              await get().selectChat(currentChat)
            }
            return
          }

          logger.warn('[Resume] Failed to resume stream', { streamId, status: response.status })
          return
        }

        if (!response.body) {
          logger.warn('[Resume] No response body for resume stream')
          return
        }

        // Create a placeholder assistant message for the resumed stream
        const resumeMessageId = crypto.randomUUID()
        const messages = get().messages
        const assistantMessage: CopilotMessage = {
          id: resumeMessageId,
          role: 'assistant',
          content: '',
          timestamp: new Date().toISOString(),
          toolCalls: [],
          contentBlocks: [],
        }

        set({ messages: [...messages, assistantMessage] })

        // Process the resumed stream
        await get().handleStreamingResponse(response.body, resumeMessageId, true)
      } catch (error) {
        logger.error('[Resume] Stream resumption failed', { streamId, error })
      } finally {
        set({ isResuming: false, isSendingMessage: false, activeStreamId: null })
      }
    },
  }))
)
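
The store now exposes the full resumption loop: `checkForActiveStream` asks the server whether a chat still has a live stream, and `resumeActiveStream` replays it from chunk 0. The diff wires this into `selectChat` and the refresh paths; a component could do the same on mount. A hypothetical hook, sketched under the assumption that the store API is exactly the one typed below:

```ts
import { useEffect } from 'react'
import { useCopilotStore } from '@/stores/panel/copilot/store'

// Hypothetical mount-time hook: if a previous visit left a stream running
// server-side, pick it up instead of rendering a stale chat.
export function useResumeActiveStream(chatId: string | undefined) {
  const checkForActiveStream = useCopilotStore((s) => s.checkForActiveStream)
  const resumeActiveStream = useCopilotStore((s) => s.resumeActiveStream)

  useEffect(() => {
    if (!chatId) return
    void (async () => {
      const hasStream = await checkForActiveStream(chatId)
      const streamId = useCopilotStore.getState().activeStreamId
      if (hasStream && streamId) {
        await resumeActiveStream(streamId)
      }
    })()
  }, [chatId, checkForActiveStream, resumeActiveStream])
}
```
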
@@ -156,6 +156,13 @@ export interface CopilotState {

  // Message queue for messages sent while another is in progress
  messageQueue: QueuedMessage[]

  // Stream resumption state
  activeStreamId: string | null
  isResuming: boolean

  // Track if abort was user-initiated (vs browser refresh)
  userInitiatedAbort: boolean
}

export interface CopilotActions {
@@ -249,6 +256,11 @@ export interface CopilotActions {
  moveUpInQueue: (id: string) => void
  sendNow: (id: string) => Promise<void>
  clearQueue: () => void

  // Stream resumption actions
  checkForActiveStream: (chatId: string) => Promise<boolean>
  resumeActiveStream: (streamId: string) => Promise<void>
  setActiveStreamId: (streamId: string | null) => void
}

export type CopilotStore = CopilotState & CopilotActions

bun.lock
@@ -1,6 +1,5 @@
{
  "lockfileVersion": 1,
-  "configVersion": 1,
  "workspaces": {
    "": {
      "name": "simstudio",

packages/db/migrations/0146_cultured_ikaris.sql (new file)
@@ -0,0 +1,2 @@
ALTER TABLE "user_stats" ADD COLUMN "total_mcp_executions" integer DEFAULT 0 NOT NULL;--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "total_a2a_executions" integer DEFAULT 0 NOT NULL;

packages/db/migrations/meta/0146_snapshot.json (new file)
File diff suppressed because it is too large
@@ -1016,6 +1016,13 @@
    "when": 1768602646955,
    "tag": "0145_messy_archangel",
    "breakpoints": true
  },
  {
    "idx": 146,
    "version": "7",
    "when": 1768867605608,
    "tag": "0146_cultured_ikaris",
    "breakpoints": true
  }
]
}

@@ -698,6 +698,8 @@ export const userStats = pgTable('user_stats', {
  totalWebhookTriggers: integer('total_webhook_triggers').notNull().default(0),
  totalScheduledExecutions: integer('total_scheduled_executions').notNull().default(0),
  totalChatExecutions: integer('total_chat_executions').notNull().default(0),
  totalMcpExecutions: integer('total_mcp_executions').notNull().default(0),
  totalA2aExecutions: integer('total_a2a_executions').notNull().default(0),
  totalTokensUsed: integer('total_tokens_used').notNull().default(0),
  totalCost: decimal('total_cost').notNull().default('0'),
  currentUsageLimit: decimal('current_usage_limit').default(DEFAULT_FREE_CREDITS.toString()), // Default $20 for free plan, null for team/enterprise