From 1933e1aad577ae40faf62430ee2358de8ebae5e9 Mon Sep 17 00:00:00 2001 From: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Date: Wed, 4 Feb 2026 15:50:18 -0800 Subject: [PATCH] improvement(openai): migrate to responses api (#3135) * Migrate openai to use responses api * Consolidate azure * Fix streaming * Bug fixes * Bug fixes * Fix responseformat * Refactor * Fix bugs * Fix * Fix azure openai response format with tool calls * Fixes * Fixes * Fix temp --- apps/sim/app/api/wand/route.ts | 248 ++++--- apps/sim/lib/copilot/chat-title.ts | 78 ++- apps/sim/providers/azure-openai/index.ts | 557 +--------------- apps/sim/providers/azure-openai/utils.ts | 37 -- apps/sim/providers/openai/core.ts | 807 +++++++++++++++++++++++ apps/sim/providers/openai/index.ts | 563 +--------------- apps/sim/providers/openai/utils.ts | 472 ++++++++++++- 7 files changed, 1512 insertions(+), 1250 deletions(-) delete mode 100644 apps/sim/providers/azure-openai/utils.ts create mode 100644 apps/sim/providers/openai/core.ts diff --git a/apps/sim/app/api/wand/route.ts b/apps/sim/app/api/wand/route.ts index 54f914a2b..bba2c2c2d 100644 --- a/apps/sim/app/api/wand/route.ts +++ b/apps/sim/app/api/wand/route.ts @@ -3,7 +3,6 @@ import { userStats, workflow } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { eq, sql } from 'drizzle-orm' import { type NextRequest, NextResponse } from 'next/server' -import OpenAI, { AzureOpenAI } from 'openai' import { getBYOKKey } from '@/lib/api-key/byok' import { getSession } from '@/lib/auth' import { logModelUsage } from '@/lib/billing/core/usage-log' @@ -12,6 +11,7 @@ import { env } from '@/lib/core/config/env' import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags' import { generateRequestId } from '@/lib/core/utils/request' import { verifyWorkspaceMembership } from '@/app/api/workflows/utils' +import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils' import { getModelPricing } from '@/providers/utils' export const dynamic = 'force-dynamic' @@ -28,18 +28,6 @@ const openaiApiKey = env.OPENAI_API_KEY const useWandAzure = azureApiKey && azureEndpoint && azureApiVersion -const client = useWandAzure - ? new AzureOpenAI({ - apiKey: azureApiKey, - apiVersion: azureApiVersion, - endpoint: azureEndpoint, - }) - : openaiApiKey - ? new OpenAI({ - apiKey: openaiApiKey, - }) - : null - if (!useWandAzure && !openaiApiKey) { logger.warn( 'Neither Azure OpenAI nor OpenAI API key found. Wand generation API will not function.' @@ -202,20 +190,18 @@ export async function POST(req: NextRequest) { } let isBYOK = false - let activeClient = client - let byokApiKey: string | null = null + let activeOpenAIKey = openaiApiKey if (workspaceId && !useWandAzure) { const byokResult = await getBYOKKey(workspaceId, 'openai') if (byokResult) { isBYOK = true - byokApiKey = byokResult.apiKey - activeClient = new OpenAI({ apiKey: byokResult.apiKey }) + activeOpenAIKey = byokResult.apiKey logger.info(`[${requestId}] Using BYOK OpenAI key for wand generation`) } } - if (!activeClient) { + if (!useWandAzure && !activeOpenAIKey) { logger.error(`[${requestId}] AI client not initialized. Missing API key.`) return NextResponse.json( { success: false, error: 'Wand generation service is not configured.' }, @@ -276,17 +262,18 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg ) const apiUrl = useWandAzure - ? 
`${azureEndpoint}/openai/deployments/${wandModelName}/chat/completions?api-version=${azureApiVersion}` - : 'https://api.openai.com/v1/chat/completions' + ? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` + : 'https://api.openai.com/v1/responses' const headers: Record = { 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', } if (useWandAzure) { headers['api-key'] = azureApiKey! } else { - headers.Authorization = `Bearer ${byokApiKey || openaiApiKey}` + headers.Authorization = `Bearer ${activeOpenAIKey}` } logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`) @@ -296,11 +283,10 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg headers, body: JSON.stringify({ model: useWandAzure ? wandModelName : 'gpt-4o', - messages: messages, + input: messages, temperature: 0.2, - max_tokens: 10000, + max_output_tokens: 10000, stream: true, - stream_options: { include_usage: true }, }), }) @@ -327,16 +313,29 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg return } + let finalUsage: any = null + let usageRecorded = false + + const recordUsage = async () => { + if (usageRecorded || !finalUsage) { + return + } + + usageRecorded = true + await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK) + } + try { let buffer = '' let chunkCount = 0 - let finalUsage: any = null + let activeEventType: string | undefined while (true) { const { done, value } = await reader.read() if (done) { logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`) + await recordUsage() controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)) controller.close() break @@ -348,47 +347,90 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg buffer = lines.pop() || '' for (const line of lines) { - if (line.startsWith('data: ')) { - const data = line.slice(6).trim() + const trimmed = line.trim() + if (!trimmed) { + continue + } - if (data === '[DONE]') { - logger.info(`[${requestId}] Received [DONE] signal`) + if (trimmed.startsWith('event:')) { + activeEventType = trimmed.slice(6).trim() + continue + } - if (finalUsage) { - await updateUserStatsForWand(session.user.id, finalUsage, requestId, isBYOK) + if (!trimmed.startsWith('data:')) { + continue + } + + const data = trimmed.slice(5).trim() + if (data === '[DONE]') { + logger.info(`[${requestId}] Received [DONE] signal`) + + await recordUsage() + + controller.enqueue( + encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`) + ) + controller.close() + return + } + + let parsed: any + try { + parsed = JSON.parse(data) + } catch (parseError) { + logger.debug(`[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`) + continue + } + + const eventType = parsed?.type ?? 
activeEventType + + if ( + eventType === 'response.error' || + eventType === 'error' || + eventType === 'response.failed' + ) { + throw new Error(parsed?.error?.message || 'Responses stream error') + } + + if ( + eventType === 'response.output_text.delta' || + eventType === 'response.output_json.delta' + ) { + let content = '' + if (typeof parsed.delta === 'string') { + content = parsed.delta + } else if (parsed.delta && typeof parsed.delta.text === 'string') { + content = parsed.delta.text + } else if (parsed.delta && parsed.delta.json !== undefined) { + content = JSON.stringify(parsed.delta.json) + } else if (parsed.json !== undefined) { + content = JSON.stringify(parsed.json) + } else if (typeof parsed.text === 'string') { + content = parsed.text + } + + if (content) { + chunkCount++ + if (chunkCount === 1) { + logger.info(`[${requestId}] Received first content chunk`) } controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`) + encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`) ) - controller.close() - return } + } - try { - const parsed = JSON.parse(data) - const content = parsed.choices?.[0]?.delta?.content - - if (content) { - chunkCount++ - if (chunkCount === 1) { - logger.info(`[${requestId}] Received first content chunk`) - } - - controller.enqueue( - encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`) - ) + if (eventType === 'response.completed') { + const usage = parseResponsesUsage(parsed?.response?.usage ?? parsed?.usage) + if (usage) { + finalUsage = { + prompt_tokens: usage.promptTokens, + completion_tokens: usage.completionTokens, + total_tokens: usage.totalTokens, } - - if (parsed.usage) { - finalUsage = parsed.usage - logger.info( - `[${requestId}] Received usage data: ${JSON.stringify(parsed.usage)}` - ) - } - } catch (parseError) { - logger.debug( - `[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}` + logger.info( + `[${requestId}] Received usage data: ${JSON.stringify(finalUsage)}` ) } } @@ -401,6 +443,12 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg stack: streamError?.stack, }) + try { + await recordUsage() + } catch (usageError) { + logger.warn(`[${requestId}] Failed to record usage after stream error`, usageError) + } + const errorData = `data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n` controller.enqueue(encoder.encode(errorData)) controller.close() @@ -424,8 +472,6 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg message: error?.message || 'Unknown error', code: error?.code, status: error?.status, - responseStatus: error?.response?.status, - responseData: error?.response?.data ? safeStringify(error.response.data) : undefined, stack: error?.stack, useWandAzure, model: useWandAzure ? wandModelName : 'gpt-4o', @@ -440,14 +486,43 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg } } - const completion = await activeClient.chat.completions.create({ - model: useWandAzure ? wandModelName : 'gpt-4o', - messages: messages, - temperature: 0.3, - max_tokens: 10000, + const apiUrl = useWandAzure + ? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` + : 'https://api.openai.com/v1/responses' + + const headers: Record = { + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + } + + if (useWandAzure) { + headers['api-key'] = azureApiKey! 
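+      // Note (sketch): Azure authenticates with an `api-key` header while OpenAI
+      // uses `Authorization: Bearer`; both branches then share the same Responses
+      // API body built below, e.g. { model: 'gpt-4o', input: [...messages],
+      // temperature: 0.2, max_output_tokens: 10000 }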
+ } else { + headers.Authorization = `Bearer ${activeOpenAIKey}` + } + + const response = await fetch(apiUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + model: useWandAzure ? wandModelName : 'gpt-4o', + input: messages, + temperature: 0.2, + max_output_tokens: 10000, + }), }) - const generatedContent = completion.choices[0]?.message?.content?.trim() + if (!response.ok) { + const errorText = await response.text() + const apiError = new Error( + `API request failed: ${response.status} ${response.statusText} - ${errorText}` + ) + ;(apiError as any).status = response.status + throw apiError + } + + const completion = await response.json() + const generatedContent = extractResponseText(completion.output)?.trim() if (!generatedContent) { logger.error( @@ -461,8 +536,18 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg logger.info(`[${requestId}] Wand generation successful`) - if (completion.usage) { - await updateUserStatsForWand(session.user.id, completion.usage, requestId, isBYOK) + const usage = parseResponsesUsage(completion.usage) + if (usage) { + await updateUserStatsForWand( + session.user.id, + { + prompt_tokens: usage.promptTokens, + completion_tokens: usage.completionTokens, + total_tokens: usage.totalTokens, + }, + requestId, + isBYOK + ) } return NextResponse.json({ success: true, content: generatedContent }) @@ -472,10 +557,6 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg message: error?.message || 'Unknown error', code: error?.code, status: error?.status, - responseStatus: error instanceof OpenAI.APIError ? error.status : error?.response?.status, - responseData: (error as any)?.response?.data - ? safeStringify((error as any).response.data) - : undefined, stack: error?.stack, useWandAzure, model: useWandAzure ? wandModelName : 'gpt-4o', @@ -484,26 +565,19 @@ Use this context to calculate relative dates like "yesterday", "last week", "beg }) let clientErrorMessage = 'Wand generation failed. Please try again later.' - let status = 500 + let status = typeof (error as any)?.status === 'number' ? (error as any).status : 500 - if (error instanceof OpenAI.APIError) { - status = error.status || 500 - logger.error( - `[${requestId}] ${useWandAzure ? 'Azure OpenAI' : 'OpenAI'} API Error: ${status} - ${error.message}` - ) - - if (status === 401) { - clientErrorMessage = 'Authentication failed. Please check your API key configuration.' - } else if (status === 429) { - clientErrorMessage = 'Rate limit exceeded. Please try again later.' - } else if (status >= 500) { - clientErrorMessage = - 'The wand generation service is currently unavailable. Please try again later.' - } - } else if (useWandAzure && error.message?.includes('DeploymentNotFound')) { + if (useWandAzure && error?.message?.includes('DeploymentNotFound')) { clientErrorMessage = 'Azure OpenAI deployment not found. Please check your model deployment configuration.' status = 404 + } else if (status === 401) { + clientErrorMessage = 'Authentication failed. Please check your API key configuration.' + } else if (status === 429) { + clientErrorMessage = 'Rate limit exceeded. Please try again later.' + } else if (status >= 500) { + clientErrorMessage = + 'The wand generation service is currently unavailable. Please try again later.' 
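+      // With the OpenAI SDK gone, `status` is read off the thrown error (set via
+      // `(apiError as any).status = response.status` above), so this 401/429/5xx
+      // mapping now covers both the OpenAI and Azure code paths.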
} return NextResponse.json( diff --git a/apps/sim/lib/copilot/chat-title.ts b/apps/sim/lib/copilot/chat-title.ts index 7e383bdbe..10dd88299 100644 --- a/apps/sim/lib/copilot/chat-title.ts +++ b/apps/sim/lib/copilot/chat-title.ts @@ -1,6 +1,6 @@ import { createLogger } from '@sim/logger' -import OpenAI, { AzureOpenAI } from 'openai' import { env } from '@/lib/core/config/env' +import { extractResponseText } from '@/providers/openai/utils' const logger = createLogger('SimAgentUtils') @@ -12,47 +12,65 @@ const openaiApiKey = env.OPENAI_API_KEY const useChatTitleAzure = azureApiKey && azureEndpoint && azureApiVersion -const client = useChatTitleAzure - ? new AzureOpenAI({ - apiKey: azureApiKey, - apiVersion: azureApiVersion, - endpoint: azureEndpoint, - }) - : openaiApiKey - ? new OpenAI({ - apiKey: openaiApiKey, - }) - : null - /** * Generates a short title for a chat based on the first message * @param message First user message in the chat * @returns A short title or null if API key is not available */ export async function generateChatTitle(message: string): Promise { - if (!client) { + if (!useChatTitleAzure && !openaiApiKey) { return null } try { - const response = await client.chat.completions.create({ - model: useChatTitleAzure ? chatTitleModelName : 'gpt-4o', - messages: [ - { - role: 'system', - content: - 'Generate a very short title (3-5 words max) for a chat that starts with this message. The title should be concise and descriptive. Do not wrap the title in quotes.', - }, - { - role: 'user', - content: message, - }, - ], - max_tokens: 20, - temperature: 0.2, + const apiUrl = useChatTitleAzure + ? `${azureEndpoint?.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` + : 'https://api.openai.com/v1/responses' + + const headers: Record = { + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + } + + if (useChatTitleAzure) { + headers['api-key'] = azureApiKey! + } else { + headers.Authorization = `Bearer ${openaiApiKey}` + } + + const response = await fetch(apiUrl, { + method: 'POST', + headers, + body: JSON.stringify({ + model: useChatTitleAzure ? chatTitleModelName : 'gpt-4o', + input: [ + { + role: 'system', + content: + 'Generate a very short title (3-5 words max) for a chat that starts with this message. The title should be concise and descriptive. 
Do not wrap the title in quotes.', + }, + { + role: 'user', + content: message, + }, + ], + max_output_tokens: 20, + temperature: 0.2, + }), }) - const title = response.choices[0]?.message?.content?.trim() || null + if (!response.ok) { + const errorText = await response.text() + logger.error('Error generating chat title:', { + status: response.status, + statusText: response.statusText, + error: errorText, + }) + return null + } + + const data = await response.json() + const title = extractResponseText(data.output)?.trim() || null return title } catch (error) { logger.error('Error generating chat title:', error) diff --git a/apps/sim/providers/azure-openai/index.ts b/apps/sim/providers/azure-openai/index.ts index 195103ffe..da11e0017 100644 --- a/apps/sim/providers/azure-openai/index.ts +++ b/apps/sim/providers/azure-openai/index.ts @@ -1,26 +1,9 @@ import { createLogger } from '@sim/logger' -import { AzureOpenAI } from 'openai' -import type { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions' import { env } from '@/lib/core/config/env' import type { StreamingExecution } from '@/executor/types' -import { MAX_TOOL_ITERATIONS } from '@/providers' -import { - checkForForcedToolUsage, - createReadableStreamFromAzureOpenAIStream, -} from '@/providers/azure-openai/utils' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' -import type { - ProviderConfig, - ProviderRequest, - ProviderResponse, - TimeSegment, -} from '@/providers/types' -import { - calculateCost, - prepareToolExecution, - prepareToolsWithUsageControl, -} from '@/providers/utils' -import { executeTool } from '@/tools' +import { executeResponsesProviderRequest } from '@/providers/openai/core' +import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' const logger = createLogger('AzureOpenAIProvider') @@ -38,16 +21,6 @@ export const azureOpenAIProvider: ProviderConfig = { executeRequest: async ( request: ProviderRequest ): Promise => { - logger.info('Preparing Azure OpenAI request', { - model: request.model, - hasSystemPrompt: !!request.systemPrompt, - hasMessages: !!request.messages?.length, - hasTools: !!request.tools?.length, - toolCount: request.tools?.length || 0, - hasResponseFormat: !!request.responseFormat, - stream: !!request.stream, - }) - const azureEndpoint = request.azureEndpoint || env.AZURE_OPENAI_ENDPOINT const azureApiVersion = request.azureApiVersion || env.AZURE_OPENAI_API_VERSION || '2024-07-01-preview' @@ -58,520 +31,24 @@ export const azureOpenAIProvider: ProviderConfig = { ) } - const azureOpenAI = new AzureOpenAI({ - apiKey: request.apiKey, - apiVersion: azureApiVersion, - endpoint: azureEndpoint, - }) - - const allMessages = [] - - if (request.systemPrompt) { - allMessages.push({ - role: 'system', - content: request.systemPrompt, - }) + if (!request.apiKey) { + throw new Error('API key is required for Azure OpenAI') } - if (request.context) { - allMessages.push({ - role: 'user', - content: request.context, - }) - } - - if (request.messages) { - allMessages.push(...request.messages) - } - - const tools = request.tools?.length - ? 
request.tools.map((tool) => ({ - type: 'function', - function: { - name: tool.id, - description: tool.description, - parameters: tool.parameters, - }, - })) - : undefined - const deploymentName = request.model.replace('azure/', '') - const payload: any = { - model: deploymentName, - messages: allMessages, - } + const apiUrl = `${azureEndpoint.replace(/\/$/, '')}/openai/v1/responses?api-version=${azureApiVersion}` - if (request.temperature !== undefined) payload.temperature = request.temperature - if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens - - if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort - if (request.verbosity !== undefined) payload.verbosity = request.verbosity - - if (request.responseFormat) { - payload.response_format = { - type: 'json_schema', - json_schema: { - name: request.responseFormat.name || 'response_schema', - schema: request.responseFormat.schema || request.responseFormat, - strict: request.responseFormat.strict !== false, - }, - } - - logger.info('Added JSON schema response format to Azure OpenAI request') - } - - let preparedTools: ReturnType | null = null - - if (tools?.length) { - preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'azure-openai') - const { tools: filteredTools, toolChoice } = preparedTools - - if (filteredTools?.length && toolChoice) { - payload.tools = filteredTools - payload.tool_choice = toolChoice - - logger.info('Azure OpenAI request configuration:', { - toolCount: filteredTools.length, - toolChoice: - typeof toolChoice === 'string' - ? toolChoice - : toolChoice.type === 'function' - ? `force:${toolChoice.function.name}` - : toolChoice.type === 'tool' - ? `force:${toolChoice.name}` - : toolChoice.type === 'any' - ? 
`force:${toolChoice.any?.name || 'unknown'}` - : 'unknown', - model: deploymentName, - }) - } - } - - const providerStartTime = Date.now() - const providerStartTimeISO = new Date(providerStartTime).toISOString() - - try { - if (request.stream && (!tools || tools.length === 0)) { - logger.info('Using streaming response for Azure OpenAI request') - - const streamingParams: ChatCompletionCreateParamsStreaming = { - ...payload, - stream: true, - stream_options: { include_usage: true }, - } - const streamResponse = await azureOpenAI.chat.completions.create(streamingParams) - - const streamingResult = { - stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: usage.prompt_tokens, - output: usage.completion_tokens, - total: usage.total_tokens, - } - - const costResult = calculateCost( - request.model, - usage.prompt_tokens, - usage.completion_tokens - ) - streamingResult.execution.output.cost = { - input: costResult.input, - output: costResult.output, - total: costResult.total, - } - - const streamEndTime = Date.now() - const streamEndTimeISO = new Date(streamEndTime).toISOString() - - if (streamingResult.execution.output.providerTiming) { - streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO - streamingResult.execution.output.providerTiming.duration = - streamEndTime - providerStartTime - - if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) { - streamingResult.execution.output.providerTiming.timeSegments[0].endTime = - streamEndTime - streamingResult.execution.output.providerTiming.timeSegments[0].duration = - streamEndTime - providerStartTime - } - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { input: 0, output: 0, total: 0 }, - toolCalls: undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - timeSegments: [ - { - type: 'model', - name: 'Streaming response', - startTime: providerStartTime, - endTime: Date.now(), - duration: Date.now() - providerStartTime, - }, - ], - }, - cost: { input: 0, output: 0, total: 0 }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const initialCallTime = Date.now() - const originalToolChoice = payload.tool_choice - const forcedTools = preparedTools?.forcedTools || [] - let usedForcedTools: string[] = [] - - let currentResponse = await azureOpenAI.chat.completions.create(payload) - const firstResponseTime = Date.now() - initialCallTime - - let content = currentResponse.choices[0]?.message?.content || '' - const tokens = { - input: currentResponse.usage?.prompt_tokens || 0, - output: currentResponse.usage?.completion_tokens || 0, - total: currentResponse.usage?.total_tokens || 0, - } - const toolCalls = [] - const toolResults = [] - const currentMessages = [...allMessages] - let iterationCount = 0 - let modelTime = firstResponseTime - let toolsTime = 0 - let hasUsedForcedTool = false - - const timeSegments: TimeSegment[] = [ - { - type: 'model', - name: 'Initial response', - startTime: initialCallTime, - endTime: initialCallTime + firstResponseTime, - duration: firstResponseTime, - }, - ] - - const firstCheckResult = checkForForcedToolUsage( - 
currentResponse, - originalToolChoice, - logger, - forcedTools, - usedForcedTools - ) - hasUsedForcedTool = firstCheckResult.hasUsedForcedTool - usedForcedTools = firstCheckResult.usedForcedTools - - while (iterationCount < MAX_TOOL_ITERATIONS) { - if (currentResponse.choices[0]?.message?.content) { - content = currentResponse.choices[0].message.content - } - - const toolCallsInResponse = currentResponse.choices[0]?.message?.tool_calls - if (!toolCallsInResponse || toolCallsInResponse.length === 0) { - break - } - - logger.info( - `Processing ${toolCallsInResponse.length} tool calls (iteration ${iterationCount + 1}/${MAX_TOOL_ITERATIONS})` - ) - - const toolsStartTime = Date.now() - - const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { - const toolCallStartTime = Date.now() - const toolName = toolCall.function.name - - try { - const toolArgs = JSON.parse(toolCall.function.arguments) - const tool = request.tools?.find((t) => t.id === toolName) - - if (!tool) return null - - const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) - const result = await executeTool(toolName, executionParams) - const toolCallEndTime = Date.now() - - return { - toolCall, - toolName, - toolParams, - result, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } catch (error) { - const toolCallEndTime = Date.now() - logger.error('Error processing tool call:', { error, toolName }) - - return { - toolCall, - toolName, - toolParams: {}, - result: { - success: false, - output: undefined, - error: error instanceof Error ? error.message : 'Tool execution failed', - }, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } - }) - - const executionResults = await Promise.allSettled(toolExecutionPromises) - - currentMessages.push({ - role: 'assistant', - content: null, - tool_calls: toolCallsInResponse.map((tc) => ({ - id: tc.id, - type: 'function', - function: { - name: tc.function.name, - arguments: tc.function.arguments, - }, - })), - }) - - for (const settledResult of executionResults) { - if (settledResult.status === 'rejected' || !settledResult.value) continue - - const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = - settledResult.value - - timeSegments.push({ - type: 'tool', - name: toolName, - startTime: startTime, - endTime: endTime, - duration: duration, - }) - - let resultContent: any - if (result.success) { - toolResults.push(result.output) - resultContent = result.output - } else { - resultContent = { - error: true, - message: result.error || 'Tool execution failed', - tool: toolName, - } - } - - toolCalls.push({ - name: toolName, - arguments: toolParams, - startTime: new Date(startTime).toISOString(), - endTime: new Date(endTime).toISOString(), - duration: duration, - result: resultContent, - success: result.success, - }) - - currentMessages.push({ - role: 'tool', - tool_call_id: toolCall.id, - content: JSON.stringify(resultContent), - }) - } - - const thisToolsTime = Date.now() - toolsStartTime - toolsTime += thisToolsTime - - const nextPayload = { - ...payload, - messages: currentMessages, - } - - if (typeof originalToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { - const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) - - if (remainingTools.length > 0) { - nextPayload.tool_choice = { - type: 'function', - function: { name: remainingTools[0] }, - } 
- logger.info(`Forcing next tool: ${remainingTools[0]}`) - } else { - nextPayload.tool_choice = 'auto' - logger.info('All forced tools have been used, switching to auto tool_choice') - } - } - - const nextModelStartTime = Date.now() - currentResponse = await azureOpenAI.chat.completions.create(nextPayload) - - const nextCheckResult = checkForForcedToolUsage( - currentResponse, - nextPayload.tool_choice, - logger, - forcedTools, - usedForcedTools - ) - hasUsedForcedTool = nextCheckResult.hasUsedForcedTool - usedForcedTools = nextCheckResult.usedForcedTools - - const nextModelEndTime = Date.now() - const thisModelTime = nextModelEndTime - nextModelStartTime - - timeSegments.push({ - type: 'model', - name: `Model response (iteration ${iterationCount + 1})`, - startTime: nextModelStartTime, - endTime: nextModelEndTime, - duration: thisModelTime, - }) - - modelTime += thisModelTime - - if (currentResponse.choices[0]?.message?.content) { - content = currentResponse.choices[0].message.content - } - - if (currentResponse.usage) { - tokens.input += currentResponse.usage.prompt_tokens || 0 - tokens.output += currentResponse.usage.completion_tokens || 0 - tokens.total += currentResponse.usage.total_tokens || 0 - } - - iterationCount++ - } - - if (request.stream) { - logger.info('Using streaming for final response after tool processing') - - const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) - - const streamingParams: ChatCompletionCreateParamsStreaming = { - ...payload, - messages: currentMessages, - tool_choice: 'auto', - stream: true, - stream_options: { include_usage: true }, - } - const streamResponse = await azureOpenAI.chat.completions.create(streamingParams) - - const streamingResult = { - stream: createReadableStreamFromAzureOpenAIStream(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: tokens.input + usage.prompt_tokens, - output: tokens.output + usage.completion_tokens, - total: tokens.total + usage.total_tokens, - } - - const streamCost = calculateCost( - request.model, - usage.prompt_tokens, - usage.completion_tokens - ) - streamingResult.execution.output.cost = { - input: accumulatedCost.input + streamCost.input, - output: accumulatedCost.output + streamCost.output, - total: accumulatedCost.total + streamCost.total, - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { - input: tokens.input, - output: tokens.output, - total: tokens.total, - }, - toolCalls: - toolCalls.length > 0 - ? 
{ - list: toolCalls, - count: toolCalls.length, - } - : undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - cost: { - input: accumulatedCost.input, - output: accumulatedCost.output, - total: accumulatedCost.total, - }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - return { - content, - model: request.model, - tokens, - toolCalls: toolCalls.length > 0 ? toolCalls : undefined, - toolResults: toolResults.length > 0 ? toolResults : undefined, - timing: { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - } - } catch (error) { - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - logger.error('Error in Azure OpenAI request:', { - error, - duration: totalDuration, - }) - - const enhancedError = new Error(error instanceof Error ? error.message : String(error)) - // @ts-ignore - enhancedError.timing = { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - } - - throw enhancedError - } + return executeResponsesProviderRequest(request, { + providerId: 'azure-openai', + providerLabel: 'Azure OpenAI', + modelName: deploymentName, + endpoint: apiUrl, + headers: { + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + 'api-key': request.apiKey, + }, + logger, + }) }, } diff --git a/apps/sim/providers/azure-openai/utils.ts b/apps/sim/providers/azure-openai/utils.ts deleted file mode 100644 index a3d12fa96..000000000 --- a/apps/sim/providers/azure-openai/utils.ts +++ /dev/null @@ -1,37 +0,0 @@ -import type { Logger } from '@sim/logger' -import type { ChatCompletionChunk } from 'openai/resources/chat/completions' -import type { CompletionUsage } from 'openai/resources/completions' -import type { Stream } from 'openai/streaming' -import { checkForForcedToolUsageOpenAI, createOpenAICompatibleStream } from '@/providers/utils' - -/** - * Creates a ReadableStream from an Azure OpenAI streaming response. - * Uses the shared OpenAI-compatible streaming utility. - */ -export function createReadableStreamFromAzureOpenAIStream( - azureOpenAIStream: Stream, - onComplete?: (content: string, usage: CompletionUsage) => void -): ReadableStream { - return createOpenAICompatibleStream(azureOpenAIStream, 'Azure OpenAI', onComplete) -} - -/** - * Checks if a forced tool was used in an Azure OpenAI response. - * Uses the shared OpenAI-compatible forced tool usage helper. 
- */
-export function checkForForcedToolUsage(
-  response: any,
-  toolChoice: string | { type: string; function?: { name: string }; name?: string; any?: any },
-  _logger: Logger,
-  forcedTools: string[],
-  usedForcedTools: string[]
-): { hasUsedForcedTool: boolean; usedForcedTools: string[] } {
-  return checkForForcedToolUsageOpenAI(
-    response,
-    toolChoice,
-    'Azure OpenAI',
-    forcedTools,
-    usedForcedTools,
-    _logger
-  )
-}
diff --git a/apps/sim/providers/openai/core.ts b/apps/sim/providers/openai/core.ts
new file mode 100644
index 000000000..8ed4c9386
--- /dev/null
+++ b/apps/sim/providers/openai/core.ts
@@ -0,0 +1,807 @@
+import type { Logger } from '@sim/logger'
+import type { StreamingExecution } from '@/executor/types'
+import { MAX_TOOL_ITERATIONS } from '@/providers'
+import type { Message, ProviderRequest, ProviderResponse, TimeSegment } from '@/providers/types'
+import {
+  calculateCost,
+  prepareToolExecution,
+  prepareToolsWithUsageControl,
+  trackForcedToolUsage,
+} from '@/providers/utils'
+import { executeTool } from '@/tools'
+import {
+  buildResponsesInputFromMessages,
+  convertResponseOutputToInputItems,
+  convertToolsToResponses,
+  createReadableStreamFromResponses,
+  extractResponseText,
+  extractResponseToolCalls,
+  parseResponsesUsage,
+  type ResponsesInputItem,
+  type ResponsesToolCall,
+  toResponsesToolChoice,
+} from './utils'
+
+type PreparedTools = ReturnType<typeof prepareToolsWithUsageControl>
+type ToolChoice = PreparedTools['toolChoice']
+
+/**
+ * Recursively enforces OpenAI strict mode requirements on a JSON schema.
+ * - Sets additionalProperties: false on all object types.
+ * - Ensures required includes ALL property keys.
+ */
+function enforceStrictSchema(schema: any): any {
+  if (!schema || typeof schema !== 'object') return schema
+
+  const result = { ...schema }
+
+  // If this is an object type, enforce strict requirements
+  if (result.type === 'object') {
+    result.additionalProperties = false
+
+    // Recursively process properties and ensure required includes all keys
+    if (result.properties && typeof result.properties === 'object') {
+      const propKeys = Object.keys(result.properties)
+      result.required = propKeys // Strict mode requires ALL properties
+      result.properties = Object.fromEntries(
+        Object.entries(result.properties).map(([key, value]) => [key, enforceStrictSchema(value)])
+      )
+    }
+  }
+
+  // Handle array items
+  if (result.type === 'array' && result.items) {
+    result.items = enforceStrictSchema(result.items)
+  }
+
+  // Handle anyOf, oneOf, allOf
+  for (const keyword of ['anyOf', 'oneOf', 'allOf']) {
+    if (Array.isArray(result[keyword])) {
+      result[keyword] = result[keyword].map(enforceStrictSchema)
+    }
+  }
+
+  // Handle $defs / definitions
+  for (const defKey of ['$defs', 'definitions']) {
+    if (result[defKey] && typeof result[defKey] === 'object') {
+      result[defKey] = Object.fromEntries(
+        Object.entries(result[defKey]).map(([key, value]) => [key, enforceStrictSchema(value)])
+      )
+    }
+  }
+
+  return result
+}
+
+export interface ResponsesProviderConfig {
+  providerId: string
+  providerLabel: string
+  modelName: string
+  endpoint: string
+  headers: Record<string, string>
+  logger: Logger
+}
+
+/**
+ * Executes a Responses API request with tool-loop handling and streaming support.
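+ *
+ * A minimal sketch of how a provider wires this up (values mirror the OpenAI
+ * provider later in this patch; Azure differs only in endpoint and headers):
+ *
+ * @example
+ * executeResponsesProviderRequest(request, {
+ *   providerId: 'openai',
+ *   providerLabel: 'OpenAI',
+ *   modelName: request.model,
+ *   endpoint: 'https://api.openai.com/v1/responses',
+ *   headers: {
+ *     Authorization: `Bearer ${request.apiKey}`,
+ *     'Content-Type': 'application/json',
+ *     'OpenAI-Beta': 'responses=v1',
+ *   },
+ *   logger,
+ * })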
+ */
+export async function executeResponsesProviderRequest(
+  request: ProviderRequest,
+  config: ResponsesProviderConfig
+): Promise<ProviderResponse | StreamingExecution> {
+  const { logger } = config
+
+  logger.info(`Preparing ${config.providerLabel} request`, {
+    model: request.model,
+    hasSystemPrompt: !!request.systemPrompt,
+    hasMessages: !!request.messages?.length,
+    hasTools: !!request.tools?.length,
+    toolCount: request.tools?.length || 0,
+    hasResponseFormat: !!request.responseFormat,
+    stream: !!request.stream,
+  })
+
+  const allMessages: Message[] = []
+
+  if (request.systemPrompt) {
+    allMessages.push({
+      role: 'system',
+      content: request.systemPrompt,
+    })
+  }
+
+  if (request.context) {
+    allMessages.push({
+      role: 'user',
+      content: request.context,
+    })
+  }
+
+  if (request.messages) {
+    allMessages.push(...request.messages)
+  }
+
+  const initialInput = buildResponsesInputFromMessages(allMessages)
+
+  const basePayload: Record<string, any> = {
+    model: config.modelName,
+  }
+
+  if (request.temperature !== undefined) basePayload.temperature = request.temperature
+  if (request.maxTokens != null) basePayload.max_output_tokens = request.maxTokens
+
+  if (request.reasoningEffort !== undefined) {
+    basePayload.reasoning = {
+      effort: request.reasoningEffort,
+      summary: 'auto',
+    }
+  }
+
+  if (request.verbosity !== undefined) {
+    basePayload.text = {
+      ...(basePayload.text ?? {}),
+      verbosity: request.verbosity,
+    }
+  }
+
+  // Store response format config - for Azure with tools, we defer applying it until after tool calls complete
+  let deferredTextFormat: { type: string; name: string; schema: any; strict: boolean } | undefined
+  const hasTools = !!request.tools?.length
+  const isAzure = config.providerId === 'azure-openai'
+
+  if (request.responseFormat) {
+    const isStrict = request.responseFormat.strict !== false
+    const rawSchema = request.responseFormat.schema || request.responseFormat
+    // OpenAI strict mode requires additionalProperties: false on ALL nested objects
+    const cleanedSchema = isStrict ? enforceStrictSchema(rawSchema) : rawSchema
+
+    const textFormat = {
+      type: 'json_schema' as const,
+      name: request.responseFormat.name || 'response_schema',
+      schema: cleanedSchema,
+      strict: isStrict,
+    }
+
+    // Azure OpenAI has issues combining tools + response_format in the same request
+    // Defer the format until after tool calls complete for Azure
+    if (isAzure && hasTools) {
+      deferredTextFormat = textFormat
+      logger.info(
+        `Deferring JSON schema response format for ${config.providerLabel} (will apply after tool calls complete)`
+      )
+    } else {
+      basePayload.text = {
+        ...(basePayload.text ?? {}),
+        format: textFormat,
+      }
+      logger.info(`Added JSON schema response format to ${config.providerLabel} request`)
+    }
+  }
+
+  const tools = request.tools?.length
+    ? request.tools.map((tool) => ({
+        type: 'function',
+        function: {
+          name: tool.id,
+          description: tool.description,
+          parameters: tool.parameters,
+        },
+      }))
+    : undefined
+
+  let preparedTools: PreparedTools | null = null
+  let responsesToolChoice: ReturnType<typeof toResponsesToolChoice> | undefined
+  let trackingToolChoice: ToolChoice | undefined
+
+  if (tools?.length) {
+    preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, config.providerId)
+    const { tools: filteredTools, toolChoice } = preparedTools
+    trackingToolChoice = toolChoice
+
+    if (filteredTools?.length) {
+      const convertedTools = convertToolsToResponses(filteredTools)
+      if (!convertedTools.length) {
+        throw new Error('All tools have empty names')
+      }
+
+      basePayload.tools = convertedTools
+      basePayload.parallel_tool_calls = true
+    }
+
+    if (toolChoice) {
+      responsesToolChoice = toResponsesToolChoice(toolChoice)
+      if (responsesToolChoice) {
+        basePayload.tool_choice = responsesToolChoice
+      }
+
+      logger.info(`${config.providerLabel} request configuration:`, {
+        toolCount: filteredTools?.length || 0,
+        toolChoice:
+          typeof toolChoice === 'string'
+            ? toolChoice
+            : toolChoice.type === 'function'
+              ? `force:${toolChoice.function?.name}`
+              : toolChoice.type === 'tool'
+                ? `force:${toolChoice.name}`
+                : toolChoice.type === 'any'
+                  ? `force:${toolChoice.any?.name || 'unknown'}`
+                  : 'unknown',
+        model: config.modelName,
+      })
+    }
+  }
+
+  const createRequestBody = (input: ResponsesInputItem[], overrides: Record<string, any> = {}) => ({
+    ...basePayload,
+    input,
+    ...overrides,
+  })
+
+  const parseErrorResponse = async (response: Response): Promise<string> => {
+    const text = await response.text()
+    try {
+      const payload = JSON.parse(text)
+      return payload?.error?.message || text
+    } catch {
+      return text
+    }
+  }
+
+  const postResponses = async (body: Record<string, any>) => {
+    const response = await fetch(config.endpoint, {
+      method: 'POST',
+      headers: config.headers,
+      body: JSON.stringify(body),
+    })
+
+    if (!response.ok) {
+      const message = await parseErrorResponse(response)
+      throw new Error(`${config.providerLabel} API error (${response.status}): ${message}`)
+    }
+
+    return response.json()
+  }
+
+  const providerStartTime = Date.now()
+  const providerStartTimeISO = new Date(providerStartTime).toISOString()
+
+  try {
+    if (request.stream && (!tools || tools.length === 0)) {
+      logger.info(`Using streaming response for ${config.providerLabel} request`)
+
+      const streamResponse = await fetch(config.endpoint, {
+        method: 'POST',
+        headers: config.headers,
+        body: JSON.stringify(createRequestBody(initialInput, { stream: true })),
+      })
+
+      if (!streamResponse.ok) {
+        const message = await parseErrorResponse(streamResponse)
+        throw new Error(`${config.providerLabel} API error (${streamResponse.status}): ${message}`)
+      }
+
+      const streamingResult = {
+        stream: createReadableStreamFromResponses(streamResponse, (content, usage) => {
+          streamingResult.execution.output.content = content
+          streamingResult.execution.output.tokens = {
+            input: usage?.promptTokens || 0,
+            output: usage?.completionTokens || 0,
+            total: usage?.totalTokens || 0,
+          }
+
+          const costResult = calculateCost(
+            request.model,
+            usage?.promptTokens || 0,
+            usage?.completionTokens || 0
+          )
+          streamingResult.execution.output.cost = {
+            input: costResult.input,
+            output: costResult.output,
+            total: costResult.total,
+          }
+
+          const streamEndTime = Date.now()
+          const streamEndTimeISO = new Date(streamEndTime).toISOString()
+
+          if (streamingResult.execution.output.providerTiming) {
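+            // The onComplete callback fires only after the SSE stream has been
+            // fully consumed, so final token counts, cost, and timing are patched
+            // onto the already-constructed execution object below.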
streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO + streamingResult.execution.output.providerTiming.duration = + streamEndTime - providerStartTime + + if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) { + streamingResult.execution.output.providerTiming.timeSegments[0].endTime = + streamEndTime + streamingResult.execution.output.providerTiming.timeSegments[0].duration = + streamEndTime - providerStartTime + } + } + }), + execution: { + success: true, + output: { + content: '', + model: request.model, + tokens: { input: 0, output: 0, total: 0 }, + toolCalls: undefined, + providerTiming: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + timeSegments: [ + { + type: 'model', + name: 'Streaming response', + startTime: providerStartTime, + endTime: Date.now(), + duration: Date.now() - providerStartTime, + }, + ], + }, + cost: { input: 0, output: 0, total: 0 }, + }, + logs: [], + metadata: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + }, + }, + } as StreamingExecution + + return streamingResult as StreamingExecution + } + + const initialCallTime = Date.now() + const forcedTools = preparedTools?.forcedTools || [] + let usedForcedTools: string[] = [] + let hasUsedForcedTool = false + let currentToolChoice = responsesToolChoice + let currentTrackingToolChoice = trackingToolChoice + + const checkForForcedToolUsage = ( + toolCallsInResponse: ResponsesToolCall[], + toolChoice: ToolChoice | undefined + ) => { + if (typeof toolChoice === 'object' && toolCallsInResponse.length > 0) { + const result = trackForcedToolUsage( + toolCallsInResponse, + toolChoice, + logger, + config.providerId, + forcedTools, + usedForcedTools + ) + hasUsedForcedTool = result.hasUsedForcedTool + usedForcedTools = result.usedForcedTools + } + } + + const currentInput: ResponsesInputItem[] = [...initialInput] + let currentResponse = await postResponses( + createRequestBody(currentInput, { tool_choice: currentToolChoice }) + ) + const firstResponseTime = Date.now() - initialCallTime + + const initialUsage = parseResponsesUsage(currentResponse.usage) + const tokens = { + input: initialUsage?.promptTokens || 0, + output: initialUsage?.completionTokens || 0, + total: initialUsage?.totalTokens || 0, + } + + const toolCalls = [] + const toolResults = [] + let iterationCount = 0 + let modelTime = firstResponseTime + let toolsTime = 0 + let content = extractResponseText(currentResponse.output) || '' + + const timeSegments: TimeSegment[] = [ + { + type: 'model', + name: 'Initial response', + startTime: initialCallTime, + endTime: initialCallTime + firstResponseTime, + duration: firstResponseTime, + }, + ] + + checkForForcedToolUsage( + extractResponseToolCalls(currentResponse.output), + currentTrackingToolChoice + ) + + while (iterationCount < MAX_TOOL_ITERATIONS) { + const responseText = extractResponseText(currentResponse.output) + if (responseText) { + content = responseText + } + + const toolCallsInResponse = extractResponseToolCalls(currentResponse.output) + if (!toolCallsInResponse.length) { + break + } + + const outputInputItems = convertResponseOutputToInputItems(currentResponse.output) + if (outputInputItems.length) { + currentInput.push(...outputInputItems) + } + + logger.info( + `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${ + iterationCount + 1 + }/${MAX_TOOL_ITERATIONS})` + ) + + const toolsStartTime = 
Date.now() + + const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { + const toolCallStartTime = Date.now() + const toolName = toolCall.name + + try { + const toolArgs = toolCall.arguments ? JSON.parse(toolCall.arguments) : {} + const tool = request.tools?.find((t) => t.id === toolName) + + if (!tool) { + return null + } + + const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) + const result = await executeTool(toolName, executionParams) + const toolCallEndTime = Date.now() + + return { + toolCall, + toolName, + toolParams, + result, + startTime: toolCallStartTime, + endTime: toolCallEndTime, + duration: toolCallEndTime - toolCallStartTime, + } + } catch (error) { + const toolCallEndTime = Date.now() + logger.error('Error processing tool call:', { error, toolName }) + + return { + toolCall, + toolName, + toolParams: {}, + result: { + success: false, + output: undefined, + error: error instanceof Error ? error.message : 'Tool execution failed', + }, + startTime: toolCallStartTime, + endTime: toolCallEndTime, + duration: toolCallEndTime - toolCallStartTime, + } + } + }) + + const executionResults = await Promise.allSettled(toolExecutionPromises) + + for (const settledResult of executionResults) { + if (settledResult.status === 'rejected' || !settledResult.value) continue + + const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = + settledResult.value + + timeSegments.push({ + type: 'tool', + name: toolName, + startTime: startTime, + endTime: endTime, + duration: duration, + }) + + let resultContent: any + if (result.success) { + toolResults.push(result.output) + resultContent = result.output + } else { + resultContent = { + error: true, + message: result.error || 'Tool execution failed', + tool: toolName, + } + } + + toolCalls.push({ + name: toolName, + arguments: toolParams, + startTime: new Date(startTime).toISOString(), + endTime: new Date(endTime).toISOString(), + duration: duration, + result: resultContent, + success: result.success, + }) + + currentInput.push({ + type: 'function_call_output', + call_id: toolCall.id, + output: JSON.stringify(resultContent), + }) + } + + const thisToolsTime = Date.now() - toolsStartTime + toolsTime += thisToolsTime + + if (typeof currentToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { + const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) + + if (remainingTools.length > 0) { + currentToolChoice = { + type: 'function', + name: remainingTools[0], + } + currentTrackingToolChoice = { + type: 'function', + function: { name: remainingTools[0] }, + } + logger.info(`Forcing next tool: ${remainingTools[0]}`) + } else { + currentToolChoice = 'auto' + currentTrackingToolChoice = 'auto' + logger.info('All forced tools have been used, switching to auto tool_choice') + } + } + + const nextModelStartTime = Date.now() + + currentResponse = await postResponses( + createRequestBody(currentInput, { tool_choice: currentToolChoice }) + ) + + checkForForcedToolUsage( + extractResponseToolCalls(currentResponse.output), + currentTrackingToolChoice + ) + + const latestText = extractResponseText(currentResponse.output) + if (latestText) { + content = latestText + } + + const nextModelEndTime = Date.now() + const thisModelTime = nextModelEndTime - nextModelStartTime + + timeSegments.push({ + type: 'model', + name: `Model response (iteration ${iterationCount + 1})`, + startTime: nextModelStartTime, + endTime: nextModelEndTime, + duration: 
thisModelTime,
+      })
+
+      modelTime += thisModelTime
+
+      const usage = parseResponsesUsage(currentResponse.usage)
+      if (usage) {
+        tokens.input += usage.promptTokens
+        tokens.output += usage.completionTokens
+        tokens.total += usage.totalTokens
+      }
+
+      iterationCount++
+    }
+
+    // For Azure with deferred format: make a final call with the response format applied
+    // This happens whenever we have a deferred format, even if no tools were called
+    // (the initial call was made without the format, so we need to apply it now)
+    let appliedDeferredFormat = false
+    if (deferredTextFormat) {
+      logger.info(
+        `Applying deferred JSON schema response format for ${config.providerLabel} (iterationCount: ${iterationCount})`
+      )
+
+      const finalFormatStartTime = Date.now()
+
+      // Determine what input to use for the formatted call
+      let formattedInput: ResponsesInputItem[]
+
+      if (iterationCount > 0) {
+        // Tools were called - include the conversation history with tool results
+        const lastOutputItems = convertResponseOutputToInputItems(currentResponse.output)
+        if (lastOutputItems.length) {
+          currentInput.push(...lastOutputItems)
+        }
+        formattedInput = currentInput
+      } else {
+        // No tools were called - just retry the initial call with format applied
+        // Don't include the model's previous unformatted response
+        formattedInput = initialInput
+      }
+
+      // Make final call with the response format - build payload without tools
+      const finalPayload: Record<string, any> = {
+        model: config.modelName,
+        input: formattedInput,
+        text: {
+          ...(basePayload.text ?? {}),
+          format: deferredTextFormat,
+        },
+      }
+
+      // Copy over non-tool related settings
+      if (request.temperature !== undefined) finalPayload.temperature = request.temperature
+      if (request.maxTokens != null) finalPayload.max_output_tokens = request.maxTokens
+      if (request.reasoningEffort !== undefined) {
+        finalPayload.reasoning = {
+          effort: request.reasoningEffort,
+          summary: 'auto',
+        }
+      }
+      if (request.verbosity !== undefined) {
+        finalPayload.text = {
+          ...finalPayload.text,
+          verbosity: request.verbosity,
+        }
+      }
+
+      currentResponse = await postResponses(finalPayload)
+
+      const finalFormatEndTime = Date.now()
+      const finalFormatDuration = finalFormatEndTime - finalFormatStartTime
+
+      timeSegments.push({
+        type: 'model',
+        name: 'Final formatted response',
+        startTime: finalFormatStartTime,
+        endTime: finalFormatEndTime,
+        duration: finalFormatDuration,
+      })
+
+      modelTime += finalFormatDuration
+
+      const finalUsage = parseResponsesUsage(currentResponse.usage)
+      if (finalUsage) {
+        tokens.input += finalUsage.promptTokens
+        tokens.output += finalUsage.completionTokens
+        tokens.total += finalUsage.totalTokens
+      }
+
+      // Update content with the formatted response
+      const formattedText = extractResponseText(currentResponse.output)
+      if (formattedText) {
+        content = formattedText
+      }
+
+      appliedDeferredFormat = true
+    }
+
+    // Skip streaming if we already applied deferred format - we have the formatted content
+    // Making another streaming call would lose the formatted response
+    if (request.stream && !appliedDeferredFormat) {
+      logger.info('Using streaming for final response after tool processing')
+
+      const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output)
+
+      // For Azure with deferred format in streaming mode, include the format in the streaming call
+      const streamOverrides: Record<string, any> = { stream: true, tool_choice: 'auto' }
+      if (deferredTextFormat) {
+        streamOverrides.text = {
+          ...(basePayload.text ??
{}), + format: deferredTextFormat, + } + } + + const streamResponse = await fetch(config.endpoint, { + method: 'POST', + headers: config.headers, + body: JSON.stringify(createRequestBody(currentInput, streamOverrides)), + }) + + if (!streamResponse.ok) { + const message = await parseErrorResponse(streamResponse) + throw new Error(`${config.providerLabel} API error (${streamResponse.status}): ${message}`) + } + + const streamingResult = { + stream: createReadableStreamFromResponses(streamResponse, (content, usage) => { + streamingResult.execution.output.content = content + streamingResult.execution.output.tokens = { + input: tokens.input + (usage?.promptTokens || 0), + output: tokens.output + (usage?.completionTokens || 0), + total: tokens.total + (usage?.totalTokens || 0), + } + + const streamCost = calculateCost( + request.model, + usage?.promptTokens || 0, + usage?.completionTokens || 0 + ) + streamingResult.execution.output.cost = { + input: accumulatedCost.input + streamCost.input, + output: accumulatedCost.output + streamCost.output, + total: accumulatedCost.total + streamCost.total, + } + }), + execution: { + success: true, + output: { + content: '', + model: request.model, + tokens: { + input: tokens.input, + output: tokens.output, + total: tokens.total, + }, + toolCalls: + toolCalls.length > 0 + ? { + list: toolCalls, + count: toolCalls.length, + } + : undefined, + providerTiming: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + modelTime: modelTime, + toolsTime: toolsTime, + firstResponseTime: firstResponseTime, + iterations: iterationCount + 1, + timeSegments: timeSegments, + }, + cost: { + input: accumulatedCost.input, + output: accumulatedCost.output, + total: accumulatedCost.total, + }, + }, + logs: [], + metadata: { + startTime: providerStartTimeISO, + endTime: new Date().toISOString(), + duration: Date.now() - providerStartTime, + }, + }, + } as StreamingExecution + + return streamingResult as StreamingExecution + } + + const providerEndTime = Date.now() + const providerEndTimeISO = new Date(providerEndTime).toISOString() + const totalDuration = providerEndTime - providerStartTime + + return { + content, + model: request.model, + tokens, + toolCalls: toolCalls.length > 0 ? toolCalls : undefined, + toolResults: toolResults.length > 0 ? toolResults : undefined, + timing: { + startTime: providerStartTimeISO, + endTime: providerEndTimeISO, + duration: totalDuration, + modelTime: modelTime, + toolsTime: toolsTime, + firstResponseTime: firstResponseTime, + iterations: iterationCount + 1, + timeSegments: timeSegments, + }, + } + } catch (error) { + const providerEndTime = Date.now() + const providerEndTimeISO = new Date(providerEndTime).toISOString() + const totalDuration = providerEndTime - providerStartTime + + logger.error(`Error in ${config.providerLabel} request:`, { + error, + duration: totalDuration, + }) + + const enhancedError = new Error(error instanceof Error ? 
error.message : String(error)) + // @ts-ignore - Adding timing property to the error + enhancedError.timing = { + startTime: providerStartTimeISO, + endTime: providerEndTimeISO, + duration: totalDuration, + } + + throw enhancedError + } +} diff --git a/apps/sim/providers/openai/index.ts b/apps/sim/providers/openai/index.ts index b2cecfceb..db95598e8 100644 --- a/apps/sim/providers/openai/index.ts +++ b/apps/sim/providers/openai/index.ts @@ -1,25 +1,11 @@ import { createLogger } from '@sim/logger' -import OpenAI from 'openai' -import type { ChatCompletionCreateParamsStreaming } from 'openai/resources/chat/completions' import type { StreamingExecution } from '@/executor/types' -import { MAX_TOOL_ITERATIONS } from '@/providers' import { getProviderDefaultModel, getProviderModels } from '@/providers/models' -import { createReadableStreamFromOpenAIStream } from '@/providers/openai/utils' -import type { - ProviderConfig, - ProviderRequest, - ProviderResponse, - TimeSegment, -} from '@/providers/types' -import { - calculateCost, - prepareToolExecution, - prepareToolsWithUsageControl, - trackForcedToolUsage, -} from '@/providers/utils' -import { executeTool } from '@/tools' +import type { ProviderConfig, ProviderRequest, ProviderResponse } from '@/providers/types' +import { executeResponsesProviderRequest } from './core' const logger = createLogger('OpenAIProvider') +const responsesEndpoint = 'https://api.openai.com/v1/responses' export const openaiProvider: ProviderConfig = { id: 'openai', @@ -32,534 +18,21 @@ export const openaiProvider: ProviderConfig = { executeRequest: async ( request: ProviderRequest ): Promise => { - logger.info('Preparing OpenAI request', { - model: request.model, - hasSystemPrompt: !!request.systemPrompt, - hasMessages: !!request.messages?.length, - hasTools: !!request.tools?.length, - toolCount: request.tools?.length || 0, - hasResponseFormat: !!request.responseFormat, - stream: !!request.stream, + if (!request.apiKey) { + throw new Error('API key is required for OpenAI') + } + + return executeResponsesProviderRequest(request, { + providerId: 'openai', + providerLabel: 'OpenAI', + modelName: request.model, + endpoint: responsesEndpoint, + headers: { + Authorization: `Bearer ${request.apiKey}`, + 'Content-Type': 'application/json', + 'OpenAI-Beta': 'responses=v1', + }, + logger, }) - - const openai = new OpenAI({ apiKey: request.apiKey }) - - const allMessages = [] - - if (request.systemPrompt) { - allMessages.push({ - role: 'system', - content: request.systemPrompt, - }) - } - - if (request.context) { - allMessages.push({ - role: 'user', - content: request.context, - }) - } - - if (request.messages) { - allMessages.push(...request.messages) - } - - const tools = request.tools?.length - ? 
request.tools.map((tool) => ({
-          type: 'function',
-          function: {
-            name: tool.id,
-            description: tool.description,
-            parameters: tool.parameters,
-          },
-        }))
-      : undefined
-
-    const payload: any = {
-      model: request.model,
-      messages: allMessages,
-    }
-
-    if (request.temperature !== undefined) payload.temperature = request.temperature
-    if (request.maxTokens != null) payload.max_completion_tokens = request.maxTokens
-
-    if (request.reasoningEffort !== undefined) payload.reasoning_effort = request.reasoningEffort
-    if (request.verbosity !== undefined) payload.verbosity = request.verbosity
-
-    if (request.responseFormat) {
-      payload.response_format = {
-        type: 'json_schema',
-        json_schema: {
-          name: request.responseFormat.name || 'response_schema',
-          schema: request.responseFormat.schema || request.responseFormat,
-          strict: request.responseFormat.strict !== false,
-        },
-      }
-
-      logger.info('Added JSON schema response format to request')
-    }
-
-    let preparedTools: ReturnType<typeof prepareToolsWithUsageControl> | null = null
-
-    if (tools?.length) {
-      preparedTools = prepareToolsWithUsageControl(tools, request.tools, logger, 'openai')
-      const { tools: filteredTools, toolChoice } = preparedTools
-
-      if (filteredTools?.length && toolChoice) {
-        payload.tools = filteredTools
-        payload.tool_choice = toolChoice
-
-        logger.info('OpenAI request configuration:', {
-          toolCount: filteredTools.length,
-          toolChoice:
-            typeof toolChoice === 'string'
-              ? toolChoice
-              : toolChoice.type === 'function'
-                ? `force:${toolChoice.function.name}`
-                : toolChoice.type === 'tool'
-                  ? `force:${toolChoice.name}`
-                  : toolChoice.type === 'any'
-                    ? `force:${toolChoice.any?.name || 'unknown'}`
-                    : 'unknown',
-          model: request.model,
-        })
-      }
-    }
-
-    const providerStartTime = Date.now()
-    const providerStartTimeISO = new Date(providerStartTime).toISOString()
-
-    try {
-      if (request.stream && (!tools || tools.length === 0)) {
-        logger.info('Using streaming response for OpenAI request')
-
-        const streamingParams: ChatCompletionCreateParamsStreaming = {
-          ...payload,
-          stream: true,
-          stream_options: { include_usage: true },
-        }
-        const streamResponse = await openai.chat.completions.create(streamingParams)
-
-        const streamingResult = {
-          stream: createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => {
-            streamingResult.execution.output.content = content
-            streamingResult.execution.output.tokens = {
-              input: usage.prompt_tokens,
-              output: usage.completion_tokens,
-              total: usage.total_tokens,
-            }
-
-            const costResult = calculateCost(
-              request.model,
-              usage.prompt_tokens,
-              usage.completion_tokens
-            )
-            streamingResult.execution.output.cost = {
-              input: costResult.input,
-              output: costResult.output,
-              total: costResult.total,
-            }
-
-            const streamEndTime = Date.now()
-            const streamEndTimeISO = new Date(streamEndTime).toISOString()
-
-            if (streamingResult.execution.output.providerTiming) {
-              streamingResult.execution.output.providerTiming.endTime = streamEndTimeISO
-              streamingResult.execution.output.providerTiming.duration =
-                streamEndTime - providerStartTime
-
-              if (streamingResult.execution.output.providerTiming.timeSegments?.[0]) {
-                streamingResult.execution.output.providerTiming.timeSegments[0].endTime =
-                  streamEndTime
-                streamingResult.execution.output.providerTiming.timeSegments[0].duration =
-                  streamEndTime - providerStartTime
-              }
-            }
-          }),
-          execution: {
-            success: true,
-            output: {
-              content: '',
-              model: request.model,
-              tokens: { input: 0, output: 0, total: 0 },
-              toolCalls: undefined,
-              providerTiming: {
-                startTime:
providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - timeSegments: [ - { - type: 'model', - name: 'Streaming response', - startTime: providerStartTime, - endTime: Date.now(), - duration: Date.now() - providerStartTime, - }, - ], - }, - cost: { input: 0, output: 0, total: 0 }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const initialCallTime = Date.now() - - const originalToolChoice = payload.tool_choice - - const forcedTools = preparedTools?.forcedTools || [] - let usedForcedTools: string[] = [] - - /** - * Helper function to check for forced tool usage in responses - */ - const checkForForcedToolUsage = ( - response: any, - toolChoice: string | { type: string; function?: { name: string }; name?: string; any?: any } - ) => { - if (typeof toolChoice === 'object' && response.choices[0]?.message?.tool_calls) { - const toolCallsResponse = response.choices[0].message.tool_calls - const result = trackForcedToolUsage( - toolCallsResponse, - toolChoice, - logger, - 'openai', - forcedTools, - usedForcedTools - ) - hasUsedForcedTool = result.hasUsedForcedTool - usedForcedTools = result.usedForcedTools - } - } - - let currentResponse = await openai.chat.completions.create(payload) - const firstResponseTime = Date.now() - initialCallTime - - let content = currentResponse.choices[0]?.message?.content || '' - const tokens = { - input: currentResponse.usage?.prompt_tokens || 0, - output: currentResponse.usage?.completion_tokens || 0, - total: currentResponse.usage?.total_tokens || 0, - } - const toolCalls = [] - const toolResults = [] - const currentMessages = [...allMessages] - let iterationCount = 0 - - let modelTime = firstResponseTime - let toolsTime = 0 - - let hasUsedForcedTool = false - - const timeSegments: TimeSegment[] = [ - { - type: 'model', - name: 'Initial response', - startTime: initialCallTime, - endTime: initialCallTime + firstResponseTime, - duration: firstResponseTime, - }, - ] - - checkForForcedToolUsage(currentResponse, originalToolChoice) - - while (iterationCount < MAX_TOOL_ITERATIONS) { - if (currentResponse.choices[0]?.message?.content) { - content = currentResponse.choices[0].message.content - } - - const toolCallsInResponse = currentResponse.choices[0]?.message?.tool_calls - if (!toolCallsInResponse || toolCallsInResponse.length === 0) { - break - } - - logger.info( - `Processing ${toolCallsInResponse.length} tool calls in parallel (iteration ${iterationCount + 1}/${MAX_TOOL_ITERATIONS})` - ) - - const toolsStartTime = Date.now() - - const toolExecutionPromises = toolCallsInResponse.map(async (toolCall) => { - const toolCallStartTime = Date.now() - const toolName = toolCall.function.name - - try { - const toolArgs = JSON.parse(toolCall.function.arguments) - const tool = request.tools?.find((t) => t.id === toolName) - - if (!tool) { - return null - } - - const { toolParams, executionParams } = prepareToolExecution(tool, toolArgs, request) - const result = await executeTool(toolName, executionParams) - const toolCallEndTime = Date.now() - - return { - toolCall, - toolName, - toolParams, - result, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } catch (error) { - const toolCallEndTime = Date.now() - logger.error('Error processing tool call:', { error, toolName }) - - 
return { - toolCall, - toolName, - toolParams: {}, - result: { - success: false, - output: undefined, - error: error instanceof Error ? error.message : 'Tool execution failed', - }, - startTime: toolCallStartTime, - endTime: toolCallEndTime, - duration: toolCallEndTime - toolCallStartTime, - } - } - }) - - const executionResults = await Promise.allSettled(toolExecutionPromises) - - currentMessages.push({ - role: 'assistant', - content: null, - tool_calls: toolCallsInResponse.map((tc) => ({ - id: tc.id, - type: 'function', - function: { - name: tc.function.name, - arguments: tc.function.arguments, - }, - })), - }) - - for (const settledResult of executionResults) { - if (settledResult.status === 'rejected' || !settledResult.value) continue - - const { toolCall, toolName, toolParams, result, startTime, endTime, duration } = - settledResult.value - - timeSegments.push({ - type: 'tool', - name: toolName, - startTime: startTime, - endTime: endTime, - duration: duration, - }) - - let resultContent: any - if (result.success) { - toolResults.push(result.output) - resultContent = result.output - } else { - resultContent = { - error: true, - message: result.error || 'Tool execution failed', - tool: toolName, - } - } - - toolCalls.push({ - name: toolName, - arguments: toolParams, - startTime: new Date(startTime).toISOString(), - endTime: new Date(endTime).toISOString(), - duration: duration, - result: resultContent, - success: result.success, - }) - - currentMessages.push({ - role: 'tool', - tool_call_id: toolCall.id, - content: JSON.stringify(resultContent), - }) - } - - const thisToolsTime = Date.now() - toolsStartTime - toolsTime += thisToolsTime - - const nextPayload = { - ...payload, - messages: currentMessages, - } - - if (typeof originalToolChoice === 'object' && hasUsedForcedTool && forcedTools.length > 0) { - const remainingTools = forcedTools.filter((tool) => !usedForcedTools.includes(tool)) - - if (remainingTools.length > 0) { - nextPayload.tool_choice = { - type: 'function', - function: { name: remainingTools[0] }, - } - logger.info(`Forcing next tool: ${remainingTools[0]}`) - } else { - nextPayload.tool_choice = 'auto' - logger.info('All forced tools have been used, switching to auto tool_choice') - } - } - - const nextModelStartTime = Date.now() - - currentResponse = await openai.chat.completions.create(nextPayload) - - checkForForcedToolUsage(currentResponse, nextPayload.tool_choice) - - const nextModelEndTime = Date.now() - const thisModelTime = nextModelEndTime - nextModelStartTime - - timeSegments.push({ - type: 'model', - name: `Model response (iteration ${iterationCount + 1})`, - startTime: nextModelStartTime, - endTime: nextModelEndTime, - duration: thisModelTime, - }) - - modelTime += thisModelTime - - if (currentResponse.usage) { - tokens.input += currentResponse.usage.prompt_tokens || 0 - tokens.output += currentResponse.usage.completion_tokens || 0 - tokens.total += currentResponse.usage.total_tokens || 0 - } - - iterationCount++ - } - - if (request.stream) { - logger.info('Using streaming for final response after tool processing') - - const accumulatedCost = calculateCost(request.model, tokens.input, tokens.output) - - const streamingParams: ChatCompletionCreateParamsStreaming = { - ...payload, - messages: currentMessages, - tool_choice: 'auto', - stream: true, - stream_options: { include_usage: true }, - } - const streamResponse = await openai.chat.completions.create(streamingParams) - - const streamingResult = { - stream: 
createReadableStreamFromOpenAIStream(streamResponse, (content, usage) => { - streamingResult.execution.output.content = content - streamingResult.execution.output.tokens = { - input: tokens.input + usage.prompt_tokens, - output: tokens.output + usage.completion_tokens, - total: tokens.total + usage.total_tokens, - } - - const streamCost = calculateCost( - request.model, - usage.prompt_tokens, - usage.completion_tokens - ) - streamingResult.execution.output.cost = { - input: accumulatedCost.input + streamCost.input, - output: accumulatedCost.output + streamCost.output, - total: accumulatedCost.total + streamCost.total, - } - }), - execution: { - success: true, - output: { - content: '', - model: request.model, - tokens: { - input: tokens.input, - output: tokens.output, - total: tokens.total, - }, - toolCalls: - toolCalls.length > 0 - ? { - list: toolCalls, - count: toolCalls.length, - } - : undefined, - providerTiming: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - cost: { - input: accumulatedCost.input, - output: accumulatedCost.output, - total: accumulatedCost.total, - }, - }, - logs: [], - metadata: { - startTime: providerStartTimeISO, - endTime: new Date().toISOString(), - duration: Date.now() - providerStartTime, - }, - }, - } as StreamingExecution - - return streamingResult as StreamingExecution - } - - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - return { - content, - model: request.model, - tokens, - toolCalls: toolCalls.length > 0 ? toolCalls : undefined, - toolResults: toolResults.length > 0 ? toolResults : undefined, - timing: { - startTime: providerStartTimeISO, - endTime: providerEndTimeISO, - duration: totalDuration, - modelTime: modelTime, - toolsTime: toolsTime, - firstResponseTime: firstResponseTime, - iterations: iterationCount + 1, - timeSegments: timeSegments, - }, - } - } catch (error) { - const providerEndTime = Date.now() - const providerEndTimeISO = new Date(providerEndTime).toISOString() - const totalDuration = providerEndTime - providerStartTime - - logger.error('Error in OpenAI request:', { - error, - duration: totalDuration, - }) - - const enhancedError = new Error(error instanceof Error ? 
error.message : String(error))
-      // @ts-ignore - Adding timing property to the error
-      enhancedError.timing = {
-        startTime: providerStartTimeISO,
-        endTime: providerEndTimeISO,
-        duration: totalDuration,
-      }
-
-      throw enhancedError
-    }
   },
 }
diff --git a/apps/sim/providers/openai/utils.ts b/apps/sim/providers/openai/utils.ts
index bd04d601c..664c0d8fc 100644
--- a/apps/sim/providers/openai/utils.ts
+++ b/apps/sim/providers/openai/utils.ts
@@ -1,15 +1,465 @@
-import type { ChatCompletionChunk } from 'openai/resources/chat/completions'
-import type { CompletionUsage } from 'openai/resources/completions'
-import type { Stream } from 'openai/streaming'
-import { createOpenAICompatibleStream } from '@/providers/utils'
+import { createLogger } from '@sim/logger'
+import type { Message } from '@/providers/types'
+
+const logger = createLogger('ResponsesUtils')
+
+export interface ResponsesUsageTokens {
+  promptTokens: number
+  completionTokens: number
+  totalTokens: number
+  cachedTokens: number
+  reasoningTokens: number
+}
+
+export interface ResponsesToolCall {
+  id: string
+  name: string
+  arguments: string
+}
+
+export type ResponsesInputItem =
+  | {
+      role: 'system' | 'user' | 'assistant'
+      content: string
+    }
+  | {
+      type: 'function_call'
+      call_id: string
+      name: string
+      arguments: string
+    }
+  | {
+      type: 'function_call_output'
+      call_id: string
+      output: string
+    }
+
+export interface ResponsesToolDefinition {
+  type: 'function'
+  name: string
+  description?: string
+  parameters?: Record<string, unknown>
+}
 
 /**
- * Creates a ReadableStream from an OpenAI streaming response.
- * Uses the shared OpenAI-compatible streaming utility.
+ * Converts chat-style messages into Responses API input items.
  */
-export function createReadableStreamFromOpenAIStream(
-  openaiStream: Stream<ChatCompletionChunk>,
-  onComplete?: (content: string, usage: CompletionUsage) => void
-): ReadableStream {
-  return createOpenAICompatibleStream(openaiStream, 'OpenAI', onComplete)
+export function buildResponsesInputFromMessages(messages: Message[]): ResponsesInputItem[] {
+  const input: ResponsesInputItem[] = []
+
+  for (const message of messages) {
+    if (message.role === 'tool' && message.tool_call_id) {
+      input.push({
+        type: 'function_call_output',
+        call_id: message.tool_call_id,
+        output: message.content ?? '',
+      })
+      continue
+    }
+
+    if (
+      message.content &&
+      (message.role === 'system' || message.role === 'user' || message.role === 'assistant')
+    ) {
+      input.push({
+        role: message.role,
+        content: message.content,
+      })
+    }
+
+    if (message.tool_calls?.length) {
+      for (const toolCall of message.tool_calls) {
+        input.push({
+          type: 'function_call',
+          call_id: toolCall.id,
+          name: toolCall.function.name,
+          arguments: toolCall.function.arguments,
+        })
+      }
+    }
+  }
+
+  return input
+}
+
+/**
+ * Converts tool definitions to the Responses API format.
+ */
+export function convertToolsToResponses(tools: any[]): ResponsesToolDefinition[] {
+  return tools
+    .map((tool) => {
+      const name = tool.function?.name ?? tool.name
+      if (!name) {
+        return null
+      }
+
+      return {
+        type: 'function' as const,
+        name,
+        description: tool.function?.description ?? tool.description,
+        parameters: tool.function?.parameters ?? tool.parameters,
+      }
+    })
+    .filter(Boolean) as ResponsesToolDefinition[]
+}
+
+/**
+ * Converts tool_choice to the Responses API format.
+ */ +export function toResponsesToolChoice( + toolChoice: + | 'auto' + | 'none' + | { type: 'function'; function?: { name: string }; name?: string } + | { type: 'tool'; name: string } + | { type: 'any'; any: { model: string; name: string } } + | undefined +): 'auto' | 'none' | { type: 'function'; name: string } | undefined { + if (!toolChoice) { + return undefined + } + + if (typeof toolChoice === 'string') { + return toolChoice + } + + if (toolChoice.type === 'function') { + const name = toolChoice.name ?? toolChoice.function?.name + return name ? { type: 'function', name } : undefined + } + + return 'auto' +} + +function extractTextFromMessageItem(item: any): string { + if (!item) { + return '' + } + + if (typeof item.content === 'string') { + return item.content + } + + if (!Array.isArray(item.content)) { + return '' + } + + const textParts: string[] = [] + for (const part of item.content) { + if (!part || typeof part !== 'object') { + continue + } + + if ((part.type === 'output_text' || part.type === 'text') && typeof part.text === 'string') { + textParts.push(part.text) + continue + } + + if (part.type === 'output_json') { + if (typeof part.text === 'string') { + textParts.push(part.text) + } else if (part.json !== undefined) { + textParts.push(JSON.stringify(part.json)) + } + } + } + + return textParts.join('') +} + +/** + * Extracts plain text from Responses API output items. + */ +export function extractResponseText(output: unknown): string { + if (!Array.isArray(output)) { + return '' + } + + const textParts: string[] = [] + for (const item of output) { + if (item?.type !== 'message') { + continue + } + + const text = extractTextFromMessageItem(item) + if (text) { + textParts.push(text) + } + } + + return textParts.join('') +} + +/** + * Converts Responses API output items into input items for subsequent calls. + */ +export function convertResponseOutputToInputItems(output: unknown): ResponsesInputItem[] { + if (!Array.isArray(output)) { + return [] + } + + const items: ResponsesInputItem[] = [] + for (const item of output) { + if (!item || typeof item !== 'object') { + continue + } + + if (item.type === 'message') { + const text = extractTextFromMessageItem(item) + if (text) { + items.push({ + role: 'assistant', + content: text, + }) + } + + const toolCalls = Array.isArray(item.tool_calls) ? item.tool_calls : [] + for (const toolCall of toolCalls) { + const callId = toolCall?.id + const name = toolCall?.function?.name ?? toolCall?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof toolCall?.function?.arguments === 'string' + ? toolCall.function.arguments + : JSON.stringify(toolCall?.function?.arguments ?? {}) + + items.push({ + type: 'function_call', + call_id: callId, + name, + arguments: argumentsValue, + }) + } + + continue + } + + if (item.type === 'function_call') { + const callId = item.call_id ?? item.id + const name = item.name ?? item.function?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof item.arguments === 'string' ? item.arguments : JSON.stringify(item.arguments ?? {}) + + items.push({ + type: 'function_call', + call_id: callId, + name, + arguments: argumentsValue, + }) + } + } + + return items +} + +/** + * Extracts tool calls from Responses API output items. 
+ */ +export function extractResponseToolCalls(output: unknown): ResponsesToolCall[] { + if (!Array.isArray(output)) { + return [] + } + + const toolCalls: ResponsesToolCall[] = [] + + for (const item of output) { + if (!item || typeof item !== 'object') { + continue + } + + if (item.type === 'function_call') { + const callId = item.call_id ?? item.id + const name = item.name ?? item.function?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof item.arguments === 'string' ? item.arguments : JSON.stringify(item.arguments ?? {}) + + toolCalls.push({ + id: callId, + name, + arguments: argumentsValue, + }) + continue + } + + if (item.type === 'message' && Array.isArray(item.tool_calls)) { + for (const toolCall of item.tool_calls) { + const callId = toolCall?.id + const name = toolCall?.function?.name ?? toolCall?.name + if (!callId || !name) { + continue + } + + const argumentsValue = + typeof toolCall?.function?.arguments === 'string' + ? toolCall.function.arguments + : JSON.stringify(toolCall?.function?.arguments ?? {}) + + toolCalls.push({ + id: callId, + name, + arguments: argumentsValue, + }) + } + } + } + + return toolCalls +} + +/** + * Maps Responses API usage data to prompt/completion token counts. + * + * Note: output_tokens is expected to include reasoning tokens; fall back to reasoning_tokens + * when output_tokens is missing or zero. + */ +export function parseResponsesUsage(usage: any): ResponsesUsageTokens | undefined { + if (!usage || typeof usage !== 'object') { + return undefined + } + + const inputTokens = Number(usage.input_tokens ?? 0) + const outputTokens = Number(usage.output_tokens ?? 0) + const cachedTokens = Number(usage.input_tokens_details?.cached_tokens ?? 0) + const reasoningTokens = Number(usage.output_tokens_details?.reasoning_tokens ?? 0) + const completionTokens = Math.max(outputTokens, reasoningTokens) + const totalTokens = inputTokens + completionTokens + + return { + promptTokens: inputTokens, + completionTokens, + totalTokens, + cachedTokens, + reasoningTokens, + } +} + +/** + * Creates a ReadableStream from a Responses API SSE stream. + */ +export function createReadableStreamFromResponses( + response: Response, + onComplete?: (content: string, usage?: ResponsesUsageTokens) => void +): ReadableStream { + let fullContent = '' + let finalUsage: ResponsesUsageTokens | undefined + let activeEventType: string | undefined + const encoder = new TextEncoder() + + return new ReadableStream({ + async start(controller) { + const reader = response.body?.getReader() + if (!reader) { + controller.close() + return + } + + const decoder = new TextDecoder() + let buffer = '' + + try { + while (true) { + const { done, value } = await reader.read() + if (done) { + break + } + + buffer += decoder.decode(value, { stream: true }) + const lines = buffer.split('\n') + buffer = lines.pop() || '' + + for (const line of lines) { + const trimmed = line.trim() + if (!trimmed) { + continue + } + + if (trimmed.startsWith('event:')) { + activeEventType = trimmed.slice(6).trim() + continue + } + + if (!trimmed.startsWith('data:')) { + continue + } + + const data = trimmed.slice(5).trim() + if (data === '[DONE]') { + continue + } + + let event: any + try { + event = JSON.parse(data) + } catch (error) { + logger.debug('Skipping non-JSON response stream chunk', { + data: data.slice(0, 200), + error, + }) + continue + } + + const eventType = event?.type ?? 
activeEventType + + if ( + eventType === 'response.error' || + eventType === 'error' || + eventType === 'response.failed' + ) { + const message = event?.error?.message || 'Responses API stream error' + controller.error(new Error(message)) + return + } + + if ( + eventType === 'response.output_text.delta' || + eventType === 'response.output_json.delta' + ) { + let deltaText = '' + if (typeof event.delta === 'string') { + deltaText = event.delta + } else if (event.delta && typeof event.delta.text === 'string') { + deltaText = event.delta.text + } else if (event.delta && event.delta.json !== undefined) { + deltaText = JSON.stringify(event.delta.json) + } else if (event.json !== undefined) { + deltaText = JSON.stringify(event.json) + } else if (typeof event.text === 'string') { + deltaText = event.text + } + + if (deltaText.length > 0) { + fullContent += deltaText + controller.enqueue(encoder.encode(deltaText)) + } + } + + if (eventType === 'response.completed') { + finalUsage = parseResponsesUsage(event?.response?.usage ?? event?.usage) + } + } + } + + if (onComplete) { + onComplete(fullContent, finalUsage) + } + + controller.close() + } catch (error) { + controller.error(error) + } finally { + reader.releaseLock() + } + }, + }) }
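
The post-tool streaming path in core.ts folds the usage reported at the end of the final stream into the token and cost totals accumulated across tool iterations. A small arithmetic sketch of that bookkeeping, with made-up per-token rates standing in for the pricing behind calculateCost:

// Sketch: combine pre-stream totals (from tool iterations) with final stream usage.
// Rates are placeholders; the real numbers come from the provider's pricing table.
const rate = { input: 2.5e-6, output: 1e-5 }
const costOf = (inTok: number, outTok: number) => ({
  input: inTok * rate.input,
  output: outTok * rate.output,
  total: inTok * rate.input + outTok * rate.output,
})

const preStream = { input: 300, output: 120 } // accumulated before the final stream
const streamUsage = { promptTokens: 350, completionTokens: 80 } // reported at stream end

const tokens = {
  input: preStream.input + streamUsage.promptTokens, // 650
  output: preStream.output + streamUsage.completionTokens, // 200
  total:
    preStream.input + preStream.output + streamUsage.promptTokens + streamUsage.completionTokens, // 850
}

const accumulatedCost = costOf(preStream.input, preStream.output)
const streamCost = costOf(streamUsage.promptTokens, streamUsage.completionTokens)
const totalCost = {
  input: accumulatedCost.input + streamCost.input,
  output: accumulatedCost.output + streamCost.output,
  total: accumulatedCost.total + streamCost.total,
}
console.log(tokens, totalCost)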
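
buildResponsesInputFromMessages flattens a chat transcript into sibling Responses input items: plain turns keep their roles, an assistant turn's tool calls become function_call items, and tool results become function_call_output items keyed by call_id. A standalone sketch of the same shape, where MiniMessage is a simplified stand-in for the provider's Message type:

// Sketch of the flattening performed by buildResponsesInputFromMessages.
interface MiniMessage {
  role: 'system' | 'user' | 'assistant' | 'tool'
  content?: string
  tool_call_id?: string
  tool_calls?: { id: string; function: { name: string; arguments: string } }[]
}

type MiniInputItem =
  | { role: 'system' | 'user' | 'assistant'; content: string }
  | { type: 'function_call'; call_id: string; name: string; arguments: string }
  | { type: 'function_call_output'; call_id: string; output: string }

function toResponsesInput(messages: MiniMessage[]): MiniInputItem[] {
  const input: MiniInputItem[] = []
  for (const m of messages) {
    if (m.role === 'tool' && m.tool_call_id) {
      // A tool result becomes a standalone function_call_output item.
      input.push({ type: 'function_call_output', call_id: m.tool_call_id, output: m.content ?? '' })
      continue
    }
    if (m.role !== 'tool' && m.content) {
      input.push({ role: m.role, content: m.content })
    }
    // An assistant turn that called tools contributes one function_call item per call.
    for (const call of m.tool_calls ?? []) {
      input.push({
        type: 'function_call',
        call_id: call.id,
        name: call.function.name,
        arguments: call.function.arguments,
      })
    }
  }
  return input
}

// One tool round trip flattens to four sibling items.
console.log(
  toResponsesInput([
    { role: 'user', content: 'What is the weather in Paris?' },
    {
      role: 'assistant',
      tool_calls: [
        { id: 'call_1', function: { name: 'get_weather', arguments: '{"city":"Paris"}' } },
      ],
    },
    { role: 'tool', tool_call_id: 'call_1', content: '{"tempC":18}' },
    { role: 'assistant', content: 'About 18°C in Paris.' },
  ])
)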
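
toResponsesToolChoice unwraps the Chat Completions shape, where a forced function is nested under function.name, into the flattened form the Responses API expects. A minimal sketch of that mapping:

// Sketch: map a Chat Completions-style tool_choice to the flattened Responses form.
type ChatToolChoice = 'auto' | 'none' | { type: 'function'; function: { name: string } }
type ResponsesToolChoice = 'auto' | 'none' | { type: 'function'; name: string }

function mapToolChoice(choice: ChatToolChoice): ResponsesToolChoice {
  if (typeof choice === 'string') return choice
  // The forced function name moves from function.name up to the top level.
  return { type: 'function', name: choice.function.name }
}

console.log(mapToolChoice({ type: 'function', function: { name: 'get_weather' } }))
// -> { type: 'function', name: 'get_weather' }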
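
extractResponseText walks the output array of a response and concatenates the output_text parts found in message items, skipping everything else. A small sketch against a hand-written payload; the reasoning item stands in for the non-message output the walk ignores:

// Sketch of the walk in extractResponseText over a hand-written response payload.
const output: any[] = [
  { type: 'reasoning', summary: [] }, // non-message items are skipped
  {
    type: 'message',
    role: 'assistant',
    content: [
      { type: 'output_text', text: 'Hello, ' },
      { type: 'output_text', text: 'world.' },
    ],
  },
]

const text = output
  .filter((item) => item?.type === 'message' && Array.isArray(item.content))
  .flatMap((item) => item.content)
  .filter((part: any) => part?.type === 'output_text' && typeof part.text === 'string')
  .map((part: any) => part.text)
  .join('')

console.log(text) // -> "Hello, world."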
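
Together, function_call and function_call_output items drive the multi-turn tool loop: calls from one response are echoed back verbatim on the next request, each paired with its output. A sketch of one round trip against the same endpoint and headers the provider uses; runTool is a hypothetical local executor standing in for executeTool, and the gpt-4o model choice is illustrative:

// Sketch of one Responses API tool round trip under the assumptions above.
async function toolRoundTrip(
  apiKey: string,
  runTool: (name: string, args: unknown) => Promise<unknown> // hypothetical executor
) {
  const endpoint = 'https://api.openai.com/v1/responses'
  const headers = {
    Authorization: `Bearer ${apiKey}`,
    'Content-Type': 'application/json',
    'OpenAI-Beta': 'responses=v1',
  }
  const tools = [
    {
      type: 'function',
      name: 'get_weather',
      parameters: { type: 'object', properties: { city: { type: 'string' } } },
    },
  ]
  const input: any[] = [{ role: 'user', content: 'What is the weather in Paris?' }]

  const first = await (
    await fetch(endpoint, {
      method: 'POST',
      headers,
      body: JSON.stringify({ model: 'gpt-4o', input, tools }),
    })
  ).json()

  // Echo each function_call back verbatim, paired with its function_call_output.
  for (const item of first.output ?? []) {
    if (item.type !== 'function_call') continue
    input.push({ type: 'function_call', call_id: item.call_id, name: item.name, arguments: item.arguments })
    const result = await runTool(item.name, JSON.parse(item.arguments))
    input.push({ type: 'function_call_output', call_id: item.call_id, output: JSON.stringify(result) })
  }

  // The second call sees the tool results and can produce the final text.
  return (
    await fetch(endpoint, {
      method: 'POST',
      headers,
      body: JSON.stringify({ model: 'gpt-4o', input, tools }),
    })
  ).json()
}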
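
parseResponsesUsage treats output_tokens as already including reasoning tokens, so the completion count is a max rather than a sum, with reasoning_tokens serving only as a fallback when output_tokens is missing or zero. A worked example:

// Worked example of the mapping in parseResponsesUsage.
const usage = {
  input_tokens: 120,
  output_tokens: 90, // already includes the 40 reasoning tokens below
  input_tokens_details: { cached_tokens: 32 },
  output_tokens_details: { reasoning_tokens: 40 },
}

const completionTokens = Math.max(usage.output_tokens, usage.output_tokens_details.reasoning_tokens)
console.log({
  promptTokens: usage.input_tokens, // 120
  completionTokens, // 90, not 130: reasoning tokens are a subset, not an addition
  totalTokens: usage.input_tokens + completionTokens, // 210
  cachedTokens: usage.input_tokens_details.cached_tokens, // 32
  reasoningTokens: usage.output_tokens_details.reasoning_tokens, // 40
})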
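
createReadableStreamFromResponses tolerates both SSE framings: a preceding event: line naming the event type, or a type field inside the data: payload, with the payload's own type taking precedence. A minimal pure-function sketch of that framing logic:

// Minimal sketch of the SSE framing handled in createReadableStreamFromResponses.
function collectOutputText(sseLines: string[]): string {
  let activeEventType: string | undefined
  let text = ''
  for (const line of sseLines) {
    const trimmed = line.trim()
    if (trimmed.startsWith('event:')) {
      // An `event:` line names the type for the `data:` payloads that follow.
      activeEventType = trimmed.slice(6).trim()
      continue
    }
    if (!trimmed.startsWith('data:')) continue
    const data = trimmed.slice(5).trim()
    if (data === '[DONE]') break
    let event: any
    try {
      event = JSON.parse(data)
    } catch {
      continue // tolerate keep-alives and other non-JSON lines
    }
    // A type field inside the payload wins over the preceding event: line.
    const eventType = event?.type ?? activeEventType
    if (eventType === 'response.output_text.delta' && typeof event.delta === 'string') {
      text += event.delta
    }
  }
  return text
}

console.log(
  collectOutputText([
    'event: response.output_text.delta',
    'data: {"delta":"Hel"}',
    'data: {"type":"response.output_text.delta","delta":"lo"}',
    'data: [DONE]',
  ])
) // -> "Hello"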