diff --git a/src/agents/pi-embedded-subscribe.code-span-awareness.e2e.test.ts b/src/agents/pi-embedded-subscribe.code-span-awareness.e2e.test.ts index 59f7cfe66a..6bfe9db53d 100644 --- a/src/agents/pi-embedded-subscribe.code-span-awareness.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.code-span-awareness.e2e.test.ts @@ -1,5 +1,8 @@ import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; +import { + createStubSessionHarness, + emitAssistantTextDelta, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => { @@ -19,13 +22,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => { it("does not strip thinking tags inside inline code backticks", () => { const { emit, onPartialReply } = createPartialReplyHarness(); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "The fix strips leaked `` tags from messages.", - }, + emitAssistantTextDelta({ + emit, + delta: "The fix strips leaked `` tags from messages.", }); expect(onPartialReply).toHaveBeenCalled(); @@ -36,13 +35,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => { it("does not strip thinking tags inside fenced code blocks", () => { const { emit, onPartialReply } = createPartialReplyHarness(); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Example:\n ````\ncode example\n ````\nDone.", - }, + emitAssistantTextDelta({ + emit, + delta: "Example:\n ````\ncode example\n ````\nDone.", }); expect(onPartialReply).toHaveBeenCalled(); @@ -53,13 +48,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => { it("still strips actual thinking tags outside code spans", () => { const { emit, onPartialReply } = createPartialReplyHarness(); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hello internal thought world", - }, + emitAssistantTextDelta({ + emit, + delta: "Hello internal thought world", }); expect(onPartialReply).toHaveBeenCalled(); diff --git a/src/agents/pi-embedded-subscribe.reply-tags.e2e.test.ts b/src/agents/pi-embedded-subscribe.reply-tags.e2e.test.ts index c1359648e5..a76db60229 100644 --- a/src/agents/pi-embedded-subscribe.reply-tags.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.reply-tags.e2e.test.ts @@ -1,6 +1,10 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; +import { + createStubSessionHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; describe("subscribeEmbeddedPiSession reply tags", () => { @@ -27,19 +31,8 @@ describe("subscribeEmbeddedPiSession reply tags", () => { const { emit, onBlockReply } = createBlockReplyHarness(); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "[[reply_to_current]]\nHello", - }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_end" }, - }); + emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]\nHello" }); + emitAssistantTextEnd({ emit }); const assistantMessage = { role: "assistant", @@ -58,16 +51,8 @@ describe("subscribeEmbeddedPiSession reply tags", () => { const { emit, onBlockReply } = createBlockReplyHarness(); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: "Hello [[" }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_end" }, - }); + emitAssistantTextDelta({ emit, delta: "Hello [[" }); + emitAssistantTextEnd({ emit }); const assistantMessage = { role: "assistant", @@ -92,26 +77,10 @@ describe("subscribeEmbeddedPiSession reply tags", () => { }); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: "[[reply_to:1897" }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: "]] Hello" }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: " world" }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_end" }, - }); + emitAssistantTextDelta({ emit, delta: "[[reply_to:1897" }); + emitAssistantTextDelta({ emit, delta: "]] Hello" }); + emitAssistantTextDelta({ emit, delta: " world" }); + emitAssistantTextEnd({ emit }); const lastPayload = onPartialReply.mock.calls.at(-1)?.[0]; expect(lastPayload?.text).toBe("Hello world"); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.e2e.test.ts index 020d7e939d..e52ea881f9 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.calls-onblockreplyflush-before-tool-execution-start-preserve.e2e.test.ts @@ -1,21 +1,13 @@ import { describe, expect, it, vi } from "vitest"; +import { + createStubSessionHarness, + emitAssistantTextDelta, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; -type StubSession = { - subscribe: (fn: (evt: unknown) => void) => () => void; -}; - -type SessionEventHandler = (evt: unknown) => void; - describe("subscribeEmbeddedPiSession", () => { it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => { - let handler: SessionEventHandler | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; + const { session, emit } = createStubSessionHarness(); const onBlockReplyFlush = vi.fn(); const onBlockReply = vi.fn(); @@ -29,24 +21,17 @@ describe("subscribeEmbeddedPiSession", () => { }); // Simulate text arriving before tool - handler?.({ + emit({ type: "message_start", message: { role: "assistant" }, }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "First message before tool.", - }, - }); + emitAssistantTextDelta({ emit, delta: "First message before tool." }); expect(onBlockReplyFlush).not.toHaveBeenCalled(); // Tool execution starts - should trigger flush - handler?.({ + emit({ type: "tool_execution_start", toolName: "bash", toolCallId: "tool-flush-1", @@ -56,7 +41,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(1); // Another tool - should flush again - handler?.({ + emit({ type: "tool_execution_start", toolName: "read", toolCallId: "tool-flush-2", @@ -66,13 +51,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(onBlockReplyFlush).toHaveBeenCalledTimes(2); }); it("flushes buffered block chunks before tool execution", () => { - let handler: SessionEventHandler | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; + const { session, emit } = createStubSessionHarness(); const onBlockReply = vi.fn(); const onBlockReplyFlush = vi.fn(); @@ -86,23 +65,16 @@ describe("subscribeEmbeddedPiSession", () => { blockReplyChunking: { minChars: 50, maxChars: 200 }, }); - handler?.({ + emit({ type: "message_start", message: { role: "assistant" }, }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Short chunk.", - }, - }); + emitAssistantTextDelta({ emit, delta: "Short chunk." }); expect(onBlockReply).not.toHaveBeenCalled(); - handler?.({ + emit({ type: "tool_execution_start", toolName: "bash", toolCallId: "tool-flush-buffer-1", diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.e2e.test.ts index c268c11ff8..d25f3beb25 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-append-text-end-content-is.e2e.test.ts @@ -1,51 +1,21 @@ import { describe, expect, it, vi } from "vitest"; -import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; - -type StubSession = { - subscribe: (fn: (evt: unknown) => void) => () => void; -}; +import { + createTextEndBlockReplyHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; describe("subscribeEmbeddedPiSession", () => { function setupTextEndSubscription() { - let handler: ((evt: unknown) => void) | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; - const onBlockReply = vi.fn(); - - const subscription = subscribeEmbeddedPiSession({ - session: session as unknown as Parameters[0]["session"], - runId: "run", - onBlockReply, - blockReplyBreak: "text_end", - }); - - const emit = (evt: unknown) => handler?.(evt); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); const emitDelta = (delta: string) => { - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta, - }, - }); + emitAssistantTextDelta({ emit, delta }); }; const emitTextEnd = (content: string) => { - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - content, - }, - }); + emitAssistantTextEnd({ emit, content }); }; return { onBlockReply, subscription, emitDelta, emitTextEnd }; diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.e2e.test.ts index 7dc6b6156b..ade01e2180 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-duplicate-text-end-repeats-full.e2e.test.ts @@ -1,80 +1,40 @@ import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; -import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; +import { + createTextEndBlockReplyHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; describe("subscribeEmbeddedPiSession", () => { - function createTextEndHarness(chunking?: { - minChars: number; - maxChars: number; - breakPreference: "newline"; - }) { - const { session, emit } = createStubSessionHarness(); - const onBlockReply = vi.fn(); - - const subscription = subscribeEmbeddedPiSession({ - session, - runId: "run", - onBlockReply, - blockReplyBreak: "text_end", - blockReplyChunking: chunking, - }); - - return { emit, onBlockReply, subscription }; - } - it("does not duplicate when text_end repeats full content", () => { - const { emit, onBlockReply, subscription } = createTextEndHarness(); + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Good morning!", - }, - }); - - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - content: "Good morning!", - }, - }); + emitAssistantTextDelta({ emit, delta: "Good morning!" }); + emitAssistantTextEnd({ emit, content: "Good morning!" }); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Good morning!"]); }); it("does not duplicate block chunks when text_end repeats full content", () => { - const { emit, onBlockReply } = createTextEndHarness({ - minChars: 5, - maxChars: 40, - breakPreference: "newline", + const onBlockReply = vi.fn(); + const { emit } = createTextEndBlockReplyHarness({ + onBlockReply, + blockReplyChunking: { + minChars: 5, + maxChars: 40, + breakPreference: "newline", + }, }); const fullText = "First line\nSecond line\nThird line\n"; - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: fullText, - }, - }); + emitAssistantTextDelta({ emit, delta: fullText }); const callsAfterDelta = onBlockReply.mock.calls.length; expect(callsAfterDelta).toBeGreaterThan(0); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - content: fullText, - }, - }); + emitAssistantTextEnd({ emit, content: fullText }); expect(onBlockReply).toHaveBeenCalledTimes(callsAfterDelta); }); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.e2e.test.ts index ee7037a24c..44f829affb 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.does-not-emit-duplicate-block-replies-text.e2e.test.ts @@ -1,45 +1,21 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; +import { + createStubSessionHarness, + createTextEndBlockReplyHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; describe("subscribeEmbeddedPiSession", () => { it("does not emit duplicate block replies when text_end repeats", () => { - const { session, emit } = createStubSessionHarness(); - const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); - const subscription = subscribeEmbeddedPiSession({ - session, - runId: "run", - onBlockReply, - blockReplyBreak: "text_end", - }); - - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hello block", - }, - }); - - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - }, - }); - - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - }, - }); + emitAssistantTextDelta({ emit, delta: "Hello block" }); + emitAssistantTextEnd({ emit }); + emitAssistantTextEnd({ emit }); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.e2e.test.ts index e13ffda120..cb383b04bf 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.emits-block-replies-text-end-does-not.e2e.test.ts @@ -1,42 +1,18 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; -import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; +import { + createTextEndBlockReplyHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; describe("subscribeEmbeddedPiSession", () => { - function createTextEndBlockReplyHarness() { - const { session, emit } = createStubSessionHarness(); - const onBlockReply = vi.fn(); - - const subscription = subscribeEmbeddedPiSession({ - session, - runId: "run", - onBlockReply, - blockReplyBreak: "text_end", - }); - - return { emit, onBlockReply, subscription }; - } - it("emits block replies on text_end and does not duplicate on message_end", () => { - const { emit, onBlockReply, subscription } = createTextEndBlockReplyHarness(); + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hello block", - }, - }); - - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - }, - }); + emitAssistantTextDelta({ emit, delta: "Hello block" }); + emitAssistantTextEnd({ emit }); expect(onBlockReply).toHaveBeenCalledTimes(1); const payload = onBlockReply.mock.calls[0][0]; @@ -54,18 +30,12 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello block"]); }); it("does not duplicate when message_end flushes and a late text_end arrives", () => { - const { emit, onBlockReply, subscription } = createTextEndBlockReplyHarness(); + const onBlockReply = vi.fn(); + const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply }); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hello block", - }, - }); + emitAssistantTextDelta({ emit, delta: "Hello block" }); const assistantMessage = { role: "assistant", @@ -79,14 +49,7 @@ describe("subscribeEmbeddedPiSession", () => { expect(subscription.assistantTexts).toEqual(["Hello block"]); // Some providers can still emit a late text_end; this must not re-emit. - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - content: "Hello block", - }, - }); + emitAssistantTextEnd({ emit, content: "Hello block" }); expect(onBlockReply).toHaveBeenCalledTimes(1); expect(subscription.assistantTexts).toEqual(["Hello block"]); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.e2e.test.ts index c5455dcd2a..9ccb78605a 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.filters-final-suppresses-output-without-start-tag.e2e.test.ts @@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; import { createStubSessionHarness, + emitAssistantTextDelta, emitMessageStartAndEndForAssistantText, expectSingleAgentEventText, } from "./pi-embedded-subscribe.e2e-harness.js"; @@ -23,14 +24,7 @@ describe("subscribeEmbeddedPiSession", () => { }); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hi there", - }, - }); + emitAssistantTextDelta({ emit, delta: "Hi there" }); expect(onPartialReply).toHaveBeenCalled(); const firstPayload = onPartialReply.mock.calls[0][0]; @@ -39,14 +33,7 @@ describe("subscribeEmbeddedPiSession", () => { onPartialReply.mockReset(); emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Oops no start", - }, - }); + emitAssistantTextDelta({ emit, delta: "Oops no start" }); expect(onPartialReply).not.toHaveBeenCalled(); }); @@ -75,14 +62,7 @@ describe("subscribeEmbeddedPiSession", () => { onPartialReply, }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Hello world", - }, - }); + emitAssistantTextDelta({ emit, delta: "Hello world" }); const payload = onPartialReply.mock.calls[0][0]; expect(payload.text).toBe("Hello world"); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.keeps-assistanttexts-final-answer-block-replies-are.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.keeps-assistanttexts-final-answer-block-replies-are.e2e.test.ts index 0bb70f3d8b..710b1f280f 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.keeps-assistanttexts-final-answer-block-replies-are.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.keeps-assistanttexts-final-answer-block-replies-are.e2e.test.ts @@ -1,51 +1,26 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; +import { + createStubSessionHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; -type StubSession = { - subscribe: (fn: (evt: unknown) => void) => () => void; -}; - describe("subscribeEmbeddedPiSession", () => { it("keeps assistantTexts to the final answer when block replies are disabled", () => { - let handler: ((evt: unknown) => void) | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; + const { session, emit } = createStubSessionHarness(); const subscription = subscribeEmbeddedPiSession({ - session: session as unknown as Parameters[0]["session"], + session, runId: "run", reasoningMode: "on", }); - handler?.({ type: "message_start", message: { role: "assistant" } }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Final ", - }, - }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "answer", - }, - }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - }, - }); + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "Final " }); + emitAssistantTextDelta({ emit, delta: "answer" }); + emitAssistantTextEnd({ emit }); const assistantMessage = { role: "assistant", @@ -55,45 +30,25 @@ describe("subscribeEmbeddedPiSession", () => { ], } as AssistantMessage; - handler?.({ type: "message_end", message: assistantMessage }); + emit({ type: "message_end", message: assistantMessage }); expect(subscription.assistantTexts).toEqual(["Final answer"]); }); it("suppresses partial replies when reasoning is enabled and block replies are disabled", () => { - let handler: ((evt: unknown) => void) | undefined; - const session: StubSession = { - subscribe: (fn) => { - handler = fn; - return () => {}; - }, - }; + const { session, emit } = createStubSessionHarness(); const onPartialReply = vi.fn(); const subscription = subscribeEmbeddedPiSession({ - session: session as unknown as Parameters[0]["session"], + session, runId: "run", reasoningMode: "on", onPartialReply, }); - handler?.({ type: "message_start", message: { role: "assistant" } }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "Draft ", - }, - }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_delta", - delta: "reply", - }, - }); + emit({ type: "message_start", message: { role: "assistant" } }); + emitAssistantTextDelta({ emit, delta: "Draft " }); + emitAssistantTextDelta({ emit, delta: "reply" }); expect(onPartialReply).not.toHaveBeenCalled(); @@ -105,15 +60,8 @@ describe("subscribeEmbeddedPiSession", () => { ], } as AssistantMessage; - handler?.({ type: "message_end", message: assistantMessage }); - handler?.({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { - type: "text_end", - content: "Draft reply", - }, - }); + emit({ type: "message_end", message: assistantMessage }); + emitAssistantTextEnd({ emit, content: "Draft reply" }); expect(onPartialReply).not.toHaveBeenCalled(); expect(subscription.assistantTexts).toEqual(["Final answer"]); diff --git a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.e2e.test.ts b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.e2e.test.ts index 2bc0382f57..737edc7fda 100644 --- a/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.e2e.test.ts +++ b/src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.suppresses-message-end-block-replies-message-tool.e2e.test.ts @@ -1,6 +1,10 @@ import type { AssistantMessage } from "@mariozechner/pi-ai"; import { describe, expect, it, vi } from "vitest"; -import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js"; +import { + createStubSessionHarness, + emitAssistantTextDelta, + emitAssistantTextEnd, +} from "./pi-embedded-subscribe.e2e-harness.js"; import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js"; function createBlockReplyHarness(blockReplyBreak: "message_end" | "text_end") { @@ -48,16 +52,8 @@ function emitAssistantMessageEnd(emit: (evt: unknown) => void, text: string) { function emitAssistantTextEndBlock(emit: (evt: unknown) => void, text: string) { emit({ type: "message_start", message: { role: "assistant" } }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_delta", delta: text }, - }); - emit({ - type: "message_update", - message: { role: "assistant" }, - assistantMessageEvent: { type: "text_end" }, - }); + emitAssistantTextDelta({ emit, delta: text }); + emitAssistantTextEnd({ emit }); } describe("subscribeEmbeddedPiSession", () => {