mirror of
https://github.com/simstudioai/sim.git
synced 2026-01-08 22:48:14 -05:00
Modified executor to log at block-level granularity, and support parallel block execution
This commit is contained in:
@@ -26,32 +26,44 @@ export function useWorkflowExecution() {
|
||||
}, {} as Record<string, any>)
|
||||
|
||||
// Execute workflow
|
||||
const executor = new Executor(
|
||||
new Serializer().serializeWorkflow(blocks, edges),
|
||||
currentBlockStates
|
||||
)
|
||||
const workflow = new Serializer().serializeWorkflow(blocks, edges)
|
||||
const executor = new Executor(workflow, currentBlockStates)
|
||||
|
||||
const result = await executor.execute(crypto.randomUUID())
|
||||
const result = await executor.execute('my-run-id')
|
||||
setExecutionResult(result)
|
||||
|
||||
if (result.logs) {
|
||||
console.group('Detailed Block Logs')
|
||||
result.logs.forEach((log) => {
|
||||
console.log(`Block ${log.blockTitle}: Success=${log.success}`, {
|
||||
output: log.output,
|
||||
error: log.error,
|
||||
durationMs: log.durationMs,
|
||||
startedAt: log.startedAt,
|
||||
endedAt: log.endedAt
|
||||
})
|
||||
})
|
||||
console.groupEnd()
|
||||
}
|
||||
|
||||
// Show execution result with workflowId
|
||||
addNotification(
|
||||
result.success ? 'console' : 'error',
|
||||
result.success
|
||||
? 'Workflow completed successfully'
|
||||
: `Failed to execute workflow: ${result.error}`,
|
||||
: `Workflow execution failed: ${result.error}`,
|
||||
activeWorkflowId
|
||||
)
|
||||
|
||||
// Log detailed result to console
|
||||
// Also log final output info
|
||||
if (result.success) {
|
||||
console.group('Workflow Execution Result')
|
||||
console.group('Final Output')
|
||||
console.log('Status: ✅ Success')
|
||||
console.log('Output:', result.output)
|
||||
if (result.metadata) {
|
||||
console.log('Duration:', result.metadata.duration + 'ms')
|
||||
console.log('Start Time:', new Date(result.metadata.startTime).toLocaleTimeString())
|
||||
console.log('End Time:', new Date(result.metadata.endTime).toLocaleTimeString())
|
||||
console.log('StartedAt:', new Date(result.metadata.startTime).toLocaleTimeString())
|
||||
console.log('EndedAt:', new Date(result.metadata.endTime).toLocaleTimeString())
|
||||
}
|
||||
console.groupEnd()
|
||||
}
|
||||
@@ -62,7 +74,7 @@ export function useWorkflowExecution() {
|
||||
output: { response: {} },
|
||||
error: errorMessage
|
||||
})
|
||||
addNotification('error', `Failed to execute workflow: ${errorMessage}`, activeWorkflowId)
|
||||
addNotification('error', `Workflow execution failed: ${errorMessage}`, activeWorkflowId)
|
||||
} finally {
|
||||
setIsExecuting(false)
|
||||
}
|
||||
|
||||
@@ -1,234 +1,386 @@
|
||||
/**
|
||||
* "Executor" for running agentic workflows in parallel.
|
||||
*
|
||||
* Notes & Features:
|
||||
* • Uses a layered topological sort to allow parallel block execution for blocks with no remaining dependencies.
|
||||
* • Each block's inputs are resolved through a template mechanism (e.g., <blockId.property>).
|
||||
* • Stores block outputs in context.blockStates so subsequent blocks can reference them by ID or name.
|
||||
* • Maintains robust error handling (if a block fails, throws an error for the entire workflow).
|
||||
* • Returns per-block logs that can be displayed in the UI for better trace/debug.
|
||||
*/
|
||||
|
||||
import { SerializedWorkflow, SerializedBlock } from '@/serializer/types'
|
||||
import { ExecutionContext, ExecutionResult, Tool } from './types'
|
||||
import { tools } from '@/tools'
|
||||
import { BlockOutput } from '@/blocks/types'
|
||||
import {
|
||||
Tool,
|
||||
ExecutionContext,
|
||||
ExecutionResult,
|
||||
BlockLog
|
||||
} from './types'
|
||||
import { tools } from '@/tools'
|
||||
|
||||
export class Executor {
|
||||
constructor(
|
||||
private workflow: SerializedWorkflow,
|
||||
// Initial block states can be passed in if you need to resume workflows or pre-populate data.
|
||||
private initialBlockStates: Record<string, BlockOutput> = {}
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Main entry point that executes the entire workflow in parallel layers.
|
||||
*/
|
||||
async execute(workflowId: string): Promise<ExecutionResult> {
|
||||
const startTime = new Date()
|
||||
|
||||
// Build the ExecutionContext with new blockLogs array
|
||||
const context: ExecutionContext = {
|
||||
workflowId,
|
||||
blockStates: new Map<string, BlockOutput>(),
|
||||
blockLogs: [],
|
||||
metadata: {
|
||||
startTime: startTime.toISOString()
|
||||
},
|
||||
}
|
||||
|
||||
// Pre-populate block states if initialBlockStates exist
|
||||
Object.entries(this.initialBlockStates).forEach(([blockId, output]) => {
|
||||
context.blockStates.set(blockId, output)
|
||||
})
|
||||
|
||||
try {
|
||||
// Perform layered parallel execution
|
||||
const lastOutput = await this.executeInParallel(context)
|
||||
|
||||
const endTime = new Date()
|
||||
context.metadata.endTime = endTime.toISOString()
|
||||
|
||||
// Return full logs for the UI to consume
|
||||
return {
|
||||
success: true,
|
||||
output: lastOutput,
|
||||
metadata: {
|
||||
duration: endTime.getTime() - startTime.getTime(),
|
||||
startTime: context.metadata.startTime!,
|
||||
endTime: context.metadata.endTime!,
|
||||
},
|
||||
logs: context.blockLogs,
|
||||
}
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
output: { response: {} },
|
||||
error: error instanceof Error ? error.message : 'Unknown error',
|
||||
logs: context.blockLogs,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes all blocks in a layered topological fashion, running each layer in parallel via Promise.all.
|
||||
* If a cycle is detected, throws an error.
|
||||
*/
|
||||
private async executeInParallel(context: ExecutionContext): Promise<BlockOutput> {
|
||||
const { blocks, connections } = this.workflow
|
||||
|
||||
// Build in-degree and adjacency list for each block
|
||||
const inDegree = new Map<string, number>()
|
||||
const adjacency = new Map<string, string[]>()
|
||||
|
||||
// Initialize inDegree and adjacency
|
||||
for (const block of blocks) {
|
||||
inDegree.set(block.id, 0)
|
||||
adjacency.set(block.id, [])
|
||||
}
|
||||
|
||||
// Populate edges
|
||||
for (const conn of connections) {
|
||||
inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1)
|
||||
adjacency.get(conn.source)?.push(conn.target)
|
||||
}
|
||||
|
||||
// Start with all blocks that have inDegree = 0
|
||||
let layer = blocks
|
||||
.filter((b) => (inDegree.get(b.id) || 0) === 0)
|
||||
.map((b) => b.id)
|
||||
|
||||
// Track the final output from the "last" block or set of blocks
|
||||
let lastOutput: BlockOutput = { response: {} }
|
||||
|
||||
while (layer.length > 0) {
|
||||
// Execute current layer in parallel
|
||||
const results = await Promise.all(
|
||||
layer.map(async (blockId) => {
|
||||
// Find the block object
|
||||
const block = blocks.find((b) => b.id === blockId)
|
||||
if (!block) {
|
||||
throw new Error(`Missing block ${blockId}`)
|
||||
}
|
||||
|
||||
// Resolve template references in block config params
|
||||
const resolvedInputs = this.resolveInputs(block, context)
|
||||
|
||||
// Execute the block, store the result
|
||||
const output = await this.executeBlock(block, resolvedInputs, context)
|
||||
context.blockStates.set(block.id, output)
|
||||
return output
|
||||
})
|
||||
)
|
||||
|
||||
// Keep track of the "most recent" result as lastOutput
|
||||
if (results.length > 0) {
|
||||
lastOutput = results[results.length - 1]
|
||||
}
|
||||
|
||||
// Build the next layer by reducing in-degree of neighbors
|
||||
const nextLayer: string[] = []
|
||||
for (const blockId of layer) {
|
||||
const neighbors = adjacency.get(blockId) || []
|
||||
for (const targetId of neighbors) {
|
||||
const deg = inDegree.get(targetId) ?? 0
|
||||
const newDeg = deg - 1
|
||||
inDegree.set(targetId, newDeg)
|
||||
if (newDeg === 0) {
|
||||
nextLayer.push(targetId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
layer = nextLayer
|
||||
}
|
||||
|
||||
// Validate that all blocks were executed. If not, the workflow has a cycle.
|
||||
const executedCount = [...inDegree.values()].filter((x) => x === 0).length
|
||||
if (executedCount !== blocks.length) {
|
||||
throw new Error('Workflow contains cycles or invalid connections')
|
||||
}
|
||||
|
||||
return lastOutput
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a single block by:
|
||||
* 1) Determining which tool to call
|
||||
* 2) Validating parameters
|
||||
* 3) Making the request (for http blocks or LLM blocks, etc.)
|
||||
* 4) Transforming the response via the tool's transformResponse
|
||||
*/
|
||||
private async executeBlock(
|
||||
block: SerializedBlock,
|
||||
inputs: Record<string, any>,
|
||||
context: ExecutionContext
|
||||
): Promise<BlockOutput> {
|
||||
const toolId = block.config.tool
|
||||
if (!toolId) throw new Error(`Block ${block.id} does not specify a tool`)
|
||||
if (!toolId) {
|
||||
throw new Error(`Block "${block.id}" does not specify a tool`)
|
||||
}
|
||||
|
||||
const tool = tools[toolId]
|
||||
if (!tool) throw new Error(`Tool not found: ${toolId}`)
|
||||
const tool: Tool | undefined = tools[toolId]
|
||||
if (!tool) {
|
||||
throw new Error(`Tool not found: ${toolId}`)
|
||||
}
|
||||
|
||||
const validatedParams = this.validateToolParams(tool, { ...block.config.params, ...inputs })
|
||||
// Merge block's static params with dynamic inputs
|
||||
const validatedParams = this.validateToolParams(tool, {
|
||||
...block.config.params,
|
||||
...inputs,
|
||||
})
|
||||
|
||||
// Prepare a new blockLog entry
|
||||
const blockLog: Partial<BlockLog> = {
|
||||
blockId: block.id,
|
||||
blockTitle: block.metadata?.title || 'Unnamed Block',
|
||||
startedAt: new Date().toISOString(),
|
||||
}
|
||||
|
||||
try {
|
||||
const url = typeof tool.request.url === 'function' ? tool.request.url(validatedParams) : tool.request.url
|
||||
const headers = tool.request.headers(validatedParams)
|
||||
const method = typeof validatedParams.method === 'object'
|
||||
? validatedParams.method.method
|
||||
: (validatedParams.method || tool.request.method)
|
||||
if (!tool.request) {
|
||||
throw new Error(`Tool "${toolId}" has no request config.`)
|
||||
}
|
||||
|
||||
const body = (method !== 'GET' && method !== 'HEAD' && tool.request.body)
|
||||
? JSON.stringify(tool.request.body(validatedParams))
|
||||
const { url: urlOrFn, method: defaultMethod, headers: headersFn, body: bodyFn } =
|
||||
tool.request
|
||||
|
||||
// Build the URL
|
||||
const url = typeof urlOrFn === 'function' ? urlOrFn(validatedParams) : urlOrFn
|
||||
// Determine HTTP method
|
||||
const methodFromParams =
|
||||
typeof validatedParams.method === 'object'
|
||||
? validatedParams.method.method
|
||||
: validatedParams.method
|
||||
const method = methodFromParams || defaultMethod || 'GET'
|
||||
|
||||
// Safely compute headers
|
||||
const headers = headersFn?.(validatedParams) ?? {}
|
||||
|
||||
// Build body if needed
|
||||
const bodyNeeded = method !== 'GET' && method !== 'HEAD' && !!bodyFn
|
||||
const body = bodyNeeded
|
||||
? JSON.stringify(bodyFn!(validatedParams))
|
||||
: undefined
|
||||
|
||||
const response = await fetch(url, { method, headers, body })
|
||||
|
||||
// Perform fetch()
|
||||
const response = await fetch(url || '', { method, headers, body })
|
||||
if (!response.ok) {
|
||||
const error = await response.json().catch(() => ({ message: response.statusText }))
|
||||
throw new Error(tool.transformError(error))
|
||||
// In case there is a custom transformError
|
||||
const transformError = tool.transformError ?? (() => 'Unknown error')
|
||||
const errorBody = await response.json().catch(() => ({
|
||||
message: response.statusText,
|
||||
}))
|
||||
throw new Error(transformError(errorBody))
|
||||
}
|
||||
|
||||
const result = await tool.transformResponse(response)
|
||||
|
||||
// Transform the response
|
||||
const transformResponse =
|
||||
tool.transformResponse ??
|
||||
(async (resp: Response) => ({
|
||||
success: true,
|
||||
output: await resp.json(),
|
||||
}))
|
||||
|
||||
const result = await transformResponse(response)
|
||||
if (!result.success) {
|
||||
throw new Error(tool.transformError(result))
|
||||
const transformError = tool.transformError ?? (() => 'Tool returned an error object')
|
||||
throw new Error(transformError(result))
|
||||
}
|
||||
|
||||
return {
|
||||
response: result.output
|
||||
|
||||
// Success: update the blockLog with success & output
|
||||
blockLog.success = true
|
||||
blockLog.output = result.output
|
||||
|
||||
return { response: result.output }
|
||||
} catch (e) {
|
||||
blockLog.success = false
|
||||
blockLog.error = e instanceof Error ? e.message : 'Unknown error'
|
||||
throw e
|
||||
} finally {
|
||||
// Compute the end time and duration
|
||||
const end = new Date()
|
||||
blockLog.endedAt = end.toISOString()
|
||||
|
||||
if (blockLog.startedAt) {
|
||||
const started = new Date(blockLog.startedAt).getTime()
|
||||
blockLog.durationMs = end.getTime() - started
|
||||
} else {
|
||||
blockLog.durationMs = 0
|
||||
}
|
||||
} catch (error) {
|
||||
throw new Error(`Tool ${toolId} execution failed: ${error instanceof Error ? error.message : 'Unknown error'}`)
|
||||
|
||||
// Push the log entry
|
||||
context.blockLogs.push(blockLog as BlockLog)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates required parameters for a Tool, or uses defaults if present.
|
||||
*/
|
||||
private validateToolParams(tool: Tool, params: Record<string, any>): Record<string, any> {
|
||||
return Object.entries(tool.params).reduce((validated, [name, config]) => {
|
||||
if (name in params) validated[name] = params[name]
|
||||
else if ('default' in config) validated[name] = config.default
|
||||
else if (config.required) throw new Error(`Missing required parameter '${name}'`)
|
||||
return validated
|
||||
return Object.entries(tool.params).reduce((acc, [name, config]) => {
|
||||
if (name in params) {
|
||||
acc[name] = params[name]
|
||||
} else if ('default' in config) {
|
||||
acc[name] = config.default
|
||||
} else if (config.required) {
|
||||
throw new Error(`Missing required parameter '${name}'`)
|
||||
}
|
||||
return acc
|
||||
}, {} as Record<string, any>)
|
||||
}
|
||||
|
||||
private determineExecutionOrder(): string[] {
|
||||
const { blocks, connections } = this.workflow
|
||||
const order: string[] = []
|
||||
const visited = new Set<string>()
|
||||
const inDegree = new Map(blocks.map(block => [block.id, 0]))
|
||||
|
||||
connections.forEach(conn => {
|
||||
inDegree.set(conn.target, (inDegree.get(conn.target) || 0) + 1)
|
||||
})
|
||||
|
||||
const queue = blocks
|
||||
.filter(block => inDegree.get(block.id) === 0)
|
||||
.map(block => block.id)
|
||||
|
||||
while (queue.length > 0) {
|
||||
const blockId = queue.shift()!
|
||||
if (!visited.has(blockId)) {
|
||||
visited.add(blockId)
|
||||
order.push(blockId)
|
||||
|
||||
connections
|
||||
.filter(conn => conn.source === blockId)
|
||||
.forEach(conn => {
|
||||
const newDegree = (inDegree.get(conn.target) || 0) - 1
|
||||
inDegree.set(conn.target, newDegree)
|
||||
if (newDegree === 0) queue.push(conn.target)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if (order.length !== blocks.length) throw new Error('Workflow contains cycles')
|
||||
return order
|
||||
}
|
||||
|
||||
private resolveInputs(block: SerializedBlock, context: ExecutionContext): Record<string, any> {
|
||||
/**
|
||||
* Resolves any template references in a block's config params (e.g., "<someBlockId.response>"),
|
||||
* pulling from context.blockStates. This is how outputs from one block get wired as inputs to another.
|
||||
*/
|
||||
private resolveInputs(
|
||||
block: SerializedBlock,
|
||||
context: ExecutionContext
|
||||
): Record<string, any> {
|
||||
const inputs = { ...block.config.params }
|
||||
|
||||
// Create maps for both ID and name lookups
|
||||
const blockById = new Map(
|
||||
this.workflow.blocks.map(b => [b.id, b])
|
||||
)
|
||||
// Create quick-lookup for blocks by ID and by normalized name
|
||||
const blockById = new Map(this.workflow.blocks.map((b) => [b.id, b]))
|
||||
const blockByName = new Map(
|
||||
this.workflow.blocks.map(b => [
|
||||
this.workflow.blocks.map((b) => [
|
||||
b.metadata?.title?.toLowerCase().replace(/\s+/g, '') || '',
|
||||
b
|
||||
])
|
||||
)
|
||||
|
||||
const resolvedInputs = Object.entries(inputs).reduce((acc, [key, value]) => {
|
||||
if (typeof value === 'string') {
|
||||
const matches = value.match(/<([^>]+)>/g)
|
||||
|
||||
if (matches) {
|
||||
let resolvedValue = value
|
||||
|
||||
matches.forEach(match => {
|
||||
const path = match.slice(1, -1) // Remove < and >
|
||||
const [blockRef, ...pathParts] = path.split('.')
|
||||
|
||||
// Try to find block by ID first, then by normalized name
|
||||
let sourceBlock = blockById.get(blockRef)
|
||||
if (!sourceBlock) {
|
||||
const normalizedName = blockRef.toLowerCase().replace(/\s+/g, '')
|
||||
sourceBlock = blockByName.get(normalizedName)
|
||||
}
|
||||
const resolvedInputs = Object.entries(inputs).reduce(
|
||||
(acc, [key, value]) => {
|
||||
if (typeof value === 'string') {
|
||||
const matches = value.match(/<([^>]+)>/g)
|
||||
if (matches) {
|
||||
let resolvedValue = value
|
||||
for (const match of matches) {
|
||||
// e.g. "<someBlockId.response>"
|
||||
const path = match.slice(1, -1) // remove < and >
|
||||
const [blockRef, ...pathParts] = path.split('.')
|
||||
|
||||
if (!sourceBlock) {
|
||||
console.warn(`Block ${blockRef} not found by ID or name`)
|
||||
return
|
||||
}
|
||||
|
||||
const sourceState = context.blockStates.get(sourceBlock.id)
|
||||
if (!sourceState) {
|
||||
console.warn(`No state found for block ${sourceBlock.id}`)
|
||||
return
|
||||
}
|
||||
|
||||
// Start with the block's state
|
||||
let replacementValue: any = sourceState
|
||||
|
||||
// Traverse the path parts to get the final value
|
||||
for (const part of pathParts) {
|
||||
if (!replacementValue || typeof replacementValue !== 'object') {
|
||||
console.warn(`Invalid path part ${part} in ${path}`)
|
||||
return
|
||||
// Try blockRef as ID, then as normalized name
|
||||
let sourceBlock = blockById.get(blockRef)
|
||||
if (!sourceBlock) {
|
||||
const normalized = blockRef.toLowerCase().replace(/\s+/g, '')
|
||||
sourceBlock = blockByName.get(normalized)
|
||||
}
|
||||
|
||||
if (!sourceBlock) {
|
||||
console.warn(`Block reference "${blockRef}" not found by ID or name.`)
|
||||
continue
|
||||
}
|
||||
|
||||
const sourceState = context.blockStates.get(sourceBlock.id)
|
||||
if (!sourceState) {
|
||||
console.warn(`No state found for block ID "${sourceBlock.id}".`)
|
||||
continue
|
||||
}
|
||||
|
||||
// Drill into the path
|
||||
let replacementValue: any = sourceState
|
||||
for (const part of pathParts) {
|
||||
if (!replacementValue || typeof replacementValue !== 'object') {
|
||||
console.warn(`Invalid path part "${part}" in "${path}".`)
|
||||
replacementValue = undefined
|
||||
break
|
||||
}
|
||||
replacementValue = replacementValue[part]
|
||||
}
|
||||
|
||||
// If a valid leaf is found
|
||||
if (replacementValue !== undefined) {
|
||||
// Replace the placeholder in the string
|
||||
resolvedValue = resolvedValue.replace(
|
||||
match,
|
||||
typeof replacementValue === 'object'
|
||||
? JSON.stringify(replacementValue)
|
||||
: String(replacementValue)
|
||||
)
|
||||
} else {
|
||||
console.warn(`No value found at path "${path}".`)
|
||||
}
|
||||
replacementValue = replacementValue[part]
|
||||
}
|
||||
|
||||
if (replacementValue !== undefined) {
|
||||
// Replace the entire template expression with the resolved value
|
||||
resolvedValue = resolvedValue.replace(match,
|
||||
typeof replacementValue === 'object'
|
||||
? JSON.stringify(replacementValue)
|
||||
: String(replacementValue)
|
||||
)
|
||||
} else {
|
||||
console.warn(`No value found at path ${path}`)
|
||||
}
|
||||
})
|
||||
|
||||
// Try to parse the value if it looks like JSON
|
||||
try {
|
||||
if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) {
|
||||
acc[key] = JSON.parse(resolvedValue)
|
||||
} else {
|
||||
// Attempt JSON parse if it looks like JSON
|
||||
try {
|
||||
if (resolvedValue.startsWith('{') || resolvedValue.startsWith('[')) {
|
||||
acc[key] = JSON.parse(resolvedValue)
|
||||
} else {
|
||||
acc[key] = resolvedValue
|
||||
}
|
||||
} catch {
|
||||
acc[key] = resolvedValue
|
||||
}
|
||||
} catch {
|
||||
acc[key] = resolvedValue
|
||||
} else {
|
||||
// No placeholders
|
||||
acc[key] = value
|
||||
}
|
||||
} else {
|
||||
// Not a string param
|
||||
acc[key] = value
|
||||
}
|
||||
} else {
|
||||
acc[key] = value
|
||||
}
|
||||
|
||||
return acc
|
||||
}, {} as Record<string, any>)
|
||||
return acc
|
||||
},
|
||||
{} as Record<string, any>
|
||||
)
|
||||
|
||||
return resolvedInputs
|
||||
}
|
||||
|
||||
async execute(workflowId: string): Promise<ExecutionResult> {
|
||||
const startTime = new Date()
|
||||
|
||||
const context: ExecutionContext = {
|
||||
workflowId,
|
||||
blockStates: new Map(),
|
||||
metadata: { startTime: startTime.toISOString() }
|
||||
}
|
||||
|
||||
try {
|
||||
const executionOrder = this.determineExecutionOrder()
|
||||
|
||||
for (const blockId of executionOrder) {
|
||||
const block = this.workflow.blocks.find(b => b.id === blockId)
|
||||
if (!block) throw new Error(`Block ${blockId} not found in workflow`)
|
||||
|
||||
const output = await this.executeBlock(block, this.resolveInputs(block, context), context)
|
||||
context.blockStates.set(blockId, output)
|
||||
}
|
||||
|
||||
const endTime = new Date()
|
||||
const lastOutput = context.blockStates.get(executionOrder[executionOrder.length - 1])
|
||||
|
||||
if (!lastOutput) {
|
||||
throw new Error('No output from workflow execution')
|
||||
}
|
||||
|
||||
return {
|
||||
success: true,
|
||||
output: lastOutput,
|
||||
metadata: {
|
||||
duration: endTime.getTime() - startTime.getTime(),
|
||||
startTime: startTime.toISOString(),
|
||||
endTime: endTime.toISOString()
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
return {
|
||||
success: false,
|
||||
output: { response: {} },
|
||||
error: error instanceof Error ? error.message : 'Unknown error'
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,49 +1,88 @@
|
||||
import { BlockOutput } from '@/blocks/types'
|
||||
|
||||
/**
|
||||
* Describes a single block's logs, including timing and success/failure state.
|
||||
*/
|
||||
export interface BlockLog {
|
||||
blockId: string
|
||||
blockTitle?: string
|
||||
success: boolean
|
||||
error?: string
|
||||
startedAt: string
|
||||
endedAt: string
|
||||
durationMs: number
|
||||
output?: any
|
||||
}
|
||||
|
||||
/**
|
||||
* Describes the runtime context for executing a workflow,
|
||||
* including all block outputs (blockStates), metadata for timing, and block logs.
|
||||
*/
|
||||
export interface ExecutionContext {
|
||||
workflowId: string
|
||||
blockStates: Map<string, BlockOutput>
|
||||
// Make metadata non-optional so we can assign .startTime or .endTime without TS warnings
|
||||
metadata: {
|
||||
startTime?: string
|
||||
endTime?: string
|
||||
// You can keep an index signature if you want to store extra fields
|
||||
[key: string]: any
|
||||
}
|
||||
// We store logs in an array so the final result includes a step-by-step record
|
||||
blockLogs: BlockLog[]
|
||||
}
|
||||
|
||||
/**
|
||||
* The complete result from executing the workflow. Includes success/fail,
|
||||
* the "last block" output, optional error, timing metadata, and logs of each block's run.
|
||||
*/
|
||||
export interface ExecutionResult {
|
||||
success: boolean
|
||||
output: BlockOutput
|
||||
error?: string
|
||||
metadata?: {
|
||||
duration: number
|
||||
startTime: string
|
||||
endTime: string
|
||||
}
|
||||
// Detailed logs of what happened in each block
|
||||
logs?: BlockLog[]
|
||||
}
|
||||
|
||||
/**
|
||||
* Defines how a particular tool is invoked (URLs, headers, etc.), how it transforms responses
|
||||
* and handles errors. Used by blocks that reference a particular tool ID.
|
||||
*/
|
||||
export interface Tool<P = any, O = Record<string, any>> {
|
||||
id: string
|
||||
name: string
|
||||
description: string
|
||||
version: string
|
||||
id: string
|
||||
name: string
|
||||
description: string
|
||||
version: string
|
||||
params: {
|
||||
[key: string]: {
|
||||
type: string
|
||||
required?: boolean
|
||||
description?: string
|
||||
default?: any
|
||||
}
|
||||
}
|
||||
request: {
|
||||
url: string | ((params: P) => string)
|
||||
method: string
|
||||
headers: (params: P) => Record<string, string>
|
||||
body?: (params: P) => Record<string, any>
|
||||
}
|
||||
transformResponse: (response: any) => Promise<{
|
||||
type: string
|
||||
required?: boolean
|
||||
description?: string
|
||||
default?: any
|
||||
}
|
||||
}
|
||||
request?: {
|
||||
url?: string | ((params: P) => string)
|
||||
method?: string
|
||||
headers?: (params: P) => Record<string, string>
|
||||
body?: (params: P) => Record<string, any>
|
||||
}
|
||||
transformResponse?: (response: any) => Promise<{
|
||||
success: boolean
|
||||
output: O
|
||||
error?: string
|
||||
}>
|
||||
transformError: (error: any) => string
|
||||
transformError?: (error: any) => string
|
||||
}
|
||||
|
||||
/**
|
||||
* A registry of Tools, keyed by their IDs or names.
|
||||
*/
|
||||
export interface ToolRegistry {
|
||||
[key: string]: Tool
|
||||
}
|
||||
|
||||
export interface ExecutionContext {
|
||||
workflowId: string
|
||||
blockStates: Map<string, BlockOutput>
|
||||
metadata?: Record<string, any>
|
||||
}
|
||||
|
||||
export interface ExecutionResult {
|
||||
success: boolean
|
||||
output: BlockOutput
|
||||
error?: string
|
||||
metadata?: {
|
||||
duration: number
|
||||
startTime: string
|
||||
endTime: string
|
||||
}
|
||||
[key: string]: Tool
|
||||
}
|
||||
Reference in New Issue
Block a user