diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 706f2b520..7b3165706 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -14,6 +14,7 @@ import { processStreamingBlockLogs } from '@/lib/tokenization' import { getEmailDomain } from '@/lib/urls/utils' import { decryptSecret, generateRequestId } from '@/lib/utils' import { TriggerUtils } from '@/lib/workflows/triggers' +import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants' import { getBlock } from '@/blocks' import { Executor } from '@/executor' import type { BlockLog, ExecutionResult } from '@/executor/types' @@ -345,9 +346,7 @@ export async function executeWorkflowForChat( chatId, } ) - throw new Error( - usageCheck.message || 'Usage limit exceeded. Please upgrade your plan to continue using chat.' - ) + throw new Error(usageCheck.message || CHAT_ERROR_MESSAGES.USAGE_LIMIT_EXCEEDED) } // Set up logging for chat execution @@ -404,6 +403,7 @@ export async function executeWorkflowForChat( .select({ isDeployed: workflow.isDeployed, variables: workflow.variables, + workspaceId: workflow.workspaceId, }) .from(workflow) .where(eq(workflow.id, workflowId)) @@ -462,13 +462,7 @@ export async function executeWorkflowForChat( // Get user environment variables with workspace precedence let envVars: Record = {} try { - const wfWorkspaceRow = await db - .select({ workspaceId: workflow.workspaceId }) - .from(workflow) - .where(eq(workflow.id, workflowId)) - .limit(1) - - const workspaceId = wfWorkspaceRow[0]?.workspaceId || undefined + const workspaceId = workflowResult[0].workspaceId || undefined const { personalEncrypted, workspaceEncrypted } = await getPersonalAndWorkspaceEnv( deployment.userId, workspaceId @@ -552,268 +546,326 @@ export async function executeWorkflowForChat( // Start logging session await loggingSession.safeStart({ userId: deployment.userId, - workspaceId: '', // TODO: Get from workflow + workspaceId: workflowResult[0].workspaceId || '', variables: workflowVariables, }) + let sessionCompleted = false + const stream = new ReadableStream({ async start(controller) { const encoder = new TextEncoder() - const streamedContent = new Map() - const streamedBlocks = new Set() // Track which blocks have started streaming - const onStream = async (streamingExecution: any): Promise => { - if (!streamingExecution.stream) return - - const blockId = streamingExecution.execution?.blockId - const reader = streamingExecution.stream.getReader() - if (blockId) { - streamedContent.set(blockId, '') - - // Add separator if this is not the first block to stream - if (streamedBlocks.size > 0) { - // Send separator before the new block starts - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ blockId, chunk: '\n\n' })}\n\n`) - ) - } - streamedBlocks.add(blockId) - } - try { - while (true) { - const { done, value } = await reader.read() - if (done) { - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ blockId, event: 'end' })}\n\n`) - ) - break - } - const chunk = new TextDecoder().decode(value) - if (blockId) { - streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk) - } - controller.enqueue(encoder.encode(`data: ${JSON.stringify({ blockId, chunk })}\n\n`)) - } - } catch (error) { - logger.error('Error while reading from stream:', error) - controller.error(error) - } - } - - const executor = new Executor({ - workflow: serializedWorkflow, - currentBlockStates: processedBlockStates, - envVarValues: decryptedEnvVars, - workflowInput: { input: input, conversationId }, - workflowVariables, - contextExtensions: { - stream: true, - selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds, - edges: filteredEdges.map((e: any) => ({ - source: e.source, - target: e.target, - })), - onStream, - isDeployedContext: true, - }, - }) - - // Set up logging on the executor - loggingSession.setupExecutor(executor) - - // Determine the start block for chat execution - const startBlock = TriggerUtils.findStartBlock(mergedStates, 'chat') - - if (!startBlock) { - const errorMessage = - 'No Chat trigger configured for this workflow. Add a Chat Trigger block to enable chat execution.' - logger.error(`[${requestId}] ${errorMessage}`) - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - error: { - message: errorMessage, - stackTrace: undefined, - }, - }) - throw new Error(errorMessage) - } - - const startBlockId = startBlock.blockId - - let result try { - result = await executor.execute(workflowId, startBlockId) - } catch (error: any) { - logger.error(`[${requestId}] Chat workflow execution failed:`, error) - await loggingSession.safeCompleteWithError({ - endedAt: new Date().toISOString(), - totalDurationMs: 0, - error: { - message: error.message || 'Chat workflow execution failed', - stackTrace: error.stack, + const streamedContent = new Map() + const streamedBlocks = new Set() // Track which blocks have started streaming + + const onStream = async (streamingExecution: any): Promise => { + if (!streamingExecution.stream) return + + const blockId = streamingExecution.execution?.blockId + const reader = streamingExecution.stream.getReader() + if (blockId) { + streamedContent.set(blockId, '') + + // Add separator if this is not the first block to stream + if (streamedBlocks.size > 0) { + // Send separator before the new block starts + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ blockId, chunk: '\n\n' })}\n\n`) + ) + } + streamedBlocks.add(blockId) + } + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ blockId, event: 'end' })}\n\n`) + ) + break + } + const chunk = new TextDecoder().decode(value) + if (blockId) { + streamedContent.set(blockId, (streamedContent.get(blockId) || '') + chunk) + } + controller.enqueue(encoder.encode(`data: ${JSON.stringify({ blockId, chunk })}\n\n`)) + } + } catch (error) { + logger.error('Error while reading from stream:', error) + controller.error(error) + } + } + + // Determine the start block for chat execution BEFORE creating executor + const startBlock = TriggerUtils.findStartBlock(mergedStates, 'chat') + + if (!startBlock) { + const errorMessage = CHAT_ERROR_MESSAGES.NO_CHAT_TRIGGER + logger.error(`[${requestId}] ${errorMessage}`) + + if (!sessionCompleted) { + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { message: errorMessage }, + }) + sessionCompleted = true + } + + // Send error event that the client expects + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + event: 'error', + error: CHAT_ERROR_MESSAGES.GENERIC_ERROR, + })}\n\n` + ) + ) + controller.close() + return + } + + const startBlockId = startBlock.blockId + + // Create executor AFTER confirming we have a chat trigger + const executor = new Executor({ + workflow: serializedWorkflow, + currentBlockStates: processedBlockStates, + envVarValues: decryptedEnvVars, + workflowInput: { input: input, conversationId }, + workflowVariables, + contextExtensions: { + stream: true, + selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds, + edges: filteredEdges.map((e: any) => ({ + source: e.source, + target: e.target, + })), + onStream, + isDeployedContext: true, }, }) - throw error - } - // Handle both ExecutionResult and StreamingExecution types - const executionResult = - result && typeof result === 'object' && 'execution' in result - ? (result.execution as ExecutionResult) - : (result as ExecutionResult) + // Set up logging on the executor + loggingSession.setupExecutor(executor) - if (executionResult?.logs) { - // Update streamed content and apply tokenization - process regardless of overall success - // This ensures partial successes (some agents succeed, some fail) still return results - - // Add newlines between different agent outputs for better readability - const processedOutputs = new Set() - executionResult.logs.forEach((log: BlockLog) => { - if (streamedContent.has(log.blockId)) { - const content = streamedContent.get(log.blockId) - if (log.output && content) { - // Add newline separation between different outputs (but not before the first one) - const separator = processedOutputs.size > 0 ? '\n\n' : '' - log.output.content = separator + content - processedOutputs.add(log.blockId) - } + let result + try { + result = await executor.execute(workflowId, startBlockId) + } catch (error: any) { + logger.error(`[${requestId}] Chat workflow execution failed:`, error) + if (!sessionCompleted) { + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { message: error.message || 'Chat workflow execution failed' }, + }) + sessionCompleted = true } - }) - // Also process non-streamed outputs from selected blocks (like function blocks) - // This uses the same logic as the chat panel to ensure identical behavior - const nonStreamingLogs = executionResult.logs.filter( - (log: BlockLog) => !streamedContent.has(log.blockId) - ) - - // Extract the exact same functions used by the chat panel - const extractBlockIdFromOutputId = (outputId: string): string => { - return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0] + // Send error to stream before ending + controller.enqueue( + encoder.encode( + `data: ${JSON.stringify({ + event: 'error', + error: error.message || 'Chat workflow execution failed', + })}\n\n` + ) + ) + controller.close() + return // Don't throw - just return to end the stream gracefully } - const extractPathFromOutputId = (outputId: string, blockId: string): string => { - return outputId.substring(blockId.length + 1) - } + // Handle both ExecutionResult and StreamingExecution types + const executionResult = + result && typeof result === 'object' && 'execution' in result + ? (result.execution as ExecutionResult) + : (result as ExecutionResult) + + if (executionResult?.logs) { + // Update streamed content and apply tokenization - process regardless of overall success + // This ensures partial successes (some agents succeed, some fail) still return results + + // Add newlines between different agent outputs for better readability + const processedOutputs = new Set() + executionResult.logs.forEach((log: BlockLog) => { + if (streamedContent.has(log.blockId)) { + const content = streamedContent.get(log.blockId) + if (log.output && content) { + // Add newline separation between different outputs (but not before the first one) + const separator = processedOutputs.size > 0 ? '\n\n' : '' + log.output.content = separator + content + processedOutputs.add(log.blockId) + } + } + }) + + // Also process non-streamed outputs from selected blocks (like function blocks) + // This uses the same logic as the chat panel to ensure identical behavior + const nonStreamingLogs = executionResult.logs.filter( + (log: BlockLog) => !streamedContent.has(log.blockId) + ) + + // Extract the exact same functions used by the chat panel + const extractBlockIdFromOutputId = (outputId: string): string => { + return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0] + } + + const extractPathFromOutputId = (outputId: string, blockId: string): string => { + return outputId.substring(blockId.length + 1) + } + + const parseOutputContentSafely = (output: any): any => { + if (!output?.content) { + return output + } + + if (typeof output.content === 'string') { + try { + return JSON.parse(output.content) + } catch (e) { + // Fallback to original structure if parsing fails + return output + } + } - const parseOutputContentSafely = (output: any): any => { - if (!output?.content) { return output } - if (typeof output.content === 'string') { - try { - return JSON.parse(output.content) - } catch (e) { - // Fallback to original structure if parsing fails - return output - } - } + // Filter outputs that have matching logs (exactly like chat panel) + const outputsToRender = selectedOutputIds.filter((outputId) => { + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput) + }) - return output - } + // Process each selected output (exactly like chat panel) + for (const outputId of outputsToRender) { + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + const path = extractPathFromOutputId(outputId, blockIdForOutput) + const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput) - // Filter outputs that have matching logs (exactly like chat panel) - const outputsToRender = selectedOutputIds.filter((outputId) => { - const blockIdForOutput = extractBlockIdFromOutputId(outputId) - return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput) - }) + if (log) { + let outputValue: any = log.output - // Process each selected output (exactly like chat panel) - for (const outputId of outputsToRender) { - const blockIdForOutput = extractBlockIdFromOutputId(outputId) - const path = extractPathFromOutputId(outputId, blockIdForOutput) - const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput) + if (path) { + // Parse JSON content safely (exactly like chat panel) + outputValue = parseOutputContentSafely(outputValue) - if (log) { - let outputValue: any = log.output - - if (path) { - // Parse JSON content safely (exactly like chat panel) - outputValue = parseOutputContentSafely(outputValue) - - const pathParts = path.split('.') - for (const part of pathParts) { - if (outputValue && typeof outputValue === 'object' && part in outputValue) { - outputValue = outputValue[part] - } else { - outputValue = undefined - break + const pathParts = path.split('.') + for (const part of pathParts) { + if (outputValue && typeof outputValue === 'object' && part in outputValue) { + outputValue = outputValue[part] + } else { + outputValue = undefined + break + } } } - } - if (outputValue !== undefined) { - // Add newline separation between different outputs - const separator = processedOutputs.size > 0 ? '\n\n' : '' + if (outputValue !== undefined) { + // Add newline separation between different outputs + const separator = processedOutputs.size > 0 ? '\n\n' : '' - // Format the output exactly like the chat panel - const formattedOutput = - typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue, null, 2) + // Format the output exactly like the chat panel + const formattedOutput = + typeof outputValue === 'string' + ? outputValue + : JSON.stringify(outputValue, null, 2) - // Update the log content - if (!log.output.content) { - log.output.content = separator + formattedOutput - } else { - log.output.content = separator + formattedOutput + // Update the log content + if (!log.output.content) { + log.output.content = separator + formattedOutput + } else { + log.output.content = separator + formattedOutput + } + processedOutputs.add(log.blockId) } - processedOutputs.add(log.blockId) + } + } + + // Process all logs for streaming tokenization + const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent) + logger.info(`Processed ${processedCount} blocks for streaming tokenization`) + + const { traceSpans, totalDuration } = buildTraceSpans(executionResult) + const enrichedResult = { ...executionResult, traceSpans, totalDuration } + if (conversationId) { + if (!enrichedResult.metadata) { + enrichedResult.metadata = { + duration: totalDuration, + startTime: new Date().toISOString(), + } + } + ;(enrichedResult.metadata as any).conversationId = conversationId + } + // Use the executionId created at the beginning of this function + logger.debug(`Using execution ID for deployed chat: ${executionId}`) + + if (executionResult.success) { + try { + await db + .update(userStats) + .set({ + totalChatExecutions: sql`total_chat_executions + 1`, + lastActive: new Date(), + }) + .where(eq(userStats.userId, deployment.userId)) + logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`) + } catch (error) { + logger.error(`Failed to update user stats for deployed chat:`, error) } } } - // Process all logs for streaming tokenization - const processedCount = processStreamingBlockLogs(executionResult.logs, streamedContent) - logger.info(`Processed ${processedCount} blocks for streaming tokenization`) - - const { traceSpans, totalDuration } = buildTraceSpans(executionResult) - const enrichedResult = { ...executionResult, traceSpans, totalDuration } - if (conversationId) { - if (!enrichedResult.metadata) { - enrichedResult.metadata = { - duration: totalDuration, - startTime: new Date().toISOString(), - } - } - ;(enrichedResult.metadata as any).conversationId = conversationId + if (!(result && typeof result === 'object' && 'stream' in result)) { + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`) + ) } - const executionId = uuidv4() - logger.debug(`Generated execution ID for deployed chat: ${executionId}`) - if (executionResult.success) { - try { - await db - .update(userStats) - .set({ - totalChatExecutions: sql`total_chat_executions + 1`, - lastActive: new Date(), - }) - .where(eq(userStats.userId, deployment.userId)) - logger.debug(`Updated user stats for deployed chat: ${deployment.userId}`) - } catch (error) { - logger.error(`Failed to update user stats for deployed chat:`, error) - } + if (!sessionCompleted) { + const resultForTracing = + executionResult || ({ success: true, output: {}, logs: [] } as ExecutionResult) + const { traceSpans } = buildTraceSpans(resultForTracing) + await loggingSession.safeComplete({ + endedAt: new Date().toISOString(), + totalDurationMs: executionResult?.metadata?.duration || 0, + finalOutput: executionResult?.output || {}, + traceSpans, + }) + sessionCompleted = true } - } - if (!(result && typeof result === 'object' && 'stream' in result)) { + controller.close() + } catch (error: any) { + // Handle any errors that occur in the stream + logger.error(`[${requestId}] Stream error:`, error) + + // Send error event to client + const encoder = new TextEncoder() controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ event: 'final', data: result })}\n\n`) + encoder.encode( + `data: ${JSON.stringify({ + event: 'error', + error: error.message || 'An unexpected error occurred', + })}\n\n` + ) ) - } - // Complete logging session (for both success and failure) - if (executionResult?.logs) { - const { traceSpans } = buildTraceSpans(executionResult) - await loggingSession.safeComplete({ - endedAt: new Date().toISOString(), - totalDurationMs: executionResult.metadata?.duration || 0, - finalOutput: executionResult.output, - traceSpans, - }) - } + // Try to complete the logging session with error if not already completed + if (!sessionCompleted && loggingSession) { + await loggingSession.safeCompleteWithError({ + endedAt: new Date().toISOString(), + totalDurationMs: 0, + error: { message: error.message || 'Stream processing error' }, + }) + sessionCompleted = true + } - controller.close() + controller.close() + } }, }) diff --git a/apps/sim/app/chat/[subdomain]/chat.tsx b/apps/sim/app/chat/[subdomain]/chat.tsx index 941b65ad2..d6f9ce704 100644 --- a/apps/sim/app/chat/[subdomain]/chat.tsx +++ b/apps/sim/app/chat/[subdomain]/chat.tsx @@ -16,13 +16,11 @@ import { PasswordAuth, VoiceInterface, } from '@/app/chat/components' +import { CHAT_ERROR_MESSAGES, CHAT_REQUEST_TIMEOUT_MS } from '@/app/chat/constants' import { useAudioStreaming, useChatStreaming } from '@/app/chat/hooks' const logger = createLogger('ChatClient') -// Chat timeout configuration (5 minutes) -const CHAT_REQUEST_TIMEOUT_MS = 300000 - interface ChatConfig { id: string title: string @@ -237,7 +235,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { } } catch (error) { logger.error('Error fetching chat config:', error) - setError('This chat is currently unavailable. Please try again later.') + setError(CHAT_ERROR_MESSAGES.CHAT_UNAVAILABLE) } } @@ -372,7 +370,7 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { setIsLoading(false) const errorMessage: ChatMessage = { id: crypto.randomUUID(), - content: 'Sorry, there was an error processing your message. Please try again.', + content: CHAT_ERROR_MESSAGES.GENERIC_ERROR, type: 'assistant', timestamp: new Date(), } diff --git a/apps/sim/app/chat/constants.ts b/apps/sim/app/chat/constants.ts new file mode 100644 index 000000000..babbdd00d --- /dev/null +++ b/apps/sim/app/chat/constants.ts @@ -0,0 +1,15 @@ +export const CHAT_ERROR_MESSAGES = { + GENERIC_ERROR: 'Sorry, there was an error processing your message. Please try again.', + NETWORK_ERROR: 'Unable to connect to the server. Please check your connection and try again.', + TIMEOUT_ERROR: 'Request timed out. Please try again.', + AUTH_REQUIRED_PASSWORD: 'This chat requires a password to access.', + AUTH_REQUIRED_EMAIL: 'Please provide your email to access this chat.', + CHAT_UNAVAILABLE: 'This chat is currently unavailable. Please try again later.', + NO_CHAT_TRIGGER: + 'No Chat trigger configured for this workflow. Add a Chat Trigger block to enable chat execution.', + USAGE_LIMIT_EXCEEDED: 'Usage limit exceeded. Please upgrade your plan to continue using chat.', +} as const + +export const CHAT_REQUEST_TIMEOUT_MS = 300000 // 5 minutes (same as in chat.tsx) + +export type ChatErrorType = keyof typeof CHAT_ERROR_MESSAGES diff --git a/apps/sim/app/chat/hooks/use-chat-streaming.ts b/apps/sim/app/chat/hooks/use-chat-streaming.ts index b8ad400a0..01b1a32a7 100644 --- a/apps/sim/app/chat/hooks/use-chat-streaming.ts +++ b/apps/sim/app/chat/hooks/use-chat-streaming.ts @@ -3,6 +3,7 @@ import { useRef, useState } from 'react' import { createLogger } from '@/lib/logs/console/logger' import type { ChatMessage } from '@/app/chat/components/message/message' +import { CHAT_ERROR_MESSAGES } from '@/app/chat/constants' // No longer need complex output extraction - backend handles this import type { ExecutionResult } from '@/executor/types' @@ -151,6 +152,25 @@ export function useChatStreaming() { const json = JSON.parse(line.substring(6)) const { blockId, chunk: contentChunk, event: eventType } = json + // Handle error events from the server + if (eventType === 'error' || json.event === 'error') { + const errorMessage = json.error || CHAT_ERROR_MESSAGES.GENERIC_ERROR + setMessages((prev) => + prev.map((msg) => + msg.id === messageId + ? { + ...msg, + content: errorMessage, + isStreaming: false, + type: 'assistant' as const, + } + : msg + ) + ) + setIsLoading(false) + return + } + if (eventType === 'final' && json.data) { // The backend has already processed and combined all outputs // We just need to extract the combined content and use it diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index e5ca24009..67c790ef8 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -671,6 +671,19 @@ export function useWorkflowExecution() { selectedOutputIds = chatStore.getState().getSelectedWorkflowOutput(activeWorkflowId) } + // Helper to extract test values from inputFormat subblock + const extractTestValuesFromInputFormat = (inputFormatValue: any): Record => { + const testInput: Record = {} + if (Array.isArray(inputFormatValue)) { + inputFormatValue.forEach((field: any) => { + if (field && typeof field === 'object' && field.name && field.value !== undefined) { + testInput[field.name] = field.value + } + }) + } + return testInput + } + // Determine start block and workflow input based on execution type let startBlockId: string | undefined let finalWorkflowInput = workflowInput @@ -720,19 +733,12 @@ export function useWorkflowExecution() { // Extract test values from the API trigger's inputFormat if (selectedTrigger.type === 'api_trigger' || selectedTrigger.type === 'starter') { const inputFormatValue = selectedTrigger.subBlocks?.inputFormat?.value - if (Array.isArray(inputFormatValue)) { - const testInput: Record = {} - inputFormatValue.forEach((field: any) => { - if (field && typeof field === 'object' && field.name && field.value !== undefined) { - testInput[field.name] = field.value - } - }) + const testInput = extractTestValuesFromInputFormat(inputFormatValue) - // Use the test input as workflow input - if (Object.keys(testInput).length > 0) { - finalWorkflowInput = testInput - logger.info('Using API trigger test values for manual run:', testInput) - } + // Use the test input as workflow input + if (Object.keys(testInput).length > 0) { + finalWorkflowInput = testInput + logger.info('Using API trigger test values for manual run:', testInput) } } } @@ -741,18 +747,29 @@ export function useWorkflowExecution() { logger.error('Multiple API triggers found') setIsExecuting(false) throw error - } else if (manualTriggers.length === 1) { - // No API trigger, check for manual trigger - selectedTrigger = manualTriggers[0] + } else if (manualTriggers.length >= 1) { + // No API trigger, check for manual triggers + // Prefer manual_trigger over input_trigger for simple runs + const manualTrigger = manualTriggers.find((t) => t.type === 'manual_trigger') + const inputTrigger = manualTriggers.find((t) => t.type === 'input_trigger') + + selectedTrigger = manualTrigger || inputTrigger || manualTriggers[0] const blockEntry = entries.find(([, block]) => block === selectedTrigger) if (blockEntry) { selectedBlockId = blockEntry[0] + + // Extract test values from input trigger's inputFormat if it's an input_trigger + if (selectedTrigger.type === 'input_trigger') { + const inputFormatValue = selectedTrigger.subBlocks?.inputFormat?.value + const testInput = extractTestValuesFromInputFormat(inputFormatValue) + + // Use the test input as workflow input + if (Object.keys(testInput).length > 0) { + finalWorkflowInput = testInput + logger.info('Using Input trigger test values for manual run:', testInput) + } + } } - } else if (manualTriggers.length > 1) { - const error = new Error('Multiple Input Trigger blocks found. Keep only one.') - logger.error('Multiple input triggers found') - setIsExecuting(false) - throw error } else { // Fallback: Check for legacy starter block const starterBlock = Object.values(filteredStates).find((block) => block.type === 'starter') @@ -769,8 +786,8 @@ export function useWorkflowExecution() { } if (!selectedBlockId || !selectedTrigger) { - const error = new Error('Manual run requires an Input Trigger or API Trigger block') - logger.error('No input or API triggers found for manual run') + const error = new Error('Manual run requires a Manual, Input Form, or API Trigger block') + logger.error('No manual/input or API triggers found for manual run') setIsExecuting(false) throw error } diff --git a/apps/sim/blocks/blocks/input_trigger.ts b/apps/sim/blocks/blocks/input_trigger.ts index 08e2f2ccb..954c31bc1 100644 --- a/apps/sim/blocks/blocks/input_trigger.ts +++ b/apps/sim/blocks/blocks/input_trigger.ts @@ -1,9 +1,9 @@ import type { SVGProps } from 'react' import { createElement } from 'react' -import { Play } from 'lucide-react' +import { FormInput } from 'lucide-react' import type { BlockConfig } from '@/blocks/types' -const InputTriggerIcon = (props: SVGProps) => createElement(Play, props) +const InputTriggerIcon = (props: SVGProps) => createElement(FormInput, props) export const InputTriggerBlock: BlockConfig = { type: 'input_trigger', diff --git a/apps/sim/blocks/blocks/manual_trigger.ts b/apps/sim/blocks/blocks/manual_trigger.ts new file mode 100644 index 000000000..508ef3f79 --- /dev/null +++ b/apps/sim/blocks/blocks/manual_trigger.ts @@ -0,0 +1,31 @@ +import type { SVGProps } from 'react' +import { createElement } from 'react' +import { Play } from 'lucide-react' +import type { BlockConfig } from '@/blocks/types' + +const ManualTriggerIcon = (props: SVGProps) => createElement(Play, props) + +export const ManualTriggerBlock: BlockConfig = { + type: 'manual_trigger', + name: 'Manual', + description: 'Start workflow manually from the editor', + longDescription: + 'Trigger the workflow manually without defining an input schema. Useful for simple runs where no structured input is needed.', + bestPractices: ` + - Use when you want a simple manual start without defining an input format. + - If you need structured inputs or child workflows to map variables from, prefer the Input Form Trigger. + `, + category: 'triggers', + bgColor: '#2563EB', + icon: ManualTriggerIcon, + subBlocks: [], + tools: { + access: [], + }, + inputs: {}, + outputs: {}, + triggers: { + enabled: true, + available: ['manual'], + }, +} diff --git a/apps/sim/blocks/blocks/workflow_input.ts b/apps/sim/blocks/blocks/workflow_input.ts index 774e18ff7..de5d6014f 100644 --- a/apps/sim/blocks/blocks/workflow_input.ts +++ b/apps/sim/blocks/blocks/workflow_input.ts @@ -19,8 +19,8 @@ const getAvailableWorkflows = (): Array<{ label: string; id: string }> => { export const WorkflowInputBlock: BlockConfig = { type: 'workflow_input', name: 'Workflow', - description: 'Execute another workflow and map variables to its Input Trigger schema.', - longDescription: `Execute another child workflow and map variables to its Input Trigger schema. Helps with modularizing workflows.`, + description: 'Execute another workflow and map variables to its Input Form Trigger schema.', + longDescription: `Execute another child workflow and map variables to its Input Form Trigger schema. Helps with modularizing workflows.`, bestPractices: ` - Usually clarify/check if the user has tagged a workflow to use as the child workflow. Understand the child workflow to determine the logical position of this block in the workflow. - Remember, that the start point of the child workflow is the Input Form Trigger block. diff --git a/apps/sim/blocks/registry.ts b/apps/sim/blocks/registry.ts index f049a349b..f225a1c3f 100644 --- a/apps/sim/blocks/registry.ts +++ b/apps/sim/blocks/registry.ts @@ -38,6 +38,7 @@ import { JiraBlock } from '@/blocks/blocks/jira' import { KnowledgeBlock } from '@/blocks/blocks/knowledge' import { LinearBlock } from '@/blocks/blocks/linear' import { LinkupBlock } from '@/blocks/blocks/linkup' +import { ManualTriggerBlock } from '@/blocks/blocks/manual_trigger' import { McpBlock } from '@/blocks/blocks/mcp' import { Mem0Block } from '@/blocks/blocks/mem0' import { MemoryBlock } from '@/blocks/blocks/memory' @@ -153,6 +154,7 @@ export const registry: Record = { starter: StarterBlock, input_trigger: InputTriggerBlock, chat_trigger: ChatTriggerBlock, + manual_trigger: ManualTriggerBlock, api_trigger: ApiTriggerBlock, supabase: SupabaseBlock, tavily: TavilyBlock, diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index e2cbf95d7..b50d0ae7a 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -1,6 +1,7 @@ import { BlockPathCalculator } from '@/lib/block-path-calculator' import { createLogger } from '@/lib/logs/console/logger' import type { TraceSpan } from '@/lib/logs/types' +import { getBlock } from '@/blocks' import type { BlockOutput } from '@/blocks/types' import { BlockType } from '@/executor/consts' import { @@ -1779,9 +1780,16 @@ export class Executor { context.blockLogs.push(blockLog) - // Skip console logging for infrastructure blocks like loops and parallels + // Skip console logging for infrastructure blocks and trigger blocks // For streaming blocks, we'll add the console entry after stream processing - if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) { + const blockConfig = getBlock(block.metadata?.id || '') + const isTriggerBlock = + blockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER + if ( + block.metadata?.id !== BlockType.LOOP && + block.metadata?.id !== BlockType.PARALLEL && + !isTriggerBlock + ) { // Determine iteration context for this block let iterationCurrent: number | undefined let iterationTotal: number | undefined @@ -1889,8 +1897,15 @@ export class Executor { context.blockLogs.push(blockLog) - // Skip console logging for infrastructure blocks like loops and parallels - if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) { + // Skip console logging for infrastructure blocks and trigger blocks + const nonStreamBlockConfig = getBlock(block.metadata?.id || '') + const isNonStreamTriggerBlock = + nonStreamBlockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER + if ( + block.metadata?.id !== BlockType.LOOP && + block.metadata?.id !== BlockType.PARALLEL && + !isNonStreamTriggerBlock + ) { // Determine iteration context for this block let iterationCurrent: number | undefined let iterationTotal: number | undefined @@ -2001,8 +2016,15 @@ export class Executor { // Log the error even if we'll continue execution through error path context.blockLogs.push(blockLog) - // Skip console logging for infrastructure blocks like loops and parallels - if (block.metadata?.id !== BlockType.LOOP && block.metadata?.id !== BlockType.PARALLEL) { + // Skip console logging for infrastructure blocks and trigger blocks + const errorBlockConfig = getBlock(block.metadata?.id || '') + const isErrorTriggerBlock = + errorBlockConfig?.category === 'triggers' || block.metadata?.id === BlockType.STARTER + if ( + block.metadata?.id !== BlockType.LOOP && + block.metadata?.id !== BlockType.PARALLEL && + !isErrorTriggerBlock + ) { // Determine iteration context for this block let iterationCurrent: number | undefined let iterationTotal: number | undefined diff --git a/apps/sim/lib/workflows/triggers.ts b/apps/sim/lib/workflows/triggers.ts index 218046c96..c2f550110 100644 --- a/apps/sim/lib/workflows/triggers.ts +++ b/apps/sim/lib/workflows/triggers.ts @@ -5,6 +5,7 @@ import { getBlock } from '@/blocks' */ export const TRIGGER_TYPES = { INPUT: 'input_trigger', + MANUAL: 'manual_trigger', CHAT: 'chat_trigger', API: 'api_trigger', WEBHOOK: 'webhook', @@ -81,7 +82,7 @@ export class TriggerUtils { * Check if a block is a manual-compatible trigger */ static isManualTrigger(block: { type: string; subBlocks?: any }): boolean { - if (block.type === TRIGGER_TYPES.INPUT) { + if (block.type === TRIGGER_TYPES.INPUT || block.type === TRIGGER_TYPES.MANUAL) { return true } @@ -139,6 +140,8 @@ export class TriggerUtils { return 'Chat' case TRIGGER_TYPES.INPUT: return 'Input Trigger' + case TRIGGER_TYPES.MANUAL: + return 'Manual' case TRIGGER_TYPES.API: return 'API' case TRIGGER_TYPES.WEBHOOK: @@ -216,12 +219,14 @@ export class TriggerUtils { * Check if a trigger type requires single instance constraint */ static requiresSingleInstance(triggerType: string): boolean { - // API and Input triggers cannot coexist with each other - // Chat trigger must be unique - // Schedules and webhooks can coexist with API/Input triggers + // Each trigger type can only have one instance of itself + // Manual and Input Form can coexist + // API, Chat triggers must be unique + // Schedules and webhooks can have multiple instances return ( triggerType === TRIGGER_TYPES.API || triggerType === TRIGGER_TYPES.INPUT || + triggerType === TRIGGER_TYPES.MANUAL || triggerType === TRIGGER_TYPES.CHAT ) } @@ -244,11 +249,12 @@ export class TriggerUtils { const blockArray = Array.isArray(blocks) ? blocks : Object.values(blocks) const hasLegacyStarter = TriggerUtils.hasLegacyStarter(blocks) - // Legacy starter block can't coexist with Chat, Input, or API triggers + // Legacy starter block can't coexist with Chat, Input, Manual, or API triggers if (hasLegacyStarter) { if ( triggerType === TRIGGER_TYPES.CHAT || triggerType === TRIGGER_TYPES.INPUT || + triggerType === TRIGGER_TYPES.MANUAL || triggerType === TRIGGER_TYPES.API ) { return true @@ -260,6 +266,7 @@ export class TriggerUtils { (block) => block.type === TRIGGER_TYPES.CHAT || block.type === TRIGGER_TYPES.INPUT || + block.type === TRIGGER_TYPES.MANUAL || block.type === TRIGGER_TYPES.API ) if (hasModernTriggers) { @@ -272,6 +279,11 @@ export class TriggerUtils { return blockArray.some((block) => block.type === TRIGGER_TYPES.INPUT) } + // Only one Manual trigger allowed + if (triggerType === TRIGGER_TYPES.MANUAL) { + return blockArray.some((block) => block.type === TRIGGER_TYPES.MANUAL) + } + // Only one API trigger allowed if (triggerType === TRIGGER_TYPES.API) { return blockArray.some((block) => block.type === TRIGGER_TYPES.API)