fix(executor): streaming after tool calls (#1963)

* Provider changes

* Fix lint
This commit is contained in:
Siddharth Ganesan
2025-11-13 12:24:26 -08:00
committed by GitHub
parent 4e5b834433
commit a70f2a6690
11 changed files with 155 additions and 27 deletions

View File

@@ -21,6 +21,7 @@ import type {
ExecutionContext,
NormalizedBlockOutput,
} from '@/executor/types'
import { streamingResponseFormatProcessor } from '@/executor/utils'
import { buildBlockExecutionError, normalizeError } from '@/executor/utils/errors'
import type { VariableResolver } from '@/executor/variables/resolver'
import type { SerializedBlock } from '@/serializer/types'
@@ -100,11 +101,14 @@ export class BlockExecutor {
const streamingExec = output as { stream: ReadableStream; execution: any }
if (ctx.onStream) {
try {
await ctx.onStream(streamingExec)
} catch (error) {
logger.error('Error in onStream callback', { blockId: node.id, error })
}
await this.handleStreamingExecution(
ctx,
node,
block,
streamingExec,
resolvedInputs,
ctx.selectedOutputs ?? []
)
}
normalizedOutput = this.normalizeOutput(
@@ -446,4 +450,128 @@ export class BlockExecutor {
}
}
}
/**
 * Splits a streaming block result so that both the client callback and the
 * executor can consume it: the client receives a response-format-processed
 * copy while the executor accumulates the raw text back onto the execution
 * output. Falls back to client-only forwarding when the stream cannot be
 * tee'd (no `tee` method on the stream implementation).
 */
private async handleStreamingExecution(
  ctx: ExecutionContext,
  node: DAGNode,
  block: SerializedBlock,
  streamingExec: { stream: ReadableStream; execution: any },
  resolvedInputs: Record<string, any>,
  selectedOutputs: string[]
): Promise<void> {
  const blockId = node.id
  // Response format may come from resolved inputs or from either level of the block config.
  const paramsFormat = (block.config?.params as Record<string, any> | undefined)?.responseFormat
  const configFormat = (block.config as Record<string, any> | undefined)?.responseFormat
  const responseFormat = resolvedInputs?.responseFormat ?? paramsFormat ?? configFormat

  const sourceStream = streamingExec.stream
  // Without tee() the stream cannot be split, so only the client receives it.
  if (typeof sourceStream.tee !== 'function') {
    await this.forwardStream(ctx, blockId, streamingExec, sourceStream, responseFormat, selectedOutputs)
    return
  }

  const [forClient, forExecutor] = sourceStream.tee()
  const processedClient = streamingResponseFormatProcessor.processStream(
    forClient,
    blockId,
    selectedOutputs,
    responseFormat
  )

  // Start both consumers before awaiting so neither branch of the tee backs up.
  const executorDone = this.consumeExecutorStream(forExecutor, streamingExec, blockId, responseFormat)
  const clientDone = (async () => {
    try {
      // Callback failures are logged, not propagated — a client error must not abort execution.
      await ctx.onStream?.({ ...streamingExec, stream: processedClient })
    } catch (error) {
      logger.error('Error in onStream callback', { blockId, error })
    }
  })()

  await Promise.all([clientDone, executorDone])
}
/**
 * Forwards a non-tee-able stream directly to the client callback, applying
 * the response-format processor first. Callback errors are logged rather
 * than rethrown so a client-side failure cannot abort block execution.
 */
private async forwardStream(
  ctx: ExecutionContext,
  blockId: string,
  streamingExec: { stream: ReadableStream; execution: any },
  stream: ReadableStream,
  responseFormat: any,
  selectedOutputs: string[]
): Promise<void> {
  const processed = streamingResponseFormatProcessor.processStream(
    stream,
    blockId,
    selectedOutputs,
    responseFormat
  )
  const payload = { ...streamingExec, stream: processed }
  try {
    await ctx.onStream?.(payload)
  } catch (error) {
    logger.error('Error in onStream callback', { blockId, error })
  }
}
/**
 * Drains the executor's copy of the tee'd stream and writes the accumulated
 * text back onto `streamingExec.execution.output`: parsed as JSON and merged
 * when a response format is expected (falling back to raw `content` on parse
 * failure), otherwise stored verbatim under `content`.
 *
 * Read errors are logged and treated as "whatever was collected so far";
 * an empty result or a missing/non-object execution output is a no-op.
 */
private async consumeExecutorStream(
  stream: ReadableStream,
  streamingExec: { execution: any },
  blockId: string,
  responseFormat: any
): Promise<void> {
  const reader = stream.getReader()
  const decoder = new TextDecoder()
  let fullContent = ''
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      fullContent += decoder.decode(value, { stream: true })
    }
    // Flush the decoder: without a final decode() call, a trailing partial
    // multi-byte UTF-8 sequence buffered by { stream: true } would be dropped.
    fullContent += decoder.decode()
  } catch (error) {
    logger.error('Error reading executor stream for block', { blockId, error })
  } finally {
    try {
      reader.releaseLock()
    } catch {}
  }
  if (!fullContent) {
    return
  }
  const executionOutput = streamingExec.execution?.output
  if (!executionOutput || typeof executionOutput !== 'object') {
    return
  }
  if (responseFormat) {
    try {
      const parsed = JSON.parse(fullContent.trim())
      Object.assign(executionOutput, parsed)
      return
    } catch (error) {
      // Fall through to storing the raw text so the content is not lost.
      logger.warn('Failed to parse streamed content for response format', { blockId, error })
    }
  }
  executionOutput.content = fullContent
}
}

