mirror of
https://github.com/openclaw/openclaw.git
synced 2026-02-19 18:39:20 -05:00
Voice-call: close overlap races and dedupe hardening openclaw#19140 thanks @mbelinky
This commit is contained in:
@@ -18,6 +18,7 @@ Docs: https://docs.openclaw.ai
|
||||
|
||||
### Fixes
|
||||
|
||||
- Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky.
|
||||
- Fix types in all tests. Typecheck the whole repository.
|
||||
- Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source.
|
||||
- Gateway/Channels: wire `gateway.channelHealthCheckMinutes` into strict config validation, treat implicit account status as managed for health checks, and harden channel auto-restart flow (preserve restart-attempt caps across crash loops, propagate enabled/configured runtime flags, and stop pending restart backoff after manual stop). Thanks @steipete.
|
||||
|
||||
@@ -62,8 +62,8 @@ function summarizeSeries(values: number[]): {
|
||||
return { count: 0, minMs: 0, maxMs: 0, avgMs: 0, p50Ms: 0, p95Ms: 0 };
|
||||
}
|
||||
|
||||
const minMs = Math.min(...values);
|
||||
const maxMs = Math.max(...values);
|
||||
const minMs = values.reduce((min, value) => (value < min ? value : min), Number.POSITIVE_INFINITY);
|
||||
const maxMs = values.reduce((max, value) => (value > max ? value : max), Number.NEGATIVE_INFINITY);
|
||||
const avgMs = values.reduce((sum, value) => sum + value, 0) / values.length;
|
||||
return {
|
||||
count: values.length,
|
||||
|
||||
@@ -347,8 +347,6 @@ describe("CallManager", () => {
|
||||
});
|
||||
|
||||
const first = manager.continueCall(started.callId, "First prompt");
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
|
||||
const second = await manager.continueCall(started.callId, "Second prompt");
|
||||
expect(second.success).toBe(false);
|
||||
expect(second.error).toBe("Already waiting for transcript");
|
||||
|
||||
@@ -47,6 +47,7 @@ export class CallManager {
|
||||
private config: VoiceCallConfig;
|
||||
private storePath: string;
|
||||
private webhookUrl: string | null = null;
|
||||
private activeTurnCalls = new Set<CallId>();
|
||||
private transcriptWaiters = new Map<
|
||||
CallId,
|
||||
{
|
||||
@@ -137,6 +138,7 @@ export class CallManager {
|
||||
config: this.config,
|
||||
storePath: this.storePath,
|
||||
webhookUrl: this.webhookUrl,
|
||||
activeTurnCalls: this.activeTurnCalls,
|
||||
transcriptWaiters: this.transcriptWaiters,
|
||||
maxDurationTimers: this.maxDurationTimers,
|
||||
onCallAnswered: (call) => {
|
||||
|
||||
@@ -24,6 +24,7 @@ export type CallManagerRuntimeDeps = {
|
||||
};
|
||||
|
||||
export type CallManagerTransientState = {
|
||||
activeTurnCalls: Set<CallId>;
|
||||
transcriptWaiters: Map<CallId, TranscriptWaiter>;
|
||||
maxDurationTimers: Map<CallId, NodeJS.Timeout>;
|
||||
};
|
||||
|
||||
@@ -24,6 +24,7 @@ function createContext(overrides: Partial<CallManagerContext> = {}): CallManager
|
||||
}),
|
||||
storePath,
|
||||
webhookUrl: null,
|
||||
activeTurnCalls: new Set(),
|
||||
transcriptWaiters: new Map(),
|
||||
maxDurationTimers: new Map(),
|
||||
...overrides,
|
||||
|
||||
@@ -36,6 +36,7 @@ type ConversationContext = Pick<
|
||||
| "provider"
|
||||
| "config"
|
||||
| "storePath"
|
||||
| "activeTurnCalls"
|
||||
| "transcriptWaiters"
|
||||
| "maxDurationTimers"
|
||||
>;
|
||||
@@ -241,9 +242,10 @@ export async function continueCall(
|
||||
if (TerminalStates.has(call.state)) {
|
||||
return { success: false, error: "Call has ended" };
|
||||
}
|
||||
if (ctx.transcriptWaiters.has(callId)) {
|
||||
if (ctx.activeTurnCalls.has(callId) || ctx.transcriptWaiters.has(callId)) {
|
||||
return { success: false, error: "Already waiting for transcript" };
|
||||
}
|
||||
ctx.activeTurnCalls.add(callId);
|
||||
|
||||
const turnStartedAt = Date.now();
|
||||
|
||||
@@ -291,6 +293,7 @@ export async function continueCall(
|
||||
} catch (err) {
|
||||
return { success: false, error: err instanceof Error ? err.message : String(err) };
|
||||
} finally {
|
||||
ctx.activeTurnCalls.delete(callId);
|
||||
clearTranscriptWaiter(ctx, callId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -87,8 +87,9 @@ export function resolveTranscriptWaiter(
|
||||
}
|
||||
|
||||
export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise<string> {
|
||||
// Only allow one in-flight waiter per call.
|
||||
rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced");
|
||||
if (ctx.transcriptWaiters.has(callId)) {
|
||||
return Promise.reject(new Error("Already waiting for transcript"));
|
||||
}
|
||||
|
||||
const timeoutMs = ctx.config.transcriptTimeoutMs;
|
||||
return new Promise((resolve, reject) => {
|
||||
|
||||
@@ -198,6 +198,30 @@ describe("voice transcript events", () => {
|
||||
expect(updateSessionStoreMock).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("does not dedupe identical text when source event IDs differ", async () => {
|
||||
const ctx = buildCtx();
|
||||
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "hello from mic",
|
||||
sessionKey: "voice-dedupe-eventid-session",
|
||||
eventId: "evt-voice-1",
|
||||
}),
|
||||
});
|
||||
await handleNodeEvent(ctx, "node-v1", {
|
||||
event: "voice.transcript",
|
||||
payloadJSON: JSON.stringify({
|
||||
text: "hello from mic",
|
||||
sessionKey: "voice-dedupe-eventid-session",
|
||||
eventId: "evt-voice-2",
|
||||
}),
|
||||
});
|
||||
|
||||
expect(agentCommandMock).toHaveBeenCalledTimes(2);
|
||||
expect(updateSessionStoreMock).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it("forwards transcript with voice provenance", async () => {
|
||||
const ctx = buildCtx();
|
||||
|
||||
|
||||
@@ -19,22 +19,64 @@ const MAX_EXEC_EVENT_OUTPUT_CHARS = 180;
|
||||
const VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS = 1500;
|
||||
const MAX_RECENT_VOICE_TRANSCRIPTS = 200;
|
||||
|
||||
const recentVoiceTranscripts = new Map<string, { text: string; ts: number }>();
|
||||
const recentVoiceTranscripts = new Map<string, { fingerprint: string; ts: number }>();
|
||||
|
||||
function normalizeNonEmptyString(value: unknown): string | null {
|
||||
if (typeof value !== "string") {
|
||||
return null;
|
||||
}
|
||||
const trimmed = value.trim();
|
||||
return trimmed.length > 0 ? trimmed : null;
|
||||
}
|
||||
|
||||
function normalizeFiniteInteger(value: unknown): number | null {
|
||||
return typeof value === "number" && Number.isFinite(value) ? Math.trunc(value) : null;
|
||||
}
|
||||
|
||||
function resolveVoiceTranscriptFingerprint(obj: Record<string, unknown>, text: string): string {
|
||||
const eventId =
|
||||
normalizeNonEmptyString(obj.eventId) ??
|
||||
normalizeNonEmptyString(obj.providerEventId) ??
|
||||
normalizeNonEmptyString(obj.transcriptId);
|
||||
if (eventId) {
|
||||
return `event:${eventId}`;
|
||||
}
|
||||
|
||||
const callId = normalizeNonEmptyString(obj.providerCallId) ?? normalizeNonEmptyString(obj.callId);
|
||||
const sequence = normalizeFiniteInteger(obj.sequence) ?? normalizeFiniteInteger(obj.seq);
|
||||
if (callId && sequence !== null) {
|
||||
return `call-seq:${callId}:${sequence}`;
|
||||
}
|
||||
|
||||
const eventTimestamp =
|
||||
normalizeFiniteInteger(obj.timestamp) ??
|
||||
normalizeFiniteInteger(obj.ts) ??
|
||||
normalizeFiniteInteger(obj.eventTimestamp);
|
||||
if (callId && eventTimestamp !== null) {
|
||||
return `call-ts:${callId}:${eventTimestamp}`;
|
||||
}
|
||||
|
||||
if (eventTimestamp !== null) {
|
||||
return `timestamp:${eventTimestamp}|text:${text}`;
|
||||
}
|
||||
|
||||
return `text:${text}`;
|
||||
}
|
||||
|
||||
function shouldDropDuplicateVoiceTranscript(params: {
|
||||
sessionKey: string;
|
||||
text: string;
|
||||
fingerprint: string;
|
||||
now: number;
|
||||
}): boolean {
|
||||
const previous = recentVoiceTranscripts.get(params.sessionKey);
|
||||
if (
|
||||
previous &&
|
||||
previous.text === params.text &&
|
||||
previous.fingerprint === params.fingerprint &&
|
||||
params.now - previous.ts <= VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
recentVoiceTranscripts.set(params.sessionKey, { text: params.text, ts: params.now });
|
||||
recentVoiceTranscripts.set(params.sessionKey, { fingerprint: params.fingerprint, ts: params.now });
|
||||
|
||||
if (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) {
|
||||
const cutoff = params.now - VOICE_TRANSCRIPT_DEDUPE_WINDOW_MS * 2;
|
||||
@@ -46,6 +88,13 @@ function shouldDropDuplicateVoiceTranscript(params: {
|
||||
break;
|
||||
}
|
||||
}
|
||||
while (recentVoiceTranscripts.size > MAX_RECENT_VOICE_TRANSCRIPTS) {
|
||||
const oldestKey = recentVoiceTranscripts.keys().next().value;
|
||||
if (oldestKey === undefined) {
|
||||
break;
|
||||
}
|
||||
recentVoiceTranscripts.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
@@ -168,7 +217,8 @@ export const handleNodeEvent = async (ctx: NodeEventContext, nodeId: string, evt
|
||||
const sessionKey = sessionKeyRaw.length > 0 ? sessionKeyRaw : rawMainKey;
|
||||
const { storePath, entry, canonicalKey } = loadSessionEntry(sessionKey);
|
||||
const now = Date.now();
|
||||
if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, text, now })) {
|
||||
const fingerprint = resolveVoiceTranscriptFingerprint(obj, text);
|
||||
if (shouldDropDuplicateVoiceTranscript({ sessionKey: canonicalKey, fingerprint, now })) {
|
||||
return;
|
||||
}
|
||||
const sessionId = entry?.sessionId ?? randomUUID();
|
||||
|
||||
Reference in New Issue
Block a user