From 6170b8008662b7a87c4b62a4251fa8f438882d01 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Tue, 20 May 2025 12:33:46 -0700 Subject: [PATCH] fix(streaming): added ability to stream back responses for agent even if there are downstream blocks (#385) --- .../executor/handlers/agent/agent-handler.ts | 76 ++++++++++--------- apps/sim/executor/index.ts | 28 ++++++- 2 files changed, 66 insertions(+), 38 deletions(-) diff --git a/apps/sim/executor/handlers/agent/agent-handler.ts b/apps/sim/executor/handlers/agent/agent-handler.ts index b540ae50e..014596a8e 100644 --- a/apps/sim/executor/handlers/agent/agent-handler.ts +++ b/apps/sim/executor/handlers/agent/agent-handler.ts @@ -180,7 +180,7 @@ export class AgentBlockHandler implements BlockHandler { const hasOutgoingConnections = context.edges?.some((edge) => edge.source === block.id) ?? false // Determine if we should use streaming for this block - const shouldUseStreaming = context.stream && isBlockSelectedForOutput && !hasOutgoingConnections + const shouldUseStreaming = context.stream && isBlockSelectedForOutput if (shouldUseStreaming) { logger.info( @@ -189,68 +189,68 @@ export class AgentBlockHandler implements BlockHandler { } // Parse messages if they're in string format - let parsedMessages = inputs.messages; + let parsedMessages = inputs.messages if (typeof inputs.messages === 'string' && inputs.messages.trim()) { try { // Fast path: try standard JSON.parse first try { - parsedMessages = JSON.parse(inputs.messages); - logger.info('Successfully parsed messages from JSON format'); + parsedMessages = JSON.parse(inputs.messages) + logger.info('Successfully parsed messages from JSON format') } catch (jsonError) { // Fast direct approach for single-quoted JSON // Replace single quotes with double quotes, but keep single quotes inside double quotes // This optimized approach handles the most common cases in one pass const preprocessed = inputs.messages - // Ensure we have valid JSON by replacing all single quotes with double quotes, + // Ensure we have valid JSON by replacing all single quotes with double quotes, // except those inside existing double quotes .replace(/(['"])(.*?)\1/g, (match, quote, content) => { - if (quote === '"') return match; // Keep existing double quotes intact - return `"${content}"`; // Replace single quotes with double quotes - }); - + if (quote === '"') return match // Keep existing double quotes intact + return `"${content}"` // Replace single quotes with double quotes + }) + try { - parsedMessages = JSON.parse(preprocessed); - logger.info('Successfully parsed messages after single-quote preprocessing'); + parsedMessages = JSON.parse(preprocessed) + logger.info('Successfully parsed messages after single-quote preprocessing') } catch (preprocessError) { // Ultimate fallback: simply replace all single quotes try { - parsedMessages = JSON.parse(inputs.messages.replace(/'/g, '"')); - logger.info('Successfully parsed messages using direct quote replacement'); + parsedMessages = JSON.parse(inputs.messages.replace(/'/g, '"')) + logger.info('Successfully parsed messages using direct quote replacement') } catch (finalError) { - logger.error('All parsing attempts failed', { + logger.error('All parsing attempts failed', { original: inputs.messages, - error: finalError - }); + error: finalError, + }) // Keep original value } } } } catch (error) { - logger.error('Failed to parse messages from string:', { error }); + logger.error('Failed to parse messages from string:', { error }) // Keep original value if all parsing fails } } // Fast validation of parsed messages - const validMessages = Array.isArray(parsedMessages) && - parsedMessages.length > 0 && - parsedMessages.every(msg => - typeof msg === 'object' && - msg !== null && - 'role' in msg && - typeof msg.role === 'string' && - ( - 'content' in msg || - (msg.role === 'assistant' && ('function_call' in msg || 'tool_calls' in msg)) - ) - ); - + const validMessages = + Array.isArray(parsedMessages) && + parsedMessages.length > 0 && + parsedMessages.every( + (msg) => + typeof msg === 'object' && + msg !== null && + 'role' in msg && + typeof msg.role === 'string' && + ('content' in msg || + (msg.role === 'assistant' && ('function_call' in msg || 'tool_calls' in msg))) + ) + if (Array.isArray(parsedMessages) && parsedMessages.length > 0 && !validMessages) { - logger.warn('Messages array has invalid format:', { - messageCount: parsedMessages.length - }); + logger.warn('Messages array has invalid format:', { + messageCount: parsedMessages.length, + }) } else if (validMessages) { - logger.info('Messages validated successfully'); + logger.info('Messages validated successfully') } // Debug request before sending to provider @@ -280,7 +280,8 @@ export class AgentBlockHandler implements BlockHandler { logger.info(`Provider request prepared`, { model: providerRequest.model, hasMessages: Array.isArray(parsedMessages) && parsedMessages.length > 0, - hasSystemPrompt: !(Array.isArray(parsedMessages) && parsedMessages.length > 0) && !!inputs.systemPrompt, + hasSystemPrompt: + !(Array.isArray(parsedMessages) && parsedMessages.length > 0) && !!inputs.systemPrompt, hasContext: !(Array.isArray(parsedMessages) && parsedMessages.length > 0) && !!inputs.context, hasTools: !!providerRequest.tools, hasApiKey: !!providerRequest.apiKey, @@ -290,7 +291,10 @@ export class AgentBlockHandler implements BlockHandler { hasOutgoingConnections, // Debug info about messages to help diagnose issues messagesProvided: 'messages' in providerRequest, - messagesCount: 'messages' in providerRequest && Array.isArray(providerRequest.messages) ? providerRequest.messages.length : 0 + messagesCount: + 'messages' in providerRequest && Array.isArray(providerRequest.messages) + ? providerRequest.messages.length + : 0, }) const baseUrl = env.NEXT_PUBLIC_APP_URL || '' diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index 7fb27aa72..a00ceccd8 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -318,6 +318,30 @@ export class Executor { } } } + + // After the stream has fully completed and we've updated the + // final content, resume workflow execution for any + // downstream blocks (e.g. memory blocks) that depend on + // the agent response. + try { + // Determine the next blocks that are now unblocked. + let nextLayer = this.getNextExecutionLayer(context) + + while (nextLayer.length > 0) { + await this.executeLayer(nextLayer, context) + + // Handle any loop activations, etc. + await this.loopManager.processLoopIterations(context) + + // Fetch the subsequent layer (if any) + nextLayer = this.getNextExecutionLayer(context) + } + } catch (resumeError) { + logger.error( + 'Error continuing workflow after stream completion:', + resumeError + ) + } } catch (e) { logger.error('Error processing stream for console update:', e) } @@ -723,7 +747,7 @@ export class Executor { input: this.workflowInput, // Add top-level fields for backward compatibility message: this.workflowInput.input, - conversationId: this.workflowInput.conversationId + conversationId: this.workflowInput.conversationId, }, } @@ -755,7 +779,7 @@ export class Executor { response: { input: this.workflowInput, message: this.workflowInput?.input, - conversationId: this.workflowInput?.conversationId + conversationId: this.workflowInput?.conversationId, }, }