diff --git a/docs/concepts/session-tool.md b/docs/concepts/session-tool.md index 6a4fcad944..945f3883f6 100644 --- a/docs/concepts/session-tool.md +++ b/docs/concepts/session-tool.md @@ -94,6 +94,7 @@ Behavior: - Announce delivery runs after the primary run completes and is best-effort; `status: "ok"` does not guarantee the announce was delivered. - Waits via gateway `agent.wait` (server-side) so reconnects don't drop the wait. - Agent-to-agent message context is injected for the primary run. +- Inter-session messages are persisted with `message.provenance.kind = "inter_session"` so transcript readers can distinguish routed agent instructions from external user input. - After the primary run completes, OpenClaw runs a **reply-back loop**: - Round 2+ alternates between requester and target agents. - Reply exactly `REPLY_SKIP` to stop the ping‑pong. diff --git a/docs/reference/transcript-hygiene.md b/docs/reference/transcript-hygiene.md index 078c01ed43..fd23d9c193 100644 --- a/docs/reference/transcript-hygiene.md +++ b/docs/reference/transcript-hygiene.md @@ -24,6 +24,7 @@ Scope includes: - Turn validation / ordering - Thought signature cleanup - Image payload sanitization +- User-input provenance tagging (for inter-session routed prompts) If you need transcript storage details, see: @@ -72,6 +73,23 @@ Implementation: --- +## Global rule: inter-session input provenance + +When an agent sends a prompt into another session via `sessions_send` (including +agent-to-agent reply/announce steps), OpenClaw persists the created user turn with: + +- `message.provenance.kind = "inter_session"` + +This metadata is written at transcript append time and does not change role +(`role: "user"` remains for provider compatibility). Transcript readers can use +this to avoid treating routed internal prompts as end-user-authored instructions. + +During context rebuild, OpenClaw also prepends a short `[Inter-session message]` +marker to those user turns in-memory so the model can distinguish them from +external end-user instructions. + +--- + ## Provider matrix (current behavior) **OpenAI / OpenAI Codex** diff --git a/src/agents/openclaw-tools.sessions.test.ts b/src/agents/openclaw-tools.sessions.test.ts index f1a0aea89e..972bc73d77 100644 --- a/src/agents/openclaw-tools.sessions.test.ts +++ b/src/agents/openclaw-tools.sessions.test.ts @@ -475,6 +475,7 @@ describe("sessions tools", () => { expect(call.params).toMatchObject({ lane: "nested", channel: "webchat", + inputProvenance: { kind: "inter_session" }, }); } expect( @@ -652,6 +653,7 @@ describe("sessions tools", () => { expect(call.params).toMatchObject({ lane: "nested", channel: "webchat", + inputProvenance: { kind: "inter_session" }, }); } diff --git a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts index d8efba99a2..791525a64d 100644 --- a/src/agents/pi-embedded-runner.sanitize-session-history.test.ts +++ b/src/agents/pi-embedded-runner.sanitize-session-history.test.ts @@ -112,6 +112,36 @@ describe("sanitizeSessionHistory", () => { ); }); + it("annotates inter-session user messages before context sanitization", async () => { + vi.mocked(helpers.isGoogleModelApi).mockReturnValue(false); + + const messages: AgentMessage[] = [ + { + role: "user", + content: "forwarded instruction", + provenance: { + kind: "inter_session", + sourceSessionKey: "agent:main:req", + sourceTool: "sessions_send", + }, + } as unknown as AgentMessage, + ]; + + const result = await sanitizeSessionHistory({ + messages, + modelApi: "openai-responses", + provider: "openai", + sessionManager: mockSessionManager, + sessionId: "test-session", + }); + + const first = result[0] as Extract; + expect(first.role).toBe("user"); + expect(typeof first.content).toBe("string"); + expect(first.content as string).toContain("[Inter-session message]"); + expect(first.content as string).toContain("sourceSession=agent:main:req"); + }); + it("keeps reasoning-only assistant messages for openai-responses", async () => { vi.mocked(helpers.isGoogleModelApi).mockReturnValue(false); diff --git a/src/agents/pi-embedded-runner/google.ts b/src/agents/pi-embedded-runner/google.ts index fd18326354..91f40e1213 100644 --- a/src/agents/pi-embedded-runner/google.ts +++ b/src/agents/pi-embedded-runner/google.ts @@ -4,6 +4,10 @@ import type { TSchema } from "@sinclair/typebox"; import { EventEmitter } from "node:events"; import type { TranscriptPolicy } from "../transcript-policy.js"; import { registerUnhandledRejectionHandler } from "../../infra/unhandled-rejections.js"; +import { + hasInterSessionUserProvenance, + normalizeInputProvenance, +} from "../../sessions/input-provenance.js"; import { downgradeOpenAIReasoningBlocks, isCompactionFailureError, @@ -44,6 +48,7 @@ const GOOGLE_SCHEMA_UNSUPPORTED_KEYWORDS = new Set([ "maxProperties", ]); const ANTIGRAVITY_SIGNATURE_RE = /^[A-Za-z0-9+/]+={0,2}$/; +const INTER_SESSION_PREFIX_BASE = "[Inter-session message]"; function isValidAntigravitySignature(value: unknown): value is string { if (typeof value !== "string") { @@ -119,6 +124,85 @@ export function sanitizeAntigravityThinkingBlocks(messages: AgentMessage[]): Age return touched ? out : messages; } +function buildInterSessionPrefix(message: AgentMessage): string { + const provenance = normalizeInputProvenance((message as { provenance?: unknown }).provenance); + if (!provenance) { + return INTER_SESSION_PREFIX_BASE; + } + const details = [ + provenance.sourceSessionKey ? `sourceSession=${provenance.sourceSessionKey}` : undefined, + provenance.sourceChannel ? `sourceChannel=${provenance.sourceChannel}` : undefined, + provenance.sourceTool ? `sourceTool=${provenance.sourceTool}` : undefined, + ].filter(Boolean); + if (details.length === 0) { + return INTER_SESSION_PREFIX_BASE; + } + return `${INTER_SESSION_PREFIX_BASE} ${details.join(" ")}`; +} + +function annotateInterSessionUserMessages(messages: AgentMessage[]): AgentMessage[] { + let touched = false; + const out: AgentMessage[] = []; + for (const msg of messages) { + if (!hasInterSessionUserProvenance(msg as { role?: unknown; provenance?: unknown })) { + out.push(msg); + continue; + } + const prefix = buildInterSessionPrefix(msg); + const user = msg as Extract; + if (typeof user.content === "string") { + if (user.content.startsWith(prefix)) { + out.push(msg); + continue; + } + touched = true; + out.push({ + ...(msg as unknown as Record), + content: `${prefix}\n${user.content}`, + } as AgentMessage); + continue; + } + if (!Array.isArray(user.content)) { + out.push(msg); + continue; + } + + const textIndex = user.content.findIndex( + (block) => + block && + typeof block === "object" && + (block as { type?: unknown }).type === "text" && + typeof (block as { text?: unknown }).text === "string", + ); + + if (textIndex >= 0) { + const existing = user.content[textIndex] as { type: "text"; text: string }; + if (existing.text.startsWith(prefix)) { + out.push(msg); + continue; + } + const nextContent = [...user.content]; + nextContent[textIndex] = { + ...existing, + text: `${prefix}\n${existing.text}`, + }; + touched = true; + out.push({ + ...(msg as unknown as Record), + content: nextContent, + } as AgentMessage); + continue; + } + + touched = true; + out.push({ + ...(msg as unknown as Record), + content: [{ type: "text", text: prefix }, ...user.content], + } as AgentMessage); + } + return touched ? out : messages; +} + function findUnsupportedSchemaKeywords(schema: unknown, path: string): string[] { if (!schema || typeof schema !== "object") { return []; @@ -358,13 +442,18 @@ export async function sanitizeSessionHistory(params: { provider: params.provider, modelId: params.modelId, }); - const sanitizedImages = await sanitizeSessionMessagesImages(params.messages, "session:history", { - sanitizeMode: policy.sanitizeMode, - sanitizeToolCallIds: policy.sanitizeToolCallIds, - toolCallIdMode: policy.toolCallIdMode, - preserveSignatures: policy.preserveSignatures, - sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures, - }); + const withInterSessionMarkers = annotateInterSessionUserMessages(params.messages); + const sanitizedImages = await sanitizeSessionMessagesImages( + withInterSessionMarkers, + "session:history", + { + sanitizeMode: policy.sanitizeMode, + sanitizeToolCallIds: policy.sanitizeToolCallIds, + toolCallIdMode: policy.toolCallIdMode, + preserveSignatures: policy.preserveSignatures, + sanitizeThoughtSignatures: policy.sanitizeThoughtSignatures, + }, + ); const sanitizedThinking = policy.normalizeAntigravityThinkingBlocks ? sanitizeAntigravityThinkingBlocks(sanitizedImages) : sanitizedImages; diff --git a/src/agents/pi-embedded-runner/run.ts b/src/agents/pi-embedded-runner/run.ts index 467ddba5d9..6cbd3dd4ca 100644 --- a/src/agents/pi-embedded-runner/run.ts +++ b/src/agents/pi-embedded-runner/run.ts @@ -470,6 +470,7 @@ export async function runEmbeddedPiAgent( onToolResult: params.onToolResult, onAgentEvent: params.onAgentEvent, extraSystemPrompt: params.extraSystemPrompt, + inputProvenance: params.inputProvenance, streamParams: params.streamParams, ownerNumbers: params.ownerNumbers, enforceFinalTag: params.enforceFinalTag, diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 79384f6c47..c1adef08b5 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -428,6 +428,7 @@ export async function runEmbeddedAttempt( sessionManager = guardSessionManager(SessionManager.open(params.sessionFile), { agentId: sessionAgentId, sessionKey: params.sessionKey, + inputProvenance: params.inputProvenance, allowSyntheticToolResults: transcriptPolicy.allowSyntheticToolResults, }); trackSessionManagerAccess(params.sessionFile); diff --git a/src/agents/pi-embedded-runner/run/params.ts b/src/agents/pi-embedded-runner/run/params.ts index f56f3ecac2..c49f7fb656 100644 --- a/src/agents/pi-embedded-runner/run/params.ts +++ b/src/agents/pi-embedded-runner/run/params.ts @@ -3,6 +3,7 @@ import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-rep import type { AgentStreamParams } from "../../../commands/agent/types.js"; import type { OpenClawConfig } from "../../../config/config.js"; import type { enqueueCommand } from "../../../process/command-queue.js"; +import type { InputProvenance } from "../../../sessions/input-provenance.js"; import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js"; import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js"; import type { SkillSnapshot } from "../../skills.js"; @@ -99,6 +100,7 @@ export type RunEmbeddedPiAgentParams = { lane?: string; enqueue?: typeof enqueueCommand; extraSystemPrompt?: string; + inputProvenance?: InputProvenance; streamParams?: AgentStreamParams; ownerNumbers?: string[]; enforceFinalTag?: boolean; diff --git a/src/agents/pi-embedded-runner/run/types.ts b/src/agents/pi-embedded-runner/run/types.ts index 5cfc8bbca1..5201492b12 100644 --- a/src/agents/pi-embedded-runner/run/types.ts +++ b/src/agents/pi-embedded-runner/run/types.ts @@ -4,6 +4,7 @@ import type { ReasoningLevel, ThinkLevel, VerboseLevel } from "../../../auto-rep import type { AgentStreamParams } from "../../../commands/agent/types.js"; import type { OpenClawConfig } from "../../../config/config.js"; import type { SessionSystemPromptReport } from "../../../config/sessions/types.js"; +import type { InputProvenance } from "../../../sessions/input-provenance.js"; import type { ExecElevatedDefaults, ExecToolDefaults } from "../../bash-tools.js"; import type { MessagingToolSend } from "../../pi-embedded-messaging.js"; import type { BlockReplyChunking, ToolResultFormat } from "../../pi-embedded-subscribe.js"; @@ -87,6 +88,7 @@ export type EmbeddedRunAttemptParams = { /** If true, omit the message tool from the tool list. */ disableMessageTool?: boolean; extraSystemPrompt?: string; + inputProvenance?: InputProvenance; streamParams?: AgentStreamParams; ownerNumbers?: string[]; enforceFinalTag?: boolean; diff --git a/src/agents/session-tool-result-guard-wrapper.ts b/src/agents/session-tool-result-guard-wrapper.ts index 79b6e30237..32bfd27d35 100644 --- a/src/agents/session-tool-result-guard-wrapper.ts +++ b/src/agents/session-tool-result-guard-wrapper.ts @@ -1,5 +1,9 @@ import type { SessionManager } from "@mariozechner/pi-coding-agent"; import { getGlobalHookRunner } from "../plugins/hook-runner-global.js"; +import { + applyInputProvenanceToUserMessage, + type InputProvenance, +} from "../sessions/input-provenance.js"; import { installSessionToolResultGuard } from "./session-tool-result-guard.js"; export type GuardedSessionManager = SessionManager & { @@ -16,6 +20,7 @@ export function guardSessionManager( opts?: { agentId?: string; sessionKey?: string; + inputProvenance?: InputProvenance; allowSyntheticToolResults?: boolean; }, ): GuardedSessionManager { @@ -46,6 +51,8 @@ export function guardSessionManager( : undefined; const guard = installSessionToolResultGuard(sessionManager, { + transformMessageForPersistence: (message) => + applyInputProvenanceToUserMessage(message, opts?.inputProvenance), transformToolResultForPersistence: transform, allowSyntheticToolResults: opts?.allowSyntheticToolResults, }); diff --git a/src/agents/session-tool-result-guard.test.ts b/src/agents/session-tool-result-guard.test.ts index 2f0bc2a02f..e20c2fe3ba 100644 --- a/src/agents/session-tool-result-guard.test.ts +++ b/src/agents/session-tool-result-guard.test.ts @@ -269,4 +269,34 @@ describe("installSessionToolResultGuard", () => { }; expect(textBlock.text).toBe(originalText); }); + + it("applies message persistence transform to user messages", () => { + const sm = SessionManager.inMemory(); + installSessionToolResultGuard(sm, { + transformMessageForPersistence: (message) => + (message as { role?: string }).role === "user" + ? ({ + ...(message as unknown as Record), + provenance: { kind: "inter_session", sourceTool: "sessions_send" }, + } as AgentMessage) + : message, + }); + + sm.appendMessage( + asAppendMessage({ + role: "user", + content: "forwarded", + timestamp: Date.now(), + }), + ); + + const persisted = sm.getEntries().find((e) => e.type === "message") as + | { message?: Record } + | undefined; + expect(persisted?.message?.role).toBe("user"); + expect(persisted?.message?.provenance).toEqual({ + kind: "inter_session", + sourceTool: "sessions_send", + }); + }); }); diff --git a/src/agents/session-tool-result-guard.ts b/src/agents/session-tool-result-guard.ts index 72661a59ff..bbb2b0ff2d 100644 --- a/src/agents/session-tool-result-guard.ts +++ b/src/agents/session-tool-result-guard.ts @@ -113,6 +113,10 @@ function extractToolResultId(msg: Extract) export function installSessionToolResultGuard( sessionManager: SessionManager, opts?: { + /** + * Optional transform applied to any message before persistence. + */ + transformMessageForPersistence?: (message: AgentMessage) => AgentMessage; /** * Optional, synchronous transform applied to toolResult messages *before* they are * persisted to the session transcript. @@ -133,6 +137,10 @@ export function installSessionToolResultGuard( } { const originalAppend = sessionManager.appendMessage.bind(sessionManager); const pending = new Map(); + const persistMessage = (message: AgentMessage) => { + const transformer = opts?.transformMessageForPersistence; + return transformer ? transformer(message) : message; + }; const persistToolResult = ( message: AgentMessage, @@ -152,7 +160,7 @@ export function installSessionToolResultGuard( for (const [id, name] of pending.entries()) { const synthetic = makeMissingToolResult({ toolCallId: id, toolName: name }); originalAppend( - persistToolResult(synthetic, { + persistToolResult(persistMessage(synthetic), { toolCallId: id, toolName: name, isSynthetic: true, @@ -186,7 +194,7 @@ export function installSessionToolResultGuard( } // Apply hard size cap before persistence to prevent oversized tool results // from consuming the entire context window on subsequent LLM calls. - const capped = capToolResultSize(nextMessage); + const capped = capToolResultSize(persistMessage(nextMessage)); return originalAppend( persistToolResult(capped, { toolCallId: id ?? undefined, @@ -212,7 +220,7 @@ export function installSessionToolResultGuard( } } - const result = originalAppend(nextMessage as never); + const result = originalAppend(persistMessage(nextMessage) as never); const sessionFile = ( sessionManager as { getSessionFile?: () => string | null } diff --git a/src/agents/tools/agent-step.ts b/src/agents/tools/agent-step.ts index 5193fe519b..98b688d06c 100644 --- a/src/agents/tools/agent-step.ts +++ b/src/agents/tools/agent-step.ts @@ -24,6 +24,9 @@ export async function runAgentStep(params: { timeoutMs: number; channel?: string; lane?: string; + sourceSessionKey?: string; + sourceChannel?: string; + sourceTool?: string; }): Promise { const stepIdem = crypto.randomUUID(); const response = await callGateway<{ runId?: string }>({ @@ -36,6 +39,12 @@ export async function runAgentStep(params: { channel: params.channel ?? INTERNAL_MESSAGE_CHANNEL, lane: params.lane ?? AGENT_LANE_NESTED, extraSystemPrompt: params.extraSystemPrompt, + inputProvenance: { + kind: "inter_session", + sourceSessionKey: params.sourceSessionKey, + sourceChannel: params.sourceChannel, + sourceTool: params.sourceTool ?? "sessions_send", + }, }, timeoutMs: 10_000, }); diff --git a/src/agents/tools/sessions-send-tool.a2a.ts b/src/agents/tools/sessions-send-tool.a2a.ts index 2157e8461b..f6e428ec8d 100644 --- a/src/agents/tools/sessions-send-tool.a2a.ts +++ b/src/agents/tools/sessions-send-tool.a2a.ts @@ -83,6 +83,10 @@ export async function runSessionsSendA2AFlow(params: { extraSystemPrompt: replyPrompt, timeoutMs: params.announceTimeoutMs, lane: AGENT_LANE_NESTED, + sourceSessionKey: nextSessionKey, + sourceChannel: + nextSessionKey === params.requesterSessionKey ? params.requesterChannel : targetChannel, + sourceTool: "sessions_send", }); if (!replyText || isReplySkip(replyText)) { break; @@ -110,6 +114,9 @@ export async function runSessionsSendA2AFlow(params: { extraSystemPrompt: announcePrompt, timeoutMs: params.announceTimeoutMs, lane: AGENT_LANE_NESTED, + sourceSessionKey: params.requesterSessionKey, + sourceChannel: params.requesterChannel, + sourceTool: "sessions_send", }); if (announceTarget && announceReply && announceReply.trim() && !isAnnounceSkip(announceReply)) { try { diff --git a/src/agents/tools/sessions-send-tool.ts b/src/agents/tools/sessions-send-tool.ts index de97e2a368..e871847fb6 100644 --- a/src/agents/tools/sessions-send-tool.ts +++ b/src/agents/tools/sessions-send-tool.ts @@ -260,6 +260,12 @@ export function createSessionsSendTool(opts?: { channel: INTERNAL_MESSAGE_CHANNEL, lane: AGENT_LANE_NESTED, extraSystemPrompt: agentMessageContext, + inputProvenance: { + kind: "inter_session", + sourceSessionKey: opts?.agentSessionKey, + sourceChannel: opts?.agentChannel, + sourceTool: "sessions_send", + }, }; const requesterSessionKey = opts?.agentSessionKey; const requesterChannel = opts?.agentChannel; diff --git a/src/commands/agent.ts b/src/commands/agent.ts index 023ca94b46..fb919cd3ae 100644 --- a/src/commands/agent.ts +++ b/src/commands/agent.ts @@ -449,6 +449,7 @@ export async function agentCommand( lane: opts.lane, abortSignal: opts.abortSignal, extraSystemPrompt: opts.extraSystemPrompt, + inputProvenance: opts.inputProvenance, streamParams: opts.streamParams, agentDir, onAgentEvent: (evt) => { diff --git a/src/commands/agent/types.ts b/src/commands/agent/types.ts index e59c88725d..5dbe3d63a0 100644 --- a/src/commands/agent/types.ts +++ b/src/commands/agent/types.ts @@ -1,5 +1,6 @@ import type { ClientToolDefinition } from "../../agents/pi-embedded-runner/run/params.js"; import type { ChannelOutboundTargetMode } from "../../channels/plugins/types.js"; +import type { InputProvenance } from "../../sessions/input-provenance.js"; /** Image content block for Claude API multimodal messages. */ export type ImageContent = { @@ -72,6 +73,7 @@ export type AgentCommandOpts = { lane?: string; runId?: string; extraSystemPrompt?: string; + inputProvenance?: InputProvenance; /** Per-call stream param overrides (best-effort). */ streamParams?: AgentStreamParams; }; diff --git a/src/gateway/protocol/schema/agent.ts b/src/gateway/protocol/schema/agent.ts index f82f4f98e5..fbb34bee33 100644 --- a/src/gateway/protocol/schema/agent.ts +++ b/src/gateway/protocol/schema/agent.ts @@ -1,4 +1,5 @@ import { Type } from "@sinclair/typebox"; +import { INPUT_PROVENANCE_KIND_VALUES } from "../../../sessions/input-provenance.js"; import { NonEmptyString, SessionLabelString } from "./primitives.js"; export const AgentEventSchema = Type.Object( @@ -64,6 +65,17 @@ export const AgentParamsSchema = Type.Object( timeout: Type.Optional(Type.Integer({ minimum: 0 })), lane: Type.Optional(Type.String()), extraSystemPrompt: Type.Optional(Type.String()), + inputProvenance: Type.Optional( + Type.Object( + { + kind: Type.String({ enum: [...INPUT_PROVENANCE_KIND_VALUES] }), + sourceSessionKey: Type.Optional(Type.String()), + sourceChannel: Type.Optional(Type.String()), + sourceTool: Type.Optional(Type.String()), + }, + { additionalProperties: false }, + ), + ), idempotencyKey: NonEmptyString, label: Type.Optional(SessionLabelString), spawnedBy: Type.Optional(Type.String()), diff --git a/src/gateway/server-methods/agent.ts b/src/gateway/server-methods/agent.ts index 3f828103ab..6319a61025 100644 --- a/src/gateway/server-methods/agent.ts +++ b/src/gateway/server-methods/agent.ts @@ -17,6 +17,7 @@ import { } from "../../infra/outbound/agent-delivery.js"; import { normalizeAgentId } from "../../routing/session-key.js"; import { defaultRuntime } from "../../runtime.js"; +import { normalizeInputProvenance, type InputProvenance } from "../../sessions/input-provenance.js"; import { resolveSendPolicy } from "../../sessions/send-policy.js"; import { normalizeSessionDeliveryFields } from "../../utils/delivery-context.js"; import { @@ -85,6 +86,7 @@ export const agentHandlers: GatewayRequestHandlers = { timeout?: number; label?: string; spawnedBy?: string; + inputProvenance?: InputProvenance; }; const cfg = loadConfig(); const idem = request.idempotencyKey; @@ -97,6 +99,7 @@ export const agentHandlers: GatewayRequestHandlers = { let resolvedGroupSpace: string | undefined = groupSpaceRaw || undefined; let spawnedByValue = typeof request.spawnedBy === "string" ? request.spawnedBy.trim() : undefined; + const inputProvenance = normalizeInputProvenance(request.inputProvenance); const cached = context.dedupe.get(`agent:${idem}`); if (cached) { respond(cached.ok, cached.payload, cached.error, { @@ -400,6 +403,7 @@ export const agentHandlers: GatewayRequestHandlers = { runId, lane: request.lane, extraSystemPrompt: request.extraSystemPrompt, + inputProvenance, }, defaultRuntime, context.deps, diff --git a/src/gateway/server.sessions-send.e2e.test.ts b/src/gateway/server.sessions-send.e2e.test.ts index 52a3d380e1..58f7d65b19 100644 --- a/src/gateway/server.sessions-send.e2e.test.ts +++ b/src/gateway/server.sessions-send.e2e.test.ts @@ -9,6 +9,7 @@ import { getFreePort, installGatewayTestHooks, startGatewayServer, + testState, } from "./test-helpers.js"; installGatewayTestHooks({ scope: "suite" }); @@ -17,13 +18,15 @@ let server: Awaited>; let gatewayPort: number; let prevGatewayPort: string | undefined; let prevGatewayToken: string | undefined; +const gatewayToken = "test-token"; beforeAll(async () => { prevGatewayPort = process.env.OPENCLAW_GATEWAY_PORT; prevGatewayToken = process.env.OPENCLAW_GATEWAY_TOKEN; gatewayPort = await getFreePort(); + testState.gatewayAuth = { mode: "token", token: gatewayToken }; process.env.OPENCLAW_GATEWAY_PORT = String(gatewayPort); - process.env.OPENCLAW_GATEWAY_TOKEN = "test-token"; + process.env.OPENCLAW_GATEWAY_TOKEN = gatewayToken; server = await startGatewayServer(gatewayPort); }); @@ -105,8 +108,14 @@ describe("sessions_send gateway loopback", () => { expect(details.reply).toBe("pong"); expect(details.sessionKey).toBe("main"); - const firstCall = spy.mock.calls[0]?.[0] as { lane?: string } | undefined; + const firstCall = spy.mock.calls[0]?.[0] as + | { lane?: string; inputProvenance?: { kind?: string; sourceTool?: string } } + | undefined; expect(firstCall?.lane).toBe("nested"); + expect(firstCall?.inputProvenance).toMatchObject({ + kind: "inter_session", + sourceTool: "sessions_send", + }); }); }); diff --git a/src/gateway/session-utils.fs.test.ts b/src/gateway/session-utils.fs.test.ts index 3bdc1919d9..7ab83a3868 100644 --- a/src/gateway/session-utils.fs.test.ts +++ b/src/gateway/session-utils.fs.test.ts @@ -92,6 +92,27 @@ describe("readFirstUserMessageFromTranscript", () => { expect(result).toBe("First user question"); }); + test("skips inter-session user messages by default", () => { + const sessionId = "test-session-inter-session"; + const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); + const lines = [ + JSON.stringify({ + message: { + role: "user", + content: "Forwarded by session tool", + provenance: { kind: "inter_session", sourceTool: "sessions_send" }, + }, + }), + JSON.stringify({ + message: { role: "user", content: "Real user message" }, + }), + ]; + fs.writeFileSync(transcriptPath, lines.join("\n"), "utf-8"); + + const result = readFirstUserMessageFromTranscript(sessionId, storePath); + expect(result).toBe("Real user message"); + }); + test("returns null when no user messages exist", () => { const sessionId = "test-session-4"; const transcriptPath = path.join(tmpDir, `${sessionId}.jsonl`); diff --git a/src/gateway/session-utils.fs.ts b/src/gateway/session-utils.fs.ts index c43d575d57..87ea63170a 100644 --- a/src/gateway/session-utils.fs.ts +++ b/src/gateway/session-utils.fs.ts @@ -8,6 +8,7 @@ import { resolveSessionTranscriptPathInDir, } from "../config/sessions.js"; import { resolveRequiredHomeDir } from "../infra/home-dir.js"; +import { hasInterSessionUserProvenance } from "../sessions/input-provenance.js"; import { extractToolCallNames, hasToolCall } from "../utils/transcript-tools.js"; import { stripEnvelope } from "./chat-sanitize.js"; @@ -139,6 +140,7 @@ const MAX_LINES_TO_SCAN = 10; type TranscriptMessage = { role?: string; content?: string | Array<{ type: string; text?: string }>; + provenance?: unknown; }; function extractTextFromContent(content: TranscriptMessage["content"]): string | null { @@ -167,6 +169,7 @@ export function readFirstUserMessageFromTranscript( storePath: string | undefined, sessionFile?: string, agentId?: string, + opts?: { includeInterSession?: boolean }, ): string | null { const candidates = resolveSessionTranscriptCandidates(sessionId, storePath, sessionFile, agentId); const filePath = candidates.find((p) => fs.existsSync(p)); @@ -193,6 +196,9 @@ export function readFirstUserMessageFromTranscript( const parsed = JSON.parse(line); const msg = parsed?.message as TranscriptMessage | undefined; if (msg?.role === "user") { + if (opts?.includeInterSession !== true && hasInterSessionUserProvenance(msg)) { + continue; + } const text = extractTextFromContent(msg.content); if (text) { return text; diff --git a/src/hooks/bundled/session-memory/handler.test.ts b/src/hooks/bundled/session-memory/handler.test.ts index 6c32488e08..a8723093fe 100644 --- a/src/hooks/bundled/session-memory/handler.test.ts +++ b/src/hooks/bundled/session-memory/handler.test.ts @@ -161,6 +161,58 @@ describe("session-memory hook", () => { expect(memoryContent).not.toContain("search"); }); + it("filters out inter-session user messages", async () => { + const tempDir = await makeTempWorkspace("openclaw-session-memory-"); + const sessionsDir = path.join(tempDir, "sessions"); + await fs.mkdir(sessionsDir, { recursive: true }); + + const sessionContent = [ + JSON.stringify({ + type: "message", + message: { + role: "user", + content: "Forwarded internal instruction", + provenance: { kind: "inter_session", sourceTool: "sessions_send" }, + }, + }), + JSON.stringify({ + type: "message", + message: { role: "assistant", content: "Acknowledged" }, + }), + JSON.stringify({ + type: "message", + message: { role: "user", content: "External follow-up" }, + }), + ].join("\n"); + const sessionFile = await writeWorkspaceFile({ + dir: sessionsDir, + name: "test-session.jsonl", + content: sessionContent, + }); + + const cfg: OpenClawConfig = { + agents: { defaults: { workspace: tempDir } }, + }; + + const event = createHookEvent("command", "new", "agent:main:main", { + cfg, + previousSessionEntry: { + sessionId: "test-123", + sessionFile, + }, + }); + + await handler(event); + + const memoryDir = path.join(tempDir, "memory"); + const files = await fs.readdir(memoryDir); + const memoryContent = await fs.readFile(path.join(memoryDir, files[0]), "utf-8"); + + expect(memoryContent).not.toContain("Forwarded internal instruction"); + expect(memoryContent).toContain("assistant: Acknowledged"); + expect(memoryContent).toContain("user: External follow-up"); + }); + it("filters out command messages starting with /", async () => { const tempDir = await makeTempWorkspace("openclaw-session-memory-"); const sessionsDir = path.join(tempDir, "sessions"); diff --git a/src/hooks/bundled/session-memory/handler.ts b/src/hooks/bundled/session-memory/handler.ts index fed2bbdde2..4f1a0662c8 100644 --- a/src/hooks/bundled/session-memory/handler.ts +++ b/src/hooks/bundled/session-memory/handler.ts @@ -14,6 +14,7 @@ import { resolveAgentWorkspaceDir } from "../../../agents/agent-scope.js"; import { resolveStateDir } from "../../../config/paths.js"; import { createSubsystemLogger } from "../../../logging/subsystem.js"; import { resolveAgentIdFromSessionKey } from "../../../routing/session-key.js"; +import { hasInterSessionUserProvenance } from "../../../sessions/input-provenance.js"; import { resolveHookConfig } from "../../config.js"; import { generateSlugViaLLM } from "../../llm-slug-generator.js"; @@ -40,6 +41,9 @@ async function getRecentSessionContent( const msg = entry.message; const role = msg.role; if ((role === "user" || role === "assistant") && msg.content) { + if (role === "user" && hasInterSessionUserProvenance(msg)) { + continue; + } // Extract text content const text = Array.isArray(msg.content) ? // oxlint-disable-next-line typescript/no-explicit-any diff --git a/src/sessions/input-provenance.ts b/src/sessions/input-provenance.ts new file mode 100644 index 0000000000..4540e68061 --- /dev/null +++ b/src/sessions/input-provenance.ts @@ -0,0 +1,79 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; + +export const INPUT_PROVENANCE_KIND_VALUES = [ + "external_user", + "inter_session", + "internal_system", +] as const; + +export type InputProvenanceKind = (typeof INPUT_PROVENANCE_KIND_VALUES)[number]; + +export type InputProvenance = { + kind: InputProvenanceKind; + sourceSessionKey?: string; + sourceChannel?: string; + sourceTool?: string; +}; + +function normalizeOptionalString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +function isInputProvenanceKind(value: unknown): value is InputProvenanceKind { + return ( + typeof value === "string" && (INPUT_PROVENANCE_KIND_VALUES as readonly string[]).includes(value) + ); +} + +export function normalizeInputProvenance(value: unknown): InputProvenance | undefined { + if (!value || typeof value !== "object") { + return undefined; + } + const record = value as Record; + if (!isInputProvenanceKind(record.kind)) { + return undefined; + } + return { + kind: record.kind, + sourceSessionKey: normalizeOptionalString(record.sourceSessionKey), + sourceChannel: normalizeOptionalString(record.sourceChannel), + sourceTool: normalizeOptionalString(record.sourceTool), + }; +} + +export function applyInputProvenanceToUserMessage( + message: AgentMessage, + inputProvenance: InputProvenance | undefined, +): AgentMessage { + if (!inputProvenance) { + return message; + } + if ((message as { role?: unknown }).role !== "user") { + return message; + } + const existing = normalizeInputProvenance((message as { provenance?: unknown }).provenance); + if (existing) { + return message; + } + return { + ...(message as unknown as Record), + provenance: inputProvenance, + } as unknown as AgentMessage; +} + +export function isInterSessionInputProvenance(value: unknown): boolean { + return normalizeInputProvenance(value)?.kind === "inter_session"; +} + +export function hasInterSessionUserProvenance( + message: { role?: unknown; provenance?: unknown } | undefined, +): boolean { + if (!message || message.role !== "user") { + return false; + } + return isInterSessionInputProvenance(message.provenance); +}