feat(manual-trigger): add manual trigger (#1452)

* feat(manual-trigger): add manual trigger

* consolidate input format extraction

* exclude triggers from console logs + deployed chat error surfacing

* works

* centralize error messages + logging for deployed chat
This commit is contained in:
Vikhyath Mondreti
2025-09-25 16:29:50 -07:00
committed by GitHub
parent 7e8ac5c27f
commit e03c036a10
11 changed files with 441 additions and 272 deletions

View File

@@ -14,6 +14,7 @@ import { processStreamingBlockLogs } from '@/lib/tokenization'
import { getEmailDomain } from '@/lib/urls/utils'
import { decryptSecret, generateRequestId } from '@/lib/utils'
import { TriggerUtils } from '@/lib/workflows/triggers'
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
import { getBlock } from '@/blocks'
import { Executor } from '@/executor'
import type { BlockLog, ExecutionResult } from '@/executor/types'
@@ -345,9 +346,7 @@ export async function executeWorkflowForChat(
chatId,
}
)
throw new Error(
usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue using chat.'
)
throw new Error(usageCheck.message || CHAT_ERROR_MESSAGES.USAGE_LIMIT_EXCEEDED)
}
// Set up logging for chat execution
@@ -404,6 +403,7 @@ export async function executeWorkflowForChat(
.select({
isDeployed: workflow.isDeployed,
variables: workflow.variables,
workspaceId: workflow.workspaceId,
})
.from(workflow)
.where(eq(workflow.id, workflowId))
@@ -462,13 +462,7 @@ export async function executeWorkflowForChat(
// Get user environment variables with workspace precedence
let envVars: Record<string, string> = {}
try {
const wfWorkspaceRow = await db
.select({ workspaceId: workflow.workspaceId })
.from(workflow)
.where(eq(workflow.id, workflowId))
.limit(1)
const workspaceId = wfWorkspaceRow[0]?.workspaceId || undefined
const workspaceId = workflowResult[0].workspaceId || undefined
const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv(
deployment.userId,
workspaceId
@@ -552,268 +546,326 @@ export async function executeWorkflowForChat(
// Start logging session
await loggingSession.safeStart({
userId: deployment.userId,
workspaceId: '', // TODO: Get from workflow
workspaceId: workflowResult[0].workspaceId || '',
variables: workflowVariables,
})
let sessionCompleted = false
const stream = new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
const streamedContent = new Map<string, string>()
const streamedBlocks = new Set<string>() // Track which blocks have started streaming
const onStream = async (streamingExecution: any): Promise<void> => {
if (!streamingExecution.stream) return
const blockId = streamingExecution.execution?.blockId
const reader = streamingExecution.stream.getReader()
if (blockId) {
streamedContent.set(blockId, '')
// Add separator if this is not the first block to stream
if (streamedBlocks.size > 0) {
// Send separator before the new block starts
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ blockId, chunk: '\n\n' })}\n\n`)
)
}
streamedBlocks.add(blockId)
}
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ blockId, event: 'end' })}\n\n`)
)
break
}
const chunk = new TextDecoder().decode(value)
if (blockId) {
streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk)
}
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ blockId, chunk })}\n\n`))
}
} catch (error) {
logger.error('Error while reading from stream:', error)
controller.error(error)
}
}
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: { input: input, conversationId },
workflowVariables,
contextExtensions: {
stream: true,
selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds,
edges: filteredEdges.map((e: any) => ({
source: e.source,
target: e.target,
})),
onStream,
isDeployedContext: true,
},
})
// Set up logging on the executor
loggingSession.setupExecutor(executor)
// Determine the start block for chat execution
const startBlock = TriggerUtils.findStartBlock(mergedStates, 'chat')
if (!startBlock) {
const errorMessage =
'No Chat trigger configured for this workflow. Add a Chat Trigger block to enable chat execution.'
logger.error(`[${requestId}] ${errorMessage}`)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: errorMessage,
stackTrace: undefined,
},
})
throw new Error(errorMessage)
}
const startBlockId = startBlock.blockId
let result
try {
result = await executor.execute(workflowId, startBlockId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: {
message: error.message || 'Chat workflow execution failed',
stackTrace: error.stack,
const streamedContent = new Map<string, string>()
const streamedBlocks = new Set<string>() // Track which blocks have started streaming
const onStream = async (streamingExecution: any): Promise<void> => {
if (!streamingExecution.stream) return
const blockId = streamingExecution.execution?.blockId
const reader = streamingExecution.stream.getReader()
if (blockId) {
streamedContent.set(blockId, '')
// Add separator if this is not the first block to stream
if (streamedBlocks.size > 0) {
// Send separator before the new block starts
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ blockId, chunk: '\n\n' })}\n\n`)
)
}
streamedBlocks.add(blockId)
}
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ blockId, event: 'end' })}\n\n`)
)
break
}
const chunk = new TextDecoder().decode(value)
if (blockId) {
streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk)
}
controller.enqueue(encoder.encode(`data: ${JSON.stringify({ blockId, chunk })}\n\n`))
}
} catch (error) {
logger.error('Error while reading from stream:', error)
controller.error(error)
}
}
// Determine the start block for chat execution BEFORE creating executor
const startBlock = TriggerUtils.findStartBlock(mergedStates, 'chat')
if (!startBlock) {
const errorMessage = CHAT_ERROR_MESSAGES.NO_CHAT_TRIGGER
logger.error(`[${requestId}] ${errorMessage}`)
if (!sessionCompleted) {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: errorMessage },
})
sessionCompleted = true
}
// Send error event that the client expects
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: CHAT_ERROR_MESSAGES.GENERIC_ERROR,
})}\n\n`
)
)
controller.close()
return
}
const startBlockId = startBlock.blockId
// Create executor AFTER confirming we have a chat trigger
const executor = new Executor({
workflow: serializedWorkflow,
currentBlockStates: processedBlockStates,
envVarValues: decryptedEnvVars,
workflowInput: { input: input, conversationId },
workflowVariables,
contextExtensions: {
stream: true,
selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds,
edges: filteredEdges.map((e: any) => ({
source: e.source,
target: e.target,
})),
onStream,
isDeployedContext: true,
},
})
throw error
}
// Handle both ExecutionResult and StreamingExecution types
const executionResult =
result && typeof result === 'object' && 'execution' in result
? (result.execution as ExecutionResult)
: (result as ExecutionResult)
// Set up logging on the executor
loggingSession.setupExecutor(executor)
if (executionResult?.logs) {
// Update streamed content and apply tokenization - process regardless of overall success
// This ensures partial successes (some agents succeed, some fail) still return results
// Add newlines between different agent outputs for better readability
const processedOutputs = new Set<string>()
executionResult.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId)
if (log.output && content) {
// Add newline separation between different outputs (but not before the first one)
const separator = processedOutputs.size > 0 ? '\n\n' : ''
log.output.content = separator + content
processedOutputs.add(log.blockId)
}
let result
try {
result = await executor.execute(workflowId, startBlockId)
} catch (error: any) {
logger.error(`[${requestId}] Chat workflow execution failed:`, error)
if (!sessionCompleted) {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Chat workflow execution failed' },
})
sessionCompleted = true
}
})
// Also process non-streamed outputs from selected blocks (like function blocks)
// This uses the same logic as the chat panel to ensure identical behavior
const nonStreamingLogs = executionResult.logs.filter(
(log: BlockLog) => !streamedContent.has(log.blockId)
)
// Extract the exact same functions used by the chat panel
const extractBlockIdFromOutputId = (outputId: string): string => {
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
// Send error to stream before ending
controller.enqueue(
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'Chat workflow execution failed',
})}\n\n`
)
)
controller.close()
return // Don't throw - just return to end the stream gracefully
}
const extractPathFromOutputId = (outputId: string, blockId: string): string => {
return outputId.substring(blockId.length + 1)
}
// Handle both ExecutionResult and StreamingExecution types
const executionResult =
result && typeof result === 'object' && 'execution' in result
? (result.execution as ExecutionResult)
: (result as ExecutionResult)
if (executionResult?.logs) {
// Update streamed content and apply tokenization - process regardless of overall success
// This ensures partial successes (some agents succeed, some fail) still return results
// Add newlines between different agent outputs for better readability
const processedOutputs = new Set<string>()
executionResult.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId)
if (log.output && content) {
// Add newline separation between different outputs (but not before the first one)
const separator = processedOutputs.size > 0 ? '\n\n' : ''
log.output.content = separator + content
processedOutputs.add(log.blockId)
}
}
})
// Also process non-streamed outputs from selected blocks (like function blocks)
// This uses the same logic as the chat panel to ensure identical behavior
const nonStreamingLogs = executionResult.logs.filter(
(log: BlockLog) => !streamedContent.has(log.blockId)
)
// Extract the exact same functions used by the chat panel
const extractBlockIdFromOutputId = (outputId: string): string => {
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
}
const extractPathFromOutputId = (outputId: string, blockId: string): string => {
return outputId.substring(blockId.length + 1)
}
const parseOutputContentSafely = (output: any): any => {
if (!output?.content) {
return output
}
if (typeof output.content === 'string') {
try {
return JSON.parse(output.content)
} catch (e) {
// Fallback to original structure if parsing fails
return output
}
}
const parseOutputContentSafely = (output: any): any => {
if (!output?.content) {
return output
}
if (typeof output.content === 'string') {
try {
return JSON.parse(output.content)
} catch (e) {
// Fallback to original structure if parsing fails
return output
}
}
// Filter outputs that have matching logs (exactly like chat panel)
const outputsToRender = selectedOutputIds.filter((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
})
return output
}
// Process each selected output (exactly like chat panel)
for (const outputId of outputsToRender) {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
// Filter outputs that have matching logs (exactly like chat panel)
const outputsToRender = selectedOutputIds.filter((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
})
if (log) {
let outputValue: any = log.output
// Process each selected output (exactly like chat panel)
for (const outputId of outputsToRender) {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
if (path) {
// Parse JSON content safely (exactly like chat panel)
outputValue = parseOutputContentSafely(outputValue)
if (log) {
let outputValue: any = log.output
if (path) {
// Parse JSON content safely (exactly like chat panel)
outputValue = parseOutputContentSafely(outputValue)
const pathParts = path.split('.')
for (const part of pathParts) {
if (outputValue && typeof outputValue === 'object' && part in outputValue) {
outputValue = outputValue[part]
} else {
outputValue = undefined
break
const pathParts = path.split('.')
for (const part of pathParts) {
if (outputValue && typeof outputValue === 'object' && part in outputValue) {
outputValue = outputValue[part]
} else {
outputValue = undefined
break
}
}
}
}
if (outputValue !== undefined) {
// Add newline separation between different outputs
const separator = processedOutputs.size > 0 ? '\n\n' : ''
if (outputValue !== undefined) {
// Add newline separation between different outputs
const separator = processedOutputs.size > 0 ? '\n\n' : ''
// Format the output exactly like the chat panel
const formattedOutput =
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue, null, 2)
// Format the output exactly like the chat panel
const formattedOutput =
typeof outputValue === 'string'
? outputValue
: JSON.stringify(outputValue, null, 2)
// Update the log content
if (!log.output.content) {
log.output.content = separator + formattedOutput
} else {
log.output.content = separator + formattedOutput
// Update the log content
if (!log.output.content) {
log.output.content = separator + formattedOutput
} else {
log.output.content = separator + formattedOutput
}
processedOutputs.add(log.blockId)
}
processedOutputs.add(log.blockId)
}
}
// Process all logs for streaming tokenization
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
const enrichedResult = { ...executionResult, traceSpans, totalDuration }
if (conversationId) {
if (!enrichedResult.metadata) {
enrichedResult.metadata = {
duration: totalDuration,
startTime: new Date().toISOString(),
}
}
;(enrichedResult.metadata as any).conversationId = conversationId
}
// Use the executionId created at the beginning of this function
logger.debug(`Using execution ID for deployed chat: ${executionId}`)
if (executionResult.success) {
try {
await db
.update(userStats)
.set({
totalChatExecutions: sql`total_chat_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, deployment.userId))
logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`)
} catch (error) {
logger.error(`Failed to update user stats for deployed chat:`, error)
}
}
}
// Process all logs for streaming tokenization
const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent)
logger.info(`Processed ${processedCount} blocks for streaming tokenization`)
const { traceSpans, totalDuration } = buildTraceSpans(executionResult)
const enrichedResult = { ...executionResult, traceSpans, totalDuration }
if (conversationId) {
if (!enrichedResult.metadata) {
enrichedResult.metadata = {
duration: totalDuration,
startTime: new Date().toISOString(),
}
}
;(enrichedResult.metadata as any).conversationId = conversationId
if (!(result && typeof result === 'object' && 'stream' in result)) {
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`)
)
}
const executionId = uuidv4()
logger.debug(`Generated execution ID for deployed chat: ${executionId}`)
if (executionResult.success) {
try {
await db
.update(userStats)
.set({
totalChatExecutions: sql`total_chat_executions + 1`,
lastActive: new Date(),
})
.where(eq(userStats.userId, deployment.userId))
logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`)
} catch (error) {
logger.error(`Failed to update user stats for deployed chat:`, error)
}
if (!sessionCompleted) {
const resultForTracing =
executionResult || ({ success: true, output: {}, logs: [] } as ExecutionResult)
const { traceSpans } = buildTraceSpans(resultForTracing)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult?.metadata?.duration || 0,
finalOutput: executionResult?.output || {},
traceSpans,
})
sessionCompleted = true
}
}
if (!(result && typeof result === 'object' && 'stream' in result)) {
controller.close()
} catch (error: any) {
// Handle any errors that occur in the stream
logger.error(`[${requestId}] Stream error:`, error)
// Send error event to client
const encoder = new TextEncoder()
controller.enqueue(
encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`)
encoder.encode(
`data: ${JSON.stringify({
event: 'error',
error: error.message || 'An unexpected error occurred',
})}\n\n`
)
)
}
// Complete logging session (for both success and failure)
if (executionResult?.logs) {
const { traceSpans } = buildTraceSpans(executionResult)
await loggingSession.safeComplete({
endedAt: new Date().toISOString(),
totalDurationMs: executionResult.metadata?.duration || 0,
finalOutput: executionResult.output,
traceSpans,
})
}
// Try to complete the logging session with error if not already completed
if (!sessionCompleted && loggingSession) {
await loggingSession.safeCompleteWithError({
endedAt: new Date().toISOString(),
totalDurationMs: 0,
error: { message: error.message || 'Stream processing error' },
})
sessionCompleted = true
}
controller.close()
controller.close()
}
},
})

