fix: harden subagent completion announce retries

This commit is contained in:
Peter Steinberger
2026-02-18 03:19:30 +01:00
parent d7c6136c1f
commit edf7d6af61
4 changed files with 252 additions and 25 deletions

View File

@@ -477,6 +477,7 @@ Notes:
- `sessions_spawn` starts a sub-agent run and posts an announce reply back to the requester chat.
- Reply format includes `Status`, `Result`, and compact stats.
- `Result` is the assistant completion text; if missing, the latest `toolResult` is used as fallback.
- Manual completion-mode spawns send directly first, with queue fallback and retry on transient failures (`status: "ok"` means run finished, not that announce delivered).
- `sessions_spawn` is non-blocking and returns `status: "accepted"` immediately.
- `sessions_send` runs a replyback pingpong (reply `REPLY_SKIP` to stop; max turns via `session.agentToAgent.maxPingPongTurns`, 05).
- After the pingpong, the target agent runs an **announce step**; reply `ANNOUNCE_SKIP` to suppress the announcement.

View File

@@ -30,6 +30,10 @@ Use `/subagents` to inspect or control sub-agent runs for the **current session*
- The spawn command is non-blocking; it returns a run id immediately.
- On completion, the sub-agent announces a summary/result message back to the requester chat channel.
- For manual spawns, delivery is resilient:
- OpenClaw tries direct `agent` delivery first with a stable idempotency key.
- If direct delivery fails, it falls back to queue routing.
- If queue routing is still not available, the announce is retried with a short exponential backoff before final give-up.
- The completion message is a system message and includes:
- `Result` (`assistant` reply text, or latest `toolResult` if the assistant reply is empty)
- `Status` (`completed successfully` / `failed` / `timed out`)

View File

