refactor(queue): share next-item drain helper across queue drains

This commit is contained in:
Peter Steinberger
2026-02-19 06:43:17 +00:00
parent 6355bae1f9
commit 8d048d412f
3 changed files with 36 additions and 36 deletions

View File

@@ -10,6 +10,7 @@ import {
applyQueueDropPolicy,
buildCollectPrompt,
clearQueueSummaryState,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
@@ -108,12 +109,9 @@ function scheduleAnnounceDrain(key: string) {
await waitForQueueDebounce(queue);
if (queue.mode === "collect") {
if (forceIndividualCollect) {
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) {
break;
}
await queue.send(next);
queue.items.shift();
continue;
}
const isCrossChannel = hasCrossChannelItems(queue.items, (item) => {
@@ -127,12 +125,9 @@ function scheduleAnnounceDrain(key: string) {
});
if (isCrossChannel) {
forceIndividualCollect = true;
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) {
break;
}
await queue.send(next);
queue.items.shift();
continue;
}
const items = queue.items.slice();
@@ -157,22 +152,21 @@ function scheduleAnnounceDrain(key: string) {
const summaryPrompt = previewQueueSummaryPrompt({ state: queue, noun: "announce" });
if (summaryPrompt) {
const next = queue.items[0];
if (!next) {
if (
!(await drainNextQueueItem(
queue.items,
async (item) => await queue.send({ ...item, prompt: summaryPrompt }),
))
) {
break;
}
await queue.send({ ...next, prompt: summaryPrompt });
queue.items.shift();
clearQueueSummaryState(queue);
continue;
}
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, async (item) => await queue.send(item)))) {
break;
}
await queue.send(next);
queue.items.shift();
}
} catch (err) {
// Keep items in queue and retry after debounce; avoid hot-loop retries.

View File

@@ -2,6 +2,7 @@ import { defaultRuntime } from "../../../runtime.js";
import {
buildCollectPrompt,
clearQueueSummaryState,
drainNextQueueItem,
hasCrossChannelItems,
previewQueueSummaryPrompt,
waitForQueueDebounce,
@@ -30,12 +31,9 @@ export function scheduleFollowupDrain(
//
// Debug: `pnpm test src/auto-reply/reply/queue.collect-routing.test.ts`
if (forceIndividualCollect) {
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
await runFollowup(next);
queue.items.shift();
continue;
}
@@ -60,12 +58,9 @@ export function scheduleFollowupDrain(
if (isCrossChannel) {
forceIndividualCollect = true;
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
await runFollowup(next);
queue.items.shift();
continue;
}
@@ -114,26 +109,24 @@ export function scheduleFollowupDrain(
if (!run) {
break;
}
const next = queue.items[0];
if (!next) {
if (
!(await drainNextQueueItem(queue.items, async () => {
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
}))
) {
break;
}
await runFollowup({
prompt: summaryPrompt,
run,
enqueuedAt: Date.now(),
});
queue.items.shift();
clearQueueSummaryState(queue);
continue;
}
const next = queue.items[0];
if (!next) {
if (!(await drainNextQueueItem(queue.items, runFollowup))) {
break;
}
await runFollowup(next);
queue.items.shift();
}
} catch (err) {
queue.lastEnqueuedAt = Date.now();

View File

@@ -129,6 +129,19 @@ export function waitForQueueDebounce(queue: {
});
}
export async function drainNextQueueItem<T>(
items: T[],
run: (item: T) => Promise<void>,
): Promise<boolean> {
const next = items[0];
if (!next) {
return false;
}
await run(next);
items.shift();
return true;
}
export function buildQueueSummaryPrompt(params: {
state: QueueSummaryState;
noun: string;