Files
sim/apps/sim/app/api/workflows/[id]/execute/route.ts
Waleed Latif 76df2b9cd9 fix(sockets): added throttling, refactor entire socket server, added tests (#534)
* refactor(kb): use chonkie locally (#475)

* feat(parsers): text and markdown parsers (#473)

* feat: text and markdown parsers

* fix: don't readfile on buffer, convert buffer to string instead

* fix(knowledge-wh): fixed authentication error on webhook trigger

fix(knowledge-wh): fixed authentication error on webhook trigger

* feat(tools): add huggingface tools/block (#472)

* add hugging face tool

* docs: add Hugging Face tool documentation

* fix: format and lint Hugging Face integration files

* docs: add manual intro section to Hugging Face documentation

* feat: replace Record<string, any> with proper HuggingFaceRequestBody interface

* accidental local files added

* restore some docs

* make layout full for model field

* change huggingface logo

* add manual content

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>

* fix(knowledge-ux): fixed ux for knowledge base (#478)

fix(knowledge-ux): fixed ux for knowledge base (#478)

* fix(billing): bump better-auth version & fix existing subscription issue when adding seats (#484)

* bump better-auth version & fix existing subscription issue when adding seats

* ack PR comments

* fix(env): added NEXT_PUBLIC_APP_URL to .env.example (#485)

* feat(subworkflows): workflows as a block within workflows (#480)

* feat(subworkflows) workflows in workflows

* revert sync changes

* working output vars

* fix greptile comments

* add cycle detection

* add tests

* working tests

* works

* fix formatting

* fix input var handling

* add images

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>

* fix(kb): fixed kb race condition resulting in no chunks found (#487)

* fix: added all blocks activeExecutionPath (#486)

* refactor(chunker): replace chonkie with custom TextChunker (#479)

* refactor(chunker): replace chonkie with custom TextChunker implementation and update document processing logic

* chore: cleanup unimplemented types

* fix: KB tests updated

* fix(tab-sync): sync between tabs on change (#489)

* fix(tab-sync): sync between tabs on change

* refactor: optimize JSON.stringify operations that are redundant

* fix(file-upload): upload presigned url to kb for file upload instead of the whole file, circumvents 4.5MB serverless func limit (#491)

* feat(folders): folders to manage workflows (#490)

* feat(subworkflows) workflows in workflows

* revert sync changes

* working output vars

* fix greptile comments

* add cycle detection

* add tests

* working tests

* works

* fix formatting

* fix input var handling

* fix(tab-sync): sync between tabs on change

* feat(folders): folders to organize workflows

* address comments

* change schema types

* fix lint error

* fix typing error

* fix race cond

* delete unused files

* improved UI

* updated naming conventions

* revert unrelated changes to db schema

* fixed collapsed sidebar subfolders

* add logs filters for folders

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: Waleed Latif <walif6@gmail.com>

* revert tab sync

* improvement(folders): added multi-select for moving folders (#493)

* added multi-select for folders

* allow drag into root

* remove extraneous comments

* instantly create workflow on plus

* styling improvements, fixed flicker

* small improvement to dragover container

* ack PR comments

* fix(deployed-chat): made the chat mobile friendly (#494)

* improvement(ui/ux): chat deploy (#496)

* improvement(ui/ux): chat deploy experience

* improvement(ui/ux): chat fontweight

* feat(gmail): added option to access raw gmail from gmail polling service (#495)

* added option to grab raw gmail from gmail polling service

* safe json parse for function block execution to prevent vars in raw email from being resolved as sim studio vars

* added tests

* remove extraneous comments

* fix(ui): fix the UI for folder deletion, huggingface icon, workflow block icon, standardized alert dialog (#498)

* fixed folder delete UI

* fixed UI for workflow block, huggingface, & added alert dialog for deleting folders

* consistently style all alert dialogs

* fix(reset-data): remove reset all data button from settings modal along with logic (#499)

* fix(airtable): fixed airtable oauth token refresh, added tests (#502)

* fixed airtable token refresh, added tests

* added helpers for refreshOAuthToken function

* feat(registration): disable registration + handle env booleans (#501)

* feat: disable registration + handle env booleans

* chore: removing pre-process because we need to use util

* chore: format

* feat(providers): added azure openai (#503)

* added azure openai

* fix request params being passed through agent block for azure

* remove o1 from azure-openai models list

* fix: add vscode settings to gitignore

* feat(file-upload): generalized storage to support azure blob, enhanced error logging in kb, added xlsx parser (#506)

* added blob storage option for azure, refactored storage client to be provider agnostic, tested kb & file upload and s3 is undisrupted, still have to test blob

* updated CORS policy for blob, added azure blob-specific headers

* remove extraneous comments

* add file size limit and timeout

* added some extra error handling in kb add documents

* grouped envvars

* ack PR comments

* added sheetjs and xlsx parser

* fix(folders): modified folder deletion to delete subfolders & workflows in it instead of moving to root (#508)

* modified folder deletion to delete subfolders & workflows in it instead of moving to root

* added additional testing utils

* ack PR comments

* feat: api response block and implementation

* improvement(local-storage): remove use of local storage except for oauth and last active workspace id (#497)

* remove local storage usage

* remove migration for last active workspace id

* Update apps/sim/app/w/[id]/components/workflow-block/components/sub-block/components/file-selector/components/jira-issue-selector.tsx

Add fallback for required scopes

Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>

* add url builder util

* fi

* fix lint

* lint

* modify pre commit hook

* fix oauth

* get last active workspace working again

* new workspace logic works

* fetch locks

* works now

* remove empty useEffect

* fix loading issue

* skip empty workflow syncs

* use isWorkspace in transition flag

* add logging

* add data initialized flag

* fix lint

* fix: build error by create a server-side utils

* remove migration snapshots

* reverse search for workspace based on workflow id

* fix lint

* improvement: loading check and animation

* remove unused utils

* remove console  logs

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Emir Karabeg <emirkarabeg@berkeley.edu>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>

* feat(multi-select): simplified chat to always return readable stream, can select multiple outputs and get response streamed back in chat panel & deployed chat (#507)

* improvement: all workflow executions return ReadableStream & use sse to support multiple streamed outputs in chats

* fixed build

* remove extraneous comments

* general improvements

* ack PR comments

* fixed build

* improvement(workflow-state): split workflow state into separate tables  (#511)

* new tables to track workflow state

* fix lint

* refactor into separate tables

* fix typing

* fix lint

* add tests

* fix lint

* add correct foreign key constraint

* add self ref

* remove unused checks

* fix types

* fix type

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>

* feat(models): added new openai models, updated model pricing, added new groq model (#513)

* fix(autocomplete): fixed extra closing tag on tag dropdown autocomplete (#514)

* chore: enable input format again

* fix: process the input made on api calls with proper extraction

* feat: add json-object for ai generation for response block and others

* chore: add documentation for response block

* chore: rollback temp fix and uncomment original input handler

* chore: add missing mock for response handler

* chore: add missing mock

* chore: greptile recommendations

* added cost tracking for router & evaluator blocks, consolidated model information into a single file, hosted keys for evaluator & router, parallelized unit tests (#516)

* fix(deployState): deploy not persisting bug  (#518)

* fix(undeploy-bug): fix deployment persistence failing bug

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>

* fix decimal entry issues

* remove unused files

* fix(db): decimal position entry issues (#520)

* fix decimal entry issues

* remove unused files

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>

* fix lint

* fix test

* improvement(kb): added configurability for chunks, query across multiple knowledge bases (#512)

* refactor: consolidate create modal file

* fix: identify dead processes

* fix: mark failed in DB after processing timeout

* improvement: added overlap chunks and fixed modal UI

* feat: multiselect logic

* fix: biome changes for css ordering warn instead of error

* improvement: create chunk ui

* fix: removed unused schema columns

* fix: removed references to deleted columns

* improvement: sped up vector search time

* feat: multi-kb search

* add bulk endpoint to disable/delete multiple chunks

* add bulk endpoint to disable/delete multiple chunks

* fix: removed unused schema columns

* fix: removed references to deleted columns

* made endpoints for knowledge more RESTful, added tests

* added batch operations for delete/enable/disable docs, alr have this for chunks

* added migrations

* added migrations

---------

Co-authored-by: Waleed Latif <walif6@gmail.com>

* fix(models): remove temp from models that don't support it

* feat(sdk): added ts and python SDKs + docs (#524)

* added ts & python sdk, renamed cli from simstudio to cli

* added docs

* ack PR comments

* improvements

* fixed issue where it goes to random workspace when you click reload

fixed lint issue

* feat: better response builder + doc update

* fix(auth): added preview URLs to list of trusted origins (#525)

* trusted origins

* lint error

* removed localhost

* ran lint

---------

Co-authored-by: Waleed Latif <walif6@gmail.com>

* fix(sdk): remove dev script from SDK

* PR: changes for migration

* add changes on top of db migration changes

* fix: allow removing single input field

* improvement(permissions): workspace permissions improvements, added provider and reduced API calls by 85% (#530)

* improved permissions UI & access patterns, show outstanding invites

* added logger

* added provider for workspace permissions, 85% reduction in API calls to get user permissions and improved performance for invitations

* ack PR comments

* cleanup

* fix disabled tooltips

* improvement(tests): parallelized tests and build fixes (#531)

* added provider for workspace permissions, 85% reduction in API calls to get user permissions and improved performance for invitations

* parallelized more tests, fixed test warnings

* removed waitlist verification route, use more utils in tests

* fixed build

* ack PR comments

* fix

* fix(kb): reduced params in kb block, added advanced mode to starter block, updated docs

* feat(realtime): sockets + normalized tables + deprecate sync (#523)

* feat: implement real-time collaborative workflow editing with Socket.IO

- Add Socket.IO server with room-based architecture for workflow collaboration
- Implement socket context for client-side real-time communication
- Add collaborative workflow hook for synchronized state management
- Update CSP to allow socket connections to localhost:3002
- Add fallback authentication for testing collaborative features
- Enable real-time broadcasting of workflow operations between tabs
- Support multi-user editing of blocks, edges, and workflow state

Key components:
- socket-server/: Complete Socket.IO server with authentication and room management
- contexts/socket-context.tsx: Client-side socket connection and state management
- hooks/use-collaborative-workflow.ts: Hook for collaborative workflow operations
- Workflow store integration for real-time state synchronization

Status: Basic collaborative features working, authentication bypass enabled for testing

* feat: complete collaborative subblock editing implementation

 All collaborative features now working perfectly:
- Real-time block movement and positioning
- Real-time subblock value editing (text fields, inputs)
- Real-time edge operations and parent updates
- Multi-user workflow rooms with proper broadcasting
- Socket.IO server with room-based architecture
- Permission bypass system for testing

🔧 Technical improvements:
- Modified useSubBlockValue hook to use collaborative event system
- All subblock setValue calls now dispatch 'update-subblock-value' events
- Collaborative workflow hook handles all real-time operations
- Socket server processes and persists all operations to database
- Clean separation between local and collaborative state management

🧪 Tested and verified:
- Multiple browser tabs with different fallback users
- Block dragging and positioning updates in real-time
- Subblock text editing reflects immediately across tabs
- Workflow room management and user presence
- Database persistence of all collaborative operations

Status: Full collaborative workflow editing working with fallback authentication

* feat: implement proper authentication for collaborative Socket.IO server

 **Authentication System Complete**:
- Removed all fallback authentication code and bypasses
- Socket server now requires valid Better Auth session cookies
- Proper session validation using auth.api.getSession()
- Authentication errors properly handled and logged
- User info extracted from session: userId, userName, email, organizationId

🔧 **Technical Implementation**:
- Updated CSP to allow WebSocket connections (ws://localhost:3002)
- Socket authentication middleware validates session tokens
- Proper error handling for missing/invalid sessions
- Permission system enforces workflow access controls
- Clean separation between authenticated and unauthenticated states

🧪 **Testing Status**:
- Socket server properly rejects unauthenticated connections
- Authentication errors logged with clear messages
- CSP updated to allow both HTTP and WebSocket protocols
- Ready for testing with authenticated users

Status: Production-ready collaborative authentication system

* feat: complete authentication integration for collaborative Socket.IO system

🎉 **PRODUCTION-READY COLLABORATIVE SYSTEM**

 **Authentication Integration Complete**:
- Fixed Socket.IO client to send credentials (withCredentials: true)
- Updated server CORS to accept credentials with specific origin
- Removed all fallback authentication bypasses
- Proper Better Auth session validation working

🔧 **Technical Fixes**:
- Socket client: Enable withCredentials for cookie transmission
- Socket server: Accept credentials with origin 'http://localhost:3000'
- Better Auth cookie utility integration for session parsing
- Comprehensive authentication middleware with proper error handling

🧪 **Verified Working Features**:
-  Real user authentication (Vikhyath Mondreti authenticated)
-  Multi-user workflow rooms (2+ users in same workflow)
-  Permission system enforcing workflow access controls
-  Real-time subblock editing across browser tabs
-  Block movement and positioning updates
-  Automatic room cleanup and management
-  Database persistence of all collaborative operations

🚀 **Status**: Complete enterprise-grade collaborative workflow editing system
- No more fallback users - production authentication
- Multi-tab collaboration working perfectly
- Secure access control with Better Auth integration
- Real-time updates for all workflow operations

* remove sync system and move to server side

* fix lint

* delete unused file

* added socketio dep

* fix subblock persistence bug

* working deletion of workflows

* fix lint

* added railway

* add debug logging for railway deployment

* improve typing

* fix lint

* working subflow persistence

* fix lint

* working cascade deletion

* fix lint

* working subflow inside subflow

* works

* fix lint

* prevent subflow in subflow

* fix lint

* add additional logs, add localhost as allowedOrigin

* add additional logs, add localhost as allowedOrigin

* fix type error

* remove unused code

* fix lint

* fix tests

* fix lint

* fix build error

* working folder updates

* fix typing issue

* fix lint

* fix typing issues

* lib/

* fix tests

* added old presence component back, updated to use one-time-token better auth plugin for socket server auth, tested

* fix errors

* fix bugs

* add migration scripts to run

* fix lint

* fix deploy tests

* fix lint

* fix minor issues

* fix lint

* fix migration script

* allow comma-separated ids file input to migration script

* fix lint

* fixed

* fix lint

* fix fallback case

* fix type errors

* address greptile comments

* fix lint

* fix script to generate new block ids

* fix lint

---------

Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>
Co-authored-by: Waleed Latif <walif6@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>

* fix(sockets): updated CSP

* remove unnecessary logs

* fix lint

* added throttling, refactor entire socket server, added tests

* improvements

* remove self monitoring func, add block name event

* working isWide, isAdvanced toggles with sockets

* fix lint

* fix duplicate key issue for user avatar

* fix lint

* fix user presence

* working parallel badges / loop badges updates

* working connection output persistence

* fix lint

* fix build errors

* fix lint

* logs removed

* fix cascade var name update bug

* works

* fix lint

* fix parallel blocks

* fix placeholder

* fix test

* fixed tests

---------

Co-authored-by: Aditya Tripathi <aditya@climactic.co>
Co-authored-by: Adam Gough <77861281+aadamgough@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-MacBook-Air.local>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@Vikhyaths-Air.attlocal.net>
Co-authored-by: Emir Karabeg <emirkarabeg@berkeley.edu>
Co-authored-by: Emir Karabeg <78010029+emir-karabeg@users.noreply.github.com>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
Co-authored-by: Vikhyath Mondreti <vikhyathmondreti@vikhyaths-air.lan>
Co-authored-by: Ajit Kadaveru <ajit.kadaveru@berkeley.edu>
2025-06-24 17:44:30 -07:00

407 lines
14 KiB
TypeScript

import { eq, sql } from 'drizzle-orm'
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { z } from 'zod'
import { createLogger } from '@/lib/logs/console-logger'
import { persistExecutionError, persistExecutionLogs } from '@/lib/logs/execution-logger'
import { buildTraceSpans } from '@/lib/logs/trace-spans'
import { checkServerSideUsageLimits } from '@/lib/usage-monitor'
import { decryptSecret } from '@/lib/utils'
import {
createHttpResponseFromBlock,
updateWorkflowRunCounts,
workflowHasResponseBlock,
} from '@/lib/workflows/utils'
import { db } from '@/db'
import { environment, userStats } from '@/db/schema'
import { Executor } from '@/executor'
import { Serializer } from '@/serializer'
import { mergeSubblockState } from '@/stores/workflows/server-utils'
import type { WorkflowState } from '@/stores/workflows/workflow/types'
import { validateWorkflowAccess } from '../../middleware'
import { createErrorResponse, createSuccessResponse } from '../../utils'
// Module-level logger; every message emitted by this route is created under the 'WorkflowExecuteAPI' name
const logger = createLogger('WorkflowExecuteAPI')
// Next.js route segment config: always render this route dynamically and run it on the Node.js runtime
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
// Schema for a user's stored environment variables: a flat string -> string record
// (the values are passed to decryptSecret before use, so they are stored encrypted)
const EnvVarsSchema = z.record(z.string())
// Ids of workflows currently executing, used to reject overlapping runs of the same workflow.
// This is an in-memory Set owned by this module, so it only tracks runs started through this module instance.
const runningExecutions = new Set<string>()
/**
 * Error thrown when a user has exceeded their usage limits.
 *
 * Carries the HTTP status the route handlers should answer with
 * (402 Payment Required) so callers can map it straight onto a response.
 */
class UsageLimitError extends Error {
  statusCode: number

  constructor(message: string) {
    super(message)
    const paymentRequired = 402
    this.name = 'UsageLimitError'
    this.statusCode = paymentRequired
  }
}
/**
 * Executes a workflow for an API request and persists the execution logs.
 *
 * Steps, in order:
 *  1. Reject the call if the same workflow id is already executing in this process.
 *  2. Check the owning user's usage limits.
 *  3. Wrap bare object input as `{ input }` for the starter block.
 *  4. Resolve `{{VAR}}` references in sub-block values from the user's decrypted
 *     environment variables and parse string `responseFormat` values as JSON.
 *  5. Serialize and run the workflow, update run counters on success, and persist logs.
 *
 * @param workflow - Workflow record; `id`, `userId`, `state`, `deployedState` and `variables` are read.
 * @param requestId - Short id used to correlate log lines for this request.
 * @param input - Optional input payload handed to the executor.
 * @returns The execution result (for streaming executions, only the `execution` part).
 * @throws Error with message 'Workflow is already running' when an overlapping run exists.
 * @throws UsageLimitError when the user's usage limit is exceeded.
 * @throws Any error raised while preparing or executing the workflow; these are persisted
 *   through `persistExecutionError` and then rethrown.
 */
async function executeWorkflow(workflow: any, requestId: string, input?: any) {
  const workflowId = workflow.id
  const executionId = uuidv4()

  // Skip if this workflow is already running
  if (runningExecutions.has(workflowId)) {
    logger.warn(`[${requestId}] Workflow is already running: ${workflowId}`)
    throw new Error('Workflow is already running')
  }

  // Claim the slot synchronously, before the first await. If the claim happened after the
  // usage check, two concurrent requests could both pass the guard above and run in parallel.
  runningExecutions.add(workflowId)

  try {
    // Check if the user has exceeded their usage limits
    const usageCheck = await checkServerSideUsageLimits(workflow.userId)
    if (usageCheck.isExceeded) {
      logger.warn(`[${requestId}] User ${workflow.userId} has exceeded usage limits`, {
        currentUsage: usageCheck.currentUsage,
        limit: usageCheck.limit,
      })
      // Thrown outside the inner try on purpose: usage-limit rejections are not
      // recorded as execution errors.
      throw new UsageLimitError(
        usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue.'
      )
    }

    // Log input to help debug
    logger.info(
      `[${requestId}] Executing workflow with input:`,
      input ? JSON.stringify(input, null, 2) : 'No input provided'
    )

    // Validate and structure input for maximum compatibility
    let processedInput = input
    if (input && typeof input === 'object') {
      // Ensure input is properly structured for the starter block
      if (input.input === undefined) {
        // If input is not already nested, structure it properly
        processedInput = { input: input }
        logger.info(
          `[${requestId}] Restructured input for workflow:`,
          JSON.stringify(processedInput, null, 2)
        )
      }
    }

    try {
      logger.info(`[${requestId}] Starting workflow execution: ${workflowId}`)

      // Use the deployed state if available, otherwise fall back to current state
      const workflowState = workflow.deployedState || workflow.state
      if (!workflow.deployedState) {
        logger.warn(
          `[${requestId}] No deployed state found for workflow: ${workflowId}, using current state`
        )
      } else {
        logger.info(`[${requestId}] Using deployed state for workflow execution: ${workflowId}`)
      }

      const state = workflowState as WorkflowState
      const { blocks, edges, loops, parallels } = state

      // Use the same execution flow as in scheduled executions
      const mergedStates = mergeSubblockState(blocks)

      // Fetch the user's environment variables (if any)
      const [userEnv] = await db
        .select()
        .from(environment)
        .where(eq(environment.userId, workflow.userId))
        .limit(1)

      if (!userEnv) {
        logger.debug(
          `[${requestId}] No environment record found for user ${workflow.userId}. Proceeding with empty variables.`
        )
      }

      // Parse and validate environment variables.
      const variables = EnvVarsSchema.parse(userEnv?.variables ?? {})

      // Replace environment variables in the block states.
      // Blocks and sub-blocks are processed one at a time, in entry order.
      const currentBlockStates: Record<string, Record<string, any>> = {}
      for (const [id, block] of Object.entries(mergedStates)) {
        const resolvedSubBlocks: Record<string, any> = {}

        for (const [key, subBlock] of Object.entries(block.subBlocks)) {
          let value = subBlock.value

          // If the value is a string and contains environment variable syntax
          if (typeof value === 'string' && value.includes('{{') && value.includes('}}')) {
            const matches = value.match(/{{([^}]+)}}/g)
            if (matches) {
              // Process all matches sequentially
              for (const match of matches) {
                const varName = match.slice(2, -2) // Remove {{ and }}
                const encryptedValue = variables[varName]
                if (!encryptedValue) {
                  throw new Error(`Environment variable "${varName}" was not found`)
                }

                try {
                  const { decrypted } = await decryptSecret(encryptedValue)
                  // Use a replacer function so the secret is inserted verbatim. A plain
                  // replacement string would interpret sequences such as `$&`, `$1` or `$$`
                  // inside the secret and corrupt the substituted value.
                  value = (value as string).replace(match, () => decrypted)
                } catch (error: any) {
                  logger.error(
                    `[${requestId}] Error decrypting environment variable "${varName}"`,
                    error
                  )
                  throw new Error(
                    `Failed to decrypt environment variable "${varName}": ${error.message}`
                  )
                }
              }
            }
          }

          resolvedSubBlocks[key] = value
        }

        currentBlockStates[id] = resolvedSubBlocks
      }

      // Create a map of decrypted environment variables
      const decryptedEnvVars: Record<string, string> = {}
      for (const [key, encryptedValue] of Object.entries(variables)) {
        try {
          const { decrypted } = await decryptSecret(encryptedValue)
          decryptedEnvVars[key] = decrypted
        } catch (error: any) {
          logger.error(`[${requestId}] Failed to decrypt environment variable "${key}"`, error)
          throw new Error(`Failed to decrypt environment variable "${key}": ${error.message}`)
        }
      }

      // Process the block states to ensure response formats are properly parsed
      const processedBlockStates: Record<string, Record<string, any>> = {}
      for (const [blockId, blockState] of Object.entries(currentBlockStates)) {
        // Only string responseFormat values need parsing; everything else is passed through
        if (blockState.responseFormat && typeof blockState.responseFormat === 'string') {
          try {
            logger.debug(`[${requestId}] Parsing responseFormat for block ${blockId}`)
            const parsedResponseFormat = JSON.parse(blockState.responseFormat)
            processedBlockStates[blockId] = {
              ...blockState,
              responseFormat: parsedResponseFormat,
            }
          } catch (error) {
            // Best effort: keep the unparsed block state rather than failing the run
            logger.warn(`[${requestId}] Failed to parse responseFormat for block ${blockId}`, error)
            processedBlockStates[blockId] = blockState
          }
        } else {
          processedBlockStates[blockId] = blockState
        }
      }

      // Get workflow variables
      let workflowVariables = {}
      if (workflow.variables) {
        try {
          // Parse workflow variables if they're stored as a string
          if (typeof workflow.variables === 'string') {
            workflowVariables = JSON.parse(workflow.variables)
          } else {
            // Otherwise use as is (already parsed JSON)
            workflowVariables = workflow.variables
          }
          logger.debug(
            `[${requestId}] Loaded ${Object.keys(workflowVariables).length} workflow variables for: ${workflowId}`
          )
        } catch (error) {
          logger.error(`[${requestId}] Failed to parse workflow variables: ${workflowId}`, error)
          // Continue execution even if variables can't be parsed
        }
      } else {
        logger.debug(`[${requestId}] No workflow variables found for: ${workflowId}`)
      }

      // Serialize and execute the workflow
      logger.debug(`[${requestId}] Serializing workflow: ${workflowId}`)
      const serializedWorkflow = new Serializer().serializeWorkflow(
        mergedStates,
        edges,
        loops,
        parallels
      )

      const executor = new Executor(
        serializedWorkflow,
        processedBlockStates,
        decryptedEnvVars,
        processedInput,
        workflowVariables
      )

      const result = await executor.execute(workflowId)

      // Check if we got a StreamingExecution result (with stream + execution properties)
      // For API routes, we only care about the ExecutionResult part, not the stream
      const executionResult =
        'stream' in result && 'execution' in result ? result.execution : result

      logger.info(`[${requestId}] Workflow execution completed: ${workflowId}`, {
        success: executionResult.success,
        executionTime: executionResult.metadata?.duration,
      })

      // Update workflow run counts if execution was successful
      if (executionResult.success) {
        await updateWorkflowRunCounts(workflowId)

        // Track API call in user stats
        await db
          .update(userStats)
          .set({
            totalApiCalls: sql`total_api_calls + 1`,
            lastActive: new Date(),
          })
          .where(eq(userStats.userId, workflow.userId))
      }

      // Build trace spans from execution logs
      const { traceSpans, totalDuration } = buildTraceSpans(executionResult)

      // Add trace spans to the execution result
      const enrichedResult = {
        ...executionResult,
        traceSpans,
        totalDuration,
      }

      // Log each execution step and the final result
      await persistExecutionLogs(workflowId, executionId, enrichedResult, 'api')

      return executionResult
    } catch (error: any) {
      logger.error(`[${requestId}] Workflow execution failed: ${workflowId}`, error)
      // Log the error
      await persistExecutionError(workflowId, executionId, error, 'api')
      throw error
    }
  } finally {
    // Always release the slot claimed above, whether the run succeeded, failed or was rejected
    runningExecutions.delete(workflowId)
  }
}
/**
 * GET handler: executes the workflow identified by the route's `id` param without any input.
 *
 * Responds with the validation error when access validation fails, with the response
 * block's HTTP response when the execution result contains one, and with the standard
 * success envelope otherwise. A `UsageLimitError` is reported with its own status code and
 * the 'USAGE_LIMIT_EXCEEDED' code; any other failure becomes a 500 'EXECUTION_ERROR'.
 */
export async function GET(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = crypto.randomUUID().slice(0, 8)
  const { id } = await params

  try {
    logger.debug(`[${requestId}] GET execution request for workflow: ${id}`)

    const validation = await validateWorkflowAccess(request, id)
    if (validation.error) {
      logger.warn(`[${requestId}] Workflow access validation failed: ${validation.error.message}`)
      return createErrorResponse(validation.error.message, validation.error.status)
    }

    const result = await executeWorkflow(validation.workflow, requestId)

    // A response block in the execution output dictates the HTTP response;
    // otherwise fall back to the default success envelope
    return workflowHasResponseBlock(result)
      ? createHttpResponseFromBlock(result)
      : createSuccessResponse(result)
  } catch (error: any) {
    logger.error(`[${requestId}] Error executing workflow: ${id}`, error)

    // Anything other than a usage limit rejection is reported as a generic execution error
    if (!(error instanceof UsageLimitError)) {
      return createErrorResponse(
        error.message || 'Failed to execute workflow',
        500,
        'EXECUTION_ERROR'
      )
    }

    return createErrorResponse(error.message, error.statusCode, 'USAGE_LIMIT_EXCEEDED')
  }
}
/**
 * POST handler: executes the workflow identified by the route's `id` param, using the
 * JSON request body as input.
 *
 * An absent or blank body, an empty object/array, and a literal JSON `null` are all treated
 * as "no input" (`{}`); any other body is passed to the workflow as `{ input: body }`.
 * Malformed JSON is answered with 400 'INVALID_JSON'.
 *
 * Responds with the validation error when access validation fails, with the response
 * block's HTTP response when the execution result contains one, and with the standard
 * success envelope otherwise. A `UsageLimitError` is reported with its own status code and
 * the 'USAGE_LIMIT_EXCEEDED' code; any other failure becomes a 500 'EXECUTION_ERROR'.
 */
export async function POST(request: NextRequest, { params }: { params: Promise<{ id: string }> }) {
  const requestId = crypto.randomUUID().slice(0, 8)
  const { id } = await params

  try {
    logger.debug(`[${requestId}] POST execution request for workflow: ${id}`)

    const validation = await validateWorkflowAccess(request, id)
    if (validation.error) {
      logger.warn(`[${requestId}] Workflow access validation failed: ${validation.error.message}`)
      return createErrorResponse(validation.error.message, validation.error.status)
    }

    const bodyText = await request.text()
    logger.info(`[${requestId}] Raw request body:`, bodyText)

    let body: unknown = {}
    if (bodyText?.trim()) {
      try {
        body = JSON.parse(bodyText)
        logger.info(`[${requestId}] Parsed request body:`, JSON.stringify(body, null, 2))
      } catch (error) {
        logger.error(`[${requestId}] Failed to parse request body:`, error)
        return createErrorResponse('Invalid JSON in request body', 400, 'INVALID_JSON')
      }
    } else {
      logger.info(`[${requestId}] No request body provided`)
    }

    // JSON.parse can return null or a primitive. Object.keys(null) throws a TypeError, which
    // used to surface as a 500, so a null body is treated like an empty one. Objects, arrays
    // and strings count as content when they have at least one key/character; numbers and
    // booleans never do.
    const hasContent =
      typeof body === 'object' && body !== null
        ? Object.keys(body).length > 0
        : typeof body === 'string' && body.length > 0

    // Wrap the body once here; executeWorkflow leaves input that already has an `input` key as is
    const input = hasContent ? { input: body } : {}
    logger.info(`[${requestId}] Input passed to workflow:`, JSON.stringify(input, null, 2))

    // Execute workflow with the structured input
    const result = await executeWorkflow(validation.workflow, requestId, input)

    // Check if the workflow execution contains a response block output
    const hasResponseBlock = workflowHasResponseBlock(result)
    if (hasResponseBlock) {
      return createHttpResponseFromBlock(result)
    }

    return createSuccessResponse(result)
  } catch (error: any) {
    logger.error(`[${requestId}] Error executing workflow: ${id}`, error)

    // Check if this is a usage limit error
    if (error instanceof UsageLimitError) {
      return createErrorResponse(error.message, error.statusCode, 'USAGE_LIMIT_EXCEEDED')
    }

    return createErrorResponse(
      error.message || 'Failed to execute workflow',
      500,
      'EXECUTION_ERROR'
    )
  }
}
/**
 * OPTIONS handler: answers CORS preflight requests with an empty 200 response.
 *
 * Allows any origin, the GET/POST/OPTIONS methods and a fixed list of request headers,
 * and sets `Access-Control-Max-Age` to 86400 seconds. The incoming request is not inspected.
 */
export async function OPTIONS(request: NextRequest) {
  const corsHeaders = {
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Methods': 'GET, POST, OPTIONS',
    'Access-Control-Allow-Headers':
      'Content-Type, X-API-Key, X-CSRF-Token, X-Requested-With, Accept, Accept-Version, Content-Length, Content-MD5, Date, X-Api-Version',
    'Access-Control-Max-Age': '86400',
  }

  return new NextResponse(null, { status: 200, headers: corsHeaders })
}