From 9ef24fd400ba60bdca7cc9ab0e2d64a839c54f21 Mon Sep 17 00:00:00 2001 From: Tyler Yust <64381258+tyler6204@users.noreply.github.com> Date: Mon, 2 Feb 2026 01:22:41 -0800 Subject: [PATCH] fix: flush block streaming on paragraph boundaries for chunkMode=newline (#7014) * feat: Implement paragraph boundary flushing in block streaming - Added `flushOnParagraph` option to `BlockReplyChunking` for immediate flushing on paragraph breaks. - Updated `EmbeddedBlockChunker` to handle paragraph boundaries during chunking. - Enhanced `createBlockReplyCoalescer` to support flushing on enqueue. - Added tests to verify behavior of flushing with and without `flushOnEnqueue` set. - Updated relevant types and interfaces to include `flushOnParagraph` and `flushOnEnqueue` options. * fix: Improve streaming behavior and enhance block chunking logic - Resolved issue with stuck typing indicator after streamed BlueBubbles replies. - Refactored `EmbeddedBlockChunker` to streamline fence-split handling and ensure maxChars fallback for newline chunking. - Added tests to validate new chunking behavior, including handling of paragraph breaks and fence scenarios. - Updated changelog to reflect these changes. * test: Add test for clamping long paragraphs in EmbeddedBlockChunker - Introduced a new test case to verify that long paragraphs are correctly clamped to maxChars when flushOnParagraph is enabled. - Updated logic in EmbeddedBlockChunker to handle cases where the next paragraph break exceeds maxChars, ensuring proper chunking behavior. * refactor: streamline logging and improve error handling in message processing - Removed verbose logging statements from the `processMessage` function to reduce clutter. - Enhanced error handling by using `runtime.error` for typing restart failures. - Updated the `applySystemPromptOverrideToSession` function to accept a string directly instead of a function, simplifying the prompt application process. - Adjusted the `runEmbeddedAttempt` function to directly use the system prompt override without invoking it as a function. --- CHANGELOG.md | 4 +- docs/channels/bluebubbles.md | 4 +- extensions/bluebubbles/src/monitor.ts | 66 +++++--- src/agents/pi-embedded-block-chunker.test.ts | 97 ++++++++++++ src/agents/pi-embedded-block-chunker.ts | 141 ++++++++++++++---- src/agents/pi-embedded-runner/run/attempt.ts | 2 +- .../pi-embedded-runner/system-prompt.ts | 7 +- .../reply/agent-runner-execution.ts | 1 + src/auto-reply/reply/agent-runner.ts | 1 + src/auto-reply/reply/block-reply-coalescer.ts | 16 +- src/auto-reply/reply/block-streaming.ts | 34 +++-- src/auto-reply/reply/formatting.test.ts | 75 ++++++++++ src/auto-reply/reply/get-reply-directives.ts | 1 + src/auto-reply/reply/get-reply-run.ts | 1 + 14 files changed, 377 insertions(+), 73 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c155b859a..a11b7dae8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,7 +8,9 @@ Docs: https://docs.openclaw.ai - Telegram: add download timeouts for file fetches. (#6914) Thanks @hclsys. - Telegram: enforce thread specs for DM vs forum sends. (#6833) Thanks @obviyus. - +- Streaming: avoid stuck typing indicator after streamed BlueBubbles replies. +- Streaming: dedupe fence-split handling and cover maxChars fallback for newline chunking. + ## 2026.2.1 ### Changes diff --git a/docs/channels/bluebubbles.md b/docs/channels/bluebubbles.md index 7d46006bc9..69063304d5 100644 --- a/docs/channels/bluebubbles.md +++ b/docs/channels/bluebubbles.md @@ -192,7 +192,7 @@ Control whether responses are sent as a single message or streamed in blocks: { channels: { bluebubbles: { - blockStreaming: true, // enable block streaming (default behavior) + blockStreaming: true, // enable block streaming (off by default) }, }, } @@ -220,7 +220,7 @@ Provider options: - `channels.bluebubbles.groupAllowFrom`: Group sender allowlist. - `channels.bluebubbles.groups`: Per-group config (`requireMention`, etc.). - `channels.bluebubbles.sendReadReceipts`: Send read receipts (default: `true`). -- `channels.bluebubbles.blockStreaming`: Enable block streaming (default: `true`). +- `channels.bluebubbles.blockStreaming`: Enable block streaming (default: `false`; required for streaming replies). - `channels.bluebubbles.textChunkLimit`: Outbound chunk size in chars (default: 4000). - `channels.bluebubbles.chunkMode`: `length` (default) splits only when exceeding `textChunkLimit`; `newline` splits on blank lines (paragraph boundaries) before length chunking. - `channels.bluebubbles.mediaMaxMb`: Inbound media cap in MB (default: 8). diff --git a/extensions/bluebubbles/src/monitor.ts b/extensions/bluebubbles/src/monitor.ts index 00a5f36e24..45584057cb 100644 --- a/extensions/bluebubbles/src/monitor.ts +++ b/extensions/bluebubbles/src/monitor.ts @@ -2145,12 +2145,40 @@ async function processMessage( }; let sentMessage = false; + let streamingActive = false; + let typingRestartTimer: NodeJS.Timeout | undefined; + const typingRestartDelayMs = 150; + const clearTypingRestartTimer = () => { + if (typingRestartTimer) { + clearTimeout(typingRestartTimer); + typingRestartTimer = undefined; + } + }; + const restartTypingSoon = () => { + if (!streamingActive || !chatGuidForActions || !baseUrl || !password) { + return; + } + clearTypingRestartTimer(); + typingRestartTimer = setTimeout(() => { + typingRestartTimer = undefined; + if (!streamingActive) { + return; + } + sendBlueBubblesTyping(chatGuidForActions, true, { + cfg: config, + accountId: account.accountId, + }) + .catch((err) => { + runtime.error?.(`[bluebubbles] typing restart failed: ${String(err)}`); + }); + }, typingRestartDelayMs); + }; try { await core.channel.reply.dispatchReplyWithBufferedBlockDispatcher({ ctx: ctxPayload, cfg: config, dispatcherOptions: { - deliver: async (payload) => { + deliver: async (payload, info) => { const rawReplyToId = typeof payload.replyToId === "string" ? payload.replyToId.trim() : ""; // Resolve short ID (e.g., "5") to full UUID @@ -2185,6 +2213,9 @@ async function processMessage( maybeEnqueueOutboundMessageId(result.messageId, cachedBody); sentMessage = true; statusSink?.({ lastOutboundAt: Date.now() }); + if (info.kind === "block") { + restartTypingSoon(); + } } return; } @@ -2220,16 +2251,8 @@ async function processMessage( maybeEnqueueOutboundMessageId(result.messageId, chunk); sentMessage = true; statusSink?.({ lastOutboundAt: Date.now() }); - // In newline mode, restart typing after each chunk if more chunks remain - // Small delay allows the Apple API to finish clearing the typing state from message send - if (chunkMode === "newline" && i < chunks.length - 1 && chatGuidForActions) { - await new Promise((r) => setTimeout(r, 150)); - sendBlueBubblesTyping(chatGuidForActions, true, { - cfg: config, - accountId: account.accountId, - }).catch(() => { - // Ignore typing errors - }); + if (info.kind === "block") { + restartTypingSoon(); } } }, @@ -2240,7 +2263,8 @@ async function processMessage( if (!baseUrl || !password) { return; } - logVerbose(core, runtime, `typing start chatGuid=${chatGuidForActions}`); + streamingActive = true; + clearTypingRestartTimer(); try { await sendBlueBubblesTyping(chatGuidForActions, true, { cfg: config, @@ -2257,14 +2281,8 @@ async function processMessage( if (!baseUrl || !password) { return; } - try { - await sendBlueBubblesTyping(chatGuidForActions, false, { - cfg: config, - accountId: account.accountId, - }); - } catch (err) { - logVerbose(core, runtime, `typing stop failed: ${String(err)}`); - } + // Intentionally no-op for block streaming. We stop typing in finally + // after the run completes to avoid flicker between paragraph blocks. }, onError: (err, info) => { runtime.error?.(`BlueBubbles ${info.kind} reply failed: ${String(err)}`); @@ -2278,6 +2296,10 @@ async function processMessage( }, }); } finally { + const shouldStopTyping = + Boolean(chatGuidForActions && baseUrl && password) && (streamingActive || !sentMessage); + streamingActive = false; + clearTypingRestartTimer(); if (sentMessage && chatGuidForActions && ackMessageId) { core.channel.reactions.removeAckReactionAfterReply({ removeAfterReply: removeAckAfterReply, @@ -2301,8 +2323,8 @@ async function processMessage( }, }); } - if (chatGuidForActions && baseUrl && password && !sentMessage) { - // Stop typing indicator when no message was sent (e.g., NO_REPLY) + if (shouldStopTyping) { + // Stop typing after streaming completes to avoid a stuck indicator. sendBlueBubblesTyping(chatGuidForActions, false, { cfg: config, accountId: account.accountId, diff --git a/src/agents/pi-embedded-block-chunker.test.ts b/src/agents/pi-embedded-block-chunker.test.ts index 9af9f2c0b4..fe6614d210 100644 --- a/src/agents/pi-embedded-block-chunker.test.ts +++ b/src/agents/pi-embedded-block-chunker.test.ts @@ -30,4 +30,101 @@ describe("EmbeddedBlockChunker", () => { expect(chunks[0]).not.toContain("After"); expect(chunker.bufferedText).toMatch(/^After/); }); + + it("flushes paragraph boundaries before minChars when flushOnParagraph is set", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 100, + maxChars: 200, + breakPreference: "paragraph", + flushOnParagraph: true, + }); + + chunker.append("First paragraph.\n\nSecond paragraph."); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks).toEqual(["First paragraph."]); + expect(chunker.bufferedText).toBe("Second paragraph."); + }); + + it("treats blank lines with whitespace as paragraph boundaries when flushOnParagraph is set", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 100, + maxChars: 200, + breakPreference: "paragraph", + flushOnParagraph: true, + }); + + chunker.append("First paragraph.\n \nSecond paragraph."); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks).toEqual(["First paragraph."]); + expect(chunker.bufferedText).toBe("Second paragraph."); + }); + + it("falls back to maxChars when flushOnParagraph is set and no paragraph break exists", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 1, + maxChars: 10, + breakPreference: "paragraph", + flushOnParagraph: true, + }); + + chunker.append("abcdefghijKLMNOP"); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks).toEqual(["abcdefghij"]); + expect(chunker.bufferedText).toBe("KLMNOP"); + }); + + it("clamps long paragraphs to maxChars when flushOnParagraph is set", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 1, + maxChars: 10, + breakPreference: "paragraph", + flushOnParagraph: true, + }); + + chunker.append("abcdefghijk\n\nRest"); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks.every((chunk) => chunk.length <= 10)).toBe(true); + expect(chunks).toEqual(["abcdefghij", "k"]); + expect(chunker.bufferedText).toBe("Rest"); + }); + + it("ignores paragraph breaks inside fences when flushOnParagraph is set", () => { + const chunker = new EmbeddedBlockChunker({ + minChars: 100, + maxChars: 200, + breakPreference: "paragraph", + flushOnParagraph: true, + }); + + const text = [ + "Intro", + "```js", + "const a = 1;", + "", + "const b = 2;", + "```", + "", + "After fence", + ].join("\n"); + + chunker.append(text); + + const chunks: string[] = []; + chunker.drain({ force: false, emit: (chunk) => chunks.push(chunk) }); + + expect(chunks).toEqual(["Intro\n```js\nconst a = 1;\n\nconst b = 2;\n```"]); + expect(chunker.bufferedText).toBe("After fence"); + }); }); diff --git a/src/agents/pi-embedded-block-chunker.ts b/src/agents/pi-embedded-block-chunker.ts index ca9ce9cf6e..0416380beb 100644 --- a/src/agents/pi-embedded-block-chunker.ts +++ b/src/agents/pi-embedded-block-chunker.ts @@ -1,9 +1,12 @@ +import type { FenceSpan } from "../markdown/fences.js"; import { findFenceSpanAt, isSafeFenceBreak, parseFenceSpans } from "../markdown/fences.js"; export type BlockReplyChunking = { minChars: number; maxChars: number; breakPreference?: "paragraph" | "newline" | "sentence"; + /** When true, flush eagerly on \n\n paragraph boundaries regardless of minChars. */ + flushOnParagraph?: boolean; }; type FenceSplit = { @@ -16,6 +19,11 @@ type BreakResult = { fenceSplit?: FenceSplit; }; +type ParagraphBreak = { + index: number; + length: number; +}; + export class EmbeddedBlockChunker { #buffer = ""; readonly #chunking: BlockReplyChunking; @@ -49,6 +57,14 @@ export class EmbeddedBlockChunker { const { force, emit } = params; const minChars = Math.max(1, Math.floor(this.#chunking.minChars)); const maxChars = Math.max(minChars, Math.floor(this.#chunking.maxChars)); + + // When flushOnParagraph is set (chunkMode="newline"), eagerly split on \n\n + // boundaries regardless of minChars so each paragraph is sent immediately. + if (this.#chunking.flushOnParagraph && !force) { + this.#drainParagraphs(emit, maxChars); + return; + } + if (this.#buffer.length < minChars && !force) { return; } @@ -74,39 +90,10 @@ export class EmbeddedBlockChunker { return; } - const breakIdx = breakResult.index; - let rawChunk = this.#buffer.slice(0, breakIdx); - if (rawChunk.trim().length === 0) { - this.#buffer = stripLeadingNewlines(this.#buffer.slice(breakIdx)).trimStart(); + if (!this.#emitBreakResult(breakResult, emit)) { continue; } - let nextBuffer = this.#buffer.slice(breakIdx); - const fenceSplit = breakResult.fenceSplit; - if (fenceSplit) { - const closeFence = rawChunk.endsWith("\n") - ? `${fenceSplit.closeFenceLine}\n` - : `\n${fenceSplit.closeFenceLine}\n`; - rawChunk = `${rawChunk}${closeFence}`; - - const reopenFence = fenceSplit.reopenFenceLine.endsWith("\n") - ? fenceSplit.reopenFenceLine - : `${fenceSplit.reopenFenceLine}\n`; - nextBuffer = `${reopenFence}${nextBuffer}`; - } - - emit(rawChunk); - - if (fenceSplit) { - this.#buffer = nextBuffer; - } else { - const nextStart = - breakIdx < this.#buffer.length && /\s/.test(this.#buffer[breakIdx]) - ? breakIdx + 1 - : breakIdx; - this.#buffer = stripLeadingNewlines(this.#buffer.slice(nextStart)); - } - if (this.#buffer.length < minChars && !force) { return; } @@ -116,6 +103,76 @@ export class EmbeddedBlockChunker { } } + /** Eagerly emit complete paragraphs (text before \n\n) regardless of minChars. */ + #drainParagraphs(emit: (chunk: string) => void, maxChars: number) { + while (this.#buffer.length > 0) { + const fenceSpans = parseFenceSpans(this.#buffer); + const paragraphBreak = findNextParagraphBreak(this.#buffer, fenceSpans); + if (!paragraphBreak || paragraphBreak.index > maxChars) { + // No paragraph boundary yet (or the next boundary is too far). If the + // buffer exceeds maxChars, fall back to normal break logic to avoid + // oversized chunks or unbounded accumulation. + if (this.#buffer.length >= maxChars) { + const breakResult = this.#pickBreakIndex(this.#buffer, 1); + if (breakResult.index > 0) { + this.#emitBreakResult(breakResult, emit); + continue; + } + } + return; + } + + const chunk = this.#buffer.slice(0, paragraphBreak.index); + if (chunk.trim().length > 0) { + emit(chunk); + } + this.#buffer = stripLeadingNewlines( + this.#buffer.slice(paragraphBreak.index + paragraphBreak.length), + ); + } + } + + #emitBreakResult(breakResult: BreakResult, emit: (chunk: string) => void): boolean { + const breakIdx = breakResult.index; + if (breakIdx <= 0) { + return false; + } + + let rawChunk = this.#buffer.slice(0, breakIdx); + if (rawChunk.trim().length === 0) { + this.#buffer = stripLeadingNewlines(this.#buffer.slice(breakIdx)).trimStart(); + return false; + } + + let nextBuffer = this.#buffer.slice(breakIdx); + const fenceSplit = breakResult.fenceSplit; + if (fenceSplit) { + const closeFence = rawChunk.endsWith("\n") + ? `${fenceSplit.closeFenceLine}\n` + : `\n${fenceSplit.closeFenceLine}\n`; + rawChunk = `${rawChunk}${closeFence}`; + + const reopenFence = fenceSplit.reopenFenceLine.endsWith("\n") + ? fenceSplit.reopenFenceLine + : `${fenceSplit.reopenFenceLine}\n`; + nextBuffer = `${reopenFence}${nextBuffer}`; + } + + emit(rawChunk); + + if (fenceSplit) { + this.#buffer = nextBuffer; + } else { + const nextStart = + breakIdx < this.#buffer.length && /\s/.test(this.#buffer[breakIdx]) + ? breakIdx + 1 + : breakIdx; + this.#buffer = stripLeadingNewlines(this.#buffer.slice(nextStart)); + } + + return true; + } + #pickSoftBreakIndex(buffer: string, minCharsOverride?: number): BreakResult { const minChars = Math.max(1, Math.floor(minCharsOverride ?? this.#chunking.minChars)); if (buffer.length < minChars) { @@ -269,3 +326,27 @@ function stripLeadingNewlines(value: string): string { } return i > 0 ? value.slice(i) : value; } + +function findNextParagraphBreak( + buffer: string, + fenceSpans: FenceSpan[], + startIndex = 0, +): ParagraphBreak | null { + if (startIndex < 0) { + return null; + } + const re = /\n[\t ]*\n+/g; + re.lastIndex = startIndex; + let match: RegExpExecArray | null; + while ((match = re.exec(buffer)) !== null) { + const index = match.index ?? -1; + if (index < 0) { + continue; + } + if (!isSafeFenceBreak(fenceSpans, index)) { + continue; + } + return { index, length: match[0].length }; + } + return null; +} diff --git a/src/agents/pi-embedded-runner/run/attempt.ts b/src/agents/pi-embedded-runner/run/attempt.ts index 9a295c8b67..945d11eb22 100644 --- a/src/agents/pi-embedded-runner/run/attempt.ts +++ b/src/agents/pi-embedded-runner/run/attempt.ts @@ -390,7 +390,7 @@ export async function runEmbeddedAttempt( tools, }); const systemPromptOverride = createSystemPromptOverride(appendPrompt); - const systemPromptText = systemPromptOverride(); + const systemPromptText = systemPromptOverride; const sessionLock = await acquireSessionWriteLock({ sessionFile: params.sessionFile, diff --git a/src/agents/pi-embedded-runner/system-prompt.ts b/src/agents/pi-embedded-runner/system-prompt.ts index 8aa234e4ad..6384bc7e46 100644 --- a/src/agents/pi-embedded-runner/system-prompt.ts +++ b/src/agents/pi-embedded-runner/system-prompt.ts @@ -78,11 +78,8 @@ export function createSystemPromptOverride(systemPrompt: string): string { return systemPrompt.trim(); } -export function applySystemPromptOverrideToSession( - session: AgentSession, - override: (defaultPrompt?: string) => string, -) { - const prompt = override().trim(); +export function applySystemPromptOverrideToSession(session: AgentSession, override: string) { + const prompt = override.trim(); session.agent.setSystemPrompt(prompt); const mutableSession = session as unknown as { _baseSystemPrompt?: string; diff --git a/src/auto-reply/reply/agent-runner-execution.ts b/src/auto-reply/reply/agent-runner-execution.ts index 2cb64459d4..ebd58b38d0 100644 --- a/src/auto-reply/reply/agent-runner-execution.ts +++ b/src/auto-reply/reply/agent-runner-execution.ts @@ -63,6 +63,7 @@ export async function runAgentTurnWithFallback(params: { minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; }; resolvedBlockStreamingBreak: "text_end" | "message_end"; applyReplyToMode: (payload: ReplyPayload) => ReplyPayload; diff --git a/src/auto-reply/reply/agent-runner.ts b/src/auto-reply/reply/agent-runner.ts index 51655ea178..3ca6e39774 100644 --- a/src/auto-reply/reply/agent-runner.ts +++ b/src/auto-reply/reply/agent-runner.ts @@ -68,6 +68,7 @@ export async function runReplyAgent(params: { minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; }; resolvedBlockStreamingBreak: "text_end" | "message_end"; sessionCtx: TemplateContext; diff --git a/src/auto-reply/reply/block-reply-coalescer.ts b/src/auto-reply/reply/block-reply-coalescer.ts index 89e700f91f..5c5c16d0cb 100644 --- a/src/auto-reply/reply/block-reply-coalescer.ts +++ b/src/auto-reply/reply/block-reply-coalescer.ts @@ -18,6 +18,7 @@ export function createBlockReplyCoalescer(params: { const maxChars = Math.max(minChars, Math.floor(config.maxChars)); const idleMs = Math.max(0, Math.floor(config.idleMs)); const joiner = config.joiner ?? ""; + const flushOnEnqueue = config.flushOnEnqueue === true; let bufferText = ""; let bufferReplyToId: ReplyPayload["replyToId"]; @@ -57,7 +58,7 @@ export function createBlockReplyCoalescer(params: { if (!bufferText) { return; } - if (!options?.force && bufferText.length < minChars) { + if (!options?.force && !flushOnEnqueue && bufferText.length < minChars) { scheduleIdleFlush(); return; } @@ -86,6 +87,19 @@ export function createBlockReplyCoalescer(params: { return; } + // When flushOnEnqueue is set (chunkMode="newline"), each enqueued payload is treated + // as a separate paragraph and flushed immediately so delivery matches streaming boundaries. + if (flushOnEnqueue) { + if (bufferText) { + void flush({ force: true }); + } + bufferReplyToId = payload.replyToId; + bufferAudioAsVoice = payload.audioAsVoice; + bufferText = text; + void flush({ force: true }); + return; + } + if ( bufferText && (bufferReplyToId !== payload.replyToId || bufferAudioAsVoice !== payload.audioAsVoice) diff --git a/src/auto-reply/reply/block-streaming.ts b/src/auto-reply/reply/block-streaming.ts index 28b12d6962..96cadb9993 100644 --- a/src/auto-reply/reply/block-streaming.ts +++ b/src/auto-reply/reply/block-streaming.ts @@ -7,7 +7,7 @@ import { INTERNAL_MESSAGE_CHANNEL, listDeliverableMessageChannels, } from "../../utils/message-channel.js"; -import { resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; +import { resolveChunkMode, resolveTextChunkLimit, type TextChunkProvider } from "../chunk.js"; const DEFAULT_BLOCK_STREAM_MIN = 800; const DEFAULT_BLOCK_STREAM_MAX = 1200; @@ -54,6 +54,8 @@ export type BlockStreamingCoalescing = { maxChars: number; idleMs: number; joiner: string; + /** When true, the coalescer flushes the buffer on each enqueue (paragraph-boundary flush). */ + flushOnEnqueue?: boolean; }; export function resolveBlockStreamingChunking( @@ -64,22 +66,23 @@ export function resolveBlockStreamingChunking( minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; } { const providerKey = normalizeChunkProvider(provider); + const providerConfigKey = providerKey; const providerId = providerKey ? normalizeChannelId(providerKey) : null; const providerChunkLimit = providerId ? getChannelDock(providerId)?.outbound?.textChunkLimit : undefined; - const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId, { + const textLimit = resolveTextChunkLimit(cfg, providerConfigKey, accountId, { fallbackLimit: providerChunkLimit, }); const chunkCfg = cfg?.agents?.defaults?.blockStreamingChunk; - // Note: chunkMode="newline" used to imply splitting on each newline, but outbound - // delivery now treats it as paragraph-aware chunking (only split on blank lines). - // Block streaming should follow the same rule, so we do NOT special-case newline - // mode here. - // (chunkMode no longer alters block streaming behavior) + // When chunkMode="newline", the outbound delivery splits on paragraph boundaries. + // The block chunker should flush eagerly on \n\n boundaries during streaming, + // regardless of minChars, so each paragraph is sent as its own message. + const chunkMode = resolveChunkMode(cfg, providerConfigKey, accountId); const maxRequested = Math.max(1, Math.floor(chunkCfg?.maxChars ?? DEFAULT_BLOCK_STREAM_MAX)); const maxChars = Math.max(1, Math.min(maxRequested, textLimit)); @@ -90,7 +93,12 @@ export function resolveBlockStreamingChunking( chunkCfg?.breakPreference === "newline" || chunkCfg?.breakPreference === "sentence" ? chunkCfg.breakPreference : "paragraph"; - return { minChars, maxChars, breakPreference }; + return { + minChars, + maxChars, + breakPreference, + flushOnParagraph: chunkMode === "newline", + }; } export function resolveBlockStreamingCoalescing( @@ -102,17 +110,20 @@ export function resolveBlockStreamingCoalescing( maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; }, + opts?: { chunkMode?: "length" | "newline" }, ): BlockStreamingCoalescing | undefined { const providerKey = normalizeChunkProvider(provider); + const providerConfigKey = providerKey; - // Note: chunkMode="newline" is paragraph-aware in outbound delivery (blank-line splits), - // so block streaming should not disable coalescing or flush per single newline. + // Resolve the outbound chunkMode so the coalescer can flush on paragraph boundaries + // when chunkMode="newline", matching the delivery-time splitting behavior. + const chunkMode = opts?.chunkMode ?? resolveChunkMode(cfg, providerConfigKey, accountId); const providerId = providerKey ? normalizeChannelId(providerKey) : null; const providerChunkLimit = providerId ? getChannelDock(providerId)?.outbound?.textChunkLimit : undefined; - const textLimit = resolveTextChunkLimit(cfg, providerKey, accountId, { + const textLimit = resolveTextChunkLimit(cfg, providerConfigKey, accountId, { fallbackLimit: providerChunkLimit, }); const providerDefaults = providerId @@ -149,5 +160,6 @@ export function resolveBlockStreamingCoalescing( maxChars, idleMs, joiner, + flushOnEnqueue: chunkMode === "newline", }; } diff --git a/src/auto-reply/reply/formatting.test.ts b/src/auto-reply/reply/formatting.test.ts index 2ad4d153f0..38729bf9a4 100644 --- a/src/auto-reply/reply/formatting.test.ts +++ b/src/auto-reply/reply/formatting.test.ts @@ -71,6 +71,81 @@ describe("block reply coalescer", () => { coalescer.stop(); }); + it("flushes each enqueued payload separately when flushOnEnqueue is set", async () => { + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 1, maxChars: 200, idleMs: 100, joiner: "\n\n", flushOnEnqueue: true }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + coalescer.enqueue({ text: "First paragraph" }); + coalescer.enqueue({ text: "Second paragraph" }); + coalescer.enqueue({ text: "Third paragraph" }); + + await Promise.resolve(); + expect(flushes).toEqual(["First paragraph", "Second paragraph", "Third paragraph"]); + coalescer.stop(); + }); + + it("still accumulates when flushOnEnqueue is not set (default)", async () => { + vi.useFakeTimers(); + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 1, maxChars: 2000, idleMs: 100, joiner: "\n\n" }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + coalescer.enqueue({ text: "First paragraph" }); + coalescer.enqueue({ text: "Second paragraph" }); + + await vi.advanceTimersByTimeAsync(100); + expect(flushes).toEqual(["First paragraph\n\nSecond paragraph"]); + coalescer.stop(); + }); + + it("flushes short payloads immediately when flushOnEnqueue is set", async () => { + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 10, maxChars: 200, idleMs: 50, joiner: "\n\n", flushOnEnqueue: true }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + coalescer.enqueue({ text: "Hi" }); + await Promise.resolve(); + expect(flushes).toEqual(["Hi"]); + coalescer.stop(); + }); + + it("resets char budget per paragraph with flushOnEnqueue", async () => { + const flushes: string[] = []; + const coalescer = createBlockReplyCoalescer({ + config: { minChars: 1, maxChars: 30, idleMs: 100, joiner: "\n\n", flushOnEnqueue: true }, + shouldAbort: () => false, + onFlush: (payload) => { + flushes.push(payload.text ?? ""); + }, + }); + + // Each 20-char payload fits within maxChars=30 individually + coalescer.enqueue({ text: "12345678901234567890" }); + coalescer.enqueue({ text: "abcdefghijklmnopqrst" }); + + await Promise.resolve(); + // Without flushOnEnqueue, these would be joined to 40+ chars and trigger maxChars split. + // With flushOnEnqueue, each is sent independently within budget. + expect(flushes).toEqual(["12345678901234567890", "abcdefghijklmnopqrst"]); + coalescer.stop(); + }); + it("flushes buffered text before media payloads", () => { const flushes: Array<{ text?: string; mediaUrls?: string[] }> = []; const coalescer = createBlockReplyCoalescer({ diff --git a/src/auto-reply/reply/get-reply-directives.ts b/src/auto-reply/reply/get-reply-directives.ts index 4ab24e97cc..c9376e17f0 100644 --- a/src/auto-reply/reply/get-reply-directives.ts +++ b/src/auto-reply/reply/get-reply-directives.ts @@ -46,6 +46,7 @@ export type ReplyDirectiveContinuation = { minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; }; resolvedBlockStreamingBreak: "text_end" | "message_end"; provider: string; diff --git a/src/auto-reply/reply/get-reply-run.ts b/src/auto-reply/reply/get-reply-run.ts index 8deefbb932..8be10a14ed 100644 --- a/src/auto-reply/reply/get-reply-run.ts +++ b/src/auto-reply/reply/get-reply-run.ts @@ -76,6 +76,7 @@ type RunPreparedReplyParams = { minChars: number; maxChars: number; breakPreference: "paragraph" | "newline" | "sentence"; + flushOnParagraph?: boolean; }; resolvedBlockStreamingBreak: "text_end" | "message_end"; modelState: Awaited>;