fix(response-format): add response format to tag dropdown, chat panel, and chat client (#637)

* add response format structure to tag dropdown

* handle response format outputs for chat client and chat panel, implemented the response format handling for streamed responses

* cleanup
This commit is contained in:
Waleed Latif
2025-07-08 15:49:31 -07:00
committed by GitHub
parent 0f21fbf705
commit 97021559cc
17 changed files with 1211 additions and 151 deletions

View File

@@ -194,6 +194,7 @@ export async function GET(
description: deployment.description,
customizations: deployment.customizations,
authType: deployment.authType,
outputConfigs: deployment.outputConfigs,
}),
request
)
@@ -219,6 +220,7 @@ export async function GET(
description: deployment.description,
customizations: deployment.customizations,
authType: deployment.authType,
outputConfigs: deployment.outputConfigs,
}),
request
)

View File

@@ -263,17 +263,26 @@ export async function executeWorkflowForChat(
let outputBlockIds: string[] = []
// Extract output configs from the new schema format
let selectedOutputIds: string[] = []
if (deployment.outputConfigs && Array.isArray(deployment.outputConfigs)) {
// Extract block IDs and paths from the new outputConfigs array format
// Extract output IDs in the format expected by the streaming processor
logger.debug(
`[${requestId}] Found ${deployment.outputConfigs.length} output configs in deployment`
)
deployment.outputConfigs.forEach((config) => {
selectedOutputIds = deployment.outputConfigs.map((config) => {
const outputId = config.path
? `${config.blockId}_${config.path}`
: `${config.blockId}.content`
logger.debug(
`[${requestId}] Processing output config: blockId=${config.blockId}, path=${config.path || 'none'}`
`[${requestId}] Processing output config: blockId=${config.blockId}, path=${config.path || 'content'} -> outputId=${outputId}`
)
return outputId
})
// Also extract block IDs for legacy compatibility
outputBlockIds = deployment.outputConfigs.map((config) => config.blockId)
} else {
// Use customizations as fallback
@@ -291,7 +300,9 @@ export async function executeWorkflowForChat(
outputBlockIds = customizations.outputBlockIds
}
logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`)
logger.debug(
`[${requestId}] Using ${outputBlockIds.length} output blocks and ${selectedOutputIds.length} selected output IDs for extraction`
)
// Find the workflow (deployedState is NOT deprecated - needed for chat execution)
const workflowResult = await db
@@ -457,7 +468,7 @@ export async function executeWorkflowForChat(
workflowVariables,
contextExtensions: {
stream: true,
selectedOutputIds: outputBlockIds,
selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds,
edges: edges.map((e: any) => ({
source: e.source,
target: e.target,

View File

@@ -33,6 +33,7 @@ interface ChatConfig {
headerText?: string
}
authType?: 'public' | 'password' | 'email'
outputConfigs?: Array<{ blockId: string; path?: string }>
}
interface AudioStreamingOptions {
@@ -373,8 +374,16 @@ export default function ChatClient({ subdomain }: { subdomain: string }) {
const json = JSON.parse(line.substring(6))
const { blockId, chunk: contentChunk, event: eventType } = json
if (eventType === 'final') {
if (eventType === 'final' && json.data) {
setIsLoading(false)
// Process final execution result for field extraction
const result = json.data
const nonStreamingLogs =
result.logs?.filter((log: any) => !messageIdMap.has(log.blockId)) || []
// Chat field extraction will be handled by the backend using deployment outputConfigs
return
}

View File

@@ -5,6 +5,12 @@ import { ArrowUp } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { Input } from '@/components/ui/input'
import { ScrollArea } from '@/components/ui/scroll-area'
import { createLogger } from '@/lib/logs/console-logger'
import {
extractBlockIdFromOutputId,
extractPathFromOutputId,
parseOutputContentSafely,
} from '@/lib/response-format'
import type { BlockLog, ExecutionResult } from '@/executor/types'
import { useExecutionStore } from '@/stores/execution/store'
import { useChatStore } from '@/stores/panel/chat/store'
@@ -14,6 +20,8 @@ import { useWorkflowExecution } from '../../../../hooks/use-workflow-execution'
import { ChatMessage } from './components/chat-message/chat-message'
import { OutputSelect } from './components/output-select/output-select'
const logger = createLogger('ChatPanel')
interface ChatProps {
panelWidth: number
chatMessage: string
@@ -60,8 +68,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
const selected = selectedWorkflowOutputs[activeWorkflowId]
if (!selected || selected.length === 0) {
const defaultSelection = outputEntries.length > 0 ? [outputEntries[0].id] : []
return defaultSelection
// Return empty array when nothing is explicitly selected
return []
}
// Ensure we have no duplicates in the selection
@@ -74,7 +82,7 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
}
return selected
}, [selectedWorkflowOutputs, activeWorkflowId, outputEntries, setSelectedWorkflowOutput])
}, [selectedWorkflowOutputs, activeWorkflowId, setSelectedWorkflowOutput])
// Auto-scroll to bottom when new messages are added
useEffect(() => {
@@ -141,25 +149,22 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
if (nonStreamingLogs.length > 0) {
const outputsToRender = selectedOutputs.filter((outputId) => {
// Extract block ID correctly - handle both formats:
// - "blockId" (direct block ID)
// - "blockId_response.result" (block ID with path)
const blockIdForOutput = outputId.includes('_')
? outputId.split('_')[0]
: outputId.split('.')[0]
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput)
})
for (const outputId of outputsToRender) {
const blockIdForOutput = outputId.includes('_')
? outputId.split('_')[0]
: outputId.split('.')[0]
const path = outputId.substring(blockIdForOutput.length + 1)
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput)
if (log) {
let outputValue: any = log.output
if (path) {
// Parse JSON content safely
outputValue = parseOutputContentSafely(outputValue)
const pathParts = path.split('.')
for (const part of pathParts) {
if (
@@ -211,42 +216,41 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
}
}
} catch (e) {
console.error('Error parsing stream data:', e)
logger.error('Error parsing stream data:', e)
}
}
}
}
}
processStream().catch((e) => console.error('Error processing stream:', e))
processStream().catch((e) => logger.error('Error processing stream:', e))
} else if (result && 'success' in result && result.success && 'logs' in result) {
const finalOutputs: any[] = []
if (selectedOutputs && selectedOutputs.length > 0) {
if (selectedOutputs?.length > 0) {
for (const outputId of selectedOutputs) {
// Find the log that corresponds to the start of the outputId
const log = result.logs?.find(
(l: BlockLog) => l.blockId === outputId || outputId.startsWith(`${l.blockId}_`)
)
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
const path = extractPathFromOutputId(outputId, blockIdForOutput)
const log = result.logs?.find((l: BlockLog) => l.blockId === blockIdForOutput)
if (log) {
let output = log.output
// Check if there is a path to traverse
if (outputId.length > log.blockId.length) {
const path = outputId.substring(log.blockId.length + 1)
if (path) {
const pathParts = path.split('.')
let current = output
for (const part of pathParts) {
if (current && typeof current === 'object' && part in current) {
current = current[part]
} else {
current = undefined
break
}
if (path) {
// Parse JSON content safely
output = parseOutputContentSafely(output)
const pathParts = path.split('.')
let current = output
for (const part of pathParts) {
if (current && typeof current === 'object' && part in current) {
current = current[part]
} else {
current = undefined
break
}
output = current
}
output = current
}
if (output !== undefined) {
finalOutputs.push(output)
@@ -255,10 +259,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
}
}
// If no specific outputs could be resolved, fall back to the final workflow output
if (finalOutputs.length === 0 && result.output) {
finalOutputs.push(result.output)
}
// Only show outputs if something was explicitly selected
// If no outputs are selected, don't show anything
// Add a new message for each resolved output
finalOutputs.forEach((output) => {
@@ -266,19 +268,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) {
if (typeof output === 'string') {
content = output
} else if (output && typeof output === 'object') {
// Handle cases where output is { response: ... }
const outputObj = output as Record<string, any>
const response = outputObj.response
if (response) {
if (typeof response.content === 'string') {
content = response.content
} else {
// Pretty print for better readability
content = `\`\`\`json\n${JSON.stringify(response, null, 2)}\n\`\`\``
}
} else {
content = `\`\`\`json\n${JSON.stringify(output, null, 2)}\n\`\`\``
}
// For structured responses, pretty print the JSON
content = `\`\`\`json\n${JSON.stringify(output, null, 2)}\n\`\`\``
}
if (content) {

View File

@@ -1,8 +1,10 @@
import { useEffect, useMemo, useRef, useState } from 'react'
import { Check, ChevronDown } from 'lucide-react'
import { Button } from '@/components/ui/button'
import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format'
import { cn } from '@/lib/utils'
import { getBlock } from '@/blocks'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
interface OutputSelectProps {
@@ -48,8 +50,31 @@ export function OutputSelect({
? block.name.replace(/\s+/g, '').toLowerCase()
: `block-${block.id}`
// Check for custom response format first
const responseFormatValue = useSubBlockStore.getState().getValue(block.id, 'responseFormat')
const responseFormat = parseResponseFormatSafely(responseFormatValue, block.id)
let outputsToProcess: Record<string, any> = {}
if (responseFormat) {
// Use custom schema properties if response format is specified
const schemaFields = extractFieldsFromSchema(responseFormat)
if (schemaFields.length > 0) {
// Convert schema fields to output structure
schemaFields.forEach((field) => {
outputsToProcess[field.name] = { type: field.type }
})
} else {
// Fallback to default outputs if schema extraction failed
outputsToProcess = block.outputs || {}
}
} else {
// Use default block outputs
outputsToProcess = block.outputs || {}
}
// Add response outputs
if (block.outputs && typeof block.outputs === 'object') {
if (Object.keys(outputsToProcess).length > 0) {
const addOutput = (path: string, outputObj: any, prefix = '') => {
const fullPath = prefix ? `${prefix}.${path}` : path
@@ -100,7 +125,7 @@ export function OutputSelect({
}
// Process all output properties directly (flattened structure)
Object.entries(block.outputs).forEach(([key, value]) => {
Object.entries(outputsToProcess).forEach(([key, value]) => {
addOutput(key, value)
})
}

View File

@@ -125,35 +125,33 @@ export function ConsoleEntry({ entry, consoleWidth }: ConsoleEntryProps) {
<div className='flex items-start gap-2'>
<Terminal className='mt-1 h-4 w-4 text-muted-foreground' />
<div className='overflow-wrap-anywhere relative flex-1 whitespace-normal break-normal font-mono text-sm'>
{typeof entry.output === 'object' &&
entry.output !== null &&
hasNestedStructure(entry.output) && (
<div className='absolute top-0 right-0 z-10'>
<Button
variant='ghost'
size='sm'
className='h-6 px-2 text-muted-foreground hover:text-foreground'
onClick={(e) => {
e.stopPropagation()
setExpandAllJson(!expandAllJson)
}}
>
<span className='flex items-center'>
{expandAllJson ? (
<>
<ChevronUp className='mr-1 h-3 w-3' />
<span className='text-xs'>Collapse</span>
</>
) : (
<>
<ChevronDown className='mr-1 h-3 w-3' />
<span className='text-xs'>Expand</span>
</>
)}
</span>
</Button>
</div>
)}
{entry.output != null && (
<div className='absolute top-0 right-0 z-10'>
<Button
variant='ghost'
size='sm'
className='h-6 px-2 text-muted-foreground hover:text-foreground'
onClick={(e) => {
e.stopPropagation()
setExpandAllJson(!expandAllJson)
}}
>
<span className='flex items-center'>
{expandAllJson ? (
<>
<ChevronUp className='mr-1 h-3 w-3' />
<span className='text-xs'>Collapse</span>
</>
) : (
<>
<ChevronDown className='mr-1 h-3 w-3' />
<span className='text-xs'>Expand</span>
</>
)}
</span>
</Button>
</div>
)}
<JSONView data={entry.output} initiallyExpanded={expandAllJson} />
</div>
</div>

View File

@@ -217,10 +217,13 @@ export function useWorkflowExecution() {
result.logs.forEach((log: BlockLog) => {
if (streamedContent.has(log.blockId)) {
const content = streamedContent.get(log.blockId) || ''
if (log.output) {
log.output.content = content
}
useConsoleStore.getState().updateConsole(log.blockId, content)
// For console display, show the actual structured block output instead of formatted streaming content
// This ensures console logs match the block state structure
// Use replaceOutput to completely replace the output instead of merging
useConsoleStore.getState().updateConsole(log.blockId, {
replaceOutput: log.output,
success: true,
})
}
})

View File

@@ -1,7 +1,8 @@
import { describe, expect, test, vi } from 'vitest'
import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format'
import type { BlockState } from '@/stores/workflows/workflow/types'
import { generateLoopBlocks } from '@/stores/workflows/workflow/utils'
import { checkTagTrigger, extractFieldsFromSchema } from './tag-dropdown'
import { checkTagTrigger } from './tag-dropdown'
vi.mock('@/stores/workflows/workflow/store', () => ({
useWorkflowStore: vi.fn(() => ({
@@ -24,6 +25,15 @@ vi.mock('@/stores/panel/variables/store', () => ({
})),
}))
vi.mock('@/stores/workflows/subblock/store', () => ({
useSubBlockStore: vi.fn(() => ({
getValue: vi.fn(() => null),
getState: vi.fn(() => ({
getValue: vi.fn(() => null),
})),
})),
}))
describe('TagDropdown Loop Suggestions', () => {
test('should generate correct loop suggestions for forEach loops', () => {
const blocks: Record<string, BlockState> = {
@@ -603,3 +613,180 @@ describe('TagDropdown Tag Selection Logic', () => {
})
})
})
describe('TagDropdown Response Format Support', () => {
it.concurrent(
'should use custom schema properties when response format is specified',
async () => {
// Mock the subblock store to return a custom response format
const mockGetValue = vi.fn()
const mockUseSubBlockStore = vi.mocked(
await import('@/stores/workflows/subblock/store')
).useSubBlockStore
// Set up the mock to return the example schema from the user
const responseFormatValue = JSON.stringify({
name: 'short_schema',
description: 'A minimal example schema with a single string property.',
strict: true,
schema: {
type: 'object',
properties: {
example_property: {
type: 'string',
description: 'A simple string property.',
},
},
additionalProperties: false,
required: ['example_property'],
},
})
mockGetValue.mockImplementation((blockId: string, subBlockId: string) => {
if (blockId === 'agent1' && subBlockId === 'responseFormat') {
return responseFormatValue
}
return null
})
mockUseSubBlockStore.mockReturnValue({
getValue: mockGetValue,
getState: () => ({
getValue: mockGetValue,
}),
} as any)
// Test the parseResponseFormatSafely function
const parsedFormat = parseResponseFormatSafely(responseFormatValue, 'agent1')
expect(parsedFormat).toEqual({
name: 'short_schema',
description: 'A minimal example schema with a single string property.',
strict: true,
schema: {
type: 'object',
properties: {
example_property: {
type: 'string',
description: 'A simple string property.',
},
},
additionalProperties: false,
required: ['example_property'],
},
})
// Test the extractFieldsFromSchema function with the parsed format
const fields = extractFieldsFromSchema(parsedFormat)
expect(fields).toEqual([
{
name: 'example_property',
type: 'string',
description: 'A simple string property.',
},
])
}
)
it.concurrent(
'should fallback to default outputs when response format parsing fails',
async () => {
// Test with invalid JSON
const invalidFormat = parseResponseFormatSafely('invalid json', 'agent1')
expect(invalidFormat).toBeNull()
// Test with null/undefined values
expect(parseResponseFormatSafely(null, 'agent1')).toBeNull()
expect(parseResponseFormatSafely(undefined, 'agent1')).toBeNull()
expect(parseResponseFormatSafely('', 'agent1')).toBeNull()
}
)
it.concurrent('should handle response format with nested schema correctly', async () => {
const responseFormat = {
schema: {
type: 'object',
properties: {
user: {
type: 'object',
description: 'User information',
properties: {
name: { type: 'string', description: 'User name' },
age: { type: 'number', description: 'User age' },
},
},
status: { type: 'string', description: 'Response status' },
},
},
}
const fields = extractFieldsFromSchema(responseFormat)
expect(fields).toEqual([
{ name: 'user', type: 'object', description: 'User information' },
{ name: 'status', type: 'string', description: 'Response status' },
])
})
it.concurrent('should handle response format without schema wrapper', async () => {
const responseFormat = {
type: 'object',
properties: {
result: { type: 'boolean', description: 'Operation result' },
message: { type: 'string', description: 'Status message' },
},
}
const fields = extractFieldsFromSchema(responseFormat)
expect(fields).toEqual([
{ name: 'result', type: 'boolean', description: 'Operation result' },
{ name: 'message', type: 'string', description: 'Status message' },
])
})
it.concurrent('should return object as-is when it is already parsed', async () => {
const responseFormat = {
name: 'test_schema',
schema: {
properties: {
data: { type: 'string' },
},
},
}
const result = parseResponseFormatSafely(responseFormat, 'agent1')
expect(result).toEqual(responseFormat)
})
it.concurrent('should simulate block tag generation with custom response format', async () => {
// Simulate the tag generation logic that would happen in the component
const blockName = 'Agent 1'
const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase() // 'agent1'
// Mock response format
const responseFormat = {
schema: {
properties: {
example_property: { type: 'string', description: 'A simple string property.' },
another_field: { type: 'number', description: 'Another field.' },
},
},
}
const schemaFields = extractFieldsFromSchema(responseFormat)
// Generate block tags as they would be in the component
const blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`)
expect(blockTags).toEqual(['agent1.example_property', 'agent1.another_field'])
// Verify the fields extracted correctly
expect(schemaFields).toEqual([
{ name: 'example_property', type: 'string', description: 'A simple string property.' },
{ name: 'another_field', type: 'number', description: 'Another field.' },
])
})
})

View File

@@ -1,18 +1,16 @@
import type React from 'react'
import { useCallback, useEffect, useMemo, useState } from 'react'
import { BlockPathCalculator } from '@/lib/block-path-calculator'
import { createLogger } from '@/lib/logs/console-logger'
import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format'
import { cn } from '@/lib/utils'
import { getBlock } from '@/blocks'
import { Serializer } from '@/serializer'
import { useVariablesStore } from '@/stores/panel/variables/store'
import type { Variable } from '@/stores/panel/variables/types'
import { useWorkflowRegistry } from '@/stores/workflows/registry/store'
import { useSubBlockStore } from '@/stores/workflows/subblock/store'
import { useWorkflowStore } from '@/stores/workflows/workflow/store'
const logger = createLogger('TagDropdown')
// Type definitions for component data structures
interface BlockTagGroup {
blockName: string
blockId: string
@@ -21,49 +19,6 @@ interface BlockTagGroup {
distance: number
}
interface Field {
name: string
type: string
description?: string
}
// Helper function to extract fields from JSON Schema
export function extractFieldsFromSchema(schema: any): Field[] {
if (!schema || typeof schema !== 'object') {
return []
}
// Handle legacy format with fields array
if (Array.isArray(schema.fields)) {
return schema.fields
}
// Handle new JSON Schema format
const schemaObj = schema.schema || schema
if (!schemaObj || !schemaObj.properties || typeof schemaObj.properties !== 'object') {
return []
}
// Extract fields from schema properties
return Object.entries(schemaObj.properties).map(([name, prop]: [string, any]) => {
// Handle array format like ['string', 'array']
if (Array.isArray(prop)) {
return {
name,
type: prop.includes('array') ? 'array' : prop[0] || 'string',
description: undefined,
}
}
// Handle object format like { type: 'string', description: '...' }
return {
name,
type: prop.type || 'string',
description: prop.description,
}
})
}
interface TagDropdownProps {
visible: boolean
onSelect: (newValue: string) => void
@@ -208,11 +163,29 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
const blockName = sourceBlock.name || sourceBlock.type
const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase()
// Handle blocks with no outputs (like starter) - show as just <blockname>
// Check for custom response format first
const responseFormatValue = useSubBlockStore
.getState()
.getValue(activeSourceBlockId, 'responseFormat')
const responseFormat = parseResponseFormatSafely(responseFormatValue, activeSourceBlockId)
let blockTags: string[]
if (Object.keys(blockConfig.outputs).length === 0) {
if (responseFormat) {
// Use custom schema properties if response format is specified
const schemaFields = extractFieldsFromSchema(responseFormat)
if (schemaFields.length > 0) {
blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`)
} else {
// Fallback to default if schema extraction failed
const outputPaths = generateOutputPaths(blockConfig.outputs)
blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`)
}
} else if (Object.keys(blockConfig.outputs).length === 0) {
// Handle blocks with no outputs (like starter) - show as just <blockname>
blockTags = [normalizedBlockName]
} else {
// Use default block outputs
const outputPaths = generateOutputPaths(blockConfig.outputs)
blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`)
}
@@ -341,7 +314,7 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
)
let containingParallelBlockId: string | null = null
if (containingParallel) {
const [parallelId, parallel] = containingParallel
const [parallelId] = containingParallel
containingParallelBlockId = parallelId
const contextualTags: string[] = ['index', 'currentItem', 'items']
@@ -413,11 +386,29 @@ export const TagDropdown: React.FC<TagDropdownProps> = ({
const blockName = accessibleBlock.name || accessibleBlock.type
const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase()
// Handle blocks with no outputs (like starter) - show as just <blockname>
// Check for custom response format first
const responseFormatValue = useSubBlockStore
.getState()
.getValue(accessibleBlockId, 'responseFormat')
const responseFormat = parseResponseFormatSafely(responseFormatValue, accessibleBlockId)
let blockTags: string[]
if (Object.keys(blockConfig.outputs).length === 0) {
if (responseFormat) {
// Use custom schema properties if response format is specified
const schemaFields = extractFieldsFromSchema(responseFormat)
if (schemaFields.length > 0) {
blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`)
} else {
// Fallback to default if schema extraction failed
const outputPaths = generateOutputPaths(blockConfig.outputs)
blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`)
}
} else if (Object.keys(blockConfig.outputs).length === 0) {
// Handle blocks with no outputs (like starter) - show as just <blockname>
blockTags = [normalizedBlockName]
} else {
// Use default block outputs
const outputPaths = generateOutputPaths(blockConfig.outputs)
blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`)
}

View File

@@ -30,6 +30,7 @@ import type {
NormalizedBlockOutput,
StreamingExecution,
} from './types'
import { streamingResponseFormatProcessor } from './utils'
const logger = createLogger('Executor')
@@ -242,7 +243,25 @@ export class Executor {
const streamingExec = output as StreamingExecution
const [streamForClient, streamForExecutor] = streamingExec.stream.tee()
const clientStreamingExec = { ...streamingExec, stream: streamForClient }
// Apply response format processing to the client stream if needed
const blockId = (streamingExec.execution as any).blockId
// Get response format from initial block states (passed from useWorkflowExecution)
// The initialBlockStates contain the subblock values including responseFormat
let responseFormat: any
if (this.initialBlockStates?.[blockId]) {
const blockState = this.initialBlockStates[blockId] as any
responseFormat = blockState.responseFormat
}
const processedClientStream = streamingResponseFormatProcessor.processStream(
streamForClient,
blockId,
context.selectedOutputIds || [],
responseFormat
)
const clientStreamingExec = { ...streamingExec, stream: processedClientStream }
try {
// Handle client stream with proper error handling
@@ -267,7 +286,41 @@ export class Executor {
const blockId = (streamingExec.execution as any).blockId
const blockState = context.blockStates.get(blockId)
if (blockState?.output) {
blockState.output.content = fullContent
// Check if we have response format - if so, preserve structured response
let responseFormat: any
if (this.initialBlockStates?.[blockId]) {
const initialBlockState = this.initialBlockStates[blockId] as any
responseFormat = initialBlockState.responseFormat
}
if (responseFormat && fullContent) {
// For structured responses, always try to parse the raw streaming content
// The streamForExecutor contains the raw JSON response, not the processed display text
try {
const parsedContent = JSON.parse(fullContent)
// Preserve metadata but spread parsed fields at root level (same as manual execution)
const structuredOutput = {
...parsedContent,
tokens: blockState.output.tokens,
toolCalls: blockState.output.toolCalls,
providerTiming: blockState.output.providerTiming,
cost: blockState.output.cost,
}
blockState.output = structuredOutput
// Also update the corresponding block log with the structured output
const blockLog = context.blockLogs.find((log) => log.blockId === blockId)
if (blockLog) {
blockLog.output = structuredOutput
}
} catch (parseError) {
// If parsing fails, fall back to setting content
blockState.output.content = fullContent
}
} else {
// No response format, use standard content setting
blockState.output.content = fullContent
}
}
} catch (readerError: any) {
logger.error('Error reading stream for executor:', readerError)
@@ -275,7 +328,40 @@ export class Executor {
const blockId = (streamingExec.execution as any).blockId
const blockState = context.blockStates.get(blockId)
if (blockState?.output && fullContent) {
blockState.output.content = fullContent
// Check if we have response format for error handling too
let responseFormat: any
if (this.initialBlockStates?.[blockId]) {
const initialBlockState = this.initialBlockStates[blockId] as any
responseFormat = initialBlockState.responseFormat
}
if (responseFormat) {
// For structured responses, always try to parse the raw streaming content
// The streamForExecutor contains the raw JSON response, not the processed display text
try {
const parsedContent = JSON.parse(fullContent)
const structuredOutput = {
...parsedContent,
tokens: blockState.output.tokens,
toolCalls: blockState.output.toolCalls,
providerTiming: blockState.output.providerTiming,
cost: blockState.output.cost,
}
blockState.output = structuredOutput
// Also update the corresponding block log with the structured output
const blockLog = context.blockLogs.find((log) => log.blockId === blockId)
if (blockLog) {
blockLog.output = structuredOutput
}
} catch (parseError) {
// If parsing fails, fall back to setting content
blockState.output.content = fullContent
}
} else {
// No response format, use standard content setting
blockState.output.content = fullContent
}
}
} finally {
try {
@@ -1257,6 +1343,7 @@ export class Executor {
context.blockLogs.push(blockLog)
// Skip console logging for infrastructure blocks like loops and parallels
// For streaming blocks, we'll add the console entry after stream processing
if (block.metadata?.id !== 'loop' && block.metadata?.id !== 'parallel') {
addConsole({
output: blockLog.output,

View File

@@ -269,3 +269,15 @@ export interface Tool<P = any, O = Record<string, any>> {
export interface ToolRegistry {
[key: string]: Tool
}
/**
* Interface for a stream processor that can process a stream based on a response format.
*/
export interface ResponseFormatStreamProcessor {
processStream(
originalStream: ReadableStream,
blockId: string,
selectedOutputIds: string[],
responseFormat?: any
): ReadableStream
}

View File

@@ -0,0 +1,354 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import { StreamingResponseFormatProcessor, streamingResponseFormatProcessor } from './utils'
vi.mock('@/lib/logs/console-logger', () => ({
createLogger: vi.fn().mockReturnValue({
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
}),
}))
describe('StreamingResponseFormatProcessor', () => {
let processor: StreamingResponseFormatProcessor
beforeEach(() => {
processor = new StreamingResponseFormatProcessor()
})
afterEach(() => {
vi.clearAllMocks()
})
describe('processStream', () => {
it.concurrent('should return original stream when no response format selection', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"content": "test"}'))
controller.close()
},
})
const result = processor.processStream(
mockStream,
'block-1',
['block-1.content'], // No underscore, not response format
{ schema: { properties: { username: { type: 'string' } } } }
)
expect(result).toBe(mockStream)
})
it.concurrent('should return original stream when no response format provided', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"content": "test"}'))
controller.close()
},
})
const result = processor.processStream(
mockStream,
'block-1',
['block-1_username'], // Has underscore but no response format
undefined
)
expect(result).toBe(mockStream)
})
it.concurrent('should process stream and extract single selected field', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"username": "alice", "age": 25}'))
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('alice')
})
it.concurrent('should process stream and extract multiple selected fields', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode('{"username": "bob", "age": 30, "email": "bob@test.com"}')
)
controller.close()
},
})
const processedStream = processor.processStream(
mockStream,
'block-1',
['block-1_username', 'block-1_age'],
{ schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } } }
)
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('bob\n30')
})
it.concurrent('should handle non-string field values by JSON stringifying them', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(
new TextEncoder().encode(
'{"config": {"theme": "dark", "notifications": true}, "count": 42}'
)
)
controller.close()
},
})
const processedStream = processor.processStream(
mockStream,
'block-1',
['block-1_config', 'block-1_count'],
{ schema: { properties: { config: { type: 'object' }, count: { type: 'number' } } } }
)
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('{"theme":"dark","notifications":true}\n42')
})
it.concurrent('should handle streaming JSON that comes in chunks', async () => {
const mockStream = new ReadableStream({
start(controller) {
// Simulate streaming JSON in chunks
controller.enqueue(new TextEncoder().encode('{"username": "charlie"'))
controller.enqueue(new TextEncoder().encode(', "age": 35}'))
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('charlie')
})
it.concurrent('should handle missing fields gracefully', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"username": "diana"}'))
controller.close()
},
})
const processedStream = processor.processStream(
mockStream,
'block-1',
['block-1_username', 'block-1_missing_field'],
{ schema: { properties: { username: { type: 'string' } } } }
)
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('diana')
})
it.concurrent('should handle invalid JSON gracefully', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('invalid json'))
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('')
})
it.concurrent('should filter selected fields for correct block ID', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"username": "eve", "age": 28}'))
controller.close()
},
})
const processedStream = processor.processStream(
mockStream,
'block-1',
['block-1_username', 'block-2_age'], // Different block ID should be filtered out
{ schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } } }
)
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('eve')
})
it.concurrent('should handle empty result when no matching fields', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode('{"other_field": "value"}'))
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('')
})
})
describe('singleton instance', () => {
it.concurrent('should export a singleton instance', () => {
expect(streamingResponseFormatProcessor).toBeInstanceOf(StreamingResponseFormatProcessor)
})
it.concurrent('should return the same instance on multiple imports', () => {
const instance1 = streamingResponseFormatProcessor
const instance2 = streamingResponseFormatProcessor
expect(instance1).toBe(instance2)
})
})
describe('edge cases', () => {
it.concurrent('should handle empty stream', async () => {
const mockStream = new ReadableStream({
start(controller) {
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('')
})
it.concurrent('should handle very large JSON objects', async () => {
const largeObject = {
username: 'frank',
data: 'x'.repeat(10000), // Large string
nested: {
deep: {
value: 'test',
},
},
}
const mockStream = new ReadableStream({
start(controller) {
controller.enqueue(new TextEncoder().encode(JSON.stringify(largeObject)))
controller.close()
},
})
const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], {
schema: { properties: { username: { type: 'string' } } },
})
const reader = processedStream.getReader()
const decoder = new TextDecoder()
let result = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
result += decoder.decode(value)
}
expect(result).toBe('frank')
})
})
})

201
apps/sim/executor/utils.ts Normal file
View File

@@ -0,0 +1,201 @@
import { createLogger } from '@/lib/logs/console-logger'
import type { ResponseFormatStreamProcessor } from './types'
const logger = createLogger('ExecutorUtils')
/**
* Processes a streaming response to extract only the selected response format fields
* instead of streaming the full JSON wrapper.
*/
export class StreamingResponseFormatProcessor implements ResponseFormatStreamProcessor {
processStream(
originalStream: ReadableStream,
blockId: string,
selectedOutputIds: string[],
responseFormat?: any
): ReadableStream {
// Check if this block has response format selected outputs
const hasResponseFormatSelection = selectedOutputIds.some((outputId) => {
const blockIdForOutput = outputId.includes('_')
? outputId.split('_')[0]
: outputId.split('.')[0]
return blockIdForOutput === blockId && outputId.includes('_')
})
// If no response format selection, return original stream unchanged
if (!hasResponseFormatSelection || !responseFormat) {
return originalStream
}
// Get the selected field names for this block
const selectedFields = selectedOutputIds
.filter((outputId) => {
const blockIdForOutput = outputId.includes('_')
? outputId.split('_')[0]
: outputId.split('.')[0]
return blockIdForOutput === blockId && outputId.includes('_')
})
.map((outputId) => outputId.substring(blockId.length + 1))
logger.info('Processing streaming response format', {
blockId,
selectedFields,
hasResponseFormat: !!responseFormat,
selectedFieldsCount: selectedFields.length,
})
return this.createProcessedStream(originalStream, selectedFields, blockId)
}
private createProcessedStream(
originalStream: ReadableStream,
selectedFields: string[],
blockId: string
): ReadableStream {
let buffer = ''
let hasProcessedComplete = false // Track if we've already processed the complete JSON
const self = this
return new ReadableStream({
async start(controller) {
const reader = originalStream.getReader()
const decoder = new TextDecoder()
try {
while (true) {
const { done, value } = await reader.read()
if (done) {
// Handle any remaining buffer at the end only if we haven't processed complete JSON yet
if (buffer.trim() && !hasProcessedComplete) {
self.processCompleteJson(buffer, selectedFields, controller)
}
controller.close()
break
}
const chunk = decoder.decode(value, { stream: true })
buffer += chunk
// Try to process the current buffer only if we haven't processed complete JSON yet
if (!hasProcessedComplete) {
const processedChunk = self.processStreamingChunk(buffer, selectedFields)
if (processedChunk) {
controller.enqueue(new TextEncoder().encode(processedChunk))
hasProcessedComplete = true // Mark as processed to prevent duplicate processing
}
}
}
} catch (error) {
logger.error('Error processing streaming response format:', { error, blockId })
controller.error(error)
} finally {
reader.releaseLock()
}
},
})
}
private processStreamingChunk(buffer: string, selectedFields: string[]): string | null {
// For streaming response format, we need to parse the JSON as it comes in
// and extract only the field values we care about
// Try to parse as complete JSON first
try {
const parsed = JSON.parse(buffer.trim())
if (typeof parsed === 'object' && parsed !== null) {
// We have a complete JSON object, extract the selected fields
// Process all selected fields and format them properly
const results: string[] = []
for (const field of selectedFields) {
if (field in parsed) {
const value = parsed[field]
const formattedValue = typeof value === 'string' ? value : JSON.stringify(value)
results.push(formattedValue)
}
}
if (results.length > 0) {
// Join multiple fields with newlines for readability
const result = results.join('\n')
return result
}
return null
}
} catch (e) {
// Not complete JSON yet, continue buffering
}
// For real-time extraction during streaming, we'd need more sophisticated parsing
// For now, let's handle the case where we receive chunks that might be partial JSON
// Simple heuristic: if buffer contains what looks like a complete JSON object
const openBraces = (buffer.match(/\{/g) || []).length
const closeBraces = (buffer.match(/\}/g) || []).length
if (openBraces > 0 && openBraces === closeBraces) {
// Likely a complete JSON object
try {
const parsed = JSON.parse(buffer.trim())
if (typeof parsed === 'object' && parsed !== null) {
// Process all selected fields and format them properly
const results: string[] = []
for (const field of selectedFields) {
if (field in parsed) {
const value = parsed[field]
const formattedValue = typeof value === 'string' ? value : JSON.stringify(value)
results.push(formattedValue)
}
}
if (results.length > 0) {
// Join multiple fields with newlines for readability
const result = results.join('\n')
return result
}
return null
}
} catch (e) {
// Still not valid JSON, continue
}
}
return null
}
private processCompleteJson(
buffer: string,
selectedFields: string[],
controller: ReadableStreamDefaultController
): void {
try {
const parsed = JSON.parse(buffer.trim())
if (typeof parsed === 'object' && parsed !== null) {
// Process all selected fields and format them properly
const results: string[] = []
for (const field of selectedFields) {
if (field in parsed) {
const value = parsed[field]
const formattedValue = typeof value === 'string' ? value : JSON.stringify(value)
results.push(formattedValue)
}
}
if (results.length > 0) {
// Join multiple fields with newlines for readability
const result = results.join('\n')
controller.enqueue(new TextEncoder().encode(result))
}
}
} catch (error) {
logger.warn('Failed to parse complete JSON in streaming processor:', { error })
}
}
}
// Create singleton instance
export const streamingResponseFormatProcessor = new StreamingResponseFormatProcessor()

View File

@@ -0,0 +1,185 @@
import { createLogger } from '@/lib/logs/console-logger'
const logger = createLogger('ResponseFormatUtils')
// Type definitions for component data structures
export interface Field {
name: string
type: string
description?: string
}
/**
* Helper function to extract fields from JSON Schema
* Handles both legacy format with fields array and new JSON Schema format
*/
export function extractFieldsFromSchema(schema: any): Field[] {
if (!schema || typeof schema !== 'object') {
return []
}
// Handle legacy format with fields array
if (Array.isArray(schema.fields)) {
return schema.fields
}
// Handle new JSON Schema format
const schemaObj = schema.schema || schema
if (!schemaObj || !schemaObj.properties || typeof schemaObj.properties !== 'object') {
return []
}
// Extract fields from schema properties
return Object.entries(schemaObj.properties).map(([name, prop]: [string, any]) => {
// Handle array format like ['string', 'array']
if (Array.isArray(prop)) {
return {
name,
type: prop.includes('array') ? 'array' : prop[0] || 'string',
description: undefined,
}
}
// Handle object format like { type: 'string', description: '...' }
return {
name,
type: prop.type || 'string',
description: prop.description,
}
})
}
/**
* Helper function to safely parse response format
* Handles both string and object formats
*/
export function parseResponseFormatSafely(responseFormatValue: any, blockId: string): any {
if (!responseFormatValue) {
return null
}
try {
if (typeof responseFormatValue === 'string') {
return JSON.parse(responseFormatValue)
}
return responseFormatValue
} catch (error) {
logger.warn(`Failed to parse response format for block ${blockId}:`, error)
return null
}
}
/**
* Extract field values from a parsed JSON object based on selected output paths
* Used for both workspace and chat client field extraction
*/
export function extractFieldValues(
parsedContent: any,
selectedOutputIds: string[],
blockId: string
): Record<string, any> {
const extractedValues: Record<string, any> = {}
for (const outputId of selectedOutputIds) {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
if (blockIdForOutput !== blockId) {
continue
}
const path = extractPathFromOutputId(outputId, blockIdForOutput)
if (path) {
const pathParts = path.split('.')
let current = parsedContent
for (const part of pathParts) {
if (current && typeof current === 'object' && part in current) {
current = current[part]
} else {
current = undefined
break
}
}
if (current !== undefined) {
extractedValues[path] = current
}
}
}
return extractedValues
}
/**
* Format extracted field values for display
* Returns formatted string representation of field values
*/
export function formatFieldValues(extractedValues: Record<string, any>): string {
const formattedValues: string[] = []
for (const [fieldName, value] of Object.entries(extractedValues)) {
const formattedValue = typeof value === 'string' ? value : JSON.stringify(value)
formattedValues.push(formattedValue)
}
return formattedValues.join('\n')
}
/**
* Extract block ID from output ID
* Handles both formats: "blockId" and "blockId_path" or "blockId.path"
*/
export function extractBlockIdFromOutputId(outputId: string): string {
return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0]
}
/**
* Extract path from output ID after the block ID
*/
export function extractPathFromOutputId(outputId: string, blockId: string): string {
return outputId.substring(blockId.length + 1)
}
/**
* Parse JSON content from output safely
* Handles both string and object formats with proper error handling
*/
export function parseOutputContentSafely(output: any): any {
if (!output?.content) {
return output
}
if (typeof output.content === 'string') {
try {
return JSON.parse(output.content)
} catch (e) {
// Fallback to original structure if parsing fails
return output
}
}
return output
}
/**
* Check if a set of output IDs contains response format selections for a specific block
*/
export function hasResponseFormatSelection(selectedOutputIds: string[], blockId: string): boolean {
return selectedOutputIds.some((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return blockIdForOutput === blockId && outputId.includes('_')
})
}
/**
* Get selected field names for a specific block from output IDs
*/
export function getSelectedFieldNames(selectedOutputIds: string[], blockId: string): string[] {
return selectedOutputIds
.filter((outputId) => {
const blockIdForOutput = extractBlockIdFromOutputId(outputId)
return blockIdForOutput === blockId && outputId.includes('_')
})
.map((outputId) => extractPathFromOutputId(outputId, blockId))
}

View File

@@ -78,7 +78,7 @@ export function processStreamingBlockLog(log: BlockLog, streamedContent: string)
log.output.cost = result.cost
log.output.model = result.model
logTokenizationDetails(`Streaming tokenization completed for ${log.blockType}`, {
logTokenizationDetails(`Streaming tokenization completed for ${log.blockType}`, {
blockId: log.blockId,
blockType: log.blockType,
model: result.model,
@@ -92,7 +92,7 @@ export function processStreamingBlockLog(log: BlockLog, streamedContent: string)
return true
} catch (error) {
logger.error(`Streaming tokenization failed for block ${log.blockId}`, {
logger.error(`Streaming tokenization failed for block ${log.blockId}`, {
blockType: log.blockType,
error: error instanceof Error ? error.message : String(error),
contentLength: streamedContent?.length || 0,

View File

@@ -193,7 +193,10 @@ export const useConsoleStore = create<ConsoleStore>()(
updatedEntry.output = newOutput
}
if (update.output !== undefined) {
if (update.replaceOutput !== undefined) {
// Complete replacement of output
updatedEntry.output = update.replaceOutput
} else if (update.output !== undefined) {
const existingOutput = entry.output || {}
updatedEntry.output = {
...existingOutput,

View File

@@ -20,6 +20,7 @@ export interface ConsoleEntry {
export interface ConsoleUpdate {
content?: string
output?: Partial<NormalizedBlockOutput>
replaceOutput?: NormalizedBlockOutput // New field for complete replacement
error?: string
warning?: string
success?: boolean