diff --git a/autogpt_platform/backend/backend/copilot/baseline/service.py b/autogpt_platform/backend/backend/copilot/baseline/service.py index 3d2837903b..894e95a91e 100644 --- a/autogpt_platform/backend/backend/copilot/baseline/service.py +++ b/autogpt_platform/backend/backend/copilot/baseline/service.py @@ -268,96 +268,8 @@ def _resolve_baseline_model(mode: CopilotMode | None) -> str: return config.model -# Tag pairs to strip from baseline streaming output. Different models use -# different tag names for their internal reasoning (Claude uses , -# Gemini uses , etc.). -_REASONING_TAG_PAIRS: list[tuple[str, str]] = [ - ("", ""), - ("", ""), -] - -# Longest opener — used to size the partial-tag buffer. -_MAX_OPEN_TAG_LEN = max(len(o) for o, _ in _REASONING_TAG_PAIRS) - - -class _ThinkingStripper: - """Strip reasoning blocks from a stream of text deltas. - - Handles multiple tag patterns (````, ````, - etc.) so the same stripper works across Claude, Gemini, and other models. - - Buffers just enough characters to detect a tag that may be split - across chunks; emits text immediately when no tag is in-flight. - Robust to single chunks that open and close a block, multiple - blocks per stream, and tags that straddle chunk boundaries. - """ - - def __init__(self) -> None: - self._buffer: str = "" - self._in_thinking: bool = False - self._close_tag: str = "" # closing tag for the currently open block - - def _find_open_tag(self) -> tuple[int, str, str]: - """Find the earliest opening tag in the buffer. - - Returns (position, open_tag, close_tag) or (-1, "", "") if none. - """ - best_pos = -1 - best_open = "" - best_close = "" - for open_tag, close_tag in _REASONING_TAG_PAIRS: - pos = self._buffer.find(open_tag) - if pos != -1 and (best_pos == -1 or pos < best_pos): - best_pos = pos - best_open = open_tag - best_close = close_tag - return best_pos, best_open, best_close - - def process(self, chunk: str) -> str: - """Feed a chunk and return the text that is safe to emit now.""" - self._buffer += chunk - out: list[str] = [] - while self._buffer: - if self._in_thinking: - end = self._buffer.find(self._close_tag) - if end == -1: - keep = len(self._close_tag) - 1 - self._buffer = self._buffer[-keep:] if keep else "" - return "".join(out) - self._buffer = self._buffer[end + len(self._close_tag) :] - self._in_thinking = False - self._close_tag = "" - else: - start, open_tag, close_tag = self._find_open_tag() - if start == -1: - # No opening tag; emit everything except a tail that - # could start a partial opener on the next chunk. - safe_end = len(self._buffer) - for keep in range( - min(_MAX_OPEN_TAG_LEN - 1, len(self._buffer)), 0, -1 - ): - tail = self._buffer[-keep:] - if any(o[:keep] == tail for o, _ in _REASONING_TAG_PAIRS): - safe_end = len(self._buffer) - keep - break - out.append(self._buffer[:safe_end]) - self._buffer = self._buffer[safe_end:] - return "".join(out) - out.append(self._buffer[:start]) - self._buffer = self._buffer[start + len(open_tag) :] - self._in_thinking = True - self._close_tag = close_tag - return "".join(out) - - def flush(self) -> str: - """Return any remaining emittable text when the stream ends.""" - if self._in_thinking: - # Unclosed thinking block — discard the buffered reasoning. - self._buffer = "" - return "" - out = self._buffer - self._buffer = "" - return out +# Re-export from shared module so existing references keep working. +from backend.copilot.thinking_stripper import ThinkingStripper as _ThinkingStripper @dataclass diff --git a/autogpt_platform/backend/backend/copilot/sdk/service.py b/autogpt_platform/backend/backend/copilot/sdk/service.py index ec5449644e..a84c451414 100644 --- a/autogpt_platform/backend/backend/copilot/sdk/service.py +++ b/autogpt_platform/backend/backend/copilot/sdk/service.py @@ -12,7 +12,7 @@ import sys import time import uuid from collections.abc import AsyncGenerator, AsyncIterator -from dataclasses import dataclass +from dataclasses import dataclass, field as dataclass_field from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict, cast from typing_extensions import NotRequired @@ -120,6 +120,7 @@ from ..token_tracking import persist_and_record_usage from ..tools.e2b_sandbox import get_or_create_sandbox, pause_sandbox_direct from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path from ..tracking import track_user_message +from ..thinking_stripper import ThinkingStripper from .compaction import CompactionTracker, filter_compaction_messages from .env import build_sdk_env # noqa: F401 — re-export for backward compat from .response_adapter import SDKResponseAdapter @@ -1200,6 +1201,9 @@ class _StreamAccumulator: has_appended_assistant: bool = False has_tool_results: bool = False stream_completed: bool = False + thinking_stripper: ThinkingStripper = dataclass_field( + default_factory=ThinkingStripper, + ) def _dispatch_response( @@ -1256,7 +1260,15 @@ def _dispatch_response( ) if isinstance(response, StreamTextDelta): - delta = response.delta or "" + raw_delta = response.delta or "" + # Strip / tags that non-extended- + # thinking models (e.g. Sonnet) may emit as visible text. + delta = acc.thinking_stripper.process(raw_delta) + if not delta: + # Stripper is buffering a potential tag — suppress this event. + return None + # Replace the delta with the stripped version for the SSE client. + response = StreamTextDelta(id=response.id, delta=delta) if acc.has_tool_results and acc.has_appended_assistant: acc.assistant_response = ChatMessage(role="assistant", content=delta) acc.accumulated_tool_calls = [] @@ -1871,6 +1883,14 @@ async def _run_stream_attempt( finally: await _safe_close_sdk_client(sdk_client, ctx.log_prefix) + # --- Flush any text buffered by the thinking stripper --- + tail = acc.thinking_stripper.flush() + if tail: + acc.assistant_response.content = ( + acc.assistant_response.content or "" + ) + tail + yield StreamTextDelta(id="", delta=tail) + # --- Post-stream processing (only on success) --- if state.adapter.has_unresolved_tool_calls: logger.warning( diff --git a/autogpt_platform/backend/backend/copilot/thinking_stripper.py b/autogpt_platform/backend/backend/copilot/thinking_stripper.py new file mode 100644 index 0000000000..77122c74ce --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/thinking_stripper.py @@ -0,0 +1,102 @@ +"""Streaming tag stripper for model reasoning blocks. + +Different LLMs wrap internal chain-of-thought in different XML-style tags +(Claude uses ````, Gemini uses ````, etc.). +When extended thinking is **not** enabled, these tags may appear as plain text +in the response stream and must be stripped before the content reaches the +user. + +The :class:`ThinkingStripper` handles chunk-boundary splitting so it can be +plugged into any delta-based streaming pipeline. +""" + +from __future__ import annotations + +# Tag pairs to strip. Each entry is (open_tag, close_tag). +REASONING_TAG_PAIRS: list[tuple[str, str]] = [ + ("", ""), + ("", ""), +] + +# Longest opener — used to size the partial-tag buffer. +_MAX_OPEN_TAG_LEN = max(len(o) for o, _ in REASONING_TAG_PAIRS) + + +class ThinkingStripper: + """Strip reasoning blocks from a stream of text deltas. + + Handles multiple tag patterns (````, ````, + etc.) so the same stripper works across Claude, Gemini, and other models. + + Buffers just enough characters to detect a tag that may be split + across chunks; emits text immediately when no tag is in-flight. + Robust to single chunks that open and close a block, multiple + blocks per stream, and tags that straddle chunk boundaries. + """ + + def __init__(self) -> None: + self._buffer: str = "" + self._in_thinking: bool = False + self._close_tag: str = "" # closing tag for the currently open block + + def _find_open_tag(self) -> tuple[int, str, str]: + """Find the earliest opening tag in the buffer. + + Returns (position, open_tag, close_tag) or (-1, "", "") if none. + """ + best_pos = -1 + best_open = "" + best_close = "" + for open_tag, close_tag in REASONING_TAG_PAIRS: + pos = self._buffer.find(open_tag) + if pos != -1 and (best_pos == -1 or pos < best_pos): + best_pos = pos + best_open = open_tag + best_close = close_tag + return best_pos, best_open, best_close + + def process(self, chunk: str) -> str: + """Feed a chunk and return the text that is safe to emit now.""" + self._buffer += chunk + out: list[str] = [] + while self._buffer: + if self._in_thinking: + end = self._buffer.find(self._close_tag) + if end == -1: + keep = len(self._close_tag) - 1 + self._buffer = self._buffer[-keep:] if keep else "" + return "".join(out) + self._buffer = self._buffer[end + len(self._close_tag) :] + self._in_thinking = False + self._close_tag = "" + else: + start, open_tag, close_tag = self._find_open_tag() + if start == -1: + # No opening tag; emit everything except a tail that + # could start a partial opener on the next chunk. + safe_end = len(self._buffer) + for keep in range( + min(_MAX_OPEN_TAG_LEN - 1, len(self._buffer)), 0, -1 + ): + tail = self._buffer[-keep:] + if any(o[:keep] == tail for o, _ in REASONING_TAG_PAIRS): + safe_end = len(self._buffer) - keep + break + out.append(self._buffer[:safe_end]) + self._buffer = self._buffer[safe_end:] + return "".join(out) + out.append(self._buffer[:start]) + self._buffer = self._buffer[start + len(open_tag) :] + self._in_thinking = True + self._close_tag = close_tag + return "".join(out) + + def flush(self) -> str: + """Return any remaining emittable text when the stream ends.""" + if self._in_thinking: + # Unclosed thinking block — discard the buffered reasoning. + self._buffer = "" + return "" + out = self._buffer + self._buffer = "" + return out diff --git a/autogpt_platform/backend/backend/copilot/thinking_stripper_test.py b/autogpt_platform/backend/backend/copilot/thinking_stripper_test.py new file mode 100644 index 0000000000..221c15c855 --- /dev/null +++ b/autogpt_platform/backend/backend/copilot/thinking_stripper_test.py @@ -0,0 +1,93 @@ +"""Tests for the shared ThinkingStripper.""" + +from backend.copilot.thinking_stripper import ThinkingStripper + + +def test_basic_thinking_tag() -> None: + """... blocks are fully stripped.""" + s = ThinkingStripper() + assert s.process("internal reasoning hereHello!") == "Hello!" + + +def test_internal_reasoning_tag() -> None: + """... blocks are stripped.""" + s = ThinkingStripper() + assert ( + s.process("step by stepAnswer") + == "Answer" + ) + + +def test_split_across_chunks() -> None: + """Tags split across multiple chunks are handled correctly.""" + s = ThinkingStripper() + out = s.process("Hello secret world") + assert out == "Hello world" + + +def test_plain_text_preserved() -> None: + """Plain text with the word 'thinking' is not stripped.""" + s = ThinkingStripper() + assert ( + s.process("I am thinking about this problem") + == "I am thinking about this problem" + ) + + +def test_multiple_blocks() -> None: + """Multiple reasoning blocks in one stream are all stripped.""" + s = ThinkingStripper() + result = s.process( + "AxByC" + ) + assert result == "ABC" + + +def test_flush_discards_unclosed() -> None: + """Unclosed reasoning block is discarded on flush.""" + s = ThinkingStripper() + s.process("Startnever closed") + flushed = s.flush() + assert "never closed" not in flushed + + +def test_empty_block() -> None: + """Empty reasoning blocks are handled gracefully.""" + s = ThinkingStripper() + assert s.process("BeforeAfter") == "BeforeAfter" + + +def test_flush_emits_remaining_plain_text() -> None: + """flush() returns any plain text still in the buffer.""" + s = ThinkingStripper() + # The trailing '<' could be a partial tag, so process buffers it. + out = s.process("Hello") + flushed = s.flush() + assert out + flushed == "Hello" + + +def test_internal_reasoning_split_open_tag() -> None: + """ split across three chunks.""" + s = ThinkingStripper() + out = s.process("OK secret stuff visible") + out += s.flush() + assert out == "OK visible" + + +def test_no_tags_passthrough() -> None: + """Text without any tags passes through unchanged.""" + s = ThinkingStripper() + out = s.process("Hello world, this is fine.") + out += s.flush() + assert out == "Hello world, this is fine." + + +def test_reasoning_at_end_of_stream() -> None: + """Reasoning block at end of stream with no trailing text.""" + s = ThinkingStripper() + out = s.process("Answermy thoughts") + out += s.flush() + assert out == "Answer"