mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
Requested by @majdyz

When CoPilot compacts (summarizes/truncates) conversation history to fit within context limits, the user now sees it rendered like a tool call — a spinner while compaction runs, then a completion notice.

**Backend:**
- Added `compaction_start_events()`, `compaction_end_events()`, `compaction_events()` in `response_model.py` using the existing tool-call SSE protocol (`tool-input-start` → `tool-input-available` → `tool-output-available`)
- All three compaction paths (legacy `service.py`, SDK pre-query, SDK mid-stream) use the same pattern
- Pre-query and SDK-internal compaction tracked independently so neither suppresses the other

**Frontend:**
- Added `compaction` tool category to `GenericTool` with `ArrowsClockwise` icon
- Shows "Summarizing earlier messages…" with spinner while running
- Shows "Earlier messages were summarized" when done
- No expandable accordion — just the status line

**Cleanup:**
- Removed unused `system_notice_start/end_events`, `COMPACTION_STARTED_MSG`
- Removed unused `system_notice_events`, `system_error_events`, `_system_text_events`

Closes SECRT-2053

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
240 lines
8.2 KiB
Python
240 lines
8.2 KiB
Python
"""Compaction tracking for SDK-based chat sessions.
|
|
|
|
Encapsulates the state machine and event emission for context compaction,
|
|
both pre-query (history compressed before SDK query) and SDK-internal
|
|
(PreCompact hook fires mid-stream).
|
|
|
|
All compaction-related helpers live here: event builders, message filtering,
|
|
persistence, and the ``CompactionTracker`` state machine.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import uuid
|
|
from collections.abc import Callable
|
|
|
|
from ..constants import COMPACTION_DONE_MSG, COMPACTION_TOOL_NAME
|
|
from ..model import ChatMessage, ChatSession
|
|
from ..response_model import (
|
|
StreamBaseResponse,
|
|
StreamFinishStep,
|
|
StreamStartStep,
|
|
StreamToolInputAvailable,
|
|
StreamToolInputStart,
|
|
StreamToolOutputAvailable,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Event builders (private — use CompactionTracker or compaction_events)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _start_events(tool_call_id: str) -> list[StreamBaseResponse]:
    """Return the events that open a compaction tool call.

    The list contains, in order, a ``StreamStartStep``, a
    ``StreamToolInputStart`` and a ``StreamToolInputAvailable`` with an empty
    input payload. Both tool events carry *tool_call_id* and
    ``COMPACTION_TOOL_NAME``.
    """
    step_open = StreamStartStep()
    input_start = StreamToolInputStart(
        toolCallId=tool_call_id,
        toolName=COMPACTION_TOOL_NAME,
    )
    input_ready = StreamToolInputAvailable(
        toolCallId=tool_call_id,
        toolName=COMPACTION_TOOL_NAME,
        input={},
    )
    return [step_open, input_start, input_ready]
|
|
|
|
|
|
def _end_events(tool_call_id: str, message: str) -> list[StreamBaseResponse]:
    """Return the events that close a compaction tool call.

    The list contains a ``StreamToolOutputAvailable`` whose output is
    *message* (tagged with *tool_call_id* and ``COMPACTION_TOOL_NAME``),
    followed by a ``StreamFinishStep``.
    """
    tool_output = StreamToolOutputAvailable(
        toolCallId=tool_call_id,
        toolName=COMPACTION_TOOL_NAME,
        output=message,
    )
    step_close = StreamFinishStep()
    return [tool_output, step_close]
|
|
|
|
|
|
def _new_tool_call_id() -> str:
|
|
return f"compaction-{uuid.uuid4().hex[:12]}"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Public event builder
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def emit_compaction(session: ChatSession) -> list[StreamBaseResponse]:
    """Build a completed compaction tool call, record it on *session*, return it.

    Intended for callers that don't use ``CompactionTracker`` (e.g. the
    legacy non-SDK streaming path in ``service.py``). One freshly generated
    tool-call ID is shared by the returned stream events and the messages
    appended to the session.
    """
    call_id = _new_tool_call_id()
    stream_events = compaction_events(COMPACTION_DONE_MSG, tool_call_id=call_id)
    # Persist under the same ID that the streamed events carry.
    _persist(session, call_id, COMPACTION_DONE_MSG)
    return stream_events
|
|
|
|
|
|
def compaction_events(
    message: str, tool_call_id: str | None = None
) -> list[StreamBaseResponse]:
    """Return a self-contained (start + end) compaction tool call.

    If *tool_call_id* is given it is reused, e.g. so persisted messages can
    match events that were already streamed. If it is ``None`` or empty, a
    new ID is generated. *message* becomes the tool output of the closing
    event.
    """
    if not tool_call_id:
        # ``None`` and the empty string both count as "no ID supplied".
        tool_call_id = _new_tool_call_id()
    return [*_start_events(tool_call_id), *_end_events(tool_call_id, message)]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Message filtering
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def filter_compaction_messages(
    messages: list[ChatMessage],
) -> list[ChatMessage]:
    """Return *messages* without the synthetic compaction entries.

    Two kinds of messages are dropped:

    * assistant messages that have no content and whose tool calls are all
      compaction calls;
    * tool messages whose ``tool_call_id`` matches a compaction call seen
      earlier in the list.

    An assistant message that also has content or a non-compaction tool call
    is kept unchanged, though its compaction call IDs are still recorded, so
    the matching tool-result messages are dropped.
    """

    def _is_compaction(call: dict) -> bool:
        return call.get("function", {}).get("name") == COMPACTION_TOOL_NAME

    seen_ids: set[str] = set()
    kept: list[ChatMessage] = []
    for message in messages:
        calls = message.tool_calls if message.role == "assistant" else None
        if calls:
            synthetic = [call for call in calls if _is_compaction(call)]
            seen_ids.update(call.get("id", "") for call in synthetic)
            # No "real" calls remain when every call is a compaction call.
            only_synthetic = len(synthetic) == len(calls)
            if only_synthetic and not message.content:
                continue
        elif message.role == "tool" and message.tool_call_id in seen_ids:
            continue
        kept.append(message)
    return kept
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Persistence
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _persist(session: ChatSession, tool_call_id: str, message: str) -> None:
    """Record a compaction tool call and its result on ``session.messages``.

    Compaction events are synthetic, so they bypass the normal adapter
    accumulation; appending them here is what lets them survive a page
    refresh. Two messages are appended: an assistant message with empty
    content and a single compaction tool call, then a tool message carrying
    *message* under the same *tool_call_id*.
    """
    compaction_call = {
        "id": tool_call_id,
        "type": "function",
        "function": {
            "name": COMPACTION_TOOL_NAME,
            "arguments": "{}",
        },
    }
    call_message = ChatMessage(
        role="assistant",
        content="",
        tool_calls=[compaction_call],
    )
    session.messages.append(call_message)

    result_message = ChatMessage(
        role="tool",
        content=message,
        tool_call_id=tool_call_id,
    )
    session.messages.append(result_message)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CompactionTracker — state machine for streaming sessions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class CompactionTracker:
    """Tracks compaction state and yields UI events.

    Two compaction paths:

    1. **Pre-query** — history compressed before the SDK query starts.
       Call :meth:`emit_pre_query` to yield a self-contained tool call.

    2. **SDK-internal** — ``PreCompact`` hook fires mid-stream.
       Call :meth:`emit_start_if_ready` on heartbeat ticks and
       :meth:`emit_end_if_ready` when a message arrives.

    Once a compaction has been reported, further ``PreCompact`` signals are
    ignored until :meth:`reset_for_query` is called.
    """

    def __init__(self) -> None:
        # Set by the PreCompact hook (see ``on_compact``); cleared when the
        # signal is consumed or when per-query state is reset.
        self._compact_start = asyncio.Event()
        # True while start events have been emitted without matching end events.
        self._start_emitted = False
        # True once a compaction has been reported; suppresses further
        # emission until ``reset_for_query``.
        self._done = False
        # ID shared by the start and end events of the in-progress tool call.
        self._tool_call_id = ""

    @property
    def on_compact(self) -> Callable[[], None]:
        """Callback for the PreCompact hook; calling it flags a compaction."""
        return self._compact_start.set

    # ------------------------------------------------------------------
    # Pre-query compaction
    # ------------------------------------------------------------------

    def emit_pre_query(self, session: ChatSession) -> list[StreamBaseResponse]:
        """Emit + persist a self-contained compaction tool call.

        Marks the tracker as done, so SDK-internal signals are ignored until
        :meth:`reset_for_query` is called.
        """
        self._done = True
        return emit_compaction(session)

    # ------------------------------------------------------------------
    # SDK-internal compaction
    # ------------------------------------------------------------------

    def reset_for_query(self) -> None:
        """Reset per-query state before a new SDK query.

        Also discards any pending ``PreCompact`` signal. A signal that arrived
        while the tracker was already done is never consumed by the emit
        methods; without clearing it here it would survive the reset and be
        reported as a compaction in the next query.
        """
        self._compact_start.clear()
        self._done = False
        self._start_emitted = False
        self._tool_call_id = ""

    def emit_start_if_ready(self) -> list[StreamBaseResponse]:
        """If the PreCompact hook fired, emit start events (spinning tool).

        Returns an empty list when no signal is pending, when start events
        were already emitted, or when a compaction was already reported.
        """
        if not self._compact_start.is_set() or self._start_emitted or self._done:
            return []

        # Consume the signal and remember the ID so the end events can match.
        self._compact_start.clear()
        self._start_emitted = True
        self._tool_call_id = _new_tool_call_id()
        return _start_events(self._tool_call_id)

    async def emit_end_if_ready(self, session: ChatSession) -> list[StreamBaseResponse]:
        """If compaction is in progress, emit end events and persist.

        Returns an empty list when a compaction was already reported or when
        none is pending. Otherwise the tool call is appended to *session* and
        the tracker is marked done.
        """
        # Yield so pending hook tasks can set compact_start
        await asyncio.sleep(0)

        if self._done:
            return []
        if not self._start_emitted and not self._compact_start.is_set():
            return []

        if self._start_emitted:
            # Close the open spinner using the ID of the start events.
            persist_id = self._tool_call_id
            done_events = _end_events(persist_id, COMPACTION_DONE_MSG)
        else:
            # PreCompact fired but start never emitted — self-contained
            persist_id = _new_tool_call_id()
            done_events = compaction_events(
                COMPACTION_DONE_MSG, tool_call_id=persist_id
            )

        self._compact_start.clear()
        self._start_emitted = False
        self._done = True
        _persist(session, persist_id, COMPACTION_DONE_MSG)
        return done_events
|