audit: fix 18 defects across gateway SSE streaming, voice-call security, and telephony

Gateway (pipecat compatibility):
- openai-http: add finish_reason:"stop" on final SSE chunk, fix ID format
  (chatcmpl- not chatcmpl_), capture timestamp once, use delta only, add
  writable checks and flush after writes
- http-common: add TCP_NODELAY, X-Accel-Buffering:no, flush after writes,
  writable checks on writeDone
- agent-events: fix seqByRun memory leak in clearAgentRunContext

Voice-call security:
- manager.ts, twiml.ts, twilio.ts: escape voice/language XML attributes
  to prevent XML injection
- voice-mapping: strip control characters in escapeXml

Voice-call bugs:
- tts-openai: fix broken resample24kTo8k (interpolation frac always 0)
- stt-openai-realtime: close zombie WebSocket on connection timeout
- telnyx: extract direction/from/to for inbound calls (were silently dropped)
- plivo: clean up 5 internal maps on terminal call states (memory leak)
- twilio: clean up callWebhookUrls on terminal call states

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Tarun Sukhani
2026-02-10 04:05:22 +08:00
parent 806c5e2d13
commit d4e3549ed2
11 changed files with 203 additions and 26 deletions

View File

@@ -204,4 +204,131 @@ export class CallManager {
async getCallHistory(limit = 50): Promise<CallRecord[]> {
return getCallHistoryFromStore(this.storePath, limit);
}
// States that can cycle during multi-turn conversations
private static readonly ConversationStates = new Set<CallState>(["speaking", "listening"]);
// Non-terminal state order for monotonic transitions
private static readonly StateOrder: readonly CallState[] = [
"initiated",
"ringing",
"answered",
"active",
"speaking",
"listening",
];
/**
* Transition call state with monotonic enforcement.
*/
private transitionState(call: CallRecord, newState: CallState): void {
// No-op for same state or already terminal
if (call.state === newState || TerminalStates.has(call.state)) {
return;
}
// Terminal states can always be reached from non-terminal
if (TerminalStates.has(newState)) {
call.state = newState;
return;
}
// Allow cycling between speaking and listening (multi-turn conversations)
if (
CallManager.ConversationStates.has(call.state) &&
CallManager.ConversationStates.has(newState)
) {
call.state = newState;
return;
}
// Only allow forward transitions in state order
const currentIndex = CallManager.StateOrder.indexOf(call.state);
const newIndex = CallManager.StateOrder.indexOf(newState);
if (newIndex > currentIndex) {
call.state = newState;
}
}
/**
* Add an entry to the call transcript.
*/
private addTranscriptEntry(call: CallRecord, speaker: "bot" | "user", text: string): void {
const entry: TranscriptEntry = {
timestamp: Date.now(),
speaker,
text,
isFinal: true,
};
call.transcript.push(entry);
}
/**
* Persist a call record to disk (fire-and-forget async).
*/
private persistCallRecord(call: CallRecord): void {
const logPath = path.join(this.storePath, "calls.jsonl");
const line = `${JSON.stringify(call)}\n`;
// Fire-and-forget async write to avoid blocking event loop
fsp.appendFile(logPath, line).catch((err) => {
console.error("[voice-call] Failed to persist call record:", err);
});
}
/**
* Load active calls from persistence (for crash recovery).
* Uses streaming to handle large log files efficiently.
*/
private loadActiveCalls(): void {
const logPath = path.join(this.storePath, "calls.jsonl");
if (!fs.existsSync(logPath)) {
return;
}
// Read file synchronously and parse lines
const content = fs.readFileSync(logPath, "utf-8");
const lines = content.split("\n");
// Build map of latest state per call
const callMap = new Map<CallId, CallRecord>();
for (const line of lines) {
if (!line.trim()) {
continue;
}
try {
const call = CallRecordSchema.parse(JSON.parse(line));
callMap.set(call.callId, call);
} catch {
// Skip invalid lines
}
}
// Only keep non-terminal calls
for (const [callId, call] of callMap) {
if (!TerminalStates.has(call.state)) {
this.activeCalls.set(callId, call);
// Populate providerCallId mapping for lookups
if (call.providerCallId) {
this.providerCallIdMap.set(call.providerCallId, callId);
}
// Populate processed event IDs
for (const eventId of call.processedEventIds) {
this.processedEventIds.add(eventId);
}
}
}
}
/**
* Generate TwiML for notify mode (speak message and hang up).
*/
private generateNotifyTwiml(message: string, voice: string): string {
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${escapeXml(voice)}">${escapeXml(message)}</Say>
<Hangup/>
</Response>`;
}
}

View File

@@ -3,7 +3,7 @@ import { escapeXml } from "../voice-mapping.js";
export function generateNotifyTwiml(message: string, voice: string): string {
return `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${voice}">${escapeXml(message)}</Say>
<Say voice="${escapeXml(voice)}">${escapeXml(message)}</Say>
<Hangup/>
</Response>`;
}

View File

@@ -244,6 +244,23 @@ export class PlivoProvider implements VoiceCallProvider {
callStatus === "no-answer" ||
callStatus === "failed"
) {
// Clean up internal maps on terminal state
if (callUuid) {
this.callUuidToWebhookUrl.delete(callUuid);
// Also clean up the reverse mapping
for (const [reqId, cUuid] of this.requestUuidToCallUuid) {
if (cUuid === callUuid) {
this.requestUuidToCallUuid.delete(reqId);
break;
}
}
}
if (callIdOverride) {
this.callIdToWebhookUrl.delete(callIdOverride);
this.pendingSpeakByCallId.delete(callIdOverride);
this.pendingListenByCallId.delete(callIdOverride);
}
return {
...baseEvent,
type: "call.ended",

View File

@@ -174,6 +174,7 @@ class OpenAIRealtimeSTTSession implements RealtimeSTTSession {
setTimeout(() => {
if (!this.connected) {
this.ws?.close();
reject(new Error("Realtime STT connection timeout"));
}
}, 10000);

View File

@@ -130,11 +130,23 @@ export class TelnyxProvider implements VoiceCallProvider {
callId = data.payload?.call_control_id || "";
}
const direction =
data.payload?.direction === "incoming"
? ("inbound" as const)
: data.payload?.direction === "outgoing"
? ("outbound" as const)
: undefined;
const from = typeof data.payload?.from === "string" ? data.payload.from : undefined;
const to = typeof data.payload?.to === "string" ? data.payload.to : undefined;
const baseEvent = {
id: data.id || crypto.randomUUID(),
callId,
providerCallId: data.payload?.call_control_id,
timestamp: Date.now(),
...(direction && { direction }),
...(from && { from }),
...(to && { to }),
};
switch (data.event_type) {

View File

@@ -143,7 +143,7 @@ export class OpenAITTSProvider {
}
/**
* Resample 24kHz PCM to 8kHz using linear interpolation.
* Resample 24kHz PCM to 8kHz by picking every 3rd sample.
* Input/output: 16-bit signed little-endian mono.
*/
function resample24kTo8k(input: Buffer): Buffer {
@@ -152,20 +152,11 @@ function resample24kTo8k(input: Buffer): Buffer {
const output = Buffer.alloc(outputSamples * 2);
for (let i = 0; i < outputSamples; i++) {
// Calculate position in input (3:1 ratio)
const srcPos = i * 3;
const srcIdx = srcPos * 2;
// Pick every 3rd sample (3:1 ratio for 24kHz -> 8kHz)
const srcByteOffset = i * 3 * 2;
if (srcIdx + 3 < input.length) {
// Linear interpolation between samples
const s0 = input.readInt16LE(srcIdx);
const s1 = input.readInt16LE(srcIdx + 2);
const frac = srcPos % 1 || 0;
const sample = Math.round(s0 + frac * (s1 - s0));
output.writeInt16LE(clamp16(sample), i * 2);
} else {
// Last sample
output.writeInt16LE(input.readInt16LE(srcIdx), i * 2);
if (srcByteOffset + 1 < input.length) {
output.writeInt16LE(input.readInt16LE(srcByteOffset), i * 2);
}
}

View File

@@ -290,12 +290,14 @@ export class TwilioProvider implements VoiceCallProvider {
case "no-answer":
case "failed":
this.streamAuthTokens.delete(callSid);
this.callWebhookUrls.delete(callSid);
if (callIdOverride) {
this.deleteStoredTwiml(callIdOverride);
}
return { ...baseEvent, type: "call.ended", reason: callStatus };
case "canceled":
this.streamAuthTokens.delete(callSid);
this.callWebhookUrls.delete(callSid);
if (callIdOverride) {
this.deleteStoredTwiml(callIdOverride);
}
@@ -544,7 +546,7 @@ export class TwilioProvider implements VoiceCallProvider {
const pollyVoice = mapVoiceToPolly(input.voice);
const twiml = `<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Say voice="${pollyVoice}" language="${input.locale || "en-US"}">${escapeXml(input.text)}</Say>
<Say voice="${escapeXml(pollyVoice)}" language="${escapeXml(input.locale || "en-US")}">${escapeXml(input.text)}</Say>
<Gather input="speech" speechTimeout="auto" action="${escapeXml(webhookUrl)}" method="POST">
<Say>.</Say>
</Gather>

View File

@@ -7,6 +7,7 @@
*/
export function escapeXml(text: string): string {
return text
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F]/g, "")
.replace(/&/g, "&amp;")
.replace(/</g, "&lt;")
.replace(/>/g, "&gt;")

View File

@@ -77,7 +77,11 @@ export async function readJsonBodyOrError(
}
export function writeDone(res: ServerResponse) {
if (res.writableEnded || res.destroyed) {
return;
}
res.write("data: [DONE]\n\n");
(res as unknown as { flush?: () => void }).flush?.();
}
export function setSseHeaders(res: ServerResponse) {
@@ -85,5 +89,7 @@ export function setSseHeaders(res: ServerResponse) {
res.setHeader("Content-Type", "text/event-stream; charset=utf-8");
res.setHeader("Cache-Control", "no-cache");
res.setHeader("Connection", "keep-alive");
res.setHeader("X-Accel-Buffering", "no");
res.socket?.setNoDelay?.(true);
res.flushHeaders?.();
}

View File

@@ -37,7 +37,11 @@ type OpenAiChatCompletionRequest = {
};
function writeSse(res: ServerResponse, data: unknown) {
if (res.writableEnded || res.destroyed) {
return;
}
res.write(`data: ${JSON.stringify(data)}\n\n`);
(res as unknown as { flush?: () => void }).flush?.();
}
function asMessages(val: unknown): OpenAiChatMessage[] {
@@ -178,7 +182,7 @@ export async function handleOpenAiHttpRequest(
return true;
}
const runId = `chatcmpl_${randomUUID()}`;
const runId = `chatcmpl-${randomUUID()}`;
const deps = createDefaultDeps();
if (!stream) {
@@ -231,10 +235,27 @@ export async function handleOpenAiHttpRequest(
setSseHeaders(res);
const created = Math.floor(Date.now() / 1000);
let wroteRole = false;
let sawAssistantDelta = false;
let closed = false;
/** Send a final chunk with finish_reason and then [DONE]. */
function finishStream(finishReason: string = "stop") {
if (res.writableEnded || res.destroyed) {
return;
}
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created,
model,
choices: [{ index: 0, delta: {}, finish_reason: finishReason }],
});
writeDone(res);
res.end();
}
const unsubscribe = onAgentEvent((evt) => {
if (evt.runId !== runId) {
return;
@@ -254,7 +275,7 @@ export async function handleOpenAiHttpRequest(
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
created,
model,
choices: [{ index: 0, delta: { role: "assistant" } }],
});
@@ -264,7 +285,7 @@ export async function handleOpenAiHttpRequest(
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
created,
model,
choices: [
{
@@ -282,8 +303,7 @@ export async function handleOpenAiHttpRequest(
if (phase === "end" || phase === "error") {
closed = true;
unsubscribe();
writeDone(res);
res.end();
finishStream(phase === "error" ? "stop" : "stop");
}
}
});
@@ -319,7 +339,7 @@ export async function handleOpenAiHttpRequest(
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
created,
model,
choices: [{ index: 0, delta: { role: "assistant" } }],
});
@@ -338,7 +358,7 @@ export async function handleOpenAiHttpRequest(
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
created,
model,
choices: [
{
@@ -357,7 +377,7 @@ export async function handleOpenAiHttpRequest(
writeSse(res, {
id: runId,
object: "chat.completion.chunk",
created: Math.floor(Date.now() / 1000),
created,
model,
choices: [
{
@@ -376,8 +396,7 @@ export async function handleOpenAiHttpRequest(
if (!closed) {
closed = true;
unsubscribe();
writeDone(res);
res.end();
finishStream();
}
}
})();

View File

@@ -48,6 +48,7 @@ export function getAgentRunContext(runId: string) {
export function clearAgentRunContext(runId: string) {
runContextById.delete(runId);
seqByRun.delete(runId);
}
export function resetAgentRunContextForTest() {