@@ -36,6 +36,31 @@ type ToolResultMessage = {
content?: unknown;
};
type SubagentDeliveryPath = "queued" | "steered" | "direct" | "none";
type SubagentAnnounceDeliveryResult = {
delivered: boolean;
path: SubagentDeliveryPath;
error?: string;
};
function summarizeDeliveryError(error: unknown): string {
if (error instanceof Error) {
return error.message || "error";
}
if (typeof error === "string") {
return error;
}
if (error === undefined || error === null) {
return "unknown error";
}
try {
return JSON.stringify(error);
} catch {
return "error";
}
}
function extractToolResultText(content: unknown): string {
if (typeof content === "string") {
return sanitizeTextContent(content);
@@ -45,6 +70,9 @@ function extractToolResultText(content: unknown): string {
text?: unknown;
output?: unknown;
content?: unknown;
result?: unknown;
error?: unknown;
summary?: unknown;
};
if (typeof obj.text === "string") {
return sanitizeTextContent(obj.text);
@@ -55,6 +83,15 @@ function extractToolResultText(content: unknown): string {
if (typeof obj.content === "string") {
return sanitizeTextContent(obj.content);
}
if (typeof obj.result === "string") {
return sanitizeTextContent(obj.result);
}
if (typeof obj.error === "string") {
return sanitizeTextContent(obj.error);
}
if (typeof obj.summary === "string") {
return sanitizeTextContent(obj.summary);
}
}
if (!Array.isArray(content)) {
return "";
@@ -72,12 +109,41 @@ function extractSubagentOutputText(message: unknown): string {
return "";
}
const role = (message as { role?: unknown }).role;
const content = (message as { content?: unknown }).content;
if (role === "assistant") {
return extractAssistantText(message) ?? "";
const assistantText = extractAssistantText(message);
if (assistantText) {
return assistantText;
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return (
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text.trim(),
joinWith: "",
}) ?? ""
);
}
return "";
}
if (role === "toolResult" || role === "tool") {
return extractToolResultText((message as ToolResultMessage).content);
}
if (typeof content === "string") {
return sanitizeTextContent(content);
}
if (Array.isArray(content)) {
return (
extractTextFromChatContent(content, {
sanitizeText: sanitizeTextContent,
normalizeText: (text) => text.trim(),
joinWith: "",
}) ?? ""
);
}
return "";
}
@@ -314,6 +380,126 @@ async function maybeQueueSubagentAnnounce(params: {
return "none";
}
function queueOutcomeToDeliveryResult(
outcome: "steered" | "queued" | "none",
): SubagentAnnounceDeliveryResult {
if (outcome === "steered") {
return {
delivered: true,
path: "steered",
};
}
if (outcome === "queued") {
return {
delivered: true,
path: "queued",
};
}
return {
delivered: false,
path: "none",
};
}
async function sendSubagentAnnounceDirectly(params: {
targetRequesterSessionKey: string;
triggerMessage: string;
directIdempotencyKey: string;
directOrigin?: DeliveryContext;
requesterIsSubagent: boolean;
}): Promise<SubagentAnnounceDeliveryResult> {
try {
await callGateway({
method: "agent",
params: {
sessionKey: params.targetRequesterSessionKey,
message: params.triggerMessage,
deliver: !params.requesterIsSubagent,
channel: params.requesterIsSubagent ? undefined : params.directOrigin?.channel,
accountId: params.requesterIsSubagent ? undefined : params.directOrigin?.accountId,
to: params.requesterIsSubagent ? undefined : params.directOrigin?.to,
threadId:
!params.requesterIsSubagent &&
params.directOrigin?.threadId != null &&
params.directOrigin.threadId !== ""
? String(params.directOrigin.threadId)
: undefined,
idempotencyKey: params.directIdempotencyKey,
},
expectFinal: true,
timeoutMs: 15_000,
});
return {
delivered: true,
path: "direct",
};
} catch (err) {
return {
delivered: false,
path: "direct",
error: summarizeDeliveryError(err),
};
}
}
async function deliverSubagentCompletionAnnouncement(params: {
requesterSessionKey: string;
announceId?: string;
triggerMessage: string;
summaryLine?: string;
requesterOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
targetRequesterSessionKey: string;
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
}): Promise<SubagentAnnounceDeliveryResult> {
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
// then (only if not queued) attempt direct delivery.
if (!params.expectsCompletionMessage) {
const queueOutcome = await maybeQueueSubagentAnnounce({
requesterSessionKey: params.requesterSessionKey,
announceId: params.announceId,
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
});
const queued = queueOutcomeToDeliveryResult(queueOutcome);
if (queued.delivered) {
return queued;
}
}
// Completion-mode uses direct send first so manual spawns can return immediately
// in the common ready-to-deliver case.
const direct = await sendSubagentAnnounceDirectly({
targetRequesterSessionKey: params.targetRequesterSessionKey,
triggerMessage: params.triggerMessage,
directIdempotencyKey: params.directIdempotencyKey,
directOrigin: params.directOrigin,
requesterIsSubagent: params.requesterIsSubagent,
});
if (direct.delivered || !params.expectsCompletionMessage) {
return direct;
}
// If completion path failed direct delivery, try queueing as a fallback so the
// report can still be delivered once the requester session is idle.
const queueOutcome = await maybeQueueSubagentAnnounce({
requesterSessionKey: params.requesterSessionKey,
announceId: params.announceId,
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
});
if (queueOutcome === "steered" || queueOutcome === "queued") {
return queueOutcomeToDeliveryResult(queueOutcome);
}
return direct;
}
function loadSessionEntryByKey(sessionKey: string) {
const cfg = loadConfig();
const agentId = resolveAgentIdFromSessionKey(sessionKey);
@@ -472,7 +658,7 @@ export async function runSubagentAnnounceFlow(params: {
let outcome: SubagentRunOutcome | undefined = params.outcome;
// Lifecycle "end" can arrive before auto-compaction retries finish. If the
// subagent is still active, wait for the embedded run to fully settle.
if (childSessionId && isEmbeddedPiRunActive(childSessionId)) {
if (!expectsCompletionMessage && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
const settled = await waitForEmbeddedPiRunEnd(childSessionId, settleTimeoutMs);
if (!settled && isEmbeddedPiRunActive(childSessionId)) {
// The child run is still active (e.g., compaction retry still in progress).
@@ -531,7 +717,12 @@ export async function runSubagentAnnounceFlow(params: {
});
}
if (!reply?.trim() && childSessionId && isEmbeddedPiRunActive(childSessionId)) {
if (
!expectsCompletionMessage &&
!reply?.trim() &&
childSessionId &&
isEmbeddedPiRunActive(childSessionId)
) {
// Avoid announcing "(no output)" while the child run is still producing output.
shouldDeleteChildSession = false;
return false;
@@ -548,7 +739,7 @@ export async function runSubagentAnnounceFlow(params: {
} catch {
// Best-effort only; fall back to direct announce behavior when unavailable.
}
if (activeChildDescendantRuns > 0) {
if (!expectsCompletionMessage && activeChildDescendantRuns > 0) {
// The finished run still has active descendant subagents. Defer announcing
// this run until descendants settle so we avoid posting in-progress updates.
shouldDeleteChildSession = false;
@@ -669,34 +860,32 @@ export async function runSubagentAnnounceFlow(params: {
// Send to the requester session. For nested subagents this is an internal
// follow-up injection (deliver=false) so the orchestrator receives it.
let directOrigin = targetRequesterOrigin;
if (!requesterIsSubagent && !directOrigin) {
if (!requesterIsSubagent) {
const { entry } = loadRequesterSessionEntry(targetRequesterSessionKey);
directOrigin = deliveryContextFromSession(entry);
directOrigin = resolveAnnounceOrigin(entry, targetRequesterOrigin);
}
// Use a deterministic idempotency key so the gateway dedup cache
// catches duplicates if this announce is also queued by the gateway-
// level message queue while the main session is busy (#17122).
const directIdempotencyKey = buildAnnounceIdempotencyKey(announceId);
await callGateway({
method: "agent",
params: {
sessionKey: targetRequesterSessionKey,
message: triggerMessage,
deliver: !requesterIsSubagent,
channel: requesterIsSubagent ? undefined : directOrigin?.channel,
accountId: requesterIsSubagent ? undefined : directOrigin?.accountId,
to: requesterIsSubagent ? undefined : directOrigin?.to,
threadId:
!requesterIsSubagent && directOrigin?.threadId != null && directOrigin.threadId !== ""
? String(directOrigin.threadId)
: undefined,
idempotencyKey: directIdempotencyKey,
},
expectFinal: true,
timeoutMs: 15_000,
const delivery = await deliverSubagentCompletionAnnouncement({
requesterSessionKey: targetRequesterSessionKey,
announceId,
triggerMessage,
summaryLine: taskLabel,
requesterOrigin: targetRequesterOrigin,
directOrigin,
targetRequesterSessionKey,
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
directIdempotencyKey,
});
didAnnounce = true;
didAnnounce = delivery.delivered;
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {
defaultRuntime.error?.(
`Subagent completion direct announce failed for run ${params.childRunId}: ${delivery.error}`,
);
}
} catch (err) {
defaultRuntime.error?.(`Subagent announce failed: ${String(err)}`);
// Best-effort follow-ups; ignore failures to avoid breaking the caller response.

View File

@@ -44,6 +44,8 @@ let listenerStop: (() => void) | null = null;
// Use var to avoid TDZ when init runs across circular imports during bootstrap.
var restoreAttempted = false;
const SUBAGENT_ANNOUNCE_TIMEOUT_MS = 120_000;
const MIN_ANNOUNCE_RETRY_DELAY_MS = 1_000;
const MAX_ANNOUNCE_RETRY_DELAY_MS = 8_000;
/**
* Maximum number of announce delivery attempts before giving up.
* Prevents infinite retry loops when `runSubagentAnnounceFlow` repeatedly
@@ -56,6 +58,12 @@ const MAX_ANNOUNCE_RETRY_COUNT = 3;
*/
const ANNOUNCE_EXPIRY_MS = 5 * 60_000; // 5 minutes
function resolveAnnounceRetryDelayMs(retryCount: number) {
const boundedRetryCount = Math.max(0, Math.min(retryCount, 10));
const baseDelay = MIN_ANNOUNCE_RETRY_DELAY_MS * 2 ** boundedRetryCount;
return Math.min(baseDelay, MAX_ANNOUNCE_RETRY_DELAY_MS);
}
function logAnnounceGiveUp(entry: SubagentRunRecord, reason: "retry-limit" | "expiry") {
const retryCount = entry.announceRetryCount ?? 0;
const endedAgoMs =
@@ -131,6 +139,22 @@ function resumeSubagentRun(runId: string) {
return;
}
const now = Date.now();
const delayMs = resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0);
const earliestRetryAt = (entry.lastAnnounceRetryAt ?? 0) + delayMs;
if (
entry.expectsCompletionMessage === true &&
entry.lastAnnounceRetryAt &&
now < earliestRetryAt
) {
const waitMs = Math.max(1, earliestRetryAt - now);
setTimeout(() => {
resumeSubagentRun(runId);
}, waitMs).unref?.();
resumedRuns.add(runId);
return;
}
if (typeof entry.endedAt === "number" && entry.endedAt > 0) {
if (suppressAnnounceForSteerRestart(entry)) {
resumedRuns.add(runId);
@@ -317,6 +341,15 @@ function finalizeSubagentCleanup(runId: string, cleanup: "delete" | "keep", didA
entry.cleanupHandled = false;
resumedRuns.delete(runId);
persistSubagentRuns();
if (entry.expectsCompletionMessage !== true) {
return;
}
setTimeout(
() => {
resumeSubagentRun(runId);
},
resolveAnnounceRetryDelayMs(entry.announceRetryCount ?? 0),
).unref?.();
return;
}
if (cleanup === "delete") {