mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-08 03:00:28 -04:00
## Summary

- Adds a session-level `dry_run` flag that forces ALL tool calls (`run_block`, `run_agent`) in a copilot/autopilot session to use dry-run simulation mode
- Stores the flag in a typed `ChatSessionMetadata` JSON model on the `ChatSession` DB row, accessed via `session.dry_run` property
- Adds `dry_run` to the AutoPilot block Input schema so graph builders can create dry-run autopilot nodes
- Refactors multiple copilot tools from `**kwargs` to explicit parameters for type safety

## Changes

- **Prisma schema**: Added `metadata` JSON column to `ChatSession` model with migration
- **Python models**: Added `ChatSessionMetadata` model with `dry_run` field, added `metadata` field to `ChatSessionInfo` and `ChatSession`, updated `from_db()`, `new()`, and `create_chat_session()`
- **Session propagation**: `set_execution_context(user_id, session)` called from `baseline/service.py` so tool handlers can read session-level flags via `session.dry_run`
- **Tool enforcement**: `run_block` and `run_agent` check `session.dry_run` and force `dry_run=True` when set; `run_agent` blocks scheduling in dry-run sessions
- **AutoPilot block**: Added `dry_run` input field, passes it when creating sessions
- **Chat API**: Added `CreateSessionRequest` model with `dry_run` field to `POST /sessions` endpoint; added `metadata` to session responses
- **Frontend**: Updated `useChatSession.ts` to pass body to the create session mutation
- **Tool refactoring**: Multiple copilot tools refactored from `**kwargs` to explicit named parameters (agent_browser, manage_folders, workspace_files, connect_integration, agent_output, bash_exec, etc.) for better type safety

## Test plan

