diff --git a/docs/tools/index.md b/docs/tools/index.md index 1ff08702b5..24f4016a4a 100644 --- a/docs/tools/index.md +++ b/docs/tools/index.md @@ -477,6 +477,7 @@ Notes: - `sessions_spawn` starts a sub-agent run and posts an announce reply back to the requester chat. - Reply format includes `Status`, `Result`, and compact stats. - `Result` is the assistant completion text; if missing, the latest `toolResult` is used as fallback. +- Manual completion-mode spawns send directly first, with queue fallback and retry on transient failures (`status: "ok"` means the run finished, not that the announce was delivered). - `sessions_spawn` is non-blocking and returns `status: "accepted"` immediately. - `sessions_send` runs a reply‑back ping‑pong (reply `REPLY_SKIP` to stop; max turns via `session.agentToAgent.maxPingPongTurns`, 0–5). - After the ping‑pong, the target agent runs an **announce step**; reply `ANNOUNCE_SKIP` to suppress the announcement. diff --git a/docs/tools/subagents.md b/docs/tools/subagents.md index 3ff71f1f90..3022d55192 100644 --- a/docs/tools/subagents.md +++ b/docs/tools/subagents.md @@ -30,6 +30,10 @@ Use `/subagents` to inspect or control sub-agent runs for the **current session* - The spawn command is non-blocking; it returns a run id immediately. - On completion, the sub-agent announces a summary/result message back to the requester chat channel. +- For manual spawns, delivery is resilient: + - OpenClaw tries direct `agent` delivery first with a stable idempotency key. + - If direct delivery fails, it falls back to queue routing. + - If queue routing is still not available, the announce is retried with a short exponential backoff before finally giving up. 
- The completion message is a system message and includes: - `Result` (`assistant` reply text, or latest `toolResult` if the assistant reply is empty) - `Status` (`completed successfully` / `failed` / `timed out`) diff --git a/src/agents/subagent-announce.ts b/src/agents/subagent-announce.ts index 9640b5432b..9c1dfdaae6 100644 --- a/src/agents/subagent-announce.ts +++ b/src/agents/subagent-announce.ts @@ -36,6 +36,31 @@ type ToolResultMessage = { content?: unknown; }; +type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none"; + +type SubagentAnnounceDeliveryResult = { + delivered: boolean; + path: SubagentDeliveryPath; + error?: string; +}; + +function summarizeDeliveryError(error: unknown): string { + if (error instanceof Error) { + return error.message || "error"; + } + if (typeof error === "string") { + return error; + } + if (error === undefined || error === null) { + return "unknown error"; + } + try { + return JSON.stringify(error); + } catch { + return "error"; + } +} + function extractToolResultText(content: unknown): string { if (typeof content === "string") { return sanitizeTextContent(content); @@ -45,6 +70,9 @@ function extractToolResultText(content: unknown): string { text?: unknown; output?: unknown; content?: unknown; + result?: unknown; + error?: unknown; + summary?: unknown; }; if (typeof obj.text === "string") { return sanitizeTextContent(obj.text); @@ -55,6 +83,15 @@ function extractToolResultText(content: unknown): string { if (typeof obj.content === "string") { return sanitizeTextContent(obj.content); } + if (typeof obj.result === "string") { + return sanitizeTextContent(obj.result); + } + if (typeof obj.error === "string") { + return sanitizeTextContent(obj.error); + } + if (typeof obj.summary === "string") { + return sanitizeTextContent(obj.summary); + } } if (!Array.isArray(content)) { return ""; @@ -72,12 +109,41 @@ function extractSubagentOutputText(message: unknown): string { return ""; } const role = (message as { role?: 
unknown }).role; + const content = (message as { content?: unknown }).content; if (role === "assistant") { - return extractAssistantText(message) ?? ""; + const assistantText = extractAssistantText(message); + if (assistantText) { + return assistantText; + } + if (typeof content === "string") { + return sanitizeTextContent(content); + } + if (Array.isArray(content)) { + return ( + extractTextFromChatContent(content, { + sanitizeText: sanitizeTextContent, + normalizeText: (text) => text.trim(), + joinWith: "", + }) ?? "" + ); + } + return ""; } if (role === "toolResult" || role === "tool") { return extractToolResultText((message as ToolResultMessage).content); } + if (typeof content === "string") { + return sanitizeTextContent(content); + } + if (Array.isArray(content)) { + return ( + extractTextFromChatContent(content, { + sanitizeText: sanitizeTextContent, + normalizeText: (text) => text.trim(), + joinWith: "", + }) ?? "" + ); + } return ""; } @@ -314,6 +380,126 @@ async function maybeQueueSubagentAnnounce(params: { return "none"; } +function queueOutcomeToDeliveryResult( + outcome: "steered" | "queued" | "none", +): SubagentAnnounceDeliveryResult { + if (outcome === "steered") { + return { + delivered: true, + path: "steered", + }; + } + if (outcome === "queued") { + return { + delivered: true, + path: "queued", + }; + } + return { + delivered: false, + path: "none", + }; +} + +async function sendSubagentAnnounceDirectly(params: { + targetRequesterSessionKey: string; + triggerMessage: string; + directIdempotencyKey: string; + directOrigin?: DeliveryContext; + requesterIsSubagent: boolean; +}): Promise<SubagentAnnounceDeliveryResult> { + try { + await callGateway({ + method: "agent", + params: { + sessionKey: params.targetRequesterSessionKey, + message: params.triggerMessage, + deliver: !params.requesterIsSubagent, + channel: params.requesterIsSubagent ? undefined : params.directOrigin?.channel, + accountId: params.requesterIsSubagent ? 
undefined : params.directOrigin?.accountId, + to: params.requesterIsSubagent ? undefined : params.directOrigin?.to, + threadId: + !params.requesterIsSubagent && + params.directOrigin?.threadId != null && + params.directOrigin.threadId !== "" + ? String(params.directOrigin.threadId) + : undefined, + idempotencyKey: params.directIdempotencyKey, + }, + expectFinal: true, + timeoutMs: 15_000, + }); + + return { + delivered: true, + path: "direct", + }; + } catch (err) { + return { + delivered: false, + path: "direct", + error: summarizeDeliveryError(err), + }; + } +} + +async function deliverSubagentCompletionAnnouncement(params: { + requesterSessionKey: string; + announceId?: string; + triggerMessage: string; + summaryLine?: string; + requesterOrigin?: DeliveryContext; + directOrigin?: DeliveryContext; + targetRequesterSessionKey: string; + requesterIsSubagent: boolean; + expectsCompletionMessage: boolean; + directIdempotencyKey: string; +}): Promise<SubagentAnnounceDeliveryResult> { + // Non-completion mode mirrors historical behavior: try queued/steered delivery first, + // then (only if not queued) attempt direct delivery. + if (!params.expectsCompletionMessage) { + const queueOutcome = await maybeQueueSubagentAnnounce({ + requesterSessionKey: params.requesterSessionKey, + announceId: params.announceId, + triggerMessage: params.triggerMessage, + summaryLine: params.summaryLine, + requesterOrigin: params.requesterOrigin, + }); + const queued = queueOutcomeToDeliveryResult(queueOutcome); + if (queued.delivered) { + return queued; + } + } + + // Completion-mode uses direct send first so manual spawns can return immediately + // in the common ready-to-deliver case. 
+ const direct = await sendSubagentAnnounceDirectly({ + targetRequesterSessionKey: params.targetRequesterSessionKey, + triggerMessage: params.triggerMessage, + directIdempotencyKey: params.directIdempotencyKey, + directOrigin: params.directOrigin, + requesterIsSubagent: params.requesterIsSubagent, + }); + if (direct.delivered || !params.expectsCompletionMessage) { + return direct; + } + + // If completion path failed direct delivery, try queueing as a fallback so the + // report can still be delivered once the requester session is idle. + const queueOutcome = await maybeQueueSubagentAnnounce({ + requesterSessionKey: params.requesterSessionKey, + announceId: params.announceId, + triggerMessage: params.triggerMessage, + summaryLine: params.summaryLine, + requesterOrigin: params.requesterOrigin, + }); + if (queueOutcome === "steered" || queueOutcome === "queued") { + return queueOutcomeToDeliveryResult(queueOutcome); + } + + return direct; +} + function loadSessionEntryByKey(sessionKey: string) { const cfg = loadConfig(); const agentId = resolveAgentIdFromSessionKey(sessionKey); @@ -472,7 +658,7 @@ export async function runSubagentAnnounceFlow(params: { let outcome: SubagentRunOutcome | undefined = params.outcome; // Lifecycle "end" can arrive before auto-compaction retries finish. If the // subagent is still active, wait for the embedded run to fully settle. - if (childSessionId && isEmbeddedPiRunActive(childSessionId)) { + if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) { const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs); if (!settled && isEmbeddedPiRunActive(childSessionId)) { // The child run is still active (e.g., compaction retry still in progress). 
@@ -531,7 +717,12 @@ export async function runSubagentAnnounceFlow(params: { }); } - if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) { + if ( + !expectsCompletionMessage && + !reply?.trim() && + childSessionId && + isEmbeddedPiRunActive(childSessionId) + ) { // Avoid announcing "(no output)" while the child run is still producing output. shouldDeleteChildSession = false; return false; @@ -548,7 +739,7 @@ export async function runSubagentAnnounceFlow(params: { } catch { // Best-effort only; fall back to direct announce behavior when unavailable. } - if (activeChildDescendantRuns > 0) { + if (!expectsCompletionMessage && activeChildDescendantRuns > 0) { // The finished run still has active descendant subagents. Defer announcing // this run until descendants settle so we avoid posting in-progress updates. shouldDeleteChildSession = false; @@ -669,34 +860,32 @@ export async function runSubagentAnnounceFlow(params: { // Send to the requester session. For nested subagents this is an internal // follow-up injection (deliver=false) so the orchestrator receives it. let directOrigin = targetRequesterOrigin; - if (!requesterIsSubagent && !directOrigin) { + if (!requesterIsSubagent) { const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey); - directOrigin = deliveryContextFromSession(entry); + directOrigin = resolveAnnounceOrigin(entry, targetRequesterOrigin); } // Use a deterministic idempotency key so the gateway dedup cache // catches duplicates if this announce is also queued by the gateway- // level message queue while the main session is busy (#17122). const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId); - await callGateway({ - method: "agent", - params: { - sessionKey: targetRequesterSessionKey, - message: triggerMessage, - deliver: !requesterIsSubagent, - channel: requesterIsSubagent ? undefined : directOrigin?.channel, - accountId: requesterIsSubagent ? 
undefined : directOrigin?.accountId, - to: requesterIsSubagent ? undefined : directOrigin?.to, - threadId: - !requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== "" - ? String(directOrigin.threadId) - : undefined, - idempotencyKey: directIdempotencyKey, - }, - expectFinal: true, - timeoutMs: 15_000, + const delivery = await deliverSubagentCompletionAnnouncement({ + requesterSessionKey: targetRequesterSessionKey, + announceId, + triggerMessage, + summaryLine: taskLabel, + requesterOrigin: targetRequesterOrigin, + directOrigin, + targetRequesterSessionKey, + requesterIsSubagent, + expectsCompletionMessage: expectsCompletionMessage, + directIdempotencyKey, }); - - didAnnounce = true; + didAnnounce = delivery.delivered; + if (!delivery.delivered && delivery.path === "direct" && delivery.error) { + defaultRuntime.error?.( + `Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`, + ); + } } catch (err) { defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`); // Best-effort follow-ups; ignore failures to avoid breaking the caller response. diff --git a/src/agents/subagent-registry.ts b/src/agents/subagent-registry.ts index 22cb5b9bd3..2cf7a5832b 100644 --- a/src/agents/subagent-registry.ts +++ b/src/agents/subagent-registry.ts @@ -44,6 +44,8 @@ let listenerStop: (() => void) | null = null; // Use var to avoid TDZ when init runs across circular imports during bootstrap. var restoreAttempted = false; const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000; +const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000; +const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000; /** * Maximum number of announce delivery attempts before giving up. 
* Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly @@ -56,6 +58,12 @@ const MAX_ANNOUNCE_RETRY_COUNT = 3; */ const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes +function resolveAnnounceRetryDelayMs(retryCount: number) { + const boundedRetryCount = Math.max(0, Math.min(retryCount, 10)); + const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** boundedRetryCount; + return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS); +} + function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") { const retryCount = entry.announceRetryCount ?? 0; const endedAgoMs = @@ -131,6 +139,22 @@ function resumeSubagentRun(runId: string) { return; } + const now = Date.now(); + const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0); + const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs; + if ( + entry.expectsCompletionMessage === true && + entry.lastAnnounceRetryAt && + now < earliestRetryAt + ) { + const waitMs = Math.max(1, earliestRetryAt - now); + setTimeout(() => { + resumeSubagentRun(runId); + }, waitMs).unref?.(); + resumedRuns.add(runId); + return; + } + if (typeof entry.endedAt === "number" && entry.endedAt > 0) { if (suppressAnnounceForSteerRestart(entry)) { resumedRuns.add(runId); @@ -317,6 +341,15 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA entry.cleanupHandled = false; resumedRuns.delete(runId); persistSubagentRuns(); + if (entry.expectsCompletionMessage !== true) { + return; + } + setTimeout( + () => { + resumeSubagentRun(runId); + }, + resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0), + ).unref?.(); return; } if (cleanup === "delete") {