refactor(agents): dedupe pi subscribe e2e stream fixtures

This commit is contained in:
Peter Steinberger
2026-02-19 09:49:46 +00:00
parent 150a76ca9a
commit fa726792ce
10 changed files with 119 additions and 394 deletions

View File

@@ -1,5 +1,8 @@
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import {
createStubSessionHarness,
emitAssistantTextDelta,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => {
@@ -19,13 +22,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => {
it("does not strip thinking tags inside inline code backticks", () => {
const { emit, onPartialReply } = createPartialReplyHarness();
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "The fix strips leaked `<thinking>` tags from messages.",
},
emitAssistantTextDelta({
emit,
delta: "The fix strips leaked `<thinking>` tags from messages.",
});
expect(onPartialReply).toHaveBeenCalled();
@@ -36,13 +35,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => {
it("does not strip thinking tags inside fenced code blocks", () => {
const { emit, onPartialReply } = createPartialReplyHarness();
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Example:\n ````\n<thinking>code example</thinking>\n ````\nDone.",
},
emitAssistantTextDelta({
emit,
delta: "Example:\n ````\n<thinking>code example</thinking>\n ````\nDone.",
});
expect(onPartialReply).toHaveBeenCalled();
@@ -53,13 +48,9 @@ describe("subscribeEmbeddedPiSession thinking tag code span awareness", () => {
it("still strips actual thinking tags outside code spans", () => {
const { emit, onPartialReply } = createPartialReplyHarness();
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello <thinking>internal thought</thinking> world",
},
emitAssistantTextDelta({
emit,
delta: "Hello <thinking>internal thought</thinking> world",
});
expect(onPartialReply).toHaveBeenCalled();

View File

@@ -1,6 +1,10 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
describe("subscribeEmbeddedPiSession reply tags", () => {
@@ -27,19 +31,8 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
const { emit, onBlockReply } = createBlockReplyHarness();
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "[[reply_to_current]]\nHello",
},
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
emitAssistantTextDelta({ emit, delta: "[[reply_to_current]]\nHello" });
emitAssistantTextEnd({ emit });
const assistantMessage = {
role: "assistant",
@@ -58,16 +51,8 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
const { emit, onBlockReply } = createBlockReplyHarness();
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "Hello [[" },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
emitAssistantTextDelta({ emit, delta: "Hello [[" });
emitAssistantTextEnd({ emit });
const assistantMessage = {
role: "assistant",
@@ -92,26 +77,10 @@ describe("subscribeEmbeddedPiSession reply tags", () => {
});
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "[[reply_to:1897" },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: "]] Hello" },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: " world" },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
emitAssistantTextDelta({ emit, delta: "[[reply_to:1897" });
emitAssistantTextDelta({ emit, delta: "]] Hello" });
emitAssistantTextDelta({ emit, delta: " world" });
emitAssistantTextEnd({ emit });
const lastPayload = onPartialReply.mock.calls.at(-1)?.[0];
expect(lastPayload?.text).toBe("Hello world");

View File

@@ -1,21 +1,13 @@
import { describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
type SessionEventHandler = (evt: unknown) => void;
describe("subscribeEmbeddedPiSession", () => {
it("calls onBlockReplyFlush before tool_execution_start to preserve message boundaries", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onBlockReplyFlush = vi.fn();
const onBlockReply = vi.fn();
@@ -29,24 +21,17 @@ describe("subscribeEmbeddedPiSession", () => {
});
// Simulate text arriving before tool
handler?.({
emit({
type: "message_start",
message: { role: "assistant" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "First message before tool.",
},
});
emitAssistantTextDelta({ emit, delta: "First message before tool." });
expect(onBlockReplyFlush).not.toHaveBeenCalled();
// Tool execution starts - should trigger flush
handler?.({
emit({
type: "tool_execution_start",
toolName: "bash",
toolCallId: "tool-flush-1",
@@ -56,7 +41,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReplyFlush).toHaveBeenCalledTimes(1);
// Another tool - should flush again
handler?.({
emit({
type: "tool_execution_start",
toolName: "read",
toolCallId: "tool-flush-2",
@@ -66,13 +51,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(onBlockReplyFlush).toHaveBeenCalledTimes(2);
});
it("flushes buffered block chunks before tool execution", () => {
let handler: SessionEventHandler | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const onBlockReplyFlush = vi.fn();
@@ -86,23 +65,16 @@ describe("subscribeEmbeddedPiSession", () => {
blockReplyChunking: { minChars: 50, maxChars: 200 },
});
handler?.({
emit({
type: "message_start",
message: { role: "assistant" },
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Short chunk.",
},
});
emitAssistantTextDelta({ emit, delta: "Short chunk." });
expect(onBlockReply).not.toHaveBeenCalled();
handler?.({
emit({
type: "tool_execution_start",
toolName: "bash",
toolCallId: "tool-flush-buffer-1",

View File

@@ -1,51 +1,21 @@
import { describe, expect, it, vi } from "vitest";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
import {
createTextEndBlockReplyHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
describe("subscribeEmbeddedPiSession", () => {
function setupTextEndSubscription() {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
const emit = (evt: unknown) => handler?.(evt);
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
const emitDelta = (delta: string) => {
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta,
},
});
emitAssistantTextDelta({ emit, delta });
};
const emitTextEnd = (content: string) => {
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content,
},
});
emitAssistantTextEnd({ emit, content });
};
return { onBlockReply, subscription, emitDelta, emitTextEnd };

View File

@@ -1,80 +1,40 @@
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
import {
createTextEndBlockReplyHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
describe("subscribeEmbeddedPiSession", () => {
function createTextEndHarness(chunking?: {
minChars: number;
maxChars: number;
breakPreference: "newline";
}) {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session,
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
blockReplyChunking: chunking,
});
return { emit, onBlockReply, subscription };
}
it("does not duplicate when text_end repeats full content", () => {
const { emit, onBlockReply, subscription } = createTextEndHarness();
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Good morning!",
},
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "Good morning!",
},
});
emitAssistantTextDelta({ emit, delta: "Good morning!" });
emitAssistantTextEnd({ emit, content: "Good morning!" });
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Good morning!"]);
});
it("does not duplicate block chunks when text_end repeats full content", () => {
const { emit, onBlockReply } = createTextEndHarness({
minChars: 5,
maxChars: 40,
breakPreference: "newline",
const onBlockReply = vi.fn();
const { emit } = createTextEndBlockReplyHarness({
onBlockReply,
blockReplyChunking: {
minChars: 5,
maxChars: 40,
breakPreference: "newline",
},
});
const fullText = "First line\nSecond line\nThird line\n";
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: fullText,
},
});
emitAssistantTextDelta({ emit, delta: fullText });
const callsAfterDelta = onBlockReply.mock.calls.length;
expect(callsAfterDelta).toBeGreaterThan(0);
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: fullText,
},
});
emitAssistantTextEnd({ emit, content: fullText });
expect(onBlockReply).toHaveBeenCalledTimes(callsAfterDelta);
});

View File

@@ -1,45 +1,21 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import {
createStubSessionHarness,
createTextEndBlockReplyHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
describe("subscribeEmbeddedPiSession", () => {
it("does not emit duplicate block replies when text_end repeats", () => {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
const subscription = subscribeEmbeddedPiSession({
session,
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello block",
},
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
},
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
},
});
emitAssistantTextDelta({ emit, delta: "Hello block" });
emitAssistantTextEnd({ emit });
emitAssistantTextEnd({ emit });
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Hello block"]);

View File

@@ -1,42 +1,18 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
import {
createTextEndBlockReplyHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
describe("subscribeEmbeddedPiSession", () => {
function createTextEndBlockReplyHarness() {
const { session, emit } = createStubSessionHarness();
const onBlockReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session,
runId: "run",
onBlockReply,
blockReplyBreak: "text_end",
});
return { emit, onBlockReply, subscription };
}
it("emits block replies on text_end and does not duplicate on message_end", () => {
const { emit, onBlockReply, subscription } = createTextEndBlockReplyHarness();
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello block",
},
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
},
});
emitAssistantTextDelta({ emit, delta: "Hello block" });
emitAssistantTextEnd({ emit });
expect(onBlockReply).toHaveBeenCalledTimes(1);
const payload = onBlockReply.mock.calls[0][0];
@@ -54,18 +30,12 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello block"]);
});
it("does not duplicate when message_end flushes and a late text_end arrives", () => {
const { emit, onBlockReply, subscription } = createTextEndBlockReplyHarness();
const onBlockReply = vi.fn();
const { emit, subscription } = createTextEndBlockReplyHarness({ onBlockReply });
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello block",
},
});
emitAssistantTextDelta({ emit, delta: "Hello block" });
const assistantMessage = {
role: "assistant",
@@ -79,14 +49,7 @@ describe("subscribeEmbeddedPiSession", () => {
expect(subscription.assistantTexts).toEqual(["Hello block"]);
// Some providers can still emit a late text_end; this must not re-emit.
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "Hello block",
},
});
emitAssistantTextEnd({ emit, content: "Hello block" });
expect(onBlockReply).toHaveBeenCalledTimes(1);
expect(subscription.assistantTexts).toEqual(["Hello block"]);

View File

@@ -2,6 +2,7 @@ import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitMessageStartAndEndForAssistantText,
expectSingleAgentEventText,
} from "./pi-embedded-subscribe.e2e-harness.js";
@@ -23,14 +24,7 @@ describe("subscribeEmbeddedPiSession", () => {
});
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "<final>Hi there</final>",
},
});
emitAssistantTextDelta({ emit, delta: "<final>Hi there</final>" });
expect(onPartialReply).toHaveBeenCalled();
const firstPayload = onPartialReply.mock.calls[0][0];
@@ -39,14 +33,7 @@ describe("subscribeEmbeddedPiSession", () => {
onPartialReply.mockReset();
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "</final>Oops no start",
},
});
emitAssistantTextDelta({ emit, delta: "</final>Oops no start" });
expect(onPartialReply).not.toHaveBeenCalled();
});
@@ -75,14 +62,7 @@ describe("subscribeEmbeddedPiSession", () => {
onPartialReply,
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Hello world",
},
});
emitAssistantTextDelta({ emit, delta: "Hello world" });
const payload = onPartialReply.mock.calls[0][0];
expect(payload.text).toBe("Hello world");

View File

@@ -1,51 +1,26 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
type StubSession = {
subscribe: (fn: (evt: unknown) => void) => () => void;
};
describe("subscribeEmbeddedPiSession", () => {
it("keeps assistantTexts to the final answer when block replies are disabled", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
reasoningMode: "on",
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Final ",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "answer",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
},
});
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta({ emit, delta: "Final " });
emitAssistantTextDelta({ emit, delta: "answer" });
emitAssistantTextEnd({ emit });
const assistantMessage = {
role: "assistant",
@@ -55,45 +30,25 @@ describe("subscribeEmbeddedPiSession", () => {
],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
emit({ type: "message_end", message: assistantMessage });
expect(subscription.assistantTexts).toEqual(["Final answer"]);
});
it("suppresses partial replies when reasoning is enabled and block replies are disabled", () => {
let handler: ((evt: unknown) => void) | undefined;
const session: StubSession = {
subscribe: (fn) => {
handler = fn;
return () => {};
},
};
const { session, emit } = createStubSessionHarness();
const onPartialReply = vi.fn();
const subscription = subscribeEmbeddedPiSession({
session: session as unknown as Parameters<typeof subscribeEmbeddedPiSession>[0]["session"],
session,
runId: "run",
reasoningMode: "on",
onPartialReply,
});
handler?.({ type: "message_start", message: { role: "assistant" } });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "Draft ",
},
});
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_delta",
delta: "reply",
},
});
emit({ type: "message_start", message: { role: "assistant" } });
emitAssistantTextDelta({ emit, delta: "Draft " });
emitAssistantTextDelta({ emit, delta: "reply" });
expect(onPartialReply).not.toHaveBeenCalled();
@@ -105,15 +60,8 @@ describe("subscribeEmbeddedPiSession", () => {
],
} as AssistantMessage;
handler?.({ type: "message_end", message: assistantMessage });
handler?.({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: {
type: "text_end",
content: "Draft reply",
},
});
emit({ type: "message_end", message: assistantMessage });
emitAssistantTextEnd({ emit, content: "Draft reply" });
expect(onPartialReply).not.toHaveBeenCalled();
expect(subscription.assistantTexts).toEqual(["Final answer"]);

View File

@@ -1,6 +1,10 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import { describe, expect, it, vi } from "vitest";
import { createStubSessionHarness } from "./pi-embedded-subscribe.e2e-harness.js";
import {
createStubSessionHarness,
emitAssistantTextDelta,
emitAssistantTextEnd,
} from "./pi-embedded-subscribe.e2e-harness.js";
import { subscribeEmbeddedPiSession } from "./pi-embedded-subscribe.js";
function createBlockReplyHarness(blockReplyBreak: "message_end" | "text_end") {
@@ -48,16 +52,8 @@ function emitAssistantMessageEnd(emit: (evt: unknown) => void, text: string) {
function emitAssistantTextEndBlock(emit: (evt: unknown) => void, text: string) {
emit({ type: "message_start", message: { role: "assistant" } });
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_delta", delta: text },
});
emit({
type: "message_update",
message: { role: "assistant" },
assistantMessageEvent: { type: "text_end" },
});
emitAssistantTextDelta({ emit, delta: text });
emitAssistantTextEnd({ emit });
}
describe("subscribeEmbeddedPiSession", () => {