improvement(copilot): improve context inputs and fix some bugs (#1216)

* Add logs v1

* Update

* Updates

* Updates

* Fixes

* Fix current workflow in context

* Fix mentions

* Error handling

* Fix chat loading

* Hide current workflow from context

* Run workflow fix

* Lint
This commit is contained in:
Siddharth Ganesan
2025-09-01 16:51:58 -07:00
committed by GitHub
parent f75c807580
commit 12135d2aa8
9 changed files with 1179 additions and 79 deletions

View File

@@ -50,13 +50,25 @@ const ChatMessageSchema = z.object({
contexts: z
.array(
z.object({
kind: z.enum(['past_chat', 'workflow', 'blocks', 'logs', 'knowledge', 'templates']),
kind: z.enum([
'past_chat',
'workflow',
'current_workflow',
'blocks',
'logs',
'workflow_block',
'knowledge',
'templates',
'docs',
]),
label: z.string(),
chatId: z.string().optional(),
workflowId: z.string().optional(),
knowledgeId: z.string().optional(),
blockId: z.string().optional(),
templateId: z.string().optional(),
executionId: z.string().optional(),
// For workflow_block, provide both workflowId and blockId
})
)
.optional(),
@@ -105,6 +117,7 @@ export async function POST(req: NextRequest) {
kind: c?.kind,
chatId: c?.chatId,
workflowId: c?.workflowId,
executionId: (c as any)?.executionId,
label: c?.label,
}))
: undefined,
@@ -115,13 +128,18 @@ export async function POST(req: NextRequest) {
if (Array.isArray(contexts) && contexts.length > 0) {
try {
const { processContextsServer } = await import('@/lib/copilot/process-contents')
const processed = await processContextsServer(contexts as any, authenticatedUserId)
const processed = await processContextsServer(contexts as any, authenticatedUserId, message)
agentContexts = processed
logger.info(`[${tracker.requestId}] Contexts processed for request`, {
processedCount: agentContexts.length,
kinds: agentContexts.map((c) => c.type),
lengthPreview: agentContexts.map((c) => c.content?.length ?? 0),
})
if (Array.isArray(contexts) && contexts.length > 0 && agentContexts.length === 0) {
logger.warn(
`[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.`
)
}
} catch (e) {
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
}
@@ -474,16 +492,6 @@ export async function POST(req: NextRequest) {
break
}
// Check if client disconnected before processing chunk
try {
// Forward the chunk to client immediately
controller.enqueue(value)
} catch (error) {
// Client disconnected - stop reading from sim agent
reader.cancel() // Stop reading from sim agent
break
}
// Decode and parse SSE events for logging and capturing content
const decodedChunk = decoder.decode(value, { stream: true })
buffer += decodedChunk
@@ -583,6 +591,47 @@ export async function POST(req: NextRequest) {
default:
}
// Emit to client: rewrite 'error' events into user-friendly assistant message
if (event?.type === 'error') {
try {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
// Accumulate so it persists to DB as assistant content
assistantContent += formatted
// Send as content chunk
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
} catch (enqueueErr) {
reader.cancel()
break
}
// Then close this response cleanly for the client
try {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
break
}
} catch {}
// Do not forward the original error event
} else {
// Forward original event to client
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
break
}
}
} catch (e) {
// Enhanced error handling for large payloads and parsing issues
const lineLength = line.length
@@ -615,10 +664,37 @@ export async function POST(req: NextRequest) {
logger.debug(`[${tracker.requestId}] Processing remaining buffer: "${buffer}"`)
if (buffer.startsWith('data: ')) {
try {
const event = JSON.parse(buffer.slice(6))
const jsonStr = buffer.slice(6)
const event = JSON.parse(jsonStr)
if (event.type === 'content' && event.data) {
assistantContent += event.data
}
// Forward remaining event, applying same error rewrite behavior
if (event?.type === 'error') {
const displayMessage: string =
(event?.data && (event.data.displayMessage as string)) ||
'Sorry, I encountered an error. Please try again.'
const formatted = `_${displayMessage}_`
assistantContent += formatted
try {
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({ type: 'content', data: formatted })}\n\n`
)
)
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ type: 'done' })}\n\n`)
)
} catch (enqueueErr) {
reader.cancel()
}
} else {
try {
controller.enqueue(encoder.encode(`data: ${jsonStr}\n\n`))
} catch (enqueueErr) {
reader.cancel()
}
}
} catch (e) {
logger.warn(`[${tracker.requestId}] Failed to parse final buffer: "${buffer}"`)
}

