mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix: resolve openapi.json merge conflict — keep cost_bearing_request_count
This commit is contained in:
@@ -268,96 +268,8 @@ def _resolve_baseline_model(mode: CopilotMode | None) -> str:
|
||||
return config.model
|
||||
|
||||
|
||||
# Tag pairs to strip from baseline streaming output. Different models use
# different tag names for their internal reasoning (Claude uses <thinking>,
# Gemini uses <internal_reasoning>, etc.).
# Each entry is (open_tag, close_tag). Matching is exact-string: no
# attributes, whitespace variants, or case differences are recognized.
_REASONING_TAG_PAIRS: list[tuple[str, str]] = [
    ("<thinking>", "</thinking>"),
    ("<internal_reasoning>", "</internal_reasoning>"),
]

# Longest opener — used to size the partial-tag buffer.
# The stripper only ever needs to withhold (longest opener - 1) characters
# to recognize an opening tag split across two chunks.
_MAX_OPEN_TAG_LEN = max(len(o) for o, _ in _REASONING_TAG_PAIRS)
|
||||
|
||||
|
||||
class _ThinkingStripper:
    """Strip reasoning blocks from a stream of text deltas.

    Handles multiple tag patterns (``<thinking>``, ``<internal_reasoning>``,
    etc.) so the same stripper works across Claude, Gemini, and other models.

    Buffers just enough characters to detect a tag that may be split
    across chunks; emits text immediately when no tag is in-flight.
    Robust to single chunks that open and close a block, multiple
    blocks per stream, and tags that straddle chunk boundaries.
    """

    def __init__(self) -> None:
        # Text received but not yet known to be safe to emit.
        self._buffer: str = ""
        # True while we are inside (and discarding) an open reasoning block.
        self._in_thinking: bool = False
        self._close_tag: str = ""  # closing tag for the currently open block

    def _find_open_tag(self) -> tuple[int, str, str]:
        """Find the earliest opening tag in the buffer.

        Returns (position, open_tag, close_tag) or (-1, "", "") if none.
        """
        best_pos = -1
        best_open = ""
        best_close = ""
        # Linear scan over the (tiny) tag table; keep the match that occurs
        # earliest in the buffer so blocks are consumed in stream order.
        for open_tag, close_tag in _REASONING_TAG_PAIRS:
            pos = self._buffer.find(open_tag)
            if pos != -1 and (best_pos == -1 or pos < best_pos):
                best_pos = pos
                best_open = open_tag
                best_close = close_tag
        return best_pos, best_open, best_close

    def process(self, chunk: str) -> str:
        """Feed a chunk and return the text that is safe to emit now."""
        self._buffer += chunk
        out: list[str] = []
        while self._buffer:
            if self._in_thinking:
                end = self._buffer.find(self._close_tag)
                if end == -1:
                    # Closer not yet seen: discard the reasoning text but keep
                    # a tail that could be a partially received closing tag.
                    keep = len(self._close_tag) - 1
                    self._buffer = self._buffer[-keep:] if keep else ""
                    return "".join(out)
                # Skip past the closing tag and resume normal emission.
                self._buffer = self._buffer[end + len(self._close_tag) :]
                self._in_thinking = False
                self._close_tag = ""
            else:
                start, open_tag, close_tag = self._find_open_tag()
                if start == -1:
                    # No opening tag; emit everything except a tail that
                    # could start a partial opener on the next chunk.
                    safe_end = len(self._buffer)
                    # Check the longest candidate tail first so we withhold
                    # the maximal possible partial opener.
                    for keep in range(
                        min(_MAX_OPEN_TAG_LEN - 1, len(self._buffer)), 0, -1
                    ):
                        tail = self._buffer[-keep:]
                        if any(o[:keep] == tail for o, _ in _REASONING_TAG_PAIRS):
                            safe_end = len(self._buffer) - keep
                            break
                    out.append(self._buffer[:safe_end])
                    self._buffer = self._buffer[safe_end:]
                    return "".join(out)
                # Full opener found: release the text before it, then switch
                # into discard mode until its closer arrives.
                out.append(self._buffer[:start])
                self._buffer = self._buffer[start + len(open_tag) :]
                self._in_thinking = True
                self._close_tag = close_tag
        return "".join(out)

    def flush(self) -> str:
        """Return any remaining emittable text when the stream ends."""
        if self._in_thinking:
            # Unclosed thinking block — discard the buffered reasoning.
            self._buffer = ""
            return ""
        out = self._buffer
        self._buffer = ""
        return out
