From 8875dbd44910c78df5d0b86902633720b6d6ff33 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 9 Jan 2026 10:27:06 +0100 Subject: [PATCH] refactor(msteams): split monitor handler and poll store --- docs/cli/message.md | 14 + src/infra/outbound/message.test.ts | 77 ++++ src/msteams/monitor-handler.ts | 547 +++++++++++++++++++++++++ src/msteams/monitor.ts | 509 +---------------------- src/msteams/polls-store-memory.test.ts | 25 ++ src/msteams/polls-store-memory.ts | 36 ++ src/msteams/polls.ts | 43 +- src/msteams/send.ts | 151 ++++--- 8 files changed, 819 insertions(+), 583 deletions(-) create mode 100644 src/infra/outbound/message.test.ts create mode 100644 src/msteams/monitor-handler.ts create mode 100644 src/msteams/polls-store-memory.test.ts create mode 100644 src/msteams/polls-store-memory.ts diff --git a/docs/cli/message.md b/docs/cli/message.md index cec9bec6cb..1e6b8b2e44 100644 --- a/docs/cli/message.md +++ b/docs/cli/message.md @@ -155,6 +155,20 @@ clawdbot message poll --provider discord \ --poll-multi --poll-duration-hours 48 ``` +Send a Teams proactive message: +``` +clawdbot message send --provider msteams \ + --to conversation:19:abc@thread.tacv2 --message "hi" +``` + +Create a Teams poll: +``` +clawdbot message poll --provider msteams \ + --to conversation:19:abc@thread.tacv2 \ + --poll-question "Lunch?" \ + --poll-option Pizza --poll-option Sushi +``` + React in Slack: ``` clawdbot message react --provider slack \ diff --git a/src/infra/outbound/message.test.ts b/src/infra/outbound/message.test.ts new file mode 100644 index 0000000000..ac28d9a1b7 --- /dev/null +++ b/src/infra/outbound/message.test.ts @@ -0,0 +1,77 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; + +import { sendMessage, sendPoll } from "./message.js"; + +const callGatewayMock = vi.fn(); +vi.mock("../../gateway/call.js", () => ({ + callGateway: (...args: unknown[]) => callGatewayMock(...args), + randomIdempotencyKey: () => "idem-1", +})); + +describe("sendMessage provider normalization", () => { + beforeEach(() => { + callGatewayMock.mockReset(); + }); + + it("normalizes Teams alias", async () => { + const sendMSTeams = vi.fn(async () => ({ + messageId: "m1", + conversationId: "c1", + })); + const result = await sendMessage({ + cfg: {}, + to: "conversation:19:abc@thread.tacv2", + content: "hi", + provider: "teams", + deps: { sendMSTeams }, + }); + + expect(sendMSTeams).toHaveBeenCalledWith( + "conversation:19:abc@thread.tacv2", + "hi", + ); + expect(result.provider).toBe("msteams"); + }); + + it("normalizes iMessage alias", async () => { + const sendIMessage = vi.fn(async () => ({ messageId: "i1" })); + const result = await sendMessage({ + cfg: {}, + to: "someone@example.com", + content: "hi", + provider: "imsg", + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "someone@example.com", + "hi", + expect.any(Object), + ); + expect(result.provider).toBe("imessage"); + }); +}); + +describe("sendPoll provider normalization", () => { + beforeEach(() => { + callGatewayMock.mockReset(); + }); + + it("normalizes Teams alias for polls", async () => { + callGatewayMock.mockResolvedValueOnce({ messageId: "p1" }); + + const result = await sendPoll({ + cfg: {}, + to: "conversation:19:abc@thread.tacv2", + question: "Lunch?", + options: ["Pizza", "Sushi"], + provider: "Teams", + }); + + const call = callGatewayMock.mock.calls[0]?.[0] as { + params?: Record; + }; + expect(call?.params?.provider).toBe("msteams"); + expect(result.provider).toBe("msteams"); + }); +}); diff --git a/src/msteams/monitor-handler.ts b/src/msteams/monitor-handler.ts new file mode 100644 index 0000000000..72b524f3f5 --- /dev/null +++ b/src/msteams/monitor-handler.ts @@ -0,0 +1,547 @@ +import { formatAgentEnvelope } from "../auto-reply/envelope.js"; +import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; +import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; +import type { ClawdbotConfig } from "../config/types.js"; +import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; +import { enqueueSystemEvent } from "../infra/system-events.js"; +import { + readProviderAllowFromStore, + upsertProviderPairingRequest, +} from "../pairing/pairing-store.js"; +import { resolveAgentRoute } from "../routing/resolve-route.js"; +import type { RuntimeEnv } from "../runtime.js"; +import { + buildMSTeamsAttachmentPlaceholder, + buildMSTeamsGraphMessageUrls, + buildMSTeamsMediaPayload, + downloadMSTeamsGraphMedia, + downloadMSTeamsImageAttachments, + type MSTeamsAttachmentLike, + summarizeMSTeamsHtmlAttachments, +} from "./attachments.js"; +import type { + MSTeamsConversationStore, + StoredConversationReference, +} from "./conversation-store.js"; +import { + classifyMSTeamsSendError, + formatMSTeamsSendErrorHint, + formatUnknownError, +} from "./errors.js"; +import { + extractMSTeamsConversationMessageId, + normalizeMSTeamsConversationId, + parseMSTeamsActivityTimestamp, + stripMSTeamsMentionTags, + wasMSTeamsBotMentioned, +} from "./inbound.js"; +import { + type MSTeamsAdapter, + renderReplyPayloadsToMessages, + sendMSTeamsMessages, +} from "./messenger.js"; +import { + resolveMSTeamsReplyPolicy, + resolveMSTeamsRouteConfig, +} from "./policy.js"; +import { extractMSTeamsPollVote, type MSTeamsPollStore } from "./polls.js"; +import type { MSTeamsTurnContext } from "./sdk-types.js"; + +export type MSTeamsMonitorLogger = { + debug: (message: string, meta?: Record) => void; + info: (message: string, meta?: Record) => void; + error: (message: string, meta?: Record) => void; +}; + +export type MSTeamsAccessTokenProvider = { + getAccessToken: (scope: string) => Promise; +}; + +export type MSTeamsActivityHandler = { + onMessage: ( + handler: (context: unknown, next: () => Promise) => Promise, + ) => MSTeamsActivityHandler; + onMembersAdded: ( + handler: (context: unknown, next: () => Promise) => Promise, + ) => MSTeamsActivityHandler; +}; + +export type MSTeamsMessageHandlerDeps = { + cfg: ClawdbotConfig; + runtime: RuntimeEnv; + appId: string; + adapter: MSTeamsAdapter; + tokenProvider: MSTeamsAccessTokenProvider; + textLimit: number; + mediaMaxBytes: number; + conversationStore: MSTeamsConversationStore; + pollStore: MSTeamsPollStore; + log: MSTeamsMonitorLogger; +}; + +export function registerMSTeamsHandlers( + handler: T, + deps: MSTeamsMessageHandlerDeps, +): T { + const handleTeamsMessage = createMSTeamsMessageHandler(deps); + + return handler + .onMessage(async (context, next) => { + try { + await handleTeamsMessage(context as MSTeamsTurnContext); + } catch (err) { + deps.runtime.error?.(danger(`msteams handler failed: ${String(err)}`)); + } + await next(); + }) + .onMembersAdded(async (context, next) => { + const membersAdded = + (context as MSTeamsTurnContext).activity?.membersAdded ?? []; + for (const member of membersAdded) { + if ( + member.id !== (context as MSTeamsTurnContext).activity?.recipient?.id + ) { + deps.log.debug("member added", { member: member.id }); + // Don't send welcome message - let the user initiate conversation. + } + } + await next(); + }); +} + +function createMSTeamsMessageHandler(deps: MSTeamsMessageHandlerDeps) { + const { + cfg, + runtime, + appId, + adapter, + tokenProvider, + textLimit, + mediaMaxBytes, + conversationStore, + pollStore, + log, + } = deps; + const msteamsCfg = cfg.msteams; + + return async function handleTeamsMessage(context: MSTeamsTurnContext) { + const activity = context.activity; + const rawText = activity.text?.trim() ?? ""; + const text = stripMSTeamsMentionTags(rawText); + const attachments = Array.isArray(activity.attachments) + ? (activity.attachments as unknown as MSTeamsAttachmentLike[]) + : []; + const attachmentPlaceholder = + buildMSTeamsAttachmentPlaceholder(attachments); + const rawBody = text || attachmentPlaceholder; + const from = activity.from; + const conversation = activity.conversation; + + const attachmentTypes = attachments + .map((att) => + typeof att.contentType === "string" ? att.contentType : undefined, + ) + .filter(Boolean) + .slice(0, 3); + const htmlSummary = summarizeMSTeamsHtmlAttachments(attachments); + + log.info("received message", { + rawText: rawText.slice(0, 50), + text: text.slice(0, 50), + attachments: attachments.length, + attachmentTypes, + from: from?.id, + conversation: conversation?.id, + }); + if (htmlSummary) { + log.debug("html attachment summary", htmlSummary); + } + + if (!from?.id) { + log.debug("skipping message without from.id"); + return; + } + + // Teams conversation.id may include ";messageid=..." suffix - strip it for session key + const rawConversationId = conversation?.id ?? ""; + const conversationId = normalizeMSTeamsConversationId(rawConversationId); + const conversationMessageId = + extractMSTeamsConversationMessageId(rawConversationId); + const conversationType = conversation?.conversationType ?? "personal"; + const isGroupChat = + conversationType === "groupChat" || conversation?.isGroup === true; + const isChannel = conversationType === "channel"; + const isDirectMessage = !isGroupChat && !isChannel; + + const senderName = from.name ?? from.id; + const senderId = from.aadObjectId ?? from.id; + + // Check DM policy for direct messages + if (isDirectMessage && msteamsCfg) { + const dmPolicy = msteamsCfg.dmPolicy ?? "pairing"; + const allowFrom = msteamsCfg.allowFrom ?? []; + + if (dmPolicy === "disabled") { + log.debug("dropping dm (dms disabled)"); + return; + } + + if (dmPolicy !== "open") { + // Check allowlist - look up from config and pairing store + const storedAllowFrom = await readProviderAllowFromStore("msteams"); + const effectiveAllowFrom = [ + ...allowFrom.map((v) => String(v).toLowerCase()), + ...storedAllowFrom, + ]; + + const senderLower = senderId.toLowerCase(); + const senderNameLower = senderName.toLowerCase(); + const allowed = + effectiveAllowFrom.includes("*") || + effectiveAllowFrom.includes(senderLower) || + effectiveAllowFrom.includes(senderNameLower); + + if (!allowed) { + if (dmPolicy === "pairing") { + const request = await upsertProviderPairingRequest({ + provider: "msteams", + sender: senderId, + label: senderName, + }); + if (request) { + log.info("msteams pairing request created", { + sender: senderId, + label: senderName, + }); + } + } + log.debug("dropping dm (not allowlisted)", { + sender: senderId, + label: senderName, + }); + return; + } + } + } + + // Build conversation reference for proactive replies + const agent = activity.recipient; + const teamId = activity.channelData?.team?.id; + const conversationRef: StoredConversationReference = { + activityId: activity.id, + user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId }, + agent, + bot: agent ? { id: agent.id, name: agent.name } : undefined, + conversation: { + id: conversationId, + conversationType, + tenantId: conversation?.tenantId, + }, + teamId, + channelId: activity.channelId, + serviceUrl: activity.serviceUrl, + locale: activity.locale, + }; + conversationStore.upsert(conversationId, conversationRef).catch((err) => { + log.debug("failed to save conversation reference", { + error: formatUnknownError(err), + }); + }); + + const pollVote = extractMSTeamsPollVote(activity); + if (pollVote) { + try { + const poll = await pollStore.recordVote({ + pollId: pollVote.pollId, + voterId: senderId, + selections: pollVote.selections, + }); + if (!poll) { + log.debug("poll vote ignored (poll not found)", { + pollId: pollVote.pollId, + }); + } else { + log.info("recorded poll vote", { + pollId: pollVote.pollId, + voter: senderId, + selections: pollVote.selections, + }); + } + } catch (err) { + log.error("failed to record poll vote", { + pollId: pollVote.pollId, + error: formatUnknownError(err), + }); + } + return; + } + + if (!rawBody) { + log.debug("skipping empty message after stripping mentions"); + return; + } + + // Build Teams-specific identifiers + const teamsFrom = isDirectMessage + ? `msteams:${senderId}` + : isChannel + ? `msteams:channel:${conversationId}` + : `msteams:group:${conversationId}`; + const teamsTo = isDirectMessage + ? `user:${senderId}` + : `conversation:${conversationId}`; + + // Resolve routing + const route = resolveAgentRoute({ + cfg, + provider: "msteams", + peer: { + kind: isDirectMessage ? "dm" : isChannel ? "channel" : "group", + id: isDirectMessage ? senderId : conversationId, + }, + }); + + const preview = rawBody.replace(/\s+/g, " ").slice(0, 160); + const inboundLabel = isDirectMessage + ? `Teams DM from ${senderName}` + : `Teams message in ${conversationType} from ${senderName}`; + + enqueueSystemEvent(`${inboundLabel}: ${preview}`, { + sessionKey: route.sessionKey, + contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`, + }); + + // Resolve team/channel config for channels and group chats + const channelId = conversationId; + const { teamConfig, channelConfig } = resolveMSTeamsRouteConfig({ + cfg: msteamsCfg, + teamId, + conversationId: channelId, + }); + const { requireMention, replyStyle } = resolveMSTeamsReplyPolicy({ + isDirectMessage, + globalConfig: msteamsCfg, + teamConfig, + channelConfig, + }); + + // Check requireMention for channels and group chats + if (!isDirectMessage) { + const mentioned = wasMSTeamsBotMentioned(activity); + + if (requireMention && !mentioned) { + log.debug("skipping message (mention required)", { + teamId, + channelId, + requireMention, + mentioned, + }); + return; + } + } + + // Format the message body with envelope + const timestamp = parseMSTeamsActivityTimestamp(activity.timestamp); + let mediaList = await downloadMSTeamsImageAttachments({ + attachments, + maxBytes: mediaMaxBytes, + tokenProvider: { + getAccessToken: (scope) => tokenProvider.getAccessToken(scope), + }, + allowHosts: msteamsCfg?.mediaAllowHosts, + }); + if (mediaList.length === 0) { + const onlyHtmlAttachments = + attachments.length > 0 && + attachments.every((att) => + String(att.contentType ?? "").startsWith("text/html"), + ); + if (onlyHtmlAttachments) { + const messageUrls = buildMSTeamsGraphMessageUrls({ + conversationType, + conversationId, + messageId: activity.id ?? undefined, + replyToId: activity.replyToId ?? undefined, + conversationMessageId, + channelData: activity.channelData, + }); + if (messageUrls.length === 0) { + log.debug("graph message url unavailable", { + conversationType, + hasChannelData: Boolean(activity.channelData), + messageId: activity.id ?? undefined, + replyToId: activity.replyToId ?? undefined, + }); + } else { + const attempts: Array<{ + url: string; + hostedStatus?: number; + attachmentStatus?: number; + hostedCount?: number; + attachmentCount?: number; + tokenError?: boolean; + }> = []; + for (const messageUrl of messageUrls) { + const graphMedia = await downloadMSTeamsGraphMedia({ + messageUrl, + tokenProvider: { + getAccessToken: (scope) => tokenProvider.getAccessToken(scope), + }, + maxBytes: mediaMaxBytes, + allowHosts: msteamsCfg?.mediaAllowHosts, + }); + attempts.push({ + url: messageUrl, + hostedStatus: graphMedia.hostedStatus, + attachmentStatus: graphMedia.attachmentStatus, + hostedCount: graphMedia.hostedCount, + attachmentCount: graphMedia.attachmentCount, + tokenError: graphMedia.tokenError, + }); + if (graphMedia.media.length > 0) { + mediaList = graphMedia.media; + break; + } + if (graphMedia.tokenError) break; + } + if (mediaList.length === 0) { + log.debug("graph media fetch empty", { attempts }); + } + } + } + } + if (mediaList.length > 0) { + log.debug("downloaded image attachments", { count: mediaList.length }); + } else if (htmlSummary?.imgTags) { + log.debug("inline images detected but none downloaded", { + imgTags: htmlSummary.imgTags, + srcHosts: htmlSummary.srcHosts, + dataImages: htmlSummary.dataImages, + cidImages: htmlSummary.cidImages, + }); + } + const mediaPayload = buildMSTeamsMediaPayload(mediaList); + const body = formatAgentEnvelope({ + provider: "Teams", + from: senderName, + timestamp, + body: rawBody, + }); + + // Build context payload for agent + const ctxPayload = { + Body: body, + From: teamsFrom, + To: teamsTo, + SessionKey: route.sessionKey, + AccountId: route.accountId, + ChatType: isDirectMessage ? "direct" : isChannel ? "room" : "group", + GroupSubject: !isDirectMessage ? conversationType : undefined, + SenderName: senderName, + SenderId: senderId, + Provider: "msteams" as const, + Surface: "msteams" as const, + MessageSid: activity.id, + Timestamp: timestamp?.getTime() ?? Date.now(), + WasMentioned: isDirectMessage || wasMSTeamsBotMentioned(activity), + CommandAuthorized: true, + OriginatingChannel: "msteams" as const, + OriginatingTo: teamsTo, + ...mediaPayload, + }; + + if (shouldLogVerbose()) { + logVerbose( + `msteams inbound: from=${ctxPayload.From} preview="${preview}"`, + ); + } + + // Send typing indicator + const sendTypingIndicator = async () => { + try { + await context.sendActivities([{ type: "typing" }]); + } catch { + // Typing indicator is best-effort. + } + }; + + // Create reply dispatcher + const { dispatcher, replyOptions, markDispatchIdle } = + createReplyDispatcherWithTyping({ + responsePrefix: cfg.messages?.responsePrefix, + deliver: async (payload) => { + const messages = renderReplyPayloadsToMessages([payload], { + textChunkLimit: textLimit, + chunkText: true, + mediaMode: "split", + }); + await sendMSTeamsMessages({ + replyStyle, + adapter, + appId, + conversationRef, + context, + messages, + // Enable default retry/backoff for throttling/transient failures. + retry: {}, + onRetry: (event) => { + log.debug("retrying send", { + replyStyle, + ...event, + }); + }, + }); + }, + onError: (err, info) => { + const errMsg = formatUnknownError(err); + const classification = classifyMSTeamsSendError(err); + const hint = formatMSTeamsSendErrorHint(classification); + runtime.error?.( + danger( + `msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`, + ), + ); + log.error("reply failed", { + kind: info.kind, + error: errMsg, + classification, + hint, + }); + }, + onReplyStart: sendTypingIndicator, + }); + + // Dispatch to agent + log.info("dispatching to agent", { sessionKey: route.sessionKey }); + try { + const { queuedFinal, counts } = await dispatchReplyFromConfig({ + ctx: ctxPayload, + cfg, + dispatcher, + replyOptions, + }); + + markDispatchIdle(); + log.info("dispatch complete", { queuedFinal, counts }); + + if (!queuedFinal) return; + if (shouldLogVerbose()) { + const finalCount = counts.final; + logVerbose( + `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, + ); + } + } catch (err) { + log.error("dispatch failed", { error: String(err) }); + runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`)); + // Try to send error message back to Teams. + try { + await context.sendActivity( + `⚠️ Agent failed: ${err instanceof Error ? err.message : String(err)}`, + ); + } catch { + // Best effort. + } + } + }; +} diff --git a/src/msteams/monitor.ts b/src/msteams/monitor.ts index df0eaf6c6c..b00de330ae 100644 --- a/src/msteams/monitor.ts +++ b/src/msteams/monitor.ts @@ -1,59 +1,14 @@ import type { Request, Response } from "express"; import { resolveTextChunkLimit } from "../auto-reply/chunk.js"; -import { formatAgentEnvelope } from "../auto-reply/envelope.js"; -import { dispatchReplyFromConfig } from "../auto-reply/reply/dispatch-from-config.js"; -import { createReplyDispatcherWithTyping } from "../auto-reply/reply/reply-dispatcher.js"; import type { ClawdbotConfig } from "../config/types.js"; -import { danger, logVerbose, shouldLogVerbose } from "../globals.js"; -import { enqueueSystemEvent } from "../infra/system-events.js"; import { getChildLogger } from "../logging.js"; -import { - readProviderAllowFromStore, - upsertProviderPairingRequest, -} from "../pairing/pairing-store.js"; -import { resolveAgentRoute } from "../routing/resolve-route.js"; import type { RuntimeEnv } from "../runtime.js"; -import { - buildMSTeamsAttachmentPlaceholder, - buildMSTeamsGraphMessageUrls, - buildMSTeamsMediaPayload, - downloadMSTeamsGraphMedia, - downloadMSTeamsImageAttachments, - type MSTeamsAttachmentLike, - summarizeMSTeamsHtmlAttachments, -} from "./attachments.js"; -import type { - MSTeamsConversationStore, - StoredConversationReference, -} from "./conversation-store.js"; +import type { MSTeamsConversationStore } from "./conversation-store.js"; import { createMSTeamsConversationStoreFs } from "./conversation-store-fs.js"; -import { - classifyMSTeamsSendError, - formatMSTeamsSendErrorHint, - formatUnknownError, -} from "./errors.js"; -import { - extractMSTeamsConversationMessageId, - normalizeMSTeamsConversationId, - parseMSTeamsActivityTimestamp, - stripMSTeamsMentionTags, - wasMSTeamsBotMentioned, -} from "./inbound.js"; -import { - type MSTeamsAdapter, - renderReplyPayloadsToMessages, - sendMSTeamsMessages, -} from "./messenger.js"; -import { - resolveMSTeamsReplyPolicy, - resolveMSTeamsRouteConfig, -} from "./policy.js"; -import { - createMSTeamsPollStoreFs, - extractMSTeamsPollVote, - type MSTeamsPollStore, -} from "./polls.js"; -import type { MSTeamsTurnContext } from "./sdk-types.js"; +import { formatUnknownError } from "./errors.js"; +import type { MSTeamsAdapter } from "./messenger.js"; +import { registerMSTeamsHandlers } from "./monitor-handler.js"; +import { createMSTeamsPollStoreFs, type MSTeamsPollStore } from "./polls.js"; import { resolveMSTeamsCredentials } from "./token.js"; const log = getChildLogger({ name: "msteams" }); @@ -130,448 +85,18 @@ export async function monitorMSTeamsProvider( const tokenProvider = new MsalTokenProvider(authConfig); const adapter = new CloudAdapter(authConfig); - // Handler for incoming messages - async function handleTeamsMessage(context: MSTeamsTurnContext) { - const activity = context.activity; - const rawText = activity.text?.trim() ?? ""; - const text = stripMSTeamsMentionTags(rawText); - const attachments = Array.isArray(activity.attachments) - ? (activity.attachments as unknown as MSTeamsAttachmentLike[]) - : []; - const attachmentPlaceholder = - buildMSTeamsAttachmentPlaceholder(attachments); - const rawBody = text || attachmentPlaceholder; - const from = activity.from; - const conversation = activity.conversation; - - const attachmentTypes = attachments - .map((att) => - typeof att.contentType === "string" ? att.contentType : undefined, - ) - .filter(Boolean) - .slice(0, 3); - const htmlSummary = summarizeMSTeamsHtmlAttachments(attachments); - - log.info("received message", { - rawText: rawText.slice(0, 50), - text: text.slice(0, 50), - attachments: attachments.length, - attachmentTypes, - from: from?.id, - conversation: conversation?.id, - }); - if (htmlSummary) { - log.debug("html attachment summary", htmlSummary); - } - - if (!from?.id) { - log.debug("skipping message without from.id"); - return; - } - - // Teams conversation.id may include ";messageid=..." suffix - strip it for session key - const rawConversationId = conversation?.id ?? ""; - const conversationId = normalizeMSTeamsConversationId(rawConversationId); - const conversationMessageId = - extractMSTeamsConversationMessageId(rawConversationId); - const conversationType = conversation?.conversationType ?? "personal"; - const isGroupChat = - conversationType === "groupChat" || conversation?.isGroup === true; - const isChannel = conversationType === "channel"; - const isDirectMessage = !isGroupChat && !isChannel; - - const senderName = from.name ?? from.id; - const senderId = from.aadObjectId ?? from.id; - - // Check DM policy for direct messages - if (isDirectMessage && msteamsCfg) { - const dmPolicy = msteamsCfg.dmPolicy ?? "pairing"; - const allowFrom = msteamsCfg.allowFrom ?? []; - - if (dmPolicy === "disabled") { - log.debug("dropping dm (dms disabled)"); - return; - } - - if (dmPolicy !== "open") { - // Check allowlist - look up from config and pairing store - const storedAllowFrom = await readProviderAllowFromStore("msteams"); - const effectiveAllowFrom = [ - ...allowFrom.map((v) => String(v).toLowerCase()), - ...storedAllowFrom.map((v) => v.toLowerCase()), - ]; - - const senderLower = senderId.toLowerCase(); - const permitted = effectiveAllowFrom.some( - (entry) => entry === senderLower || entry === "*", - ); - - if (!permitted) { - if (dmPolicy === "pairing") { - const { code, created } = await upsertProviderPairingRequest({ - provider: "msteams", - id: senderId, - meta: { name: senderName }, - }); - const msg = created - ? `👋 Hi ${senderName}! To chat with me, please share this pairing code with my owner: **${code}**` - : `🔑 Your pairing code is: **${code}** — please share it with my owner to get access.`; - await context.sendActivity(msg); - log.info("sent pairing code", { senderId, code }); - } else { - log.debug("dropping unauthorized dm", { senderId, dmPolicy }); - } - return; - } - } - } - - // Save conversation reference for proactive messaging - const agent = activity.recipient - ? { - id: activity.recipient.id, - name: activity.recipient.name, - aadObjectId: activity.recipient.aadObjectId, - } - : undefined; - const teamId = activity.channelData?.team?.id; - const conversationRef: StoredConversationReference = { - activityId: activity.id, - user: { id: from.id, name: from.name, aadObjectId: from.aadObjectId }, - agent, - bot: agent ? { id: agent.id, name: agent.name } : undefined, - conversation: { - id: conversationId, - conversationType, - tenantId: conversation?.tenantId, - }, - teamId, - channelId: activity.channelId, - serviceUrl: activity.serviceUrl, - }; - conversationStore.upsert(conversationId, conversationRef).catch((err) => { - log.debug("failed to save conversation reference", { - error: formatUnknownError(err), - }); - }); - - const pollVote = extractMSTeamsPollVote(activity); - if (pollVote) { - try { - const poll = await pollStore.recordVote({ - pollId: pollVote.pollId, - voterId: senderId, - selections: pollVote.selections, - }); - if (!poll) { - log.debug("poll vote ignored (poll not found)", { - pollId: pollVote.pollId, - }); - } else { - log.info("recorded poll vote", { - pollId: pollVote.pollId, - voter: senderId, - selections: pollVote.selections, - }); - } - } catch (err) { - log.error("failed to record poll vote", { - pollId: pollVote.pollId, - error: formatUnknownError(err), - }); - } - return; - } - - if (!rawBody) { - log.debug("skipping empty message after stripping mentions"); - return; - } - - // Build Teams-specific identifiers - const teamsFrom = isDirectMessage - ? `msteams:${senderId}` - : isChannel - ? `msteams:channel:${conversationId}` - : `msteams:group:${conversationId}`; - const teamsTo = isDirectMessage - ? `user:${senderId}` - : `conversation:${conversationId}`; - - // Resolve routing - const route = resolveAgentRoute({ - cfg, - provider: "msteams", - peer: { - kind: isDirectMessage ? "dm" : isChannel ? "channel" : "group", - id: isDirectMessage ? senderId : conversationId, - }, - }); - - const preview = rawBody.replace(/\s+/g, " ").slice(0, 160); - const inboundLabel = isDirectMessage - ? `Teams DM from ${senderName}` - : `Teams message in ${conversationType} from ${senderName}`; - - enqueueSystemEvent(`${inboundLabel}: ${preview}`, { - sessionKey: route.sessionKey, - contextKey: `msteams:message:${conversationId}:${activity.id ?? "unknown"}`, - }); - - // Resolve team/channel config for channels and group chats - const channelId = conversationId; - const { teamConfig, channelConfig } = resolveMSTeamsRouteConfig({ - cfg: msteamsCfg, - teamId, - conversationId: channelId, - }); - const { requireMention, replyStyle } = resolveMSTeamsReplyPolicy({ - isDirectMessage, - globalConfig: msteamsCfg, - teamConfig, - channelConfig, - }); - - // Check requireMention for channels and group chats - if (!isDirectMessage) { - const mentioned = wasMSTeamsBotMentioned(activity); - - if (requireMention && !mentioned) { - log.debug("skipping message (mention required)", { - teamId, - channelId, - requireMention, - mentioned, - }); - return; - } - } - - // Format the message body with envelope - const timestamp = parseMSTeamsActivityTimestamp(activity.timestamp); - let mediaList = await downloadMSTeamsImageAttachments({ - attachments, - maxBytes: mediaMaxBytes, - tokenProvider: { - getAccessToken: (scope) => tokenProvider.getAccessToken(scope), - }, - allowHosts: msteamsCfg?.mediaAllowHosts, - }); - if (mediaList.length === 0) { - const onlyHtmlAttachments = - attachments.length > 0 && - attachments.every((att) => - String(att.contentType ?? "").startsWith("text/html"), - ); - if (onlyHtmlAttachments) { - const messageUrls = buildMSTeamsGraphMessageUrls({ - conversationType, - conversationId, - messageId: activity.id ?? undefined, - replyToId: activity.replyToId ?? undefined, - conversationMessageId, - channelData: activity.channelData, - }); - if (messageUrls.length === 0) { - log.debug("graph message url unavailable", { - conversationType, - hasChannelData: Boolean(activity.channelData), - messageId: activity.id ?? undefined, - replyToId: activity.replyToId ?? undefined, - }); - } else { - const attempts: Array<{ - url: string; - hostedStatus?: number; - attachmentStatus?: number; - hostedCount?: number; - attachmentCount?: number; - tokenError?: boolean; - }> = []; - for (const messageUrl of messageUrls) { - const graphMedia = await downloadMSTeamsGraphMedia({ - messageUrl, - tokenProvider: { - getAccessToken: (scope) => tokenProvider.getAccessToken(scope), - }, - maxBytes: mediaMaxBytes, - allowHosts: msteamsCfg?.mediaAllowHosts, - }); - attempts.push({ - url: messageUrl, - hostedStatus: graphMedia.hostedStatus, - attachmentStatus: graphMedia.attachmentStatus, - hostedCount: graphMedia.hostedCount, - attachmentCount: graphMedia.attachmentCount, - tokenError: graphMedia.tokenError, - }); - if (graphMedia.media.length > 0) { - mediaList = graphMedia.media; - break; - } - if (graphMedia.tokenError) break; - } - if (mediaList.length === 0) { - log.debug("graph media fetch empty", { attempts }); - } - } - } - } - if (mediaList.length > 0) { - log.debug("downloaded image attachments", { count: mediaList.length }); - } else if (htmlSummary?.imgTags) { - log.debug("inline images detected but none downloaded", { - imgTags: htmlSummary.imgTags, - srcHosts: htmlSummary.srcHosts, - dataImages: htmlSummary.dataImages, - cidImages: htmlSummary.cidImages, - }); - } - const mediaPayload = buildMSTeamsMediaPayload(mediaList); - const body = formatAgentEnvelope({ - provider: "Teams", - from: senderName, - timestamp, - body: rawBody, - }); - - // Build context payload for agent - const ctxPayload = { - Body: body, - From: teamsFrom, - To: teamsTo, - SessionKey: route.sessionKey, - AccountId: route.accountId, - ChatType: isDirectMessage ? "direct" : isChannel ? "room" : "group", - GroupSubject: !isDirectMessage ? conversationType : undefined, - SenderName: senderName, - SenderId: senderId, - Provider: "msteams" as const, - Surface: "msteams" as const, - MessageSid: activity.id, - Timestamp: timestamp?.getTime() ?? Date.now(), - WasMentioned: isDirectMessage || wasMSTeamsBotMentioned(activity), - CommandAuthorized: true, - OriginatingChannel: "msteams" as const, - OriginatingTo: teamsTo, - ...mediaPayload, - }; - - if (shouldLogVerbose()) { - logVerbose( - `msteams inbound: from=${ctxPayload.From} preview="${preview}"`, - ); - } - - // Send typing indicator - const sendTypingIndicator = async () => { - try { - await context.sendActivities([{ type: "typing" }]); - } catch { - // Typing indicator is best-effort - } - }; - - // Create reply dispatcher - const { dispatcher, replyOptions, markDispatchIdle } = - createReplyDispatcherWithTyping({ - responsePrefix: cfg.messages?.responsePrefix, - deliver: async (payload) => { - const messages = renderReplyPayloadsToMessages([payload], { - textChunkLimit: textLimit, - chunkText: true, - mediaMode: "split", - }); - await sendMSTeamsMessages({ - replyStyle, - adapter: adapter as unknown as MSTeamsAdapter, - appId, - conversationRef, - context, - messages, - // Enable default retry/backoff for throttling/transient failures. - retry: {}, - onRetry: (event) => { - log.debug("retrying send", { - replyStyle, - ...event, - }); - }, - }); - }, - onError: (err, info) => { - const errMsg = formatUnknownError(err); - const classification = classifyMSTeamsSendError(err); - const hint = formatMSTeamsSendErrorHint(classification); - runtime.error?.( - danger( - `msteams ${info.kind} reply failed: ${errMsg}${hint ? ` (${hint})` : ""}`, - ), - ); - log.error("reply failed", { - kind: info.kind, - error: errMsg, - classification, - hint, - }); - }, - onReplyStart: sendTypingIndicator, - }); - - // Dispatch to agent - log.info("dispatching to agent", { sessionKey: route.sessionKey }); - try { - const { queuedFinal, counts } = await dispatchReplyFromConfig({ - ctx: ctxPayload, - cfg, - dispatcher, - replyOptions, - }); - - markDispatchIdle(); - log.info("dispatch complete", { queuedFinal, counts }); - - if (!queuedFinal) return; - if (shouldLogVerbose()) { - const finalCount = counts.final; - logVerbose( - `msteams: delivered ${finalCount} reply${finalCount === 1 ? "" : "ies"} to ${teamsTo}`, - ); - } - } catch (err) { - log.error("dispatch failed", { error: String(err) }); - runtime.error?.(danger(`msteams dispatch failed: ${String(err)}`)); - // Try to send error message back to Teams - try { - await context.sendActivity( - `⚠️ Agent failed: ${err instanceof Error ? err.message : String(err)}`, - ); - } catch { - // Best effort - } - } - } - - // Create activity handler using fluent API - const handler = new ActivityHandler() - .onMessage(async (context, next) => { - try { - await handleTeamsMessage(context as unknown as MSTeamsTurnContext); - } catch (err) { - runtime.error?.(danger(`msteams handler failed: ${String(err)}`)); - } - await next(); - }) - .onMembersAdded(async (context, next) => { - const membersAdded = context.activity?.membersAdded ?? []; - for (const member of membersAdded) { - if (member.id !== context.activity?.recipient?.id) { - log.debug("member added", { member: member.id }); - // Don't send welcome message - let the user initiate conversation - } - } - await next(); - }); + const handler = registerMSTeamsHandlers(new ActivityHandler(), { + cfg, + runtime, + appId, + adapter: adapter as unknown as MSTeamsAdapter, + tokenProvider, + textLimit, + mediaMaxBytes, + conversationStore, + pollStore, + log, + }); // Create Express server const expressApp = express.default(); diff --git a/src/msteams/polls-store-memory.test.ts b/src/msteams/polls-store-memory.test.ts new file mode 100644 index 0000000000..ba1f9cf69d --- /dev/null +++ b/src/msteams/polls-store-memory.test.ts @@ -0,0 +1,25 @@ +import { describe, expect, it } from "vitest"; + +import { createMSTeamsPollStoreMemory } from "./polls-store-memory.js"; + +describe("msteams poll memory store", () => { + it("stores polls and records normalized votes", async () => { + const store = createMSTeamsPollStoreMemory(); + await store.createPoll({ + id: "poll-1", + question: "Lunch?", + options: ["Pizza", "Sushi"], + maxSelections: 1, + createdAt: new Date().toISOString(), + votes: {}, + }); + + const poll = await store.recordVote({ + pollId: "poll-1", + voterId: "user-1", + selections: ["0", "1"], + }); + + expect(poll?.votes["user-1"]).toEqual(["0"]); + }); +}); diff --git a/src/msteams/polls-store-memory.ts b/src/msteams/polls-store-memory.ts new file mode 100644 index 0000000000..9873ccba22 --- /dev/null +++ b/src/msteams/polls-store-memory.ts @@ -0,0 +1,36 @@ +import { + type MSTeamsPoll, + type MSTeamsPollStore, + normalizeMSTeamsPollSelections, +} from "./polls.js"; + +export function createMSTeamsPollStoreMemory( + initial: MSTeamsPoll[] = [], +): MSTeamsPollStore { + const polls = new Map(); + for (const poll of initial) { + polls.set(poll.id, { ...poll }); + } + + const createPoll = async (poll: MSTeamsPoll) => { + polls.set(poll.id, { ...poll }); + }; + + const getPoll = async (pollId: string) => polls.get(pollId) ?? null; + + const recordVote = async (params: { + pollId: string; + voterId: string; + selections: string[]; + }) => { + const poll = polls.get(params.pollId); + if (!poll) return null; + const normalized = normalizeMSTeamsPollSelections(poll, params.selections); + poll.votes[params.voterId] = normalized; + poll.updatedAt = new Date().toISOString(); + polls.set(poll.id, poll); + return poll; + }; + + return { createPoll, getPoll, recordVote }; +} diff --git a/src/msteams/polls.ts b/src/msteams/polls.ts index 55c15c7289..3ad65c41da 100644 --- a/src/msteams/polls.ts +++ b/src/msteams/polls.ts @@ -232,13 +232,23 @@ export function buildMSTeamsPollCard(params: { }; } -function resolveStorePath( - env: NodeJS.ProcessEnv = process.env, - homedir?: () => string, -): string { - const stateDir = homedir - ? resolveStateDir(env, homedir) - : resolveStateDir(env); +export type MSTeamsPollStoreFsOptions = { + env?: NodeJS.ProcessEnv; + homedir?: () => string; + stateDir?: string; + storePath?: string; +}; + +function resolveStorePath(params?: MSTeamsPollStoreFsOptions): string { + if (params?.storePath) { + return params.storePath; + } + if (params?.stateDir) { + return path.join(params.stateDir, STORE_FILENAME); + } + const stateDir = params?.homedir + ? resolveStateDir(params.env ?? process.env, params.homedir) + : resolveStateDir(params?.env ?? process.env); return path.join(stateDir, STORE_FILENAME); } @@ -336,7 +346,10 @@ function pruneToLimit(polls: Record) { return Object.fromEntries(keep); } -function normalizePollSelections(poll: MSTeamsPoll, selections: string[]) { +export function normalizeMSTeamsPollSelections( + poll: MSTeamsPoll, + selections: string[], +) { const maxSelections = Math.max(1, poll.maxSelections); const mapped = selections .map((entry) => Number.parseInt(entry, 10)) @@ -348,11 +361,10 @@ function normalizePollSelections(poll: MSTeamsPoll, selections: string[]) { return Array.from(new Set(limited)); } -export function createMSTeamsPollStoreFs(params?: { - env?: NodeJS.ProcessEnv; - homedir?: () => string; -}): MSTeamsPollStore { - const filePath = resolveStorePath(params?.env, params?.homedir); +export function createMSTeamsPollStoreFs( + params?: MSTeamsPollStoreFsOptions, +): MSTeamsPollStore { + const filePath = resolveStorePath(params); const empty: PollStoreData = { version: 1, polls: {} }; const readStore = async (): Promise => { @@ -388,7 +400,10 @@ export function createMSTeamsPollStoreFs(params?: { const data = await readStore(); const poll = data.polls[params.pollId]; if (!poll) return null; - const normalized = normalizePollSelections(poll, params.selections); + const normalized = normalizeMSTeamsPollSelections( + poll, + params.selections, + ); poll.votes[params.voterId] = normalized; poll.updatedAt = new Date().toISOString(); data.polls[poll.id] = poll; diff --git a/src/msteams/send.ts b/src/msteams/send.ts index 2371447dd4..88f2138ac0 100644 --- a/src/msteams/send.ts +++ b/src/msteams/send.ts @@ -117,6 +117,66 @@ function extractMessageId(response: unknown): string | null { return id; } +type MSTeamsProactiveContext = { + appId: string; + conversationId: string; + ref: StoredConversationReference; + adapter: MSTeamsAdapter; + log: Awaited>; +}; + +async function resolveMSTeamsSendContext(params: { + cfg: ClawdbotConfig; + to: string; +}): Promise { + const msteamsCfg = params.cfg.msteams; + + if (!msteamsCfg?.enabled) { + throw new Error("msteams provider is not enabled"); + } + + const creds = resolveMSTeamsCredentials(msteamsCfg); + if (!creds) { + throw new Error("msteams credentials not configured"); + } + + const store = createMSTeamsConversationStoreFs(); + + // Parse recipient and find conversation reference + const recipient = parseRecipient(params.to); + const found = await findConversationReference({ ...recipient, store }); + + if (!found) { + throw new Error( + `No conversation reference found for ${recipient.type}:${recipient.id}. ` + + `The bot must receive a message from this conversation before it can send proactively.`, + ); + } + + const { conversationId, ref } = found; + const log = await getLog(); + + // Dynamic import to avoid loading SDK when not needed + const agentsHosting = await import("@microsoft/agents-hosting"); + const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting; + + const authConfig = getAuthConfigWithDefaults({ + clientId: creds.appId, + clientSecret: creds.appPassword, + tenantId: creds.tenantId, + }); + + const adapter = new CloudAdapter(authConfig); + + return { + appId: creds.appId, + conversationId, + ref, + adapter: adapter as unknown as MSTeamsAdapter, + log, + }; +} + async function sendMSTeamsActivity(params: { adapter: MSTeamsAdapter; appId: string; @@ -151,33 +211,11 @@ export async function sendMessageMSTeams( params: SendMSTeamsMessageParams, ): Promise { const { cfg, to, text, mediaUrl } = params; - const msteamsCfg = cfg.msteams; - - if (!msteamsCfg?.enabled) { - throw new Error("msteams provider is not enabled"); - } - - const creds = resolveMSTeamsCredentials(msteamsCfg); - if (!creds) { - throw new Error("msteams credentials not configured"); - } - - const store = createMSTeamsConversationStoreFs(); - - // Parse recipient and find conversation reference - const recipient = parseRecipient(to); - const found = await findConversationReference({ ...recipient, store }); - - if (!found) { - throw new Error( - `No conversation reference found for ${recipient.type}:${recipient.id}. ` + - `The bot must receive a message from this conversation before it can send proactively.`, - ); - } - - const { conversationId, ref } = found; - - const log = await getLog(); + const { adapter, appId, conversationId, ref, log } = + await resolveMSTeamsSendContext({ + cfg, + to, + }); log.debug("sending proactive message", { conversationId, @@ -185,18 +223,6 @@ export async function sendMessageMSTeams( hasMedia: Boolean(mediaUrl), }); - // Dynamic import to avoid loading SDK when not needed - const agentsHosting = await import("@microsoft/agents-hosting"); - const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting; - - const authConfig = getAuthConfigWithDefaults({ - clientId: creds.appId, - clientSecret: creds.appPassword, - tenantId: creds.tenantId, - }); - - const adapter = new CloudAdapter(authConfig); - const message = mediaUrl ? text ? `${text}\n\n${mediaUrl}` @@ -206,8 +232,8 @@ export async function sendMessageMSTeams( try { messageIds = await sendMSTeamsMessages({ replyStyle: "top-level", - adapter: adapter as unknown as MSTeamsAdapter, - appId: creds.appId, + adapter, + appId, conversationRef: ref, messages: [message], // Enable default retry/backoff for throttling/transient failures. @@ -243,30 +269,11 @@ export async function sendPollMSTeams( params: SendMSTeamsPollParams, ): Promise { const { cfg, to, question, options, maxSelections } = params; - const msteamsCfg = cfg.msteams; - - if (!msteamsCfg?.enabled) { - throw new Error("msteams provider is not enabled"); - } - - const creds = resolveMSTeamsCredentials(msteamsCfg); - if (!creds) { - throw new Error("msteams credentials not configured"); - } - - const store = createMSTeamsConversationStoreFs(); - const recipient = parseRecipient(to); - const found = await findConversationReference({ ...recipient, store }); - - if (!found) { - throw new Error( - `No conversation reference found for ${recipient.type}:${recipient.id}. ` + - `The bot must receive a message from this conversation before it can send proactively.`, - ); - } - - const { conversationId, ref } = found; - const log = await getLog(); + const { adapter, appId, conversationId, ref, log } = + await resolveMSTeamsSendContext({ + cfg, + to, + }); const pollCard = buildMSTeamsPollCard({ question, @@ -280,16 +287,6 @@ export async function sendPollMSTeams( optionCount: pollCard.options.length, }); - const agentsHosting = await import("@microsoft/agents-hosting"); - const { CloudAdapter, getAuthConfigWithDefaults } = agentsHosting; - - const authConfig = getAuthConfigWithDefaults({ - clientId: creds.appId, - clientSecret: creds.appPassword, - tenantId: creds.tenantId, - }); - - const adapter = new CloudAdapter(authConfig); const activity = { type: "message", text: pollCard.fallbackText, @@ -304,8 +301,8 @@ export async function sendPollMSTeams( let messageId: string; try { messageId = await sendMSTeamsActivity({ - adapter: adapter as unknown as MSTeamsAdapter, - appId: creds.appId, + adapter, + appId, conversationRef: ref, activity, });