fix(streaming-api): fix streaming api (#1846)

* fix(billing): should allow restoring subscription (#1728)

* fix(already-cancelled-sub): UI should allow restoring subscription

* restore functionality fixed

* fix

* Fix streaming api

* Fix uuid stuff

* Lint

* Stripe docs fix

* Fix docs build error

* Fix uuid check

* Fix deployed chat streaming for non agent blocks

* Fix lint

---------

Co-authored-by: Waleed <walif6@gmail.com>
Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
This commit is contained in:
Siddharth Ganesan
2025-11-07 14:38:15 -08:00
committed by GitHub
parent c9a8c7e392
commit 9fd2156e9e
8 changed files with 195 additions and 16 deletions

View File

@@ -359,7 +359,7 @@ Ein neues Abonnement für einen Kunden erstellen
| --------- | ---- | -------- | ----------- |
| `apiKey` | string | Ja | Stripe API-Schlüssel (geheimer Schlüssel) |
| `customer` | string | Ja | Kunden-ID für das Abonnement |
| `items` | json | Ja | Array von Artikeln mit Preis-IDs (z.B. [{"price": "price_xxx", "quantity": 1}]) |
| `items` | json | Ja | Array von Artikeln mit Preis-IDs \(z.B. \[\{"price": "price_xxx", "quantity": 1\}\]\) |
| `trial_period_days` | number | Nein | Anzahl der Testtage |
| `default_payment_method` | string | Nein | Zahlungsmethoden-ID |
| `cancel_at_period_end` | boolean | Nein | Abonnement am Ende der Periode kündigen |
@@ -774,7 +774,7 @@ Alle Zahlungen auflisten
| `apiKey` | string | Ja | Stripe API-Schlüssel (geheimer Schlüssel) |
| `limit` | number | Nein | Anzahl der zurückzugebenden Ergebnisse (Standard 10, max. 100) |
| `customer` | string | Nein | Nach Kunden-ID filtern |
| `created` | json | Nein | Nach Erstellungsdatum filtern (z.B. {"gt": 1633024800}) |
| `created` | json | Nein | Nach Erstellungsdatum filtern \(z.B. \{"gt": 1633024800\}\) |
#### Output
@@ -1051,7 +1051,7 @@ Alle Events auflisten
| `apiKey` | string | Ja | Stripe API-Schlüssel (Secret-Key) |
| `limit` | number | Nein | Anzahl der zurückzugebenden Ergebnisse (Standard 10, max. 100) |
| `type` | string | Nein | Nach Event-Typ filtern (z.B. payment_intent.created) |
| `created` | json | Nein | Nach Erstellungsdatum filtern (z.B. {"gt": 1633024800}) |
| `created` | json | Nein | Nach Erstellungsdatum filtern \(z.B. \{"gt": 1633024800\}\) |
#### Output

View File

@@ -79,7 +79,7 @@ Crear un nuevo Payment Intent para procesar un pago
| `description` | string | No | Descripción del pago |
| `receipt_email` | string | No | Dirección de correo electrónico para enviar el recibo |
| `metadata` | json | No | Conjunto de pares clave-valor para almacenar información adicional |
| `automatic_payment_methods` | json | No | Habilitar métodos de pago automáticos (p. ej., {"enabled": true}) |
| `automatic_payment_methods` | json | No | Habilitar métodos de pago automáticos \(p. ej., \{"enabled": true\}\) |
#### Salida
@@ -359,7 +359,7 @@ Crear una nueva suscripción para un cliente
| --------- | ---- | -------- | ----------- |
| `apiKey` | string | Sí | Clave API de Stripe (clave secreta) |
| `customer` | string | Sí | ID del cliente a suscribir |
| `items` | json | Sí | Array de elementos con IDs de precio (p. ej., [{"price": "price_xxx", "quantity": 1}]) |
| `items` | json | Sí | Array de elementos con IDs de precio \(p. ej., \[\{"price": "price_xxx", "quantity": 1\}\]\) |
| `trial_period_days` | number | No | Número de días de prueba |
| `default_payment_method` | string | No | ID del método de pago |
| `cancel_at_period_end` | boolean | No | Cancelar suscripción al final del período |
@@ -1051,7 +1051,7 @@ Listar todos los eventos
| `apiKey` | string | Sí | Clave API de Stripe (clave secreta) |
| `limit` | number | No | Número de resultados a devolver (predeterminado 10, máximo 100) |
| `type` | string | No | Filtrar por tipo de evento (p. ej., payment_intent.created) |
| `created` | json | No | Filtrar por fecha de creación (p. ej., {"gt": 1633024800}) |
| `created` | json | No | Filtrar por fecha de creación \(p. ej., \{"gt": 1633024800\}\) |
#### Salida

View File

@@ -79,7 +79,7 @@ Créer une nouvelle intention de paiement pour traiter un paiement
| `description` | string | Non | Description du paiement |
| `receipt_email` | string | Non | Adresse e-mail pour l'envoi du reçu |
| `metadata` | json | Non | Ensemble de paires clé-valeur pour stocker des informations supplémentaires |
| `automatic_payment_methods` | json | Non | Activer les méthodes de paiement automatiques (par ex., {"enabled": true}) |
| `automatic_payment_methods` | json | Non | Activer les méthodes de paiement automatiques \(par ex., \{"enabled": true\}\) |
#### Sortie
@@ -359,7 +359,7 @@ Créer un nouvel abonnement pour un client
| --------- | ---- | -------- | ----------- |
| `apiKey` | string | Oui | Clé API Stripe (clé secrète) |
| `customer` | string | Oui | ID du client à abonner |
| `items` | json | Oui | Tableau d'articles avec IDs de prix (ex., \[\{"price": "price_xxx", "quantity": 1\}\]) |
| `items` | json | Oui | Tableau d'articles avec IDs de prix \(ex., \[\{"price": "price_xxx", "quantity": 1\}\]\) |
| `trial_period_days` | number | Non | Nombre de jours d'essai |
| `default_payment_method` | string | Non | ID de la méthode de paiement |
| `cancel_at_period_end` | boolean | Non | Annuler l'abonnement à la fin de la période |
@@ -1051,7 +1051,7 @@ Lister tous les événements
| `apiKey` | string | Oui | Clé API Stripe (clé secrète) |
| `limit` | number | Non | Nombre de résultats à retourner (par défaut 10, max 100) |
| `type` | string | Non | Filtrer par type d'événement (ex. : payment_intent.created) |
| `created` | json | Non | Filtrer par date de création (ex. : {"gt": 1633024800}) |
| `created` | json | Non | Filtrer par date de création \(ex. : \{"gt": 1633024800\}\) |
#### Sortie

View File

@@ -359,7 +359,7 @@ IDで既存の顧客を取得する
| --------- | ---- | -------- | ----------- |
| `apiKey` | string | はい | Stripe APIキーシークレットキー |
| `customer` | string | はい | サブスクライブする顧客ID |
| `items` | json | はい | 価格IDを含むアイテムの配列[{"price": "price_xxx", "quantity": 1}] |
| `items` | json | はい | 価格IDを含むアイテムの配列\[\{"price": "price_xxx", "quantity": 1\}\] |
| `trial_period_days` | number | いいえ | トライアル日数 |
| `default_payment_method` | string | いいえ | 支払い方法ID |
| `cancel_at_period_end` | boolean | いいえ | 期間終了時にサブスクリプションをキャンセルする |

View File

@@ -79,7 +79,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
| `description` | string | 否 | 支付描述 |
| `receipt_email` | string | 否 | 用于发送收据的电子邮件地址 |
| `metadata` | json | 否 | 用于存储附加信息的键值对集合 |
| `automatic_payment_methods` | json | 否 | 启用自动支付方式(例如,{"enabled": true} |
| `automatic_payment_methods` | json | 否 | 启用自动支付方式(例如,\{"enabled": true\} |
#### 输出
@@ -197,7 +197,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
| `apiKey` | string | 是 | Stripe API 密钥secret key |
| `limit` | number | 否 | 返回结果数量(默认 10最大 100 |
| `customer` | string | 否 | 按客户 ID 过滤 |
| `created` | json | 否 | 按创建日期过滤(例如,{"gt": 1633024800} |
| `created` | json | 否 | 按创建日期过滤(例如,\{"gt": 1633024800\} |
#### 输出
@@ -774,7 +774,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
| `apiKey` | string | 是 | Stripe API 密钥(密钥) |
| `limit` | number | 否 | 返回结果的数量(默认 10最大 100 |
| `customer` | string | 否 | 按客户 ID 过滤 |
| `created` | json | 否 | 按创建日期过滤(例如,{"gt": 1633024800} |
| `created` | json | 否 | 按创建日期过滤(例如,\{"gt": 1633024800\} |
#### 输出
@@ -1051,7 +1051,7 @@ import { BlockInfoCard } from "@/components/ui/block-info-card"
| `apiKey` | string | 是 | Stripe API 密钥(密钥) |
| `limit` | number | 否 | 返回结果数量(默认 10最大 100 |
| `type` | string | 否 | 按事件类型过滤例如payment_intent.created |
| `created` | json | 否 | 按创建日期过滤(例如,{"gt": 1633024800} |
| `created` | json | 否 | 按创建日期过滤(例如,\{"gt": 1633024800\} |
#### 输出

View File

@@ -1,5 +1,5 @@
import { type NextRequest, NextResponse } from 'next/server'
import { v4 as uuidv4 } from 'uuid'
import { validate as uuidValidate, v4 as uuidv4 } from 'uuid'
import { checkHybridAuth } from '@/lib/auth/hybrid'
import { checkServerSideUsageLimits } from '@/lib/billing'
import { processInputFileFields } from '@/lib/execution/files'
@@ -13,6 +13,7 @@ import {
import { executeWorkflowCore } from '@/lib/workflows/executor/execution-core'
import { type ExecutionEvent, encodeSSEEvent } from '@/lib/workflows/executor/execution-events'
import { PauseResumeManager } from '@/lib/workflows/executor/pause-resume-manager'
import { createStreamingResponse } from '@/lib/workflows/streaming'
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
import { type ExecutionMetadata, ExecutionSnapshot } from '@/executor/execution/snapshot'
import type { StreamingExecution } from '@/executor/types'
@@ -184,6 +185,60 @@ export function createFilteredResult(result: any) {
}
}
function resolveOutputIds(
selectedOutputs: string[] | undefined,
blocks: Record<string, any>
): string[] | undefined {
if (!selectedOutputs || selectedOutputs.length === 0) {
return selectedOutputs
}
return selectedOutputs.map((outputId) => {
const underscoreIndex = outputId.indexOf('_')
const dotIndex = outputId.indexOf('.')
if (underscoreIndex > 0) {
const maybeUuid = outputId.substring(0, underscoreIndex)
if (uuidValidate(maybeUuid)) {
return outputId
}
}
if (dotIndex > 0) {
const maybeUuid = outputId.substring(0, dotIndex)
if (uuidValidate(maybeUuid)) {
return `${outputId.substring(0, dotIndex)}_${outputId.substring(dotIndex + 1)}`
}
}
if (uuidValidate(outputId)) {
return outputId
}
if (dotIndex === -1) {
logger.warn(`Invalid output ID format (missing dot): ${outputId}`)
return outputId
}
const blockName = outputId.substring(0, dotIndex)
const path = outputId.substring(dotIndex + 1)
const normalizedBlockName = blockName.toLowerCase().replace(/\s+/g, '')
const block = Object.values(blocks).find((b: any) => {
const normalized = (b.name || '').toLowerCase().replace(/\s+/g, '')
return normalized === normalizedBlockName
})
if (!block) {
logger.warn(`Block not found for name: ${blockName} (from output ID: ${outputId})`)
return outputId
}
const resolvedId = `${block.id}_${path}`
logger.debug(`Resolved output ID: ${outputId} -> ${resolvedId}`)
return resolvedId
})
}
/**
* POST /api/workflows/[id]/execute
*
@@ -425,7 +480,32 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
}
}
logger.info(`[${requestId}] Using SSE execution (streaming response)`)
if (shouldUseDraftState) {
logger.info(`[${requestId}] Using SSE console log streaming (manual execution)`)
} else {
logger.info(`[${requestId}] Using streaming API response`)
const deployedData = await loadDeployedWorkflowState(workflowId)
const resolvedSelectedOutputs = resolveOutputIds(selectedOutputs, deployedData?.blocks || {})
const stream = await createStreamingResponse({
requestId,
workflow,
input: processedInput,
executingUserId: userId,
streamConfig: {
selectedOutputs: resolvedSelectedOutputs,
isSecureMode: false,
workflowTriggerType: triggerType === 'chat' ? 'chat' : 'api',
},
createFilteredResult,
executionId,
})
return new NextResponse(stream, {
status: 200,
headers: SSE_HEADERS,
})
}
const encoder = new TextEncoder()
let executorInstance: any = null
let isStreamClosed = false

View File

@@ -409,6 +409,7 @@ export default function ChatClient({ identifier }: { identifier: string }) {
autoPlayResponses: shouldPlayAudio,
},
audioStreamHandler: audioHandler,
outputConfigs: chatConfig?.outputConfigs,
}
)
} catch (error: any) {

View File

@@ -21,6 +21,7 @@ export interface StreamingOptions {
onAudioStart?: () => void
onAudioEnd?: () => void
audioStreamHandler?: (text: string) => Promise<void>
outputConfigs?: Array<{ blockId: string; path?: string }>
}
export function useChatStreaming() {
@@ -176,17 +177,114 @@ export function useChatStreaming() {
}
if (eventType === 'final' && json.data) {
const finalData = json.data as {
success: boolean
error?: string | { message?: string }
output?: Record<string, Record<string, any>>
}
const outputConfigs = streamingOptions?.outputConfigs
const formattedOutputs: string[] = []
const formatValue = (value: any): string | null => {
if (value === null || value === undefined) {
return null
}
if (typeof value === 'string') {
return value
}
if (typeof value === 'object') {
try {
return `\`\`\`json\n${JSON.stringify(value, null, 2)}\n\`\`\``
} catch {
return String(value)
}
}
return String(value)
}
const getOutputValue = (blockOutputs: Record<string, any>, path?: string) => {
if (!path || path === 'content') {
if (blockOutputs.content !== undefined) return blockOutputs.content
if (blockOutputs.result !== undefined) return blockOutputs.result
return blockOutputs
}
if (blockOutputs[path] !== undefined) {
return blockOutputs[path]
}
if (path.includes('.')) {
return path.split('.').reduce<any>((current, segment) => {
if (current && typeof current === 'object' && segment in current) {
return current[segment]
}
return undefined
}, blockOutputs)
}
return undefined
}
if (outputConfigs?.length && finalData.output) {
for (const config of outputConfigs) {
const blockOutputs = finalData.output[config.blockId]
if (!blockOutputs) continue
const value = getOutputValue(blockOutputs, config.path)
const formatted = formatValue(value)
if (formatted) {
formattedOutputs.push(formatted)
}
}
}
let finalContent = accumulatedText
if (formattedOutputs.length > 0) {
const combinedOutputs = formattedOutputs.join('\n\n')
finalContent = finalContent
? `${finalContent.trim()}\n\n${combinedOutputs}`
: combinedOutputs
}
if (!finalContent) {
if (finalData.error) {
if (typeof finalData.error === 'string') {
finalContent = finalData.error
} else if (typeof finalData.error?.message === 'string') {
finalContent = finalData.error.message
}
} else if (finalData.success && finalData.output) {
const fallbackOutput = Object.values(finalData.output)
.map((block) => formatValue(block)?.trim())
.filter(Boolean)[0]
if (fallbackOutput) {
finalContent = fallbackOutput
}
}
}
setMessages((prev) =>
prev.map((msg) =>
msg.id === messageId
? {
...msg,
isStreaming: false,
content: finalContent ?? msg.content,
}
: msg
)
)
accumulatedTextRef.current = ''
lastStreamedPositionRef.current = 0
lastDisplayedPositionRef.current = 0
audioStreamingActiveRef.current = false
return
}