fix(streaming): fixed streaming by switching to edge runtime

This commit is contained in:
Waleed Latif
2025-04-07 15:24:12 -07:00
parent f5612faa54
commit dcb88e66bb
2 changed files with 156 additions and 160 deletions

View File

@@ -3,16 +3,19 @@ import OpenAI from 'openai'
import { createLogger } from '@/lib/logs/console-logger'
export const dynamic = 'force-dynamic'
export const runtime = 'nodejs'
export const runtime = 'edge'
export const maxDuration = 60
const logger = createLogger('GenerateCodeAPI')
let openai: OpenAI | null = null
if (process.env.OPENAI_API_KEY) {
openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
} else {
// Create the OpenAI client only when OPENAI_API_KEY is set; otherwise leave it null
const openai = process.env.OPENAI_API_KEY
? new OpenAI({
apiKey: process.env.OPENAI_API_KEY,
})
: null
if (!process.env.OPENAI_API_KEY) {
logger.warn('OPENAI_API_KEY not found. Code generation API will not function.')
}
@@ -239,114 +242,99 @@ export async function POST(req: NextRequest) {
// For streaming responses
if (stream) {
const encoder = new TextEncoder()
const streamResponse = new TransformStream()
const writer = streamResponse.writable.getWriter()
try {
const streamCompletion = await openai!.chat.completions.create({
model: 'gpt-4o',
messages: messages,
temperature: 0.2,
max_tokens: 1500,
stream: true,
})
// Start streaming response
const streamOpenAI = async () => {
try {
const streamCompletion = await openai!.chat.completions.create({
// Use non-null assertion as openai is checked above
model: 'gpt-4o',
// Pass the constructed messages array
messages: messages,
temperature: 0.2,
max_tokens: 1500,
stream: true,
})
// Use ReadableStream for Edge runtime
return new Response(
new ReadableStream({
async start(controller) {
const encoder = new TextEncoder()
let fullContent = generationType === 'json-schema' ? '' : undefined
// Conditionally initialize fullContent only if needed for validation
let fullContent = generationType === 'json-schema' ? '' : undefined
// Process each chunk
for await (const chunk of streamCompletion) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
// Only append if fullContent is defined (i.e., for json-schema)
if (fullContent !== undefined) {
fullContent += content
}
// Process each chunk
for await (const chunk of streamCompletion) {
const content = chunk.choices[0]?.delta?.content || ''
if (content) {
// Only append if fullContent is defined (i.e., for json-schema)
if (fullContent !== undefined) {
fullContent += content
// Send the chunk to the client
controller.enqueue(
encoder.encode(
JSON.stringify({
chunk: content,
done: false,
}) + '\n'
)
)
}
}
// Send the chunk to the client
const payload = encoder.encode(
JSON.stringify({
chunk: content,
done: false,
}) + '\n'
// Check JSON validity for json-schema type when streaming is complete
if (generationType === 'json-schema' && fullContent) {
try {
JSON.parse(fullContent)
} catch (parseError: any) {
logger.error(`[${requestId}] Generated JSON schema is invalid`, {
error: parseError.message,
content: fullContent,
})
// Send error to client
controller.enqueue(
encoder.encode(
JSON.stringify({
error: 'Generated JSON schema was invalid.',
done: true,
}) + '\n'
)
)
controller.close()
return
}
}
// Send the final done message
controller.enqueue(
encoder.encode(
JSON.stringify({
done: true,
...(fullContent !== undefined && { fullContent: fullContent }),
}) + '\n'
)
)
await writer.write(payload)
}
controller.close()
logger.info(`[${requestId}] Code generation streaming completed`, { generationType })
},
}),
{
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache, no-transform',
Connection: 'keep-alive',
},
}
)
} catch (error: any) {
logger.error(`[${requestId}] Streaming error`, {
error: error.message || 'Unknown error',
stack: error.stack,
})
// Check JSON validity for json-schema type when streaming is complete
if (generationType === 'json-schema') {
try {
JSON.parse(fullContent!)
} catch (parseError: any) {
logger.error(`[${requestId}] Generated JSON schema is invalid`, {
error: parseError.message,
content: fullContent,
})
// Send error to client
const errorPayload = encoder.encode(
JSON.stringify({
error: 'Generated JSON schema was invalid.',
done: true,
}) + '\n'
)
await writer.write(errorPayload)
await writer.close()
return
}
}
// Send the final done message
const donePayload = encoder.encode(
JSON.stringify({
done: true,
...(fullContent !== undefined && { fullContent: fullContent }),
}) + '\n'
)
await writer.write(donePayload)
await writer.close()
logger.info(`[${requestId}] Code generation streaming completed`, { generationType })
} catch (error: any) {
logger.error(`[${requestId}] Streaming error`, {
error: error.message || 'Unknown error',
stack: error.stack,
})
const clientErrorMessage = 'An error occurred during code generation streaming.'
// Send error to client
const errorPayload = encoder.encode(
JSON.stringify({
error: clientErrorMessage,
done: true,
}) + '\n'
)
await writer.write(errorPayload)
await writer.close()
}
return NextResponse.json(
{ success: false, error: 'An error occurred during code generation streaming.' },
{ status: 500 }
)
}
// Start streaming asynchronously
streamOpenAI()
return new Response(streamResponse.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
},
})
}
// For non-streaming responses (original implementation)
@@ -395,16 +383,13 @@ export async function POST(req: NextRequest) {
stack: error.stack,
})
// --- MODIFICATION: Use generic error message for client ---
let clientErrorMessage = 'Code generation failed. Please try again later.'
// Keep original message for server logging
let serverErrorMessage = error.message || 'Unknown error'
// --- END MODIFICATION ---
let status = 500
if (error instanceof OpenAI.APIError) {
status = error.status || 500
// --- MODIFICATION: Update server log message, keep client message generic ---
serverErrorMessage = error.message // Use specific API error for server logs
logger.error(`[${requestId}] OpenAI API Error: ${status} - ${serverErrorMessage}`)
// Optionally, customize client message based on status, but keep it generic
@@ -416,15 +401,12 @@ export async function POST(req: NextRequest) {
clientErrorMessage =
'The code generation service is currently unavailable. Please try again later.'
}
// --- END MODIFICATION ---
}
return NextResponse.json(
{
success: false,
// --- MODIFICATION: Use generic client error message ---
error: clientErrorMessage,
// --- END MODIFICATION ---
},
{ status }
)

