Checkpoint
@@ -1,16 +1,11 @@
 import { createLogger } from '@sim/logger'
 import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
+import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
 import {
-  getToolCallIdFromEvent,
-  handleSubagentRouting,
-  markToolCallSeen,
-  markToolResultSeen,
   normalizeSseEvent,
-  sseHandlers,
-  subAgentHandlers,
-  wasToolCallSeen,
-  wasToolResultSeen,
-} from '@/lib/copilot/orchestrator/sse-handlers'
+  shouldSkipToolCallEvent,
+  shouldSkipToolResultEvent,
+} from '@/lib/copilot/orchestrator/sse-utils'
 import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
 import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
 import type {
@@ -99,38 +94,9 @@ export async function orchestrateCopilotStream(
       const normalizedEvent = normalizeSseEvent(event)
 
-      // Skip tool_result events for tools the sim-side already executed.
-      // The sim-side emits its own tool_result with complete data.
-      // For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
-      const toolCallId = getToolCallIdFromEvent(normalizedEvent)
-      const eventData = normalizedEvent.data
-
-      const isPartialToolCall =
-        normalizedEvent.type === 'tool_call' && eventData?.partial === true
-
-      const shouldSkipToolCall =
-        normalizedEvent.type === 'tool_call' &&
-        !!toolCallId &&
-        !isPartialToolCall &&
-        (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
-
-      if (
-        normalizedEvent.type === 'tool_call' &&
-        toolCallId &&
-        !isPartialToolCall &&
-        !shouldSkipToolCall
-      ) {
-        markToolCallSeen(toolCallId)
-      }
-
-      const shouldSkipToolResult =
-        normalizedEvent.type === 'tool_result' &&
-        (() => {
-          if (!toolCallId) return false
-          if (wasToolResultSeen(toolCallId)) return true
-          markToolResultSeen(toolCallId)
-          return false
-        })()
+      // Skip duplicate tool events to prevent state regressions.
+      const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
+      const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
 
       if (!shouldSkipToolCall && !shouldSkipToolResult) {
         await forwardEvent(normalizedEvent, options)
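For orientation, the consumer-side effect of this refactor: normalization and dedupe collapse into three calls per event. A minimal sketch of the resulting loop shape (not code from this commit; forwardDeduped, events, and forward are hypothetical stand-ins):

import {
  normalizeSseEvent,
  shouldSkipToolCallEvent,
  shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'

// Both predicates mark an id as seen the first time it passes, so a replayed
// tool_call or tool_result carrying the same id is dropped on later iterations.
async function forwardDeduped(events: SSEEvent[], forward: (e: SSEEvent) => Promise<void>) {
  for (const event of events) {
    const normalized = normalizeSseEvent(event)
    if (shouldSkipToolCallEvent(normalized) || shouldSkipToolResultEvent(normalized)) continue
    await forward(normalized)
  }
}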
apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts (new file, 95 lines)
@@ -0,0 +1,95 @@
/**
 * @vitest-environment node
 */
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'

vi.mock('@sim/logger', () => loggerMock)

const executeToolServerSide = vi.fn()
const markToolComplete = vi.fn()

vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({
  executeToolServerSide,
  markToolComplete,
}))

import { sseHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
import type { ExecutionContext, StreamingContext } from '@/lib/copilot/orchestrator/types'

describe('sse-handlers tool lifecycle', () => {
  let context: StreamingContext
  let execContext: ExecutionContext

  beforeEach(() => {
    vi.clearAllMocks()
    context = {
      chatId: undefined,
      conversationId: undefined,
      messageId: 'msg-1',
      accumulatedContent: '',
      contentBlocks: [],
      toolCalls: new Map(),
      currentThinkingBlock: null,
      isInThinkingBlock: false,
      subAgentParentToolCallId: undefined,
      subAgentContent: {},
      subAgentToolCalls: {},
      pendingContent: '',
      streamComplete: false,
      wasAborted: false,
      errors: [],
    }
    execContext = {
      userId: 'user-1',
      workflowId: 'workflow-1',
    }
  })

  it('executes tool_call and emits tool_result + mark-complete', async () => {
    executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
    markToolComplete.mockResolvedValueOnce(true)
    const onEvent = vi.fn()

    await sseHandlers.tool_call(
      {
        type: 'tool_call',
        data: { id: 'tool-1', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
      } as any,
      context,
      execContext,
      { onEvent, interactive: false, timeout: 1000 }
    )

    expect(executeToolServerSide).toHaveBeenCalledTimes(1)
    expect(markToolComplete).toHaveBeenCalledTimes(1)
    expect(onEvent).toHaveBeenCalledWith(
      expect.objectContaining({
        type: 'tool_result',
        toolCallId: 'tool-1',
        success: true,
      })
    )

    const updated = context.toolCalls.get('tool-1')
    expect(updated?.status).toBe('success')
    expect(updated?.result?.output).toEqual({ ok: true })
  })

  it('skips duplicate tool_call after result', async () => {
    executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } })
    markToolComplete.mockResolvedValueOnce(true)

    const event = {
      type: 'tool_call',
      data: { id: 'tool-dup', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } },
    }

    await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })
    await sseHandlers.tool_call(event as any, context, execContext, { interactive: false })

    expect(executeToolServerSide).toHaveBeenCalledTimes(1)
    expect(markToolComplete).toHaveBeenCalledTimes(1)
  })
})
@@ -1,6 +1,11 @@
 import { createLogger } from '@sim/logger'
 import { INTERRUPT_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config'
 import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence'
+import {
+  getEventData,
+  markToolResultSeen,
+  wasToolResultSeen,
+} from '@/lib/copilot/orchestrator/sse-utils'
 import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor'
 import type {
   ContentBlock,
@@ -13,109 +18,7 @@ import type {
 
 const logger = createLogger('CopilotSseHandlers')
 
-/**
- * Tracks tool call IDs for which a tool_call has already been forwarded/emitted (non-partial).
- */
-const seenToolCalls = new Set<string>()
-
-/**
- * Tracks tool call IDs for which a tool_result has already been emitted or forwarded.
- */
-const seenToolResults = new Set<string>()
-
-export function markToolCallSeen(toolCallId: string): void {
-  seenToolCalls.add(toolCallId)
-  setTimeout(
-    () => {
-      seenToolCalls.delete(toolCallId)
-    },
-    5 * 60 * 1000
-  )
-}
-
-export function wasToolCallSeen(toolCallId: string): boolean {
-  return seenToolCalls.has(toolCallId)
-}
-
-type EventDataObject = Record<string, any> | undefined
-
-const parseEventData = (data: unknown): EventDataObject => {
-  if (!data) return undefined
-  if (typeof data !== 'string') {
-    return data as EventDataObject
-  }
-  try {
-    return JSON.parse(data) as EventDataObject
-  } catch {
-    return undefined
-  }
-}
-
-const hasToolFields = (data: EventDataObject): boolean => {
-  if (!data) return false
-  return (
-    data.id !== undefined ||
-    data.toolCallId !== undefined ||
-    data.name !== undefined ||
-    data.success !== undefined ||
-    data.result !== undefined ||
-    data.arguments !== undefined
-  )
-}
-
-const getEventData = (event: SSEEvent): EventDataObject => {
-  const topLevel = parseEventData(event.data)
-  if (!topLevel) return undefined
-  if (hasToolFields(topLevel)) return topLevel
-  const nested = parseEventData(topLevel.data)
-  return nested || topLevel
-}
-
-export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
-  const data = getEventData(event)
-  return event.toolCallId || data?.id || data?.toolCallId
-}
-
-/** Normalizes SSE events so tool metadata is available at the top level. */
-export function normalizeSseEvent(event: SSEEvent): SSEEvent {
-  if (!event) return event
-  const data = getEventData(event)
-  if (!data) return event
-  const toolCallId = event.toolCallId || data.id || data.toolCallId
-  const toolName = event.toolName || data.name || data.toolName
-  const success = event.success ?? data.success
-  const result = event.result ?? data.result
-  const normalizedData = typeof event.data === 'string' ? data : event.data
-  return {
-    ...event,
-    data: normalizedData,
-    toolCallId,
-    toolName,
-    success,
-    result,
-  }
-}
-
-/**
- * Mark a tool call as executed by the sim-side.
- * This prevents the Go backend's duplicate tool_result from being forwarded.
- */
-export function markToolResultSeen(toolCallId: string): void {
-  seenToolResults.add(toolCallId)
-  setTimeout(
-    () => {
-      seenToolResults.delete(toolCallId)
-    },
-    5 * 60 * 1000
-  )
-}
-
-/**
- * Check if a tool call was executed by the sim-side.
- */
-export function wasToolResultSeen(toolCallId: string): boolean {
-  return seenToolResults.has(toolCallId)
-}
+// Normalization + dedupe helpers live in sse-utils to keep server/client in sync.
 
 /**
  * Respond tools are internal to the copilot's subagent system.
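The handlers keep importing markToolResultSeen/wasToolResultSeen from sse-utils, so a sim-side execution still suppresses the Go backend's duplicate tool_result. A hedged sketch of that flow (emitSimSideResult, payload, and emit are hypothetical; the real logic lives inside sseHandlers.tool_call):

import { markToolResultSeen, wasToolResultSeen } from '@/lib/copilot/orchestrator/sse-utils'

// After the sim side executes a tool it emits its own tool_result and marks
// the id as seen; shouldSkipToolResultEvent then drops the backend's copy.
function emitSimSideResult(toolCallId: string, payload: unknown, emit: (event: object) => void) {
  if (wasToolResultSeen(toolCallId)) return
  markToolResultSeen(toolCallId)
  emit({ type: 'tool_result', toolCallId, result: payload })
}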
apps/sim/lib/copilot/orchestrator/sse-utils.test.ts (new file, 43 lines)
@@ -0,0 +1,43 @@
/**
 * @vitest-environment node
 */
import { describe, expect, it } from 'vitest'
import {
  normalizeSseEvent,
  shouldSkipToolCallEvent,
  shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'

describe('sse-utils', () => {
  it.concurrent('normalizes tool fields from string data', () => {
    const event = {
      type: 'tool_result',
      data: JSON.stringify({
        id: 'tool_1',
        name: 'edit_workflow',
        success: true,
        result: { ok: true },
      }),
    }

    const normalized = normalizeSseEvent(event as any)

    expect(normalized.toolCallId).toBe('tool_1')
    expect(normalized.toolName).toBe('edit_workflow')
    expect(normalized.success).toBe(true)
    expect(normalized.result).toEqual({ ok: true })
  })

  it.concurrent('dedupes tool_call events', () => {
    const event = { type: 'tool_call', data: { id: 'tool_call_1', name: 'plan' } }
    expect(shouldSkipToolCallEvent(event as any)).toBe(false)
    expect(shouldSkipToolCallEvent(event as any)).toBe(true)
  })

  it.concurrent('dedupes tool_result events', () => {
    const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } }
    expect(shouldSkipToolResultEvent(event as any)).toBe(false)
    expect(shouldSkipToolResultEvent(event as any)).toBe(true)
  })
})
apps/sim/lib/copilot/orchestrator/sse-utils.ts (new file, 119 lines)
@@ -0,0 +1,119 @@
import type { SSEEvent } from '@/lib/copilot/orchestrator/types'

type EventDataObject = Record<string, any> | undefined

const DEFAULT_TOOL_EVENT_TTL_MS = 5 * 60 * 1000

/**
 * In-memory tool event dedupe.
 *
 * NOTE: These sets are process-local only. In a multi-instance setup (e.g., ECS),
 * each task maintains its own dedupe cache, so duplicates can still appear across tasks.
 */
const seenToolCalls = new Set<string>()
const seenToolResults = new Set<string>()

const parseEventData = (data: unknown): EventDataObject => {
  if (!data) return undefined
  if (typeof data !== 'string') {
    return data as EventDataObject
  }
  try {
    return JSON.parse(data) as EventDataObject
  } catch {
    return undefined
  }
}

const hasToolFields = (data: EventDataObject): boolean => {
  if (!data) return false
  return (
    data.id !== undefined ||
    data.toolCallId !== undefined ||
    data.name !== undefined ||
    data.success !== undefined ||
    data.result !== undefined ||
    data.arguments !== undefined
  )
}

export const getEventData = (event: SSEEvent): EventDataObject => {
  const topLevel = parseEventData(event.data)
  if (!topLevel) return undefined
  if (hasToolFields(topLevel)) return topLevel
  const nested = parseEventData(topLevel.data)
  return nested || topLevel
}

export function getToolCallIdFromEvent(event: SSEEvent): string | undefined {
  const data = getEventData(event)
  return event.toolCallId || data?.id || data?.toolCallId
}

/** Normalizes SSE events so tool metadata is available at the top level. */
export function normalizeSseEvent(event: SSEEvent): SSEEvent {
  if (!event) return event
  const data = getEventData(event)
  if (!data) return event
  const toolCallId = event.toolCallId || data.id || data.toolCallId
  const toolName = event.toolName || data.name || data.toolName
  const success = event.success ?? data.success
  const result = event.result ?? data.result
  const normalizedData = typeof event.data === 'string' ? data : event.data
  return {
    ...event,
    data: normalizedData,
    toolCallId,
    toolName,
    success,
    result,
  }
}

export function markToolCallSeen(toolCallId: string, ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS): void {
  seenToolCalls.add(toolCallId)
  setTimeout(() => {
    seenToolCalls.delete(toolCallId)
  }, ttlMs)
}

export function wasToolCallSeen(toolCallId: string): boolean {
  return seenToolCalls.has(toolCallId)
}

export function markToolResultSeen(
  toolCallId: string,
  ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS
): void {
  seenToolResults.add(toolCallId)
  setTimeout(() => {
    seenToolResults.delete(toolCallId)
  }, ttlMs)
}

export function wasToolResultSeen(toolCallId: string): boolean {
  return seenToolResults.has(toolCallId)
}

export function shouldSkipToolCallEvent(event: SSEEvent): boolean {
  if (event.type !== 'tool_call') return false
  const toolCallId = getToolCallIdFromEvent(event)
  if (!toolCallId) return false
  const eventData = getEventData(event)
  if (eventData?.partial === true) return false
  if (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) {
    return true
  }
  markToolCallSeen(toolCallId)
  return false
}

export function shouldSkipToolResultEvent(event: SSEEvent): boolean {
  if (event.type !== 'tool_result') return false
  const toolCallId = getToolCallIdFromEvent(event)
  if (!toolCallId) return false
  if (wasToolResultSeen(toolCallId)) return true
  markToolResultSeen(toolCallId)
  return false
}
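Given the NOTE about process-local sets, one possible follow-up (purely a sketch, not part of this commit) would be a Redis-backed variant with the same first-writer-wins semantics across tasks, e.g. via SET NX with a TTL:

import { getRedisClient } from '@/lib/core/config/redis'

// Hypothetical shared dedupe: returns true if this process is the first to
// claim the id within the TTL window. Assumes an ioredis-style client, as the
// stream-buffer module already uses.
export async function claimToolCallIdShared(toolCallId: string, ttlMs = 5 * 60 * 1000) {
  const redis = getRedisClient()
  if (!redis) return true // no Redis configured: treat the id as unseen
  // 'NX' only sets the key if absent; 'OK' means we won the race, null means a duplicate.
  const reply = await redis.set(`copilot:seen-tool-call:${toolCallId}`, '1', 'PX', ttlMs, 'NX')
  return reply !== null
}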
apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts (new file, 118 lines)
@@ -0,0 +1,118 @@
/**
 * @vitest-environment node
 */
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { loggerMock } from '@sim/testing'

vi.mock('@sim/logger', () => loggerMock)

type StoredEntry = { score: number; value: string }

const createRedisStub = () => {
  const events = new Map<string, StoredEntry[]>()
  const counters = new Map<string, number>()

  const readEntries = (key: string, min: number, max: number) => {
    const list = events.get(key) || []
    return list
      .filter((entry) => entry.score >= min && entry.score <= max)
      .sort((a, b) => a.score - b.score)
      .map((entry) => entry.value)
  }

  return {
    del: vi.fn().mockResolvedValue(1),
    hset: vi.fn().mockResolvedValue(1),
    hgetall: vi.fn().mockResolvedValue({}),
    expire: vi.fn().mockResolvedValue(1),
    eval: vi.fn().mockImplementation(
      (
        _lua: string,
        _keysCount: number,
        seqKey: string,
        eventsKey: string,
        _ttl: number,
        _limit: number,
        streamId: string,
        eventJson: string
      ) => {
        const current = counters.get(seqKey) || 0
        const next = current + 1
        counters.set(seqKey, next)
        const entry = JSON.stringify({ eventId: next, streamId, event: JSON.parse(eventJson) })
        const list = events.get(eventsKey) || []
        list.push({ score: next, value: entry })
        events.set(eventsKey, list)
        return next
      }
    ),
    incrby: vi.fn().mockImplementation((key: string, amount: number) => {
      const current = counters.get(key) || 0
      const next = current + amount
      counters.set(key, next)
      return next
    }),
    zrangebyscore: vi.fn().mockImplementation((key: string, min: string, max: string) => {
      const minVal = Number(min)
      const maxVal = max === '+inf' ? Number.POSITIVE_INFINITY : Number(max)
      return Promise.resolve(readEntries(key, minVal, maxVal))
    }),
    pipeline: vi.fn().mockImplementation(() => {
      const api = {
        zadd: vi.fn().mockImplementation((key: string, ...args: Array<string | number>) => {
          const list = events.get(key) || []
          for (let i = 0; i < args.length; i += 2) {
            list.push({ score: Number(args[i]), value: String(args[i + 1]) })
          }
          events.set(key, list)
          return api
        }),
        // mockImplementation defers the reference until call time; eagerly
        // passing `api` to mockReturnValue here would read it inside its own
        // initializer and throw a ReferenceError (temporal dead zone).
        expire: vi.fn().mockImplementation(() => api),
        zremrangebyrank: vi.fn().mockImplementation(() => api),
        exec: vi.fn().mockResolvedValue([]),
      }
      return api
    }),
  }
}

let mockRedis: ReturnType<typeof createRedisStub>

vi.mock('@/lib/core/config/redis', () => ({
  getRedisClient: () => mockRedis,
}))

import {
  appendStreamEvent,
  createStreamEventWriter,
  readStreamEvents,
} from '@/lib/copilot/orchestrator/stream-buffer'

describe('stream-buffer', () => {
  beforeEach(() => {
    mockRedis = createRedisStub()
    vi.clearAllMocks()
  })

  it.concurrent('replays events after a given event id', async () => {
    await appendStreamEvent('stream-1', { type: 'content', data: 'hello' })
    await appendStreamEvent('stream-1', { type: 'content', data: 'world' })

    const allEvents = await readStreamEvents('stream-1', 0)
    expect(allEvents.map((entry) => entry.event.data)).toEqual(['hello', 'world'])

    const replayed = await readStreamEvents('stream-1', 1)
    expect(replayed.map((entry) => entry.event.data)).toEqual(['world'])
  })

  it.concurrent('flushes buffered events for resume', async () => {
    const writer = createStreamEventWriter('stream-2')
    await writer.write({ type: 'content', data: 'a' })
    await writer.write({ type: 'content', data: 'b' })
    await writer.flush()

    const events = await readStreamEvents('stream-2', 0)
    expect(events.map((entry) => entry.event.data)).toEqual(['a', 'b'])
  })
})
@@ -1,13 +1,40 @@
 import { createLogger } from '@sim/logger'
 import { env } from '@/lib/core/config/env'
 import { getRedisClient } from '@/lib/core/config/redis'
 
 const logger = createLogger('CopilotStreamBuffer')
 
-const STREAM_TTL_SECONDS = 60 * 60
-const STREAM_EVENT_LIMIT = 5000
-const STREAM_RESERVE_BATCH = 200
-const STREAM_FLUSH_INTERVAL_MS = 15
-const STREAM_FLUSH_MAX_BATCH = 200
+const STREAM_DEFAULTS = {
+  ttlSeconds: 60 * 60,
+  eventLimit: 5000,
+  reserveBatch: 200,
+  flushIntervalMs: 15,
+  flushMaxBatch: 200,
+}
+
+export type StreamBufferConfig = {
+  ttlSeconds: number
+  eventLimit: number
+  reserveBatch: number
+  flushIntervalMs: number
+  flushMaxBatch: number
+}
+
+const parseNumber = (value: number | string | undefined, fallback: number): number => {
+  if (typeof value === 'number' && Number.isFinite(value)) return value
+  const parsed = Number(value)
+  return Number.isFinite(parsed) ? parsed : fallback
+}
+
+export function getStreamBufferConfig(): StreamBufferConfig {
+  return {
+    ttlSeconds: parseNumber(env.COPILOT_STREAM_TTL_SECONDS, STREAM_DEFAULTS.ttlSeconds),
+    eventLimit: parseNumber(env.COPILOT_STREAM_EVENT_LIMIT, STREAM_DEFAULTS.eventLimit),
+    reserveBatch: parseNumber(env.COPILOT_STREAM_RESERVE_BATCH, STREAM_DEFAULTS.reserveBatch),
+    flushIntervalMs: parseNumber(env.COPILOT_STREAM_FLUSH_INTERVAL_MS, STREAM_DEFAULTS.flushIntervalMs),
+    flushMaxBatch: parseNumber(env.COPILOT_STREAM_FLUSH_MAX_BATCH, STREAM_DEFAULTS.flushMaxBatch),
+  }
+}
 
 const APPEND_STREAM_EVENT_LUA = `
 local seqKey = KEYS[1]
@@ -82,6 +109,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise
   const redis = getRedisClient()
   if (!redis) return
   try {
+    const config = getStreamBufferConfig()
     const payload: Record<string, string> = {
       status: meta.status,
       updatedAt: meta.updatedAt || new Date().toISOString(),
@@ -89,7 +117,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise
     if (meta.userId) payload.userId = meta.userId
     if (meta.error) payload.error = meta.error
     await redis.hset(getMetaKey(streamId), payload)
-    await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS)
+    await redis.expire(getMetaKey(streamId), config.ttlSeconds)
   } catch (error) {
     logger.warn('Failed to update stream meta', {
       streamId,
@@ -124,14 +152,15 @@ export async function appendStreamEvent(
   }
 
   try {
+    const config = getStreamBufferConfig()
     const eventJson = JSON.stringify(event)
     const nextId = await redis.eval(
       APPEND_STREAM_EVENT_LUA,
       2,
       getSeqKey(streamId),
       getEventsKey(streamId),
-      STREAM_TTL_SECONDS,
-      STREAM_EVENT_LIMIT,
+      config.ttlSeconds,
+      config.eventLimit,
       streamId,
       eventJson
     )
@@ -156,6 +185,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
     }
   }
 
+  const config = getStreamBufferConfig()
   let pending: StreamEventEntry[] = []
  let nextEventId = 0
  let maxReservedId = 0
@@ -167,11 +197,11 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
     flushTimer = setTimeout(() => {
       flushTimer = null
       void flush()
-    }, STREAM_FLUSH_INTERVAL_MS)
+    }, config.flushIntervalMs)
   }
 
   const reserveIds = async (minCount: number) => {
-    const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount)
+    const reserveCount = Math.max(config.reserveBatch, minCount)
     const newMax = await redis.incrby(getSeqKey(streamId), reserveCount)
     const startId = newMax - reserveCount + 1
     if (nextEventId === 0 || nextEventId > maxReservedId) {
@@ -193,9 +223,9 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
     }
     const pipeline = redis.pipeline()
     pipeline.zadd(key, ...(zaddArgs as any))
-    pipeline.expire(key, STREAM_TTL_SECONDS)
-    pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS)
-    pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1)
+    pipeline.expire(key, config.ttlSeconds)
+    pipeline.expire(getSeqKey(streamId), config.ttlSeconds)
+    pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1)
     await pipeline.exec()
   } catch (error) {
     logger.warn('Failed to flush stream events', {
@@ -216,7 +246,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter {
     const eventId = nextEventId++
     const entry: StreamEventEntry = { eventId, streamId, event }
     pending.push(entry)
-    if (pending.length >= STREAM_FLUSH_MAX_BATCH) {
+    if (pending.length >= config.flushMaxBatch) {
       await flush()
     } else {
       scheduleFlush()
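The net effect of this change: the five buffer knobs now come from env with the old constants as fallbacks. A small illustration (env variable names as added above; values hypothetical):

import { getStreamBufferConfig } from '@/lib/copilot/orchestrator/stream-buffer'

// With COPILOT_STREAM_EVENT_LIMIT=10000 in the environment, eventLimit is 10000;
// unset or non-numeric values fall back to the defaults (here, 5000 and 15).
const { eventLimit, flushIntervalMs } = getStreamBufferConfig()
console.log(eventLimit, flushIntervalMs)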
@@ -1,16 +1,11 @@
 import { createLogger } from '@sim/logger'
 import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants'
+import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers'
 import {
-  getToolCallIdFromEvent,
-  handleSubagentRouting,
-  markToolCallSeen,
-  markToolResultSeen,
   normalizeSseEvent,
-  sseHandlers,
-  subAgentHandlers,
-  wasToolCallSeen,
-  wasToolResultSeen,
-} from '@/lib/copilot/orchestrator/sse-handlers'
+  shouldSkipToolCallEvent,
+  shouldSkipToolResultEvent,
+} from '@/lib/copilot/orchestrator/sse-utils'
 import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser'
 import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor'
 import type {
@@ -115,38 +110,9 @@ export async function orchestrateSubagentStream(
       const normalizedEvent = normalizeSseEvent(event)
 
-      // Skip tool_result events for tools the sim-side already executed.
-      // The sim-side emits its own tool_result with complete data.
-      // For server-side tools (not executed by sim), we still forward the Go backend's tool_result.
-      const toolCallId = getToolCallIdFromEvent(normalizedEvent)
-      const eventData = normalizedEvent.data
-
-      const isPartialToolCall =
-        normalizedEvent.type === 'tool_call' && eventData?.partial === true
-
-      const shouldSkipToolCall =
-        normalizedEvent.type === 'tool_call' &&
-        !!toolCallId &&
-        !isPartialToolCall &&
-        (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId))
-
-      if (
-        normalizedEvent.type === 'tool_call' &&
-        toolCallId &&
-        !isPartialToolCall &&
-        !shouldSkipToolCall
-      ) {
-        markToolCallSeen(toolCallId)
-      }
-
-      const shouldSkipToolResult =
-        normalizedEvent.type === 'tool_result' &&
-        (() => {
-          if (!toolCallId) return false
-          if (wasToolResultSeen(toolCallId)) return true
-          markToolResultSeen(toolCallId)
-          return false
-        })()
+      // Skip duplicate tool events to prevent state regressions.
+      const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent)
+      const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent)
 
       if (!shouldSkipToolCall && !shouldSkipToolResult) {
         await forwardEvent(normalizedEvent, options)
(File diff suppressed because it is too large.)

apps/sim/lib/copilot/orchestrator/tool-executor/access.ts (new file, 130 lines)
@@ -0,0 +1,130 @@
import { db } from '@sim/db'
import { permissions, workflow, workspace } from '@sim/db/schema'
import { and, asc, desc, eq, inArray, or } from 'drizzle-orm'

type WorkflowRecord = typeof workflow.$inferSelect

export async function ensureWorkflowAccess(
  workflowId: string,
  userId: string
): Promise<{
  workflow: WorkflowRecord
  workspaceId?: string | null
}> {
  const [workflowRecord] = await db
    .select()
    .from(workflow)
    .where(eq(workflow.id, workflowId))
    .limit(1)
  if (!workflowRecord) {
    throw new Error(`Workflow ${workflowId} not found`)
  }

  if (workflowRecord.userId === userId) {
    return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId }
  }

  if (workflowRecord.workspaceId) {
    const [permissionRow] = await db
      .select({ permissionType: permissions.permissionType })
      .from(permissions)
      .where(
        and(
          eq(permissions.entityType, 'workspace'),
          eq(permissions.entityId, workflowRecord.workspaceId),
          eq(permissions.userId, userId)
        )
      )
      .limit(1)
    if (permissionRow) {
      return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId }
    }
  }

  throw new Error('Unauthorized workflow access')
}

export async function getDefaultWorkspaceId(userId: string): Promise<string> {
  const workspaces = await db
    .select({ workspaceId: workspace.id })
    .from(permissions)
    .innerJoin(workspace, eq(permissions.entityId, workspace.id))
    .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))
    .orderBy(desc(workspace.createdAt))
    .limit(1)

  const workspaceId = workspaces[0]?.workspaceId
  if (!workspaceId) {
    throw new Error('No workspace found for user')
  }

  return workspaceId
}

export async function ensureWorkspaceAccess(
  workspaceId: string,
  userId: string,
  requireWrite: boolean
): Promise<void> {
  const [row] = await db
    .select({
      permissionType: permissions.permissionType,
      ownerId: workspace.ownerId,
    })
    .from(permissions)
    .innerJoin(workspace, eq(permissions.entityId, workspace.id))
    .where(
      and(
        eq(permissions.entityType, 'workspace'),
        eq(permissions.entityId, workspaceId),
        eq(permissions.userId, userId)
      )
    )
    .limit(1)

  if (!row) {
    throw new Error(`Workspace ${workspaceId} not found`)
  }

  const isOwner = row.ownerId === userId
  const permissionType = row.permissionType
  const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write'

  if (requireWrite && !canWrite) {
    throw new Error('Write or admin access required for this workspace')
  }

  if (!requireWrite && !canWrite && permissionType !== 'read') {
    throw new Error('Access denied to workspace')
  }
}

export async function getAccessibleWorkflowsForUser(
  userId: string,
  options?: { workspaceId?: string; folderId?: string }
) {
  const workspaceIds = await db
    .select({ entityId: permissions.entityId })
    .from(permissions)
    .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace')))

  const workspaceIdList = workspaceIds.map((row) => row.entityId)

  const workflowConditions = [eq(workflow.userId, userId)]
  if (workspaceIdList.length > 0) {
    workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList))
  }
  if (options?.workspaceId) {
    workflowConditions.push(eq(workflow.workspaceId, options.workspaceId))
  }
  if (options?.folderId) {
    workflowConditions.push(eq(workflow.folderId, options.folderId))
  }

  return db
    .select()
    .from(workflow)
    .where(or(...workflowConditions))
    .orderBy(asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id))
}
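These helpers throw on failure, and the executor files below all follow the same pattern of converting that into a ToolCallResult. A sketch of the intended call shape (executeSomeTool is hypothetical; the pattern matches the real executors in this commit):

import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { ensureWorkflowAccess } from './access'

export async function executeSomeTool(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    // Throws 'Unauthorized workflow access' (or a not-found error) on failure.
    const { workflow: record } = await ensureWorkflowAccess(workflowId, context.userId)
    return { success: true, output: { workflowId, workflowName: record.name || '' } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}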
@@ -0,0 +1,479 @@
import crypto from 'crypto'
import { db } from '@sim/db'
import { chat, workflow, workflowMcpServer, workflowMcpTool } from '@sim/db/schema'
import { and, eq, inArray } from 'drizzle-orm'
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema'
import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils'
import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server'
import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils'
import { ensureWorkflowAccess } from './access'

export async function executeDeployApi(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    const action = params.action === 'undeploy' ? 'undeploy' : 'deploy'
    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)

    if (action === 'undeploy') {
      const result = await undeployWorkflow({ workflowId })
      if (!result.success) {
        return { success: false, error: result.error || 'Failed to undeploy workflow' }
      }
      return { success: true, output: { workflowId, isDeployed: false } }
    }

    const result = await deployWorkflow({
      workflowId,
      deployedBy: context.userId,
      workflowName: workflowRecord.name || undefined,
    })
    if (!result.success) {
      return { success: false, error: result.error || 'Failed to deploy workflow' }
    }

    return {
      success: true,
      output: {
        workflowId,
        isDeployed: true,
        deployedAt: result.deployedAt,
        version: result.version,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeDeployChat(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }

    const action = params.action === 'undeploy' ? 'undeploy' : 'deploy'
    if (action === 'undeploy') {
      const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1)
      if (!existing.length) {
        return { success: false, error: 'No active chat deployment found for this workflow' }
      }
      const { hasAccess } = await checkChatAccess(existing[0].id, context.userId)
      if (!hasAccess) {
        return { success: false, error: 'Unauthorized chat access' }
      }
      await db.delete(chat).where(eq(chat.id, existing[0].id))
      return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } }
    }

    const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId)
    if (!hasAccess) {
      return { success: false, error: 'Workflow not found or access denied' }
    }

    const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1)
    const existingDeployment = existing[0] || null

    const identifier = String(params.identifier || existingDeployment?.identifier || '').trim()
    const title = String(params.title || existingDeployment?.title || '').trim()
    if (!identifier || !title) {
      return { success: false, error: 'Chat identifier and title are required' }
    }

    const identifierPattern = /^[a-z0-9-]+$/
    if (!identifierPattern.test(identifier)) {
      return {
        success: false,
        error: 'Identifier can only contain lowercase letters, numbers, and hyphens',
      }
    }

    const existingIdentifier = await db
      .select()
      .from(chat)
      .where(eq(chat.identifier, identifier))
      .limit(1)
    if (existingIdentifier.length > 0 && existingIdentifier[0].id !== existingDeployment?.id) {
      return { success: false, error: 'Identifier already in use' }
    }

    const deployResult = await deployWorkflow({
      workflowId,
      deployedBy: context.userId,
    })
    if (!deployResult.success) {
      return { success: false, error: deployResult.error || 'Failed to deploy workflow' }
    }

    const payload = {
      workflowId,
      identifier,
      title,
      description: String(params.description || existingDeployment?.description || ''),
      customizations: {
        primaryColor:
          params.customizations?.primaryColor ||
          existingDeployment?.customizations?.primaryColor ||
          'var(--brand-primary-hover-hex)',
        welcomeMessage:
          params.customizations?.welcomeMessage ||
          existingDeployment?.customizations?.welcomeMessage ||
          'Hi there! How can I help you today?',
      },
      authType: params.authType || existingDeployment?.authType || 'public',
      password: params.password,
      allowedEmails: params.allowedEmails || existingDeployment?.allowedEmails || [],
      outputConfigs: params.outputConfigs || existingDeployment?.outputConfigs || [],
    }

    if (existingDeployment) {
      await db
        .update(chat)
        .set({
          identifier: payload.identifier,
          title: payload.title,
          description: payload.description,
          customizations: payload.customizations,
          authType: payload.authType,
          password: payload.password || existingDeployment.password,
          allowedEmails:
            payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [],
          outputConfigs: payload.outputConfigs,
          updatedAt: new Date(),
        })
        .where(eq(chat.id, existingDeployment.id))
    } else {
      await db.insert(chat).values({
        id: crypto.randomUUID(),
        workflowId,
        userId: context.userId,
        identifier: payload.identifier,
        title: payload.title,
        description: payload.description,
        customizations: payload.customizations,
        isActive: true,
        authType: payload.authType,
        password: payload.password || null,
        allowedEmails:
          payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [],
        outputConfigs: payload.outputConfigs,
        createdAt: new Date(),
        updatedAt: new Date(),
      })
    }

    return {
      success: true,
      output: { success: true, action: 'deploy', isDeployed: true, identifier },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeDeployMcp(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }

    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
    const workspaceId = workflowRecord.workspaceId
    if (!workspaceId) {
      return { success: false, error: 'workspaceId is required' }
    }

    if (!workflowRecord.isDeployed) {
      return {
        success: false,
        error: 'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.',
      }
    }

    const serverId = params.serverId
    if (!serverId) {
      return {
        success: false,
        error: 'serverId is required. Use list_workspace_mcp_servers to get available servers.',
      }
    }

    const existingTool = await db
      .select()
      .from(workflowMcpTool)
      .where(and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId)))
      .limit(1)

    const toolName = sanitizeToolName(
      params.toolName || workflowRecord.name || `workflow_${workflowId}`
    )
    const toolDescription =
      params.toolDescription || workflowRecord.description || `Execute ${workflowRecord.name} workflow`
    const parameterSchema = params.parameterSchema || {}

    if (existingTool.length > 0) {
      const toolId = existingTool[0].id
      await db
        .update(workflowMcpTool)
        .set({
          toolName,
          toolDescription,
          parameterSchema,
          updatedAt: new Date(),
        })
        .where(eq(workflowMcpTool.id, toolId))
      return { success: true, output: { toolId, toolName, toolDescription, updated: true } }
    }

    const toolId = crypto.randomUUID()
    await db.insert(workflowMcpTool).values({
      id: toolId,
      serverId,
      workflowId,
      toolName,
      toolDescription,
      parameterSchema,
      createdAt: new Date(),
      updatedAt: new Date(),
    })

    return { success: true, output: { toolId, toolName, toolDescription, updated: false } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeRedeploy(context: ExecutionContext): Promise<ToolCallResult> {
  try {
    const workflowId = context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    await ensureWorkflowAccess(workflowId, context.userId)

    const result = await deployWorkflow({ workflowId, deployedBy: context.userId })
    if (!result.success) {
      return { success: false, error: result.error || 'Failed to redeploy workflow' }
    }
    return {
      success: true,
      output: { workflowId, deployedAt: result.deployedAt || null, version: result.version },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeCheckDeploymentStatus(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
    const workspaceId = workflowRecord.workspaceId

    const [apiDeploy, chatDeploy] = await Promise.all([
      db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1),
      db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1),
    ])

    const isApiDeployed = apiDeploy[0]?.isDeployed || false
    const apiDetails = {
      isDeployed: isApiDeployed,
      deployedAt: apiDeploy[0]?.deployedAt || null,
      endpoint: isApiDeployed ? `/api/workflows/${workflowId}/execute` : null,
      apiKey: workflowRecord.workspaceId ? 'Workspace API keys' : 'Personal API keys',
      needsRedeployment: false,
    }

    const isChatDeployed = !!chatDeploy[0]
    const chatDetails = {
      isDeployed: isChatDeployed,
      chatId: chatDeploy[0]?.id || null,
      identifier: chatDeploy[0]?.identifier || null,
      chatUrl: isChatDeployed ? `/chat/${chatDeploy[0]?.identifier}` : null,
      title: chatDeploy[0]?.title || null,
      description: chatDeploy[0]?.description || null,
      authType: chatDeploy[0]?.authType || null,
      allowedEmails: chatDeploy[0]?.allowedEmails || null,
      outputConfigs: chatDeploy[0]?.outputConfigs || null,
      welcomeMessage: chatDeploy[0]?.customizations?.welcomeMessage || null,
      primaryColor: chatDeploy[0]?.customizations?.primaryColor || null,
      hasPassword: Boolean(chatDeploy[0]?.password),
    }

    const mcpDetails = { isDeployed: false, servers: [] as any[] }
    if (workspaceId) {
      const servers = await db
        .select({
          serverId: workflowMcpServer.id,
          serverName: workflowMcpServer.name,
          toolName: workflowMcpTool.toolName,
          toolDescription: workflowMcpTool.toolDescription,
          parameterSchema: workflowMcpTool.parameterSchema,
          toolId: workflowMcpTool.id,
        })
        .from(workflowMcpTool)
        .innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id))
        .where(eq(workflowMcpTool.workflowId, workflowId))

      if (servers.length > 0) {
        mcpDetails.isDeployed = true
        mcpDetails.servers = servers
      }
    }

    const isDeployed = apiDetails.isDeployed || chatDetails.isDeployed || mcpDetails.isDeployed
    return {
      success: true,
      output: { isDeployed, api: apiDetails, chat: chatDetails, mcp: mcpDetails },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeListWorkspaceMcpServers(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
    const workspaceId = workflowRecord.workspaceId
    if (!workspaceId) {
      return { success: false, error: 'workspaceId is required' }
    }

    const servers = await db
      .select({
        id: workflowMcpServer.id,
        name: workflowMcpServer.name,
        description: workflowMcpServer.description,
      })
      .from(workflowMcpServer)
      .where(eq(workflowMcpServer.workspaceId, workspaceId))

    const serverIds = servers.map((server) => server.id)
    const tools =
      serverIds.length > 0
        ? await db
            .select({
              serverId: workflowMcpTool.serverId,
              toolName: workflowMcpTool.toolName,
            })
            .from(workflowMcpTool)
            .where(inArray(workflowMcpTool.serverId, serverIds))
        : []

    const toolNamesByServer: Record<string, string[]> = {}
    for (const tool of tools) {
      if (!toolNamesByServer[tool.serverId]) {
        toolNamesByServer[tool.serverId] = []
      }
      toolNamesByServer[tool.serverId].push(tool.toolName)
    }

    const serversWithToolNames = servers.map((server) => ({
      ...server,
      toolCount: toolNamesByServer[server.id]?.length || 0,
      toolNames: toolNamesByServer[server.id] || [],
    }))

    return { success: true, output: { servers: serversWithToolNames, count: servers.length } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

export async function executeCreateWorkspaceMcpServer(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)
    const workspaceId = workflowRecord.workspaceId
    if (!workspaceId) {
      return { success: false, error: 'workspaceId is required' }
    }

    const name = params.name?.trim()
    if (!name) {
      return { success: false, error: 'name is required' }
    }

    const serverId = crypto.randomUUID()
    const [server] = await db
      .insert(workflowMcpServer)
      .values({
        id: serverId,
        workspaceId,
        createdBy: context.userId,
        name,
        description: params.description?.trim() || null,
        isPublic: params.isPublic ?? false,
        createdAt: new Date(),
        updatedAt: new Date(),
      })
      .returning()

    const workflowIds: string[] = params.workflowIds || []
    const addedTools: Array<{ workflowId: string; toolName: string }> = []

    if (workflowIds.length > 0) {
      const workflows = await db.select().from(workflow).where(inArray(workflow.id, workflowIds))

      for (const wf of workflows) {
        if (wf.workspaceId !== workspaceId || !wf.isDeployed) {
          continue
        }
        const hasStartBlock = await hasValidStartBlock(wf.id)
        if (!hasStartBlock) {
          continue
        }
        const toolName = sanitizeToolName(wf.name || `workflow_${wf.id}`)
        await db.insert(workflowMcpTool).values({
          id: crypto.randomUUID(),
          serverId,
          workflowId: wf.id,
          toolName,
          toolDescription: wf.description || `Execute ${wf.name} workflow`,
          parameterSchema: {},
          createdAt: new Date(),
          updatedAt: new Date(),
        })
        addedTools.push({ workflowId: wf.id, toolName })
      }
    }

    return { success: true, output: { server, addedTools } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}
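For reference, an illustrative invocation of the chat deploy tool above (ids and values hypothetical, inside some async caller), showing the identifier constraint it enforces:

// Identifiers must match /^[a-z0-9-]+$/; anything else returns a validation error.
async function example() {
  const result = await executeDeployChat(
    { identifier: 'support-bot', title: 'Support Bot', authType: 'public' },
    { userId: 'user-1', workflowId: 'workflow-1' }
  )
  // On success: { success: true, output: { success: true, action: 'deploy', isDeployed: true, identifier: 'support-bot' } }
  return result
}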
@@ -0,0 +1,100 @@
import { db } from '@sim/db'
import { account, workflow } from '@sim/db/schema'
import { and, eq } from 'drizzle-orm'
import type {
  ExecutionContext,
  ToolCallResult,
  ToolCallState,
} from '@/lib/copilot/orchestrator/types'
import { generateRequestId } from '@/lib/core/utils/request'
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
import { executeTool } from '@/tools'
import { resolveToolId } from '@/tools/utils'

export async function executeIntegrationToolDirect(
  toolCall: ToolCallState,
  toolConfig: any,
  context: ExecutionContext
): Promise<ToolCallResult> {
  const { userId, workflowId } = context
  const toolName = resolveToolId(toolCall.name)
  const toolArgs = toolCall.params || {}

  let workspaceId = context.workspaceId
  if (!workspaceId && workflowId) {
    const workflowResult = await db
      .select({ workspaceId: workflow.workspaceId })
      .from(workflow)
      .where(eq(workflow.id, workflowId))
      .limit(1)
    workspaceId = workflowResult[0]?.workspaceId ?? undefined
  }

  const decryptedEnvVars =
    context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId))

  const executionParams: Record<string, any> = resolveEnvVarReferences(toolArgs, decryptedEnvVars, {
    deep: true,
  }) as Record<string, any>

  if (toolConfig.oauth?.required && toolConfig.oauth.provider) {
    const provider = toolConfig.oauth.provider
    const accounts = await db
      .select()
      .from(account)
      .where(and(eq(account.providerId, provider), eq(account.userId, userId)))
      .limit(1)

    if (!accounts.length) {
      return {
        success: false,
        error: `No ${provider} account connected. Please connect your account first.`,
      }
    }

    const acc = accounts[0]
    const requestId = generateRequestId()
    const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id)

    if (!accessToken) {
      return {
        success: false,
        error: `OAuth token not available for ${provider}. Please reconnect your account.`,
      }
    }

    executionParams.accessToken = accessToken
  }

  if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) {
    return {
      success: false,
      error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`,
    }
  }

  executionParams._context = {
    workflowId,
    userId,
  }

  if (toolName === 'function_execute') {
    executionParams.envVars = decryptedEnvVars
    executionParams.workflowVariables = {}
    executionParams.blockData = {}
    executionParams.blockNameMapping = {}
    executionParams.language = executionParams.language || 'javascript'
    executionParams.timeout = executionParams.timeout || 30000
  }

  const result = await executeTool(toolName, executionParams)

  return {
    success: result.success,
    output: result.output,
    error: result.error,
  }
}
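The resolution step above is worth calling out: {{VAR}} references in tool arguments are substituted from the user's decrypted environment before execution, which is the convention the apiKey error message documents. A worked example with hypothetical values:

// Hypothetical input and environment:
//   toolArgs          = { apiKey: '{{OPENAI_API_KEY}}', model: 'gpt-4o' }
//   decryptedEnvVars  = { OPENAI_API_KEY: 'sk-...' }
// resolveEnvVarReferences(toolArgs, decryptedEnvVars, { deep: true })
//   => { apiKey: 'sk-...', model: 'gpt-4o' }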
@@ -0,0 +1,769 @@
|
||||
import crypto from 'crypto'
|
||||
import { db } from '@sim/db'
|
||||
import { customTools, permissions, workflow, workflowFolder, workspace } from '@sim/db/schema'
|
||||
import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm'
|
||||
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types'
|
||||
import {
|
||||
extractWorkflowNames,
|
||||
formatNormalizedWorkflowForCopilot,
|
||||
normalizeWorkflowName,
|
||||
} from '@/lib/copilot/tools/shared/workflow-utils'
|
||||
import { generateRequestId } from '@/lib/core/utils/request'
|
||||
import { mcpService } from '@/lib/mcp/service'
|
||||
import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace'
|
||||
import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs'
|
||||
import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator'
|
||||
import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults'
|
||||
import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow'
|
||||
import {
|
||||
loadWorkflowFromNormalizedTables,
|
||||
saveWorkflowToNormalizedTables,
|
||||
} from '@/lib/workflows/persistence/utils'
|
||||
import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers'
|
||||
import { ensureWorkflowAccess, ensureWorkspaceAccess, getAccessibleWorkflowsForUser, getDefaultWorkspaceId } from './access'
|
||||
import { normalizeName } from '@/executor/constants'
|
||||
|
||||
export async function executeGetUserWorkflow(
|
||||
params: Record<string, any>,
|
||||
context: ExecutionContext
|
||||
): Promise<ToolCallResult> {
|
||||
try {
|
||||
const workflowId = params.workflowId || context.workflowId
|
||||
if (!workflowId) {
|
||||
return { success: false, error: 'workflowId is required' }
|
||||
}
|
||||
|
||||
const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess(
|
||||
workflowId,
|
||||
context.userId
|
||||
)
|
||||
|
||||
const normalized = await loadWorkflowFromNormalizedTables(workflowId)
|
||||
const userWorkflow = formatNormalizedWorkflowForCopilot(normalized)
|
||||
if (!userWorkflow) {
|
||||
return { success: false, error: 'Workflow has no normalized data' }
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: {
|
||||
workflowId,
|
||||
workflowName: workflowRecord.name || '',
|
||||
workspaceId,
|
||||
userWorkflow,
|
||||
},
|
||||
}
|
||||
} catch (error) {
|
||||
return { success: false, error: error instanceof Error ? error.message : String(error) }
|
||||
}
|
||||
}
|
||||
|
||||
/** Resolves a workflow by normalized name across the user's accessible workflows. */
export async function executeGetWorkflowFromName(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowName = typeof params.workflow_name === 'string' ? params.workflow_name.trim() : ''
    if (!workflowName) {
      return { success: false, error: 'workflow_name is required' }
    }

    const workflows = await getAccessibleWorkflowsForUser(context.userId)

    const targetName = normalizeWorkflowName(workflowName)
    const match = workflows.find((w) => normalizeWorkflowName(w.name) === targetName)
    if (!match) {
      return { success: false, error: `Workflow not found: ${workflowName}` }
    }

    const normalized = await loadWorkflowFromNormalizedTables(match.id)
    const userWorkflow = formatNormalizedWorkflowForCopilot(normalized)
    if (!userWorkflow) {
      return { success: false, error: 'Workflow has no normalized data' }
    }

    return {
      success: true,
      output: {
        workflowId: match.id,
        workflowName: match.name || '',
        workspaceId: match.workspaceId,
        userWorkflow,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Lists workflows the user can access, optionally filtered by workspace and folder. */
export async function executeListUserWorkflows(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workspaceId = params?.workspaceId as string | undefined
    const folderId = params?.folderId as string | undefined

    const workflows = await getAccessibleWorkflowsForUser(context.userId, { workspaceId, folderId })

    const names = extractWorkflowNames(workflows)

    const workflowList = workflows.map((w) => ({
      workflowId: w.id,
      workflowName: w.name || '',
      workspaceId: w.workspaceId,
      folderId: w.folderId,
    }))

    return { success: true, output: { workflow_names: names, workflows: workflowList } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Lists workspaces the user belongs to, with their effective role. */
export async function executeListUserWorkspaces(context: ExecutionContext): Promise<ToolCallResult> {
  try {
    const workspaces = await db
      .select({
        workspaceId: workspace.id,
        workspaceName: workspace.name,
        ownerId: workspace.ownerId,
        permissionType: permissions.permissionType,
      })
      .from(permissions)
      .innerJoin(workspace, eq(permissions.entityId, workspace.id))
      .where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace')))
      .orderBy(desc(workspace.createdAt))

    const output = workspaces.map((row) => ({
      workspaceId: row.workspaceId,
      workspaceName: row.workspaceName,
      role: row.ownerId === context.userId ? 'owner' : row.permissionType,
    }))

    return { success: true, output: { workspaces: output } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Lists folders in a workspace, defaulting to the user's default workspace. */
export async function executeListFolders(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workspaceId =
      (params?.workspaceId as string | undefined) || (await getDefaultWorkspaceId(context.userId))

    await ensureWorkspaceAccess(workspaceId, context.userId, false)

    const folders = await db
      .select({
        folderId: workflowFolder.id,
        folderName: workflowFolder.name,
        parentId: workflowFolder.parentId,
        sortOrder: workflowFolder.sortOrder,
      })
      .from(workflowFolder)
      .where(eq(workflowFolder.workspaceId, workspaceId))
      .orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt))

    return {
      success: true,
      output: {
        workspaceId,
        folders,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Creates a workflow row plus its default normalized state, appended at the end of its folder. */
export async function executeCreateWorkflow(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const name = typeof params?.name === 'string' ? params.name.trim() : ''
    if (!name) {
      return { success: false, error: 'name is required' }
    }

    const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
    const folderId = params?.folderId || null
    const description = typeof params?.description === 'string' ? params.description : null

    await ensureWorkspaceAccess(workspaceId, context.userId, true)

    const workflowId = crypto.randomUUID()
    const now = new Date()

    // Place the new workflow after the last sibling in the same folder (or workspace root).
    const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId)
    const [maxResult] = await db
      .select({ maxOrder: max(workflow.sortOrder) })
      .from(workflow)
      .where(and(eq(workflow.workspaceId, workspaceId), folderCondition))
    const sortOrder = (maxResult?.maxOrder ?? 0) + 1

    await db.insert(workflow).values({
      id: workflowId,
      userId: context.userId,
      workspaceId,
      folderId,
      sortOrder,
      name,
      description,
      color: '#3972F6',
      lastSynced: now,
      createdAt: now,
      updatedAt: now,
      isDeployed: false,
      runCount: 0,
      variables: {},
    })

    const { workflowState } = buildDefaultWorkflowArtifacts()
    const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState)
    if (!saveResult.success) {
      throw new Error(saveResult.error || 'Failed to save workflow state')
    }

    return {
      success: true,
      output: {
        workflowId,
        workflowName: name,
        workspaceId,
        folderId,
      },
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Creates a folder in a workspace, appended after its last sibling. */
export async function executeCreateFolder(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const name = typeof params?.name === 'string' ? params.name.trim() : ''
    if (!name) {
      return { success: false, error: 'name is required' }
    }

    const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId))
    const parentId = params?.parentId || null

    await ensureWorkspaceAccess(workspaceId, context.userId, true)

    const [maxResult] = await db
      .select({ maxOrder: max(workflowFolder.sortOrder) })
      .from(workflowFolder)
      .where(
        and(
          eq(workflowFolder.workspaceId, workspaceId),
          parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId)
        )
      )
    const sortOrder = (maxResult?.maxOrder ?? 0) + 1

    const folderId = crypto.randomUUID()
    await db.insert(workflowFolder).values({
      id: folderId,
      workspaceId,
      parentId,
      name,
      sortOrder,
      createdAt: new Date(),
      updatedAt: new Date(),
    })

    return { success: true, output: { folderId, name, workspaceId, parentId } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Fetches auxiliary workflow data: global variables, custom tools, MCP tools, or workspace files. */
export async function executeGetWorkflowData(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    const dataType = params.data_type || params.dataType || ''
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    if (!dataType) {
      return { success: false, error: 'data_type is required' }
    }

    const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess(
      workflowId,
      context.userId
    )

    if (dataType === 'global_variables') {
      const variablesRecord = (workflowRecord.variables as Record<string, any>) || {}
      const variables = Object.values(variablesRecord).map((v: any) => ({
        id: String(v?.id || ''),
        name: String(v?.name || ''),
        value: v?.value,
      }))
      return { success: true, output: { variables } }
    }

    if (dataType === 'custom_tools') {
      if (!workspaceId) {
        return { success: false, error: 'workspaceId is required' }
      }
      // Workspace-scoped tools plus the user's personal (workspace-less) tools.
      const conditions = [
        eq(customTools.workspaceId, workspaceId),
        and(eq(customTools.userId, context.userId), isNull(customTools.workspaceId)),
      ]
      const toolsRows = await db
        .select()
        .from(customTools)
        .where(or(...conditions))
        .orderBy(desc(customTools.createdAt))

      const customToolsData = toolsRows.map((tool) => ({
        id: String(tool.id || ''),
        title: String(tool.title || ''),
        functionName: String((tool.schema as any)?.function?.name || ''),
        description: String((tool.schema as any)?.function?.description || ''),
        parameters: (tool.schema as any)?.function?.parameters,
      }))

      return { success: true, output: { customTools: customToolsData } }
    }

    if (dataType === 'mcp_tools') {
      if (!workspaceId) {
        return { success: false, error: 'workspaceId is required' }
      }
      const tools = await mcpService.discoverTools(context.userId, workspaceId, false)
      const mcpTools = tools.map((tool) => ({
        name: String(tool.name || ''),
        serverId: String(tool.serverId || ''),
        serverName: String(tool.serverName || ''),
        description: String(tool.description || ''),
        inputSchema: tool.inputSchema,
      }))
      return { success: true, output: { mcpTools } }
    }

    if (dataType === 'files') {
      if (!workspaceId) {
        return { success: false, error: 'workspaceId is required' }
      }
      const files = await listWorkspaceFiles(workspaceId)
      const fileResults = files.map((file) => ({
        id: String(file.id || ''),
        name: String(file.name || ''),
        key: String(file.key || ''),
        path: String(file.path || ''),
        size: Number(file.size || 0),
        type: String(file.type || ''),
        uploadedAt: String(file.uploadedAt || ''),
      }))
      return { success: true, output: { files: fileResults } }
    }

    return { success: false, error: `Unknown data_type: ${dataType}` }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Returns the referenceable output paths for blocks in a workflow, plus workflow variables. */
export async function executeGetBlockOutputs(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    await ensureWorkflowAccess(workflowId, context.userId)

    const normalized = await loadWorkflowFromNormalizedTables(workflowId)
    if (!normalized) {
      return { success: false, error: 'Workflow has no normalized data' }
    }

    const blocks = normalized.blocks || {}
    const loops = normalized.loops || {}
    const parallels = normalized.parallels || {}
    const blockIds =
      Array.isArray(params.blockIds) && params.blockIds.length > 0
        ? params.blockIds
        : Object.keys(blocks)

    const results: Array<{
      blockId: string
      blockName: string
      blockType: string
      outputs: string[]
      insideSubflowOutputs?: string[]
      outsideSubflowOutputs?: string[]
      triggerMode?: boolean
    }> = []

    for (const blockId of blockIds) {
      const block = blocks[blockId]
      if (!block?.type) continue
      const blockName = block.name || block.type

      // Loop/parallel blocks expose different outputs inside vs. outside the subflow.
      if (block.type === 'loop' || block.type === 'parallel') {
        const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels)
        results.push({
          blockId,
          blockName,
          blockType: block.type,
          outputs: [],
          insideSubflowOutputs: formatOutputsWithPrefix(insidePaths, blockName),
          outsideSubflowOutputs: formatOutputsWithPrefix(['results'], blockName),
          triggerMode: block.triggerMode,
        })
        continue
      }

      const outputs = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode)
      results.push({
        blockId,
        blockName,
        blockType: block.type,
        outputs: formatOutputsWithPrefix(outputs, blockName),
        triggerMode: block.triggerMode,
      })
    }

    const variables = await getWorkflowVariablesForTool(workflowId)

    const payload = { blocks: results, variables }
    return { success: true, output: payload }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** For each requested block, lists the upstream blocks and outputs it may reference. */
export async function executeGetBlockUpstreamReferences(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    if (!Array.isArray(params.blockIds) || params.blockIds.length === 0) {
      return { success: false, error: 'blockIds array is required' }
    }
    await ensureWorkflowAccess(workflowId, context.userId)

    const normalized = await loadWorkflowFromNormalizedTables(workflowId)
    if (!normalized) {
      return { success: false, error: 'Workflow has no normalized data' }
    }

    const blocks = normalized.blocks || {}
    const edges = normalized.edges || []
    const loops = normalized.loops || {}
    const parallels = normalized.parallels || {}

    const graphEdges = edges.map((edge: any) => ({ source: edge.source, target: edge.target }))
    const variableOutputs = await getWorkflowVariablesForTool(workflowId)

    const results: any[] = []

    for (const blockId of params.blockIds) {
      const targetBlock = blocks[blockId]
      if (!targetBlock) continue

      // Record the loop/parallel subflows that contain this block.
      const insideSubflows: Array<{ blockId: string; blockName: string; blockType: string }> = []
      const containingLoopIds = new Set<string>()
      const containingParallelIds = new Set<string>()

      Object.values(loops as Record<string, any>).forEach((loop) => {
        if (loop?.nodes?.includes(blockId)) {
          containingLoopIds.add(loop.id)
          const loopBlock = blocks[loop.id]
          if (loopBlock) {
            insideSubflows.push({
              blockId: loop.id,
              blockName: loopBlock.name || loopBlock.type,
              blockType: 'loop',
            })
          }
        }
      })

      Object.values(parallels as Record<string, any>).forEach((parallel) => {
        if (parallel?.nodes?.includes(blockId)) {
          containingParallelIds.add(parallel.id)
          const parallelBlock = blocks[parallel.id]
          if (parallelBlock) {
            insideSubflows.push({
              blockId: parallel.id,
              blockName: parallelBlock.name || parallelBlock.type,
              blockType: 'parallel',
            })
          }
        }
      })

      // A block may reference any ancestor on a path to it, plus every node inside
      // a containing loop/parallel subflow.
      const ancestorIds = BlockPathCalculator.findAllPathNodes(graphEdges, blockId)
      const accessibleIds = new Set<string>(ancestorIds)
      accessibleIds.add(blockId)

      const starterBlock = Object.values(blocks).find((b: any) => isInputDefinitionTrigger(b.type))
      if (starterBlock && ancestorIds.includes((starterBlock as any).id)) {
        accessibleIds.add((starterBlock as any).id)
      }

      containingLoopIds.forEach((loopId) => {
        accessibleIds.add(loopId)
        loops[loopId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId))
      })

      containingParallelIds.forEach((parallelId) => {
        accessibleIds.add(parallelId)
        parallels[parallelId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId))
      })

      const accessibleBlocks: any[] = []

      for (const accessibleBlockId of accessibleIds) {
        const block = blocks[accessibleBlockId]
        if (!block?.type) continue
        const canSelfReference = block.type === 'approval' || block.type === 'human_in_the_loop'
        if (accessibleBlockId === blockId && !canSelfReference) continue

        const blockName = block.name || block.type
        let accessContext: 'inside' | 'outside' | undefined
        let outputPaths: string[]

        if (block.type === 'loop' || block.type === 'parallel') {
          const isInside =
            (block.type === 'loop' && containingLoopIds.has(accessibleBlockId)) ||
            (block.type === 'parallel' && containingParallelIds.has(accessibleBlockId))
          accessContext = isInside ? 'inside' : 'outside'
          outputPaths = isInside
            ? getSubflowInsidePaths(block.type, accessibleBlockId, loops, parallels)
            : ['results']
        } else {
          outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode)
        }

        const formattedOutputs = formatOutputsWithPrefix(outputPaths, blockName)
        const entry: any = {
          blockId: accessibleBlockId,
          blockName,
          blockType: block.type,
          outputs: formattedOutputs,
        }
        if (block.triggerMode) entry.triggerMode = true
        if (accessContext) entry.accessContext = accessContext
        accessibleBlocks.push(entry)
      }

      results.push({
        blockId,
        blockName: targetBlock.name || targetBlock.type,
        blockType: targetBlock.type,
        accessibleBlocks,
        insideSubflows,
        variables: variableOutputs,
      })
    }

    const payload = { results }
    return { success: true, output: payload }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Executes a workflow on the user's behalf and returns the execution result. */
export async function executeRunWorkflow(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }

    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)

    const result = await executeWorkflow(
      {
        id: workflowRecord.id,
        userId: workflowRecord.userId,
        workspaceId: workflowRecord.workspaceId,
        variables: workflowRecord.variables || {},
      },
      generateRequestId(),
      params.workflow_input || params.input || undefined,
      context.userId
    )

    return {
      success: result.success,
      output: {
        executionId: result.executionId,
        success: result.success,
        output: result.output,
        logs: result.logs,
      },
      error: result.success ? undefined : result.error || 'Workflow execution failed',
    }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

/** Applies add/edit/delete operations to a workflow's global variables. */
export async function executeSetGlobalWorkflowVariables(
  params: Record<string, any>,
  context: ExecutionContext
): Promise<ToolCallResult> {
  try {
    const workflowId = params.workflowId || context.workflowId
    if (!workflowId) {
      return { success: false, error: 'workflowId is required' }
    }
    const operations = Array.isArray(params.operations) ? params.operations : []
    const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId)

    // Index current variables by name so operations can address them directly.
    const currentVarsRecord = (workflowRecord.variables as Record<string, any>) || {}
    const byName: Record<string, any> = {}
    Object.values(currentVarsRecord).forEach((v: any) => {
      if (v && typeof v === 'object' && v.id && v.name) byName[String(v.name)] = v
    })

    // Best-effort coercion of an incoming value to the variable's declared type.
    const coerceValue = (value: any, type: string) => {
      if (value === undefined) return value
      if (type === 'number') {
        const n = Number(value)
        return Number.isNaN(n) ? value : n
      }
      if (type === 'boolean') {
        const v = String(value).trim().toLowerCase()
        if (v === 'true') return true
        if (v === 'false') return false
        return value
      }
      if (type === 'array' || type === 'object') {
        try {
          const parsed = JSON.parse(String(value))
          if (type === 'array' && Array.isArray(parsed)) return parsed
          if (type === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed))
            return parsed
        } catch {}
        return value
      }
      return value
    }

    for (const op of operations) {
      const key = String(op?.name || '')
      if (!key) continue
      const nextType = op?.type || byName[key]?.type || 'plain'

      if (op.operation === 'delete') {
        delete byName[key]
        continue
      }
      const typedValue = coerceValue(op.value, nextType)
      if (op.operation === 'add') {
        byName[key] = {
          id: crypto.randomUUID(),
          workflowId,
          name: key,
          type: nextType,
          value: typedValue,
        }
        continue
      }
      if (op.operation === 'edit') {
        // Editing a missing variable upserts it.
        if (!byName[key]) {
          byName[key] = {
            id: crypto.randomUUID(),
            workflowId,
            name: key,
            type: nextType,
            value: typedValue,
          }
        } else {
          byName[key] = {
            ...byName[key],
            type: nextType,
            value: typedValue,
          }
        }
      }
    }

    const nextVarsRecord = Object.fromEntries(
      Object.values(byName).map((v: any) => [String(v.id), v])
    )

    await db
      .update(workflow)
      .set({ variables: nextVarsRecord, updatedAt: new Date() })
      .where(eq(workflow.id, workflowId))

    return { success: true, output: { updated: Object.values(byName).length } }
  } catch (error) {
    return { success: false, error: error instanceof Error ? error.message : String(error) }
  }
}

async function getWorkflowVariablesForTool(
  workflowId: string
): Promise<Array<{ id: string; name: string; type: string; tag: string }>> {
  const [workflowRecord] = await db
    .select({ variables: workflow.variables })
    .from(workflow)
    .where(eq(workflow.id, workflowId))
    .limit(1)

  const variablesRecord = (workflowRecord?.variables as Record<string, any>) || {}
  return Object.values(variablesRecord)
    .filter((v: any) => v?.name && String(v.name).trim() !== '')
    .map((v: any) => ({
      id: String(v.id || ''),
      name: String(v.name || ''),
      type: String(v.type || 'plain'),
      tag: `variable.${normalizeName(String(v.name || ''))}`,
    }))
}

// Output paths available when referencing a loop/parallel block from inside its subflow.
function getSubflowInsidePaths(
  blockType: 'loop' | 'parallel',
  blockId: string,
  loops: Record<string, any>,
  parallels: Record<string, any>
): string[] {
  const paths = ['index']
  if (blockType === 'loop') {
    const loopType = loops[blockId]?.loopType || 'for'
    if (loopType === 'forEach') {
      paths.push('currentItem', 'items')
    }
  } else {
    const parallelType = parallels[blockId]?.parallelType || 'count'
    if (parallelType === 'collection') {
      paths.push('currentItem', 'items')
    }
  }
  return paths
}

function formatOutputsWithPrefix(paths: string[], blockName: string): string[] {
  const normalizedName = normalizeName(blockName)
  return paths.map((path) => `${normalizedName}.${path}`)
}

@@ -48,7 +48,8 @@ export class GetBlocksAndToolsClientTool extends BaseClientTool {
      const parsed = ExecuteResponseSuccessSchema.parse(json)
      const result = GetBlocksAndToolsResult.parse(parsed.result)

      await this.markToolComplete(200, 'Successfully retrieved blocks and tools', result)
      // TODO: Temporarily sending empty data to test 403 issue
      await this.markToolComplete(200, 'Successfully retrieved blocks and tools', {})
      this.setState(ClientToolCallState.success)
    } catch (error: any) {
      const message = error instanceof Error ? error.message : String(error)

@@ -35,6 +35,11 @@ export const env = createEnv({
    SIM_AGENT_API_URL: z.string().url().optional(), // URL for internal sim agent API
    AGENT_INDEXER_URL: z.string().url().optional(), // URL for agent training data indexer
    AGENT_INDEXER_API_KEY: z.string().min(1).optional(), // API key for agent indexer authentication
    COPILOT_STREAM_TTL_SECONDS: z.number().optional(), // Redis TTL for copilot SSE buffer
    COPILOT_STREAM_EVENT_LIMIT: z.number().optional(), // Max events retained per stream
    COPILOT_STREAM_RESERVE_BATCH: z.number().optional(), // Event ID reservation batch size
    COPILOT_STREAM_FLUSH_INTERVAL_MS: z.number().optional(), // Buffer flush interval in ms
    COPILOT_STREAM_FLUSH_MAX_BATCH: z.number().optional(), // Max events per flush batch

    // Database & Storage
    REDIS_URL: z.string().url().optional(), // Redis connection string for caching/sessions

@@ -5,6 +5,11 @@ import { create } from 'zustand'
import { devtools } from 'zustand/middleware'
import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api'
import type { CopilotTransportMode } from '@/lib/copilot/models'
import {
  normalizeSseEvent,
  shouldSkipToolCallEvent,
  shouldSkipToolResultEvent,
} from '@/lib/copilot/orchestrator/sse-utils'
import type {
  BaseClientToolMetadata,
  ClientToolDisplay,

@@ -2045,6 +2050,12 @@ async function applySseEvent(
  get: () => CopilotStore,
  set: (next: Partial<CopilotStore> | ((state: CopilotStore) => Partial<CopilotStore>)) => void
): Promise<boolean> {
  // Drop duplicate tool events before they reach the store.
  const normalizedEvent = normalizeSseEvent(data)
  if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) {
    return true
  }
  data = normalizedEvent

  if (data.type === 'subagent_start') {
    const toolCallId = data.data?.tool_call_id
    if (toolCallId) {

bun.lock
@@ -1,6 +1,5 @@
{
  "lockfileVersion": 1,
  "configVersion": 0,
  "workspaces": {
    "": {
      "name": "simstudio",

@@ -836,6 +836,15 @@ describe('POST /api/v1/copilot/chat', () => {
- Save checkpoints to resume
- Handle partial completions gracefully

### Risk 6: Process-Local Dedupe

**Risk**: Tool call/result dedupe caches are in-memory and scoped to a single process, so duplicate events can still appear across ECS tasks.

**Mitigation**:
- Treat dedupe as best-effort, not global
- Prefer idempotent state updates on the client (see the sketch below)
- Use Redis-backed stream replay for authoritative ordering

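A minimal sketch of what best-effort dedupe plus an idempotent client update can look like. The `ToolResultEvent` shape, `seenToolResults` set, and `applyToolResult` reducer are illustrative names, not the repo's actual exports:

```ts
// Best-effort, process-local dedupe: each ECS task keeps its own Set, so a
// duplicate event may still arrive from another task. The client-side update
// is therefore written to be idempotent: applying the same tool_result twice
// leaves the store unchanged.
// NOTE: hypothetical names for illustration, not the repository's actual API.
interface ToolResultEvent {
  toolCallId: string
  status: 'success' | 'error'
  output?: unknown
}

const seenToolResults = new Set<string>() // scoped to this process only

function applyToolResult(
  state: Map<string, ToolResultEvent>,
  event: ToolResultEvent
): Map<string, ToolResultEvent> {
  // Fast path: drop duplicates this process has already seen.
  if (seenToolResults.has(event.toolCallId)) return state
  seenToolResults.add(event.toolCallId)

  // Idempotent merge: re-applying an event with the same terminal status is a
  // no-op, so replays from another task or a Redis stream cannot regress state.
  const existing = state.get(event.toolCallId)
  if (existing && existing.status === event.status) return state
  const next = new Map(state)
  next.set(event.toolCallId, event)
  return next
}
```

Because the merge treats an already-recorded terminal status as a no-op, authoritative ordering can still come from the Redis-backed stream replay without the client ever double-applying a result.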
---

## File Inventory