View File

@@ -16,13 +16,11 @@ import {
PasswordAuth,
VoiceInterface,
} from '@/app/chat/components'
import { CHAT_ERROR_MESSAGES, CHAT_REQUEST_TIMEOUT_MS } from '@/app/chat/constants'
import { useAudioStreaming, useChatStreaming } from '@/app/chat/hooks'
const logger = createLogger('ChatClient')
// Chat timeout configuration (5 minutes)
const CHAT_REQUEST_TIMEOUT_MS = 300000
interface ChatConfig {
id: string
title: string
@@ -237,7 +235,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) {
}
} catch (error) {
logger.error('Error fetching chat config:', error)
setError('This chat is currently unavailable. Please try again later.')
setError(CHAT_ERROR_MESSAGES.CHAT_UNAVAILABLE)
}
}
@@ -372,7 +370,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) {
setIsLoading(false)
const errorMessage: ChatMessage = {
id: crypto.randomUUID(),
content: 'Sorry, there was an error processing your message. Please try again.',
content: CHAT_ERROR_MESSAGES.GENERIC_ERROR,
type: 'assistant',
timestamp: new Date(),
}

View File

@@ -0,0 +1,15 @@
export const CHAT_ERROR_MESSAGES = {
GENERIC_ERROR: 'Sorry, there was an error processing your message. Please try again.',
NETWORK_ERROR: 'Unable to connect to the server. Please check your connection and try again.',
TIMEOUT_ERROR: 'Request timed out. Please try again.',
AUTH_REQUIRED_PASSWORD: 'This chat requires a password to access.',
AUTH_REQUIRED_EMAIL: 'Please provide your email to access this chat.',
CHAT_UNAVAILABLE: 'This chat is currently unavailable. Please try again later.',
NO_CHAT_TRIGGER:
'No Chat trigger configured for this workflow. Add a Chat Trigger block to enable chat execution.',
USAGE_LIMIT_EXCEEDED: 'Usage limit exceeded. Please upgrade your plan to continue using chat.',
} as const
export const CHAT_REQUEST_TIMEOUT_MS = 300000 // 5 minutes (same as in chat.tsx)
export type ChatErrorType = keyof typeof CHAT_ERROR_MESSAGES

View File

@@ -3,6 +3,7 @@
import { useRef, useState } from 'react'
import { createLogger } from '@/lib/logs/console/logger'
import type { ChatMessage } from '@/app/chat/components/message/message'
import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants'
// No longer need complex output extraction - backend handles this
import type { ExecutionResult } from '@/executor/types'
@@ -151,6 +152,25 @@ export function useChatStreaming() {
const json = JSON.parse(line.substring(6))
const { blockId, chunk: contentChunk, event: eventType } = json
// Handle error events from the server
if (eventType === 'error' || json.event === 'error') {
const errorMessage = json.error || CHAT_ERROR_MESSAGES.GENERIC_ERROR
setMessages((prev) =>
prev.map((msg) =>
msg.id === messageId
? {
...msg,
content: errorMessage,
isStreaming: false,
type: 'assistant' as const,
}
: msg
)
)
setIsLoading(false)
return
}
if (eventType === 'final' && json.data) {
// The backend has already processed and combined all outputs
// We just need to extract the combined content and use it

View File

@@ -671,6 +671,19 @@ export function useWorkflowExecution() {
selectedOutputIds = chatStore.getState().getSelectedWorkflowOutput(activeWorkflowId)
}
// Helper to extract test values from inputFormat subblock
const extractTestValuesFromInputFormat = (inputFormatValue: any): Record<string, any> => {
const testInput: Record<string, any> = {}
if (Array.isArray(inputFormatValue)) {
inputFormatValue.forEach((field: any) => {
if (field && typeof field === 'object' && field.name && field.value !== undefined) {
testInput[field.name] = field.value
}
})
}
return testInput
}
// Determine start block and workflow input based on execution type
let startBlockId: string | undefined
let finalWorkflowInput = workflowInput
@@ -720,19 +733,12 @@ export function useWorkflowExecution() {
// Extract test values from the API trigger's inputFormat
if (selectedTrigger.type === 'api_trigger' || selectedTrigger.type === 'starter') {
const inputFormatValue = selectedTrigger.subBlocks?.inputFormat?.value
if (Array.isArray(inputFormatValue)) {
const testInput: Record<string, any> = {}
inputFormatValue.forEach((field: any) => {
if (field && typeof field === 'object' && field.name && field.value !== undefined) {
testInput[field.name] = field.value
}
})
const testInput = extractTestValuesFromInputFormat(inputFormatValue)
// Use the test input as workflow input
if (Object.keys(testInput).length > 0) {
finalWorkflowInput = testInput
logger.info('Using API trigger test values for manual run:', testInput)
}
// Use the test input as workflow input
if (Object.keys(testInput).length > 0) {
finalWorkflowInput = testInput
logger.info('Using API trigger test values for manual run:', testInput)
}
}
}
@@ -741,18 +747,29 @@ export function useWorkflowExecution() {
logger.error('Multiple API triggers found')
setIsExecuting(false)
throw error
} else if (manualTriggers.length === 1) {
// No API trigger, check for manual trigger
selectedTrigger = manualTriggers[0]
} else if (manualTriggers.length >= 1) {
// No API trigger, check for manual triggers
// Prefer manual_trigger over input_trigger for simple runs
const manualTrigger = manualTriggers.find((t) => t.type === 'manual_trigger')
const inputTrigger = manualTriggers.find((t) => t.type === 'input_trigger')
selectedTrigger = manualTrigger || inputTrigger || manualTriggers[0]
const blockEntry = entries.find(([, block]) => block === selectedTrigger)
if (blockEntry) {
selectedBlockId = blockEntry[0]
// Extract test values from input trigger's inputFormat if it's an input_trigger
if (selectedTrigger.type === 'input_trigger') {
const inputFormatValue = selectedTrigger.subBlocks?.inputFormat?.value
const testInput = extractTestValuesFromInputFormat(inputFormatValue)
// Use the test input as workflow input
if (Object.keys(testInput).length > 0) {
finalWorkflowInput = testInput
logger.info('Using Input trigger test values for manual run:', testInput)
}
}
}
} else if (manualTriggers.length > 1) {
const error = new Error('Multiple Input Trigger blocks found. Keep only one.')
logger.error('Multiple input triggers found')
setIsExecuting(false)
throw error
} else {
// Fallback: Check for legacy starter block
const starterBlock = Object.values(filteredStates).find((block) => block.type === 'starter')
@@ -769,8 +786,8 @@ export function useWorkflowExecution() {
}
if (!selectedBlockId || !selectedTrigger) {
const error = new Error('Manual run requires an Input Trigger or API Trigger block')
logger.error('No input or API triggers found for manual run')
const error = new Error('Manual run requires a Manual, Input Form, or API Trigger block')
logger.error('No manual/input or API triggers found for manual run')
setIsExecuting(false)
throw error
}

View File

@@ -1,9 +1,9 @@
import type { SVGProps } from 'react'
import { createElement } from 'react'
import { Play } from 'lucide-react'
import { FormInput } from 'lucide-react'
import type { BlockConfig } from '@/blocks/types'
const InputTriggerIcon = (props: SVGProps<SVGSVGElement>) => createElement(Play, props)
const InputTriggerIcon = (props: SVGProps<SVGSVGElement>) => createElement(FormInput, props)
export const InputTriggerBlock: BlockConfig = {
type: 'input_trigger',

View File

@@ -0,0 +1,31 @@
import type { SVGProps } from 'react'
import { createElement } from 'react'
import { Play } from 'lucide-react'
import type { BlockConfig } from '@/blocks/types'
const ManualTriggerIcon = (props: SVGProps<SVGSVGElement>) => createElement(Play, props)
export const ManualTriggerBlock: BlockConfig = {
type: 'manual_trigger',
name: 'Manual',
description: 'Start workflow manually from the editor',
longDescription:
'Trigger the workflow manually without defining an input schema. Useful for simple runs where no structured input is needed.',
bestPractices: `
- Use when you want a simple manual start without defining an input format.
- If you need structured inputs or child workflows to map variables from, prefer the Input Form Trigger.
`,
category: 'triggers',
bgColor: '#2563EB',
icon: ManualTriggerIcon,
subBlocks: [],
tools: {
access: [],
},
inputs: {},
outputs: {},
triggers: {
enabled: true,
available: ['manual'],
},
}

View File

@@ -19,8 +19,8 @@ const getAvailableWorkflows = (): Array<{ label: string; id: string }> => {
export const WorkflowInputBlock: BlockConfig = {
type: 'workflow_input',
name: 'Workflow',
description: 'Execute another workflow and map variables to its Input Trigger schema.',
longDescription: `Execute another child workflow and map variables to its Input Trigger schema. Helps with modularizing workflows.`,
description: 'Execute another workflow and map variables to its Input Form Trigger schema.',
longDescription: `Execute another child workflow and map variables to its Input Form Trigger schema. Helps with modularizing workflows.`,
bestPractices: `
- Usually clarify/check if the user has tagged a workflow to use as the child workflow. Understand the child workflow to determine the logical position of this block in the workflow.
- Remember, that the start point of the child workflow is the Input Form Trigger block.

View File

@@ -38,6 +38,7 @@ import { JiraBlock } from '@/blocks/blocks/jira'
import { KnowledgeBlock } from '@/blocks/blocks/knowledge'
import { LinearBlock } from '@/blocks/blocks/linear'
import { LinkupBlock } from '@/blocks/blocks/linkup'
import { ManualTriggerBlock } from '@/blocks/blocks/manual_trigger'
import { McpBlock } from '@/blocks/blocks/mcp'
import { Mem0Block } from '@/blocks/blocks/mem0'
import { MemoryBlock } from '@/blocks/blocks/memory'
@@ -153,6 +154,7 @@ export const registry: Record<string, BlockConfig> = {
starter: StarterBlock,
input_trigger: InputTriggerBlock,
chat_trigger: ChatTriggerBlock,
manual_trigger: ManualTriggerBlock,
api_trigger: ApiTriggerBlock,
supabase: SupabaseBlock,
tavily: TavilyBlock,

View File

@@ -1,6 +1,7 @@
import { BlockPathCalculator } from '@/lib/block-path-calculator'
import { createLogger } from '@/lib/logs/console/logger'
import type { TraceSpan } from '@/lib/logs/types'
import { getBlock } from '@/blocks'
import type { BlockOutput } from '@/blocks/types'
import { BlockType } from '@/executor/consts'
import {
@@ -1779,9 +1780,16 @@ export class Executor {
context.blockLogs.push(blockLog)
// Skip console logging for infrastructure blocks like loops and parallels
// Skip console logging for infrastructure blocks and trigger blocks
// For streaming blocks, we'll add the console entry after stream processing
if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) {
const blockConfig = getBlock(block.metadata?.id || '')
const isTriggerBlock =
blockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER
if (
block.metadata?.id !== BlockType.LOOP &&
block.metadata?.id !== BlockType.PARALLEL &&
!isTriggerBlock
) {
// Determine iteration context for this block
let iterationCurrent: number | undefined
let iterationTotal: number | undefined
@@ -1889,8 +1897,15 @@ export class Executor {
context.blockLogs.push(blockLog)
// Skip console logging for infrastructure blocks like loops and parallels
if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) {
// Skip console logging for infrastructure blocks and trigger blocks
const nonStreamBlockConfig = getBlock(block.metadata?.id || '')
const isNonStreamTriggerBlock =
nonStreamBlockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER
if (
block.metadata?.id !== BlockType.LOOP &&
block.metadata?.id !== BlockType.PARALLEL &&
!isNonStreamTriggerBlock
) {
// Determine iteration context for this block
let iterationCurrent: number | undefined
let iterationTotal: number | undefined
@@ -2001,8 +2016,15 @@ export class Executor {
// Log the error even if we'll continue execution through error path
context.blockLogs.push(blockLog)
// Skip console logging for infrastructure blocks like loops and parallels
if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) {
// Skip console logging for infrastructure blocks and trigger blocks
const errorBlockConfig = getBlock(block.metadata?.id || '')
const isErrorTriggerBlock =
errorBlockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER
if (
block.metadata?.id !== BlockType.LOOP &&
block.metadata?.id !== BlockType.PARALLEL &&
!isErrorTriggerBlock
) {
// Determine iteration context for this block
let iterationCurrent: number | undefined
let iterationTotal: number | undefined

View File

@@ -5,6 +5,7 @@ import { getBlock } from '@/blocks'
*/
export const TRIGGER_TYPES = {
INPUT: 'input_trigger',
MANUAL: 'manual_trigger',
CHAT: 'chat_trigger',
API: 'api_trigger',
WEBHOOK: 'webhook',
@@ -81,7 +82,7 @@ export class TriggerUtils {
* Check if a block is a manual-compatible trigger
*/
static isManualTrigger(block: { type: string; subBlocks?: any }): boolean {
if (block.type === TRIGGER_TYPES.INPUT) {
if (block.type === TRIGGER_TYPES.INPUT || block.type === TRIGGER_TYPES.MANUAL) {
return true
}
@@ -139,6 +140,8 @@ export class TriggerUtils {
return 'Chat'
case TRIGGER_TYPES.INPUT:
return 'Input Trigger'
case TRIGGER_TYPES.MANUAL:
return 'Manual'
case TRIGGER_TYPES.API:
return 'API'
case TRIGGER_TYPES.WEBHOOK:
@@ -216,12 +219,14 @@ export class TriggerUtils {
* Check if a trigger type requires single instance constraint
*/
static requiresSingleInstance(triggerType: string): boolean {
// API and Input triggers cannot coexist with each other
// Chat trigger must be unique
// Schedules and webhooks can coexist with API/Input triggers
// Each trigger type can only have one instance of itself
// Manual and Input Form can coexist
// API, Chat triggers must be unique
// Schedules and webhooks can have multiple instances
return (
triggerType === TRIGGER_TYPES.API ||
triggerType === TRIGGER_TYPES.INPUT ||
triggerType === TRIGGER_TYPES.MANUAL ||
triggerType === TRIGGER_TYPES.CHAT
)
}
@@ -244,11 +249,12 @@ export class TriggerUtils {
const blockArray = Array.isArray(blocks) ? blocks : Object.values(blocks)
const hasLegacyStarter = TriggerUtils.hasLegacyStarter(blocks)
// Legacy starter block can't coexist with Chat, Input, or API triggers
// Legacy starter block can't coexist with Chat, Input, Manual, or API triggers
if (hasLegacyStarter) {
if (
triggerType === TRIGGER_TYPES.CHAT ||
triggerType === TRIGGER_TYPES.INPUT ||
triggerType === TRIGGER_TYPES.MANUAL ||
triggerType === TRIGGER_TYPES.API
) {
return true
@@ -260,6 +266,7 @@ export class TriggerUtils {
(block) =>
block.type === TRIGGER_TYPES.CHAT ||
block.type === TRIGGER_TYPES.INPUT ||
block.type === TRIGGER_TYPES.MANUAL ||
block.type === TRIGGER_TYPES.API
)
if (hasModernTriggers) {
@@ -272,6 +279,11 @@ export class TriggerUtils {
return blockArray.some((block) => block.type === TRIGGER_TYPES.INPUT)
}
// Only one Manual trigger allowed
if (triggerType === TRIGGER_TYPES.MANUAL) {
return blockArray.some((block) => block.type === TRIGGER_TYPES.MANUAL)
}
// Only one API trigger allowed
if (triggerType === TRIGGER_TYPES.API) {
return blockArray.some((block) => block.type === TRIGGER_TYPES.API)