From 8aadd0e3f05fc2cc86b703a7572adf3c946f5bab Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan Date: Thu, 5 Feb 2026 11:20:58 -0800 Subject: [PATCH] Checkpoint --- apps/sim/lib/copilot/orchestrator/index.ts | 48 +- .../copilot/orchestrator/sse-handlers.test.ts | 95 ++ .../lib/copilot/orchestrator/sse-handlers.ts | 109 +- .../copilot/orchestrator/sse-utils.test.ts | 43 + .../sim/lib/copilot/orchestrator/sse-utils.ts | 119 ++ .../orchestrator/stream-buffer.test.ts | 118 ++ .../lib/copilot/orchestrator/stream-buffer.ts | 58 +- apps/sim/lib/copilot/orchestrator/subagent.ts | 48 +- .../lib/copilot/orchestrator/tool-executor.ts | 1504 +---------------- .../orchestrator/tool-executor/access.ts | 130 ++ .../tool-executor/deployment-tools.ts | 479 ++++++ .../tool-executor/integration-tools.ts | 100 ++ .../tool-executor/workflow-tools.ts | 769 +++++++++ .../client/blocks/get-blocks-and-tools.ts | 3 +- apps/sim/lib/core/config/env.ts | 5 + apps/sim/stores/panel/copilot/store.ts | 11 + bun.lock | 1 - docs/COPILOT_SERVER_REFACTOR.md | 9 + 18 files changed, 1969 insertions(+), 1680 deletions(-) create mode 100644 apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts create mode 100644 apps/sim/lib/copilot/orchestrator/sse-utils.test.ts create mode 100644 apps/sim/lib/copilot/orchestrator/sse-utils.ts create mode 100644 apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts create mode 100644 apps/sim/lib/copilot/orchestrator/tool-executor/access.ts create mode 100644 apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools.ts create mode 100644 apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts create mode 100644 apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools.ts diff --git a/apps/sim/lib/copilot/orchestrator/index.ts b/apps/sim/lib/copilot/orchestrator/index.ts index e990612be..0fe0abe62 100644 --- a/apps/sim/lib/copilot/orchestrator/index.ts +++ b/apps/sim/lib/copilot/orchestrator/index.ts @@ -1,16 +1,11 @@ import { createLogger } from '@sim/logger' import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers' import { - getToolCallIdFromEvent, - handleSubagentRouting, - markToolCallSeen, - markToolResultSeen, normalizeSseEvent, - sseHandlers, - subAgentHandlers, - wasToolCallSeen, - wasToolResultSeen, -} from '@/lib/copilot/orchestrator/sse-handlers' + shouldSkipToolCallEvent, + shouldSkipToolResultEvent, +} from '@/lib/copilot/orchestrator/sse-utils' import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser' import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor' import type { @@ -99,38 +94,9 @@ export async function orchestrateCopilotStream( const normalizedEvent = normalizeSseEvent(event) - // Skip tool_result events for tools the sim-side already executed. - // The sim-side emits its own tool_result with complete data. - // For server-side tools (not executed by sim), we still forward the Go backend's tool_result. - const toolCallId = getToolCallIdFromEvent(normalizedEvent) - const eventData = normalizedEvent.data - - const isPartialToolCall = - normalizedEvent.type === 'tool_call' && eventData?.partial === true - - const shouldSkipToolCall = - normalizedEvent.type === 'tool_call' && - !!toolCallId && - !isPartialToolCall && - (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) - - if ( - normalizedEvent.type === 'tool_call' && - toolCallId && - !isPartialToolCall && - !shouldSkipToolCall - ) { - markToolCallSeen(toolCallId) - } - - const shouldSkipToolResult = - normalizedEvent.type === 'tool_result' && - (() => { - if (!toolCallId) return false - if (wasToolResultSeen(toolCallId)) return true - markToolResultSeen(toolCallId) - return false - })() + // Skip duplicate tool events to prevent state regressions. + const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent) + const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent) if (!shouldSkipToolCall && !shouldSkipToolResult) { await forwardEvent(normalizedEvent, options) diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts new file mode 100644 index 000000000..f9368ec69 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers.test.ts @@ -0,0 +1,95 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { loggerMock } from '@sim/testing' + +vi.mock('@sim/logger', () => loggerMock) + +const executeToolServerSide = vi.fn() +const markToolComplete = vi.fn() + +vi.mock('@/lib/copilot/orchestrator/tool-executor', () => ({ + executeToolServerSide, + markToolComplete, +})) + +import { sseHandlers } from '@/lib/copilot/orchestrator/sse-handlers' +import type { ExecutionContext, StreamingContext } from '@/lib/copilot/orchestrator/types' + +describe('sse-handlers tool lifecycle', () => { + let context: StreamingContext + let execContext: ExecutionContext + + beforeEach(() => { + vi.clearAllMocks() + context = { + chatId: undefined, + conversationId: undefined, + messageId: 'msg-1', + accumulatedContent: '', + contentBlocks: [], + toolCalls: new Map(), + currentThinkingBlock: null, + isInThinkingBlock: false, + subAgentParentToolCallId: undefined, + subAgentContent: {}, + subAgentToolCalls: {}, + pendingContent: '', + streamComplete: false, + wasAborted: false, + errors: [], + } + execContext = { + userId: 'user-1', + workflowId: 'workflow-1', + } + }) + + it('executes tool_call and emits tool_result + mark-complete', async () => { + executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } }) + markToolComplete.mockResolvedValueOnce(true) + const onEvent = vi.fn() + + await sseHandlers.tool_call( + { + type: 'tool_call', + data: { id: 'tool-1', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } }, + } as any, + context, + execContext, + { onEvent, interactive: false, timeout: 1000 } + ) + + expect(executeToolServerSide).toHaveBeenCalledTimes(1) + expect(markToolComplete).toHaveBeenCalledTimes(1) + expect(onEvent).toHaveBeenCalledWith( + expect.objectContaining({ + type: 'tool_result', + toolCallId: 'tool-1', + success: true, + }) + ) + + const updated = context.toolCalls.get('tool-1') + expect(updated?.status).toBe('success') + expect(updated?.result?.output).toEqual({ ok: true }) + }) + + it('skips duplicate tool_call after result', async () => { + executeToolServerSide.mockResolvedValueOnce({ success: true, output: { ok: true } }) + markToolComplete.mockResolvedValueOnce(true) + + const event = { + type: 'tool_call', + data: { id: 'tool-dup', name: 'get_user_workflow', arguments: { workflowId: 'workflow-1' } }, + } + + await sseHandlers.tool_call(event as any, context, execContext, { interactive: false }) + await sseHandlers.tool_call(event as any, context, execContext, { interactive: false }) + + expect(executeToolServerSide).toHaveBeenCalledTimes(1) + expect(markToolComplete).toHaveBeenCalledTimes(1) + }) +}) + diff --git a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts index 9a3d0f1b9..0f5f3df1a 100644 --- a/apps/sim/lib/copilot/orchestrator/sse-handlers.ts +++ b/apps/sim/lib/copilot/orchestrator/sse-handlers.ts @@ -1,6 +1,11 @@ import { createLogger } from '@sim/logger' import { INTERRUPT_TOOL_SET, SUBAGENT_TOOL_SET } from '@/lib/copilot/orchestrator/config' import { getToolConfirmation } from '@/lib/copilot/orchestrator/persistence' +import { + getEventData, + markToolResultSeen, + wasToolResultSeen, +} from '@/lib/copilot/orchestrator/sse-utils' import { executeToolServerSide, markToolComplete } from '@/lib/copilot/orchestrator/tool-executor' import type { ContentBlock, @@ -13,109 +18,7 @@ import type { const logger = createLogger('CopilotSseHandlers') -/** - * Tracks tool call IDs for which a tool_call has already been forwarded/emitted (non-partial). - */ -const seenToolCalls = new Set() - -/** - * Tracks tool call IDs for which a tool_result has already been emitted or forwarded. - */ -const seenToolResults = new Set() - -export function markToolCallSeen(toolCallId: string): void { - seenToolCalls.add(toolCallId) - setTimeout( - () => { - seenToolCalls.delete(toolCallId) - }, - 5 * 60 * 1000 - ) -} - -export function wasToolCallSeen(toolCallId: string): boolean { - return seenToolCalls.has(toolCallId) -} - -type EventDataObject = Record | undefined - -const parseEventData = (data: unknown): EventDataObject => { - if (!data) return undefined - if (typeof data !== 'string') { - return data as EventDataObject - } - try { - return JSON.parse(data) as EventDataObject - } catch { - return undefined - } -} - -const hasToolFields = (data: EventDataObject): boolean => { - if (!data) return false - return ( - data.id !== undefined || - data.toolCallId !== undefined || - data.name !== undefined || - data.success !== undefined || - data.result !== undefined || - data.arguments !== undefined - ) -} - -const getEventData = (event: SSEEvent): EventDataObject => { - const topLevel = parseEventData(event.data) - if (!topLevel) return undefined - if (hasToolFields(topLevel)) return topLevel - const nested = parseEventData(topLevel.data) - return nested || topLevel -} - -export function getToolCallIdFromEvent(event: SSEEvent): string | undefined { - const data = getEventData(event) - return event.toolCallId || data?.id || data?.toolCallId -} - -/** Normalizes SSE events so tool metadata is available at the top level. */ -export function normalizeSseEvent(event: SSEEvent): SSEEvent { - if (!event) return event - const data = getEventData(event) - if (!data) return event - const toolCallId = event.toolCallId || data.id || data.toolCallId - const toolName = event.toolName || data.name || data.toolName - const success = event.success ?? data.success - const result = event.result ?? data.result - const normalizedData = typeof event.data === 'string' ? data : event.data - return { - ...event, - data: normalizedData, - toolCallId, - toolName, - success, - result, - } -} - -/** - * Mark a tool call as executed by the sim-side. - * This prevents the Go backend's duplicate tool_result from being forwarded. - */ -export function markToolResultSeen(toolCallId: string): void { - seenToolResults.add(toolCallId) - setTimeout( - () => { - seenToolResults.delete(toolCallId) - }, - 5 * 60 * 1000 - ) -} - -/** - * Check if a tool call was executed by the sim-side. - */ -export function wasToolResultSeen(toolCallId: string): boolean { - return seenToolResults.has(toolCallId) -} +// Normalization + dedupe helpers live in sse-utils to keep server/client in sync. /** * Respond tools are internal to the copilot's subagent system. diff --git a/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts b/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts new file mode 100644 index 000000000..37b748a7f --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/sse-utils.test.ts @@ -0,0 +1,43 @@ +/** + * @vitest-environment node + */ +import { describe, expect, it } from 'vitest' +import { + normalizeSseEvent, + shouldSkipToolCallEvent, + shouldSkipToolResultEvent, +} from '@/lib/copilot/orchestrator/sse-utils' + +describe('sse-utils', () => { + it.concurrent('normalizes tool fields from string data', () => { + const event = { + type: 'tool_result', + data: JSON.stringify({ + id: 'tool_1', + name: 'edit_workflow', + success: true, + result: { ok: true }, + }), + } + + const normalized = normalizeSseEvent(event as any) + + expect(normalized.toolCallId).toBe('tool_1') + expect(normalized.toolName).toBe('edit_workflow') + expect(normalized.success).toBe(true) + expect(normalized.result).toEqual({ ok: true }) + }) + + it.concurrent('dedupes tool_call events', () => { + const event = { type: 'tool_call', data: { id: 'tool_call_1', name: 'plan' } } + expect(shouldSkipToolCallEvent(event as any)).toBe(false) + expect(shouldSkipToolCallEvent(event as any)).toBe(true) + }) + + it.concurrent('dedupes tool_result events', () => { + const event = { type: 'tool_result', data: { id: 'tool_result_1', name: 'plan' } } + expect(shouldSkipToolResultEvent(event as any)).toBe(false) + expect(shouldSkipToolResultEvent(event as any)).toBe(true) + }) +}) + diff --git a/apps/sim/lib/copilot/orchestrator/sse-utils.ts b/apps/sim/lib/copilot/orchestrator/sse-utils.ts new file mode 100644 index 000000000..792a42aba --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/sse-utils.ts @@ -0,0 +1,119 @@ +import type { SSEEvent } from '@/lib/copilot/orchestrator/types' + +type EventDataObject = Record | undefined + +const DEFAULT_TOOL_EVENT_TTL_MS = 5 * 60 * 1000 + +/** + * In-memory tool event dedupe. + * + * NOTE: These sets are process-local only. In a multi-instance setup (e.g., ECS), + * each task maintains its own dedupe cache, so duplicates can still appear across tasks. + */ +const seenToolCalls = new Set() +const seenToolResults = new Set() + +const parseEventData = (data: unknown): EventDataObject => { + if (!data) return undefined + if (typeof data !== 'string') { + return data as EventDataObject + } + try { + return JSON.parse(data) as EventDataObject + } catch { + return undefined + } +} + +const hasToolFields = (data: EventDataObject): boolean => { + if (!data) return false + return ( + data.id !== undefined || + data.toolCallId !== undefined || + data.name !== undefined || + data.success !== undefined || + data.result !== undefined || + data.arguments !== undefined + ) +} + +export const getEventData = (event: SSEEvent): EventDataObject => { + const topLevel = parseEventData(event.data) + if (!topLevel) return undefined + if (hasToolFields(topLevel)) return topLevel + const nested = parseEventData(topLevel.data) + return nested || topLevel +} + +export function getToolCallIdFromEvent(event: SSEEvent): string | undefined { + const data = getEventData(event) + return event.toolCallId || data?.id || data?.toolCallId +} + +/** Normalizes SSE events so tool metadata is available at the top level. */ +export function normalizeSseEvent(event: SSEEvent): SSEEvent { + if (!event) return event + const data = getEventData(event) + if (!data) return event + const toolCallId = event.toolCallId || data.id || data.toolCallId + const toolName = event.toolName || data.name || data.toolName + const success = event.success ?? data.success + const result = event.result ?? data.result + const normalizedData = typeof event.data === 'string' ? data : event.data + return { + ...event, + data: normalizedData, + toolCallId, + toolName, + success, + result, + } +} + +export function markToolCallSeen(toolCallId: string, ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS): void { + seenToolCalls.add(toolCallId) + setTimeout(() => { + seenToolCalls.delete(toolCallId) + }, ttlMs) +} + +export function wasToolCallSeen(toolCallId: string): boolean { + return seenToolCalls.has(toolCallId) +} + +export function markToolResultSeen( + toolCallId: string, + ttlMs: number = DEFAULT_TOOL_EVENT_TTL_MS +): void { + seenToolResults.add(toolCallId) + setTimeout(() => { + seenToolResults.delete(toolCallId) + }, ttlMs) +} + +export function wasToolResultSeen(toolCallId: string): boolean { + return seenToolResults.has(toolCallId) +} + +export function shouldSkipToolCallEvent(event: SSEEvent): boolean { + if (event.type !== 'tool_call') return false + const toolCallId = getToolCallIdFromEvent(event) + if (!toolCallId) return false + const eventData = getEventData(event) + if (eventData?.partial === true) return false + if (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) { + return true + } + markToolCallSeen(toolCallId) + return false +} + +export function shouldSkipToolResultEvent(event: SSEEvent): boolean { + if (event.type !== 'tool_result') return false + const toolCallId = getToolCallIdFromEvent(event) + if (!toolCallId) return false + if (wasToolResultSeen(toolCallId)) return true + markToolResultSeen(toolCallId) + return false +} + diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts new file mode 100644 index 000000000..6e834c629 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.test.ts @@ -0,0 +1,118 @@ +/** + * @vitest-environment node + */ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { loggerMock } from '@sim/testing' + +vi.mock('@sim/logger', () => loggerMock) + +type StoredEntry = { score: number; value: string } + +const createRedisStub = () => { + const events = new Map() + const counters = new Map() + + const readEntries = (key: string, min: number, max: number) => { + const list = events.get(key) || [] + return list + .filter((entry) => entry.score >= min && entry.score <= max) + .sort((a, b) => a.score - b.score) + .map((entry) => entry.value) + } + + return { + del: vi.fn().mockResolvedValue(1), + hset: vi.fn().mockResolvedValue(1), + hgetall: vi.fn().mockResolvedValue({}), + expire: vi.fn().mockResolvedValue(1), + eval: vi.fn().mockImplementation( + ( + _lua: string, + _keysCount: number, + seqKey: string, + eventsKey: string, + _ttl: number, + _limit: number, + streamId: string, + eventJson: string + ) => { + const current = counters.get(seqKey) || 0 + const next = current + 1 + counters.set(seqKey, next) + const entry = JSON.stringify({ eventId: next, streamId, event: JSON.parse(eventJson) }) + const list = events.get(eventsKey) || [] + list.push({ score: next, value: entry }) + events.set(eventsKey, list) + return next + } + ), + incrby: vi.fn().mockImplementation((key: string, amount: number) => { + const current = counters.get(key) || 0 + const next = current + amount + counters.set(key, next) + return next + }), + zrangebyscore: vi.fn().mockImplementation((key: string, min: string, max: string) => { + const minVal = Number(min) + const maxVal = max === '+inf' ? Number.POSITIVE_INFINITY : Number(max) + return Promise.resolve(readEntries(key, minVal, maxVal)) + }), + pipeline: vi.fn().mockImplementation(() => { + const api = { + zadd: vi.fn().mockImplementation((key: string, ...args: Array) => { + const list = events.get(key) || [] + for (let i = 0; i < args.length; i += 2) { + list.push({ score: Number(args[i]), value: String(args[i + 1]) }) + } + events.set(key, list) + return api + }), + expire: vi.fn().mockReturnValue(api), + zremrangebyrank: vi.fn().mockReturnValue(api), + exec: vi.fn().mockResolvedValue([]), + } + return api + }), + } +} + +let mockRedis: ReturnType + +vi.mock('@/lib/core/config/redis', () => ({ + getRedisClient: () => mockRedis, +})) + +import { + appendStreamEvent, + createStreamEventWriter, + readStreamEvents, +} from '@/lib/copilot/orchestrator/stream-buffer' + +describe('stream-buffer', () => { + beforeEach(() => { + mockRedis = createRedisStub() + vi.clearAllMocks() + }) + + it.concurrent('replays events after a given event id', async () => { + await appendStreamEvent('stream-1', { type: 'content', data: 'hello' }) + await appendStreamEvent('stream-1', { type: 'content', data: 'world' }) + + const allEvents = await readStreamEvents('stream-1', 0) + expect(allEvents.map((entry) => entry.event.data)).toEqual(['hello', 'world']) + + const replayed = await readStreamEvents('stream-1', 1) + expect(replayed.map((entry) => entry.event.data)).toEqual(['world']) + }) + + it.concurrent('flushes buffered events for resume', async () => { + const writer = createStreamEventWriter('stream-2') + await writer.write({ type: 'content', data: 'a' }) + await writer.write({ type: 'content', data: 'b' }) + await writer.flush() + + const events = await readStreamEvents('stream-2', 0) + expect(events.map((entry) => entry.event.data)).toEqual(['a', 'b']) + }) +}) + diff --git a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts index 24621ee57..29fd8f55b 100644 --- a/apps/sim/lib/copilot/orchestrator/stream-buffer.ts +++ b/apps/sim/lib/copilot/orchestrator/stream-buffer.ts @@ -1,13 +1,40 @@ import { createLogger } from '@sim/logger' +import { env } from '@/lib/core/config/env' import { getRedisClient } from '@/lib/core/config/redis' const logger = createLogger('CopilotStreamBuffer') -const STREAM_TTL_SECONDS = 60 * 60 -const STREAM_EVENT_LIMIT = 5000 -const STREAM_RESERVE_BATCH = 200 -const STREAM_FLUSH_INTERVAL_MS = 15 -const STREAM_FLUSH_MAX_BATCH = 200 +const STREAM_DEFAULTS = { + ttlSeconds: 60 * 60, + eventLimit: 5000, + reserveBatch: 200, + flushIntervalMs: 15, + flushMaxBatch: 200, +} + +export type StreamBufferConfig = { + ttlSeconds: number + eventLimit: number + reserveBatch: number + flushIntervalMs: number + flushMaxBatch: number +} + +const parseNumber = (value: number | string | undefined, fallback: number): number => { + if (typeof value === 'number' && Number.isFinite(value)) return value + const parsed = Number(value) + return Number.isFinite(parsed) ? parsed : fallback +} + +export function getStreamBufferConfig(): StreamBufferConfig { + return { + ttlSeconds: parseNumber(env.COPILOT_STREAM_TTL_SECONDS, STREAM_DEFAULTS.ttlSeconds), + eventLimit: parseNumber(env.COPILOT_STREAM_EVENT_LIMIT, STREAM_DEFAULTS.eventLimit), + reserveBatch: parseNumber(env.COPILOT_STREAM_RESERVE_BATCH, STREAM_DEFAULTS.reserveBatch), + flushIntervalMs: parseNumber(env.COPILOT_STREAM_FLUSH_INTERVAL_MS, STREAM_DEFAULTS.flushIntervalMs), + flushMaxBatch: parseNumber(env.COPILOT_STREAM_FLUSH_MAX_BATCH, STREAM_DEFAULTS.flushMaxBatch), + } +} const APPEND_STREAM_EVENT_LUA = ` local seqKey = KEYS[1] @@ -82,6 +109,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise const redis = getRedisClient() if (!redis) return try { + const config = getStreamBufferConfig() const payload: Record = { status: meta.status, updatedAt: meta.updatedAt || new Date().toISOString(), @@ -89,7 +117,7 @@ export async function setStreamMeta(streamId: string, meta: StreamMeta): Promise if (meta.userId) payload.userId = meta.userId if (meta.error) payload.error = meta.error await redis.hset(getMetaKey(streamId), payload) - await redis.expire(getMetaKey(streamId), STREAM_TTL_SECONDS) + await redis.expire(getMetaKey(streamId), config.ttlSeconds) } catch (error) { logger.warn('Failed to update stream meta', { streamId, @@ -124,14 +152,15 @@ export async function appendStreamEvent( } try { + const config = getStreamBufferConfig() const eventJson = JSON.stringify(event) const nextId = await redis.eval( APPEND_STREAM_EVENT_LUA, 2, getSeqKey(streamId), getEventsKey(streamId), - STREAM_TTL_SECONDS, - STREAM_EVENT_LIMIT, + config.ttlSeconds, + config.eventLimit, streamId, eventJson ) @@ -156,6 +185,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { } } + const config = getStreamBufferConfig() let pending: StreamEventEntry[] = [] let nextEventId = 0 let maxReservedId = 0 @@ -167,11 +197,11 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { flushTimer = setTimeout(() => { flushTimer = null void flush() - }, STREAM_FLUSH_INTERVAL_MS) + }, config.flushIntervalMs) } const reserveIds = async (minCount: number) => { - const reserveCount = Math.max(STREAM_RESERVE_BATCH, minCount) + const reserveCount = Math.max(config.reserveBatch, minCount) const newMax = await redis.incrby(getSeqKey(streamId), reserveCount) const startId = newMax - reserveCount + 1 if (nextEventId === 0 || nextEventId > maxReservedId) { @@ -193,9 +223,9 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { } const pipeline = redis.pipeline() pipeline.zadd(key, ...(zaddArgs as any)) - pipeline.expire(key, STREAM_TTL_SECONDS) - pipeline.expire(getSeqKey(streamId), STREAM_TTL_SECONDS) - pipeline.zremrangebyrank(key, 0, -STREAM_EVENT_LIMIT - 1) + pipeline.expire(key, config.ttlSeconds) + pipeline.expire(getSeqKey(streamId), config.ttlSeconds) + pipeline.zremrangebyrank(key, 0, -config.eventLimit - 1) await pipeline.exec() } catch (error) { logger.warn('Failed to flush stream events', { @@ -216,7 +246,7 @@ export function createStreamEventWriter(streamId: string): StreamEventWriter { const eventId = nextEventId++ const entry: StreamEventEntry = { eventId, streamId, event } pending.push(entry) - if (pending.length >= STREAM_FLUSH_MAX_BATCH) { + if (pending.length >= config.flushMaxBatch) { await flush() } else { scheduleFlush() diff --git a/apps/sim/lib/copilot/orchestrator/subagent.ts b/apps/sim/lib/copilot/orchestrator/subagent.ts index fa1f3d36a..17079e4d5 100644 --- a/apps/sim/lib/copilot/orchestrator/subagent.ts +++ b/apps/sim/lib/copilot/orchestrator/subagent.ts @@ -1,16 +1,11 @@ import { createLogger } from '@sim/logger' import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' +import { handleSubagentRouting, sseHandlers, subAgentHandlers } from '@/lib/copilot/orchestrator/sse-handlers' import { - getToolCallIdFromEvent, - handleSubagentRouting, - markToolCallSeen, - markToolResultSeen, normalizeSseEvent, - sseHandlers, - subAgentHandlers, - wasToolCallSeen, - wasToolResultSeen, -} from '@/lib/copilot/orchestrator/sse-handlers' + shouldSkipToolCallEvent, + shouldSkipToolResultEvent, +} from '@/lib/copilot/orchestrator/sse-utils' import { parseSSEStream } from '@/lib/copilot/orchestrator/sse-parser' import { prepareExecutionContext } from '@/lib/copilot/orchestrator/tool-executor' import type { @@ -115,38 +110,9 @@ export async function orchestrateSubagentStream( const normalizedEvent = normalizeSseEvent(event) - // Skip tool_result events for tools the sim-side already executed. - // The sim-side emits its own tool_result with complete data. - // For server-side tools (not executed by sim), we still forward the Go backend's tool_result. - const toolCallId = getToolCallIdFromEvent(normalizedEvent) - const eventData = normalizedEvent.data - - const isPartialToolCall = - normalizedEvent.type === 'tool_call' && eventData?.partial === true - - const shouldSkipToolCall = - normalizedEvent.type === 'tool_call' && - !!toolCallId && - !isPartialToolCall && - (wasToolResultSeen(toolCallId) || wasToolCallSeen(toolCallId)) - - if ( - normalizedEvent.type === 'tool_call' && - toolCallId && - !isPartialToolCall && - !shouldSkipToolCall - ) { - markToolCallSeen(toolCallId) - } - - const shouldSkipToolResult = - normalizedEvent.type === 'tool_result' && - (() => { - if (!toolCallId) return false - if (wasToolResultSeen(toolCallId)) return true - markToolResultSeen(toolCallId) - return false - })() + // Skip duplicate tool events to prevent state regressions. + const shouldSkipToolCall = shouldSkipToolCallEvent(normalizedEvent) + const shouldSkipToolResult = shouldSkipToolResultEvent(normalizedEvent) if (!shouldSkipToolCall && !shouldSkipToolResult) { await forwardEvent(normalizedEvent, options) diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor.ts b/apps/sim/lib/copilot/orchestrator/tool-executor.ts index 867526395..1c04181cd 100644 --- a/apps/sim/lib/copilot/orchestrator/tool-executor.ts +++ b/apps/sim/lib/copilot/orchestrator/tool-executor.ts @@ -1,17 +1,7 @@ import { db } from '@sim/db' -import { - account, - chat, - customTools, - permissions, - workflow, - workflowFolder, - workflowMcpServer, - workflowMcpTool, - workspace, -} from '@sim/db/schema' +import { workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' -import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm' +import { eq } from 'drizzle-orm' import { SIM_AGENT_API_URL_DEFAULT } from '@/lib/copilot/constants' import type { ExecutionContext, @@ -19,34 +9,32 @@ import type { ToolCallState, } from '@/lib/copilot/orchestrator/types' import { routeExecution } from '@/lib/copilot/tools/server/router' -import { - extractWorkflowNames, - formatNormalizedWorkflowForCopilot, - normalizeWorkflowName, -} from '@/lib/copilot/tools/shared/workflow-utils' import { env } from '@/lib/core/config/env' -import { generateRequestId } from '@/lib/core/utils/request' import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' -import { mcpService } from '@/lib/mcp/service' -import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' -import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace' -import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs' -import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator' -import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults' -import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' +import { executeIntegrationToolDirect } from '@/lib/copilot/orchestrator/tool-executor/integration-tools' import { - deployWorkflow, - loadWorkflowFromNormalizedTables, - saveWorkflowToNormalizedTables, - undeployWorkflow, -} from '@/lib/workflows/persistence/utils' -import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' -import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' -import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils' -import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils' -import { normalizeName } from '@/executor/constants' -import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' -import { executeTool } from '@/tools' + executeGetBlockOutputs, + executeGetBlockUpstreamReferences, + executeGetUserWorkflow, + executeGetWorkflowData, + executeGetWorkflowFromName, + executeListFolders, + executeListUserWorkflows, + executeListUserWorkspaces, + executeCreateWorkflow, + executeCreateFolder, + executeRunWorkflow, + executeSetGlobalWorkflowVariables, +} from '@/lib/copilot/orchestrator/tool-executor/workflow-tools' +import { + executeCheckDeploymentStatus, + executeCreateWorkspaceMcpServer, + executeDeployApi, + executeDeployChat, + executeDeployMcp, + executeListWorkspaceMcpServers, + executeRedeploy, +} from '@/lib/copilot/orchestrator/tool-executor/deployment-tools' import { getTool, resolveToolId } from '@/tools/utils' const logger = createLogger('CopilotToolExecutor') @@ -150,94 +138,6 @@ async function executeServerToolDirect( } } -/** - * Execute an integration tool directly via the tools registry. - */ -async function executeIntegrationToolDirect( - toolCall: ToolCallState, - toolConfig: any, - context: ExecutionContext -): Promise { - const { userId, workflowId } = context - const toolName = resolveToolId(toolCall.name) - const toolArgs = toolCall.params || {} - - let workspaceId = context.workspaceId - if (!workspaceId && workflowId) { - const workflowResult = await db - .select({ workspaceId: workflow.workspaceId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - workspaceId = workflowResult[0]?.workspaceId ?? undefined - } - - const decryptedEnvVars = - context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId)) - - const executionParams: Record = resolveEnvVarReferences(toolArgs, decryptedEnvVars, { - deep: true, - }) as Record - - if (toolConfig.oauth?.required && toolConfig.oauth.provider) { - const provider = toolConfig.oauth.provider - const accounts = await db - .select() - .from(account) - .where(and(eq(account.providerId, provider), eq(account.userId, userId))) - .limit(1) - - if (!accounts.length) { - return { - success: false, - error: `No ${provider} account connected. Please connect your account first.`, - } - } - - const acc = accounts[0] - const requestId = generateRequestId() - const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id) - - if (!accessToken) { - return { - success: false, - error: `OAuth token not available for ${provider}. Please reconnect your account.`, - } - } - - executionParams.accessToken = accessToken - } - - if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) { - return { - success: false, - error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`, - } - } - - executionParams._context = { - workflowId, - userId, - } - - if (toolName === 'function_execute') { - executionParams.envVars = decryptedEnvVars - executionParams.workflowVariables = {} - executionParams.blockData = {} - executionParams.blockNameMapping = {} - executionParams.language = executionParams.language || 'javascript' - executionParams.timeout = executionParams.timeout || 30000 - } - - const result = await executeTool(toolName, executionParams) - - return { - success: result.success, - output: result.output, - error: result.error, - } -} - async function executeSimWorkflowTool( toolName: string, params: Record, @@ -287,1360 +187,6 @@ async function executeSimWorkflowTool( } } -async function ensureWorkflowAccess( - workflowId: string, - userId: string -): Promise<{ - workflow: typeof workflow.$inferSelect - workspaceId?: string | null -}> { - const [workflowRecord] = await db - .select() - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - if (!workflowRecord) { - throw new Error(`Workflow ${workflowId} not found`) - } - - if (workflowRecord.userId === userId) { - return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId } - } - - if (workflowRecord.workspaceId) { - const [permissionRow] = await db - .select({ permissionType: permissions.permissionType }) - .from(permissions) - .where( - and( - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workflowRecord.workspaceId), - eq(permissions.userId, userId) - ) - ) - .limit(1) - if (permissionRow) { - return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId } - } - } - - throw new Error('Unauthorized workflow access') -} - -async function getDefaultWorkspaceId(userId: string): Promise { - const workspaces = await db - .select({ workspaceId: workspace.id }) - .from(permissions) - .innerJoin(workspace, eq(permissions.entityId, workspace.id)) - .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) - .orderBy(desc(workspace.createdAt)) - .limit(1) - - const workspaceId = workspaces[0]?.workspaceId - if (!workspaceId) { - throw new Error('No workspace found for user') - } - - return workspaceId -} - -async function ensureWorkspaceAccess( - workspaceId: string, - userId: string, - requireWrite: boolean -): Promise { - const [row] = await db - .select({ - permissionType: permissions.permissionType, - ownerId: workspace.ownerId, - }) - .from(permissions) - .innerJoin(workspace, eq(permissions.entityId, workspace.id)) - .where( - and( - eq(permissions.entityType, 'workspace'), - eq(permissions.entityId, workspaceId), - eq(permissions.userId, userId) - ) - ) - .limit(1) - - if (!row) { - throw new Error(`Workspace ${workspaceId} not found`) - } - - const isOwner = row.ownerId === userId - const permissionType = row.permissionType - const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write' - - if (requireWrite && !canWrite) { - throw new Error('Write or admin access required for this workspace') - } - - if (!requireWrite && !canWrite && permissionType !== 'read') { - throw new Error('Access denied to workspace') - } -} - -async function getAccessibleWorkflowsForUser( - userId: string, - options?: { workspaceId?: string; folderId?: string } -) { - const workspaceIds = await db - .select({ entityId: permissions.entityId }) - .from(permissions) - .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) - - const workspaceIdList = workspaceIds.map((row) => row.entityId) - - const workflowConditions = [eq(workflow.userId, userId)] - if (workspaceIdList.length > 0) { - workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList)) - } - if (options?.workspaceId) { - workflowConditions.push(eq(workflow.workspaceId, options.workspaceId)) - } - if (options?.folderId) { - workflowConditions.push(eq(workflow.folderId, options.folderId)) - } - - return db - .select() - .from(workflow) - .where(or(...workflowConditions)) - .orderBy(asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id)) -} - -async function executeGetUserWorkflow( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - - const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess( - workflowId, - context.userId - ) - - const normalized = await loadWorkflowFromNormalizedTables(workflowId) - const userWorkflow = formatNormalizedWorkflowForCopilot(normalized) - if (!userWorkflow) { - return { success: false, error: 'Workflow has no normalized data' } - } - - // Return workflow ID so copilot can use it for subsequent tool calls - return { - success: true, - output: { - workflowId, - workflowName: workflowRecord.name || '', - workspaceId, - userWorkflow, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeGetWorkflowFromName( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowName = typeof params.workflow_name === 'string' ? params.workflow_name.trim() : '' - if (!workflowName) { - return { success: false, error: 'workflow_name is required' } - } - - const workflows = await getAccessibleWorkflowsForUser(context.userId) - - const targetName = normalizeWorkflowName(workflowName) - const match = workflows.find((w) => normalizeWorkflowName(w.name) === targetName) - if (!match) { - return { success: false, error: `Workflow not found: ${workflowName}` } - } - - const normalized = await loadWorkflowFromNormalizedTables(match.id) - const userWorkflow = formatNormalizedWorkflowForCopilot(normalized) - if (!userWorkflow) { - return { success: false, error: 'Workflow has no normalized data' } - } - - // Return workflow ID and workspaceId so copilot can use them for subsequent tool calls - return { - success: true, - output: { - workflowId: match.id, - workflowName: match.name || '', - workspaceId: match.workspaceId, - userWorkflow, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeListUserWorkflows( - params: Record, - context: ExecutionContext -): Promise { - try { - const workspaceId = params?.workspaceId as string | undefined - const folderId = params?.folderId as string | undefined - - const workflows = await getAccessibleWorkflowsForUser(context.userId, { workspaceId, folderId }) - - // Return both names (for backward compatibility) and full workflow info with IDs - const names = extractWorkflowNames(workflows) - - const workflowList = workflows.map((w) => ({ - workflowId: w.id, - workflowName: w.name || '', - workspaceId: w.workspaceId, - folderId: w.folderId, - })) - - return { success: true, output: { workflow_names: names, workflows: workflowList } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeListUserWorkspaces(context: ExecutionContext): Promise { - try { - const workspaces = await db - .select({ - workspaceId: workspace.id, - workspaceName: workspace.name, - ownerId: workspace.ownerId, - permissionType: permissions.permissionType, - }) - .from(permissions) - .innerJoin(workspace, eq(permissions.entityId, workspace.id)) - .where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace'))) - .orderBy(desc(workspace.createdAt)) - - const output = workspaces.map((row) => ({ - workspaceId: row.workspaceId, - workspaceName: row.workspaceName, - role: row.ownerId === context.userId ? 'owner' : row.permissionType, - })) - - return { success: true, output: { workspaces: output } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeListFolders( - params: Record, - context: ExecutionContext -): Promise { - try { - const workspaceId = - (params?.workspaceId as string | undefined) || (await getDefaultWorkspaceId(context.userId)) - - await ensureWorkspaceAccess(workspaceId, context.userId, false) - - const folders = await db - .select({ - folderId: workflowFolder.id, - folderName: workflowFolder.name, - parentId: workflowFolder.parentId, - sortOrder: workflowFolder.sortOrder, - }) - .from(workflowFolder) - .where(eq(workflowFolder.workspaceId, workspaceId)) - .orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt)) - - return { - success: true, - output: { - workspaceId, - folders, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeCreateWorkflow( - params: Record, - context: ExecutionContext -): Promise { - try { - const name = typeof params?.name === 'string' ? params.name.trim() : '' - if (!name) { - return { success: false, error: 'name is required' } - } - - const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) - const folderId = params?.folderId || null - const description = typeof params?.description === 'string' ? params.description : null - - await ensureWorkspaceAccess(workspaceId, context.userId, true) - - const workflowId = crypto.randomUUID() - const now = new Date() - - const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId) - const [maxResult] = await db - .select({ maxOrder: max(workflow.sortOrder) }) - .from(workflow) - .where(and(eq(workflow.workspaceId, workspaceId), folderCondition)) - const sortOrder = (maxResult?.maxOrder ?? 0) + 1 - - await db.insert(workflow).values({ - id: workflowId, - userId: context.userId, - workspaceId, - folderId, - sortOrder, - name, - description, - color: '#3972F6', - lastSynced: now, - createdAt: now, - updatedAt: now, - isDeployed: false, - runCount: 0, - variables: {}, - }) - - const { workflowState } = buildDefaultWorkflowArtifacts() - const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState) - if (!saveResult.success) { - throw new Error(saveResult.error || 'Failed to save workflow state') - } - - return { - success: true, - output: { - workflowId, - workflowName: name, - workspaceId, - folderId, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeCreateFolder( - params: Record, - context: ExecutionContext -): Promise { - try { - const name = typeof params?.name === 'string' ? params.name.trim() : '' - if (!name) { - return { success: false, error: 'name is required' } - } - - const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) - const parentId = params?.parentId || null - - await ensureWorkspaceAccess(workspaceId, context.userId, true) - - const [maxOrder] = await db - .select({ maxOrder: max(workflowFolder.sortOrder) }) - .from(workflowFolder) - .where( - and( - eq(workflowFolder.workspaceId, workspaceId), - parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId) - ) - ) - .limit(1) - - const sortOrder = (maxOrder?.maxOrder ?? 0) + 1 - const folderId = crypto.randomUUID() - - await db.insert(workflowFolder).values({ - id: folderId, - name, - userId: context.userId, - workspaceId, - parentId, - color: '#6B7280', - sortOrder, - }) - - return { - success: true, - output: { - folderId, - folderName: name, - workspaceId, - parentId, - sortOrder, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeGetWorkflowData( - params: Record, - context: ExecutionContext -): Promise { - try { - const dataType = params.data_type - if (!dataType) { - return { success: false, error: 'data_type is required' } - } - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - - const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess( - workflowId, - context.userId - ) - - if (dataType === 'global_variables') { - const variablesRecord = (workflowRecord.variables as Record) || {} - const variables = Object.values(variablesRecord).map((v: any) => ({ - id: String(v?.id || ''), - name: String(v?.name || ''), - value: v?.value, - })) - return { success: true, output: { variables } } - } - - if (dataType === 'custom_tools') { - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - const conditions = [ - eq(customTools.workspaceId, workspaceId), - and(eq(customTools.userId, context.userId), isNull(customTools.workspaceId)), - ] - const toolsRows = await db - .select() - .from(customTools) - .where(or(...conditions)) - .orderBy(desc(customTools.createdAt)) - - const customToolsData = toolsRows.map((tool) => ({ - id: String(tool.id || ''), - title: String(tool.title || ''), - functionName: String((tool.schema as any)?.function?.name || ''), - description: String((tool.schema as any)?.function?.description || ''), - parameters: (tool.schema as any)?.function?.parameters, - })) - - return { success: true, output: { customTools: customToolsData } } - } - - if (dataType === 'mcp_tools') { - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - const tools = await mcpService.discoverTools(context.userId, workspaceId, false) - const mcpTools = tools.map((tool) => ({ - name: String(tool.name || ''), - serverId: String(tool.serverId || ''), - serverName: String(tool.serverName || ''), - description: String(tool.description || ''), - inputSchema: tool.inputSchema, - })) - return { success: true, output: { mcpTools } } - } - - if (dataType === 'files') { - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - const files = await listWorkspaceFiles(workspaceId) - const fileResults = files.map((file) => ({ - id: String(file.id || ''), - name: String(file.name || ''), - key: String(file.key || ''), - path: String(file.path || ''), - size: Number(file.size || 0), - type: String(file.type || ''), - uploadedAt: String(file.uploadedAt || ''), - })) - return { success: true, output: { files: fileResults } } - } - - return { success: false, error: `Unknown data_type: ${dataType}` } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeGetBlockOutputs( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - await ensureWorkflowAccess(workflowId, context.userId) - - const normalized = await loadWorkflowFromNormalizedTables(workflowId) - if (!normalized) { - return { success: false, error: 'Workflow has no normalized data' } - } - - const blocks = normalized.blocks || {} - const loops = normalized.loops || {} - const parallels = normalized.parallels || {} - const blockIds = - Array.isArray(params.blockIds) && params.blockIds.length > 0 - ? params.blockIds - : Object.keys(blocks) - - const results: Array<{ - blockId: string - blockName: string - blockType: string - outputs: string[] - insideSubflowOutputs?: string[] - outsideSubflowOutputs?: string[] - triggerMode?: boolean - }> = [] - - for (const blockId of blockIds) { - const block = blocks[blockId] - if (!block?.type) continue - const blockName = block.name || block.type - - if (block.type === 'loop' || block.type === 'parallel') { - const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels) - results.push({ - blockId, - blockName, - blockType: block.type, - outputs: [], - insideSubflowOutputs: formatOutputsWithPrefix(insidePaths, blockName), - outsideSubflowOutputs: formatOutputsWithPrefix(['results'], blockName), - triggerMode: block.triggerMode, - }) - continue - } - - const outputs = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode) - results.push({ - blockId, - blockName, - blockType: block.type, - outputs: formatOutputsWithPrefix(outputs, blockName), - triggerMode: block.triggerMode, - }) - } - - const variables = await getWorkflowVariablesForTool(workflowId) - - const payload = { blocks: results, variables } - return { success: true, output: payload } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeGetBlockUpstreamReferences( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - if (!Array.isArray(params.blockIds) || params.blockIds.length === 0) { - return { success: false, error: 'blockIds array is required' } - } - - await ensureWorkflowAccess(workflowId, context.userId) - const normalized = await loadWorkflowFromNormalizedTables(workflowId) - if (!normalized) { - return { success: false, error: 'Workflow has no normalized data' } - } - - const blocks = normalized.blocks || {} - const edges = normalized.edges || [] - const loops = normalized.loops || {} - const parallels = normalized.parallels || {} - - const graphEdges = edges.map((edge: any) => ({ source: edge.source, target: edge.target })) - const variableOutputs = await getWorkflowVariablesForTool(workflowId) - - const results: any[] = [] - - for (const blockId of params.blockIds) { - const targetBlock = blocks[blockId] - if (!targetBlock) continue - - const insideSubflows: Array<{ blockId: string; blockName: string; blockType: string }> = [] - const containingLoopIds = new Set() - const containingParallelIds = new Set() - - Object.values(loops as Record).forEach((loop) => { - if (loop?.nodes?.includes(blockId)) { - containingLoopIds.add(loop.id) - const loopBlock = blocks[loop.id] - if (loopBlock) { - insideSubflows.push({ - blockId: loop.id, - blockName: loopBlock.name || loopBlock.type, - blockType: 'loop', - }) - } - } - }) - - Object.values(parallels as Record).forEach((parallel) => { - if (parallel?.nodes?.includes(blockId)) { - containingParallelIds.add(parallel.id) - const parallelBlock = blocks[parallel.id] - if (parallelBlock) { - insideSubflows.push({ - blockId: parallel.id, - blockName: parallelBlock.name || parallelBlock.type, - blockType: 'parallel', - }) - } - } - }) - - const ancestorIds = BlockPathCalculator.findAllPathNodes(graphEdges, blockId) - const accessibleIds = new Set(ancestorIds) - accessibleIds.add(blockId) - - const starterBlock = Object.values(blocks).find((b: any) => isInputDefinitionTrigger(b.type)) - if (starterBlock && ancestorIds.includes((starterBlock as any).id)) { - accessibleIds.add((starterBlock as any).id) - } - - containingLoopIds.forEach((loopId) => { - accessibleIds.add(loopId) - loops[loopId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId)) - }) - - containingParallelIds.forEach((parallelId) => { - accessibleIds.add(parallelId) - parallels[parallelId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId)) - }) - - const accessibleBlocks: any[] = [] - - for (const accessibleBlockId of accessibleIds) { - const block = blocks[accessibleBlockId] - if (!block?.type) continue - const canSelfReference = block.type === 'approval' || block.type === 'human_in_the_loop' - if (accessibleBlockId === blockId && !canSelfReference) continue - - const blockName = block.name || block.type - let accessContext: 'inside' | 'outside' | undefined - let outputPaths: string[] - - if (block.type === 'loop' || block.type === 'parallel') { - const isInside = - (block.type === 'loop' && containingLoopIds.has(accessibleBlockId)) || - (block.type === 'parallel' && containingParallelIds.has(accessibleBlockId)) - accessContext = isInside ? 'inside' : 'outside' - outputPaths = isInside - ? getSubflowInsidePaths(block.type, accessibleBlockId, loops, parallels) - : ['results'] - } else { - outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode) - } - - const formattedOutputs = formatOutputsWithPrefix(outputPaths, blockName) - const entry: any = { - blockId: accessibleBlockId, - blockName, - blockType: block.type, - outputs: formattedOutputs, - } - if (block.triggerMode) entry.triggerMode = true - if (accessContext) entry.accessContext = accessContext - accessibleBlocks.push(entry) - } - - results.push({ - blockId, - blockName: targetBlock.name || targetBlock.type, - blockType: targetBlock.type, - accessibleBlocks, - insideSubflows, - variables: variableOutputs, - }) - } - - const payload = { results } - return { success: true, output: payload } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeRunWorkflow( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - - const result = await executeWorkflow( - { - id: workflowRecord.id, - userId: workflowRecord.userId, - workspaceId: workflowRecord.workspaceId, - variables: workflowRecord.variables || {}, - }, - generateRequestId(), - params.workflow_input || params.input || undefined, - context.userId - ) - - return { - success: result.success, - output: { - executionId: result.executionId, - success: result.success, - output: result.output, - logs: result.logs, - }, - error: result.success ? undefined : result.error || 'Workflow execution failed', - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeSetGlobalWorkflowVariables( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - const operations = Array.isArray(params.operations) ? params.operations : [] - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - - const currentVarsRecord = (workflowRecord.variables as Record) || {} - const byName: Record = {} - Object.values(currentVarsRecord).forEach((v: any) => { - if (v && typeof v === 'object' && v.id && v.name) byName[String(v.name)] = v - }) - - for (const op of operations) { - const key = String(op?.name || '') - if (!key) continue - const nextType = op?.type || byName[key]?.type || 'plain' - const coerceValue = (value: any, type: string) => { - if (value === undefined) return value - if (type === 'number') { - const n = Number(value) - return Number.isNaN(n) ? value : n - } - if (type === 'boolean') { - const v = String(value).trim().toLowerCase() - if (v === 'true') return true - if (v === 'false') return false - return value - } - if (type === 'array' || type === 'object') { - try { - const parsed = JSON.parse(String(value)) - if (type === 'array' && Array.isArray(parsed)) return parsed - if (type === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed)) - return parsed - } catch {} - return value - } - return value - } - - if (op.operation === 'delete') { - delete byName[key] - continue - } - const typedValue = coerceValue(op.value, nextType) - if (op.operation === 'add') { - byName[key] = { - id: crypto.randomUUID(), - workflowId, - name: key, - type: nextType, - value: typedValue, - } - continue - } - if (op.operation === 'edit') { - if (!byName[key]) { - byName[key] = { - id: crypto.randomUUID(), - workflowId, - name: key, - type: nextType, - value: typedValue, - } - } else { - byName[key] = { - ...byName[key], - type: nextType, - value: typedValue, - } - } - } - } - - const nextVarsRecord = Object.fromEntries( - Object.values(byName).map((v: any) => [String(v.id), v]) - ) - - await db - .update(workflow) - .set({ variables: nextVarsRecord, updatedAt: new Date() }) - .where(eq(workflow.id, workflowId)) - - return { success: true, output: { updated: Object.values(byName).length } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeDeployApi( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - const action = params.action === 'undeploy' ? 'undeploy' : 'deploy' - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - - if (action === 'undeploy') { - const result = await undeployWorkflow({ workflowId }) - if (!result.success) { - return { success: false, error: result.error || 'Failed to undeploy workflow' } - } - return { success: true, output: { workflowId, isDeployed: false } } - } - - const result = await deployWorkflow({ - workflowId, - deployedBy: context.userId, - workflowName: workflowRecord.name || undefined, - }) - if (!result.success) { - return { success: false, error: result.error || 'Failed to deploy workflow' } - } - - return { - success: true, - output: { - workflowId, - isDeployed: true, - deployedAt: result.deployedAt, - version: result.version, - }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeDeployChat( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - - const action = params.action === 'undeploy' ? 'undeploy' : 'deploy' - if (action === 'undeploy') { - const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1) - if (!existing.length) { - return { success: false, error: 'No active chat deployment found for this workflow' } - } - const { hasAccess } = await checkChatAccess(existing[0].id, context.userId) - if (!hasAccess) { - return { success: false, error: 'Unauthorized chat access' } - } - await db.delete(chat).where(eq(chat.id, existing[0].id)) - return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } } - } - - const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId) - if (!hasAccess) { - return { success: false, error: 'Workflow not found or access denied' } - } - - const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1) - const existingDeployment = existing[0] || null - - const identifier = String(params.identifier || existingDeployment?.identifier || '').trim() - const title = String(params.title || existingDeployment?.title || '').trim() - if (!identifier || !title) { - return { success: false, error: 'Chat identifier and title are required' } - } - - const identifierPattern = /^[a-z0-9-]+$/ - if (!identifierPattern.test(identifier)) { - return { - success: false, - error: 'Identifier can only contain lowercase letters, numbers, and hyphens', - } - } - - const existingIdentifier = await db - .select() - .from(chat) - .where(eq(chat.identifier, identifier)) - .limit(1) - if (existingIdentifier.length > 0 && existingIdentifier[0].id !== existingDeployment?.id) { - return { success: false, error: 'Identifier already in use' } - } - - const deployResult = await deployWorkflow({ - workflowId, - deployedBy: context.userId, - }) - if (!deployResult.success) { - return { success: false, error: deployResult.error || 'Failed to deploy workflow' } - } - - const payload = { - workflowId, - identifier, - title, - description: String(params.description || existingDeployment?.description || ''), - customizations: { - primaryColor: - params.customizations?.primaryColor || - existingDeployment?.customizations?.primaryColor || - 'var(--brand-primary-hover-hex)', - welcomeMessage: - params.customizations?.welcomeMessage || - existingDeployment?.customizations?.welcomeMessage || - 'Hi there! How can I help you today?', - }, - authType: params.authType || existingDeployment?.authType || 'public', - password: params.password, - allowedEmails: params.allowedEmails || existingDeployment?.allowedEmails || [], - outputConfigs: params.outputConfigs || existingDeployment?.outputConfigs || [], - } - - if (existingDeployment) { - await db - .update(chat) - .set({ - identifier: payload.identifier, - title: payload.title, - description: payload.description, - customizations: payload.customizations, - authType: payload.authType, - password: payload.password || existingDeployment.password, - allowedEmails: - payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [], - outputConfigs: payload.outputConfigs, - updatedAt: new Date(), - }) - .where(eq(chat.id, existingDeployment.id)) - } else { - await db.insert(chat).values({ - id: crypto.randomUUID(), - workflowId, - userId: context.userId, - identifier: payload.identifier, - title: payload.title, - description: payload.description, - customizations: payload.customizations, - isActive: true, - authType: payload.authType, - password: payload.password || null, - allowedEmails: - payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [], - outputConfigs: payload.outputConfigs, - createdAt: new Date(), - updatedAt: new Date(), - }) - } - - return { - success: true, - output: { success: true, action: 'deploy', isDeployed: true, identifier }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeDeployMcp( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - const workspaceId = workflowRecord.workspaceId - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - - if (!workflowRecord.isDeployed) { - return { - success: false, - error: 'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.', - } - } - - const serverId = params.serverId - if (!serverId) { - return { - success: false, - error: 'serverId is required. Use list_workspace_mcp_servers to get available servers.', - } - } - - const existingTool = await db - .select() - .from(workflowMcpTool) - .where( - and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId)) - ) - .limit(1) - - const toolName = sanitizeToolName( - params.toolName || workflowRecord.name || `workflow_${workflowId}` - ) - const toolDescription = - params.toolDescription || - workflowRecord.description || - `Execute ${workflowRecord.name} workflow` - const parameterSchema = params.parameterSchema || {} - - if (existingTool.length > 0) { - const toolId = existingTool[0].id - await db - .update(workflowMcpTool) - .set({ - toolName, - toolDescription, - parameterSchema, - updatedAt: new Date(), - }) - .where(eq(workflowMcpTool.id, toolId)) - return { success: true, output: { toolId, toolName, toolDescription, updated: true } } - } - - const toolId = crypto.randomUUID() - await db.insert(workflowMcpTool).values({ - id: toolId, - serverId, - workflowId, - toolName, - toolDescription, - parameterSchema, - createdAt: new Date(), - updatedAt: new Date(), - }) - - return { success: true, output: { toolId, toolName, toolDescription, updated: false } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeRedeploy(context: ExecutionContext): Promise { - try { - const workflowId = context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - await ensureWorkflowAccess(workflowId, context.userId) - - const result = await deployWorkflow({ workflowId, deployedBy: context.userId }) - if (!result.success) { - return { success: false, error: result.error || 'Failed to redeploy workflow' } - } - return { - success: true, - output: { workflowId, deployedAt: result.deployedAt || null, version: result.version }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeCheckDeploymentStatus( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - const workspaceId = workflowRecord.workspaceId - - const [apiDeploy, chatDeploy] = await Promise.all([ - db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1), - db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1), - ]) - - const isApiDeployed = apiDeploy[0]?.isDeployed || false - const apiDetails = { - isDeployed: isApiDeployed, - deployedAt: apiDeploy[0]?.deployedAt || null, - endpoint: isApiDeployed ? `/api/workflows/${workflowId}/execute` : null, - apiKey: workflowRecord.workspaceId ? 'Workspace API keys' : 'Personal API keys', - needsRedeployment: false, - } - - const isChatDeployed = !!chatDeploy[0] - const chatDetails = { - isDeployed: isChatDeployed, - chatId: chatDeploy[0]?.id || null, - identifier: chatDeploy[0]?.identifier || null, - chatUrl: isChatDeployed ? `/chat/${chatDeploy[0]?.identifier}` : null, - title: chatDeploy[0]?.title || null, - description: chatDeploy[0]?.description || null, - authType: chatDeploy[0]?.authType || null, - allowedEmails: chatDeploy[0]?.allowedEmails || null, - outputConfigs: chatDeploy[0]?.outputConfigs || null, - welcomeMessage: chatDeploy[0]?.customizations?.welcomeMessage || null, - primaryColor: chatDeploy[0]?.customizations?.primaryColor || null, - hasPassword: Boolean(chatDeploy[0]?.password), - } - - const mcpDetails = { isDeployed: false, servers: [] as any[] } - if (workspaceId) { - const servers = await db - .select({ - serverId: workflowMcpServer.id, - serverName: workflowMcpServer.name, - toolName: workflowMcpTool.toolName, - toolDescription: workflowMcpTool.toolDescription, - parameterSchema: workflowMcpTool.parameterSchema, - toolId: workflowMcpTool.id, - }) - .from(workflowMcpTool) - .innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id)) - .where(eq(workflowMcpTool.workflowId, workflowId)) - - if (servers.length > 0) { - mcpDetails.isDeployed = true - mcpDetails.servers = servers - } - } - - const isDeployed = apiDetails.isDeployed || chatDetails.isDeployed || mcpDetails.isDeployed - return { - success: true, - output: { isDeployed, api: apiDetails, chat: chatDetails, mcp: mcpDetails }, - } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeListWorkspaceMcpServers( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - const workspaceId = workflowRecord.workspaceId - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - - const servers = await db - .select({ - id: workflowMcpServer.id, - name: workflowMcpServer.name, - description: workflowMcpServer.description, - }) - .from(workflowMcpServer) - .where(eq(workflowMcpServer.workspaceId, workspaceId)) - - const serverIds = servers.map((server) => server.id) - const tools = - serverIds.length > 0 - ? await db - .select({ - serverId: workflowMcpTool.serverId, - toolName: workflowMcpTool.toolName, - }) - .from(workflowMcpTool) - .where(inArray(workflowMcpTool.serverId, serverIds)) - : [] - - const toolNamesByServer: Record = {} - for (const tool of tools) { - if (!toolNamesByServer[tool.serverId]) { - toolNamesByServer[tool.serverId] = [] - } - toolNamesByServer[tool.serverId].push(tool.toolName) - } - - const serversWithToolNames = servers.map((server) => ({ - ...server, - toolCount: toolNamesByServer[server.id]?.length || 0, - toolNames: toolNamesByServer[server.id] || [], - })) - - return { success: true, output: { servers: serversWithToolNames, count: servers.length } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function executeCreateWorkspaceMcpServer( - params: Record, - context: ExecutionContext -): Promise { - try { - const workflowId = params.workflowId || context.workflowId - if (!workflowId) { - return { success: false, error: 'workflowId is required' } - } - const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) - const workspaceId = workflowRecord.workspaceId - if (!workspaceId) { - return { success: false, error: 'workspaceId is required' } - } - - const name = params.name?.trim() - if (!name) { - return { success: false, error: 'name is required' } - } - - const serverId = crypto.randomUUID() - const [server] = await db - .insert(workflowMcpServer) - .values({ - id: serverId, - workspaceId, - createdBy: context.userId, - name, - description: params.description?.trim() || null, - isPublic: params.isPublic ?? false, - createdAt: new Date(), - updatedAt: new Date(), - }) - .returning() - - const workflowIds: string[] = params.workflowIds || [] - const addedTools: Array<{ workflowId: string; toolName: string }> = [] - - if (workflowIds.length > 0) { - const workflows = await db.select().from(workflow).where(inArray(workflow.id, workflowIds)) - - for (const wf of workflows) { - if (wf.workspaceId !== workspaceId || !wf.isDeployed) { - continue - } - const hasStartBlock = await hasValidStartBlock(wf.id) - if (!hasStartBlock) { - continue - } - const toolName = sanitizeToolName(wf.name || `workflow_${wf.id}`) - await db.insert(workflowMcpTool).values({ - id: crypto.randomUUID(), - serverId, - workflowId: wf.id, - toolName, - toolDescription: wf.description || `Execute ${wf.name} workflow`, - parameterSchema: {}, - createdAt: new Date(), - updatedAt: new Date(), - }) - addedTools.push({ workflowId: wf.id, toolName }) - } - } - - return { success: true, output: { server, addedTools } } - } catch (error) { - return { success: false, error: error instanceof Error ? error.message : String(error) } - } -} - -async function getWorkflowVariablesForTool( - workflowId: string -): Promise> { - const [workflowRecord] = await db - .select({ variables: workflow.variables }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - const variablesRecord = (workflowRecord?.variables as Record) || {} - return Object.values(variablesRecord) - .filter((v: any) => v?.name && String(v.name).trim() !== '') - .map((v: any) => ({ - id: String(v.id || ''), - name: String(v.name || ''), - type: String(v.type || 'plain'), - tag: `variable.${normalizeName(String(v.name || ''))}`, - })) -} - -function getSubflowInsidePaths( - blockType: 'loop' | 'parallel', - blockId: string, - loops: Record, - parallels: Record -): string[] { - const paths = ['index'] - if (blockType === 'loop') { - const loopType = loops[blockId]?.loopType || 'for' - if (loopType === 'forEach') { - paths.push('currentItem', 'items') - } - } else { - const parallelType = parallels[blockId]?.parallelType || 'count' - if (parallelType === 'collection') { - paths.push('currentItem', 'items') - } - } - return paths -} - -function formatOutputsWithPrefix(paths: string[], blockName: string): string[] { - const normalizedName = normalizeName(blockName) - return paths.map((path) => `${normalizedName}.${path}`) -} - /** * Notify the copilot backend that a tool has completed. */ diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/access.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/access.ts new file mode 100644 index 000000000..0f3f32492 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/access.ts @@ -0,0 +1,130 @@ +import { db } from '@sim/db' +import { permissions, workflow, workspace } from '@sim/db/schema' +import { and, asc, desc, eq, inArray, or } from 'drizzle-orm' + +type WorkflowRecord = typeof workflow.$inferSelect + +export async function ensureWorkflowAccess( + workflowId: string, + userId: string +): Promise<{ + workflow: WorkflowRecord + workspaceId?: string | null +}> { + const [workflowRecord] = await db + .select() + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + if (!workflowRecord) { + throw new Error(`Workflow ${workflowId} not found`) + } + + if (workflowRecord.userId === userId) { + return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId } + } + + if (workflowRecord.workspaceId) { + const [permissionRow] = await db + .select({ permissionType: permissions.permissionType }) + .from(permissions) + .where( + and( + eq(permissions.entityType, 'workspace'), + eq(permissions.entityId, workflowRecord.workspaceId), + eq(permissions.userId, userId) + ) + ) + .limit(1) + if (permissionRow) { + return { workflow: workflowRecord, workspaceId: workflowRecord.workspaceId } + } + } + + throw new Error('Unauthorized workflow access') +} + +export async function getDefaultWorkspaceId(userId: string): Promise { + const workspaces = await db + .select({ workspaceId: workspace.id }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) + .orderBy(desc(workspace.createdAt)) + .limit(1) + + const workspaceId = workspaces[0]?.workspaceId + if (!workspaceId) { + throw new Error('No workspace found for user') + } + + return workspaceId +} + +export async function ensureWorkspaceAccess( + workspaceId: string, + userId: string, + requireWrite: boolean +): Promise { + const [row] = await db + .select({ + permissionType: permissions.permissionType, + ownerId: workspace.ownerId, + }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where( + and( + eq(permissions.entityType, 'workspace'), + eq(permissions.entityId, workspaceId), + eq(permissions.userId, userId) + ) + ) + .limit(1) + + if (!row) { + throw new Error(`Workspace ${workspaceId} not found`) + } + + const isOwner = row.ownerId === userId + const permissionType = row.permissionType + const canWrite = isOwner || permissionType === 'admin' || permissionType === 'write' + + if (requireWrite && !canWrite) { + throw new Error('Write or admin access required for this workspace') + } + + if (!requireWrite && !canWrite && permissionType !== 'read') { + throw new Error('Access denied to workspace') + } +} + +export async function getAccessibleWorkflowsForUser( + userId: string, + options?: { workspaceId?: string; folderId?: string } +) { + const workspaceIds = await db + .select({ entityId: permissions.entityId }) + .from(permissions) + .where(and(eq(permissions.userId, userId), eq(permissions.entityType, 'workspace'))) + + const workspaceIdList = workspaceIds.map((row) => row.entityId) + + const workflowConditions = [eq(workflow.userId, userId)] + if (workspaceIdList.length > 0) { + workflowConditions.push(inArray(workflow.workspaceId, workspaceIdList)) + } + if (options?.workspaceId) { + workflowConditions.push(eq(workflow.workspaceId, options.workspaceId)) + } + if (options?.folderId) { + workflowConditions.push(eq(workflow.folderId, options.folderId)) + } + + return db + .select() + .from(workflow) + .where(or(...workflowConditions)) + .orderBy(asc(workflow.sortOrder), asc(workflow.createdAt), asc(workflow.id)) +} + diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools.ts new file mode 100644 index 000000000..fdc962382 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/deployment-tools.ts @@ -0,0 +1,479 @@ +import crypto from 'crypto' +import { db } from '@sim/db' +import { chat, workflow, workflowMcpServer, workflowMcpTool } from '@sim/db/schema' +import { and, eq, inArray } from 'drizzle-orm' +import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types' +import { sanitizeToolName } from '@/lib/mcp/workflow-tool-schema' +import { deployWorkflow, undeployWorkflow } from '@/lib/workflows/persistence/utils' +import { hasValidStartBlock } from '@/lib/workflows/triggers/trigger-utils.server' +import { checkChatAccess, checkWorkflowAccessForChatCreation } from '@/app/api/chat/utils' +import { ensureWorkflowAccess } from './access' + +export async function executeDeployApi( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + const action = params.action === 'undeploy' ? 'undeploy' : 'deploy' + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + + if (action === 'undeploy') { + const result = await undeployWorkflow({ workflowId }) + if (!result.success) { + return { success: false, error: result.error || 'Failed to undeploy workflow' } + } + return { success: true, output: { workflowId, isDeployed: false } } + } + + const result = await deployWorkflow({ + workflowId, + deployedBy: context.userId, + workflowName: workflowRecord.name || undefined, + }) + if (!result.success) { + return { success: false, error: result.error || 'Failed to deploy workflow' } + } + + return { + success: true, + output: { + workflowId, + isDeployed: true, + deployedAt: result.deployedAt, + version: result.version, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeDeployChat( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + + const action = params.action === 'undeploy' ? 'undeploy' : 'deploy' + if (action === 'undeploy') { + const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1) + if (!existing.length) { + return { success: false, error: 'No active chat deployment found for this workflow' } + } + const { hasAccess } = await checkChatAccess(existing[0].id, context.userId) + if (!hasAccess) { + return { success: false, error: 'Unauthorized chat access' } + } + await db.delete(chat).where(eq(chat.id, existing[0].id)) + return { success: true, output: { success: true, action: 'undeploy', isDeployed: false } } + } + + const { hasAccess } = await checkWorkflowAccessForChatCreation(workflowId, context.userId) + if (!hasAccess) { + return { success: false, error: 'Workflow not found or access denied' } + } + + const existing = await db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1) + const existingDeployment = existing[0] || null + + const identifier = String(params.identifier || existingDeployment?.identifier || '').trim() + const title = String(params.title || existingDeployment?.title || '').trim() + if (!identifier || !title) { + return { success: false, error: 'Chat identifier and title are required' } + } + + const identifierPattern = /^[a-z0-9-]+$/ + if (!identifierPattern.test(identifier)) { + return { + success: false, + error: 'Identifier can only contain lowercase letters, numbers, and hyphens', + } + } + + const existingIdentifier = await db + .select() + .from(chat) + .where(eq(chat.identifier, identifier)) + .limit(1) + if (existingIdentifier.length > 0 && existingIdentifier[0].id !== existingDeployment?.id) { + return { success: false, error: 'Identifier already in use' } + } + + const deployResult = await deployWorkflow({ + workflowId, + deployedBy: context.userId, + }) + if (!deployResult.success) { + return { success: false, error: deployResult.error || 'Failed to deploy workflow' } + } + + const payload = { + workflowId, + identifier, + title, + description: String(params.description || existingDeployment?.description || ''), + customizations: { + primaryColor: + params.customizations?.primaryColor || + existingDeployment?.customizations?.primaryColor || + 'var(--brand-primary-hover-hex)', + welcomeMessage: + params.customizations?.welcomeMessage || + existingDeployment?.customizations?.welcomeMessage || + 'Hi there! How can I help you today?', + }, + authType: params.authType || existingDeployment?.authType || 'public', + password: params.password, + allowedEmails: params.allowedEmails || existingDeployment?.allowedEmails || [], + outputConfigs: params.outputConfigs || existingDeployment?.outputConfigs || [], + } + + if (existingDeployment) { + await db + .update(chat) + .set({ + identifier: payload.identifier, + title: payload.title, + description: payload.description, + customizations: payload.customizations, + authType: payload.authType, + password: payload.password || existingDeployment.password, + allowedEmails: + payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [], + outputConfigs: payload.outputConfigs, + updatedAt: new Date(), + }) + .where(eq(chat.id, existingDeployment.id)) + } else { + await db.insert(chat).values({ + id: crypto.randomUUID(), + workflowId, + userId: context.userId, + identifier: payload.identifier, + title: payload.title, + description: payload.description, + customizations: payload.customizations, + isActive: true, + authType: payload.authType, + password: payload.password || null, + allowedEmails: + payload.authType === 'email' || payload.authType === 'sso' ? payload.allowedEmails : [], + outputConfigs: payload.outputConfigs, + createdAt: new Date(), + updatedAt: new Date(), + }) + } + + return { + success: true, + output: { success: true, action: 'deploy', isDeployed: true, identifier }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeDeployMcp( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const workspaceId = workflowRecord.workspaceId + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + + if (!workflowRecord.isDeployed) { + return { + success: false, + error: 'Workflow must be deployed before adding as an MCP tool. Use deploy_api first.', + } + } + + const serverId = params.serverId + if (!serverId) { + return { + success: false, + error: 'serverId is required. Use list_workspace_mcp_servers to get available servers.', + } + } + + const existingTool = await db + .select() + .from(workflowMcpTool) + .where(and(eq(workflowMcpTool.serverId, serverId), eq(workflowMcpTool.workflowId, workflowId))) + .limit(1) + + const toolName = sanitizeToolName( + params.toolName || workflowRecord.name || `workflow_${workflowId}` + ) + const toolDescription = + params.toolDescription || workflowRecord.description || `Execute ${workflowRecord.name} workflow` + const parameterSchema = params.parameterSchema || {} + + if (existingTool.length > 0) { + const toolId = existingTool[0].id + await db + .update(workflowMcpTool) + .set({ + toolName, + toolDescription, + parameterSchema, + updatedAt: new Date(), + }) + .where(eq(workflowMcpTool.id, toolId)) + return { success: true, output: { toolId, toolName, toolDescription, updated: true } } + } + + const toolId = crypto.randomUUID() + await db.insert(workflowMcpTool).values({ + id: toolId, + serverId, + workflowId, + toolName, + toolDescription, + parameterSchema, + createdAt: new Date(), + updatedAt: new Date(), + }) + + return { success: true, output: { toolId, toolName, toolDescription, updated: false } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeRedeploy(context: ExecutionContext): Promise { + try { + const workflowId = context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + await ensureWorkflowAccess(workflowId, context.userId) + + const result = await deployWorkflow({ workflowId, deployedBy: context.userId }) + if (!result.success) { + return { success: false, error: result.error || 'Failed to redeploy workflow' } + } + return { + success: true, + output: { workflowId, deployedAt: result.deployedAt || null, version: result.version }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeCheckDeploymentStatus( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const workspaceId = workflowRecord.workspaceId + + const [apiDeploy, chatDeploy] = await Promise.all([ + db.select().from(workflow).where(eq(workflow.id, workflowId)).limit(1), + db.select().from(chat).where(eq(chat.workflowId, workflowId)).limit(1), + ]) + + const isApiDeployed = apiDeploy[0]?.isDeployed || false + const apiDetails = { + isDeployed: isApiDeployed, + deployedAt: apiDeploy[0]?.deployedAt || null, + endpoint: isApiDeployed ? `/api/workflows/${workflowId}/execute` : null, + apiKey: workflowRecord.workspaceId ? 'Workspace API keys' : 'Personal API keys', + needsRedeployment: false, + } + + const isChatDeployed = !!chatDeploy[0] + const chatDetails = { + isDeployed: isChatDeployed, + chatId: chatDeploy[0]?.id || null, + identifier: chatDeploy[0]?.identifier || null, + chatUrl: isChatDeployed ? `/chat/${chatDeploy[0]?.identifier}` : null, + title: chatDeploy[0]?.title || null, + description: chatDeploy[0]?.description || null, + authType: chatDeploy[0]?.authType || null, + allowedEmails: chatDeploy[0]?.allowedEmails || null, + outputConfigs: chatDeploy[0]?.outputConfigs || null, + welcomeMessage: chatDeploy[0]?.customizations?.welcomeMessage || null, + primaryColor: chatDeploy[0]?.customizations?.primaryColor || null, + hasPassword: Boolean(chatDeploy[0]?.password), + } + + const mcpDetails = { isDeployed: false, servers: [] as any[] } + if (workspaceId) { + const servers = await db + .select({ + serverId: workflowMcpServer.id, + serverName: workflowMcpServer.name, + toolName: workflowMcpTool.toolName, + toolDescription: workflowMcpTool.toolDescription, + parameterSchema: workflowMcpTool.parameterSchema, + toolId: workflowMcpTool.id, + }) + .from(workflowMcpTool) + .innerJoin(workflowMcpServer, eq(workflowMcpTool.serverId, workflowMcpServer.id)) + .where(eq(workflowMcpTool.workflowId, workflowId)) + + if (servers.length > 0) { + mcpDetails.isDeployed = true + mcpDetails.servers = servers + } + } + + const isDeployed = apiDetails.isDeployed || chatDetails.isDeployed || mcpDetails.isDeployed + return { + success: true, + output: { isDeployed, api: apiDetails, chat: chatDetails, mcp: mcpDetails }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeListWorkspaceMcpServers( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const workspaceId = workflowRecord.workspaceId + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + + const servers = await db + .select({ + id: workflowMcpServer.id, + name: workflowMcpServer.name, + description: workflowMcpServer.description, + }) + .from(workflowMcpServer) + .where(eq(workflowMcpServer.workspaceId, workspaceId)) + + const serverIds = servers.map((server) => server.id) + const tools = + serverIds.length > 0 + ? await db + .select({ + serverId: workflowMcpTool.serverId, + toolName: workflowMcpTool.toolName, + }) + .from(workflowMcpTool) + .where(inArray(workflowMcpTool.serverId, serverIds)) + : [] + + const toolNamesByServer: Record = {} + for (const tool of tools) { + if (!toolNamesByServer[tool.serverId]) { + toolNamesByServer[tool.serverId] = [] + } + toolNamesByServer[tool.serverId].push(tool.toolName) + } + + const serversWithToolNames = servers.map((server) => ({ + ...server, + toolCount: toolNamesByServer[server.id]?.length || 0, + toolNames: toolNamesByServer[server.id] || [], + })) + + return { success: true, output: { servers: serversWithToolNames, count: servers.length } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeCreateWorkspaceMcpServer( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + const workspaceId = workflowRecord.workspaceId + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + + const name = params.name?.trim() + if (!name) { + return { success: false, error: 'name is required' } + } + + const serverId = crypto.randomUUID() + const [server] = await db + .insert(workflowMcpServer) + .values({ + id: serverId, + workspaceId, + createdBy: context.userId, + name, + description: params.description?.trim() || null, + isPublic: params.isPublic ?? false, + createdAt: new Date(), + updatedAt: new Date(), + }) + .returning() + + const workflowIds: string[] = params.workflowIds || [] + const addedTools: Array<{ workflowId: string; toolName: string }> = [] + + if (workflowIds.length > 0) { + const workflows = await db.select().from(workflow).where(inArray(workflow.id, workflowIds)) + + for (const wf of workflows) { + if (wf.workspaceId !== workspaceId || !wf.isDeployed) { + continue + } + const hasStartBlock = await hasValidStartBlock(wf.id) + if (!hasStartBlock) { + continue + } + const toolName = sanitizeToolName(wf.name || `workflow_${wf.id}`) + await db.insert(workflowMcpTool).values({ + id: crypto.randomUUID(), + serverId, + workflowId: wf.id, + toolName, + toolDescription: wf.description || `Execute ${wf.name} workflow`, + parameterSchema: {}, + createdAt: new Date(), + updatedAt: new Date(), + }) + addedTools.push({ workflowId: wf.id, toolName }) + } + } + + return { success: true, output: { server, addedTools } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts new file mode 100644 index 000000000..44a10d7af --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts @@ -0,0 +1,100 @@ +import { db } from '@sim/db' +import { account, workflow } from '@sim/db/schema' +import { and, eq } from 'drizzle-orm' +import type { + ExecutionContext, + ToolCallResult, + ToolCallState, +} from '@/lib/copilot/orchestrator/types' +import { generateRequestId } from '@/lib/core/utils/request' +import { getEffectiveDecryptedEnv } from '@/lib/environment/utils' +import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils' +import { resolveEnvVarReferences } from '@/executor/utils/reference-validation' +import { executeTool } from '@/tools' +import { resolveToolId } from '@/tools/utils' + +export async function executeIntegrationToolDirect( + toolCall: ToolCallState, + toolConfig: any, + context: ExecutionContext +): Promise { + const { userId, workflowId } = context + const toolName = resolveToolId(toolCall.name) + const toolArgs = toolCall.params || {} + + let workspaceId = context.workspaceId + if (!workspaceId && workflowId) { + const workflowResult = await db + .select({ workspaceId: workflow.workspaceId }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + workspaceId = workflowResult[0]?.workspaceId ?? undefined + } + + const decryptedEnvVars = + context.decryptedEnvVars || (await getEffectiveDecryptedEnv(userId, workspaceId)) + + const executionParams: Record = resolveEnvVarReferences(toolArgs, decryptedEnvVars, { + deep: true, + }) as Record + + if (toolConfig.oauth?.required && toolConfig.oauth.provider) { + const provider = toolConfig.oauth.provider + const accounts = await db + .select() + .from(account) + .where(and(eq(account.providerId, provider), eq(account.userId, userId))) + .limit(1) + + if (!accounts.length) { + return { + success: false, + error: `No ${provider} account connected. Please connect your account first.`, + } + } + + const acc = accounts[0] + const requestId = generateRequestId() + const { accessToken } = await refreshTokenIfNeeded(requestId, acc as any, acc.id) + + if (!accessToken) { + return { + success: false, + error: `OAuth token not available for ${provider}. Please reconnect your account.`, + } + } + + executionParams.accessToken = accessToken + } + + if (toolConfig.params?.apiKey?.required && !executionParams.apiKey) { + return { + success: false, + error: `API key not provided for ${toolName}. Use {{YOUR_API_KEY_ENV_VAR}} to reference your environment variable.`, + } + } + + executionParams._context = { + workflowId, + userId, + } + + if (toolName === 'function_execute') { + executionParams.envVars = decryptedEnvVars + executionParams.workflowVariables = {} + executionParams.blockData = {} + executionParams.blockNameMapping = {} + executionParams.language = executionParams.language || 'javascript' + executionParams.timeout = executionParams.timeout || 30000 + } + + const result = await executeTool(toolName, executionParams) + + return { + success: result.success, + output: result.output, + error: result.error, + } +} + diff --git a/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools.ts b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools.ts new file mode 100644 index 000000000..0adc1a768 --- /dev/null +++ b/apps/sim/lib/copilot/orchestrator/tool-executor/workflow-tools.ts @@ -0,0 +1,769 @@ +import crypto from 'crypto' +import { db } from '@sim/db' +import { customTools, permissions, workflow, workflowFolder, workspace } from '@sim/db/schema' +import { and, asc, desc, eq, inArray, isNull, max, or } from 'drizzle-orm' +import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/orchestrator/types' +import { + extractWorkflowNames, + formatNormalizedWorkflowForCopilot, + normalizeWorkflowName, +} from '@/lib/copilot/tools/shared/workflow-utils' +import { generateRequestId } from '@/lib/core/utils/request' +import { mcpService } from '@/lib/mcp/service' +import { listWorkspaceFiles } from '@/lib/uploads/contexts/workspace' +import { getBlockOutputPaths } from '@/lib/workflows/blocks/block-outputs' +import { BlockPathCalculator } from '@/lib/workflows/blocks/block-path-calculator' +import { buildDefaultWorkflowArtifacts } from '@/lib/workflows/defaults' +import { executeWorkflow } from '@/lib/workflows/executor/execute-workflow' +import { + loadWorkflowFromNormalizedTables, + saveWorkflowToNormalizedTables, +} from '@/lib/workflows/persistence/utils' +import { isInputDefinitionTrigger } from '@/lib/workflows/triggers/input-definition-triggers' +import { ensureWorkflowAccess, ensureWorkspaceAccess, getAccessibleWorkflowsForUser, getDefaultWorkspaceId } from './access' +import { normalizeName } from '@/executor/constants' + +export async function executeGetUserWorkflow( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + + const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess( + workflowId, + context.userId + ) + + const normalized = await loadWorkflowFromNormalizedTables(workflowId) + const userWorkflow = formatNormalizedWorkflowForCopilot(normalized) + if (!userWorkflow) { + return { success: false, error: 'Workflow has no normalized data' } + } + + return { + success: true, + output: { + workflowId, + workflowName: workflowRecord.name || '', + workspaceId, + userWorkflow, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeGetWorkflowFromName( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowName = typeof params.workflow_name === 'string' ? params.workflow_name.trim() : '' + if (!workflowName) { + return { success: false, error: 'workflow_name is required' } + } + + const workflows = await getAccessibleWorkflowsForUser(context.userId) + + const targetName = normalizeWorkflowName(workflowName) + const match = workflows.find((w) => normalizeWorkflowName(w.name) === targetName) + if (!match) { + return { success: false, error: `Workflow not found: ${workflowName}` } + } + + const normalized = await loadWorkflowFromNormalizedTables(match.id) + const userWorkflow = formatNormalizedWorkflowForCopilot(normalized) + if (!userWorkflow) { + return { success: false, error: 'Workflow has no normalized data' } + } + + return { + success: true, + output: { + workflowId: match.id, + workflowName: match.name || '', + workspaceId: match.workspaceId, + userWorkflow, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeListUserWorkflows( + params: Record, + context: ExecutionContext +): Promise { + try { + const workspaceId = params?.workspaceId as string | undefined + const folderId = params?.folderId as string | undefined + + const workflows = await getAccessibleWorkflowsForUser(context.userId, { workspaceId, folderId }) + + const names = extractWorkflowNames(workflows) + + const workflowList = workflows.map((w) => ({ + workflowId: w.id, + workflowName: w.name || '', + workspaceId: w.workspaceId, + folderId: w.folderId, + })) + + return { success: true, output: { workflow_names: names, workflows: workflowList } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeListUserWorkspaces(context: ExecutionContext): Promise { + try { + const workspaces = await db + .select({ + workspaceId: workspace.id, + workspaceName: workspace.name, + ownerId: workspace.ownerId, + permissionType: permissions.permissionType, + }) + .from(permissions) + .innerJoin(workspace, eq(permissions.entityId, workspace.id)) + .where(and(eq(permissions.userId, context.userId), eq(permissions.entityType, 'workspace'))) + .orderBy(desc(workspace.createdAt)) + + const output = workspaces.map((row) => ({ + workspaceId: row.workspaceId, + workspaceName: row.workspaceName, + role: row.ownerId === context.userId ? 'owner' : row.permissionType, + })) + + return { success: true, output: { workspaces: output } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeListFolders( + params: Record, + context: ExecutionContext +): Promise { + try { + const workspaceId = + (params?.workspaceId as string | undefined) || (await getDefaultWorkspaceId(context.userId)) + + await ensureWorkspaceAccess(workspaceId, context.userId, false) + + const folders = await db + .select({ + folderId: workflowFolder.id, + folderName: workflowFolder.name, + parentId: workflowFolder.parentId, + sortOrder: workflowFolder.sortOrder, + }) + .from(workflowFolder) + .where(eq(workflowFolder.workspaceId, workspaceId)) + .orderBy(asc(workflowFolder.sortOrder), asc(workflowFolder.createdAt)) + + return { + success: true, + output: { + workspaceId, + folders, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeCreateWorkflow( + params: Record, + context: ExecutionContext +): Promise { + try { + const name = typeof params?.name === 'string' ? params.name.trim() : '' + if (!name) { + return { success: false, error: 'name is required' } + } + + const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) + const folderId = params?.folderId || null + const description = typeof params?.description === 'string' ? params.description : null + + await ensureWorkspaceAccess(workspaceId, context.userId, true) + + const workflowId = crypto.randomUUID() + const now = new Date() + + const folderCondition = folderId ? eq(workflow.folderId, folderId) : isNull(workflow.folderId) + const [maxResult] = await db + .select({ maxOrder: max(workflow.sortOrder) }) + .from(workflow) + .where(and(eq(workflow.workspaceId, workspaceId), folderCondition)) + const sortOrder = (maxResult?.maxOrder ?? 0) + 1 + + await db.insert(workflow).values({ + id: workflowId, + userId: context.userId, + workspaceId, + folderId, + sortOrder, + name, + description, + color: '#3972F6', + lastSynced: now, + createdAt: now, + updatedAt: now, + isDeployed: false, + runCount: 0, + variables: {}, + }) + + const { workflowState } = buildDefaultWorkflowArtifacts() + const saveResult = await saveWorkflowToNormalizedTables(workflowId, workflowState) + if (!saveResult.success) { + throw new Error(saveResult.error || 'Failed to save workflow state') + } + + return { + success: true, + output: { + workflowId, + workflowName: name, + workspaceId, + folderId, + }, + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeCreateFolder( + params: Record, + context: ExecutionContext +): Promise { + try { + const name = typeof params?.name === 'string' ? params.name.trim() : '' + if (!name) { + return { success: false, error: 'name is required' } + } + + const workspaceId = params?.workspaceId || (await getDefaultWorkspaceId(context.userId)) + const parentId = params?.parentId || null + + await ensureWorkspaceAccess(workspaceId, context.userId, true) + + const [maxResult] = await db + .select({ maxOrder: max(workflowFolder.sortOrder) }) + .from(workflowFolder) + .where( + and( + eq(workflowFolder.workspaceId, workspaceId), + parentId ? eq(workflowFolder.parentId, parentId) : isNull(workflowFolder.parentId) + ) + ) + const sortOrder = (maxResult?.maxOrder ?? 0) + 1 + + const folderId = crypto.randomUUID() + await db.insert(workflowFolder).values({ + id: folderId, + workspaceId, + parentId, + name, + sortOrder, + createdAt: new Date(), + updatedAt: new Date(), + }) + + return { success: true, output: { folderId, name, workspaceId, parentId } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeGetWorkflowData( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + const dataType = params.data_type || params.dataType || '' + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + if (!dataType) { + return { success: false, error: 'data_type is required' } + } + + const { workflow: workflowRecord, workspaceId } = await ensureWorkflowAccess( + workflowId, + context.userId + ) + + if (dataType === 'global_variables') { + const variablesRecord = (workflowRecord.variables as Record) || {} + const variables = Object.values(variablesRecord).map((v: any) => ({ + id: String(v?.id || ''), + name: String(v?.name || ''), + value: v?.value, + })) + return { success: true, output: { variables } } + } + + if (dataType === 'custom_tools') { + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + const conditions = [ + eq(customTools.workspaceId, workspaceId), + and(eq(customTools.userId, context.userId), isNull(customTools.workspaceId)), + ] + const toolsRows = await db + .select() + .from(customTools) + .where(or(...conditions)) + .orderBy(desc(customTools.createdAt)) + + const customToolsData = toolsRows.map((tool) => ({ + id: String(tool.id || ''), + title: String(tool.title || ''), + functionName: String((tool.schema as any)?.function?.name || ''), + description: String((tool.schema as any)?.function?.description || ''), + parameters: (tool.schema as any)?.function?.parameters, + })) + + return { success: true, output: { customTools: customToolsData } } + } + + if (dataType === 'mcp_tools') { + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + const tools = await mcpService.discoverTools(context.userId, workspaceId, false) + const mcpTools = tools.map((tool) => ({ + name: String(tool.name || ''), + serverId: String(tool.serverId || ''), + serverName: String(tool.serverName || ''), + description: String(tool.description || ''), + inputSchema: tool.inputSchema, + })) + return { success: true, output: { mcpTools } } + } + + if (dataType === 'files') { + if (!workspaceId) { + return { success: false, error: 'workspaceId is required' } + } + const files = await listWorkspaceFiles(workspaceId) + const fileResults = files.map((file) => ({ + id: String(file.id || ''), + name: String(file.name || ''), + key: String(file.key || ''), + path: String(file.path || ''), + size: Number(file.size || 0), + type: String(file.type || ''), + uploadedAt: String(file.uploadedAt || ''), + })) + return { success: true, output: { files: fileResults } } + } + + return { success: false, error: `Unknown data_type: ${dataType}` } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeGetBlockOutputs( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + await ensureWorkflowAccess(workflowId, context.userId) + + const normalized = await loadWorkflowFromNormalizedTables(workflowId) + if (!normalized) { + return { success: false, error: 'Workflow has no normalized data' } + } + + const blocks = normalized.blocks || {} + const loops = normalized.loops || {} + const parallels = normalized.parallels || {} + const blockIds = + Array.isArray(params.blockIds) && params.blockIds.length > 0 + ? params.blockIds + : Object.keys(blocks) + + const results: Array<{ + blockId: string + blockName: string + blockType: string + outputs: string[] + insideSubflowOutputs?: string[] + outsideSubflowOutputs?: string[] + triggerMode?: boolean + }> = [] + + for (const blockId of blockIds) { + const block = blocks[blockId] + if (!block?.type) continue + const blockName = block.name || block.type + + if (block.type === 'loop' || block.type === 'parallel') { + const insidePaths = getSubflowInsidePaths(block.type, blockId, loops, parallels) + results.push({ + blockId, + blockName, + blockType: block.type, + outputs: [], + insideSubflowOutputs: formatOutputsWithPrefix(insidePaths, blockName), + outsideSubflowOutputs: formatOutputsWithPrefix(['results'], blockName), + triggerMode: block.triggerMode, + }) + continue + } + + const outputs = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode) + results.push({ + blockId, + blockName, + blockType: block.type, + outputs: formatOutputsWithPrefix(outputs, blockName), + triggerMode: block.triggerMode, + }) + } + + const variables = await getWorkflowVariablesForTool(workflowId) + + const payload = { blocks: results, variables } + return { success: true, output: payload } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeGetBlockUpstreamReferences( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + if (!Array.isArray(params.blockIds) || params.blockIds.length === 0) { + return { success: false, error: 'blockIds array is required' } + } + await ensureWorkflowAccess(workflowId, context.userId) + + const normalized = await loadWorkflowFromNormalizedTables(workflowId) + if (!normalized) { + return { success: false, error: 'Workflow has no normalized data' } + } + + const blocks = normalized.blocks || {} + const edges = normalized.edges || [] + const loops = normalized.loops || {} + const parallels = normalized.parallels || {} + + const graphEdges = edges.map((edge: any) => ({ source: edge.source, target: edge.target })) + const variableOutputs = await getWorkflowVariablesForTool(workflowId) + + const results: any[] = [] + + for (const blockId of params.blockIds) { + const targetBlock = blocks[blockId] + if (!targetBlock) continue + + const insideSubflows: Array<{ blockId: string; blockName: string; blockType: string }> = [] + const containingLoopIds = new Set() + const containingParallelIds = new Set() + + Object.values(loops as Record).forEach((loop) => { + if (loop?.nodes?.includes(blockId)) { + containingLoopIds.add(loop.id) + const loopBlock = blocks[loop.id] + if (loopBlock) { + insideSubflows.push({ + blockId: loop.id, + blockName: loopBlock.name || loopBlock.type, + blockType: 'loop', + }) + } + } + }) + + Object.values(parallels as Record).forEach((parallel) => { + if (parallel?.nodes?.includes(blockId)) { + containingParallelIds.add(parallel.id) + const parallelBlock = blocks[parallel.id] + if (parallelBlock) { + insideSubflows.push({ + blockId: parallel.id, + blockName: parallelBlock.name || parallelBlock.type, + blockType: 'parallel', + }) + } + } + }) + + const ancestorIds = BlockPathCalculator.findAllPathNodes(graphEdges, blockId) + const accessibleIds = new Set(ancestorIds) + accessibleIds.add(blockId) + + const starterBlock = Object.values(blocks).find((b: any) => isInputDefinitionTrigger(b.type)) + if (starterBlock && ancestorIds.includes((starterBlock as any).id)) { + accessibleIds.add((starterBlock as any).id) + } + + containingLoopIds.forEach((loopId) => { + accessibleIds.add(loopId) + loops[loopId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId)) + }) + + containingParallelIds.forEach((parallelId) => { + accessibleIds.add(parallelId) + parallels[parallelId]?.nodes?.forEach((nodeId: string) => accessibleIds.add(nodeId)) + }) + + const accessibleBlocks: any[] = [] + + for (const accessibleBlockId of accessibleIds) { + const block = blocks[accessibleBlockId] + if (!block?.type) continue + const canSelfReference = block.type === 'approval' || block.type === 'human_in_the_loop' + if (accessibleBlockId === blockId && !canSelfReference) continue + + const blockName = block.name || block.type + let accessContext: 'inside' | 'outside' | undefined + let outputPaths: string[] + + if (block.type === 'loop' || block.type === 'parallel') { + const isInside = + (block.type === 'loop' && containingLoopIds.has(accessibleBlockId)) || + (block.type === 'parallel' && containingParallelIds.has(accessibleBlockId)) + accessContext = isInside ? 'inside' : 'outside' + outputPaths = isInside + ? getSubflowInsidePaths(block.type, accessibleBlockId, loops, parallels) + : ['results'] + } else { + outputPaths = getBlockOutputPaths(block.type, block.subBlocks, block.triggerMode) + } + + const formattedOutputs = formatOutputsWithPrefix(outputPaths, blockName) + const entry: any = { + blockId: accessibleBlockId, + blockName, + blockType: block.type, + outputs: formattedOutputs, + } + if (block.triggerMode) entry.triggerMode = true + if (accessContext) entry.accessContext = accessContext + accessibleBlocks.push(entry) + } + + results.push({ + blockId, + blockName: targetBlock.name || targetBlock.type, + blockType: targetBlock.type, + accessibleBlocks, + insideSubflows, + variables: variableOutputs, + }) + } + + const payload = { results } + return { success: true, output: payload } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeRunWorkflow( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + + const result = await executeWorkflow( + { + id: workflowRecord.id, + userId: workflowRecord.userId, + workspaceId: workflowRecord.workspaceId, + variables: workflowRecord.variables || {}, + }, + generateRequestId(), + params.workflow_input || params.input || undefined, + context.userId + ) + + return { + success: result.success, + output: { + executionId: result.executionId, + success: result.success, + output: result.output, + logs: result.logs, + }, + error: result.success ? undefined : result.error || 'Workflow execution failed', + } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +export async function executeSetGlobalWorkflowVariables( + params: Record, + context: ExecutionContext +): Promise { + try { + const workflowId = params.workflowId || context.workflowId + if (!workflowId) { + return { success: false, error: 'workflowId is required' } + } + const operations = Array.isArray(params.operations) ? params.operations : [] + const { workflow: workflowRecord } = await ensureWorkflowAccess(workflowId, context.userId) + + const currentVarsRecord = (workflowRecord.variables as Record) || {} + const byName: Record = {} + Object.values(currentVarsRecord).forEach((v: any) => { + if (v && typeof v === 'object' && v.id && v.name) byName[String(v.name)] = v + }) + + for (const op of operations) { + const key = String(op?.name || '') + if (!key) continue + const nextType = op?.type || byName[key]?.type || 'plain' + const coerceValue = (value: any, type: string) => { + if (value === undefined) return value + if (type === 'number') { + const n = Number(value) + return Number.isNaN(n) ? value : n + } + if (type === 'boolean') { + const v = String(value).trim().toLowerCase() + if (v === 'true') return true + if (v === 'false') return false + return value + } + if (type === 'array' || type === 'object') { + try { + const parsed = JSON.parse(String(value)) + if (type === 'array' && Array.isArray(parsed)) return parsed + if (type === 'object' && parsed && typeof parsed === 'object' && !Array.isArray(parsed)) + return parsed + } catch {} + return value + } + return value + } + + if (op.operation === 'delete') { + delete byName[key] + continue + } + const typedValue = coerceValue(op.value, nextType) + if (op.operation === 'add') { + byName[key] = { + id: crypto.randomUUID(), + workflowId, + name: key, + type: nextType, + value: typedValue, + } + continue + } + if (op.operation === 'edit') { + if (!byName[key]) { + byName[key] = { + id: crypto.randomUUID(), + workflowId, + name: key, + type: nextType, + value: typedValue, + } + } else { + byName[key] = { + ...byName[key], + type: nextType, + value: typedValue, + } + } + } + } + + const nextVarsRecord = Object.fromEntries( + Object.values(byName).map((v: any) => [String(v.id), v]) + ) + + await db + .update(workflow) + .set({ variables: nextVarsRecord, updatedAt: new Date() }) + .where(eq(workflow.id, workflowId)) + + return { success: true, output: { updated: Object.values(byName).length } } + } catch (error) { + return { success: false, error: error instanceof Error ? error.message : String(error) } + } +} + +async function getWorkflowVariablesForTool( + workflowId: string +): Promise> { + const [workflowRecord] = await db + .select({ variables: workflow.variables }) + .from(workflow) + .where(eq(workflow.id, workflowId)) + .limit(1) + + const variablesRecord = (workflowRecord?.variables as Record) || {} + return Object.values(variablesRecord) + .filter((v: any) => v?.name && String(v.name).trim() !== '') + .map((v: any) => ({ + id: String(v.id || ''), + name: String(v.name || ''), + type: String(v.type || 'plain'), + tag: `variable.${normalizeName(String(v.name || ''))}`, + })) +} + +function getSubflowInsidePaths( + blockType: 'loop' | 'parallel', + blockId: string, + loops: Record, + parallels: Record +): string[] { + const paths = ['index'] + if (blockType === 'loop') { + const loopType = loops[blockId]?.loopType || 'for' + if (loopType === 'forEach') { + paths.push('currentItem', 'items') + } + } else { + const parallelType = parallels[blockId]?.parallelType || 'count' + if (parallelType === 'collection') { + paths.push('currentItem', 'items') + } + } + return paths +} + +function formatOutputsWithPrefix(paths: string[], blockName: string): string[] { + const normalizedName = normalizeName(blockName) + return paths.map((path) => `${normalizedName}.${path}`) +} + diff --git a/apps/sim/lib/copilot/tools/client/blocks/get-blocks-and-tools.ts b/apps/sim/lib/copilot/tools/client/blocks/get-blocks-and-tools.ts index d57cb1d24..7532ca6c4 100644 --- a/apps/sim/lib/copilot/tools/client/blocks/get-blocks-and-tools.ts +++ b/apps/sim/lib/copilot/tools/client/blocks/get-blocks-and-tools.ts @@ -48,7 +48,8 @@ export class GetBlocksAndToolsClientTool extends BaseClientTool { const parsed = ExecuteResponseSuccessSchema.parse(json) const result = GetBlocksAndToolsResult.parse(parsed.result) - await this.markToolComplete(200, 'Successfully retrieved blocks and tools', result) + // TODO: Temporarily sending empty data to test 403 issue + await this.markToolComplete(200, 'Successfully retrieved blocks and tools', {}) this.setState(ClientToolCallState.success) } catch (error: any) { const message = error instanceof Error ? error.message : String(error) diff --git a/apps/sim/lib/core/config/env.ts b/apps/sim/lib/core/config/env.ts index 8440de3bc..ee934d240 100644 --- a/apps/sim/lib/core/config/env.ts +++ b/apps/sim/lib/core/config/env.ts @@ -35,6 +35,11 @@ export const env = createEnv({ SIM_AGENT_API_URL: z.string().url().optional(), // URL for internal sim agent API AGENT_INDEXER_URL: z.string().url().optional(), // URL for agent training data indexer AGENT_INDEXER_API_KEY: z.string().min(1).optional(), // API key for agent indexer authentication + COPILOT_STREAM_TTL_SECONDS: z.number().optional(), // Redis TTL for copilot SSE buffer + COPILOT_STREAM_EVENT_LIMIT: z.number().optional(), // Max events retained per stream + COPILOT_STREAM_RESERVE_BATCH: z.number().optional(), // Event ID reservation batch size + COPILOT_STREAM_FLUSH_INTERVAL_MS: z.number().optional(), // Buffer flush interval in ms + COPILOT_STREAM_FLUSH_MAX_BATCH: z.number().optional(), // Max events per flush batch // Database & Storage REDIS_URL: z.string().url().optional(), // Redis connection string for caching/sessions diff --git a/apps/sim/stores/panel/copilot/store.ts b/apps/sim/stores/panel/copilot/store.ts index 0db3d68c0..a17044afb 100644 --- a/apps/sim/stores/panel/copilot/store.ts +++ b/apps/sim/stores/panel/copilot/store.ts @@ -5,6 +5,11 @@ import { create } from 'zustand' import { devtools } from 'zustand/middleware' import { type CopilotChat, sendStreamingMessage } from '@/lib/copilot/api' import type { CopilotTransportMode } from '@/lib/copilot/models' +import { + normalizeSseEvent, + shouldSkipToolCallEvent, + shouldSkipToolResultEvent, +} from '@/lib/copilot/orchestrator/sse-utils' import type { BaseClientToolMetadata, ClientToolDisplay, @@ -2045,6 +2050,12 @@ async function applySseEvent( get: () => CopilotStore, set: (next: Partial | ((state: CopilotStore) => Partial)) => void ): Promise { + const normalizedEvent = normalizeSseEvent(data) + if (shouldSkipToolCallEvent(normalizedEvent) || shouldSkipToolResultEvent(normalizedEvent)) { + return true + } + data = normalizedEvent + if (data.type === 'subagent_start') { const toolCallId = data.data?.tool_call_id if (toolCallId) { diff --git a/bun.lock b/bun.lock index defa6c36f..4b18dedf8 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "simstudio", diff --git a/docs/COPILOT_SERVER_REFACTOR.md b/docs/COPILOT_SERVER_REFACTOR.md index a58e5aa6a..3184fa3a1 100644 --- a/docs/COPILOT_SERVER_REFACTOR.md +++ b/docs/COPILOT_SERVER_REFACTOR.md @@ -836,6 +836,15 @@ describe('POST /api/v1/copilot/chat', () => { - Save checkpoints to resume - Handle partial completions gracefully +### Risk 6: Process-Local Dedupe + +**Risk**: Tool call/result dedupe caches are in-memory and scoped to a single process, so duplicate events can still appear across ECS tasks. + +**Mitigation**: +- Treat dedupe as best-effort, not global +- Prefer idempotent state updates on the client +- Use Redis-backed stream replay for authoritative ordering + --- ## File Inventory