From 0c87dbdcfc2653c512d0bc72fcfefad06cde33c8 Mon Sep 17 00:00:00 2001 From: Mariano <132747814+mbelinky@users.noreply.github.com> Date: Tue, 17 Feb 2026 13:02:38 +0000 Subject: [PATCH] voice-call: harden closed-loop turn loop and transcript routing (#19140) Merged via /review-pr -> /prepare-pr -> /merge-pr. Prepared head SHA: 14a3edb00572b20348f839bbafa56ca826cee362 Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com> Reviewed-by: @mbelinky --- CHANGELOG.md | 1 + extensions/voice-call/src/cli.ts | 85 +++++++ extensions/voice-call/src/manager.test.ts | 225 +++++++++++++++++- extensions/voice-call/src/manager.ts | 2 + extensions/voice-call/src/manager/context.ts | 1 + .../voice-call/src/manager/events.test.ts | 1 + extensions/voice-call/src/manager/outbound.ts | 36 ++- extensions/voice-call/src/manager/timers.ts | 5 +- package.json | 1 + src/gateway/server-node-events.test.ts | 130 +++++++++- src/gateway/server-node-events.ts | 126 +++++++++- src/plugins/voice-call.plugin.test.ts | 66 +++++ 12 files changed, 672 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 95d6041400..ba0f382bb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai ### Fixes +- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. - Fix types in all tests. Typecheck the whole repository. - Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source. - Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete. diff --git a/extensions/voice-call/src/cli.ts b/extensions/voice-call/src/cli.ts index 85049bab7f..963fa6f517 100644 --- a/extensions/voice-call/src/cli.ts +++ b/extensions/voice-call/src/cli.ts @@ -41,6 +41,40 @@ function resolveDefaultStorePath(config: VoiceCallConfig): string { return path.join(base, "calls.jsonl"); } +function percentile(values: number[], p: number): number { + if (values.length === 0) { + return 0; + } + const sorted = [...values].sort((a, b) => a - b); + const idx = Math.min(sorted.length - 1, Math.max(0, Math.ceil((p / 100) * sorted.length) - 1)); + return sorted[idx] ?? 0; +} + +function summarizeSeries(values: number[]): { + count: number; + minMs: number; + maxMs: number; + avgMs: number; + p50Ms: number; + p95Ms: number; +} { + if (values.length === 0) { + return { count: 0, minMs: 0, maxMs: 0, avgMs: 0, p50Ms: 0, p95Ms: 0 }; + } + + const minMs = values.reduce((min, value) => (value < min ? value : min), Number.POSITIVE_INFINITY); + const maxMs = values.reduce((max, value) => (value > max ? value : max), Number.NEGATIVE_INFINITY); + const avgMs = values.reduce((sum, value) => sum + value, 0) / values.length; + return { + count: values.length, + minMs, + maxMs, + avgMs, + p50Ms: percentile(values, 50), + p95Ms: percentile(values, 95), + }; +} + export function registerVoiceCallCli(params: { program: Command; config: VoiceCallConfig; @@ -216,6 +250,57 @@ export function registerVoiceCallCli(params: { } }); + root + .command("latency") + .description("Summarize turn latency metrics from voice-call JSONL logs") + .option("--file ", "Path to calls.jsonl", resolveDefaultStorePath(config)) + .option("--last ", "Analyze last N records", "200") + .action(async (options: { file: string; last?: string }) => { + const file = options.file; + const last = Math.max(1, Number(options.last ?? 200)); + + if (!fs.existsSync(file)) { + throw new Error("No log file at " + file); + } + + const content = fs.readFileSync(file, "utf8"); + const lines = content.split("\n").filter(Boolean).slice(-last); + + const turnLatencyMs: number[] = []; + const listenWaitMs: number[] = []; + + for (const line of lines) { + try { + const parsed = JSON.parse(line) as { + metadata?: { lastTurnLatencyMs?: unknown; lastTurnListenWaitMs?: unknown }; + }; + const latency = parsed.metadata?.lastTurnLatencyMs; + const listenWait = parsed.metadata?.lastTurnListenWaitMs; + if (typeof latency === "number" && Number.isFinite(latency)) { + turnLatencyMs.push(latency); + } + if (typeof listenWait === "number" && Number.isFinite(listenWait)) { + listenWaitMs.push(listenWait); + } + } catch { + // ignore malformed JSON lines + } + } + + // eslint-disable-next-line no-console + console.log( + JSON.stringify( + { + recordsScanned: lines.length, + turnLatency: summarizeSeries(turnLatencyMs), + listenWait: summarizeSeries(listenWaitMs), + }, + null, + 2, + ), + ); + }); + root .command("expose") .description("Enable/disable Tailscale serve/funnel for the webhook") diff --git a/extensions/voice-call/src/manager.test.ts b/extensions/voice-call/src/manager.test.ts index 856556bd2e..3d02cb323b 100644 --- a/extensions/voice-call/src/manager.test.ts +++ b/extensions/voice-call/src/manager.test.ts @@ -20,6 +20,8 @@ class FakeProvider implements VoiceCallProvider { readonly name = "plivo" as const; readonly playTtsCalls: PlayTtsInput[] = []; readonly hangupCalls: HangupCallInput[] = []; + readonly startListeningCalls: StartListeningInput[] = []; + readonly stopListeningCalls: StopListeningInput[] = []; verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult { return { ok: true }; @@ -36,8 +38,12 @@ class FakeProvider implements VoiceCallProvider { async playTts(input: PlayTtsInput): Promise { this.playTtsCalls.push(input); } - async startListening(_input: StartListeningInput): Promise {} - async stopListening(_input: StopListeningInput): Promise {} + async startListening(input: StartListeningInput): Promise { + this.startListeningCalls.push(input); + } + async stopListening(input: StopListeningInput): Promise { + this.stopListeningCalls.push(input); + } } describe("CallManager", () => { @@ -261,4 +267,219 @@ describe("CallManager", () => { expect(manager.getCallByProviderCallId("provider-exact")).toBeDefined(); }); + + it("completes a closed-loop turn without live audio", async () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + transcriptTimeoutMs: 5000, + }); + + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`); + const provider = new FakeProvider(); + const manager = new CallManager(config, storePath); + manager.initialize(provider, "https://example.com/voice/webhook"); + + const started = await manager.initiateCall("+15550000003"); + expect(started.success).toBe(true); + + manager.processEvent({ + id: "evt-closed-loop-answered", + type: "call.answered", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + }); + + const turnPromise = manager.continueCall(started.callId, "How can I help?"); + await new Promise((resolve) => setTimeout(resolve, 0)); + + manager.processEvent({ + id: "evt-closed-loop-speech", + type: "call.speech", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + transcript: "Please check status", + isFinal: true, + }); + + const turn = await turnPromise; + expect(turn.success).toBe(true); + expect(turn.transcript).toBe("Please check status"); + expect(provider.startListeningCalls).toHaveLength(1); + expect(provider.stopListeningCalls).toHaveLength(1); + + const call = manager.getCall(started.callId); + expect(call?.transcript.map((entry) => entry.text)).toEqual([ + "How can I help?", + "Please check status", + ]); + const metadata = (call?.metadata ?? {}) as Record; + expect(typeof metadata.lastTurnLatencyMs).toBe("number"); + expect(typeof metadata.lastTurnListenWaitMs).toBe("number"); + expect(metadata.turnCount).toBe(1); + }); + + it("rejects overlapping continueCall requests for the same call", async () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + transcriptTimeoutMs: 5000, + }); + + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`); + const provider = new FakeProvider(); + const manager = new CallManager(config, storePath); + manager.initialize(provider, "https://example.com/voice/webhook"); + + const started = await manager.initiateCall("+15550000004"); + expect(started.success).toBe(true); + + manager.processEvent({ + id: "evt-overlap-answered", + type: "call.answered", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + }); + + const first = manager.continueCall(started.callId, "First prompt"); + const second = await manager.continueCall(started.callId, "Second prompt"); + expect(second.success).toBe(false); + expect(second.error).toBe("Already waiting for transcript"); + + manager.processEvent({ + id: "evt-overlap-speech", + type: "call.speech", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + transcript: "Done", + isFinal: true, + }); + + const firstResult = await first; + expect(firstResult.success).toBe(true); + expect(firstResult.transcript).toBe("Done"); + expect(provider.startListeningCalls).toHaveLength(1); + expect(provider.stopListeningCalls).toHaveLength(1); + }); + + it("tracks latency metadata across multiple closed-loop turns", async () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + transcriptTimeoutMs: 5000, + }); + + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`); + const provider = new FakeProvider(); + const manager = new CallManager(config, storePath); + manager.initialize(provider, "https://example.com/voice/webhook"); + + const started = await manager.initiateCall("+15550000005"); + expect(started.success).toBe(true); + + manager.processEvent({ + id: "evt-multi-answered", + type: "call.answered", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + }); + + const firstTurn = manager.continueCall(started.callId, "First question"); + await new Promise((resolve) => setTimeout(resolve, 0)); + manager.processEvent({ + id: "evt-multi-speech-1", + type: "call.speech", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + transcript: "First answer", + isFinal: true, + }); + await firstTurn; + + const secondTurn = manager.continueCall(started.callId, "Second question"); + await new Promise((resolve) => setTimeout(resolve, 0)); + manager.processEvent({ + id: "evt-multi-speech-2", + type: "call.speech", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + transcript: "Second answer", + isFinal: true, + }); + const secondResult = await secondTurn; + + expect(secondResult.success).toBe(true); + + const call = manager.getCall(started.callId); + expect(call?.transcript.map((entry) => entry.text)).toEqual([ + "First question", + "First answer", + "Second question", + "Second answer", + ]); + const metadata = (call?.metadata ?? {}) as Record; + expect(metadata.turnCount).toBe(2); + expect(typeof metadata.lastTurnLatencyMs).toBe("number"); + expect(typeof metadata.lastTurnListenWaitMs).toBe("number"); + expect(provider.startListeningCalls).toHaveLength(2); + expect(provider.stopListeningCalls).toHaveLength(2); + }); + + it("handles repeated closed-loop turns without waiter churn", async () => { + const config = VoiceCallConfigSchema.parse({ + enabled: true, + provider: "plivo", + fromNumber: "+15550000000", + transcriptTimeoutMs: 5000, + }); + + const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`); + const provider = new FakeProvider(); + const manager = new CallManager(config, storePath); + manager.initialize(provider, "https://example.com/voice/webhook"); + + const started = await manager.initiateCall("+15550000006"); + expect(started.success).toBe(true); + + manager.processEvent({ + id: "evt-loop-answered", + type: "call.answered", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + }); + + for (let i = 1; i <= 5; i++) { + const turnPromise = manager.continueCall(started.callId, `Prompt ${i}`); + await new Promise((resolve) => setTimeout(resolve, 0)); + manager.processEvent({ + id: `evt-loop-speech-${i}`, + type: "call.speech", + callId: started.callId, + providerCallId: "request-uuid", + timestamp: Date.now(), + transcript: `Answer ${i}`, + isFinal: true, + }); + const result = await turnPromise; + expect(result.success).toBe(true); + expect(result.transcript).toBe(`Answer ${i}`); + } + + const call = manager.getCall(started.callId); + const metadata = (call?.metadata ?? {}) as Record; + expect(metadata.turnCount).toBe(5); + expect(provider.startListeningCalls).toHaveLength(5); + expect(provider.stopListeningCalls).toHaveLength(5); + }); }); diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index d2c7d6eae8..927899f325 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -47,6 +47,7 @@ export class CallManager { private config: VoiceCallConfig; private storePath: string; private webhookUrl: string | null = null; + private activeTurnCalls = new Set(); private transcriptWaiters = new Map< CallId, { @@ -137,6 +138,7 @@ export class CallManager { config: this.config, storePath: this.storePath, webhookUrl: this.webhookUrl, + activeTurnCalls: this.activeTurnCalls, transcriptWaiters: this.transcriptWaiters, maxDurationTimers: this.maxDurationTimers, onCallAnswered: (call) => { diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts index 03cbd3c1e1..1af703ed32 100644 --- a/extensions/voice-call/src/manager/context.ts +++ b/extensions/voice-call/src/manager/context.ts @@ -24,6 +24,7 @@ export type CallManagerRuntimeDeps = { }; export type CallManagerTransientState = { + activeTurnCalls: Set; transcriptWaiters: Map; maxDurationTimers: Map; }; diff --git a/extensions/voice-call/src/manager/events.test.ts b/extensions/voice-call/src/manager/events.test.ts index 8407c9cc65..74d1f10e46 100644 --- a/extensions/voice-call/src/manager/events.test.ts +++ b/extensions/voice-call/src/manager/events.test.ts @@ -24,6 +24,7 @@ function createContext(overrides: Partial = {}): CallManager }), storePath, webhookUrl: null, + activeTurnCalls: new Set(), transcriptWaiters: new Map(), maxDurationTimers: new Map(), ...overrides, diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts index 477ce18b83..d94c9da99e 100644 --- a/extensions/voice-call/src/manager/outbound.ts +++ b/extensions/voice-call/src/manager/outbound.ts @@ -36,6 +36,7 @@ type ConversationContext = Pick< | "provider" | "config" | "storePath" + | "activeTurnCalls" | "transcriptWaiters" | "maxDurationTimers" >; @@ -158,7 +159,6 @@ export async function speak( if (TerminalStates.has(call.state)) { return { success: false, error: "Call has ended" }; } - try { transitionState(call, "speaking"); persistCallRecord(ctx.storePath, call); @@ -242,6 +242,12 @@ export async function continueCall( if (TerminalStates.has(call.state)) { return { success: false, error: "Call has ended" }; } + if (ctx.activeTurnCalls.has(callId) || ctx.transcriptWaiters.has(callId)) { + return { success: false, error: "Already waiting for transcript" }; + } + ctx.activeTurnCalls.add(callId); + + const turnStartedAt = Date.now(); try { await speak(ctx, callId, prompt); @@ -249,17 +255,45 @@ export async function continueCall( transitionState(call, "listening"); persistCallRecord(ctx.storePath, call); + const listenStartedAt = Date.now(); await ctx.provider.startListening({ callId, providerCallId: call.providerCallId }); const transcript = await waitForFinalTranscript(ctx, callId); + const transcriptReceivedAt = Date.now(); // Best-effort: stop listening after final transcript. await ctx.provider.stopListening({ callId, providerCallId: call.providerCallId }); + const lastTurnLatencyMs = transcriptReceivedAt - turnStartedAt; + const lastTurnListenWaitMs = transcriptReceivedAt - listenStartedAt; + const turnCount = + call.metadata && typeof call.metadata.turnCount === "number" + ? call.metadata.turnCount + 1 + : 1; + + call.metadata = { + ...(call.metadata ?? {}), + turnCount, + lastTurnLatencyMs, + lastTurnListenWaitMs, + lastTurnCompletedAt: transcriptReceivedAt, + }; + persistCallRecord(ctx.storePath, call); + + console.log( + "[voice-call] continueCall latency call=" + + call.callId + + " totalMs=" + + String(lastTurnLatencyMs) + + " listenWaitMs=" + + String(lastTurnListenWaitMs), + ); + return { success: true, transcript }; } catch (err) { return { success: false, error: err instanceof Error ? err.message : String(err) }; } finally { + ctx.activeTurnCalls.delete(callId); clearTranscriptWaiter(ctx, callId); } } diff --git a/extensions/voice-call/src/manager/timers.ts b/extensions/voice-call/src/manager/timers.ts index 116920e993..236ffa1435 100644 --- a/extensions/voice-call/src/manager/timers.ts +++ b/extensions/voice-call/src/manager/timers.ts @@ -87,8 +87,9 @@ export function resolveTranscriptWaiter( } export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise { - // Only allow one in-flight waiter per call. - rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced"); + if (ctx.transcriptWaiters.has(callId)) { + return Promise.reject(new Error("Already waiting for transcript")); + } const timeoutMs = ctx.config.transcriptTimeoutMs; return new Promise((resolve, reject) => { diff --git a/package.json b/package.json index 343b24af2e..c26cb2ccfb 100644 --- a/package.json +++ b/package.json @@ -117,6 +117,7 @@ "test:live": "OPENCLAW_LIVE_TEST=1 CLAWDBOT_LIVE_TEST=1 vitest run --config vitest.live.config.ts", "test:macmini": "OPENCLAW_TEST_VM_FORKS=0 OPENCLAW_TEST_PROFILE=serial node scripts/test-parallel.mjs", "test:ui": "pnpm --dir ui test", + "test:voicecall:closedloop": "vitest run extensions/voice-call/src/manager.test.ts extensions/voice-call/src/media-stream.test.ts src/plugins/voice-call.plugin.test.ts --maxWorkers=1", "test:watch": "vitest", "tui": "node scripts/run-node.mjs tui", "tui:dev": "OPENCLAW_PROFILE=dev CLAWDBOT_PROFILE=dev node scripts/run-node.mjs --dev tui", diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index 2b49210d2f..77530882e3 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -6,16 +6,41 @@ vi.mock("../infra/system-events.js", () => ({ vi.mock("../infra/heartbeat-wake.js", () => ({ requestHeartbeatNow: vi.fn(), })); +vi.mock("../commands/agent.js", () => ({ + agentCommand: vi.fn(), +})); +vi.mock("../config/config.js", () => ({ + loadConfig: vi.fn(() => ({ session: { mainKey: "agent:main:main" } })), +})); +vi.mock("../config/sessions.js", () => ({ + updateSessionStore: vi.fn(), +})); +vi.mock("./session-utils.js", () => ({ + loadSessionEntry: vi.fn((sessionKey: string) => ({ + storePath: "/tmp/sessions.json", + entry: { sessionId: `sid-${sessionKey}` }, + canonicalKey: sessionKey, + })), + pruneLegacyStoreKeys: vi.fn(), + resolveGatewaySessionStoreTarget: vi.fn(({ key }: { key: string }) => ({ + canonicalKey: key, + storeKeys: [key], + })), +})); import type { CliDeps } from "../cli/deps.js"; import type { HealthSummary } from "../commands/health.js"; +import type { NodeEventContext } from "./server-node-events-types.js"; +import { agentCommand } from "../commands/agent.js"; +import { updateSessionStore } from "../config/sessions.js"; import { requestHeartbeatNow } from "../infra/heartbeat-wake.js"; import { enqueueSystemEvent } from "../infra/system-events.js"; -import type { NodeEventContext } from "./server-node-events-types.js"; import { handleNodeEvent } from "./server-node-events.js"; const enqueueSystemEventMock = vi.mocked(enqueueSystemEvent); const requestHeartbeatNowMock = vi.mocked(requestHeartbeatNow); +const agentCommandMock = vi.mocked(agentCommand); +const updateSessionStoreMock = vi.mocked(updateSessionStore); function buildCtx(): NodeEventContext { return { @@ -138,3 +163,106 @@ describe("node exec events", () => { expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" }); }); }); + +describe("voice transcript events", () => { + beforeEach(() => { + agentCommandMock.mockReset(); + updateSessionStoreMock.mockReset(); + agentCommandMock.mockResolvedValue({ status: "ok" } as never); + updateSessionStoreMock.mockImplementation(async (_storePath, update) => { + update({}); + }); + }); + + it("dedupes repeated transcript payloads for the same session", async () => { + const addChatRun = vi.fn(); + const ctx = buildCtx(); + ctx.addChatRun = addChatRun; + + const payload = { + text: "hello from mic", + sessionKey: "voice-dedupe-session", + }; + + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify(payload), + }); + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify(payload), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + expect(addChatRun).toHaveBeenCalledTimes(1); + expect(updateSessionStoreMock).toHaveBeenCalledTimes(1); + }); + + it("does not dedupe identical text when source event IDs differ", async () => { + const ctx = buildCtx(); + + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "hello from mic", + sessionKey: "voice-dedupe-eventid-session", + eventId: "evt-voice-1", + }), + }); + await handleNodeEvent(ctx, "node-v1", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "hello from mic", + sessionKey: "voice-dedupe-eventid-session", + eventId: "evt-voice-2", + }), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(2); + expect(updateSessionStoreMock).toHaveBeenCalledTimes(2); + }); + + it("forwards transcript with voice provenance", async () => { + const ctx = buildCtx(); + + await handleNodeEvent(ctx, "node-v2", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "check provenance", + sessionKey: "voice-provenance-session", + }), + }); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + const [opts] = agentCommandMock.mock.calls[0] ?? []; + expect(opts).toMatchObject({ + message: "check provenance", + deliver: false, + messageChannel: "node", + inputProvenance: { + kind: "external_user", + sourceChannel: "voice", + sourceTool: "gateway.voice.transcript", + }, + }); + }); + + it("does not block agent dispatch when session-store touch fails", async () => { + const warn = vi.fn(); + const ctx = buildCtx(); + ctx.logGateway = { warn }; + updateSessionStoreMock.mockRejectedValueOnce(new Error("disk down")); + + await handleNodeEvent(ctx, "node-v3", { + event: "voice.transcript", + payloadJSON: JSON.stringify({ + text: "continue anyway", + sessionKey: "voice-store-fail-session", + }), + }); + await new Promise((resolve) => setTimeout(resolve, 0)); + + expect(agentCommandMock).toHaveBeenCalledTimes(1); + expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed")); + }); +}); diff --git a/src/gateway/server-node-events.ts b/src/gateway/server-node-events.ts index cad2e803c1..bd662a126a 100644 --- a/src/gateway/server-node-events.ts +++ b/src/gateway/server-node-events.ts @@ -16,6 +16,89 @@ import { import { formatForLog } from "./ws-log.js"; const MAX_EXEC_EVENT_OUTPUT_CHARS = 180; +const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500; +const MAX_RECENT_VOICE_TRANSCRIPTS = 200; + +const recentVoiceTranscripts = new Map(); + +function normalizeNonEmptyString(value: unknown): string | null { + if (typeof value !== "string") { + return null; + } + const trimmed = value.trim(); + return trimmed.length > 0 ? trimmed : null; +} + +function normalizeFiniteInteger(value: unknown): number | null { + return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null; +} + +function resolveVoiceTranscriptFingerprint(obj: Record, text: string): string { + const eventId = + normalizeNonEmptyString(obj.eventId) ?? + normalizeNonEmptyString(obj.providerEventId) ?? + normalizeNonEmptyString(obj.transcriptId); + if (eventId) { + return `event:${eventId}`; + } + + const callId = normalizeNonEmptyString(obj.providerCallId) ?? normalizeNonEmptyString(obj.callId); + const sequence = normalizeFiniteInteger(obj.sequence) ?? normalizeFiniteInteger(obj.seq); + if (callId && sequence !== null) { + return `call-seq:${callId}:${sequence}`; + } + + const eventTimestamp = + normalizeFiniteInteger(obj.timestamp) ?? + normalizeFiniteInteger(obj.ts) ?? + normalizeFiniteInteger(obj.eventTimestamp); + if (callId && eventTimestamp !== null) { + return `call-ts:${callId}:${eventTimestamp}`; + } + + if (eventTimestamp !== null) { + return `timestamp:${eventTimestamp}|text:${text}`; + } + + return `text:${text}`; +} + +function shouldDropDuplicateVoiceTranscript(params: { + sessionKey: string; + fingerprint: string; + now: number; +}): boolean { + const previous = recentVoiceTranscripts.get(params.sessionKey); + if ( + previous && + previous.fingerprint === params.fingerprint && + params.now - previous.ts <= VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS + ) { + return true; + } + recentVoiceTranscripts.set(params.sessionKey, { fingerprint: params.fingerprint, ts: params.now }); + + if (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) { + const cutoff = params.now - VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS * 2; + for (const [key, value] of recentVoiceTranscripts) { + if (value.ts < cutoff) { + recentVoiceTranscripts.delete(key); + } + if (recentVoiceTranscripts.size <= MAX_RECENT_VOICE_TRANSCRIPTS) { + break; + } + } + while (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) { + const oldestKey = recentVoiceTranscripts.keys().next().value; + if (oldestKey === undefined) { + break; + } + recentVoiceTranscripts.delete(oldestKey); + } + } + + return false; +} function compactExecEventOutput(raw: string) { const normalized = raw.replace(/\s+/g, " ").trim(); @@ -69,6 +152,29 @@ async function touchSessionStore(params: { }); } +function queueSessionStoreTouch(params: { + ctx: NodeEventContext; + cfg: ReturnType; + sessionKey: string; + storePath: LoadedSessionEntry["storePath"]; + canonicalKey: LoadedSessionEntry["canonicalKey"]; + entry: LoadedSessionEntry["entry"]; + sessionId: string; + now: number; +}) { + void touchSessionStore({ + cfg: params.cfg, + sessionKey: params.sessionKey, + storePath: params.storePath, + canonicalKey: params.canonicalKey, + entry: params.entry, + sessionId: params.sessionId, + now: params.now, + }).catch((err) => { + params.ctx.logGateway.warn("voice session-store update failed: " + formatForLog(err)); + }); +} + function parseSessionKeyFromPayloadJSON(payloadJSON: string): string | null { let payload: unknown; try { @@ -111,8 +217,21 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey; const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey); const now = Date.now(); + const fingerprint = resolveVoiceTranscriptFingerprint(obj, text); + if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, fingerprint, now })) { + return; + } const sessionId = entry?.sessionId ?? randomUUID(); - await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now }); + queueSessionStoreTouch({ + ctx, + cfg, + sessionKey, + storePath, + canonicalKey, + entry, + sessionId, + now, + }); // Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send). // This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId). @@ -129,6 +248,11 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt thinking: "low", deliver: false, messageChannel: "node", + inputProvenance: { + kind: "external_user", + sourceChannel: "voice", + sourceTool: "gateway.voice.transcript", + }, }, defaultRuntime, ctx.deps, diff --git a/src/plugins/voice-call.plugin.test.ts b/src/plugins/voice-call.plugin.test.ts index 8b6d703414..a9c4c49b74 100644 --- a/src/plugins/voice-call.plugin.test.ts +++ b/src/plugins/voice-call.plugin.test.ts @@ -1,4 +1,7 @@ import { Command } from "commander"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; let runtimeStub: { @@ -141,6 +144,69 @@ describe("voice-call plugin", () => { expect(String(result.details.error)).toContain("sid required"); }); + it("CLI latency summarizes turn metrics from JSONL", async () => { + const { register } = plugin as unknown as { + register: (api: Record) => void | Promise; + }; + const program = new Command(); + const tmpFile = path.join(os.tmpdir(), `voicecall-latency-${Date.now()}.jsonl`); + fs.writeFileSync( + tmpFile, + [ + JSON.stringify({ metadata: { lastTurnLatencyMs: 100, lastTurnListenWaitMs: 70 } }), + JSON.stringify({ metadata: { lastTurnLatencyMs: 200, lastTurnListenWaitMs: 110 } }), + ].join("\n") + "\n", + "utf8", + ); + + const logSpy = vi.spyOn(console, "log").mockImplementation(() => {}); + + try { + await register({ + id: "voice-call", + name: "Voice Call", + description: "test", + version: "0", + source: "test", + config: {}, + pluginConfig: { provider: "mock" }, + runtime: { tts: { textToSpeechTelephony: vi.fn() } }, + logger: noopLogger, + registerGatewayMethod: () => {}, + registerTool: () => {}, + registerCli: ( + fn: (ctx: { + program: Command; + config: Record; + workspaceDir?: string; + logger: typeof noopLogger; + }) => void, + ) => + fn({ + program, + config: {}, + workspaceDir: undefined, + logger: noopLogger, + }), + registerService: () => {}, + resolvePath: (p: string) => p, + }); + + await program.parseAsync(["voicecall", "latency", "--file", tmpFile, "--last", "10"], { + from: "user", + }); + + expect(logSpy).toHaveBeenCalled(); + const printed = String(logSpy.mock.calls.at(-1)?.[0] ?? ""); + expect(printed).toContain('"recordsScanned": 2'); + expect(printed).toContain('"p50Ms": 100'); + expect(printed).toContain('"p95Ms": 200'); + } finally { + logSpy.mockRestore(); + fs.unlinkSync(tmpFile); + } + }); + it("CLI start prints JSON", async () => { const { register } = plugin as unknown as { register: (api: Record) => void | Promise;