mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
voice-call: harden closed-loop turn loop and transcript routing (#19140)
Merged via /review-pr -> /prepare-pr -> /merge-pr.
Prepared head SHA: 14a3edb005
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky.
|
||||
- Fix types in all tests. Typecheck the whole repository.
|
||||
- Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source.
|
||||
- Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete.
|
||||
|
||||
@@ -41,6 +41,40 @@ function resolveDefaultStorePath(config: VoiceCallConfig): string {
|
||||
return path.join(base, "calls.jsonl");
|
||||
}
|
||||
|
||||
function percentile(values: number[], p: number): number {
|
||||
if (values.length === 0) {
|
||||
return 0;
|
||||
}
|
||||
const sorted = [...values].sort((a, b) => a - b);
|
||||
const idx = Math.min(sorted.length - 1, Math.max(0, Math.ceil((p / 100) * sorted.length) - 1));
|
||||
return sorted[idx] ?? 0;
|
||||
}
|
||||
|
||||
function summarizeSeries(values: number[]): {
|
||||
count: number;
|
||||
minMs: number;
|
||||
maxMs: number;
|
||||
avgMs: number;
|
||||
p50Ms: number;
|
||||
p95Ms: number;
|
||||
} {
|
||||
if (values.length === 0) {
|
||||
return { count: 0, minMs: 0, maxMs: 0, avgMs: 0, p50Ms: 0, p95Ms: 0 };
|
||||
}
|
||||
|
||||
const minMs = values.reduce((min, value) => (value < min ? value : min), Number.POSITIVE_INFINITY);
|
||||
const maxMs = values.reduce((max, value) => (value > max ? value : max), Number.NEGATIVE_INFINITY);
|
||||
const avgMs = values.reduce((sum, value) => sum + value, 0) / values.length;
|
||||
return {
|
||||
count: values.length,
|
||||
minMs,
|
||||
maxMs,
|
||||
avgMs,
|
||||
p50Ms: percentile(values, 50),
|
||||
p95Ms: percentile(values, 95),
|
||||
};
|
||||
}
|
||||
|
||||
export function registerVoiceCallCli(params: {
|
||||
program: Command;
|
||||
config: VoiceCallConfig;
|
||||
@@ -216,6 +250,57 @@ export function registerVoiceCallCli(params: {
|
||||
}
|
||||
});
|
||||
|
||||
root
|
||||
.command("latency")
|
||||
.description("Summarize turn latency metrics from voice-call JSONL logs")
|
||||
.option("--file <path>", "Path to calls.jsonl", resolveDefaultStorePath(config))
|
||||
.option("--last <n>", "Analyze last N records", "200")
|
||||
.action(async (options: { file: string; last?: string }) => {
|
||||
const file = options.file;
|
||||
const last = Math.max(1, Number(options.last ?? 200));
|
||||
|
||||
if (!fs.existsSync(file)) {
|
||||
throw new Error("No log file at " + file);
|
||||
}
|
||||
|
||||
const content = fs.readFileSync(file, "utf8");
|
||||
const lines = content.split("\n").filter(Boolean).slice(-last);
|
||||
|
||||
const turnLatencyMs: number[] = [];
|
||||
const listenWaitMs: number[] = [];
|
||||
|
||||
for (const line of lines) {
|
||||
try {
|
||||
const parsed = JSON.parse(line) as {
|
||||
metadata?: { lastTurnLatencyMs?: unknown; lastTurnListenWaitMs?: unknown };
|
||||
};
|
||||
const latency = parsed.metadata?.lastTurnLatencyMs;
|
||||
const listenWait = parsed.metadata?.lastTurnListenWaitMs;
|
||||
if (typeof latency === "number" && Number.isFinite(latency)) {
|
||||
turnLatencyMs.push(latency);
|
||||
}
|
||||
if (typeof listenWait === "number" && Number.isFinite(listenWait)) {
|
||||
listenWaitMs.push(listenWait);
|
||||
}
|
||||
} catch {
|
||||
// ignore malformed JSON lines
|
||||
}
|
||||
}
|
||||
|
||||
// eslint-disable-next-line no-console
|
||||
console.log(
|
||||
JSON.stringify(
|
||||
{
|
||||
recordsScanned: lines.length,
|
||||
turnLatency: summarizeSeries(turnLatencyMs),
|
||||
listenWait: summarizeSeries(listenWaitMs),
|
||||
},
|
||||
null,
|
||||
2,
|
||||
),
|
||||
);
|
||||
});
|
||||
|
||||
root
|
||||
.command("expose")
|
||||
.description("Enable/disable Tailscale serve/funnel for the webhook")
|
||||
|
||||
@@ -20,6 +20,8 @@ class FakeProvider implements VoiceCallProvider {
|
||||
readonly name = "plivo" as const;
|
||||
readonly playTtsCalls: PlayTtsInput[] = [];
|
||||
readonly hangupCalls: HangupCallInput[] = [];
|
||||
readonly startListeningCalls: StartListeningInput[] = [];
|
||||
readonly stopListeningCalls: StopListeningInput[] = [];
|
||||
|
||||
verifyWebhook(_ctx: WebhookContext): WebhookVerificationResult {
|
||||
return { ok: true };
|
||||
@@ -36,8 +38,12 @@ class FakeProvider implements VoiceCallProvider {
|
||||
async playTts(input: PlayTtsInput): Promise<void> {
|
||||
this.playTtsCalls.push(input);
|
||||
}
|
||||
async startListening(_input: StartListeningInput): Promise<void> {}
|
||||
async stopListening(_input: StopListeningInput): Promise<void> {}
|
||||
async startListening(input: StartListeningInput): Promise<void> {
|
||||
this.startListeningCalls.push(input);
|
||||
}
|
||||
async stopListening(input: StopListeningInput): Promise<void> {
|
||||
this.stopListeningCalls.push(input);
|
||||
}
|
||||
}
|
||||
|
||||
describe("CallManager", () => {
|
||||
@@ -261,4 +267,219 @@ describe("CallManager", () => {
|
||||
|
||||
expect(manager.getCallByProviderCallId("provider-exact")).toBeDefined();
|
||||
});
|
||||
|
||||
it("completes a closed-loop turn without live audio", async () => {
|
||||
const config = VoiceCallConfigSchema.parse({
|
||||
enabled: true,
|
||||
provider: "plivo",
|
||||
fromNumber: "+15550000000",
|
||||
transcriptTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`);
|
||||
const provider = new FakeProvider();
|
||||
const manager = new CallManager(config, storePath);
|
||||
manager.initialize(provider, "https://example.com/voice/webhook");
|
||||
|
||||
const started = await manager.initiateCall("+15550000003");
|
||||
expect(started.success).toBe(true);
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-closed-loop-answered",
|
||||
type: "call.answered",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
const turnPromise = manager.continueCall(started.callId, "How can I help?");
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-closed-loop-speech",
|
||||
type: "call.speech",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
transcript: "Please check status",
|
||||
isFinal: true,
|
||||
});
|
||||
|
||||
const turn = await turnPromise;
|
||||
expect(turn.success).toBe(true);
|
||||
expect(turn.transcript).toBe("Please check status");
|
||||
expect(provider.startListeningCalls).toHaveLength(1);
|
||||
expect(provider.stopListeningCalls).toHaveLength(1);
|
||||
|
||||
const call = manager.getCall(started.callId);
|
||||
expect(call?.transcript.map((entry) => entry.text)).toEqual([
|
||||
"How can I help?",
|
||||
"Please check status",
|
||||
]);
|
||||
const metadata = (call?.metadata ?? {}) as Record<string, unknown>;
|
||||
expect(typeof metadata.lastTurnLatencyMs).toBe("number");
|
||||
expect(typeof metadata.lastTurnListenWaitMs).toBe("number");
|
||||
expect(metadata.turnCount).toBe(1);
|
||||
});
|
||||
|
||||
it("rejects overlapping continueCall requests for the same call", async () => {
|
||||
const config = VoiceCallConfigSchema.parse({
|
||||
enabled: true,
|
||||
provider: "plivo",
|
||||
fromNumber: "+15550000000",
|
||||
transcriptTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`);
|
||||
const provider = new FakeProvider();
|
||||
const manager = new CallManager(config, storePath);
|
||||
manager.initialize(provider, "https://example.com/voice/webhook");
|
||||
|
||||
const started = await manager.initiateCall("+15550000004");
|
||||
expect(started.success).toBe(true);
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-overlap-answered",
|
||||
type: "call.answered",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
const first = manager.continueCall(started.callId, "First prompt");
|
||||
const second = await manager.continueCall(started.callId, "Second prompt");
|
||||
expect(second.success).toBe(false);
|
||||
expect(second.error).toBe("Already waiting for transcript");
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-overlap-speech",
|
||||
type: "call.speech",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
transcript: "Done",
|
||||
isFinal: true,
|
||||
});
|
||||
|
||||
const firstResult = await first;
|
||||
expect(firstResult.success).toBe(true);
|
||||
expect(firstResult.transcript).toBe("Done");
|
||||
expect(provider.startListeningCalls).toHaveLength(1);
|
||||
expect(provider.stopListeningCalls).toHaveLength(1);
|
||||
});
|
||||
|
||||
it("tracks latency metadata across multiple closed-loop turns", async () => {
|
||||
const config = VoiceCallConfigSchema.parse({
|
||||
enabled: true,
|
||||
provider: "plivo",
|
||||
fromNumber: "+15550000000",
|
||||
transcriptTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`);
|
||||
const provider = new FakeProvider();
|
||||
const manager = new CallManager(config, storePath);
|
||||
manager.initialize(provider, "https://example.com/voice/webhook");
|
||||
|
||||
const started = await manager.initiateCall("+15550000005");
|
||||
expect(started.success).toBe(true);
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-multi-answered",
|
||||
type: "call.answered",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
const firstTurn = manager.continueCall(started.callId, "First question");
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
manager.processEvent({
|
||||
id: "evt-multi-speech-1",
|
||||
type: "call.speech",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
transcript: "First answer",
|
||||
isFinal: true,
|
||||
});
|
||||
await firstTurn;
|
||||
|
||||
const secondTurn = manager.continueCall(started.callId, "Second question");
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
manager.processEvent({
|
||||
id: "evt-multi-speech-2",
|
||||
type: "call.speech",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
transcript: "Second answer",
|
||||
isFinal: true,
|
||||
});
|
||||
const secondResult = await secondTurn;
|
||||
|
||||
expect(secondResult.success).toBe(true);
|
||||
|
||||
const call = manager.getCall(started.callId);
|
||||
expect(call?.transcript.map((entry) => entry.text)).toEqual([
|
||||
"First question",
|
||||
"First answer",
|
||||
"Second question",
|
||||
"Second answer",
|
||||
]);
|
||||
const metadata = (call?.metadata ?? {}) as Record<string, unknown>;
|
||||
expect(metadata.turnCount).toBe(2);
|
||||
expect(typeof metadata.lastTurnLatencyMs).toBe("number");
|
||||
expect(typeof metadata.lastTurnListenWaitMs).toBe("number");
|
||||
expect(provider.startListeningCalls).toHaveLength(2);
|
||||
expect(provider.stopListeningCalls).toHaveLength(2);
|
||||
});
|
||||
|
||||
it("handles repeated closed-loop turns without waiter churn", async () => {
|
||||
const config = VoiceCallConfigSchema.parse({
|
||||
enabled: true,
|
||||
provider: "plivo",
|
||||
fromNumber: "+15550000000",
|
||||
transcriptTimeoutMs: 5000,
|
||||
});
|
||||
|
||||
const storePath = path.join(os.tmpdir(), `openclaw-voice-call-test-${Date.now()}`);
|
||||
const provider = new FakeProvider();
|
||||
const manager = new CallManager(config, storePath);
|
||||
manager.initialize(provider, "https://example.com/voice/webhook");
|
||||
|
||||
const started = await manager.initiateCall("+15550000006");
|
||||
expect(started.success).toBe(true);
|
||||
|
||||
manager.processEvent({
|
||||
id: "evt-loop-answered",
|
||||
type: "call.answered",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
});
|
||||
|
||||
for (let i = 1; i <= 5; i++) {
|
||||
const turnPromise = manager.continueCall(started.callId, `Prompt ${i}`);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
manager.processEvent({
|
||||
id: `evt-loop-speech-${i}`,
|
||||
type: "call.speech",
|
||||
callId: started.callId,
|
||||
providerCallId: "request-uuid",
|
||||
timestamp: Date.now(),
|
||||
transcript: `Answer ${i}`,
|
||||
isFinal: true,
|
||||
});
|
||||
const result = await turnPromise;
|
||||
expect(result.success).toBe(true);
|
||||
expect(result.transcript).toBe(`Answer ${i}`);
|
||||
}
|
||||
|
||||
const call = manager.getCall(started.callId);
|
||||
const metadata = (call?.metadata ?? {}) as Record<string, unknown>;
|
||||
expect(metadata.turnCount).toBe(5);
|
||||
expect(provider.startListeningCalls).toHaveLength(5);
|
||||
expect(provider.stopListeningCalls).toHaveLength(5);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -47,6 +47,7 @@ export class CallManager {
|
||||
private config: VoiceCallConfig;
|
||||
private storePath: string;
|
||||
private webhookUrl: string | null = null;
|
||||
private activeTurnCalls = new Set<CallId>();
|
||||
private transcriptWaiters = new Map<
|
||||
CallId,
|
||||
{
|
||||
@@ -137,6 +138,7 @@ export class CallManager {
|
||||
config: this.config,
|
||||
storePath: this.storePath,
|
||||
webhookUrl: this.webhookUrl,
|
||||
activeTurnCalls: this.activeTurnCalls,
|
||||
transcriptWaiters: this.transcriptWaiters,
|
||||
maxDurationTimers: this.maxDurationTimers,
|
||||
onCallAnswered: (call) => {
|
||||
|
||||
@@ -24,6 +24,7 @@ export type CallManagerRuntimeDeps = {
|
||||
};
|
||||
|
||||
export type CallManagerTransientState = {
|
||||
activeTurnCalls: Set<CallId>;
|
||||
transcriptWaiters: Map<CallId, TranscriptWaiter>;
|
||||
maxDurationTimers: Map<CallId, NodeJS.Timeout>;
|
||||
};
|
||||
|
||||
@@ -24,6 +24,7 @@ function createContext(overrides: Partial<CallManagerContext> = {}): CallManager
|
||||
}),
|
||||
storePath,
|
||||
webhookUrl: null,
|
||||
activeTurnCalls: new Set(),
|
||||
transcriptWaiters: new Map(),
|
||||
maxDurationTimers: new Map(),
|
||||
...overrides,
|
||||
|
||||
@@ -36,6 +36,7 @@ type ConversationContext = Pick<
|
||||
| "provider"
|
||||
| "config"
|
||||
| "storePath"
|
||||
| "activeTurnCalls"
|
||||
| "transcriptWaiters"
|
||||
| "maxDurationTimers"
|
||||
>;
|
||||
@@ -158,7 +159,6 @@ export async function speak(
|
||||
if (TerminalStates.has(call.state)) {
|
||||
return { success: false, error: "Call has ended" };
|
||||
}
|
||||
|
||||
try {
|
||||
transitionState(call, "speaking");
|
||||
persistCallRecord(ctx.storePath, call);
|
||||
@@ -242,6 +242,12 @@ export async function continueCall(
|
||||
if (TerminalStates.has(call.state)) {
|
||||
return { success: false, error: "Call has ended" };
|
||||
}
|
||||
if (ctx.activeTurnCalls.has(callId) || ctx.transcriptWaiters.has(callId)) {
|
||||
return { success: false, error: "Already waiting for transcript" };
|
||||
}
|
||||
ctx.activeTurnCalls.add(callId);
|
||||
|
||||
const turnStartedAt = Date.now();
|
||||
|
||||
try {
|
||||
await speak(ctx, callId, prompt);
|
||||
@@ -249,17 +255,45 @@ export async function continueCall(
|
||||
transitionState(call, "listening");
|
||||
persistCallRecord(ctx.storePath, call);
|
||||
|
||||
const listenStartedAt = Date.now();
|
||||
await ctx.provider.startListening({ callId, providerCallId: call.providerCallId });
|
||||
|
||||
const transcript = await waitForFinalTranscript(ctx, callId);
|
||||
const transcriptReceivedAt = Date.now();
|
||||
|
||||
// Best-effort: stop listening after final transcript.
|
||||
await ctx.provider.stopListening({ callId, providerCallId: call.providerCallId });
|
||||
|
||||
const lastTurnLatencyMs = transcriptReceivedAt - turnStartedAt;
|
||||
const lastTurnListenWaitMs = transcriptReceivedAt - listenStartedAt;
|
||||
const turnCount =
|
||||
call.metadata && typeof call.metadata.turnCount === "number"
|
||||
? call.metadata.turnCount + 1
|
||||
: 1;
|
||||
|
||||
call.metadata = {
|
||||
...(call.metadata ?? {}),
|
||||
turnCount,
|
||||
lastTurnLatencyMs,
|
||||
lastTurnListenWaitMs,
|
||||
lastTurnCompletedAt: transcriptReceivedAt,
|
||||
};
|
||||
persistCallRecord(ctx.storePath, call);
|
||||
|
||||
console.log(
|
||||
"[voice-call] continueCall latency call=" +
|
||||
call.callId +
|
||||
" totalMs=" +
|
||||
String(lastTurnLatencyMs) +
|
||||
" listenWaitMs=" +
|
||||
String(lastTurnListenWaitMs),
|
||||
);
|
||||
|
||||
return { success: true, transcript };
|
||||
} catch (err) {
|
||||
return { success: false, error: err instanceof Error ? err.message : String(err) };
|
||||
} finally {
|
||||
ctx.activeTurnCalls.delete(callId);
|
||||
clearTranscriptWaiter(ctx, callId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,8 +87,9 @@ export function resolveTranscriptWaiter(
|
||||
}
|
||||
|
||||
export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise<string> {
|
||||
// Only allow one in-flight waiter per call.
|
||||
rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced");
|
||||
if (ctx.transcriptWaiters.has(callId)) {
|
||||
return Promise.reject(new Error("Already waiting for transcript"));
|
||||
}
|
||||
|
||||
const timeoutMs = ctx.config.transcriptTimeoutMs;
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
@@ -117,6 +117,7 @@
|
||||
"test:live": "OPENCLAW_LIVE_TEST=1 CLAWDBOT_LIVE_TEST=1 vitest run --config vitest.live.config.ts",
|
||||
"test:macmini": "OPENCLAW_TEST_VM_FORKS=0 OPENCLAW_TEST_PROFILE=serial node scripts/test-parallel.mjs",
|
||||
"test:ui": "pnpm --dir ui test",
|
||||
"test:voicecall:closedloop": "vitest run extensions/voice-call/src/manager.test.ts extensions/voice-call/src/media-stream.test.ts src/plugins/voice-call.plugin.test.ts --maxWorkers=1",
|
||||
"test:watch": "vitest",
|
||||
"tui": "node scripts/run-node.mjs tui",
|
||||
"tui:dev": "OPENCLAW_PROFILE=dev CLAWDBOT_PROFILE=dev node scripts/run-node.mjs --dev tui",
|
||||
|
||||
@@ -6,16 +6,41 @@ vi.mock("../infra/system-events.js", () => ({
|
||||
vi.mock("../infra/heartbeat-wake.js", () => ({
|
||||
requestHeartbeatNow: vi.fn(),
|
||||
}));
|
||||
vi.mock("../commands/agent.js", () => ({
|
||||
agentCommand: vi.fn(),
|
||||
}));
|
||||
vi.mock("../config/config.js", () => ({
|
||||
loadConfig: vi.fn(() => ({ session: { mainKey: "agent:main:main" } })),
|
||||
}));
|
||||
vi.mock("../config/sessions.js", () => ({
|
||||
updateSessionStore: vi.fn(),
|
||||
}));
|
||||
vi.mock("./session-utils.js", () => ({
|
||||
loadSessionEntry: vi.fn((sessionKey: string) => ({
|
||||
storePath: "/tmp/sessions.json",
|
||||
entry: { sessionId: `sid-${sessionKey}` },
|
||||
canonicalKey: sessionKey,
|
||||
})),
|
||||
pruneLegacyStoreKeys: vi.fn(),
|
||||
resolveGatewaySessionStoreTarget: vi.fn(({ key }: { key: string }) => ({
|
||||
canonicalKey: key,
|
||||
storeKeys: [key],
|
||||
})),
|
||||
}));
|
||||
|
||||
import type { CliDeps } from "../cli/deps.js";
|
||||
import type { HealthSummary } from "../commands/health.js";
|
||||
import type { NodeEventContext } from "./server-node-events-types.js";
|
||||
import { agentCommand } from "../commands/agent.js";
|
||||
import { updateSessionStore } from "../config/sessions.js";
|
||||
import { requestHeartbeatNow } from "../infra/heartbeat-wake.js";
|
||||
import { enqueueSystemEvent } from "../infra/system-events.js";
|
||||
import type { NodeEventContext } from "./server-node-events-types.js";
|
||||
import { handleNodeEvent } from "./server-node-events.js";
|
||||
|
||||
const enqueueSystemEventMock = vi.mocked(enqueueSystemEvent);
|
||||
const requestHeartbeatNowMock = vi.mocked(requestHeartbeatNow);
|
||||
const agentCommandMock = vi.mocked(agentCommand);
|
||||
const updateSessionStoreMock = vi.mocked(updateSessionStore);
|
||||
|
||||
function buildCtx(): NodeEventContext {
|
||||
return {
|
||||
@@ -138,3 +163,106 @@ describe("node exec events", () => {
|
||||
expect(requestHeartbeatNowMock).toHaveBeenCalledWith({ reason: "exec-event" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("voice transcript events", () => {
|
||||
beforeEach(() => {
|
||||
agentCommandMock.mockReset();
|
||||
updateSessionStoreMock.mockReset();
|
||||
agentCommandMock.mockResolvedValue({ status: "ok" } as never);
|
||||
updateSessionStoreMock.mockImplementation(async (_storePath, update) => {
|
||||
update({});
|
||||
});
|
||||
});
|
||||
|
||||
it("dedupes repeated transcript payloads for the same session", async () => {
|
||||
const addChatRun = vi.fn();
|
||||
const ctx = buildCtx();
|
||||
ctx.addChatRun = addChatRun;
|
||||
|
||||
const payload = {
|
||||
text: "hello from mic",
|
||||
sessionKey: "voice-dedupe-session",
|
||||
};
|
||||
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify(payload),
|
||||
});
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify(payload),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
expect(addChatRun).toHaveBeenCalledTimes(1);
|
||||
expect(updateSessionStoreMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not dedupe identical text when source event IDs differ", async () => {
|
||||
const ctx = buildCtx();
|
||||
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "hello from mic",
|
||||
sessionKey: "voice-dedupe-eventid-session",
|
||||
eventId: "evt-voice-1",
|
||||
}),
|
||||
});
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "hello from mic",
|
||||
sessionKey: "voice-dedupe-eventid-session",
|
||||
eventId: "evt-voice-2",
|
||||
}),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(2);
|
||||
expect(updateSessionStoreMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("forwards transcript with voice provenance", async () => {
|
||||
const ctx = buildCtx();
|
||||
|
||||
await handleNodeEvent(ctx, "node-v2", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "check provenance",
|
||||
sessionKey: "voice-provenance-session",
|
||||
}),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
const [opts] = agentCommandMock.mock.calls[0] ?? [];
|
||||
expect(opts).toMatchObject({
|
||||
message: "check provenance",
|
||||
deliver: false,
|
||||
messageChannel: "node",
|
||||
inputProvenance: {
|
||||
kind: "external_user",
|
||||
sourceChannel: "voice",
|
||||
sourceTool: "gateway.voice.transcript",
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
it("does not block agent dispatch when session-store touch fails", async () => {
|
||||
const warn = vi.fn();
|
||||
const ctx = buildCtx();
|
||||
ctx.logGateway = { warn };
|
||||
updateSessionStoreMock.mockRejectedValueOnce(new Error("disk down"));
|
||||
|
||||
await handleNodeEvent(ctx, "node-v3", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "continue anyway",
|
||||
sessionKey: "voice-store-fail-session",
|
||||
}),
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(1);
|
||||
expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed"));
|
||||
});
|
||||
});
|
||||
|
||||
@@ -16,6 +16,89 @@ import {
|
||||
import { formatForLog } from "./ws-log.js";
|
||||
|
||||
const MAX_EXEC_EVENT_OUTPUT_CHARS = 180;
|
||||
const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500;
|
||||
const MAX_RECENT_VOICE_TRANSCRIPTS = 200;
|
||||
|
||||
const recentVoiceTranscripts = new Map<string, { fingerprint: string; ts: number }>();
|
||||
|
||||
function normalizeNonEmptyString(value: unknown): string | null {
|
||||
if (typeof value !== "string") {
|
||||
return null;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
function normalizeFiniteInteger(value: unknown): number | null {
|
||||
return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null;
|
||||
}
|
||||
|
||||
function resolveVoiceTranscriptFingerprint(obj: Record<string, unknown>, text: string): string {
|
||||
const eventId =
|
||||
normalizeNonEmptyString(obj.eventId) ??
|
||||
normalizeNonEmptyString(obj.providerEventId) ??
|
||||
normalizeNonEmptyString(obj.transcriptId);
|
||||
if (eventId) {
|
||||
return `event:${eventId}`;
|
||||
}
|
||||
|
||||
const callId = normalizeNonEmptyString(obj.providerCallId) ?? normalizeNonEmptyString(obj.callId);
|
||||
const sequence = normalizeFiniteInteger(obj.sequence) ?? normalizeFiniteInteger(obj.seq);
|
||||
if (callId && sequence !== null) {
|
||||
return `call-seq:${callId}:${sequence}`;
|
||||
}
|
||||
|
||||
const eventTimestamp =
|
||||
normalizeFiniteInteger(obj.timestamp) ??
|
||||
normalizeFiniteInteger(obj.ts) ??
|
||||
normalizeFiniteInteger(obj.eventTimestamp);
|
||||
if (callId && eventTimestamp !== null) {
|
||||
return `call-ts:${callId}:${eventTimestamp}`;
|
||||
}
|
||||
|
||||
if (eventTimestamp !== null) {
|
||||
return `timestamp:${eventTimestamp}|text:${text}`;
|
||||
}
|
||||
|
||||
return `text:${text}`;
|
||||
}
|
||||
|
||||
function shouldDropDuplicateVoiceTranscript(params: {
|
||||
sessionKey: string;
|
||||
fingerprint: string;
|
||||
now: number;
|
||||
}): boolean {
|
||||
const previous = recentVoiceTranscripts.get(params.sessionKey);
|
||||
if (
|
||||
previous &&
|
||||
previous.fingerprint === params.fingerprint &&
|
||||
params.now - previous.ts <= VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
recentVoiceTranscripts.set(params.sessionKey, { fingerprint: params.fingerprint, ts: params.now });
|
||||
|
||||
if (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) {
|
||||
const cutoff = params.now - VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS * 2;
|
||||
for (const [key, value] of recentVoiceTranscripts) {
|
||||
if (value.ts < cutoff) {
|
||||
recentVoiceTranscripts.delete(key);
|
||||
}
|
||||
if (recentVoiceTranscripts.size <= MAX_RECENT_VOICE_TRANSCRIPTS) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) {
|
||||
const oldestKey = recentVoiceTranscripts.keys().next().value;
|
||||
if (oldestKey === undefined) {
|
||||
break;
|
||||
}
|
||||
recentVoiceTranscripts.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
function compactExecEventOutput(raw: string) {
|
||||
const normalized = raw.replace(/\s+/g, " ").trim();
|
||||
@@ -69,6 +152,29 @@ async function touchSessionStore(params: {
|
||||
});
|
||||
}
|
||||
|
||||
function queueSessionStoreTouch(params: {
|
||||
ctx: NodeEventContext;
|
||||
cfg: ReturnType<typeof loadConfig>;
|
||||
sessionKey: string;
|
||||
storePath: LoadedSessionEntry["storePath"];
|
||||
canonicalKey: LoadedSessionEntry["canonicalKey"];
|
||||
entry: LoadedSessionEntry["entry"];
|
||||
sessionId: string;
|
||||
now: number;
|
||||
}) {
|
||||
void touchSessionStore({
|
||||
cfg: params.cfg,
|
||||
sessionKey: params.sessionKey,
|
||||
storePath: params.storePath,
|
||||
canonicalKey: params.canonicalKey,
|
||||
entry: params.entry,
|
||||
sessionId: params.sessionId,
|
||||
now: params.now,
|
||||
}).catch((err) => {
|
||||
params.ctx.logGateway.warn("voice session-store update failed: " + formatForLog(err));
|
||||
});
|
||||
}
|
||||
|
||||
function parseSessionKeyFromPayloadJSON(payloadJSON: string): string | null {
|
||||
let payload: unknown;
|
||||
try {
|
||||
@@ -111,8 +217,21 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
|
||||
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
|
||||
const now = Date.now();
|
||||
const fingerprint = resolveVoiceTranscriptFingerprint(obj, text);
|
||||
if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, fingerprint, now })) {
|
||||
return;
|
||||
}
|
||||
const sessionId = entry?.sessionId ?? randomUUID();
|
||||
await touchSessionStore({ cfg, sessionKey, storePath, canonicalKey, entry, sessionId, now });
|
||||
queueSessionStoreTouch({
|
||||
ctx,
|
||||
cfg,
|
||||
sessionKey,
|
||||
storePath,
|
||||
canonicalKey,
|
||||
entry,
|
||||
sessionId,
|
||||
now,
|
||||
});
|
||||
|
||||
// Ensure chat UI clients refresh when this run completes (even though it wasn't started via chat.send).
|
||||
// This maps agent bus events (keyed by sessionId) to chat events (keyed by clientRunId).
|
||||
@@ -129,6 +248,11 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
thinking: "low",
|
||||
deliver: false,
|
||||
messageChannel: "node",
|
||||
inputProvenance: {
|
||||
kind: "external_user",
|
||||
sourceChannel: "voice",
|
||||
sourceTool: "gateway.voice.transcript",
|
||||
},
|
||||
},
|
||||
defaultRuntime,
|
||||
ctx.deps,
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
import { Command } from "commander";
|
||||
import fs from "node:fs";
|
||||
import os from "node:os";
|
||||
import path from "node:path";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
|
||||
let runtimeStub: {
|
||||
@@ -141,6 +144,69 @@ describe("voice-call plugin", () => {
|
||||
expect(String(result.details.error)).toContain("sid required");
|
||||
});
|
||||
|
||||
it("CLI latency summarizes turn metrics from JSONL", async () => {
|
||||
const { register } = plugin as unknown as {
|
||||
register: (api: Record<string, unknown>) => void | Promise<void>;
|
||||
};
|
||||
const program = new Command();
|
||||
const tmpFile = path.join(os.tmpdir(), `voicecall-latency-${Date.now()}.jsonl`);
|
||||
fs.writeFileSync(
|
||||
tmpFile,
|
||||
[
|
||||
JSON.stringify({ metadata: { lastTurnLatencyMs: 100, lastTurnListenWaitMs: 70 } }),
|
||||
JSON.stringify({ metadata: { lastTurnLatencyMs: 200, lastTurnListenWaitMs: 110 } }),
|
||||
].join("\n") + "\n",
|
||||
"utf8",
|
||||
);
|
||||
|
||||
const logSpy = vi.spyOn(console, "log").mockImplementation(() => {});
|
||||
|
||||
try {
|
||||
await register({
|
||||
id: "voice-call",
|
||||
name: "Voice Call",
|
||||
description: "test",
|
||||
version: "0",
|
||||
source: "test",
|
||||
config: {},
|
||||
pluginConfig: { provider: "mock" },
|
||||
runtime: { tts: { textToSpeechTelephony: vi.fn() } },
|
||||
logger: noopLogger,
|
||||
registerGatewayMethod: () => {},
|
||||
registerTool: () => {},
|
||||
registerCli: (
|
||||
fn: (ctx: {
|
||||
program: Command;
|
||||
config: Record<string, unknown>;
|
||||
workspaceDir?: string;
|
||||
logger: typeof noopLogger;
|
||||
}) => void,
|
||||
) =>
|
||||
fn({
|
||||
program,
|
||||
config: {},
|
||||
workspaceDir: undefined,
|
||||
logger: noopLogger,
|
||||
}),
|
||||
registerService: () => {},
|
||||
resolvePath: (p: string) => p,
|
||||
});
|
||||
|
||||
await program.parseAsync(["voicecall", "latency", "--file", tmpFile, "--last", "10"], {
|
||||
from: "user",
|
||||
});
|
||||
|
||||
expect(logSpy).toHaveBeenCalled();
|
||||
const printed = String(logSpy.mock.calls.at(-1)?.[0] ?? "");
|
||||
expect(printed).toContain('"recordsScanned": 2');
|
||||
expect(printed).toContain('"p50Ms": 100');
|
||||
expect(printed).toContain('"p95Ms": 200');
|
||||
} finally {
|
||||
logSpy.mockRestore();
|
||||
fs.unlinkSync(tmpFile);
|
||||
}
|
||||
});
|
||||
|
||||
it("CLI start prints JSON", async () => {
|
||||
const { register } = plugin as unknown as {
|
||||
register: (api: Record<string, unknown>) => void | Promise<void>;
|
||||
|
||||
Reference in New Issue
Block a user