Checkpoint

This commit is contained in:
Siddharth Ganesan
2026-02-05 11:20:58 -08:00
parent 99f920a8d1
commit 378b19abdf
18 changed files with 1969 additions and 1680 deletions

View File

@@ -1,16 +1,11 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import {
getToolCallIdFromEvent,
handleSubagentRouting,
markToolCallSeen,
markToolResultSeen,
normalizeSseEvent,
sseHandlers,
subAgentHandlers,
wasToolCallSeen,
wasToolResultSeen,
} from '@/lib/copilot/orchestrator/sse-handlers'
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
@@ -99,38 +94,9 @@ export async function orchestrateCopilotStream(
const normalizedEvent = normalizeSseEvent(event)
// Skip tool_result events for tools the sim-side already executed.
// The sim-side emits its own tool_result with complete data.
// For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
const toolCallId = getToolCallIdFromEvent(normalizedEvent)
const eventData = normalizedEvent.data
const isPartialToolCall =
normalizedEvent.type === 'tool_call' && eventData?.partial === true
const shouldSkipToolCall =
normalizedEvent.type === 'tool_call' &&
!!toolCallId &&
!isPartialToolCall &&
(wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
if (
normalizedEvent.type === 'tool_call' &&
toolCallId &&
!isPartialToolCall &&
!shouldSkipToolCall
) {
markToolCallSeen(toolCallId)
}
const shouldSkipToolResult =
normalizedEvent.type === 'tool_result' &&
(() => {
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
markToolResultSeen(toolCallId)
return false
})()
// Skip duplicate tool events to prevent state regressions.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(normalizedEvent, options)

View File

@@ -0,0 +1,95 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'
vi.mock('@sim/logger', () => loggerMock)
const executeToolServerSide = vi.fn()
const markToolComplete = vi.fn()
vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({
executeToolServerSide,
markToolComplete,
}))
import { sseHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/orchestrator/types'
describe('sse-handlers tool lifecycle', () => {
let context: StreamingContext
let execContext: ExecutionContext
beforeEach(() => {
vi.clearAllMocks()
context = {
chatId: undefined,
conversationId: undefined,
messageId: 'msg-1',
accumulatedContent: '',
contentBlocks: [],
toolCalls: new Map(),
currentThinkingBlock: null,
isInThinkingBlock: false,
subAgentParentToolCallId: undefined,
subAgentContent: {},
subAgentToolCalls: {},
pendingContent: '',
streamComplete: false,
wasAborted: false,
errors: [],
}
execContext = {
userId: 'user-1',
workflowId: 'workflow-1',
}
})
it('executes tool_call and emits tool_result + mark-complete', async () => {
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
markToolComplete.mockResolvedValueOnce(true)
const onEvent = vi.fn()
await sseHandlers.tool_call(
{
type: 'tool_call',
data: { id: 'tool-1', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
} as any,
context,
execContext,
{ onEvent, interactive: false, timeout: 1000 }
)
expect(executeToolServerSide).toHaveBeenCalledTimes(1)
expect(markToolComplete).toHaveBeenCalledTimes(1)
expect(onEvent).toHaveBeenCalledWith(
expect.objectContaining({
type: 'tool_result',
toolCallId: 'tool-1',
success: true,
})
)
const updated = context.toolCalls.get('tool-1')
expect(updated?.status).toBe('success')
expect(updated?.result?.output).toEqual({ ok: true })
})
it('skips duplicate tool_call after result', async () => {
executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
markToolComplete.mockResolvedValueOnce(true)
const event = {
type: 'tool_call',
data: { id: 'tool-dup', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
}
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
expect(executeToolServerSide).toHaveBeenCalledTimes(1)
expect(markToolComplete).toHaveBeenCalledTimes(1)
})
})

View File

@@ -1,6 +1,11 @@
import { createLogger } from '@sim/logger'
import { INTERRUPT_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
import {
getEventData,
markToolResultSeen,
wasToolResultSeen,
} from '@/lib/copilot/orchestrator/sse-utils'
import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
import type {
ContentBlock,
@@ -13,109 +18,7 @@ import type {
const logger = createLogger('CopilotSseHandlers')
/**
* Tracks tool call IDs for which a tool_call has already been forwarded/emitted (non-partial).
*/
const seenToolCalls = new Set<string>()
/**
* Tracks tool call IDs for which a tool_result has already been emitted or forwarded.
*/
const seenToolResults = new Set<string>()
export function markToolCallSeen(toolCallId: string): void {
seenToolCalls.add(toolCallId)
setTimeout(
() => {
seenToolCalls.delete(toolCallId)
},
5 * 60 * 1000
)
}
export function wasToolCallSeen(toolCallId: string): boolean {
return seenToolCalls.has(toolCallId)
}
type EventDataObject = Record<string, any> | undefined
const parseEventData = (data: unknown): EventDataObject => {
if (!data) return undefined
if (typeof data !== 'string') {
return data as EventDataObject
}
try {
return JSON.parse(data) as EventDataObject
} catch {
return undefined
}
}
const hasToolFields = (data: EventDataObject): boolean => {
if (!data) return false
return (
data.id !== undefined ||
data.toolCallId !== undefined ||
data.name !== undefined ||
data.success !== undefined ||
data.result !== undefined ||
data.arguments !== undefined
)
}
const getEventData = (event: SSEEvent): EventDataObject => {
const topLevel = parseEventData(event.data)
if (!topLevel) return undefined
if (hasToolFields(topLevel)) return topLevel
const nested = parseEventData(topLevel.data)
return nested || topLevel
}
export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
const data = getEventData(event)
return event.toolCallId || data?.id || data?.toolCallId
}
/** Normalizes SSE events so tool metadata is available at the top level. */
export function normalizeSseEvent(event: SSEEvent): SSEEvent {
if (!event) return event
const data = getEventData(event)
if (!data) return event
const toolCallId = event.toolCallId || data.id || data.toolCallId
const toolName = event.toolName || data.name || data.toolName
const success = event.success ?? data.success
const result = event.result ?? data.result
const normalizedData = typeof event.data === 'string' ? data : event.data
return {
...event,
data: normalizedData,
toolCallId,
toolName,
success,
result,
}
}
/**
* Mark a tool call as executed by the sim-side.
* This prevents the Go backend's duplicate tool_result from being forwarded.
*/
export function markToolResultSeen(toolCallId: string): void {
seenToolResults.add(toolCallId)
setTimeout(
() => {
seenToolResults.delete(toolCallId)
},
5 * 60 * 1000
)
}
/**
* Check if a tool call was executed by the sim-side.
*/
export function wasToolResultSeen(toolCallId: string): boolean {
return seenToolResults.has(toolCallId)
}
// Normalization + dedupe helpers live in sse-utils to keep server/client in sync.
/**
* Respond tools are internal to the copilot's subagent system.

View File

@@ -0,0 +1,43 @@
/**
* @vitest-environment node
*/
import { describe, expect, it } from 'vitest'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
describe('sse-utils', () => {
it.concurrent('normalizes tool fields from string data', () => {
const event = {
type: 'tool_result',
data: JSON.stringify({
id: 'tool_1',
name: 'edit_workflow',
success: true,
result: { ok: true },
}),
}
const normalized = normalizeSseEvent(event as any)
expect(normalized.toolCallId).toBe('tool_1')
expect(normalized.toolName).toBe('edit_workflow')
expect(normalized.success).toBe(true)
expect(normalized.result).toEqual({ ok: true })
})
it.concurrent('dedupes tool_call events', () => {
const event = { type: 'tool_call', data: { id: 'tool_call_1', name: 'plan' } }
expect(shouldSkipToolCallEvent(event as any)).toBe(false)
expect(shouldSkipToolCallEvent(event as any)).toBe(true)
})
it.concurrent('dedupes tool_result events', () => {
const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } }
expect(shouldSkipToolResultEvent(event as any)).toBe(false)
expect(shouldSkipToolResultEvent(event as any)).toBe(true)
})
})

View File

@@ -0,0 +1,119 @@
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'
type EventDataObject = Record<string, any> | undefined
const DEFAULT_TOOL_EVENT_TTL_MS = 5 * 60 * 1000
/**
* In-memory tool event dedupe.
*
* NOTE: These sets are process-local only. In a multi-instance setup (e.g., ECS),
* each task maintains its own dedupe cache, so duplicates can still appear across tasks.
*/
const seenToolCalls = new Set<string>()
const seenToolResults = new Set<string>()
const parseEventData = (data: unknown): EventDataObject => {
if (!data) return undefined
if (typeof data !== 'string') {
return data as EventDataObject
}
try {
return JSON.parse(data) as EventDataObject
} catch {
return undefined
}
}
const hasToolFields = (data: EventDataObject): boolean => {
if (!data) return false
return (
data.id !== undefined ||
data.toolCallId !== undefined ||
data.name !== undefined ||
data.success !== undefined ||
data.result !== undefined ||
data.arguments !== undefined
)
}
export const getEventData = (event: SSEEvent): EventDataObject => {
const topLevel = parseEventData(event.data)
if (!topLevel) return undefined
if (hasToolFields(topLevel)) return topLevel
const nested = parseEventData(topLevel.data)
return nested || topLevel
}
export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
const data = getEventData(event)
return event.toolCallId || data?.id || data?.toolCallId
}
/** Normalizes SSE events so tool metadata is available at the top level. */
export function normalizeSseEvent(event: SSEEvent): SSEEvent {
if (!event) return event
const data = getEventData(event)
if (!data) return event
const toolCallId = event.toolCallId || data.id || data.toolCallId
const toolName = event.toolName || data.name || data.toolName
const success = event.success ?? data.success
const result = event.result ?? data.result
const normalizedData = typeof event.data === 'string' ? data : event.data
return {
...event,
data: normalizedData,
toolCallId,
toolName,
success,
result,
}
}
export function markToolCallSeen(toolCallId: string, ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS): void {
seenToolCalls.add(toolCallId)
setTimeout(() => {
seenToolCalls.delete(toolCallId)
}, ttlMs)
}
export function wasToolCallSeen(toolCallId: string): boolean {
return seenToolCalls.has(toolCallId)
}
export function markToolResultSeen(
toolCallId: string,
ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS
): void {
seenToolResults.add(toolCallId)
setTimeout(() => {
seenToolResults.delete(toolCallId)
}, ttlMs)
}
export function wasToolResultSeen(toolCallId: string): boolean {
return seenToolResults.has(toolCallId)
}
export function shouldSkipToolCallEvent(event: SSEEvent): boolean {
if (event.type !== 'tool_call') return false
const toolCallId = getToolCallIdFromEvent(event)
if (!toolCallId) return false
const eventData = getEventData(event)
if (eventData?.partial === true) return false
if (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) {
return true
}
markToolCallSeen(toolCallId)
return false
}
export function shouldSkipToolResultEvent(event: SSEEvent): boolean {
if (event.type !== 'tool_result') return false
const toolCallId = getToolCallIdFromEvent(event)
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
markToolResultSeen(toolCallId)
return false
}

View File

@@ -0,0 +1,118 @@
/**
* @vitest-environment node
*/
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'
vi.mock('@sim/logger', () => loggerMock)
type StoredEntry = { score: number; value: string }
const createRedisStub = () => {
const events = new Map<string, StoredEntry[]>()
const counters = new Map<string, number>()
const readEntries = (key: string, min: number, max: number) => {
const list = events.get(key) || []
return list
.filter((entry) => entry.score >= min && entry.score <= max)
.sort((a, b) => a.score - b.score)
.map((entry) => entry.value)
}
return {
del: vi.fn().mockResolvedValue(1),
hset: vi.fn().mockResolvedValue(1),
hgetall: vi.fn().mockResolvedValue({}),
expire: vi.fn().mockResolvedValue(1),
eval: vi.fn().mockImplementation(
(
_lua: string,
_keysCount: number,
seqKey: string,
eventsKey: string,
_ttl: number,
_limit: number,
streamId: string,
eventJson: string
) => {
const current = counters.get(seqKey) || 0
const next = current + 1
counters.set(seqKey, next)
const entry = JSON.stringify({ eventId: next, streamId, event: JSON.parse(eventJson) })
const list = events.get(eventsKey) || []
list.push({ score: next, value: entry })
events.set(eventsKey, list)
return next
}
),
incrby: vi.fn().mockImplementation((key: string, amount: number) => {
const current = counters.get(key) || 0
const next = current + amount
counters.set(key, next)
return next
}),
zrangebyscore: vi.fn().mockImplementation((key: string, min: string, max: string) => {
const minVal = Number(min)
const maxVal = max === '+inf' ? Number.POSITIVE_INFINITY : Number(max)
return Promise.resolve(readEntries(key, minVal, maxVal))
}),
pipeline: vi.fn().mockImplementation(() => {
const api = {
zadd: vi.fn().mockImplementation((key: string, ...args: Array<string | number>) => {
const list = events.get(key) || []
for (let i = 0; i < args.length; i += 2) {
list.push({ score: Number(args[i]), value: String(args[i + 1]) })
}
events.set(key, list)
return api
}),
expire: vi.fn().mockReturnValue(api),
zremrangebyrank: vi.fn().mockReturnValue(api),
exec: vi.fn().mockResolvedValue([]),
}
return api
}),
}
}
let mockRedis: ReturnType<typeof createRedisStub>
vi.mock('@/lib/core/config/redis', () => ({
getRedisClient: () => mockRedis,
}))
import {
appendStreamEvent,
createStreamEventWriter,
readStreamEvents,
} from '@/lib/copilot/orchestrator/stream-buffer'
describe('stream-buffer', () => {
beforeEach(() => {
mockRedis = createRedisStub()
vi.clearAllMocks()
})
it.concurrent('replays events after a given event id', async () => {
await appendStreamEvent('stream-1', { type: 'content', data: 'hello' })
await appendStreamEvent('stream-1', { type: 'content', data: 'world' })
const allEvents = await readStreamEvents('stream-1', 0)
expect(allEvents.map((entry) => entry.event.data)).toEqual(['hello', 'world'])
const replayed = await readStreamEvents('stream-1', 1)
expect(replayed.map((entry) => entry.event.data)).toEqual(['world'])
})
it.concurrent('flushes buffered events for resume', async () => {
const writer = createStreamEventWriter('stream-2')
await writer.write({ type: 'content', data: 'a' })
await writer.write({ type: 'content', data: 'b' })
await writer.flush()
const events = await readStreamEvents('stream-2', 0)
expect(events.map((entry) => entry.event.data)).toEqual(['a', 'b'])
})
})

View File

@@ -1,13 +1,40 @@
import { createLogger } from '@sim/logger'
import { env } from '@/lib/core/config/env'
import { getRedisClient } from '@/lib/core/config/redis'
const logger = createLogger('CopilotStreamBuffer')
const STREAM_TTL_SECONDS = 60 * 60
const STREAM_EVENT_LIMIT = 5000
const STREAM_RESERVE_BATCH = 200
const STREAM_FLUSH_INTERVAL_MS = 15
const STREAM_FLUSH_MAX_BATCH = 200
const STREAM_DEFAULTS = {
ttlSeconds: 60 * 60,
eventLimit: 5000,
reserveBatch: 200,
flushIntervalMs: 15,
flushMaxBatch: 200,
}
export type StreamBufferConfig = {
ttlSeconds: number
eventLimit: number
reserveBatch: number
flushIntervalMs: number
flushMaxBatch: number
}
const parseNumber = (value: number | string | undefined, fallback: number): number => {
if (typeof value === 'number' && Number.isFinite(value)) return value
const parsed = Number(value)
return Number.isFinite(parsed) ? parsed : fallback
}
export function getStreamBufferConfig(): StreamBufferConfig {
return {
ttlSeconds: parseNumber(env.COPILOT_STREAM_TTL_SECONDS, STREAM_DEFAULTS.ttlSeconds),
eventLimit: parseNumber(env.COPILOT_STREAM_EVENT_LIMIT, STREAM_DEFAULTS.eventLimit),
reserveBatch: parseNumber(env.COPILOT_STREAM_RESERVE_BATCH, STREAM_DEFAULTS.reserveBatch),
flushIntervalMs: parseNumber(env.COPILOT_STREAM_FLUSH_INTERVAL_MS, STREAM_DEFAULTS.flushIntervalMs),
flushMaxBatch: parseNumber(env.COPILOT_STREAM_FLUSH_MAX_BATCH, STREAM_DEFAULTS.flushMaxBatch),
}
}
const APPEND_STREAM_EVENT_LUA = `
local seqKey = KEYS[1]
@@ -82,6 +109,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise
const redis = getRedisClient()
if (!redis) return
try {
const config = getStreamBufferConfig()
const payload: Record<string, string> = {
status: meta.status,
updatedAt: meta.updatedAt || new Date().toISOString(),
@@ -89,7 +117,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise
if (meta.userId) payload.userId = meta.userId
if (meta.error) payload.error = meta.error
await redis.hset(getMetaKey(streamId), payload)
await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS)
await redis.expire(getMetaKey(streamId), config.ttlSeconds)
} catch (error) {
logger.warn('Failed to update stream meta', {
streamId,
@@ -124,14 +152,15 @@ export async function appendStreamEvent(
}
try {
const config = getStreamBufferConfig()
const eventJson = JSON.stringify(event)
const nextId = await redis.eval(
APPEND_STREAM_EVENT_LUA,
2,
getSeqKey(streamId),
getEventsKey(streamId),
STREAM_TTL_SECONDS,
STREAM_EVENT_LIMIT,
config.ttlSeconds,
config.eventLimit,
streamId,
eventJson
)
@@ -156,6 +185,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
}
}
const config = getStreamBufferConfig()
let pending: StreamEventEntry[] = []
let nextEventId = 0
let maxReservedId = 0
@@ -167,11 +197,11 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
flushTimer = setTimeout(() => {
flushTimer = null
void flush()
}, STREAM_FLUSH_INTERVAL_MS)
}, config.flushIntervalMs)
}
const reserveIds = async (minCount: number) => {
const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount)
const reserveCount = Math.max(config.reserveBatch, minCount)
const newMax = await redis.incrby(getSeqKey(streamId), reserveCount)
const startId = newMax - reserveCount + 1
if (nextEventId === 0 || nextEventId > maxReservedId) {
@@ -193,9 +223,9 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
}
const pipeline = redis.pipeline()
pipeline.zadd(key, ...(zaddArgs as any))
pipeline.expire(key, STREAM_TTL_SECONDS)
pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1)
pipeline.expire(key, config.ttlSeconds)
pipeline.expire(getSeqKey(streamId), config.ttlSeconds)
pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
await pipeline.exec()
} catch (error) {
logger.warn('Failed to flush stream events', {
@@ -216,7 +246,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
const eventId = nextEventId++
const entry: StreamEventEntry = { eventId, streamId, event }
pending.push(entry)
if (pending.length >= STREAM_FLUSH_MAX_BATCH) {
if (pending.length >= config.flushMaxBatch) {
await flush()
} else {
scheduleFlush()

View File

@@ -1,16 +1,11 @@
import { createLogger } from '@sim/logger'
import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import {
getToolCallIdFromEvent,
handleSubagentRouting,
markToolCallSeen,
markToolResultSeen,
normalizeSseEvent,
sseHandlers,
subAgentHandlers,
wasToolCallSeen,
wasToolResultSeen,
} from '@/lib/copilot/orchestrator/sse-handlers'
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
import type {
@@ -115,38 +110,9 @@ export async function orchestrateSubagentStream(
const normalizedEvent = normalizeSseEvent(event)
// Skip tool_result events for tools the sim-side already executed.
// The sim-side emits its own tool_result with complete data.
// For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
const toolCallId = getToolCallIdFromEvent(normalizedEvent)
const eventData = normalizedEvent.data
const isPartialToolCall =
normalizedEvent.type === 'tool_call' && eventData?.partial === true
const shouldSkipToolCall =
normalizedEvent.type === 'tool_call' &&
!!toolCallId &&
!isPartialToolCall &&
(wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
if (
normalizedEvent.type === 'tool_call' &&
toolCallId &&
!isPartialToolCall &&
!shouldSkipToolCall
) {
markToolCallSeen(toolCallId)
}
const shouldSkipToolResult =
normalizedEvent.type === 'tool_result' &&
(() => {
if (!toolCallId) return false
if (wasToolResultSeen(toolCallId)) return true
markToolResultSeen(toolCallId)
return false
})()
// Skip duplicate tool events to prevent state regressions.
const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
if (!shouldSkipToolCall && !shouldSkipToolResult) {
await forwardEvent(normalizedEvent, options)

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,130 @@
import { db } from '@sim/db'
import { permissions, workflow, workspace } from '@sim/db/schema'
import { and, asc, desc, eq, inArray, or } from 'drizzle-orm'
type WorkflowRecord = typeof workflow.$inferSelect
export async function ensureWorkflowAccess(
workflowId: string,
userId: string
): Promise<{
workflow: WorkflowRecord
workspaceId?: string | null
}> {
const [workflowRecord] = await db
.select()
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
if (!workflowRecord) {
throw new Error(`Workflow ${workflowId} not found`)
}
if (workflowRecord.userId === userId) {
return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId }
}
if (workflowRecord.workspaceId) {
const [permissionRow] = await db
.select({ permissionType: permissions.permissionType })
.from(permissions)
.where(
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workflowRecord.workspaceId),
eq(permissions.userId, userId)
)
)
.limit(1)
if (permissionRow) {
return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId }
}
}
throw new Error('Unauthorized workflow access')
}
export async function getDefaultWorkspaceId(userId: string): Promise<string> {
const workspaces = await db
.select({ workspaceId: workspace.id })
.from(permissions)
.innerJoin(workspace, eq(permissions.entityId, workspace.id))
.where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
.orderBy(desc(workspace.createdAt))
.limit(1)
const workspaceId = workspaces[0]?.workspaceId
if (!workspaceId) {
throw new Error('No workspace found for user')
}
return workspaceId
}
export async function ensureWorkspaceAccess(
workspaceId: string,
userId: string,
requireWrite: boolean
): Promise<void> {
const [row] = await db
.select({
permissionType: permissions.permissionType,
ownerId: workspace.ownerId,
})
.from(permissions)
.innerJoin(workspace, eq(permissions.entityId, workspace.id))
.where(
and(
eq(permissions.entityType, 'workspace'),
eq(permissions.entityId, workspaceId),
eq(permissions.userId, userId)
)
)
.limit(1)
if (!row) {
throw new Error(`Workspace ${workspaceId} not found`)
}
const isOwner = row.ownerId === userId
const permissionType = row.permissionType
const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write'
if (requireWrite && !canWrite) {
throw new Error('Write or admin access required for this workspace')
}
if (!requireWrite && !canWrite && permissionType !== 'read') {
throw new Error('Access denied to workspace')
}
}
export async function getAccessibleWorkflowsForUser(
userId: string,
options?: { workspaceId?: string; folderId?: string }
) {
const workspaceIds = await db
.select({ entityId: permissions.entityId })
.from(permissions)
.where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
const workspaceIdList = workspaceIds.map((row) => row.entityId)
const workflowConditions = [eq(workflow.userId, userId)]
if (workspaceIdList.length > 0) {
workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList))
}
if (options?.workspaceId) {
workflowConditions.push(eq(workflow.workspaceId, options.workspaceId))
}
if (options?.folderId) {
workflowConditions.push(eq(workflow.folderId, options.folderId))
}
return db
.select()
.from(workflow)
.where(or(...workflowConditions))
.orderBy(asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id))
}

View File

@@ -0,0 +1,479 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { chat, workflow, workflowMcpServer, workflowMcpTool } from '@sim/db/schema'
import { and, eq, inArray } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils'
import { ensureWorkflowAccess } from './access'
export async function executeDeployApi(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const action = params.action === 'undeploy' ? 'undeploy' : 'deploy'
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
if (action === 'undeploy') {
const result = await undeployWorkflow({ workflowId })
if (!result.success) {
return { success: false, error: result.error || 'Failed to undeploy workflow' }
}
return { success: true, output: { workflowId, isDeployed: false } }
}
const result = await deployWorkflow({
workflowId,
deployedBy: context.userId,
workflowName: workflowRecord.name || undefined,
})
if (!result.success) {
return { success: false, error: result.error || 'Failed to deploy workflow' }
}
return {
success: true,
output: {
workflowId,
isDeployed: true,
deployedAt: result.deployedAt,
version: result.version,
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeDeployChat(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const action = params.action === 'undeploy' ? 'undeploy' : 'deploy'
if (action === 'undeploy') {
const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1)
if (!existing.length) {
return { success: false, error: 'No active chat deployment found for this workflow' }
}
const { hasAccess } = await checkChatAccess(existing[0].id, context.userId)
if (!hasAccess) {
return { success: false, error: 'Unauthorized chat access' }
}
await db.delete(chat).where(eq(chat.id, existing[0].id))
return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
}
const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
if (!hasAccess) {
return { success: false, error: 'Workflow not found or access denied' }
}
const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1)
const existingDeployment = existing[0] || null
const identifier = String(params.identifier || existingDeployment?.identifier || '').trim()
const title = String(params.title || existingDeployment?.title || '').trim()
if (!identifier || !title) {
return { success: false, error: 'Chat identifier and title are required' }
}
const identifierPattern = /^[a-z0-9-]+$/
if (!identifierPattern.test(identifier)) {
return {
success: false,
error: 'Identifier can only contain lowercase letters, numbers, and hyphens',
}
}
const existingIdentifier = await db
.select()
.from(chat)
.where(eq(chat.identifier, identifier))
.limit(1)
if (existingIdentifier.length > 0 && existingIdentifier[0].id !== existingDeployment?.id) {
return { success: false, error: 'Identifier already in use' }
}
const deployResult = await deployWorkflow({
workflowId,
deployedBy: context.userId,
})
if (!deployResult.success) {
return { success: false, error: deployResult.error || 'Failed to deploy workflow' }
}
const payload = {
workflowId,
identifier,
title,
description: String(params.description || existingDeployment?.description || ''),
customizations: {
primaryColor:
params.customizations?.primaryColor ||
existingDeployment?.customizations?.primaryColor ||
'var(--brand-primary-hover-hex)',
welcomeMessage:
params.customizations?.welcomeMessage ||
existingDeployment?.customizations?.welcomeMessage ||
'Hi there! How can I help you today?',
},
authType: params.authType || existingDeployment?.authType || 'public',
password: params.password,
allowedEmails: params.allowedEmails || existingDeployment?.allowedEmails || [],
outputConfigs: params.outputConfigs || existingDeployment?.outputConfigs || [],
}
if (existingDeployment) {
await db
.update(chat)
.set({
identifier: payload.identifier,
title: payload.title,
description: payload.description,
customizations: payload.customizations,
authType: payload.authType,
password: payload.password || existingDeployment.password,
allowedEmails:
payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [],
outputConfigs: payload.outputConfigs,
updatedAt: new Date(),
})
.where(eq(chat.id, existingDeployment.id))
} else {
await db.insert(chat).values({
id: crypto.randomUUID(),
workflowId,
userId: context.userId,
identifier: payload.identifier,
title: payload.title,
description: payload.description,
customizations: payload.customizations,
isActive: true,
authType: payload.authType,
password: payload.password || null,
allowedEmails:
payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [],
outputConfigs: payload.outputConfigs,
createdAt: new Date(),
updatedAt: new Date(),
})
}
return {
success: true,
output: { success: true, action: 'deploy', isDeployed: true, identifier },
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeDeployMcp(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
if (!workflowRecord.isDeployed) {
return {
success: false,
error: 'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.',
}
}
const serverId = params.serverId
if (!serverId) {
return {
success: false,
error: 'serverId is required. Use list_workspace_mcp_servers to get available servers.',
}
}
const existingTool = await db
.select()
.from(workflowMcpTool)
.where(and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId)))
.limit(1)
const toolName = sanitizeToolName(
params.toolName || workflowRecord.name || `workflow_${workflowId}`
)
const toolDescription =
params.toolDescription || workflowRecord.description || `Execute ${workflowRecord.name} workflow`
const parameterSchema = params.parameterSchema || {}
if (existingTool.length > 0) {
const toolId = existingTool[0].id
await db
.update(workflowMcpTool)
.set({
toolName,
toolDescription,
parameterSchema,
updatedAt: new Date(),
})
.where(eq(workflowMcpTool.id, toolId))
return { success: true, output: { toolId, toolName, toolDescription, updated: true } }
}
const toolId = crypto.randomUUID()
await db.insert(workflowMcpTool).values({
id: toolId,
serverId,
workflowId,
toolName,
toolDescription,
parameterSchema,
createdAt: new Date(),
updatedAt: new Date(),
})
return { success: true, output: { toolId, toolName, toolDescription, updated: false } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeRedeploy(context: ExecutionContext): Promise<ToolCallResult> {
try {
const workflowId = context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
await ensureWorkflowAccess(workflowId, context.userId)
const result = await deployWorkflow({ workflowId, deployedBy: context.userId })
if (!result.success) {
return { success: false, error: result.error || 'Failed to redeploy workflow' }
}
return {
success: true,
output: { workflowId, deployedAt: result.deployedAt || null, version: result.version },
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeCheckDeploymentStatus(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const workspaceId = workflowRecord.workspaceId
const [apiDeploy, chatDeploy] = await Promise.all([
db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1),
db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1),
])
const isApiDeployed = apiDeploy[0]?.isDeployed || false
const apiDetails = {
isDeployed: isApiDeployed,
deployedAt: apiDeploy[0]?.deployedAt || null,
endpoint: isApiDeployed ? `/api/workflows/${workflowId}/execute` : null,
apiKey: workflowRecord.workspaceId ? 'Workspace API keys' : 'Personal API keys',
needsRedeployment: false,
}
const isChatDeployed = !!chatDeploy[0]
const chatDetails = {
isDeployed: isChatDeployed,
chatId: chatDeploy[0]?.id || null,
identifier: chatDeploy[0]?.identifier || null,
chatUrl: isChatDeployed ? `/chat/${chatDeploy[0]?.identifier}` : null,
title: chatDeploy[0]?.title || null,
description: chatDeploy[0]?.description || null,
authType: chatDeploy[0]?.authType || null,
allowedEmails: chatDeploy[0]?.allowedEmails || null,
outputConfigs: chatDeploy[0]?.outputConfigs || null,
welcomeMessage: chatDeploy[0]?.customizations?.welcomeMessage || null,
primaryColor: chatDeploy[0]?.customizations?.primaryColor || null,
hasPassword: Boolean(chatDeploy[0]?.password),
}
const mcpDetails = { isDeployed: false, servers: [] as any[] }
if (workspaceId) {
const servers = await db
.select({
serverId: workflowMcpServer.id,
serverName: workflowMcpServer.name,
toolName: workflowMcpTool.toolName,
toolDescription: workflowMcpTool.toolDescription,
parameterSchema: workflowMcpTool.parameterSchema,
toolId: workflowMcpTool.id,
})
.from(workflowMcpTool)
.innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id))
.where(eq(workflowMcpTool.workflowId, workflowId))
if (servers.length > 0) {
mcpDetails.isDeployed = true
mcpDetails.servers = servers
}
}
const isDeployed = apiDetails.isDeployed || chatDetails.isDeployed || mcpDetails.isDeployed
return {
success: true,
output: { isDeployed, api: apiDetails, chat: chatDetails, mcp: mcpDetails },
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeListWorkspaceMcpServers(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
const servers = await db
.select({
id: workflowMcpServer.id,
name: workflowMcpServer.name,
description: workflowMcpServer.description,
})
.from(workflowMcpServer)
.where(eq(workflowMcpServer.workspaceId, workspaceId))
const serverIds = servers.map((server) => server.id)
const tools =
serverIds.length > 0
? await db
.select({
serverId: workflowMcpTool.serverId,
toolName: workflowMcpTool.toolName,
})
.from(workflowMcpTool)
.where(inArray(workflowMcpTool.serverId, serverIds))
: []
const toolNamesByServer: Record<string, string[]> = {}
for (const tool of tools) {
if (!toolNamesByServer[tool.serverId]) {
toolNamesByServer[tool.serverId] = []
}
toolNamesByServer[tool.serverId].push(tool.toolName)
}
const serversWithToolNames = servers.map((server) => ({
...server,
toolCount: toolNamesByServer[server.id]?.length || 0,
toolNames: toolNamesByServer[server.id] || [],
}))
return { success: true, output: { servers: serversWithToolNames, count: servers.length } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeCreateWorkspaceMcpServer(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const workspaceId = workflowRecord.workspaceId
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
const name = params.name?.trim()
if (!name) {
return { success: false, error: 'name is required' }
}
const serverId = crypto.randomUUID()
const [server] = await db
.insert(workflowMcpServer)
.values({
id: serverId,
workspaceId,
createdBy: context.userId,
name,
description: params.description?.trim() || null,
isPublic: params.isPublic ?? false,
createdAt: new Date(),
updatedAt: new Date(),
})
.returning()
const workflowIds: string[] = params.workflowIds || []
const addedTools: Array<{ workflowId: string; toolName: string }> = []
if (workflowIds.length > 0) {
const workflows = await db.select().from(workflow).where(inArray(workflow.id, workflowIds))
for (const wf of workflows) {
if (wf.workspaceId !== workspaceId || !wf.isDeployed) {
continue
}
const hasStartBlock = await hasValidStartBlock(wf.id)
if (!hasStartBlock) {
continue
}
const toolName = sanitizeToolName(wf.name || `workflow_${wf.id}`)
await db.insert(workflowMcpTool).values({
id: crypto.randomUUID(),
serverId,
workflowId: wf.id,
toolName,
toolDescription: wf.description || `Execute ${wf.name} workflow`,
parameterSchema: {},
createdAt: new Date(),
updatedAt: new Date(),
})
addedTools.push({ workflowId: wf.id, toolName })
}
}
return { success: true, output: { server, addedTools } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}

View File

@@ -0,0 +1,100 @@
import { db } from '@sim/db'
import { account, workflow } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import type {
ExecutionContext,
ToolCallResult,
ToolCallState,
} from '@/lib/copilot/orchestrator/types'
import { generateRequestId } from '@/lib/core/utils/request'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { executeTool } from '@/tools'
import { resolveToolId } from '@/tools/utils'
export async function executeIntegrationToolDirect(
toolCall: ToolCallState,
toolConfig: any,
context: ExecutionContext
): Promise<ToolCallResult> {
const { userId, workflowId } = context
const toolName = resolveToolId(toolCall.name)
const toolArgs = toolCall.params || {}
let workspaceId = context.workspaceId
if (!workspaceId && workflowId) {
const workflowResult = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
workspaceId = workflowResult[0]?.workspaceId ?? undefined
}
const decryptedEnvVars =
context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId))
const executionParams: Record<string, any> = resolveEnvVarReferences(toolArgs, decryptedEnvVars, {
deep: true,
}) as Record<string, any>
if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
const provider = toolConfig.oauth.provider
const accounts = await db
.select()
.from(account)
.where(and(eq(account.providerId, provider), eq(account.userId, userId)))
.limit(1)
if (!accounts.length) {
return {
success: false,
error: `No ${provider} account connected. Please connect your account first.`,
}
}
const acc = accounts[0]
const requestId = generateRequestId()
const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)
if (!accessToken) {
return {
success: false,
error: `OAuth token not available for ${provider}. Please reconnect your account.`,
}
}
executionParams.accessToken = accessToken
}
if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) {
return {
success: false,
error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`,
}
}
executionParams._context = {
workflowId,
userId,
}
if (toolName === 'function_execute') {
executionParams.envVars = decryptedEnvVars
executionParams.workflowVariables = {}
executionParams.blockData = {}
executionParams.blockNameMapping = {}
executionParams.language = executionParams.language || 'javascript'
executionParams.timeout = executionParams.timeout || 30000
}
const result = await executeTool(toolName, executionParams)
return {
success: result.success,
output: result.output,
error: result.error,
}
}

