This commit is contained in:
Siddharth Ganesan
2026-02-05 16:19:52 -08:00
parent 54a5e06789
commit f582c78220
43 changed files with 2112 additions and 1356 deletions

View File

@@ -0,0 +1,35 @@
# lib/copilot/tools/server/workflow/edit-workflow.ts
90-98 ( 9 lines) [function] logSkippedItem
103-113 ( 11 lines) [function] findBlockWithDuplicateNormalizedName
127-196 ( 70 lines) [function] validateInputsForBlock
211-463 ( 253 lines) [function] validateValueForSubBlockType
481-566 ( 86 lines) [function] topologicalSortInserts
571-684 ( 114 lines) [function] createBlockFromParams
686-716 ( 31 lines) [function] updateCanonicalModesForInputs
721-762 ( 42 lines) [function] normalizeTools
786-804 ( 19 lines) [function] normalizeArrayWithIds
809-811 ( 3 lines) [function] shouldNormalizeArrayIds
818-859 ( 42 lines) [function] normalizeResponseFormat
834-847 ( 14 lines) [arrow] sortKeys
871-945 ( 75 lines) [function] validateSourceHandleForBlock
956-1051 ( 96 lines) [function] validateConditionHandle
1062-1136 ( 75 lines) [function] validateRouterHandle
1141-1149 ( 9 lines) [function] validateTargetHandle
1155-1261 ( 107 lines) [function] createValidatedEdge
1270-1307 ( 38 lines) [function] addConnectionsAsEdges
1280-1291 ( 12 lines) [arrow] addEdgeForTarget
1309-1339 ( 31 lines) [function] applyTriggerConfigToBlockSubblocks
1353-1361 ( 9 lines) [function] isBlockTypeAllowed
1367-1404 ( 38 lines) [function] filterDisallowedTools
1413-1499 ( 87 lines) [function] normalizeBlockIdsInOperations
1441-1444 ( 4 lines) [arrow] replaceId
1504-2676 (1173 lines) [function] applyOperationsToWorkflowState
1649-1656 ( 8 lines) [arrow] findChildren
2055-2059 ( 5 lines) [arrow] mapConnectionTypeToHandle
2063-2074 ( 12 lines) [arrow] addEdgeForTarget
2682-2777 ( 96 lines) [function] validateWorkflowSelectorIds
2786-3066 ( 281 lines) [function] preValidateCredentialInputs
2820-2845 ( 26 lines) [function] collectCredentialInputs
2850-2870 ( 21 lines) [function] collectHostedApiKeyInput
3068-3117 ( 50 lines) [function] getCurrentWorkflowStateFromDb
3121-3333 ( 213 lines) [method] <anonymous class>.execute

View File

@@ -0,0 +1,21 @@
# lib/copilot/tools/server/blocks/get-blocks-metadata-tool.ts
108-306 ( 199 lines) [method] <anonymous class>.execute
309-384 ( 76 lines) [function] transformBlockMetadata
386-459 ( 74 lines) [function] extractInputs
461-503 ( 43 lines) [function] extractOperationInputs
505-518 ( 14 lines) [function] extractOutputs
520-538 ( 19 lines) [function] formatOutputsFromDefinition
540-563 ( 24 lines) [function] mapSchemaTypeToSimpleType
565-591 ( 27 lines) [function] generateInputExample
593-669 ( 77 lines) [function] processSubBlock
671-679 ( 9 lines) [function] resolveAuthType
686-702 ( 17 lines) [function] getStaticModelOptions
712-754 ( 43 lines) [function] callOptionsWithFallback
756-806 ( 51 lines) [function] resolveSubblockOptions
808-820 ( 13 lines) [function] removeNullish
822-832 ( 11 lines) [function] normalizeCondition
834-872 ( 39 lines) [function] splitParametersByOperation
874-905 ( 32 lines) [function] computeBlockLevelInputs
907-935 ( 29 lines) [function] computeOperationLevelInputs
937-947 ( 11 lines) [function] resolveOperationIds
949-961 ( 13 lines) [function] resolveToolIdForOperation

View File

@@ -0,0 +1,13 @@
# lib/copilot/process-contents.ts
31-81 ( 51 lines) [function] processContexts
84-161 ( 78 lines) [function] processContextsServer
163-208 ( 46 lines) [function] sanitizeMessageForDocs
210-248 ( 39 lines) [function] processPastChatFromDb
250-281 ( 32 lines) [function] processWorkflowFromDb
283-316 ( 34 lines) [function] processPastChat
319-321 ( 3 lines) [function] processPastChatViaApi
323-362 ( 40 lines) [function] processKnowledgeFromDb
364-439 ( 76 lines) [function] processBlockMetadata
441-473 ( 33 lines) [function] processTemplateFromDb
475-498 ( 24 lines) [function] processWorkflowBlockFromDb
500-555 ( 56 lines) [function] processExecutionLogFromDb

View File

