Compare commits


11 Commits

Author SHA1 Message Date
Waleed Latif
3b982533d1 v0.2.5: feat, improvement, fix (#595) (#603)
* feat(function): added more granular error logs for function execution for easier debugging (#593)

* added more granular error logs for function execution

* added tests

* fixed syntax error reporting

* feat(models): added temp controls for gpt-4.1 family of models (#594)

* improvement(knowledge-upload): create and upload document to KB (#579)

* improvement: added knowledge upload

* improvement: added greptile comments (#579)

* improvement: changed text to doc (#579)

* improvement: removed comment (#579)

* added input validation, tested persistence of KB selector

* update docs

---------

Co-authored-by: Adam Gough <adamgough@Mac.attlocal.net>
Co-authored-by: Waleed Latif <walif6@gmail.com>

* fix(remove workflow.state usage): no more usage of deprecated state column in any routes (#586)

* fix(remove workflow.state usage): no more usage of deprecated state col in routes

* fix lint

* fix chat route to only use deployed state

* fix lint

* better typing

* remove useless logs

* fix lint

* restore workflow handler file

* removed all other usages of deprecated 'state' column from workflows table, updated tests

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Waleed Latif <walif6@gmail.com>

* fix(doc-selector-kb): enable doc selector when kb is selected (#596)

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>

* fix(unload): remove beforeunload warning since we communicate via wss (#597)

* fix(executor): fix dependency resolution, allow blocks with multiple inputs to execute (#598)

* feat(billing): added migrations for usage-based billing (#601)

* feat(billing): added migrations for usage-based billing

* lint

* lint

* feat(logging): add new schemas + types for new logging system (#599)

* feat(logging): add new schemas + types for logging

* fix lint

* update migration

* fix lint

* Remove migration 48 to avoid conflict with staging

* fixed merge conflict

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>

---------

Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: Adam Gough <adamgough@Mac.attlocal.net>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
2025-07-02 08:40:41 -07:00
Vikhyath Mondreti
1604ce4d7c v0.2.4: feat, improvement, fix (#595)
* feat(function): added more granular error logs for function execution for easier debugging (#593)

* added more granular error logs for function execution

* added tests

* fixed syntax error reporting

* feat(models): added temp controls for gpt-4.1 family of models (#594)

* improvement(knowledge-upload): create and upload document to KB (#579)

* improvement: added knowledge upload

* improvement: added greptile comments (#579)

* improvement: changed text to doc (#579)

* improvement: removed comment (#579)

* added input validation, tested persistence of KB selector

* update docs

---------

Co-authored-by: Adam Gough <adamgough@Mac.attlocal.net>
Co-authored-by: Waleed Latif <walif6@gmail.com>

* fix(remove workflow.state usage): no more usage of deprecated state column in any routes (#586)

* fix(remove workflow.state usage): no more usage of deprecated state col in routes

* fix lint

* fix chat route to only use deployed state

* fix lint

* better typing

* remove useless logs

* fix lint

* restore workflow handler file

* removed all other usages of deprecated 'state' column from workflows table, updated tests

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Waleed Latif <walif6@gmail.com>

---------

Co-authored-by: Waleed Latif <walif6@gmail.com>
Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: Adam Gough <adamgough@Mac.attlocal.net>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
2025-06-30 19:23:37 -07:00
Vikhyath Mondreti
86168f1a87 fix(dep): dependency for useEffect missing 2025-06-30 18:10:20 -07:00
Vikhyath Mondreti
5d7fc5382c fix lint 2025-06-30 17:53:59 -07:00
Vikhyath Mondreti
7a5aeadbb7 fix(knowledge base): selector infinite render 2025-06-30 17:53:51 -07:00
Vikhyath Mondreti
f4e627a9f7 v0.2.3: fix (#592)
* fix(variable resolution): use variable references to avoid escaping issues (#587)

* fix(variable-resolution): don't inject stringified json, use var refs

* fix lint

* remove unused var

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>

* fix(subblock updates): special selectors persistence (#591)

* fix(knowledge-base-selector): should trigger sockets event for persistence

* fix subblock value updates for non useSubblockValue components

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>

* fix(race-cond): rare race condition in auto-connect between adding edge + block (#582)

* auto connect race condition

* fix lint

* Update apps/sim/hooks/use-collaborative-workflow.ts

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* Update apps/sim/app/workspace/[workspaceId]/w/[workflowId]/workflow.tsx

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
2025-06-30 16:57:48 -07:00
Vikhyath Mondreti
b0c1547198 fix test failure 2025-06-30 15:52:26 -07:00
Vikhyath Mondreti
d19632aec3 fix typing issue 2025-06-30 14:01:21 -07:00
Vikhyath Mondreti
35ac68f579 fix(func var resolution): variable ref codepath triggered - lint fixed 2025-06-30 13:55:12 -07:00
Vikhyath Mondreti
9c14f5f8fc fix(func var resolution): variable ref codepath triggered 2025-06-30 13:54:53 -07:00
Vikhyath Mondreti
d50db1d3fb add dot check 2025-06-30 12:45:36 -07:00
52 changed files with 10942 additions and 504 deletions

View File

@@ -49,7 +49,7 @@ In Sim Studio, the Knowledge Base block enables your agents to perform intellige
## Usage Instructions
Perform semantic vector search across one or more knowledge bases or upload new chunks to documents. Uses advanced AI embeddings to understand meaning and context for search operations.
Perform semantic vector search across knowledge bases, upload individual chunks to existing documents, or create new documents from text content. Uses advanced AI embeddings to understand meaning and context for search operations.
@@ -100,6 +100,25 @@ Upload a new chunk to a document in a knowledge base
| `createdAt` | string |
| `updatedAt` | string |
### `knowledge_create_document`
Create a new document in a knowledge base
#### Input
| Parameter | Type | Required | Description |
| --------- | ---- | -------- | ----------- |
| `knowledgeBaseId` | string | Yes | ID of the knowledge base containing the document |
| `name` | string | Yes | Name of the document |
| `content` | string | Yes | Content of the document |
#### Output
| Parameter | Type |
| --------- | ---- |
| `data` | string |
| `name` | string |
## Block Configuration
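
For reference, the documented inputs line up with a call shape like the following (a sketch only; the wrapper around tool calls is assumed, and the IDs and values are hypothetical):

// Hypothetical input for the new knowledge_create_document tool
const createDocumentInput = {
  knowledgeBaseId: 'kb_123', // required: ID of the target knowledge base
  name: 'Release notes',     // required: name of the new document
  content: 'Plain-text content to store as a document.', // required
}
// Documented output shape: { data: string, name: string }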

View File

@@ -292,12 +292,12 @@ export async function executeWorkflowForChat(
logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`)
// Find the workflow
// Find the workflow (deployedState is NOT deprecated - needed for chat execution)
const workflowResult = await db
.select({
state: workflow.state,
deployedState: workflow.deployedState,
isDeployed: workflow.isDeployed,
deployedState: workflow.deployedState,
variables: workflow.variables,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
@@ -308,9 +308,14 @@ export async function executeWorkflowForChat(
throw new Error('Workflow not available')
}
// Use deployed state for execution
const state = workflowResult[0].deployedState || workflowResult[0].state
const { blocks, edges, loops, parallels } = state as WorkflowState
// For chat execution, use ONLY the deployed state (no fallback)
if (!workflowResult[0].deployedState) {
throw new Error(`Workflow must be deployed to be available for chat`)
}
// Use deployed state for chat execution (this is the stable, deployed version)
const deployedState = workflowResult[0].deployedState as WorkflowState
const { blocks, edges, loops, parallels } = deployedState
// Prepare for execution, similar to use-workflow-execution.ts
const mergedStates = mergeSubblockState(blocks)
@@ -344,16 +349,13 @@ export async function executeWorkflowForChat(
logger.warn(`[${requestId}] Could not fetch environment variables:`, error)
}
// Get workflow variables
let workflowVariables = {}
try {
// The workflow state may contain variables
const workflowState = state as any
if (workflowState.variables) {
if (workflowResult[0].variables) {
workflowVariables =
typeof workflowState.variables === 'string'
? JSON.parse(workflowState.variables)
: workflowState.variables
typeof workflowResult[0].variables === 'string'
? JSON.parse(workflowResult[0].variables)
: workflowResult[0].variables
}
} catch (error) {
logger.warn(`[${requestId}] Could not parse workflow variables:`, error)
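
Pieced together, the new chat-execution flow reads as follows (a condensed sketch of the hunks above, using the same names):

// Chat executes ONLY the deployed snapshot; no fallback to the live state
const row = workflowResult[0]
if (!row.deployedState) {
  throw new Error('Workflow must be deployed to be available for chat')
}
const { blocks, edges, loops, parallels } = row.deployedState as WorkflowState

// Variables now come from the dedicated column (string or object)
let workflowVariables: Record<string, unknown> = {}
if (row.variables) {
  workflowVariables =
    typeof row.variables === 'string' ? JSON.parse(row.variables) : row.variables
}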

View File

@@ -391,6 +391,225 @@ describe('Function Execute API Route', () => {
})
})
describe('Enhanced Error Handling', () => {
it('should provide detailed syntax error with line content', async () => {
// Mock VM Script to throw a syntax error
const mockScript = vi.fn().mockImplementation(() => {
const error = new Error('Invalid or unexpected token')
error.name = 'SyntaxError'
error.stack = `user-function.js:5
description: "This has a missing closing quote
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
SyntaxError: Invalid or unexpected token
at new Script (node:vm:117:7)
at POST (/path/to/route.ts:123:24)`
throw error
})
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'const obj = {\n name: "test",\n description: "This has a missing closing quote\n};\nreturn obj;',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
expect(data.error).toContain('Syntax Error')
expect(data.error).toContain('Line 3')
expect(data.error).toContain('description: "This has a missing closing quote')
expect(data.error).toContain('Invalid or unexpected token')
expect(data.error).toContain('(Check for missing quotes, brackets, or semicolons)')
// Check debug information
expect(data.debug).toBeDefined()
expect(data.debug.line).toBe(3)
expect(data.debug.errorType).toBe('SyntaxError')
expect(data.debug.lineContent).toBe('description: "This has a missing closing quote')
})
it('should provide detailed runtime error with line and column', async () => {
// Create the error object first
const runtimeError = new Error("Cannot read properties of null (reading 'someMethod')")
runtimeError.name = 'TypeError'
runtimeError.stack = `TypeError: Cannot read properties of null (reading 'someMethod')
at user-function.js:4:16
at user-function.js:9:3
at Script.runInContext (node:vm:147:14)`
// Mock successful script creation but runtime error
const mockScript = vi.fn().mockImplementation(() => ({
runInContext: vi.fn().mockRejectedValue(runtimeError),
}))
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'const obj = null;\nreturn obj.someMethod();',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
expect(data.error).toContain('Type Error')
expect(data.error).toContain('Line 2')
expect(data.error).toContain('return obj.someMethod();')
expect(data.error).toContain('Cannot read properties of null')
// Check debug information
expect(data.debug).toBeDefined()
expect(data.debug.line).toBe(2)
expect(data.debug.column).toBe(16)
expect(data.debug.errorType).toBe('TypeError')
expect(data.debug.lineContent).toBe('return obj.someMethod();')
})
it('should handle ReferenceError with enhanced details', async () => {
// Create the error object first
const referenceError = new Error('undefinedVariable is not defined')
referenceError.name = 'ReferenceError'
referenceError.stack = `ReferenceError: undefinedVariable is not defined
at user-function.js:4:8
at Script.runInContext (node:vm:147:14)`
const mockScript = vi.fn().mockImplementation(() => ({
runInContext: vi.fn().mockRejectedValue(referenceError),
}))
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'const x = 42;\nreturn undefinedVariable + x;',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
expect(data.error).toContain('Reference Error')
expect(data.error).toContain('Line 2')
expect(data.error).toContain('return undefinedVariable + x;')
expect(data.error).toContain('undefinedVariable is not defined')
})
it('should handle errors without line content gracefully', async () => {
const mockScript = vi.fn().mockImplementation(() => {
const error = new Error('Generic error without stack trace')
error.name = 'Error'
// No stack trace
throw error
})
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'return "test";',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
expect(data.error).toBe('Generic error without stack trace')
// Should still have debug info, but without line details
expect(data.debug).toBeDefined()
expect(data.debug.errorType).toBe('Error')
expect(data.debug.line).toBeUndefined()
expect(data.debug.lineContent).toBeUndefined()
})
it('should extract line numbers from different stack trace formats', async () => {
const mockScript = vi.fn().mockImplementation(() => {
const error = new Error('Test error')
error.name = 'Error'
error.stack = `Error: Test error
at user-function.js:7:25
at async function
at Script.runInContext (node:vm:147:14)`
throw error
})
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'const a = 1;\nconst b = 2;\nconst c = 3;\nconst d = 4;\nreturn a + b + c + d;',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
// Line 7 in VM should map to line 5 in user code (7 - 3 + 1 = 5)
expect(data.debug.line).toBe(5)
expect(data.debug.column).toBe(25)
expect(data.debug.lineContent).toBe('return a + b + c + d;')
})
it('should provide helpful suggestions for common syntax errors', async () => {
const mockScript = vi.fn().mockImplementation(() => {
const error = new Error('Unexpected end of input')
error.name = 'SyntaxError'
error.stack = 'user-function.js:4\nSyntaxError: Unexpected end of input'
throw error
})
vi.doMock('vm', () => ({
createContext: mockCreateContext,
Script: mockScript,
}))
const req = createMockRequest('POST', {
code: 'const obj = {\n name: "test"\n// Missing closing brace',
timeout: 5000,
})
const { POST } = await import('./route')
const response = await POST(req)
const data = await response.json()
expect(response.status).toBe(500)
expect(data.success).toBe(false)
expect(data.error).toContain('Syntax Error')
expect(data.error).toContain('Unexpected end of input')
expect(data.error).toContain('(Check for missing closing brackets or braces)')
})
})
describe('Utility Functions', () => {
it('should properly escape regex special characters', async () => {
// This tests the escapeRegExp function indirectly
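
The off-by-wrapper arithmetic in that last group of tests deserves a worked check (values taken from the test above):

// The VM wrapper adds lines before user code; with no custom-tool params,
// userCodeStartLine is 3 ('(async () => {' and '  try {' come first).
const stackLine = 7          // line reported as user-function.js:7:25
const userCodeStartLine = 3
const adjustedLine = stackLine - userCodeStartLine + 1 // 7 - 3 + 1 = 5
// Line 5 of the submitted code is 'return a + b + c + d;', matching lineContent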

View File

@@ -8,6 +8,210 @@ export const maxDuration = 60
const logger = createLogger('FunctionExecuteAPI')
/**
* Enhanced error information interface
*/
interface EnhancedError {
message: string
line?: number
column?: number
stack?: string
name: string
originalError: any
lineContent?: string
}
/**
* Extract enhanced error information from VM execution errors
*/
function extractEnhancedError(
error: any,
userCodeStartLine: number,
userCode?: string
): EnhancedError {
const enhanced: EnhancedError = {
message: error.message || 'Unknown error',
name: error.name || 'Error',
originalError: error,
}
if (error.stack) {
enhanced.stack = error.stack
// Parse stack trace to extract line and column information
// Handle both compilation errors and runtime errors
const stackLines: string[] = error.stack.split('\n')
for (const line of stackLines) {
// Pattern 1: Compilation errors - "user-function.js:6"
let match = line.match(/user-function\.js:(\d+)(?::(\d+))?/)
// Pattern 2: Runtime errors - "at user-function.js:5:12"
if (!match) {
match = line.match(/at\s+user-function\.js:(\d+):(\d+)/)
}
// Pattern 3: Generic patterns for any line containing our filename
if (!match) {
match = line.match(/user-function\.js:(\d+)(?::(\d+))?/)
}
if (match) {
const stackLine = Number.parseInt(match[1], 10)
const stackColumn = match[2] ? Number.parseInt(match[2], 10) : undefined
// Adjust line number to account for wrapper code
// The user code starts at a specific line in our wrapper
const adjustedLine = stackLine - userCodeStartLine + 1
// Check if this is a syntax error in wrapper code caused by incomplete user code
const isWrapperSyntaxError =
stackLine > userCodeStartLine &&
error.name === 'SyntaxError' &&
(error.message.includes('Unexpected token') ||
error.message.includes('Unexpected end of input'))
if (isWrapperSyntaxError && userCode) {
// Map wrapper syntax errors to the last line of user code
const codeLines = userCode.split('\n')
const lastUserLine = codeLines.length
enhanced.line = lastUserLine
enhanced.column = codeLines[lastUserLine - 1]?.length || 0
enhanced.lineContent = codeLines[lastUserLine - 1]?.trim()
break
}
if (adjustedLine > 0) {
enhanced.line = adjustedLine
enhanced.column = stackColumn
// Extract the actual line content from user code
if (userCode) {
const codeLines = userCode.split('\n')
if (adjustedLine <= codeLines.length) {
enhanced.lineContent = codeLines[adjustedLine - 1]?.trim()
}
}
break
}
if (stackLine <= userCodeStartLine) {
// Error is in wrapper code itself
enhanced.line = stackLine
enhanced.column = stackColumn
break
}
}
}
// Clean up stack trace to show user-relevant information
const cleanedStackLines: string[] = stackLines
.filter(
(line: string) =>
line.includes('user-function.js') ||
(!line.includes('vm.js') && !line.includes('internal/'))
)
.map((line: string) => line.replace(/\s+at\s+/, ' at '))
if (cleanedStackLines.length > 0) {
enhanced.stack = cleanedStackLines.join('\n')
}
}
// Keep original message without adding error type prefix
// The error type will be added later in createUserFriendlyErrorMessage
return enhanced
}
/**
* Create a detailed error message for users
*/
function createUserFriendlyErrorMessage(
enhanced: EnhancedError,
requestId: string,
userCode?: string
): string {
let errorMessage = enhanced.message
// Add line and column information if available
if (enhanced.line !== undefined) {
let lineInfo = `Line ${enhanced.line}${enhanced.column !== undefined ? `:${enhanced.column}` : ''}`
// Add the actual line content if available
if (enhanced.lineContent) {
lineInfo += `: \`${enhanced.lineContent}\``
}
errorMessage = `${lineInfo} - ${errorMessage}`
} else {
// If no line number, try to extract it from stack trace for display
if (enhanced.stack) {
const stackMatch = enhanced.stack.match(/user-function\.js:(\d+)(?::(\d+))?/)
if (stackMatch) {
const line = Number.parseInt(stackMatch[1], 10)
const column = stackMatch[2] ? Number.parseInt(stackMatch[2], 10) : undefined
let lineInfo = `Line ${line}${column ? `:${column}` : ''}`
// Try to get line content if we have userCode
if (userCode) {
const codeLines = userCode.split('\n')
// Note: stackMatch gives us VM line number, need to adjust
// This is a fallback case, so we might not have perfect line mapping
if (line <= codeLines.length) {
const lineContent = codeLines[line - 1]?.trim()
if (lineContent) {
lineInfo += `: \`${lineContent}\``
}
}
}
errorMessage = `${lineInfo} - ${errorMessage}`
}
}
}
// Add error type prefix with consistent naming
if (enhanced.name !== 'Error') {
const errorTypePrefix =
enhanced.name === 'SyntaxError'
? 'Syntax Error'
: enhanced.name === 'TypeError'
? 'Type Error'
: enhanced.name === 'ReferenceError'
? 'Reference Error'
: enhanced.name
// Only add prefix if not already present
if (!errorMessage.toLowerCase().includes(errorTypePrefix.toLowerCase())) {
errorMessage = `${errorTypePrefix}: ${errorMessage}`
}
}
// For syntax errors, provide additional context
if (enhanced.name === 'SyntaxError') {
if (errorMessage.includes('Invalid or unexpected token')) {
errorMessage += ' (Check for missing quotes, brackets, or semicolons)'
} else if (errorMessage.includes('Unexpected end of input')) {
errorMessage += ' (Check for missing closing brackets or braces)'
} else if (errorMessage.includes('Unexpected token')) {
// Check if this might be due to incomplete code
if (
enhanced.lineContent &&
((enhanced.lineContent.includes('(') && !enhanced.lineContent.includes(')')) ||
(enhanced.lineContent.includes('[') && !enhanced.lineContent.includes(']')) ||
(enhanced.lineContent.includes('{') && !enhanced.lineContent.includes('}')))
) {
errorMessage += ' (Check for missing closing parentheses, brackets, or braces)'
} else {
errorMessage += ' (Check your syntax)'
}
}
}
return errorMessage
}
/**
* Resolves environment variables and tags in code
* @param code - Code with variables
@@ -19,7 +223,9 @@ const logger = createLogger('FunctionExecuteAPI')
function resolveCodeVariables(
code: string,
params: Record<string, any>,
envVars: Record<string, string> = {}
envVars: Record<string, string> = {},
blockData: Record<string, any> = {},
blockNameMapping: Record<string, string> = {}
): { resolvedCode: string; contextVariables: Record<string, any> } {
let resolvedCode = code
const contextVariables: Record<string, any> = {}
@@ -39,11 +245,52 @@ function resolveCodeVariables(
resolvedCode = resolvedCode.replace(new RegExp(escapeRegExp(match), 'g'), safeVarName)
}
// Resolve tags with <tag_name> syntax
const tagMatches = resolvedCode.match(/<([a-zA-Z_][a-zA-Z0-9_]*)>/g) || []
// Resolve tags with <tag_name> syntax (including nested paths like <block.response.data>)
const tagMatches = resolvedCode.match(/<([a-zA-Z_][a-zA-Z0-9_.]*[a-zA-Z0-9_])>/g) || []
for (const match of tagMatches) {
const tagName = match.slice(1, -1).trim()
const tagValue = params[tagName] || ''
// Handle nested paths like "getrecord.response.data" or "function1.response.result"
// First try params, then blockData directly, then try with block name mapping
let tagValue = getNestedValue(params, tagName) || getNestedValue(blockData, tagName) || ''
// If not found and the path starts with a block name, try mapping the block name to ID
if (!tagValue && tagName.includes('.')) {
const pathParts = tagName.split('.')
const normalizedBlockName = pathParts[0] // This should already be normalized like "function1"
// Find the block ID by looking for a block name that normalizes to this value
let blockId = null
for (const [blockName, id] of Object.entries(blockNameMapping)) {
// Apply the same normalization logic as the UI: remove spaces and lowercase
const normalizedName = blockName.replace(/\s+/g, '').toLowerCase()
if (normalizedName === normalizedBlockName) {
blockId = id
break
}
}
if (blockId) {
const remainingPath = pathParts.slice(1).join('.')
const fullPath = `${blockId}.${remainingPath}`
tagValue = getNestedValue(blockData, fullPath) || ''
}
}
// If the value is a stringified JSON, parse it back to object
if (
typeof tagValue === 'string' &&
tagValue.length > 100 &&
(tagValue.startsWith('{') || tagValue.startsWith('['))
) {
try {
tagValue = JSON.parse(tagValue)
} catch (e) {
// Keep as string if parsing fails
}
}
// Instead of injecting large JSON directly, create a variable reference
const safeVarName = `__tag_${tagName.replace(/[^a-zA-Z0-9_]/g, '_')}`
@@ -56,6 +303,17 @@ function resolveCodeVariables(
return { resolvedCode, contextVariables }
}
/**
* Get nested value from object using dot notation path
*/
function getNestedValue(obj: any, path: string): any {
if (!obj || !path) return undefined
return path.split('.').reduce((current, key) => {
return current && typeof current === 'object' ? current[key] : undefined
}, obj)
}
/**
* Escape special regex characters in a string
*/
@@ -67,6 +325,8 @@ export async function POST(req: NextRequest) {
const requestId = crypto.randomUUID().slice(0, 8)
const startTime = Date.now()
let stdout = ''
let userCodeStartLine = 3 // Default value for error reporting
let resolvedCode = '' // Store resolved code for error reporting
try {
const body = await req.json()
@@ -76,6 +336,8 @@ export async function POST(req: NextRequest) {
params = {},
timeout = 5000,
envVars = {},
blockData = {},
blockNameMapping = {},
workflowId,
isCustomTool = false,
} = body
@@ -93,7 +355,15 @@ export async function POST(req: NextRequest) {
})
// Resolve variables in the code with workflow environment variables
const { resolvedCode, contextVariables } = resolveCodeVariables(code, executionParams, envVars)
const codeResolution = resolveCodeVariables(
code,
executionParams,
envVars,
blockData,
blockNameMapping
)
resolvedCode = codeResolution.resolvedCode
const contextVariables = codeResolution.contextVariables
const executionMethod = 'vm' // Default execution method
@@ -239,16 +509,12 @@ export async function POST(req: NextRequest) {
// timeout,
// displayErrors: true,
// })
// logger.info(`[${requestId}] VM execution result`, {
// result,
// stdout,
// })
// }
// } else {
logger.info(`[${requestId}] Using VM for code execution`, {
resolvedCode,
executionParams,
envVars,
hasEnvVars: Object.keys(envVars).length > 0,
})
// Create a secure context with console logging
@@ -274,28 +540,40 @@ export async function POST(req: NextRequest) {
},
})
const script = new Script(`
(async () => {
try {
${
isCustomTool
? `// For custom tools, make parameters directly accessible
${Object.keys(executionParams)
.map((key) => `const ${key} = params.${key};`)
.join('\n ')}`
: ''
}
${resolvedCode}
} catch (error) {
console.error(error);
throw error;
}
})()
`)
// Calculate line offset for user code to provide accurate error reporting
const wrapperLines = ['(async () => {', ' try {']
// Add custom tool parameter declarations if needed
if (isCustomTool) {
wrapperLines.push(' // For custom tools, make parameters directly accessible')
Object.keys(executionParams).forEach((key) => {
wrapperLines.push(` const ${key} = params.${key};`)
})
}
userCodeStartLine = wrapperLines.length + 1 // +1 because user code starts on next line
// Build the complete script with proper formatting for line numbers
const fullScript = [
...wrapperLines,
` ${resolvedCode.split('\n').join('\n ')}`, // Indent user code
' } catch (error) {',
' console.error(error);',
' throw error;',
' }',
'})()',
].join('\n')
const script = new Script(fullScript, {
filename: 'user-function.js', // This filename will appear in stack traces
lineOffset: 0, // Start line numbering from 0
columnOffset: 0, // Start column numbering from 0
})
const result = await script.runInContext(context, {
timeout,
displayErrors: true,
breakOnSigint: true, // Allow breaking on SIGINT for better debugging
})
// }
@@ -322,14 +600,40 @@ export async function POST(req: NextRequest) {
executionTime,
})
const enhancedError = extractEnhancedError(error, userCodeStartLine, resolvedCode)
const userFriendlyErrorMessage = createUserFriendlyErrorMessage(
enhancedError,
requestId,
resolvedCode
)
// Log enhanced error details for debugging
logger.error(`[${requestId}] Enhanced error details`, {
originalMessage: error.message,
enhancedMessage: userFriendlyErrorMessage,
line: enhancedError.line,
column: enhancedError.column,
lineContent: enhancedError.lineContent,
errorType: enhancedError.name,
userCodeStartLine,
})
const errorResponse = {
success: false,
error: error.message || 'Code execution failed',
error: userFriendlyErrorMessage,
output: {
result: null,
stdout,
executionTime,
},
// Include debug information in development or for debugging
debug: {
line: enhancedError.line,
column: enhancedError.column,
errorType: enhancedError.name,
lineContent: enhancedError.lineContent,
stack: enhancedError.stack,
},
}
return NextResponse.json(errorResponse, { status: 500 })
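
Two conventions in resolveCodeVariables are easiest to see with concrete values (a sketch; the block name and data below are hypothetical):

// Resolving <myblock.response.data> in user code:
// 1. Normalize the first path segment the same way the UI does
const blockName = 'My Block'
const normalized = blockName.replace(/\s+/g, '').toLowerCase() // 'myblock'

// 2. Map the normalized name to a block ID via blockNameMapping, then walk
//    the remaining path through blockData with getNestedValue
const blockNameMapping = { 'My Block': 'block-id-1' }
const blockData = { 'block-id-1': { response: { data: { rows: 3 } } } }
const value = getNestedValue(blockData, 'block-id-1.response.data') // { rows: 3 }

// 3. The value is exposed as a context variable (here __tag_myblock_response_data)
//    rather than spliced into the code as stringified JSON, which is what
//    avoids the escaping issues called out in #587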

View File

@@ -17,6 +17,17 @@ describe('Scheduled Workflow Execution API Route', () => {
mockExecutionDependencies()
// Mock the normalized tables helper
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: sampleWorkflowState.blocks,
edges: sampleWorkflowState.edges || [],
loops: sampleWorkflowState.loops || {},
parallels: sampleWorkflowState.parallels || {},
isFromNormalizedTables: true,
}),
}))
vi.doMock('croner', () => ({
Cron: vi.fn().mockImplementation(() => ({
nextRun: vi.fn().mockReturnValue(new Date(Date.now() + 60000)), // Next run in 1 minute

View File

@@ -14,13 +14,13 @@ import {
} from '@/lib/schedules/utils'
import { checkServerSideUsageLimits } from '@/lib/usage-monitor'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment, userStats, workflow, workflowSchedule } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
// Add dynamic export to prevent caching
export const dynamic = 'force-dynamic'
@@ -149,8 +149,27 @@ export async function GET(req: NextRequest) {
continue
}
const state = workflowRecord.state as WorkflowState
const { blocks, edges, loops, parallels } = state
// Load workflow data from normalized tables (no fallback to deprecated state column)
logger.debug(
`[${requestId}] Loading workflow ${schedule.workflowId} from normalized tables`
)
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)
if (!normalizedData) {
logger.error(
`[${requestId}] No normalized data found for scheduled workflow ${schedule.workflowId}`
)
throw new Error(`Workflow data not found in normalized tables for ${schedule.workflowId}`)
}
// Use normalized data only
const blocks = normalizedData.blocks
const edges = normalizedData.edges
const loops = normalizedData.loops
const parallels = normalizedData.parallels
logger.info(
`[${requestId}] Loaded scheduled workflow ${schedule.workflowId} from normalized tables`
)
const mergedStates = mergeSubblockState(blocks)
@@ -405,9 +424,13 @@ export async function GET(req: NextRequest) {
.limit(1)
if (workflowRecord) {
const state = workflowRecord.state as WorkflowState
const { blocks } = state
nextRunAt = calculateNextRunTime(schedule, blocks)
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)
if (!normalizedData) {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
} else {
nextRunAt = calculateNextRunTime(schedule, normalizedData.blocks)
}
} else {
nextRunAt = new Date(now.getTime() + 24 * 60 * 60 * 1000)
}
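
Condensed, the scheduler's two touch points on workflow data now look like this (same helpers as the hunks above; variable names lightly renamed for clarity):

// In the execution loop: normalized tables are the only source; fail hard
const normalizedData = await loadWorkflowFromNormalizedTables(schedule.workflowId)
if (!normalizedData) {
  throw new Error(`Workflow data not found in normalized tables for ${schedule.workflowId}`)
}
const { blocks, edges, loops, parallels } = normalizedData

// In the next-run recalculation (a separate call site): degrade instead of failing
const dataForNextRun = await loadWorkflowFromNormalizedTables(schedule.workflowId)
nextRunAt = dataForNextRun
  ? calculateNextRunTime(schedule, dataForNextRun.blocks)
  : new Date(now.getTime() + 24 * 60 * 60 * 1000) // fall back to a 24h retry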

View File

@@ -5,11 +5,7 @@ import { NextRequest } from 'next/server'
* @vitest-environment node
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import {
createMockRequest,
mockExecutionDependencies,
sampleWorkflowState,
} from '@/app/api/__test-utils__/utils'
import { createMockRequest, mockExecutionDependencies } from '@/app/api/__test-utils__/utils'
// Define mock functions at the top level to be used in mocks
const hasProcessedMessageMock = vi.fn().mockResolvedValue(false)
@@ -148,10 +144,18 @@ describe('Webhook Trigger API Route', () => {
vi.resetAllMocks()
vi.clearAllTimers()
// Mock all dependencies
mockExecutionDependencies()
// Reset mock behaviors to default for each test
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {},
edges: [],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
hasProcessedMessageMock.mockResolvedValue(false)
markMessageAsProcessedMock.mockResolvedValue(true)
acquireLockMock.mockResolvedValue(true)
@@ -159,12 +163,10 @@ describe('Webhook Trigger API Route', () => {
processGenericDeduplicationMock.mockResolvedValue(null)
processWebhookMock.mockResolvedValue(new Response('Webhook processed', { status: 200 }))
// Restore original crypto.randomUUID if it was mocked
if ((global as any).crypto?.randomUUID) {
vi.spyOn(crypto, 'randomUUID').mockRestore()
}
// Mock crypto.randomUUID to return predictable values
vi.spyOn(crypto, 'randomUUID').mockReturnValue('mock-uuid-12345')
})
@@ -263,7 +265,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
@@ -355,7 +356,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
@@ -409,7 +409,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
@@ -482,7 +481,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])
@@ -553,7 +551,6 @@ describe('Webhook Trigger API Route', () => {
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: sampleWorkflowState,
},
},
])

View File

@@ -12,6 +12,7 @@ import {
processWebhook,
processWhatsAppDeduplication,
} from '@/lib/webhooks/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { db } from '@/db'
import { webhook, workflow } from '@/db/schema'
@@ -187,6 +188,24 @@ export async function POST(
foundWebhook = webhooks[0].webhook
foundWorkflow = webhooks[0].workflow
const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id)
if (!normalizedData) {
logger.error(`[${requestId}] No normalized data found for webhook workflow ${foundWorkflow.id}`)
return new NextResponse('Workflow data not found in normalized tables', { status: 500 })
}
// Construct state from normalized data only (execution-focused, no frontend state fields)
foundWorkflow.state = {
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
lastSaved: Date.now(),
isDeployed: foundWorkflow.isDeployed || false,
deployedAt: foundWorkflow.deployedAt,
}
// Special handling for Telegram webhooks to work around middleware User-Agent checks
if (foundWebhook.provider === 'telegram') {
// Log detailed information about the request for debugging

View File

@@ -31,6 +31,27 @@ describe('Workflow Deployment API Route', () => {
}),
}))
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {
'block-1': {
id: 'block-1',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
},
edges: [],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
vi.doMock('../../middleware', () => ({
validateWorkflowAccess: vi.fn().mockResolvedValue({
workflow: {
@@ -74,6 +95,7 @@ describe('Workflow Deployment API Route', () => {
isDeployed: false,
deployedAt: null,
userId: 'user-id',
deployedState: null,
},
]),
}),
@@ -129,7 +151,6 @@ describe('Workflow Deployment API Route', () => {
}),
}),
})
// Mock normalized table queries (blocks, edges, subflows)
.mockReturnValueOnce({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([
@@ -216,7 +237,6 @@ describe('Workflow Deployment API Route', () => {
}),
}),
})
// Mock normalized table queries (blocks, edges, subflows)
.mockReturnValueOnce({
from: vi.fn().mockReturnValue({
where: vi.fn().mockResolvedValue([

View File

@@ -32,7 +32,6 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
isDeployed: workflow.isDeployed,
deployedAt: workflow.deployedAt,
userId: workflow.userId,
state: workflow.state,
deployedState: workflow.deployedState,
})
.from(workflow)
@@ -93,11 +92,25 @@ export async function GET(request: NextRequest, { params }: { params: Promise<{
// Check if the workflow has meaningful changes that would require redeployment
let needsRedeployment = false
if (workflowData.deployedState) {
const { hasWorkflowChanged } = await import('@/lib/workflows/utils')
needsRedeployment = hasWorkflowChanged(
workflowData.state as any,
workflowData.deployedState as any
)
// Load current state from normalized tables for comparison
const { loadWorkflowFromNormalizedTables } = await import('@/lib/workflows/db-helpers')
const normalizedData = await loadWorkflowFromNormalizedTables(id)
if (normalizedData) {
// Convert normalized data to WorkflowState format for comparison
const currentState = {
blocks: normalizedData.blocks,
edges: normalizedData.edges,
loops: normalizedData.loops,
parallels: normalizedData.parallels,
}
const { hasWorkflowChanged } = await import('@/lib/workflows/utils')
needsRedeployment = hasWorkflowChanged(
currentState as any,
workflowData.deployedState as any
)
}
}
logger.info(`[${requestId}] Successfully retrieved deployment info: ${id}`)
@@ -126,11 +139,10 @@ export async function POST(request: NextRequest, { params }: { params: Promise<{
return createErrorResponse(validation.error.message, validation.error.status)
}
// Get the workflow to find the user
// Get the workflow to find the user (removed deprecated state column)
const workflowData = await db
.select({
userId: workflow.userId,
state: workflow.state,
})
.from(workflow)
.where(eq(workflow.id, id))
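
The redeployment check now compares the normalized snapshot with the deployed one; compactly (same helpers as the hunk above):

// Redeploy is needed when the live graph, rebuilt from normalized tables,
// has drifted from the deployed snapshot
const currentState = {
  blocks: normalizedData.blocks,
  edges: normalizedData.edges,
  loops: normalizedData.loops,
  parallels: normalizedData.parallels,
}
needsRedeployment = hasWorkflowChanged(currentState as any, workflowData.deployedState as any)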

View File

@@ -24,45 +24,54 @@ describe('Workflow Execution API Route', () => {
beforeEach(() => {
vi.resetModules()
// Mock workflow middleware
vi.doMock('@/app/api/workflows/middleware', () => ({
validateWorkflowAccess: vi.fn().mockResolvedValue({
workflow: {
id: 'workflow-id',
userId: 'user-id',
state: {
blocks: {
'starter-id': {
id: 'starter-id',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
},
'agent-id': {
id: 'agent-id',
type: 'agent',
name: 'Agent',
position: { x: 300, y: 100 },
enabled: true,
},
},
edges: [
{
id: 'edge-1',
source: 'starter-id',
target: 'agent-id',
sourceHandle: 'source',
targetHandle: 'target',
},
],
loops: {},
},
},
}),
}))
// Reset execute mock to track calls
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
'agent-id': {
id: 'agent-id',
type: 'agent',
name: 'Agent',
position: { x: 300, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
},
edges: [
{
id: 'edge-1',
source: 'starter-id',
target: 'agent-id',
sourceHandle: 'source',
targetHandle: 'target',
},
],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
executeMock = vi.fn().mockResolvedValue({
success: true,
output: {
@@ -76,14 +85,12 @@ describe('Workflow Execution API Route', () => {
},
})
// Mock executor
vi.doMock('@/executor', () => ({
Executor: vi.fn().mockImplementation(() => ({
execute: executeMock,
})),
}))
// Mock environment variables
vi.doMock('@/lib/utils', () => ({
decryptSecret: vi.fn().mockResolvedValue({
decrypted: 'decrypted-secret-value',
@@ -92,13 +99,11 @@ describe('Workflow Execution API Route', () => {
getRotatingApiKey: vi.fn().mockReturnValue('rotated-api-key'),
}))
// Mock logger
vi.doMock('@/lib/logs/execution-logger', () => ({
persistExecutionLogs: vi.fn().mockResolvedValue(undefined),
persistExecutionError: vi.fn().mockResolvedValue(undefined),
}))
// Mock trace spans
vi.doMock('@/lib/logs/trace-spans', () => ({
buildTraceSpans: vi.fn().mockReturnValue({
traceSpans: [],
@@ -106,13 +111,11 @@ describe('Workflow Execution API Route', () => {
}),
}))
// Mock workflow run counts
vi.doMock('@/lib/workflows/utils', () => ({
updateWorkflowRunCounts: vi.fn().mockResolvedValue(undefined),
workflowHasResponseBlock: vi.fn().mockReturnValue(false),
}))
// Mock database
vi.doMock('@/db', () => {
const mockDb = {
select: vi.fn().mockImplementation(() => ({
@@ -140,7 +143,6 @@ describe('Workflow Execution API Route', () => {
return { db: mockDb }
})
// Mock Serializer
vi.doMock('@/serializer', () => ({
Serializer: vi.fn().mockImplementation(() => ({
serializeWorkflow: vi.fn().mockReturnValue({
@@ -162,49 +164,37 @@ describe('Workflow Execution API Route', () => {
* Simulates direct execution with URL-based parameters
*/
it('should execute workflow with GET request successfully', async () => {
// Create a mock request with query parameters
const req = createMockRequest('GET')
// Create params similar to what Next.js would provide
const params = Promise.resolve({ id: 'workflow-id' })
// Import the handler after mocks are set up
const { GET } = await import('./route')
// Call the handler
const response = await GET(req, { params })
// Get the actual status code - in some implementations this might not be 200
// Based on the current implementation, validate the response exists
expect(response).toBeDefined()
// Try to parse the response body
let data
try {
data = await response.json()
} catch (e) {
// If we can't parse JSON, the response may not be what we expect
console.error('Response could not be parsed as JSON:', await response.text())
throw e
}
// If status is 200, verify success structure
if (response.status === 200) {
expect(data).toHaveProperty('success', true)
expect(data).toHaveProperty('output')
expect(data.output).toHaveProperty('response')
}
// Verify middleware was called
const validateWorkflowAccess = (await import('@/app/api/workflows/middleware'))
.validateWorkflowAccess
expect(validateWorkflowAccess).toHaveBeenCalledWith(expect.any(Object), 'workflow-id')
// Verify executor was initialized
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalled()
// Verify execute was called with undefined input (GET requests don't have body)
expect(executeMock).toHaveBeenCalledWith('workflow-id')
})
@@ -213,59 +203,45 @@ describe('Workflow Execution API Route', () => {
* Simulates execution with a JSON body containing parameters
*/
it('should execute workflow with POST request successfully', async () => {
// Create request body with custom inputs
const requestBody = {
inputs: {
message: 'Test input message',
},
}
// Create a mock request with the request body
const req = createMockRequest('POST', requestBody)
// Create params similar to what Next.js would provide
const params = Promise.resolve({ id: 'workflow-id' })
// Import the handler after mocks are set up
const { POST } = await import('./route')
// Call the handler
const response = await POST(req, { params })
// Ensure response exists
expect(response).toBeDefined()
// Try to parse the response body
let data
try {
data = await response.json()
} catch (e) {
// If we can't parse JSON, the response may not be what we expect
console.error('Response could not be parsed as JSON:', await response.text())
throw e
}
// If status is 200, verify success structure
if (response.status === 200) {
expect(data).toHaveProperty('success', true)
expect(data).toHaveProperty('output')
expect(data.output).toHaveProperty('response')
}
// Verify middleware was called
const validateWorkflowAccess = (await import('@/app/api/workflows/middleware'))
.validateWorkflowAccess
expect(validateWorkflowAccess).toHaveBeenCalledWith(expect.any(Object), 'workflow-id')
// Verify executor was constructed
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalled()
// Verify execute was called with the input body
expect(executeMock).toHaveBeenCalledWith('workflow-id')
// Updated expectations to match actual implementation
// The structure should match: serializedWorkflow, processedBlockStates, decryptedEnvVars, processedInput, workflowVariables
expect(Executor).toHaveBeenCalledWith(
expect.anything(), // serializedWorkflow
expect.anything(), // processedBlockStates
@@ -282,7 +258,6 @@ describe('Workflow Execution API Route', () => {
* Test POST execution with structured input matching the input format
*/
it('should execute workflow with structured input matching the input format', async () => {
// Create structured input matching the expected input format
const structuredInput = {
firstName: 'John',
age: 30,
@@ -291,27 +266,20 @@ describe('Workflow Execution API Route', () => {
tags: ['test', 'api'],
}
// Create a mock request with the structured input
const req = createMockRequest('POST', structuredInput)
// Create params similar to what Next.js would provide
const params = Promise.resolve({ id: 'workflow-id' })
// Import the handler after mocks are set up
const { POST } = await import('./route')
// Call the handler
const response = await POST(req, { params })
// Ensure response exists and is successful
expect(response).toBeDefined()
expect(response.status).toBe(200)
// Parse the response body
const data = await response.json()
expect(data).toHaveProperty('success', true)
// Verify the executor was constructed with the structured input - updated to match implementation
const Executor = (await import('@/executor')).Executor
expect(Executor).toHaveBeenCalledWith(
expect.anything(), // serializedWorkflow
@@ -478,39 +446,51 @@ describe('Workflow Execution API Route', () => {
workflow: {
id: 'workflow-with-vars-id',
userId: 'user-id',
state: {
blocks: {
'starter-id': {
id: 'starter-id',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
},
'agent-id': {
id: 'agent-id',
type: 'agent',
name: 'Agent',
position: { x: 300, y: 100 },
enabled: true,
},
},
edges: [
{
id: 'edge-1',
source: 'starter-id',
target: 'agent-id',
sourceHandle: 'source',
targetHandle: 'target',
},
],
loops: {},
},
variables: workflowVariables,
},
}),
}))
// Mock normalized tables helper for this specific test
vi.doMock('@/lib/workflows/db-helpers', () => ({
loadWorkflowFromNormalizedTables: vi.fn().mockResolvedValue({
blocks: {
'starter-id': {
id: 'starter-id',
type: 'starter',
name: 'Start',
position: { x: 100, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
'agent-id': {
id: 'agent-id',
type: 'agent',
name: 'Agent',
position: { x: 300, y: 100 },
enabled: true,
subBlocks: {},
outputs: {},
data: {},
},
},
edges: [
{
id: 'edge-1',
source: 'starter-id',
target: 'agent-id',
sourceHandle: 'source',
targetHandle: 'target',
},
],
loops: {},
parallels: {},
isFromNormalizedTables: true,
}),
}))
// Create a constructor mock to capture the arguments
const executorConstructorMock = vi.fn().mockImplementation(() => ({
execute: vi.fn().mockResolvedValue({

View File

@@ -7,6 +7,7 @@ import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/executio
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { checkServerSideUsageLimits } from '@/lib/usage-monitor'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
@@ -94,19 +95,34 @@ async function executeWorkflow(workflow: any, requestId: string, input?: any) {
runningExecutions.add(executionKey)
logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`)
// Use the deployed state if available, otherwise fall back to current state
const workflowState = workflow.deployedState || workflow.state
// Load workflow data from normalized tables
logger.debug(`[${requestId}] Loading workflow ${workflowId} from normalized tables`)
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
if (!workflow.deployedState) {
logger.warn(
`[${requestId}] No deployed state found for workflow: ${workflowId}, using current state`
)
let blocks: Record<string, any>
let edges: any[]
let loops: Record<string, any>
let parallels: Record<string, any>
if (normalizedData) {
// Use normalized data as primary source
;({ blocks, edges, loops, parallels } = normalizedData)
logger.info(`[${requestId}] Using normalized tables for workflow execution: ${workflowId}`)
} else {
logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`)
}
// Fallback to deployed state if available (for legacy workflows)
logger.warn(
`[${requestId}] No normalized data found, falling back to deployed state for workflow: ${workflowId}`
)
const state = workflowState as WorkflowState
const { blocks, edges, loops, parallels } = state
if (!workflow.deployedState) {
throw new Error(
`Workflow ${workflowId} has no deployed state and no normalized data available`
)
}
const deployedState = workflow.deployedState as WorkflowState
;({ blocks, edges, loops, parallels } = deployedState)
}
// Use the same execution flow as in scheduled executions
const mergedStates = mergeSubblockState(blocks)
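
Because the compare view strips add/remove markers, the final load order in executeWorkflow is worth restating (a sketch of the resulting logic, same names as above):

// 1. Prefer normalized tables
// 2. Fall back to deployedState for legacy workflows
// 3. Fail if neither exists
const normalizedData = await loadWorkflowFromNormalizedTables(workflowId)
let blocks, edges, loops, parallels
if (normalizedData) {
  ;({ blocks, edges, loops, parallels } = normalizedData)
} else if (workflow.deployedState) {
  ;({ blocks, edges, loops, parallels } = workflow.deployedState as WorkflowState)
} else {
  throw new Error(`Workflow ${workflowId} has no deployed state and no normalized data available`)
}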

View File

@@ -13,7 +13,7 @@ import {
} from '@/components/ui/command'
import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover'
import type { SubBlockConfig } from '@/blocks/types'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useSubBlockValue } from '../../hooks/use-sub-block-value'
interface DocumentData {
id: string
@@ -50,45 +50,25 @@ export function DocumentSelector({
isPreview = false,
previewValue,
}: DocumentSelectorProps) {
const { getValue, setValue } = useSubBlockStore()
const [documents, setDocuments] = useState<DocumentData[]>([])
const [error, setError] = useState<string | null>(null)
const [open, setOpen] = useState(false)
const [selectedDocument, setSelectedDocument] = useState<DocumentData | null>(null)
const [initialFetchDone, setInitialFetchDone] = useState(false)
const [selectedId, setSelectedId] = useState('')
// Get the current value from the store
const storeValue = getValue(blockId, subBlock.id)
// Use the proper hook to get the current value and setter
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlock.id)
// Get the knowledge base ID from the same block's knowledgeBaseId subblock
const knowledgeBaseId = getValue(blockId, 'knowledgeBaseId')
const [knowledgeBaseId] = useSubBlockValue(blockId, 'knowledgeBaseId')
// Use preview value when in preview mode, otherwise use store value
const value = isPreview ? previewValue : storeValue
// Initialize selectedId with the effective value
useEffect(() => {
if (isPreview && previewValue !== undefined) {
setSelectedId(previewValue || '')
} else {
setSelectedId(value || '')
}
}, [value, isPreview, previewValue])
// Update local state when external value changes
useEffect(() => {
const currentValue = isPreview ? previewValue : value
setSelectedId(currentValue || '')
}, [value, isPreview, previewValue])
// Fetch documents for the selected knowledge base
const fetchDocuments = useCallback(async () => {
if (!knowledgeBaseId) {
setDocuments([])
setError('No knowledge base selected')
setInitialFetchDone(true)
return
}
@@ -109,39 +89,12 @@ export function DocumentSelector({
const fetchedDocuments = result.data || []
setDocuments(fetchedDocuments)
setInitialFetchDone(true)
// Auto-selection logic: if we have a valid selection, keep it
// If there's only one document, select it
// If we have a value but it's not in the documents, reset it
if (selectedId && !fetchedDocuments.some((doc: DocumentData) => doc.id === selectedId)) {
setSelectedId('')
if (!isPreview) {
setValue(blockId, subBlock.id, '')
}
}
if (
(!selectedId || !fetchedDocuments.some((doc: DocumentData) => doc.id === selectedId)) &&
fetchedDocuments.length > 0
) {
if (fetchedDocuments.length === 1) {
// If only one document, auto-select it
const singleDoc = fetchedDocuments[0]
setSelectedId(singleDoc.id)
setSelectedDocument(singleDoc)
if (!isPreview) {
setValue(blockId, subBlock.id, singleDoc.id)
}
onDocumentSelect?.(singleDoc.id)
}
}
} catch (err) {
if ((err as Error).name === 'AbortError') return
setError((err as Error).message)
setDocuments([])
}
}, [knowledgeBaseId, selectedId, setValue, blockId, subBlock.id, isPreview, onDocumentSelect])
}, [knowledgeBaseId])
// Handle dropdown open/close - fetch documents when opening
const handleOpenChange = (isOpen: boolean) => {
@@ -160,50 +113,34 @@ export function DocumentSelector({
if (isPreview) return
setSelectedDocument(document)
setSelectedId(document.id)
if (!isPreview) {
setValue(blockId, subBlock.id, document.id)
}
setStoreValue(document.id)
onDocumentSelect?.(document.id)
setOpen(false)
}
// Sync selected document with value prop
useEffect(() => {
if (selectedId && documents.length > 0) {
const docInfo = documents.find((doc) => doc.id === selectedId)
if (docInfo) {
setSelectedDocument(docInfo)
} else {
setSelectedDocument(null)
}
} else if (!selectedId) {
if (value && documents.length > 0) {
const docInfo = documents.find((doc) => doc.id === value)
setSelectedDocument(docInfo || null)
} else {
setSelectedDocument(null)
}
}, [selectedId, documents])
}, [value, documents])
// Reset documents when knowledge base changes
useEffect(() => {
if (knowledgeBaseId) {
setDocuments([])
setSelectedDocument(null)
setSelectedId('')
setInitialFetchDone(false)
setError(null)
if (!isPreview) {
setValue(blockId, subBlock.id, '')
}
}
}, [knowledgeBaseId, blockId, subBlock.id, setValue, isPreview])
setDocuments([])
setSelectedDocument(null)
setError(null)
}, [knowledgeBaseId])
// Fetch documents when knowledge base is available and we haven't fetched yet
// Fetch documents when knowledge base is available
useEffect(() => {
if (knowledgeBaseId && !initialFetchDone && !isPreview) {
if (knowledgeBaseId && !isPreview) {
fetchDocuments()
}
}, [knowledgeBaseId, initialFetchDone, fetchDocuments, isPreview])
}, [knowledgeBaseId, isPreview, fetchDocuments])
const formatDocumentName = (document: DocumentData) => {
return document.filename
@@ -297,7 +234,7 @@ export function DocumentSelector({
</div>
</div>
</div>
{document.id === selectedId && <Check className='ml-auto h-4 w-4' />}
{document.id === value && <Check className='ml-auto h-4 w-4' />}
</CommandItem>
))}
</CommandGroup>
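
The selector fix boils down to swapping direct store access for the shared hook, which routes writes through the collaborative/socket path (a condensed sketch of the pattern above; the handler name is abbreviated):

// Before: read/write the subblock store directly (no socket event, no persistence)
// const { getValue, setValue } = useSubBlockStore()

// After: the hook both reads the live value and persists writes
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlock.id)
const [knowledgeBaseId] = useSubBlockValue(blockId, 'knowledgeBaseId')

const handleSelectDocument = (document: DocumentData) => {
  setStoreValue(document.id) // triggers the sockets event for persistence
  onDocumentSelect?.(document.id)
}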

View File

@@ -4,8 +4,10 @@ import { useEffect, useState } from 'react'
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'
import { env } from '@/lib/env'
import type { SubBlockConfig } from '@/blocks/types'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useSubBlockValue } from '../../hooks/use-sub-block-value'
import type { ConfluenceFileInfo } from './components/confluence-file-selector'
import { ConfluenceFileSelector } from './components/confluence-file-selector'
import type { DiscordChannelInfo } from './components/discord-channel-selector'
@@ -36,8 +38,12 @@ export function FileSelectorInput({
isPreview = false,
previewValue,
}: FileSelectorInputProps) {
const { getValue, setValue } = useSubBlockStore()
const { getValue } = useSubBlockStore()
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
const { activeWorkflowId } = useWorkflowRegistry()
// Use the proper hook to get the current value and setter
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlock.id)
const [selectedFileId, setSelectedFileId] = useState<string>('')
const [_fileInfo, setFileInfo] = useState<FileInfo | ConfluenceFileInfo | null>(null)
const [selectedIssueId, setSelectedIssueId] = useState<string>('')
@@ -64,7 +70,7 @@ export function FileSelectorInput({
const serverId = isDiscord ? (getValue(blockId, 'serverId') as string) || '' : ''
// Use preview value when in preview mode, otherwise use store value
const value = isPreview ? previewValue : getValue(blockId, subBlock.id)
const value = isPreview ? previewValue : storeValue
// Get the current value from the store or prop value if in preview mode
useEffect(() => {
@@ -115,19 +121,19 @@ export function FileSelectorInput({
const handleFileChange = (fileId: string, info?: any) => {
setSelectedFileId(fileId)
setFileInfo(info || null)
setValue(blockId, subBlock.id, fileId)
setStoreValue(fileId)
}
// Handle issue selection
const handleIssueChange = (issueKey: string, info?: JiraIssueInfo) => {
setSelectedIssueId(issueKey)
setIssueInfo(info || null)
setValue(blockId, subBlock.id, issueKey)
setStoreValue(issueKey)
// Clear the fields when a new issue is selected
if (isJira) {
setValue(blockId, 'summary', '')
setValue(blockId, 'description', '')
collaborativeSetSubblockValue(blockId, 'summary', '')
collaborativeSetSubblockValue(blockId, 'description', '')
}
}
@@ -135,14 +141,14 @@ export function FileSelectorInput({
const handleChannelChange = (channelId: string, info?: DiscordChannelInfo) => {
setSelectedChannelId(channelId)
setChannelInfo(info || null)
setValue(blockId, subBlock.id, channelId)
setStoreValue(channelId)
}
// Handle calendar selection
const handleCalendarChange = (calendarId: string, info?: GoogleCalendarInfo) => {
setSelectedCalendarId(calendarId)
setCalendarInfo(info || null)
setValue(blockId, subBlock.id, calendarId)
setStoreValue(calendarId)
}
// For Google Drive
@@ -337,7 +343,7 @@ export function FileSelectorInput({
onChange={(value, info) => {
setSelectedMessageId(value)
setMessageInfo(info || null)
setValue(blockId, subBlock.id, value)
collaborativeSetSubblockValue(blockId, subBlock.id, value)
}}
provider='microsoft-teams'
requiredScopes={subBlock.requiredScopes || []}


@@ -6,7 +6,6 @@ import { Button } from '@/components/ui/button'
import { Progress } from '@/components/ui/progress'
import { useNotificationStore } from '@/stores/notifications/store'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
import { useSubBlockValue } from '../hooks/use-sub-block-value'
@@ -297,16 +296,10 @@ export function FileUpload({
const newFiles = Array.from(uniqueFiles.values())
setStoreValue(newFiles)
// Make sure to update the subblock store value for the workflow execution
useSubBlockStore.getState().setValue(blockId, subBlockId, newFiles)
useWorkflowStore.getState().triggerUpdate()
} else {
// For single file: Replace with last uploaded file
setStoreValue(uploadedFiles[0] || null)
// Make sure to update the subblock store value for the workflow execution
useSubBlockStore.getState().setValue(blockId, subBlockId, uploadedFiles[0] || null)
useWorkflowStore.getState().triggerUpdate()
}
} catch (error) {
@@ -362,17 +355,9 @@ export function FileUpload({
const filesArray = Array.isArray(value) ? value : value ? [value] : []
const updatedFiles = filesArray.filter((f) => f.path !== file.path)
setStoreValue(updatedFiles.length > 0 ? updatedFiles : null)
// Make sure to update the subblock store value for the workflow execution
useSubBlockStore
.getState()
.setValue(blockId, subBlockId, updatedFiles.length > 0 ? updatedFiles : null)
} else {
// For single file: Clear the value
setStoreValue(null)
// Make sure to update the subblock store
useSubBlockStore.getState().setValue(blockId, subBlockId, null)
}
useWorkflowStore.getState().triggerUpdate()
@@ -413,7 +398,6 @@ export function FileUpload({
// Clear input state immediately for better UX
setStoreValue(null)
useSubBlockStore.getState().setValue(blockId, subBlockId, null)
useWorkflowStore.getState().triggerUpdate()
if (fileInputRef.current) {


@@ -1,6 +1,6 @@
'use client'
import { useCallback, useEffect, useState } from 'react'
import { useCallback, useEffect, useMemo, useState } from 'react'
import { Check, ChevronDown, RefreshCw, X } from 'lucide-react'
import { PackageSearchIcon } from '@/components/icons'
import { Button } from '@/components/ui/button'
@@ -15,7 +15,7 @@ import {
import { Popover, PopoverContent, PopoverTrigger } from '@/components/ui/popover'
import type { SubBlockConfig } from '@/blocks/types'
import { type KnowledgeBaseData, useKnowledgeStore } from '@/stores/knowledge/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useSubBlockValue } from '../../../sub-block/hooks/use-sub-block-value'
interface KnowledgeBaseSelectorProps {
blockId: string
@@ -36,23 +36,39 @@ export function KnowledgeBaseSelector({
}: KnowledgeBaseSelectorProps) {
const { getKnowledgeBasesList, knowledgeBasesList, loadingKnowledgeBasesList } =
useKnowledgeStore()
const { getValue, setValue } = useSubBlockStore()
const [knowledgeBases, setKnowledgeBases] = useState<KnowledgeBaseData[]>([])
const [loading, setLoading] = useState(false)
const [error, setError] = useState<string | null>(null)
const [open, setOpen] = useState(false)
const [selectedKnowledgeBases, setSelectedKnowledgeBases] = useState<KnowledgeBaseData[]>([])
const [initialFetchDone, setInitialFetchDone] = useState(false)
// Get the current value from the store
const storeValue = getValue(blockId, subBlock.id)
// Use the proper hook to get the current value and setter - this prevents infinite loops
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlock.id)
// Use preview value when in preview mode, otherwise use store value
const value = isPreview ? previewValue : storeValue
const isMultiSelect = subBlock.multiSelect === true
// Compute selected knowledge bases directly from value - no local state to avoid loops
const selectedKnowledgeBases = useMemo(() => {
if (value && knowledgeBases.length > 0) {
const selectedIds =
typeof value === 'string'
? value.includes(',')
? value
.split(',')
.map((id) => id.trim())
.filter((id) => id.length > 0)
: [value]
: []
return knowledgeBases.filter((kb) => selectedIds.includes(kb.id))
}
return []
}, [value, knowledgeBases])
// Fetch knowledge bases
const fetchKnowledgeBases = useCallback(async () => {
setLoading(true)
@@ -87,11 +103,8 @@ export function KnowledgeBaseSelector({
const handleSelectSingleKnowledgeBase = (knowledgeBase: KnowledgeBaseData) => {
if (isPreview) return
setSelectedKnowledgeBases([knowledgeBase])
if (!isPreview) {
setValue(blockId, subBlock.id, knowledgeBase.id)
}
// Use the hook's setter which handles collaborative updates
setStoreValue(knowledgeBase.id)
onKnowledgeBaseSelect?.(knowledgeBase.id)
setOpen(false)
@@ -112,15 +125,13 @@ export function KnowledgeBaseSelector({
newSelected = [...selectedKnowledgeBases, knowledgeBase]
}
setSelectedKnowledgeBases(newSelected)
const selectedIds = newSelected.map((kb) => kb.id)
const valueToStore = selectedIds.length === 1 ? selectedIds[0] : selectedIds.join(',')
if (!isPreview) {
const selectedIds = newSelected.map((kb) => kb.id)
const valueToStore = selectedIds.length === 1 ? selectedIds[0] : selectedIds.join(',')
setValue(blockId, subBlock.id, valueToStore)
}
// Use the hook's setter which handles collaborative updates
setStoreValue(valueToStore)
onKnowledgeBaseSelect?.(newSelected.map((kb) => kb.id))
onKnowledgeBaseSelect?.(selectedIds)
}
// Remove selected knowledge base (for multi-select tags)
@@ -128,37 +139,15 @@ export function KnowledgeBaseSelector({
if (isPreview) return
const newSelected = selectedKnowledgeBases.filter((kb) => kb.id !== knowledgeBaseId)
setSelectedKnowledgeBases(newSelected)
const selectedIds = newSelected.map((kb) => kb.id)
const valueToStore = selectedIds.length === 1 ? selectedIds[0] : selectedIds.join(',')
if (!isPreview) {
const selectedIds = newSelected.map((kb) => kb.id)
const valueToStore = selectedIds.length === 1 ? selectedIds[0] : selectedIds.join(',')
setValue(blockId, subBlock.id, valueToStore)
}
// Use the hook's setter which handles collaborative updates
setStoreValue(valueToStore)
onKnowledgeBaseSelect?.(newSelected.map((kb) => kb.id))
onKnowledgeBaseSelect?.(selectedIds)
}
// Sync selected knowledge bases with value prop
useEffect(() => {
if (value && knowledgeBases.length > 0) {
const selectedIds =
typeof value === 'string'
? value.includes(',')
? value
.split(',')
.map((id) => id.trim())
.filter((id) => id.length > 0)
: [value]
: []
const selectedKbs = knowledgeBases.filter((kb) => selectedIds.includes(kb.id))
setSelectedKnowledgeBases(selectedKbs)
} else if (!value) {
setSelectedKnowledgeBases([])
}
}, [value, knowledgeBases])
// Use cached data if available
useEffect(() => {
if (knowledgeBasesList.length > 0 && !initialFetchDone) {
@@ -172,6 +161,7 @@ export function KnowledgeBaseSelector({
if (
value &&
selectedKnowledgeBases.length === 0 &&
knowledgeBases.length === 0 &&
!loading &&
!initialFetchDone &&
!isPreview
@@ -181,6 +171,7 @@ export function KnowledgeBaseSelector({
}, [
value,
selectedKnowledgeBases.length,
knowledgeBases.length,
loading,
initialFetchDone,
fetchKnowledgeBases,
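Replacing the selectedKnowledgeBases state (and the effect that synced it) with a useMemo removes the feedback loop: the old effect wrote state that its own handlers also wrote, so renders could ping-pong. The derivation, condensed into a standalone hook using the comma-separated ID encoding shown above:

import { useMemo } from 'react'

type KnowledgeBaseData = { id: string; name: string }

function useSelectedKnowledgeBases(value: string | null, knowledgeBases: KnowledgeBaseData[]) {
  return useMemo(() => {
    if (!value || knowledgeBases.length === 0) return []
    // Multi-select values are stored as a comma-separated string of IDs;
    // a single ID with no comma falls out of the same split.
    const selectedIds = value
      .split(',')
      .map((id) => id.trim())
      .filter((id) => id.length > 0)
    return knowledgeBases.filter((kb) => selectedIds.includes(kb.id))
  }, [value, knowledgeBases])
}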


@@ -3,7 +3,9 @@
import { useEffect, useState } from 'react'
import { Tooltip, TooltipContent, TooltipProvider, TooltipTrigger } from '@/components/ui/tooltip'
import type { SubBlockConfig } from '@/blocks/types'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useSubBlockValue } from '../../hooks/use-sub-block-value'
import { type DiscordServerInfo, DiscordServerSelector } from './components/discord-server-selector'
import { type JiraProjectInfo, JiraProjectSelector } from './components/jira-project-selector'
import { type LinearProjectInfo, LinearProjectSelector } from './components/linear-project-selector'
@@ -26,10 +28,14 @@ export function ProjectSelectorInput({
isPreview = false,
previewValue,
}: ProjectSelectorInputProps) {
const { getValue, setValue } = useSubBlockStore()
const { getValue } = useSubBlockStore()
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
const [selectedProjectId, setSelectedProjectId] = useState<string>('')
const [_projectInfo, setProjectInfo] = useState<JiraProjectInfo | DiscordServerInfo | null>(null)
// Use the proper hook to get the current value and setter
const [storeValue, setStoreValue] = useSubBlockValue(blockId, subBlock.id)
// Get provider-specific values
const provider = subBlock.provider || 'jira'
const isDiscord = provider === 'discord'
@@ -58,21 +64,21 @@ export function ProjectSelectorInput({
) => {
setSelectedProjectId(projectId)
setProjectInfo(info || null)
setValue(blockId, subBlock.id, projectId)
setStoreValue(projectId)
// Clear the issue-related fields when a new project is selected
if (provider === 'jira') {
setValue(blockId, 'summary', '')
setValue(blockId, 'description', '')
setValue(blockId, 'issueKey', '')
collaborativeSetSubblockValue(blockId, 'summary', '')
collaborativeSetSubblockValue(blockId, 'description', '')
collaborativeSetSubblockValue(blockId, 'issueKey', '')
} else if (provider === 'discord') {
setValue(blockId, 'channelId', '')
collaborativeSetSubblockValue(blockId, 'channelId', '')
} else if (provider === 'linear') {
if (subBlock.id === 'teamId') {
setValue(blockId, 'teamId', projectId)
setValue(blockId, 'projectId', '')
collaborativeSetSubblockValue(blockId, 'teamId', projectId)
collaborativeSetSubblockValue(blockId, 'projectId', '')
} else if (subBlock.id === 'projectId') {
setValue(blockId, 'projectId', projectId)
collaborativeSetSubblockValue(blockId, 'projectId', projectId)
}
}


@@ -1,5 +1,6 @@
import { useCallback, useEffect, useRef } from 'react'
import { isEqual } from 'lodash'
import { useCollaborativeWorkflow } from '@/hooks/use-collaborative-workflow'
import { getProviderFromModel } from '@/providers/utils'
import { useGeneralStore } from '@/stores/settings/general/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
@@ -167,6 +168,8 @@ export function useSubBlockValue<T = any>(
subBlockId: string,
triggerWorkflowUpdate = false
): readonly [T | null, (value: T) => void] {
const { collaborativeSetSubblockValue } = useCollaborativeWorkflow()
const blockType = useWorkflowStore(
useCallback((state) => state.blocks?.[blockId]?.type, [blockId])
)
@@ -228,25 +231,24 @@ export function useSubBlockValue<T = any>(
storeApiKeyValue(blockId, blockType, modelValue, newValue, storeValue)
}
// Update the subblock store directly
useSubBlockStore.getState().setValue(blockId, subBlockId, valueCopy)
// Dispatch event to trigger socket emission only (not store update)
const event = new CustomEvent('update-subblock-value', {
detail: {
blockId,
subBlockId,
value: valueCopy,
},
})
window.dispatchEvent(event)
// Use collaborative function which handles both local store update and socket emission
collaborativeSetSubblockValue(blockId, subBlockId, valueCopy)
if (triggerWorkflowUpdate) {
useWorkflowStore.getState().triggerUpdate()
}
}
},
[blockId, subBlockId, blockType, isApiKey, storeValue, triggerWorkflowUpdate, modelValue]
[
blockId,
subBlockId,
blockType,
isApiKey,
storeValue,
triggerWorkflowUpdate,
modelValue,
collaborativeSetSubblockValue,
]
)
// Initialize valueRef on first render
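The setter now funnels through collaborativeSetSubblockValue instead of pairing a direct store write with a window CustomEvent for socket emission, so there is a single code path for "update locally and broadcast". The function's body is not part of this diff, so the following is only a sketch of its assumed shape; the socket handle and event name are placeholders:

import { useSubBlockStore } from '@/stores/workflows/subblock/store'

declare const socket: { emit: (event: string, payload: unknown) => void } // placeholder

function collaborativeSetSubblockValue(blockId: string, subBlockId: string, value: unknown) {
  // 1. Update the local subblock store so the editing user sees the change immediately
  useSubBlockStore.getState().setValue(blockId, subBlockId, value)
  // 2. Broadcast to collaborators (event name and payload shape are assumptions)
  socket.emit('subblock-update', { blockId, subBlockId, value })
}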


@@ -419,6 +419,7 @@ const WorkflowContent = React.memo(() => {
}
const { type } = event.detail
console.log('🛠️ Adding block from toolbar:', type)
if (!type) return
if (type === 'connectionBlock') return
@@ -439,32 +440,42 @@ const WorkflowContent = React.memo(() => {
y: window.innerHeight / 2,
})
// Add the container node directly to canvas with default dimensions
addBlock(id, type, name, centerPosition, {
width: 500,
height: 300,
type: type === 'loop' ? 'loopNode' : 'parallelNode',
})
// Auto-connect logic for container nodes
const isAutoConnectEnabled = useGeneralStore.getState().isAutoConnectEnabled
let autoConnectEdge
if (isAutoConnectEnabled) {
const closestBlock = findClosestOutput(centerPosition)
if (closestBlock) {
// Get appropriate source handle
const sourceHandle = determineSourceHandle(closestBlock)
addEdge({
autoConnectEdge = {
id: crypto.randomUUID(),
source: closestBlock.id,
target: id,
sourceHandle,
targetHandle: 'target',
type: 'workflowEdge',
})
}
}
}
// Add the container node directly to canvas with default dimensions and auto-connect edge
addBlock(
id,
type,
name,
centerPosition,
{
width: 500,
height: 300,
type: type === 'loop' ? 'loopNode' : 'parallelNode',
},
undefined,
undefined,
autoConnectEdge
)
return
}
@@ -486,27 +497,30 @@ const WorkflowContent = React.memo(() => {
Object.values(blocks).filter((b) => b.type === type).length + 1
}`
// Add the block to the workflow
addBlock(id, type, name, centerPosition)
// Auto-connect logic
const isAutoConnectEnabled = useGeneralStore.getState().isAutoConnectEnabled
let autoConnectEdge
if (isAutoConnectEnabled && type !== 'starter') {
const closestBlock = findClosestOutput(centerPosition)
console.log('🎯 Closest block found:', closestBlock)
if (closestBlock) {
// Get appropriate source handle
const sourceHandle = determineSourceHandle(closestBlock)
addEdge({
autoConnectEdge = {
id: crypto.randomUUID(),
source: closestBlock.id,
target: id,
sourceHandle,
targetHandle: 'target',
type: 'workflowEdge',
})
}
console.log('✅ Auto-connect edge created:', autoConnectEdge)
}
}
// Add the block to the workflow with auto-connect edge
addBlock(id, type, name, centerPosition, undefined, undefined, undefined, autoConnectEdge)
}
window.addEventListener('add-block-from-toolbar', handleAddBlockFromToolbar as EventListener)
@@ -583,30 +597,40 @@ const WorkflowContent = React.memo(() => {
// Resize the parent container to fit the new child container
resizeLoopNodesWrapper()
} else {
// Add the container node directly to canvas with default dimensions
addBlock(id, data.type, name, position, {
width: 500,
height: 300,
type: data.type === 'loop' ? 'loopNode' : 'parallelNode',
})
// Auto-connect the container to the closest node on the canvas
const isAutoConnectEnabled = useGeneralStore.getState().isAutoConnectEnabled
let autoConnectEdge
if (isAutoConnectEnabled) {
const closestBlock = findClosestOutput(position)
if (closestBlock) {
const sourceHandle = determineSourceHandle(closestBlock)
addEdge({
autoConnectEdge = {
id: crypto.randomUUID(),
source: closestBlock.id,
target: id,
sourceHandle,
targetHandle: 'target',
type: 'workflowEdge',
})
}
}
}
// Add the container node directly to canvas with default dimensions and auto-connect edge
addBlock(
id,
data.type,
name,
position,
{
width: 500,
height: 300,
type: data.type === 'loop' ? 'loopNode' : 'parallelNode',
},
undefined,
undefined,
autoConnectEdge
)
}
return
@@ -706,26 +730,27 @@ const WorkflowContent = React.memo(() => {
}
}
} else {
// Regular canvas drop
addBlock(id, data.type, name, position)
// Regular auto-connect logic
const isAutoConnectEnabled = useGeneralStore.getState().isAutoConnectEnabled
let autoConnectEdge
if (isAutoConnectEnabled && data.type !== 'starter') {
const closestBlock = findClosestOutput(position)
if (closestBlock) {
const sourceHandle = determineSourceHandle(closestBlock)
addEdge({
autoConnectEdge = {
id: crypto.randomUUID(),
source: closestBlock.id,
target: id,
sourceHandle,
targetHandle: 'target',
type: 'workflowEdge',
})
}
}
}
// Regular canvas drop with auto-connect edge
addBlock(id, data.type, name, position, undefined, undefined, undefined, autoConnectEdge)
}
} catch (err) {
logger.error('Error dropping block:', { err })
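All four call sites follow the same refactor: the auto-connect edge is computed first and passed to addBlock as a trailing argument, so the block and its edge land as one operation instead of an addBlock followed by a separate addEdge; the collaborative handler at the end of this changeset replays that bundled edge on remote clients. A sketch of the shape, with the helpers typed as they are used above:

declare function determineSourceHandle(block: { id: string }): string
declare function addBlock(
  id: string,
  type: string,
  name: string,
  position: { x: number; y: number },
  data?: Record<string, unknown>,
  parentId?: string,
  extent?: 'parent',
  autoConnectEdge?: Record<string, unknown>
): void

function addWithAutoConnect(
  id: string,
  type: string,
  name: string,
  position: { x: number; y: number },
  closestBlock: { id: string } | undefined
) {
  const autoConnectEdge = closestBlock
    ? {
        id: crypto.randomUUID(),
        source: closestBlock.id,
        target: id,
        sourceHandle: determineSourceHandle(closestBlock),
        targetHandle: 'target',
        type: 'workflowEdge',
      }
    : undefined
  // data, parentId, and extent keep their positions; the edge rides along last
  addBlock(id, type, name, position, undefined, undefined, undefined, autoConnectEdge)
}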


@@ -6,13 +6,13 @@ export const KnowledgeBlock: BlockConfig = {
name: 'Knowledge',
description: 'Use vector search',
longDescription:
'Perform semantic vector search across one or more knowledge bases or upload new chunks to documents. Uses advanced AI embeddings to understand meaning and context for search operations.',
'Perform semantic vector search across knowledge bases, upload individual chunks to existing documents, or create new documents from text content. Uses advanced AI embeddings to understand meaning and context for search operations.',
bgColor: '#00B0B0',
icon: PackageSearchIcon,
category: 'blocks',
docsLink: 'https://docs.simstudio.ai/blocks/knowledge',
tools: {
access: ['knowledge_search', 'knowledge_upload_chunk'],
access: ['knowledge_search', 'knowledge_upload_chunk', 'knowledge_create_document'],
config: {
tool: (params) => {
switch (params.operation) {
@@ -20,6 +20,8 @@ export const KnowledgeBlock: BlockConfig = {
return 'knowledge_search'
case 'upload_chunk':
return 'knowledge_upload_chunk'
case 'create_document':
return 'knowledge_create_document'
default:
return 'knowledge_search'
}
@@ -53,6 +55,7 @@ export const KnowledgeBlock: BlockConfig = {
options: [
{ label: 'Search', id: 'search' },
{ label: 'Upload Chunk', id: 'upload_chunk' },
{ label: 'Create Document', id: 'create_document' },
],
value: () => 'search',
},
@@ -72,7 +75,7 @@ export const KnowledgeBlock: BlockConfig = {
layout: 'full',
placeholder: 'Select knowledge base',
multiSelect: false,
condition: { field: 'operation', value: 'upload_chunk' },
condition: { field: 'operation', value: ['upload_chunk', 'create_document'] },
},
{
id: 'query',
@@ -107,5 +110,22 @@ export const KnowledgeBlock: BlockConfig = {
rows: 6,
condition: { field: 'operation', value: 'upload_chunk' },
},
{
id: 'name',
title: 'Document Name',
type: 'short-input',
layout: 'full',
placeholder: 'Enter document name',
condition: { field: 'operation', value: ['create_document'] },
},
{
id: 'content',
title: 'Document Content',
type: 'long-input',
layout: 'full',
placeholder: 'Enter the document content',
rows: 6,
condition: { field: 'operation', value: ['create_document'] },
},
],
}


@@ -0,0 +1,9 @@
ALTER TABLE "user_stats" ADD COLUMN "current_usage_limit" numeric DEFAULT '5' NOT NULL;--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "usage_limit_set_by" text;--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "usage_limit_updated_at" timestamp DEFAULT now();--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "current_period_cost" numeric DEFAULT '0' NOT NULL;--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "billing_period_start" timestamp DEFAULT now();--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "billing_period_end" timestamp;--> statement-breakpoint
ALTER TABLE "user_stats" ADD COLUMN "last_period_cost" numeric DEFAULT '0';--> statement-breakpoint
CREATE INDEX "subscription_reference_status_idx" ON "subscription" USING btree ("reference_id","status");--> statement-breakpoint
ALTER TABLE "subscription" ADD CONSTRAINT "check_enterprise_metadata" CHECK (plan != 'enterprise' OR (metadata IS NOT NULL AND (metadata->>'perSeatAllowance' IS NOT NULL OR metadata->>'totalAllowance' IS NOT NULL)));


@@ -0,0 +1,82 @@
CREATE TABLE "workflow_execution_blocks" (
"id" text PRIMARY KEY NOT NULL,
"execution_id" text NOT NULL,
"workflow_id" text NOT NULL,
"block_id" text NOT NULL,
"block_name" text,
"block_type" text NOT NULL,
"started_at" timestamp NOT NULL,
"ended_at" timestamp,
"duration_ms" integer,
"status" text NOT NULL,
"error_message" text,
"error_stack_trace" text,
"input_data" jsonb,
"output_data" jsonb,
"cost_input" numeric(10, 6),
"cost_output" numeric(10, 6),
"cost_total" numeric(10, 6),
"tokens_prompt" integer,
"tokens_completion" integer,
"tokens_total" integer,
"model_used" text,
"metadata" jsonb,
"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workflow_execution_logs" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"execution_id" text NOT NULL,
"state_snapshot_id" text NOT NULL,
"level" text NOT NULL,
"message" text NOT NULL,
"trigger" text NOT NULL,
"started_at" timestamp NOT NULL,
"ended_at" timestamp,
"total_duration_ms" integer,
"block_count" integer DEFAULT 0 NOT NULL,
"success_count" integer DEFAULT 0 NOT NULL,
"error_count" integer DEFAULT 0 NOT NULL,
"skipped_count" integer DEFAULT 0 NOT NULL,
"total_cost" numeric(10, 6),
"total_input_cost" numeric(10, 6),
"total_output_cost" numeric(10, 6),
"total_tokens" integer,
"metadata" jsonb DEFAULT '{}' NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
CREATE TABLE "workflow_execution_snapshots" (
"id" text PRIMARY KEY NOT NULL,
"workflow_id" text NOT NULL,
"state_hash" text NOT NULL,
"state_data" jsonb NOT NULL,
"created_at" timestamp DEFAULT now() NOT NULL
);
--> statement-breakpoint
ALTER TABLE "workflow_execution_blocks" ADD CONSTRAINT "workflow_execution_blocks_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_logs" ADD CONSTRAINT "workflow_execution_logs_state_snapshot_id_workflow_execution_snapshots_id_fk" FOREIGN KEY ("state_snapshot_id") REFERENCES "public"."workflow_execution_snapshots"("id") ON DELETE no action ON UPDATE no action;--> statement-breakpoint
ALTER TABLE "workflow_execution_snapshots" ADD CONSTRAINT "workflow_execution_snapshots_workflow_id_workflow_id_fk" FOREIGN KEY ("workflow_id") REFERENCES "public"."workflow"("id") ON DELETE cascade ON UPDATE no action;--> statement-breakpoint
CREATE INDEX "execution_blocks_execution_id_idx" ON "workflow_execution_blocks" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "execution_blocks_workflow_id_idx" ON "workflow_execution_blocks" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "execution_blocks_block_id_idx" ON "workflow_execution_blocks" USING btree ("block_id");--> statement-breakpoint
CREATE INDEX "execution_blocks_status_idx" ON "workflow_execution_blocks" USING btree ("status");--> statement-breakpoint
CREATE INDEX "execution_blocks_duration_idx" ON "workflow_execution_blocks" USING btree ("duration_ms");--> statement-breakpoint
CREATE INDEX "execution_blocks_cost_idx" ON "workflow_execution_blocks" USING btree ("cost_total");--> statement-breakpoint
CREATE INDEX "execution_blocks_workflow_execution_idx" ON "workflow_execution_blocks" USING btree ("workflow_id","execution_id");--> statement-breakpoint
CREATE INDEX "execution_blocks_execution_status_idx" ON "workflow_execution_blocks" USING btree ("execution_id","status");--> statement-breakpoint
CREATE INDEX "execution_blocks_started_at_idx" ON "workflow_execution_blocks" USING btree ("started_at");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_workflow_id_idx" ON "workflow_execution_logs" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_execution_id_idx" ON "workflow_execution_logs" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_trigger_idx" ON "workflow_execution_logs" USING btree ("trigger");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_level_idx" ON "workflow_execution_logs" USING btree ("level");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_started_at_idx" ON "workflow_execution_logs" USING btree ("started_at");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_cost_idx" ON "workflow_execution_logs" USING btree ("total_cost");--> statement-breakpoint
CREATE INDEX "workflow_execution_logs_duration_idx" ON "workflow_execution_logs" USING btree ("total_duration_ms");--> statement-breakpoint
CREATE UNIQUE INDEX "workflow_execution_logs_execution_id_unique" ON "workflow_execution_logs" USING btree ("execution_id");--> statement-breakpoint
CREATE INDEX "workflow_snapshots_workflow_id_idx" ON "workflow_execution_snapshots" USING btree ("workflow_id");--> statement-breakpoint
CREATE INDEX "workflow_snapshots_hash_idx" ON "workflow_execution_snapshots" USING btree ("state_hash");--> statement-breakpoint
CREATE UNIQUE INDEX "workflow_snapshots_workflow_hash_idx" ON "workflow_execution_snapshots" USING btree ("workflow_id","state_hash");--> statement-breakpoint
CREATE INDEX "workflow_snapshots_created_at_idx" ON "workflow_execution_snapshots" USING btree ("created_at");

File diff suppressed because it is too large

File diff suppressed because it is too large

@@ -330,6 +330,20 @@
"when": 1750794256278,
"tag": "0047_new_triathlon",
"breakpoints": true
},
{
"idx": 48,
"version": "7",
"when": 1751422991828,
"tag": "0048_flawless_ultron",
"breakpoints": true
},
{
"idx": 49,
"version": "7",
"when": 1751430703326,
"tag": "0049_fancy_cardiac",
"breakpoints": true
}
]
}


@@ -116,6 +116,7 @@ export const workflow = pgTable('workflow', {
folderId: text('folder_id').references(() => workflowFolder.id, { onDelete: 'set null' }),
name: text('name').notNull(),
description: text('description'),
// DEPRECATED: Use normalized tables (workflow_blocks, workflow_edges, workflow_subflows) instead
state: json('state').notNull(),
color: text('color').notNull().default('#3972F6'),
lastSynced: timestamp('last_synced').notNull(),
@@ -132,58 +133,43 @@ export const workflow = pgTable('workflow', {
marketplaceData: json('marketplace_data'),
})
// New normalized workflow tables
export const workflowBlocks = pgTable(
'workflow_blocks',
{
// Primary identification
id: text('id').primaryKey(), // Block UUID from the current JSON structure
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }), // Link to parent workflow
.references(() => workflow.id, { onDelete: 'cascade' }),
// Block properties (from current BlockState interface)
type: text('type').notNull(), // e.g., 'starter', 'agent', 'api', 'function'
name: text('name').notNull(), // Display name of the block
type: text('type').notNull(), // 'starter', 'agent', 'api', 'function'
name: text('name').notNull(),
// Position coordinates (from position.x, position.y)
positionX: decimal('position_x').notNull(), // X coordinate on canvas
positionY: decimal('position_y').notNull(), // Y coordinate on canvas
positionX: decimal('position_x').notNull(),
positionY: decimal('position_y').notNull(),
// Block behavior flags (from current BlockState)
enabled: boolean('enabled').notNull().default(true), // Whether block is active
horizontalHandles: boolean('horizontal_handles').notNull().default(true), // UI layout preference
isWide: boolean('is_wide').notNull().default(false), // Whether block uses wide layout
advancedMode: boolean('advanced_mode').notNull().default(false), // Whether block is in advanced mode
height: decimal('height').notNull().default('0'), // Custom height override
enabled: boolean('enabled').notNull().default(true),
horizontalHandles: boolean('horizontal_handles').notNull().default(true),
isWide: boolean('is_wide').notNull().default(false),
advancedMode: boolean('advanced_mode').notNull().default(false),
height: decimal('height').notNull().default('0'),
// Block data (keeping JSON for flexibility as current system does)
subBlocks: jsonb('sub_blocks').notNull().default('{}'), // All subblock configurations
outputs: jsonb('outputs').notNull().default('{}'), // Output type definitions
data: jsonb('data').default('{}'), // Additional block-specific data
subBlocks: jsonb('sub_blocks').notNull().default('{}'),
outputs: jsonb('outputs').notNull().default('{}'),
data: jsonb('data').default('{}'),
// Hierarchy support (for loop/parallel child blocks)
parentId: text('parent_id'), // Self-reference handled by foreign key constraint in migration
extent: text('extent'), // 'parent' or null - for ReactFlow parent constraint
parentId: text('parent_id'),
extent: text('extent'), // 'parent' or null
// Timestamps
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
// Primary access pattern: get all blocks for a workflow
workflowIdIdx: index('workflow_blocks_workflow_id_idx').on(table.workflowId),
// For finding child blocks of a parent (loop/parallel containers)
parentIdIdx: index('workflow_blocks_parent_id_idx').on(table.parentId),
// Composite index for efficient parent-child queries
workflowParentIdx: index('workflow_blocks_workflow_parent_idx').on(
table.workflowId,
table.parentId
),
// For block type filtering/analytics
workflowTypeIdx: index('workflow_blocks_workflow_type_idx').on(table.workflowId, table.type),
})
)
@@ -191,36 +177,26 @@ export const workflowBlocks = pgTable(
export const workflowEdges = pgTable(
'workflow_edges',
{
// Primary identification
id: text('id').primaryKey(), // Edge UUID from ReactFlow
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }), // Link to parent workflow
.references(() => workflow.id, { onDelete: 'cascade' }),
// Connection definition (from ReactFlow Edge interface)
sourceBlockId: text('source_block_id')
.notNull()
.references(() => workflowBlocks.id, { onDelete: 'cascade' }), // Source block ID
.references(() => workflowBlocks.id, { onDelete: 'cascade' }),
targetBlockId: text('target_block_id')
.notNull()
.references(() => workflowBlocks.id, { onDelete: 'cascade' }), // Target block ID
sourceHandle: text('source_handle'), // Specific output handle (optional)
targetHandle: text('target_handle'), // Specific input handle (optional)
.references(() => workflowBlocks.id, { onDelete: 'cascade' }),
sourceHandle: text('source_handle'),
targetHandle: text('target_handle'),
// Timestamps
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
// Primary access pattern: get all edges for a workflow
workflowIdIdx: index('workflow_edges_workflow_id_idx').on(table.workflowId),
// For finding outgoing connections from a block
sourceBlockIdx: index('workflow_edges_source_block_idx').on(table.sourceBlockId),
// For finding incoming connections to a block
targetBlockIdx: index('workflow_edges_target_block_idx').on(table.targetBlockId),
// For comprehensive workflow topology queries
workflowSourceIdx: index('workflow_edges_workflow_source_idx').on(
table.workflowId,
table.sourceBlockId
@@ -235,25 +211,19 @@ export const workflowEdges = pgTable(
export const workflowSubflows = pgTable(
'workflow_subflows',
{
// Primary identification
id: text('id').primaryKey(), // Subflow UUID (currently loop/parallel ID)
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }), // Link to parent workflow
.references(() => workflow.id, { onDelete: 'cascade' }),
// Subflow type and configuration
type: text('type').notNull(), // 'loop' or 'parallel' (extensible for future types)
config: jsonb('config').notNull().default('{}'), // Type-specific configuration
type: text('type').notNull(), // 'loop' or 'parallel'
config: jsonb('config').notNull().default('{}'),
// Timestamps
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
},
(table) => ({
// Primary access pattern: get all subflows for a workflow
workflowIdIdx: index('workflow_subflows_workflow_id_idx').on(table.workflowId),
// For filtering by subflow type
workflowTypeIdx: index('workflow_subflows_workflow_type_idx').on(table.workflowId, table.type),
})
)
@@ -272,14 +242,136 @@ export const workflowLogs = pgTable('workflow_logs', {
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
executionId: text('execution_id'),
level: text('level').notNull(), // e.g. "info", "error", etc.
level: text('level').notNull(), // "info", "error", etc.
message: text('message').notNull(),
duration: text('duration'), // Store as text to allow 'NA' for errors
trigger: text('trigger'), // e.g. "api", "schedule", "manual"
trigger: text('trigger'), // "api", "schedule", "manual"
createdAt: timestamp('created_at').notNull().defaultNow(),
metadata: json('metadata'), // Optional JSON field for storing additional context like tool calls
metadata: json('metadata'),
})
export const workflowExecutionSnapshots = pgTable(
'workflow_execution_snapshots',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
stateHash: text('state_hash').notNull(),
stateData: jsonb('state_data').notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
workflowIdIdx: index('workflow_snapshots_workflow_id_idx').on(table.workflowId),
stateHashIdx: index('workflow_snapshots_hash_idx').on(table.stateHash),
workflowHashUnique: uniqueIndex('workflow_snapshots_workflow_hash_idx').on(
table.workflowId,
table.stateHash
),
createdAtIdx: index('workflow_snapshots_created_at_idx').on(table.createdAt),
})
)
export const workflowExecutionLogs = pgTable(
'workflow_execution_logs',
{
id: text('id').primaryKey(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
executionId: text('execution_id').notNull(),
stateSnapshotId: text('state_snapshot_id')
.notNull()
.references(() => workflowExecutionSnapshots.id),
level: text('level').notNull(), // 'info', 'error'
message: text('message').notNull(),
trigger: text('trigger').notNull(), // 'api', 'webhook', 'schedule', 'manual', 'chat'
startedAt: timestamp('started_at').notNull(),
endedAt: timestamp('ended_at'),
totalDurationMs: integer('total_duration_ms'),
blockCount: integer('block_count').notNull().default(0),
successCount: integer('success_count').notNull().default(0),
errorCount: integer('error_count').notNull().default(0),
skippedCount: integer('skipped_count').notNull().default(0),
totalCost: decimal('total_cost', { precision: 10, scale: 6 }),
totalInputCost: decimal('total_input_cost', { precision: 10, scale: 6 }),
totalOutputCost: decimal('total_output_cost', { precision: 10, scale: 6 }),
totalTokens: integer('total_tokens'),
metadata: jsonb('metadata').notNull().default('{}'),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
workflowIdIdx: index('workflow_execution_logs_workflow_id_idx').on(table.workflowId),
executionIdIdx: index('workflow_execution_logs_execution_id_idx').on(table.executionId),
triggerIdx: index('workflow_execution_logs_trigger_idx').on(table.trigger),
levelIdx: index('workflow_execution_logs_level_idx').on(table.level),
startedAtIdx: index('workflow_execution_logs_started_at_idx').on(table.startedAt),
costIdx: index('workflow_execution_logs_cost_idx').on(table.totalCost),
durationIdx: index('workflow_execution_logs_duration_idx').on(table.totalDurationMs),
executionIdUnique: uniqueIndex('workflow_execution_logs_execution_id_unique').on(
table.executionId
),
})
)
export const workflowExecutionBlocks = pgTable(
'workflow_execution_blocks',
{
id: text('id').primaryKey(),
executionId: text('execution_id').notNull(),
workflowId: text('workflow_id')
.notNull()
.references(() => workflow.id, { onDelete: 'cascade' }),
blockId: text('block_id').notNull(),
blockName: text('block_name'),
blockType: text('block_type').notNull(),
startedAt: timestamp('started_at').notNull(),
endedAt: timestamp('ended_at'),
durationMs: integer('duration_ms'),
status: text('status').notNull(), // 'success', 'error', 'skipped'
errorMessage: text('error_message'),
errorStackTrace: text('error_stack_trace'),
inputData: jsonb('input_data'),
outputData: jsonb('output_data'),
costInput: decimal('cost_input', { precision: 10, scale: 6 }),
costOutput: decimal('cost_output', { precision: 10, scale: 6 }),
costTotal: decimal('cost_total', { precision: 10, scale: 6 }),
tokensPrompt: integer('tokens_prompt'),
tokensCompletion: integer('tokens_completion'),
tokensTotal: integer('tokens_total'),
modelUsed: text('model_used'),
metadata: jsonb('metadata'),
createdAt: timestamp('created_at').notNull().defaultNow(),
},
(table) => ({
executionIdIdx: index('execution_blocks_execution_id_idx').on(table.executionId),
workflowIdIdx: index('execution_blocks_workflow_id_idx').on(table.workflowId),
blockIdIdx: index('execution_blocks_block_id_idx').on(table.blockId),
statusIdx: index('execution_blocks_status_idx').on(table.status),
durationIdx: index('execution_blocks_duration_idx').on(table.durationMs),
costIdx: index('execution_blocks_cost_idx').on(table.costTotal),
workflowExecutionIdx: index('execution_blocks_workflow_execution_idx').on(
table.workflowId,
table.executionId
),
executionStatusIdx: index('execution_blocks_execution_status_idx').on(
table.executionId,
table.status
),
startedAtIdx: index('execution_blocks_started_at_idx').on(table.startedAt),
})
)
export const environment = pgTable('environment', {
id: text('id').primaryKey(), // Use the user id as the key
userId: text('user_id')
@@ -400,6 +492,14 @@ export const userStats = pgTable('user_stats', {
totalChatExecutions: integer('total_chat_executions').notNull().default(0),
totalTokensUsed: integer('total_tokens_used').notNull().default(0),
totalCost: decimal('total_cost').notNull().default('0'),
currentUsageLimit: decimal('current_usage_limit').notNull().default('5'), // Default $5 for free plan
usageLimitSetBy: text('usage_limit_set_by'), // User ID who set the limit (for team admin tracking)
usageLimitUpdatedAt: timestamp('usage_limit_updated_at').defaultNow(),
// Billing period tracking
currentPeriodCost: decimal('current_period_cost').notNull().default('0'), // Usage in current billing period
billingPeriodStart: timestamp('billing_period_start').defaultNow(), // When current billing period started
billingPeriodEnd: timestamp('billing_period_end'), // When current billing period ends
lastPeriodCost: decimal('last_period_cost').default('0'), // Usage from previous billing period
lastActive: timestamp('last_active').notNull().defaultNow(),
})
@@ -415,21 +515,34 @@ export const customTools = pgTable('custom_tools', {
updatedAt: timestamp('updated_at').notNull().defaultNow(),
})
export const subscription = pgTable('subscription', {
id: text('id').primaryKey(),
plan: text('plan').notNull(),
referenceId: text('reference_id').notNull(),
stripeCustomerId: text('stripe_customer_id'),
stripeSubscriptionId: text('stripe_subscription_id'),
status: text('status'),
periodStart: timestamp('period_start'),
periodEnd: timestamp('period_end'),
cancelAtPeriodEnd: boolean('cancel_at_period_end'),
seats: integer('seats'),
trialStart: timestamp('trial_start'),
trialEnd: timestamp('trial_end'),
metadata: json('metadata'),
})
export const subscription = pgTable(
'subscription',
{
id: text('id').primaryKey(),
plan: text('plan').notNull(),
referenceId: text('reference_id').notNull(),
stripeCustomerId: text('stripe_customer_id'),
stripeSubscriptionId: text('stripe_subscription_id'),
status: text('status'),
periodStart: timestamp('period_start'),
periodEnd: timestamp('period_end'),
cancelAtPeriodEnd: boolean('cancel_at_period_end'),
seats: integer('seats'),
trialStart: timestamp('trial_start'),
trialEnd: timestamp('trial_end'),
metadata: json('metadata'),
},
(table) => ({
referenceStatusIdx: index('subscription_reference_status_idx').on(
table.referenceId,
table.status
),
enterpriseMetadataCheck: check(
'check_enterprise_metadata',
sql`plan != 'enterprise' OR (metadata IS NOT NULL AND (metadata->>'perSeatAllowance' IS NOT NULL OR metadata->>'totalAllowance' IS NOT NULL))`
),
})
)
export const chat = pgTable(
'chat',
@@ -484,7 +597,7 @@ export const member = pgTable('member', {
organizationId: text('organization_id')
.notNull()
.references(() => organization.id, { onDelete: 'cascade' }),
role: text('role').notNull(),
role: text('role').notNull(), // 'admin' or 'member' - team-level permissions only
createdAt: timestamp('created_at').defaultNow().notNull(),
})
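The check_enterprise_metadata constraint above guarantees that enterprise subscriptions carry at least one allowance figure in their JSON metadata. Illustrative rows relative to that rule (values invented):

// Accepted: enterprise plan with a per-seat allowance in metadata
const ok = { plan: 'enterprise', metadata: { perSeatAllowance: 100 } }

// Accepted: non-enterprise plans are exempt from the check
const free = { plan: 'free', metadata: null }

// Rejected: enterprise plan with neither perSeatAllowance nor totalAllowance
const bad = { plan: 'enterprise', metadata: {} }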


@@ -77,6 +77,8 @@ describe('FunctionBlockHandler', () => {
code: inputs.code,
timeout: inputs.timeout,
envVars: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId },
}
const expectedOutput: BlockOutput = { response: { result: 'Success' } }
@@ -100,6 +102,8 @@ describe('FunctionBlockHandler', () => {
code: expectedCode,
timeout: inputs.timeout,
envVars: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId },
}
const expectedOutput: BlockOutput = { response: { result: 'Success' } }
@@ -116,6 +120,8 @@ describe('FunctionBlockHandler', () => {
code: inputs.code,
timeout: 5000, // Default timeout
envVars: {},
blockData: {},
blockNameMapping: {},
_context: { workflowId: mockContext.workflowId },
}


@@ -23,12 +23,29 @@ export class FunctionBlockHandler implements BlockHandler {
? inputs.code.map((c: { content: string }) => c.content).join('\n')
: inputs.code
// Extract block data for variable resolution
const blockData: Record<string, any> = {}
const blockNameMapping: Record<string, string> = {}
for (const [blockId, blockState] of context.blockStates.entries()) {
if (blockState.output) {
blockData[blockId] = blockState.output
// Try to find the block name from the workflow
const workflowBlock = context.workflow?.blocks?.find((b) => b.id === blockId)
if (workflowBlock?.metadata?.name) {
blockNameMapping[workflowBlock.metadata.name] = blockId
}
}
}
// Directly use the function_execute tool which calls the API route
logger.info(`Executing function block via API route: ${block.id}`)
const result = await executeTool('function_execute', {
code: codeContent,
timeout: inputs.timeout || 5000,
envVars: context.environmentVariables || {},
blockData: blockData, // Pass block data for variable resolution
blockNameMapping: blockNameMapping, // Pass block name to ID mapping
_context: { workflowId: context.workflowId },
})
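The two maps give the function-execution API what it needs to resolve references to other blocks' outputs inside user code: blockData is keyed by block ID, and blockNameMapping translates display names to those IDs (the InputResolver change later in this diff defers code-string resolution to that API). Illustrative contents, with the ID invented for the example:

const blockData = {
  'block-api1-id': { response: { result: 42 } }, // block ID → that block's output
}
const blockNameMapping = {
  'API 1': 'block-api1-id', // display name → block ID
}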


@@ -145,7 +145,7 @@ export class WorkflowBlockHandler implements BlockHandler {
logger.info(`Loaded child workflow: ${workflowData.name} (${workflowId})`)
// Extract the workflow state
// Extract the workflow state (API returns normalized data in state field)
const workflowState = workflowData.state
if (!workflowState || !workflowState.blocks) {
@@ -153,7 +153,7 @@ export class WorkflowBlockHandler implements BlockHandler {
return null
}
// Use blocks directly since DB format should match UI format
// Use blocks directly since API returns data from normalized tables
const serializedWorkflow = this.serializer.serializeWorkflow(
workflowState.blocks,
workflowState.edges || [],


@@ -668,4 +668,238 @@ describe('Executor', () => {
expect(createContextSpy).toHaveBeenCalled()
})
})
/**
* Dependency checking logic tests
*/
describe('dependency checking', () => {
test('should handle multi-input blocks with inactive sources correctly', () => {
// Create workflow with router -> multiple APIs -> single agent
const routerWorkflow = {
blocks: [
{
id: 'start',
metadata: { id: 'starter', name: 'Start' },
config: { params: {} },
enabled: true,
},
{
id: 'router',
metadata: { id: 'router', name: 'Router' },
config: { params: { prompt: 'test', model: 'gpt-4' } },
enabled: true,
},
{
id: 'api1',
metadata: { id: 'api', name: 'API 1' },
config: { params: { url: 'http://api1.com', method: 'GET' } },
enabled: true,
},
{
id: 'api2',
metadata: { id: 'api', name: 'API 2' },
config: { params: { url: 'http://api2.com', method: 'GET' } },
enabled: true,
},
{
id: 'agent',
metadata: { id: 'agent', name: 'Agent' },
config: { params: { model: 'gpt-4', userPrompt: 'test' } },
enabled: true,
},
],
connections: [
{ source: 'start', target: 'router' },
{ source: 'router', target: 'api1' },
{ source: 'router', target: 'api2' },
{ source: 'api1', target: 'agent' },
{ source: 'api2', target: 'agent' },
],
loops: {},
parallels: {},
}
const executor = new Executor(routerWorkflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
// Mock context simulating: router selected api1, api1 executed, api2 not in active path
const mockContext = {
blockStates: new Map(),
decisions: {
router: new Map([['router', 'api1']]),
condition: new Map(),
},
activeExecutionPath: new Set(['start', 'router', 'api1', 'agent']),
workflow: routerWorkflow,
} as any
const executedBlocks = new Set(['start', 'router', 'api1'])
// Test agent's dependencies
const agentConnections = [
{ source: 'api1', target: 'agent', sourceHandle: 'source' },
{ source: 'api2', target: 'agent', sourceHandle: 'source' },
]
const dependenciesMet = checkDependencies(agentConnections, executedBlocks, mockContext)
// Both dependencies should be met:
// - api1: in active path AND executed = met
// - api2: NOT in active path = automatically met
expect(dependenciesMet).toBe(true)
})
test('should prioritize special connection types over active path check', () => {
const workflow = createMinimalWorkflow()
const executor = new Executor(workflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
const mockContext = {
blockStates: new Map(),
decisions: { router: new Map(), condition: new Map() },
activeExecutionPath: new Set(['block1']), // block2 not in active path
completedLoops: new Set(),
workflow: workflow,
} as any
const executedBlocks = new Set(['block1'])
// Test error connection (should be handled before active path check)
const errorConnections = [{ source: 'block2', target: 'block3', sourceHandle: 'error' }]
// Mock block2 with error state
mockContext.blockStates.set('block2', {
output: { error: 'test error' },
})
// Even though block2 is not in active path, error connection should be handled specially
const errorDepsResult = checkDependencies(errorConnections, new Set(['block2']), mockContext)
expect(errorDepsResult).toBe(true) // source executed + has error = dependency met
// Test loop connection
const loopConnections = [
{ source: 'block2', target: 'block3', sourceHandle: 'loop-end-source' },
]
mockContext.completedLoops.add('block2')
const loopDepsResult = checkDependencies(loopConnections, new Set(['block2']), mockContext)
expect(loopDepsResult).toBe(true) // loop completed = dependency met
})
test('should handle router decisions correctly in dependency checking', () => {
const workflow = createMinimalWorkflow()
const executor = new Executor(workflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
// Add router block to workflow
workflow.blocks.push({
id: 'router1',
metadata: { id: 'router', name: 'Router' },
config: { params: {} },
enabled: true,
})
const mockContext = {
blockStates: new Map(),
decisions: {
router: new Map([['router1', 'target1']]), // router selected target1
condition: new Map(),
},
activeExecutionPath: new Set(['router1', 'target1', 'target2']),
workflow: workflow,
} as any
const executedBlocks = new Set(['router1'])
// Test selected target
const selectedConnections = [{ source: 'router1', target: 'target1', sourceHandle: 'source' }]
const selectedResult = checkDependencies(selectedConnections, executedBlocks, mockContext)
expect(selectedResult).toBe(true) // router executed + target selected = dependency met
// Test non-selected target
const nonSelectedConnections = [
{ source: 'router1', target: 'target2', sourceHandle: 'source' },
]
const nonSelectedResult = checkDependencies(
nonSelectedConnections,
executedBlocks,
mockContext
)
expect(nonSelectedResult).toBe(true) // router executed + target NOT selected = dependency auto-met
})
test('should handle condition decisions correctly in dependency checking', () => {
const conditionWorkflow = createWorkflowWithCondition()
const executor = new Executor(conditionWorkflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
const mockContext = {
blockStates: new Map(),
decisions: {
router: new Map(),
condition: new Map([['condition1', 'true']]), // condition selected true path
},
activeExecutionPath: new Set(['condition1', 'trueTarget']),
workflow: conditionWorkflow,
} as any
const executedBlocks = new Set(['condition1'])
// Test selected condition path
const trueConnections = [
{ source: 'condition1', target: 'trueTarget', sourceHandle: 'condition-true' },
]
const trueResult = checkDependencies(trueConnections, executedBlocks, mockContext)
expect(trueResult).toBe(true)
// Test non-selected condition path
const falseConnections = [
{ source: 'condition1', target: 'falseTarget', sourceHandle: 'condition-false' },
]
const falseResult = checkDependencies(falseConnections, executedBlocks, mockContext)
expect(falseResult).toBe(true) // condition executed + path NOT selected = dependency auto-met
})
test('should handle regular sequential dependencies correctly', () => {
const workflow = createMinimalWorkflow()
const executor = new Executor(workflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
const mockContext = {
blockStates: new Map(),
decisions: { router: new Map(), condition: new Map() },
activeExecutionPath: new Set(['block1', 'block2']),
workflow: workflow,
} as any
const executedBlocks = new Set(['block1'])
// Test normal sequential dependency
const normalConnections = [{ source: 'block1', target: 'block2', sourceHandle: 'source' }]
// Without error
const normalResult = checkDependencies(normalConnections, executedBlocks, mockContext)
expect(normalResult).toBe(true) // source executed + no error = dependency met
// With error should fail regular connection
mockContext.blockStates.set('block1', {
output: { error: 'test error' },
})
const errorResult = checkDependencies(normalConnections, executedBlocks, mockContext)
expect(errorResult).toBe(false) // source executed + has error = regular dependency not met
})
test('should handle empty dependency list', () => {
const workflow = createMinimalWorkflow()
const executor = new Executor(workflow)
const checkDependencies = (executor as any).checkDependencies.bind(executor)
const mockContext = createMockContext()
const executedBlocks = new Set<string>()
// Empty connections should return true
const result = checkDependencies([], executedBlocks, mockContext)
expect(result).toBe(true)
})
})
})


@@ -877,6 +877,9 @@ export class Executor {
insideParallel?: string,
iterationIndex?: number
): boolean {
if (incomingConnections.length === 0) {
return true
}
// Check if this is a loop block
const isLoopBlock = incomingConnections.some((conn) => {
const sourceBlock = this.actualWorkflow.blocks.find((b) => b.id === conn.source)
@@ -994,6 +997,12 @@ export class Executor {
return sourceExecuted && conn.target === selectedTarget
}
// If source is not in active path, consider this dependency met
// This allows blocks with multiple inputs to execute even if some inputs are from inactive paths
if (!context.activeExecutionPath.has(conn.source)) {
return true
}
// For error connections, check if the source had an error
if (conn.sourceHandle === 'error') {
return sourceExecuted && hasSourceError
@@ -1004,12 +1013,6 @@ export class Executor {
return sourceExecuted && !hasSourceError
}
// If source is not in active path, consider this dependency met
// This allows blocks with multiple inputs to execute even if some inputs are from inactive paths
if (!context.activeExecutionPath.has(conn.source)) {
return true
}
// For regular blocks, dependency is met if source is executed
return sourceExecuted
})
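Two behavioral changes land here: an empty dependency list now short-circuits to met, and the inactive-path escape moves ahead of the error/success handle checks, so a block fed by both branches of a router is no longer blocked by the branch that was never taken. The per-connection rule, distilled into a standalone predicate (simplified: the router, condition, and loop handling from the full method are omitted):

type Conn = { source: string; target: string; sourceHandle?: string }

function dependencyMet(
  conn: Conn,
  executed: Set<string>,
  activePath: Set<string>,
  sourceHasError: boolean
): boolean {
  // A source outside the active execution path can never block its target
  if (!activePath.has(conn.source)) return true
  // Error connections fire only when the executed source actually errored
  if (conn.sourceHandle === 'error') return executed.has(conn.source) && sourceHasError
  // Regular connections require a successful (non-error) source execution
  return executed.has(conn.source) && !sourceHasError
}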


@@ -408,4 +408,206 @@ describe('PathTracker', () => {
}).not.toThrow()
})
})
describe('Router downstream path activation', () => {
beforeEach(() => {
// Create router workflow with downstream connections
mockWorkflow = {
version: '1.0',
blocks: [
{
id: 'router1',
metadata: { id: 'router', name: 'Router' },
position: { x: 0, y: 0 },
config: { tool: 'router', params: {} },
inputs: {},
outputs: {},
enabled: true,
},
{
id: 'api1',
metadata: { id: 'api', name: 'API 1' },
position: { x: 0, y: 0 },
config: { tool: 'api', params: {} },
inputs: {},
outputs: {},
enabled: true,
},
{
id: 'api2',
metadata: { id: 'api', name: 'API 2' },
position: { x: 0, y: 0 },
config: { tool: 'api', params: {} },
inputs: {},
outputs: {},
enabled: true,
},
{
id: 'agent1',
metadata: { id: 'agent', name: 'Agent' },
position: { x: 0, y: 0 },
config: { tool: 'agent', params: {} },
inputs: {},
outputs: {},
enabled: true,
},
],
connections: [
{ source: 'router1', target: 'api1' },
{ source: 'router1', target: 'api2' },
{ source: 'api1', target: 'agent1' },
{ source: 'api2', target: 'agent1' },
],
loops: {},
parallels: {},
}
pathTracker = new PathTracker(mockWorkflow)
mockContext = {
workflowId: 'test-router-workflow',
blockStates: new Map(),
blockLogs: [],
metadata: { duration: 0 },
environmentVariables: {},
decisions: { router: new Map(), condition: new Map() },
loopIterations: new Map(),
loopItems: new Map(),
completedLoops: new Set(),
executedBlocks: new Set(),
activeExecutionPath: new Set(),
workflow: mockWorkflow,
}
})
it('should activate downstream paths when router selects a target', () => {
// Mock router output selecting api1
mockContext.blockStates.set('router1', {
output: {
response: {
selectedPath: {
blockId: 'api1',
blockType: 'api',
blockTitle: 'API 1',
},
},
},
executed: true,
executionTime: 100,
})
// Update paths for router
pathTracker.updateExecutionPaths(['router1'], mockContext)
// Both api1 and agent1 should be activated (downstream from api1)
expect(mockContext.activeExecutionPath.has('api1')).toBe(true)
expect(mockContext.activeExecutionPath.has('agent1')).toBe(true)
// api2 should NOT be activated (not selected by router)
expect(mockContext.activeExecutionPath.has('api2')).toBe(false)
})
it('should handle multiple levels of downstream connections', () => {
// Add another level to test deep activation
mockWorkflow.blocks.push({
id: 'finalStep',
metadata: { id: 'api', name: 'Final Step' },
position: { x: 0, y: 0 },
config: { tool: 'api', params: {} },
inputs: {},
outputs: {},
enabled: true,
})
mockWorkflow.connections.push({ source: 'agent1', target: 'finalStep' })
pathTracker = new PathTracker(mockWorkflow)
// Mock router output selecting api1
mockContext.blockStates.set('router1', {
output: {
response: {
selectedPath: {
blockId: 'api1',
blockType: 'api',
blockTitle: 'API 1',
},
},
},
executed: true,
executionTime: 100,
})
pathTracker.updateExecutionPaths(['router1'], mockContext)
// All downstream blocks should be activated
expect(mockContext.activeExecutionPath.has('api1')).toBe(true)
expect(mockContext.activeExecutionPath.has('agent1')).toBe(true)
expect(mockContext.activeExecutionPath.has('finalStep')).toBe(true)
// Non-selected path should not be activated
expect(mockContext.activeExecutionPath.has('api2')).toBe(false)
})
it('should not create infinite loops in cyclic workflows', () => {
// Add a cycle to test loop prevention
mockWorkflow.connections.push({ source: 'agent1', target: 'api1' })
pathTracker = new PathTracker(mockWorkflow)
mockContext.blockStates.set('router1', {
output: {
response: {
selectedPath: {
blockId: 'api1',
blockType: 'api',
blockTitle: 'API 1',
},
},
},
executed: true,
executionTime: 100,
})
// This should not throw or cause infinite recursion
expect(() => {
pathTracker.updateExecutionPaths(['router1'], mockContext)
}).not.toThrow()
// Both api1 and agent1 should still be activated
expect(mockContext.activeExecutionPath.has('api1')).toBe(true)
expect(mockContext.activeExecutionPath.has('agent1')).toBe(true)
})
it('should handle router with no downstream connections', () => {
// Create isolated router
const isolatedWorkflow = {
...mockWorkflow,
connections: [
{ source: 'router1', target: 'api1' },
{ source: 'router1', target: 'api2' },
// Remove downstream connections from api1/api2
],
}
pathTracker = new PathTracker(isolatedWorkflow)
mockContext.blockStates.set('router1', {
output: {
response: {
selectedPath: {
blockId: 'api1',
blockType: 'api',
blockTitle: 'API 1',
},
},
},
executed: true,
executionTime: 100,
})
pathTracker.updateExecutionPaths(['router1'], mockContext)
// Only the selected target should be activated
expect(mockContext.activeExecutionPath.has('api1')).toBe(true)
expect(mockContext.activeExecutionPath.has('api2')).toBe(false)
expect(mockContext.activeExecutionPath.has('agent1')).toBe(false)
})
})
})

View File

@@ -165,10 +165,28 @@ export class PathTracker {
if (selectedPath) {
context.decisions.router.set(block.id, selectedPath)
context.activeExecutionPath.add(selectedPath)
this.activateDownstreamPaths(selectedPath, context)
logger.info(`Router ${block.id} selected path: ${selectedPath}`)
}
}
/**
* Recursively activate downstream paths from a block
*/
private activateDownstreamPaths(blockId: string, context: ExecutionContext): void {
const outgoingConnections = this.getOutgoingConnections(blockId)
for (const conn of outgoingConnections) {
if (!context.activeExecutionPath.has(conn.target)) {
context.activeExecutionPath.add(conn.target)
this.activateDownstreamPaths(conn.target, context)
}
}
}
/**
* Update paths for condition blocks
*/
@@ -219,9 +237,7 @@ export class PathTracker {
const isPartOfLoop = blockLoops.length > 0
for (const conn of outgoingConnections) {
-      if (
-        this.shouldActivateConnection(conn, block.id, hasError, isPartOfLoop, blockLoops, context)
-      ) {
+      if (this.shouldActivateConnection(conn, hasError, isPartOfLoop, blockLoops, context)) {
context.activeExecutionPath.add(conn.target)
}
}
@@ -253,7 +269,6 @@ export class PathTracker {
*/
private shouldActivateConnection(
conn: SerializedConnection,
-    sourceBlockId: string,
hasError: boolean,
isPartOfLoop: boolean,
blockLoops: Array<{ id: string; loop: any }>,

View File

@@ -593,6 +593,7 @@ export class InputResolver {
isInTemplateLiteral
)
} else {
// The function execution API will handle variable resolution within code strings
formattedValue =
typeof replacementValue === 'object'
? JSON.stringify(replacementValue)

View File

@@ -91,6 +91,10 @@ export function useCollaborativeWorkflow() {
payload.parentId,
payload.extent
)
// Handle auto-connect edge if present
if (payload.autoConnectEdge) {
workflowStore.addEdge(payload.autoConnectEdge)
}
break
case 'update-position': {
// Apply position update only if it's newer than the last applied timestamp
@@ -164,6 +168,10 @@ export function useCollaborativeWorkflow() {
payload.parentId,
payload.extent
)
// Handle auto-connect edge if present
if (payload.autoConnectEdge) {
workflowStore.addEdge(payload.autoConnectEdge)
}
break
}
} else if (target === 'edge') {
@@ -284,7 +292,8 @@ export function useCollaborativeWorkflow() {
position: Position,
data?: Record<string, any>,
parentId?: string,
-    extent?: 'parent'
+    extent?: 'parent',
+    autoConnectEdge?: Edge
) => {
// Create complete block data upfront using the same logic as the store
const blockConfig = getBlock(type)
@@ -306,10 +315,14 @@ export function useCollaborativeWorkflow() {
height: 0,
parentId,
extent,
autoConnectEdge, // Include edge data for atomic operation
}
// Apply locally first
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
// Then broadcast to other clients with complete block data
if (!isApplyingRemoteChange.current) {
@@ -354,10 +367,14 @@ export function useCollaborativeWorkflow() {
height: 0, // Default height, will be set by the UI
parentId,
extent,
autoConnectEdge, // Include edge data for atomic operation
}
// Apply locally first
workflowStore.addBlock(id, type, name, position, data, parentId, extent)
if (autoConnectEdge) {
workflowStore.addEdge(autoConnectEdge)
}
// Then broadcast to other clients with complete block data
if (!isApplyingRemoteChange.current) {
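The ordering above is the core of the feature: the edge is bundled into the block payload so remote clients apply both in one operation. A condensed sketch of that "apply locally, then broadcast" shape, with hypothetical store/emitter names:

function addBlockWithAutoConnect(
  store: { addBlock: (id: string) => void; addEdge: (edge: { id: string }) => void },
  emit: (op: { operation: 'add'; target: 'block'; payload: unknown }) => void,
  payload: { id: string; autoConnectEdge?: { id: string } }
) {
  store.addBlock(payload.id) // optimistic local apply
  if (payload.autoConnectEdge) {
    store.addEdge(payload.autoConnectEdge) // edge rides along atomically
  }
  emit({ operation: 'add', target: 'block', payload }) // one message covers block + edge
}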

View File

@@ -339,8 +339,36 @@ async function parseWithFileParser(
try {
let content: string
-    if (fileUrl.startsWith('http://') || fileUrl.startsWith('https://')) {
-      // Download and parse remote file with timeout
+    if (fileUrl.startsWith('data:')) {
logger.info(`Processing data URI for: ${filename}`)
try {
const [header, base64Data] = fileUrl.split(',')
if (!base64Data) {
throw new Error('Invalid data URI format')
}
if (header.includes('base64')) {
const buffer = Buffer.from(base64Data, 'base64')
content = buffer.toString('utf8')
} else {
content = decodeURIComponent(base64Data)
}
if (mimeType === 'text/plain') {
logger.info(`Data URI processed successfully for text content: ${filename}`)
} else {
const extension = filename.split('.').pop()?.toLowerCase() || 'txt'
const buffer = Buffer.from(base64Data, 'base64')
const result = await parseBuffer(buffer, extension)
content = result.content
}
} catch (error) {
throw new Error(
`Failed to process data URI: ${error instanceof Error ? error.message : 'Unknown error'}`
)
}
} else if (fileUrl.startsWith('http://') || fileUrl.startsWith('https://')) {
const controller = new AbortController()
const timeoutId = setTimeout(() => controller.abort(), TIMEOUTS.FILE_DOWNLOAD)
@@ -354,7 +382,6 @@ async function parseWithFileParser(
const buffer = Buffer.from(await response.arrayBuffer())
// Extract file extension from filename
const extension = filename.split('.').pop()?.toLowerCase() || ''
if (!extension) {
throw new Error(`Could not determine file extension from filename: ${filename}`)
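For reference, a data URI of the kind this branch consumes can be produced and decoded symmetrically (a sketch assuming Node's Buffer, mirroring the parsing above):

const text = 'hello world'
const dataUri = `data:text/plain;base64,${Buffer.from(text, 'utf8').toString('base64')}`

const [header, base64Data] = dataUri.split(',')
const decoded = header.includes('base64')
  ? Buffer.from(base64Data, 'base64').toString('utf8')
  : decodeURIComponent(base64Data)
// decoded === 'hello world'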

apps/sim/lib/logs/types.ts (new file, 380 lines)
View File

@@ -0,0 +1,380 @@
import type { Edge } from 'reactflow'
import type { BlockLog, NormalizedBlockOutput } from '@/executor/types'
import type { DeploymentStatus } from '@/stores/workflows/registry/types'
import type { Loop, Parallel, WorkflowState } from '@/stores/workflows/workflow/types'
export type { WorkflowState, Loop, Parallel, DeploymentStatus }
export type WorkflowEdge = Edge
export type { NormalizedBlockOutput, BlockLog }
export interface PricingInfo {
input: number
output: number
cachedInput?: number
updatedAt: string
}
export interface TokenUsage {
prompt: number
completion: number
total: number
}
export interface CostBreakdown {
input: number
output: number
total: number
tokens: TokenUsage
model: string
pricing: PricingInfo
}
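// Illustrative example (not part of this file): deriving a CostBreakdown from
// TokenUsage and PricingInfo. Treating prices as USD per 1M tokens is an
// assumption here; the types above do not pin down the unit.
//   const usage: TokenUsage = { prompt: 1200, completion: 300, total: 1500 }
//   const pricing: PricingInfo = { input: 2.5, output: 10, updatedAt: '2025-07-01' }
//   const input = (usage.prompt / 1_000_000) * pricing.input // 0.003
//   const output = (usage.completion / 1_000_000) * pricing.output // 0.003
//   const breakdown: CostBreakdown = {
//     input, output, total: input + output, tokens: usage, model: 'gpt-4o', pricing,
//   }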
export interface ToolCall {
name: string
duration: number
startTime: string
endTime: string
status: 'success' | 'error'
input: Record<string, unknown>
output: Record<string, unknown>
error?: string
}
export type BlockInputData = Record<string, any>
export type BlockOutputData = NormalizedBlockOutput | null
export interface ExecutionEnvironment {
variables: Record<string, string>
workflowId: string
executionId: string
userId: string
workspaceId: string
}
export interface ExecutionTrigger {
type: 'api' | 'webhook' | 'schedule' | 'manual' | 'chat'
source: string
data?: Record<string, unknown>
timestamp: string
}
export interface ExecutionStatus {
status: 'running' | 'completed' | 'failed' | 'cancelled'
startedAt: string
endedAt?: string
durationMs?: number
}
export interface WorkflowExecutionSnapshot {
id: string
workflowId: string
stateHash: string
stateData: WorkflowState
createdAt: string
}
export type WorkflowExecutionSnapshotInsert = Omit<WorkflowExecutionSnapshot, 'createdAt'>
export type WorkflowExecutionSnapshotSelect = WorkflowExecutionSnapshot
export interface WorkflowExecutionLog {
id: string
workflowId: string
executionId: string
stateSnapshotId: string
level: 'info' | 'error'
message: string
trigger: ExecutionTrigger['type']
startedAt: string
endedAt: string
totalDurationMs: number
blockCount: number
successCount: number
errorCount: number
skippedCount: number
totalCost: number
totalInputCost: number
totalOutputCost: number
totalTokens: number
primaryModel: string
metadata: {
environment: ExecutionEnvironment
trigger: ExecutionTrigger
traceSpans?: TraceSpan[]
errorDetails?: {
blockId: string
blockName: string
error: string
stackTrace?: string
}
}
duration?: string
createdAt: string
}
export type WorkflowExecutionLogInsert = Omit<WorkflowExecutionLog, 'id' | 'createdAt'>
export type WorkflowExecutionLogSelect = WorkflowExecutionLog
export interface BlockExecutionLog {
id: string
executionId: string
workflowId: string
blockId: string
blockName: string
blockType: string
startedAt: string
endedAt: string
durationMs: number
status: 'success' | 'error' | 'skipped'
errorMessage?: string
errorStackTrace?: string
inputData: BlockInputData
outputData: BlockOutputData
cost: CostBreakdown | null
metadata: {
toolCalls?: ToolCall[]
iterationIndex?: number
virtualBlockId?: string
parentBlockId?: string
environmentSnapshot?: Record<string, string>
}
createdAt: string
}
export type BlockExecutionLogInsert = Omit<BlockExecutionLog, 'id' | 'createdAt'>
export type BlockExecutionLogSelect = BlockExecutionLog
export interface TraceSpan {
id: string
name: string
type: string
duration: number
startTime: string
endTime: string
children?: TraceSpan[]
toolCalls?: ToolCall[]
status?: 'success' | 'error'
tokens?: number
relativeStartMs?: number
blockId?: string
input?: Record<string, unknown>
}
export interface WorkflowExecutionSummary {
id: string
workflowId: string
workflowName: string
executionId: string
trigger: ExecutionTrigger['type']
status: ExecutionStatus['status']
startedAt: string
endedAt: string
durationMs: number
blockStats: {
total: number
success: number
error: number
skipped: number
}
costSummary: {
total: number
inputCost: number
outputCost: number
tokens: number
primaryModel: string
}
stateSnapshotId: string
errorSummary?: {
blockId: string
blockName: string
message: string
}
}
export interface WorkflowExecutionDetail extends WorkflowExecutionSummary {
environment: ExecutionEnvironment
triggerData: ExecutionTrigger
blockExecutions: BlockExecutionSummary[]
traceSpans: TraceSpan[]
workflowState: WorkflowState
}
export interface BlockExecutionSummary {
id: string
blockId: string
blockName: string
blockType: string
startedAt: string
endedAt: string
durationMs: number
status: BlockExecutionLog['status']
errorMessage?: string
cost?: CostBreakdown
inputSummary: {
parameterCount: number
hasComplexData: boolean
}
outputSummary: {
hasOutput: boolean
outputType: string
hasError: boolean
}
}
export interface BlockExecutionDetail extends BlockExecutionSummary {
inputData: BlockInputData
outputData: BlockOutputData
metadata: BlockExecutionLog['metadata']
toolCalls?: ToolCall[]
}
export interface PaginatedResponse<T> {
data: T[]
pagination: {
page: number
pageSize: number
total: number
totalPages: number
hasNext: boolean
hasPrevious: boolean
}
}
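// Illustrative example: the pagination fields are derivable from page/pageSize/total:
//   const totalPages = Math.ceil(total / pageSize)
//   const hasNext = page < totalPages
//   const hasPrevious = page > 1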
export type WorkflowExecutionsResponse = PaginatedResponse<WorkflowExecutionSummary>
export type BlockExecutionsResponse = PaginatedResponse<BlockExecutionSummary>
export interface WorkflowExecutionFilters {
workflowIds?: string[]
folderIds?: string[]
triggers?: ExecutionTrigger['type'][]
status?: ExecutionStatus['status'][]
startDate?: string
endDate?: string
search?: string
minDuration?: number
maxDuration?: number
minCost?: number
maxCost?: number
hasErrors?: boolean
}
export interface PaginationParams {
page: number
pageSize: number
sortBy?: 'startedAt' | 'durationMs' | 'totalCost' | 'blockCount'
sortOrder?: 'asc' | 'desc'
}
export interface LogsQueryParams extends WorkflowExecutionFilters, PaginationParams {
includeBlockSummary?: boolean
includeWorkflowState?: boolean
}
export interface LogsError {
code: 'EXECUTION_NOT_FOUND' | 'SNAPSHOT_NOT_FOUND' | 'INVALID_WORKFLOW_STATE' | 'STORAGE_ERROR'
message: string
details?: Record<string, unknown>
}
export interface ValidationError {
field: string
message: string
value: unknown
}
export class LogsServiceError extends Error {
public code: LogsError['code']
public details?: Record<string, unknown>
constructor(message: string, code: LogsError['code'], details?: Record<string, unknown>) {
super(message)
this.name = 'LogsServiceError'
this.code = code
this.details = details
}
}
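// Illustrative example: callers can branch on the typed code:
//   try { ... } catch (e) {
//     if (e instanceof LogsServiceError && e.code === 'EXECUTION_NOT_FOUND') { /* 404 */ }
//   }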
export interface DatabaseOperationResult<T> {
success: boolean
data?: T
error?: LogsServiceError
}
export interface BatchInsertResult<T> {
inserted: T[]
failed: Array<{
item: T
error: string
}>
totalAttempted: number
totalSucceeded: number
totalFailed: number
}
export interface SnapshotService {
createSnapshot(workflowId: string, state: WorkflowState): Promise<WorkflowExecutionSnapshot>
getSnapshot(id: string): Promise<WorkflowExecutionSnapshot | null>
getSnapshotByHash(workflowId: string, hash: string): Promise<WorkflowExecutionSnapshot | null>
computeStateHash(state: WorkflowState): string
cleanupOrphanedSnapshots(olderThanDays: number): Promise<number>
}
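// Illustrative sketch of one plausible computeStateHash, assuming Node's crypto;
// the actual hashing scheme is not specified by this interface:
//   import { createHash } from 'crypto'
//   const computeStateHash = (state: WorkflowState): string =>
//     createHash('sha256').update(JSON.stringify(state)).digest('hex')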
export interface SnapshotCreationResult {
snapshot: WorkflowExecutionSnapshot
isNew: boolean
}
export interface ExecutionLoggerService {
startWorkflowExecution(params: {
workflowId: string
executionId: string
trigger: ExecutionTrigger
environment: ExecutionEnvironment
workflowState: WorkflowState
}): Promise<{
workflowLog: WorkflowExecutionLog
snapshot: WorkflowExecutionSnapshot
}>
logBlockExecution(params: {
executionId: string
workflowId: string
blockId: string
blockName: string
blockType: string
input: BlockInputData
output: BlockOutputData
timing: {
startedAt: string
endedAt: string
durationMs: number
}
status: BlockExecutionLog['status']
error?: {
message: string
stackTrace?: string
}
cost?: CostBreakdown
metadata?: BlockExecutionLog['metadata']
}): Promise<BlockExecutionLog>
completeWorkflowExecution(params: {
executionId: string
endedAt: string
totalDurationMs: number
blockStats: {
total: number
success: number
error: number
skipped: number
}
costSummary: {
totalCost: number
totalInputCost: number
totalOutputCost: number
totalTokens: number
primaryModel: string
}
finalOutput: BlockOutputData
traceSpans?: TraceSpan[]
}): Promise<WorkflowExecutionLog>
}
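Read together, ExecutionLoggerService implies a three-phase lifecycle per run: start (which also snapshots the workflow), one logBlockExecution call per block, then complete. A usage sketch against the interface above, with illustrative IDs and a pre-captured WorkflowState:

async function runWithLogging(logger: ExecutionLoggerService, state: WorkflowState) {
  const startedAt = new Date().toISOString()

  // Phase 1: open the execution log and persist a state snapshot
  const { workflowLog, snapshot } = await logger.startWorkflowExecution({
    workflowId: 'wf_1',
    executionId: 'exec_1',
    trigger: { type: 'manual', source: 'editor', timestamp: startedAt },
    environment: {
      variables: {},
      workflowId: 'wf_1',
      executionId: 'exec_1',
      userId: 'user_1',
      workspaceId: 'ws_1',
    },
    workflowState: state,
  })

  // Phase 2: one entry per executed block
  await logger.logBlockExecution({
    executionId: 'exec_1',
    workflowId: 'wf_1',
    blockId: 'block_1',
    blockName: 'Agent',
    blockType: 'agent',
    input: { prompt: 'hi' },
    output: null,
    timing: { startedAt, endedAt: new Date().toISOString(), durationMs: 42 },
    status: 'success',
  })

  // Phase 3: close out with aggregate stats and costs
  await logger.completeWorkflowExecution({
    executionId: 'exec_1',
    endedAt: new Date().toISOString(),
    totalDurationMs: 42,
    blockStats: { total: 1, success: 1, error: 0, skipped: 0 },
    costSummary: {
      totalCost: 0,
      totalInputCost: 0,
      totalOutputCost: 0,
      totalTokens: 0,
      primaryModel: 'gpt-4o',
    },
    finalOutput: null,
  })

  return { workflowLog, snapshot }
}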

View File

@@ -6,6 +6,7 @@ import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/executio
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { hasProcessedMessage, markMessageAsProcessed } from '@/lib/redis'
import { decryptSecret } from '@/lib/utils'
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/db-helpers'
import { updateWorkflowRunCounts } from '@/lib/workflows/utils'
import { getOAuthToken } from '@/app/api/auth/oauth/utils'
import { db } from '@/db'
@@ -13,7 +14,6 @@ import { environment, userStats, webhook } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockStateAsync } from '@/stores/workflows/server-utils'
-import type { WorkflowState } from '@/stores/workflows/workflow/types'
const logger = createLogger('WebhookUtils')
@@ -475,23 +475,28 @@ export async function executeWorkflowFromPayload(
// Returns void as errors are handled internally
try {
// Get the workflow state
-    if (!foundWorkflow.state) {
-      logger.error(`[${requestId}] TRACE: Missing workflow state`, {
+    // Load workflow data from normalized tables
+    logger.debug(`[${requestId}] Loading workflow ${foundWorkflow.id} from normalized tables`)
+    const normalizedData = await loadWorkflowFromNormalizedTables(foundWorkflow.id)
+    if (!normalizedData) {
+      logger.error(`[${requestId}] TRACE: No normalized data found for workflow`, {
        workflowId: foundWorkflow.id,
-        hasState: false,
+        hasNormalizedData: false,
      })
-      throw new Error(`Workflow ${foundWorkflow.id} has no state`)
+      throw new Error(`Workflow ${foundWorkflow.id} data not found in normalized tables`)
    }
-    const state = foundWorkflow.state as WorkflowState
-    const { blocks, edges, loops, parallels } = state
+    // Use normalized data for execution
+    const { blocks, edges, loops, parallels } = normalizedData
+    logger.info(`[${requestId}] Loaded workflow ${foundWorkflow.id} from normalized tables`)
    // DEBUG: Log state information
-    logger.debug(`[${requestId}] TRACE: Retrieved workflow state`, {
+    logger.debug(`[${requestId}] TRACE: Retrieved workflow state from normalized tables`, {
      workflowId: foundWorkflow.id,
      blockCount: Object.keys(blocks || {}).length,
      edgeCount: (edges || []).length,
-      loopCount: (loops || []).length,
+      loopCount: Object.keys(loops || {}).length,
})
logger.debug(

View File

@@ -122,6 +122,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
updatedAt: '2025-06-17',
},
capabilities: {
temperature: { min: 0, max: 2 },
toolUsageControl: true,
},
},
@@ -134,6 +135,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
updatedAt: '2025-06-17',
},
capabilities: {
temperature: { min: 0, max: 2 },
toolUsageControl: true,
},
},
@@ -146,6 +148,7 @@ export const PROVIDER_DEFINITIONS: Record<string, ProviderDefinition> = {
updatedAt: '2025-06-17',
},
capabilities: {
temperature: { min: 0, max: 2 },
toolUsageControl: true,
},
},
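With temperature declared as a {min, max} capability, a provider layer would typically clamp requested values into that range. A sketch under that assumption (clampTemperature is a hypothetical helper, not an export of this file):

function clampTemperature(
  requested: number | undefined,
  capability?: { min: number; max: number }
): number | undefined {
  if (requested === undefined || !capability) return undefined // model ignores temperature
  return Math.min(Math.max(requested, capability.min), capability.max)
}

clampTemperature(3.5, { min: 0, max: 2 }) // => 2, out-of-range input is clamped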

View File

@@ -110,6 +110,9 @@ describe('Model Capabilities', () => {
it.concurrent('should return true for models that support temperature', () => {
const supportedModels = [
'gpt-4o',
+      'gpt-4.1',
+      'gpt-4.1-mini',
+      'gpt-4.1-nano',
'gemini-2.5-flash',
'claude-sonnet-4-0',
'claude-opus-4-0',
@@ -139,10 +142,6 @@ describe('Model Capabilities', () => {
'deepseek-r1',
// Chat models that don't support temperature
'deepseek-chat',
-      // GPT-4.1 family models that don't support temperature
-      'gpt-4.1',
-      'gpt-4.1-nano',
-      'gpt-4.1-mini',
'azure/gpt-4.1',
'azure/model-router',
]

View File

@@ -29,6 +29,34 @@ const db = socketDb
// Constants
const DEFAULT_LOOP_ITERATIONS = 5
/**
* Shared function to handle auto-connect edge insertion
* @param tx - Database transaction
* @param workflowId - The workflow ID
* @param autoConnectEdge - The auto-connect edge data
* @param logger - Logger instance
*/
async function insertAutoConnectEdge(
tx: any,
workflowId: string,
autoConnectEdge: any,
logger: any
) {
if (!autoConnectEdge) return
await tx.insert(workflowEdges).values({
id: autoConnectEdge.id,
workflowId,
sourceBlockId: autoConnectEdge.source,
targetBlockId: autoConnectEdge.target,
sourceHandle: autoConnectEdge.sourceHandle || null,
targetHandle: autoConnectEdge.targetHandle || null,
})
logger.debug(
`Added auto-connect edge ${autoConnectEdge.id}: ${autoConnectEdge.source} -> ${autoConnectEdge.target}`
)
}
// Enum for subflow types
enum SubflowType {
LOOP = 'loop',
@@ -246,6 +274,9 @@ async function handleBlockOperationTx(
}
await tx.insert(workflowBlocks).values(insertData)
// Handle auto-connect edge if present
await insertAutoConnectEdge(tx, workflowId, payload.autoConnectEdge, logger)
} catch (insertError) {
logger.error(`[SERVER] ❌ Failed to insert block ${payload.id}:`, insertError)
throw insertError
@@ -592,6 +623,9 @@ async function handleBlockOperationTx(
}
await tx.insert(workflowBlocks).values(insertData)
// Handle auto-connect edge if present
await insertAutoConnectEdge(tx, workflowId, payload.autoConnectEdge, logger)
} catch (insertError) {
logger.error(`[SERVER] ❌ Failed to insert duplicated block ${payload.id}:`, insertError)
throw insertError

View File

@@ -279,6 +279,32 @@ describe('Socket Server Index Integration', () => {
expect(() => WorkflowOperationSchema.parse(validOperation)).not.toThrow()
})
it.concurrent('should validate block operations with autoConnectEdge', async () => {
const { WorkflowOperationSchema } = await import('./validation/schemas')
const validOperationWithAutoEdge = {
operation: 'add',
target: 'block',
payload: {
id: 'test-block',
type: 'action',
name: 'Test Block',
position: { x: 100, y: 200 },
autoConnectEdge: {
id: 'auto-edge-123',
source: 'source-block',
target: 'test-block',
sourceHandle: 'output',
targetHandle: 'target',
type: 'workflowEdge',
},
},
timestamp: Date.now(),
}
expect(() => WorkflowOperationSchema.parse(validOperationWithAutoEdge)).not.toThrow()
})
it.concurrent('should validate edge operations', async () => {
const { WorkflowOperationSchema } = await import('./validation/schemas')

View File

@@ -5,6 +5,16 @@ const PositionSchema = z.object({
y: z.number(),
})
// Schema for auto-connect edge data
const AutoConnectEdgeSchema = z.object({
id: z.string(),
source: z.string(),
target: z.string(),
sourceHandle: z.string().nullable().optional(),
targetHandle: z.string().nullable().optional(),
type: z.string().optional(),
})
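// Illustrative example: a minimal payload that satisfies the schema:
//   AutoConnectEdgeSchema.parse({ id: 'auto-edge-1', source: 'blockA', target: 'blockB' })
// sourceHandle/targetHandle may be a string, null, or omitted; type is optional.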
export const BlockOperationSchema = z.object({
operation: z.enum([
'add',
@@ -35,6 +45,7 @@ export const BlockOperationSchema = z.object({
isWide: z.boolean().optional(),
advancedMode: z.boolean().optional(),
height: z.number().optional(),
autoConnectEdge: AutoConnectEdgeSchema.optional(), // Add support for auto-connect edges
}),
timestamp: z.number(),
})
@@ -69,4 +80,4 @@ export const WorkflowOperationSchema = z.union([
SubflowOperationSchema,
])
-export { PositionSchema }
+export { PositionSchema, AutoConnectEdgeSchema }

View File

@@ -43,9 +43,6 @@ async function initializeApplication(): Promise<void> {
// Mark data as initialized only after sync managers have loaded data from DB
dataInitialized = true
-    // Register cleanup
-    window.addEventListener('beforeunload', handleBeforeUnload)
// Log initialization timing information
const initDuration = Date.now() - initStartTime
logger.info(`Application initialization completed in ${initDuration}ms`)

View File

@@ -432,7 +432,7 @@ export const useWorkflowRegistry = create<WorkflowRegistry>()(
let workflowState: any
if (workflowData?.state) {
-            // Use the state from the database
+            // API returns normalized data in state
workflowState = {
blocks: workflowData.state.blocks || {},
edges: workflowData.state.edges || [],
@@ -448,9 +448,18 @@ export const useWorkflowRegistry = create<WorkflowRegistry>()(
history: {
past: [],
present: {
-              state: workflowData.state,
+              state: {
+                blocks: workflowData.state.blocks || {},
+                edges: workflowData.state.edges || [],
+                loops: workflowData.state.loops || {},
+                parallels: workflowData.state.parallels || {},
+                isDeployed: workflowData.isDeployed || false,
+                deployedAt: workflowData.deployedAt
+                  ? new Date(workflowData.deployedAt)
+                  : undefined,
+              },
timestamp: Date.now(),
-              action: 'Loaded from database',
+              action: 'Loaded from database (normalized tables)',
subblockValues: {},
},
future: [],

View File

@@ -50,6 +50,8 @@ describe('Function Execute Tool', () => {
expect(body).toEqual({
code: 'return 42',
envVars: {},
blockData: {},
blockNameMapping: {},
isCustomTool: false,
timeout: 5000,
workflowId: undefined,
@@ -73,6 +75,8 @@ describe('Function Execute Tool', () => {
code: 'const x = 40;\nconst y = 2;\nreturn x + y;',
timeout: 10000,
envVars: {},
blockData: {},
blockNameMapping: {},
isCustomTool: false,
workflowId: undefined,
})
@@ -87,6 +91,8 @@ describe('Function Execute Tool', () => {
code: 'return 42',
timeout: 10000,
envVars: {},
blockData: {},
blockNameMapping: {},
isCustomTool: false,
workflowId: undefined,
})
@@ -158,6 +164,197 @@ describe('Function Execute Tool', () => {
})
})
describe('Enhanced Error Handling', () => {
test('should handle enhanced syntax error with line content', async () => {
// Setup enhanced error response with debug information
tester.setup(
{
success: false,
error:
'Syntax Error: Line 3: `description: "This has a missing closing quote` - Invalid or unexpected token (Check for missing quotes, brackets, or semicolons)',
output: {
result: null,
stdout: '',
executionTime: 5,
},
debug: {
line: 3,
column: undefined,
errorType: 'SyntaxError',
lineContent: 'description: "This has a missing closing quote',
stack: 'user-function.js:5\n description: "This has a missing closing quote\n...',
},
},
{ ok: false, status: 500 }
)
// Execute the tool with syntax error
const result = await tester.execute({
code: 'const obj = {\n name: "test",\n description: "This has a missing closing quote\n};\nreturn obj;',
})
// Check enhanced error handling
expect(result.success).toBe(false)
expect(result.error).toContain('Syntax Error')
expect(result.error).toContain('Line 3')
expect(result.error).toContain('description: "This has a missing closing quote')
expect(result.error).toContain('Invalid or unexpected token')
expect(result.error).toContain('(Check for missing quotes, brackets, or semicolons)')
})
test('should handle enhanced runtime error with line and column', async () => {
// Setup enhanced runtime error response
tester.setup(
{
success: false,
error:
"Type Error: Line 2:16: `return obj.someMethod();` - Cannot read properties of null (reading 'someMethod')",
output: {
result: null,
stdout: 'ERROR: {}\n',
executionTime: 12,
},
debug: {
line: 2,
column: 16,
errorType: 'TypeError',
lineContent: 'return obj.someMethod();',
stack: 'TypeError: Cannot read properties of null...',
},
},
{ ok: false, status: 500 }
)
// Execute the tool with runtime error
const result = await tester.execute({
code: 'const obj = null;\nreturn obj.someMethod();',
})
// Check enhanced error handling
expect(result.success).toBe(false)
expect(result.error).toContain('Type Error')
expect(result.error).toContain('Line 2:16')
expect(result.error).toContain('return obj.someMethod();')
expect(result.error).toContain('Cannot read properties of null')
})
test('should handle enhanced error information in tool response', async () => {
// Setup enhanced error response with full debug info
tester.setup(
{
success: false,
error: 'Reference Error: Line 1: `return undefinedVar` - undefinedVar is not defined',
output: {
result: null,
stdout: '',
executionTime: 3,
},
debug: {
line: 1,
column: 7,
errorType: 'ReferenceError',
lineContent: 'return undefinedVar',
stack: 'ReferenceError: undefinedVar is not defined...',
},
},
{ ok: false, status: 500 }
)
// Execute the tool with reference error
const result = await tester.execute({
code: 'return undefinedVar',
})
// Check that the tool properly captures enhanced error
expect(result.success).toBe(false)
expect(result.error).toBe(
'Reference Error: Line 1: `return undefinedVar` - undefinedVar is not defined'
)
})
test('should preserve debug information in error object', async () => {
// Setup enhanced error response
tester.setup(
{
success: false,
error: 'Syntax Error: Line 2 - Invalid syntax',
debug: {
line: 2,
column: 5,
errorType: 'SyntaxError',
lineContent: 'invalid syntax here',
stack: 'SyntaxError: Invalid syntax...',
},
},
{ ok: false, status: 500 }
)
// Execute the tool
const result = await tester.execute({
code: 'valid line\ninvalid syntax here',
})
// Check that enhanced error information is available
expect(result.success).toBe(false)
expect(result.error).toBe('Syntax Error: Line 2 - Invalid syntax')
// Note: In this test framework, debug information would be available
// in the response object, but the tool transforms it into the error message
})
test('should handle enhanced error without line information', async () => {
// Setup error response without line information
tester.setup(
{
success: false,
error: 'Generic error message',
debug: {
errorType: 'Error',
stack: 'Error: Generic error message...',
},
},
{ ok: false, status: 500 }
)
// Execute the tool
const result = await tester.execute({
code: 'return "test";',
})
// Check error handling without enhanced line info
expect(result.success).toBe(false)
expect(result.error).toBe('Generic error message')
})
test('should provide line-specific error message when available', async () => {
// Setup enhanced error response with line info
tester.setup(
{
success: false,
error:
'Type Error: Line 5:20: `obj.nonExistentMethod()` - obj.nonExistentMethod is not a function',
debug: {
line: 5,
column: 20,
errorType: 'TypeError',
lineContent: 'obj.nonExistentMethod()',
},
},
{ ok: false, status: 500 }
)
// Execute the tool
const result = await tester.execute({
code: 'const obj = {};\nobj.nonExistentMethod();',
})
// Check that enhanced error message is provided
expect(result.success).toBe(false)
expect(result.error).toContain('Line 5:20')
expect(result.error).toContain('obj.nonExistentMethod()')
})
})
describe('Edge Cases', () => {
test('should handle empty code input', async () => {
// Execute with empty code - this should still pass through to the API

View File

@@ -28,6 +28,18 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
description: 'Environment variables to make available during execution',
default: {},
},
blockData: {
type: 'object',
required: false,
description: 'Block output data for variable resolution',
default: {},
},
blockNameMapping: {
type: 'object',
required: false,
description: 'Mapping of block names to block IDs',
default: {},
},
},
request: {
@@ -45,6 +57,8 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
code: codeContent,
timeout: params.timeout || DEFAULT_TIMEOUT,
envVars: params.envVars || {},
blockData: params.blockData || {},
blockNameMapping: params.blockNameMapping || {},
workflowId: params._context?.workflowId,
isCustomTool: params.isCustomTool || false,
}
@@ -56,7 +70,21 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
const result = await response.json()
if (!response.ok || !result.success) {
-      throw new Error(result.error || 'Code execution failed')
+      // Create enhanced error with debug information if available
+      const error = new Error(result.error || 'Code execution failed')
// Add debug information to the error object if available
if (result.debug) {
Object.assign(error, {
line: result.debug.line,
column: result.debug.column,
errorType: result.debug.errorType,
stack: result.debug.stack,
enhancedError: true,
})
}
throw error
}
return {
@@ -69,6 +97,10 @@ export const functionExecuteTool: ToolConfig<CodeExecutionInput, CodeExecutionOu
},
transformError: (error: any) => {
// If we have enhanced error information, create a more detailed message
if (error.enhancedError && error.line) {
return `Line ${error.line}${error.column ? `:${error.column}` : ''} - ${error.message}`
}
return error.message || 'Code execution failed'
},
}
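Put together, a failed run whose API response includes debug metadata surfaces to the caller as a position-prefixed message. A small sketch mirroring the logic above (values illustrative):

const error = Object.assign(new Error("Cannot read properties of null (reading 'someMethod')"), {
  enhancedError: true,
  line: 2,
  column: 16,
})

const message =
  error.enhancedError && error.line
    ? `Line ${error.line}${error.column ? `:${error.column}` : ''} - ${error.message}`
    : error.message
// "Line 2:16 - Cannot read properties of null (reading 'someMethod')"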

View File

@@ -5,6 +5,8 @@ export interface CodeExecutionInput {
timeout?: number
memoryLimit?: number
envVars?: Record<string, string>
blockData?: Record<string, any>
blockNameMapping?: Record<string, string>
_context?: {
workflowId?: string
}

View File

@@ -0,0 +1,173 @@
import type { ToolConfig } from '../types'
import type { KnowledgeCreateDocumentResponse } from './types'
export const knowledgeCreateDocumentTool: ToolConfig<any, KnowledgeCreateDocumentResponse> = {
id: 'knowledge_create_document',
name: 'Knowledge Create Document',
description: 'Create a new document in a knowledge base',
version: '1.0.0',
params: {
knowledgeBaseId: {
type: 'string',
required: true,
description: 'ID of the knowledge base containing the document',
},
name: {
type: 'string',
required: true,
description: 'Name of the document',
},
content: {
type: 'string',
required: true,
description: 'Content of the document',
},
},
request: {
url: (params) => `/api/knowledge/${params.knowledgeBaseId}/documents`,
method: 'POST',
headers: () => ({
'Content-Type': 'application/json',
}),
body: (params) => {
const textContent = params.content?.trim()
const documentName = params.name?.trim()
if (!documentName || documentName.length === 0) {
throw new Error('Document name is required')
}
if (documentName.length > 255) {
throw new Error('Document name must be 255 characters or less')
}
if (/[<>:"/\\|?*]/.test(documentName)) {
throw new Error('Document name contains invalid characters. Avoid: < > : " / \\ | ? *')
}
if (!textContent || textContent.length < 10) {
throw new Error('Document content must be at least 10 characters long')
}
if (textContent.length > 1000000) {
throw new Error('Document content exceeds maximum size of 1MB')
}
const utf8Bytes = new TextEncoder().encode(textContent)
const contentBytes = utf8Bytes.length
const base64Content =
typeof Buffer !== 'undefined'
? Buffer.from(textContent, 'utf8').toString('base64')
: btoa(String.fromCharCode(...utf8Bytes))
const dataUri = `data:text/plain;base64,${base64Content}`
const documents = [
{
filename: documentName.endsWith('.txt') ? documentName : `${documentName}.txt`,
fileUrl: dataUri,
fileSize: contentBytes,
mimeType: 'text/plain',
},
]
return {
documents: documents,
processingOptions: {
chunkSize: 1024,
minCharactersPerChunk: 100,
chunkOverlap: 200,
recipe: 'default',
lang: 'en',
},
bulk: true,
}
},
isInternalRoute: true,
},
transformResponse: async (response): Promise<KnowledgeCreateDocumentResponse> => {
try {
const result = await response.json()
if (!response.ok) {
const errorMessage = result.error?.message || result.message || 'Failed to create document'
throw new Error(errorMessage)
}
const data = result.data || result
const documentsCreated = data.documentsCreated || []
// Handle multiple documents response
const uploadCount = documentsCreated.length
const firstDocument = documentsCreated[0]
return {
success: true,
output: {
data: {
id: firstDocument?.documentId || firstDocument?.id || '',
name:
uploadCount > 1 ? `${uploadCount} documents` : firstDocument?.filename || 'Unknown',
type: 'document',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
enabled: true,
},
message:
uploadCount > 1
? `Successfully created ${uploadCount} documents in knowledge base`
: `Successfully created document in knowledge base`,
documentId: firstDocument?.documentId || firstDocument?.id || '',
},
}
} catch (error: any) {
return {
success: false,
output: {
data: {
id: '',
name: '',
type: '',
enabled: true,
createdAt: '',
updatedAt: '',
},
message: `Failed to create document: ${error.message || 'Unknown error'}`,
documentId: '',
},
error: `Failed to create document: ${error.message || 'Unknown error'}`,
}
}
},
transformError: async (error): Promise<KnowledgeCreateDocumentResponse> => {
let errorMessage = 'Failed to create document'
if (error.message) {
if (error.message.includes('Document name')) {
errorMessage = `Document name error: ${error.message}`
} else if (error.message.includes('Document content')) {
errorMessage = `Document content error: ${error.message}`
} else if (error.message.includes('invalid characters')) {
errorMessage = `${error.message}. Please use a valid filename.`
} else if (error.message.includes('maximum size')) {
errorMessage = `${error.message}. Consider breaking large content into smaller documents.`
} else {
errorMessage = `Failed to create document: ${error.message}`
}
}
return {
success: false,
output: {
data: {
id: '',
name: '',
type: '',
enabled: true,
createdAt: '',
updatedAt: '',
},
message: errorMessage,
documentId: '',
},
error: errorMessage,
}
},
}
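For a concrete sense of what this tool sends, the body built for a short document looks like the following (a sketch of the same logic as body() above; values illustrative):

const name = 'meeting-notes'
const content = 'Quarterly planning notes go here.' // must be at least 10 characters
const base64 = Buffer.from(content, 'utf8').toString('base64')

const requestBody = {
  documents: [
    {
      filename: `${name}.txt`, // '.txt' is appended when missing
      fileUrl: `data:text/plain;base64,${base64}`,
      fileSize: new TextEncoder().encode(content).length, // UTF-8 byte count
      mimeType: 'text/plain',
    },
  ],
  processingOptions: {
    chunkSize: 1024,
    minCharactersPerChunk: 100,
    chunkOverlap: 200,
    recipe: 'default',
    lang: 'en',
  },
  bulk: true,
}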

View File

@@ -1,4 +1,5 @@
import { knowledgeCreateDocumentTool } from './create_document'
import { knowledgeSearchTool } from './search'
import { knowledgeUploadChunkTool } from './upload_chunk'
-export { knowledgeSearchTool, knowledgeUploadChunkTool }
+export { knowledgeSearchTool, knowledgeUploadChunkTool, knowledgeCreateDocumentTool }

View File

@@ -49,3 +49,22 @@ export interface KnowledgeUploadChunkParams {
content: string
enabled?: boolean
}
export interface KnowledgeCreateDocumentResult {
id: string
name: string
type: string
enabled: boolean
createdAt: string
updatedAt: string
}
export interface KnowledgeCreateDocumentResponse {
success: boolean
output: {
data: KnowledgeCreateDocumentResult
message: string
documentId: string
}
error?: string
}

View File

@@ -53,7 +53,11 @@ import { contactsTool as hubspotContacts } from './hubspot/contacts'
import { huggingfaceChatTool } from './huggingface'
import { readUrlTool } from './jina'
import { jiraBulkRetrieveTool, jiraRetrieveTool, jiraUpdateTool, jiraWriteTool } from './jira'
-import { knowledgeSearchTool, knowledgeUploadChunkTool } from './knowledge'
+import {
+  knowledgeCreateDocumentTool,
+  knowledgeSearchTool,
+  knowledgeUploadChunkTool,
+} from './knowledge'
import { linearCreateIssueTool, linearReadIssuesTool } from './linear'
import { linkupSearchTool } from './linkup'
import { mem0AddMemoriesTool, mem0GetMemoriesTool, mem0SearchMemoriesTool } from './mem0'
@@ -191,6 +195,7 @@ export const tools: Record<string, ToolConfig> = {
memory_delete: memoryDeleteTool,
knowledge_search: knowledgeSearchTool,
knowledge_upload_chunk: knowledgeUploadChunkTool,
knowledge_create_document: knowledgeCreateDocumentTool,
elevenlabs_tts: elevenLabsTtsTool,
s3_get_object: s3GetObjectTool,
telegram_message: telegramMessageTool,