diff --git a/src/agents/subagent-announce-queue.ts b/src/agents/subagent-announce-queue.ts index e0dc8fcbfa..9c18bffa07 100644 --- a/src/agents/subagent-announce-queue.ts +++ b/src/agents/subagent-announce-queue.ts @@ -10,6 +10,7 @@ import { applyQueueDropPolicy, buildCollectPrompt, clearQueueSummaryState, + drainCollectItemIfNeeded, drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, @@ -108,12 +109,6 @@ function scheduleAnnounceDrain(key: string) { while (queue.items.length > 0 || queue.droppedCount > 0) { await waitForQueueDebounce(queue); if (queue.mode === "collect") { - if (forceIndividualCollect) { - if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) { - break; - } - continue; - } const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { if (!item.origin) { return {}; @@ -123,11 +118,19 @@ function scheduleAnnounceDrain(key: string) { } return { key: item.originKey }; }); - if (isCrossChannel) { - forceIndividualCollect = true; - if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) { - break; - } + const collectDrainResult = await drainCollectItemIfNeeded({ + forceIndividualCollect, + isCrossChannel, + setForceIndividualCollect: (next) => { + forceIndividualCollect = next; + }, + items: queue.items, + run: async (item) => await queue.send(item), + }); + if (collectDrainResult === "empty") { + break; + } + if (collectDrainResult === "drained") { continue; } const items = queue.items.slice(); diff --git a/src/auto-reply/reply/queue/drain.ts b/src/auto-reply/reply/queue/drain.ts index be409b3c74..35cb8de689 100644 --- a/src/auto-reply/reply/queue/drain.ts +++ b/src/auto-reply/reply/queue/drain.ts @@ -2,6 +2,7 @@ import { defaultRuntime } from "../../../runtime.js"; import { buildCollectPrompt, clearQueueSummaryState, + drainCollectItemIfNeeded, drainNextQueueItem, hasCrossChannelItems, previewQueueSummaryPrompt, @@ -30,13 +31,6 @@ export function scheduleFollowupDrain( // Prevents “collect after shift” collapsing different targets. // // Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts` - if (forceIndividualCollect) { - if (!(await drainNextQueueItem(queue.items, runFollowup))) { - break; - } - continue; - } - // Check if messages span multiple channels. // If so, process individually to preserve per-message routing. const isCrossChannel = hasCrossChannelItems(queue.items, (item) => { @@ -56,11 +50,19 @@ export function scheduleFollowupDrain( }; }); - if (isCrossChannel) { - forceIndividualCollect = true; - if (!(await drainNextQueueItem(queue.items, runFollowup))) { - break; - } + const collectDrainResult = await drainCollectItemIfNeeded({ + forceIndividualCollect, + isCrossChannel, + setForceIndividualCollect: (next) => { + forceIndividualCollect = next; + }, + items: queue.items, + run: runFollowup, + }); + if (collectDrainResult === "empty") { + break; + } + if (collectDrainResult === "drained") { continue; } diff --git a/src/utils/queue-helpers.ts b/src/utils/queue-helpers.ts index 4ebb627e89..cb4889134c 100644 --- a/src/utils/queue-helpers.ts +++ b/src/utils/queue-helpers.ts @@ -142,6 +142,23 @@ export async function drainNextQueueItem( return true; } +export async function drainCollectItemIfNeeded(params: { + forceIndividualCollect: boolean; + isCrossChannel: boolean; + setForceIndividualCollect?: (next: boolean) => void; + items: T[]; + run: (item: T) => Promise; +}): Promise<"skipped" | "drained" | "empty"> { + if (!params.forceIndividualCollect && !params.isCrossChannel) { + return "skipped"; + } + if (params.isCrossChannel) { + params.setForceIndividualCollect?.(true); + } + const drained = await drainNextQueueItem(params.items, params.run); + return drained ? "drained" : "empty"; +} + export function buildQueueSummaryPrompt(params: { state: QueueSummaryState; noun: string;