diff --git a/CHANGELOG.md b/CHANGELOG.md index 95d6041400..ba0f382bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. - Fix types in all tests. Typecheck the whole repository. - Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source. - Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete. diff --git a/extensions/voice-call/src/cli.ts b/extensions/voice-call/src/cli.ts index 5127a9791a..963fa6f517 100644 --- a/extensions/voice-call/src/cli.ts +++ b/extensions/voice-call/src/cli.ts @@ -62,8 +62,8 @@ function summarizeSeries(values: number[]): { return { count: 0, minMs: 0, maxMs: 0, avgMs: 0, p50Ms: 0, p95Ms: 0 }; } - const minMs = Math.min(...values); - const maxMs = Math.max(...values); + const minMs = values.reduce((min, value) => (value < min ? value : min), Number.POSITIVE_INFINITY); + const maxMs = values.reduce((max, value) => (value > max ? value : max), Number.NEGATIVE_INFINITY); const avgMs = values.reduce((sum, value) => sum + value, 0) / values.length; return { count: values.length, diff --git a/extensions/voice-call/src/manager.test.ts b/extensions/voice-call/src/manager.test.ts index aa8b2f76f2..3d02cb323b 100644 --- a/extensions/voice-call/src/manager.test.ts +++ b/extensions/voice-call/src/manager.test.ts @@ -347,8 +347,6 @@ describe("CallManager", () => { }); const first = manager.continueCall(started.callId, "First prompt"); - await new Promise((resolve) => setTimeout(resolve, 0)); - const second = await manager.continueCall(started.callId, "Second prompt"); expect(second.success).toBe(false); expect(second.error).toBe("Already waiting for transcript"); diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index d2c7d6eae8..927899f325 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -47,6 +47,7 @@ export class CallManager { private config: VoiceCallConfig; private storePath: string; private webhookUrl: string | null = null; + private activeTurnCalls = new Set(); private transcriptWaiters = new Map< CallId, { @@ -137,6 +138,7 @@ export class CallManager { config: this.config, storePath: this.storePath, webhookUrl: this.webhookUrl, + activeTurnCalls: this.activeTurnCalls, transcriptWaiters: this.transcriptWaiters, maxDurationTimers: this.maxDurationTimers, onCallAnswered: (call) => { diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts index 03cbd3c1e1..1af703ed32 100644 --- a/extensions/voice-call/src/manager/context.ts +++ b/extensions/voice-call/src/manager/context.ts @@ -24,6 +24,7 @@ export type CallManagerRuntimeDeps = { }; export type CallManagerTransientState = { + activeTurnCalls: Set; transcriptWaiters: Map; maxDurationTimers: Map; }; diff --git a/extensions/voice-call/src/manager/events.test.ts b/extensions/voice-call/src/manager/events.test.ts index 8407c9cc65..74d1f10e46 100644 --- a/extensions/voice-call/src/manager/events.test.ts +++ b/extensions/voice-call/src/manager/events.test.ts @@ -24,6 +24,7 @@ function createContext(overrides: Partial = {}): CallManager }), storePath, webhookUrl: null, + activeTurnCalls: new Set(), transcriptWaiters: new Map(), maxDurationTimers: new Map(), ...overrides, diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts index 61fe4567fd..d94c9da99e 100644 --- a/extensions/voice-call/src/manager/outbound.ts +++ b/extensions/voice-call/src/manager/outbound.ts @@ -36,6 +36,7 @@ type ConversationContext = Pick< | "provider" | "config" | "storePath" + | "activeTurnCalls" | "transcriptWaiters" | "maxDurationTimers" >; @@ -241,9 +242,10 @@ export async function continueCall( if (TerminalStates.has(call.state)) { return { success: false, error: "Call has ended" }; } - if (ctx.transcriptWaiters.has(callId)) { + if (ctx.activeTurnCalls.has(callId) || ctx.transcriptWaiters.has(callId)) { return { success: false, error: "Already waiting for transcript" }; } + ctx.activeTurnCalls.add(callId); const turnStartedAt = Date.now(); @@ -291,6 +293,7 @@ export async function continueCall( } catch (err) { return { success: false, error: err instanceof Error ? err.message : String(err) }; } finally { + ctx.activeTurnCalls.delete(callId); clearTranscriptWaiter(ctx, callId); } } diff --git a/extensions/voice-call/src/manager/timers.ts b/extensions/voice-call/src/manager/timers.ts index 116920e993..236ffa1435 100644 --- a/extensions/voice-call/src/manager/timers.ts +++ b/extensions/voice-call/src/manager/timers.ts @@ -87,8 +87,9 @@ export function resolveTranscriptWaiter( } export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise { - // Only allow one in-flight waiter per call. - rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced"); + if (ctx.transcriptWaiters.has(callId)) { + return Promise.reject(new Error("Already waiting for transcript")); + } const timeoutMs = ctx.config.transcriptTimeoutMs; return new Promise((resolve, reject) => { diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index 0b0788e0b8..77530882e3 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -198,6 +198,30 @@ describe("voice transcript events", () => { expect(updateSessionStoreMock).toHaveBeenCalledTimes(1); }); + it("does not dedupe identical text when source event IDs differ", async () => { + const ctx = buildCtx(); + + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "hello from mic", + sessionKey: "voice-dedupe-eventid-session", + eventId: "evt-voice-1", + }), + }); + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "hello from mic", + sessionKey: "voice-dedupe-eventid-session", + eventId: "evt-voice-2", + }), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(2); + expect(updateSessionStoreMock).toHaveBeenCalledTimes(2); + }); + it("forwards transcript with voice provenance", async () => { const ctx = buildCtx(); diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index 93f1255bb5..bd662a126a 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -19,22 +19,64 @@ const MAX_EXEC_EVENT_OUTPUT_CHARS = 180; const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500; const MAX_RECENT_VOICE_TRANSCRIPTS = 200; -const recentVoiceTranscripts = new Map(); +const recentVoiceTranscripts = new Map(); + +function normalizeNonEmptyString(value: unknown): string | null { + if (typeof value !== "string") { + return null; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function normalizeFiniteInteger(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null; +} + +function resolveVoiceTranscriptFingerprint(obj: Record, text: string): string { + const eventId = + normalizeNonEmptyString(obj.eventId) ?? + normalizeNonEmptyString(obj.providerEventId) ?? + normalizeNonEmptyString(obj.transcriptId); + if (eventId) { + return `event:${eventId}`; + } + + const callId = normalizeNonEmptyString(obj.providerCallId) ?? normalizeNonEmptyString(obj.callId); + const sequence = normalizeFiniteInteger(obj.sequence) ?? normalizeFiniteInteger(obj.seq); + if (callId && sequence !== null) { + return `call-seq:${callId}:${sequence}`; + } + + const eventTimestamp = + normalizeFiniteInteger(obj.timestamp) ?? + normalizeFiniteInteger(obj.ts) ?? + normalizeFiniteInteger(obj.eventTimestamp); + if (callId && eventTimestamp !== null) { + return `call-ts:${callId}:${eventTimestamp}`; + } + + if (eventTimestamp !== null) { + return `timestamp:${eventTimestamp}|text:${text}`; + } + + return `text:${text}`; +} function shouldDropDuplicateVoiceTranscript(params: { sessionKey: string; - text: string; + fingerprint: string; now: number; }): boolean { const previous = recentVoiceTranscripts.get(params.sessionKey); if ( previous && - previous.text === params.text && + previous.fingerprint === params.fingerprint && params.now - previous.ts <= VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS ) { return true; } - recentVoiceTranscripts.set(params.sessionKey, { text: params.text, ts: params.now }); + recentVoiceTranscripts.set(params.sessionKey, { fingerprint: params.fingerprint, ts: params.now }); if (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) { const cutoff = params.now - VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS * 2; @@ -46,6 +88,13 @@ function shouldDropDuplicateVoiceTranscript(params: { break; } } + while (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) { + const oldestKey = recentVoiceTranscripts.keys().next().value; + if (oldestKey === undefined) { + break; + } + recentVoiceTranscripts.delete(oldestKey); + } } return false; @@ -168,7 +217,8 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey; const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey); const now = Date.now(); - if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, text, now })) { + const fingerprint = resolveVoiceTranscriptFingerprint(obj, text); + if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, fingerprint, now })) { return; } const sessionId = entry?.sessionId ?? randomUUID();