Compare commits

...

2 Commits

Author SHA1 Message Date
Siddharth Ganesan a4007c7e7e v0 2026-01-20 18:22:59 -08:00
Siddharth Ganesan 71c92788c5 Add deploy subagent response 2026-01-20 17:53:35 -08:00
11 changed files with 2318 additions and 1 deletion

View File

@@ -0,0 +1,364 @@
import { db } from '@sim/db'
import { copilotChats, workflow as workflowTable } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { authenticateApiKeyFromHeader, updateApiKeyLastUsed } from '@/lib/api-key/service'
import { getSession } from '@/lib/auth'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_API_URL_DEFAULT, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { COPILOT_MODEL_IDS } from '@/lib/copilot/models'
import {
createRequestTracker,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import {
createStream,
completeStream,
errorStream,
updateStreamStatus,
} from '@/lib/copilot/stream-persistence'
import { executeToolServerSide, isServerExecutableTool } from '@/lib/copilot/tools/server/executor'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { sanitizeForCopilot } from '@/lib/workflows/sanitization/json-sanitizer'
import { env } from '@/lib/core/config/env'
import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
const logger = createLogger('HeadlessCopilotAPI')
const SIM_AGENT_API_URL = env.SIM_AGENT_API_URL || SIM_AGENT_API_URL_DEFAULT
const HeadlessRequestSchema = z.object({
message: z.string().min(1, 'Message is required'),
workflowId: z.string().min(1, 'Workflow ID is required'),
chatId: z.string().optional(),
model: z.enum(COPILOT_MODEL_IDS).optional(),
mode: z.enum(['agent', 'build', 'chat']).optional().default('agent'),
timeout: z.number().optional().default(300000), // 5 minute default
persistChanges: z.boolean().optional().default(true),
createNewChat: z.boolean().optional().default(false),
})
export const dynamic = 'force-dynamic'
export const fetchCache = 'force-no-store'
export const runtime = 'nodejs'
/**
* POST /api/copilot/headless
*
* Executes a copilot request entirely server-side, with no client connection.
* All tool calls run on the server and their results are persisted directly.
*
* Returns the final result after all processing is complete.
*/
export async function POST(req: NextRequest) {
const tracker = createRequestTracker()
const startTime = Date.now()
try {
// Authenticate via session or API key
let userId: string | null = null
const session = await getSession()
if (session?.user?.id) {
userId = session.user.id
} else {
// Try API key authentication from header
const apiKey = req.headers.get('x-api-key')
if (apiKey) {
const authResult = await authenticateApiKeyFromHeader(apiKey)
if (authResult.success && authResult.userId) {
userId = authResult.userId
// Update last used timestamp in background
if (authResult.keyId) {
updateApiKeyLastUsed(authResult.keyId).catch(() => {})
}
}
}
}
if (!userId) {
return createUnauthorizedResponse()
}
const body = await req.json()
const { message, workflowId, chatId, model, mode, timeout, persistChanges, createNewChat } =
HeadlessRequestSchema.parse(body)
logger.info(`[${tracker.requestId}] Headless copilot request`, {
userId,
workflowId,
messageLength: message.length,
mode,
})
// Verify user has access to workflow
const [wf] = await db
.select({ userId: workflowTable.userId, workspaceId: workflowTable.workspaceId })
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
if (!wf) {
return NextResponse.json({ error: 'Workflow not found' }, { status: 404 })
}
// TODO: Add proper workspace access check
if (wf.userId !== userId) {
return NextResponse.json({ error: 'Access denied' }, { status: 403 })
}
// Load current workflow state from database
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
if (!workflowData) {
return NextResponse.json({ error: 'Workflow data not found' }, { status: 404 })
}
const sanitizedWorkflow = sanitizeForCopilot({
blocks: workflowData.blocks,
edges: workflowData.edges,
loops: workflowData.loops,
parallels: workflowData.parallels,
})
// Create a stream for tracking (even in headless mode)
const streamId = crypto.randomUUID()
const userMessageId = crypto.randomUUID()
const assistantMessageId = crypto.randomUUID()
await createStream({
streamId,
chatId: chatId || '',
userId,
workflowId,
userMessageId,
isClientSession: false, // Key: this is headless
})
await updateStreamStatus(streamId, 'streaming')
// Handle chat persistence
let actualChatId = chatId
if (createNewChat && !chatId) {
const { provider, model: defaultModel } = getCopilotModel('chat')
const [newChat] = await db
.insert(copilotChats)
.values({
userId,
workflowId,
title: null,
model: model || defaultModel,
messages: [],
})
.returning()
if (newChat) {
actualChatId = newChat.id
}
}
// Get credentials for tools
let credentials: {
oauth: Record<string, { accessToken: string; accountId: string; name: string }>
apiKeys: string[]
} | null = null
try {
const rawCredentials = await getCredentialsServerTool.execute({ workflowId }, { userId })
const oauthMap: Record<string, { accessToken: string; accountId: string; name: string }> = {}
for (const cred of rawCredentials?.oauth?.connected?.credentials || []) {
if (cred.accessToken) {
oauthMap[cred.provider] = {
accessToken: cred.accessToken,
accountId: cred.id,
name: cred.name,
}
}
}
credentials = {
oauth: oauthMap,
apiKeys: rawCredentials?.environment?.variableNames || [],
}
} catch (error) {
logger.warn(`[${tracker.requestId}] Failed to fetch credentials`, { error })
}
// Build tool definitions
const { createUserToolSchema } = await import('@/tools/params')
const latestTools = getLatestVersionTools(tools)
const integrationTools = Object.entries(latestTools).map(([toolId, toolConfig]) => {
const userSchema = createUserToolSchema(toolConfig)
const strippedName = stripVersionSuffix(toolId)
return {
name: strippedName,
description: toolConfig.description || toolConfig.name || strippedName,
input_schema: userSchema,
defer_loading: true,
}
})
// Build request payload
const defaults = getCopilotModel('chat')
const selectedModel = model || defaults.model
const effectiveMode = mode === 'agent' ? 'build' : mode
const requestPayload = {
message,
workflowId,
userId,
stream: false, // Non-streaming for headless
model: selectedModel,
mode: effectiveMode,
version: SIM_AGENT_VERSION,
messageId: userMessageId,
...(actualChatId && { chatId: actualChatId }),
...(integrationTools.length > 0 && { tools: integrationTools }),
...(credentials && { credentials }),
}
// Call sim agent (non-streaming)
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), timeout)
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/chat-completion`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
...(env.COPILOT_API_KEY ? { 'x-api-key': env.COPILOT_API_KEY } : {}),
},
body: JSON.stringify(requestPayload),
signal: controller.signal,
})
clearTimeout(timeoutId)
if (!response.ok) {
const errorText = await response.text()
logger.error(`[${tracker.requestId}] Sim agent error`, {
status: response.status,
error: errorText,
})
await errorStream(streamId, `Agent error: ${response.statusText}`)
return NextResponse.json(
{ error: `Agent error: ${response.statusText}` },
{ status: response.status }
)
}
const result = await response.json()
// Execute tool calls server-side
const toolResults: Record<string, { success: boolean; result?: unknown; error?: string }> = {}
if (result.toolCalls && Array.isArray(result.toolCalls)) {
for (const toolCall of result.toolCalls) {
const toolName = toolCall.name
const toolArgs = toolCall.arguments || toolCall.input || {}
logger.info(`[${tracker.requestId}] Executing tool server-side`, {
toolName,
toolCallId: toolCall.id,
})
if (!isServerExecutableTool(toolName)) {
logger.warn(`[${tracker.requestId}] Tool not executable server-side`, { toolName })
toolResults[toolCall.id] = {
success: false,
error: `Tool ${toolName} requires client-side execution`,
}
continue
}
const toolResult = await executeToolServerSide(
{ name: toolName, args: toolArgs },
{ workflowId, userId, persistChanges }
)
toolResults[toolCall.id] = toolResult
}
}
// Mark stream complete
await completeStream(streamId, { content: result.content, toolResults })
// Save to chat history
if (actualChatId && persistChanges) {
const [chat] = await db
.select()
.from(copilotChats)
.where(eq(copilotChats.id, actualChatId))
.limit(1)
const existingMessages = chat ? (Array.isArray(chat.messages) ? chat.messages : []) : []
const newMessages = [
...existingMessages,
{
id: userMessageId,
role: 'user',
content: message,
timestamp: new Date().toISOString(),
},
{
id: assistantMessageId,
role: 'assistant',
content: result.content,
timestamp: new Date().toISOString(),
toolCalls: Object.entries(toolResults).map(([id, r]) => ({
id,
success: r.success,
})),
},
]
await db
.update(copilotChats)
.set({ messages: newMessages, updatedAt: new Date() })
.where(eq(copilotChats.id, actualChatId))
}
const duration = Date.now() - startTime
logger.info(`[${tracker.requestId}] Headless copilot complete`, {
duration,
contentLength: result.content?.length || 0,
toolCallsExecuted: Object.keys(toolResults).length,
})
return NextResponse.json({
success: true,
streamId,
chatId: actualChatId,
content: result.content,
toolResults,
duration,
})
} catch (error) {
clearTimeout(timeoutId)
if (error instanceof Error && error.name === 'AbortError') {
await errorStream(streamId, 'Request timed out')
return NextResponse.json({ error: 'Request timed out' }, { status: 504 })
}
throw error
}
} catch (error) {
logger.error(`[${tracker.requestId}] Headless copilot error`, { error })
if (error instanceof z.ZodError) {
return NextResponse.json({ error: 'Invalid request', details: error.errors }, { status: 400 })
}
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Internal error' },
{ status: 500 }
)
}
}
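
For reference, a minimal client-side sketch of how this route could be called over HTTP. It is not part of the diff; the base URL, API key source, and workflow id are placeholders, and the body fields mirror HeadlessRequestSchema above.

async function runHeadless(apiKey: string, workflowId: string): Promise<void> {
  const response = await fetch('https://sim.example.com/api/copilot/headless', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'x-api-key': apiKey, // same header the route checks above
    },
    body: JSON.stringify({
      message: 'Add a retry to the failing HTTP block',
      workflowId,
      mode: 'agent',
      createNewChat: true,
      persistChanges: true,
    }),
  })
  if (!response.ok) {
    throw new Error(`Headless copilot failed: ${response.status}`)
  }
  // Shape matches the success payload returned by the route above
  const { content, toolResults, chatId, duration } = await response.json()
  console.log({ chatId, duration, content, toolResults })
}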

View File

@@ -0,0 +1,237 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { getSession } from '@/lib/auth'
import {
getStreamMetadata,
getStreamEvents,
getStreamEventCount,
getToolCallStates,
refreshStreamTTL,
checkAbortSignal,
abortStream,
} from '@/lib/copilot/stream-persistence'
const logger = createLogger('StreamResumeAPI')
interface RouteParams {
streamId: string
}
/**
* GET /api/copilot/stream/{streamId}
* Subscribe to or resume a stream
*
* Query params:
* - offset: Start from this event index (for resumption)
* - mode: 'sse' (default) or 'poll'
*/
export async function GET(req: NextRequest, { params }: { params: Promise<RouteParams> }) {
const { streamId } = await params
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const metadata = await getStreamMetadata(streamId)
if (!metadata) {
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
}
// Verify user owns this stream
if (metadata.userId !== session.user.id) {
return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
}
const offset = parseInt(req.nextUrl.searchParams.get('offset') || '0', 10)
const mode = req.nextUrl.searchParams.get('mode') || 'sse'
// Refresh TTL since someone is actively consuming
await refreshStreamTTL(streamId)
// Poll mode: return current state as JSON
if (mode === 'poll') {
const events = await getStreamEvents(streamId, offset)
const toolCalls = await getToolCallStates(streamId)
const eventCount = await getStreamEventCount(streamId)
return NextResponse.json({
metadata,
events,
toolCalls,
totalEvents: eventCount,
nextOffset: offset + events.length,
})
}
// SSE mode: stream events
const encoder = new TextEncoder()
const readable = new ReadableStream({
async start(controller) {
let closed = false
const safeEnqueue = (data: string) => {
if (closed) return
try {
controller.enqueue(encoder.encode(data))
} catch {
closed = true
}
}
const safeClose = () => {
if (closed) return
closed = true
try {
controller.close()
} catch {
// Already closed
}
}
// Send initial connection event
safeEnqueue(`: connected\n\n`)
// Send metadata
safeEnqueue(`event: metadata\ndata: ${JSON.stringify(metadata)}\n\n`)
// Send tool call states
const toolCalls = await getToolCallStates(streamId)
if (Object.keys(toolCalls).length > 0) {
safeEnqueue(`event: tool_states\ndata: ${JSON.stringify(toolCalls)}\n\n`)
}
// Replay missed events
const missedEvents = await getStreamEvents(streamId, offset)
for (const event of missedEvents) {
safeEnqueue(event)
}
// If stream is complete, send done and close
if (metadata.status === 'complete' || metadata.status === 'error' || metadata.status === 'aborted') {
safeEnqueue(
`event: stream_status\ndata: ${JSON.stringify({
status: metadata.status,
error: metadata.error,
})}\n\n`
)
safeClose()
return
}
// Stream is still active - poll for new events
let lastOffset = offset + missedEvents.length
const pollInterval = 100 // 100ms
const maxPollTime = 5 * 60 * 1000 // 5 minutes max
const startTime = Date.now()
const poll = async () => {
if (closed) return
try {
// Check for timeout
if (Date.now() - startTime > maxPollTime) {
logger.info('Stream poll timeout', { streamId })
safeEnqueue(
`event: stream_status\ndata: ${JSON.stringify({ status: 'timeout' })}\n\n`
)
safeClose()
return
}
// Check whether the client requested an abort (via DELETE)
if (await checkAbortSignal(streamId)) {
safeEnqueue(
`event: stream_status\ndata: ${JSON.stringify({ status: 'aborted' })}\n\n`
)
safeClose()
return
}
// Get current metadata to check status
const currentMeta = await getStreamMetadata(streamId)
if (!currentMeta) {
safeClose()
return
}
// Get new events
const newEvents = await getStreamEvents(streamId, lastOffset)
for (const event of newEvents) {
safeEnqueue(event)
}
lastOffset += newEvents.length
// Refresh TTL
await refreshStreamTTL(streamId)
// If complete, send status and close
if (
currentMeta.status === 'complete' ||
currentMeta.status === 'error' ||
currentMeta.status === 'aborted'
) {
safeEnqueue(
`event: stream_status\ndata: ${JSON.stringify({
status: currentMeta.status,
error: currentMeta.error,
})}\n\n`
)
safeClose()
return
}
// Continue polling
setTimeout(poll, pollInterval)
} catch (error) {
logger.error('Stream poll error', { streamId, error })
safeClose()
}
}
// Start polling
setTimeout(poll, pollInterval)
},
})
return new Response(readable, {
headers: {
'Content-Type': 'text/event-stream; charset=utf-8',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
'X-Accel-Buffering': 'no',
'X-Stream-Id': streamId,
},
})
}
/**
* DELETE /api/copilot/stream/{streamId}
* Abort a stream
*/
export async function DELETE(req: NextRequest, { params }: { params: Promise<RouteParams> }) {
const { streamId } = await params
const session = await getSession()
if (!session?.user?.id) {
return NextResponse.json({ error: 'Unauthorized' }, { status: 401 })
}
const metadata = await getStreamMetadata(streamId)
if (!metadata) {
return NextResponse.json({ error: 'Stream not found' }, { status: 404 })
}
// Verify user owns this stream
if (metadata.userId !== session.user.id) {
return NextResponse.json({ error: 'Forbidden' }, { status: 403 })
}
await abortStream(streamId)
logger.info('Stream aborted by user', { streamId, userId: session.user.id })
return NextResponse.json({ success: true, streamId })
}
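
A minimal consumer sketch (not in the diff) for this route's 'poll' mode: fetch the current state, replay any new frames, and stop once the stream reaches a terminal status. The response shape mirrors the JSON returned above; the one-second delay between polls is an assumption.

async function pollStream(streamId: string): Promise<void> {
  let offset = 0
  while (true) {
    const res = await fetch(`/api/copilot/stream/${streamId}?mode=poll&offset=${offset}`)
    if (!res.ok) throw new Error(`Poll failed: ${res.status}`)
    const { metadata, events, nextOffset } = await res.json()
    for (const chunk of events) {
      // Each entry is a raw SSE frame string appended to the stream
      console.log(chunk)
    }
    offset = nextOffset
    if (['complete', 'error', 'aborted'].includes(metadata.status)) break
    await new Promise((resolve) => setTimeout(resolve, 1000))
  }
}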

View File

@@ -1510,7 +1510,8 @@ export function ToolCall({
toolCall.name === 'user_memory' ||
toolCall.name === 'edit_respond' ||
toolCall.name === 'debug_respond' ||
toolCall.name === 'plan_respond'
toolCall.name === 'plan_respond' ||
toolCall.name === 'deploy_respond'
)
return null

View File

@@ -98,6 +98,8 @@ export interface ApiResponse {
*/
export interface StreamingResponse extends ApiResponse {
stream?: ReadableStream
streamId?: string
chatId?: string
}
/**
@@ -163,9 +165,15 @@ export async function sendStreamingMessage(
}
}
// Extract stream and chat IDs from headers for resumption support
const streamId = response.headers.get('X-Stream-Id') || undefined
const chatId = response.headers.get('X-Chat-Id') || undefined
return {
success: true,
stream: response.body,
streamId,
chatId,
}
} catch (error) {
// Handle AbortError gracefully - this is expected when user aborts

View File

@@ -0,0 +1,324 @@
/**
* Render events are the normalized event types sent to clients.
* These are independent of the sim agent's internal event format.
*/
export type RenderEventType =
| 'text_delta'
| 'text_complete'
| 'tool_pending'
| 'tool_executing'
| 'tool_success'
| 'tool_error'
| 'tool_result'
| 'subagent_start'
| 'subagent_text'
| 'subagent_tool_call'
| 'subagent_end'
| 'thinking_start'
| 'thinking_delta'
| 'thinking_end'
| 'message_start'
| 'message_complete'
| 'chat_id'
| 'conversation_id'
| 'error'
| 'stream_status'
export interface BaseRenderEvent {
type: RenderEventType
timestamp?: number
}
export interface TextDeltaEvent extends BaseRenderEvent {
type: 'text_delta'
content: string
}
export interface TextCompleteEvent extends BaseRenderEvent {
type: 'text_complete'
content: string
}
export interface ToolPendingEvent extends BaseRenderEvent {
type: 'tool_pending'
toolCallId: string
toolName: string
args?: Record<string, unknown>
display?: {
label: string
icon?: string
}
}
export interface ToolExecutingEvent extends BaseRenderEvent {
type: 'tool_executing'
toolCallId: string
toolName: string
}
export interface ToolSuccessEvent extends BaseRenderEvent {
type: 'tool_success'
toolCallId: string
toolName: string
result?: unknown
display?: {
label: string
icon?: string
}
}
export interface ToolErrorEvent extends BaseRenderEvent {
type: 'tool_error'
toolCallId: string
toolName: string
error: string
display?: {
label: string
icon?: string
}
}
export interface ToolResultEvent extends BaseRenderEvent {
type: 'tool_result'
toolCallId: string
success: boolean
result?: unknown
error?: string
failedDependency?: boolean
skipped?: boolean
}
export interface SubagentStartEvent extends BaseRenderEvent {
type: 'subagent_start'
parentToolCallId: string
subagentName: string
}
export interface SubagentTextEvent extends BaseRenderEvent {
type: 'subagent_text'
parentToolCallId: string
content: string
}
export interface SubagentToolCallEvent extends BaseRenderEvent {
type: 'subagent_tool_call'
parentToolCallId: string
toolCallId: string
toolName: string
args?: Record<string, unknown>
state: 'pending' | 'executing' | 'success' | 'error'
result?: unknown
error?: string
}
export interface SubagentEndEvent extends BaseRenderEvent {
type: 'subagent_end'
parentToolCallId: string
}
export interface ThinkingStartEvent extends BaseRenderEvent {
type: 'thinking_start'
}
export interface ThinkingDeltaEvent extends BaseRenderEvent {
type: 'thinking_delta'
content: string
}
export interface ThinkingEndEvent extends BaseRenderEvent {
type: 'thinking_end'
}
export interface MessageStartEvent extends BaseRenderEvent {
type: 'message_start'
messageId: string
}
export interface MessageCompleteEvent extends BaseRenderEvent {
type: 'message_complete'
messageId: string
content?: string
}
export interface ChatIdEvent extends BaseRenderEvent {
type: 'chat_id'
chatId: string
}
export interface ConversationIdEvent extends BaseRenderEvent {
type: 'conversation_id'
conversationId: string
}
export interface ErrorEvent extends BaseRenderEvent {
type: 'error'
error: string
code?: string
}
export interface StreamStatusEvent extends BaseRenderEvent {
type: 'stream_status'
status: 'streaming' | 'complete' | 'error' | 'aborted'
error?: string
}
export type RenderEvent =
| TextDeltaEvent
| TextCompleteEvent
| ToolPendingEvent
| ToolExecutingEvent
| ToolSuccessEvent
| ToolErrorEvent
| ToolResultEvent
| SubagentStartEvent
| SubagentTextEvent
| SubagentToolCallEvent
| SubagentEndEvent
| ThinkingStartEvent
| ThinkingDeltaEvent
| ThinkingEndEvent
| MessageStartEvent
| MessageCompleteEvent
| ChatIdEvent
| ConversationIdEvent
| ErrorEvent
| StreamStatusEvent
/**
* Serialize a render event to SSE format
*/
export function serializeRenderEvent(event: RenderEvent): string {
const eventWithTimestamp = {
...event,
timestamp: event.timestamp || Date.now(),
}
return `event: ${event.type}\ndata: ${JSON.stringify(eventWithTimestamp)}\n\n`
}
/**
* Parse an SSE chunk into a render event
*/
export function parseRenderEvent(chunk: string): RenderEvent | null {
// SSE format: "event: <type>\ndata: <json>\n\n"
const lines = chunk.trim().split('\n')
let eventType: string | null = null
let data: string | null = null
for (const line of lines) {
if (line.startsWith('event: ')) {
eventType = line.slice(7)
} else if (line.startsWith('data: ')) {
data = line.slice(6)
}
}
if (!data) return null
try {
const parsed = JSON.parse(data)
// Prefer a type already present in the data payload; fall back to the SSE event line
if (eventType && !parsed.type) {
parsed.type = eventType
}
return parsed as RenderEvent
} catch {
return null
}
}
/**
* Create a text delta event
*/
export function createTextDelta(content: string): TextDeltaEvent {
return { type: 'text_delta', content, timestamp: Date.now() }
}
/**
* Create a tool pending event
*/
export function createToolPending(
toolCallId: string,
toolName: string,
args?: Record<string, unknown>,
display?: { label: string; icon?: string }
): ToolPendingEvent {
return {
type: 'tool_pending',
toolCallId,
toolName,
args,
display,
timestamp: Date.now(),
}
}
/**
* Create a tool executing event
*/
export function createToolExecuting(toolCallId: string, toolName: string): ToolExecutingEvent {
return { type: 'tool_executing', toolCallId, toolName, timestamp: Date.now() }
}
/**
* Create a tool success event
*/
export function createToolSuccess(
toolCallId: string,
toolName: string,
result?: unknown,
display?: { label: string; icon?: string }
): ToolSuccessEvent {
return {
type: 'tool_success',
toolCallId,
toolName,
result,
display,
timestamp: Date.now(),
}
}
/**
* Create a tool error event
*/
export function createToolError(
toolCallId: string,
toolName: string,
error: string,
display?: { label: string; icon?: string }
): ToolErrorEvent {
return {
type: 'tool_error',
toolCallId,
toolName,
error,
display,
timestamp: Date.now(),
}
}
/**
* Create a message complete event
*/
export function createMessageComplete(messageId: string, content?: string): MessageCompleteEvent {
return { type: 'message_complete', messageId, content, timestamp: Date.now() }
}
/**
* Create a stream status event
*/
export function createStreamStatus(
status: 'streaming' | 'complete' | 'error' | 'aborted',
error?: string
): StreamStatusEvent {
return { type: 'stream_status', status, error, timestamp: Date.now() }
}
/**
* Create an error event
*/
export function createError(error: string, code?: string): ErrorEvent {
return { type: 'error', error, code, timestamp: Date.now() }
}
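
A quick round-trip sketch (not part of the diff) showing the SSE framing these helpers produce and consume. The relative import path matches the one used by the transformer file later in this diff, but is an assumption for any other caller.

import { createTextDelta, parseRenderEvent, serializeRenderEvent } from './render-events'

// serializeRenderEvent wraps the event as "event: <type>\ndata: <json>\n\n";
// parseRenderEvent reverses that framing back into a typed RenderEvent.
const frame = serializeRenderEvent(createTextDelta('Hello'))
const parsed = parseRenderEvent(frame)
if (parsed?.type === 'text_delta') {
  console.log(parsed.content) // 'Hello'
}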

View File

@@ -0,0 +1,299 @@
'use client'
import { createLogger } from '@sim/logger'
const logger = createLogger('StreamClient')
export interface StreamMetadata {
streamId: string
chatId: string
userId: string
workflowId: string
userMessageId: string
assistantMessageId?: string
status: 'pending' | 'streaming' | 'complete' | 'error' | 'aborted'
isClientSession: boolean
createdAt: number
updatedAt: number
completedAt?: number
error?: string
}
export interface StreamResumeResponse {
metadata: StreamMetadata
events: string[]
toolCalls: Record<string, unknown>
totalEvents: number
nextOffset: number
}
const STREAM_ID_STORAGE_KEY = 'copilot:activeStream'
const RECONNECT_DELAY_MS = 1000
const MAX_RECONNECT_ATTEMPTS = 5
/**
* Store active stream info for potential resumption
*/
export function storeActiveStream(
chatId: string,
streamId: string,
messageId: string
): void {
try {
const data = { chatId, streamId, messageId, storedAt: Date.now() }
sessionStorage.setItem(STREAM_ID_STORAGE_KEY, JSON.stringify(data))
logger.info('Stored active stream for potential resumption', { streamId, chatId })
} catch {
// Session storage not available
}
}
/**
* Get stored active stream if one exists
*/
export function getStoredActiveStream(): {
chatId: string
streamId: string
messageId: string
storedAt: number
} | null {
try {
const data = sessionStorage.getItem(STREAM_ID_STORAGE_KEY)
if (!data) return null
return JSON.parse(data)
} catch {
return null
}
}
/**
* Clear stored active stream
*/
export function clearStoredActiveStream(): void {
try {
sessionStorage.removeItem(STREAM_ID_STORAGE_KEY)
} catch {
// Session storage not available
}
}
/**
* Check if a stream is still active
*/
export async function checkStreamStatus(streamId: string): Promise<StreamMetadata | null> {
try {
const response = await fetch(`/api/copilot/stream/${streamId}?mode=poll&offset=0`)
if (!response.ok) {
if (response.status === 404) {
// Stream not found or expired
return null
}
throw new Error(`Failed to check stream status: ${response.statusText}`)
}
const data: StreamResumeResponse = await response.json()
return data.metadata
} catch (error) {
logger.error('Failed to check stream status', { streamId, error })
return null
}
}
/**
* Resume a stream from a given offset using SSE
*/
export async function resumeStream(
streamId: string,
offset: number = 0
): Promise<ReadableStream<Uint8Array> | null> {
try {
const response = await fetch(`/api/copilot/stream/${streamId}?mode=sse&offset=${offset}`)
if (!response.ok || !response.body) {
if (response.status === 404) {
logger.info('Stream not found for resumption', { streamId })
clearStoredActiveStream()
return null
}
throw new Error(`Failed to resume stream: ${response.statusText}`)
}
logger.info('Stream resumption started', { streamId, offset })
return response.body
} catch (error) {
logger.error('Failed to resume stream', { streamId, error })
return null
}
}
/**
* Abort a stream
*/
export async function abortStream(streamId: string): Promise<boolean> {
try {
const response = await fetch(`/api/copilot/stream/${streamId}`, {
method: 'DELETE',
})
if (!response.ok && response.status !== 404) {
throw new Error(`Failed to abort stream: ${response.statusText}`)
}
clearStoredActiveStream()
return true
} catch (error) {
logger.error('Failed to abort stream', { streamId, error })
return false
}
}
export interface StreamSubscription {
unsubscribe: () => void
getStreamId: () => string
}
export interface StreamEventHandler {
onEvent: (event: { type: string; data: Record<string, unknown> }) => void
onError?: (error: Error) => void
onComplete?: () => void
}
/**
* Subscribe to a stream (new or resumed) and process events
* This provides a unified interface for both initial streams and resumed streams
*/
export function subscribeToStream(
streamBody: ReadableStream<Uint8Array>,
handlers: StreamEventHandler
): StreamSubscription {
const reader = streamBody.getReader()
const decoder = new TextDecoder()
let cancelled = false
let buffer = ''
let streamId = ''
const processEvents = async () => {
try {
while (!cancelled) {
const { done, value } = await reader.read()
if (done || cancelled) break
buffer += decoder.decode(value, { stream: true })
// Process complete SSE messages
const messages = buffer.split('\n\n')
buffer = messages.pop() || ''
for (const message of messages) {
if (!message.trim()) continue
if (message.startsWith(':')) continue // SSE comment (ping)
// Parse SSE format
const lines = message.split('\n')
let eventType = 'message'
let data: Record<string, unknown> = {}
for (const line of lines) {
if (line.startsWith('event: ')) {
eventType = line.slice(7)
} else if (line.startsWith('data: ')) {
try {
data = JSON.parse(line.slice(6))
} catch {
data = { raw: line.slice(6) }
}
}
}
// Track stream ID if provided in metadata
if (eventType === 'metadata' && data.streamId) {
streamId = data.streamId as string
}
handlers.onEvent({ type: eventType, data })
// Check for terminal events
if (eventType === 'stream_status') {
const status = data.status as string
if (status === 'complete' || status === 'error' || status === 'aborted') {
if (status === 'error' && handlers.onError) {
handlers.onError(new Error(data.error as string || 'Stream error'))
}
if (handlers.onComplete) {
handlers.onComplete()
}
clearStoredActiveStream()
return
}
}
}
}
// Stream ended without explicit status
if (handlers.onComplete) {
handlers.onComplete()
}
} catch (error) {
if (!cancelled && handlers.onError) {
handlers.onError(error instanceof Error ? error : new Error(String(error)))
}
} finally {
reader.releaseLock()
}
}
// Start processing
processEvents()
return {
unsubscribe: () => {
cancelled = true
reader.cancel().catch(() => {})
clearStoredActiveStream()
},
getStreamId: () => streamId,
}
}
/**
* Attempt to resume any active stream from session storage
* Returns the resumable stream, its metadata, and the replay offset if resumption is possible, null otherwise
*/
export async function attemptStreamResumption(): Promise<{
stream: ReadableStream<Uint8Array>
metadata: StreamMetadata
offset: number
} | null> {
const stored = getStoredActiveStream()
if (!stored) return null
// Check if stream is still valid (not too old)
const maxAge = 5 * 60 * 1000 // 5 minutes
if (Date.now() - stored.storedAt > maxAge) {
clearStoredActiveStream()
return null
}
// Check stream status
const metadata = await checkStreamStatus(stored.streamId)
if (!metadata) {
clearStoredActiveStream()
return null
}
// Only resume if stream is still active
if (metadata.status !== 'streaming' && metadata.status !== 'pending') {
clearStoredActiveStream()
return null
}
// Get the stream
const stream = await resumeStream(stored.streamId, 0)
if (!stream) {
return null
}
logger.info('Stream resumption possible', {
streamId: stored.streamId,
status: metadata.status,
})
return { stream, metadata, offset: 0 }
}
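
A usage sketch (not in the diff) tying these helpers together: on reload, check for a stored stream and, if it is still live, re-attach and log incoming events. The import path matches the one used in the store changes later in this diff.

import { attemptStreamResumption, subscribeToStream } from '@/lib/copilot/stream-client'

async function resumeIfPossible(): Promise<void> {
  const resumption = await attemptStreamResumption()
  if (!resumption) return // nothing stored, too old, or already finished
  const subscription = subscribeToStream(resumption.stream, {
    onEvent: ({ type, data }) => console.log(type, data),
    onError: (error) => console.error('Resumed stream failed', error),
    onComplete: () => console.log('Resumed stream finished'),
  })
  // Call subscription.unsubscribe() to detach early (e.g. on unmount)
}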

View File

@@ -0,0 +1,327 @@
import { createLogger } from '@sim/logger'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('StreamPersistence')
const STREAM_PREFIX = 'copilot:stream:'
const STREAM_TTL = 60 * 60 * 24 // 24 hours
export type StreamStatus = 'pending' | 'streaming' | 'complete' | 'error' | 'aborted'
export interface StreamMetadata {
streamId: string
chatId: string
userId: string
workflowId: string
userMessageId: string
assistantMessageId?: string
status: StreamStatus
isClientSession: boolean
createdAt: number
updatedAt: number
completedAt?: number
error?: string
}
export interface ToolCallState {
id: string
name: string
args: Record<string, unknown>
state: 'pending' | 'executing' | 'success' | 'error'
result?: unknown
error?: string
}
/**
* Initialize a new stream in Redis
*/
export async function createStream(params: {
streamId: string
chatId: string
userId: string
workflowId: string
userMessageId: string
isClientSession: boolean
}): Promise<void> {
const redis = getRedisClient()
if (!redis) {
logger.warn('Redis not available, stream will not be resumable')
return
}
const metadata: StreamMetadata = {
...params,
status: 'pending',
createdAt: Date.now(),
updatedAt: Date.now(),
}
const key = `${STREAM_PREFIX}${params.streamId}:meta`
await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL)
logger.info('Stream created', { streamId: params.streamId })
}
/**
* Update stream status
*/
export async function updateStreamStatus(
streamId: string,
status: StreamStatus,
error?: string
): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = `${STREAM_PREFIX}${streamId}:meta`
const data = await redis.get(key)
if (!data) return
const metadata: StreamMetadata = JSON.parse(data)
metadata.status = status
metadata.updatedAt = Date.now()
if (status === 'complete' || status === 'error') {
metadata.completedAt = Date.now()
}
if (error) {
metadata.error = error
}
await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL)
}
/**
* Update stream metadata with additional fields
*/
export async function updateStreamMetadata(
streamId: string,
updates: Partial<StreamMetadata>
): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = `${STREAM_PREFIX}${streamId}:meta`
const data = await redis.get(key)
if (!data) return
const metadata: StreamMetadata = JSON.parse(data)
Object.assign(metadata, updates, { updatedAt: Date.now() })
await redis.set(key, JSON.stringify(metadata), 'EX', STREAM_TTL)
}
/**
* Append a serialized SSE event chunk to the stream
*/
export async function appendChunk(streamId: string, chunk: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = `${STREAM_PREFIX}${streamId}:events`
await redis.rpush(key, chunk)
await redis.expire(key, STREAM_TTL)
}
/**
* Append text content (for quick content retrieval without parsing events)
*/
export async function appendContent(streamId: string, content: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = `${STREAM_PREFIX}${streamId}:content`
await redis.append(key, content)
await redis.expire(key, STREAM_TTL)
}
/**
* Update tool call state
*/
export async function updateToolCall(
streamId: string,
toolCallId: string,
update: Partial<ToolCallState>
): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const key = `${STREAM_PREFIX}${streamId}:tools`
const existing = await redis.hget(key, toolCallId)
const current: ToolCallState = existing
? JSON.parse(existing)
: { id: toolCallId, name: '', args: {}, state: 'pending' }
const updated = { ...current, ...update }
await redis.hset(key, toolCallId, JSON.stringify(updated))
await redis.expire(key, STREAM_TTL)
}
/**
* Mark stream as complete
*/
export async function completeStream(streamId: string, result?: unknown): Promise<void> {
const redis = getRedisClient()
if (!redis) return
await updateStreamStatus(streamId, 'complete')
if (result !== undefined) {
const key = `${STREAM_PREFIX}${streamId}:result`
await redis.set(key, JSON.stringify(result), 'EX', STREAM_TTL)
}
logger.info('Stream completed', { streamId })
}
/**
* Mark stream as errored
*/
export async function errorStream(streamId: string, error: string): Promise<void> {
await updateStreamStatus(streamId, 'error', error)
logger.error('Stream errored', { streamId, error })
}
/**
* Check if stream was aborted (client requested abort)
*/
export async function checkAbortSignal(streamId: string): Promise<boolean> {
const redis = getRedisClient()
if (!redis) return false
const key = `${STREAM_PREFIX}${streamId}:abort`
const aborted = await redis.exists(key)
return aborted === 1
}
/**
* Signal stream abort
*/
export async function abortStream(streamId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
await redis.set(`${STREAM_PREFIX}${streamId}:abort`, '1', 'EX', STREAM_TTL)
await updateStreamStatus(streamId, 'aborted')
logger.info('Stream aborted', { streamId })
}
/**
* Refresh TTL on all stream keys
*/
export async function refreshStreamTTL(streamId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const keys = [
`${STREAM_PREFIX}${streamId}:meta`,
`${STREAM_PREFIX}${streamId}:events`,
`${STREAM_PREFIX}${streamId}:content`,
`${STREAM_PREFIX}${streamId}:tools`,
`${STREAM_PREFIX}${streamId}:result`,
]
for (const key of keys) {
await redis.expire(key, STREAM_TTL)
}
}
/**
* Get stream metadata
*/
export async function getStreamMetadata(streamId: string): Promise<StreamMetadata | null> {
const redis = getRedisClient()
if (!redis) return null
const data = await redis.get(`${STREAM_PREFIX}${streamId}:meta`)
return data ? JSON.parse(data) : null
}
/**
* Get stream events from offset (for resumption)
*/
export async function getStreamEvents(streamId: string, fromOffset: number = 0): Promise<string[]> {
const redis = getRedisClient()
if (!redis) return []
const key = `${STREAM_PREFIX}${streamId}:events`
return redis.lrange(key, fromOffset, -1)
}
/**
* Get current event count (for client to know where it is)
*/
export async function getStreamEventCount(streamId: string): Promise<number> {
const redis = getRedisClient()
if (!redis) return 0
const key = `${STREAM_PREFIX}${streamId}:events`
return redis.llen(key)
}
/**
* Get all tool call states
*/
export async function getToolCallStates(streamId: string): Promise<Record<string, ToolCallState>> {
const redis = getRedisClient()
if (!redis) return {}
const key = `${STREAM_PREFIX}${streamId}:tools`
const data = await redis.hgetall(key)
const result: Record<string, ToolCallState> = {}
for (const [id, json] of Object.entries(data)) {
result[id] = JSON.parse(json)
}
return result
}
/**
* Get accumulated content
*/
export async function getStreamContent(streamId: string): Promise<string> {
const redis = getRedisClient()
if (!redis) return ''
const key = `${STREAM_PREFIX}${streamId}:content`
return (await redis.get(key)) || ''
}
/**
* Get final result (if complete)
*/
export async function getStreamResult(streamId: string): Promise<unknown | null> {
const redis = getRedisClient()
if (!redis) return null
const key = `${STREAM_PREFIX}${streamId}:result`
const data = await redis.get(key)
return data ? JSON.parse(data) : null
}
/**
* Check if Redis is available for stream persistence
*/
export function isStreamPersistenceEnabled(): boolean {
return getRedisClient() !== null
}
/**
* Delete all stream data (cleanup)
*/
export async function deleteStream(streamId: string): Promise<void> {
const redis = getRedisClient()
if (!redis) return
const keys = [
`${STREAM_PREFIX}${streamId}:meta`,
`${STREAM_PREFIX}${streamId}:events`,
`${STREAM_PREFIX}${streamId}:content`,
`${STREAM_PREFIX}${streamId}:tools`,
`${STREAM_PREFIX}${streamId}:result`,
`${STREAM_PREFIX}${streamId}:abort`,
]
await redis.del(...keys)
logger.info('Stream deleted', { streamId })
}
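
A producer/consumer sketch (not in the diff), assuming Redis is configured: the writer records SSE frames as they stream, and a resuming reader replays everything from its last offset. The chat, user, and workflow ids are placeholders.

import {
  appendChunk,
  completeStream,
  createStream,
  getStreamEvents,
  updateStreamStatus,
} from '@/lib/copilot/stream-persistence'

async function demoStreamPersistence(): Promise<void> {
  const streamId = crypto.randomUUID()
  await createStream({
    streamId,
    chatId: 'chat_placeholder',
    userId: 'user_placeholder',
    workflowId: 'wf_placeholder',
    userMessageId: crypto.randomUUID(),
    isClientSession: true,
  })
  await updateStreamStatus(streamId, 'streaming')
  // Writer: append each serialized SSE frame as it arrives
  await appendChunk(streamId, 'event: text_delta\ndata: {"type":"text_delta","content":"Hi"}\n\n')
  await completeStream(streamId, { content: 'Hi' })
  // Reader: a reconnecting client replays from the offset it last saw (0 here)
  const missed = await getStreamEvents(streamId, 0)
  console.log(missed.length, 'frames replayed')
}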

View File

@@ -0,0 +1,419 @@
import { createLogger } from '@sim/logger'
import type { RenderEvent } from './render-events'
const logger = createLogger('StreamTransformer')
export interface TransformStreamContext {
streamId: string
chatId: string
userId: string
workflowId: string
userMessageId: string
assistantMessageId: string
/** Callback for each render event - handles both client delivery and persistence */
onRenderEvent: (event: RenderEvent) => Promise<void>
/** Callback for persistence operations */
onPersist?: (data: { type: string; [key: string]: unknown }) => Promise<void>
/** Check if stream should be aborted */
isAborted: () => boolean | Promise<boolean>
}
interface SimAgentEvent {
type?: string
event?: string
data?: unknown
[key: string]: unknown
}
/**
* Transform a sim agent SSE stream into normalized render events.
* This function consumes the entire stream and emits events via callbacks.
*/
export async function transformStream(
body: ReadableStream<Uint8Array>,
context: TransformStreamContext
): Promise<void> {
const { onRenderEvent, onPersist, isAborted } = context
const reader = body.getReader()
const decoder = new TextDecoder()
let buffer = ''
try {
while (true) {
// Check abort signal
const shouldAbort = await Promise.resolve(isAborted())
if (shouldAbort) {
logger.info('Stream aborted by signal', { streamId: context.streamId })
break
}
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
// Process complete SSE messages (separated by double newlines)
const messages = buffer.split('\n\n')
buffer = messages.pop() || '' // Keep incomplete message in buffer
for (const message of messages) {
if (!message.trim()) continue
const events = parseSimAgentMessage(message)
for (const simEvent of events) {
const renderEvents = transformSimAgentEvent(simEvent, context)
for (const renderEvent of renderEvents) {
await onRenderEvent(renderEvent)
}
}
}
}
// Process any remaining buffer content
if (buffer.trim()) {
const events = parseSimAgentMessage(buffer)
for (const simEvent of events) {
const renderEvents = transformSimAgentEvent(simEvent, context)
for (const renderEvent of renderEvents) {
await onRenderEvent(renderEvent)
}
}
}
// Emit message complete
await onRenderEvent({
type: 'message_complete',
messageId: context.assistantMessageId,
timestamp: Date.now(),
})
// Notify persistence layer
if (onPersist) {
await onPersist({ type: 'message_complete', messageId: context.assistantMessageId })
}
} catch (error) {
logger.error('Stream transform error', { streamId: context.streamId, error })
await onRenderEvent({
type: 'error',
error: error instanceof Error ? error.message : 'Stream processing error',
timestamp: Date.now(),
})
throw error
} finally {
reader.releaseLock()
}
}
/**
* Parse a raw SSE message into sim agent events
*/
function parseSimAgentMessage(message: string): SimAgentEvent[] {
const events: SimAgentEvent[] = []
const lines = message.split('\n')
let currentEvent: string | null = null
let currentData: string[] = []
for (const line of lines) {
if (line.startsWith('event: ')) {
// If we have accumulated data, emit previous event
if (currentData.length > 0) {
const dataStr = currentData.join('\n')
const parsed = tryParseJson(dataStr)
if (parsed) {
events.push({ ...parsed, event: currentEvent || undefined })
}
currentData = []
}
currentEvent = line.slice(7)
} else if (line.startsWith('data: ')) {
currentData.push(line.slice(6))
} else if (line === '' && currentData.length > 0) {
// Empty line signals end of event
const dataStr = currentData.join('\n')
const parsed = tryParseJson(dataStr)
if (parsed) {
events.push({ ...parsed, event: currentEvent || undefined })
}
currentEvent = null
currentData = []
}
}
// Handle remaining data
if (currentData.length > 0) {
const dataStr = currentData.join('\n')
const parsed = tryParseJson(dataStr)
if (parsed) {
events.push({ ...parsed, event: currentEvent || undefined })
}
}
return events
}
function tryParseJson(str: string): Record<string, unknown> | null {
if (str === '[DONE]') return null
try {
return JSON.parse(str)
} catch {
return null
}
}
/**
* Transform a sim agent event into one or more render events
*/
function transformSimAgentEvent(
simEvent: SimAgentEvent,
context: TransformStreamContext
): RenderEvent[] {
const eventType = simEvent.type || simEvent.event
const events: RenderEvent[] = []
const timestamp = Date.now()
switch (eventType) {
// Text content events
case 'content_block_delta':
case 'text_delta':
case 'delta': {
const delta = (simEvent.delta as Record<string, unknown>) || simEvent
const text = (delta.text as string) || (delta.content as string) || (simEvent.text as string)
if (text) {
events.push({ type: 'text_delta', content: text, timestamp })
}
break
}
case 'content_block_stop':
case 'text_complete': {
events.push({
type: 'text_complete',
content: (simEvent.content as string) || '',
timestamp,
})
break
}
// Tool call events
case 'tool_call':
case 'tool_use': {
const data = (simEvent.data as Record<string, unknown>) || simEvent
const toolCallId = (data.id as string) || (simEvent.id as string)
const toolName = (data.name as string) || (simEvent.name as string)
const args = (data.arguments as Record<string, unknown>) || (data.input as Record<string, unknown>)
if (toolCallId && toolName) {
events.push({
type: 'tool_pending',
toolCallId,
toolName,
args,
timestamp,
})
}
break
}
case 'tool_executing': {
const toolCallId = (simEvent.toolCallId as string) || (simEvent.id as string)
const toolName = (simEvent.toolName as string) || (simEvent.name as string) || ''
if (toolCallId) {
events.push({
type: 'tool_executing',
toolCallId,
toolName,
timestamp,
})
}
break
}
case 'tool_result': {
const toolCallId = (simEvent.toolCallId as string) || (simEvent.id as string)
const success = simEvent.success as boolean
const result = simEvent.result
const error = simEvent.error as string | undefined
if (toolCallId) {
events.push({
type: 'tool_result',
toolCallId,
success: success !== false,
result,
error,
failedDependency: simEvent.failedDependency as boolean | undefined,
skipped: (simEvent.result as Record<string, unknown>)?.skipped as boolean | undefined,
timestamp,
})
// Also emit success/error event for UI
if (success !== false) {
events.push({
type: 'tool_success',
toolCallId,
toolName: (simEvent.toolName as string) || '',
result,
timestamp,
})
} else {
events.push({
type: 'tool_error',
toolCallId,
toolName: (simEvent.toolName as string) || '',
error: error || 'Tool execution failed',
timestamp,
})
}
}
break
}
// Subagent events
case 'subagent_start': {
events.push({
type: 'subagent_start',
parentToolCallId: simEvent.parentToolCallId as string,
subagentName: simEvent.subagentName as string,
timestamp,
})
break
}
case 'subagent_text':
case 'subagent_delta': {
events.push({
type: 'subagent_text',
parentToolCallId: simEvent.parentToolCallId as string,
content: (simEvent.content as string) || (simEvent.text as string) || '',
timestamp,
})
break
}
case 'subagent_tool_call': {
events.push({
type: 'subagent_tool_call',
parentToolCallId: simEvent.parentToolCallId as string,
toolCallId: simEvent.toolCallId as string,
toolName: simEvent.toolName as string,
args: simEvent.args as Record<string, unknown> | undefined,
state: (simEvent.state as 'pending' | 'executing' | 'success' | 'error') || 'pending',
result: simEvent.result,
error: simEvent.error as string | undefined,
timestamp,
})
break
}
case 'subagent_end': {
events.push({
type: 'subagent_end',
parentToolCallId: simEvent.parentToolCallId as string,
timestamp,
})
break
}
// Thinking events (for extended thinking models)
case 'thinking_start':
case 'thinking': {
if (simEvent.type === 'thinking_start' || !simEvent.content) {
events.push({ type: 'thinking_start', timestamp })
}
if (simEvent.content) {
events.push({
type: 'thinking_delta',
content: simEvent.content as string,
timestamp,
})
}
break
}
case 'thinking_delta': {
events.push({
type: 'thinking_delta',
content: (simEvent.content as string) || '',
timestamp,
})
break
}
case 'thinking_end':
case 'thinking_complete': {
events.push({ type: 'thinking_end', timestamp })
break
}
// Message lifecycle events
case 'message_start': {
events.push({
type: 'message_start',
messageId: (simEvent.messageId as string) || context.assistantMessageId,
timestamp,
})
break
}
case 'message_stop':
case 'message_complete':
case 'message_delta': {
if (eventType === 'message_complete' || eventType === 'message_stop') {
events.push({
type: 'message_complete',
messageId: (simEvent.messageId as string) || context.assistantMessageId,
content: simEvent.content as string | undefined,
timestamp,
})
}
break
}
// Metadata events
case 'chat_id': {
events.push({
type: 'chat_id',
chatId: simEvent.chatId as string,
timestamp,
})
break
}
case 'conversation_id': {
events.push({
type: 'conversation_id',
conversationId: simEvent.conversationId as string,
timestamp,
})
break
}
// Error events
case 'error': {
events.push({
type: 'error',
error: (simEvent.error as string) || (simEvent.message as string) || 'Unknown error',
code: simEvent.code as string | undefined,
timestamp,
})
break
}
default: {
// Log unhandled event types for debugging
if (eventType && eventType !== 'ping') {
logger.debug('Unhandled sim agent event type', { eventType, streamId: context.streamId })
}
}
}
return events
}
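
A wiring sketch (not in the diff) showing how transformStream's callbacks might be used: each normalized event is serialized, pushed to the live SSE connection if one exists, and persisted for resumption. The relative import paths are assumptions about where this caller lives.

import { serializeRenderEvent } from './render-events'
import { transformStream } from './stream-transformer'
import { appendChunk, checkAbortSignal } from '@/lib/copilot/stream-persistence'

async function pipeAgentResponse(
  body: ReadableStream<Uint8Array>,
  send: (chunk: string) => void,
  ids: { streamId: string; chatId: string; userId: string; workflowId: string }
): Promise<void> {
  await transformStream(body, {
    ...ids,
    userMessageId: crypto.randomUUID(),
    assistantMessageId: crypto.randomUUID(),
    onRenderEvent: async (event) => {
      const chunk = serializeRenderEvent(event)
      send(chunk) // deliver to the connected client, if any
      await appendChunk(ids.streamId, chunk) // persist for later resumption
    },
    isAborted: () => checkAbortSignal(ids.streamId),
  })
}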

View File

@@ -0,0 +1,255 @@
import { createLogger } from '@sim/logger'
import { routeExecution } from './router'
import { saveWorkflowToNormalizedTables } from '@/lib/workflows/persistence/utils'
const logger = createLogger('ServerToolExecutor')
export interface ServerToolContext {
workflowId: string
userId: string
persistChanges?: boolean
}
export interface ServerToolResult {
success: boolean
result?: unknown
error?: string
}
/**
* Execute any copilot tool completely server-side.
* This is the central dispatcher for headless/API operation.
*/
export async function executeToolServerSide(
toolCall: { name: string; args: Record<string, unknown> },
context: ServerToolContext
): Promise<ServerToolResult> {
const { name, args } = toolCall
const { workflowId, userId, persistChanges = true } = context
logger.info('Executing tool server-side', { name, workflowId, userId })
try {
const result = await executeToolInternal(name, args, context)
return { success: true, result }
} catch (error) {
logger.error('Server-side tool execution failed', {
name,
workflowId,
error: error instanceof Error ? error.message : String(error),
})
return {
success: false,
error: error instanceof Error ? error.message : 'Tool execution failed',
}
}
}
async function executeToolInternal(
name: string,
args: Record<string, unknown>,
context: ServerToolContext
): Promise<unknown> {
const { workflowId, userId, persistChanges = true } = context
switch (name) {
case 'edit_workflow': {
// Execute edit_workflow with direct persistence
const result = await routeExecution(
'edit_workflow',
{
...args,
workflowId,
// Don't require currentUserWorkflow - server tool will load from DB
},
{ userId }
)
// Persist directly to database if enabled
if (persistChanges && result.workflowState) {
try {
await saveWorkflowToNormalizedTables(workflowId, result.workflowState)
logger.info('Workflow changes persisted directly', { workflowId })
} catch (error) {
logger.error('Failed to persist workflow changes', { error, workflowId })
// Don't throw - return the result anyway
}
}
return result
}
case 'run_workflow': {
// Import dynamically to avoid circular dependencies
const { executeWorkflow } = await import('@/lib/workflows/executor/execute-workflow')
const result = await executeWorkflow({
workflowId,
input: (args.workflow_input as Record<string, unknown>) || {},
isClientSession: false,
})
return result
}
case 'deploy_api':
case 'deploy_chat':
case 'deploy_mcp': {
// Import dynamically
const { deployWorkflow } = await import('@/lib/workflows/persistence/utils')
const deployType = name.replace('deploy_', '')
const result = await deployWorkflow({
workflowId,
deployedBy: userId,
})
return { ...result, deployType }
}
case 'redeploy': {
const { deployWorkflow } = await import('@/lib/workflows/persistence/utils')
const result = await deployWorkflow({
workflowId,
deployedBy: userId,
})
return result
}
// Server tools that already exist in the router
case 'get_blocks_and_tools':
case 'get_blocks_metadata':
case 'get_block_options':
case 'get_block_config':
case 'get_trigger_blocks':
case 'get_workflow_console':
case 'search_documentation':
case 'search_online':
case 'set_environment_variables':
case 'get_credentials':
case 'make_api_request':
case 'knowledge_base': {
return routeExecution(name, args, { userId })
}
// Tools that just need workflowId context
case 'get_user_workflow':
case 'get_workflow_data': {
const { loadWorkflowFromNormalizedTables } = await import(
'@/lib/workflows/persistence/utils'
)
const { sanitizeForCopilot } = await import('@/lib/workflows/sanitization/json-sanitizer')
const workflowData = await loadWorkflowFromNormalizedTables(workflowId)
if (!workflowData) {
throw new Error('Workflow not found')
}
const sanitized = sanitizeForCopilot({
blocks: workflowData.blocks,
edges: workflowData.edges,
loops: workflowData.loops,
parallels: workflowData.parallels,
})
return { workflow: JSON.stringify(sanitized, null, 2) }
}
case 'list_user_workflows': {
const { db } = await import('@sim/db')
const { workflow: workflowTable } = await import('@sim/db/schema')
const { eq } = await import('drizzle-orm')
const workflows = await db
.select({
id: workflowTable.id,
name: workflowTable.name,
description: workflowTable.description,
isDeployed: workflowTable.isDeployed,
createdAt: workflowTable.createdAt,
updatedAt: workflowTable.updatedAt,
})
.from(workflowTable)
.where(eq(workflowTable.userId, userId))
return { workflows }
}
case 'check_deployment_status': {
const { db } = await import('@sim/db')
const { workflow: workflowTable } = await import('@sim/db/schema')
const { eq } = await import('drizzle-orm')
const [wf] = await db
.select({
isDeployed: workflowTable.isDeployed,
deployedAt: workflowTable.deployedAt,
})
.from(workflowTable)
.where(eq(workflowTable.id, workflowId))
.limit(1)
return {
isDeployed: wf?.isDeployed || false,
deployedAt: wf?.deployedAt || null,
}
}
default: {
logger.warn('Unknown tool for server-side execution', { name })
throw new Error(`Tool ${name} is not available for server-side execution`)
}
}
}
/**
* Check if a tool can be executed server-side
*/
export function isServerExecutableTool(toolName: string): boolean {
const serverExecutableTools = new Set([
// Core editing tools
'edit_workflow',
'run_workflow',
// Deployment tools
'deploy_api',
'deploy_chat',
'deploy_mcp',
'redeploy',
'check_deployment_status',
// Existing server tools
'get_blocks_and_tools',
'get_blocks_metadata',
'get_block_options',
'get_block_config',
'get_trigger_blocks',
'get_workflow_console',
'search_documentation',
'search_online',
'set_environment_variables',
'get_credentials',
'make_api_request',
'knowledge_base',
// Workflow info tools
'get_user_workflow',
'get_workflow_data',
'list_user_workflows',
])
return serverExecutableTools.has(toolName)
}
/**
* Get list of tools that require client-side execution
*/
export function getClientOnlyTools(): string[] {
return [
'navigate_ui', // Requires DOM
'oauth_request_access', // Requires browser auth flow
]
}
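
A small dispatch sketch (not in the diff) mirroring how the headless route above handles tool calls: gate on isServerExecutableTool, then execute and inspect the result. The workflow and user ids are placeholders.

import { executeToolServerSide, isServerExecutableTool } from '@/lib/copilot/tools/server/executor'

async function runToolCall(name: string, args: Record<string, unknown>): Promise<void> {
  if (!isServerExecutableTool(name)) {
    console.warn(`${name} requires client-side execution`)
    return
  }
  const result = await executeToolServerSide(
    { name, args },
    { workflowId: 'wf_placeholder', userId: 'user_placeholder', persistChanges: true }
  )
  if (!result.success) {
    console.error(`Tool ${name} failed:`, result.error)
  }
}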

View File

@@ -2708,6 +2708,16 @@ export const useCopilotStore = create<CopilotStore>()(
})
if (result.success && result.stream) {
// Store stream ID for potential resumption on disconnect
if (result.streamId) {
const { storeActiveStream } = await import('@/lib/copilot/stream-client')
storeActiveStream(
result.chatId || currentChat?.id || '',
result.streamId,
streamingMessage.id
)
}
await get().handleStreamingResponse(
result.stream,
streamingMessage.id,
@@ -2715,6 +2725,12 @@ export const useCopilotStore = create<CopilotStore>()(
userMessage.id
)
set({ chatsLastLoadedAt: null, chatsLoadedForWorkflow: null })
// Clear stream storage on successful completion
if (result.streamId) {
const { clearStoredActiveStream } = await import('@/lib/copilot/stream-client')
clearStoredActiveStream()
}
} else {
if (result.error === 'Request was aborted') {
return
@@ -3853,6 +3869,68 @@ export const useCopilotStore = create<CopilotStore>()(
return autoAllowedTools.includes(toolId)
},
// Stream resumption
attemptStreamResumption: async () => {
const { isSendingMessage } = get()
if (isSendingMessage) {
logger.info('[Stream] Cannot attempt resumption while already sending')
return false
}
try {
const { attemptStreamResumption, clearStoredActiveStream } = await import(
'@/lib/copilot/stream-client'
)
const resumption = await attemptStreamResumption()
if (!resumption) {
return false
}
const { stream, metadata } = resumption
logger.info('[Stream] Resuming stream', {
streamId: metadata.streamId,
chatId: metadata.chatId,
})
// Find or create the assistant message for this stream
const { messages } = get()
let assistantMessageId = metadata.assistantMessageId
// If we don't have the assistant message, create a placeholder
if (!assistantMessageId || !messages.find((m) => m.id === assistantMessageId)) {
assistantMessageId = crypto.randomUUID()
const streamingMessage: CopilotMessage = {
id: assistantMessageId,
role: 'assistant',
content: '',
timestamp: new Date().toISOString(),
isStreaming: true,
contentBlocks: [],
}
set((state) => ({
messages: [...state.messages, streamingMessage],
isSendingMessage: true,
}))
}
// Process the resumed stream
await get().handleStreamingResponse(
stream,
assistantMessageId,
true, // This is a continuation
metadata.userMessageId
)
clearStoredActiveStream()
return true
} catch (error) {
logger.error('[Stream] Resumption failed', { error })
return false
}
},
// Message queue actions
addToQueue: (message, options) => {
const queuedMessage: import('./types').QueuedMessage = {

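Illustrative only (not part of the diff): the new attemptStreamResumption action could be triggered once when the copilot panel mounts, so a refresh mid-stream re-attaches to the interrupted response. The store import path is an assumption; the export name matches the store shown above.

import { useEffect } from 'react'
import { useCopilotStore } from '@/stores/copilot' // assumed path for the store above

export function useStreamResumption(): void {
  useEffect(() => {
    void useCopilotStore.getState().attemptStreamResumption()
  }, [])
}
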
View File

@@ -63,6 +63,8 @@ export interface CopilotMessage {
fileAttachments?: MessageFileAttachment[]
contexts?: ChatContext[]
errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required'
/** Whether this message is currently being streamed */
isStreaming?: boolean
}
/**
@@ -235,6 +237,9 @@ export interface CopilotActions {
removeAutoAllowedTool: (toolId: string) => Promise<void>
isToolAutoAllowed: (toolId: string) => boolean
// Stream resumption
attemptStreamResumption: () => Promise<boolean>
// Message queue actions
addToQueue: (
message: string,