diff --git a/apps/sim/app/api/files/parse/route.test.ts b/apps/sim/app/api/files/parse/route.test.ts index 510de5bf5..4258b5dd9 100644 --- a/apps/sim/app/api/files/parse/route.test.ts +++ b/apps/sim/app/api/files/parse/route.test.ts @@ -1,4 +1,5 @@ import path from 'path' +import { NextRequest } from 'next/server' /** * Tests for file parse API route * @@ -6,10 +7,9 @@ import path from 'path' */ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { createMockRequest } from '@/app/api/__test-utils__/utils' +import { POST } from './route' -// Create actual mocks for path functions that we can use instead of using vi.doMock for path const mockJoin = vi.fn((...args: string[]): string => { - // For the UPLOAD_DIR paths, just return a test path if (args[0] === '/test/uploads') { return `/test/uploads/${args[args.length - 1]}` } @@ -17,7 +17,6 @@ const mockJoin = vi.fn((...args: string[]): string => { }) describe('File Parse API Route', () => { - // Mock file system and parser modules const mockReadFile = vi.fn().mockResolvedValue(Buffer.from('test file content')) const mockWriteFile = vi.fn().mockResolvedValue(undefined) const mockUnlink = vi.fn().mockResolvedValue(undefined) @@ -36,15 +35,12 @@ describe('File Parse API Route', () => { beforeEach(() => { vi.resetModules() - // Reset all mocks vi.resetAllMocks() - // Create a test upload file that exists for all tests mockReadFile.mockResolvedValue(Buffer.from('test file content')) mockAccessFs.mockResolvedValue(undefined) mockStatFs.mockImplementation(() => ({ isFile: () => true })) - // Mock filesystem operations vi.doMock('fs', () => ({ existsSync: vi.fn().mockReturnValue(true), constants: { R_OK: 4 }, @@ -63,19 +59,16 @@ describe('File Parse API Route', () => { stat: mockStatFs, })) - // Mock the S3 client vi.doMock('@/lib/uploads/s3-client', () => ({ downloadFromS3: mockDownloadFromS3, })) - // Mock file parsers vi.doMock('@/lib/file-parsers', () => ({ isSupportedFileType: vi.fn().mockReturnValue(true), parseFile: mockParseFile, parseBuffer: mockParseBuffer, })) - // Mock path module with our custom join function vi.doMock('path', () => { return { ...path, @@ -85,7 +78,6 @@ describe('File Parse API Route', () => { } }) - // Mock the logger vi.doMock('@/lib/logs/console-logger', () => ({ createLogger: vi.fn().mockReturnValue({ info: vi.fn(), @@ -95,7 +87,6 @@ describe('File Parse API Route', () => { }), })) - // Configure upload directory and S3 mode vi.doMock('@/lib/uploads/setup', () => ({ UPLOAD_DIR: '/test/uploads', USE_S3_STORAGE: false, @@ -105,7 +96,6 @@ describe('File Parse API Route', () => { }, })) - // Skip setup.server.ts side effects vi.doMock('@/lib/uploads/setup.server', () => ({})) }) @@ -113,7 +103,6 @@ describe('File Parse API Route', () => { vi.clearAllMocks() }) - // Basic tests testing the API structure it('should handle missing file path', async () => { const req = createMockRequest('POST', {}) const { POST } = await import('./route') @@ -125,47 +114,37 @@ describe('File Parse API Route', () => { expect(data).toHaveProperty('error', 'No file path provided') }) - // Test skipping the implementation details and testing what users would care about it('should accept and process a local file', async () => { - // Given: A request with a file path const req = createMockRequest('POST', { filePath: '/api/files/serve/test-file.txt', }) - // When: The API processes the request const { POST } = await import('./route') const response = await POST(req) const data = await response.json() - // Then: Check the API contract without making assumptions about implementation expect(response.status).toBe(200) - expect(data).not.toBeNull() // We got a response + expect(data).not.toBeNull() - // The response either has a success indicator with output OR an error if (data.success === true) { expect(data).toHaveProperty('output') } else { - // If error, there should be an error message expect(data).toHaveProperty('error') expect(typeof data.error).toBe('string') } }) it('should process S3 files', async () => { - // Given: A request with an S3 file path const req = createMockRequest('POST', { filePath: '/api/files/serve/s3/test-file.pdf', }) - // When: The API processes the request const { POST } = await import('./route') const response = await POST(req) const data = await response.json() - // Then: We should get a response with parsed content or error expect(response.status).toBe(200) - // The data should either have a success flag with output or an error if (data.success === true) { expect(data).toHaveProperty('output') } else { @@ -174,17 +153,14 @@ describe('File Parse API Route', () => { }) it('should handle multiple files', async () => { - // Given: A request with multiple file paths const req = createMockRequest('POST', { filePath: ['/api/files/serve/file1.txt', '/api/files/serve/file2.txt'], }) - // When: The API processes the request const { POST } = await import('./route') const response = await POST(req) const data = await response.json() - // Then: We get an array of results expect(response.status).toBe(200) expect(data).toHaveProperty('success') expect(data).toHaveProperty('results') @@ -193,20 +169,16 @@ describe('File Parse API Route', () => { }) it('should handle S3 access errors gracefully', async () => { - // Given: S3 will throw an error mockDownloadFromS3.mockRejectedValueOnce(new Error('S3 access denied')) - // And: A request with an S3 file path const req = createMockRequest('POST', { filePath: '/api/files/serve/s3/access-denied.pdf', }) - // When: The API processes the request const { POST } = await import('./route') const response = await POST(req) const data = await response.json() - // Then: We get an appropriate error expect(response.status).toBe(200) expect(data).toHaveProperty('success', false) expect(data).toHaveProperty('error') @@ -214,22 +186,203 @@ describe('File Parse API Route', () => { }) it('should handle access errors gracefully', async () => { - // Given: File access will fail mockAccessFs.mockRejectedValueOnce(new Error('ENOENT: no such file')) - // And: A request with a nonexistent file const req = createMockRequest('POST', { filePath: '/api/files/serve/nonexistent.txt', }) - // When: The API processes the request const { POST } = await import('./route') const response = await POST(req) const data = await response.json() - // Then: We get an appropriate error response expect(response.status).toBe(200) expect(data).toHaveProperty('success') expect(data).toHaveProperty('error') }) }) + +describe('Files Parse API - Path Traversal Security', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + describe('Path Traversal Prevention', () => { + it('should reject path traversal attempts with .. segments', async () => { + const maliciousRequests = [ + '../../../etc/passwd', + '/api/files/serve/../../../etc/passwd', + '/api/files/serve/../../app.js', + '/api/files/serve/../.env', + 'uploads/../../../etc/hosts', + ] + + for (const maliciousPath of maliciousRequests) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: maliciousPath, + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(result.success).toBe(false) + expect(result.error).toMatch(/Access denied|Invalid path|Path outside allowed directory/) + } + }) + + it('should reject paths with tilde characters', async () => { + const maliciousPaths = [ + '~/../../etc/passwd', + '/api/files/serve/~/secret.txt', + '~root/.ssh/id_rsa', + ] + + for (const maliciousPath of maliciousPaths) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: maliciousPath, + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(result.success).toBe(false) + expect(result.error).toMatch(/Access denied|Invalid path/) + } + }) + + it('should reject absolute paths outside upload directory', async () => { + const maliciousPaths = [ + '/etc/passwd', + '/root/.bashrc', + '/app/.env', + '/var/log/auth.log', + 'C:\\Windows\\System32\\drivers\\etc\\hosts', // Windows path + ] + + for (const maliciousPath of maliciousPaths) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: maliciousPath, + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(result.success).toBe(false) + expect(result.error).toMatch(/Access denied|Path outside allowed directory/) + } + }) + + it('should allow valid paths within upload directory', async () => { + // Test that valid paths don't trigger path validation errors + const validPaths = [ + '/api/files/serve/document.txt', + '/api/files/serve/folder/file.pdf', + '/api/files/serve/subfolder/image.png', + ] + + for (const validPath of validPaths) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: validPath, + }), + }) + + const response = await POST(request) + const result = await response.json() + + // Should not fail due to path validation (may fail for other reasons like file not found) + if (result.error) { + expect(result.error).not.toMatch( + /Access denied|Path outside allowed directory|Invalid path/ + ) + } + } + }) + + it('should handle encoded path traversal attempts', async () => { + const encodedMaliciousPaths = [ + '/api/files/serve/%2e%2e%2f%2e%2e%2fetc%2fpasswd', // ../../../etc/passwd + '/api/files/serve/..%2f..%2f..%2fetc%2fpasswd', + '/api/files/serve/%2e%2e/%2e%2e/etc/passwd', + ] + + for (const maliciousPath of encodedMaliciousPaths) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: decodeURIComponent(maliciousPath), // Simulate URL decoding + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(result.success).toBe(false) + expect(result.error).toMatch(/Access denied|Invalid path|Path outside allowed directory/) + } + }) + + it('should handle null byte injection attempts', async () => { + const nullBytePaths = [ + '/api/files/serve/file.txt\0../../etc/passwd', + 'file.txt\0/etc/passwd', + '/api/files/serve/document.pdf\0/var/log/auth.log', + ] + + for (const maliciousPath of nullBytePaths) { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: maliciousPath, + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(result.success).toBe(false) + // Should be rejected either by path validation or file system access + } + }) + }) + + describe('Edge Cases', () => { + it('should handle empty file paths', async () => { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({ + filePath: '', + }), + }) + + const response = await POST(request) + const result = await response.json() + + expect(response.status).toBe(400) + expect(result.error).toBe('No file path provided') + }) + + it('should handle missing filePath parameter', async () => { + const request = new NextRequest('http://localhost:3000/api/files/parse', { + method: 'POST', + body: JSON.stringify({}), + }) + + const response = await POST(request) + const result = await response.json() + + expect(response.status).toBe(400) + expect(result.error).toBe('No file path provided') + }) + }) +}) diff --git a/apps/sim/app/api/files/parse/route.ts b/apps/sim/app/api/files/parse/route.ts index e8211fa1e..c635a815b 100644 --- a/apps/sim/app/api/files/parse/route.ts +++ b/apps/sim/app/api/files/parse/route.ts @@ -15,7 +15,6 @@ export const dynamic = 'force-dynamic' const logger = createLogger('FilesParseAPI') -// Constants for URL downloads const MAX_DOWNLOAD_SIZE_BYTES = 100 * 1024 * 1024 // 100 MB const DOWNLOAD_TIMEOUT_MS = 30000 // 30 seconds @@ -71,21 +70,6 @@ const fileTypeMap: Record = { zip: 'application/zip', } -// Binary file extensions -const _binaryExtensions = [ - 'doc', - 'docx', - 'xls', - 'xlsx', - 'ppt', - 'pptx', - 'zip', - 'png', - 'jpg', - 'jpeg', - 'gif', -] - /** * Main API route handler */ @@ -529,11 +513,57 @@ async function parseBufferAsPdf(buffer: Buffer) { } } +/** + * Validate that a file path is safe and within allowed directories + */ +function validateAndResolvePath(inputPath: string): { + isValid: boolean + resolvedPath?: string + error?: string +} { + try { + let targetPath = inputPath + if (inputPath.startsWith('/api/files/serve/')) { + const filename = inputPath.replace('/api/files/serve/', '') + targetPath = path.join(UPLOAD_DIR, filename) + } + + const resolvedPath = path.resolve(targetPath) + const resolvedUploadDir = path.resolve(UPLOAD_DIR) + + if ( + !resolvedPath.startsWith(resolvedUploadDir + path.sep) && + resolvedPath !== resolvedUploadDir + ) { + return { + isValid: false, + error: `Access denied: Path outside allowed directory`, + } + } + + if (inputPath.includes('..') || inputPath.includes('~')) { + return { + isValid: false, + error: `Access denied: Invalid path characters detected`, + } + } + + return { + isValid: true, + resolvedPath, + } + } catch (error) { + return { + isValid: false, + error: `Path validation error: ${(error as Error).message}`, + } + } +} + /** * Handle a local file from the filesystem */ async function handleLocalFile(filePath: string, fileType?: string): Promise { - // Check if this is an S3 path that was incorrectly routed if (filePath.includes('/api/files/serve/s3/')) { logger.warn(`S3 path detected in handleLocalFile, redirecting to S3 handler: ${filePath}`) return handleS3File(filePath, fileType) @@ -542,15 +572,19 @@ async function handleLocalFile(filePath: string, fileType?: string): Promise (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) .join(' ')}\n` - logger.error(`[${requestId}] Code Console Error:`, errorMessage) + logger.error(`[${requestId}] Code Console Error: ${errorMessage}`) stdout += `ERROR: ${errorMessage}` }, }, @@ -234,7 +242,7 @@ export async function POST(req: NextRequest) { const errorMessage = `${args .map((arg) => (typeof arg === 'object' ? JSON.stringify(arg) : String(arg))) .join(' ')}\n` - logger.error(`[${requestId}] Code Console Error:`, errorMessage) + logger.error(`[${requestId}] Code Console Error: ${errorMessage}`) stdout += `ERROR: ${errorMessage}` }, }, diff --git a/apps/sim/app/api/providers/route.ts b/apps/sim/app/api/providers/route.ts index 400b9e1a2..ee0cbc015 100644 --- a/apps/sim/app/api/providers/route.ts +++ b/apps/sim/app/api/providers/route.ts @@ -36,6 +36,7 @@ export async function POST(request: NextRequest) { workflowId, stream, messages, + environmentVariables, } = body logger.info(`[${requestId}] Provider request details`, { @@ -51,6 +52,8 @@ export async function POST(request: NextRequest) { stream: !!stream, hasMessages: !!messages?.length, messageCount: messages?.length || 0, + hasEnvironmentVariables: + !!environmentVariables && Object.keys(environmentVariables).length > 0, }) let finalApiKey: string @@ -89,6 +92,7 @@ export async function POST(request: NextRequest) { workflowId, stream, messages, + environmentVariables, }) const executionTime = Date.now() - startTime diff --git a/apps/sim/app/chat/[subdomain]/chat-client.tsx b/apps/sim/app/chat/[subdomain]/chat-client.tsx index 85f56d659..f9f27ba55 100644 --- a/apps/sim/app/chat/[subdomain]/chat-client.tsx +++ b/apps/sim/app/chat/[subdomain]/chat-client.tsx @@ -407,21 +407,6 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { return (
- - {/* Header component */} diff --git a/apps/sim/app/chat/[subdomain]/components/message-container/message-container.tsx b/apps/sim/app/chat/[subdomain]/components/message-container/message-container.tsx index 44a41604a..6c59e578d 100644 --- a/apps/sim/app/chat/[subdomain]/components/message-container/message-container.tsx +++ b/apps/sim/app/chat/[subdomain]/components/message-container/message-container.tsx @@ -30,6 +30,21 @@ export const ChatMessageContainer = memo(function ChatMessageContainer({ }: ChatMessageContainerProps) { return (
+ + {/* Scrollable Messages Area */}
({ getProviderFromModel: vi.fn().mockReturnValue('mock-provider'), transformBlockTool: vi.fn(), getBaseModelProviders: vi.fn().mockReturnValue({ openai: {}, anthropic: {} }), + getApiKey: vi.fn().mockReturnValue('mock-api-key'), + getProvider: vi.fn().mockReturnValue({ + chat: { + completions: { + create: vi.fn().mockResolvedValue({ + content: 'Mocked response content', + model: 'mock-model', + tokens: { prompt: 10, completion: 20, total: 30 }, + toolCalls: [], + cost: 0.001, + timing: { total: 100 }, + }), + }, + }, + }), })) vi.mock('@/blocks', () => ({ @@ -31,6 +47,17 @@ vi.mock('@/tools', () => ({ executeTool: vi.fn(), })) +vi.mock('@/providers', () => ({ + executeProviderRequest: vi.fn().mockResolvedValue({ + content: 'Mocked response content', + model: 'mock-model', + tokens: { prompt: 10, completion: 20, total: 30 }, + toolCalls: [], + cost: 0.001, + timing: { total: 100 }, + }), +})) + global.fetch = Object.assign(vi.fn(), { preconnect: vi.fn() }) as typeof fetch const mockGetAllBlocks = getAllBlocks as Mock @@ -39,6 +66,7 @@ const mockIsHosted = isHosted as unknown as Mock const mockGetProviderFromModel = getProviderFromModel as Mock const mockTransformBlockTool = transformBlockTool as Mock const mockFetch = global.fetch as unknown as Mock +const mockExecuteProviderRequest = executeProviderRequest as Mock describe('AgentBlockHandler', () => { let handler: AgentBlockHandler @@ -50,7 +78,12 @@ describe('AgentBlockHandler', () => { handler = new AgentBlockHandler() vi.clearAllMocks() - // Save original Promise.all to restore later + Object.defineProperty(global, 'window', { + value: {}, + writable: true, + configurable: true, + }) + originalPromiseAll = Promise.all mockBlock = { @@ -85,7 +118,7 @@ describe('AgentBlockHandler', () => { loops: {}, } as SerializedWorkflow, } - mockIsHosted.mockReturnValue(false) // Default to non-hosted env for tests + mockIsHosted.mockReturnValue(false) mockGetProviderFromModel.mockReturnValue('mock-provider') mockFetch.mockImplementation(() => { @@ -130,8 +163,15 @@ describe('AgentBlockHandler', () => { }) afterEach(() => { - // Restore original Promise.all Promise.all = originalPromiseAll + + try { + Object.defineProperty(global, 'window', { + value: undefined, + writable: true, + configurable: true, + }) + } catch (e) {} }) describe('canHandle', () => { @@ -164,7 +204,7 @@ describe('AgentBlockHandler', () => { userPrompt: 'User query: Hello!', temperature: 0.7, maxTokens: 100, - apiKey: 'test-api-key', // Add API key for non-hosted env + apiKey: 'test-api-key', } mockGetProviderFromModel.mockReturnValue('openai') @@ -193,7 +233,6 @@ describe('AgentBlockHandler', () => { Promise.all = vi.fn().mockImplementation((promises: Promise[]) => { const result = originalPromiseAll.call(Promise, promises) - // When result resolves, capture the tools result.then((tools: any[]) => { if (tools?.length) { capturedTools = tools.filter((t) => t !== null) @@ -255,7 +294,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'auto', + usageControl: 'auto' as const, }, { type: 'custom-tool', @@ -274,7 +313,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'force', + usageControl: 'force' as const, }, { type: 'custom-tool', @@ -293,7 +332,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'none', // This tool should be filtered out + usageControl: 'none' as const, }, ], } @@ -355,21 +394,21 @@ describe('AgentBlockHandler', () => { title: 'Tool 1', type: 'tool-type-1', operation: 'operation1', - usageControl: 'auto', // default setting + usageControl: 'auto' as const, }, { id: 'tool_2', title: 'Tool 2', type: 'tool-type-2', operation: 'operation2', - usageControl: 'none', // should be filtered out + usageControl: 'none' as const, }, { id: 'tool_3', title: 'Tool 3', type: 'tool-type-3', operation: 'operation3', - usageControl: 'force', // should be included + usageControl: 'force' as const, }, ], } @@ -400,14 +439,14 @@ describe('AgentBlockHandler', () => { title: 'Tool 1', type: 'tool-type-1', operation: 'operation1', - usageControl: 'auto', + usageControl: 'auto' as const, }, { id: 'tool_2', title: 'Tool 2', type: 'tool-type-2', operation: 'operation2', - usageControl: 'force', + usageControl: 'force' as const, }, ], } @@ -449,7 +488,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'auto', + usageControl: 'auto' as const, }, { type: 'custom-tool', @@ -464,7 +503,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'force', + usageControl: 'force' as const, }, { type: 'custom-tool', @@ -479,7 +518,7 @@ describe('AgentBlockHandler', () => { }, }, }, - usageControl: 'none', // Should be filtered out + usageControl: 'none' as const, }, ], } @@ -635,6 +674,8 @@ describe('AgentBlockHandler', () => { model: 'mock-model', tokens: { prompt: 10, completion: 20, total: 30 }, timing: { total: 100 }, + toolCalls: [], + cost: undefined, }), }) }) @@ -654,7 +695,9 @@ describe('AgentBlockHandler', () => { result: 'Success', score: 0.95, tokens: { prompt: 10, completion: 20, total: 30 }, + toolCalls: { list: [], count: 0 }, providerTiming: { total: 100 }, + cost: undefined, }, }) }) @@ -729,23 +772,35 @@ describe('AgentBlockHandler', () => { }) it('should handle streaming responses with text/event-stream content type', async () => { - const mockStreamBody = { - getReader: vi.fn().mockReturnValue({ - read: vi.fn().mockResolvedValue({ done: true, value: undefined }), - }), - } + const mockStreamBody = new ReadableStream({ + start(controller) { + controller.close() + }, + }) mockFetch.mockImplementationOnce(() => { return Promise.resolve({ ok: true, headers: { get: (name: string) => { - if (name === 'Content-Type') return 'text/event-stream' + if (name === 'Content-Type') return 'application/json' if (name === 'X-Execution-Data') return null return null }, }, - body: mockStreamBody, + json: () => + Promise.resolve({ + stream: mockStreamBody, + execution: { + success: true, + output: { response: {} }, + logs: [], + metadata: { + duration: 0, + startTime: new Date().toISOString(), + }, + }, + }), }) }) @@ -771,11 +826,11 @@ describe('AgentBlockHandler', () => { }) it('should handle streaming responses with execution data in header', async () => { - const mockStreamBody = { - getReader: vi.fn().mockReturnValue({ - read: vi.fn().mockResolvedValue({ done: true, value: undefined }), - }), - } + const mockStreamBody = new ReadableStream({ + start(controller) { + controller.close() + }, + }) const mockExecutionData = { success: true, @@ -807,12 +862,16 @@ describe('AgentBlockHandler', () => { ok: true, headers: { get: (name: string) => { - if (name === 'Content-Type') return 'text/event-stream' + if (name === 'Content-Type') return 'application/json' if (name === 'X-Execution-Data') return JSON.stringify(mockExecutionData) return null }, }, - body: mockStreamBody, + json: () => + Promise.resolve({ + stream: mockStreamBody, + execution: mockExecutionData, + }), }) }) diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index 70984ae3d..50a65de07 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -2,14 +2,21 @@ import { env } from '@/lib/env' import { createLogger } from '@/lib/logs/console-logger' import { getAllBlocks } from '@/blocks' import type { BlockOutput } from '@/blocks/types' -import { getProviderFromModel, transformBlockTool } from '@/providers/utils' +import { executeProviderRequest } from '@/providers' +import { getApiKey, getProviderFromModel, transformBlockTool } from '@/providers/utils' import type { SerializedBlock } from '@/serializer/types' import { executeTool } from '@/tools' import { getTool, getToolAsync } from '@/tools/utils' import type { BlockHandler, ExecutionContext, StreamingExecution } from '../../types' +import type { AgentInputs, Message, StreamingConfig, ToolInput } from './types' const logger = createLogger('AgentBlockHandler') +const DEFAULT_MODEL = 'gpt-4o' +const DEFAULT_FUNCTION_TIMEOUT = 5000 +const REQUEST_TIMEOUT = 120000 +const CUSTOM_TOOL_PREFIX = 'custom_' + /** * Handler for Agent blocks that process LLM requests with optional tools. */ @@ -20,263 +27,279 @@ export class AgentBlockHandler implements BlockHandler { async execute( block: SerializedBlock, - inputs: Record, + inputs: AgentInputs, context: ExecutionContext ): Promise { logger.info(`Executing agent block: ${block.id}`) - // Parse response format if provided - let responseFormat: any - if (inputs.responseFormat) { - // Handle empty string case - treat it as no response format - if (inputs.responseFormat === '') { - responseFormat = undefined - } else { - try { - responseFormat = - typeof inputs.responseFormat === 'string' - ? JSON.parse(inputs.responseFormat) - : inputs.responseFormat + const responseFormat = this.parseResponseFormat(inputs.responseFormat) + const model = inputs.model || DEFAULT_MODEL + const providerId = getProviderFromModel(model) + const formattedTools = await this.formatTools(inputs.tools || [], context) + const streamingConfig = this.getStreamingConfig(block, context) + const messages = this.buildMessages(inputs) - // Ensure the responseFormat is properly structured - if (responseFormat && typeof responseFormat === 'object') { - // If it's just a raw schema without the expected wrapper properties, - // wrap it properly for the provider - if (!responseFormat.schema && !responseFormat.name) { - responseFormat = { - name: 'response_schema', - schema: responseFormat, - strict: true, - } - } - } - } catch (error: any) { - logger.error('Failed to parse response format:', { error }) - throw new Error(`Invalid response format: ${error.message}`) + const providerRequest = this.buildProviderRequest({ + providerId, + model, + messages, + inputs, + formattedTools, + responseFormat, + context, + streaming: streamingConfig.shouldUseStreaming ?? false, + }) + + this.logRequestDetails(providerRequest, messages, streamingConfig) + + return this.executeProviderRequest(providerRequest, block, responseFormat, context) + } + + private parseResponseFormat(responseFormat?: string | object): any { + if (!responseFormat || responseFormat === '') return undefined + + try { + const parsed = + typeof responseFormat === 'string' ? JSON.parse(responseFormat) : responseFormat + + if (parsed && typeof parsed === 'object' && !parsed.schema && !parsed.name) { + return { + name: 'response_schema', + schema: parsed, + strict: true, } } + return parsed + } catch (error: any) { + logger.error('Failed to parse response format:', { error }) + throw new Error(`Invalid response format: ${error.message}`) + } + } + + private async formatTools(inputTools: ToolInput[], context: ExecutionContext): Promise { + if (!Array.isArray(inputTools)) return [] + + const tools = await Promise.all( + inputTools + .filter((tool) => { + const usageControl = tool.usageControl || 'auto' + return usageControl !== 'none' + }) + .map(async (tool) => { + if (tool.type === 'custom-tool' && tool.schema) { + return this.createCustomTool(tool, context) + } + return this.transformBlockTool(tool, context) + }) + ) + + return tools.filter( + (tool): tool is NonNullable => tool !== null && tool !== undefined + ) + } + + private createCustomTool(tool: ToolInput, context: ExecutionContext): any { + const base: any = { + id: `${CUSTOM_TOOL_PREFIX}${tool.title}`, + name: tool.schema.function.name, + description: tool.schema.function.description || '', + params: tool.params || {}, + parameters: { + type: tool.schema.function.parameters.type, + properties: tool.schema.function.parameters.properties, + required: tool.schema.function.parameters.required || [], + }, + usageControl: tool.usageControl || 'auto', + } + + if (tool.code) { + base.executeFunction = async (callParams: Record) => { + const result = await executeTool('function_execute', { + code: tool.code, + ...tool.params, + ...callParams, + timeout: tool.timeout ?? DEFAULT_FUNCTION_TIMEOUT, + envVars: context.environmentVariables || {}, + isCustomTool: true, + _context: { workflowId: context.workflowId }, + }) + + if (!result.success) { + throw new Error(result.error || 'Function execution failed') + } + return result.output + } } - const model = inputs.model || 'gpt-4o' - const providerId = getProviderFromModel(model) - logger.info(`Using provider: ${providerId}, model: ${model}`) + return base + } - // Format tools for provider API - const formattedTools = Array.isArray(inputs.tools) - ? ( - await Promise.all( - // First filter out any tools with usageControl set to 'none' - inputs.tools - .filter((tool: any) => { - const usageControl = tool.usageControl || 'auto' - if (usageControl === 'none') { - logger.info(`Filtering out tool set to 'none': ${tool.title || tool.type}`) - return false - } - return true - }) - .map(async (tool: any) => { - // Handle custom tools - if (tool.type === 'custom-tool' && tool.schema) { - // Add function execution capability to custom tools with code - if (tool.code) { - // Store the tool's code and make it available for execution - const toolName = tool.schema.function.name - const params = tool.params || {} + private async transformBlockTool(tool: ToolInput, context: ExecutionContext) { + const transformedTool = await transformBlockTool(tool, { + selectedOperation: tool.operation, + getAllBlocks, + getToolAsync: (toolId: string) => getToolAsync(toolId, context.workflowId), + getTool, + }) - // Create a tool that can execute the code - return { - id: `custom_${tool.title}`, - name: toolName, - description: tool.schema.function.description || '', - params: params, - parameters: { - type: tool.schema.function.parameters.type, - properties: tool.schema.function.parameters.properties, - required: tool.schema.function.parameters.required || [], - }, - usageControl: tool.usageControl || 'auto', - executeFunction: async (callParams: Record) => { - try { - // Execute the code using the function_execute tool - const result = await executeTool('function_execute', { - code: tool.code, - ...params, - ...callParams, - timeout: tool.timeout || 5000, - }) + if (transformedTool) { + transformedTool.usageControl = tool.usageControl || 'auto' + } + return transformedTool + } - if (!result.success) { - throw new Error(result.error || 'Function execution failed') - } - - return result.output - } catch (error: any) { - logger.error(`Error executing custom tool ${toolName}:`, error) - throw new Error(`Error in ${toolName}: ${error.message}`) - } - }, - } - } - - return { - id: `custom_${tool.title}`, - name: tool.schema.function.name, - description: tool.schema.function.description || '', - params: tool.params || {}, - parameters: { - type: tool.schema.function.parameters.type, - properties: tool.schema.function.parameters.properties, - required: tool.schema.function.parameters.required || [], - }, - usageControl: tool.usageControl || 'auto', - } - } - - // Handle regular block tools with operation selection - const transformedTool = await transformBlockTool(tool, { - selectedOperation: tool.operation, - getAllBlocks, - getToolAsync: (toolId: string) => getToolAsync(toolId, context.workflowId), - getTool, - }) - - // Add usageControl to the transformed tool if it exists - if (transformedTool) { - transformedTool.usageControl = tool.usageControl || 'auto' - } - - return transformedTool - }) - ) - ).filter((t: any): t is NonNullable => t !== null) - : [] - - // Check if streaming is requested and this block is selected for streaming + private getStreamingConfig(block: SerializedBlock, context: ExecutionContext): StreamingConfig { const isBlockSelectedForOutput = context.selectedOutputIds?.some((outputId) => { - // First check for direct match (if the entire outputId is the blockId) - if (outputId === block.id) { - logger.info(`Direct match found for block ${block.id} in selected outputs`) - return true - } - - // Then try parsing the blockId from the blockId_path format + if (outputId === block.id) return true const firstUnderscoreIndex = outputId.indexOf('_') - if (firstUnderscoreIndex !== -1) { - const blockId = outputId.substring(0, firstUnderscoreIndex) - const isMatch = blockId === block.id - if (isMatch) { - logger.info( - `Path match found for block ${block.id} in selected outputs (from ${outputId})` - ) - } - return isMatch - } - return false + return ( + firstUnderscoreIndex !== -1 && outputId.substring(0, firstUnderscoreIndex) === block.id + ) }) ?? false - // Check if this block has any outgoing connections const hasOutgoingConnections = context.edges?.some((edge) => edge.source === block.id) ?? false - - // Determine if we should use streaming for this block - const shouldUseStreaming = context.stream && isBlockSelectedForOutput + const shouldUseStreaming = Boolean(context.stream) && isBlockSelectedForOutput if (shouldUseStreaming) { - logger.info( - `Block ${block.id} will use streaming response (selected for output with no outgoing connections)` - ) + logger.info(`Block ${block.id} will use streaming response`) } - // Initialize parsedMessages - will be built from memories/prompts if provided - let parsedMessages: any[] | undefined + return { shouldUseStreaming, isBlockSelectedForOutput, hasOutgoingConnections } + } - // Check if we're in advanced mode with the memories field - if (inputs.memories || (inputs.systemPrompt && inputs.userPrompt)) { - const messages: any[] = [] + private buildMessages(inputs: AgentInputs): Message[] | undefined { + if (!inputs.memories && !(inputs.systemPrompt && inputs.userPrompt)) { + return undefined + } - if (inputs.memories) { - const memories = inputs.memories + const messages: Message[] = [] - const memoryMessages = processMemories(memories, logger) - messages.push(...memoryMessages) - } + if (inputs.memories) { + messages.push(...this.processMemories(inputs.memories)) + } - // Handle system prompt with clear precedence rules - if (inputs.systemPrompt) { - // Check for existing system messages in memories - const systemMessages = messages.filter((msg) => msg.role === 'system') + if (inputs.systemPrompt) { + this.addSystemPrompt(messages, inputs.systemPrompt) + } - if (systemMessages.length > 1) { - logger.warn( - `Found ${systemMessages.length} system messages in memories. Explicit systemPrompt will take precedence.` - ) - } else if (systemMessages.length === 1) { - logger.info( - 'Found system message in memories. Explicit systemPrompt will take precedence.' - ) - } + if (inputs.userPrompt) { + this.addUserPrompt(messages, inputs.userPrompt) + } - // Remove any existing system messages and add the explicit one at the beginning - messages.splice(0, 0, { - role: 'system', - content: inputs.systemPrompt, - }) + return messages.length > 0 ? messages : undefined + } - // Remove any other system messages that came from memories - for (let i = messages.length - 1; i >= 1; i--) { - if (messages[i].role === 'system') { - messages.splice(i, 1) + private processMemories(memories: any): Message[] { + if (!memories) return [] + + let memoryArray: any[] = [] + if (memories?.response?.memories && Array.isArray(memories.response.memories)) { + memoryArray = memories.response.memories + } else if (memories?.memories && Array.isArray(memories.memories)) { + memoryArray = memories.memories + } else if (Array.isArray(memories)) { + memoryArray = memories + } + + const messages: Message[] = [] + memoryArray.forEach((memory: any) => { + if (memory.data && Array.isArray(memory.data)) { + memory.data.forEach((msg: any) => { + if (msg.role && msg.content && ['system', 'user', 'assistant'].includes(msg.role)) { + messages.push({ + role: msg.role as 'system' | 'user' | 'assistant', + content: msg.content, + }) } - } - - logger.info( - 'Added explicit system prompt as first message, removed any system messages from memories' - ) - } else { - // No explicit system prompt provided, check for multiple system messages in memories - const systemMessages = messages.filter((msg) => msg.role === 'system') - - if (systemMessages.length > 1) { - logger.warn( - `Found ${systemMessages.length} system messages in memories with no explicit systemPrompt. Consider providing an explicit systemPrompt for consistent behavior.` - ) - } else if (systemMessages.length === 1) { - logger.info('Using system message from memories') - } - } - - if (inputs.userPrompt) { - let userContent = inputs.userPrompt - if (typeof userContent === 'object' && userContent.input) { - userContent = userContent.input - } else if (typeof userContent === 'object') { - userContent = JSON.stringify(userContent) - } - + }) + } else if ( + memory.role && + memory.content && + ['system', 'user', 'assistant'].includes(memory.role) + ) { messages.push({ - role: 'user', - content: userContent, + role: memory.role as 'system' | 'user' | 'assistant', + content: memory.content, }) - logger.info('Added user prompt to messages', { contentType: typeof userContent }) } + }) - if (messages.length > 0) { - parsedMessages = messages - logger.info('Built messages from advanced mode', { - messageCount: messages.length, - firstMessage: messages[0], - lastMessage: messages[messages.length - 1], - }) + return messages + } + + private addSystemPrompt(messages: Message[], systemPrompt: string) { + const systemMessages = messages.filter((msg) => msg.role === 'system') + + if (systemMessages.length > 0) { + messages.splice(0, 0, { role: 'system', content: systemPrompt }) + for (let i = messages.length - 1; i >= 1; i--) { + if (messages[i].role === 'system') { + messages.splice(i, 1) + } } + } else { + messages.splice(0, 0, { role: 'system', content: systemPrompt }) + } + } + + private addUserPrompt(messages: Message[], userPrompt: any) { + let content = userPrompt + if (typeof userPrompt === 'object' && userPrompt.input) { + content = userPrompt.input + } else if (typeof userPrompt === 'object') { + content = JSON.stringify(userPrompt) } - // Fast validation of parsed messages - const validMessages = - Array.isArray(parsedMessages) && - parsedMessages.length > 0 && - parsedMessages.every( - (msg) => + messages.push({ role: 'user', content }) + } + + private buildProviderRequest(config: { + providerId: string + model: string + messages: Message[] | undefined + inputs: AgentInputs + formattedTools: any[] + responseFormat: any + context: ExecutionContext + streaming: boolean + }) { + const { + providerId, + model, + messages, + inputs, + formattedTools, + responseFormat, + context, + streaming, + } = config + + const validMessages = this.validateMessages(messages) + + return { + provider: providerId, + model, + systemPrompt: validMessages ? undefined : inputs.systemPrompt, + context: JSON.stringify(messages), + tools: formattedTools, + temperature: inputs.temperature, + maxTokens: inputs.maxTokens, + apiKey: inputs.apiKey, + responseFormat, + workflowId: context.workflowId, + stream: streaming, + messages, + environmentVariables: context.environmentVariables || {}, + } + } + + private validateMessages(messages: Message[] | undefined): boolean { + return ( + Array.isArray(messages) && + messages.length > 0 && + messages.every( + (msg: any) => typeof msg === 'object' && msg !== null && 'role' in msg && @@ -284,434 +307,396 @@ export class AgentBlockHandler implements BlockHandler { ('content' in msg || (msg.role === 'assistant' && ('function_call' in msg || 'tool_calls' in msg))) ) + ) + } - if (Array.isArray(parsedMessages) && parsedMessages.length > 0 && !validMessages) { - logger.warn('Messages array has invalid format:', { - messageCount: parsedMessages.length, - }) - } else if (validMessages) { - logger.info('Messages validated successfully') - } - - // Debug request before sending to provider - const providerRequest = { - provider: providerId, - model, - // If messages are provided (advanced mode), use them exclusively and skip systemPrompt/context - ...(validMessages - ? { messages: parsedMessages } - : { - systemPrompt: inputs.systemPrompt, - context: inputs.userPrompt - ? Array.isArray(inputs.userPrompt) - ? JSON.stringify(inputs.userPrompt, null, 2) - : typeof inputs.userPrompt === 'string' - ? inputs.userPrompt - : JSON.stringify(inputs.userPrompt, null, 2) - : undefined, - }), - tools: formattedTools.length > 0 ? formattedTools : undefined, - temperature: inputs.temperature, - maxTokens: inputs.maxTokens, - apiKey: inputs.apiKey, - responseFormat, - workflowId: context.workflowId, - stream: shouldUseStreaming, - } - + private logRequestDetails( + providerRequest: any, + messages: Message[] | undefined, + streamingConfig: StreamingConfig + ) { logger.info('Provider request prepared', { model: providerRequest.model, - hasMessages: Array.isArray(parsedMessages) && parsedMessages.length > 0, - hasSystemPrompt: - !(Array.isArray(parsedMessages) && parsedMessages.length > 0) && !!inputs.systemPrompt, - hasContext: - !(Array.isArray(parsedMessages) && parsedMessages.length > 0) && !!inputs.userPrompt, + hasMessages: !!messages?.length, + hasSystemPrompt: !messages?.length && !!providerRequest.systemPrompt, + hasContext: !messages?.length && !!providerRequest.context, hasTools: !!providerRequest.tools, hasApiKey: !!providerRequest.apiKey, workflowId: providerRequest.workflowId, - stream: shouldUseStreaming, - isBlockSelectedForOutput, - hasOutgoingConnections, - // Debug info about messages to help diagnose issues - messagesProvided: 'messages' in providerRequest, - messagesCount: - 'messages' in providerRequest && Array.isArray(providerRequest.messages) - ? providerRequest.messages.length - : 0, + stream: providerRequest.stream, + messagesCount: messages?.length || 0, }) + } - const baseUrl = env.NEXT_PUBLIC_APP_URL || '' - const url = new URL('/api/providers', baseUrl) + private async executeProviderRequest( + providerRequest: any, + block: SerializedBlock, + responseFormat: any, + context: ExecutionContext + ): Promise { + const providerId = providerRequest.provider + const model = providerRequest.model + const providerStartTime = Date.now() try { - logger.info(`Making provider request to: ${url.toString()}`, { - workflowId: context.workflowId, - blockId: block.id, - provider: providerId, - model, - timestamp: new Date().toISOString(), - }) + const isBrowser = typeof window !== 'undefined' - const response = await fetch(url.toString(), { - method: 'POST', - headers: { - 'Content-Type': 'application/json', - }, - body: JSON.stringify(providerRequest), - // Add timeout and signal for better error handling - signal: AbortSignal.timeout(120000), // 2 minute timeout - }) - - if (!response.ok) { - // Try to extract a helpful error message - let errorMessage = `Provider API request failed with status ${response.status}` - let errorDetails = null - - try { - const errorData = await response.json() - if (errorData.error) { - errorMessage = errorData.error - errorDetails = errorData - } - } catch (_e) { - // If JSON parsing fails, try to get text response - try { - const textError = await response.text() - if (textError) { - errorDetails = { textResponse: textError } - } - } catch (_textError) { - // If text parsing also fails, use the original error message - } - } - - logger.error('Provider API request failed', { - workflowId: context.workflowId, - blockId: block.id, - status: response.status, - statusText: response.statusText, - url: url.toString(), - errorMessage, - errorDetails, - headers: Object.fromEntries(response.headers.entries()), - }) - - throw new Error(errorMessage) - } - - // Check if we're getting a streaming response - const contentType = response.headers.get('Content-Type') - if (contentType?.includes('text/event-stream')) { - logger.info(`Received streaming response for block ${block.id}`) - - // Ensure we have a valid body stream - if (!response.body) { - throw new Error(`No response body in streaming response for block ${block.id}`) - } - - // Check if we have execution data in the header - const executionDataHeader = response.headers.get('X-Execution-Data') - if (executionDataHeader) { - try { - // Parse the execution data from the header - const executionData = JSON.parse(executionDataHeader) - - // Add block-specific data to the execution logs if needed - if (executionData?.logs) { - for (const log of executionData.logs) { - if (!log.blockId) log.blockId = block.id - if (!log.blockName && block.metadata?.name) log.blockName = block.metadata.name - if (!log.blockType && block.metadata?.id) log.blockType = block.metadata.id - } - } - - // Add block metadata to the execution data if missing - if (executionData.output?.response) { - // Ensure model and block info is set - if (block.metadata?.name && !executionData.blockName) { - executionData.blockName = block.metadata.name - } - if (block.metadata?.id && !executionData.blockType) { - executionData.blockType = block.metadata.id - } - if (!executionData.blockId) { - executionData.blockId = block.id - } - - // Add explicit streaming flag to make it easier to identify streaming executions - executionData.isStreaming = true - } - - // Return both the stream and the execution data as separate properties - const streamingExecution: StreamingExecution = { - stream: response.body, - execution: executionData, - } - return streamingExecution - } catch (error) { - logger.error(`Error parsing execution data header: ${error}`) - // Continue with just the stream if there's an error - } - } - - // No execution data in header, just return the stream - // Create a minimal StreamingExecution with empty execution data - const minimalExecution: StreamingExecution = { - stream: response.body, - execution: { - success: true, - output: { response: {} }, - logs: [], - metadata: { - duration: 0, - startTime: new Date().toISOString(), - }, - }, - } - return minimalExecution - } - - // Check if we have a combined response with both stream and execution data - const result = await response.json() - - if (result && typeof result === 'object' && 'stream' in result && 'execution' in result) { - logger.info(`Received combined streaming response for block ${block.id}`) - - // Get the stream as a ReadableStream (need to convert from serialized format) - const stream = new ReadableStream({ - start(controller) { - // Since stream was serialized as JSON, we need to reconstruct it - // For now, we'll just use a placeholder message - const encoder = new TextEncoder() - controller.enqueue( - encoder.encode( - 'Stream data cannot be serialized as JSON. You will need to return a proper stream.' - ) - ) - controller.close() - }, - }) - - // Return both in a format the executor can handle - const streamingExecution: StreamingExecution = { - stream, - execution: result.execution, - } - return streamingExecution - } - - logger.info('Provider response received', { - contentLength: result.content ? result.content.length : 0, - model: result.model, - hasTokens: !!result.tokens, - hasToolCalls: !!result.toolCalls, - toolCallsCount: result.toolCalls?.length || 0, - }) - - // If structured responses, try to parse the content - if (responseFormat) { - try { - const parsedContent = JSON.parse(result.content) - - const responseResult = { - response: { - ...parsedContent, - tokens: result.tokens || { - prompt: 0, - completion: 0, - total: 0, - }, - toolCalls: result.toolCalls - ? { - list: result.toolCalls.map((tc: any) => ({ - ...tc, - // Strip the 'custom_' prefix from tool names for display - name: stripCustomToolPrefix(tc.name), - // Preserve timing information if available - startTime: tc.startTime, - endTime: tc.endTime, - duration: tc.duration, - input: tc.arguments || tc.input, - output: tc.result || tc.output, - })), - count: result.toolCalls.length, - } - : undefined, - providerTiming: result.timing || undefined, - cost: result.cost || undefined, - }, - } - - return responseResult - } catch (error) { - logger.error('Failed to parse response content:', { error }) - logger.info('Falling back to standard response format') - - // Fall back to standard response if parsing fails - return { - response: { - content: result.content, - model: result.model, - tokens: result.tokens || { - prompt: 0, - completion: 0, - total: 0, - }, - toolCalls: { - list: result.toolCalls - ? result.toolCalls.map((tc: any) => ({ - ...tc, - // Strip the 'custom_' prefix from tool names for display - name: stripCustomToolPrefix(tc.name), - // Preserve timing information if available - startTime: tc.startTime, - endTime: tc.endTime, - duration: tc.duration, - input: tc.arguments || tc.input, - output: tc.result || tc.output, - })) - : [], - count: result.toolCalls?.length || 0, - }, - providerTiming: result.timing || undefined, - cost: result.cost || undefined, - }, - } - } - } - - // Return standard response if no responseFormat - return { - response: { - content: result.content, - model: result.model, - tokens: result.tokens || { - prompt: 0, - completion: 0, - total: 0, - }, - toolCalls: { - list: result.toolCalls - ? result.toolCalls.map((tc: any) => ({ - ...tc, - // Strip the 'custom_' prefix from tool names for display - name: stripCustomToolPrefix(tc.name), - // Preserve timing information if available - startTime: tc.startTime, - endTime: tc.endTime, - duration: tc.duration, - input: tc.arguments || tc.input, - output: tc.result || tc.output, - })) - : [], - count: result.toolCalls?.length || 0, - }, - providerTiming: result.timing || undefined, - cost: result.cost || undefined, - }, + if (!isBrowser) { + return this.executeServerSide( + providerRequest, + providerId, + model, + block, + responseFormat, + context, + providerStartTime + ) } + return this.executeBrowserSide( + providerRequest, + block, + responseFormat, + context, + providerStartTime + ) } catch (error) { - logger.error('Error executing provider request:', { error }) - - // Enhanced error logging for different error types - if (error instanceof Error) { - logger.error('Provider request error details', { - workflowId: context.workflowId, - blockId: block.id, - errorName: error.name, - errorMessage: error.message, - errorStack: error.stack, - url: url.toString(), - timestamp: new Date().toISOString(), - }) - - // Check for specific error types - if (error.name === 'AbortError') { - logger.error('Request timed out after 2 minutes', { - workflowId: context.workflowId, - blockId: block.id, - url: url.toString(), - }) - throw new Error('Provider request timed out - the API took too long to respond') - } - if (error.name === 'TypeError' && error.message.includes('fetch')) { - logger.error('Network fetch error - possible connectivity issue', { - workflowId: context.workflowId, - blockId: block.id, - url: url.toString(), - errorMessage: error.message, - }) - throw new Error( - 'Network error - unable to connect to provider API. Please check your internet connection.' - ) - } - if (error.message.includes('ENOTFOUND') || error.message.includes('ECONNREFUSED')) { - logger.error('DNS/Connection error', { - workflowId: context.workflowId, - blockId: block.id, - url: url.toString(), - errorMessage: error.message, - }) - throw new Error('Unable to connect to server - DNS or connection issue') - } - } - + this.handleExecutionError(error, providerStartTime, providerId, model, context, block) throw error } } -} -export function stripCustomToolPrefix(name: string) { - return name.startsWith('custom_') ? name.replace('custom_', '') : name -} + private async executeServerSide( + providerRequest: any, + providerId: string, + model: string, + block: SerializedBlock, + responseFormat: any, + context: ExecutionContext, + providerStartTime: number + ) { + logger.info('Using direct provider execution (server environment)') -/** - * Helper function to process memories and convert them to message format - */ -function processMemories(memories: any, logger: any): any[] { - const messages: any[] = [] + const finalApiKey = this.getApiKey(providerId, model, providerRequest.apiKey) - if (!memories) { - return messages + const response = await executeProviderRequest(providerId, { + model, + systemPrompt: 'systemPrompt' in providerRequest ? providerRequest.systemPrompt : undefined, + context: 'context' in providerRequest ? providerRequest.context : undefined, + tools: providerRequest.tools, + temperature: providerRequest.temperature, + maxTokens: providerRequest.maxTokens, + apiKey: finalApiKey, + responseFormat: providerRequest.responseFormat, + workflowId: providerRequest.workflowId, + stream: providerRequest.stream, + messages: 'messages' in providerRequest ? providerRequest.messages : undefined, + environmentVariables: context.environmentVariables || {}, + }) + + this.logExecutionSuccess(providerId, model, context, block, providerStartTime, response) + return this.processProviderResponse(response, block, responseFormat) } - let memoryArray: any[] = [] + private async executeBrowserSide( + providerRequest: any, + block: SerializedBlock, + responseFormat: any, + context: ExecutionContext, + providerStartTime: number + ) { + logger.info('Using HTTP provider request (browser environment)') - // Handle different memory input formats - if (memories?.response?.memories && Array.isArray(memories.response.memories)) { - // Memory block output format: { response: { memories: [...] } } - memoryArray = memories.response.memories - } else if (memories?.memories && Array.isArray(memories.memories)) { - // Direct memory output format: { memories: [...] } - memoryArray = memories.memories - } else if (Array.isArray(memories)) { - // Direct array of messages: [{ role, content }, ...] - memoryArray = memories - } else { - logger.warn('Unexpected memories format', { memories }) - return messages - } + const url = new URL('/api/providers', env.NEXT_PUBLIC_APP_URL || '') + const response = await fetch(url.toString(), { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify(providerRequest), + signal: AbortSignal.timeout(REQUEST_TIMEOUT), + }) - // Process the memory array - memoryArray.forEach((memory: any) => { - if (memory.data && Array.isArray(memory.data)) { - // Memory object with data array: { key, type, data: [{ role, content }, ...] } - memory.data.forEach((msg: any) => { - if (msg.role && msg.content) { - messages.push({ - role: msg.role, - content: msg.content, - }) - } - }) - } else if (memory.role && memory.content) { - // Direct message object: { role, content } - messages.push({ - role: memory.role, - content: memory.content, - }) + if (!response.ok) { + const errorMessage = await this.extractErrorMessage(response) + throw new Error(errorMessage) } - }) - return messages + this.logExecutionSuccess( + providerRequest.provider, + providerRequest.model, + context, + block, + providerStartTime, + 'HTTP response' + ) + + // Check if this is a streaming response + const contentType = response.headers.get('Content-Type') + if (contentType?.includes('text/event-stream')) { + // Handle streaming response + return this.handleStreamingResponse(response, block) + } + + // Handle regular JSON response + const result = await response.json() + return this.processProviderResponse(result, block, responseFormat) + } + + private async handleStreamingResponse( + response: Response, + block: SerializedBlock + ): Promise { + // Check if we have execution data in headers (from StreamingExecution) + const executionDataHeader = response.headers.get('X-Execution-Data') + + if (executionDataHeader) { + // Parse execution data from header + try { + const executionData = JSON.parse(executionDataHeader) + + // Create StreamingExecution object + return { + stream: response.body!, + execution: { + success: executionData.success, + output: executionData.output || { response: {} }, + error: executionData.error, + logs: [], // Logs are stripped from headers, will be populated by executor + metadata: executionData.metadata || { + duration: 0, + startTime: new Date().toISOString(), + }, + isStreaming: true, + blockId: block.id, + blockName: block.metadata?.name, + blockType: block.metadata?.id, + } as any, + } + } catch (error) { + logger.error('Failed to parse execution data from header:', error) + // Fall back to minimal streaming execution + } + } + + // Fallback for plain ReadableStream or when header parsing fails + return this.createMinimalStreamingExecution(response.body!) + } + + private getApiKey(providerId: string, model: string, inputApiKey: string): string { + try { + return getApiKey(providerId, model, inputApiKey) + } catch (error) { + logger.error('Failed to get API key:', { + provider: providerId, + model, + error: error instanceof Error ? error.message : String(error), + hasProvidedApiKey: !!inputApiKey, + }) + throw new Error(error instanceof Error ? error.message : 'API key error') + } + } + + private async extractErrorMessage(response: Response): Promise { + let errorMessage = `Provider API request failed with status ${response.status}` + try { + const errorData = await response.json() + if (errorData.error) { + errorMessage = errorData.error + } + } catch (_e) { + // Use default message if JSON parsing fails + } + return errorMessage + } + + private logExecutionSuccess( + provider: string, + model: string, + context: ExecutionContext, + block: SerializedBlock, + startTime: number, + response: any + ) { + const executionTime = Date.now() - startTime + const responseType = + response instanceof ReadableStream + ? 'stream' + : response && typeof response === 'object' && 'stream' in response + ? 'streaming-execution' + : 'json' + + logger.info('Provider request completed successfully', { + provider, + model, + workflowId: context.workflowId, + blockId: block.id, + executionTime, + responseType, + }) + } + + private handleExecutionError( + error: any, + startTime: number, + provider: string, + model: string, + context: ExecutionContext, + block: SerializedBlock + ) { + const executionTime = Date.now() - startTime + + logger.error('Error executing provider request:', { + error, + executionTime, + provider, + model, + workflowId: context.workflowId, + blockId: block.id, + }) + + if (!(error instanceof Error)) return + + logger.error('Provider request error details', { + workflowId: context.workflowId, + blockId: block.id, + errorName: error.name, + errorMessage: error.message, + errorStack: error.stack, + timestamp: new Date().toISOString(), + }) + + if (error.name === 'AbortError') { + throw new Error('Provider request timed out - the API took too long to respond') + } + if (error.name === 'TypeError' && error.message.includes('fetch')) { + throw new Error( + 'Network error - unable to connect to provider API. Please check your internet connection.' + ) + } + if (error.message.includes('ENOTFOUND') || error.message.includes('ECONNREFUSED')) { + throw new Error('Unable to connect to server - DNS or connection issue') + } + } + + private processProviderResponse( + response: any, + block: SerializedBlock, + responseFormat: any + ): BlockOutput | StreamingExecution { + if (this.isStreamingExecution(response)) { + return this.processStreamingExecution(response, block) + } + + if (response instanceof ReadableStream) { + return this.createMinimalStreamingExecution(response) + } + + return this.processRegularResponse(response, responseFormat) + } + + private isStreamingExecution(response: any): boolean { + return ( + response && typeof response === 'object' && 'stream' in response && 'execution' in response + ) + } + + private processStreamingExecution( + response: StreamingExecution, + block: SerializedBlock + ): StreamingExecution { + const streamingExec = response as StreamingExecution + logger.info(`Received StreamingExecution for block ${block.id}`) + + if (streamingExec.execution.output?.response) { + const execution = streamingExec.execution as any + if (block.metadata?.name) execution.blockName = block.metadata.name + if (block.metadata?.id) execution.blockType = block.metadata.id + execution.blockId = block.id + execution.isStreaming = true + } + + return streamingExec + } + + private createMinimalStreamingExecution(stream: ReadableStream): StreamingExecution { + return { + stream, + execution: { + success: true, + output: { response: {} }, + logs: [], + metadata: { + duration: 0, + startTime: new Date().toISOString(), + }, + }, + } + } + + private processRegularResponse(result: any, responseFormat: any): BlockOutput { + logger.info('Provider response received', { + contentLength: result.content ? result.content.length : 0, + model: result.model, + hasTokens: !!result.tokens, + hasToolCalls: !!result.toolCalls, + toolCallsCount: result.toolCalls?.length || 0, + }) + + if (responseFormat) { + return this.processStructuredResponse(result, responseFormat) + } + + return this.processStandardResponse(result) + } + + private processStructuredResponse(result: any, responseFormat: any): BlockOutput { + try { + const parsedContent = JSON.parse(result.content) + return { + response: { + ...parsedContent, + ...this.createResponseMetadata(result), + }, + } + } catch (error) { + logger.error('Failed to parse response content:', { error }) + return this.processStandardResponse(result) + } + } + + private processStandardResponse(result: any): BlockOutput { + return { + response: { + content: result.content, + model: result.model, + ...this.createResponseMetadata(result), + }, + } + } + + private createResponseMetadata(result: any) { + return { + tokens: result.tokens || { prompt: 0, completion: 0, total: 0 }, + toolCalls: { + list: result.toolCalls ? result.toolCalls.map(this.formatToolCall.bind(this)) : [], + count: result.toolCalls?.length || 0, + }, + providerTiming: result.timing, + cost: result.cost, + } + } + + private formatToolCall(tc: any) { + return { + ...tc, + name: this.stripCustomToolPrefix(tc.name), + startTime: tc.startTime, + endTime: tc.endTime, + duration: tc.duration, + input: tc.arguments || tc.input, + output: tc.result || tc.output, + } + } + + private stripCustomToolPrefix(name: string): string { + return name.startsWith('custom_') ? name.replace('custom_', '') : name + } } diff --git a/apps/sim/executor/handlers/agent/types.ts b/apps/sim/executor/handlers/agent/types.ts new file mode 100644 index 000000000..5658223b1 --- /dev/null +++ b/apps/sim/executor/handlers/agent/types.ts @@ -0,0 +1,35 @@ +export interface AgentInputs { + model?: string + responseFormat?: string | object + tools?: ToolInput[] + systemPrompt?: string + userPrompt?: string | object + memories?: any + temperature?: number + maxTokens?: number + apiKey?: string +} + +export interface ToolInput { + type?: string + schema?: any + title?: string + code?: string + params?: Record + timeout?: number + usageControl?: 'auto' | 'force' | 'none' + operation?: string +} + +export interface Message { + role: 'system' | 'user' | 'assistant' + content: string + function_call?: any + tool_calls?: any[] +} + +export interface StreamingConfig { + shouldUseStreaming: boolean + isBlockSelectedForOutput: boolean + hasOutgoingConnections: boolean +} diff --git a/apps/sim/executor/handlers/function/function-handler.test.ts b/apps/sim/executor/handlers/function/function-handler.test.ts index 87f89ecd7..92d2ba7d4 100644 --- a/apps/sim/executor/handlers/function/function-handler.test.ts +++ b/apps/sim/executor/handlers/function/function-handler.test.ts @@ -1,5 +1,3 @@ -import '../../__test-utils__/mock-dependencies' - import { beforeEach, describe, expect, it, type Mock, vi } from 'vitest' import type { BlockOutput } from '@/blocks/types' import type { SerializedBlock } from '@/serializer/types' @@ -7,6 +5,19 @@ import { executeTool } from '@/tools' import type { ExecutionContext } from '../../types' import { FunctionBlockHandler } from './function-handler' +vi.mock('@/lib/logs/console-logger', () => ({ + createLogger: vi.fn(() => ({ + info: vi.fn(), + error: vi.fn(), + warn: vi.fn(), + debug: vi.fn(), + })), +})) + +vi.mock('@/tools', () => ({ + executeTool: vi.fn(), +})) + const mockExecuteTool = executeTool as Mock describe('FunctionBlockHandler', () => { @@ -58,10 +69,14 @@ describe('FunctionBlockHandler', () => { const inputs = { code: 'console.log("Hello"); return 1 + 1;', timeout: 10000, + envVars: {}, + isCustomTool: false, + workflowId: undefined, } const expectedToolParams = { code: inputs.code, timeout: inputs.timeout, + envVars: {}, _context: { workflowId: mockContext.workflowId }, } const expectedOutput: BlockOutput = { response: { result: 'Success' } } @@ -76,11 +91,15 @@ describe('FunctionBlockHandler', () => { const inputs = { code: [{ content: 'const x = 5;' }, { content: 'return x * 2;' }], timeout: 5000, + envVars: {}, + isCustomTool: false, + workflowId: undefined, } const expectedCode = 'const x = 5;\nreturn x * 2;' const expectedToolParams = { code: expectedCode, timeout: inputs.timeout, + envVars: {}, _context: { workflowId: mockContext.workflowId }, } const expectedOutput: BlockOutput = { response: { result: 'Success' } } @@ -96,6 +115,7 @@ describe('FunctionBlockHandler', () => { const expectedToolParams = { code: inputs.code, timeout: 5000, // Default timeout + envVars: {}, _context: { workflowId: mockContext.workflowId }, } diff --git a/apps/sim/executor/handlers/function/function-handler.ts b/apps/sim/executor/handlers/function/function-handler.ts index d0d7e42e4..c63c80d2f 100644 --- a/apps/sim/executor/handlers/function/function-handler.ts +++ b/apps/sim/executor/handlers/function/function-handler.ts @@ -28,6 +28,7 @@ export class FunctionBlockHandler implements BlockHandler { const result = await executeTool('function_execute', { code: codeContent, timeout: inputs.timeout || 5000, + envVars: context.environmentVariables || {}, _context: { workflowId: context.workflowId }, }) diff --git a/apps/sim/providers/anthropic/index.ts b/apps/sim/providers/anthropic/index.ts index 4a626e48f..2181a3f06 100644 --- a/apps/sim/providers/anthropic/index.ts +++ b/apps/sim/providers/anthropic/index.ts @@ -459,6 +459,7 @@ ${fieldDescriptions} ...tool.params, ...toolArgs, ...(request.workflowId ? { _context: { workflowId: request.workflowId } } : {}), + ...(request.environmentVariables ? { envVars: request.environmentVariables } : {}), } const result = await executeTool(toolName, mergedArgs, true) const toolCallEndTime = Date.now() diff --git a/apps/sim/providers/deepseek/index.ts b/apps/sim/providers/deepseek/index.ts index 0f3e06b05..cfeb4813f 100644 --- a/apps/sim/providers/deepseek/index.ts +++ b/apps/sim/providers/deepseek/index.ts @@ -288,6 +288,7 @@ export const deepseekProvider: ProviderConfig = { ...tool.params, ...toolArgs, ...(request.workflowId ? { _context: { workflowId: request.workflowId } } : {}), + ...(request.environmentVariables ? { envVars: request.environmentVariables } : {}), } const result = await executeTool(toolName, mergedArgs, true) const toolCallEndTime = Date.now() diff --git a/apps/sim/providers/google/index.ts b/apps/sim/providers/google/index.ts index 08f943a0b..944cd0c10 100644 --- a/apps/sim/providers/google/index.ts +++ b/apps/sim/providers/google/index.ts @@ -368,6 +368,7 @@ export const googleProvider: ProviderConfig = { ...toolArgs, // Arguments from the model's function call ...requiredToolCallParams, // Required parameters from the tool definition (take precedence) ...(request.workflowId ? { _context: { workflowId: request.workflowId } } : {}), + ...(request.environmentVariables ? { envVars: request.environmentVariables } : {}), } // For debugging only - don't log actual API keys diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts index 307481aa5..ece64fd4c 100644 --- a/apps/sim/providers/openai/index.ts +++ b/apps/sim/providers/openai/index.ts @@ -360,6 +360,7 @@ export const openaiProvider: ProviderConfig = { ...tool.params, ...toolArgs, ...(request.workflowId ? { _context: { workflowId: request.workflowId } } : {}), + ...(request.environmentVariables ? { envVars: request.environmentVariables } : {}), } const result = await executeTool(toolName, mergedArgs, true) diff --git a/apps/sim/providers/types.ts b/apps/sim/providers/types.ts index 49e2da61e..1be7cad57 100644 --- a/apps/sim/providers/types.ts +++ b/apps/sim/providers/types.ts @@ -147,6 +147,7 @@ export interface ProviderRequest { local_execution?: boolean workflowId?: string // Optional workflow ID for authentication context stream?: boolean + environmentVariables?: Record // Environment variables for tool execution } // Map of provider IDs to their configurations diff --git a/apps/sim/stores/custom-tools/store.ts b/apps/sim/stores/custom-tools/store.ts index 96ba92066..760532153 100644 --- a/apps/sim/stores/custom-tools/store.ts +++ b/apps/sim/stores/custom-tools/store.ts @@ -32,27 +32,34 @@ export const useCustomToolsStore = create()( throw new Error('Invalid response format') } - // Validate each tool object's structure before processing - data.forEach((tool, index) => { + // Filter and validate tools, skipping invalid ones instead of throwing errors + const validTools = data.filter((tool, index) => { if (!tool || typeof tool !== 'object') { - throw new Error(`Invalid tool format at index ${index}: not an object`) + logger.warn(`Skipping invalid tool at index ${index}: not an object`) + return false } if (!tool.id || typeof tool.id !== 'string') { - throw new Error(`Invalid tool format at index ${index}: missing or invalid id`) + logger.warn(`Skipping invalid tool at index ${index}: missing or invalid id`) + return false } if (!tool.title || typeof tool.title !== 'string') { - throw new Error(`Invalid tool format at index ${index}: missing or invalid title`) + logger.warn(`Skipping invalid tool at index ${index}: missing or invalid title`) + return false } if (!tool.schema || typeof tool.schema !== 'object') { - throw new Error(`Invalid tool format at index ${index}: missing or invalid schema`) + logger.warn(`Skipping invalid tool at index ${index}: missing or invalid schema`) + return false } + // Make code field optional - default to empty string if missing if (!tool.code || typeof tool.code !== 'string') { - throw new Error(`Invalid tool format at index ${index}: missing or invalid code`) + logger.warn(`Tool at index ${index} missing code field, defaulting to empty string`) + tool.code = '' } + return true }) // Transform to local format and set - const transformedTools = data.reduce( + const transformedTools = validTools.reduce( (acc, tool) => ({ ...acc, [tool.id]: tool, @@ -60,8 +67,6 @@ export const useCustomToolsStore = create()( {} ) - logger.info(`Loaded ${data.length} custom tools from server`) - set({ tools: transformedTools, isLoading: false, @@ -72,12 +77,6 @@ export const useCustomToolsStore = create()( error: error instanceof Error ? error.message : 'Unknown error', isLoading: false, }) - - // Add a delay before reloading to prevent race conditions - setTimeout(() => { - // Reload from server to ensure consistency - get().loadCustomTools() - }, 500) } }, @@ -121,21 +120,12 @@ export const useCustomToolsStore = create()( set({ isLoading: false }) logger.info('Successfully synced custom tools with server') - - // Load from server to ensure consistency even after successful sync - get().loadCustomTools() } catch (error) { logger.error('Error syncing custom tools:', error) set({ error: error instanceof Error ? error.message : 'Unknown error', isLoading: false, }) - - // Add a delay before reloading to prevent race conditions - setTimeout(() => { - // Reload from server to ensure consistency - get().loadCustomTools() - }, 500) } }, diff --git a/apps/sim/tools/function/execute.test.ts b/apps/sim/tools/function/execute.test.ts index 7ac8d7851..80e670d84 100644 --- a/apps/sim/tools/function/execute.test.ts +++ b/apps/sim/tools/function/execute.test.ts @@ -41,12 +41,18 @@ describe('Function Execute Tool', () => { test('should format single string code correctly', () => { const body = tester.getRequestBody({ code: 'return 42', + envVars: {}, + isCustomTool: false, timeout: 5000, + workflowId: undefined, }) expect(body).toEqual({ code: 'return 42', + envVars: {}, + isCustomTool: false, timeout: 5000, + workflowId: undefined, }) }) @@ -57,11 +63,18 @@ describe('Function Execute Tool', () => { { content: 'const y = 2;', id: 'block2' }, { content: 'return x + y;', id: 'block3' }, ], + envVars: {}, + isCustomTool: false, + timeout: 10000, + workflowId: undefined, }) expect(body).toEqual({ code: 'const x = 40;\nconst y = 2;\nreturn x + y;', timeout: 10000, + envVars: {}, + isCustomTool: false, + workflowId: undefined, }) }) @@ -73,6 +86,9 @@ describe('Function Execute Tool', () => { expect(body).toEqual({ code: 'return 42', timeout: 10000, + envVars: {}, + isCustomTool: false, + workflowId: undefined, }) }) }) diff --git a/apps/sim/tools/function/execute.ts b/apps/sim/tools/function/execute.ts index 6994b50ca..e98c00e67 100644 --- a/apps/sim/tools/function/execute.ts +++ b/apps/sim/tools/function/execute.ts @@ -22,6 +22,12 @@ export const functionExecuteTool: ToolConfig | string timeout?: number memoryLimit?: number + envVars?: Record + _context?: { + workflowId?: string + } + isCustomTool?: boolean } export interface CodeExecutionOutput extends ToolResponse { diff --git a/apps/sim/tools/index.ts b/apps/sim/tools/index.ts index f66903280..4a7df371e 100644 --- a/apps/sim/tools/index.ts +++ b/apps/sim/tools/index.ts @@ -495,7 +495,7 @@ function validateClientSideParams( } // Internal parameters that should be excluded from validation - const internalParamSet = new Set(['_context', 'workflowId']) + const internalParamSet = new Set(['_context', 'workflowId', 'envVars']) // Check required parameters if (schema.required) { diff --git a/apps/sim/tools/utils.ts b/apps/sim/tools/utils.ts index a6244c855..88d86494c 100644 --- a/apps/sim/tools/utils.ts +++ b/apps/sim/tools/utils.ts @@ -249,8 +249,11 @@ export function createCustomToolRequestBody( getStore?: () => any ) { return (params: Record) => { - // Get environment variables - empty on server, from store on client - const envVars = isClient ? getClientEnvVars(getStore) : {} + // Get environment variables - try multiple sources in order of preference: + // 1. envVars parameter (passed from provider/agent context) + // 2. Client-side store (if running in browser) + // 3. Empty object (fallback) + const envVars = params.envVars || (isClient ? getClientEnvVars(getStore) : {}) // Include everything needed for execution return {