diff --git a/apps/sim/app/api/wand-generate/route.ts b/apps/sim/app/api/wand-generate/route.ts
index 83707bd7c..bca08194e 100644
--- a/apps/sim/app/api/wand-generate/route.ts
+++ b/apps/sim/app/api/wand-generate/route.ts
@@ -119,74 +119,145 @@ export async function POST(req: NextRequest) {
       `[${requestId}] About to create stream with model: ${useWandAzure ? wandModelName : 'gpt-4o'}`
     )
 
-    // Create the stream without AbortController for Node.js runtime compatibility
-    const streamCompletion = await client.chat.completions.create({
-      model: useWandAzure ? wandModelName : 'gpt-4o',
-      messages: messages,
-      temperature: 0.3,
-      max_tokens: 10000,
-      stream: true,
-      stream_options: { include_usage: true },
+    // Use native fetch for streaming to avoid OpenAI SDK issues with Node.js runtime
+    const apiUrl = useWandAzure
+      ? `${azureEndpoint}/openai/deployments/${wandModelName}/chat/completions?api-version=${azureApiVersion}`
+      : 'https://api.openai.com/v1/chat/completions'
+
+    const headers: Record<string, string> = {
+      'Content-Type': 'application/json',
+    }
+
+    if (useWandAzure) {
+      headers['api-key'] = azureApiKey!
+    } else {
+      headers.Authorization = `Bearer ${openaiApiKey}`
+    }
+
+    logger.debug(`[${requestId}] Making streaming request to: ${apiUrl}`)
+
+    const response = await fetch(apiUrl, {
+      method: 'POST',
+      headers,
+      body: JSON.stringify({
+        model: useWandAzure ? wandModelName : 'gpt-4o',
+        messages: messages,
+        temperature: 0.3,
+        max_tokens: 10000,
+        stream: true,
+        stream_options: { include_usage: true },
+      }),
     })
 
-    logger.info(`[${requestId}] Stream created successfully, starting response`)
+    if (!response.ok) {
+      const errorText = await response.text()
+      logger.error(`[${requestId}] API request failed`, {
+        status: response.status,
+        statusText: response.statusText,
+        error: errorText,
+      })
+      throw new Error(`API request failed: ${response.status} ${response.statusText}`)
+    }
 
-    // Create a TransformStream for Node.js runtime compatibility
+    logger.info(`[${requestId}] Stream response received, starting processing`)
+
+    // Create a TransformStream to process the SSE data
     const encoder = new TextEncoder()
+    const decoder = new TextDecoder()
+
     const readable = new ReadableStream({
       async start(controller) {
+        const reader = response.body?.getReader()
+        if (!reader) {
+          controller.close()
+          return
+        }
+
         try {
-          logger.info(`[${requestId}] Starting stream processing`)
+          let buffer = ''
           let chunkCount = 0
 
-          for await (const chunk of streamCompletion) {
-            chunkCount++
+          while (true) {
+            const { done, value } = await reader.read()
 
-            if (chunkCount === 1) {
-              logger.info(`[${requestId}] Received first chunk`)
+            if (done) {
+              logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`)
+              controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`))
+              controller.close()
+              break
             }
 
-            // Process the chunk
-            const content = chunk.choices?.[0]?.delta?.content || ''
-            if (content) {
-              // Send data in SSE format
-              const data = `data: ${JSON.stringify({ chunk: content })}\n\n`
-              controller.enqueue(encoder.encode(data))
-            }
+            // Decode the chunk
+            buffer += decoder.decode(value, { stream: true })
 
-            // Check for usage data
-            if (chunk.usage) {
-              logger.info(`[${requestId}] Received usage data: ${JSON.stringify(chunk.usage)}`)
-            }
+            // Process complete SSE messages
+            const lines = buffer.split('\n')
+            buffer = lines.pop() || '' // Keep incomplete line in buffer
 
-            // Log progress periodically
-            if (chunkCount % 10 === 0) {
-              logger.debug(`[${requestId}] Processed ${chunkCount} chunks`)
+            for (const line of lines) {
+              if (line.startsWith('data: ')) {
+                const data = line.slice(6).trim()
+
+                if (data === '[DONE]') {
+                  logger.info(`[${requestId}] Received [DONE] signal`)
+                  controller.enqueue(
+                    encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`)
+                  )
+                  controller.close()
+                  return
+                }
+
+                try {
+                  const parsed = JSON.parse(data)
+                  const content = parsed.choices?.[0]?.delta?.content
+
+                  if (content) {
+                    chunkCount++
+                    if (chunkCount === 1) {
+                      logger.info(`[${requestId}] Received first content chunk`)
+                    }
+
+                    // Forward the content
+                    controller.enqueue(
+                      encoder.encode(`data: ${JSON.stringify({ chunk: content })}\n\n`)
+                    )
+                  }
+
+                  // Log usage if present
+                  if (parsed.usage) {
+                    logger.info(
+                      `[${requestId}] Received usage data: ${JSON.stringify(parsed.usage)}`
+                    )
+                  }
+
+                  // Log progress periodically
+                  if (chunkCount % 10 === 0) {
+                    logger.debug(`[${requestId}] Processed ${chunkCount} chunks`)
+                  }
+                } catch (parseError) {
+                  // Skip invalid JSON lines
+                  logger.debug(
+                    `[${requestId}] Skipped non-JSON line: ${data.substring(0, 100)}`
+                  )
+                }
+              }
             }
           }
 
-          logger.info(`[${requestId}] Stream completed. Total chunks: ${chunkCount}`)
-
-          // Send completion signal
-          controller.enqueue(encoder.encode(`data: ${JSON.stringify({ done: true })}\n\n`))
-          controller.close()
-
           logger.info(`[${requestId}] Wand generation streaming completed successfully`)
         } catch (streamError: any) {
           logger.error(`[${requestId}] Streaming error`, {
             name: streamError?.name,
             message: streamError?.message || 'Unknown error',
-            code: streamError?.code,
-            status: streamError?.status,
             stack: streamError?.stack,
-            useWandAzure,
-            model: useWandAzure ? wandModelName : 'gpt-4o',
           })
 
           // Send error to client
           const errorData = `data: ${JSON.stringify({ error: 'Streaming failed', done: true })}\n\n`
           controller.enqueue(encoder.encode(errorData))
           controller.close()
+        } finally {
+          reader.releaseLock()
        }
      },
    })
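
Note: the route now re-emits a simplified SSE protocol of its own — `data: {"chunk": "..."}` for each content delta, `data: {"done": true}` at the end, and `data: {"error": "Streaming failed", "done": true}` on failure. A minimal client-side consumption sketch follows; it assumes the route is reachable at `/api/wand-generate` and accepts a JSON `{ prompt }` body (the request shape is outside this hunk), so treat those names as illustrative:

```ts
// Hypothetical client for the /api/wand-generate SSE stream. The request
// body shape ({ prompt }) is an assumption; the response wire format
// ({ chunk } / { done } / { error }) matches what the route above emits.
async function consumeWandStream(
  prompt: string,
  onChunk: (text: string) => void
): Promise<void> {
  const response = await fetch('/api/wand-generate', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt }),
  })
  if (!response.ok || !response.body) {
    throw new Error(`Request failed: ${response.status} ${response.statusText}`)
  }

  const reader = response.body.getReader()
  const decoder = new TextDecoder()
  let buffer = ''

  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break

      // Mirror the server's buffering: split on newlines and keep any
      // incomplete trailing line for the next read.
      buffer += decoder.decode(value, { stream: true })
      const lines = buffer.split('\n')
      buffer = lines.pop() || ''

      for (const line of lines) {
        if (!line.startsWith('data: ')) continue
        const payload = JSON.parse(line.slice(6))
        if (payload.error) throw new Error(payload.error)
        if (payload.chunk) onChunk(payload.chunk)
        if (payload.done) return
      }
    }
  } finally {
    reader.releaseLock()
  }
}
```

The buffering strategy matters on both sides: `fetch` delivers bytes in arbitrary chunks, so an SSE event can be split across reads, and holding back the trailing partial line (plus running `TextDecoder` in streaming mode) keeps both event boundaries and multi-byte UTF-8 sequences intact.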