@@ -6,8 +6,10 @@ import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { getSession } from '@/lib/auth'
import { generateChatTitle } from '@/lib/copilot/chat-title'
import { buildConversationHistory } from '@/lib/copilot/chat-context'
import { resolveOrCreateChat } from '@/lib/copilot/chat-lifecycle'
import { buildCopilotRequestPayload } from '@/lib/copilot/chat-payload'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { COPILOT_MODEL_IDS, COPILOT_REQUEST_MODES } from '@/lib/copilot/models'
import { orchestrateCopilotStream } from '@/lib/copilot/orchestrator'
import {
@@ -22,14 +24,8 @@ import {
createRequestTracker,
createUnauthorizedResponse,
} from '@/lib/copilot/request-helpers'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import type { CopilotProviderConfig } from '@/lib/copilot/types'
import { env } from '@/lib/core/config/env'
import { CopilotFiles } from '@/lib/uploads'
import { createFileContent } from '@/lib/uploads/utils/file-utils'
import { resolveWorkflowIdForUser } from '@/lib/workflows/utils'
import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
const logger = createLogger('CopilotChatAPI')
@@ -178,311 +174,66 @@ export async function POST(req: NextRequest) {
let conversationHistory: any[] = []
let actualChatId = chatId
if (chatId) {
// Load existing chat
const [chat] = await db
.select()
.from(copilotChats)
.where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, authenticatedUserId)))
.limit(1)
if (chat) {
currentChat = chat
conversationHistory = Array.isArray(chat.messages) ? chat.messages : []
}
} else if (createNewChat && workflowId) {
// Create new chat
const { provider, model } = getCopilotModel('chat')
const [newChat] = await db
.insert(copilotChats)
.values({
userId: authenticatedUserId,
workflowId,
title: null,
model,
messages: [],
})
.returning()
if (newChat) {
currentChat = newChat
actualChatId = newChat.id
}
}
// Process file attachments if present
const processedFileContents: any[] = []
if (fileAttachments && fileAttachments.length > 0) {
const processedAttachments = await CopilotFiles.processCopilotAttachments(
fileAttachments,
tracker.requestId
if (chatId || createNewChat) {
const defaultsForChatRow = getCopilotModel('chat')
const chatResult = await resolveOrCreateChat({
chatId,
userId: authenticatedUserId,
workflowId,
model: defaultsForChatRow.model,
})
currentChat = chatResult.chat
actualChatId = chatResult.chatId || chatId
const history = buildConversationHistory(
chatResult.conversationHistory,
(chatResult.chat?.conversationId as string | undefined) || conversationId
)
for (const { buffer, attachment } of processedAttachments) {
const fileContent = createFileContent(buffer, attachment.media_type)
if (fileContent) {
processedFileContents.push(fileContent)
}
}
}
// Build messages array for sim agent with conversation history
const messages: any[] = []
// Add conversation history (need to rebuild these with file support if they had attachments)
for (const msg of conversationHistory) {
if (msg.fileAttachments && msg.fileAttachments.length > 0) {
// This is a message with file attachments - rebuild with content array
const content: any[] = [{ type: 'text', text: msg.content }]
const processedHistoricalAttachments = await CopilotFiles.processCopilotAttachments(
msg.fileAttachments,
tracker.requestId
)
for (const { buffer, attachment } of processedHistoricalAttachments) {
const fileContent = createFileContent(buffer, attachment.media_type)
if (fileContent) {
content.push(fileContent)
}
}
messages.push({
role: msg.role,
content,
})
} else {
// Regular text-only message
messages.push({
role: msg.role,
content: msg.content,
})
}
}
// Add implicit feedback if provided
if (implicitFeedback) {
messages.push({
role: 'system',
content: implicitFeedback,
})
}
// Add current user message with file attachments
if (processedFileContents.length > 0) {
// Message with files - use content array format
const content: any[] = [{ type: 'text', text: message }]
// Add file contents
for (const fileContent of processedFileContents) {
content.push(fileContent)
}
messages.push({
role: 'user',
content,
})
} else {
// Text-only message
messages.push({
role: 'user',
content: message,
})
conversationHistory = history.history
}
const defaults = getCopilotModel('chat')
const selectedModel = model || defaults.model
const envModel = env.COPILOT_MODEL || defaults.model
let providerConfig: CopilotProviderConfig | undefined
const providerEnv = env.COPILOT_PROVIDER as any
if (providerEnv) {
if (providerEnv === 'azure-openai') {
providerConfig = {
provider: 'azure-openai',
model: envModel,
apiKey: env.AZURE_OPENAI_API_KEY,
apiVersion: 'preview',
endpoint: env.AZURE_OPENAI_ENDPOINT,
}
} else if (providerEnv === 'vertex') {
providerConfig = {
provider: 'vertex',
model: envModel,
apiKey: env.COPILOT_API_KEY,
vertexProject: env.VERTEX_PROJECT,
vertexLocation: env.VERTEX_LOCATION,
}
} else {
providerConfig = {
provider: providerEnv,
model: selectedModel,
apiKey: env.COPILOT_API_KEY,
}
}
}
const effectiveMode = mode === 'agent' ? 'build' : mode
const transportMode = effectiveMode === 'build' ? 'agent' : effectiveMode
// Determine conversationId to use for this request
const effectiveConversationId =
(currentChat?.conversationId as string | undefined) || conversationId
// For agent/build mode, fetch credentials and build tool definitions
let integrationTools: any[] = []
let baseTools: any[] = []
let credentials: {
oauth: Record<
string,
{ accessToken: string; accountId: string; name: string; expiresAt?: string }
>
apiKeys: string[]
metadata?: {
connectedOAuth: Array<{ provider: string; name: string; scopes?: string[] }>
configuredApiKeys: string[]
const requestPayload = await buildCopilotRequestPayload(
{
message,
workflowId,
userId: authenticatedUserId,
userMessageId: userMessageIdToUse,
mode,
model: selectedModel,
stream,
conversationId: effectiveConversationId,
conversationHistory,
contexts: agentContexts,
fileAttachments,
commands,
chatId: actualChatId,
prefetch,
userName: session?.user?.name || undefined,
implicitFeedback,
},
{
selectedModel,
}
} | null = null
if (effectiveMode === 'build') {
// Build base tools (executed locally, not deferred)
// Include function_execute for code execution capability
baseTools = [
{
name: 'function_execute',
description:
'Execute JavaScript code to perform calculations, data transformations, API calls, or any programmatic task. Code runs in a secure sandbox with fetch() available. Write plain statements (not wrapped in functions). Example: const res = await fetch(url); const data = await res.json(); return data;',
input_schema: {
type: 'object',
properties: {
code: {
type: 'string',
description:
'Raw JavaScript statements to execute. Code is auto-wrapped in async context. Use fetch() for HTTP requests. Write like: const res = await fetch(url); return await res.json();',
},
},
required: ['code'],
},
executeLocally: true,
},
]
// Fetch user credentials (OAuth + API keys) - pass workflowId to get workspace env vars
try {
const rawCredentials = await getCredentialsServerTool.execute(
{ workflowId },
{ userId: authenticatedUserId }
)
// Transform OAuth credentials to map format: { [provider]: { accessToken, accountId, ... } }
const oauthMap: Record<
string,
{ accessToken: string; accountId: string; name: string; expiresAt?: string }
> = {}
const connectedOAuth: Array<{ provider: string; name: string; scopes?: string[] }> = []
for (const cred of rawCredentials?.oauth?.connected?.credentials || []) {
if (cred.accessToken) {
oauthMap[cred.provider] = {
accessToken: cred.accessToken,
accountId: cred.id,
name: cred.name,
}
connectedOAuth.push({
provider: cred.provider,
name: cred.name,
})
}
}
credentials = {
oauth: oauthMap,
apiKeys: rawCredentials?.environment?.variableNames || [],
metadata: {
connectedOAuth,
configuredApiKeys: rawCredentials?.environment?.variableNames || [],
},
}
logger.info(`[${tracker.requestId}] Fetched credentials for build mode`, {
oauthProviders: Object.keys(oauthMap),
apiKeyCount: credentials.apiKeys.length,
})
} catch (error) {
logger.warn(`[${tracker.requestId}] Failed to fetch credentials`, {
error: error instanceof Error ? error.message : String(error),
})
}
// Build tool definitions (schemas only)
try {
const { createUserToolSchema } = await import('@/tools/params')
const latestTools = getLatestVersionTools(tools)
integrationTools = Object.entries(latestTools).map(([toolId, toolConfig]) => {
const userSchema = createUserToolSchema(toolConfig)
const strippedName = stripVersionSuffix(toolId)
return {
name: strippedName,
description: toolConfig.description || toolConfig.name || strippedName,
input_schema: userSchema,
defer_loading: true, // Anthropic Advanced Tool Use
...(toolConfig.oauth?.required && {
oauth: {
required: true,
provider: toolConfig.oauth.provider,
},
}),
}
})
logger.info(`[${tracker.requestId}] Built tool definitions for build mode`, {
integrationToolCount: integrationTools.length,
})
} catch (error) {
logger.warn(`[${tracker.requestId}] Failed to build tool definitions`, {
error: error instanceof Error ? error.message : String(error),
})
}
}
const requestPayload = {
message: message, // Just send the current user message text
workflowId,
userId: authenticatedUserId,
stream: stream,
streamToolCalls: true,
model: selectedModel,
mode: transportMode,
messageId: userMessageIdToUse,
version: SIM_AGENT_VERSION,
...(providerConfig ? { provider: providerConfig } : {}),
...(effectiveConversationId ? { conversationId: effectiveConversationId } : {}),
...(typeof prefetch === 'boolean' ? { prefetch: prefetch } : {}),
...(session?.user?.name && { userName: session.user.name }),
...(agentContexts.length > 0 && { context: agentContexts }),
...(actualChatId ? { chatId: actualChatId } : {}),
...(processedFileContents.length > 0 && { fileAttachments: processedFileContents }),
// For build/agent mode, include tools and credentials
...(integrationTools.length > 0 && { tools: integrationTools }),
...(baseTools.length > 0 && { baseTools }),
...(credentials && { credentials }),
...(commands && commands.length > 0 && { commands }),
}
)
try {
logger.info(`[${tracker.requestId}] About to call Sim Agent`, {
hasContext: agentContexts.length > 0,
contextCount: agentContexts.length,
hasConversationId: !!effectiveConversationId,
hasFileAttachments: processedFileContents.length > 0,
hasFileAttachments: Array.isArray(requestPayload.fileAttachments),
messageLength: message.length,
mode: effectiveMode,
hasTools: integrationTools.length > 0,
toolCount: integrationTools.length,
hasBaseTools: baseTools.length > 0,
baseToolCount: baseTools.length,
hasCredentials: !!credentials,
hasTools: Array.isArray(requestPayload.tools),
toolCount: Array.isArray(requestPayload.tools) ? requestPayload.tools.length : 0,
hasBaseTools: Array.isArray(requestPayload.baseTools),
baseToolCount: Array.isArray(requestPayload.baseTools) ? requestPayload.baseTools.length : 0,
hasCredentials: !!requestPayload.credentials,
})
} catch {}
@@ -623,7 +374,7 @@ export async function POST(req: NextRequest) {
content: nonStreamingResult.content,
toolCalls: nonStreamingResult.toolCalls,
model: selectedModel,
provider: providerConfig?.provider || env.COPILOT_PROVIDER || 'openai',
provider: (requestPayload?.provider as Record<string, unknown>)?.provider || env.COPILOT_PROVIDER || 'openai',
}
logger.info(`[${tracker.requestId}] Non-streaming response from orchestrator:`, {

View File

@@ -1,6 +1,7 @@
import { createLogger } from '@sim/logger'
import { type NextRequest, NextResponse } from 'next/server'
import { z } from 'zod'
import { REDIS_TOOL_CALL_PREFIX, REDIS_TOOL_CALL_TTL_SECONDS } from '@/lib/copilot/constants'
import {
authenticateCopilotRequestSessionOnly,
createBadRequestResponse,
@@ -38,13 +39,13 @@ async function updateToolCallStatus(
}
try {
const key = `tool_call:${toolCallId}`
const key = `${REDIS_TOOL_CALL_PREFIX}${toolCallId}`
const payload = {
status,
message: message || null,
timestamp: new Date().toISOString(),
}
await redis.set(key, JSON.stringify(payload), 'EX', 86400)
await redis.set(key, JSON.stringify(payload), 'EX', REDIS_TOOL_CALL_TTL_SECONDS)
return true
} catch (error) {
logger.error('Failed to update tool call status', {

View File

@@ -211,7 +211,7 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
if (block.type === 'text') {
const isLastTextBlock =
index === message.contentBlocks!.length - 1 && block.type === 'text'
const parsed = parseSpecialTags(block.content)
const parsed = parseSpecialTags(block.content ?? '')
// Mask credential IDs in the displayed content
const cleanBlockContent = maskCredentialValue(
parsed.cleanContent.replace(/\n{3,}/g, '\n\n')
@@ -243,7 +243,7 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
return (
<div key={blockKey} className='w-full'>
<ThinkingBlock
content={maskCredentialValue(block.content)}
content={maskCredentialValue(block.content ?? '')}
isStreaming={isActivelyStreaming}
hasFollowingContent={hasFollowingContent}
hasSpecialTags={hasSpecialTags}
@@ -251,7 +251,7 @@ const CopilotMessage: FC<CopilotMessageProps> = memo(
</div>
)
}
if (block.type === 'tool_call') {
if (block.type === 'tool_call' && block.toolCall) {
const blockKey = `tool-${block.toolCall.id}`
return (

View File

@@ -1,5 +1,11 @@
import { useCallback } from 'react'
import { createLogger } from '@sim/logger'
declare global {
interface Window {
__skipDiffRecording?: boolean
}
}
import type { Edge } from 'reactflow'
import { useSession } from '@/lib/auth/auth-client'
import { enqueueReplaceWorkflowState } from '@/lib/workflows/operations/socket-operations'
@@ -908,7 +914,7 @@ export function useUndoRedo() {
// Set flag to skip recording during this operation
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Restore baseline state and broadcast to everyone
if (baselineSnapshot && activeWorkflowId) {
@@ -945,7 +951,7 @@ export function useUndoRedo() {
logger.info('Clearing diff UI state')
useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false })
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Undid apply-diff operation successfully')
@@ -965,7 +971,7 @@ export function useUndoRedo() {
// Set flag to skip recording during this operation
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Apply the before-accept state (with markers for this user)
useWorkflowStore.getState().replaceWorkflowState(beforeAccept)
@@ -1004,7 +1010,7 @@ export function useUndoRedo() {
diffAnalysis: diffAnalysis,
})
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Undid accept-diff operation - restored diff view')
@@ -1018,7 +1024,7 @@ export function useUndoRedo() {
const { useWorkflowStore } = await import('@/stores/workflows/workflow/store')
const { useSubBlockStore } = await import('@/stores/workflows/subblock/store')
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Apply the before-reject state (with markers for this user)
useWorkflowStore.getState().replaceWorkflowState(beforeReject)
@@ -1055,7 +1061,7 @@ export function useUndoRedo() {
diffAnalysis: diffAnalysis,
})
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Undid reject-diff operation - restored diff view')
@@ -1526,7 +1532,7 @@ export function useUndoRedo() {
// Set flag to skip recording during this operation
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Manually apply the proposed state and set up diff store (similar to setProposedChanges but with original baseline)
const diffStore = useWorkflowDiffStore.getState()
@@ -1567,7 +1573,7 @@ export function useUndoRedo() {
diffAnalysis: diffAnalysis,
})
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Redid apply-diff operation')
@@ -1583,7 +1589,7 @@ export function useUndoRedo() {
// Set flag to skip recording during this operation
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Clear diff state FIRST to prevent flash of colors (local UI only)
// Use setState directly to ensure synchronous clearing
@@ -1621,7 +1627,7 @@ export function useUndoRedo() {
operationId: opId,
})
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Redid accept-diff operation - cleared diff view')
@@ -1635,7 +1641,7 @@ export function useUndoRedo() {
const { useWorkflowStore } = await import('@/stores/workflows/workflow/store')
const { useSubBlockStore } = await import('@/stores/workflows/subblock/store')
;(window as any).__skipDiffRecording = true
;window.__skipDiffRecording = true
try {
// Clear diff state FIRST to prevent flash of colors (local UI only)
// Use setState directly to ensure synchronous clearing
@@ -1673,7 +1679,7 @@ export function useUndoRedo() {
operationId: opId,
})
} finally {
;(window as any).__skipDiffRecording = false
;window.__skipDiffRecording = false
}
logger.info('Redid reject-diff operation - cleared diff view')

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { COPILOT_CHAT_API_PATH, COPILOT_CHAT_STREAM_API_PATH } from '@/lib/copilot/constants'
import type { CopilotMode, CopilotModelId, CopilotTransportMode } from '@/lib/copilot/models'
const logger = createLogger('CopilotAPI')
@@ -139,7 +140,9 @@ export async function sendStreamingMessage(
contextsPreview: preview,
resumeFromEventId,
})
} catch {}
} catch (error) {
logger.warn('Failed to log streaming message context preview', { error: error instanceof Error ? error.message : String(error) })
}
const streamId = request.userMessageId
if (typeof resumeFromEventId === 'number') {
@@ -150,7 +153,7 @@ export async function sendStreamingMessage(
status: 400,
}
}
const url = `/api/copilot/chat/stream?streamId=${encodeURIComponent(
const url = `${COPILOT_CHAT_STREAM_API_PATH}?streamId=${encodeURIComponent(
streamId
)}&from=${encodeURIComponent(String(resumeFromEventId))}`
const response = await fetch(url, {
@@ -182,7 +185,7 @@ export async function sendStreamingMessage(
}
}
const response = await fetch('/api/copilot/chat', {
const response = await fetch(COPILOT_CHAT_API_PATH, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ ...requestBody, stream: true }),

View File

@@ -0,0 +1,63 @@
import { createLogger } from '@sim/logger'
import { CopilotFiles } from '@/lib/uploads'
import { createFileContent } from '@/lib/uploads/utils/file-utils'
// Module-scoped logger for attachment-processing diagnostics.
const logger = createLogger('CopilotChatContext')
/**
 * Build conversation history from stored chat messages.
 *
 * Non-array input yields an empty history. The conversationId is threaded
 * through unchanged and omitted from the result when falsy.
 */
export function buildConversationHistory(
  messages: unknown[],
  conversationId?: string
): { history: unknown[]; conversationId?: string } {
  const result: { history: unknown[]; conversationId?: string } = {
    history: Array.isArray(messages) ? messages : [],
  }
  if (conversationId) {
    result.conversationId = conversationId
  }
  return result
}
/** Shape of a file attachment as received from the client request. */
export interface FileAttachmentInput {
  id: string
  // Storage key used to fetch the underlying file.
  key: string
  name?: string
  filename?: string
  mimeType?: string
  // Snake-case media type variant; presumably mirrors mimeType — TODO confirm with callers.
  media_type?: string
  size: number
}
/** A processed content part suitable for inclusion in the request payload. */
export interface FileContent {
  type: string
  [key: string]: unknown
}
/**
 * Process file attachments into content for the payload.
 *
 * Returns an empty array for missing/empty input. Attachments that do not
 * yield a file content part are silently skipped.
 */
export async function processFileAttachments(
  fileAttachments: FileAttachmentInput[],
  userId: string
): Promise<FileContent[]> {
  if (!Array.isArray(fileAttachments) || fileAttachments.length === 0) {
    return []
  }
  // Synthetic request id used only for log correlation.
  const requestId = `copilot-${userId}-${Date.now()}`
  const processed = await CopilotFiles.processCopilotAttachments(
    fileAttachments as Parameters<typeof CopilotFiles.processCopilotAttachments>[0],
    requestId
  )
  const contents: FileContent[] = []
  for (const { buffer, attachment } of processed) {
    const part = createFileContent(buffer, attachment.media_type)
    if (part) {
      contents.push(part as FileContent)
    }
  }
  logger.debug('Processed file attachments for payload', {
    userId,
    inputCount: fileAttachments.length,
    outputCount: contents.length,
  })
  return contents
}

View File

@@ -0,0 +1,69 @@
import { db } from '@sim/db'
import { copilotChats } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { and, eq } from 'drizzle-orm'
// Module-scoped logger for chat lifecycle diagnostics.
const logger = createLogger('CopilotChatLifecycle')
/** Result of loading or creating a copilot chat row. */
export interface ChatLoadResult {
  // Empty string when row creation failed.
  chatId: string
  // Null when the requested chat was not found or creation failed.
  chat: typeof copilotChats.$inferSelect | null
  // Stored messages for an existing chat; empty for new or missing chats.
  conversationHistory: unknown[]
  // True when a new chat row was created (or attempted) for this request.
  isNew: boolean
}
/**
 * Resolve or create a copilot chat session.
 * If chatId is provided, loads the existing chat (scoped to the user).
 * Otherwise inserts a new chat row for the workflow.
 */
export async function resolveOrCreateChat(params: {
  chatId?: string
  userId: string
  workflowId: string
  model: string
}): Promise<ChatLoadResult> {
  const { chatId, userId, workflowId, model } = params

  if (chatId) {
    const rows = await db
      .select()
      .from(copilotChats)
      .where(and(eq(copilotChats.id, chatId), eq(copilotChats.userId, userId)))
      .limit(1)
    const existing = rows[0] ?? null
    const history = existing && Array.isArray(existing.messages) ? existing.messages : []
    return {
      chatId,
      chat: existing,
      conversationHistory: history,
      isNew: false,
    }
  }

  const inserted = await db
    .insert(copilotChats)
    .values({
      userId,
      workflowId,
      title: null,
      model,
      messages: [],
    })
    .returning()
  const created = inserted[0]
  if (!created) {
    // Insert returned no row — report an empty chatId so callers can detect it.
    logger.warn('Failed to create new copilot chat row', { userId, workflowId })
    return { chatId: '', chat: null, conversationHistory: [], isNew: true }
  }
  return {
    chatId: created.id,
    chat: created,
    conversationHistory: [],
    isNew: true,
  }
}

View File

@@ -0,0 +1,252 @@
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
import { getCopilotModel } from '@/lib/copilot/config'
import { SIM_AGENT_VERSION } from '@/lib/copilot/constants'
import { getCredentialsServerTool } from '@/lib/copilot/tools/server/user/get-credentials'
import type { CopilotProviderConfig } from '@/lib/copilot/types'
import { tools } from '@/tools/registry'
import { getLatestVersionTools, stripVersionSuffix } from '@/tools/utils'
import { type FileContent, processFileAttachments } from '@/lib/copilot/chat-context'
// Module-scoped logger for payload-construction diagnostics.
const logger = createLogger('CopilotChatPayload')
/**
 * Inputs required to assemble a copilot chat request payload.
 * Optional fields are omitted from the payload when absent.
 */
export interface BuildPayloadParams {
  message: string
  workflowId: string
  userId: string
  // Client-generated id echoed back as `messageId` in the payload.
  userMessageId: string
  // Copilot mode; 'agent' is treated as an alias for 'build'.
  mode: string
  model: string
  stream: boolean
  conversationId?: string
  conversationHistory?: unknown[]
  contexts?: Array<{ type: string; content: string }>
  fileAttachments?: Array<{ id: string; key: string; size: number; [key: string]: unknown }>
  commands?: string[]
  chatId?: string
  prefetch?: boolean
  userName?: string
  implicitFeedback?: string
}
/** Tool definition forwarded to the agent (Anthropic-style schema). */
interface ToolSchema {
  name: string
  description: string
  input_schema: Record<string, unknown>
  // When true, schema loading is deferred (Anthropic Advanced Tool Use).
  defer_loading?: boolean
  // When true, the tool executes locally rather than on the agent side.
  executeLocally?: boolean
  oauth?: { required: boolean; provider: string }
}
/** User credentials bundle included for build-mode requests. */
interface CredentialsPayload {
  // OAuth credentials keyed by provider id.
  oauth: Record<string, { accessToken: string; accountId: string; name: string; expiresAt?: string }>
  // Names of configured environment API-key variables (values are not sent).
  apiKeys: string[]
  metadata?: {
    connectedOAuth: Array<{ provider: string; name: string; scopes?: string[] }>
    configuredApiKeys: string[]
  }
}
/** Message content: plain text or a list of typed content parts. */
type MessageContent = string | Array<{ type: string; text?: string; [key: string]: unknown }>
/** A single message in the conversation sent to the agent. */
interface ConversationMessage {
  role: string
  content: MessageContent
}
/**
 * Derive the provider configuration from environment settings.
 *
 * Returns undefined when COPILOT_PROVIDER is unset. Azure OpenAI and Vertex
 * get dedicated shapes using the env-configured model; any other provider
 * falls through to a generic config that uses the caller-selected model.
 */
function buildProviderConfig(selectedModel: string): CopilotProviderConfig | undefined {
  const defaults = getCopilotModel('chat')
  // Env model overrides the default for env-driven providers.
  const envModel = env.COPILOT_MODEL || defaults.model
  const providerEnv = env.COPILOT_PROVIDER
  if (!providerEnv) return undefined
  if (providerEnv === 'azure-openai') {
    return {
      provider: 'azure-openai',
      model: envModel,
      apiKey: env.AZURE_OPENAI_API_KEY,
      apiVersion: 'preview',
      endpoint: env.AZURE_OPENAI_ENDPOINT,
    }
  }
  if (providerEnv === 'vertex') {
    return {
      provider: 'vertex',
      model: envModel,
      apiKey: env.COPILOT_API_KEY,
      vertexProject: env.VERTEX_PROJECT,
      vertexLocation: env.VERTEX_LOCATION,
    }
  }
  // Fix: the previous `as Exclude<string, 'azure-openai' | 'vertex'>` was a
  // no-op — Exclude over the non-union `string` is still `string` — so it
  // implied a narrowing that never happened. The single object-level cast is
  // the real escape hatch for env-sourced provider names; keep it until the
  // env typing guarantees the provider union.
  return {
    provider: providerEnv,
    model: selectedModel,
    apiKey: env.COPILOT_API_KEY,
  } as CopilotProviderConfig
}
/**
 * Build the request payload for the copilot backend.
 *
 * Resolves the provider configuration (caller override or env-derived),
 * processes file attachments into content parts, and — when running in
 * build mode — also gathers base tools, integration tool schemas, and user
 * credentials. Failures while fetching credentials or building tool schemas
 * are logged and tolerated so the request can still be sent.
 *
 * @param params  Request inputs (message, ids, mode, history, attachments, …).
 * @param options Caller-selected model and optional explicit provider config.
 * @returns The payload object with optional fields spread in only when present.
 */
export async function buildCopilotRequestPayload(
  params: BuildPayloadParams,
  options: {
    providerConfig?: CopilotProviderConfig
    selectedModel: string
  }
): Promise<Record<string, unknown>> {
  const {
    message, workflowId, userId, userMessageId, mode, stream,
    conversationId, conversationHistory = [], contexts, fileAttachments,
    commands, chatId, prefetch, userName, implicitFeedback,
  } = params
  const selectedModel = options.selectedModel
  // Explicit provider config wins; otherwise derive one from env settings.
  const providerConfig = options.providerConfig ?? buildProviderConfig(selectedModel)
  // 'agent' is a legacy alias for 'build'; the transport still names it 'agent'.
  const effectiveMode = mode === 'agent' ? 'build' : mode
  const transportMode = effectiveMode === 'build' ? 'agent' : effectiveMode
  const processedFileContents = await processFileAttachments(fileAttachments ?? [], userId)
  // Rebuild the full message list, re-hydrating historical file attachments
  // into content-part arrays.
  // NOTE(review): `messages` is assembled below but never placed in the
  // returned payload (only the bare `message` string is sent) — confirm
  // whether it should be forwarded or this assembly removed.
  const messages: ConversationMessage[] = []
  for (const msg of conversationHistory as Array<Record<string, unknown>>) {
    const msgAttachments = msg.fileAttachments as Array<Record<string, unknown>> | undefined
    if (Array.isArray(msgAttachments) && msgAttachments.length > 0) {
      // Historical message with attachments: content-array form with leading text part.
      const content: Array<{ type: string; text?: string; [key: string]: unknown }> = [
        { type: 'text', text: msg.content as string },
      ]
      const processedHistoricalAttachments = await processFileAttachments(msgAttachments as BuildPayloadParams['fileAttachments'] ?? [], userId)
      for (const fileContent of processedHistoricalAttachments) {
        content.push(fileContent)
      }
      messages.push({ role: msg.role as string, content })
    } else {
      // Plain text-only historical message.
      messages.push({ role: msg.role as string, content: msg.content as string })
    }
  }
  // Implicit feedback rides along as a system message.
  if (implicitFeedback) {
    messages.push({ role: 'system', content: implicitFeedback })
  }
  if (processedFileContents.length > 0) {
    // Current user message with attachments: content-array form.
    const content: Array<{ type: string; text?: string; [key: string]: unknown }> = [
      { type: 'text', text: message },
    ]
    for (const fileContent of processedFileContents) {
      content.push(fileContent)
    }
    messages.push({ role: 'user', content })
  } else {
    // Text-only current user message.
    messages.push({ role: 'user', content: message })
  }
  let integrationTools: ToolSchema[] = []
  let baseTools: ToolSchema[] = []
  let credentials: CredentialsPayload | null = null
  if (effectiveMode === 'build') {
    // Base tools execute locally (not deferred); function_execute provides
    // sandboxed code execution capability.
    baseTools = [
      {
        name: 'function_execute',
        description:
          'Execute JavaScript code to perform calculations, data transformations, API calls, or any programmatic task. Code runs in a secure sandbox with fetch() available. Write plain statements (not wrapped in functions). Example: const res = await fetch(url); const data = await res.json(); return data;',
        input_schema: {
          type: 'object',
          properties: {
            code: {
              type: 'string',
              description:
                'Raw JavaScript statements to execute. Code is auto-wrapped in async context. Use fetch() for HTTP requests. Write like: const res = await fetch(url); return await res.json();',
            },
          },
          required: ['code'],
        },
        executeLocally: true,
      },
    ]
    // Best-effort fetch of OAuth credentials and env API-key names.
    try {
      const rawCredentials = await getCredentialsServerTool.execute({ workflowId }, { userId })
      // Re-shape OAuth credentials into a provider-keyed map; only credentials
      // with an access token are included.
      const oauthMap: CredentialsPayload['oauth'] = {}
      const connectedOAuth: Array<{ provider: string; name: string; scopes?: string[] }> = []
      for (const cred of rawCredentials?.oauth?.connected?.credentials ?? []) {
        if (cred.accessToken) {
          oauthMap[cred.provider] = {
            accessToken: cred.accessToken,
            accountId: cred.id,
            name: cred.name,
          }
          connectedOAuth.push({ provider: cred.provider, name: cred.name })
        }
      }
      credentials = {
        oauth: oauthMap,
        apiKeys: rawCredentials?.environment?.variableNames ?? [],
        metadata: {
          connectedOAuth,
          configuredApiKeys: rawCredentials?.environment?.variableNames ?? [],
        },
      }
    } catch (error) {
      logger.warn('Failed to fetch credentials for build payload', {
        error: error instanceof Error ? error.message : String(error),
      })
    }
    // Build integration tool schemas (definitions only; loading is deferred).
    try {
      const { createUserToolSchema } = await import('@/tools/params')
      const latestTools = getLatestVersionTools(tools)
      integrationTools = Object.entries(latestTools).map(([toolId, toolConfig]) => {
        const userSchema = createUserToolSchema(toolConfig)
        const strippedName = stripVersionSuffix(toolId)
        return {
          name: strippedName,
          description: toolConfig.description || toolConfig.name || strippedName,
          input_schema: userSchema as unknown as Record<string, unknown>,
          defer_loading: true,
          ...(toolConfig.oauth?.required && {
            oauth: {
              required: true,
              provider: toolConfig.oauth.provider,
            },
          }),
        }
      })
    } catch (error) {
      logger.warn('Failed to build tool schemas for payload', {
        error: error instanceof Error ? error.message : String(error),
      })
    }
  }
  // Optional fields are spread in only when present/meaningful.
  return {
    message,
    workflowId,
    userId,
    stream,
    streamToolCalls: true,
    model: selectedModel,
    mode: transportMode,
    messageId: userMessageId,
    version: SIM_AGENT_VERSION,
    ...(providerConfig ? { provider: providerConfig } : {}),
    ...(conversationId ? { conversationId } : {}),
    ...(typeof prefetch === 'boolean' ? { prefetch } : {}),
    ...(userName ? { userName } : {}),
    ...(contexts && contexts.length > 0 ? { context: contexts } : {}),
    ...(chatId ? { chatId } : {}),
    ...(processedFileContents.length > 0 ? { fileAttachments: processedFileContents } : {}),
    ...(integrationTools.length > 0 ? { tools: integrationTools } : {}),
    ...(baseTools.length > 0 ? { baseTools } : {}),
    ...(credentials ? { credentials } : {}),
    ...(commands && commands.length > 0 ? { commands } : {}),
  }
}

View File

@@ -3,7 +3,7 @@ import type {
CopilotMessage,
MessageFileAttachment,
} from '@/stores/panel/copilot/types'
import type { StreamingContext } from './types'
import type { ClientContentBlock, ClientStreamingContext } from './types'
const TEXT_BLOCK_TYPE = 'text'
const THINKING_BLOCK_TYPE = 'thinking'
@@ -25,8 +25,8 @@ export function createUserMessage(
...(contexts &&
contexts.length > 0 && {
contentBlocks: [
{ type: 'contexts', contexts: contexts as any, timestamp: Date.now() },
] as any,
{ type: 'contexts', contexts, timestamp: Date.now() },
],
}),
}
}
@@ -61,7 +61,7 @@ export function createErrorMessage(
}
}
export function appendTextBlock(context: StreamingContext, text: string) {
export function appendTextBlock(context: ClientStreamingContext, text: string) {
if (!text) return
context.accumulatedContent += text
if (context.currentTextBlock && context.contentBlocks.length > 0) {
@@ -71,11 +71,9 @@ export function appendTextBlock(context: StreamingContext, text: string) {
return
}
}
context.currentTextBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentTextBlock.type = TEXT_BLOCK_TYPE
context.currentTextBlock.content = text
context.currentTextBlock.timestamp = Date.now()
context.contentBlocks.push(context.currentTextBlock)
const newBlock: ClientContentBlock = { type: 'text', content: text, timestamp: Date.now() }
context.currentTextBlock = newBlock
context.contentBlocks.push(newBlock)
}
export function appendContinueOption(content: string): string {
@@ -84,7 +82,7 @@ export function appendContinueOption(content: string): string {
return `${content}${suffix}${CONTINUE_OPTIONS_TAG}`
}
export function appendContinueOptionBlock(blocks: any[]): any[] {
export function appendContinueOptionBlock(blocks: ClientContentBlock[]): ClientContentBlock[] {
if (!Array.isArray(blocks)) return blocks
const hasOptions = blocks.some(
(block) =>
@@ -109,7 +107,7 @@ export function stripContinueOption(content: string): string {
return next.replace(/\n{2,}\s*$/g, '\n').trimEnd()
}
export function stripContinueOptionFromBlocks(blocks: any[]): any[] {
export function stripContinueOptionFromBlocks(blocks: ClientContentBlock[]): ClientContentBlock[] {
if (!Array.isArray(blocks)) return blocks
return blocks.flatMap((block) => {
if (
@@ -125,20 +123,17 @@ export function stripContinueOptionFromBlocks(blocks: any[]): any[] {
})
}
export function beginThinkingBlock(context: StreamingContext) {
export function beginThinkingBlock(context: ClientStreamingContext) {
if (!context.currentThinkingBlock) {
context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentThinkingBlock.type = THINKING_BLOCK_TYPE
context.currentThinkingBlock.content = ''
context.currentThinkingBlock.timestamp = Date.now()
;(context.currentThinkingBlock as any).startTime = Date.now()
context.contentBlocks.push(context.currentThinkingBlock)
const newBlock: ClientContentBlock = { type: 'thinking', content: '', timestamp: Date.now(), startTime: Date.now() }
context.currentThinkingBlock = newBlock
context.contentBlocks.push(newBlock)
}
context.isInThinkingBlock = true
context.currentTextBlock = null
}
export function finalizeThinkingBlock(context: StreamingContext) {
export function finalizeThinkingBlock(context: ClientStreamingContext) {
if (context.currentThinkingBlock) {
context.currentThinkingBlock.duration =
Date.now() - (context.currentThinkingBlock.startTime || Date.now())

View File

@@ -1,28 +1,30 @@
import { createLogger } from '@sim/logger'
import { STREAM_STORAGE_KEY } from '@/lib/copilot/constants'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import { asRecord } from '@/lib/copilot/orchestrator/sse-utils'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import {
appendTextBlock,
beginThinkingBlock,
finalizeThinkingBlock,
} from './content-blocks'
import type { StreamingContext } from './types'
import {
isBackgroundState,
isRejectedState,
isReviewState,
resolveToolDisplay,
} from '@/lib/copilot/store-utils'
import { useWorkflowDiffStore } from '@/stores/workflow-diff/store'
import type { CopilotStore, CopilotStreamInfo, CopilotToolCall } from '@/stores/panel/copilot/types'
import {
appendTextBlock,
beginThinkingBlock,
finalizeThinkingBlock,
} from './content-blocks'
import type { ClientContentBlock, ClientStreamingContext } from './types'
const logger = createLogger('CopilotClientSseHandlers')
const STREAM_STORAGE_KEY = 'copilot_active_stream'
const TEXT_BLOCK_TYPE = 'text'
const MAX_BATCH_INTERVAL = 50
const MIN_BATCH_INTERVAL = 16
const MAX_QUEUE_SIZE = 5
function writeActiveStreamToStorage(info: any): void {
function writeActiveStreamToStorage(info: CopilotStreamInfo | null): void {
if (typeof window === 'undefined') return
try {
if (!info) {
@@ -30,17 +32,25 @@ function writeActiveStreamToStorage(info: any): void {
return
}
window.sessionStorage.setItem(STREAM_STORAGE_KEY, JSON.stringify(info))
} catch {}
} catch (error) {
logger.warn('Failed to write active stream to storage', {
error: error instanceof Error ? error.message : String(error),
})
}
}
type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
export type SSEHandler = (
data: any,
context: StreamingContext,
data: SSEEvent,
context: ClientStreamingContext,
get: () => CopilotStore,
set: any
set: StoreSet
) => Promise<void> | void
const streamingUpdateQueue = new Map<string, StreamingContext>()
const streamingUpdateQueue = new Map<string, ClientStreamingContext>()
let streamingUpdateRAF: number | null = null
let lastBatchTime = 0
@@ -52,8 +62,8 @@ export function stopStreamingUpdates() {
streamingUpdateQueue.clear()
}
function createOptimizedContentBlocks(contentBlocks: any[]): any[] {
const result: any[] = new Array(contentBlocks.length)
function createOptimizedContentBlocks(contentBlocks: ClientContentBlock[]): ClientContentBlock[] {
const result: ClientContentBlock[] = new Array(contentBlocks.length)
for (let i = 0; i < contentBlocks.length; i++) {
const block = contentBlocks[i]
result[i] = { ...block }
@@ -61,7 +71,7 @@ function createOptimizedContentBlocks(contentBlocks: any[]): any[] {
return result
}
export function flushStreamingUpdates(set: any) {
export function flushStreamingUpdates(set: StoreSet) {
if (streamingUpdateRAF !== null) {
cancelAnimationFrame(streamingUpdateRAF)
streamingUpdateRAF = null
@@ -90,7 +100,7 @@ export function flushStreamingUpdates(set: any) {
})
}
export function updateStreamingMessage(set: any, context: StreamingContext) {
export function updateStreamingMessage(set: StoreSet, context: ClientStreamingContext) {
if (context.suppressStreamingUpdates) return
const now = performance.now()
streamingUpdateQueue.set(context.messageId, context)
@@ -146,10 +156,10 @@ export function updateStreamingMessage(set: any, context: StreamingContext) {
}
}
export function upsertToolCallBlock(context: StreamingContext, toolCall: CopilotToolCall) {
export function upsertToolCallBlock(context: ClientStreamingContext, toolCall: CopilotToolCall) {
let found = false
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
const b = context.contentBlocks[i]
if (b.type === 'tool_call' && b.toolCall?.id === toolCall.id) {
context.contentBlocks[i] = { ...b, toolCall }
found = true
@@ -165,19 +175,16 @@ function stripThinkingTags(text: string): string {
return text.replace(/<\/?thinking[^>]*>/gi, '').replace(/&lt;\/?thinking[^&]*&gt;/gi, '')
}
function appendThinkingContent(context: StreamingContext, text: string) {
function appendThinkingContent(context: ClientStreamingContext, text: string) {
if (!text) return
const cleanedText = stripThinkingTags(text)
if (!cleanedText) return
if (context.currentThinkingBlock) {
context.currentThinkingBlock.content += cleanedText
} else {
context.currentThinkingBlock = { type: '', content: '', timestamp: 0, toolCall: null }
context.currentThinkingBlock.type = 'thinking'
context.currentThinkingBlock.content = cleanedText
context.currentThinkingBlock.timestamp = Date.now()
context.currentThinkingBlock.startTime = Date.now()
context.contentBlocks.push(context.currentThinkingBlock)
const newBlock: ClientContentBlock = { type: 'thinking', content: cleanedText, timestamp: Date.now(), startTime: Date.now() }
context.currentThinkingBlock = newBlock
context.contentBlocks.push(newBlock)
}
context.isInThinkingBlock = true
context.currentTextBlock = null
@@ -209,10 +216,12 @@ export const sseHandlers: Record<string, SSEHandler> = {
},
tool_result: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const eventData = asRecord(data?.data)
const toolCallId: string | undefined = data?.toolCallId || (eventData.id as string | undefined)
const success: boolean | undefined = data?.success
const failedDependency: boolean = data?.failedDependency === true
const skipped: boolean = data?.result?.skipped === true
const resultObj = asRecord(data?.result)
const skipped: boolean = resultObj.skipped === true
if (!toolCallId) return
const { toolCallsById } = get()
const current = toolCallsById[toolCallId]
@@ -233,24 +242,24 @@ export const sseHandlers: Record<string, SSEHandler> = {
updatedMap[toolCallId] = {
...current,
state: targetState,
display: resolveToolDisplay(
current.name,
targetState,
current.id,
(current as any).params
),
display: resolveToolDisplay(current.name, targetState, current.id, current.params),
}
set({ toolCallsById: updatedMap })
if (targetState === ClientToolCallState.success && current.name === 'checkoff_todo') {
try {
const result = (data?.result || data?.data?.result) ?? {}
const input = ((current as any).params || (current as any).input) ?? {}
const todoId = input.id || input.todoId || result.id || result.todoId
const result = asRecord(data?.result) || asRecord(eventData.result)
const input = asRecord(current.params || current.input)
const todoId = (input.id || input.todoId || result.id || result.todoId) as string | undefined
if (todoId) {
get().updatePlanTodoStatus(todoId, 'completed')
}
} catch {}
} catch (error) {
logger.warn('Failed to process checkoff_todo tool result', {
error: error instanceof Error ? error.message : String(error),
toolCallId,
})
}
}
if (
@@ -258,28 +267,35 @@ export const sseHandlers: Record<string, SSEHandler> = {
current.name === 'mark_todo_in_progress'
) {
try {
const result = (data?.result || data?.data?.result) ?? {}
const input = ((current as any).params || (current as any).input) ?? {}
const todoId = input.id || input.todoId || result.id || result.todoId
const result = asRecord(data?.result) || asRecord(eventData.result)
const input = asRecord(current.params || current.input)
const todoId = (input.id || input.todoId || result.id || result.todoId) as string | undefined
if (todoId) {
get().updatePlanTodoStatus(todoId, 'executing')
}
} catch {}
} catch (error) {
logger.warn('Failed to process mark_todo_in_progress tool result', {
error: error instanceof Error ? error.message : String(error),
toolCallId,
})
}
}
if (current.name === 'edit_workflow') {
try {
const resultPayload =
(data?.result || data?.data?.result || data?.data?.data || data?.data) ?? {}
const workflowState = resultPayload?.workflowState
const resultPayload = asRecord(
data?.result || eventData.result || eventData.data || data?.data
)
const workflowState = asRecord(resultPayload?.workflowState)
const hasWorkflowState = !!resultPayload?.workflowState
logger.info('[SSE] edit_workflow result received', {
hasWorkflowState: !!workflowState,
blockCount: workflowState ? Object.keys(workflowState.blocks ?? {}).length : 0,
edgeCount: workflowState?.edges?.length ?? 0,
hasWorkflowState,
blockCount: hasWorkflowState ? Object.keys(workflowState.blocks ?? {}).length : 0,
edgeCount: Array.isArray(workflowState.edges) ? workflowState.edges.length : 0,
})
if (workflowState) {
if (hasWorkflowState) {
const diffStore = useWorkflowDiffStore.getState()
diffStore.setProposedChanges(workflowState).catch((err) => {
diffStore.setProposedChanges(resultPayload.workflowState).catch((err) => {
logger.error('[SSE] Failed to apply edit_workflow diff', {
error: err instanceof Error ? err.message : String(err),
})
@@ -294,7 +310,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
const b = context.contentBlocks[i]
if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) {
if (
isRejectedState(b.toolCall?.state) ||
@@ -324,11 +340,16 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
updateStreamingMessage(set, context)
} catch {}
} catch (error) {
logger.warn('Failed to process tool_result SSE event', {
error: error instanceof Error ? error.message : String(error),
})
}
},
tool_error: (data, context, get, set) => {
try {
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const errorData = asRecord(data?.data)
const toolCallId: string | undefined = data?.toolCallId || (errorData.id as string | undefined)
const failedDependency: boolean = data?.failedDependency === true
if (!toolCallId) return
const { toolCallsById } = get()
@@ -348,17 +369,12 @@ export const sseHandlers: Record<string, SSEHandler> = {
updatedMap[toolCallId] = {
...current,
state: targetState,
display: resolveToolDisplay(
current.name,
targetState,
current.id,
(current as any).params
),
display: resolveToolDisplay(current.name, targetState, current.id, current.params),
}
set({ toolCallsById: updatedMap })
}
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
const b = context.contentBlocks[i]
if (b?.type === 'tool_call' && b?.toolCall?.id === toolCallId) {
if (
isRejectedState(b.toolCall?.state) ||
@@ -386,7 +402,11 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
updateStreamingMessage(set, context)
} catch {}
} catch (error) {
logger.warn('Failed to process tool_error SSE event', {
error: error instanceof Error ? error.message : String(error),
})
}
},
tool_generating: (data, context, get, set) => {
const { toolCallId, toolName } = data
@@ -410,11 +430,11 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
},
tool_call: (data, context, get, set) => {
const toolData = data?.data ?? {}
const id: string | undefined = toolData.id || data?.toolCallId
const name: string | undefined = toolData.name || data?.toolName
const toolData = asRecord(data?.data)
const id: string | undefined = (toolData.id as string | undefined) || data?.toolCallId
const name: string | undefined = (toolData.name as string | undefined) || data?.toolName
if (!id) return
const args = toolData.arguments
const args = toolData.arguments as Record<string, unknown> | undefined
const isPartial = toolData.partial === true
const { toolCallsById } = get()

View File

@@ -1,19 +1,25 @@
import { createLogger } from '@sim/logger'
import {
asRecord,
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
import { ClientToolCallState } from '@/lib/copilot/tools/client/tool-display-registry'
import { resolveToolDisplay } from '@/lib/copilot/store-utils'
import type { CopilotStore, CopilotToolCall } from '@/stores/panel/copilot/types'
import type { StreamingContext } from './types'
import type { ClientStreamingContext } from './types'
import { sseHandlers, type SSEHandler, updateStreamingMessage } from './handlers'
const logger = createLogger('CopilotClientSubagentHandlers')
type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
export function appendSubAgentContent(
context: StreamingContext,
context: ClientStreamingContext,
parentToolCallId: string,
text: string
) {
@@ -38,9 +44,9 @@ export function appendSubAgentContent(
}
export function updateToolCallWithSubAgentData(
context: StreamingContext,
context: ClientStreamingContext,
get: () => CopilotStore,
set: any,
set: StoreSet,
parentToolCallId: string
) {
const { toolCallsById } = get()
@@ -76,7 +82,7 @@ export function updateToolCallWithSubAgentData(
let foundInContentBlocks = false
for (let i = 0; i < context.contentBlocks.length; i++) {
const b = context.contentBlocks[i] as any
const b = context.contentBlocks[i]
if (b.type === 'tool_call' && b.toolCall?.id === parentToolCallId) {
context.contentBlocks[i] = { ...b, toolCall: updatedToolCall }
foundInContentBlocks = true
@@ -89,8 +95,8 @@ export function updateToolCallWithSubAgentData(
parentToolCallId,
contentBlocksCount: context.contentBlocks.length,
toolCallBlockIds: context.contentBlocks
.filter((b: any) => b.type === 'tool_call')
.map((b: any) => b.toolCall?.id),
.filter((b) => b.type === 'tool_call')
.map((b) => b.toolCall?.id),
})
}
@@ -104,27 +110,29 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
content: (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
const contentStr = typeof data.data === 'string' ? data.data : (data.content || '')
logger.info('[SubAgent] content event', {
parentToolCallId,
hasData: !!data.data,
dataPreview: typeof data.data === 'string' ? data.data.substring(0, 50) : null,
hasData: !!contentStr,
dataPreview: contentStr ? contentStr.substring(0, 50) : null,
})
if (!parentToolCallId || !data.data) {
if (!parentToolCallId || !contentStr) {
logger.warn('[SubAgent] content missing parentToolCallId or data', {
parentToolCallId,
hasData: !!data.data,
hasData: !!contentStr,
})
return
}
appendSubAgentContent(context, parentToolCallId, data.data)
appendSubAgentContent(context, parentToolCallId, contentStr)
updateToolCallWithSubAgentData(context, get, set, parentToolCallId)
},
reasoning: (data, context, get, set) => {
const parentToolCallId = context.subAgentParentToolCallId
const phase = data?.phase || data?.data?.phase
const dataObj = asRecord(data?.data)
const phase = data?.phase || (dataObj.phase as string | undefined)
if (!parentToolCallId) return
if (phase === 'start' || phase === 'end') return
@@ -145,17 +153,18 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = data?.data ?? {}
const id: string | undefined = toolData.id || data?.toolCallId
const name: string | undefined = toolData.name || data?.toolName
const toolData = asRecord(data?.data)
const id: string | undefined = (toolData.id as string | undefined) || data?.toolCallId
const name: string | undefined = (toolData.name as string | undefined) || data?.toolName
if (!id || !name) return
const isPartial = toolData.partial === true
let args = toolData.arguments || toolData.input || data?.arguments || data?.input
let args: Record<string, unknown> | undefined =
(toolData.arguments || toolData.input) as Record<string, unknown> | undefined
if (typeof args === 'string') {
try {
args = JSON.parse(args)
args = JSON.parse(args) as Record<string, unknown>
} catch {
logger.warn('[SubAgent] Failed to parse arguments string', { args })
}
@@ -177,7 +186,9 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
context.subAgentBlocks[parentToolCallId] = []
}
const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex((tc) => tc.id === id)
const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex(
(tc: CopilotToolCall) => tc.id === id
)
const subAgentToolCall: CopilotToolCall = {
id,
name,
@@ -213,7 +224,8 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolCallId: string | undefined = data?.toolCallId || data?.data?.id
const resultData = asRecord(data?.data)
const toolCallId: string | undefined = data?.toolCallId || (resultData.id as string | undefined)
const success: boolean | undefined = data?.success !== false
if (!toolCallId) return
@@ -222,7 +234,7 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
const targetState = success ? ClientToolCallState.success : ClientToolCallState.error
const existingIndex = context.subAgentToolCalls[parentToolCallId].findIndex(
(tc) => tc.id === toolCallId
(tc: CopilotToolCall) => tc.id === toolCallId
)
if (existingIndex >= 0) {
@@ -268,19 +280,20 @@ export const subAgentSSEHandlers: Record<string, SSEHandler> = {
}
export async function applySseEvent(
data: any,
context: StreamingContext,
rawData: SSEEvent,
context: ClientStreamingContext,
get: () => CopilotStore,
set: (next: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
const normalizedEvent = normalizeSseEvent(data)
const normalizedEvent = normalizeSseEvent(rawData)
if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) {
return true
}
data = normalizedEvent
const data = normalizedEvent
if (data.type === 'subagent_start') {
const toolCallId = data.data?.tool_call_id
const startData = asRecord(data.data)
const toolCallId = startData.tool_call_id as string | undefined
if (toolCallId) {
context.subAgentParentToolCallId = toolCallId
const { toolCallsById } = get()

View File

@@ -1,12 +1,28 @@
import type { CopilotToolCall } from '@/stores/panel/copilot/types'
import type { ChatContext, CopilotToolCall, SubAgentContentBlock } from '@/stores/panel/copilot/types'
/**
* A content block used in copilot messages and during streaming.
* Uses a literal type union for `type` to stay compatible with CopilotMessage.
*/
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'contexts'
export interface ClientContentBlock {
type: ContentBlockType
content?: string
timestamp: number
toolCall?: CopilotToolCall | null
startTime?: number
duration?: number
contexts?: ChatContext[]
}
export interface StreamingContext {
messageId: string
accumulatedContent: string
contentBlocks: any[]
currentTextBlock: any | null
contentBlocks: ClientContentBlock[]
currentTextBlock: ClientContentBlock | null
isInThinkingBlock: boolean
currentThinkingBlock: any | null
currentThinkingBlock: ClientContentBlock | null
isInDesignWorkflowBlock: boolean
designWorkflowContent: string
pendingContent: string
@@ -18,6 +34,8 @@ export interface StreamingContext {
subAgentParentToolCallId?: string
subAgentContent: Record<string, string>
subAgentToolCalls: Record<string, CopilotToolCall[]>
subAgentBlocks: Record<string, any[]>
subAgentBlocks: Record<string, SubAgentContentBlock[]>
suppressStreamingUpdates?: boolean
}
export type ClientStreamingContext = StreamingContext

View File

@@ -9,3 +9,107 @@ export const SIM_AGENT_API_URL =
rawAgentUrl.startsWith('http://') || rawAgentUrl.startsWith('https://')
? rawAgentUrl
: SIM_AGENT_API_URL_DEFAULT
// ---------------------------------------------------------------------------
// Redis key prefixes
// ---------------------------------------------------------------------------
/** Redis key prefix for tool call confirmation payloads (polled by waitForToolDecision). */
export const REDIS_TOOL_CALL_PREFIX = 'tool_call:'
/** Redis key prefix for copilot SSE stream buffers. */
export const REDIS_COPILOT_STREAM_PREFIX = 'copilot_stream:'
// ---------------------------------------------------------------------------
// Timeouts
// ---------------------------------------------------------------------------
/** Default timeout for the copilot orchestration stream loop (5 min). */
export const ORCHESTRATION_TIMEOUT_MS = 300_000
/** Timeout for the client-side streaming response handler (10 min). */
export const STREAM_TIMEOUT_MS = 600_000
/** TTL for Redis tool call confirmation entries (24 h). */
export const REDIS_TOOL_CALL_TTL_SECONDS = 86_400
// ---------------------------------------------------------------------------
// Tool decision polling
// ---------------------------------------------------------------------------
/** Initial poll interval when waiting for a user tool decision. */
export const TOOL_DECISION_INITIAL_POLL_MS = 100
/** Maximum poll interval when waiting for a user tool decision. */
export const TOOL_DECISION_MAX_POLL_MS = 3_000
/** Backoff multiplier for the tool decision poll interval. */
export const TOOL_DECISION_POLL_BACKOFF = 1.5
// ---------------------------------------------------------------------------
// Stream resume
// ---------------------------------------------------------------------------
/** Maximum number of resume attempts before giving up. */
export const MAX_RESUME_ATTEMPTS = 3
/** SessionStorage key for persisting active stream metadata across page reloads. */
export const STREAM_STORAGE_KEY = 'copilot_active_stream'
// ---------------------------------------------------------------------------
// Client-side streaming batching
// ---------------------------------------------------------------------------
/** Delay (ms) before processing the next queued message after stream completion. */
export const QUEUE_PROCESS_DELAY_MS = 100
/** Delay (ms) before invalidating subscription queries after stream completion. */
export const SUBSCRIPTION_INVALIDATE_DELAY_MS = 1_000
// ---------------------------------------------------------------------------
// UI helpers
// ---------------------------------------------------------------------------
/** Maximum character length for an optimistic chat title derived from a user message. */
export const OPTIMISTIC_TITLE_MAX_LENGTH = 50
// ---------------------------------------------------------------------------
// Copilot API paths (client-side fetch targets)
// ---------------------------------------------------------------------------
/** POST — send a chat message to the copilot. */
export const COPILOT_CHAT_API_PATH = '/api/copilot/chat'
/** GET — resume/replay a copilot SSE stream. */
export const COPILOT_CHAT_STREAM_API_PATH = '/api/copilot/chat/stream'
/** POST — persist chat messages / plan artifact / config. */
export const COPILOT_UPDATE_MESSAGES_API_PATH = '/api/copilot/chat/update-messages'
/** DELETE — delete a copilot chat. */
export const COPILOT_DELETE_CHAT_API_PATH = '/api/copilot/chat/delete'
/** POST — confirm or reject a tool call. */
export const COPILOT_CONFIRM_API_PATH = '/api/copilot/confirm'
/** POST — forward diff-accepted/rejected stats to the copilot backend. */
export const COPILOT_STATS_API_PATH = '/api/copilot/stats'
/** GET — load checkpoints for a chat. */
export const COPILOT_CHECKPOINTS_API_PATH = '/api/copilot/checkpoints'
/** POST — revert to a checkpoint. */
export const COPILOT_CHECKPOINTS_REVERT_API_PATH = '/api/copilot/checkpoints/revert'
/** GET/POST/DELETE — manage auto-allowed tools. */
export const COPILOT_AUTO_ALLOWED_TOOLS_API_PATH = '/api/copilot/auto-allowed-tools'
/** GET — fetch user credentials for masking. */
export const COPILOT_CREDENTIALS_API_PATH = '/api/copilot/credentials'
// ---------------------------------------------------------------------------
// Dedup limits
// ---------------------------------------------------------------------------
/** Maximum entries in the in-memory SSE tool-event dedup cache. */
export const STREAM_BUFFER_MAX_DEDUP_ENTRIES = 1_000

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { COPILOT_CHECKPOINTS_API_PATH } from '@/lib/copilot/constants'
import { mergeSubblockState } from '@/stores/workflows/utils'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -58,7 +59,7 @@ export async function saveMessageCheckpoint(
set({ messageSnapshots: nextSnapshots })
try {
const response = await fetch('/api/copilot/checkpoints', {
const response = await fetch(COPILOT_CHECKPOINTS_API_PATH, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -117,7 +118,7 @@ export function buildToolCallsById(messages: CopilotMessage[]): Record<string, C
const toolCallsById: Record<string, CopilotToolCall> = {}
for (const msg of messages) {
if (msg.contentBlocks) {
for (const block of msg.contentBlocks as any[]) {
for (const block of msg.contentBlocks) {
if (block?.type === 'tool_call' && block.toolCall?.id) {
extractToolCallsRecursively(block.toolCall, toolCallsById)
}

View File

@@ -1,27 +1,30 @@
export function maskCredentialIdsInValue(value: any, credentialIds: Set<string>): any {
export function maskCredentialIdsInValue<T>(value: T, credentialIds: Set<string>): T {
if (!value || credentialIds.size === 0) return value
if (typeof value === 'string') {
let masked = value
let masked = value as string
const sortedIds = Array.from(credentialIds).sort((a, b) => b.length - a.length)
for (const id of sortedIds) {
if (id && masked.includes(id)) {
masked = masked.split(id).join('••••••••')
}
}
return masked
return masked as unknown as T
}
if (Array.isArray(value)) {
return value.map((item) => maskCredentialIdsInValue(item, credentialIds))
return value.map((item) => maskCredentialIdsInValue(item, credentialIds)) as T
}
if (typeof value === 'object') {
const masked: any = {}
for (const key of Object.keys(value)) {
masked[key] = maskCredentialIdsInValue(value[key], credentialIds)
const masked: Record<string, unknown> = {}
for (const key of Object.keys(value as Record<string, unknown>)) {
masked[key] = maskCredentialIdsInValue(
(value as Record<string, unknown>)[key],
credentialIds
)
}
return masked
return masked as T
}
return value

View File

@@ -1,3 +1,4 @@
export * from './credential-masking'
export * from './serialization'
export * from './checkpoints'
export * from './persist'

View File

@@ -0,0 +1,43 @@
import { createLogger } from '@sim/logger'
import { COPILOT_UPDATE_MESSAGES_API_PATH } from '@/lib/copilot/constants'
import type { CopilotMessage } from '@/stores/panel/copilot/types'
import { serializeMessagesForDB } from './serialization'
const logger = createLogger('CopilotMessagePersistence')
export async function persistMessages(params: {
chatId: string
messages: CopilotMessage[]
sensitiveCredentialIds?: Set<string>
planArtifact?: string | null
mode?: string
model?: string
conversationId?: string
}): Promise<boolean> {
try {
const dbMessages = serializeMessagesForDB(
params.messages,
params.sensitiveCredentialIds ?? new Set<string>()
)
const response = await fetch(COPILOT_UPDATE_MESSAGES_API_PATH, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
chatId: params.chatId,
messages: dbMessages,
...(params.planArtifact !== undefined ? { planArtifact: params.planArtifact } : {}),
...(params.mode || params.model
? { config: { mode: params.mode, model: params.model } }
: {}),
...(params.conversationId ? { conversationId: params.conversationId } : {}),
}),
})
return response.ok
} catch (error) {
logger.warn('Failed to persist messages', {
chatId: params.chatId,
error: error instanceof Error ? error.message : String(error),
})
return false
}
}

View File

@@ -1,10 +1,10 @@
import { createLogger } from '@sim/logger'
import type { CopilotMessage } from '@/stores/panel/copilot/types'
import type { CopilotMessage, CopilotToolCall } from '@/stores/panel/copilot/types'
import { maskCredentialIdsInValue } from './credential-masking'
const logger = createLogger('CopilotMessageSerialization')
export function clearStreamingFlags(toolCall: any): void {
export function clearStreamingFlags(toolCall: CopilotToolCall): void {
if (!toolCall) return
toolCall.subAgentStreaming = false
@@ -27,18 +27,18 @@ export function normalizeMessagesForUI(messages: CopilotMessage[]): CopilotMessa
try {
for (const message of messages) {
if (message.role === 'assistant') {
logger.info('[normalizeMessagesForUI] Loading assistant message', {
logger.debug('[normalizeMessagesForUI] Loading assistant message', {
id: message.id,
hasContent: !!message.content?.trim(),
contentBlockCount: message.contentBlocks?.length || 0,
contentBlockTypes: (message.contentBlocks as any[])?.map((b) => b?.type) ?? [],
contentBlockTypes: message.contentBlocks?.map((b) => b?.type) ?? [],
})
}
}
for (const message of messages) {
if (message.contentBlocks) {
for (const block of message.contentBlocks as any[]) {
for (const block of message.contentBlocks) {
if (block?.type === 'tool_call' && block.toolCall) {
clearStreamingFlags(block.toolCall)
}
@@ -51,7 +51,10 @@ export function normalizeMessagesForUI(messages: CopilotMessage[]): CopilotMessa
}
}
return messages
} catch {
} catch (error) {
logger.warn('[normalizeMessagesForUI] Failed to normalize messages', {
error: error instanceof Error ? error.message : String(error),
})
return messages
}
}
@@ -88,16 +91,16 @@ export function deepClone<T>(obj: T): T {
export function serializeMessagesForDB(
messages: CopilotMessage[],
credentialIds: Set<string>
): any[] {
): CopilotMessage[] {
const result = messages
.map((msg) => {
let timestamp: string = msg.timestamp
if (typeof timestamp !== 'string') {
const ts = timestamp as any
const ts = timestamp as unknown
timestamp = ts instanceof Date ? ts.toISOString() : new Date().toISOString()
}
const serialized: any = {
const serialized: CopilotMessage = {
id: msg.id,
role: msg.role,
content: msg.content || '',
@@ -108,16 +111,16 @@ export function serializeMessagesForDB(
serialized.contentBlocks = deepClone(msg.contentBlocks)
}
if (Array.isArray((msg as any).toolCalls) && (msg as any).toolCalls.length > 0) {
serialized.toolCalls = deepClone((msg as any).toolCalls)
if (Array.isArray(msg.toolCalls) && msg.toolCalls.length > 0) {
serialized.toolCalls = deepClone(msg.toolCalls)
}
if (Array.isArray(msg.fileAttachments) && msg.fileAttachments.length > 0) {
serialized.fileAttachments = deepClone(msg.fileAttachments)
}
if (Array.isArray((msg as any).contexts) && (msg as any).contexts.length > 0) {
serialized.contexts = deepClone((msg as any).contexts)
if (Array.isArray(msg.contexts) && msg.contexts.length > 0) {
serialized.contexts = deepClone(msg.contexts)
}
if (Array.isArray(msg.citations) && msg.citations.length > 0) {
@@ -142,16 +145,16 @@ export function serializeMessagesForDB(
for (const msg of messages) {
if (msg.role === 'assistant') {
logger.info('[serializeMessagesForDB] Input assistant message', {
logger.debug('[serializeMessagesForDB] Input assistant message', {
id: msg.id,
hasContent: !!msg.content?.trim(),
contentBlockCount: msg.contentBlocks?.length || 0,
contentBlockTypes: (msg.contentBlocks as any[])?.map((b) => b?.type) ?? [],
contentBlockTypes: msg.contentBlocks?.map((b) => b?.type) ?? [],
})
}
}
logger.info('[serializeMessagesForDB] Serialized messages', {
logger.debug('[serializeMessagesForDB] Serialized messages', {
inputCount: messages.length,
outputCount: result.length,
sample:

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { REDIS_TOOL_CALL_PREFIX } from '@/lib/copilot/constants'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('CopilotOrchestratorPersistence')
@@ -15,7 +16,7 @@ export async function getToolConfirmation(toolCallId: string): Promise<{
if (!redis) return null
try {
const data = await redis.get(`tool_call:${toolCallId}`)
const data = await redis.get(`${REDIS_TOOL_CALL_PREFIX}${toolCallId}`)
if (!data) return null
return JSON.parse(data) as { status: string; message?: string; timestamp?: string }
} catch (error) {

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { STREAM_TIMEOUT_MS } from '@/lib/copilot/constants'
import { RESPOND_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
import {
asRecord,
@@ -21,15 +22,16 @@ const logger = createLogger('CopilotSseHandlers')
// Normalization + dedupe helpers live in sse-utils to keep server/client in sync.
function inferToolSuccess(data: Record<string, any> | undefined): {
function inferToolSuccess(data: Record<string, unknown> | undefined): {
success: boolean
hasResultData: boolean
hasError: boolean
} {
const hasExplicitSuccess = data?.success !== undefined || data?.result?.success !== undefined
const explicitSuccess = data?.success ?? data?.result?.success
const resultObj = asRecord(data?.result)
const hasExplicitSuccess = data?.success !== undefined || resultObj.success !== undefined
const explicitSuccess = data?.success ?? resultObj.success
const hasResultData = data?.result !== undefined || data?.data !== undefined
const hasError = !!data?.error || !!data?.result?.error
const hasError = !!data?.error || !!resultObj.error
const success = hasExplicitSuccess ? !!explicitSuccess : hasResultData && !hasError
return { success, hasResultData, hasError }
}
@@ -50,12 +52,12 @@ function addContentBlock(context: StreamingContext, block: Omit<ContentBlock, 't
export const sseHandlers: Record<string, SSEHandler> = {
chat_id: (event, context) => {
context.chatId = asRecord(event.data).chatId
context.chatId = asRecord(event.data).chatId as string | undefined
},
title_updated: () => {},
tool_result: (event, context) => {
const data = getEventData(event)
const toolCallId = event.toolCallId || data?.id
const toolCallId = event.toolCallId || (data?.id as string | undefined)
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
@@ -71,23 +73,24 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
}
if (hasError) {
current.error = data?.error || data?.result?.error
const resultObj = asRecord(data?.result)
current.error = (data?.error || resultObj.error) as string | undefined
}
},
tool_error: (event, context) => {
const data = getEventData(event)
const toolCallId = event.toolCallId || data?.id
const toolCallId = event.toolCallId || (data?.id as string | undefined)
if (!toolCallId) return
const current = context.toolCalls.get(toolCallId)
if (!current) return
current.status = 'error'
current.error = data?.error || 'Tool execution failed'
current.error = (data?.error as string | undefined) || 'Tool execution failed'
current.endTime = Date.now()
},
tool_generating: (event, context) => {
const data = getEventData(event)
const toolCallId = event.toolCallId || data?.toolCallId || data?.id
const toolName = event.toolName || data?.toolName || data?.name
const toolCallId = event.toolCallId || (data?.toolCallId as string | undefined) || (data?.id as string | undefined)
const toolName = event.toolName || (data?.toolName as string | undefined) || (data?.name as string | undefined)
if (!toolCallId || !toolName) return
if (!context.toolCalls.has(toolCallId)) {
context.toolCalls.set(toolCallId, {
@@ -99,12 +102,12 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
},
tool_call: async (event, context, execContext, options) => {
const toolData = getEventData(event) || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
const toolData = getEventData(event) || ({} as Record<string, unknown>)
const toolCallId = (toolData.id as string | undefined) || event.toolCallId
const toolName = (toolData.name as string | undefined) || event.toolName
if (!toolCallId || !toolName) return
const args = toolData.arguments || toolData.input || asRecord(event.data).input
const args = (toolData.arguments || toolData.input || asRecord(event.data).input) as Record<string, unknown> | undefined
const isPartial = toolData.partial === true
const existing = context.toolCalls.get(toolCallId)
@@ -161,7 +164,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
const isInteractive = options.interactive === true
if (isInterruptTool && isInteractive) {
const decision = await waitForToolDecision(toolCallId, options.timeout || 600000)
const decision = await waitForToolDecision(toolCallId, options.timeout || STREAM_TIMEOUT_MS, options.abortSignal)
if (decision?.status === 'accepted' || decision?.status === 'success') {
await executeToolAndReport(toolCallId, context, execContext, options)
return
@@ -221,7 +224,8 @@ export const sseHandlers: Record<string, SSEHandler> = {
}
},
reasoning: (event, context) => {
const phase = asRecord(event.data).phase || asRecord(asRecord(event.data).data).phase
const d = asRecord(event.data)
const phase = d.phase || asRecord(d.data).phase
if (phase === 'start') {
context.isInThinkingBlock = true
context.currentThinkingBlock = {
@@ -239,17 +243,16 @@ export const sseHandlers: Record<string, SSEHandler> = {
context.currentThinkingBlock = null
return
}
const d = asRecord(event.data)
const chunk = typeof event.data === 'string' ? event.data : d.data || d.content
const chunk = (d.data || d.content || event.content) as string | undefined
if (!chunk || !context.currentThinkingBlock) return
context.currentThinkingBlock.content = `${context.currentThinkingBlock.content || ''}${chunk}`
},
content: (event, context) => {
const d = asRecord(event.data)
const chunk = typeof event.data === 'string' ? event.data : d.content || d.data
const chunk = (d.content || d.data || event.content) as string | undefined
if (!chunk) return
context.accumulatedContent += chunk
addContentBlock(context, { type: 'text', content: chunk as string })
addContentBlock(context, { type: 'text', content: chunk })
},
done: (event, context) => {
const d = asRecord(event.data)
@@ -266,7 +269,7 @@ export const sseHandlers: Record<string, SSEHandler> = {
},
error: (event, context) => {
const d = asRecord(event.data)
const message = d.message || d.error || (typeof event.data === 'string' ? event.data : null)
const message = (d.message || d.error || event.error) as string | undefined
if (message) {
context.errors.push(message)
}
@@ -278,7 +281,8 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
content: (event, context) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId || !event.data) return
const chunk = typeof event.data === 'string' ? event.data : asRecord(event.data).content || ''
const d = asRecord(event.data)
const chunk = (d.content || d.data || event.content) as string | undefined
if (!chunk) return
context.subAgentContent[parentToolCallId] =
(context.subAgentContent[parentToolCallId] || '') + chunk
@@ -287,12 +291,12 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
tool_call: async (event, context, execContext, options) => {
const parentToolCallId = context.subAgentParentToolCallId
if (!parentToolCallId) return
const toolData = getEventData(event) || {}
const toolCallId = toolData.id || event.toolCallId
const toolName = toolData.name || event.toolName
const toolData = getEventData(event) || ({} as Record<string, unknown>)
const toolCallId = (toolData.id as string | undefined) || event.toolCallId
const toolName = (toolData.name as string | undefined) || event.toolName
if (!toolCallId || !toolName) return
const isPartial = toolData.partial === true
const args = toolData.arguments || toolData.input || asRecord(event.data).input
const args = (toolData.arguments || toolData.input || asRecord(event.data).input) as Record<string, unknown> | undefined
const existing = context.toolCalls.get(toolCallId)
// Ignore late/duplicate tool_call events once we already have a result.

View File

@@ -1,4 +1,9 @@
import { createLogger } from '@sim/logger'
import {
TOOL_DECISION_INITIAL_POLL_MS,
TOOL_DECISION_MAX_POLL_MS,
TOOL_DECISION_POLL_BACKOFF,
} from '@/lib/copilot/constants'
import { INTERRUPT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import {
@@ -103,15 +108,20 @@ export async function executeToolAndReport(
export async function waitForToolDecision(
toolCallId: string,
timeoutMs: number
timeoutMs: number,
abortSignal?: AbortSignal
): Promise<{ status: string; message?: string } | null> {
const start = Date.now()
let interval = TOOL_DECISION_INITIAL_POLL_MS
const maxInterval = TOOL_DECISION_MAX_POLL_MS
while (Date.now() - start < timeoutMs) {
if (abortSignal?.aborted) return null
const decision = await getToolConfirmation(toolCallId)
if (decision?.status) {
return decision
}
await new Promise((resolve) => setTimeout(resolve, 100))
await new Promise((resolve) => setTimeout(resolve, interval))
interval = Math.min(interval * TOOL_DECISION_POLL_BACKOFF, maxInterval)
}
return null
}

View File

@@ -1,22 +1,29 @@
import { STREAM_BUFFER_MAX_DEDUP_ENTRIES } from '@/lib/copilot/constants'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
type EventDataObject = Record<string, any> | undefined
type EventDataObject = Record<string, unknown> | undefined
/** Safely cast event.data to a record for property access. */
export const asRecord = (data: unknown): Record<string, any> =>
(data && typeof data === 'object' && !Array.isArray(data) ? data : {}) as Record<string, any>
const DEFAULT_TOOL_EVENT_TTL_MS = 5 * 60 * 1000
export const asRecord = (data: unknown): Record<string, unknown> =>
(data && typeof data === 'object' && !Array.isArray(data) ? data : {}) as Record<string, unknown>
/**
* In-memory tool event dedupe.
* In-memory tool event dedupe with bounded size.
*
* NOTE: These sets are process-local only. In a multi-instance setup (e.g., ECS),
* each task maintains its own dedupe cache, so duplicates can still appear across tasks.
* NOTE: Process-local only. In a multi-instance setup (e.g., ECS),
* each task maintains its own dedupe cache.
*/
const seenToolCalls = new Set<string>()
const seenToolResults = new Set<string>()
function addToSet(set: Set<string>, id: string): void {
if (set.size >= STREAM_BUFFER_MAX_DEDUP_ENTRIES) {
const first = set.values().next().value
if (first) set.delete(first)
}
set.add(id)
}
const parseEventData = (data: unknown): EventDataObject => {
if (!data) return undefined
if (typeof data !== 'string') {
@@ -51,7 +58,7 @@ export const getEventData = (event: SSEEvent): EventDataObject => {
function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
const data = getEventData(event)
return event.toolCallId || data?.id || data?.toolCallId
return event.toolCallId || (data?.id as string | undefined) || (data?.toolCallId as string | undefined)
}
/** Normalizes SSE events so tool metadata is available at the top level. */
@@ -59,9 +66,9 @@ export function normalizeSseEvent(event: SSEEvent): SSEEvent {
if (!event) return event
const data = getEventData(event)
if (!data) return event
const toolCallId = event.toolCallId || data.id || data.toolCallId
const toolName = event.toolName || data.name || data.toolName
const success = event.success ?? data.success
const toolCallId = event.toolCallId || (data.id as string | undefined) || (data.toolCallId as string | undefined)
const toolName = event.toolName || (data.name as string | undefined) || (data.toolName as string | undefined)
const success = event.success ?? (data.success as boolean | undefined)
const result = event.result ?? data.result
const normalizedData = typeof event.data === 'string' ? data : event.data
return {
@@ -74,25 +81,16 @@ export function normalizeSseEvent(event: SSEEvent): SSEEvent {
}
}
function markToolCallSeen(toolCallId: string, ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS): void {
seenToolCalls.add(toolCallId)
setTimeout(() => {
seenToolCalls.delete(toolCallId)
}, ttlMs)
function markToolCallSeen(toolCallId: string): void {
addToSet(seenToolCalls, toolCallId)
}
function wasToolCallSeen(toolCallId: string): boolean {
return seenToolCalls.has(toolCallId)
}
export function markToolResultSeen(
toolCallId: string,
ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS
): void {
seenToolResults.add(toolCallId)
setTimeout(() => {
seenToolResults.delete(toolCallId)
}, ttlMs)
export function markToolResultSeen(toolCallId: string): void {
addToSet(seenToolResults, toolCallId)
}
export function wasToolResultSeen(toolCallId: string): boolean {

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { REDIS_COPILOT_STREAM_PREFIX } from '@/lib/copilot/constants'
import { env } from '@/lib/core/config/env'
import { getRedisClient } from '@/lib/core/config/redis'
@@ -59,7 +60,7 @@ return id
`
function getStreamKeyPrefix(streamId: string) {
return `copilot_stream:${streamId}`
return `${REDIS_COPILOT_STREAM_PREFIX}${streamId}`
}
function getEventsKey(streamId: string) {
@@ -86,11 +87,11 @@ export type StreamMeta = {
export type StreamEventEntry = {
eventId: number
streamId: string
event: Record<string, any>
event: Record<string, unknown>
}
export type StreamEventWriter = {
write: (event: Record<string, any>) => Promise<StreamEventEntry>
write: (event: Record<string, unknown>) => Promise<StreamEventEntry>
flush: () => Promise<void>
close: () => Promise<void>
}
@@ -147,7 +148,7 @@ export async function getStreamMeta(streamId: string): Promise<StreamMeta | null
export async function appendStreamEvent(
streamId: string,
event: Record<string, any>
event: Record<string, unknown>
): Promise<StreamEventEntry> {
const redis = getRedisClient()
if (!redis) {
@@ -225,7 +226,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
zaddArgs.push(entry.eventId, JSON.stringify(entry))
}
const pipeline = redis.pipeline()
pipeline.zadd(key, ...(zaddArgs as any))
pipeline.zadd(key, ...(zaddArgs as [number, string]))
pipeline.expire(key, config.ttlSeconds)
pipeline.expire(getSeqKey(streamId), config.ttlSeconds)
pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
@@ -253,7 +254,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
}
}
const write = async (event: Record<string, any>) => {
const write = async (event: Record<string, unknown>) => {
if (closed) return { eventId: 0, streamId, event }
if (nextEventId === 0 || nextEventId > maxReservedId) {
await reserveIds(1)

View File

@@ -1,4 +1,5 @@
import { createLogger } from '@sim/logger'
import { ORCHESTRATION_TIMEOUT_MS } from '@/lib/copilot/constants'
import {
handleSubagentRouting,
sseHandlers,
@@ -68,7 +69,7 @@ export async function runStreamLoop(
execContext: ExecutionContext,
options: StreamLoopOptions
): Promise<void> {
const { timeout = 300000, abortSignal } = options
const { timeout = ORCHESTRATION_TIMEOUT_MS, abortSignal } = options
const response = await fetch(fetchUrl, {
...fetchOptions,

View File

@@ -28,7 +28,7 @@ export interface SubagentOrchestratorResult {
structuredResult?: {
type?: string
summary?: string
data?: any
data?: unknown
success?: boolean
}
error?: string
@@ -37,14 +37,15 @@ export interface SubagentOrchestratorResult {
export async function orchestrateSubagentStream(
agentId: string,
requestPayload: Record<string, any>,
requestPayload: Record<string, unknown>,
options: SubagentOrchestratorOptions
): Promise<SubagentOrchestratorResult> {
const { userId, workflowId, workspaceId } = options
const execContext = await buildExecutionContext(userId, workflowId, workspaceId)
const msgId = requestPayload?.messageId
const context = createStreamingContext({
messageId: requestPayload?.messageId || crypto.randomUUID(),
messageId: typeof msgId === 'string' ? msgId : crypto.randomUUID(),
})
let structuredResult: SubagentOrchestratorResult['structuredResult']
@@ -109,12 +110,12 @@ export async function orchestrateSubagentStream(
function normalizeStructuredResult(data: unknown): SubagentOrchestratorResult['structuredResult'] {
if (!data || typeof data !== 'object') return undefined
const d = data as Record<string, any>
const d = data as Record<string, unknown>
return {
type: d.result_type || d.type,
summary: d.summary,
type: (d.result_type || d.type) as string | undefined,
summary: d.summary as string | undefined,
data: d.data ?? d,
success: d.success,
success: d.success as boolean | undefined,
}
}

View File

@@ -58,7 +58,17 @@ export async function executeCheckDeploymentStatus(
hasPassword: Boolean(chatDeploy[0]?.password),
}
const mcpDetails = { isDeployed: false, servers: [] as any[] }
const mcpDetails: {
isDeployed: boolean
servers: Array<{
serverId: string
serverName: string
toolName: string
toolDescription: string | null
parameterSchema: unknown
toolId: string
}>
} = { isDeployed: false, servers: [] }
if (workspaceId) {
const servers = await db
.select({

View File

@@ -138,7 +138,7 @@ export async function executeToolServerSide(
*/
async function executeServerToolDirect(
toolName: string,
params: Record<string, any>,
params: Record<string, unknown>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
@@ -180,8 +180,8 @@ export async function markToolComplete(
toolCallId: string,
toolName: string,
status: number,
message?: any,
data?: any
message?: unknown,
data?: unknown
): Promise<boolean> {
try {
const response = await fetch(`${SIM_AGENT_API_URL}/api/tools/mark-complete`, {

View File

@@ -41,9 +41,9 @@ export async function executeIntegrationToolDirect(
// Deep resolution walks nested objects to replace {{ENV_VAR}} references.
// Safe because tool arguments originate from the LLM (not direct user input)
// and env vars belong to the user themselves.
const executionParams: Record<string, any> = resolveEnvVarReferences(toolArgs, decryptedEnvVars, {
const executionParams = resolveEnvVarReferences(toolArgs, decryptedEnvVars, {
deep: true,
}) as Record<string, any>
}) as Record<string, unknown>
if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
const provider = toolConfig.oauth.provider
@@ -62,7 +62,7 @@ export async function executeIntegrationToolDirect(
const acc = accounts[0]
const requestId = generateRequestId()
const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)
const { accessToken } = await refreshTokenIfNeeded(requestId, acc, acc.id)
if (!accessToken) {
return {

View File

@@ -1,4 +1,5 @@
import crypto from 'crypto'
import { createLogger } from '@sim/logger'
import { db } from '@sim/db'
import { workflow, workflowFolder } from '@sim/db/schema'
import { and, eq, isNull, max } from 'drizzle-orm'
@@ -16,6 +17,8 @@ import type {
VariableOperation,
} from '../param-types'
const logger = createLogger('WorkflowMutations')
export async function executeCreateWorkflow(
params: CreateWorkflowParams,
context: ExecutionContext
@@ -185,17 +188,27 @@ export async function executeSetGlobalWorkflowVariables(
: []
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const currentVarsRecord = (workflowRecord.variables as Record<string, any>) || {}
const byName: Record<string, any> = {}
Object.values(currentVarsRecord).forEach((v: any) => {
if (v && typeof v === 'object' && v.id && v.name) byName[String(v.name)] = v
interface WorkflowVariable {
id: string
workflowId?: string
name: string
type: string
value?: unknown
}
const currentVarsRecord = (workflowRecord.variables as Record<string, unknown>) || {}
const byName: Record<string, WorkflowVariable> = {}
Object.values(currentVarsRecord).forEach((v) => {
if (v && typeof v === 'object' && 'id' in v && 'name' in v) {
const variable = v as WorkflowVariable
byName[String(variable.name)] = variable
}
})
for (const op of operations) {
const key = String(op?.name || '')
if (!key) continue
const nextType = op?.type || byName[key]?.type || 'plain'
const coerceValue = (value: any, type: string) => {
const coerceValue = (value: unknown, type: string): unknown => {
if (value === undefined) return value
if (type === 'number') {
const n = Number(value)
@@ -213,7 +226,9 @@ export async function executeSetGlobalWorkflowVariables(
if (type === 'array' && Array.isArray(parsed)) return parsed
if (type === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed))
return parsed
} catch {}
} catch (error) {
logger.warn('Failed to parse JSON value for variable coercion', { error: error instanceof Error ? error.message : String(error) })
}
return value
}
return value
@@ -254,7 +269,7 @@ export async function executeSetGlobalWorkflowVariables(
}
const nextVarsRecord = Object.fromEntries(
Object.values(byName).map((v: any) => [String(v.id), v])
Object.values(byName).map((v) => [String(v.id), v])
)
await db

View File

@@ -13,6 +13,7 @@ import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import type { Loop, Parallel } from '@/stores/workflows/workflow/types'
import { normalizeName } from '@/executor/constants'
import {
ensureWorkflowAccess,
@@ -209,12 +210,15 @@ export async function executeGetWorkflowData(
)
if (dataType === 'global_variables') {
const variablesRecord = (workflowRecord.variables as Record<string, any>) || {}
const variables = Object.values(variablesRecord).map((v: any) => ({
id: String(v?.id || ''),
name: String(v?.name || ''),
value: v?.value,
}))
const variablesRecord = (workflowRecord.variables as Record<string, unknown>) || {}
const variables = Object.values(variablesRecord).map((v) => {
const variable = v as Record<string, unknown> | null
return {
id: String(variable?.id || ''),
name: String(variable?.name || ''),
value: variable?.value,
}
})
return { success: true, output: { variables } }
}
@@ -232,13 +236,17 @@ export async function executeGetWorkflowData(
.where(or(...conditions))
.orderBy(desc(customTools.createdAt))
const customToolsData = toolsRows.map((tool) => ({
id: String(tool.id || ''),
title: String(tool.title || ''),
functionName: String((tool.schema as any)?.function?.name || ''),
description: String((tool.schema as any)?.function?.description || ''),
parameters: (tool.schema as any)?.function?.parameters,
}))
const customToolsData = toolsRows.map((tool) => {
const schema = tool.schema as Record<string, unknown> | null
const fn = (schema?.function ?? {}) as Record<string, unknown>
return {
id: String(tool.id || ''),
title: String(tool.title || ''),
functionName: String(fn.name || ''),
description: String(fn.description || ''),
parameters: fn.parameters,
}
})
return { success: true, output: { customTools: customToolsData } }
}
@@ -377,10 +385,28 @@ export async function executeGetBlockUpstreamReferences(
const loops = normalized.loops || {}
const parallels = normalized.parallels || {}
const graphEdges = edges.map((edge: any) => ({ source: edge.source, target: edge.target }))
const graphEdges = edges.map((edge) => ({ source: edge.source, target: edge.target }))
const variableOutputs = await getWorkflowVariablesForTool(workflowId)
const results: any[] = []
interface AccessibleBlockEntry {
blockId: string
blockName: string
blockType: string
outputs: string[]
triggerMode?: boolean
accessContext?: 'inside' | 'outside'
}
interface UpstreamReferenceResult {
blockId: string
blockName: string
blockType: string
accessibleBlocks: AccessibleBlockEntry[]
insideSubflows: Array<{ blockId: string; blockName: string; blockType: string }>
variables: Array<{ id: string; name: string; type: string; tag: string }>
}
const results: UpstreamReferenceResult[] = []
for (const blockId of params.blockIds) {
const targetBlock = blocks[blockId]
@@ -390,7 +416,7 @@ export async function executeGetBlockUpstreamReferences(
const containingLoopIds = new Set<string>()
const containingParallelIds = new Set<string>()
Object.values(loops as Record<string, any>).forEach((loop) => {
Object.values(loops).forEach((loop) => {
if (loop?.nodes?.includes(blockId)) {
containingLoopIds.add(loop.id)
const loopBlock = blocks[loop.id]
@@ -404,7 +430,7 @@ export async function executeGetBlockUpstreamReferences(
}
})
Object.values(parallels as Record<string, any>).forEach((parallel) => {
Object.values(parallels).forEach((parallel) => {
if (parallel?.nodes?.includes(blockId)) {
containingParallelIds.add(parallel.id)
const parallelBlock = blocks[parallel.id]
@@ -422,9 +448,9 @@ export async function executeGetBlockUpstreamReferences(
const accessibleIds = new Set<string>(ancestorIds)
accessibleIds.add(blockId)
const starterBlock = Object.values(blocks).find((b: any) => isInputDefinitionTrigger(b.type))
if (starterBlock && ancestorIds.includes((starterBlock as any).id)) {
accessibleIds.add((starterBlock as any).id)
const starterBlock = Object.values(blocks).find((b) => isInputDefinitionTrigger(b.type))
if (starterBlock && ancestorIds.includes(starterBlock.id)) {
accessibleIds.add(starterBlock.id)
}
containingLoopIds.forEach((loopId) => {
@@ -437,7 +463,7 @@ export async function executeGetBlockUpstreamReferences(
parallels[parallelId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId))
})
const accessibleBlocks: any[] = []
const accessibleBlocks: AccessibleBlockEntry[] = []
for (const accessibleBlockId of accessibleIds) {
const block = blocks[accessibleBlockId]
@@ -462,14 +488,14 @@ export async function executeGetBlockUpstreamReferences(
}
const formattedOutputs = formatOutputsWithPrefix(outputPaths, blockName)
const entry: any = {
const entry: AccessibleBlockEntry = {
blockId: accessibleBlockId,
blockName,
blockType: block.type,
outputs: formattedOutputs,
...(block.triggerMode ? { triggerMode: true } : {}),
...(accessContext ? { accessContext } : {}),
}
if (block.triggerMode) entry.triggerMode = true
if (accessContext) entry.accessContext = accessContext
accessibleBlocks.push(entry)
}
@@ -499,10 +525,14 @@ async function getWorkflowVariablesForTool(
.where(eq(workflow.id, workflowId))
.limit(1)
const variablesRecord = (workflowRecord?.variables as Record<string, any>) || {}
const variablesRecord = (workflowRecord?.variables as Record<string, unknown>) || {}
return Object.values(variablesRecord)
.filter((v: any) => v?.name && String(v.name).trim() !== '')
.map((v: any) => ({
.filter((v): v is Record<string, unknown> => {
if (!v || typeof v !== 'object') return false
const variable = v as Record<string, unknown>
return !!variable.name && String(variable.name).trim() !== ''
})
.map((v) => ({
id: String(v.id || ''),
name: String(v.name || ''),
type: String(v.type || 'plain'),
@@ -513,8 +543,8 @@ async function getWorkflowVariablesForTool(
function getSubflowInsidePaths(
blockType: 'loop' | 'parallel',
blockId: string,
loops: Record<string, any>,
parallels: Record<string, any>
loops: Record<string, Loop>,
parallels: Record<string, Parallel>
): string[] {
const paths = ['index']
if (blockType === 'loop') {

View File

@@ -19,12 +19,24 @@ export type SSEEventType =
/**
 * A single server-sent event on the copilot stream. Which optional fields are
 * populated depends on `type` (see per-field notes).
 */
export interface SSEEvent {
/** Discriminates which optional payload fields below are present. */
type: SSEEventType
// NOTE(review): diff render shows both the old (`unknown`) and the new
// (`Record<string, unknown>`) declaration of `data`; only one should remain.
data?: unknown
data?: Record<string, unknown>
/** Name of the subagent that produced this event, if any — TODO confirm against emitter. */
subagent?: string
/** Presumably set on tool lifecycle events; identifies the tool call instance — verify. */
toolCallId?: string
/** Presumably set on tool lifecycle events; the tool's registered name — verify. */
toolName?: string
/** Presumably set on tool_result events; whether the call succeeded — verify. */
success?: boolean
/** Raw tool output; shape varies per tool, so it stays `unknown`. */
result?: unknown
/** Set on chat_id events */
chatId?: string
/** Set on title_updated events */
title?: string
/** Set on error events */
error?: string
/** Set on content/reasoning events */
content?: string
/** Set on reasoning events */
phase?: string
/** Set on tool_result events */
failedDependency?: boolean
}
export type ToolCallStatus = 'pending' | 'executing' | 'success' | 'error' | 'skipped' | 'rejected'

View File

@@ -1,3 +1,4 @@
import { createLogger } from '@sim/logger'
import { Loader2 } from 'lucide-react'
import {
ClientToolCallState,
@@ -6,6 +7,12 @@ import {
} from '@/lib/copilot/tools/client/tool-display-registry'
import type { CopilotStore } from '@/stores/panel/copilot/types'
const logger = createLogger('CopilotStoreUtils')
/**
 * Zustand-style setter accepted by copilot store helpers: takes either a
 * partial state object or an updater deriving one from the current state.
 */
type StoreSet = (
partial: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)
) => void
export function resolveToolDisplay(
toolName: string | undefined,
state: ClientToolCallState,
@@ -80,7 +87,7 @@ export function isTerminalState(state: string): boolean {
}
export function abortAllInProgressTools(
set: any,
set: StoreSet,
get: () => CopilotStore
) {
try {
@@ -89,7 +96,7 @@ export function abortAllInProgressTools(
const abortedIds = new Set<string>()
let hasUpdates = false
for (const [id, tc] of Object.entries(toolCallsById)) {
const st = tc.state as any
const st = tc.state
const isTerminal =
st === ClientToolCallState.success ||
st === ClientToolCallState.error ||
@@ -101,7 +108,7 @@ export function abortAllInProgressTools(
...tc,
state: ClientToolCallState.aborted,
subAgentStreaming: false,
display: resolveToolDisplay(tc.name, ClientToolCallState.aborted, id, (tc as any).params),
display: resolveToolDisplay(tc.name, ClientToolCallState.aborted, id, tc.params),
}
hasUpdates = true
} else if (tc.subAgentStreaming) {
@@ -117,7 +124,7 @@ export function abortAllInProgressTools(
set((s: CopilotStore) => {
const msgs = [...s.messages]
for (let mi = msgs.length - 1; mi >= 0; mi--) {
const m = msgs[mi] as any
const m = msgs[mi]
if (m.role !== 'assistant' || !Array.isArray(m.contentBlocks)) continue
let changed = false
const blocks = m.contentBlocks.map((b: any) => {
@@ -148,7 +155,33 @@ export function abortAllInProgressTools(
return { messages: msgs }
})
}
} catch {}
} catch (error) {
logger.warn('Failed to abort in-progress tools', {
error: error instanceof Error ? error.message : String(error),
})
}
}
/**
 * Best-effort teardown of copilot "active" state: aborts any in-flight tool
 * calls, then clears the workflow diff without restoring the baseline.
 * Failures while clearing the diff are logged as warnings and swallowed.
 */
export function cleanupActiveState(
set: (partial: Record<string, unknown>) => void,
get: () => Record<string, unknown>
): void {
// Mark all non-terminal tool calls in the copilot store as aborted.
abortAllInProgressTools(
set as unknown as StoreSet,
get as unknown as () => CopilotStore
)
try {
// Synchronous require — presumably to avoid a static import cycle with the
// diff store; TODO confirm that is the reason require() is used here.
const { useWorkflowDiffStore } = require('@/stores/workflow-diff/store') as {
useWorkflowDiffStore: {
getState: () => { clearDiff: (options?: { restoreBaseline?: boolean }) => void }
}
}
// Drop the proposed diff without reverting the canvas to the baseline state.
useWorkflowDiffStore.getState().clearDiff({ restoreBaseline: false })
} catch (error) {
logger.warn('Failed to clear diff during cleanup', {
error: error instanceof Error ? error.message : String(error),
})
}
}
export function stripTodoTags(text: string): string {

View File

@@ -288,7 +288,9 @@ export const getBlocksMetadataServerTool: BaseServerTool<
if (existsSync(docPath)) {
metadata.yamlDocumentation = readFileSync(docPath, 'utf-8')
}
} catch {}
} catch (error) {
logger.warn('Failed to read YAML documentation file', { error: error instanceof Error ? error.message : String(error) })
}
if (metadata) {
result[blockId] = removeNullish(metadata) as CopilotBlockMetadata
@@ -951,7 +953,10 @@ function resolveToolIdForOperation(blockConfig: BlockConfig, opId: string): stri
const maybeToolId = toolSelector({ operation: opId })
if (typeof maybeToolId === 'string') return maybeToolId
}
} catch {}
} catch (error) {
const toolLogger = createLogger('GetBlocksMetadataServerTool')
toolLogger.warn('Failed to resolve tool ID for operation', { error: error instanceof Error ? error.message : String(error) })
}
return undefined
}

View File

@@ -89,7 +89,9 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
try {
const decoded = jwtDecode<{ email?: string; name?: string }>(acc.idToken)
displayName = decoded.email || decoded.name || ''
} catch {}
} catch (error) {
logger.warn('Failed to decode JWT id token', { error: error instanceof Error ? error.message : String(error) })
}
}
if (!displayName && baseProvider === 'github') displayName = `${acc.accountId} (GitHub)`
if (!displayName && userEmail) displayName = userEmail
@@ -107,7 +109,9 @@ export const getCredentialsServerTool: BaseServerTool<GetCredentialsParams, any>
acc.id
)
accessToken = refreshedToken || accessToken
} catch {}
} catch (error) {
logger.warn('Failed to refresh OAuth access token', { error: error instanceof Error ? error.message : String(error) })
}
connectedCredentials.push({
id: acc.id,
name: displayName,

View File

@@ -4,6 +4,8 @@ import { createLogger } from '@sim/logger'
import { desc, eq } from 'drizzle-orm'
import type { BaseServerTool } from '@/lib/copilot/tools/server/base-tool'
const logger = createLogger('GetWorkflowConsoleServerTool')
interface GetWorkflowConsoleArgs {
workflowId: string
limit?: number
@@ -87,7 +89,9 @@ function normalizeErrorMessage(errorValue: unknown): string | undefined {
if (typeof errorValue === 'object') {
try {
return JSON.stringify(errorValue)
} catch {}
} catch (error) {
logger.warn('Failed to stringify error value', { error: error instanceof Error ? error.message : String(error) })
}
}
try {
return String(errorValue)
@@ -217,7 +221,6 @@ function deriveExecutionErrorSummary(params: {
export const getWorkflowConsoleServerTool: BaseServerTool<GetWorkflowConsoleArgs, any> = {
name: 'get_workflow_console',
async execute(rawArgs: GetWorkflowConsoleArgs): Promise<any> {
const logger = createLogger('GetWorkflowConsoleServerTool')
const {
workflowId,
limit = 2,

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@ import type { CopilotMode, CopilotModelId } from '@/lib/copilot/models'
export type { CopilotMode, CopilotModelId } from '@/lib/copilot/models'
import type { ClientContentBlock } from '@/lib/copilot/client-sse/types'
import type { ClientToolCallState, ClientToolDisplay } from '@/lib/copilot/tools/client/base-tool'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
@@ -21,7 +22,8 @@ export interface CopilotToolCall {
id: string
name: string
state: ClientToolCallState
params?: Record<string, any>
params?: Record<string, unknown>
input?: Record<string, unknown>
display?: ClientToolDisplay
/** Content streamed from a subagent (e.g., debug agent) */
subAgentContent?: string
@@ -62,18 +64,7 @@ export interface CopilotMessage {
timestamp: string
citations?: { id: number; title: string; url: string; similarity?: number }[]
toolCalls?: CopilotToolCall[]
contentBlocks?: Array<
| { type: 'text'; content: string; timestamp: number }
| {
type: 'thinking'
content: string
timestamp: number
duration?: number
startTime?: number
}
| { type: 'tool_call'; toolCall: CopilotToolCall; timestamp: number }
| { type: 'contexts'; contexts: ChatContext[]; timestamp: number }
>
contentBlocks?: ClientContentBlock[]
fileAttachments?: MessageFileAttachment[]
contexts?: ChatContext[]
errorType?: 'usage_limit' | 'unauthorized' | 'forbidden' | 'rate_limit' | 'upgrade_required'
@@ -110,6 +101,16 @@ import type { CopilotChat as ApiCopilotChat } from '@/lib/copilot/api'
export type CopilotChat = ApiCopilotChat
/**
* A checkpoint entry as returned from the checkpoints API.
*/
export interface CheckpointEntry {
/** Unique checkpoint id. */
id: string
/** Chat message this checkpoint is associated with, when known. */
messageId?: string
/** Serialized workflow state captured at checkpoint time; shape is not validated here. */
workflowState?: Record<string, unknown>
/** Creation timestamp — presumably an ISO-8601 string; confirm against the checkpoints API. */
createdAt?: string
}
export interface CopilotState {
mode: CopilotMode
selectedModel: CopilotModelId
@@ -122,7 +123,7 @@ export interface CopilotState {
messages: CopilotMessage[]
workflowId: string | null
messageCheckpoints: Record<string, any[]>
messageCheckpoints: Record<string, CheckpointEntry[]>
messageSnapshots: Record<string, WorkflowState>
isLoading: boolean
@@ -210,11 +211,11 @@ export interface CopilotActions {
toolCallId?: string
) => void
resumeActiveStream: () => Promise<boolean>
setToolCallState: (toolCall: any, newState: ClientToolCallState, options?: any) => void
updateToolCallParams: (toolCallId: string, params: Record<string, any>) => void
setToolCallState: (toolCall: CopilotToolCall, newState: ClientToolCallState | string) => void
updateToolCallParams: (toolCallId: string, params: Record<string, unknown>) => void
loadMessageCheckpoints: (chatId: string) => Promise<void>
revertToCheckpoint: (checkpointId: string) => Promise<void>
getCheckpointsForMessage: (messageId: string) => any[]
getCheckpointsForMessage: (messageId: string) => CheckpointEntry[]
saveMessageCheckpoint: (messageId: string) => Promise<boolean>
clearMessages: () => void

View File

@@ -1,12 +1,7 @@
import { createLogger } from '@sim/logger'
declare global {
interface Window {
__skipDiffRecording?: boolean
}
}
import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { COPILOT_STATS_API_PATH } from '@/lib/copilot/constants'
import { stripWorkflowDiffMarkers, WorkflowDiffEngine } from '@/lib/workflows/diff'
import { enqueueReplaceWorkflowState } from '@/lib/workflows/operations/socket-operations'
import { validateWorkflowState } from '@/lib/workflows/sanitization/validation'
@@ -82,7 +77,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
_triggerMessageId: null,
_batchedStateUpdate: batchedUpdate,
setProposedChanges: async (proposedState, diffAnalysis) => {
setProposedChanges: async (proposedState, diffAnalysis, options) => {
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) {
logger.error('Cannot apply diff without an active workflow')
@@ -212,7 +207,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
// Emit event for undo/redo recording
if (!window.__skipDiffRecording) {
if (!options?.skipRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
@@ -257,7 +252,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
batchedUpdate({ isShowingDiff: !isShowingDiff })
},
acceptChanges: async () => {
acceptChanges: async (options) => {
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
if (!activeWorkflowId) {
logger.error('No active workflow ID found when accepting diff')
@@ -307,7 +302,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
applyWorkflowStateToStores(activeWorkflowId, stateToApply)
// Emit event for undo/redo recording (unless we're in an undo/redo operation)
if (!window.__skipDiffRecording) {
if (!options?.skipRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
@@ -323,7 +318,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
// Background operations (fire-and-forget) - don't block
if (triggerMessageId) {
fetch('/api/copilot/stats', {
fetch(COPILOT_STATS_API_PATH, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -331,7 +326,12 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
diffCreated: true,
diffAccepted: true,
}),
}).catch(() => {})
}).catch((error) => {
logger.warn('Failed to send diff-accepted stats', {
error: error instanceof Error ? error.message : String(error),
messageId: triggerMessageId,
})
})
}
findLatestEditWorkflowToolCallId().then((toolCallId) => {
@@ -347,7 +347,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
},
rejectChanges: async () => {
rejectChanges: async (options) => {
const { baselineWorkflow, baselineWorkflowId, _triggerMessageId, diffAnalysis } = get()
const activeWorkflowId = useWorkflowRegistry.getState().activeWorkflowId
@@ -389,7 +389,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
applyWorkflowStateToStores(baselineWorkflowId, baselineWorkflow)
// Emit event for undo/redo recording synchronously
if (!window.__skipDiffRecording) {
if (!options?.skipRecording) {
window.dispatchEvent(
new CustomEvent('record-diff-operation', {
detail: {
@@ -423,7 +423,7 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
})
if (_triggerMessageId) {
fetch('/api/copilot/stats', {
fetch(COPILOT_STATS_API_PATH, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
@@ -431,7 +431,12 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
diffCreated: true,
diffAccepted: false,
}),
}).catch(() => {})
}).catch((error) => {
logger.warn('Failed to send diff-rejected stats', {
error: error instanceof Error ? error.message : String(error),
messageId: _triggerMessageId,
})
})
}
findLatestEditWorkflowToolCallId().then((toolCallId) => {
@@ -460,11 +465,13 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
const needsUpdate =
diffAnalysis.new_blocks?.some((blockId) => {
const block = currentBlocks[blockId]
return block && (block as any).is_diff !== 'new'
const blockDiffState = (block as { is_diff?: string } | undefined)?.is_diff
return block && blockDiffState !== 'new'
}) ||
diffAnalysis.edited_blocks?.some((blockId) => {
const block = currentBlocks[blockId]
return block && (block as any).is_diff !== 'edited'
const blockDiffState = (block as { is_diff?: string } | undefined)?.is_diff
return block && blockDiffState !== 'edited'
})
if (!needsUpdate) {
@@ -478,11 +485,12 @@ export const useWorkflowDiffStore = create<WorkflowDiffState & WorkflowDiffActio
Object.entries(currentBlocks).forEach(([blockId, block]) => {
const isNewBlock = diffAnalysis.new_blocks?.includes(blockId)
const isEditedBlock = diffAnalysis.edited_blocks?.includes(blockId)
const blockDiffState = (block as { is_diff?: string } | undefined)?.is_diff
if (isNewBlock && (block as any).is_diff !== 'new') {
if (isNewBlock && blockDiffState !== 'new') {
updatedBlocks[blockId] = { ...block, is_diff: 'new' }
hasChanges = true
} else if (isEditedBlock && (block as any).is_diff !== 'edited') {
} else if (isEditedBlock && blockDiffState !== 'edited') {
updatedBlocks[blockId] = { ...block, is_diff: 'edited' }
// Re-apply field_diffs if available

View File

@@ -13,12 +13,17 @@ export interface WorkflowDiffState {
_triggerMessageId?: string | null
}
export interface DiffActionOptions {
/** Skip recording this operation for undo/redo. Used during undo/redo replay. */
skipRecording?: boolean
}
/**
 * Actions exposed by the workflow diff store.
 * NOTE(review): this span comes from a diff render — both the old and the new
 * signatures of setProposedChanges/acceptChanges/rejectChanges appear below;
 * only the `options?: DiffActionOptions` variants should remain.
 */
export interface WorkflowDiffActions {
setProposedChanges: (workflowState: WorkflowState, diffAnalysis?: DiffAnalysis) => Promise<void>
setProposedChanges: (workflowState: WorkflowState, diffAnalysis?: DiffAnalysis, options?: DiffActionOptions) => Promise<void>
/** Clears the proposed diff; optionally restores the pre-diff baseline. */
clearDiff: (options?: { restoreBaseline?: boolean }) => void
toggleDiffView: () => void
acceptChanges: () => Promise<void>
rejectChanges: () => Promise<void>
acceptChanges: (options?: DiffActionOptions) => Promise<void>
rejectChanges: (options?: DiffActionOptions) => Promise<void>
/** Re-applies `is_diff` markers ('new'/'edited') to blocks after state changes. */
reapplyDiffMarkers: () => void
/** Internal debounced batch updater for diff store state. */
_batchedStateUpdate: (updates: Partial<WorkflowDiffState>) => void
}

View File

@@ -26,7 +26,7 @@ export function extractSubBlockValues(
Object.entries(workflowState.blocks || {}).forEach(([blockId, block]) => {
values[blockId] = {}
Object.entries(block.subBlocks || {}).forEach(([subBlockId, subBlock]) => {
values[blockId][subBlockId] = (subBlock as any)?.value ?? null
values[blockId][subBlockId] = subBlock?.value ?? null
})
})
return values
@@ -37,7 +37,7 @@ export function applyWorkflowStateToStores(
workflowState: WorkflowState,
options?: { updateLastSaved?: boolean }
) {
logger.info('[applyWorkflowStateToStores] Applying state', {
logger.debug('[applyWorkflowStateToStores] Applying state', {
workflowId,
blockCount: Object.keys(workflowState.blocks || {}).length,
edgeCount: workflowState.edges?.length ?? 0,
@@ -45,7 +45,7 @@ export function applyWorkflowStateToStores(
})
const workflowStore = useWorkflowStore.getState()
const cloned = cloneWorkflowState(workflowState)
logger.info('[applyWorkflowStateToStores] Cloned state edges', {
logger.debug('[applyWorkflowStateToStores] Cloned state edges', {
clonedEdgeCount: cloned.edges?.length ?? 0,
})
workflowStore.replaceWorkflowState(cloned, options)
@@ -54,7 +54,8 @@ export function applyWorkflowStateToStores(
// Verify what's in the store after apply
const afterState = workflowStore.getWorkflowState()
logger.info('[applyWorkflowStateToStores] After apply', {
logger.info('[applyWorkflowStateToStores] Applied workflow state to stores', {
workflowId,
afterEdgeCount: afterState.edges?.length ?? 0,
})
}
@@ -107,7 +108,7 @@ export async function persistWorkflowStateToServer(
export async function getLatestUserMessageId(): Promise<string | null> {
try {
const { useCopilotStore } = await import('@/stores/panel/copilot/store')
const { messages } = useCopilotStore.getState() as any
const { messages } = useCopilotStore.getState()
if (!Array.isArray(messages) || messages.length === 0) {
return null
}
@@ -127,21 +128,19 @@ export async function getLatestUserMessageId(): Promise<string | null> {
export async function findLatestEditWorkflowToolCallId(): Promise<string | undefined> {
try {
const { useCopilotStore } = await import('@/stores/panel/copilot/store')
const { messages, toolCallsById } = useCopilotStore.getState() as any
const { messages, toolCallsById } = useCopilotStore.getState()
for (let mi = messages.length - 1; mi >= 0; mi--) {
const message = messages[mi]
if (message.role !== 'assistant' || !message.contentBlocks) continue
for (const block of message.contentBlocks as any[]) {
for (const block of message.contentBlocks) {
if (block?.type === 'tool_call' && block.toolCall?.name === 'edit_workflow') {
return block.toolCall?.id
}
}
}
const fallback = Object.values(toolCallsById).filter(
(call: any) => call.name === 'edit_workflow'
) as any[]
const fallback = Object.values(toolCallsById).filter((call) => call.name === 'edit_workflow')
return fallback.length ? fallback[fallback.length - 1].id : undefined
} catch (error) {
@@ -150,7 +149,7 @@ export async function findLatestEditWorkflowToolCallId(): Promise<string | undef
}
}
export function createBatchedUpdater(set: any) {
export function createBatchedUpdater(set: (updates: Partial<WorkflowDiffState>) => void) {
let updateTimer: NodeJS.Timeout | null = null
const UPDATE_DEBOUNCE_MS = 16
let pendingUpdates: Partial<WorkflowDiffState> = {}