View File

@@ -3,7 +3,9 @@
import { type FC, memo, useEffect, useMemo, useState } from 'react'
import {
Blocks,
BookOpen,
Bot,
Box,
Check,
Clipboard,
Info,
@@ -11,6 +13,7 @@ import {
Loader2,
RotateCcw,
Shapes,
SquareChevronRight,
ThumbsDown,
ThumbsUp,
Workflow,
@@ -389,7 +392,9 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
const fromBlock = Array.isArray((block as any)?.contexts)
? ((block as any).contexts as any[])
: []
const allContexts = direct.length > 0 ? direct : fromBlock
const allContexts = (direct.length > 0 ? direct : fromBlock).filter(
(c: any) => c?.kind !== 'current_workflow'
)
const MAX_VISIBLE = 4
const visible = showAllContexts
? allContexts
@@ -404,14 +409,20 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
>
{ctx?.kind === 'past_chat' ? (
<Bot className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'workflow' ? (
) : ctx?.kind === 'workflow' || ctx?.kind === 'current_workflow' ? (
<Workflow className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'blocks' ? (
<Blocks className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'workflow_block' ? (
<Box className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'knowledge' ? (
<LibraryBig className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'templates' ? (
<Shapes className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'docs' ? (
<BookOpen className='h-3 w-3 text-muted-foreground' />
) : ctx?.kind === 'logs' ? (
<SquareChevronRight className='h-3 w-3 text-muted-foreground' />
) : (
<Info className='h-3 w-3 text-muted-foreground' />
)}
@@ -500,7 +511,10 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
const contexts: any[] = Array.isArray((message as any).contexts)
? ((message as any).contexts as any[])
: []
const labels = contexts.map((c) => c?.label).filter(Boolean) as string[]
const labels = contexts
.filter((c) => c?.kind !== 'current_workflow')
.map((c) => c?.label)
.filter(Boolean) as string[]
if (!labels.length) return <WordWrap text={text} />
const escapeRegex = (s: string) => s.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')

View File

@@ -120,8 +120,10 @@ export function Panel() {
// Handle new chat creation with data loading
const handleNewChat = useCallback(async () => {
await ensureCopilotDataLoaded()
// Instantly clear to a fresh chat locally
copilotRef.current?.createNewChat()
// Ensure copilot data is loaded in the background (do not await)
ensureCopilotDataLoaded().catch(() => {})
}, [ensureCopilotDataLoaded])
// Handle history dropdown opening - use smart caching instead of force refresh

View File

@@ -64,7 +64,13 @@ export interface SendMessageRequest {
implicitFeedback?: string
fileAttachments?: MessageFileAttachment[]
abortSignal?: AbortSignal
contexts?: Array<{ kind: string; label?: string; chatId?: string; workflowId?: string }>
contexts?: Array<{
kind: string
label?: string
chatId?: string
workflowId?: string
executionId?: string
}>
}
/**

View File

@@ -8,10 +8,13 @@ import type { ChatContext } from '@/stores/copilot/types'
export type AgentContextType =
| 'past_chat'
| 'workflow'
| 'current_workflow'
| 'blocks'
| 'logs'
| 'knowledge'
| 'templates'
| 'workflow_block'
| 'docs'
export interface AgentContext {
type: AgentContextType
@@ -30,8 +33,12 @@ export async function processContexts(
if (ctx.kind === 'past_chat') {
return await processPastChatViaApi(ctx.chatId, ctx.label ? `@${ctx.label}` : '@')
}
if (ctx.kind === 'workflow' && ctx.workflowId) {
return await processWorkflowFromDb(ctx.workflowId, ctx.label ? `@${ctx.label}` : '@')
if ((ctx.kind === 'workflow' || ctx.kind === 'current_workflow') && ctx.workflowId) {
return await processWorkflowFromDb(
ctx.workflowId,
ctx.label ? `@${ctx.label}` : '@',
ctx.kind
)
}
if (ctx.kind === 'knowledge' && (ctx as any).knowledgeId) {
return await processKnowledgeFromDb(
@@ -48,7 +55,16 @@ export async function processContexts(
ctx.label ? `@${ctx.label}` : '@'
)
}
// Other kinds can be added here: workflow, blocks, logs, knowledge, templates
if (ctx.kind === 'logs' && (ctx as any).executionId) {
return await processExecutionLogFromDb(
(ctx as any).executionId,
ctx.label ? `@${ctx.label}` : '@'
)
}
if (ctx.kind === 'workflow_block' && ctx.workflowId && (ctx as any).blockId) {
return await processWorkflowBlockFromDb(ctx.workflowId, (ctx as any).blockId, ctx.label)
}
// Other kinds can be added here: workflow, blocks, logs, knowledge, templates, docs
return null
} catch (error) {
logger.error('Failed processing context', { ctx, error })
@@ -57,13 +73,14 @@ export async function processContexts(
})
const results = await Promise.all(tasks)
return results.filter((r): r is AgentContext => !!r)
return results.filter((r): r is AgentContext => !!r) as AgentContext[]
}
// Server-side variant (recommended for use in API routes)
export async function processContextsServer(
contexts: ChatContext[] | undefined,
userId: string
userId: string,
userMessage?: string
): Promise<AgentContext[]> {
if (!Array.isArray(contexts) || contexts.length === 0) return []
const tasks = contexts.map(async (ctx) => {
@@ -71,8 +88,12 @@ export async function processContextsServer(
if (ctx.kind === 'past_chat' && ctx.chatId) {
return await processPastChatFromDb(ctx.chatId, userId, ctx.label ? `@${ctx.label}` : '@')
}
if (ctx.kind === 'workflow' && ctx.workflowId) {
return await processWorkflowFromDb(ctx.workflowId, ctx.label ? `@${ctx.label}` : '@')
if ((ctx.kind === 'workflow' || ctx.kind === 'current_workflow') && ctx.workflowId) {
return await processWorkflowFromDb(
ctx.workflowId,
ctx.label ? `@${ctx.label}` : '@',
ctx.kind
)
}
if (ctx.kind === 'knowledge' && (ctx as any).knowledgeId) {
return await processKnowledgeFromDb(
@@ -89,6 +110,30 @@ export async function processContextsServer(
ctx.label ? `@${ctx.label}` : '@'
)
}
if (ctx.kind === 'logs' && (ctx as any).executionId) {
return await processExecutionLogFromDb(
(ctx as any).executionId,
ctx.label ? `@${ctx.label}` : '@'
)
}
if (ctx.kind === 'workflow_block' && ctx.workflowId && (ctx as any).blockId) {
return await processWorkflowBlockFromDb(ctx.workflowId, (ctx as any).blockId, ctx.label)
}
if (ctx.kind === 'docs') {
try {
const { searchDocumentationServerTool } = await import(
'@/lib/copilot/tools/server/docs/search-documentation'
)
const rawQuery = (userMessage || '').trim() || ctx.label || 'Sim documentation'
const query = sanitizeMessageForDocs(rawQuery, contexts)
const res = await searchDocumentationServerTool.execute({ query, topK: 10 })
const content = JSON.stringify(res?.results || [])
return { type: 'docs', tag: ctx.label ? `@${ctx.label}` : '@', content }
} catch (e) {
logger.error('Failed to process docs context', e)
return null
}
}
return null
} catch (error) {
logger.error('Failed processing context (server)', { ctx, error })
@@ -107,6 +152,57 @@ export async function processContextsServer(
return filtered
}
function escapeRegExp(input: string): string {
return input.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
}
function sanitizeMessageForDocs(rawMessage: string, contexts: ChatContext[] | undefined): string {
if (!rawMessage) return ''
if (!Array.isArray(contexts) || contexts.length === 0) {
// No context mapping; conservatively strip all @mentions-like tokens
const stripped = rawMessage
.replace(/(^|\s)@([^\s]+)/g, ' ')
.replace(/\s{2,}/g, ' ')
.trim()
return stripped
}
// Gather labels by kind
const blockLabels = new Set(
contexts
.filter((c) => c.kind === 'blocks')
.map((c) => c.label)
.filter((l): l is string => typeof l === 'string' && l.length > 0)
)
const nonBlockLabels = new Set(
contexts
.filter((c) => c.kind !== 'blocks')
.map((c) => c.label)
.filter((l): l is string => typeof l === 'string' && l.length > 0)
)
let result = rawMessage
// 1) Remove all non-block mentions entirely
for (const label of nonBlockLabels) {
const pattern = new RegExp(`(^|\\s)@${escapeRegExp(label)}(?!\\S)`, 'g')
result = result.replace(pattern, ' ')
}
// 2) For block mentions, strip the '@' but keep the block name
for (const label of blockLabels) {
const pattern = new RegExp(`@${escapeRegExp(label)}(?!\\S)`, 'g')
result = result.replace(pattern, label)
}
// 3) Remove any remaining @mentions (unknown or not in contexts)
result = result.replace(/(^|\s)@([^\s]+)/g, ' ')
// Normalize whitespace
result = result.replace(/\s{2,}/g, ' ').trim()
return result
}
async function processPastChatFromDb(
chatId: string,
userId: string,
@@ -149,7 +245,8 @@ async function processPastChatFromDb(
async function processWorkflowFromDb(
workflowId: string,
tag: string
tag: string,
kind: 'workflow' | 'current_workflow' = 'workflow'
): Promise<AgentContext | null> {
try {
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
@@ -170,7 +267,8 @@ async function processWorkflowFromDb(
blocks: Object.keys(workflowState.blocks || {}).length,
edges: workflowState.edges.length,
})
return { type: 'workflow', tag, content }
// Use the provided kind for the type
return { type: kind, tag, content }
} catch (error) {
logger.error('Error processing workflow context', { workflowId, error })
return null
@@ -356,3 +454,85 @@ async function processTemplateFromDb(
return null
}
}
async function processWorkflowBlockFromDb(
workflowId: string,
blockId: string,
label?: string
): Promise<AgentContext | null> {
try {
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalized) return null
const block = (normalized.blocks as any)[blockId]
if (!block) return null
const tag = label ? `@${label} in Workflow` : `@${block.name || blockId} in Workflow`
// Build content: isolate the block and include its subBlocks fully
const contentObj = {
workflowId,
block: block,
}
const content = JSON.stringify(contentObj)
return { type: 'workflow_block', tag, content }
} catch (error) {
logger.error('Error processing workflow_block context', { workflowId, blockId, error })
return null
}
}
async function processExecutionLogFromDb(
executionId: string,
tag: string
): Promise<AgentContext | null> {
try {
const { workflowExecutionLogs, workflow } = await import('@/db/schema')
const { db } = await import('@/db')
const rows = await db
.select({
id: workflowExecutionLogs.id,
workflowId: workflowExecutionLogs.workflowId,
executionId: workflowExecutionLogs.executionId,
level: workflowExecutionLogs.level,
trigger: workflowExecutionLogs.trigger,
startedAt: workflowExecutionLogs.startedAt,
endedAt: workflowExecutionLogs.endedAt,
totalDurationMs: workflowExecutionLogs.totalDurationMs,
executionData: workflowExecutionLogs.executionData,
cost: workflowExecutionLogs.cost,
workflowName: workflow.name,
})
.from(workflowExecutionLogs)
.innerJoin(workflow, eq(workflowExecutionLogs.workflowId, workflow.id))
.where(eq(workflowExecutionLogs.executionId, executionId))
.limit(1)
const log = rows?.[0] as any
if (!log) return null
const summary = {
id: log.id,
workflowId: log.workflowId,
executionId: log.executionId,
level: log.level,
trigger: log.trigger,
startedAt: log.startedAt?.toISOString?.() || String(log.startedAt),
endedAt: log.endedAt?.toISOString?.() || (log.endedAt ? String(log.endedAt) : null),
totalDurationMs: log.totalDurationMs ?? null,
workflowName: log.workflowName || '',
// Include trace spans and any available details without being huge
executionData: log.executionData
? {
traceSpans: (log.executionData as any).traceSpans || undefined,
errorDetails: (log.executionData as any).errorDetails || undefined,
}
: undefined,
cost: log.cost || undefined,
}
const content = JSON.stringify(summary)
return { type: 'logs', tag, content }
} catch (error) {
logger.error('Error processing execution log context (db)', { executionId, error })
return null
}
}

View File

@@ -99,13 +99,44 @@ export class RunWorkflowClientTool extends BaseClientTool {
})
setIsExecuting(false)
logger.debug('Set isExecuting(false) and switching state to success')
this.setState(ClientToolCallState.success)
await this.markToolComplete(
200,
`Workflow execution completed. Started at: ${executionStartTime}`
)
// Determine success for both non-streaming and streaming executions
let succeeded = true
let errorMessage: string | undefined
try {
if (result && typeof result === 'object' && 'success' in (result as any)) {
succeeded = Boolean((result as any).success)
if (!succeeded) {
errorMessage = (result as any)?.error || (result as any)?.output?.error
}
} else if (
result &&
typeof result === 'object' &&
'execution' in (result as any) &&
(result as any).execution &&
typeof (result as any).execution === 'object'
) {
succeeded = Boolean((result as any).execution.success)
if (!succeeded) {
errorMessage =
(result as any).execution?.error || (result as any).execution?.output?.error
}
}
} catch {}
if (succeeded) {
logger.debug('Workflow execution finished with success')
this.setState(ClientToolCallState.success)
await this.markToolComplete(
200,
`Workflow execution completed. Started at: ${executionStartTime}`
)
} else {
const msg = errorMessage || 'Workflow execution failed'
logger.error('Workflow execution finished with failure', { message: msg })
this.setState(ClientToolCallState.error)
await this.markToolComplete(500, msg)
}
} catch (error: any) {
const message = error instanceof Error ? error.message : String(error)
const failedDependency = typeof message === 'string' && /dependency/i.test(message)

View File

@@ -1266,7 +1266,7 @@ async function* parseSSEStream(
// Initial state (subset required for UI/streaming)
const initialState = {
mode: 'agent' as const,
agentDepth: 0 as 0 | 1 | 2 | 3,
agentDepth: 1 as 0 | 1 | 2 | 3,
agentPrefetch: true,
currentChat: null as CopilotChat | null,
chats: [] as CopilotChat[],
@@ -1292,6 +1292,7 @@ const initialState = {
planTodos: [] as Array<{ id: string; content: string; completed?: boolean; executing?: boolean }>,
showPlanTodos: false,
toolCallsById: {} as Record<string, CopilotToolCall>,
suppressAutoSelect: false,
}
export const useCopilotStore = create<CopilotStore>()(
@@ -1351,14 +1352,31 @@ export const useCopilotStore = create<CopilotStore>()(
useWorkflowDiffStore.getState().clearDiff()
} catch {}
// Capture previous chat/messages for optimistic background save
const previousChat = currentChat
const previousMessages = get().messages
// Optimistically set selected chat and normalize messages for UI
set({
currentChat: chat,
messages: normalizeMessagesForUI(chat.messages || []),
planTodos: [],
showPlanTodos: false,
suppressAutoSelect: false,
})
// Background-save the previous chat's latest messages before switching (optimistic)
try {
if (previousChat && previousChat.id !== chat.id) {
const dbMessages = validateMessagesForLLM(previousMessages)
fetch('/api/copilot/chat/update-messages', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: previousChat.id, messages: dbMessages }),
}).catch(() => {})
}
} catch {}
// Refresh selected chat from server to ensure we have latest messages/tool calls
try {
const response = await fetch(`/api/copilot/chat?workflowId=${workflowId}`)
@@ -1392,12 +1410,27 @@ export const useCopilotStore = create<CopilotStore>()(
useWorkflowDiffStore.getState().clearDiff()
} catch {}
// Background-save the current chat before clearing (optimistic)
try {
const { currentChat } = get()
if (currentChat) {
const currentMessages = get().messages
const dbMessages = validateMessagesForLLM(currentMessages)
fetch('/api/copilot/chat/update-messages', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ chatId: currentChat.id, messages: dbMessages }),
}).catch(() => {})
}
} catch {}
set({
currentChat: null,
messages: [],
messageCheckpoints: {},
planTodos: [],
showPlanTodos: false,
suppressAutoSelect: true,
})
},
@@ -1432,7 +1465,7 @@ export const useCopilotStore = create<CopilotStore>()(
})
if (data.chats.length > 0) {
const { currentChat, isSendingMessage } = get()
const { currentChat, isSendingMessage, suppressAutoSelect } = get()
const currentChatStillExists =
currentChat && data.chats.some((c: CopilotChat) => c.id === currentChat.id)
@@ -1451,7 +1484,7 @@ export const useCopilotStore = create<CopilotStore>()(
try {
await get().loadMessageCheckpoints(updatedCurrentChat.id)
} catch {}
} else if (!isSendingMessage) {
} else if (!isSendingMessage && !suppressAutoSelect) {
const mostRecentChat: CopilotChat = data.chats[0]
set({
currentChat: mostRecentChat,
@@ -1565,6 +1598,9 @@ export const useCopilotStore = create<CopilotStore>()(
} else if (result.status === 402) {
errorContent =
'_Usage limit exceeded. To continue using this service, upgrade your plan or top up on credits._'
} else if (result.status === 403) {
errorContent =
'_Provider config not allowed for non-enterprise users. Please remove the provider config and try again_'
}
const errorMessage = createErrorMessage(streamingMessage.id, errorContent)
@@ -2065,6 +2101,7 @@ export const useCopilotStore = create<CopilotStore>()(
chatsLoadedForWorkflow: null,
planTodos: [],
showPlanTodos: false,
suppressAutoSelect: false,
})
},

View File

@@ -45,10 +45,13 @@ export interface CopilotMessage {
export type ChatContext =
| { kind: 'past_chat'; chatId: string; label: string }
| { kind: 'workflow'; workflowId: string; label: string }
| { kind: 'current_workflow'; workflowId: string; label: string }
| { kind: 'blocks'; blockIds: string[]; label: string }
| { kind: 'logs'; label: string }
| { kind: 'logs'; executionId?: string; label: string }
| { kind: 'workflow_block'; workflowId: string; blockId: string; label: string }
| { kind: 'knowledge'; knowledgeId?: string; label: string }
| { kind: 'templates'; templateId?: string; label: string }
| { kind: 'docs'; label: string }
export interface CopilotChat {
id: string
@@ -101,6 +104,9 @@ export interface CopilotState {
// Map of toolCallId -> CopilotToolCall for quick access during streaming
toolCallsById: Record<string, CopilotToolCall>
// Transient flag to prevent auto-selecting a chat during new-chat UX
suppressAutoSelect?: boolean
}
export interface CopilotActions {