|
||||
# Re-export from shared module so existing references keep working.
|
||||
from backend.copilot.thinking_stripper import ThinkingStripper as _ThinkingStripper
|
||||
|
||||
|
||||
@dataclass
|
||||
|
||||
@@ -12,7 +12,7 @@ import sys
|
||||
import time
|
||||
import uuid
|
||||
from collections.abc import AsyncGenerator, AsyncIterator
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field as dataclass_field
|
||||
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypedDict, cast
|
||||
|
||||
from typing_extensions import NotRequired
|
||||
@@ -120,6 +120,7 @@ from ..token_tracking import persist_and_record_usage
|
||||
from ..tools.e2b_sandbox import get_or_create_sandbox, pause_sandbox_direct
|
||||
from ..tools.sandbox import WORKSPACE_PREFIX, make_session_path
|
||||
from ..tracking import track_user_message
|
||||
from ..thinking_stripper import ThinkingStripper
|
||||
from .compaction import CompactionTracker, filter_compaction_messages
|
||||
from .env import build_sdk_env # noqa: F401 — re-export for backward compat
|
||||
from .response_adapter import SDKResponseAdapter
|
||||
@@ -1200,6 +1201,9 @@ class _StreamAccumulator:
|
||||
has_appended_assistant: bool = False
|
||||
has_tool_results: bool = False
|
||||
stream_completed: bool = False
|
||||
thinking_stripper: ThinkingStripper = dataclass_field(
|
||||
default_factory=ThinkingStripper,
|
||||
)
|
||||
|
||||
|
||||
def _dispatch_response(
|
||||
@@ -1256,7 +1260,15 @@ def _dispatch_response(
|
||||
)
|
||||
|
||||
if isinstance(response, StreamTextDelta):
|
||||
delta = response.delta or ""
|
||||
raw_delta = response.delta or ""
|
||||
# Strip <internal_reasoning> / <thinking> tags that non-extended-
|
||||
# thinking models (e.g. Sonnet) may emit as visible text.
|
||||
delta = acc.thinking_stripper.process(raw_delta)
|
||||
if not delta:
|
||||
# Stripper is buffering a potential tag — suppress this event.
|
||||
return None
|
||||
# Replace the delta with the stripped version for the SSE client.
|
||||
response = StreamTextDelta(id=response.id, delta=delta)
|
||||
if acc.has_tool_results and acc.has_appended_assistant:
|
||||
acc.assistant_response = ChatMessage(role="assistant", content=delta)
|
||||
acc.accumulated_tool_calls = []
|
||||
@@ -1871,6 +1883,14 @@ async def _run_stream_attempt(
|
||||
finally:
|
||||
await _safe_close_sdk_client(sdk_client, ctx.log_prefix)
|
||||
|
||||
# --- Flush any text buffered by the thinking stripper ---
|
||||
tail = acc.thinking_stripper.flush()
|
||||
if tail:
|
||||
acc.assistant_response.content = (
|
||||
acc.assistant_response.content or ""
|
||||
) + tail
|
||||
yield StreamTextDelta(id="", delta=tail)
|
||||
|
||||
# --- Post-stream processing (only on success) ---
|
||||
if state.adapter.has_unresolved_tool_calls:
|
||||
logger.warning(
|
||||
|
||||
102
autogpt_platform/backend/backend/copilot/thinking_stripper.py
Normal file
102
autogpt_platform/backend/backend/copilot/thinking_stripper.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""Streaming tag stripper for model reasoning blocks.
|
||||
|
||||
Different LLMs wrap internal chain-of-thought in different XML-style tags
|
||||
(Claude uses ``<thinking>``, Gemini uses ``<internal_reasoning>``, etc.).
|
||||
When extended thinking is **not** enabled, these tags may appear as plain text
|
||||
in the response stream and must be stripped before the content reaches the
|
||||
user.
|
||||
|
||||
The :class:`ThinkingStripper` handles chunk-boundary splitting so it can be
|
||||
plugged into any delta-based streaming pipeline.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
# Tag pairs to strip. Each entry is (open_tag, close_tag).
|
||||
REASONING_TAG_PAIRS: list[tuple[str, str]] = [
|
||||
("<thinking>", "</thinking>"),
|
||||
("<internal_reasoning>", "</internal_reasoning>"),
|
||||
]
|
||||
|
||||
# Longest opener — used to size the partial-tag buffer.
|
||||
_MAX_OPEN_TAG_LEN = max(len(o) for o, _ in REASONING_TAG_PAIRS)
|
||||
|
||||
|
||||
class ThinkingStripper:
|
||||
"""Strip reasoning blocks from a stream of text deltas.
|
||||
|
||||
Handles multiple tag patterns (``<thinking>``, ``<internal_reasoning>``,
|
||||
etc.) so the same stripper works across Claude, Gemini, and other models.
|
||||
|
||||
Buffers just enough characters to detect a tag that may be split
|
||||
across chunks; emits text immediately when no tag is in-flight.
|
||||
Robust to single chunks that open and close a block, multiple
|
||||
blocks per stream, and tags that straddle chunk boundaries.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self._buffer: str = ""
|
||||
self._in_thinking: bool = False
|
||||
self._close_tag: str = "" # closing tag for the currently open block
|
||||
|
||||
def _find_open_tag(self) -> tuple[int, str, str]:
|
||||
"""Find the earliest opening tag in the buffer.
|
||||
|
||||
Returns (position, open_tag, close_tag) or (-1, "", "") if none.
|
||||
"""
|
||||
best_pos = -1
|
||||
best_open = ""
|
||||
best_close = ""
|
||||
for open_tag, close_tag in REASONING_TAG_PAIRS:
|
||||
pos = self._buffer.find(open_tag)
|
||||
if pos != -1 and (best_pos == -1 or pos < best_pos):
|
||||
best_pos = pos
|
||||
best_open = open_tag
|
||||
best_close = close_tag
|
||||
return best_pos, best_open, best_close
|
||||
|
||||
def process(self, chunk: str) -> str:
|
||||
"""Feed a chunk and return the text that is safe to emit now."""
|
||||
self._buffer += chunk
|
||||
out: list[str] = []
|
||||
while self._buffer:
|
||||
if self._in_thinking:
|
||||
end = self._buffer.find(self._close_tag)
|
||||
if end == -1:
|
||||
keep = len(self._close_tag) - 1
|
||||
self._buffer = self._buffer[-keep:] if keep else ""
|
||||
return "".join(out)
|
||||
self._buffer = self._buffer[end + len(self._close_tag) :]
|
||||
self._in_thinking = False
|
||||
self._close_tag = ""
|
||||
else:
|
||||
start, open_tag, close_tag = self._find_open_tag()
|
||||
if start == -1:
|
||||
# No opening tag; emit everything except a tail that
|
||||
# could start a partial opener on the next chunk.
|
||||
safe_end = len(self._buffer)
|
||||
for keep in range(
|
||||
min(_MAX_OPEN_TAG_LEN - 1, len(self._buffer)), 0, -1
|
||||
):
|
||||
tail = self._buffer[-keep:]
|
||||
if any(o[:keep] == tail for o, _ in REASONING_TAG_PAIRS):
|
||||
safe_end = len(self._buffer) - keep
|
||||
break
|
||||
out.append(self._buffer[:safe_end])
|
||||
self._buffer = self._buffer[safe_end:]
|
||||
return "".join(out)
|
||||
out.append(self._buffer[:start])
|
||||
self._buffer = self._buffer[start + len(open_tag) :]
|
||||
self._in_thinking = True
|
||||
self._close_tag = close_tag
|
||||
return "".join(out)
|
||||
|
||||
def flush(self) -> str:
|
||||
"""Return any remaining emittable text when the stream ends."""
|
||||
if self._in_thinking:
|
||||
# Unclosed thinking block — discard the buffered reasoning.
|
||||
self._buffer = ""
|
||||
return ""
|
||||
out = self._buffer
|
||||
self._buffer = ""
|
||||
return out
|
||||
@@ -0,0 +1,93 @@
|
||||
"""Tests for the shared ThinkingStripper."""
|
||||
|
||||
from backend.copilot.thinking_stripper import ThinkingStripper
|
||||
|
||||
|
||||
def test_basic_thinking_tag() -> None:
|
||||
"""<thinking>...</thinking> blocks are fully stripped."""
|
||||
s = ThinkingStripper()
|
||||
assert s.process("<thinking>internal reasoning here</thinking>Hello!") == "Hello!"
|
||||
|
||||
|
||||
def test_internal_reasoning_tag() -> None:
|
||||
"""<internal_reasoning>...</internal_reasoning> blocks are stripped."""
|
||||
s = ThinkingStripper()
|
||||
assert (
|
||||
s.process("<internal_reasoning>step by step</internal_reasoning>Answer")
|
||||
== "Answer"
|
||||
)
|
||||
|
||||
|
||||
def test_split_across_chunks() -> None:
|
||||
"""Tags split across multiple chunks are handled correctly."""
|
||||
s = ThinkingStripper()
|
||||
out = s.process("Hello <thin")
|
||||
out += s.process("king>secret</thinking> world")
|
||||
assert out == "Hello world"
|
||||
|
||||
|
||||
def test_plain_text_preserved() -> None:
|
||||
"""Plain text with the word 'thinking' is not stripped."""
|
||||
s = ThinkingStripper()
|
||||
assert (
|
||||
s.process("I am thinking about this problem")
|
||||
== "I am thinking about this problem"
|
||||
)
|
||||
|
||||
|
||||
def test_multiple_blocks() -> None:
|
||||
"""Multiple reasoning blocks in one stream are all stripped."""
|
||||
s = ThinkingStripper()
|
||||
result = s.process(
|
||||
"A<thinking>x</thinking>B<internal_reasoning>y</internal_reasoning>C"
|
||||
)
|
||||
assert result == "ABC"
|
||||
|
||||
|
||||
def test_flush_discards_unclosed() -> None:
|
||||
"""Unclosed reasoning block is discarded on flush."""
|
||||
s = ThinkingStripper()
|
||||
s.process("Start<thinking>never closed")
|
||||
flushed = s.flush()
|
||||
assert "never closed" not in flushed
|
||||
|
||||
|
||||
def test_empty_block() -> None:
|
||||
"""Empty reasoning blocks are handled gracefully."""
|
||||
s = ThinkingStripper()
|
||||
assert s.process("Before<thinking></thinking>After") == "BeforeAfter"
|
||||
|
||||
|
||||
def test_flush_emits_remaining_plain_text() -> None:
|
||||
"""flush() returns any plain text still in the buffer."""
|
||||
s = ThinkingStripper()
|
||||
# The trailing '<' could be a partial tag, so process buffers it.
|
||||
out = s.process("Hello")
|
||||
flushed = s.flush()
|
||||
assert out + flushed == "Hello"
|
||||
|
||||
|
||||
def test_internal_reasoning_split_open_tag() -> None:
|
||||
"""<internal_reasoning> split across three chunks."""
|
||||
s = ThinkingStripper()
|
||||
out = s.process("OK <inter")
|
||||
out += s.process("nal_reaso")
|
||||
out += s.process("ning>secret stuff</internal_reasoning> visible")
|
||||
out += s.flush()
|
||||
assert out == "OK visible"
|
||||
|
||||
|
||||
def test_no_tags_passthrough() -> None:
|
||||
"""Text without any tags passes through unchanged."""
|
||||
s = ThinkingStripper()
|
||||
out = s.process("Hello world, this is fine.")
|
||||
out += s.flush()
|
||||
assert out == "Hello world, this is fine."
|
||||
|
||||
|
||||
def test_reasoning_at_end_of_stream() -> None:
|
||||
"""Reasoning block at end of stream with no trailing text."""
|
||||
s = ThinkingStripper()
|
||||
out = s.process("Answer<internal_reasoning>my thoughts</internal_reasoning>")
|
||||
out += s.flush()
|
||||
assert out == "Answer"
|
||||
Reference in New Issue
Block a user