diff --git a/docs/channels/slack.md b/docs/channels/slack.md index ebe588034a..42844aa6da 100644 --- a/docs/channels/slack.md +++ b/docs/channels/slack.md @@ -220,6 +220,7 @@ and still route command execution against the target conversation session (`Comm - Channel sessions: `agent::slack:channel:`. - Thread replies can create thread session suffixes (`:thread:`) when applicable. - `channels.slack.thread.historyScope` default is `thread`; `thread.inheritParent` default is `false`. +- `channels.slack.thread.initialHistoryLimit` controls how many existing thread messages are fetched when a new thread session starts (default `20`; set `0` to disable). Reply threading controls: diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 27a25766be..5fc6acd45f 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -208,7 +208,14 @@ export async function runPreparedReply( ((baseBodyTrimmedRaw.length === 0 && rawBodyTrimmed.length > 0) || isBareNewOrReset); const baseBodyFinal = isBareSessionReset ? BARE_SESSION_RESET_PROMPT : baseBody; const inboundUserContext = buildInboundUserContextPrefix( - isNewSession ? sessionCtx : { ...sessionCtx, ThreadStarterBody: undefined }, + isNewSession + ? { + ...sessionCtx, + ...(sessionCtx.ThreadHistoryBody?.trim() + ? { InboundHistory: undefined, ThreadStarterBody: undefined } + : {}), + } + : { ...sessionCtx, ThreadStarterBody: undefined }, ); const baseBodyForPrompt = isBareSessionReset ? baseBodyFinal @@ -241,6 +248,14 @@ export async function runPreparedReply( prefixedBodyBase, }); prefixedBodyBase = appendUntrustedContext(prefixedBodyBase, sessionCtx.UntrustedContext); + const threadStarterBody = ctx.ThreadStarterBody?.trim(); + const threadHistoryBody = ctx.ThreadHistoryBody?.trim(); + const threadContextNote = + isNewSession && threadHistoryBody + ? `[Thread history - for context]\n${threadHistoryBody}` + : isNewSession && threadStarterBody + ? `[Thread starter - for context]\n${threadStarterBody}` + : undefined; const skillResult = await ensureSkillSnapshot({ sessionEntry, sessionStore, @@ -255,7 +270,7 @@ export async function runPreparedReply( sessionEntry = skillResult.sessionEntry ?? sessionEntry; currentSystemSent = skillResult.systemSent; const skillsSnapshot = skillResult.skillsSnapshot; - const prefixedBody = prefixedBodyBase; + const prefixedBody = [threadContextNote, prefixedBodyBase].filter(Boolean).join("\n\n"); const mediaNote = buildInboundMediaNote(ctx); const mediaReplyHint = mediaNote ? "To send an image back, prefer the message tool (media/path/filePath). If you must inline, use MEDIA:https://example.com/image.jpg (spaces ok, quote if needed) or a safe relative path like MEDIA:./image.jpg. Avoid absolute paths (MEDIA:/...) and ~ paths — they are blocked for security. Keep caption in the text body." @@ -322,7 +337,7 @@ export async function runPreparedReply( sessionEntry, resolveSessionFilePathOptions({ agentId, storePath }), ); - const queueBodyBase = baseBodyForPrompt; + const queueBodyBase = [threadContextNote, baseBodyForPrompt].filter(Boolean).join("\n\n"); const queuedBody = mediaNote ? [mediaNote, mediaReplyHint, queueBodyBase].filter(Boolean).join("\n").trim() : queueBodyBase; diff --git a/src/auto-reply/reply/inbound-context.ts b/src/auto-reply/reply/inbound-context.ts index a653cd7725..daeeecc885 100644 --- a/src/auto-reply/reply/inbound-context.ts +++ b/src/auto-reply/reply/inbound-context.ts @@ -30,6 +30,7 @@ export function finalizeInboundContext>( normalized.CommandBody = normalizeTextField(normalized.CommandBody); normalized.Transcript = normalizeTextField(normalized.Transcript); normalized.ThreadStarterBody = normalizeTextField(normalized.ThreadStarterBody); + normalized.ThreadHistoryBody = normalizeTextField(normalized.ThreadHistoryBody); if (Array.isArray(normalized.UntrustedContext)) { const normalizedUntrusted = normalized.UntrustedContext.map((entry) => normalizeInboundTextNewlines(entry), diff --git a/src/auto-reply/reply/session.ts b/src/auto-reply/reply/session.ts index 5f561348bc..04481d89d2 100644 --- a/src/auto-reply/reply/session.ts +++ b/src/auto-reply/reply/session.ts @@ -225,7 +225,11 @@ export async function initSessionState(params: { ? evaluateSessionFreshness({ updatedAt: entry.updatedAt, now, policy: resetPolicy }).fresh : false; - if (!isNewSession && freshEntry) { + // When this is the first user message in a thread, the session entry may already + // exist (created by recordInboundSession in prepare.ts), but we should still treat + // it as a new session so that thread context (history/starter/fork) is applied. + const forceNewForThread = Boolean(ctx.IsFirstThreadTurn) && !resetTriggered; + if (!isNewSession && freshEntry && !forceNewForThread) { sessionId = entry.sessionId; systemSent = entry.systemSent ?? false; abortedLastRun = entry.abortedLastRun ?? false; diff --git a/src/auto-reply/templating.ts b/src/auto-reply/templating.ts index b38368917f..4bc9b51754 100644 --- a/src/auto-reply/templating.ts +++ b/src/auto-reply/templating.ts @@ -69,6 +69,9 @@ export type MsgContext = { ForwardedFromMessageId?: number; ForwardedDate?: number; ThreadStarterBody?: string; + /** Full thread history when starting a new thread session. */ + ThreadHistoryBody?: string; + IsFirstThreadTurn?: boolean; ThreadLabel?: string; MediaPath?: string; MediaUrl?: string; diff --git a/src/config/schema.field-metadata.ts b/src/config/schema.field-metadata.ts index e85bed6796..e1644fa01d 100644 --- a/src/config/schema.field-metadata.ts +++ b/src/config/schema.field-metadata.ts @@ -322,6 +322,7 @@ export const FIELD_LABELS: Record = { "channels.slack.userTokenReadOnly": "Slack User Token Read Only", "channels.slack.thread.historyScope": "Slack Thread History Scope", "channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance", + "channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit", "channels.mattermost.botToken": "Mattermost Bot Token", "channels.mattermost.baseUrl": "Mattermost Base URL", "channels.mattermost.chatmode": "Mattermost Chat Mode", @@ -465,6 +466,8 @@ export const FIELD_HELP: Record = { 'Scope for Slack thread history context ("thread" isolates per thread; "channel" reuses channel history).', "channels.slack.thread.inheritParent": "If true, Slack thread sessions inherit the parent channel transcript (default: false).", + "channels.slack.thread.initialHistoryLimit": + "Maximum number of existing Slack thread messages to fetch when starting a new thread session (default: 20, set to 0 to disable).", "channels.mattermost.botToken": "Bot token from Mattermost System Console -> Integrations -> Bot Accounts.", "channels.mattermost.baseUrl": diff --git a/src/config/schema.hints.ts b/src/config/schema.hints.ts index 56f704b6d0..c31a14d472 100644 --- a/src/config/schema.hints.ts +++ b/src/config/schema.hints.ts @@ -337,6 +337,7 @@ const FIELD_LABELS: Record = { "channels.slack.userTokenReadOnly": "Slack User Token Read Only", "channels.slack.thread.historyScope": "Slack Thread History Scope", "channels.slack.thread.inheritParent": "Slack Thread Parent Inheritance", + "channels.slack.thread.initialHistoryLimit": "Slack Thread Initial History Limit", "channels.mattermost.botToken": "Mattermost Bot Token", "channels.mattermost.baseUrl": "Mattermost Base URL", "channels.mattermost.chatmode": "Mattermost Chat Mode", @@ -480,6 +481,8 @@ const FIELD_HELP: Record = { 'Scope for Slack thread history context ("thread" isolates per thread; "channel" reuses channel history).', "channels.slack.thread.inheritParent": "If true, Slack thread sessions inherit the parent channel transcript (default: false).", + "channels.slack.thread.initialHistoryLimit": + "Maximum number of existing Slack thread messages to fetch when starting a new thread session (default: 20, set to 0 to disable).", "channels.mattermost.botToken": "Bot token from Mattermost System Console -> Integrations -> Bot Accounts.", "channels.mattermost.baseUrl": diff --git a/src/config/types.slack.ts b/src/config/types.slack.ts index 4408aeb099..c79e031a8c 100644 --- a/src/config/types.slack.ts +++ b/src/config/types.slack.ts @@ -73,6 +73,8 @@ export type SlackThreadConfig = { historyScope?: "thread" | "channel"; /** If true, thread sessions inherit the parent channel transcript. Default: false. */ inheritParent?: boolean; + /** Maximum number of thread messages to fetch as context when starting a new thread session (default: 20). Set to 0 to disable thread history fetching. */ + initialHistoryLimit?: number; }; export type SlackAccountConfig = { diff --git a/src/config/zod-schema.providers-core.ts b/src/config/zod-schema.providers-core.ts index 447ea5aca7..3e98ff3126 100644 --- a/src/config/zod-schema.providers-core.ts +++ b/src/config/zod-schema.providers-core.ts @@ -447,6 +447,7 @@ export const SlackThreadSchema = z .object({ historyScope: z.enum(["thread", "channel"]).optional(), inheritParent: z.boolean().optional(), + initialHistoryLimit: z.number().int().min(0).optional(), }) .strict(); diff --git a/src/slack/monitor/media.test.ts b/src/slack/monitor/media.test.ts index 5d8565e219..3cb27cf186 100644 --- a/src/slack/monitor/media.test.ts +++ b/src/slack/monitor/media.test.ts @@ -298,3 +298,132 @@ describe("resolveSlackMedia", () => { expect(mockFetch).toHaveBeenCalledTimes(2); }); }); + +describe("resolveSlackThreadHistory", () => { + afterEach(() => { + vi.resetModules(); + vi.restoreAllMocks(); + }); + + it("paginates and returns the latest N messages across pages", async () => { + const replies = vi + .fn() + .mockResolvedValueOnce({ + messages: Array.from({ length: 200 }, (_, i) => ({ + text: `msg-${i + 1}`, + user: "U1", + ts: `${i + 1}.000`, + })), + response_metadata: { next_cursor: "cursor-2" }, + }) + .mockResolvedValueOnce({ + messages: Array.from({ length: 60 }, (_, i) => ({ + text: `msg-${i + 201}`, + user: "U1", + ts: `${i + 201}.000`, + })), + response_metadata: { next_cursor: "" }, + }); + const { resolveSlackThreadHistory } = await import("./media.js"); + const client = { + conversations: { replies }, + } as Parameters[0]["client"]; + + const result = await resolveSlackThreadHistory({ + channelId: "C1", + threadTs: "1.000", + client, + currentMessageTs: "260.000", + limit: 5, + }); + + expect(replies).toHaveBeenCalledTimes(2); + expect(replies).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + channel: "C1", + ts: "1.000", + limit: 200, + inclusive: true, + }), + ); + expect(replies).toHaveBeenNthCalledWith( + 2, + expect.objectContaining({ + channel: "C1", + ts: "1.000", + limit: 200, + inclusive: true, + cursor: "cursor-2", + }), + ); + expect(result.map((entry) => entry.ts)).toEqual([ + "255.000", + "256.000", + "257.000", + "258.000", + "259.000", + ]); + }); + + it("includes file-only messages and drops empty-only entries", async () => { + const replies = vi.fn().mockResolvedValueOnce({ + messages: [ + { text: " ", ts: "1.000", files: [{ name: "screenshot.png" }] }, + { text: " ", ts: "2.000" }, + { text: "hello", ts: "3.000", user: "U1" }, + ], + response_metadata: { next_cursor: "" }, + }); + const { resolveSlackThreadHistory } = await import("./media.js"); + const client = { + conversations: { replies }, + } as Parameters[0]["client"]; + + const result = await resolveSlackThreadHistory({ + channelId: "C1", + threadTs: "1.000", + client, + limit: 10, + }); + + expect(result).toHaveLength(2); + expect(result[0]?.text).toBe("[attached: screenshot.png]"); + expect(result[1]?.text).toBe("hello"); + }); + + it("returns empty when limit is zero without calling Slack API", async () => { + const replies = vi.fn(); + const { resolveSlackThreadHistory } = await import("./media.js"); + const client = { + conversations: { replies }, + } as Parameters[0]["client"]; + + const result = await resolveSlackThreadHistory({ + channelId: "C1", + threadTs: "1.000", + client, + limit: 0, + }); + + expect(result).toEqual([]); + expect(replies).not.toHaveBeenCalled(); + }); + + it("returns empty when Slack API throws", async () => { + const replies = vi.fn().mockRejectedValueOnce(new Error("slack down")); + const { resolveSlackThreadHistory } = await import("./media.js"); + const client = { + conversations: { replies }, + } as Parameters[0]["client"]; + + const result = await resolveSlackThreadHistory({ + channelId: "C1", + threadTs: "1.000", + client, + limit: 20, + }); + + expect(result).toEqual([]); + }); +}); diff --git a/src/slack/monitor/media.ts b/src/slack/monitor/media.ts index df9a6b8e41..c96ca50234 100644 --- a/src/slack/monitor/media.ts +++ b/src/slack/monitor/media.ts @@ -206,3 +206,91 @@ export async function resolveSlackThreadStarter(params: { return null; } } + +export type SlackThreadMessage = { + text: string; + userId?: string; + ts?: string; + botId?: string; + files?: SlackFile[]; +}; + +type SlackRepliesPageMessage = { + text?: string; + user?: string; + bot_id?: string; + ts?: string; + files?: SlackFile[]; +}; + +type SlackRepliesPage = { + messages?: SlackRepliesPageMessage[]; + response_metadata?: { next_cursor?: string }; +}; + +/** + * Fetches the most recent messages in a Slack thread (excluding the current message). + * Used to populate thread context when a new thread session starts. + * + * Uses cursor pagination and keeps only the latest N retained messages so long threads + * still produce up-to-date context without unbounded memory growth. + */ +export async function resolveSlackThreadHistory(params: { + channelId: string; + threadTs: string; + client: SlackWebClient; + currentMessageTs?: string; + limit?: number; +}): Promise { + const maxMessages = params.limit ?? 20; + if (!Number.isFinite(maxMessages) || maxMessages <= 0) { + return []; + } + + // Slack recommends no more than 200 per page. + const fetchLimit = 200; + const retained: SlackRepliesPageMessage[] = []; + let cursor: string | undefined; + + try { + do { + const response = (await params.client.conversations.replies({ + channel: params.channelId, + ts: params.threadTs, + limit: fetchLimit, + inclusive: true, + ...(cursor ? { cursor } : {}), + })) as SlackRepliesPage; + + for (const msg of response.messages ?? []) { + // Keep messages with text OR file attachments + if (!msg.text?.trim() && !msg.files?.length) { + continue; + } + if (params.currentMessageTs && msg.ts === params.currentMessageTs) { + continue; + } + retained.push(msg); + if (retained.length > maxMessages) { + retained.shift(); + } + } + + const next = response.response_metadata?.next_cursor; + cursor = typeof next === "string" && next.trim().length > 0 ? next.trim() : undefined; + } while (cursor); + + return retained.map((msg) => ({ + // For file-only messages, create a placeholder showing attached filenames + text: msg.text?.trim() + ? msg.text + : `[attached: ${msg.files?.map((f) => f.name ?? "file").join(", ")}]`, + userId: msg.user, + botId: msg.bot_id, + ts: msg.ts, + files: msg.files, + })); + } catch { + return []; + } +} diff --git a/src/slack/monitor/message-handler/prepare.inbound-contract.test.ts b/src/slack/monitor/message-handler/prepare.inbound-contract.test.ts index 9f438c1d2a..c8c05457d2 100644 --- a/src/slack/monitor/message-handler/prepare.inbound-contract.test.ts +++ b/src/slack/monitor/message-handler/prepare.inbound-contract.test.ts @@ -1,10 +1,15 @@ import type { App } from "@slack/bolt"; -import { describe, expect, it } from "vitest"; +import fs from "node:fs"; +import os from "node:os"; +import path from "node:path"; +import { describe, expect, it, vi } from "vitest"; import type { OpenClawConfig } from "../../../config/config.js"; import type { RuntimeEnv } from "../../../runtime.js"; import type { ResolvedSlackAccount } from "../../accounts.js"; import type { SlackMessageEvent } from "../../types.js"; import { expectInboundContextContract } from "../../../../test/helpers/inbound-contract.js"; +import { resolveAgentRoute } from "../../../routing/resolve-route.js"; +import { resolveThreadSessionKeys } from "../../../routing/session-key.js"; import { createSlackMonitorContext } from "../context.js"; import { prepareSlackMessage } from "./prepare.js"; @@ -236,6 +241,207 @@ describe("slack prepareSlackMessage inbound contract", () => { expect(prepared!.ctxPayload.MessageThreadId).toBe("1.000"); }); + it("marks first thread turn and injects thread history for a new thread session", async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-slack-thread-")); + const storePath = path.join(tmpDir, "sessions.json"); + try { + const replies = vi + .fn() + .mockResolvedValueOnce({ + messages: [{ text: "starter", user: "U2", ts: "100.000" }], + }) + .mockResolvedValueOnce({ + messages: [ + { text: "starter", user: "U2", ts: "100.000" }, + { text: "assistant reply", bot_id: "B1", ts: "100.500" }, + { text: "follow-up question", user: "U1", ts: "100.800" }, + { text: "current message", user: "U1", ts: "101.000" }, + ], + response_metadata: { next_cursor: "" }, + }); + const slackCtx = createSlackMonitorContext({ + cfg: { + session: { store: storePath }, + channels: { slack: { enabled: true, replyToMode: "all", groupPolicy: "open" } }, + } as OpenClawConfig, + accountId: "default", + botToken: "token", + app: { client: { conversations: { replies } } } as App, + runtime: {} as RuntimeEnv, + botUserId: "B1", + teamId: "T1", + apiAppId: "A1", + historyLimit: 0, + sessionScope: "per-sender", + mainKey: "main", + dmEnabled: true, + dmPolicy: "open", + allowFrom: [], + groupDmEnabled: true, + groupDmChannels: [], + defaultRequireMention: false, + groupPolicy: "open", + useAccessGroups: false, + reactionMode: "off", + reactionAllowlist: [], + replyToMode: "all", + threadHistoryScope: "thread", + threadInheritParent: false, + slashCommand: { + enabled: false, + name: "openclaw", + sessionPrefix: "slack:slash", + ephemeral: true, + }, + textLimit: 4000, + ackReactionScope: "group-mentions", + mediaMaxBytes: 1024, + removeAckAfterReply: false, + }); + slackCtx.resolveUserName = async (id: string) => ({ + name: id === "U1" ? "Alice" : "Bob", + }); + slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" }); + + const account: ResolvedSlackAccount = { + accountId: "default", + enabled: true, + botTokenSource: "config", + appTokenSource: "config", + config: { + replyToMode: "all", + thread: { initialHistoryLimit: 20 }, + }, + }; + + const message: SlackMessageEvent = { + channel: "C123", + channel_type: "channel", + user: "U1", + text: "current message", + ts: "101.000", + thread_ts: "100.000", + } as SlackMessageEvent; + + const prepared = await prepareSlackMessage({ + ctx: slackCtx, + account, + message, + opts: { source: "message" }, + }); + + expect(prepared).toBeTruthy(); + expect(prepared!.ctxPayload.IsFirstThreadTurn).toBe(true); + expect(prepared!.ctxPayload.ThreadHistoryBody).toContain("assistant reply"); + expect(prepared!.ctxPayload.ThreadHistoryBody).toContain("follow-up question"); + expect(prepared!.ctxPayload.ThreadHistoryBody).not.toContain("current message"); + expect(replies).toHaveBeenCalledTimes(2); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + + it("does not mark first thread turn when thread session already exists in store", async () => { + const tmpDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-slack-thread-")); + const storePath = path.join(tmpDir, "sessions.json"); + try { + const cfg = { + session: { store: storePath }, + channels: { slack: { enabled: true, replyToMode: "all", groupPolicy: "open" } }, + } as OpenClawConfig; + const route = resolveAgentRoute({ + cfg, + channel: "slack", + accountId: "default", + teamId: "T1", + peer: { kind: "channel", id: "C123" }, + }); + const threadKeys = resolveThreadSessionKeys({ + baseSessionKey: route.sessionKey, + threadId: "200.000", + }); + fs.writeFileSync( + storePath, + JSON.stringify({ [threadKeys.sessionKey]: { updatedAt: Date.now() } }, null, 2), + ); + + const replies = vi.fn().mockResolvedValue({ + messages: [{ text: "starter", user: "U2", ts: "200.000" }], + }); + const slackCtx = createSlackMonitorContext({ + cfg, + accountId: "default", + botToken: "token", + app: { client: { conversations: { replies } } } as App, + runtime: {} as RuntimeEnv, + botUserId: "B1", + teamId: "T1", + apiAppId: "A1", + historyLimit: 0, + sessionScope: "per-sender", + mainKey: "main", + dmEnabled: true, + dmPolicy: "open", + allowFrom: [], + groupDmEnabled: true, + groupDmChannels: [], + defaultRequireMention: false, + groupPolicy: "open", + useAccessGroups: false, + reactionMode: "off", + reactionAllowlist: [], + replyToMode: "all", + threadHistoryScope: "thread", + threadInheritParent: false, + slashCommand: { + enabled: false, + name: "openclaw", + sessionPrefix: "slack:slash", + ephemeral: true, + }, + textLimit: 4000, + ackReactionScope: "group-mentions", + mediaMaxBytes: 1024, + removeAckAfterReply: false, + }); + slackCtx.resolveUserName = async () => ({ name: "Alice" }); + slackCtx.resolveChannelName = async () => ({ name: "general", type: "channel" }); + + const account: ResolvedSlackAccount = { + accountId: "default", + enabled: true, + botTokenSource: "config", + appTokenSource: "config", + config: { + replyToMode: "all", + thread: { initialHistoryLimit: 20 }, + }, + }; + + const message: SlackMessageEvent = { + channel: "C123", + channel_type: "channel", + user: "U1", + text: "reply in old thread", + ts: "201.000", + thread_ts: "200.000", + } as SlackMessageEvent; + + const prepared = await prepareSlackMessage({ + ctx: slackCtx, + account, + message, + opts: { source: "message" }, + }); + + expect(prepared).toBeTruthy(); + expect(prepared!.ctxPayload.IsFirstThreadTurn).toBeUndefined(); + expect(prepared!.ctxPayload.ThreadHistoryBody).toBeUndefined(); + } finally { + fs.rmSync(tmpDir, { recursive: true, force: true }); + } + }); + it("includes thread_ts and parent_user_id metadata in thread replies", async () => { const slackCtx = createSlackMonitorContext({ cfg: { diff --git a/src/slack/monitor/message-handler/prepare.ts b/src/slack/monitor/message-handler/prepare.ts index 438b9862c5..55e5f2b08d 100644 --- a/src/slack/monitor/message-handler/prepare.ts +++ b/src/slack/monitor/message-handler/prepare.ts @@ -44,7 +44,11 @@ import { resolveSlackEffectiveAllowFrom } from "../auth.js"; import { resolveSlackChannelConfig } from "../channel-config.js"; import { stripSlackMentionsForCommandDetection } from "../commands.js"; import { normalizeSlackChannelType, type SlackMonitorContext } from "../context.js"; -import { resolveSlackMedia, resolveSlackThreadStarter } from "../media.js"; +import { + resolveSlackMedia, + resolveSlackThreadHistory, + resolveSlackThreadStarter, +} from "../media.js"; export async function prepareSlackMessage(params: { ctx: SlackMonitorContext; @@ -461,6 +465,8 @@ export async function prepareSlackMessage(params: { systemPromptParts.length > 0 ? systemPromptParts.join("\n\n") : undefined; let threadStarterBody: string | undefined; + let threadHistoryBody: string | undefined; + let threadSessionPreviousTimestamp: number | undefined; let threadLabel: string | undefined; let threadStarterMedia: Awaited> = null; if (isThreadReply && threadTs) { @@ -490,6 +496,64 @@ export async function prepareSlackMessage(params: { } else { threadLabel = `Slack thread ${roomLabel}`; } + + // Fetch full thread history for new thread sessions + // This provides context of previous messages (including bot replies) in the thread + // Use the thread session key (not base session key) to determine if this is a new session + const threadInitialHistoryLimit = account.config?.thread?.initialHistoryLimit ?? 20; + threadSessionPreviousTimestamp = readSessionUpdatedAt({ + storePath, + sessionKey, // Thread-specific session key + }); + if (threadInitialHistoryLimit > 0 && !threadSessionPreviousTimestamp) { + const threadHistory = await resolveSlackThreadHistory({ + channelId: message.channel, + threadTs, + client: ctx.app.client, + currentMessageTs: message.ts, + limit: threadInitialHistoryLimit, + }); + + if (threadHistory.length > 0) { + // Batch resolve user names to avoid N sequential API calls + const uniqueUserIds = [ + ...new Set(threadHistory.map((m) => m.userId).filter((id): id is string => Boolean(id))), + ]; + const userMap = new Map(); + await Promise.all( + uniqueUserIds.map(async (id) => { + const user = await ctx.resolveUserName(id); + if (user) { + userMap.set(id, user); + } + }), + ); + + const historyParts: string[] = []; + for (const historyMsg of threadHistory) { + const msgUser = historyMsg.userId ? userMap.get(historyMsg.userId) : null; + const msgSenderName = + msgUser?.name ?? (historyMsg.botId ? `Bot (${historyMsg.botId})` : "Unknown"); + const isBot = Boolean(historyMsg.botId); + const role = isBot ? "assistant" : "user"; + const msgWithId = `${historyMsg.text}\n[slack message id: ${historyMsg.ts ?? "unknown"} channel: ${message.channel}]`; + historyParts.push( + formatInboundEnvelope({ + channel: "Slack", + from: `${msgSenderName} (${role})`, + timestamp: historyMsg.ts ? Math.round(Number(historyMsg.ts) * 1000) : undefined, + body: msgWithId, + chatType: "channel", + envelope: envelopeOptions, + }), + ); + } + threadHistoryBody = historyParts.join("\n\n"); + logVerbose( + `slack: populated thread history with ${threadHistory.length} messages for new session`, + ); + } + } } // Use thread starter media if current message has none @@ -529,6 +593,9 @@ export async function prepareSlackMessage(params: { MessageThreadId: threadContext.messageThreadId, ParentSessionKey: threadKeys.parentSessionKey, ThreadStarterBody: threadStarterBody, + ThreadHistoryBody: threadHistoryBody, + IsFirstThreadTurn: + isThreadReply && threadTs && !threadSessionPreviousTimestamp ? true : undefined, ThreadLabel: threadLabel, Timestamp: message.ts ? Math.round(Number(message.ts) * 1000) : undefined, WasMentioned: isRoomish ? effectiveWasMentioned : undefined,