fix(chat): update stream to respect all output select objects (#2729)

This commit is contained in:
Waleed
2026-01-08 11:54:07 -08:00
committed by GitHub
parent a54fcbc094
commit fdac4314d2
2 changed files with 72 additions and 16 deletions

View File

@@ -473,6 +473,7 @@ export function Chat() {
/**
* Processes streaming response from workflow execution
* Reads the stream chunk by chunk and updates the message content in real-time
* When the final event arrives, extracts any additional selected outputs (model, tokens, toolCalls)
* @param stream - ReadableStream containing the workflow execution response
* @param responseMessageId - ID of the message to update with streamed content
*/
@@ -529,6 +530,35 @@ export function Chat() {
return
}
if (
selectedOutputs.length > 0 &&
'logs' in result &&
Array.isArray(result.logs) &&
activeWorkflowId
) {
const additionalOutputs: string[] = []
for (const outputId of selectedOutputs) {
const blockId = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockId)
if (path === 'content') continue
const outputValue = extractOutputFromLogs(result.logs as BlockLog[], outputId)
if (outputValue !== undefined) {
const formattedValue =
typeof outputValue === 'string' ? outputValue : JSON.stringify(outputValue)
if (formattedValue) {
additionalOutputs.push(`**${path}:** ${formattedValue}`)
}
}
}
if (additionalOutputs.length > 0) {
appendMessageContent(responseMessageId, `\n\n${additionalOutputs.join('\n\n')}`)
}
}
finalizeMessageStream(responseMessageId)
} else if (contentChunk) {
accumulatedContent += contentChunk
@@ -552,7 +582,7 @@ export function Chat() {
focusInput(100)
}
},
[appendMessageContent, finalizeMessageStream, focusInput]
[appendMessageContent, finalizeMessageStream, focusInput, selectedOutputs, activeWorkflowId]
)
/**
@@ -564,7 +594,6 @@ export function Chat() {
if (!result || !activeWorkflowId) return
if (typeof result !== 'object') return
// Handle streaming response
if ('stream' in result && result.stream instanceof ReadableStream) {
const responseMessageId = crypto.randomUUID()
addMessage({
@@ -578,7 +607,6 @@ export function Chat() {
return
}
// Handle success with logs
if ('success' in result && result.success && 'logs' in result && Array.isArray(result.logs)) {
selectedOutputs
.map((outputId) => extractOutputFromLogs(result.logs as BlockLog[], outputId))
@@ -596,7 +624,6 @@ export function Chat() {
return
}
// Handle error response
if ('success' in result && !result.success) {
const errorMessage =
'error' in result && typeof result.error === 'string'
@@ -622,7 +649,6 @@ export function Chat() {
const sentMessage = chatMessage.trim()
// Update prompt history (only if new unique message)
if (sentMessage && promptHistory[promptHistory.length - 1] !== sentMessage) {
setPromptHistory((prev) => [...prev, sentMessage])
}
@@ -631,10 +657,8 @@ export function Chat() {
const conversationId = getConversationId(activeWorkflowId)
try {
// Process file attachments
const attachmentsWithData = await processFileAttachments(chatFiles)
// Add user message
const messageContent =
sentMessage || (chatFiles.length > 0 ? `Uploaded ${chatFiles.length} file(s)` : '')
addMessage({
@@ -644,7 +668,6 @@ export function Chat() {
attachments: attachmentsWithData,
})
// Prepare workflow input
const workflowInput: {
input: string
conversationId: string
@@ -667,13 +690,11 @@ export function Chat() {
}
}
// Clear input and files
setChatMessage('')
clearFiles()
clearErrors()
focusInput(10)
// Execute workflow
const result = await handleRunWorkflow(workflowInput)
handleWorkflowResponse(result)
} catch (error) {

View File

@@ -885,6 +885,7 @@ export function useWorkflowExecution() {
const activeBlocksSet = new Set<string>()
const streamedContent = new Map<string, string>()
const accumulatedBlockLogs: BlockLog[] = []
// Execute the workflow
try {
@@ -933,14 +934,30 @@ export function useWorkflowExecution() {
// Edges already tracked in onBlockStarted, no need to track again
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt,
endedAt,
})
// Add to console
addConsole({
input: data.input || {},
output: data.output,
success: true,
durationMs: data.durationMs,
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
endedAt: new Date().toISOString(),
startedAt,
endedAt,
workflowId: activeWorkflowId,
blockId: data.blockId,
executionId: executionId || uuidv4(),
@@ -967,6 +984,24 @@ export function useWorkflowExecution() {
// Track failed block execution in run path
setBlockRunStatus(data.blockId, 'error')
const startedAt = new Date(Date.now() - data.durationMs).toISOString()
const endedAt = new Date().toISOString()
// Accumulate block error log for the execution result
accumulatedBlockLogs.push({
blockId: data.blockId,
blockName: data.blockName || 'Unknown Block',
blockType: data.blockType || 'unknown',
input: data.input || {},
output: {},
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt,
endedAt,
})
// Add error to console
addConsole({
input: data.input || {},
@@ -974,8 +1009,8 @@ export function useWorkflowExecution() {
success: false,
error: data.error,
durationMs: data.durationMs,
startedAt: new Date(Date.now() - data.durationMs).toISOString(),
endedAt: new Date().toISOString(),
startedAt,
endedAt,
workflowId: activeWorkflowId,
blockId: data.blockId,
executionId: executionId || uuidv4(),
@@ -1029,7 +1064,7 @@ export function useWorkflowExecution() {
startTime: data.startTime,
endTime: data.endTime,
},
logs: [],
logs: accumulatedBlockLogs,
}
},
@@ -1041,7 +1076,7 @@ export function useWorkflowExecution() {
metadata: {
duration: data.duration,
},
logs: [],
logs: accumulatedBlockLogs,
}
// Only add workflow-level error if no blocks have executed yet