refactor(queue): reuse collect-mode item drain flow

This commit is contained in:
Peter Steinberger
2026-02-19 07:00:44 +00:00
parent 2f6b8663ff
commit b22deada9e
3 changed files with 45 additions and 23 deletions

View File

@@ -10,6 +10,7 @@ import {
applyQueueDropPolicy,
buildCollectPrompt,
clearQueueSummaryState,
drainCollectItemIfNeeded,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
@@ -108,12 +109,6 @@ function scheduleAnnounceDrain(key: string) {
while (queue.items.length > 0 || queue.droppedCount > 0) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
if (forceIndividualCollect) {
if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) {
break;
}
continue;
}
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
if (!item.origin) {
return {};
@@ -123,11 +118,19 @@ function scheduleAnnounceDrain(key: string) {
}
return { key: item.originKey };
});
if (isCrossChannel) {
forceIndividualCollect = true;
if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) {
break;
}
const collectDrainResult = await drainCollectItemIfNeeded({
forceIndividualCollect,
isCrossChannel,
setForceIndividualCollect: (next) => {
forceIndividualCollect = next;
},
items: queue.items,
run: async (item) => await queue.send(item),
});
if (collectDrainResult === "empty") {
break;
}
if (collectDrainResult === "drained") {
continue;
}
const items = queue.items.slice();

View File

@@ -2,6 +2,7 @@ import { defaultRuntime } from "../../../runtime.js";
import {
buildCollectPrompt,
clearQueueSummaryState,
drainCollectItemIfNeeded,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
@@ -30,13 +31,6 @@ export function scheduleFollowupDrain(
// Prevents “collect after shift” collapsing different targets.
//
// Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
if (forceIndividualCollect) {
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
continue;
}
// Check if messages span multiple channels.
// If so, process individually to preserve per-message routing.
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
@@ -56,11 +50,19 @@ export function scheduleFollowupDrain(
};
});
if (isCrossChannel) {
forceIndividualCollect = true;
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
const collectDrainResult = await drainCollectItemIfNeeded({
forceIndividualCollect,
isCrossChannel,
setForceIndividualCollect: (next) => {
forceIndividualCollect = next;
},
items: queue.items,
run: runFollowup,
});
if (collectDrainResult === "empty") {
break;
}
if (collectDrainResult === "drained") {
continue;
}

View File

@@ -142,6 +142,23 @@ export async function drainNextQueueItem<T>(
return true;
}
export async function drainCollectItemIfNeeded<T>(params: {
forceIndividualCollect: boolean;
isCrossChannel: boolean;
setForceIndividualCollect?: (next: boolean) => void;
items: T[];
run: (item: T) => Promise<void>;
}): Promise<"skipped" | "drained" | "empty"> {
if (!params.forceIndividualCollect && !params.isCrossChannel) {
return "skipped";
}
if (params.isCrossChannel) {
params.setForceIndividualCollect?.(true);
}
const drained = await drainNextQueueItem(params.items, params.run);
return drained ? "drained" : "empty";
}
export function buildQueueSummaryPrompt(params: {
state: QueueSummaryState;
noun: string;