feat(logs) Add messageId and requestId context to all mothership log messages (#3770)

Co-authored-by: Theodore Li <theo@sim.ai>
This commit is contained in:
Theodore Li
2026-03-25 14:27:02 -07:00
committed by GitHub
parent f94be08950
commit 87e8d3caf8
24 changed files with 1108 additions and 429 deletions

View File

@@ -14,6 +14,7 @@ import {
requestChatTitle,
SSE_RESPONSE_HEADERS,
} from '@/lib/copilot/chat-streaming'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getStreamMeta, readStreamEvents } from '@/lib/copilot/orchestrator/stream/buffer'
@@ -182,25 +183,36 @@ export async function POST(req: NextRequest) {
const wf = await getWorkflowById(workflowId)
resolvedWorkspaceId = wf?.workspaceId ?? undefined
} catch {
logger.warn(`[${tracker.requestId}] Failed to resolve workspaceId from workflow`)
logger.warn(
appendCopilotLogContext('Failed to resolve workspaceId from workflow', {
requestId: tracker.requestId,
messageId: userMessageId,
})
)
}
const userMessageIdToUse = userMessageId || crypto.randomUUID()
try {
logger.info(`[${tracker.requestId}] Received chat POST`, {
workflowId,
hasContexts: Array.isArray(normalizedContexts),
contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0,
contextsPreview: Array.isArray(normalizedContexts)
? normalizedContexts.map((c: any) => ({
kind: c?.kind,
chatId: c?.chatId,
workflowId: c?.workflowId,
executionId: (c as any)?.executionId,
label: c?.label,
}))
: undefined,
})
logger.info(
appendCopilotLogContext('Received chat POST', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
workflowId,
hasContexts: Array.isArray(normalizedContexts),
contextsCount: Array.isArray(normalizedContexts) ? normalizedContexts.length : 0,
contextsPreview: Array.isArray(normalizedContexts)
? normalizedContexts.map((c: any) => ({
kind: c?.kind,
chatId: c?.chatId,
workflowId: c?.workflowId,
executionId: (c as any)?.executionId,
label: c?.label,
}))
: undefined,
}
)
} catch {}
let currentChat: any = null
@@ -238,22 +250,40 @@ export async function POST(req: NextRequest) {
actualChatId
)
agentContexts = processed
logger.info(`[${tracker.requestId}] Contexts processed for request`, {
processedCount: agentContexts.length,
kinds: agentContexts.map((c) => c.type),
lengthPreview: agentContexts.map((c) => c.content?.length ?? 0),
})
logger.info(
appendCopilotLogContext('Contexts processed for request', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
processedCount: agentContexts.length,
kinds: agentContexts.map((c) => c.type),
lengthPreview: agentContexts.map((c) => c.content?.length ?? 0),
}
)
if (
Array.isArray(normalizedContexts) &&
normalizedContexts.length > 0 &&
agentContexts.length === 0
) {
logger.warn(
`[${tracker.requestId}] Contexts provided but none processed. Check executionId for logs contexts.`
appendCopilotLogContext(
'Contexts provided but none processed. Check executionId for logs contexts.',
{
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}
)
)
}
} catch (e) {
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
logger.error(
appendCopilotLogContext('Failed to process contexts', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
e
)
}
}
@@ -283,7 +313,10 @@ export async function POST(req: NextRequest) {
agentContexts.push(result.value)
} else if (result.status === 'rejected') {
logger.error(
`[${tracker.requestId}] Failed to resolve resource attachment`,
appendCopilotLogContext('Failed to resolve resource attachment', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
result.reason
)
}
@@ -324,20 +357,26 @@ export async function POST(req: NextRequest) {
)
try {
logger.info(`[${tracker.requestId}] About to call Sim Agent`, {
hasContext: agentContexts.length > 0,
contextCount: agentContexts.length,
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
messageLength: message.length,
mode: effectiveMode,
hasTools: Array.isArray(requestPayload.tools),
toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0,
hasBaseTools: Array.isArray(requestPayload.baseTools),
baseToolCount: Array.isArray(requestPayload.baseTools)
? requestPayload.baseTools.length
: 0,
hasCredentials: !!requestPayload.credentials,
})
logger.info(
appendCopilotLogContext('About to call Sim Agent', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
hasContext: agentContexts.length > 0,
contextCount: agentContexts.length,
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
messageLength: message.length,
mode: effectiveMode,
hasTools: Array.isArray(requestPayload.tools),
toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0,
hasBaseTools: Array.isArray(requestPayload.baseTools),
baseToolCount: Array.isArray(requestPayload.baseTools)
? requestPayload.baseTools.length
: 0,
hasCredentials: !!requestPayload.credentials,
}
)
} catch {}
if (stream && actualChatId) {
@@ -481,10 +520,16 @@ export async function POST(req: NextRequest) {
.where(eq(copilotChats.id, actualChatId))
}
} catch (error) {
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
chatId: actualChatId,
error: error instanceof Error ? error.message : 'Unknown error',
})
logger.error(
appendCopilotLogContext('Failed to persist chat messages', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
chatId: actualChatId,
error: error instanceof Error ? error.message : 'Unknown error',
}
)
}
},
},
@@ -510,13 +555,19 @@ export async function POST(req: NextRequest) {
provider: typeof requestPayload?.provider === 'string' ? requestPayload.provider : undefined,
}
logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {
hasContent: !!responseData.content,
contentLength: responseData.content?.length || 0,
model: responseData.model,
provider: responseData.provider,
toolCallsCount: responseData.toolCalls?.length || 0,
})
logger.info(
appendCopilotLogContext('Non-streaming response from orchestrator', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
hasContent: !!responseData.content,
contentLength: responseData.content?.length || 0,
model: responseData.model,
provider: responseData.provider,
toolCallsCount: responseData.toolCalls?.length || 0,
}
)
// Save messages if we have a chat
if (currentChat && responseData.content) {
@@ -549,8 +600,13 @@ export async function POST(req: NextRequest) {
// Start title generation in parallel if this is first message (non-streaming)
if (actualChatId && !currentChat.title && conversationHistory.length === 0) {
logger.info(`[${tracker.requestId}] Starting title generation for non-streaming response`)
requestChatTitle({ message, model: selectedModel, provider })
logger.info(
appendCopilotLogContext('Starting title generation for non-streaming response', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
})
)
requestChatTitle({ message, model: selectedModel, provider, messageId: userMessageIdToUse })
.then(async (title) => {
if (title) {
await db
@@ -560,11 +616,22 @@ export async function POST(req: NextRequest) {
updatedAt: new Date(),
})
.where(eq(copilotChats.id, actualChatId!))
logger.info(`[${tracker.requestId}] Generated and saved title: ${title}`)
logger.info(
appendCopilotLogContext(`Generated and saved title: ${title}`, {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
})
)
}
})
.catch((error) => {
logger.error(`[${tracker.requestId}] Title generation failed:`, error)
logger.error(
appendCopilotLogContext('Title generation failed', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
error
)
})
}
@@ -578,11 +645,17 @@ export async function POST(req: NextRequest) {
.where(eq(copilotChats.id, actualChatId!))
}
logger.info(`[${tracker.requestId}] Returning non-streaming response`, {
duration: tracker.getDuration(),
chatId: actualChatId,
responseLength: responseData.content?.length || 0,
})
logger.info(
appendCopilotLogContext('Returning non-streaming response', {
requestId: tracker.requestId,
messageId: userMessageIdToUse,
}),
{
duration: tracker.getDuration(),
chatId: actualChatId,
responseLength: responseData.content?.length || 0,
}
)
return NextResponse.json({
success: true,
@@ -606,21 +679,33 @@ export async function POST(req: NextRequest) {
const duration = tracker.getDuration()
if (error instanceof z.ZodError) {
logger.error(`[${tracker.requestId}] Validation error:`, {
duration,
errors: error.errors,
})
logger.error(
appendCopilotLogContext('Validation error', {
requestId: tracker.requestId,
messageId: pendingChatStreamID ?? undefined,
}),
{
duration,
errors: error.errors,
}
)
return NextResponse.json(
{ error: 'Invalid request data', details: error.errors },
{ status: 400 }
)
}
logger.error(`[${tracker.requestId}] Error handling copilot chat:`, {
duration,
error: error instanceof Error ? error.message : 'Unknown error',
stack: error instanceof Error ? error.stack : undefined,
})
logger.error(
appendCopilotLogContext('Error handling copilot chat', {
requestId: tracker.requestId,
messageId: pendingChatStreamID ?? undefined,
}),
{
duration,
error: error instanceof Error ? error.message : 'Unknown error',
stack: error instanceof Error ? error.stack : undefined,
}
)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Internal server error' },
@@ -665,11 +750,16 @@ export async function GET(req: NextRequest) {
status: meta?.status || 'unknown',
}
} catch (err) {
logger.warn('Failed to read stream snapshot for chat', {
chatId,
conversationId: chat.conversationId,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to read stream snapshot for chat', {
messageId: chat.conversationId || undefined,
}),
{
chatId,
conversationId: chat.conversationId,
error: err instanceof Error ? err.message : String(err),
}
)
}
}
@@ -688,7 +778,11 @@ export async function GET(req: NextRequest) {
...(streamSnapshot ? { streamSnapshot } : {}),
}
logger.info(`Retrieved chat ${chatId}`)
logger.info(
appendCopilotLogContext(`Retrieved chat ${chatId}`, {
messageId: chat.conversationId || undefined,
})
)
return NextResponse.json({ success: true, chat: transformedChat })
}
@@ -750,7 +844,7 @@ export async function GET(req: NextRequest) {
chats: transformedChats,
})
} catch (error) {
logger.error('Error fetching copilot chats:', error)
logger.error('Error fetching copilot chats', error)
return createInternalServerErrorResponse('Failed to fetch chats')
}
}

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
getStreamMeta,
readStreamEvents,
@@ -35,12 +36,24 @@ export async function GET(request: NextRequest) {
const toParam = url.searchParams.get('to')
const toEventId = toParam ? Number(toParam) : undefined
logger.info(
appendCopilotLogContext('[Resume] Received resume request', {
messageId: streamId || undefined,
}),
{
streamId: streamId || undefined,
fromEventId,
toEventId,
batchMode,
}
)
if (!streamId) {
return NextResponse.json({ error: 'streamId is required' }, { status: 400 })
}
const meta = (await getStreamMeta(streamId)) as StreamMeta | null
logger.info('[Resume] Stream lookup', {
logger.info(appendCopilotLogContext('[Resume] Stream lookup', { messageId: streamId }), {
streamId,
fromEventId,
toEventId,
@@ -59,7 +72,7 @@ export async function GET(request: NextRequest) {
if (batchMode) {
const events = await readStreamEvents(streamId, fromEventId)
const filteredEvents = toEventId ? events.filter((e) => e.eventId <= toEventId) : events
logger.info('[Resume] Batch response', {
logger.info(appendCopilotLogContext('[Resume] Batch response', { messageId: streamId }), {
streamId,
fromEventId,
toEventId,
@@ -111,11 +124,14 @@ export async function GET(request: NextRequest) {
const flushEvents = async () => {
const events = await readStreamEvents(streamId, lastEventId)
if (events.length > 0) {
logger.info('[Resume] Flushing events', {
streamId,
fromEventId: lastEventId,
eventCount: events.length,
})
logger.info(
appendCopilotLogContext('[Resume] Flushing events', { messageId: streamId }),
{
streamId,
fromEventId: lastEventId,
eventCount: events.length,
}
)
}
for (const entry of events) {
lastEventId = entry.eventId
@@ -162,7 +178,7 @@ export async function GET(request: NextRequest) {
}
} catch (error) {
if (!controllerClosed && !request.signal.aborted) {
logger.warn('Stream replay failed', {
logger.warn(appendCopilotLogContext('Stream replay failed', { messageId: streamId }), {
streamId,
error: error instanceof Error ? error.message : String(error),
})

View File

@@ -12,6 +12,7 @@ import {
createSSEStream,
SSE_RESPONSE_HEADERS,
} from '@/lib/copilot/chat-streaming'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import type { OrchestratorResult } from '@/lib/copilot/orchestrator/types'
import { processContextsServer, resolveActiveResourceContext } from '@/lib/copilot/process-contents'
import { createRequestTracker, createUnauthorizedResponse } from '@/lib/copilot/request-helpers'
@@ -87,6 +88,7 @@ const MothershipMessageSchema = z.object({
*/
export async function POST(req: NextRequest) {
const tracker = createRequestTracker()
let userMessageIdForLogs: string | undefined
try {
const session = await getSession()
@@ -109,6 +111,28 @@ export async function POST(req: NextRequest) {
} = MothershipMessageSchema.parse(body)
const userMessageId = providedMessageId || crypto.randomUUID()
userMessageIdForLogs = userMessageId
logger.info(
appendCopilotLogContext('Received mothership chat start request', {
requestId: tracker.requestId,
messageId: userMessageId,
}),
{
workspaceId,
chatId,
createNewChat,
hasContexts: Array.isArray(contexts) && contexts.length > 0,
contextsCount: Array.isArray(contexts) ? contexts.length : 0,
hasResourceAttachments:
Array.isArray(resourceAttachments) && resourceAttachments.length > 0,
resourceAttachmentCount: Array.isArray(resourceAttachments)
? resourceAttachments.length
: 0,
hasFileAttachments: Array.isArray(fileAttachments) && fileAttachments.length > 0,
fileAttachmentCount: Array.isArray(fileAttachments) ? fileAttachments.length : 0,
}
)
try {
await assertActiveWorkspaceAccess(workspaceId, authenticatedUserId)
@@ -150,7 +174,13 @@ export async function POST(req: NextRequest) {
actualChatId
)
} catch (e) {
logger.error(`[${tracker.requestId}] Failed to process contexts`, e)
logger.error(
appendCopilotLogContext('Failed to process contexts', {
requestId: tracker.requestId,
messageId: userMessageId,
}),
e
)
}
}
@@ -176,7 +206,10 @@ export async function POST(req: NextRequest) {
agentContexts.push(result.value)
} else if (result.status === 'rejected') {
logger.error(
`[${tracker.requestId}] Failed to resolve resource attachment`,
appendCopilotLogContext('Failed to resolve resource attachment', {
requestId: tracker.requestId,
messageId: userMessageId,
}),
result.reason
)
}
@@ -366,10 +399,16 @@ export async function POST(req: NextRequest) {
})
}
} catch (error) {
logger.error(`[${tracker.requestId}] Failed to persist chat messages`, {
chatId: actualChatId,
error: error instanceof Error ? error.message : 'Unknown error',
})
logger.error(
appendCopilotLogContext('Failed to persist chat messages', {
requestId: tracker.requestId,
messageId: userMessageId,
}),
{
chatId: actualChatId,
error: error instanceof Error ? error.message : 'Unknown error',
}
)
}
},
},
@@ -384,9 +423,15 @@ export async function POST(req: NextRequest) {
)
}
logger.error(`[${tracker.requestId}] Error handling mothership chat:`, {
error: error instanceof Error ? error.message : 'Unknown error',
})
logger.error(
appendCopilotLogContext('Error handling mothership chat', {
requestId: tracker.requestId,
messageId: userMessageIdForLogs,
}),
{
error: error instanceof Error ? error.message : 'Unknown error',
}
)
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Internal server error' },

View File

@@ -3,6 +3,7 @@ import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { checkInternalAuth } from '@/lib/auth/hybrid'
import { buildIntegrationToolSchemas } from '@/lib/copilot/chat-payload'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { generateWorkspaceContext } from '@/lib/copilot/workspace-context'
import {
@@ -35,6 +36,8 @@ const ExecuteRequestSchema = z.object({
* Consumes the Go SSE stream internally and returns a single JSON response.
*/
export async function POST(req: NextRequest) {
let messageId: string | undefined
try {
const auth = await checkInternalAuth(req, { requireWorkflowId: false })
if (!auth.success) {
@@ -48,9 +51,10 @@ export async function POST(req: NextRequest) {
await assertActiveWorkspaceAccess(workspaceId, userId)
const effectiveChatId = chatId || crypto.randomUUID()
messageId = crypto.randomUUID()
const [workspaceContext, integrationTools, userPermission] = await Promise.all([
generateWorkspaceContext(workspaceId, userId),
buildIntegrationToolSchemas(userId),
buildIntegrationToolSchemas(userId, messageId),
getUserEntityPermissions(userId, 'workspace', workspaceId).catch(() => null),
])
@@ -60,7 +64,7 @@ export async function POST(req: NextRequest) {
userId,
chatId: effectiveChatId,
mode: 'agent',
messageId: crypto.randomUUID(),
messageId,
isHosted: true,
workspaceContext,
...(integrationTools.length > 0 ? { integrationTools } : {}),
@@ -77,7 +81,7 @@ export async function POST(req: NextRequest) {
})
if (!result.success) {
logger.error('Mothership execute failed', {
logger.error(appendCopilotLogContext('Mothership execute failed', { messageId }), {
error: result.error,
errors: result.errors,
})
@@ -116,7 +120,7 @@ export async function POST(req: NextRequest) {
)
}
logger.error('Mothership execute error', {
logger.error(appendCopilotLogContext('Mothership execute error', { messageId }), {
error: error instanceof Error ? error.message : 'Unknown error',
})

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import { getWorkflowById, resolveWorkflowIdForUser } from '@/lib/workflows/utils'
@@ -32,6 +33,7 @@ const RequestSchema = z.object({
* - The copilot can still operate on any workflow using list_user_workflows
*/
export async function POST(req: NextRequest) {
let messageId: string | undefined
const auth = await authenticateV1Request(req)
if (!auth.authenticated || !auth.userId) {
return NextResponse.json(
@@ -80,13 +82,25 @@ export async function POST(req: NextRequest) {
// Always generate a chatId - required for artifacts system to work with subagents
const chatId = parsed.chatId || crypto.randomUUID()
messageId = crypto.randomUUID()
logger.info(
appendCopilotLogContext('Received headless copilot chat start request', { messageId }),
{
workflowId: resolved.workflowId,
workflowName: parsed.workflowName,
chatId,
mode: transportMode,
autoExecuteTools: parsed.autoExecuteTools,
timeout: parsed.timeout,
}
)
const requestPayload = {
message: parsed.message,
workflowId: resolved.workflowId,
userId: auth.userId,
model: selectedModel,
mode: transportMode,
messageId: crypto.randomUUID(),
messageId,
chatId,
}
@@ -115,7 +129,7 @@ export async function POST(req: NextRequest) {
)
}
logger.error('Headless copilot request failed', {
logger.error(appendCopilotLogContext('Headless copilot request failed', { messageId }), {
error: error instanceof Error ? error.message : String(error),
})
return NextResponse.json({ success: false, error: 'Internal server error' }, { status: 500 })

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { getUserSubscriptionState } from '@/lib/billing/core/subscription'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { getCopilotToolDescription } from '@/lib/copilot/tool-descriptions'
import { isHosted } from '@/lib/core/config/feature-flags'
import { createMcpToolId } from '@/lib/mcp/utils'
@@ -45,7 +46,10 @@ export interface ToolSchema {
* Shared by the interactive chat payload builder and the non-interactive
* block execution route so both paths send the same tool definitions to Go.
*/
export async function buildIntegrationToolSchemas(userId: string): Promise<ToolSchema[]> {
export async function buildIntegrationToolSchemas(
userId: string,
messageId?: string
): Promise<ToolSchema[]> {
const integrationTools: ToolSchema[] = []
try {
const { createUserToolSchema } = await import('@/tools/params')
@@ -56,10 +60,15 @@ export async function buildIntegrationToolSchemas(userId: string): Promise<ToolS
const subscriptionState = await getUserSubscriptionState(userId)
shouldAppendEmailTagline = subscriptionState.isFree
} catch (error) {
logger.warn('Failed to load subscription state for copilot tool descriptions', {
userId,
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to load subscription state for copilot tool descriptions', {
messageId,
}),
{
userId,
error: error instanceof Error ? error.message : String(error),
}
)
}
for (const [toolId, toolConfig] of Object.entries(latestTools)) {
@@ -83,14 +92,17 @@ export async function buildIntegrationToolSchemas(userId: string): Promise<ToolS
}),
})
} catch (toolError) {
logger.warn('Failed to build schema for tool, skipping', {
toolId,
error: toolError instanceof Error ? toolError.message : String(toolError),
})
logger.warn(
appendCopilotLogContext('Failed to build schema for tool, skipping', { messageId }),
{
toolId,
error: toolError instanceof Error ? toolError.message : String(toolError),
}
)
}
}
} catch (error) {
logger.warn('Failed to build tool schemas', {
logger.warn(appendCopilotLogContext('Failed to build tool schemas', { messageId }), {
error: error instanceof Error ? error.message : String(error),
})
}
@@ -171,7 +183,7 @@ export async function buildCopilotRequestPayload(
let integrationTools: ToolSchema[] = []
if (effectiveMode === 'build') {
integrationTools = await buildIntegrationToolSchemas(userId)
integrationTools = await buildIntegrationToolSchemas(userId, userMessageId)
// Discover MCP tools from workspace servers and include as deferred tools
if (workflowId) {
@@ -189,13 +201,23 @@ export async function buildCopilotRequestPayload(
})
}
if (mcpTools.length > 0) {
logger.info('Added MCP tools to copilot payload', { count: mcpTools.length })
logger.info(
appendCopilotLogContext('Added MCP tools to copilot payload', {
messageId: userMessageId,
}),
{ count: mcpTools.length }
)
}
}
} catch (error) {
logger.warn('Failed to discover MCP tools for copilot', {
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to discover MCP tools for copilot', {
messageId: userMessageId,
}),
{
error: error instanceof Error ? error.message : String(error),
}
)
}
}
}

View File

@@ -4,6 +4,7 @@ import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { createRunSegment, updateRunStatus } from '@/lib/copilot/async-runs/repository'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import type { OrchestrateStreamOptions } from '@/lib/copilot/orchestrator'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import {
@@ -205,8 +206,9 @@ export async function requestChatTitle(params: {
message: string
model: string
provider?: string
messageId?: string
}): Promise<string | null> {
const { message, model, provider } = params
const { message, model, provider, messageId } = params
if (!message || !model) return null
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
@@ -223,17 +225,20 @@ export async function requestChatTitle(params: {
const payload = await response.json().catch(() => ({}))
if (!response.ok) {
logger.warn('Failed to generate chat title via copilot backend', {
status: response.status,
error: payload,
})
logger.warn(
appendCopilotLogContext('Failed to generate chat title via copilot backend', { messageId }),
{
status: response.status,
error: payload,
}
)
return null
}
const title = typeof payload?.title === 'string' ? payload.title.trim() : ''
return title || null
} catch (error) {
logger.error('Error generating chat title:', error)
logger.error(appendCopilotLogContext('Error generating chat title', { messageId }), error)
return null
}
}
@@ -274,6 +279,8 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
orchestrateOptions,
pendingChatStreamAlreadyRegistered = false,
} = params
const messageId =
typeof requestPayload.messageId === 'string' ? requestPayload.messageId : streamId
let eventWriter: ReturnType<typeof createStreamEventWriter> | null = null
let clientDisconnected = false
@@ -303,9 +310,15 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
provider: (requestPayload.provider as string | undefined) || null,
requestContext: { requestId },
}).catch((error) => {
logger.warn(`[${requestId}] Failed to create copilot run segment`, {
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to create copilot run segment', {
requestId,
messageId,
}),
{
error: error instanceof Error ? error.message : String(error),
}
)
})
}
eventWriter = createStreamEventWriter(streamId)
@@ -324,10 +337,16 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await redis.del(getStreamAbortKey(streamId))
}
} catch (error) {
logger.warn(`[${requestId}] Failed to poll distributed stream abort`, {
streamId,
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to poll distributed stream abort', {
requestId,
messageId,
}),
{
streamId,
error: error instanceof Error ? error.message : String(error),
}
)
}
})()
}, STREAM_ABORT_POLL_MS)
@@ -344,11 +363,14 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await eventWriter.flush()
}
} catch (error) {
logger.error(`[${requestId}] Failed to persist stream event`, {
eventType: event.type,
eventId,
error: error instanceof Error ? error.message : String(error),
})
logger.error(
appendCopilotLogContext('Failed to persist stream event', { requestId, messageId }),
{
eventType: event.type,
eventId,
error: error instanceof Error ? error.message : String(error),
}
)
// Keep the live SSE stream going even if durable buffering hiccups.
}
@@ -367,7 +389,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
try {
await pushEvent(event)
} catch (error) {
logger.error(`[${requestId}] Failed to push event`, {
logger.error(appendCopilotLogContext('Failed to push event', { requestId, messageId }), {
eventType: event.type,
error: error instanceof Error ? error.message : String(error),
})
@@ -379,7 +401,7 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
}
if (chatId && !currentChat?.title && isNewChat) {
requestChatTitle({ message, model: titleModel, provider: titleProvider })
requestChatTitle({ message, model: titleModel, provider: titleProvider, messageId })
.then(async (title) => {
if (title) {
await db.update(copilotChats).set({ title }).where(eq(copilotChats.id, chatId!))
@@ -390,7 +412,10 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
}
})
.catch((error) => {
logger.error(`[${requestId}] Title generation failed:`, error)
logger.error(
appendCopilotLogContext('Title generation failed', { requestId, messageId }),
error
)
})
}
@@ -415,7 +440,9 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
})
if (abortController.signal.aborted) {
logger.info(`[${requestId}] Stream aborted by explicit stop`)
logger.info(
appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId })
)
await eventWriter.close().catch(() => {})
await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId })
await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {})
@@ -429,14 +456,23 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
'An unexpected error occurred while processing the response.'
if (clientDisconnected) {
logger.info(`[${requestId}] Stream failed after client disconnect`, {
error: errorMessage,
})
logger.info(
appendCopilotLogContext('Stream failed after client disconnect', {
requestId,
messageId,
}),
{
error: errorMessage,
}
)
}
logger.error(`[${requestId}] Orchestration returned failure`, {
error: errorMessage,
})
logger.error(
appendCopilotLogContext('Orchestration returned failure', { requestId, messageId }),
{
error: errorMessage,
}
)
await pushEventBestEffort({
type: 'error',
error: errorMessage,
@@ -464,18 +500,29 @@ export function createSSEStream(params: StreamingOrchestrationParams): ReadableS
await updateRunStatus(runId, 'complete', { completedAt: new Date() }).catch(() => {})
} catch (error) {
if (abortController.signal.aborted) {
logger.info(`[${requestId}] Stream aborted by explicit stop`)
logger.info(
appendCopilotLogContext('Stream aborted by explicit stop', { requestId, messageId })
)
await eventWriter.close().catch(() => {})
await setStreamMeta(streamId, { status: 'cancelled', userId, executionId, runId })
await updateRunStatus(runId, 'cancelled', { completedAt: new Date() }).catch(() => {})
return
}
if (clientDisconnected) {
logger.info(`[${requestId}] Stream errored after client disconnect`, {
error: error instanceof Error ? error.message : 'Stream error',
})
logger.info(
appendCopilotLogContext('Stream errored after client disconnect', {
requestId,
messageId,
}),
{
error: error instanceof Error ? error.message : 'Stream error',
}
)
}
logger.error(`[${requestId}] Orchestration error:`, error)
logger.error(
appendCopilotLogContext('Orchestration error', { requestId, messageId }),
error
)
const errorMessage = error instanceof Error ? error.message : 'Stream error'
await pushEventBestEffort({
type: 'error',

View File

@@ -0,0 +1,25 @@
export interface CopilotLogContext {
requestId?: string
messageId?: string
}
/**
* Appends copilot request identifiers to a log message.
*/
export function appendCopilotLogContext(message: string, context: CopilotLogContext = {}): string {
const suffixParts: string[] = []
if (context.requestId) {
suffixParts.push(`requestId:${context.requestId}`)
}
if (context.messageId) {
suffixParts.push(`messageId:${context.messageId}`)
}
if (suffixParts.length === 0) {
return message
}
return `${message} [${suffixParts.join(' ')}]`
}

View File

@@ -14,6 +14,7 @@ import {
updateRunStatus,
} from '@/lib/copilot/async-runs/repository'
import { SIM_AGENT_API_URL, SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
isToolAvailableOnSimSide,
prepareExecutionContext,
@@ -117,15 +118,32 @@ export async function orchestrateCopilotStream(
execContext.abortSignal = options.abortSignal
const payloadMsgId = requestPayload?.messageId
const messageId = typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID()
execContext.messageId = messageId
const context = createStreamingContext({
chatId,
executionId,
runId,
messageId: typeof payloadMsgId === 'string' ? payloadMsgId : crypto.randomUUID(),
messageId,
})
const withLogContext = (message: string) =>
appendCopilotLogContext(message, {
requestId: context.requestId,
messageId,
})
let claimedToolCallIds: string[] = []
let claimedByWorkerId: string | null = null
logger.info(withLogContext('Starting copilot orchestration'), {
goRoute,
workflowId,
workspaceId,
chatId,
executionId,
runId,
hasUserTimezone: Boolean(userTimezone),
})
try {
let route = goRoute
let payload = requestPayload
@@ -135,6 +153,12 @@ export async function orchestrateCopilotStream(
for (;;) {
context.streamComplete = false
logger.info(withLogContext('Starting orchestration loop iteration'), {
route,
hasPendingAsyncContinuation: Boolean(context.awaitingAsyncContinuation),
claimedToolCallCount: claimedToolCallIds.length,
})
const loopOptions = {
...options,
onEvent: async (event: SSEEvent) => {
@@ -142,6 +166,14 @@ export async function orchestrateCopilotStream(
const d = (event.data ?? {}) as Record<string, unknown>
const response = (d.response ?? {}) as Record<string, unknown>
if (response.async_pause) {
logger.info(withLogContext('Detected async pause from copilot backend'), {
route,
checkpointId:
typeof (response.async_pause as Record<string, unknown>)?.checkpointId ===
'string'
? (response.async_pause as Record<string, unknown>).checkpointId
: undefined,
})
if (runId) {
await updateRunStatus(runId, 'paused_waiting_for_tool').catch(() => {})
}
@@ -167,8 +199,18 @@ export async function orchestrateCopilotStream(
loopOptions
)
logger.info(withLogContext('Completed orchestration loop iteration'), {
route,
streamComplete: context.streamComplete,
wasAborted: context.wasAborted,
hasAsyncContinuation: Boolean(context.awaitingAsyncContinuation),
errorCount: context.errors.length,
})
if (claimedToolCallIds.length > 0) {
logger.info('Marking async tool calls as delivered', { toolCallIds: claimedToolCallIds })
logger.info(withLogContext('Marking async tool calls as delivered'), {
toolCallIds: claimedToolCallIds,
})
await Promise.all(
claimedToolCallIds.map((toolCallId) =>
markAsyncToolDelivered(toolCallId).catch(() => null)
@@ -179,6 +221,11 @@ export async function orchestrateCopilotStream(
}
if (options.abortSignal?.aborted || context.wasAborted) {
logger.info(withLogContext('Stopping orchestration because request was aborted'), {
pendingToolCallCount: Array.from(context.toolCalls.values()).filter(
(toolCall) => toolCall.status === 'pending' || toolCall.status === 'executing'
).length,
})
for (const [toolCallId, toolCall] of context.toolCalls) {
if (toolCall.status === 'pending' || toolCall.status === 'executing') {
toolCall.status = 'cancelled'
@@ -191,10 +238,18 @@ export async function orchestrateCopilotStream(
}
const continuation = context.awaitingAsyncContinuation
if (!continuation) break
if (!continuation) {
logger.info(withLogContext('No async continuation pending; finishing orchestration'))
break
}
let resumeReady = false
let resumeRetries = 0
logger.info(withLogContext('Processing async continuation'), {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
pendingToolCallIds: continuation.pendingToolCallIds,
})
for (;;) {
claimedToolCallIds = []
claimedByWorkerId = null
@@ -210,21 +265,31 @@ export async function orchestrateCopilotStream(
if (localPendingPromise) {
localPendingPromises.push(localPendingPromise)
logger.info('Waiting for local async tool completion before retrying resume claim', {
toolCallId,
runId: continuation.runId,
})
logger.info(
withLogContext(
'Waiting for local async tool completion before retrying resume claim'
),
{
toolCallId,
runId: continuation.runId,
}
)
continue
}
if (durableRow && isTerminalAsyncStatus(durableRow.status)) {
if (durableRow.claimedBy && durableRow.claimedBy !== resumeWorkerId) {
missingToolCallIds.push(toolCallId)
logger.warn('Async tool continuation is waiting on a claim held by another worker', {
toolCallId,
runId: continuation.runId,
claimedBy: durableRow.claimedBy,
})
logger.warn(
withLogContext(
'Async tool continuation is waiting on a claim held by another worker'
),
{
toolCallId,
runId: continuation.runId,
claimedBy: durableRow.claimedBy,
}
)
continue
}
readyTools.push({
@@ -243,12 +308,15 @@ export async function orchestrateCopilotStream(
isTerminalToolCallStatus(toolState.status) &&
!isToolAvailableOnSimSide(toolState.name)
) {
logger.info('Including Go-handled tool in resume payload (no Sim-side row)', {
toolCallId,
toolName: toolState.name,
status: toolState.status,
runId: continuation.runId,
})
logger.info(
withLogContext('Including Go-handled tool in resume payload (no Sim-side row)'),
{
toolCallId,
toolName: toolState.name,
status: toolState.status,
runId: continuation.runId,
}
)
readyTools.push({
toolCallId,
toolState,
@@ -258,7 +326,7 @@ export async function orchestrateCopilotStream(
continue
}
logger.warn('Skipping already-claimed or missing async tool resume', {
logger.warn(withLogContext('Skipping already-claimed or missing async tool resume'), {
toolCallId,
runId: continuation.runId,
durableStatus: durableRow?.status,
@@ -268,6 +336,13 @@ export async function orchestrateCopilotStream(
}
if (localPendingPromises.length > 0) {
logger.info(
withLogContext('Waiting for local pending async tools before resuming continuation'),
{
checkpointId: continuation.checkpointId,
pendingPromiseCount: localPendingPromises.length,
}
)
await Promise.allSettled(localPendingPromises)
continue
}
@@ -275,15 +350,28 @@ export async function orchestrateCopilotStream(
if (missingToolCallIds.length > 0) {
if (resumeRetries < 3) {
resumeRetries++
logger.info('Retrying async resume after some tool calls were not yet ready', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
retry: resumeRetries,
missingToolCallIds,
})
logger.info(
withLogContext('Retrying async resume after some tool calls were not yet ready'),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
retry: resumeRetries,
missingToolCallIds,
}
)
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
continue
}
logger.error(
withLogContext(
'Async continuation failed because pending tool calls never became ready'
),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
missingToolCallIds,
}
)
throw new Error(
`Failed to resume async tool continuation: pending tool calls were not ready (${missingToolCallIds.join(', ')})`
)
@@ -292,14 +380,25 @@ export async function orchestrateCopilotStream(
if (readyTools.length === 0) {
if (resumeRetries < 3 && continuation.pendingToolCallIds.length > 0) {
resumeRetries++
logger.info('Retrying async resume because no tool calls were ready yet', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
retry: resumeRetries,
})
logger.info(
withLogContext('Retrying async resume because no tool calls were ready yet'),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
retry: resumeRetries,
}
)
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
continue
}
logger.error(
withLogContext('Async continuation failed because no tool calls were ready'),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
requestedToolCallIds: continuation.pendingToolCallIds,
}
)
throw new Error('Failed to resume async tool continuation: no tool calls were ready')
}
@@ -320,12 +419,15 @@ export async function orchestrateCopilotStream(
if (claimFailures.length > 0) {
if (newlyClaimedToolCallIds.length > 0) {
logger.info('Releasing async tool claims after claim contention during resume', {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
newlyClaimedToolCallIds,
claimFailures,
})
logger.info(
withLogContext('Releasing async tool claims after claim contention during resume'),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
newlyClaimedToolCallIds,
claimFailures,
}
)
await Promise.all(
newlyClaimedToolCallIds.map((toolCallId) =>
releaseCompletedAsyncToolClaim(toolCallId, resumeWorkerId).catch(() => null)
@@ -334,7 +436,7 @@ export async function orchestrateCopilotStream(
}
if (resumeRetries < 3) {
resumeRetries++
logger.info('Retrying async resume after claim contention', {
logger.info(withLogContext('Retrying async resume after claim contention'), {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
retry: resumeRetries,
@@ -343,6 +445,14 @@ export async function orchestrateCopilotStream(
await new Promise((resolve) => setTimeout(resolve, 250 * resumeRetries))
continue
}
logger.error(
withLogContext('Async continuation failed because tool claims could not be acquired'),
{
checkpointId: continuation.checkpointId,
runId: continuation.runId,
claimFailures,
}
)
throw new Error(
`Failed to resume async tool continuation: unable to claim tool calls (${claimFailures.join(', ')})`
)
@@ -356,7 +466,7 @@ export async function orchestrateCopilotStream(
]
claimedByWorkerId = claimedToolCallIds.length > 0 ? resumeWorkerId : null
logger.info('Resuming async tool continuation', {
logger.info(withLogContext('Resuming async tool continuation'), {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
toolCallIds: readyTools.map((tool) => tool.toolCallId),
@@ -395,10 +505,15 @@ export async function orchestrateCopilotStream(
!isTerminalAsyncStatus(durableStatus) &&
!isDeliveredAsyncStatus(durableStatus)
) {
logger.warn('Async tool row was claimed for resume without terminal durable state', {
toolCallId: tool.toolCallId,
status: durableStatus,
})
logger.warn(
withLogContext(
'Async tool row was claimed for resume without terminal durable state'
),
{
toolCallId: tool.toolCallId,
status: durableStatus,
}
)
}
return {
@@ -416,11 +531,20 @@ export async function orchestrateCopilotStream(
checkpointId: continuation.checkpointId,
results,
}
logger.info(withLogContext('Prepared async continuation payload for resume endpoint'), {
route,
checkpointId: continuation.checkpointId,
resultCount: results.length,
})
resumeReady = true
break
}
if (!resumeReady) {
logger.warn(withLogContext('Async continuation loop exited without resume payload'), {
checkpointId: continuation.checkpointId,
runId: continuation.runId,
})
break
}
}
@@ -436,12 +560,19 @@ export async function orchestrateCopilotStream(
usage: context.usage,
cost: context.cost,
}
logger.info(withLogContext('Completing copilot orchestration'), {
success: result.success,
chatId: result.chatId,
hasRequestId: Boolean(result.requestId),
errorCount: result.errors?.length || 0,
toolCallCount: result.toolCalls.length,
})
await options.onComplete?.(result)
return result
} catch (error) {
const err = error instanceof Error ? error : new Error('Copilot orchestration failed')
if (claimedToolCallIds.length > 0 && claimedByWorkerId) {
logger.warn('Releasing async tool claims after delivery failure', {
logger.warn(withLogContext('Releasing async tool claims after delivery failure'), {
toolCallIds: claimedToolCallIds,
workerId: claimedByWorkerId,
})
@@ -451,7 +582,9 @@ export async function orchestrateCopilotStream(
)
)
}
logger.error('Copilot orchestration failed', { error: err.message })
logger.error(withLogContext('Copilot orchestration failed'), {
error: err.message,
})
await options.onError?.(err)
return {
success: false,

View File

@@ -148,7 +148,8 @@ describe('sse-handlers tool lifecycle', () => {
'read',
499,
'Request aborted during tool execution',
{ cancelled: true }
{ cancelled: true },
'msg-1'
)
const updated = context.toolCalls.get('tool-cancel')

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@sim/logger'
import { upsertAsyncToolCall } from '@/lib/copilot/async-runs/repository'
import { STREAM_TIMEOUT_MS } from '@/lib/copilot/constants'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
asRecord,
getEventData,
@@ -53,13 +54,25 @@ function abortPendingToolIfStreamDead(
toolCall.status = 'cancelled'
toolCall.endTime = Date.now()
markToolResultSeen(toolCallId)
markToolComplete(toolCall.id, toolCall.name, 499, 'Request aborted before tool execution', {
cancelled: true,
}).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (stream aborted)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
markToolComplete(
toolCall.id,
toolCall.name,
499,
'Request aborted before tool execution',
{
cancelled: true,
},
context.messageId
).catch((err) => {
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed (stream aborted)', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
}
)
})
return true
}
@@ -90,7 +103,8 @@ function getEventUI(event: SSEEvent): {
function handleClientCompletion(
toolCall: ToolCallState,
toolCallId: string,
completion: { status: string; message?: string; data?: Record<string, unknown> } | null
completion: { status: string; message?: string; data?: Record<string, unknown> } | null,
context: StreamingContext
): void {
if (completion?.status === 'background') {
toolCall.status = 'skipped'
@@ -100,12 +114,18 @@ function handleClientCompletion(
toolCall.name,
202,
completion.message || 'Tool execution moved to background',
{ background: true }
{ background: true },
context.messageId
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (client background)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed (client background)', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
}
)
})
markToolResultSeen(toolCallId)
return
@@ -117,12 +137,19 @@ function handleClientCompletion(
toolCall.id,
toolCall.name,
400,
completion.message || 'Tool execution rejected'
completion.message || 'Tool execution rejected',
undefined,
context.messageId
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (client rejected)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed (client rejected)', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
}
)
})
markToolResultSeen(toolCallId)
return
@@ -135,12 +162,18 @@ function handleClientCompletion(
toolCall.name,
499,
completion.message || 'Workflow execution was stopped manually by the user.',
completion.data
completion.data,
context.messageId
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed (client cancelled)', {
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed (client cancelled)', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
error: err instanceof Error ? err.message : String(err),
}
)
})
markToolResultSeen(toolCallId)
return
@@ -149,15 +182,25 @@ function handleClientCompletion(
toolCall.status = success ? 'success' : 'error'
toolCall.endTime = Date.now()
const msg = completion?.message || (success ? 'Tool completed' : 'Tool failed or timed out')
markToolComplete(toolCall.id, toolCall.name, success ? 200 : 500, msg, completion?.data).catch(
(err) => {
logger.error('markToolComplete fire-and-forget failed (client completion)', {
markToolComplete(
toolCall.id,
toolCall.name,
success ? 200 : 500,
msg,
completion?.data,
context.messageId
).catch((err) => {
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed (client completion)', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
})
}
)
}
)
})
markToolResultSeen(toolCallId)
}
@@ -170,7 +213,8 @@ async function emitSyntheticToolResult(
toolCallId: string,
toolName: string,
completion: { status: string; message?: string; data?: Record<string, unknown> } | null,
options: OrchestratorOptions
options: OrchestratorOptions,
context: StreamingContext
): Promise<void> {
const success = completion?.status === 'success'
const isCancelled = completion?.status === 'cancelled'
@@ -189,11 +233,16 @@ async function emitSyntheticToolResult(
error: !success ? completion?.message : undefined,
} as SSEEvent)
} catch (error) {
logger.warn('Failed to emit synthetic tool_result', {
toolCallId,
toolName,
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to emit synthetic tool_result', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: error instanceof Error ? error.message : String(error),
}
)
}
}
@@ -260,6 +309,17 @@ export const sseHandlers: Record<string, SSEHandler> = {
const rid = typeof event.data === 'string' ? event.data : undefined
if (rid) {
context.requestId = rid
logger.info(
appendCopilotLogContext('Mapped copilot message to Go trace ID', {
messageId: context.messageId,
}),
{
goTraceId: rid,
chatId: context.chatId,
executionId: context.executionId,
runId: context.runId,
}
)
}
},
title_updated: () => {},
@@ -406,19 +466,29 @@ export const sseHandlers: Record<string, SSEHandler> = {
args,
})
} catch (err) {
logger.warn('Failed to persist async tool row before execution', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to persist async tool row before execution', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
}
return executeToolAndReport(toolCallId, context, execContext, options)
})().catch((err) => {
logger.error('Parallel tool execution failed', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('Parallel tool execution failed', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
return {
status: 'error',
message: err instanceof Error ? err.message : String(err),
@@ -457,19 +527,24 @@ export const sseHandlers: Record<string, SSEHandler> = {
args,
status: 'running',
}).catch((err) => {
logger.warn('Failed to persist async tool row for client-executable tool', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to persist async tool row for client-executable tool', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
})
const completion = await waitForToolCompletion(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
handleClientCompletion(toolCall, toolCallId, completion)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
handleClientCompletion(toolCall, toolCallId, completion, context)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context)
}
return
}
@@ -651,19 +726,29 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
args,
})
} catch (err) {
logger.warn('Failed to persist async subagent tool row before execution', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to persist async subagent tool row before execution', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
}
return executeToolAndReport(toolCallId, context, execContext, options)
})().catch((err) => {
logger.error('Parallel subagent tool execution failed', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('Parallel subagent tool execution failed', {
messageId: context.messageId,
}),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
return {
status: 'error',
message: err instanceof Error ? err.message : String(err),
@@ -697,19 +782,25 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
args,
status: 'running',
}).catch((err) => {
logger.warn('Failed to persist async tool row for client-executable subagent tool', {
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext(
'Failed to persist async tool row for client-executable subagent tool',
{ messageId: context.messageId }
),
{
toolCallId,
toolName,
error: err instanceof Error ? err.message : String(err),
}
)
})
const completion = await waitForToolCompletion(
toolCallId,
options.timeout || STREAM_TIMEOUT_MS,
options.abortSignal
)
handleClientCompletion(toolCall, toolCallId, completion)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options)
handleClientCompletion(toolCall, toolCallId, completion, context)
await emitSyntheticToolResult(toolCallId, toolCall.name, completion, options, context)
}
return
}
@@ -769,10 +860,15 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
export function handleSubagentRouting(event: SSEEvent, context: StreamingContext): boolean {
if (!event.subagent) return false
if (!context.subAgentParentToolCallId) {
logger.warn('Subagent event missing parent tool call', {
type: event.type,
subagent: event.subagent,
})
logger.warn(
appendCopilotLogContext('Subagent event missing parent tool call', {
messageId: context.messageId,
}),
{
type: event.type,
subagent: event.subagent,
}
)
return false
}
return true

View File

@@ -3,6 +3,7 @@ import { userTableRows } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { eq } from 'drizzle-orm'
import { completeAsyncToolCall, markAsyncToolRunning } from '@/lib/copilot/async-runs/repository'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import { waitForToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import { asRecord, markToolResultSeen } from '@/lib/copilot/orchestrator/sse/utils'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
@@ -186,12 +187,15 @@ async function maybeWriteOutputToFile(
contentType
)
logger.info('Tool output written to file', {
toolName,
fileName,
size: buffer.length,
fileId: uploaded.id,
})
logger.info(
appendCopilotLogContext('Tool output written to file', { messageId: context.messageId }),
{
toolName,
fileName,
size: buffer.length,
fileId: uploaded.id,
}
)
return {
success: true,
@@ -205,11 +209,16 @@ async function maybeWriteOutputToFile(
}
} catch (err) {
const message = err instanceof Error ? err.message : String(err)
logger.warn('Failed to write tool output to file', {
toolName,
outputPath,
error: message,
})
logger.warn(
appendCopilotLogContext('Failed to write tool output to file', {
messageId: context.messageId,
}),
{
toolName,
outputPath,
error: message,
}
)
return {
success: false,
error: `Failed to write output file: ${message}`,
@@ -289,10 +298,11 @@ function terminalCompletionFromToolCall(toolCall: {
function reportCancelledTool(
toolCall: { id: string; name: string },
message: string,
messageId?: string,
data: Record<string, unknown> = { cancelled: true }
): void {
markToolComplete(toolCall.id, toolCall.name, 499, message, data).catch((err) => {
logger.error('markToolComplete failed (cancelled)', {
markToolComplete(toolCall.id, toolCall.name, 499, message, data, messageId).catch((err) => {
logger.error(appendCopilotLogContext('markToolComplete failed (cancelled)', { messageId }), {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
@@ -387,11 +397,14 @@ async function maybeWriteOutputToTable(
}
})
logger.info('Tool output written to table', {
toolName,
tableId: outputTable,
rowCount: rows.length,
})
logger.info(
appendCopilotLogContext('Tool output written to table', { messageId: context.messageId }),
{
toolName,
tableId: outputTable,
rowCount: rows.length,
}
)
return {
success: true,
@@ -402,11 +415,16 @@ async function maybeWriteOutputToTable(
},
}
} catch (err) {
logger.warn('Failed to write tool output to table', {
toolName,
outputTable,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to write tool output to table', {
messageId: context.messageId,
}),
{
toolName,
outputTable,
error: err instanceof Error ? err.message : String(err),
}
)
return {
success: false,
error: `Failed to write to table: ${err instanceof Error ? err.message : String(err)}`,
@@ -506,13 +524,16 @@ async function maybeWriteReadCsvToTable(
}
})
logger.info('Read output written to table', {
toolName,
tableId: outputTable,
tableName: table.name,
rowCount: rows.length,
filePath,
})
logger.info(
appendCopilotLogContext('Read output written to table', { messageId: context.messageId }),
{
toolName,
tableId: outputTable,
tableName: table.name,
rowCount: rows.length,
filePath,
}
)
return {
success: true,
@@ -524,11 +545,16 @@ async function maybeWriteReadCsvToTable(
},
}
} catch (err) {
logger.warn('Failed to write read output to table', {
toolName,
outputTable,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to write read output to table', {
messageId: context.messageId,
}),
{
toolName,
outputTable,
error: err instanceof Error ? err.message : String(err),
}
)
return {
success: false,
error: `Failed to import into table: ${err instanceof Error ? err.message : String(err)}`,
@@ -562,14 +588,14 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted before tool execution',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted before tool execution')
reportCancelledTool(toolCall, 'Request aborted before tool execution', context.messageId)
return cancelledCompletion('Request aborted before tool execution')
}
toolCall.status = 'executing'
await markAsyncToolRunning(toolCall.id, 'sim-stream').catch(() => {})
logger.info('Tool execution started', {
logger.info(appendCopilotLogContext('Tool execution started', { messageId: context.messageId }), {
toolCallId: toolCall.id,
toolName: toolCall.name,
params: toolCall.params,
@@ -590,7 +616,7 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted during tool execution',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted during tool execution')
reportCancelledTool(toolCall, 'Request aborted during tool execution', context.messageId)
return cancelledCompletion('Request aborted during tool execution')
}
result = await maybeWriteOutputToFile(toolCall.name, toolCall.params, result, execContext)
@@ -604,7 +630,11 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted during tool post-processing',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
reportCancelledTool(
toolCall,
'Request aborted during tool post-processing',
context.messageId
)
return cancelledCompletion('Request aborted during tool post-processing')
}
result = await maybeWriteOutputToTable(toolCall.name, toolCall.params, result, execContext)
@@ -618,7 +648,11 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted during tool post-processing',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
reportCancelledTool(
toolCall,
'Request aborted during tool post-processing',
context.messageId
)
return cancelledCompletion('Request aborted during tool post-processing')
}
result = await maybeWriteReadCsvToTable(toolCall.name, toolCall.params, result, execContext)
@@ -632,7 +666,11 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted during tool post-processing',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted during tool post-processing')
reportCancelledTool(
toolCall,
'Request aborted during tool post-processing',
context.messageId
)
return cancelledCompletion('Request aborted during tool post-processing')
}
toolCall.status = result.success ? 'success' : 'error'
@@ -648,18 +686,24 @@ export async function executeToolAndReport(
: raw && typeof raw === 'object'
? JSON.stringify(raw).slice(0, 200)
: undefined
logger.info('Tool execution succeeded', {
toolCallId: toolCall.id,
toolName: toolCall.name,
outputPreview: preview,
})
logger.info(
appendCopilotLogContext('Tool execution succeeded', { messageId: context.messageId }),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
outputPreview: preview,
}
)
} else {
logger.warn('Tool execution failed', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: result.error,
params: toolCall.params,
})
logger.warn(
appendCopilotLogContext('Tool execution failed', { messageId: context.messageId }),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
error: result.error,
params: toolCall.params,
}
)
}
// If create_workflow was successful, update the execution context with the new workflowId.
@@ -687,7 +731,11 @@ export async function executeToolAndReport(
if (abortRequested(context, execContext, options)) {
toolCall.status = 'cancelled'
reportCancelledTool(toolCall, 'Request aborted before tool result delivery')
reportCancelledTool(
toolCall,
'Request aborted before tool result delivery',
context.messageId
)
return cancelledCompletion('Request aborted before tool result delivery')
}
@@ -702,13 +750,19 @@ export async function executeToolAndReport(
toolCall.name,
result.success ? 200 : 500,
result.error || (result.success ? 'Tool completed' : 'Tool failed'),
result.output
result.output,
context.messageId
).catch((err) => {
logger.error('markToolComplete fire-and-forget failed', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
})
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
}
)
})
const resultEvent: SSEEvent = {
@@ -743,10 +797,15 @@ export async function executeToolAndReport(
if (deleted.length > 0) {
isDeleteOp = true
removeChatResources(execContext.chatId, deleted).catch((err) => {
logger.warn('Failed to remove chat resources after deletion', {
chatId: execContext.chatId,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to remove chat resources after deletion', {
messageId: context.messageId,
}),
{
chatId: execContext.chatId,
error: err instanceof Error ? err.message : String(err),
}
)
})
for (const resource of deleted) {
@@ -769,10 +828,15 @@ export async function executeToolAndReport(
if (resources.length > 0) {
persistChatResources(execContext.chatId, resources).catch((err) => {
logger.warn('Failed to persist chat resources', {
chatId: execContext.chatId,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to persist chat resources', {
messageId: context.messageId,
}),
{
chatId: execContext.chatId,
error: err instanceof Error ? err.message : String(err),
}
)
})
for (const resource of resources) {
@@ -801,19 +865,22 @@ export async function executeToolAndReport(
result: { cancelled: true },
error: 'Request aborted during tool execution',
}).catch(() => {})
reportCancelledTool(toolCall, 'Request aborted during tool execution')
reportCancelledTool(toolCall, 'Request aborted during tool execution', context.messageId)
return cancelledCompletion('Request aborted during tool execution')
}
toolCall.status = 'error'
toolCall.error = error instanceof Error ? error.message : String(error)
toolCall.endTime = Date.now()
logger.error('Tool execution threw', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: toolCall.error,
params: toolCall.params,
})
logger.error(
appendCopilotLogContext('Tool execution threw', { messageId: context.messageId }),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
error: toolCall.error,
params: toolCall.params,
}
)
markToolResultSeen(toolCall.id)
await completeAsyncToolCall({
@@ -825,14 +892,26 @@ export async function executeToolAndReport(
// Fire-and-forget (same reasoning as above).
// Pass error as structured data so the Go side can surface it to the LLM.
markToolComplete(toolCall.id, toolCall.name, 500, toolCall.error, {
error: toolCall.error,
}).catch((err) => {
logger.error('markToolComplete fire-and-forget failed', {
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
})
markToolComplete(
toolCall.id,
toolCall.name,
500,
toolCall.error,
{
error: toolCall.error,
},
context.messageId
).catch((err) => {
logger.error(
appendCopilotLogContext('markToolComplete fire-and-forget failed', {
messageId: context.messageId,
}),
{
toolCallId: toolCall.id,
toolName: toolCall.name,
error: err instanceof Error ? err.message : String(err),
}
)
})
const errorEvent: SSEEvent = {

View File

@@ -2,6 +2,7 @@ import { createLogger } from '@sim/logger'
import { getHighestPrioritySubscription } from '@/lib/billing/core/plan'
import { isPaid } from '@/lib/billing/plan-helpers'
import { ORCHESTRATION_TIMEOUT_MS } from '@/lib/copilot/constants'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
handleSubagentRouting,
sseHandlers,
@@ -164,10 +165,13 @@ export async function runStreamLoop(
try {
await options.onEvent?.(normalizedEvent)
} catch (error) {
logger.warn('Failed to forward SSE event', {
type: normalizedEvent.type,
error: error instanceof Error ? error.message : String(error),
})
logger.warn(
appendCopilotLogContext('Failed to forward SSE event', { messageId: context.messageId }),
{
type: normalizedEvent.type,
error: error instanceof Error ? error.message : String(error),
}
)
}
// Let the caller intercept before standard dispatch.
@@ -201,7 +205,11 @@ export async function runStreamLoop(
if (context.subAgentParentStack.length > 0) {
context.subAgentParentStack.pop()
} else {
logger.warn('subagent_end without matching subagent_start')
logger.warn(
appendCopilotLogContext('subagent_end without matching subagent_start', {
messageId: context.messageId,
})
)
}
context.subAgentParentToolCallId =
context.subAgentParentStack.length > 0

View File

@@ -3,6 +3,7 @@ import { credential, mcpServers, pendingCredentialDraft, user } from '@sim/db/sc
import { createLogger } from '@sim/logger'
import { and, eq, isNull, lt } from 'drizzle-orm'
import { SIM_AGENT_API_URL } from '@/lib/copilot/constants'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import type {
ExecutionContext,
ToolCallResult,
@@ -321,12 +322,17 @@ async function executeManageCustomTool(
error: `Unsupported operation for manage_custom_tool: ${operation}`,
}
} catch (error) {
logger.error('manage_custom_tool execution failed', {
operation,
workspaceId,
userId: context.userId,
error: error instanceof Error ? error.message : String(error),
})
logger.error(
appendCopilotLogContext('manage_custom_tool execution failed', {
messageId: context.messageId,
}),
{
operation,
workspaceId,
userId: context.userId,
error: error instanceof Error ? error.message : String(error),
}
)
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to manage custom tool',
@@ -553,11 +559,16 @@ async function executeManageMcpTool(
return { success: false, error: `Unsupported operation for manage_mcp_tool: ${operation}` }
} catch (error) {
logger.error('manage_mcp_tool execution failed', {
operation,
workspaceId,
error: error instanceof Error ? error.message : String(error),
})
logger.error(
appendCopilotLogContext('manage_mcp_tool execution failed', {
messageId: context.messageId,
}),
{
operation,
workspaceId,
error: error instanceof Error ? error.message : String(error),
}
)
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to manage MCP server',
@@ -716,11 +727,16 @@ async function executeManageSkill(
return { success: false, error: `Unsupported operation for manage_skill: ${operation}` }
} catch (error) {
logger.error('manage_skill execution failed', {
operation,
workspaceId,
error: error instanceof Error ? error.message : String(error),
})
logger.error(
appendCopilotLogContext('manage_skill execution failed', {
messageId: context.messageId,
}),
{
operation,
workspaceId,
error: error instanceof Error ? error.message : String(error),
}
)
return {
success: false,
error: error instanceof Error ? error.message : 'Failed to manage skill',
@@ -992,10 +1008,15 @@ const SIM_WORKFLOW_TOOL_HANDLERS: Record<
},
}
} catch (err) {
logger.warn('Failed to generate OAuth link, falling back to generic URL', {
providerName,
error: err instanceof Error ? err.message : String(err),
})
logger.warn(
appendCopilotLogContext('Failed to generate OAuth link, falling back to generic URL', {
messageId: c.messageId,
}),
{
providerName,
error: err instanceof Error ? err.message : String(err),
}
)
const workspaceUrl = c.workspaceId
? `${baseUrl}/workspace/${c.workspaceId}`
: `${baseUrl}/workspace`
@@ -1179,7 +1200,12 @@ export async function executeToolServerSide(
const toolConfig = getTool(resolvedToolName)
if (!toolConfig) {
logger.warn('Tool not found in registry', { toolName, resolvedToolName })
logger.warn(
appendCopilotLogContext('Tool not found in registry', {
messageId: context.messageId,
}),
{ toolName, resolvedToolName }
)
return {
success: false,
error: `Tool not found: ${toolName}`,
@@ -1241,6 +1267,7 @@ async function executeServerToolDirect(
workspaceId: context.workspaceId,
userPermission: context.userPermission,
chatId: context.chatId,
messageId: context.messageId,
abortSignal: context.abortSignal,
})
@@ -1266,10 +1293,15 @@ async function executeServerToolDirect(
return { success: true, output: result }
} catch (error) {
logger.error('Server tool execution failed', {
toolName,
error: error instanceof Error ? error.message : String(error),
})
logger.error(
appendCopilotLogContext('Server tool execution failed', {
messageId: context.messageId,
}),
{
toolName,
error: error instanceof Error ? error.message : String(error),
}
)
return {
success: false,
error: error instanceof Error ? error.message : 'Server tool execution failed',
@@ -1320,7 +1352,8 @@ export async function markToolComplete(
toolName: string,
status: number,
message?: unknown,
data?: unknown
data?: unknown,
messageId?: string
): Promise<boolean> {
try {
const controller = new AbortController()
@@ -1344,7 +1377,11 @@ export async function markToolComplete(
})
if (!response.ok) {
logger.warn('Mark-complete call failed', { toolCallId, toolName, status: response.status })
logger.warn(appendCopilotLogContext('Mark-complete call failed', { messageId }), {
toolCallId,
toolName,
status: response.status,
})
return false
}
@@ -1354,7 +1391,7 @@ export async function markToolComplete(
}
} catch (error) {
const isTimeout = error instanceof DOMException && error.name === 'AbortError'
logger.error('Mark-complete call failed', {
logger.error(appendCopilotLogContext('Mark-complete call failed', { messageId }), {
toolCallId,
toolName,
timedOut: isTimeout,

View File

@@ -195,6 +195,7 @@ export interface ExecutionContext {
workflowId: string
workspaceId?: string
chatId?: string
messageId?: string
executionId?: string
runId?: string
abortSignal?: AbortSignal

View File

@@ -5,6 +5,7 @@ export interface ServerToolContext {
workspaceId?: string
userPermission?: string
chatId?: string
messageId?: string
abortSignal?: AbortSignal
}

View File

@@ -1,5 +1,6 @@
import { createLogger } from '@sim/logger'
import { z } from 'zod'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -124,6 +125,9 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool<
params: DownloadToWorkspaceFileArgs,
context?: ServerToolContext
): Promise<DownloadToWorkspaceFileResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
throw new Error('Authentication required')
}
@@ -174,7 +178,7 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool<
mimeType
)
logger.info('Downloaded remote file to workspace', {
logger.info(withMessageId('Downloaded remote file to workspace'), {
sourceUrl: params.url,
fileId: uploaded.id,
fileName: uploaded.name,
@@ -191,7 +195,10 @@ export const downloadToWorkspaceFileServerTool: BaseServerTool<
}
} catch (error) {
const msg = error instanceof Error ? error.message : 'Unknown error'
logger.error('Failed to download file to workspace', { url: params.url, error: msg })
logger.error(withMessageId('Failed to download file to workspace'), {
url: params.url,
error: msg,
})
return { success: false, message: `Failed to download file: ${msg}` }
}
},

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -50,8 +51,11 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
params: WorkspaceFileArgs,
context?: ServerToolContext
): Promise<WorkspaceFileResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
logger.error('Unauthorized attempt to access workspace files')
logger.error(withMessageId('Unauthorized attempt to access workspace files'))
throw new Error('Authentication required')
}
@@ -90,7 +94,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
await generatePptxFromCode(content, workspaceId)
} catch (err) {
const msg = err instanceof Error ? err.message : String(err)
logger.error('PPTX code validation failed', { error: msg, fileName })
logger.error(withMessageId('PPTX code validation failed'), { error: msg, fileName })
return {
success: false,
message: `PPTX generation failed: ${msg}. Fix the pptxgenjs code and retry.`,
@@ -112,7 +116,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
contentType
)
logger.info('Workspace file written via copilot', {
logger.info(withMessageId('Workspace file written via copilot'), {
fileId: result.id,
name: fileName,
size: fileBuffer.length,
@@ -173,7 +177,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
isPptxUpdate ? PPTX_SOURCE_MIME : undefined
)
logger.info('Workspace file updated via copilot', {
logger.info(withMessageId('Workspace file updated via copilot'), {
fileId,
name: fileRecord.name,
size: fileBuffer.length,
@@ -215,7 +219,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
assertServerToolNotAborted(context)
await renameWorkspaceFile(workspaceId, fileId, newName)
logger.info('Workspace file renamed via copilot', {
logger.info(withMessageId('Workspace file renamed via copilot'), {
fileId,
oldName,
newName,
@@ -243,7 +247,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
assertServerToolNotAborted(context)
await deleteWorkspaceFile(workspaceId, fileId)
logger.info('Workspace file deleted via copilot', {
logger.info(withMessageId('Workspace file deleted via copilot'), {
fileId,
name: fileRecord.name,
userId: context.userId,
@@ -320,7 +324,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
isPptxPatch ? PPTX_SOURCE_MIME : undefined
)
logger.info('Workspace file patched via copilot', {
logger.info(withMessageId('Workspace file patched via copilot'), {
fileId,
name: fileRecord.name,
editCount: edits.length,
@@ -346,7 +350,7 @@ export const workspaceFileServerTool: BaseServerTool<WorkspaceFileArgs, Workspac
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
logger.error('Error in workspace_file tool', {
logger.error(withMessageId('Error in workspace_file tool'), {
operation,
error: errorMessage,
userId: context.userId,

View File

@@ -1,5 +1,6 @@
import { GoogleGenAI, type Part } from '@google/genai'
import { createLogger } from '@sim/logger'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -60,6 +61,9 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
params: GenerateImageArgs,
context?: ServerToolContext
): Promise<GenerateImageResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
throw new Error('Authentication required')
}
@@ -93,17 +97,17 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
parts.push({
inlineData: { mimeType: mime, data: base64 },
})
logger.info('Loaded reference image', {
logger.info(withMessageId('Loaded reference image'), {
fileId,
name: fileRecord.name,
size: buffer.length,
mimeType: mime,
})
} else {
logger.warn('Reference file not found, skipping', { fileId })
logger.warn(withMessageId('Reference file not found, skipping'), { fileId })
}
} catch (err) {
logger.warn('Failed to load reference image, skipping', {
logger.warn(withMessageId('Failed to load reference image, skipping'), {
fileId,
error: err instanceof Error ? err.message : String(err),
})
@@ -117,7 +121,7 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
parts.push({ text: prompt + sizeInstruction })
logger.info('Generating image with Nano Banana 2', {
logger.info(withMessageId('Generating image with Nano Banana 2'), {
model: NANO_BANANA_MODEL,
aspectRatio,
promptLength: prompt.length,
@@ -182,7 +186,7 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
imageBuffer,
mimeType
)
logger.info('Generated image overwritten', {
logger.info(withMessageId('Generated image overwritten'), {
fileId: updated.id,
fileName: updated.name,
size: imageBuffer.length,
@@ -208,7 +212,7 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
mimeType
)
logger.info('Generated image saved', {
logger.info(withMessageId('Generated image saved'), {
fileId: uploaded.id,
fileName: uploaded.name,
size: imageBuffer.length,
@@ -225,7 +229,7 @@ export const generateImageServerTool: BaseServerTool<GenerateImageArgs, Generate
}
} catch (error) {
const msg = error instanceof Error ? error.message : 'Unknown error'
logger.error('Image generation failed', { error: msg })
logger.error(withMessageId('Image generation failed'), { error: msg })
return { success: false, message: `Failed to generate image: ${msg}` }
}
},

View File

@@ -2,6 +2,7 @@ import { db } from '@sim/db'
import { jobExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, desc, eq } from 'drizzle-orm'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import type { BaseServerTool, ServerToolContext } from '@/lib/copilot/tools/server/base-tool'
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
@@ -85,6 +86,9 @@ function extractOutputAndError(executionData: any): {
export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]> = {
name: 'get_job_logs',
async execute(rawArgs: GetJobLogsArgs, context?: ServerToolContext): Promise<JobLogEntry[]> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
const {
jobId,
executionId,
@@ -110,7 +114,12 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
const clampedLimit = Math.min(Math.max(1, limit), 5)
logger.info('Fetching job logs', { jobId, executionId, limit: clampedLimit, includeDetails })
logger.info(withMessageId('Fetching job logs'), {
jobId,
executionId,
limit: clampedLimit,
includeDetails,
})
const conditions = [eq(jobExecutionLogs.scheduleId, jobId)]
if (executionId) {
@@ -164,7 +173,7 @@ export const getJobLogsServerTool: BaseServerTool<GetJobLogsArgs, JobLogEntry[]>
return entry
})
logger.info('Job logs prepared', {
logger.info(withMessageId('Job logs prepared'), {
jobId,
count: entries.length,
resultSizeKB: Math.round(JSON.stringify(entries).length / 1024),

View File

@@ -3,6 +3,7 @@ import { knowledgeConnector } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq, isNull } from 'drizzle-orm'
import { generateInternalToken } from '@/lib/auth/internal'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -47,8 +48,15 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
params: KnowledgeBaseArgs,
context?: ServerToolContext
): Promise<KnowledgeBaseResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
logger.error('Unauthorized attempt to access knowledge base - no authenticated user context')
logger.error(
withMessageId(
'Unauthorized attempt to access knowledge base - no authenticated user context'
)
)
throw new Error('Authentication required')
}
@@ -97,7 +105,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
requestId
)
logger.info('Knowledge base created via copilot', {
logger.info(withMessageId('Knowledge base created via copilot'), {
knowledgeBaseId: newKnowledgeBase.id,
name: newKnowledgeBase.name,
userId: context.userId,
@@ -133,7 +141,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
}
logger.info('Knowledge base metadata retrieved via copilot', {
logger.info(withMessageId('Knowledge base metadata retrieved via copilot'), {
knowledgeBaseId: knowledgeBase.id,
userId: context.userId,
})
@@ -197,7 +205,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
distanceThreshold: strategy.distanceThreshold,
})
logger.info('Knowledge base queried via copilot', {
logger.info(withMessageId('Knowledge base queried via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
query: args.query.substring(0, 100),
resultCount: results.length,
@@ -288,13 +296,13 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
},
{}
).catch((err) => {
logger.error('Background document processing failed', {
logger.error(withMessageId('Background document processing failed'), {
documentId: doc.id,
error: err instanceof Error ? err.message : String(err),
})
})
logger.info('Workspace file added to knowledge base via copilot', {
logger.info(withMessageId('Workspace file added to knowledge base via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
documentId: doc.id,
fileName: fileRecord.name,
@@ -344,7 +352,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
assertNotAborted()
const updatedKb = await updateKnowledgeBase(args.knowledgeBaseId, updates, requestId)
logger.info('Knowledge base updated via copilot', {
logger.info(withMessageId('Knowledge base updated via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
userId: context.userId,
})
@@ -383,7 +391,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
assertNotAborted()
await deleteKnowledgeBase(args.knowledgeBaseId, requestId)
logger.info('Knowledge base deleted via copilot', {
logger.info(withMessageId('Knowledge base deleted via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
name: kbToDelete.name,
userId: context.userId,
@@ -460,7 +468,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
const tagDefinitions = await getDocumentTagDefinitions(args.knowledgeBaseId)
logger.info('Tag definitions listed via copilot', {
logger.info(withMessageId('Tag definitions listed via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
count: tagDefinitions.length,
userId: context.userId,
@@ -514,7 +522,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
requestId
)
logger.info('Tag definition created via copilot', {
logger.info(withMessageId('Tag definition created via copilot'), {
knowledgeBaseId: args.knowledgeBaseId,
tagId: newTag.id,
displayName: newTag.displayName,
@@ -565,7 +573,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
assertNotAborted()
const updatedTag = await updateTagDefinition(args.tagDefinitionId, updateData, requestId)
logger.info('Tag definition updated via copilot', {
logger.info(withMessageId('Tag definition updated via copilot'), {
tagId: args.tagDefinitionId,
knowledgeBaseId: existingTag.knowledgeBaseId,
userId: context.userId,
@@ -606,7 +614,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
requestId
)
logger.info('Tag definition deleted via copilot', {
logger.info(withMessageId('Tag definition deleted via copilot'), {
tagId: args.tagDefinitionId,
tagSlot: deleted.tagSlot,
displayName: deleted.displayName,
@@ -688,7 +696,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
const connector = createRes.data
logger.info('Connector created via copilot', {
logger.info(withMessageId('Connector created via copilot'), {
connectorId: connector.id,
connectorType: args.connectorType,
knowledgeBaseId: args.knowledgeBaseId,
@@ -743,7 +751,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: updateRes.error ?? 'Failed to update connector' }
}
logger.info('Connector updated via copilot', {
logger.info(withMessageId('Connector updated via copilot'), {
connectorId: args.connectorId,
userId: context.userId,
})
@@ -776,7 +784,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: deleteRes.error ?? 'Failed to delete connector' }
}
logger.info('Connector deleted via copilot', {
logger.info(withMessageId('Connector deleted via copilot'), {
connectorId: args.connectorId,
userId: context.userId,
})
@@ -809,7 +817,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
return { success: false, message: syncRes.error ?? 'Failed to sync connector' }
}
logger.info('Connector sync triggered via copilot', {
logger.info(withMessageId('Connector sync triggered via copilot'), {
connectorId: args.connectorId,
userId: context.userId,
})
@@ -829,7 +837,7 @@ export const knowledgeBaseServerTool: BaseServerTool<KnowledgeBaseArgs, Knowledg
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : 'Unknown error occurred'
logger.error('Error in knowledge_base tool', {
logger.error(withMessageId('Error in knowledge_base tool'), {
operation,
error: errorMessage,
userId: context.userId,

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -122,7 +123,9 @@ export async function routeExecution(
throw new Error(`Unknown server tool: ${toolName}`)
}
logger.debug('Routing to tool', { toolName })
logger.debug(appendCopilotLogContext('Routing to tool', { messageId: context?.messageId }), {
toolName,
})
// Action-level permission enforcement for mixed read/write tools
if (context?.userPermission && WRITE_ACTIONS[toolName]) {

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -241,8 +242,13 @@ async function batchInsertAll(
export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult> = {
name: 'user_table',
async execute(params: UserTableArgs, context?: ServerToolContext): Promise<UserTableResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
logger.error('Unauthorized attempt to access user table - no authenticated user context')
logger.error(
withMessageId('Unauthorized attempt to access user table - no authenticated user context')
)
throw new Error('Authentication required')
}
@@ -723,7 +729,7 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
const coerced = coerceRows(rows, columns, columnMap)
const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context)
logger.info('Table created from file', {
logger.info(withMessageId('Table created from file'), {
tableId: table.id,
fileName: file.name,
columns: columns.length,
@@ -799,7 +805,7 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
const coerced = coerceRows(rows, matchedColumns, columnMap)
const inserted = await batchInsertAll(table.id, coerced, table, workspaceId, context)
logger.info('Rows imported from file', {
logger.info(withMessageId('Rows imported from file'), {
tableId: table.id,
fileName: file.name,
matchedColumns: mappedHeaders.length,
@@ -997,7 +1003,11 @@ export const userTableServerTool: BaseServerTool<UserTableArgs, UserTableResult>
? error.cause.message
: String(error.cause)
: undefined
logger.error('Table operation failed', { operation, error: errorMessage, cause })
logger.error(withMessageId('Table operation failed'), {
operation,
error: errorMessage,
cause,
})
const displayMessage = cause ? `${errorMessage} (${cause})` : errorMessage
return { success: false, message: `Operation failed: ${displayMessage}` }
}

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { appendCopilotLogContext } from '@/lib/copilot/logging'
import {
assertServerToolNotAborted,
type BaseServerTool,
@@ -62,8 +63,10 @@ function validateGeneratedWorkspaceFileName(fileName: string): string | null {
async function collectSandboxFiles(
workspaceId: string,
inputFiles?: string[],
inputTables?: string[]
inputTables?: string[],
messageId?: string
): Promise<SandboxFile[]> {
const withMessageId = (message: string) => appendCopilotLogContext(message, { messageId })
const sandboxFiles: SandboxFile[] = []
let totalSize = 0
@@ -72,12 +75,12 @@ async function collectSandboxFiles(
for (const fileRef of inputFiles) {
const record = findWorkspaceFileRecord(allFiles, fileRef)
if (!record) {
logger.warn('Sandbox input file not found', { fileRef })
logger.warn(withMessageId('Sandbox input file not found'), { fileRef })
continue
}
const ext = record.name.split('.').pop()?.toLowerCase() ?? ''
if (!TEXT_EXTENSIONS.has(ext)) {
logger.warn('Skipping non-text sandbox input file', {
logger.warn(withMessageId('Skipping non-text sandbox input file'), {
fileId: record.id,
fileName: record.name,
ext,
@@ -85,7 +88,7 @@ async function collectSandboxFiles(
continue
}
if (record.size > MAX_FILE_SIZE) {
logger.warn('Sandbox input file exceeds size limit', {
logger.warn(withMessageId('Sandbox input file exceeds size limit'), {
fileId: record.id,
fileName: record.name,
size: record.size,
@@ -93,7 +96,9 @@ async function collectSandboxFiles(
continue
}
if (totalSize + record.size > MAX_TOTAL_SIZE) {
logger.warn('Sandbox input total size limit reached, skipping remaining files')
logger.warn(
withMessageId('Sandbox input total size limit reached, skipping remaining files')
)
break
}
const buffer = await downloadWorkspaceFile(record)
@@ -114,7 +119,7 @@ async function collectSandboxFiles(
for (const tableId of inputTables) {
const table = await getTableById(tableId)
if (!table) {
logger.warn('Sandbox input table not found', { tableId })
logger.warn(withMessageId('Sandbox input table not found'), { tableId })
continue
}
const { rows } = await queryRows(tableId, workspaceId, { limit: 10000 }, 'sandbox-input')
@@ -129,7 +134,9 @@ async function collectSandboxFiles(
}
const csvContent = csvLines.join('\n')
if (totalSize + csvContent.length > MAX_TOTAL_SIZE) {
logger.warn('Sandbox input total size limit reached, skipping remaining tables')
logger.warn(
withMessageId('Sandbox input total size limit reached, skipping remaining tables')
)
break
}
totalSize += csvContent.length
@@ -150,6 +157,9 @@ export const generateVisualizationServerTool: BaseServerTool<
params: VisualizationArgs,
context?: ServerToolContext
): Promise<VisualizationResult> {
const withMessageId = (message: string) =>
appendCopilotLogContext(message, { messageId: context?.messageId })
if (!context?.userId) {
throw new Error('Authentication required')
}
@@ -167,7 +177,8 @@ export const generateVisualizationServerTool: BaseServerTool<
const sandboxFiles = await collectSandboxFiles(
workspaceId,
params.inputFiles,
params.inputTables
params.inputTables,
context.messageId
)
const wrappedCode = [
@@ -232,7 +243,7 @@ export const generateVisualizationServerTool: BaseServerTool<
imageBuffer,
'image/png'
)
logger.info('Chart image overwritten', {
logger.info(withMessageId('Chart image overwritten'), {
fileId: updated.id,
fileName: updated.name,
size: imageBuffer.length,
@@ -256,7 +267,7 @@ export const generateVisualizationServerTool: BaseServerTool<
'image/png'
)
logger.info('Chart image saved', {
logger.info(withMessageId('Chart image saved'), {
fileId: uploaded.id,
fileName: uploaded.name,
size: imageBuffer.length,
@@ -271,7 +282,7 @@ export const generateVisualizationServerTool: BaseServerTool<
}
} catch (error) {
const msg = error instanceof Error ? error.message : 'Unknown error'
logger.error('Visualization generation failed', { error: msg })
logger.error(withMessageId('Visualization generation failed'), { error: msg })
return { success: false, message: `Failed to generate visualization: ${msg}` }
}
},