Added logic in executor to execute router block, traversing down one of the paths. Use BFS to determine available paths, continue down the one that the router decides. If there is no router, we can still continue with parallel block execution. Added recursive envvar resolution in executor as well

This commit is contained in:
Waleed Latif
2025-02-05 14:11:00 -08:00
parent 4b887fca8f
commit ac079cc295
3 changed files with 497 additions and 238 deletions

View File

@@ -1,11 +1,37 @@
import { ConnectIcon } from '@/components/icons'
import { CodeExecutionOutput } from '@/tools/function/execute'
import { BlockConfig } from '../types'
import { ToolResponse } from '@/tools/types'
import { MODEL_TOOLS, ModelType } from '../consts'
import { BlockConfig } from '../types'
const routerPrompt = (
prompt: string
) => `You are an intelligent routing agent responsible for directing workflow requests to the most appropriate block. Your task is to analyze the input and determine the single most suitable destination based on the request.
interface RouterResponse extends ToolResponse {
output: {
content: string
model: string
tokens?: {
prompt?: number
completion?: number
total?: number
}
selectedPath: {
blockId: string
blockType: string
blockTitle: string
}
}
}
interface TargetBlock {
id: string
type?: string
title?: string
description?: string
category?: string
subBlocks?: Record<string, any>
currentState?: any
}
export const generateRouterPrompt = (prompt: string, targetBlocks?: TargetBlock[]): string => {
const basePrompt = `You are an intelligent routing agent responsible for directing workflow requests to the most appropriate block. Your task is to analyze the input and determine the single most suitable destination based on the request.
Key Instructions:
1. You MUST choose exactly ONE destination from the IDs of the blocks in the workflow. The destination must be a valid block id.
@@ -13,15 +39,53 @@ Key Instructions:
2. Analysis Framework:
- Carefully evaluate the intent and requirements of the request
- Consider the primary action needed
- Match the core functionality with the most appropriate destination
- Match the core functionality with the most appropriate destination`
// If we have target blocks, add their information to the prompt
const targetBlocksInfo = targetBlocks
? `
Available Target Blocks:
${targetBlocks
.map(
(block) => `
ID: ${block.id}
Type: ${block.type}
Title: ${block.title}
Description: ${block.description}
Category: ${block.category}
Configuration: ${JSON.stringify(block.subBlocks, null, 2)}
${block.currentState ? `Current State: ${JSON.stringify(block.currentState, null, 2)}` : ''}
---`
)
.join('\n')}
Routing Instructions:
1. Analyze the input request carefully against each block's:
- Primary purpose (from description)
- Configuration settings
- Current state (if available)
- Processing capabilities
2. Selection Criteria:
- Choose the block that best matches the input's requirements
- Consider the block's specific functionality and constraints
- Factor in any relevant current state or configuration
- Prioritize blocks that can handle the input most effectively`
: ''
return `${basePrompt}${targetBlocksInfo}
Routing Request: ${prompt}
Response Format:
Return ONLY the destination id as a single word, lowercase, no punctuation or explanation.
Example: "2acd9007-27e8-4510-a487-73d3b825e7c1"`
Example: "2acd9007-27e8-4510-a487-73d3b825e7c1"
export const RouterBlock: BlockConfig<CodeExecutionOutput> = {
Remember: Your response must be ONLY the block ID - no additional text, formatting, or explanation.`
}
export const RouterBlock: BlockConfig<RouterResponse> = {
type: 'router',
toolbar: {
title: 'Router',
@@ -55,13 +119,17 @@ export const RouterBlock: BlockConfig<CodeExecutionOutput> = {
},
workflow: {
inputs: {
code: { type: 'string', required: true },
prompt: { type: 'string', required: true },
model: { type: 'string', required: true },
apiKey: { type: 'string', required: true },
},
outputs: {
response: {
type: {
result: 'any',
stdout: 'string',
content: 'string',
model: 'string',
tokens: 'any',
selectedPath: 'json',
},
},
},
@@ -77,7 +145,7 @@ export const RouterBlock: BlockConfig<CodeExecutionOutput> = {
id: 'model',
title: 'Model',
type: 'dropdown',
layout: 'full',
layout: 'half',
options: Object.keys(MODEL_TOOLS),
},
{
@@ -96,7 +164,7 @@ export const RouterBlock: BlockConfig<CodeExecutionOutput> = {
layout: 'full',
hidden: true,
value: (params: Record<string, any>) => {
return routerPrompt(params.prompt || '')
return generateRouterPrompt(params.prompt || '')
},
},
],

View File

@@ -9,6 +9,7 @@
* • Returns per-block logs that can be displayed in the UI for better trace/debug.
*/
import { getAllBlocks } from '@/blocks'
import { generateRouterPrompt } from '@/blocks/blocks/router'
import { BlockOutput } from '@/blocks/types'
import { BlockConfig } from '@/blocks/types'
import { executeProviderRequest } from '@/providers/service'
@@ -98,71 +99,67 @@ export class Executor {
adjacency.get(conn.source)?.push(conn.target)
}
let lastOutput: BlockOutput = { response: {} }
let routerDecision: { routerId: string; chosenPath: string } | null = null
// Start with all blocks that have inDegree = 0
let layer = blocks.filter((b) => (inDegree.get(b.id) || 0) === 0).map((b) => b.id)
// Track the final output from the "last" block or set of blocks
let lastOutput: BlockOutput = { response: {} }
while (layer.length > 0) {
// Execute current layer in parallel
// Execute current layer in parallel, but only if blocks are in the chosen path
const results = await Promise.all(
layer.map(async (blockId) => {
// Find the block object
const block = blocks.find((b) => b.id === blockId)
if (!block) {
throw new Error(`Missing block ${blockId}`)
}
// Skip disabled blocks
if (block.enabled === false) {
return { response: {} }
}
// Prepare a new blockLog entry
const blockLog: Partial<BlockLog> = {
blockId: block.id,
blockTitle: block.metadata?.title,
blockType: block.metadata?.type,
startedAt: new Date().toISOString(),
}
try {
// Resolve template references in block config params
const resolvedInputs = this.resolveInputs(block, context)
// Execute the block, store the result
const output = await this.executeBlock(block, resolvedInputs, context)
context.blockStates.set(block.id, output)
// Update block log with success
blockLog.success = true
blockLog.output = output
return output
} catch (error) {
// Update block log with error
blockLog.success = false
blockLog.error = error instanceof Error ? error.message : 'Unknown error'
throw error
} finally {
// Compute the end time and duration
const end = new Date()
blockLog.endedAt = end.toISOString()
if (blockLog.startedAt) {
const started = new Date(blockLog.startedAt).getTime()
blockLog.durationMs = end.getTime() - started
} else {
blockLog.durationMs = 0
layer
.filter((blockId) => {
// If we have a router decision, only execute blocks in the chosen path
if (routerDecision) {
return this.isInChosenPath(
blockId,
routerDecision.chosenPath,
routerDecision.routerId
)
}
return true
})
.map(async (blockId) => {
const block = blocks.find((b) => b.id === blockId)
if (!block) {
throw new Error(`Missing block ${blockId}`)
}
// Push the log entry
context.blockLogs.push(blockLog as BlockLog)
}
})
// Skip disabled blocks
if (block.enabled === false) {
return { response: {} }
}
try {
const resolvedInputs = this.resolveInputs(block, context)
const output = await this.executeBlock(block, resolvedInputs, context)
// If this is a router block, store its decision
if (
block.metadata?.type === 'router' &&
output &&
typeof output === 'object' &&
'response' in output &&
output.response &&
typeof output.response === 'object' &&
'selectedPath' in output.response
) {
const routerResponse = output.response as { selectedPath: { blockId: string } }
routerDecision = {
routerId: block.id,
chosenPath: routerResponse.selectedPath.blockId,
}
}
context.blockStates.set(block.id, output)
return output
} catch (error) {
throw error
}
})
)
// Keep track of the "most recent" result as lastOutput
if (results.length > 0) {
lastOutput = results[results.length - 1]
}
@@ -204,172 +201,226 @@ export class Executor {
block: SerializedBlock,
inputs: Record<string, any>,
context: ExecutionContext
): Promise<{ response: Record<string, any> }> {
// Special handling for agent blocks that use providers
if (block.metadata?.type === 'agent') {
const model = inputs.model || 'gpt-4o'
const providerId =
model.startsWith('gpt') || model.startsWith('o1')
? 'openai'
: model.startsWith('claude')
? 'anthropic'
: model.startsWith('gemini')
? 'google'
: model.startsWith('grok')
? 'xai'
: 'deepseek'
): Promise<BlockOutput> {
// console.log(`Executing block ${block.metadata?.title} (${block.id})`, {
// type: block.metadata?.type,
// inputs
// });
// Format tools if they exist
const tools = Array.isArray(inputs.tools)
? inputs.tools
.map((tool: any) => {
// Get the tool ID from the block type
const block = getAllBlocks().find((b: BlockConfig) => b.type === tool.type)
const toolId = block?.tools.access[0]
if (!toolId) return null
// Get the tool configuration
const toolConfig = getTool(toolId)
if (!toolConfig) return null
// Resolve environment variables in tool parameters
const resolvedParams = Object.entries(tool.params || {}).reduce(
(acc, [key, value]) => {
if (typeof value === 'string') {
// Handle environment variables with {{}} syntax
const envMatches = value.match(/\{\{([^}]+)\}\}/g)
if (envMatches) {
let resolvedValue = value
for (const match of envMatches) {
const envKey = match.slice(2, -2) // remove {{ and }}
const envValue = context.environmentVariables?.[envKey]
if (envValue === undefined) {
throw new Error(`Environment variable "${envKey}" was not found.`)
}
resolvedValue = resolvedValue.replace(match, envValue)
}
acc[key] = resolvedValue
} else {
acc[key] = value
}
} else {
acc[key] = value
}
return acc
},
{} as Record<string, any>
)
// Return the tool configuration with resolved parameters
return {
id: toolConfig.id,
name: toolConfig.name,
description: toolConfig.description,
params: resolvedParams,
parameters: {
type: 'object',
properties: Object.entries(toolConfig.params).reduce(
(acc, [key, config]) => ({
...acc,
[key]: {
type: config.type === 'json' ? 'object' : config.type,
description: config.description || '',
...(key in resolvedParams && { default: resolvedParams[key] }),
},
}),
{}
),
required: Object.entries(toolConfig.params)
.filter(([_, config]) => config.required)
.map(([key]) => key),
},
}
})
.filter((t): t is NonNullable<typeof t> => t !== null)
: []
const requestPayload = {
model,
systemPrompt: inputs.systemPrompt,
context: inputs.context,
tools: tools.length > 0 ? tools : undefined,
temperature: inputs.temperature,
maxTokens: inputs.maxTokens,
apiKey: inputs.apiKey,
}
// Log the request payload for debugging
console.log('Provider Request:', {
providerId,
model,
tools: requestPayload.tools,
})
const response = await executeProviderRequest(providerId, requestPayload)
// Return the actual response values
return {
response: {
content: response.content,
model: response.model,
tokens: response.tokens || {
prompt: 0,
completion: 0,
total: 0,
},
toolCalls: {
list: response.toolCalls || [],
count: response.toolCalls?.length || 0,
},
},
}
// Start timing
const startTime = new Date()
const blockLog: BlockLog = {
blockId: block.id,
blockTitle: block.metadata?.title,
blockType: block.metadata?.type,
startedAt: startTime.toISOString(),
endedAt: '',
durationMs: 0,
success: false,
}
// Regular tool execution for non-agent blocks
const toolId = block.config.tool
if (!toolId) {
throw new Error(`Block "${block.id}" does not specify a tool`)
}
const tool: Tool | undefined = tools[toolId]
if (!tool) {
throw new Error(`Tool not found: ${toolId}`)
}
const validatedParams = this.validateToolParams(tool, {
...block.config.params,
...inputs,
})
try {
const result = await executeTool(toolId, validatedParams)
// Handle router blocks differently
if (block.metadata?.type === 'router') {
const routerOutput = await this.executeRouterBlock(block, context)
// console.log('Router output:', routerOutput);
// Filter workflow to only include blocks in the chosen path
this.workflow.blocks = this.workflow.blocks.filter((b) =>
this.isInChosenPath(b.id, routerOutput.selectedPath.blockId, block.id)
)
const output = {
response: {
content: routerOutput.content,
model: routerOutput.model,
tokens: routerOutput.tokens,
selectedPath: routerOutput.selectedPath,
},
}
blockLog.success = true
blockLog.output = output
// Compute timing
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
return output
}
// Special handling for agent blocks that use providers
if (block.metadata?.type === 'agent') {
// console.log('Agent inputs:', {
// systemPrompt: inputs.systemPrompt,
// context: inputs.context,
// tools: inputs.tools
// });
const model = inputs.model || 'gpt-4o'
const providerId =
model.startsWith('gpt') || model.startsWith('o1')
? 'openai'
: model.startsWith('claude')
? 'anthropic'
: model.startsWith('gemini')
? 'google'
: model.startsWith('grok')
? 'xai'
: 'deepseek'
// Format tools if they exist
const tools = Array.isArray(inputs.tools)
? inputs.tools
.map((tool: any) => {
// console.log('Processing tool:', tool);
// Get the tool ID from the block type
const block = getAllBlocks().find((b: BlockConfig) => b.type === tool.type)
const toolId = block?.tools.access[0]
if (!toolId) {
// console.log('No tool ID found for type:', tool.type);
return null
}
// Get the tool configuration
const toolConfig = getTool(toolId)
if (!toolConfig) {
// console.log('No tool config found for ID:', toolId);
return null
}
// Return the tool configuration with resolved parameters
const toolSetup = {
id: toolConfig.id,
name: toolConfig.name,
description: toolConfig.description,
params: tool.params || {},
parameters: {
type: 'object',
properties: Object.entries(toolConfig.params).reduce(
(acc, [key, config]) => ({
...acc,
[key]: {
type: config.type === 'json' ? 'object' : config.type,
description: config.description || '',
...(key in tool.params && { default: tool.params[key] }),
},
}),
{}
),
required: Object.entries(toolConfig.params)
.filter(([_, config]) => config.required)
.map(([key]) => key),
},
}
// console.log('Tool setup:', toolSetup);
return toolSetup
})
.filter((t): t is NonNullable<typeof t> => t !== null)
: []
// console.log('Formatted tools:', tools);
const response = await executeProviderRequest(providerId, {
model,
systemPrompt: inputs.systemPrompt,
context: inputs.context,
tools: tools.length > 0 ? tools : undefined,
temperature: inputs.temperature,
maxTokens: inputs.maxTokens,
apiKey: inputs.apiKey,
})
// console.log('Provider response:', {
// content: response.content,
// toolCalls: response.toolCalls
// });
const output = {
response: {
content: response.content,
model: response.model,
tokens: response.tokens || {
prompt: 0,
completion: 0,
total: 0,
},
toolCalls: {
list: response.toolCalls || [],
count: response.toolCalls?.length || 0,
},
},
}
blockLog.success = true
blockLog.output = output
// Compute timing
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
return output
}
// Regular tool execution
const tool = getTool(block.config.tool)
if (!tool) {
throw new Error(`Tool ${block.config.tool} not found`)
}
// console.log('Executing tool:', {
// tool: block.config.tool,
// inputs
// });
const result = await executeTool(block.config.tool, inputs)
if (!result.success) {
// Ensure we have a meaningful error message
const errorMessage = result.error || `Tool ${toolId} failed with no error message`
throw new Error(errorMessage)
console.error('Tool execution failed:', result.error)
throw new Error(result.error || `Tool ${block.config.tool} failed with no error message`)
}
return { response: result.output }
const output = { response: result.output }
blockLog.success = true
blockLog.output = output
// Compute timing
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
return output
} catch (error: any) {
// Update block log with error
const blockLog: Partial<BlockLog> = {
console.error('Block execution failed:', {
blockId: block.id,
blockTitle: block.metadata?.title,
blockType: block.metadata?.type,
success: false,
error: error.message || `Tool ${toolId} failed`,
startedAt: new Date().toISOString(),
endedAt: new Date().toISOString(),
durationMs: 0,
}
error: error.message,
})
// Add the log entry
context.blockLogs.push(blockLog as BlockLog)
// Update block log with error
blockLog.success = false
blockLog.error = error.message || `Block execution failed`
// Compute timing
const endTime = new Date()
blockLog.endedAt = endTime.toISOString()
blockLog.durationMs = endTime.getTime() - startTime.getTime()
// Add log entry
context.blockLogs.push(blockLog)
// Re-throw the error
throw error
}
}
@@ -409,6 +460,38 @@ export class Executor {
])
)
// Helper function to resolve environment variables in a value
const resolveEnvVars = (value: any): any => {
if (typeof value === 'string') {
const envMatches = value.match(/\{\{([^}]+)\}\}/g)
if (envMatches) {
let resolvedValue = value
for (const match of envMatches) {
const envKey = match.slice(2, -2)
const envValue = this.environmentVariables?.[envKey]
if (envValue === undefined) {
throw new Error(`Environment variable "${envKey}" was not found.`)
}
resolvedValue = resolvedValue.replace(match, envValue)
}
return resolvedValue
}
} else if (Array.isArray(value)) {
return value.map((item) => resolveEnvVars(item))
} else if (value && typeof value === 'object') {
return Object.entries(value).reduce(
(acc, [k, v]) => ({
...acc,
[k]: resolveEnvVars(v),
}),
{}
)
}
return value
}
const resolvedInputs = Object.entries(inputs).reduce(
(acc, [key, value]) => {
if (typeof value === 'string') {
@@ -475,20 +558,8 @@ export class Executor {
}
}
// Handle environment variables with {{}} syntax
const envMatches = resolvedValue.match(/\{\{([^}]+)\}\}/g)
if (envMatches) {
for (const match of envMatches) {
const envKey = match.slice(2, -2) // remove {{ and }}
const envValue = this.environmentVariables?.[envKey]
if (envValue === undefined) {
throw new Error(`Environment variable "${envKey}" was not found.`)
}
resolvedValue = resolvedValue.replace(match, envValue)
}
}
// After all block references are resolved, resolve any environment variables
resolvedValue = resolveEnvVars(resolvedValue)
// After all replacements are done, attempt JSON parse if it looks like JSON
try {
@@ -501,8 +572,8 @@ export class Executor {
acc[key] = resolvedValue
}
} else {
// Not a string param
acc[key] = value
// For non-string values, still try to resolve any nested environment variables
acc[key] = resolveEnvVars(value)
}
return acc
},
@@ -511,4 +582,123 @@ export class Executor {
return resolvedInputs
}
private async executeRouterBlock(
block: SerializedBlock,
context: ExecutionContext
): Promise<{
content: string
model: string
tokens: {
prompt: number
completion: number
total: number
}
selectedPath: {
blockId: string
blockType: string
blockTitle: string
}
}> {
// First resolve all inputs including environment variables
const resolvedInputs = this.resolveInputs(block, context)
const outgoingConnections = this.workflow.connections.filter((conn) => conn.source === block.id)
const targetBlocks = outgoingConnections.map((conn) => {
const targetBlock = this.workflow.blocks.find((b) => b.id === conn.target)
if (!targetBlock) {
throw new Error(`Target block ${conn.target} not found`)
}
return {
id: targetBlock.id,
type: targetBlock.metadata?.type,
title: targetBlock.metadata?.title,
description: targetBlock.metadata?.description,
category: targetBlock.metadata?.category,
subBlocks: targetBlock.config.params,
currentState: context.blockStates.get(targetBlock.id),
}
})
const routerConfig = {
prompt: resolvedInputs.prompt,
model: resolvedInputs.model,
apiKey: resolvedInputs.apiKey,
temperature: resolvedInputs.temperature || 0,
}
// Determine provider based on model
const model = routerConfig.model || 'gpt-4o'
const providerId =
model.startsWith('gpt') || model.startsWith('o1')
? 'openai'
: model.startsWith('claude')
? 'anthropic'
: model.startsWith('gemini')
? 'google'
: model.startsWith('grok')
? 'xai'
: 'deepseek'
const response = await executeProviderRequest(providerId, {
model: routerConfig.model,
systemPrompt: generateRouterPrompt(routerConfig.prompt, targetBlocks),
messages: [
{
role: 'user',
content: routerConfig.prompt,
},
],
temperature: routerConfig.temperature,
apiKey: routerConfig.apiKey,
})
const chosenBlockId = response.content.trim().toLowerCase()
const chosenBlock = targetBlocks.find((b) => b.id === chosenBlockId)
if (!chosenBlock) {
throw new Error(`Invalid routing decision: ${chosenBlockId}`)
}
// Pass through the actual resolved content from the source block
const sourceContent = resolvedInputs.prompt
// Ensure tokens are properly typed
const tokens = response.tokens || { prompt: 0, completion: 0, total: 0 }
return {
content: sourceContent, // This now contains the actual resolved content from Agent 4
model: response.model,
tokens: {
prompt: tokens.prompt || 0,
completion: tokens.completion || 0,
total: tokens.total || 0,
},
selectedPath: {
blockId: chosenBlock.id,
blockType: chosenBlock.type || 'unknown',
blockTitle: chosenBlock.title || 'Untitled Block',
},
}
}
private isInChosenPath(blockId: string, chosenBlockId: string, routerId: string): boolean {
const visited = new Set<string>()
const queue = [chosenBlockId]
while (queue.length > 0) {
const currentId = queue.shift()!
if (visited.has(currentId)) continue
visited.add(currentId)
const connections = this.workflow.connections.filter((conn) => conn.source === currentId)
for (const conn of connections) {
queue.push(conn.target)
}
}
return blockId === routerId || visited.has(blockId)
}
}

View File

@@ -40,6 +40,7 @@ export const readUrlTool: ToolConfig<ReadUrlParams, ReadUrlResponse> = {
},
apiKey: {
type: 'string',
requiredForToolCall: true,
description: 'Your Jina AI API key',
},
},