- [x] Unit tests for `ChatSession.new()` with dry_run parameter
- [x] Unit tests for `RunBlockTool` session dry_run override
- [x] Unit tests for `RunAgentTool` session dry_run override
- [x] Unit tests for session dry_run blocks scheduling
- [x] Existing dry_run tests still pass (12/12)
- [x] Existing permissions tests still pass
- [x] All pre-commit hooks pass (ruff, isort, pyright, tsc)
- [ ] Manual: Create autopilot session with `dry_run=True`, verify run_block/run_agent calls use simulation

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
408 lines
15 KiB
Python
408 lines
15 KiB
Python
"""Tests for sdk/compaction.py — event builders, filtering, persistence, and
|
|
CompactionTracker state machine."""
|
|
|
|
import pytest
|
|
|
|
from backend.copilot.constants import COMPACTION_DONE_MSG, COMPACTION_TOOL_NAME
|
|
from backend.copilot.model import ChatMessage, ChatSession
|
|
from backend.copilot.response_model import (
|
|
StreamFinishStep,
|
|
StreamStartStep,
|
|
StreamToolInputAvailable,
|
|
StreamToolInputStart,
|
|
StreamToolOutputAvailable,
|
|
)
|
|
from backend.copilot.sdk.compaction import (
|
|
CompactionTracker,
|
|
compaction_events,
|
|
emit_compaction,
|
|
filter_compaction_messages,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_session() -> ChatSession:
    """Build a fresh, non-dry-run ``ChatSession`` owned by the fixed test user."""
    return ChatSession.new(dry_run=False, user_id="test-user")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# compaction_events
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCompactionEvents:
    """Checks on the event list built by ``compaction_events``."""

    def test_returns_start_and_end_events(self):
        """A plain call yields five events, each of the expected stream type."""
        expected_types = (
            StreamStartStep,
            StreamToolInputStart,
            StreamToolInputAvailable,
            StreamToolOutputAvailable,
            StreamFinishStep,
        )
        events = compaction_events("done")
        assert len(events) == len(expected_types)
        for event, expected_type in zip(events, expected_types):
            assert isinstance(event, expected_type)

    def test_uses_provided_tool_call_id(self):
        """An explicit ``tool_call_id`` shows up on the tool-input-start event."""
        input_start = compaction_events("msg", tool_call_id="my-id")[1]
        assert isinstance(input_start, StreamToolInputStart)
        assert input_start.toolCallId == "my-id"

    def test_generates_id_when_not_provided(self):
        """Without an explicit id, the generated one starts with ``compaction-``."""
        input_start = compaction_events("msg")[1]
        assert isinstance(input_start, StreamToolInputStart)
        assert input_start.toolCallId.startswith("compaction-")

    def test_tool_name_is_context_compaction(self):
        """The tool-input-start event is labelled with ``COMPACTION_TOOL_NAME``."""
        input_start = compaction_events("msg")[1]
        assert isinstance(input_start, StreamToolInputStart)
        assert input_start.toolName == COMPACTION_TOOL_NAME
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# emit_compaction
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestEmitCompaction:
    """Checks that ``emit_compaction`` records the compaction on the session."""

    def test_persists_to_session(self):
        """Emitting returns five events and appends a tool-call/result pair."""
        session = _make_session()
        assert len(session.messages) == 0

        events = emit_compaction(session)
        assert len(events) == 5

        # Exactly two messages are appended: the assistant tool call followed
        # by the matching tool result.
        assert len(session.messages) == 2
        call_msg = session.messages[0]
        result_msg = session.messages[1]

        assert call_msg.role == "assistant"
        assert call_msg.tool_calls is not None
        called_name = call_msg.tool_calls[0]["function"]["name"]
        assert called_name == COMPACTION_TOOL_NAME

        assert result_msg.role == "tool"
        assert result_msg.content == COMPACTION_DONE_MSG
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# filter_compaction_messages
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFilterCompactionMessages:
    """Checks on which messages ``filter_compaction_messages`` keeps."""

    def test_removes_compaction_tool_calls(self):
        """A content-less compaction call and its tool result are dropped."""
        compaction_call = {
            "id": "comp-1",
            "type": "function",
            "function": {"name": COMPACTION_TOOL_NAME, "arguments": "{}"},
        }
        history = [
            ChatMessage(role="user", content="hello"),
            ChatMessage(role="assistant", content="", tool_calls=[compaction_call]),
            ChatMessage(
                role="tool", content=COMPACTION_DONE_MSG, tool_call_id="comp-1"
            ),
            ChatMessage(role="assistant", content="world"),
        ]

        kept = filter_compaction_messages(history)

        assert len(kept) == 2
        assert [msg.content for msg in kept] == ["hello", "world"]

    def test_keeps_non_compaction_tool_calls(self):
        """Tool calls for other tools, and their results, pass through."""
        search_call = {
            "id": "real-1",
            "type": "function",
            "function": {"name": "search", "arguments": "{}"},
        }
        history = [
            ChatMessage(role="assistant", content="", tool_calls=[search_call]),
            ChatMessage(role="tool", content="result", tool_call_id="real-1"),
        ]

        kept = filter_compaction_messages(history)

        assert len(kept) == 2

    def test_keeps_assistant_with_content_and_compaction_call(self):
        """An assistant message carrying both real content and a compaction
        tool call is kept."""
        compaction_call = {
            "id": "comp-1",
            "type": "function",
            "function": {"name": COMPACTION_TOOL_NAME, "arguments": "{}"},
        }
        history = [
            ChatMessage(
                role="assistant",
                content="I have content",
                tool_calls=[compaction_call],
            ),
        ]

        kept = filter_compaction_messages(history)

        assert len(kept) == 1

    def test_empty_list(self):
        """Filtering an empty list gives back an empty list."""
        result = filter_compaction_messages([])
        assert result == []
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CompactionTracker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestCompactionTracker:
    """State-machine tests for ``CompactionTracker``.

    The tests drive a tracker through ``on_compact``, ``emit_start_if_ready``,
    ``emit_end_if_ready``, ``emit_pre_query`` and ``reset_for_query`` and then
    inspect the returned events, the messages appended to the session, and
    the tracker's private bookkeeping attributes.
    """

    def test_on_compact_sets_event(self):
        """``on_compact()`` sets the tracker's ``_compact_start`` event."""
        tracker = CompactionTracker()
        tracker.on_compact()
        assert tracker._compact_start.is_set()

    def test_emit_start_if_ready_no_event(self):
        """Without a prior ``on_compact()``, no start events are produced."""
        tracker = CompactionTracker()
        assert tracker.emit_start_if_ready() == []

    def test_emit_start_if_ready_with_event(self):
        """After ``on_compact()``, the three start-side events are produced."""
        tracker = CompactionTracker()
        tracker.on_compact()
        evts = tracker.emit_start_if_ready()
        assert len(evts) == 3
        assert isinstance(evts[0], StreamStartStep)
        assert isinstance(evts[1], StreamToolInputStart)
        assert isinstance(evts[2], StreamToolInputAvailable)

    def test_emit_start_only_once(self):
        """A second ``emit_start_if_ready()`` for the same compaction is empty."""
        tracker = CompactionTracker()
        tracker.on_compact()
        evts1 = tracker.emit_start_if_ready()
        assert len(evts1) == 3
        # Second call should return empty
        evts2 = tracker.emit_start_if_ready()
        assert evts2 == []

    @pytest.mark.asyncio
    async def test_emit_end_after_start(self):
        """After start was emitted, emit_end yields the two closing events
        and persists two messages to the session."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact()
        tracker.emit_start_if_ready()
        result = await tracker.emit_end_if_ready(session)
        assert result.just_ended is True
        assert len(result.events) == 2
        assert isinstance(result.events[0], StreamToolOutputAvailable)
        assert isinstance(result.events[1], StreamFinishStep)
        # Should persist
        assert len(session.messages) == 2

    @pytest.mark.asyncio
    async def test_emit_end_without_start_self_contained(self):
        """If PreCompact fired but start was never emitted, emit_end
        produces a self-contained compaction event."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact()
        # Don't call emit_start_if_ready
        result = await tracker.emit_end_if_ready(session)
        assert result.just_ended is True
        assert len(result.events) == 5  # Full self-contained event
        assert isinstance(result.events[0], StreamStartStep)
        assert len(session.messages) == 2

    @pytest.mark.asyncio
    async def test_emit_end_no_op_when_no_new_compaction(self):
        """A second emit_end with no new ``on_compact()`` returns nothing."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact()
        tracker.emit_start_if_ready()
        result1 = await tracker.emit_end_if_ready(session)
        assert result1.just_ended is True
        # Second call should be no-op (no new on_compact)
        result2 = await tracker.emit_end_if_ready(session)
        assert result2.just_ended is False
        assert result2.events == []

    @pytest.mark.asyncio
    async def test_emit_end_no_op_when_nothing_happened(self):
        """emit_end on a fresh tracker is a no-op."""
        tracker = CompactionTracker()
        session = _make_session()
        result = await tracker.emit_end_if_ready(session)
        assert result.just_ended is False
        assert result.events == []

    def test_emit_pre_query(self):
        """``emit_pre_query`` returns five events, persists two messages and
        leaves ``_done`` set."""
        tracker = CompactionTracker()
        session = _make_session()
        evts = tracker.emit_pre_query(session)
        assert len(evts) == 5
        assert len(session.messages) == 2
        assert tracker._done is True

    def test_reset_for_query(self):
        """``reset_for_query()`` clears ``_done``, ``_start_emitted``,
        ``_tool_call_id`` and ``_transcript_path``."""
        tracker = CompactionTracker()
        tracker._done = True
        tracker._start_emitted = True
        tracker._tool_call_id = "old"
        tracker._transcript_path = "/some/path"
        tracker.reset_for_query()
        assert tracker._done is False
        assert tracker._start_emitted is False
        assert tracker._tool_call_id == ""
        assert tracker._transcript_path == ""

    # Synchronous on purpose: nothing in this test awaits, and the same
    # tracker methods are called from plain (non-async) tests above.
    def test_pre_query_blocks_sdk_compaction_until_reset(self):
        """After pre-query compaction, SDK compaction is blocked until
        reset_for_query is called."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.emit_pre_query(session)
        tracker.on_compact()
        # _done is True so emit_start_if_ready is blocked
        evts = tracker.emit_start_if_ready()
        assert evts == []
        # Reset clears _done, allowing subsequent compaction
        tracker.reset_for_query()
        tracker.on_compact()
        evts = tracker.emit_start_if_ready()
        assert len(evts) == 3

    # Synchronous on purpose: nothing in this test awaits.
    def test_reset_allows_new_compaction(self):
        """After reset_for_query, compaction can fire again."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.emit_pre_query(session)
        tracker.reset_for_query()
        tracker.on_compact()
        evts = tracker.emit_start_if_ready()
        assert len(evts) == 3  # Start events emitted

    @pytest.mark.asyncio
    async def test_tool_call_id_consistency(self):
        """Start and end events use the same tool_call_id."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact()
        start_evts = tracker.emit_start_if_ready()
        result = await tracker.emit_end_if_ready(session)
        start_evt = start_evts[1]
        end_evt = result.events[0]
        assert isinstance(start_evt, StreamToolInputStart)
        assert isinstance(end_evt, StreamToolOutputAvailable)
        assert start_evt.toolCallId == end_evt.toolCallId
        # Persisted ID should also match
        tool_calls = session.messages[0].tool_calls
        assert tool_calls is not None
        assert tool_calls[0]["id"] == start_evt.toolCallId

    @pytest.mark.asyncio
    async def test_multiple_compactions_within_query(self):
        """A second compaction in the same query is suppressed.

        Once the first cycle has ended and ``reset_for_query()`` has not
        been called, a further ``on_compact()`` produces no start events
        and the following emit_end is a no-op.
        """
        tracker = CompactionTracker()
        session = _make_session()

        # First compaction cycle runs to completion.
        tracker.on_compact("/path/1")
        tracker.emit_start_if_ready()
        result1 = await tracker.emit_end_if_ready(session)
        assert result1.just_ended is True
        assert len(result1.events) == 2
        assert result1.transcript_path == "/path/1"

        # Second PreCompact without reset_for_query(): this pins the current
        # behavior, in which the tracker ignores it. Multiple compactions
        # need a reset in between (see
        # test_multiple_compactions_with_intervening_message).
        tracker.on_compact("/path/2")
        # Start is blocked after the first completed cycle.
        start_evts = tracker.emit_start_if_ready()
        assert start_evts == []
        # emit_end is likewise a no-op.
        result2 = await tracker.emit_end_if_ready(session)
        assert result2.just_ended is False

    @pytest.mark.asyncio
    async def test_multiple_compactions_with_intervening_message(self):
        """A second compaction works once ``reset_for_query()`` has run.

        Counterpart to ``test_multiple_compactions_within_query``: after the
        first cycle completes, the tracker is reset (as happens between
        queries) and a new on_compact / emit_start / emit_end cycle then
        runs normally and reports the new transcript path.
        """
        tracker = CompactionTracker()
        session = _make_session()

        # First compaction
        tracker.on_compact("/path/1")
        tracker.emit_start_if_ready()
        result1 = await tracker.emit_end_if_ready(session)
        assert result1.just_ended is True
        assert result1.transcript_path == "/path/1"

        # Simulate reset between queries
        tracker.reset_for_query()

        # Second compaction in new query
        tracker.on_compact("/path/2")
        start_evts = tracker.emit_start_if_ready()
        assert len(start_evts) == 3
        result2 = await tracker.emit_end_if_ready(session)
        assert result2.just_ended is True
        assert result2.transcript_path == "/path/2"

    def test_on_compact_stores_transcript_path(self):
        """The path passed to ``on_compact`` is kept in ``_transcript_path``."""
        tracker = CompactionTracker()
        tracker.on_compact("/some/path.jsonl")
        assert tracker._transcript_path == "/some/path.jsonl"

    @pytest.mark.asyncio
    async def test_emit_end_returns_transcript_path(self):
        """CompactionResult includes the transcript_path from on_compact."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact("/my/session.jsonl")
        tracker.emit_start_if_ready()
        result = await tracker.emit_end_if_ready(session)
        assert result.just_ended is True
        assert result.transcript_path == "/my/session.jsonl"
        # transcript_path is cleared after emit_end
        assert tracker._transcript_path == ""

    @pytest.mark.asyncio
    async def test_emit_end_clears_transcript_path(self):
        """After emit_end, _transcript_path is reset so it doesn't leak to
        subsequent non-compaction emit_end calls."""
        tracker = CompactionTracker()
        session = _make_session()
        tracker.on_compact("/first/path.jsonl")
        tracker.emit_start_if_ready()
        await tracker.emit_end_if_ready(session)
        # After compaction, _transcript_path is cleared
        assert tracker._transcript_path == ""

        # Make the subsequent call the docstring talks about: with no new
        # on_compact it is a no-op and must not bring the old path back.
        followup = await tracker.emit_end_if_ready(session)
        assert followup.just_ended is False
        assert tracker._transcript_path == ""
|