View File

@@ -138,6 +138,7 @@ export function useCodeGeneration({
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Cache-Control': 'no-cache, no-transform',
},
body: JSON.stringify({
prompt,
@@ -147,6 +148,8 @@ export function useCodeGeneration({
history: conversationHistory, // Send history
}),
signal: abortControllerRef.current.signal,
// Ensure no caching for Edge Functions
cache: 'no-store',
})
if (!response.ok) {
@@ -158,69 +161,80 @@ export function useCodeGeneration({
throw new Error('Response body is null')
}
// Set up streaming reader
const reader = response.body.getReader()
const decoder = new TextDecoder()
let fullContent = ''
// Signal the start of the stream to clear previous content
if (onStreamStart) {
onStreamStart()
}
while (true) {
const { done, value } = await reader.read()
if (done) break
// Set up streaming reader
const reader = response.body.getReader()
const decoder = new TextDecoder()
let fullContent = ''
// Process incoming chunks
const text = decoder.decode(value)
const lines = text.split('\n').filter((line) => line.trim() !== '')
try {
while (true) {
const { done, value } = await reader.read()
if (done) break
for (const line of lines) {
try {
const data = JSON.parse(line)
// Process incoming chunks
const text = decoder.decode(value)
const lines = text.split('\n').filter((line) => line.trim() !== '')
// Check if there's an error
if (data.error) {
throw new Error(data.error)
}
for (const line of lines) {
try {
const data = JSON.parse(line)
// Process chunk
if (data.chunk) {
fullContent += data.chunk
if (onStreamChunk) {
onStreamChunk(data.chunk)
}
}
// Check if streaming is complete
if (data.done) {
// Use full content from server if available (for validation)
if (data.fullContent) {
fullContent = data.fullContent
// Check if there's an error
if (data.error) {
throw new Error(data.error)
}
logger.info('Streaming code generation completed', { generationType })
// Update history AFTER the stream is fully complete
setConversationHistory((prevHistory) => [
...prevHistory,
{ role: 'user', content: currentPrompt },
{ role: 'assistant', content: fullContent }, // Use the final full content
])
// Call the main handler for the complete content
onGeneratedContent(fullContent)
if (onGenerationComplete) {
onGenerationComplete(currentPrompt, fullContent)
// Process chunk
if (data.chunk) {
fullContent += data.chunk
if (onStreamChunk) {
onStreamChunk(data.chunk)
}
}
addNotification('info', 'Content generated successfully!', null)
break
// Check if streaming is complete
if (data.done) {
// Use full content from server if available (for validation)
if (data.fullContent) {
fullContent = data.fullContent
}
logger.info('Streaming code generation completed', { generationType })
// Update history AFTER the stream is fully complete
setConversationHistory((prevHistory) => [
...prevHistory,
{ role: 'user', content: currentPrompt },
{ role: 'assistant', content: fullContent }, // Use the final full content
])
// Call the main handler for the complete content
onGeneratedContent(fullContent)
if (onGenerationComplete) {
onGenerationComplete(currentPrompt, fullContent)
}
addNotification('info', 'Content generated successfully!', null)
break
}
} catch (jsonError: any) {
logger.error('Failed to parse streaming response', { error: jsonError.message, line })
}
} catch (jsonError: any) {
logger.error('Failed to parse streaming response', { error: jsonError.message, line })
}
}
} catch (streamError: any) {
// Additional error handling for stream processing
if (streamError.name !== 'AbortError') {
logger.error('Error processing stream', { error: streamError.message })
throw streamError // Re-throw to be caught by outer try/catch
}
} finally {
// Always release the reader when done
reader.releaseLock()
}
} catch (err: any) {
// Don't show error if it was due to an abort