View File

@@ -0,0 +1,769 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { customTools, permissions, workflow, workflowFolder, workspace } from '@sim/db/schema'
import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import {
extractWorkflowNames,
formatNormalizedWorkflowForCopilot,
normalizeWorkflowName,
} from '@/lib/copilot/tools/shared/workflow-utils'
import { generateRequestId } from '@/lib/core/utils/request'
import { mcpService } from '@/lib/mcp/service'
import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace'
import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
import {
loadWorkflowFromNormalizedTables,
saveWorkflowToNormalizedTables,
} from '@/lib/workflows/persistence/utils'
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
import { ensureWorkflowAccess, ensureWorkspaceAccess, getAccessibleWorkflowsForUser, getDefaultWorkspaceId } from './access'
import { normalizeName } from '@/executor/constants'
export async function executeGetUserWorkflow(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess(
workflowId,
context.userId
)
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
const userWorkflow = formatNormalizedWorkflowForCopilot(normalized)
if (!userWorkflow) {
return { success: false, error: 'Workflow has no normalized data' }
}
return {
success: true,
output: {
workflowId,
workflowName: workflowRecord.name || '',
workspaceId,
userWorkflow,
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeGetWorkflowFromName(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowName = typeof params.workflow_name === 'string' ? params.workflow_name.trim() : ''
if (!workflowName) {
return { success: false, error: 'workflow_name is required' }
}
const workflows = await getAccessibleWorkflowsForUser(context.userId)
const targetName = normalizeWorkflowName(workflowName)
const match = workflows.find((w) => normalizeWorkflowName(w.name) === targetName)
if (!match) {
return { success: false, error: `Workflow not found: ${workflowName}` }
}
const normalized = await loadWorkflowFromNormalizedTables(match.id)
const userWorkflow = formatNormalizedWorkflowForCopilot(normalized)
if (!userWorkflow) {
return { success: false, error: 'Workflow has no normalized data' }
}
return {
success: true,
output: {
workflowId: match.id,
workflowName: match.name || '',
workspaceId: match.workspaceId,
userWorkflow,
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeListUserWorkflows(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workspaceId = params?.workspaceId as string | undefined
const folderId = params?.folderId as string | undefined
const workflows = await getAccessibleWorkflowsForUser(context.userId, { workspaceId, folderId })
const names = extractWorkflowNames(workflows)
const workflowList = workflows.map((w) => ({
workflowId: w.id,
workflowName: w.name || '',
workspaceId: w.workspaceId,
folderId: w.folderId,
}))
return { success: true, output: { workflow_names: names, workflows: workflowList } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeListUserWorkspaces(context: ExecutionContext): Promise<ToolCallResult> {
try {
const workspaces = await db
.select({
workspaceId: workspace.id,
workspaceName: workspace.name,
ownerId: workspace.ownerId,
permissionType: permissions.permissionType,
})
.from(permissions)
.innerJoin(workspace, eq(permissions.entityId, workspace.id))
.where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace')))
.orderBy(desc(workspace.createdAt))
const output = workspaces.map((row) => ({
workspaceId: row.workspaceId,
workspaceName: row.workspaceName,
role: row.ownerId === context.userId ? 'owner' : row.permissionType,
}))
return { success: true, output: { workspaces: output } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeListFolders(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workspaceId =
(params?.workspaceId as string | undefined) || (await getDefaultWorkspaceId(context.userId))
await ensureWorkspaceAccess(workspaceId, context.userId, false)
const folders = await db
.select({
folderId: workflowFolder.id,
folderName: workflowFolder.name,
parentId: workflowFolder.parentId,
sortOrder: workflowFolder.sortOrder,
})
.from(workflowFolder)
.where(eq(workflowFolder.workspaceId, workspaceId))
.orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt))
return {
success: true,
output: {
workspaceId,
folders,
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeCreateWorkflow(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const name = typeof params?.name === 'string' ? params.name.trim() : ''
if (!name) {
return { success: false, error: 'name is required' }
}
const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
const folderId = params?.folderId || null
const description = typeof params?.description === 'string' ? params.description : null
await ensureWorkspaceAccess(workspaceId, context.userId, true)
const workflowId = crypto.randomUUID()
const now = new Date()
const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId)
const [maxResult] = await db
.select({ maxOrder: max(workflow.sortOrder) })
.from(workflow)
.where(and(eq(workflow.workspaceId, workspaceId), folderCondition))
const sortOrder = (maxResult?.maxOrder ?? 0) + 1
await db.insert(workflow).values({
id: workflowId,
userId: context.userId,
workspaceId,
folderId,
sortOrder,
name,
description,
color: '#3972F6',
lastSynced: now,
createdAt: now,
updatedAt: now,
isDeployed: false,
runCount: 0,
variables: {},
})
const { workflowState } = buildDefaultWorkflowArtifacts()
const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState)
if (!saveResult.success) {
throw new Error(saveResult.error || 'Failed to save workflow state')
}
return {
success: true,
output: {
workflowId,
workflowName: name,
workspaceId,
folderId,
},
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeCreateFolder(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const name = typeof params?.name === 'string' ? params.name.trim() : ''
if (!name) {
return { success: false, error: 'name is required' }
}
const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
const parentId = params?.parentId || null
await ensureWorkspaceAccess(workspaceId, context.userId, true)
const [maxResult] = await db
.select({ maxOrder: max(workflowFolder.sortOrder) })
.from(workflowFolder)
.where(
and(
eq(workflowFolder.workspaceId, workspaceId),
parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId)
)
)
const sortOrder = (maxResult?.maxOrder ?? 0) + 1
const folderId = crypto.randomUUID()
await db.insert(workflowFolder).values({
id: folderId,
workspaceId,
parentId,
name,
sortOrder,
createdAt: new Date(),
updatedAt: new Date(),
})
return { success: true, output: { folderId, name, workspaceId, parentId } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeGetWorkflowData(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
const dataType = params.data_type || params.dataType || ''
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
if (!dataType) {
return { success: false, error: 'data_type is required' }
}
const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess(
workflowId,
context.userId
)
if (dataType === 'global_variables') {
const variablesRecord = (workflowRecord.variables as Record<string, any>) || {}
const variables = Object.values(variablesRecord).map((v: any) => ({
id: String(v?.id || ''),
name: String(v?.name || ''),
value: v?.value,
}))
return { success: true, output: { variables } }
}
if (dataType === 'custom_tools') {
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
const conditions = [
eq(customTools.workspaceId, workspaceId),
and(eq(customTools.userId, context.userId), isNull(customTools.workspaceId)),
]
const toolsRows = await db
.select()
.from(customTools)
.where(or(...conditions))
.orderBy(desc(customTools.createdAt))
const customToolsData = toolsRows.map((tool) => ({
id: String(tool.id || ''),
title: String(tool.title || ''),
functionName: String((tool.schema as any)?.function?.name || ''),
description: String((tool.schema as any)?.function?.description || ''),
parameters: (tool.schema as any)?.function?.parameters,
}))
return { success: true, output: { customTools: customToolsData } }
}
if (dataType === 'mcp_tools') {
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
const tools = await mcpService.discoverTools(context.userId, workspaceId, false)
const mcpTools = tools.map((tool) => ({
name: String(tool.name || ''),
serverId: String(tool.serverId || ''),
serverName: String(tool.serverName || ''),
description: String(tool.description || ''),
inputSchema: tool.inputSchema,
}))
return { success: true, output: { mcpTools } }
}
if (dataType === 'files') {
if (!workspaceId) {
return { success: false, error: 'workspaceId is required' }
}
const files = await listWorkspaceFiles(workspaceId)
const fileResults = files.map((file) => ({
id: String(file.id || ''),
name: String(file.name || ''),
key: String(file.key || ''),
path: String(file.path || ''),
size: Number(file.size || 0),
type: String(file.type || ''),
uploadedAt: String(file.uploadedAt || ''),
}))
return { success: true, output: { files: fileResults } }
}
return { success: false, error: `Unknown data_type: ${dataType}` }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeGetBlockOutputs(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
await ensureWorkflowAccess(workflowId, context.userId)
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalized) {
return { success: false, error: 'Workflow has no normalized data' }
}
const blocks = normalized.blocks || {}
const loops = normalized.loops || {}
const parallels = normalized.parallels || {}
const blockIds =
Array.isArray(params.blockIds) && params.blockIds.length > 0
? params.blockIds
: Object.keys(blocks)
const results: Array<{
blockId: string
blockName: string
blockType: string
outputs: string[]
insideSubflowOutputs?: string[]
outsideSubflowOutputs?: string[]
triggerMode?: boolean
}> = []
for (const blockId of blockIds) {
const block = blocks[blockId]
if (!block?.type) continue
const blockName = block.name || block.type
if (block.type === 'loop' || block.type === 'parallel') {
const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels)
results.push({
blockId,
blockName,
blockType: block.type,
outputs: [],
insideSubflowOutputs: formatOutputsWithPrefix(insidePaths, blockName),
outsideSubflowOutputs: formatOutputsWithPrefix(['results'], blockName),
triggerMode: block.triggerMode,
})
continue
}
const outputs = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode)
results.push({
blockId,
blockName,
blockType: block.type,
outputs: formatOutputsWithPrefix(outputs, blockName),
triggerMode: block.triggerMode,
})
}
const variables = await getWorkflowVariablesForTool(workflowId)
const payload = { blocks: results, variables }
return { success: true, output: payload }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeGetBlockUpstreamReferences(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
if (!Array.isArray(params.blockIds) || params.blockIds.length === 0) {
return { success: false, error: 'blockIds array is required' }
}
await ensureWorkflowAccess(workflowId, context.userId)
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
if (!normalized) {
return { success: false, error: 'Workflow has no normalized data' }
}
const blocks = normalized.blocks || {}
const edges = normalized.edges || []
const loops = normalized.loops || {}
const parallels = normalized.parallels || {}
const graphEdges = edges.map((edge: any) => ({ source: edge.source, target: edge.target }))
const variableOutputs = await getWorkflowVariablesForTool(workflowId)
const results: any[] = []
for (const blockId of params.blockIds) {
const targetBlock = blocks[blockId]
if (!targetBlock) continue
const insideSubflows: Array<{ blockId: string; blockName: string; blockType: string }> = []
const containingLoopIds = new Set<string>()
const containingParallelIds = new Set<string>()
Object.values(loops as Record<string, any>).forEach((loop) => {
if (loop?.nodes?.includes(blockId)) {
containingLoopIds.add(loop.id)
const loopBlock = blocks[loop.id]
if (loopBlock) {
insideSubflows.push({
blockId: loop.id,
blockName: loopBlock.name || loopBlock.type,
blockType: 'loop',
})
}
}
})
Object.values(parallels as Record<string, any>).forEach((parallel) => {
if (parallel?.nodes?.includes(blockId)) {
containingParallelIds.add(parallel.id)
const parallelBlock = blocks[parallel.id]
if (parallelBlock) {
insideSubflows.push({
blockId: parallel.id,
blockName: parallelBlock.name || parallelBlock.type,
blockType: 'parallel',
})
}
}
})
const ancestorIds = BlockPathCalculator.findAllPathNodes(graphEdges, blockId)
const accessibleIds = new Set<string>(ancestorIds)
accessibleIds.add(blockId)
const starterBlock = Object.values(blocks).find((b: any) => isInputDefinitionTrigger(b.type))
if (starterBlock && ancestorIds.includes((starterBlock as any).id)) {
accessibleIds.add((starterBlock as any).id)
}
containingLoopIds.forEach((loopId) => {
accessibleIds.add(loopId)
loops[loopId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId))
})
containingParallelIds.forEach((parallelId) => {
accessibleIds.add(parallelId)
parallels[parallelId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId))
})
const accessibleBlocks: any[] = []
for (const accessibleBlockId of accessibleIds) {
const block = blocks[accessibleBlockId]
if (!block?.type) continue
const canSelfReference = block.type === 'approval' || block.type === 'human_in_the_loop'
if (accessibleBlockId === blockId && !canSelfReference) continue
const blockName = block.name || block.type
let accessContext: 'inside' | 'outside' | undefined
let outputPaths: string[]
if (block.type === 'loop' || block.type === 'parallel') {
const isInside =
(block.type === 'loop' && containingLoopIds.has(accessibleBlockId)) ||
(block.type === 'parallel' && containingParallelIds.has(accessibleBlockId))
accessContext = isInside ? 'inside' : 'outside'
outputPaths = isInside
? getSubflowInsidePaths(block.type, accessibleBlockId, loops, parallels)
: ['results']
} else {
outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode)
}
const formattedOutputs = formatOutputsWithPrefix(outputPaths, blockName)
const entry: any = {
blockId: accessibleBlockId,
blockName,
blockType: block.type,
outputs: formattedOutputs,
}
if (block.triggerMode) entry.triggerMode = true
if (accessContext) entry.accessContext = accessContext
accessibleBlocks.push(entry)
}
results.push({
blockId,
blockName: targetBlock.name || targetBlock.type,
blockType: targetBlock.type,
accessibleBlocks,
insideSubflows,
variables: variableOutputs,
})
}
const payload = { results }
return { success: true, output: payload }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeRunWorkflow(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const result = await executeWorkflow(
{
id: workflowRecord.id,
userId: workflowRecord.userId,
workspaceId: workflowRecord.workspaceId,
variables: workflowRecord.variables || {},
},
generateRequestId(),
params.workflow_input || params.input || undefined,
context.userId
)
return {
success: result.success,
output: {
executionId: result.executionId,
success: result.success,
output: result.output,
logs: result.logs,
},
error: result.success ? undefined : result.error || 'Workflow execution failed',
}
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
export async function executeSetGlobalWorkflowVariables(
params: Record<string, any>,
context: ExecutionContext
): Promise<ToolCallResult> {
try {
const workflowId = params.workflowId || context.workflowId
if (!workflowId) {
return { success: false, error: 'workflowId is required' }
}
const operations = Array.isArray(params.operations) ? params.operations : []
const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
const currentVarsRecord = (workflowRecord.variables as Record<string, any>) || {}
const byName: Record<string, any> = {}
Object.values(currentVarsRecord).forEach((v: any) => {
if (v && typeof v === 'object' && v.id && v.name) byName[String(v.name)] = v
})
for (const op of operations) {
const key = String(op?.name || '')
if (!key) continue
const nextType = op?.type || byName[key]?.type || 'plain'
const coerceValue = (value: any, type: string) => {
if (value === undefined) return value
if (type === 'number') {
const n = Number(value)
return Number.isNaN(n) ? value : n
}
if (type === 'boolean') {
const v = String(value).trim().toLowerCase()
if (v === 'true') return true
if (v === 'false') return false
return value
}
if (type === 'array' || type === 'object') {
try {
const parsed = JSON.parse(String(value))
if (type === 'array' && Array.isArray(parsed)) return parsed
if (type === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed))
return parsed
} catch {}
return value
}
return value
}
if (op.operation === 'delete') {
delete byName[key]
continue
}
const typedValue = coerceValue(op.value, nextType)
if (op.operation === 'add') {
byName[key] = {
id: crypto.randomUUID(),
workflowId,
name: key,
type: nextType,
value: typedValue,
}
continue
}
if (op.operation === 'edit') {
if (!byName[key]) {
byName[key] = {
id: crypto.randomUUID(),
workflowId,
name: key,
type: nextType,
value: typedValue,
}
} else {
byName[key] = {
...byName[key],
type: nextType,
value: typedValue,
}
}
}
}
const nextVarsRecord = Object.fromEntries(
Object.values(byName).map((v: any) => [String(v.id), v])
)
await db
.update(workflow)
.set({ variables: nextVarsRecord, updatedAt: new Date() })
.where(eq(workflow.id, workflowId))
return { success: true, output: { updated: Object.values(byName).length } }
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) }
}
}
async function getWorkflowVariablesForTool(
workflowId: string
): Promise<Array<{ id: string; name: string; type: string; tag: string }>> {
const [workflowRecord] = await db
.select({ variables: workflow.variables })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
const variablesRecord = (workflowRecord?.variables as Record<string, any>) || {}
return Object.values(variablesRecord)
.filter((v: any) => v?.name && String(v.name).trim() !== '')
.map((v: any) => ({
id: String(v.id || ''),
name: String(v.name || ''),
type: String(v.type || 'plain'),
tag: `variable.${normalizeName(String(v.name || ''))}`,
}))
}
function getSubflowInsidePaths(
blockType: 'loop' | 'parallel',
blockId: string,
loops: Record<string, any>,
parallels: Record<string, any>
): string[] {
const paths = ['index']
if (blockType === 'loop') {
const loopType = loops[blockId]?.loopType || 'for'
if (loopType === 'forEach') {
paths.push('currentItem', 'items')
}
} else {
const parallelType = parallels[blockId]?.parallelType || 'count'
if (parallelType === 'collection') {
paths.push('currentItem', 'items')
}
}
return paths
}
function formatOutputsWithPrefix(paths: string[], blockName: string): string[] {
const normalizedName = normalizeName(blockName)
return paths.map((path) => `${normalizedName}.${path}`)
}

View File

@@ -48,7 +48,8 @@ export class GetBlocksAndToolsClientTool extends BaseClientTool {
const parsed = ExecuteResponseSuccessSchema.parse(json)
const result = GetBlocksAndToolsResult.parse(parsed.result)
await this.markToolComplete(200, 'Successfully retrieved blocks and tools', result)
// TODO: Temporarily sending empty data to test 403 issue
await this.markToolComplete(200, 'Successfully retrieved blocks and tools', {})
this.setState(ClientToolCallState.success)
} catch (error: any) {
const message = error instanceof Error ? error.message : String(error)

View File

@@ -35,6 +35,11 @@ export const env = createEnv({
SIM_AGENT_API_URL: z.string().url().optional(), // URL for internal sim agent API
AGENT_INDEXER_URL: z.string().url().optional(), // URL for agent training data indexer
AGENT_INDEXER_API_KEY: z.string().min(1).optional(), // API key for agent indexer authentication
COPILOT_STREAM_TTL_SECONDS: z.number().optional(), // Redis TTL for copilot SSE buffer
COPILOT_STREAM_EVENT_LIMIT: z.number().optional(), // Max events retained per stream
COPILOT_STREAM_RESERVE_BATCH: z.number().optional(), // Event ID reservation batch size
COPILOT_STREAM_FLUSH_INTERVAL_MS: z.number().optional(), // Buffer flush interval in ms
COPILOT_STREAM_FLUSH_MAX_BATCH: z.number().optional(), // Max events per flush batch
// Database & Storage
REDIS_URL: z.string().url().optional(), // Redis connection string for caching/sessions

View File

@@ -5,6 +5,11 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api'
import type { CopilotTransportMode } from '@/lib/copilot/models'
import {
normalizeSseEvent,
shouldSkipToolCallEvent,
shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type {
BaseClientToolMetadata,
ClientToolDisplay,
@@ -2045,6 +2050,12 @@ async function applySseEvent(
get: () => CopilotStore,
set: (next: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
const normalizedEvent = normalizeSseEvent(data)
if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) {
return true
}
data = normalizedEvent
if (data.type === 'subagent_start') {
const toolCallId = data.data?.tool_call_id
if (toolCallId) {

View File

@@ -1,6 +1,5 @@
{
"lockfileVersion": 1,
"configVersion": 0,
"workspaces": {
"": {
"name": "simstudio",

View File

@@ -836,6 +836,15 @@ describe('POST /api/v1/copilot/chat', () => {
- Save checkpoints to resume
- Handle partial completions gracefully
### Risk 6: Process-Local Dedupe
**Risk**: Tool call/result dedupe caches are in-memory and scoped to a single process, so duplicate events can still appear across ECS tasks.
**Mitigation**:
- Treat dedupe as best-effort, not global
- Prefer idempotent state updates on the client
- Use Redis-backed stream replay for authoritative ordering
---
## File Inventory