From 89574f30cb006d6ddc3fe17c51ee87c24fc363f3 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Sat, 14 Feb 2026 03:39:03 +0100 Subject: [PATCH] refactor(voice-call): split manager into facade and context slices --- extensions/voice-call/src/manager.ts | 468 ++---------------- extensions/voice-call/src/manager/context.ts | 20 +- extensions/voice-call/src/manager/events.ts | 23 +- extensions/voice-call/src/manager/outbound.ts | 41 +- extensions/voice-call/src/manager/timers.ts | 25 +- 5 files changed, 120 insertions(+), 457 deletions(-) diff --git a/extensions/voice-call/src/manager.ts b/extensions/voice-call/src/manager.ts index 480e21d70e..3b3a5b7c06 100644 --- a/extensions/voice-call/src/manager.ts +++ b/extensions/voice-call/src/manager.ts @@ -1,23 +1,21 @@ -import crypto from "node:crypto"; import fs from "node:fs"; -import fsp from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import type { CallMode, VoiceCallConfig } from "./config.js"; +import type { VoiceCallConfig } from "./config.js"; import type { CallManagerContext } from "./manager/context.js"; import type { VoiceCallProvider } from "./providers/base.js"; +import type { CallId, CallRecord, NormalizedEvent, OutboundCallOptions } from "./types.js"; import { processEvent as processManagerEvent } from "./manager/events.js"; +import { getCallByProviderCallId as getCallByProviderCallIdFromMaps } from "./manager/lookup.js"; import { - type CallId, - type CallRecord, - CallRecordSchema, - type NormalizedEvent, - type OutboundCallOptions, - TerminalStates, - type TranscriptEntry, -} from "./types.js"; + continueCall as continueCallWithContext, + endCall as endCallWithContext, + initiateCall as initiateCallWithContext, + speak as speakWithContext, + speakInitialMessage as speakInitialMessageWithContext, +} from "./manager/outbound.js"; +import { getCallHistoryFromStore, loadActiveCallsFromStore } from "./manager/store.js"; import { resolveUserPath } from "./utils.js"; -import { escapeXml, mapVoiceToPolly } from "./voice-mapping.js"; function resolveDefaultStoreBase(config: VoiceCallConfig, storePath?: string): string { const rawOverride = storePath?.trim() || config.store?.trim(); @@ -38,11 +36,11 @@ function resolveDefaultStoreBase(config: VoiceCallConfig, storePath?: string): s } /** - * Manages voice calls: state machine, persistence, and provider coordination. + * Manages voice calls: state ownership and delegation to manager helper modules. */ export class CallManager { private activeCalls = new Map(); - private providerCallIdMap = new Map(); // providerCallId -> internal callId + private providerCallIdMap = new Map(); private processedEventIds = new Set(); private rejectedProviderCallIds = new Set(); private provider: VoiceCallProvider | null = null; @@ -57,12 +55,10 @@ export class CallManager { timeout: NodeJS.Timeout; } >(); - /** Max duration timers to auto-hangup calls after configured timeout */ private maxDurationTimers = new Map(); constructor(config: VoiceCallConfig, storePath?: string) { this.config = config; - // Resolve store path with tilde expansion (like other config values) this.storePath = resolveDefaultStoreBase(config, storePath); } @@ -73,11 +69,13 @@ export class CallManager { this.provider = provider; this.webhookUrl = webhookUrl; - // Ensure store directory exists fs.mkdirSync(this.storePath, { recursive: true }); - // Load any persisted active calls - this.loadActiveCalls(); + const persisted = loadActiveCallsFromStore(this.storePath); + this.activeCalls = persisted.activeCalls; + this.providerCallIdMap = persisted.providerCallIdMap; + this.processedEventIds = persisted.processedEventIds; + this.rejectedProviderCallIds = persisted.rejectedProviderCallIds; } /** @@ -89,242 +87,27 @@ export class CallManager { /** * Initiate an outbound call. - * @param to - The phone number to call - * @param sessionKey - Optional session key for context - * @param options - Optional call options (message, mode) */ async initiateCall( to: string, sessionKey?: string, options?: OutboundCallOptions | string, ): Promise<{ callId: CallId; success: boolean; error?: string }> { - // Support legacy string argument for initialMessage - const opts: OutboundCallOptions = - typeof options === "string" ? { message: options } : (options ?? {}); - const initialMessage = opts.message; - const mode = opts.mode ?? this.config.outbound.defaultMode; - if (!this.provider) { - return { callId: "", success: false, error: "Provider not initialized" }; - } - - if (!this.webhookUrl) { - return { - callId: "", - success: false, - error: "Webhook URL not configured", - }; - } - - // Check concurrent call limit - const activeCalls = this.getActiveCalls(); - if (activeCalls.length >= this.config.maxConcurrentCalls) { - return { - callId: "", - success: false, - error: `Maximum concurrent calls (${this.config.maxConcurrentCalls}) reached`, - }; - } - - const callId = crypto.randomUUID(); - const from = - this.config.fromNumber || (this.provider?.name === "mock" ? "+15550000000" : undefined); - if (!from) { - return { callId: "", success: false, error: "fromNumber not configured" }; - } - - // Create call record with mode in metadata - const callRecord: CallRecord = { - callId, - provider: this.provider.name, - direction: "outbound", - state: "initiated", - from, - to, - sessionKey, - startedAt: Date.now(), - transcript: [], - processedEventIds: [], - metadata: { - ...(initialMessage && { initialMessage }), - mode, - }, - }; - - this.activeCalls.set(callId, callRecord); - this.persistCallRecord(callRecord); - - try { - // For notify mode with a message, use inline TwiML with - let inlineTwiml: string | undefined; - if (mode === "notify" && initialMessage) { - const pollyVoice = mapVoiceToPolly(this.config.tts?.openai?.voice); - inlineTwiml = this.generateNotifyTwiml(initialMessage, pollyVoice); - console.log(`[voice-call] Using inline TwiML for notify mode (voice: ${pollyVoice})`); - } - - const result = await this.provider.initiateCall({ - callId, - from, - to, - webhookUrl: this.webhookUrl, - inlineTwiml, - }); - - callRecord.providerCallId = result.providerCallId; - this.providerCallIdMap.set(result.providerCallId, callId); // Map providerCallId to internal callId - this.persistCallRecord(callRecord); - - return { callId, success: true }; - } catch (err) { - callRecord.state = "failed"; - callRecord.endedAt = Date.now(); - callRecord.endReason = "failed"; - this.persistCallRecord(callRecord); - this.activeCalls.delete(callId); - if (callRecord.providerCallId) { - this.providerCallIdMap.delete(callRecord.providerCallId); - } - - return { - callId, - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } + return initiateCallWithContext(this.getContext(), to, sessionKey, options); } /** * Speak to user in an active call. */ async speak(callId: CallId, text: string): Promise<{ success: boolean; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: false, error: "Call has ended" }; - } - - try { - // Update state - call.state = "speaking"; - this.persistCallRecord(call); - - // Add to transcript - this.addTranscriptEntry(call, "bot", text); - - // Play TTS - const voice = this.provider?.name === "twilio" ? this.config.tts?.openai?.voice : undefined; - await this.provider.playTts({ - callId, - providerCallId: call.providerCallId, - text, - voice, - }); - - return { success: true }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } + return speakWithContext(this.getContext(), callId, text); } /** * Speak the initial message for a call (called when media stream connects). - * This is used to auto-play the message passed to initiateCall. - * In notify mode, auto-hangup after the message is delivered. */ async speakInitialMessage(providerCallId: string): Promise { - const call = this.getCallByProviderCallId(providerCallId); - if (!call) { - console.warn(`[voice-call] speakInitialMessage: no call found for ${providerCallId}`); - return; - } - - const initialMessage = call.metadata?.initialMessage as string | undefined; - const mode = (call.metadata?.mode as CallMode) ?? "conversation"; - - if (!initialMessage) { - console.log(`[voice-call] speakInitialMessage: no initial message for ${call.callId}`); - return; - } - - // Clear the initial message so we don't speak it again - if (call.metadata) { - delete call.metadata.initialMessage; - this.persistCallRecord(call); - } - - console.log(`[voice-call] Speaking initial message for call ${call.callId} (mode: ${mode})`); - const result = await this.speak(call.callId, initialMessage); - if (!result.success) { - console.warn(`[voice-call] Failed to speak initial message: ${result.error}`); - return; - } - - // In notify mode, auto-hangup after delay - if (mode === "notify") { - const delaySec = this.config.outbound.notifyHangupDelaySec; - console.log(`[voice-call] Notify mode: auto-hangup in ${delaySec}s for call ${call.callId}`); - setTimeout(async () => { - const currentCall = this.getCall(call.callId); - if (currentCall && !TerminalStates.has(currentCall.state)) { - console.log(`[voice-call] Notify mode: hanging up call ${call.callId}`); - await this.endCall(call.callId); - } - }, delaySec * 1000); - } - } - - /** - * Clear max duration timer for a call. - */ - private clearMaxDurationTimer(callId: CallId): void { - const timer = this.maxDurationTimers.get(callId); - if (timer) { - clearTimeout(timer); - this.maxDurationTimers.delete(callId); - } - } - - private clearTranscriptWaiter(callId: CallId): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) { - return; - } - clearTimeout(waiter.timeout); - this.transcriptWaiters.delete(callId); - } - - private rejectTranscriptWaiter(callId: CallId, reason: string): void { - const waiter = this.transcriptWaiters.get(callId); - if (!waiter) { - return; - } - this.clearTranscriptWaiter(callId); - waiter.reject(new Error(reason)); - } - - private waitForFinalTranscript(callId: CallId): Promise { - // Only allow one in-flight waiter per call. - this.rejectTranscriptWaiter(callId, "Transcript waiter replaced"); - - const timeoutMs = this.config.transcriptTimeoutMs; - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - this.transcriptWaiters.delete(callId); - reject(new Error(`Timed out waiting for transcript after ${timeoutMs}ms`)); - }, timeoutMs); - - this.transcriptWaiters.set(callId, { resolve, reject, timeout }); - }); + return speakInitialMessageWithContext(this.getContext(), providerCallId); } /** @@ -334,91 +117,14 @@ export class CallManager { callId: CallId, prompt: string, ): Promise<{ success: boolean; transcript?: string; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: false, error: "Call has ended" }; - } - - try { - await this.speak(callId, prompt); - - call.state = "listening"; - this.persistCallRecord(call); - - await this.provider.startListening({ - callId, - providerCallId: call.providerCallId, - }); - - const transcript = await this.waitForFinalTranscript(callId); - - // Best-effort: stop listening after final transcript. - await this.provider.stopListening({ - callId, - providerCallId: call.providerCallId, - }); - - return { success: true, transcript }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } finally { - this.clearTranscriptWaiter(callId); - } + return continueCallWithContext(this.getContext(), callId, prompt); } /** * End an active call. */ async endCall(callId: CallId): Promise<{ success: boolean; error?: string }> { - const call = this.activeCalls.get(callId); - if (!call) { - return { success: false, error: "Call not found" }; - } - - if (!this.provider || !call.providerCallId) { - return { success: false, error: "Call not connected" }; - } - - if (TerminalStates.has(call.state)) { - return { success: true }; // Already ended - } - - try { - await this.provider.hangupCall({ - callId, - providerCallId: call.providerCallId, - reason: "hangup-bot", - }); - - call.state = "hangup-bot"; - call.endedAt = Date.now(); - call.endReason = "hangup-bot"; - this.persistCallRecord(call); - this.clearMaxDurationTimer(callId); - this.rejectTranscriptWaiter(callId, "Call ended: hangup-bot"); - this.activeCalls.delete(callId); - if (call.providerCallId) { - this.providerCallIdMap.delete(call.providerCallId); - } - - return { success: true }; - } catch (err) { - return { - success: false, - error: err instanceof Error ? err.message : String(err), - }; - } + return endCallWithContext(this.getContext(), callId); } private getContext(): CallManagerContext { @@ -427,15 +133,15 @@ export class CallManager { providerCallIdMap: this.providerCallIdMap, processedEventIds: this.processedEventIds, rejectedProviderCallIds: this.rejectedProviderCallIds, - onCallAnswered: (call) => { - this.maybeSpeakInitialMessageOnAnswered(call); - }, provider: this.provider, config: this.config, storePath: this.storePath, webhookUrl: this.webhookUrl, transcriptWaiters: this.transcriptWaiters, maxDurationTimers: this.maxDurationTimers, + onCallAnswered: (call) => { + this.maybeSpeakInitialMessageOnAnswered(call); + }, }; } @@ -478,20 +184,11 @@ export class CallManager { * Get an active call by provider call ID (e.g., Twilio CallSid). */ getCallByProviderCallId(providerCallId: string): CallRecord | undefined { - // Fast path: use the providerCallIdMap for O(1) lookup - const callId = this.providerCallIdMap.get(providerCallId); - if (callId) { - return this.activeCalls.get(callId); - } - - // Fallback: linear search for cases where map wasn't populated - // (e.g., providerCallId set directly on call record) - for (const call of this.activeCalls.values()) { - if (call.providerCallId === providerCallId) { - return call; - } - } - return undefined; + return getCallByProviderCallIdFromMaps({ + activeCalls: this.activeCalls, + providerCallIdMap: this.providerCallIdMap, + providerCallId, + }); } /** @@ -505,109 +202,6 @@ export class CallManager { * Get call history (from persisted logs). */ async getCallHistory(limit = 50): Promise { - const logPath = path.join(this.storePath, "calls.jsonl"); - - try { - await fsp.access(logPath); - } catch { - return []; - } - - const content = await fsp.readFile(logPath, "utf-8"); - const lines = content.trim().split("\n").filter(Boolean); - const calls: CallRecord[] = []; - - // Parse last N lines - for (const line of lines.slice(-limit)) { - try { - const parsed = CallRecordSchema.parse(JSON.parse(line)); - calls.push(parsed); - } catch { - // Skip invalid lines - } - } - - return calls; - } - - /** - * Add an entry to the call transcript. - */ - private addTranscriptEntry(call: CallRecord, speaker: "bot" | "user", text: string): void { - const entry: TranscriptEntry = { - timestamp: Date.now(), - speaker, - text, - isFinal: true, - }; - call.transcript.push(entry); - } - - /** - * Persist a call record to disk (fire-and-forget async). - */ - private persistCallRecord(call: CallRecord): void { - const logPath = path.join(this.storePath, "calls.jsonl"); - const line = `${JSON.stringify(call)}\n`; - // Fire-and-forget async write to avoid blocking event loop - fsp.appendFile(logPath, line).catch((err) => { - console.error("[voice-call] Failed to persist call record:", err); - }); - } - - /** - * Load active calls from persistence (for crash recovery). - * Uses streaming to handle large log files efficiently. - */ - private loadActiveCalls(): void { - const logPath = path.join(this.storePath, "calls.jsonl"); - if (!fs.existsSync(logPath)) { - return; - } - - // Read file synchronously and parse lines - const content = fs.readFileSync(logPath, "utf-8"); - const lines = content.split("\n"); - - // Build map of latest state per call - const callMap = new Map(); - - for (const line of lines) { - if (!line.trim()) { - continue; - } - try { - const call = CallRecordSchema.parse(JSON.parse(line)); - callMap.set(call.callId, call); - } catch { - // Skip invalid lines - } - } - - // Only keep non-terminal calls - for (const [callId, call] of callMap) { - if (!TerminalStates.has(call.state)) { - this.activeCalls.set(callId, call); - // Populate providerCallId mapping for lookups - if (call.providerCallId) { - this.providerCallIdMap.set(call.providerCallId, callId); - } - // Populate processed event IDs - for (const eventId of call.processedEventIds) { - this.processedEventIds.add(eventId); - } - } - } - } - - /** - * Generate TwiML for notify mode (speak message and hang up). - */ - private generateNotifyTwiml(message: string, voice: string): string { - return ` - - ${escapeXml(message)} - -`; + return getCallHistoryFromStore(this.storePath, limit); } } diff --git a/extensions/voice-call/src/manager/context.ts b/extensions/voice-call/src/manager/context.ts index 6cac6c9322..03cbd3c1e1 100644 --- a/extensions/voice-call/src/manager/context.ts +++ b/extensions/voice-call/src/manager/context.ts @@ -8,18 +8,32 @@ export type TranscriptWaiter = { timeout: NodeJS.Timeout; }; -export type CallManagerContext = { +export type CallManagerRuntimeState = { activeCalls: Map; providerCallIdMap: Map; processedEventIds: Set; /** Provider call IDs we already sent a reject hangup for; avoids duplicate hangup calls. */ rejectedProviderCallIds: Set; - /** Optional runtime hook invoked after an event transitions a call into answered state. */ - onCallAnswered?: (call: CallRecord) => void; +}; + +export type CallManagerRuntimeDeps = { provider: VoiceCallProvider | null; config: VoiceCallConfig; storePath: string; webhookUrl: string | null; +}; + +export type CallManagerTransientState = { transcriptWaiters: Map; maxDurationTimers: Map; }; + +export type CallManagerHooks = { + /** Optional runtime hook invoked after an event transitions a call into answered state. */ + onCallAnswered?: (call: CallRecord) => void; +}; + +export type CallManagerContext = CallManagerRuntimeState & + CallManagerRuntimeDeps & + CallManagerTransientState & + CallManagerHooks; diff --git a/extensions/voice-call/src/manager/events.ts b/extensions/voice-call/src/manager/events.ts index 2fb3639250..53371514af 100644 --- a/extensions/voice-call/src/manager/events.ts +++ b/extensions/voice-call/src/manager/events.ts @@ -13,10 +13,21 @@ import { startMaxDurationTimer, } from "./timers.js"; -function shouldAcceptInbound( - config: CallManagerContext["config"], - from: string | undefined, -): boolean { +type EventContext = Pick< + CallManagerContext, + | "activeCalls" + | "providerCallIdMap" + | "processedEventIds" + | "rejectedProviderCallIds" + | "provider" + | "config" + | "storePath" + | "transcriptWaiters" + | "maxDurationTimers" + | "onCallAnswered" +>; + +function shouldAcceptInbound(config: EventContext["config"], from: string | undefined): boolean { const { inboundPolicy: policy, allowFrom } = config; switch (policy) { @@ -49,7 +60,7 @@ function shouldAcceptInbound( } function createInboundCall(params: { - ctx: CallManagerContext; + ctx: EventContext; providerCallId: string; from: string; to: string; @@ -80,7 +91,7 @@ function createInboundCall(params: { return callRecord; } -export function processEvent(ctx: CallManagerContext, event: NormalizedEvent): void { +export function processEvent(ctx: EventContext, event: NormalizedEvent): void { if (ctx.processedEventIds.has(event.id)) { return; } diff --git a/extensions/voice-call/src/manager/outbound.ts b/extensions/voice-call/src/manager/outbound.ts index 2f810fec60..2089b95fe4 100644 --- a/extensions/voice-call/src/manager/outbound.ts +++ b/extensions/voice-call/src/manager/outbound.ts @@ -19,8 +19,39 @@ import { } from "./timers.js"; import { generateNotifyTwiml } from "./twiml.js"; +type InitiateContext = Pick< + CallManagerContext, + "activeCalls" | "providerCallIdMap" | "provider" | "config" | "storePath" | "webhookUrl" +>; + +type SpeakContext = Pick< + CallManagerContext, + "activeCalls" | "providerCallIdMap" | "provider" | "config" | "storePath" +>; + +type ConversationContext = Pick< + CallManagerContext, + | "activeCalls" + | "providerCallIdMap" + | "provider" + | "config" + | "storePath" + | "transcriptWaiters" + | "maxDurationTimers" +>; + +type EndCallContext = Pick< + CallManagerContext, + | "activeCalls" + | "providerCallIdMap" + | "provider" + | "storePath" + | "transcriptWaiters" + | "maxDurationTimers" +>; + export async function initiateCall( - ctx: CallManagerContext, + ctx: InitiateContext, to: string, sessionKey?: string, options?: OutboundCallOptions | string, @@ -113,7 +144,7 @@ export async function initiateCall( } export async function speak( - ctx: CallManagerContext, + ctx: SpeakContext, callId: CallId, text: string, ): Promise<{ success: boolean; error?: string }> { @@ -149,7 +180,7 @@ export async function speak( } export async function speakInitialMessage( - ctx: CallManagerContext, + ctx: ConversationContext, providerCallId: string, ): Promise { const call = getCallByProviderCallId({ @@ -197,7 +228,7 @@ export async function speakInitialMessage( } export async function continueCall( - ctx: CallManagerContext, + ctx: ConversationContext, callId: CallId, prompt: string, ): Promise<{ success: boolean; transcript?: string; error?: string }> { @@ -234,7 +265,7 @@ export async function continueCall( } export async function endCall( - ctx: CallManagerContext, + ctx: EndCallContext, callId: CallId, ): Promise<{ success: boolean; error?: string }> { const call = ctx.activeCalls.get(callId); diff --git a/extensions/voice-call/src/manager/timers.ts b/extensions/voice-call/src/manager/timers.ts index b8723ebcaa..4b6d215054 100644 --- a/extensions/voice-call/src/manager/timers.ts +++ b/extensions/voice-call/src/manager/timers.ts @@ -2,7 +2,20 @@ import type { CallManagerContext } from "./context.js"; import { TerminalStates, type CallId } from "../types.js"; import { persistCallRecord } from "./store.js"; -export function clearMaxDurationTimer(ctx: CallManagerContext, callId: CallId): void { +type TimerContext = Pick< + CallManagerContext, + "activeCalls" | "maxDurationTimers" | "config" | "storePath" | "transcriptWaiters" +>; +type MaxDurationTimerContext = Pick< + TimerContext, + "activeCalls" | "maxDurationTimers" | "config" | "storePath" +>; +type TranscriptWaiterContext = Pick; + +export function clearMaxDurationTimer( + ctx: Pick, + callId: CallId, +): void { const timer = ctx.maxDurationTimers.get(callId); if (timer) { clearTimeout(timer); @@ -11,7 +24,7 @@ export function clearMaxDurationTimer(ctx: CallManagerContext, callId: CallId): } export function startMaxDurationTimer(params: { - ctx: CallManagerContext; + ctx: MaxDurationTimerContext; callId: CallId; onTimeout: (callId: CallId) => Promise; }): void { @@ -38,7 +51,7 @@ export function startMaxDurationTimer(params: { params.ctx.maxDurationTimers.set(params.callId, timer); } -export function clearTranscriptWaiter(ctx: CallManagerContext, callId: CallId): void { +export function clearTranscriptWaiter(ctx: TranscriptWaiterContext, callId: CallId): void { const waiter = ctx.transcriptWaiters.get(callId); if (!waiter) { return; @@ -48,7 +61,7 @@ export function clearTranscriptWaiter(ctx: CallManagerContext, callId: CallId): } export function rejectTranscriptWaiter( - ctx: CallManagerContext, + ctx: TranscriptWaiterContext, callId: CallId, reason: string, ): void { @@ -61,7 +74,7 @@ export function rejectTranscriptWaiter( } export function resolveTranscriptWaiter( - ctx: CallManagerContext, + ctx: TranscriptWaiterContext, callId: CallId, transcript: string, ): void { @@ -73,7 +86,7 @@ export function resolveTranscriptWaiter( waiter.resolve(transcript); } -export function waitForFinalTranscript(ctx: CallManagerContext, callId: CallId): Promise { +export function waitForFinalTranscript(ctx: TimerContext, callId: CallId): Promise { // Only allow one in-flight waiter per call. rejectTranscriptWaiter(ctx, callId, "Transcript waiter replaced");