diff --git a/apps/sim/app/api/chat/[subdomain]/route.ts b/apps/sim/app/api/chat/[subdomain]/route.ts index 325629420..9c96e76b0 100644 --- a/apps/sim/app/api/chat/[subdomain]/route.ts +++ b/apps/sim/app/api/chat/[subdomain]/route.ts @@ -194,6 +194,7 @@ export async function GET( description: deployment.description, customizations: deployment.customizations, authType: deployment.authType, + outputConfigs: deployment.outputConfigs, }), request ) @@ -219,6 +220,7 @@ export async function GET( description: deployment.description, customizations: deployment.customizations, authType: deployment.authType, + outputConfigs: deployment.outputConfigs, }), request ) diff --git a/apps/sim/app/api/chat/utils.ts b/apps/sim/app/api/chat/utils.ts index 2d954c8dd..fb67d1bad 100644 --- a/apps/sim/app/api/chat/utils.ts +++ b/apps/sim/app/api/chat/utils.ts @@ -263,17 +263,26 @@ export async function executeWorkflowForChat( let outputBlockIds: string[] = [] // Extract output configs from the new schema format + let selectedOutputIds: string[] = [] if (deployment.outputConfigs && Array.isArray(deployment.outputConfigs)) { - // Extract block IDs and paths from the new outputConfigs array format + // Extract output IDs in the format expected by the streaming processor logger.debug( `[${requestId}] Found ${deployment.outputConfigs.length} output configs in deployment` ) - deployment.outputConfigs.forEach((config) => { + + selectedOutputIds = deployment.outputConfigs.map((config) => { + const outputId = config.path + ? `${config.blockId}_${config.path}` + : `${config.blockId}.content` + logger.debug( - `[${requestId}] Processing output config: blockId=${config.blockId}, path=${config.path || 'none'}` + `[${requestId}] Processing output config: blockId=${config.blockId}, path=${config.path || 'content'} -> outputId=${outputId}` ) + + return outputId }) + // Also extract block IDs for legacy compatibility outputBlockIds = deployment.outputConfigs.map((config) => config.blockId) } else { // Use customizations as fallback @@ -291,7 +300,9 @@ export async function executeWorkflowForChat( outputBlockIds = customizations.outputBlockIds } - logger.debug(`[${requestId}] Using ${outputBlockIds.length} output blocks for extraction`) + logger.debug( + `[${requestId}] Using ${outputBlockIds.length} output blocks and ${selectedOutputIds.length} selected output IDs for extraction` + ) // Find the workflow (deployedState is NOT deprecated - needed for chat execution) const workflowResult = await db @@ -457,7 +468,7 @@ export async function executeWorkflowForChat( workflowVariables, contextExtensions: { stream: true, - selectedOutputIds: outputBlockIds, + selectedOutputIds: selectedOutputIds.length > 0 ? selectedOutputIds : outputBlockIds, edges: edges.map((e: any) => ({ source: e.source, target: e.target, diff --git a/apps/sim/app/chat/[subdomain]/chat-client.tsx b/apps/sim/app/chat/[subdomain]/chat-client.tsx index 23833e814..17cda7bbf 100644 --- a/apps/sim/app/chat/[subdomain]/chat-client.tsx +++ b/apps/sim/app/chat/[subdomain]/chat-client.tsx @@ -33,6 +33,7 @@ interface ChatConfig { headerText?: string } authType?: 'public' | 'password' | 'email' + outputConfigs?: Array<{ blockId: string; path?: string }> } interface AudioStreamingOptions { @@ -373,8 +374,16 @@ export default function ChatClient({ subdomain }: { subdomain: string }) { const json = JSON.parse(line.substring(6)) const { blockId, chunk: contentChunk, event: eventType } = json - if (eventType === 'final') { + if (eventType === 'final' && json.data) { setIsLoading(false) + + // Process final execution result for field extraction + const result = json.data + const nonStreamingLogs = + result.logs?.filter((log: any) => !messageIdMap.has(log.blockId)) || [] + + // Chat field extraction will be handled by the backend using deployment outputConfigs + return } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/chat.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/chat.tsx index 6ce33a675..3beedb3b8 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/chat.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/chat.tsx @@ -5,6 +5,12 @@ import { ArrowUp } from 'lucide-react' import { Button } from '@/components/ui/button' import { Input } from '@/components/ui/input' import { ScrollArea } from '@/components/ui/scroll-area' +import { createLogger } from '@/lib/logs/console-logger' +import { + extractBlockIdFromOutputId, + extractPathFromOutputId, + parseOutputContentSafely, +} from '@/lib/response-format' import type { BlockLog, ExecutionResult } from '@/executor/types' import { useExecutionStore } from '@/stores/execution/store' import { useChatStore } from '@/stores/panel/chat/store' @@ -14,6 +20,8 @@ import { useWorkflowExecution } from '../../../../hooks/use-workflow-execution' import { ChatMessage } from './components/chat-message/chat-message' import { OutputSelect } from './components/output-select/output-select' +const logger = createLogger('ChatPanel') + interface ChatProps { panelWidth: number chatMessage: string @@ -60,8 +68,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { const selected = selectedWorkflowOutputs[activeWorkflowId] if (!selected || selected.length === 0) { - const defaultSelection = outputEntries.length > 0 ? [outputEntries[0].id] : [] - return defaultSelection + // Return empty array when nothing is explicitly selected + return [] } // Ensure we have no duplicates in the selection @@ -74,7 +82,7 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { } return selected - }, [selectedWorkflowOutputs, activeWorkflowId, outputEntries, setSelectedWorkflowOutput]) + }, [selectedWorkflowOutputs, activeWorkflowId, setSelectedWorkflowOutput]) // Auto-scroll to bottom when new messages are added useEffect(() => { @@ -141,25 +149,22 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { if (nonStreamingLogs.length > 0) { const outputsToRender = selectedOutputs.filter((outputId) => { - // Extract block ID correctly - handle both formats: - // - "blockId" (direct block ID) - // - "blockId_response.result" (block ID with path) - const blockIdForOutput = outputId.includes('_') - ? outputId.split('_')[0] - : outputId.split('.')[0] + const blockIdForOutput = extractBlockIdFromOutputId(outputId) return nonStreamingLogs.some((log) => log.blockId === blockIdForOutput) }) for (const outputId of outputsToRender) { - const blockIdForOutput = outputId.includes('_') - ? outputId.split('_')[0] - : outputId.split('.')[0] - const path = outputId.substring(blockIdForOutput.length + 1) + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + const path = extractPathFromOutputId(outputId, blockIdForOutput) const log = nonStreamingLogs.find((l) => l.blockId === blockIdForOutput) if (log) { let outputValue: any = log.output + if (path) { + // Parse JSON content safely + outputValue = parseOutputContentSafely(outputValue) + const pathParts = path.split('.') for (const part of pathParts) { if ( @@ -211,42 +216,41 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { } } } catch (e) { - console.error('Error parsing stream data:', e) + logger.error('Error parsing stream data:', e) } } } } } - processStream().catch((e) => console.error('Error processing stream:', e)) + processStream().catch((e) => logger.error('Error processing stream:', e)) } else if (result && 'success' in result && result.success && 'logs' in result) { const finalOutputs: any[] = [] - if (selectedOutputs && selectedOutputs.length > 0) { + if (selectedOutputs?.length > 0) { for (const outputId of selectedOutputs) { - // Find the log that corresponds to the start of the outputId - const log = result.logs?.find( - (l: BlockLog) => l.blockId === outputId || outputId.startsWith(`${l.blockId}_`) - ) + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + const path = extractPathFromOutputId(outputId, blockIdForOutput) + const log = result.logs?.find((l: BlockLog) => l.blockId === blockIdForOutput) if (log) { let output = log.output - // Check if there is a path to traverse - if (outputId.length > log.blockId.length) { - const path = outputId.substring(log.blockId.length + 1) - if (path) { - const pathParts = path.split('.') - let current = output - for (const part of pathParts) { - if (current && typeof current === 'object' && part in current) { - current = current[part] - } else { - current = undefined - break - } + + if (path) { + // Parse JSON content safely + output = parseOutputContentSafely(output) + + const pathParts = path.split('.') + let current = output + for (const part of pathParts) { + if (current && typeof current === 'object' && part in current) { + current = current[part] + } else { + current = undefined + break } - output = current } + output = current } if (output !== undefined) { finalOutputs.push(output) @@ -255,10 +259,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { } } - // If no specific outputs could be resolved, fall back to the final workflow output - if (finalOutputs.length === 0 && result.output) { - finalOutputs.push(result.output) - } + // Only show outputs if something was explicitly selected + // If no outputs are selected, don't show anything // Add a new message for each resolved output finalOutputs.forEach((output) => { @@ -266,19 +268,8 @@ export function Chat({ panelWidth, chatMessage, setChatMessage }: ChatProps) { if (typeof output === 'string') { content = output } else if (output && typeof output === 'object') { - // Handle cases where output is { response: ... } - const outputObj = output as Record - const response = outputObj.response - if (response) { - if (typeof response.content === 'string') { - content = response.content - } else { - // Pretty print for better readability - content = `\`\`\`json\n${JSON.stringify(response, null, 2)}\n\`\`\`` - } - } else { - content = `\`\`\`json\n${JSON.stringify(output, null, 2)}\n\`\`\`` - } + // For structured responses, pretty print the JSON + content = `\`\`\`json\n${JSON.stringify(output, null, 2)}\n\`\`\`` } if (content) { diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/components/output-select/output-select.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/components/output-select/output-select.tsx index 71b24b790..a4768fd70 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/components/output-select/output-select.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/chat/components/output-select/output-select.tsx @@ -1,8 +1,10 @@ import { useEffect, useMemo, useRef, useState } from 'react' import { Check, ChevronDown } from 'lucide-react' import { Button } from '@/components/ui/button' +import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format' import { cn } from '@/lib/utils' import { getBlock } from '@/blocks' +import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' interface OutputSelectProps { @@ -48,8 +50,31 @@ export function OutputSelect({ ? block.name.replace(/\s+/g, '').toLowerCase() : `block-${block.id}` + // Check for custom response format first + const responseFormatValue = useSubBlockStore.getState().getValue(block.id, 'responseFormat') + const responseFormat = parseResponseFormatSafely(responseFormatValue, block.id) + + let outputsToProcess: Record = {} + + if (responseFormat) { + // Use custom schema properties if response format is specified + const schemaFields = extractFieldsFromSchema(responseFormat) + if (schemaFields.length > 0) { + // Convert schema fields to output structure + schemaFields.forEach((field) => { + outputsToProcess[field.name] = { type: field.type } + }) + } else { + // Fallback to default outputs if schema extraction failed + outputsToProcess = block.outputs || {} + } + } else { + // Use default block outputs + outputsToProcess = block.outputs || {} + } + // Add response outputs - if (block.outputs && typeof block.outputs === 'object') { + if (Object.keys(outputsToProcess).length > 0) { const addOutput = (path: string, outputObj: any, prefix = '') => { const fullPath = prefix ? `${prefix}.${path}` : path @@ -100,7 +125,7 @@ export function OutputSelect({ } // Process all output properties directly (flattened structure) - Object.entries(block.outputs).forEach(([key, value]) => { + Object.entries(outputsToProcess).forEach(([key, value]) => { addOutput(key, value) }) } diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/console/components/console-entry/console-entry.tsx b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/console/components/console-entry/console-entry.tsx index 99adc458e..1b0e9fab3 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/console/components/console-entry/console-entry.tsx +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/console/components/console-entry/console-entry.tsx @@ -125,35 +125,33 @@ export function ConsoleEntry({ entry, consoleWidth }: ConsoleEntryProps) {
- {typeof entry.output === 'object' && - entry.output !== null && - hasNestedStructure(entry.output) && ( -
- -
- )} + {entry.output != null && ( +
+ +
+ )}
diff --git a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts index 55710599b..55b1e2397 100644 --- a/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts +++ b/apps/sim/app/workspace/[workspaceId]/w/[workflowId]/hooks/use-workflow-execution.ts @@ -217,10 +217,13 @@ export function useWorkflowExecution() { result.logs.forEach((log: BlockLog) => { if (streamedContent.has(log.blockId)) { const content = streamedContent.get(log.blockId) || '' - if (log.output) { - log.output.content = content - } - useConsoleStore.getState().updateConsole(log.blockId, content) + // For console display, show the actual structured block output instead of formatted streaming content + // This ensures console logs match the block state structure + // Use replaceOutput to completely replace the output instead of merging + useConsoleStore.getState().updateConsole(log.blockId, { + replaceOutput: log.output, + success: true, + }) } }) diff --git a/apps/sim/components/ui/tag-dropdown.test.tsx b/apps/sim/components/ui/tag-dropdown.test.tsx index 839970e95..d95584d52 100644 --- a/apps/sim/components/ui/tag-dropdown.test.tsx +++ b/apps/sim/components/ui/tag-dropdown.test.tsx @@ -1,7 +1,8 @@ import { describe, expect, test, vi } from 'vitest' +import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format' import type { BlockState } from '@/stores/workflows/workflow/types' import { generateLoopBlocks } from '@/stores/workflows/workflow/utils' -import { checkTagTrigger, extractFieldsFromSchema } from './tag-dropdown' +import { checkTagTrigger } from './tag-dropdown' vi.mock('@/stores/workflows/workflow/store', () => ({ useWorkflowStore: vi.fn(() => ({ @@ -24,6 +25,15 @@ vi.mock('@/stores/panel/variables/store', () => ({ })), })) +vi.mock('@/stores/workflows/subblock/store', () => ({ + useSubBlockStore: vi.fn(() => ({ + getValue: vi.fn(() => null), + getState: vi.fn(() => ({ + getValue: vi.fn(() => null), + })), + })), +})) + describe('TagDropdown Loop Suggestions', () => { test('should generate correct loop suggestions for forEach loops', () => { const blocks: Record = { @@ -603,3 +613,180 @@ describe('TagDropdown Tag Selection Logic', () => { }) }) }) + +describe('TagDropdown Response Format Support', () => { + it.concurrent( + 'should use custom schema properties when response format is specified', + async () => { + // Mock the subblock store to return a custom response format + const mockGetValue = vi.fn() + const mockUseSubBlockStore = vi.mocked( + await import('@/stores/workflows/subblock/store') + ).useSubBlockStore + + // Set up the mock to return the example schema from the user + const responseFormatValue = JSON.stringify({ + name: 'short_schema', + description: 'A minimal example schema with a single string property.', + strict: true, + schema: { + type: 'object', + properties: { + example_property: { + type: 'string', + description: 'A simple string property.', + }, + }, + additionalProperties: false, + required: ['example_property'], + }, + }) + + mockGetValue.mockImplementation((blockId: string, subBlockId: string) => { + if (blockId === 'agent1' && subBlockId === 'responseFormat') { + return responseFormatValue + } + return null + }) + + mockUseSubBlockStore.mockReturnValue({ + getValue: mockGetValue, + getState: () => ({ + getValue: mockGetValue, + }), + } as any) + + // Test the parseResponseFormatSafely function + const parsedFormat = parseResponseFormatSafely(responseFormatValue, 'agent1') + + expect(parsedFormat).toEqual({ + name: 'short_schema', + description: 'A minimal example schema with a single string property.', + strict: true, + schema: { + type: 'object', + properties: { + example_property: { + type: 'string', + description: 'A simple string property.', + }, + }, + additionalProperties: false, + required: ['example_property'], + }, + }) + + // Test the extractFieldsFromSchema function with the parsed format + const fields = extractFieldsFromSchema(parsedFormat) + + expect(fields).toEqual([ + { + name: 'example_property', + type: 'string', + description: 'A simple string property.', + }, + ]) + } + ) + + it.concurrent( + 'should fallback to default outputs when response format parsing fails', + async () => { + // Test with invalid JSON + const invalidFormat = parseResponseFormatSafely('invalid json', 'agent1') + expect(invalidFormat).toBeNull() + + // Test with null/undefined values + expect(parseResponseFormatSafely(null, 'agent1')).toBeNull() + expect(parseResponseFormatSafely(undefined, 'agent1')).toBeNull() + expect(parseResponseFormatSafely('', 'agent1')).toBeNull() + } + ) + + it.concurrent('should handle response format with nested schema correctly', async () => { + const responseFormat = { + schema: { + type: 'object', + properties: { + user: { + type: 'object', + description: 'User information', + properties: { + name: { type: 'string', description: 'User name' }, + age: { type: 'number', description: 'User age' }, + }, + }, + status: { type: 'string', description: 'Response status' }, + }, + }, + } + + const fields = extractFieldsFromSchema(responseFormat) + + expect(fields).toEqual([ + { name: 'user', type: 'object', description: 'User information' }, + { name: 'status', type: 'string', description: 'Response status' }, + ]) + }) + + it.concurrent('should handle response format without schema wrapper', async () => { + const responseFormat = { + type: 'object', + properties: { + result: { type: 'boolean', description: 'Operation result' }, + message: { type: 'string', description: 'Status message' }, + }, + } + + const fields = extractFieldsFromSchema(responseFormat) + + expect(fields).toEqual([ + { name: 'result', type: 'boolean', description: 'Operation result' }, + { name: 'message', type: 'string', description: 'Status message' }, + ]) + }) + + it.concurrent('should return object as-is when it is already parsed', async () => { + const responseFormat = { + name: 'test_schema', + schema: { + properties: { + data: { type: 'string' }, + }, + }, + } + + const result = parseResponseFormatSafely(responseFormat, 'agent1') + + expect(result).toEqual(responseFormat) + }) + + it.concurrent('should simulate block tag generation with custom response format', async () => { + // Simulate the tag generation logic that would happen in the component + const blockName = 'Agent 1' + const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase() // 'agent1' + + // Mock response format + const responseFormat = { + schema: { + properties: { + example_property: { type: 'string', description: 'A simple string property.' }, + another_field: { type: 'number', description: 'Another field.' }, + }, + }, + } + + const schemaFields = extractFieldsFromSchema(responseFormat) + + // Generate block tags as they would be in the component + const blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`) + + expect(blockTags).toEqual(['agent1.example_property', 'agent1.another_field']) + + // Verify the fields extracted correctly + expect(schemaFields).toEqual([ + { name: 'example_property', type: 'string', description: 'A simple string property.' }, + { name: 'another_field', type: 'number', description: 'Another field.' }, + ]) + }) +}) diff --git a/apps/sim/components/ui/tag-dropdown.tsx b/apps/sim/components/ui/tag-dropdown.tsx index 9ae988386..ff07b6d42 100644 --- a/apps/sim/components/ui/tag-dropdown.tsx +++ b/apps/sim/components/ui/tag-dropdown.tsx @@ -1,18 +1,16 @@ import type React from 'react' import { useCallback, useEffect, useMemo, useState } from 'react' import { BlockPathCalculator } from '@/lib/block-path-calculator' -import { createLogger } from '@/lib/logs/console-logger' +import { extractFieldsFromSchema, parseResponseFormatSafely } from '@/lib/response-format' import { cn } from '@/lib/utils' import { getBlock } from '@/blocks' import { Serializer } from '@/serializer' import { useVariablesStore } from '@/stores/panel/variables/store' import type { Variable } from '@/stores/panel/variables/types' import { useWorkflowRegistry } from '@/stores/workflows/registry/store' +import { useSubBlockStore } from '@/stores/workflows/subblock/store' import { useWorkflowStore } from '@/stores/workflows/workflow/store' -const logger = createLogger('TagDropdown') - -// Type definitions for component data structures interface BlockTagGroup { blockName: string blockId: string @@ -21,49 +19,6 @@ interface BlockTagGroup { distance: number } -interface Field { - name: string - type: string - description?: string -} - -// Helper function to extract fields from JSON Schema -export function extractFieldsFromSchema(schema: any): Field[] { - if (!schema || typeof schema !== 'object') { - return [] - } - - // Handle legacy format with fields array - if (Array.isArray(schema.fields)) { - return schema.fields - } - - // Handle new JSON Schema format - const schemaObj = schema.schema || schema - if (!schemaObj || !schemaObj.properties || typeof schemaObj.properties !== 'object') { - return [] - } - - // Extract fields from schema properties - return Object.entries(schemaObj.properties).map(([name, prop]: [string, any]) => { - // Handle array format like ['string', 'array'] - if (Array.isArray(prop)) { - return { - name, - type: prop.includes('array') ? 'array' : prop[0] || 'string', - description: undefined, - } - } - - // Handle object format like { type: 'string', description: '...' } - return { - name, - type: prop.type || 'string', - description: prop.description, - } - }) -} - interface TagDropdownProps { visible: boolean onSelect: (newValue: string) => void @@ -208,11 +163,29 @@ export const TagDropdown: React.FC = ({ const blockName = sourceBlock.name || sourceBlock.type const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase() - // Handle blocks with no outputs (like starter) - show as just + // Check for custom response format first + const responseFormatValue = useSubBlockStore + .getState() + .getValue(activeSourceBlockId, 'responseFormat') + const responseFormat = parseResponseFormatSafely(responseFormatValue, activeSourceBlockId) + let blockTags: string[] - if (Object.keys(blockConfig.outputs).length === 0) { + + if (responseFormat) { + // Use custom schema properties if response format is specified + const schemaFields = extractFieldsFromSchema(responseFormat) + if (schemaFields.length > 0) { + blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`) + } else { + // Fallback to default if schema extraction failed + const outputPaths = generateOutputPaths(blockConfig.outputs) + blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`) + } + } else if (Object.keys(blockConfig.outputs).length === 0) { + // Handle blocks with no outputs (like starter) - show as just blockTags = [normalizedBlockName] } else { + // Use default block outputs const outputPaths = generateOutputPaths(blockConfig.outputs) blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`) } @@ -341,7 +314,7 @@ export const TagDropdown: React.FC = ({ ) let containingParallelBlockId: string | null = null if (containingParallel) { - const [parallelId, parallel] = containingParallel + const [parallelId] = containingParallel containingParallelBlockId = parallelId const contextualTags: string[] = ['index', 'currentItem', 'items'] @@ -413,11 +386,29 @@ export const TagDropdown: React.FC = ({ const blockName = accessibleBlock.name || accessibleBlock.type const normalizedBlockName = blockName.replace(/\s+/g, '').toLowerCase() - // Handle blocks with no outputs (like starter) - show as just + // Check for custom response format first + const responseFormatValue = useSubBlockStore + .getState() + .getValue(accessibleBlockId, 'responseFormat') + const responseFormat = parseResponseFormatSafely(responseFormatValue, accessibleBlockId) + let blockTags: string[] - if (Object.keys(blockConfig.outputs).length === 0) { + + if (responseFormat) { + // Use custom schema properties if response format is specified + const schemaFields = extractFieldsFromSchema(responseFormat) + if (schemaFields.length > 0) { + blockTags = schemaFields.map((field) => `${normalizedBlockName}.${field.name}`) + } else { + // Fallback to default if schema extraction failed + const outputPaths = generateOutputPaths(blockConfig.outputs) + blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`) + } + } else if (Object.keys(blockConfig.outputs).length === 0) { + // Handle blocks with no outputs (like starter) - show as just blockTags = [normalizedBlockName] } else { + // Use default block outputs const outputPaths = generateOutputPaths(blockConfig.outputs) blockTags = outputPaths.map((path) => `${normalizedBlockName}.${path}`) } diff --git a/apps/sim/executor/index.ts b/apps/sim/executor/index.ts index a74e70c5d..a75201d20 100644 --- a/apps/sim/executor/index.ts +++ b/apps/sim/executor/index.ts @@ -30,6 +30,7 @@ import type { NormalizedBlockOutput, StreamingExecution, } from './types' +import { streamingResponseFormatProcessor } from './utils' const logger = createLogger('Executor') @@ -242,7 +243,25 @@ export class Executor { const streamingExec = output as StreamingExecution const [streamForClient, streamForExecutor] = streamingExec.stream.tee() - const clientStreamingExec = { ...streamingExec, stream: streamForClient } + // Apply response format processing to the client stream if needed + const blockId = (streamingExec.execution as any).blockId + + // Get response format from initial block states (passed from useWorkflowExecution) + // The initialBlockStates contain the subblock values including responseFormat + let responseFormat: any + if (this.initialBlockStates?.[blockId]) { + const blockState = this.initialBlockStates[blockId] as any + responseFormat = blockState.responseFormat + } + + const processedClientStream = streamingResponseFormatProcessor.processStream( + streamForClient, + blockId, + context.selectedOutputIds || [], + responseFormat + ) + + const clientStreamingExec = { ...streamingExec, stream: processedClientStream } try { // Handle client stream with proper error handling @@ -267,7 +286,41 @@ export class Executor { const blockId = (streamingExec.execution as any).blockId const blockState = context.blockStates.get(blockId) if (blockState?.output) { - blockState.output.content = fullContent + // Check if we have response format - if so, preserve structured response + let responseFormat: any + if (this.initialBlockStates?.[blockId]) { + const initialBlockState = this.initialBlockStates[blockId] as any + responseFormat = initialBlockState.responseFormat + } + + if (responseFormat && fullContent) { + // For structured responses, always try to parse the raw streaming content + // The streamForExecutor contains the raw JSON response, not the processed display text + try { + const parsedContent = JSON.parse(fullContent) + // Preserve metadata but spread parsed fields at root level (same as manual execution) + const structuredOutput = { + ...parsedContent, + tokens: blockState.output.tokens, + toolCalls: blockState.output.toolCalls, + providerTiming: blockState.output.providerTiming, + cost: blockState.output.cost, + } + blockState.output = structuredOutput + + // Also update the corresponding block log with the structured output + const blockLog = context.blockLogs.find((log) => log.blockId === blockId) + if (blockLog) { + blockLog.output = structuredOutput + } + } catch (parseError) { + // If parsing fails, fall back to setting content + blockState.output.content = fullContent + } + } else { + // No response format, use standard content setting + blockState.output.content = fullContent + } } } catch (readerError: any) { logger.error('Error reading stream for executor:', readerError) @@ -275,7 +328,40 @@ export class Executor { const blockId = (streamingExec.execution as any).blockId const blockState = context.blockStates.get(blockId) if (blockState?.output && fullContent) { - blockState.output.content = fullContent + // Check if we have response format for error handling too + let responseFormat: any + if (this.initialBlockStates?.[blockId]) { + const initialBlockState = this.initialBlockStates[blockId] as any + responseFormat = initialBlockState.responseFormat + } + + if (responseFormat) { + // For structured responses, always try to parse the raw streaming content + // The streamForExecutor contains the raw JSON response, not the processed display text + try { + const parsedContent = JSON.parse(fullContent) + const structuredOutput = { + ...parsedContent, + tokens: blockState.output.tokens, + toolCalls: blockState.output.toolCalls, + providerTiming: blockState.output.providerTiming, + cost: blockState.output.cost, + } + blockState.output = structuredOutput + + // Also update the corresponding block log with the structured output + const blockLog = context.blockLogs.find((log) => log.blockId === blockId) + if (blockLog) { + blockLog.output = structuredOutput + } + } catch (parseError) { + // If parsing fails, fall back to setting content + blockState.output.content = fullContent + } + } else { + // No response format, use standard content setting + blockState.output.content = fullContent + } } } finally { try { @@ -1257,6 +1343,7 @@ export class Executor { context.blockLogs.push(blockLog) // Skip console logging for infrastructure blocks like loops and parallels + // For streaming blocks, we'll add the console entry after stream processing if (block.metadata?.id !== 'loop' && block.metadata?.id !== 'parallel') { addConsole({ output: blockLog.output, diff --git a/apps/sim/executor/types.ts b/apps/sim/executor/types.ts index ac76a3538..d8c078c4b 100644 --- a/apps/sim/executor/types.ts +++ b/apps/sim/executor/types.ts @@ -269,3 +269,15 @@ export interface Tool

> { export interface ToolRegistry { [key: string]: Tool } + +/** + * Interface for a stream processor that can process a stream based on a response format. + */ +export interface ResponseFormatStreamProcessor { + processStream( + originalStream: ReadableStream, + blockId: string, + selectedOutputIds: string[], + responseFormat?: any + ): ReadableStream +} diff --git a/apps/sim/executor/utils.test.ts b/apps/sim/executor/utils.test.ts new file mode 100644 index 000000000..4453bc580 --- /dev/null +++ b/apps/sim/executor/utils.test.ts @@ -0,0 +1,354 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { StreamingResponseFormatProcessor, streamingResponseFormatProcessor } from './utils' + +vi.mock('@/lib/logs/console-logger', () => ({ + createLogger: vi.fn().mockReturnValue({ + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }), +})) + +describe('StreamingResponseFormatProcessor', () => { + let processor: StreamingResponseFormatProcessor + + beforeEach(() => { + processor = new StreamingResponseFormatProcessor() + }) + + afterEach(() => { + vi.clearAllMocks() + }) + + describe('processStream', () => { + it.concurrent('should return original stream when no response format selection', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"content": "test"}')) + controller.close() + }, + }) + + const result = processor.processStream( + mockStream, + 'block-1', + ['block-1.content'], // No underscore, not response format + { schema: { properties: { username: { type: 'string' } } } } + ) + + expect(result).toBe(mockStream) + }) + + it.concurrent('should return original stream when no response format provided', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"content": "test"}')) + controller.close() + }, + }) + + const result = processor.processStream( + mockStream, + 'block-1', + ['block-1_username'], // Has underscore but no response format + undefined + ) + + expect(result).toBe(mockStream) + }) + + it.concurrent('should process stream and extract single selected field', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"username": "alice", "age": 25}')) + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('alice') + }) + + it.concurrent('should process stream and extract multiple selected fields', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode('{"username": "bob", "age": 30, "email": "bob@test.com"}') + ) + controller.close() + }, + }) + + const processedStream = processor.processStream( + mockStream, + 'block-1', + ['block-1_username', 'block-1_age'], + { schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } } } + ) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('bob\n30') + }) + + it.concurrent('should handle non-string field values by JSON stringifying them', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue( + new TextEncoder().encode( + '{"config": {"theme": "dark", "notifications": true}, "count": 42}' + ) + ) + controller.close() + }, + }) + + const processedStream = processor.processStream( + mockStream, + 'block-1', + ['block-1_config', 'block-1_count'], + { schema: { properties: { config: { type: 'object' }, count: { type: 'number' } } } } + ) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('{"theme":"dark","notifications":true}\n42') + }) + + it.concurrent('should handle streaming JSON that comes in chunks', async () => { + const mockStream = new ReadableStream({ + start(controller) { + // Simulate streaming JSON in chunks + controller.enqueue(new TextEncoder().encode('{"username": "charlie"')) + controller.enqueue(new TextEncoder().encode(', "age": 35}')) + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('charlie') + }) + + it.concurrent('should handle missing fields gracefully', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"username": "diana"}')) + controller.close() + }, + }) + + const processedStream = processor.processStream( + mockStream, + 'block-1', + ['block-1_username', 'block-1_missing_field'], + { schema: { properties: { username: { type: 'string' } } } } + ) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('diana') + }) + + it.concurrent('should handle invalid JSON gracefully', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('invalid json')) + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('') + }) + + it.concurrent('should filter selected fields for correct block ID', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"username": "eve", "age": 28}')) + controller.close() + }, + }) + + const processedStream = processor.processStream( + mockStream, + 'block-1', + ['block-1_username', 'block-2_age'], // Different block ID should be filtered out + { schema: { properties: { username: { type: 'string' }, age: { type: 'number' } } } } + ) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('eve') + }) + + it.concurrent('should handle empty result when no matching fields', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('{"other_field": "value"}')) + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('') + }) + }) + + describe('singleton instance', () => { + it.concurrent('should export a singleton instance', () => { + expect(streamingResponseFormatProcessor).toBeInstanceOf(StreamingResponseFormatProcessor) + }) + + it.concurrent('should return the same instance on multiple imports', () => { + const instance1 = streamingResponseFormatProcessor + const instance2 = streamingResponseFormatProcessor + expect(instance1).toBe(instance2) + }) + }) + + describe('edge cases', () => { + it.concurrent('should handle empty stream', async () => { + const mockStream = new ReadableStream({ + start(controller) { + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('') + }) + + it.concurrent('should handle very large JSON objects', async () => { + const largeObject = { + username: 'frank', + data: 'x'.repeat(10000), // Large string + nested: { + deep: { + value: 'test', + }, + }, + } + + const mockStream = new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode(JSON.stringify(largeObject))) + controller.close() + }, + }) + + const processedStream = processor.processStream(mockStream, 'block-1', ['block-1_username'], { + schema: { properties: { username: { type: 'string' } } }, + }) + + const reader = processedStream.getReader() + const decoder = new TextDecoder() + let result = '' + + while (true) { + const { done, value } = await reader.read() + if (done) break + result += decoder.decode(value) + } + + expect(result).toBe('frank') + }) + }) +}) diff --git a/apps/sim/executor/utils.ts b/apps/sim/executor/utils.ts new file mode 100644 index 000000000..007404319 --- /dev/null +++ b/apps/sim/executor/utils.ts @@ -0,0 +1,201 @@ +import { createLogger } from '@/lib/logs/console-logger' +import type { ResponseFormatStreamProcessor } from './types' + +const logger = createLogger('ExecutorUtils') + +/** + * Processes a streaming response to extract only the selected response format fields + * instead of streaming the full JSON wrapper. + */ +export class StreamingResponseFormatProcessor implements ResponseFormatStreamProcessor { + processStream( + originalStream: ReadableStream, + blockId: string, + selectedOutputIds: string[], + responseFormat?: any + ): ReadableStream { + // Check if this block has response format selected outputs + const hasResponseFormatSelection = selectedOutputIds.some((outputId) => { + const blockIdForOutput = outputId.includes('_') + ? outputId.split('_')[0] + : outputId.split('.')[0] + return blockIdForOutput === blockId && outputId.includes('_') + }) + + // If no response format selection, return original stream unchanged + if (!hasResponseFormatSelection || !responseFormat) { + return originalStream + } + + // Get the selected field names for this block + const selectedFields = selectedOutputIds + .filter((outputId) => { + const blockIdForOutput = outputId.includes('_') + ? outputId.split('_')[0] + : outputId.split('.')[0] + return blockIdForOutput === blockId && outputId.includes('_') + }) + .map((outputId) => outputId.substring(blockId.length + 1)) + + logger.info('Processing streaming response format', { + blockId, + selectedFields, + hasResponseFormat: !!responseFormat, + selectedFieldsCount: selectedFields.length, + }) + + return this.createProcessedStream(originalStream, selectedFields, blockId) + } + + private createProcessedStream( + originalStream: ReadableStream, + selectedFields: string[], + blockId: string + ): ReadableStream { + let buffer = '' + let hasProcessedComplete = false // Track if we've already processed the complete JSON + + const self = this + + return new ReadableStream({ + async start(controller) { + const reader = originalStream.getReader() + const decoder = new TextDecoder() + + try { + while (true) { + const { done, value } = await reader.read() + + if (done) { + // Handle any remaining buffer at the end only if we haven't processed complete JSON yet + if (buffer.trim() && !hasProcessedComplete) { + self.processCompleteJson(buffer, selectedFields, controller) + } + controller.close() + break + } + + const chunk = decoder.decode(value, { stream: true }) + buffer += chunk + + // Try to process the current buffer only if we haven't processed complete JSON yet + if (!hasProcessedComplete) { + const processedChunk = self.processStreamingChunk(buffer, selectedFields) + + if (processedChunk) { + controller.enqueue(new TextEncoder().encode(processedChunk)) + hasProcessedComplete = true // Mark as processed to prevent duplicate processing + } + } + } + } catch (error) { + logger.error('Error processing streaming response format:', { error, blockId }) + controller.error(error) + } finally { + reader.releaseLock() + } + }, + }) + } + + private processStreamingChunk(buffer: string, selectedFields: string[]): string | null { + // For streaming response format, we need to parse the JSON as it comes in + // and extract only the field values we care about + + // Try to parse as complete JSON first + try { + const parsed = JSON.parse(buffer.trim()) + if (typeof parsed === 'object' && parsed !== null) { + // We have a complete JSON object, extract the selected fields + // Process all selected fields and format them properly + const results: string[] = [] + for (const field of selectedFields) { + if (field in parsed) { + const value = parsed[field] + const formattedValue = typeof value === 'string' ? value : JSON.stringify(value) + results.push(formattedValue) + } + } + + if (results.length > 0) { + // Join multiple fields with newlines for readability + const result = results.join('\n') + return result + } + + return null + } + } catch (e) { + // Not complete JSON yet, continue buffering + } + + // For real-time extraction during streaming, we'd need more sophisticated parsing + // For now, let's handle the case where we receive chunks that might be partial JSON + + // Simple heuristic: if buffer contains what looks like a complete JSON object + const openBraces = (buffer.match(/\{/g) || []).length + const closeBraces = (buffer.match(/\}/g) || []).length + + if (openBraces > 0 && openBraces === closeBraces) { + // Likely a complete JSON object + try { + const parsed = JSON.parse(buffer.trim()) + if (typeof parsed === 'object' && parsed !== null) { + // Process all selected fields and format them properly + const results: string[] = [] + for (const field of selectedFields) { + if (field in parsed) { + const value = parsed[field] + const formattedValue = typeof value === 'string' ? value : JSON.stringify(value) + results.push(formattedValue) + } + } + + if (results.length > 0) { + // Join multiple fields with newlines for readability + const result = results.join('\n') + return result + } + + return null + } + } catch (e) { + // Still not valid JSON, continue + } + } + + return null + } + + private processCompleteJson( + buffer: string, + selectedFields: string[], + controller: ReadableStreamDefaultController + ): void { + try { + const parsed = JSON.parse(buffer.trim()) + if (typeof parsed === 'object' && parsed !== null) { + // Process all selected fields and format them properly + const results: string[] = [] + for (const field of selectedFields) { + if (field in parsed) { + const value = parsed[field] + const formattedValue = typeof value === 'string' ? value : JSON.stringify(value) + results.push(formattedValue) + } + } + + if (results.length > 0) { + // Join multiple fields with newlines for readability + const result = results.join('\n') + controller.enqueue(new TextEncoder().encode(result)) + } + } + } catch (error) { + logger.warn('Failed to parse complete JSON in streaming processor:', { error }) + } + } +} + +// Create singleton instance +export const streamingResponseFormatProcessor = new StreamingResponseFormatProcessor() diff --git a/apps/sim/lib/response-format.ts b/apps/sim/lib/response-format.ts new file mode 100644 index 000000000..7a319854a --- /dev/null +++ b/apps/sim/lib/response-format.ts @@ -0,0 +1,185 @@ +import { createLogger } from '@/lib/logs/console-logger' + +const logger = createLogger('ResponseFormatUtils') + +// Type definitions for component data structures +export interface Field { + name: string + type: string + description?: string +} + +/** + * Helper function to extract fields from JSON Schema + * Handles both legacy format with fields array and new JSON Schema format + */ +export function extractFieldsFromSchema(schema: any): Field[] { + if (!schema || typeof schema !== 'object') { + return [] + } + + // Handle legacy format with fields array + if (Array.isArray(schema.fields)) { + return schema.fields + } + + // Handle new JSON Schema format + const schemaObj = schema.schema || schema + if (!schemaObj || !schemaObj.properties || typeof schemaObj.properties !== 'object') { + return [] + } + + // Extract fields from schema properties + return Object.entries(schemaObj.properties).map(([name, prop]: [string, any]) => { + // Handle array format like ['string', 'array'] + if (Array.isArray(prop)) { + return { + name, + type: prop.includes('array') ? 'array' : prop[0] || 'string', + description: undefined, + } + } + + // Handle object format like { type: 'string', description: '...' } + return { + name, + type: prop.type || 'string', + description: prop.description, + } + }) +} + +/** + * Helper function to safely parse response format + * Handles both string and object formats + */ +export function parseResponseFormatSafely(responseFormatValue: any, blockId: string): any { + if (!responseFormatValue) { + return null + } + + try { + if (typeof responseFormatValue === 'string') { + return JSON.parse(responseFormatValue) + } + return responseFormatValue + } catch (error) { + logger.warn(`Failed to parse response format for block ${blockId}:`, error) + return null + } +} + +/** + * Extract field values from a parsed JSON object based on selected output paths + * Used for both workspace and chat client field extraction + */ +export function extractFieldValues( + parsedContent: any, + selectedOutputIds: string[], + blockId: string +): Record { + const extractedValues: Record = {} + + for (const outputId of selectedOutputIds) { + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + + if (blockIdForOutput !== blockId) { + continue + } + + const path = extractPathFromOutputId(outputId, blockIdForOutput) + + if (path) { + const pathParts = path.split('.') + let current = parsedContent + + for (const part of pathParts) { + if (current && typeof current === 'object' && part in current) { + current = current[part] + } else { + current = undefined + break + } + } + + if (current !== undefined) { + extractedValues[path] = current + } + } + } + + return extractedValues +} + +/** + * Format extracted field values for display + * Returns formatted string representation of field values + */ +export function formatFieldValues(extractedValues: Record): string { + const formattedValues: string[] = [] + + for (const [fieldName, value] of Object.entries(extractedValues)) { + const formattedValue = typeof value === 'string' ? value : JSON.stringify(value) + formattedValues.push(formattedValue) + } + + return formattedValues.join('\n') +} + +/** + * Extract block ID from output ID + * Handles both formats: "blockId" and "blockId_path" or "blockId.path" + */ +export function extractBlockIdFromOutputId(outputId: string): string { + return outputId.includes('_') ? outputId.split('_')[0] : outputId.split('.')[0] +} + +/** + * Extract path from output ID after the block ID + */ +export function extractPathFromOutputId(outputId: string, blockId: string): string { + return outputId.substring(blockId.length + 1) +} + +/** + * Parse JSON content from output safely + * Handles both string and object formats with proper error handling + */ +export function parseOutputContentSafely(output: any): any { + if (!output?.content) { + return output + } + + if (typeof output.content === 'string') { + try { + return JSON.parse(output.content) + } catch (e) { + // Fallback to original structure if parsing fails + return output + } + } + + return output +} + +/** + * Check if a set of output IDs contains response format selections for a specific block + */ +export function hasResponseFormatSelection(selectedOutputIds: string[], blockId: string): boolean { + return selectedOutputIds.some((outputId) => { + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + return blockIdForOutput === blockId && outputId.includes('_') + }) +} + +/** + * Get selected field names for a specific block from output IDs + */ +export function getSelectedFieldNames(selectedOutputIds: string[], blockId: string): string[] { + return selectedOutputIds + .filter((outputId) => { + const blockIdForOutput = extractBlockIdFromOutputId(outputId) + return blockIdForOutput === blockId && outputId.includes('_') + }) + .map((outputId) => extractPathFromOutputId(outputId, blockId)) +} diff --git a/apps/sim/lib/tokenization/streaming.ts b/apps/sim/lib/tokenization/streaming.ts index 6b2ac589e..442f4be85 100644 --- a/apps/sim/lib/tokenization/streaming.ts +++ b/apps/sim/lib/tokenization/streaming.ts @@ -78,7 +78,7 @@ export function processStreamingBlockLog(log: BlockLog, streamedContent: string) log.output.cost = result.cost log.output.model = result.model - logTokenizationDetails(`✅ Streaming tokenization completed for ${log.blockType}`, { + logTokenizationDetails(`Streaming tokenization completed for ${log.blockType}`, { blockId: log.blockId, blockType: log.blockType, model: result.model, @@ -92,7 +92,7 @@ export function processStreamingBlockLog(log: BlockLog, streamedContent: string) return true } catch (error) { - logger.error(`❌ Streaming tokenization failed for block ${log.blockId}`, { + logger.error(`Streaming tokenization failed for block ${log.blockId}`, { blockType: log.blockType, error: error instanceof Error ? error.message : String(error), contentLength: streamedContent?.length || 0, diff --git a/apps/sim/stores/panel/console/store.ts b/apps/sim/stores/panel/console/store.ts index 396666823..91661d998 100644 --- a/apps/sim/stores/panel/console/store.ts +++ b/apps/sim/stores/panel/console/store.ts @@ -193,7 +193,10 @@ export const useConsoleStore = create()( updatedEntry.output = newOutput } - if (update.output !== undefined) { + if (update.replaceOutput !== undefined) { + // Complete replacement of output + updatedEntry.output = update.replaceOutput + } else if (update.output !== undefined) { const existingOutput = entry.output || {} updatedEntry.output = { ...existingOutput, diff --git a/apps/sim/stores/panel/console/types.ts b/apps/sim/stores/panel/console/types.ts index 3760afbc7..363b03364 100644 --- a/apps/sim/stores/panel/console/types.ts +++ b/apps/sim/stores/panel/console/types.ts @@ -20,6 +20,7 @@ export interface ConsoleEntry { export interface ConsoleUpdate { content?: string output?: Partial + replaceOutput?: NormalizedBlockOutput // New field for complete replacement error?: string warning?: string success?: boolean