diff --git a/CHANGELOG.md b/CHANGELOG.md index e2191180ed..1db8fea7a9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,9 @@ Docs: https://docs.openclaw.ai - Agents/Subagents: add an accepted response note for `sessions_spawn` explaining polling subagents are disabled for one-off calls. Thanks @tyler6204. - Agents/Subagents: prefix spawned subagent task messages with context to preserve source information in downstream handling. Thanks @tyler6204. +- iMessage: support `replyToId` on outbound text/media sends and normalize leading `[[reply_to:<id>]]` tags so replies target the intended iMessage. Thanks @tyler6204. +- UI/Sessions: avoid duplicating typed session prefixes in display names (for example `Subagent Subagent ...`). Thanks @tyler6204. +- Auto-reply/Prompts: include trusted inbound `message_id` in conversation metadata payloads for downstream targeting workflows. Thanks @tyler6204. - iOS/Talk: add a `Background Listening` toggle that keeps Talk Mode active while the app is backgrounded (off by default for battery safety). Thanks @zeulewan. - iOS/Talk: harden barge-in behavior by disabling interrupt-on-speech when output route is built-in speaker/receiver, reducing false interruptions from local TTS bleed-through. Thanks @zeulewan. - iOS/Talk: add a `Voice Directive Hint` toggle for Talk Mode prompts so users can disable ElevenLabs voice-switching instructions to save tokens when not needed. (#18250) Thanks @zeulewan. @@ -42,6 +45,12 @@ Docs: https://docs.openclaw.ai ### Fixes +- Agents/Subagents: preemptively guard accumulated tool-result context before model calls by truncating oversized outputs and compacting oldest tool-result messages to avoid context-window overflow crashes. Thanks @tyler6204. +- Agents/Subagents: add explicit subagent guidance to recover from `[compacted: tool output removed to free context]` / `[truncated: output exceeded context limit]` markers by re-reading with smaller chunks instead of full-file `cat`. Thanks @tyler6204. 
+- Agents/Tools: make `read` auto-page across chunks (when no explicit `limit` is provided) and scale its per-call output budget from model `contextWindow`, so larger contexts can read more before context guards kick in. Thanks @tyler6204. +- Agents/Tools: strip duplicated `read` truncation payloads from tool-result `details` and make pre-call context guarding account for heavy tool-result metadata, so repeated `read` calls no longer bypass compaction and overflow model context windows. Thanks @tyler6204. +- Reply threading: keep reply context sticky across streamed/split chunks and preserve `replyToId` on all chunk sends across shared and channel-specific delivery paths (including iMessage, BlueBubbles, Telegram, Discord, and Matrix), so follow-up bubbles stay attached to the same referenced message. Thanks @tyler6204. +- Gateway/Agent: defer transient lifecycle `error` snapshots with a short grace window so `agent.wait` does not resolve early during retry/failover. Thanks @tyler6204. - iOS/Onboarding: stop auth Step 3 retry-loop churn by pausing reconnect attempts on unauthorized/missing-token gateway errors and keeping auth/pairing issue state sticky during manual retry. (#19153) Thanks @mbelinky. - Voice-call: auto-end calls when media streams disconnect to prevent stuck active calls. (#18435) Thanks @JayMishra-source. - Voice call/Gateway: prevent overlapping closed-loop turn races with per-call turn locking, route transcript dedupe via source-aware fingerprints with strict cache eviction bounds, and harden `voicecall latency` stats for large logs without spread-operator stack overflow. (#19140) Thanks @mbelinky. 
diff --git a/extensions/imessage/src/channel.outbound.test.ts b/extensions/imessage/src/channel.outbound.test.ts new file mode 100644 index 0000000000..a2b5a3a435 --- /dev/null +++ b/extensions/imessage/src/channel.outbound.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it, vi } from "vitest"; +import { imessagePlugin } from "./channel.js"; + +describe("imessagePlugin outbound", () => { + const cfg = { + channels: { + imessage: { + mediaMaxMb: 3, + }, + }, + }; + + it("forwards replyToId on direct sendText adapter path", async () => { + const sendIMessage = vi.fn().mockResolvedValue({ messageId: "m-text" }); + const sendText = imessagePlugin.outbound?.sendText; + expect(sendText).toBeDefined(); + + const result = await sendText!({ + cfg, + to: "chat_id:12", + text: "hello", + accountId: "default", + replyToId: "reply-1", + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "chat_id:12", + "hello", + expect.objectContaining({ + accountId: "default", + replyToId: "reply-1", + maxBytes: 3 * 1024 * 1024, + }), + ); + expect(result).toEqual({ channel: "imessage", messageId: "m-text" }); + }); + + it("forwards replyToId on direct sendMedia adapter path", async () => { + const sendIMessage = vi.fn().mockResolvedValue({ messageId: "m-media" }); + const sendMedia = imessagePlugin.outbound?.sendMedia; + expect(sendMedia).toBeDefined(); + + const result = await sendMedia!({ + cfg, + to: "chat_id:77", + text: "caption", + mediaUrl: "https://example.com/pic.png", + accountId: "acct-1", + replyToId: "reply-2", + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "chat_id:77", + "caption", + expect.objectContaining({ + mediaUrl: "https://example.com/pic.png", + accountId: "acct-1", + replyToId: "reply-2", + maxBytes: 3 * 1024 * 1024, + }), + ); + expect(result).toEqual({ channel: "imessage", messageId: "m-media" }); + }); +}); diff --git a/extensions/imessage/src/channel.ts b/extensions/imessage/src/channel.ts index 
3903226140..dd57a0b75b 100644 --- a/extensions/imessage/src/channel.ts +++ b/extensions/imessage/src/channel.ts @@ -183,7 +183,7 @@ export const imessagePlugin: ChannelPlugin = { chunker: (text, limit) => getIMessageRuntime().channel.text.chunkText(text, limit), chunkerMode: "text", textChunkLimit: 4000, - sendText: async ({ cfg, to, text, accountId, deps }) => { + sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => { const send = deps?.sendIMessage ?? getIMessageRuntime().channel.imessage.sendMessageIMessage; const maxBytes = resolveChannelMediaMaxBytes({ cfg, @@ -195,10 +195,11 @@ export const imessagePlugin: ChannelPlugin = { const result = await send(to, text, { maxBytes, accountId: accountId ?? undefined, + replyToId: replyToId ?? undefined, }); return { channel: "imessage", ...result }; }, - sendMedia: async ({ cfg, to, text, mediaUrl, accountId, deps }) => { + sendMedia: async ({ cfg, to, text, mediaUrl, accountId, deps, replyToId }) => { const send = deps?.sendIMessage ?? getIMessageRuntime().channel.imessage.sendMessageIMessage; const maxBytes = resolveChannelMediaMaxBytes({ cfg, @@ -211,6 +212,7 @@ export const imessagePlugin: ChannelPlugin = { mediaUrl, maxBytes, accountId: accountId ?? undefined, + replyToId: replyToId ?? 
undefined, }); return { channel: "imessage", ...result }; }, diff --git a/extensions/matrix/src/matrix/monitor/replies.test.ts b/extensions/matrix/src/matrix/monitor/replies.test.ts new file mode 100644 index 0000000000..22e9b58561 --- /dev/null +++ b/extensions/matrix/src/matrix/monitor/replies.test.ts @@ -0,0 +1,131 @@ +import type { MatrixClient } from "@vector-im/matrix-bot-sdk"; +import type { PluginRuntime, RuntimeEnv } from "openclaw/plugin-sdk"; +import { beforeEach, describe, expect, it, vi } from "vitest"; + +const sendMessageMatrixMock = vi.hoisted(() => vi.fn().mockResolvedValue({ messageId: "mx-1" })); + +vi.mock("../send.js", () => ({ + sendMessageMatrix: (...args: unknown[]) => sendMessageMatrixMock(...args), +})); + +import { setMatrixRuntime } from "../../runtime.js"; +import { deliverMatrixReplies } from "./replies.js"; + +describe("deliverMatrixReplies", () => { + const loadConfigMock = vi.fn(() => ({})); + const resolveMarkdownTableModeMock = vi.fn(() => "code"); + const convertMarkdownTablesMock = vi.fn((text: string) => text); + const resolveChunkModeMock = vi.fn(() => "length"); + const chunkMarkdownTextWithModeMock = vi.fn((text: string) => [text]); + + const runtimeStub = { + config: { + loadConfig: (...args: unknown[]) => loadConfigMock(...args), + }, + channel: { + text: { + resolveMarkdownTableMode: (...args: unknown[]) => resolveMarkdownTableModeMock(...args), + convertMarkdownTables: (...args: unknown[]) => convertMarkdownTablesMock(...args), + resolveChunkMode: (...args: unknown[]) => resolveChunkModeMock(...args), + chunkMarkdownTextWithMode: (...args: unknown[]) => chunkMarkdownTextWithModeMock(...args), + }, + }, + logging: { + shouldLogVerbose: () => false, + }, + } as unknown as PluginRuntime; + + const runtimeEnv: RuntimeEnv = { + log: vi.fn(), + error: vi.fn(), + } as unknown as RuntimeEnv; + + beforeEach(() => { + vi.clearAllMocks(); + setMatrixRuntime(runtimeStub); + chunkMarkdownTextWithModeMock.mockImplementation((text: 
string) => [text]); + }); + + it("keeps replyToId on first reply only when replyToMode=first", async () => { + chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|")); + + await deliverMatrixReplies({ + replies: [ + { text: "first-a|first-b", replyToId: "reply-1" }, + { text: "second", replyToId: "reply-2" }, + ], + roomId: "room:1", + client: {} as MatrixClient, + runtime: runtimeEnv, + textLimit: 4000, + replyToMode: "first", + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledTimes(3); + expect(sendMessageMatrixMock.mock.calls[0]?.[2]).toEqual( + expect.objectContaining({ replyToId: "reply-1", threadId: undefined }), + ); + expect(sendMessageMatrixMock.mock.calls[1]?.[2]).toEqual( + expect.objectContaining({ replyToId: "reply-1", threadId: undefined }), + ); + expect(sendMessageMatrixMock.mock.calls[2]?.[2]).toEqual( + expect.objectContaining({ replyToId: undefined, threadId: undefined }), + ); + }); + + it("keeps replyToId on every reply when replyToMode=all", async () => { + await deliverMatrixReplies({ + replies: [ + { + text: "caption", + mediaUrls: ["https://example.com/a.jpg", "https://example.com/b.jpg"], + replyToId: "reply-media", + audioAsVoice: true, + }, + { text: "plain", replyToId: "reply-text" }, + ], + roomId: "room:2", + client: {} as MatrixClient, + runtime: runtimeEnv, + textLimit: 4000, + replyToMode: "all", + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledTimes(3); + expect(sendMessageMatrixMock.mock.calls[0]).toEqual([ + "room:2", + "caption", + expect.objectContaining({ mediaUrl: "https://example.com/a.jpg", replyToId: "reply-media" }), + ]); + expect(sendMessageMatrixMock.mock.calls[1]).toEqual([ + "room:2", + "", + expect.objectContaining({ mediaUrl: "https://example.com/b.jpg", replyToId: "reply-media" }), + ]); + expect(sendMessageMatrixMock.mock.calls[2]?.[2]).toEqual( + expect.objectContaining({ replyToId: "reply-text" }), + ); + }); + + it("suppresses replyToId when threadId is set", async () 
=> { + chunkMarkdownTextWithModeMock.mockImplementation((text: string) => text.split("|")); + + await deliverMatrixReplies({ + replies: [{ text: "hello|thread", replyToId: "reply-thread" }], + roomId: "room:3", + client: {} as MatrixClient, + runtime: runtimeEnv, + textLimit: 4000, + replyToMode: "all", + threadId: "thread-77", + }); + + expect(sendMessageMatrixMock).toHaveBeenCalledTimes(2); + expect(sendMessageMatrixMock.mock.calls[0]?.[2]).toEqual( + expect.objectContaining({ replyToId: undefined, threadId: "thread-77" }), + ); + expect(sendMessageMatrixMock.mock.calls[1]?.[2]).toEqual( + expect.objectContaining({ replyToId: undefined, threadId: "thread-77" }), + ); + }); +}); diff --git a/extensions/matrix/src/matrix/monitor/replies.ts b/extensions/matrix/src/matrix/monitor/replies.ts index 1193d59f80..643e95cd41 100644 --- a/extensions/matrix/src/matrix/monitor/replies.ts +++ b/extensions/matrix/src/matrix/monitor/replies.ts @@ -53,8 +53,10 @@ export async function deliverMatrixReplies(params: { const shouldIncludeReply = (id?: string) => Boolean(id) && (params.replyToMode === "all" || !hasReplied); + const replyToIdForReply = shouldIncludeReply(replyToId) ? replyToId : undefined; if (mediaList.length === 0) { + let sentTextChunk = false; for (const chunk of core.channel.text.chunkMarkdownTextWithMode( text, chunkLimit, @@ -66,13 +68,14 @@ export async function deliverMatrixReplies(params: { } await sendMessageMatrix(params.roomId, trimmed, { client: params.client, - replyToId: shouldIncludeReply(replyToId) ? 
replyToId : undefined, + replyToId: replyToIdForReply, threadId: params.threadId, accountId: params.accountId, }); - if (shouldIncludeReply(replyToId)) { - hasReplied = true; - } + sentTextChunk = true; + } + if (replyToIdForReply && !hasReplied && sentTextChunk) { + hasReplied = true; } continue; } @@ -83,15 +86,15 @@ export async function deliverMatrixReplies(params: { await sendMessageMatrix(params.roomId, caption, { client: params.client, mediaUrl, - replyToId: shouldIncludeReply(replyToId) ? replyToId : undefined, + replyToId: replyToIdForReply, threadId: params.threadId, audioAsVoice: reply.audioAsVoice, accountId: params.accountId, }); - if (shouldIncludeReply(replyToId)) { - hasReplied = true; - } first = false; } + if (replyToIdForReply && !hasReplied) { + hasReplied = true; + } } } diff --git a/git-hooks/pre-commit b/git-hooks/pre-commit index 919e8507bb..948f2087ad 100755 --- a/git-hooks/pre-commit +++ b/git-hooks/pre-commit @@ -18,14 +18,25 @@ fi # Security: avoid option-injection from malicious file names (e.g. "--all", "--force"). # Robustness: NUL-delimited file list handles spaces/newlines safely. -mapfile -d '' -t files < <(git diff --cached --name-only --diff-filter=ACMR -z) +# Compatibility: use read loops instead of `mapfile` so this runs on macOS Bash 3.x. 
+files=() +while IFS= read -r -d '' file; do + files+=("$file") +done < <(git diff --cached --name-only --diff-filter=ACMR -z) if [ "${#files[@]}" -eq 0 ]; then exit 0 fi -mapfile -d '' -t lint_files < <(node "$FILTER_FILES" lint -- "${files[@]}") -mapfile -d '' -t format_files < <(node "$FILTER_FILES" format -- "${files[@]}") +lint_files=() +while IFS= read -r -d '' file; do + lint_files+=("$file") +done < <(node "$FILTER_FILES" lint -- "${files[@]}") + +format_files=() +while IFS= read -r -d '' file; do + format_files+=("$file") +done < <(node "$FILTER_FILES" format -- "${files[@]}") if [ "${#lint_files[@]}" -gt 0 ]; then "$RUN_NODE_TOOL" oxlint --type-aware --fix -- "${lint_files[@]}" diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 5b78971037..4197da59bf 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -381,6 +381,7 @@ export async function compactEmbeddedPiSessionDirect( abortSignal: runAbortController.signal, modelProvider: model.provider, modelId, + modelContextWindowTokens: model.contextWindow, modelAuthMode: resolveModelAuthMode(model.provider, params.config), }); const tools = sanitizeToolsForGoogle({ tools: toolsRaw, provider }); diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 11bf3f5da3..3c41489896 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -30,6 +30,7 @@ import { listChannelSupportedActions, resolveChannelMessageToolHints, } from "../../channel-tools.js"; +import { DEFAULT_CONTEXT_TOKENS } from "../../defaults.js"; import { resolveOpenClawDocsPath } from "../../docs-path.js"; import { isTimeoutError } from "../../failover-error.js"; import { resolveModelAuthMode } from "../../model-auth.js"; @@ -95,6 +96,7 @@ import { buildEmbeddedSystemPrompt, createSystemPromptOverride, } from "../system-prompt.js"; +import { 
installToolResultContextGuard } from "../tool-result-context-guard.js"; import { splitSdkTools } from "../tool-split.js"; import { describeUnknownError, mapThinkingLevel } from "../utils.js"; import { flushPendingToolResultsAfterIdle } from "../wait-for-idle-before-flush.js"; @@ -313,6 +315,7 @@ export async function runEmbeddedAttempt( abortSignal: runAbortController.signal, modelProvider: params.model.provider, modelId: params.modelId, + modelContextWindowTokens: params.model.contextWindow, modelAuthMode: resolveModelAuthMode(params.model.provider, params.config), currentChannelId: params.currentChannelId, currentThreadTs: params.currentThreadTs, @@ -492,6 +495,7 @@ export async function runEmbeddedAttempt( let sessionManager: ReturnType | undefined; let session: Awaited>["session"] | undefined; + let removeToolResultContextGuard: (() => void) | undefined; try { await repairSessionFileIfNeeded({ sessionFile: params.sessionFile, @@ -587,6 +591,15 @@ export async function runEmbeddedAttempt( throw new Error("Embedded agent session missing"); } const activeSession = session; + removeToolResultContextGuard = installToolResultContextGuard({ + agent: activeSession.agent, + contextWindowTokens: Math.max( + 1, + Math.floor( + params.model.contextWindow ?? params.model.maxTokens ?? DEFAULT_CONTEXT_TOKENS, + ), + ), + }); const cacheTrace = createCacheTrace({ cfg: params.config, env: process.env, @@ -1251,6 +1264,7 @@ export async function runEmbeddedAttempt( // flushPendingToolResults() fires while tools are still executing, inserting // synthetic "missing tool result" errors and causing silent agent failures. 
// See: https://github.com/openclaw/openclaw/issues/8643 + removeToolResultContextGuard?.(); await flushPendingToolResultsAfterIdle({ agent: session?.agent, sessionManager, diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.e2e.test.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.e2e.test.ts new file mode 100644 index 0000000000..6b614b4726 --- /dev/null +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.e2e.test.ts @@ -0,0 +1,279 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import { describe, expect, it } from "vitest"; +import { + CONTEXT_LIMIT_TRUNCATION_NOTICE, + PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER, + installToolResultContextGuard, +} from "./tool-result-context-guard.js"; + +function makeUser(text: string): AgentMessage { + return { + role: "user", + content: text, + timestamp: Date.now(), + } as unknown as AgentMessage; +} + +function makeToolResult(id: string, text: string): AgentMessage { + return { + role: "toolResult", + toolCallId: id, + toolName: "read", + content: [{ type: "text", text }], + isError: false, + timestamp: Date.now(), + } as unknown as AgentMessage; +} + +function makeLegacyToolResult(id: string, text: string): AgentMessage { + return { + role: "tool", + tool_call_id: id, + tool_name: "read", + content: text, + } as unknown as AgentMessage; +} + +function makeToolResultWithDetails(id: string, text: string, detailText: string): AgentMessage { + return { + role: "toolResult", + toolCallId: id, + toolName: "read", + content: [{ type: "text", text }], + details: { + truncation: { + truncated: true, + outputLines: 100, + content: detailText, + }, + }, + isError: false, + timestamp: Date.now(), + } as unknown as AgentMessage; +} + +function getToolResultText(msg: AgentMessage): string { + const content = (msg as { content?: unknown }).content; + if (!Array.isArray(content)) { + return ""; + } + const block = content.find( + (entry) => entry && typeof entry === "object" 
&& (entry as { type?: string }).type === "text", + ) as { text?: string } | undefined; + return typeof block?.text === "string" ? block.text : ""; +} + +function makeGuardableAgent( + transformContext?: ( + messages: AgentMessage[], + signal: AbortSignal, + ) => AgentMessage[] | Promise, +) { + return { transformContext }; +} + +describe("installToolResultContextGuard", () => { + it("compacts oldest-first when total context overflows, even if each result fits individually", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(2_000)), + makeToolResult("call_old", "x".repeat(1_000)), + makeToolResult("call_new", "y".repeat(1_000)), + ]; + + const transformed = await agent.transformContext?.( + contextForNextCall, + new AbortController().signal, + ); + + expect(transformed).toBe(contextForNextCall); + const oldResultText = getToolResultText(contextForNextCall[1]); + const newResultText = getToolResultText(contextForNextCall[2]); + + expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE); + }); + + it("keeps compacting oldest-first until context is back under budget", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(2_200)), + makeToolResult("call_1", "a".repeat(800)), + makeToolResult("call_2", "b".repeat(800)), + makeToolResult("call_3", "c".repeat(800)), + ]; + + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + + const first = getToolResultText(contextForNextCall[1]); + const second = getToolResultText(contextForNextCall[2]); + const third = getToolResultText(contextForNextCall[3]); + + 
expect(first).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(second).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(third).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + }); + + it("survives repeated large tool results by compacting older outputs before later turns", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 100_000, + }); + + const contextForNextCall: AgentMessage[] = [makeUser("stress")]; + for (let i = 1; i <= 4; i++) { + contextForNextCall.push(makeToolResult(`call_${i}`, String(i).repeat(95_000))); + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + } + + const toolResultTexts = contextForNextCall + .filter((msg) => msg.role === "toolResult") + .map((msg) => getToolResultText(msg as AgentMessage)); + + expect(toolResultTexts[0]).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(toolResultTexts[3]?.length).toBe(95_000); + expect(toolResultTexts.join("\n")).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE); + }); + + it("truncates an individually oversized tool result with a context-limit notice", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [makeToolResult("call_big", "z".repeat(5_000))]; + + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + + const newResultText = getToolResultText(contextForNextCall[0]); + expect(newResultText.length).toBeLessThan(5_000); + expect(newResultText).toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE); + }); + + it("keeps compacting oldest-first until overflow clears, including the newest tool result when needed", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(2_600)), + 
makeToolResult("call_old", "x".repeat(700)), + makeToolResult("call_new", "y".repeat(1_000)), + ]; + + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + + const oldResultText = getToolResultText(contextForNextCall[1]); + const newResultText = getToolResultText(contextForNextCall[2]); + + expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).not.toContain(CONTEXT_LIMIT_TRUNCATION_NOTICE); + }); + + it("wraps an existing transformContext and guards the transformed output", async () => { + const agent = makeGuardableAgent((messages) => { + return messages.map( + (msg) => + ({ + ...(msg as unknown as Record), + }) as unknown as AgentMessage, + ); + }); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(2_000)), + makeToolResult("call_old", "x".repeat(1_000)), + makeToolResult("call_new", "y".repeat(1_000)), + ]; + + const transformed = await agent.transformContext?.( + contextForNextCall, + new AbortController().signal, + ); + + expect(transformed).not.toBe(contextForNextCall); + const transformedMessages = transformed as AgentMessage[]; + const oldResultText = getToolResultText(transformedMessages[1]); + expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + }); + + it("handles legacy role=tool string outputs when enforcing context budget", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(2_000)), + makeLegacyToolResult("call_old", "x".repeat(1_000)), + makeLegacyToolResult("call_new", "y".repeat(1_000)), + ]; + + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + + const oldResultText = (contextForNextCall[1] as { content?: unknown 
}).content; + const newResultText = (contextForNextCall[2] as { content?: unknown }).content; + + expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + }); + + it("drops oversized read-tool details payloads when compacting tool results", async () => { + const agent = makeGuardableAgent(); + + installToolResultContextGuard({ + agent, + contextWindowTokens: 1_000, + }); + + const contextForNextCall = [ + makeUser("u".repeat(1_600)), + makeToolResultWithDetails("call_old", "x".repeat(900), "d".repeat(8_000)), + makeToolResultWithDetails("call_new", "y".repeat(900), "d".repeat(8_000)), + ]; + + await agent.transformContext?.(contextForNextCall, new AbortController().signal); + + const oldResult = contextForNextCall[1] as unknown as { + details?: unknown; + }; + const newResult = contextForNextCall[2] as unknown as { + details?: unknown; + }; + const oldResultText = getToolResultText(contextForNextCall[1]); + const newResultText = getToolResultText(contextForNextCall[2]); + + expect(oldResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(newResultText).toBe(PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + expect(oldResult.details).toBeUndefined(); + expect(newResult.details).toBeUndefined(); + }); +}); diff --git a/src/agents/pi-embedded-runner/tool-result-context-guard.ts b/src/agents/pi-embedded-runner/tool-result-context-guard.ts new file mode 100644 index 0000000000..2cc8d1baca --- /dev/null +++ b/src/agents/pi-embedded-runner/tool-result-context-guard.ts @@ -0,0 +1,336 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +const CHARS_PER_TOKEN_ESTIMATE = 4; +// Keep a conservative input budget to absorb tokenizer variance and provider framing overhead. 
+const CONTEXT_INPUT_HEADROOM_RATIO = 0.75; +const SINGLE_TOOL_RESULT_CONTEXT_SHARE = 0.5; +const TOOL_RESULT_CHARS_PER_TOKEN_ESTIMATE = 2; +const IMAGE_CHAR_ESTIMATE = 8_000; + +export const CONTEXT_LIMIT_TRUNCATION_NOTICE = "[truncated: output exceeded context limit]"; +const CONTEXT_LIMIT_TRUNCATION_SUFFIX = `\n${CONTEXT_LIMIT_TRUNCATION_NOTICE}`; + +export const PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER = + "[compacted: tool output removed to free context]"; + +type GuardableTransformContext = ( + messages: AgentMessage[], + signal: AbortSignal, +) => AgentMessage[] | Promise; + +type GuardableAgent = object; + +type GuardableAgentRecord = { + transformContext?: GuardableTransformContext; +}; + +function isTextBlock(block: unknown): block is { type: "text"; text: string } { + return !!block && typeof block === "object" && (block as { type?: unknown }).type === "text"; +} + +function isImageBlock(block: unknown): boolean { + return !!block && typeof block === "object" && (block as { type?: unknown }).type === "image"; +} + +function estimateUnknownChars(value: unknown): number { + if (typeof value === "string") { + return value.length; + } + if (value === undefined) { + return 0; + } + try { + const serialized = JSON.stringify(value); + return typeof serialized === "string" ? serialized.length : 0; + } catch { + return 256; + } +} + +function isToolResultMessage(msg: AgentMessage): boolean { + const role = (msg as { role?: unknown }).role; + const type = (msg as { type?: unknown }).type; + return role === "toolResult" || role === "tool" || type === "toolResult"; +} + +function getToolResultContent(msg: AgentMessage): unknown[] { + if (!isToolResultMessage(msg)) { + return []; + } + const content = (msg as { content?: unknown }).content; + if (typeof content === "string") { + return [{ type: "text", text: content }]; + } + return Array.isArray(content) ? 
content : []; +} + +function getToolResultText(msg: AgentMessage): string { + const content = getToolResultContent(msg); + const chunks: string[] = []; + for (const block of content) { + if (isTextBlock(block)) { + chunks.push(block.text); + } + } + return chunks.join("\n"); +} + +function estimateMessageChars(msg: AgentMessage): number { + if (!msg || typeof msg !== "object") { + return 0; + } + + if (msg.role === "user") { + const content = msg.content; + if (typeof content === "string") { + return content.length; + } + let chars = 0; + if (Array.isArray(content)) { + for (const block of content) { + if (isTextBlock(block)) { + chars += block.text.length; + } else if (isImageBlock(block)) { + chars += IMAGE_CHAR_ESTIMATE; + } else { + chars += estimateUnknownChars(block); + } + } + } + return chars; + } + + if (msg.role === "assistant") { + let chars = 0; + const content = (msg as { content?: unknown }).content; + if (Array.isArray(content)) { + for (const block of content) { + if (!block || typeof block !== "object") { + continue; + } + const typed = block as { + type?: unknown; + text?: unknown; + thinking?: unknown; + arguments?: unknown; + }; + if (typed.type === "text" && typeof typed.text === "string") { + chars += typed.text.length; + } else if (typed.type === "thinking" && typeof typed.thinking === "string") { + chars += typed.thinking.length; + } else if (typed.type === "toolCall") { + try { + chars += JSON.stringify(typed.arguments ?? 
{}).length; + } catch { + chars += 128; + } + } else { + chars += estimateUnknownChars(block); + } + } + } + return chars; + } + + if (isToolResultMessage(msg)) { + let chars = 0; + const content = getToolResultContent(msg); + for (const block of content) { + if (isTextBlock(block)) { + chars += block.text.length; + } else if (isImageBlock(block)) { + chars += IMAGE_CHAR_ESTIMATE; + } else { + chars += estimateUnknownChars(block); + } + } + const details = (msg as { details?: unknown }).details; + chars += estimateUnknownChars(details); + const weightedChars = Math.ceil( + chars * (CHARS_PER_TOKEN_ESTIMATE / TOOL_RESULT_CHARS_PER_TOKEN_ESTIMATE), + ); + return Math.max(chars, weightedChars); + } + + return 256; +} + +function estimateContextChars(messages: AgentMessage[]): number { + return messages.reduce((sum, msg) => sum + estimateMessageChars(msg), 0); +} + +function truncateTextToBudget(text: string, maxChars: number): string { + if (text.length <= maxChars) { + return text; + } + + if (maxChars <= 0) { + return CONTEXT_LIMIT_TRUNCATION_NOTICE; + } + + const bodyBudget = Math.max(0, maxChars - CONTEXT_LIMIT_TRUNCATION_SUFFIX.length); + if (bodyBudget <= 0) { + return CONTEXT_LIMIT_TRUNCATION_NOTICE; + } + + let cutPoint = bodyBudget; + const newline = text.lastIndexOf("\n", bodyBudget); + if (newline > bodyBudget * 0.7) { + cutPoint = newline; + } + + return text.slice(0, cutPoint) + CONTEXT_LIMIT_TRUNCATION_SUFFIX; +} + +function replaceToolResultText(msg: AgentMessage, text: string): AgentMessage { + const content = (msg as { content?: unknown }).content; + const replacementContent = + typeof content === "string" || content === undefined ? 
text : [{ type: "text", text }]; + + const sourceRecord = msg as unknown as Record; + const { details: _details, ...rest } = sourceRecord; + return { + ...rest, + content: replacementContent, + } as AgentMessage; +} + +function truncateToolResultToChars(msg: AgentMessage, maxChars: number): AgentMessage { + if (!isToolResultMessage(msg)) { + return msg; + } + + const estimatedChars = estimateMessageChars(msg); + if (estimatedChars <= maxChars) { + return msg; + } + + const rawText = getToolResultText(msg); + if (!rawText) { + return replaceToolResultText(msg, CONTEXT_LIMIT_TRUNCATION_NOTICE); + } + + const truncatedText = truncateTextToBudget(rawText, maxChars); + return replaceToolResultText(msg, truncatedText); +} + +function compactExistingToolResultsInPlace(params: { + messages: AgentMessage[]; + charsNeeded: number; +}): number { + const { messages, charsNeeded } = params; + if (charsNeeded <= 0) { + return 0; + } + + let reduced = 0; + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + if (!isToolResultMessage(msg)) { + continue; + } + + const before = estimateMessageChars(msg); + if (before <= PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER.length) { + continue; + } + + const compacted = replaceToolResultText(msg, PREEMPTIVE_TOOL_RESULT_COMPACTION_PLACEHOLDER); + applyMessageMutationInPlace(msg, compacted); + const after = estimateMessageChars(msg); + if (after >= before) { + continue; + } + + reduced += before - after; + if (reduced >= charsNeeded) { + break; + } + } + + return reduced; +} + +function applyMessageMutationInPlace(target: AgentMessage, source: AgentMessage): void { + if (target === source) { + return; + } + + const targetRecord = target as unknown as Record; + const sourceRecord = source as unknown as Record; + for (const key of Object.keys(targetRecord)) { + if (!(key in sourceRecord)) { + delete targetRecord[key]; + } + } + Object.assign(targetRecord, sourceRecord); +} + +function 
enforceToolResultContextBudgetInPlace(params: { + messages: AgentMessage[]; + contextBudgetChars: number; + maxSingleToolResultChars: number; +}): void { + const { messages, contextBudgetChars, maxSingleToolResultChars } = params; + + // Ensure each tool result has an upper bound before considering total context usage. + for (const message of messages) { + if (!isToolResultMessage(message)) { + continue; + } + const truncated = truncateToolResultToChars(message, maxSingleToolResultChars); + applyMessageMutationInPlace(message, truncated); + } + + let currentChars = estimateContextChars(messages); + if (currentChars <= contextBudgetChars) { + return; + } + + // Compact oldest tool outputs first until the context is back under budget. + compactExistingToolResultsInPlace({ + messages, + charsNeeded: currentChars - contextBudgetChars, + }); +} + +export function installToolResultContextGuard(params: { + agent: GuardableAgent; + contextWindowTokens: number; +}): () => void { + const contextWindowTokens = Math.max(1, Math.floor(params.contextWindowTokens)); + const contextBudgetChars = Math.max( + 1_024, + Math.floor(contextWindowTokens * CHARS_PER_TOKEN_ESTIMATE * CONTEXT_INPUT_HEADROOM_RATIO), + ); + const maxSingleToolResultChars = Math.max( + 1_024, + Math.floor( + contextWindowTokens * TOOL_RESULT_CHARS_PER_TOKEN_ESTIMATE * SINGLE_TOOL_RESULT_CONTEXT_SHARE, + ), + ); + + // Agent.transformContext is private in pi-coding-agent, so access it via a + // narrow runtime view to keep callsites type-safe while preserving behavior. + const mutableAgent = params.agent as GuardableAgentRecord; + const originalTransformContext = mutableAgent.transformContext; + + mutableAgent.transformContext = (async (messages: AgentMessage[], signal: AbortSignal) => { + const transformed = originalTransformContext + ? await originalTransformContext.call(mutableAgent, messages, signal) + : messages; + + const contextMessages = Array.isArray(transformed) ? 
transformed : messages; + enforceToolResultContextBudgetInPlace({ + messages: contextMessages, + contextBudgetChars, + maxSingleToolResultChars, + }); + + return contextMessages; + }) as GuardableTransformContext; + + return () => { + mutableAgent.transformContext = originalTransformContext; + }; +} diff --git a/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping.e2e.test.ts b/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping.e2e.test.ts index baa05b7f60..a352dcbe33 100644 --- a/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping.e2e.test.ts +++ b/src/agents/pi-tools.create-openclaw-coding-tools.adds-claude-style-aliases-schemas-without-dropping.e2e.test.ts @@ -7,7 +7,7 @@ import { describe, expect, it, vi } from "vitest"; import "./test-helpers/fast-coding-tools.js"; import { createOpenClawTools } from "./openclaw-tools.js"; import { __testing, createOpenClawCodingTools } from "./pi-tools.js"; -import { createSandboxedReadTool } from "./pi-tools.read.js"; +import { createOpenClawReadTool, createSandboxedReadTool } from "./pi-tools.read.js"; import { createHostSandboxFsBridge } from "./test-helpers/host-sandbox-fs-bridge.js"; import { createBrowserTool } from "./tools/browser-tool.js"; @@ -58,6 +58,25 @@ function findUnionKeywordOffenders( return offenders; } +function extractToolText(result: unknown): string { + if (!result || typeof result !== "object") { + return ""; + } + const content = (result as { content?: unknown }).content; + if (!Array.isArray(content)) { + return ""; + } + const textBlock = content.find((block) => { + return ( + block && + typeof block === "object" && + (block as { type?: unknown }).type === "text" && + typeof (block as { text?: unknown }).text === "string" + ); + }) as { text?: string } | undefined; + return textBlock?.text ?? 
""; +} + describe("createOpenClawCodingTools", () => { describe("Claude/Gemini alias support", () => { it("adds Claude-style aliases to schemas without dropping metadata", () => { @@ -508,4 +527,89 @@ describe("createOpenClawCodingTools", () => { await fs.rm(tmpDir, { recursive: true, force: true }); } }); + + it("auto-pages read output across chunks when context window budget allows", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-read-autopage-")); + const filePath = path.join(tmpDir, "big.txt"); + const lines = Array.from( + { length: 5000 }, + (_unused, i) => `line-${String(i + 1).padStart(4, "0")}`, + ); + await fs.writeFile(filePath, lines.join("\n"), "utf8"); + try { + const readTool = createSandboxedReadTool({ + root: tmpDir, + bridge: createHostSandboxFsBridge(tmpDir), + modelContextWindowTokens: 200_000, + }); + const result = await readTool.execute("read-autopage-1", { path: "big.txt" }); + const text = extractToolText(result); + expect(text).toContain("line-0001"); + expect(text).toContain("line-5000"); + expect(text).not.toContain("Read output capped at"); + expect(text).not.toMatch(/Use offset=\d+ to continue\.\]$/); + } finally { + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + + it("adds capped continuation guidance when aggregated read output reaches budget", async () => { + const tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-read-cap-")); + const filePath = path.join(tmpDir, "huge.txt"); + const lines = Array.from( + { length: 8000 }, + (_unused, i) => `line-${String(i + 1).padStart(4, "0")}-abcdefghijklmnopqrstuvwxyz`, + ); + await fs.writeFile(filePath, lines.join("\n"), "utf8"); + try { + const readTool = createSandboxedReadTool({ + root: tmpDir, + bridge: createHostSandboxFsBridge(tmpDir), + }); + const result = await readTool.execute("read-cap-1", { path: "huge.txt" }); + const text = extractToolText(result); + expect(text).toContain("line-0001"); + 
expect(text).toContain("[Read output capped at 50KB for this call. Use offset="); + expect(text).not.toContain("line-8000"); + } finally { + await fs.rm(tmpDir, { recursive: true, force: true }); + } + }); + + it("strips truncation.content details from read results while preserving other fields", async () => { + const baseRead: AgentTool = { + name: "read", + label: "read", + description: "test read", + parameters: Type.Object({ + path: Type.String(), + offset: Type.Optional(Type.Number()), + limit: Type.Optional(Type.Number()), + }), + execute: vi.fn(async () => ({ + content: [{ type: "text", text: "line-0001" }], + details: { + truncation: { + truncated: true, + outputLines: 1, + firstLineExceedsLimit: false, + content: "hidden duplicate payload", + }, + }, + })), + }; + + const wrapped = createOpenClawReadTool( + baseRead as unknown as Parameters[0], + ); + const result = await wrapped.execute("read-strip-1", { path: "demo.txt", limit: 1 }); + + const details = (result as { details?: { truncation?: Record } }).details; + expect(details?.truncation).toMatchObject({ + truncated: true, + outputLines: 1, + firstLineExceedsLimit: false, + }); + expect(details?.truncation).not.toHaveProperty("content"); + }); }); diff --git a/src/agents/pi-tools.read.ts b/src/agents/pi-tools.read.ts index f35a75a56d..f621ba52b4 100644 --- a/src/agents/pi-tools.read.ts +++ b/src/agents/pi-tools.read.ts @@ -13,6 +13,247 @@ type ToolContentBlock = AgentToolResult["content"][number]; type ImageContentBlock = Extract; type TextContentBlock = Extract; +const DEFAULT_READ_PAGE_MAX_BYTES = 50 * 1024; +const MAX_ADAPTIVE_READ_MAX_BYTES = 512 * 1024; +const ADAPTIVE_READ_CONTEXT_SHARE = 0.2; +const CHARS_PER_TOKEN_ESTIMATE = 4; +const MAX_ADAPTIVE_READ_PAGES = 8; + +type OpenClawReadToolOptions = { + modelContextWindowTokens?: number; +}; + +type ReadTruncationDetails = { + truncated: boolean; + outputLines: number; + firstLineExceedsLimit: boolean; +}; + +const READ_CONTINUATION_NOTICE_RE = + 
/\n\n\[(?:Showing lines [^\]]*?Use offset=\d+ to continue\.|\d+ more lines in file\. Use offset=\d+ to continue\.)\]\s*$/; + +function clamp(value: number, min: number, max: number): number { + return Math.max(min, Math.min(max, value)); +} + +function resolveAdaptiveReadMaxBytes(options?: OpenClawReadToolOptions): number { + const contextWindowTokens = options?.modelContextWindowTokens; + if ( + typeof contextWindowTokens !== "number" || + !Number.isFinite(contextWindowTokens) || + contextWindowTokens <= 0 + ) { + return DEFAULT_READ_PAGE_MAX_BYTES; + } + const fromContext = Math.floor( + contextWindowTokens * CHARS_PER_TOKEN_ESTIMATE * ADAPTIVE_READ_CONTEXT_SHARE, + ); + return clamp(fromContext, DEFAULT_READ_PAGE_MAX_BYTES, MAX_ADAPTIVE_READ_MAX_BYTES); +} + +function formatBytes(bytes: number): string { + if (bytes >= 1024 * 1024) { + return `${(bytes / (1024 * 1024)).toFixed(1)}MB`; + } + if (bytes >= 1024) { + return `${Math.round(bytes / 1024)}KB`; + } + return `${bytes}B`; +} + +function getToolResultText(result: AgentToolResult): string | undefined { + const content = Array.isArray(result.content) ? result.content : []; + const textBlocks = content + .map((block) => { + if ( + block && + typeof block === "object" && + (block as { type?: unknown }).type === "text" && + typeof (block as { text?: unknown }).text === "string" + ) { + return (block as { text: string }).text; + } + return undefined; + }) + .filter((value): value is string => typeof value === "string"); + if (textBlocks.length === 0) { + return undefined; + } + return textBlocks.join("\n"); +} + +function withToolResultText( + result: AgentToolResult, + text: string, +): AgentToolResult { + const content = Array.isArray(result.content) ? 
result.content : []; + let replaced = false; + const nextContent: ToolContentBlock[] = content.map((block) => { + if ( + !replaced && + block && + typeof block === "object" && + (block as { type?: unknown }).type === "text" + ) { + replaced = true; + return { + ...(block as TextContentBlock), + text, + }; + } + return block; + }); + if (replaced) { + return { + ...result, + content: nextContent as unknown as AgentToolResult["content"], + }; + } + const textBlock = { type: "text", text } as unknown as TextContentBlock; + return { + ...result, + content: [textBlock] as unknown as AgentToolResult["content"], + }; +} + +function extractReadTruncationDetails( + result: AgentToolResult, +): ReadTruncationDetails | null { + const details = (result as { details?: unknown }).details; + if (!details || typeof details !== "object") { + return null; + } + const truncation = (details as { truncation?: unknown }).truncation; + if (!truncation || typeof truncation !== "object") { + return null; + } + const record = truncation as Record; + if (record.truncated !== true) { + return null; + } + const outputLinesRaw = record.outputLines; + const outputLines = + typeof outputLinesRaw === "number" && Number.isFinite(outputLinesRaw) + ? 
Math.max(0, Math.floor(outputLinesRaw)) + : 0; + return { + truncated: true, + outputLines, + firstLineExceedsLimit: record.firstLineExceedsLimit === true, + }; +} + +function stripReadContinuationNotice(text: string): string { + return text.replace(READ_CONTINUATION_NOTICE_RE, ""); +} + +function stripReadTruncationContentDetails( + result: AgentToolResult, +): AgentToolResult { + const details = (result as { details?: unknown }).details; + if (!details || typeof details !== "object") { + return result; + } + + const detailsRecord = details as Record; + const truncationRaw = detailsRecord.truncation; + if (!truncationRaw || typeof truncationRaw !== "object") { + return result; + } + + const truncation = truncationRaw as Record; + if (!Object.prototype.hasOwnProperty.call(truncation, "content")) { + return result; + } + + const { content: _content, ...restTruncation } = truncation; + return { + ...result, + details: { + ...detailsRecord, + truncation: restTruncation, + }, + }; +} + +async function executeReadWithAdaptivePaging(params: { + base: AnyAgentTool; + toolCallId: string; + args: Record; + signal?: AbortSignal; + maxBytes: number; +}): Promise> { + const userLimit = params.args.limit; + const hasExplicitLimit = + typeof userLimit === "number" && Number.isFinite(userLimit) && userLimit > 0; + if (hasExplicitLimit) { + return await params.base.execute(params.toolCallId, params.args, params.signal); + } + + const offsetRaw = params.args.offset; + let nextOffset = + typeof offsetRaw === "number" && Number.isFinite(offsetRaw) && offsetRaw > 0 + ? 
Math.floor(offsetRaw) + : 1; + let firstResult: AgentToolResult | null = null; + let aggregatedText = ""; + let aggregatedBytes = 0; + let capped = false; + let continuationOffset: number | undefined; + + for (let page = 0; page < MAX_ADAPTIVE_READ_PAGES; page += 1) { + const pageArgs = { ...params.args, offset: nextOffset }; + const pageResult = await params.base.execute(params.toolCallId, pageArgs, params.signal); + firstResult ??= pageResult; + + const rawText = getToolResultText(pageResult); + if (typeof rawText !== "string") { + return pageResult; + } + + const truncation = extractReadTruncationDetails(pageResult); + const canContinue = + Boolean(truncation?.truncated) && + !truncation?.firstLineExceedsLimit && + (truncation?.outputLines ?? 0) > 0 && + page < MAX_ADAPTIVE_READ_PAGES - 1; + const pageText = canContinue ? stripReadContinuationNotice(rawText) : rawText; + const delimiter = aggregatedText ? "\n\n" : ""; + const nextBytes = Buffer.byteLength(`${delimiter}${pageText}`, "utf-8"); + + if (aggregatedText && aggregatedBytes + nextBytes > params.maxBytes) { + capped = true; + continuationOffset = nextOffset; + break; + } + + aggregatedText += `${delimiter}${pageText}`; + aggregatedBytes += nextBytes; + + if (!canContinue || !truncation) { + return withToolResultText(pageResult, aggregatedText); + } + + nextOffset += truncation.outputLines; + continuationOffset = nextOffset; + + if (aggregatedBytes >= params.maxBytes) { + capped = true; + break; + } + } + + if (!firstResult) { + return await params.base.execute(params.toolCallId, params.args, params.signal); + } + + let finalText = aggregatedText; + if (capped && continuationOffset) { + finalText += `\n\n[Read output capped at ${formatBytes(params.maxBytes)} for this call. 
Use offset=${continuationOffset} to continue.]`; + } + return withToolResultText(firstResult, finalText); +} + function rewriteReadImageHeader(text: string, mimeType: string): string { // pi-coding-agent uses: "Read image file [image/png]" if (text.startsWith("Read image file [") && text.endsWith("]")) { @@ -324,13 +565,16 @@ export function wrapToolWorkspaceRootGuard(tool: AnyAgentTool, root: string): An type SandboxToolParams = { root: string; bridge: SandboxFsBridge; + modelContextWindowTokens?: number; }; export function createSandboxedReadTool(params: SandboxToolParams) { const base = createReadTool(params.root, { operations: createSandboxReadOperations(params), }) as unknown as AnyAgentTool; - return createOpenClawReadTool(base); + return createOpenClawReadTool(base, { + modelContextWindowTokens: params.modelContextWindowTokens, + }); } export function createSandboxedWriteTool(params: SandboxToolParams) { @@ -347,7 +591,10 @@ export function createSandboxedEditTool(params: SandboxToolParams) { return wrapToolParamNormalization(base, CLAUDE_PARAM_GROUPS.edit); } -export function createOpenClawReadTool(base: AnyAgentTool): AnyAgentTool { +export function createOpenClawReadTool( + base: AnyAgentTool, + options?: OpenClawReadToolOptions, +): AnyAgentTool { const patched = patchToolSchemaForClaudeCompatibility(base); return { ...patched, @@ -357,9 +604,16 @@ export function createOpenClawReadTool(base: AnyAgentTool): AnyAgentTool { normalized ?? (params && typeof params === "object" ? (params as Record) : undefined); assertRequiredParams(record, CLAUDE_PARAM_GROUPS.read, base.name); - const result = await base.execute(toolCallId, normalized ?? params, signal); + const result = await executeReadWithAdaptivePaging({ + base, + toolCallId, + args: (normalized ?? params ?? {}) as Record, + signal, + maxBytes: resolveAdaptiveReadMaxBytes(options), + }); const filePath = typeof record?.path === "string" ? 
String(record.path) : ""; - const normalizedResult = await normalizeReadImageResult(result, filePath); + const strippedDetailsResult = stripReadTruncationContentDetails(result); + const normalizedResult = await normalizeReadImageResult(strippedDetailsResult, filePath); return sanitizeToolResultImages(normalizedResult, `read:${filePath}`); }, }; diff --git a/src/agents/pi-tools.ts b/src/agents/pi-tools.ts index 39fde7b58e..66f7902695 100644 --- a/src/agents/pi-tools.ts +++ b/src/agents/pi-tools.ts @@ -179,6 +179,8 @@ export function createOpenClawCodingTools(options?: { modelProvider?: string; /** Model id for the current provider (used for model-specific tool gating). */ modelId?: string; + /** Model context window in tokens (used to scale read-tool output budget). */ + modelContextWindowTokens?: number; /** * Auth mode for the current provider. We only need this for Anthropic OAuth * tool-name blocking quirks. @@ -305,11 +307,14 @@ export function createOpenClawCodingTools(options?: { const sandboxed = createSandboxedReadTool({ root: sandboxRoot, bridge: sandboxFsBridge!, + modelContextWindowTokens: options?.modelContextWindowTokens, }); return [workspaceOnly ? wrapToolWorkspaceRootGuard(sandboxed, sandboxRoot) : sandboxed]; } const freshReadTool = createReadTool(workspaceRoot); - const wrapped = createOpenClawReadTool(freshReadTool); + const wrapped = createOpenClawReadTool(freshReadTool, { + modelContextWindowTokens: options?.modelContextWindowTokens, + }); return [workspaceOnly ? wrapToolWorkspaceRootGuard(wrapped, workspaceRoot) : wrapped]; } if (tool.name === "bash" || tool.name === execToolName) { diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 8a5588014a..999cec35ef 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -296,6 +296,7 @@ export function buildSubagentSystemPrompt(params: { "3. **Don't initiate** - No heartbeats, no proactive actions, no side quests", "4. 
**Be ephemeral** - You may be terminated after task completion. That's fine.", "5. **Trust push-based completion** - Descendant results are auto-announced back to you; do not busy-poll for status.", + "6. **Recover from compacted/truncated tool output** - If you see `[compacted: tool output removed to free context]` or `[truncated: output exceeded context limit]`, assume prior output was reduced. Re-read only what you need using smaller chunks (`read` with offset/limit, or targeted `rg`/`head`/`tail`) instead of full-file `cat`.", "", "## Output Format", "When complete, your final response should include:", diff --git a/src/agents/system-prompt.e2e.test.ts b/src/agents/system-prompt.e2e.test.ts index 18fc269e03..ee8c4b9281 100644 --- a/src/agents/system-prompt.e2e.test.ts +++ b/src/agents/system-prompt.e2e.test.ts @@ -561,6 +561,20 @@ describe("buildSubagentSystemPrompt", () => { expect(prompt).toContain("reported to the main agent"); }); + it("includes recovery guidance for compacted/truncated tool output", () => { + const prompt = buildSubagentSystemPrompt({ + childSessionKey: "agent:main:subagent:abc", + task: "investigate logs", + childDepth: 1, + maxSpawnDepth: 2, + }); + + expect(prompt).toContain("[compacted: tool output removed to free context]"); + expect(prompt).toContain("[truncated: output exceeded context limit]"); + expect(prompt).toContain("offset/limit"); + expect(prompt).toContain("instead of full-file `cat`"); + }); + it("defaults to depth 1 and maxSpawnDepth 1 when not provided", () => { const prompt = buildSubagentSystemPrompt({ childSessionKey: "agent:main:subagent:abc", diff --git a/src/agents/system-prompt.ts b/src/agents/system-prompt.ts index d176997c1e..b244dd901f 100644 --- a/src/agents/system-prompt.ts +++ b/src/agents/system-prompt.ts @@ -87,6 +87,7 @@ function buildReplyTagsSection(isMinimal: boolean) { return [ "## Reply Tags", "To request a native reply/quote on supported surfaces, include one tag in your reply:", + "- Reply tags 
must be the very first token in the message (no leading text/newlines): [[reply_to_current]] your reply.", "- [[reply_to_current]] replies to the triggering message.", "- Prefer [[reply_to_current]]. Use [[reply_to:]] only when an id was explicitly provided (e.g. by the user or a tool).", "Whitespace inside the tag is allowed (e.g. [[ reply_to_current ]] / [[ reply_to: 123 ]]).", diff --git a/src/auto-reply/reply/inbound-meta.test.ts b/src/auto-reply/reply/inbound-meta.test.ts index 7e0e8854b1..1f463834a4 100644 --- a/src/auto-reply/reply/inbound-meta.test.ts +++ b/src/auto-reply/reply/inbound-meta.test.ts @@ -146,6 +146,16 @@ describe("buildInboundUserContextPrefix", () => { expect(conversationInfo["sender"]).toBe("+15551234567"); }); + it("includes message_id in conversation info", () => { + const text = buildInboundUserContextPrefix({ + ChatType: "direct", + MessageSid: " msg-123 ", + } as TemplateContext); + + const conversationInfo = parseConversationInfoPayload(text); + expect(conversationInfo["message_id"]).toBe("msg-123"); + }); + it("falls back to SenderId when sender phone is missing", () => { const text = buildInboundUserContextPrefix({ ChatType: "direct", diff --git a/src/auto-reply/reply/inbound-meta.ts b/src/auto-reply/reply/inbound-meta.ts index 109a8a3b07..17338d0612 100644 --- a/src/auto-reply/reply/inbound-meta.ts +++ b/src/auto-reply/reply/inbound-meta.ts @@ -61,6 +61,7 @@ export function buildInboundUserContextPrefix(ctx: TemplateContext): string { const isDirect = !chatType || chatType === "direct"; const conversationInfo = { + message_id: safeTrim(ctx.MessageSid), conversation_label: isDirect ? undefined : safeTrim(ctx.ConversationLabel), sender: safeTrim(ctx.SenderE164) ?? safeTrim(ctx.SenderId) ?? 
safeTrim(ctx.SenderUsername), group_subject: safeTrim(ctx.GroupSubject), diff --git a/src/auto-reply/reply/reply-utils.test.ts b/src/auto-reply/reply/reply-utils.test.ts index aeb9e8d66a..946fb74131 100644 --- a/src/auto-reply/reply/reply-utils.test.ts +++ b/src/auto-reply/reply/reply-utils.test.ts @@ -644,6 +644,34 @@ describe("createStreamingDirectiveAccumulator", () => { expect(result?.replyToId).toBe("abc-123"); expect(result?.replyToTag).toBe(true); }); + + it("keeps explicit reply ids sticky across subsequent renderable chunks", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + expect(accumulator.consume("[[reply_to: abc-123]]")).toBeNull(); + + const first = accumulator.consume("test 1"); + expect(first?.replyToId).toBe("abc-123"); + expect(first?.replyToTag).toBe(true); + + const second = accumulator.consume("test 2"); + expect(second?.replyToId).toBe("abc-123"); + expect(second?.replyToTag).toBe(true); + }); + + it("clears sticky reply context on reset", () => { + const accumulator = createStreamingDirectiveAccumulator(); + + expect(accumulator.consume("[[reply_to_current]]")).toBeNull(); + expect(accumulator.consume("first")?.replyToCurrent).toBe(true); + + accumulator.reset(); + + const afterReset = accumulator.consume("second"); + expect(afterReset?.replyToCurrent).toBe(false); + expect(afterReset?.replyToTag).toBe(false); + expect(afterReset?.replyToId).toBeUndefined(); + }); }); describe("resolveResponsePrefixTemplate", () => { diff --git a/src/auto-reply/reply/streaming-directives.ts b/src/auto-reply/reply/streaming-directives.ts index c3a0cec758..13c5047a9e 100644 --- a/src/auto-reply/reply/streaming-directives.ts +++ b/src/auto-reply/reply/streaming-directives.ts @@ -74,10 +74,12 @@ const hasRenderableContent = (parsed: ReplyDirectiveParseResult): boolean => export function createStreamingDirectiveAccumulator() { let pendingTail = ""; let pendingReply: PendingReplyState = { sawCurrent: false, hasTag: false }; + let 
activeReply: PendingReplyState = { sawCurrent: false, hasTag: false }; const reset = () => { pendingTail = ""; pendingReply = { sawCurrent: false, hasTag: false }; + activeReply = { sawCurrent: false, hasTag: false }; }; const consume = (raw: string, options: ConsumeOptions = {}): ReplyDirectiveParseResult | null => { @@ -95,9 +97,10 @@ export function createStreamingDirectiveAccumulator() { } const parsed = parseChunk(combined, { silentToken: options.silentToken }); - const hasTag = pendingReply.hasTag || parsed.replyToTag; - const sawCurrent = pendingReply.sawCurrent || parsed.replyToCurrent; - const explicitId = parsed.replyToExplicitId ?? pendingReply.explicitId; + const hasTag = activeReply.hasTag || pendingReply.hasTag || parsed.replyToTag; + const sawCurrent = activeReply.sawCurrent || pendingReply.sawCurrent || parsed.replyToCurrent; + const explicitId = + parsed.replyToExplicitId ?? pendingReply.explicitId ?? activeReply.explicitId; const combinedResult: ReplyDirectiveParseResult = { ...parsed, @@ -117,6 +120,13 @@ export function createStreamingDirectiveAccumulator() { return null; } + // Keep reply context sticky for the full assistant message so split/newline chunks + // stay on the same native reply target until reset() is called for the next message. 
+ activeReply = { + explicitId, + sawCurrent, + hasTag, + }; pendingReply = { sawCurrent: false, hasTag: false }; return combinedResult; }; diff --git a/src/channels/plugins/outbound/imessage.test.ts b/src/channels/plugins/outbound/imessage.test.ts new file mode 100644 index 0000000000..7ebcc85379 --- /dev/null +++ b/src/channels/plugins/outbound/imessage.test.ts @@ -0,0 +1,69 @@ +import { describe, expect, it, vi } from "vitest"; +import type { OpenClawConfig } from "../../../config/config.js"; +import { imessageOutbound } from "./imessage.js"; + +describe("imessageOutbound", () => { + const cfg: OpenClawConfig = { + channels: { + imessage: { + mediaMaxMb: 2, + }, + }, + }; + + it("passes replyToId through sendText", async () => { + const sendIMessage = vi.fn().mockResolvedValue({ messageId: "text-1" }); + const sendText = imessageOutbound.sendText; + expect(sendText).toBeDefined(); + + const result = await sendText!({ + cfg, + to: "chat_id:123", + text: "hello", + accountId: "default", + replyToId: "msg-123", + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "chat_id:123", + "hello", + expect.objectContaining({ + replyToId: "msg-123", + accountId: "default", + maxBytes: 2 * 1024 * 1024, + }), + ); + expect(result).toEqual({ channel: "imessage", messageId: "text-1" }); + }); + + it("passes replyToId through sendMedia", async () => { + const sendIMessage = vi.fn().mockResolvedValue({ messageId: "media-1" }); + const sendMedia = imessageOutbound.sendMedia; + expect(sendMedia).toBeDefined(); + + const result = await sendMedia!({ + cfg, + to: "chat_id:123", + text: "caption", + mediaUrl: "https://example.com/file.jpg", + mediaLocalRoots: ["/tmp"], + accountId: "acct-1", + replyToId: "msg-456", + deps: { sendIMessage }, + }); + + expect(sendIMessage).toHaveBeenCalledWith( + "chat_id:123", + "caption", + expect.objectContaining({ + mediaUrl: "https://example.com/file.jpg", + mediaLocalRoots: ["/tmp"], + replyToId: "msg-456", + accountId: 
"acct-1", + maxBytes: 2 * 1024 * 1024, + }), + ); + expect(result).toEqual({ channel: "imessage", messageId: "media-1" }); + }); +}); diff --git a/src/channels/plugins/outbound/imessage.ts b/src/channels/plugins/outbound/imessage.ts index b4d2a8ad7a..6888ef1d58 100644 --- a/src/channels/plugins/outbound/imessage.ts +++ b/src/channels/plugins/outbound/imessage.ts @@ -21,22 +21,24 @@ export const imessageOutbound: ChannelOutboundAdapter = { chunker: chunkText, chunkerMode: "text", textChunkLimit: 4000, - sendText: async ({ cfg, to, text, accountId, deps }) => { + sendText: async ({ cfg, to, text, accountId, deps, replyToId }) => { const send = deps?.sendIMessage ?? sendMessageIMessage; const maxBytes = resolveIMessageMaxBytes({ cfg, accountId }); const result = await send(to, text, { maxBytes, accountId: accountId ?? undefined, + replyToId: replyToId ?? undefined, }); return { channel: "imessage", ...result }; }, - sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps }) => { + sendMedia: async ({ cfg, to, text, mediaUrl, mediaLocalRoots, accountId, deps, replyToId }) => { const send = deps?.sendIMessage ?? sendMessageIMessage; const maxBytes = resolveIMessageMaxBytes({ cfg, accountId }); const result = await send(to, text, { mediaUrl, maxBytes, accountId: accountId ?? undefined, + replyToId: replyToId ?? 
undefined, mediaLocalRoots, }); return { channel: "imessage", ...result }; diff --git a/src/cron/isolated-agent/run.skill-filter.test.ts b/src/cron/isolated-agent/run.skill-filter.test.ts index 52bc79b8c6..7fb8703f0e 100644 --- a/src/cron/isolated-agent/run.skill-filter.test.ts +++ b/src/cron/isolated-agent/run.skill-filter.test.ts @@ -109,11 +109,14 @@ vi.mock("../../config/sessions.js", () => ({ updateSessionStore: vi.fn().mockResolvedValue(undefined), })); -vi.mock("../../routing/session-key.js", () => ({ - buildAgentMainSessionKey: vi.fn().mockReturnValue("agent:default:cron:test"), - normalizeAgentId: vi.fn((id: string) => id), - DEFAULT_ACCOUNT_ID: "default", -})); +vi.mock("../../routing/session-key.js", async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + buildAgentMainSessionKey: vi.fn().mockReturnValue("agent:default:cron:test"), + normalizeAgentId: vi.fn((id: string) => id), + }; +}); vi.mock("../../infra/agent-events.js", () => ({ registerAgentRunContext: vi.fn(), diff --git a/src/discord/monitor/reply-delivery.test.ts b/src/discord/monitor/reply-delivery.test.ts index cf3b312ee1..9093da63a5 100644 --- a/src/discord/monitor/reply-delivery.test.ts +++ b/src/discord/monitor/reply-delivery.test.ts @@ -52,16 +52,18 @@ describe("deliverDiscordReply", () => { 1, "channel:123", "Hello there", - expect.objectContaining({ token: "token" }), + expect.objectContaining({ token: "token", replyTo: "reply-1" }), ); expect(sendMessageDiscordMock).toHaveBeenNthCalledWith( 2, "channel:123", "", - expect.objectContaining({ token: "token", mediaUrl: "https://example.com/extra.mp3" }), + expect.objectContaining({ + token: "token", + mediaUrl: "https://example.com/extra.mp3", + replyTo: "reply-1", + }), ); - expect(sendMessageDiscordMock.mock.calls[0]?.[2]).not.toHaveProperty("replyTo"); - expect(sendMessageDiscordMock.mock.calls[1]?.[2]).not.toHaveProperty("replyTo"); }); it("skips follow-up text when the voice payload text is 
blank", async () => { diff --git a/src/discord/monitor/reply-delivery.ts b/src/discord/monitor/reply-delivery.ts index 7690ed2440..ae22e70866 100644 --- a/src/discord/monitor/reply-delivery.ts +++ b/src/discord/monitor/reply-delivery.ts @@ -32,7 +32,6 @@ export async function deliverDiscordReply(params: { const replyTo = params.replyToId?.trim() || undefined; if (mediaList.length === 0) { - let isFirstChunk = true; const mode = params.chunkMode ?? "length"; const chunks = chunkDiscordTextWithMode(text, { maxChars: chunkLimit, @@ -51,9 +50,8 @@ export async function deliverDiscordReply(params: { token: params.token, rest: params.rest, accountId: params.accountId, - replyTo: isFirstChunk ? replyTo : undefined, + replyTo, }); - isFirstChunk = false; } continue; } @@ -77,6 +75,7 @@ export async function deliverDiscordReply(params: { token: params.token, rest: params.rest, accountId: params.accountId, + replyTo, }); } // Additional media items are sent as regular attachments (voice is single-file only) @@ -86,6 +85,7 @@ export async function deliverDiscordReply(params: { rest: params.rest, mediaUrl: extra, accountId: params.accountId, + replyTo, }); } continue; @@ -104,6 +104,7 @@ export async function deliverDiscordReply(params: { rest: params.rest, mediaUrl: extra, accountId: params.accountId, + replyTo, }); } } diff --git a/src/discord/send.sends-basic-channel-messages.test.ts b/src/discord/send.sends-basic-channel-messages.test.ts index e523cb6195..a765eed1f8 100644 --- a/src/discord/send.sends-basic-channel-messages.test.ts +++ b/src/discord/send.sends-basic-channel-messages.test.ts @@ -256,7 +256,7 @@ describe("sendMessageDiscord", () => { }); }); - it("replies only on the first chunk", async () => { + it("preserves reply reference across all text chunks", async () => { const { rest, postMock } = makeDiscordRest(); postMock.mockResolvedValue({ id: "msg1", channel_id: "789" }); await sendMessageDiscord("channel:789", "a".repeat(2001), { @@ -271,7 +271,32 @@ 
describe("sendMessageDiscord", () => { message_id: "orig-123", fail_if_not_exists: false, }); - expect(secondBody?.message_reference).toBeUndefined(); + expect(secondBody?.message_reference).toEqual({ + message_id: "orig-123", + fail_if_not_exists: false, + }); + }); + + it("preserves reply reference for follow-up text chunks after media caption split", async () => { + const { rest, postMock } = makeDiscordRest(); + postMock.mockResolvedValue({ id: "msg1", channel_id: "789" }); + await sendMessageDiscord("channel:789", "a".repeat(2500), { + rest, + token: "t", + mediaUrl: "file:///tmp/photo.jpg", + replyTo: "orig-123", + }); + expect(postMock).toHaveBeenCalledTimes(2); + const firstBody = postMock.mock.calls[0]?.[1]?.body; + const secondBody = postMock.mock.calls[1]?.[1]?.body; + expect(firstBody?.message_reference).toEqual({ + message_id: "orig-123", + fail_if_not_exists: false, + }); + expect(secondBody?.message_reference).toEqual({ + message_id: "orig-123", + fail_if_not_exists: false, + }); }); }); diff --git a/src/discord/send.shared.ts b/src/discord/send.shared.ts index f195e781ec..2d463f6185 100644 --- a/src/discord/send.shared.ts +++ b/src/discord/send.shared.ts @@ -362,7 +362,7 @@ async function sendDiscordText( }); const body = stripUndefinedFields({ ...serializePayload(payload), - ...(isFirst && messageReference ? { message_reference: messageReference } : {}), + ...(messageReference ? 
{ message_reference: messageReference } : {}), }); return (await request( () => @@ -376,10 +376,8 @@ async function sendDiscordText( return await sendChunk(chunks[0], true); } let last: { id: string; channel_id: string } | null = null; - let isFirst = true; - for (const chunk of chunks) { - last = await sendChunk(chunk, isFirst); - isFirst = false; + for (const [index, chunk] of chunks.entries()) { + last = await sendChunk(chunk, index === 0); } if (!last) { throw new Error("Discord send failed (empty chunk result)"); @@ -450,7 +448,7 @@ async function sendDiscordMedia( rest, channelId, chunk, - undefined, + replyTo, request, maxLinesPerMessage, undefined, diff --git a/src/gateway/server-methods/agent-job.ts b/src/gateway/server-methods/agent-job.ts index b1105e8fa9..1acd1bea17 100644 --- a/src/gateway/server-methods/agent-job.ts +++ b/src/gateway/server-methods/agent-job.ts @@ -1,8 +1,16 @@ import { onAgentEvent } from "../../infra/agent-events.js"; const AGENT_RUN_CACHE_TTL_MS = 10 * 60_000; +/** + * Embedded runs can emit transient lifecycle `error` events while auth/model + * failover is still in progress. Give errors a short grace window so a + * subsequent `start` event can cancel premature terminal snapshots. 
+ */ +const AGENT_RUN_ERROR_RETRY_GRACE_MS = 15_000; + const agentRunCache = new Map(); const agentRunStarts = new Map(); +const pendingAgentRunErrors = new Map(); let agentRunListenerStarted = false; type AgentRunSnapshot = { @@ -14,6 +22,12 @@ type AgentRunSnapshot = { ts: number; }; +type PendingAgentRunError = { + snapshot: AgentRunSnapshot; + dueAt: number; + timer: NodeJS.Timeout; +}; + function pruneAgentRunCache(now = Date.now()) { for (const [runId, entry] of agentRunCache) { if (now - entry.ts > AGENT_RUN_CACHE_TTL_MS) { @@ -27,6 +41,61 @@ function recordAgentRunSnapshot(entry: AgentRunSnapshot) { agentRunCache.set(entry.runId, entry); } +function clearPendingAgentRunError(runId: string) { + const pending = pendingAgentRunErrors.get(runId); + if (!pending) { + return; + } + clearTimeout(pending.timer); + pendingAgentRunErrors.delete(runId); +} + +function schedulePendingAgentRunError(snapshot: AgentRunSnapshot) { + clearPendingAgentRunError(snapshot.runId); + const dueAt = Date.now() + AGENT_RUN_ERROR_RETRY_GRACE_MS; + const timer = setTimeout(() => { + const pending = pendingAgentRunErrors.get(snapshot.runId); + if (!pending) { + return; + } + pendingAgentRunErrors.delete(snapshot.runId); + recordAgentRunSnapshot(pending.snapshot); + }, AGENT_RUN_ERROR_RETRY_GRACE_MS); + timer.unref?.(); + pendingAgentRunErrors.set(snapshot.runId, { snapshot, dueAt, timer }); +} + +function getPendingAgentRunError(runId: string) { + const pending = pendingAgentRunErrors.get(runId); + if (!pending) { + return undefined; + } + return { + snapshot: pending.snapshot, + dueAt: pending.dueAt, + }; +} + +function createSnapshotFromLifecycleEvent(params: { + runId: string; + phase: "end" | "error"; + data?: Record; +}): AgentRunSnapshot { + const { runId, phase, data } = params; + const startedAt = + typeof data?.startedAt === "number" ? data.startedAt : agentRunStarts.get(runId); + const endedAt = typeof data?.endedAt === "number" ? 
data.endedAt : undefined; + const error = typeof data?.error === "string" ? data.error : undefined; + return { + runId, + status: phase === "error" ? "error" : data?.aborted ? "timeout" : "ok", + startedAt, + endedAt, + error, + ts: Date.now(), + }; +} + function ensureAgentRunListener() { if (agentRunListenerStarted) { return; @@ -43,24 +112,27 @@ function ensureAgentRunListener() { if (phase === "start") { const startedAt = typeof evt.data?.startedAt === "number" ? evt.data.startedAt : undefined; agentRunStarts.set(evt.runId, startedAt ?? Date.now()); + clearPendingAgentRunError(evt.runId); + // A new start means this run is active again (or retried). Drop stale + // terminal snapshots so waiters don't resolve from old state. + agentRunCache.delete(evt.runId); return; } if (phase !== "end" && phase !== "error") { return; } - const startedAt = - typeof evt.data?.startedAt === "number" ? evt.data.startedAt : agentRunStarts.get(evt.runId); - const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined; - const error = typeof evt.data?.error === "string" ? evt.data.error : undefined; - agentRunStarts.delete(evt.runId); - recordAgentRunSnapshot({ + const snapshot = createSnapshotFromLifecycleEvent({ runId: evt.runId, - status: phase === "error" ? "error" : evt.data?.aborted ? 
"timeout" : "ok", - startedAt, - endedAt, - error, - ts: Date.now(), + phase, + data: evt.data, }); + agentRunStarts.delete(evt.runId); + if (phase === "error") { + schedulePendingAgentRunError(snapshot); + return; + } + clearPendingAgentRunError(evt.runId); + recordAgentRunSnapshot(snapshot); }); } @@ -85,15 +157,50 @@ export async function waitForAgentJob(params: { return await new Promise((resolve) => { let settled = false; + let pendingErrorTimer: NodeJS.Timeout | undefined; + + const clearPendingErrorTimer = () => { + if (!pendingErrorTimer) { + return; + } + clearTimeout(pendingErrorTimer); + pendingErrorTimer = undefined; + }; + const finish = (entry: AgentRunSnapshot | null) => { if (settled) { return; } settled = true; clearTimeout(timer); + clearPendingErrorTimer(); unsubscribe(); resolve(entry); }; + + const scheduleErrorFinish = ( + snapshot: AgentRunSnapshot, + delayMs = AGENT_RUN_ERROR_RETRY_GRACE_MS, + ) => { + clearPendingErrorTimer(); + const effectiveDelay = Math.max(1, Math.min(Math.floor(delayMs), 2_147_483_647)); + pendingErrorTimer = setTimeout(() => { + const latest = getCachedAgentRun(runId); + if (latest) { + finish(latest); + return; + } + recordAgentRunSnapshot(snapshot); + finish(snapshot); + }, effectiveDelay); + pendingErrorTimer.unref?.(); + }; + + const pending = getPendingAgentRunError(runId); + if (pending) { + scheduleErrorFinish(pending.snapshot, pending.dueAt - Date.now()); + } + const unsubscribe = onAgentEvent((evt) => { if (!evt || evt.stream !== "lifecycle") { return; @@ -102,31 +209,31 @@ export async function waitForAgentJob(params: { return; } const phase = evt.data?.phase; + if (phase === "start") { + clearPendingErrorTimer(); + return; + } if (phase !== "end" && phase !== "error") { return; } - const cached = getCachedAgentRun(runId); - if (cached) { - finish(cached); + const latest = getCachedAgentRun(runId); + if (latest) { + finish(latest); return; } - const startedAt = - typeof evt.data?.startedAt === "number" - ? 
evt.data.startedAt - : agentRunStarts.get(evt.runId); - const endedAt = typeof evt.data?.endedAt === "number" ? evt.data.endedAt : undefined; - const error = typeof evt.data?.error === "string" ? evt.data.error : undefined; - const snapshot: AgentRunSnapshot = { + const snapshot = createSnapshotFromLifecycleEvent({ runId: evt.runId, - status: phase === "error" ? "error" : evt.data?.aborted ? "timeout" : "ok", - startedAt, - endedAt, - error, - ts: Date.now(), - }; + phase, + data: evt.data, + }); + if (phase === "error") { + scheduleErrorFinish(snapshot); + return; + } recordAgentRunSnapshot(snapshot); finish(snapshot); }); + const timerDelayMs = Math.max(1, Math.min(Math.floor(timeoutMs), 2_147_483_647)); const timer = setTimeout(() => finish(null), timerDelayMs); }); diff --git a/src/gateway/server-methods/server-methods.test.ts b/src/gateway/server-methods/server-methods.test.ts index 38e2de9dfb..f3527e61c0 100644 --- a/src/gateway/server-methods/server-methods.test.ts +++ b/src/gateway/server-methods/server-methods.test.ts @@ -25,6 +25,17 @@ type HealthStatusHandlerParams = Parameters< >[0]; describe("waitForAgentJob", () => { + const AGENT_RUN_ERROR_RETRY_GRACE_MS = 15_000; + + beforeEach(() => { + vi.useFakeTimers(); + vi.setSystemTime(new Date("2026-01-01T00:00:00.000Z")); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + it("maps lifecycle end events with aborted=true to timeout", async () => { const runId = `run-timeout-${Date.now()}-${Math.random().toString(36).slice(2)}`; const waitPromise = waitForAgentJob({ runId, timeoutMs: 1_000 }); @@ -56,6 +67,86 @@ describe("waitForAgentJob", () => { expect(snapshot?.startedAt).toBe(300); expect(snapshot?.endedAt).toBe(400); }); + + it("treats transient error->start->end as recovered when restart lands inside grace", async () => { + const runId = `run-recover-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 }); + + emitAgentEvent({ 
runId, stream: "lifecycle", data: { phase: "start", startedAt: 100 } }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "error", endedAt: 110, error: "transient" }, + }); + + await vi.advanceTimersByTimeAsync(1_000); + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 200 } }); + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "end", endedAt: 260 } }); + + const snapshot = await waitPromise; + expect(snapshot).not.toBeNull(); + expect(snapshot?.status).toBe("ok"); + expect(snapshot?.startedAt).toBe(200); + expect(snapshot?.endedAt).toBe(260); + }); + + it("resolves error only after grace expires when no recovery start arrives", async () => { + const runId = `run-error-${Date.now()}-${Math.random().toString(36).slice(2)}`; + const waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 }); + + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 10 } }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "error", endedAt: 20, error: "fatal" }, + }); + + let settled = false; + void waitPromise.finally(() => { + settled = true; + }); + + await vi.advanceTimersByTimeAsync(AGENT_RUN_ERROR_RETRY_GRACE_MS - 1); + expect(settled).toBe(false); + + await vi.advanceTimersByTimeAsync(1); + const snapshot = await waitPromise; + expect(snapshot).not.toBeNull(); + expect(snapshot?.status).toBe("error"); + expect(snapshot?.error).toBe("fatal"); + expect(snapshot?.startedAt).toBe(10); + expect(snapshot?.endedAt).toBe(20); + }); + + it("honors pending error grace when waiter attaches after the error event", async () => { + const runId = `run-late-wait-${Date.now()}-${Math.random().toString(36).slice(2)}`; + + emitAgentEvent({ runId, stream: "lifecycle", data: { phase: "start", startedAt: 900 } }); + emitAgentEvent({ + runId, + stream: "lifecycle", + data: { phase: "error", endedAt: 999, error: "late-listener" }, + }); + + await vi.advanceTimersByTimeAsync(5_000); + + const 
waitPromise = waitForAgentJob({ runId, timeoutMs: 60_000 }); + let settled = false; + void waitPromise.finally(() => { + settled = true; + }); + + await vi.advanceTimersByTimeAsync(AGENT_RUN_ERROR_RETRY_GRACE_MS - 5_001); + expect(settled).toBe(false); + + await vi.advanceTimersByTimeAsync(1); + const snapshot = await waitPromise; + expect(snapshot).not.toBeNull(); + expect(snapshot?.status).toBe("error"); + expect(snapshot?.error).toBe("late-listener"); + expect(snapshot?.startedAt).toBe(900); + expect(snapshot?.endedAt).toBe(999); + }); }); describe("injectTimestamp", () => { diff --git a/src/gateway/server-node-events.test.ts b/src/gateway/server-node-events.test.ts index ef9ff66f96..1874b7e070 100644 --- a/src/gateway/server-node-events.test.ts +++ b/src/gateway/server-node-events.test.ts @@ -263,7 +263,7 @@ describe("voice transcript events", () => { sessionKey: "voice-store-fail-session", }), }); - await new Promise((resolve) => setTimeout(resolve, 0)); + await Promise.resolve(); expect(agentCommandMock).toHaveBeenCalledTimes(1); expect(warn).toHaveBeenCalledWith(expect.stringContaining("voice session-store update failed")); diff --git a/src/imessage/monitor/deliver.test.ts b/src/imessage/monitor/deliver.test.ts new file mode 100644 index 0000000000..51fda4de76 --- /dev/null +++ b/src/imessage/monitor/deliver.test.ts @@ -0,0 +1,125 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import type { RuntimeEnv } from "../../runtime.js"; + +const sendMessageIMessageMock = vi.hoisted(() => + vi.fn().mockResolvedValue({ messageId: "imsg-1" }), +); +const chunkTextWithModeMock = vi.hoisted(() => vi.fn((text: string) => [text])); +const resolveChunkModeMock = vi.hoisted(() => vi.fn(() => "length")); +const convertMarkdownTablesMock = vi.hoisted(() => vi.fn((text: string) => text)); +const resolveMarkdownTableModeMock = vi.hoisted(() => vi.fn(() => "code")); + +vi.mock("../send.js", () => ({ + sendMessageIMessage: (...args: unknown[]) => 
sendMessageIMessageMock(...args), +})); + +vi.mock("../../auto-reply/chunk.js", () => ({ + chunkTextWithMode: (...args: unknown[]) => chunkTextWithModeMock(...args), + resolveChunkMode: (...args: unknown[]) => resolveChunkModeMock(...args), +})); + +vi.mock("../../config/config.js", () => ({ + loadConfig: () => ({}), +})); + +vi.mock("../../config/markdown-tables.js", () => ({ + resolveMarkdownTableMode: (...args: unknown[]) => resolveMarkdownTableModeMock(...args), +})); + +vi.mock("../../markdown/tables.js", () => ({ + convertMarkdownTables: (...args: unknown[]) => convertMarkdownTablesMock(...args), +})); + +import { deliverReplies } from "./deliver.js"; + +describe("deliverReplies", () => { + const runtime = { log: vi.fn(), error: vi.fn() } as unknown as RuntimeEnv; + const client = {} as Awaited>; + + beforeEach(() => { + vi.clearAllMocks(); + chunkTextWithModeMock.mockImplementation((text: string) => [text]); + }); + + it("propagates payload replyToId through all text chunks", async () => { + chunkTextWithModeMock.mockImplementation((text: string) => text.split("|")); + + await deliverReplies({ + replies: [{ text: "first|second", replyToId: "reply-1" }], + target: "chat_id:10", + client, + accountId: "default", + runtime, + maxBytes: 4096, + textLimit: 4000, + }); + + expect(sendMessageIMessageMock).toHaveBeenCalledTimes(2); + expect(sendMessageIMessageMock).toHaveBeenNthCalledWith( + 1, + "chat_id:10", + "first", + expect.objectContaining({ + client, + maxBytes: 4096, + accountId: "default", + replyToId: "reply-1", + }), + ); + expect(sendMessageIMessageMock).toHaveBeenNthCalledWith( + 2, + "chat_id:10", + "second", + expect.objectContaining({ + client, + maxBytes: 4096, + accountId: "default", + replyToId: "reply-1", + }), + ); + }); + + it("propagates payload replyToId through media sends", async () => { + await deliverReplies({ + replies: [ + { + text: "caption", + mediaUrls: ["https://example.com/a.jpg", "https://example.com/b.jpg"], + replyToId: 
"reply-2", + }, + ], + target: "chat_id:20", + client, + accountId: "acct-2", + runtime, + maxBytes: 8192, + textLimit: 4000, + }); + + expect(sendMessageIMessageMock).toHaveBeenCalledTimes(2); + expect(sendMessageIMessageMock).toHaveBeenNthCalledWith( + 1, + "chat_id:20", + "caption", + expect.objectContaining({ + mediaUrl: "https://example.com/a.jpg", + client, + maxBytes: 8192, + accountId: "acct-2", + replyToId: "reply-2", + }), + ); + expect(sendMessageIMessageMock).toHaveBeenNthCalledWith( + 2, + "chat_id:20", + "", + expect.objectContaining({ + mediaUrl: "https://example.com/b.jpg", + client, + maxBytes: 8192, + accountId: "acct-2", + replyToId: "reply-2", + }), + ); + }); +}); diff --git a/src/imessage/monitor/deliver.ts b/src/imessage/monitor/deliver.ts index f929f32c93..84bd8994c1 100644 --- a/src/imessage/monitor/deliver.ts +++ b/src/imessage/monitor/deliver.ts @@ -45,6 +45,7 @@ export async function deliverReplies(params: { maxBytes, client, accountId, + replyToId: payload.replyToId, }); sentMessageCache?.remember(scope, chunk); } @@ -58,6 +59,7 @@ export async function deliverReplies(params: { maxBytes, client, accountId, + replyToId: payload.replyToId, }); if (caption) { sentMessageCache?.remember(scope, caption); diff --git a/src/imessage/send.test.ts b/src/imessage/send.test.ts index 8a8dadd3e2..1e985a229a 100644 --- a/src/imessage/send.test.ts +++ b/src/imessage/send.test.ts @@ -78,4 +78,60 @@ describe("sendMessageIMessage", () => { }); expect(result.messageId).toBe("123"); }); + + it("prepends reply tag as the first token when replyToId is provided", async () => { + await sendMessageIMessage("chat_id:123", " hello\nworld", { + replyToId: "abc-123", + account: defaultAccount, + config: {}, + client: { + request: (...args: unknown[]) => requestMock(...args), + stop: (...args: unknown[]) => stopMock(...args), + } as unknown as import("./client.js").IMessageRpcClient, + }); + const params = requestMock.mock.calls[0]?.[1] as Record; + 
expect(params.text).toBe("[[reply_to:abc-123]] hello\nworld"); + }); + + it("rewrites an existing leading reply tag to keep the requested id first", async () => { + await sendMessageIMessage("chat_id:123", " [[reply_to:old-id]] hello", { + replyToId: "new-id", + account: defaultAccount, + config: {}, + client: { + request: (...args: unknown[]) => requestMock(...args), + stop: (...args: unknown[]) => stopMock(...args), + } as unknown as import("./client.js").IMessageRpcClient, + }); + const params = requestMock.mock.calls[0]?.[1] as Record; + expect(params.text).toBe("[[reply_to:new-id]] hello"); + }); + + it("sanitizes replyToId before writing the leading reply tag", async () => { + await sendMessageIMessage("chat_id:123", "hello", { + replyToId: " [ab]\n\u0000c\td ] ", + account: defaultAccount, + config: {}, + client: { + request: (...args: unknown[]) => requestMock(...args), + stop: (...args: unknown[]) => stopMock(...args), + } as unknown as import("./client.js").IMessageRpcClient, + }); + const params = requestMock.mock.calls[0]?.[1] as Record; + expect(params.text).toBe("[[reply_to:abcd]] hello"); + }); + + it("skips reply tagging when sanitized replyToId is empty", async () => { + await sendMessageIMessage("chat_id:123", "hello", { + replyToId: "[]\u0000\n\r", + account: defaultAccount, + config: {}, + client: { + request: (...args: unknown[]) => requestMock(...args), + stop: (...args: unknown[]) => stopMock(...args), + } as unknown as import("./client.js").IMessageRpcClient, + }); + const params = requestMock.mock.calls[0]?.[1] as Record; + expect(params.text).toBe("hello"); + }); }); diff --git a/src/imessage/send.ts b/src/imessage/send.ts index 03d4544d15..7c3345b757 100644 --- a/src/imessage/send.ts +++ b/src/imessage/send.ts @@ -13,6 +13,7 @@ export type IMessageSendOpts = { service?: IMessageService; region?: string; accountId?: string; + replyToId?: string; mediaUrl?: string; mediaLocalRoots?: readonly string[]; maxBytes?: number; @@ -33,6 +34,51 @@ 
export type IMessageSendResult = { messageId: string; }; +const LEADING_REPLY_TAG_RE = /^\s*\[\[\s*reply_to\s*:\s*([^\]\n]+)\s*\]\]\s*/i; +const MAX_REPLY_TO_ID_LENGTH = 256; + +function stripUnsafeReplyTagChars(value: string): string { + let next = ""; + for (const ch of value) { + const code = ch.charCodeAt(0); + if ((code >= 0 && code <= 31) || code === 127 || ch === "[" || ch === "]") { + continue; + } + next += ch; + } + return next; +} + +function sanitizeReplyToId(rawReplyToId?: string): string | undefined { + const trimmed = rawReplyToId?.trim(); + if (!trimmed) { + return undefined; + } + const sanitized = stripUnsafeReplyTagChars(trimmed).trim(); + if (!sanitized) { + return undefined; + } + if (sanitized.length > MAX_REPLY_TO_ID_LENGTH) { + return sanitized.slice(0, MAX_REPLY_TO_ID_LENGTH); + } + return sanitized; +} + +function prependReplyTagIfNeeded(message: string, replyToId?: string): string { + const resolvedReplyToId = sanitizeReplyToId(replyToId); + if (!resolvedReplyToId) { + return message; + } + const replyTag = `[[reply_to:${resolvedReplyToId}]]`; + const existingLeadingTag = message.match(LEADING_REPLY_TAG_RE); + if (existingLeadingTag) { + const remainder = message.slice(existingLeadingTag[0].length).trimStart(); + return remainder ? `${replyTag} ${remainder}` : replyTag; + } + const trimmedMessage = message.trimStart(); + return trimmedMessage ? 
`${replyTag} ${trimmedMessage}` : replyTag; +} + function resolveMessageId(result: Record | null | undefined): string | null { if (!result) { return null; @@ -101,6 +147,7 @@ export async function sendMessageIMessage( }); message = convertMarkdownTables(message, tableMode); } + message = prependReplyTagIfNeeded(message, opts.replyToId); const params: Record = { text: message, diff --git a/src/infra/outbound/deliver.test.ts b/src/infra/outbound/deliver.test.ts index 91760eb843..bc2031c8ac 100644 --- a/src/infra/outbound/deliver.test.ts +++ b/src/infra/outbound/deliver.test.ts @@ -117,6 +117,32 @@ describe("deliverOutboundPayloads", () => { } }); + it("keeps payload replyToId across all chunked telegram sends", async () => { + const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); + const prevTelegramToken = process.env.TELEGRAM_BOT_TOKEN; + process.env.TELEGRAM_BOT_TOKEN = ""; + try { + await deliverOutboundPayloads({ + cfg: telegramChunkConfig, + channel: "telegram", + to: "123", + payloads: [{ text: "abcd", replyToId: "777" }], + deps: { sendTelegram }, + }); + + expect(sendTelegram).toHaveBeenCalledTimes(2); + for (const call of sendTelegram.mock.calls) { + expect(call[2]).toEqual(expect.objectContaining({ replyToMessageId: 777 })); + } + } finally { + if (prevTelegramToken === undefined) { + delete process.env.TELEGRAM_BOT_TOKEN; + } else { + process.env.TELEGRAM_BOT_TOKEN = prevTelegramToken; + } + } + }); + it("passes explicit accountId to sendTelegram", async () => { const sendTelegram = vi.fn().mockResolvedValue({ messageId: "m1", chatId: "c1" }); @@ -530,12 +556,10 @@ describe("deliverOutboundPayloads", () => { deps: { sendWhatsApp }, }); - await vi.waitFor(() => { - expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( - expect.objectContaining({ to: "+1555", content: "hello", success: true }), - expect.objectContaining({ channelId: "whatsapp" }), - ); - }); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( 
+ expect.objectContaining({ to: "+1555", content: "hello", success: true }), + expect.objectContaining({ channelId: "whatsapp" }), + ); }); it("emits message_sent success for sendPayload deliveries", async () => { @@ -563,12 +587,10 @@ describe("deliverOutboundPayloads", () => { payloads: [{ text: "payload text", channelData: { mode: "custom" } }], }); - await vi.waitFor(() => { - expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( - expect.objectContaining({ to: "!room:1", content: "payload text", success: true }), - expect.objectContaining({ channelId: "matrix" }), - ); - }); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ to: "!room:1", content: "payload text", success: true }), + expect.objectContaining({ channelId: "matrix" }), + ); }); it("emits message_sent failure when delivery errors", async () => { @@ -585,17 +607,15 @@ describe("deliverOutboundPayloads", () => { }), ).rejects.toThrow("downstream failed"); - await vi.waitFor(() => { - expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( - expect.objectContaining({ - to: "+1555", - content: "hi", - success: false, - error: "downstream failed", - }), - expect.objectContaining({ channelId: "whatsapp" }), - ); - }); + expect(hookMocks.runner.runMessageSent).toHaveBeenCalledWith( + expect.objectContaining({ + to: "+1555", + content: "hi", + success: false, + error: "downstream failed", + }), + expect.objectContaining({ channelId: "whatsapp" }), + ); }); }); diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index c48dea0d04..77dba9bfc7 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -77,9 +77,28 @@ type ChannelHandler = { chunker: Chunker | null; chunkerMode?: "text" | "markdown"; textChunkLimit?: number; - sendPayload?: (payload: ReplyPayload) => Promise; - sendText: (text: string) => Promise; - sendMedia: (caption: string, mediaUrl: string) => Promise; + sendPayload?: ( + payload: 
ReplyPayload, + overrides?: { + replyToId?: string | null; + threadId?: string | number | null; + }, + ) => Promise; + sendText: ( + text: string, + overrides?: { + replyToId?: string | null; + threadId?: string | number | null; + }, + ) => Promise; + sendMedia: ( + caption: string, + mediaUrl: string, + overrides?: { + replyToId?: string | null; + threadId?: string | number | null; + }, + ) => Promise; }; type ChannelHandlerParams = { @@ -118,27 +137,35 @@ function createPluginHandler( const sendMedia = outbound.sendMedia; const chunker = outbound.chunker ?? null; const chunkerMode = outbound.chunkerMode; + const resolveCtx = (overrides?: { + replyToId?: string | null; + threadId?: string | number | null; + }): Omit => ({ + ...baseCtx, + replyToId: overrides?.replyToId ?? baseCtx.replyToId, + threadId: overrides?.threadId ?? baseCtx.threadId, + }); return { chunker, chunkerMode, textChunkLimit: outbound.textChunkLimit, sendPayload: outbound.sendPayload - ? async (payload) => + ? async (payload, overrides) => outbound.sendPayload!({ - ...baseCtx, + ...resolveCtx(overrides), text: payload.text ?? 
"", mediaUrl: payload.mediaUrl, payload, }) : undefined, - sendText: async (text) => + sendText: async (text, overrides) => sendText({ - ...baseCtx, + ...resolveCtx(overrides), text, }), - sendMedia: async (caption, mediaUrl) => + sendMedia: async (caption, mediaUrl, overrides) => sendMedia({ - ...baseCtx, + ...resolveCtx(overrides), text: caption, mediaUrl, }), @@ -302,10 +329,13 @@ async function deliverOutboundPayloadsCore( }) : undefined; - const sendTextChunks = async (text: string) => { + const sendTextChunks = async ( + text: string, + overrides?: { replyToId?: string | null; threadId?: string | number | null }, + ) => { throwIfAborted(abortSignal); if (!handler.chunker || textLimit === undefined) { - results.push(await handler.sendText(text)); + results.push(await handler.sendText(text, overrides)); return; } if (chunkMode === "newline") { @@ -325,7 +355,7 @@ async function deliverOutboundPayloadsCore( } for (const chunk of chunks) { throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk)); + results.push(await handler.sendText(chunk, overrides)); } } return; @@ -333,7 +363,7 @@ async function deliverOutboundPayloadsCore( const chunks = handler.chunker(text, textLimit); for (const chunk of chunks) { throwIfAborted(abortSignal); - results.push(await handler.sendText(chunk)); + results.push(await handler.sendText(chunk, overrides)); } }; @@ -469,8 +499,12 @@ async function deliverOutboundPayloadsCore( } params.onPayload?.(payloadSummary); + const sendOverrides = { + replyToId: effectivePayload.replyToId ?? params.replyToId ?? undefined, + threadId: params.threadId ?? 
undefined, + }; if (handler.sendPayload && effectivePayload.channelData) { - results.push(await handler.sendPayload(effectivePayload)); + results.push(await handler.sendPayload(effectivePayload, sendOverrides)); emitMessageSent(true); continue; } @@ -478,7 +512,7 @@ async function deliverOutboundPayloadsCore( if (isSignalChannel) { await sendSignalTextChunks(payloadSummary.text); } else { - await sendTextChunks(payloadSummary.text); + await sendTextChunks(payloadSummary.text, sendOverrides); } emitMessageSent(true); continue; @@ -492,7 +526,7 @@ async function deliverOutboundPayloadsCore( if (isSignalChannel) { results.push(await sendSignalMedia(caption, url)); } else { - results.push(await handler.sendMedia(caption, url)); + results.push(await handler.sendMedia(caption, url, sendOverrides)); } } emitMessageSent(true); diff --git a/src/telegram/bot.create-telegram-bot.test.ts b/src/telegram/bot.create-telegram-bot.test.ts index 172b98fcff..c06b7cea6b 100644 --- a/src/telegram/bot.create-telegram-bot.test.ts +++ b/src/telegram/bot.create-telegram-bot.test.ts @@ -50,10 +50,6 @@ const TELEGRAM_TEST_TIMINGS = { textFragmentGapMs: 30, } as const; -const sleep = async (ms: number) => { - await new Promise((resolve) => setTimeout(resolve, ms)); -}; - describe("createTelegramBot", () => { beforeEach(() => { process.env.TZ = "UTC"; @@ -1570,7 +1566,7 @@ describe("createTelegramBot", () => { ).toBeUndefined(); } }); - it("honors replyToMode=first for threaded replies", async () => { + it("honors replyToMode=first for threaded replies across all chunks of the first reply", async () => { onSpy.mockReset(); sendMessageSpy.mockReset(); replySpy.mockReset(); @@ -1593,14 +1589,10 @@ describe("createTelegramBot", () => { }); expect(sendMessageSpy.mock.calls.length).toBeGreaterThan(1); - const [first, ...rest] = sendMessageSpy.mock.calls; - expect((first?.[2] as { reply_to_message_id?: number } | undefined)?.reply_to_message_id).toBe( - 101, - ); - for (const call of rest) { - 
expect( - (call[2] as { reply_to_message_id?: number } | undefined)?.reply_to_message_id, - ).toBeUndefined(); + for (const call of sendMessageSpy.mock.calls) { + expect((call[2] as { reply_to_message_id?: number } | undefined)?.reply_to_message_id).toBe( + 101, + ); } }); it("prefixes final replies with responsePrefix", async () => { @@ -1898,44 +1890,56 @@ describe("createTelegramBot", () => { }), ); - createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); - const handler = getOnHandler("channel_post") as (ctx: Record) => Promise; + const setTimeoutSpy = vi.spyOn(globalThis, "setTimeout"); + try { + createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); + const handler = getOnHandler("channel_post") as ( + ctx: Record, + ) => Promise; - const first = handler({ - channelPost: { - chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, - message_id: 201, - caption: "album caption", - date: 1736380800, - media_group_id: "channel-album-1", - photo: [{ file_id: "p1" }], - }, - me: { username: "openclaw_bot" }, - getFile: async () => ({ file_path: "photos/p1.jpg" }), - }); + const first = handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 201, + caption: "album caption", + date: 1736380800, + media_group_id: "channel-album-1", + photo: [{ file_id: "p1" }], + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/p1.jpg" }), + }); - const second = handler({ - channelPost: { - chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, - message_id: 202, - date: 1736380801, - media_group_id: "channel-album-1", - photo: [{ file_id: "p2" }], - }, - me: { username: "openclaw_bot" }, - getFile: async () => ({ file_path: "photos/p2.jpg" }), - }); + const second = handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 202, + date: 1736380801, + media_group_id: 
"channel-album-1", + photo: [{ file_id: "p2" }], + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({ file_path: "photos/p2.jpg" }), + }); - await Promise.all([first, second]); - expect(replySpy).not.toHaveBeenCalled(); - await sleep(TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs + 80); + await Promise.all([first, second]); + expect(replySpy).not.toHaveBeenCalled(); - expect(replySpy).toHaveBeenCalledTimes(1); - const payload = replySpy.mock.calls[0]?.[0] as { Body?: string; MediaPaths?: string[] }; - expect(payload.Body).toContain("album caption"); - expect(payload.MediaPaths).toHaveLength(2); + const flushTimerCall = [...setTimeoutSpy.mock.calls] + .toReversed() + .find((call) => call[1] === TELEGRAM_TEST_TIMINGS.mediaGroupFlushMs); + const flushTimer = flushTimerCall?.[0] as (() => unknown) | undefined; + expect(flushTimer).toBeTypeOf("function"); + await flushTimer?.(); - fetchSpy.mockRestore(); + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0]?.[0] as { Body?: string; MediaPaths?: string[] }; + expect(payload.Body).toContain("album caption"); + expect(payload.MediaPaths).toHaveLength(2); + } finally { + setTimeoutSpy.mockRestore(); + fetchSpy.mockRestore(); + } }); it("coalesces channel_post near-limit text fragments into one message", async () => { onSpy.mockReset(); @@ -1955,41 +1959,48 @@ describe("createTelegramBot", () => { }, }); - createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); - const handler = getOnHandler("channel_post") as (ctx: Record) => Promise; + vi.useFakeTimers(); + try { + createTelegramBot({ token: "tok", testTimings: TELEGRAM_TEST_TIMINGS }); + const handler = getOnHandler("channel_post") as ( + ctx: Record, + ) => Promise; - const part1 = "A".repeat(4050); - const part2 = "B".repeat(50); + const part1 = "A".repeat(4050); + const part2 = "B".repeat(50); - await handler({ - channelPost: { - chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, - message_id: 
301, - date: 1736380800, - text: part1, - }, - me: { username: "openclaw_bot" }, - getFile: async () => ({}), - }); + await handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 301, + date: 1736380800, + text: part1, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); - await handler({ - channelPost: { - chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, - message_id: 302, - date: 1736380801, - text: part2, - }, - me: { username: "openclaw_bot" }, - getFile: async () => ({}), - }); + await handler({ + channelPost: { + chat: { id: -100777111222, type: "channel", title: "Wake Channel" }, + message_id: 302, + date: 1736380801, + text: part2, + }, + me: { username: "openclaw_bot" }, + getFile: async () => ({}), + }); - expect(replySpy).not.toHaveBeenCalled(); - await sleep(TELEGRAM_TEST_TIMINGS.textFragmentGapMs + 100); + expect(replySpy).not.toHaveBeenCalled(); + await vi.advanceTimersByTimeAsync(TELEGRAM_TEST_TIMINGS.textFragmentGapMs + 100); - expect(replySpy).toHaveBeenCalledTimes(1); - const payload = replySpy.mock.calls[0]?.[0] as { RawBody?: string }; - expect(payload.RawBody).toContain(part1.slice(0, 32)); - expect(payload.RawBody).toContain(part2.slice(0, 32)); + expect(replySpy).toHaveBeenCalledTimes(1); + const payload = replySpy.mock.calls[0]?.[0] as { RawBody?: string }; + expect(payload.RawBody).toContain(part1.slice(0, 32)); + expect(payload.RawBody).toContain(part2.slice(0, 32)); + } finally { + vi.useRealTimers(); + } }); it("drops oversized channel_post media instead of dispatching a placeholder message", async () => { onSpy.mockReset(); diff --git a/src/telegram/bot/delivery.ts b/src/telegram/bot/delivery.ts index a6fe055cdf..b5fbfc434e 100644 --- a/src/telegram/bot/delivery.ts +++ b/src/telegram/bot/delivery.ts @@ -104,6 +104,8 @@ export async function deliverReplies(params: { continue; } const replyToId = replyToMode === "off" ? 
undefined : resolveTelegramReplyId(reply.replyToId); + const replyToMessageIdForPayload = + replyToId && (replyToMode === "all" || !hasReplied) ? replyToId : undefined; const mediaList = reply.mediaUrls?.length ? reply.mediaUrls : reply.mediaUrl @@ -115,6 +117,7 @@ export async function deliverReplies(params: { const replyMarkup = buildInlineKeyboard(telegramData?.buttons); if (mediaList.length === 0) { const chunks = chunkText(reply.text || ""); + let sentTextChunk = false; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; if (!chunk) { @@ -123,8 +126,7 @@ export async function deliverReplies(params: { // Only attach buttons to the first chunk. const shouldAttachButtons = i === 0 && replyMarkup; await sendTelegramText(bot, chatId, chunk.html, runtime, { - replyToMessageId: - replyToId && (replyToMode === "all" || !hasReplied) ? replyToId : undefined, + replyToMessageId: replyToMessageIdForPayload, replyQuoteText, thread, textMode: "html", @@ -132,10 +134,11 @@ export async function deliverReplies(params: { linkPreview, replyMarkup: shouldAttachButtons ? replyMarkup : undefined, }); + sentTextChunk = true; markDelivered(); - if (replyToId && !hasReplied) { - hasReplied = true; - } + } + if (replyToMessageIdForPayload && !hasReplied && sentTextChunk) { + hasReplied = true; } continue; } @@ -167,8 +170,7 @@ export async function deliverReplies(params: { pendingFollowUpText = followUpText; } first = false; - const replyToMessageId = - replyToId && (replyToMode === "all" || !hasReplied) ? 
replyToId : undefined; + const replyToMessageId = replyToMessageIdForPayload; const shouldAttachButtonsToMedia = isFirstMedia && replyMarkup && !followUpText; const mediaParams: Record = { caption: htmlCaption, @@ -231,20 +233,21 @@ export async function deliverReplies(params: { logVerbose( "telegram sendVoice forbidden (recipient has voice messages blocked in privacy settings); falling back to text", ); - hasReplied = await sendTelegramVoiceFallbackText({ + await sendTelegramVoiceFallbackText({ bot, chatId, runtime, text: fallbackText, chunkText, - replyToId, - replyToMode, - hasReplied, + replyToId: replyToMessageIdForPayload, thread, linkPreview, replyMarkup, replyQuoteText, }); + if (replyToMessageIdForPayload && !hasReplied) { + hasReplied = true; + } markDelivered(); // Skip this media item; continue with next. continue; @@ -277,10 +280,8 @@ export async function deliverReplies(params: { const chunks = chunkText(pendingFollowUpText); for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; - const replyToMessageIdFollowup = - replyToId && (replyToMode === "all" || !hasReplied) ? replyToId : undefined; await sendTelegramText(bot, chatId, chunk.html, runtime, { - replyToMessageId: replyToMessageIdFollowup, + replyToMessageId: replyToMessageIdForPayload, thread, textMode: "html", plainText: chunk.text, @@ -288,12 +289,12 @@ export async function deliverReplies(params: { replyMarkup: i === 0 ? 
replyMarkup : undefined, }); markDelivered(); - if (replyToId && !hasReplied) { - hasReplied = true; - } } pendingFollowUpText = undefined; } + if (replyToMessageIdForPayload && !hasReplied) { + hasReplied = true; + } } } @@ -485,20 +486,16 @@ async function sendTelegramVoiceFallbackText(opts: { text: string; chunkText: (markdown: string) => ReturnType; replyToId?: number; - replyToMode: ReplyToMode; - hasReplied: boolean; thread?: TelegramThreadSpec | null; linkPreview?: boolean; replyMarkup?: ReturnType; replyQuoteText?: string; -}): Promise { +}): Promise { const chunks = opts.chunkText(opts.text); - let hasReplied = opts.hasReplied; for (let i = 0; i < chunks.length; i += 1) { const chunk = chunks[i]; await sendTelegramText(opts.bot, opts.chatId, chunk.html, opts.runtime, { - replyToMessageId: - opts.replyToId && (opts.replyToMode === "all" || !hasReplied) ? opts.replyToId : undefined, + replyToMessageId: opts.replyToId, replyQuoteText: opts.replyQuoteText, thread: opts.thread, textMode: "html", @@ -506,11 +503,7 @@ async function sendTelegramVoiceFallbackText(opts: { linkPreview: opts.linkPreview, replyMarkup: i === 0 ? 
opts.replyMarkup : undefined, }); - if (opts.replyToId && !hasReplied) { - hasReplied = true; - } } - return hasReplied; } function buildTelegramSendParams(opts?: { diff --git a/ui/src/ui/app-render.helpers.node.test.ts b/ui/src/ui/app-render.helpers.node.test.ts index b8698abee8..7bea77067e 100644 --- a/ui/src/ui/app-render.helpers.node.test.ts +++ b/ui/src/ui/app-render.helpers.node.test.ts @@ -234,6 +234,24 @@ describe("resolveSessionDisplayName", () => { ).toBe("Cron: Nightly Sync"); }); + it("does not double-prefix cron labels that already include Cron:", () => { + expect( + resolveSessionDisplayName( + "agent:main:cron:abc-123", + row({ key: "agent:main:cron:abc-123", label: "Cron: Nightly Sync" }), + ), + ).toBe("Cron: Nightly Sync"); + }); + + it("does not double-prefix subagent display names that already include Subagent:", () => { + expect( + resolveSessionDisplayName( + "agent:main:subagent:abc-123", + row({ key: "agent:main:subagent:abc-123", displayName: "Subagent: Runner" }), + ), + ).toBe("Subagent: Runner"); + }); + it("does not prefix non-typed sessions with labels", () => { expect( resolveSessionDisplayName( diff --git a/ui/src/ui/app-render.helpers.ts b/ui/src/ui/app-render.helpers.ts index d4c3655813..d954147297 100644 --- a/ui/src/ui/app-render.helpers.ts +++ b/ui/src/ui/app-render.helpers.ts @@ -332,11 +332,19 @@ export function resolveSessionDisplayName( const displayName = row?.displayName?.trim() || ""; const { prefix, fallbackName } = parseSessionKey(key); + const applyTypedPrefix = (name: string): string => { + if (!prefix) { + return name; + } + const prefixPattern = new RegExp(`^${prefix.replace(/[.*+?^${}()|[\\]\\]/g, "\\$&")}\\s*`, "i"); + return prefixPattern.test(name) ? name : `${prefix} ${name}`; + }; + if (label && label !== key) { - return prefix ? `${prefix} ${label}` : label; + return applyTypedPrefix(label); } if (displayName && displayName !== key) { - return prefix ? 
`${prefix} ${displayName}` : displayName; + return applyTypedPrefix(displayName); } return fallbackName; }