View File

@@ -985,9 +985,9 @@ ${fieldDescriptions}
const providerEndTimeISO = new Date(providerEndTime).toISOString()
const totalDuration = providerEndTime - providerStartTime
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final Anthropic response after tool calls')
// After all tool processing complete, if streaming was requested, use streaming for the final response
if (request.stream) {
logger.info('Using streaming for final Anthropic response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is removed
// This prevents the API from trying to force tool usage again in the final streaming response

View File

@@ -523,9 +523,9 @@ export const azureOpenAIProvider: ProviderConfig = {
iterationCount++
}
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final response after tool calls')
// After all tool processing complete, if streaming was requested, use streaming for the final response
if (request.stream) {
logger.info('Using streaming for final response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
// This prevents Azure OpenAI API from trying to force tool usage again in the final streaming response

View File

@@ -455,8 +455,8 @@ export const cerebrasProvider: ProviderConfig = {
const totalDuration = providerEndTime - providerStartTime
// POST-TOOL-STREAMING: stream after tool calls if requested
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final Cerebras response after tool calls')
if (request.stream) {
logger.info('Using streaming for final Cerebras response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
// This prevents the API from trying to force tool usage again in the final streaming response

View File

@@ -457,8 +457,8 @@ export const deepseekProvider: ProviderConfig = {
const totalDuration = providerEndTime - providerStartTime
// POST-TOOL STREAMING: stream final response after tool calls if requested
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final DeepSeek response after tool calls')
if (request.stream) {
logger.info('Using streaming for final DeepSeek response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
// This prevents the API from trying to force tool usage again in the final streaming response

View File

@@ -374,8 +374,8 @@ export const groqProvider: ProviderConfig = {
}
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final Groq response after tool calls')
if (request.stream) {
logger.info('Using streaming for final Groq response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
// This prevents the API from trying to force tool usage again in the final streaming response

View File

@@ -447,8 +447,8 @@ export const mistralProvider: ProviderConfig = {
iterationCount++
}
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final response after tool calls')
if (request.stream) {
logger.info('Using streaming for final response after tool processing')
const streamingPayload = {
...payload,

View File

@@ -529,8 +529,8 @@ export const ollamaProvider: ProviderConfig = {
}
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final response after tool calls')
if (request.stream) {
logger.info('Using streaming for final response after tool processing')
const streamingPayload = {
...payload,

View File

@@ -504,9 +504,9 @@ export const openaiProvider: ProviderConfig = {
iterationCount++
}
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
logger.info('Using streaming for final response after tool calls')
// After all tool processing complete, if streaming was requested, use streaming for the final response
if (request.stream) {
logger.info('Using streaming for final response after tool processing')
// When streaming after tool calls with forced tools, make sure tool_choice is set to 'auto'
// This prevents OpenAI API from trying to force tool usage again in the final streaming response

View File

@@ -381,7 +381,7 @@ export const openRouterProvider: ProviderConfig = {
iterationCount++
}
if (request.stream && iterationCount > 0) {
if (request.stream) {
const streamingPayload = {
...payload,
messages: currentMessages,

View File

@@ -501,8 +501,8 @@ export const xAIProvider: ProviderConfig = {
})
}
// After all tool processing complete, if streaming was requested and we have messages, use streaming for the final response
if (request.stream && iterationCount > 0) {
// After all tool processing complete, if streaming was requested, use streaming for the final response
if (request.stream) {
// For final streaming response, choose between tools (auto) or response_format (never both)
let finalStreamingPayload: any