mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
refactor(copilot): use run_copilot_turn_via_queue for auto-approve
Replace the manual append_message_if + stream_registry.create_session + enqueue_copilot_turn logic in _run_auto_approve with Zamil's canonical run_copilot_turn_via_queue helper from session_waiter.py.

This helper handles "add or queue based on session state" consistently — the same path used by run_sub_session, AutoPilotBlock, and the frontend:
- If the session is idle: creates the stream registry entry and enqueues the turn.
- If a turn is in flight: queues the message into the pending buffer for mid-turn drain.
- timeout=0 makes it fire-and-forget.

Removed: _no_user_action_since predicate, append_message_if import, baseline_index parameter, manual stream_registry + enqueue calls.

Kept: Redis cancel flag for cross-process Modify cancellation, _pending_auto_approvals dict for in-process task cancel, client-side approve() at T=60 + server at T=65.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -3,9 +3,8 @@
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from backend.copilot.model import ChatMessage, ChatSession, append_message_if
|
||||
from backend.copilot.model import ChatSession
|
||||
from backend.data.redis_client import get_redis_async
|
||||
|
||||
from .base import BaseTool
|
||||
@@ -93,44 +92,12 @@ def needs_build_plan_approval(session: ChatSession) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
def _no_user_action_since(baseline_index: int):
|
||||
"""Predicate: returns True iff no ``role == "user"`` message exists at
|
||||
or after ``baseline_index`` in the session message list.
|
||||
|
||||
Why an index instead of ``ChatMessage.sequence``: ``_save_session_to_db``
|
||||
persists messages with auto-assigned sequences in the DB but does NOT
|
||||
write those sequences back onto the in-memory ``ChatMessage`` objects,
|
||||
and ``cache_chat_session`` writes the in-memory copy to Redis. So when
|
||||
this predicate later loads the session from cache, freshly-appended
|
||||
messages have ``sequence=None``, which would falsely register as 0 and
|
||||
miss them entirely — the predicate would treat the user's manual
|
||||
"Approved" as if it never happened, and the auto-approve would fire a
|
||||
duplicate after the agent build had already completed. Indices are
|
||||
monotonic and require no DB-side bookkeeping.
|
||||
"""
|
||||
|
||||
def _check(session: ChatSession) -> bool:
|
||||
for m in session.messages[baseline_index:]:
|
||||
if m.role == "user":
|
||||
return False
|
||||
return True
|
||||
|
||||
return _check
|
||||
|
||||
|
||||
async def _run_auto_approve(
|
||||
session_id: str,
|
||||
user_id: str | None,
|
||||
baseline_index: int,
|
||||
) -> None:
|
||||
"""Wait the server-side timeout and inject a synthetic approval if the
|
||||
user has not acted in the meantime.
|
||||
async def _run_auto_approve(session_id: str, user_id: str | None) -> None:
|
||||
"""Wait the server-side timeout and dispatch the approval via
|
||||
``run_copilot_turn_via_queue`` — the canonical helper that queues the
|
||||
message if a turn is already in flight, or starts a new turn if idle.
|
||||
|
||||
Cancelled when the user clicks "Modify" (via ``cancel_auto_approve``).
|
||||
|
||||
Limitation: this lives in the executor process; if the worker restarts
|
||||
during the wait, the pending approval is lost (the user falls back to
|
||||
manual approve). Restart-resilience would need a Redis-backed scheduler.
|
||||
"""
|
||||
try:
|
||||
await asyncio.sleep(AUTO_APPROVE_SERVER_SECONDS)
|
||||
@@ -144,45 +111,22 @@ async def _run_auto_approve(
|
||||
)
|
||||
return
|
||||
|
||||
approval = ChatMessage(role="user", content=AUTO_APPROVE_MESSAGE)
|
||||
result = await append_message_if(
|
||||
session_id=session_id,
|
||||
message=approval,
|
||||
predicate=_no_user_action_since(baseline_index),
|
||||
)
|
||||
if result is None:
|
||||
# User already acted (or the session is gone) — nothing to do.
|
||||
return
|
||||
from backend.copilot.sdk.session_waiter import run_copilot_turn_via_queue
|
||||
|
||||
# Local imports avoid a circular dependency between this module and
|
||||
# the executor / API stream registry packages.
|
||||
from backend.copilot import stream_registry
|
||||
from backend.copilot.executor.utils import enqueue_copilot_turn
|
||||
|
||||
turn_id = str(uuid4())
|
||||
await stream_registry.create_session(
|
||||
outcome, result = await run_copilot_turn_via_queue(
|
||||
session_id=session_id,
|
||||
user_id=user_id or "",
|
||||
tool_call_id="chat_stream",
|
||||
tool_name="chat",
|
||||
turn_id=turn_id,
|
||||
message=AUTO_APPROVE_MESSAGE,
|
||||
timeout=0,
|
||||
tool_call_id="auto_approve",
|
||||
tool_name="decompose_goal_auto_approve",
|
||||
)
|
||||
logger.info(
|
||||
"decompose_goal auto-approve fired for session %s (outcome=%s, queued=%s)",
|
||||
session_id,
|
||||
outcome,
|
||||
result.queued,
|
||||
)
|
||||
try:
|
||||
await enqueue_copilot_turn(
|
||||
session_id=session_id,
|
||||
user_id=user_id,
|
||||
message=AUTO_APPROVE_MESSAGE,
|
||||
turn_id=turn_id,
|
||||
is_user_message=True,
|
||||
)
|
||||
except Exception:
|
||||
# If enqueueing fails, mark the session completed so it doesn't
|
||||
# stay stuck in "running" state in the stream registry forever.
|
||||
await stream_registry.mark_session_completed(
|
||||
session_id, error_message="Auto-approve enqueue failed"
|
||||
)
|
||||
raise
|
||||
logger.info("decompose_goal auto-approve fired for session %s", session_id)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
except Exception:
|
||||
@@ -226,13 +170,7 @@ async def cancel_auto_approve(session_id: str) -> bool:
|
||||
async def _schedule_auto_approve(
|
||||
session_id: str | None, user_id: str | None, session: ChatSession
|
||||
) -> None:
|
||||
"""Schedule the fire-and-forget auto-approve task for this session.
|
||||
|
||||
The baseline is the current message-list length: any message that
|
||||
arrives at or after this index is "after the decomposition", so a
|
||||
user message there means the user (or a follow-up turn) has acted
|
||||
and the auto-approve should be skipped.
|
||||
"""
|
||||
"""Schedule the fire-and-forget auto-approve task for this session."""
|
||||
if not session_id:
|
||||
return
|
||||
# Cancel any existing pending approval for this session (e.g. if the
|
||||
@@ -244,8 +182,7 @@ async def _schedule_auto_approve(
|
||||
# the new auto-approve task isn't incorrectly suppressed.
|
||||
redis = await get_redis_async()
|
||||
await redis.delete(f"{_CANCEL_KEY_PREFIX}{session_id}")
|
||||
baseline_index = len(session.messages)
|
||||
task = asyncio.create_task(_run_auto_approve(session_id, user_id, baseline_index))
|
||||
task = asyncio.create_task(_run_auto_approve(session_id, user_id))
|
||||
_pending_auto_approvals[session_id] = task
|
||||
# Only remove from dict if this task is still the current one — a
|
||||
# cancelled old task's callback must not clobber a newly-scheduled one.
|
||||
|
||||
@@ -14,7 +14,6 @@ from .decompose_goal import (
|
||||
AUTO_APPROVE_CLIENT_SECONDS,
|
||||
DEFAULT_ACTION,
|
||||
DecomposeGoalTool,
|
||||
_no_user_action_since,
|
||||
cancel_auto_approve,
|
||||
needs_build_plan_approval,
|
||||
)
|
||||
@@ -292,60 +291,6 @@ async def test_response_includes_created_at(tool: DecomposeGoalTool, session):
|
||||
assert before <= result.created_at <= after
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Predicate: _no_user_action_since
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_predicate_passes_when_no_user_messages_after_baseline():
|
||||
session = make_session(_USER_ID)
|
||||
# Two pre-existing messages (indices 0, 1).
|
||||
session.messages.append(ChatMessage(role="user", content="initial"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="tool call"))
|
||||
# Tool result lands at index 2 — this is what the executor appends after
|
||||
# _execute returns. baseline_index was captured at 2 inside _execute.
|
||||
session.messages.append(ChatMessage(role="tool", content="{...}"))
|
||||
assert _no_user_action_since(2)(session) is True
|
||||
|
||||
|
||||
def test_predicate_rejects_when_user_message_after_baseline():
|
||||
session = make_session(_USER_ID)
|
||||
session.messages.append(ChatMessage(role="user", content="initial"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="tool call"))
|
||||
session.messages.append(ChatMessage(role="tool", content="{...}"))
|
||||
session.messages.append(ChatMessage(role="user", content="Approved"))
|
||||
assert _no_user_action_since(2)(session) is False
|
||||
|
||||
|
||||
def test_predicate_ignores_assistant_messages_after_baseline():
|
||||
"""Only user messages count as 'user action' — assistant messages are
|
||||
just the LLM continuing on its own."""
|
||||
session = make_session(_USER_ID)
|
||||
session.messages.append(ChatMessage(role="user", content="initial"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="tool call"))
|
||||
session.messages.append(ChatMessage(role="tool", content="{...}"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="summary"))
|
||||
assert _no_user_action_since(2)(session) is True
|
||||
|
||||
|
||||
def test_predicate_handles_messages_with_none_sequence():
|
||||
"""Regression: the previous sequence-based predicate ignored messages
|
||||
whose sequence was None (which is what cached/in-memory messages have
|
||||
until they're round-tripped through the DB), causing the auto-approve
|
||||
to fire after the user had already manually approved. The new
|
||||
index-based predicate must catch user messages regardless of sequence.
|
||||
"""
|
||||
session = make_session(_USER_ID)
|
||||
session.messages.append(ChatMessage(role="user", content="initial"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="tool call"))
|
||||
session.messages.append(ChatMessage(role="tool", content="{...}"))
|
||||
# Sequence intentionally None — the cache often returns this state.
|
||||
user_msg = ChatMessage(role="user", content="Approved", sequence=None)
|
||||
session.messages.append(user_msg)
|
||||
assert user_msg.sequence is None
|
||||
assert _no_user_action_since(2)(session) is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# needs_build_plan_approval — build-tool approval gate
|
||||
# ---------------------------------------------------------------------------
|
||||
@@ -448,7 +393,7 @@ def test_needs_approval_case_insensitive():
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Server-side auto-approve task — full flow
|
||||
# Server-side auto-approve task — uses run_copilot_turn_via_queue
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@@ -474,98 +419,34 @@ def _stub_redis():
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_approve_fires_when_user_idle():
|
||||
"""When no user message is appended after the baseline sequence, the
|
||||
task should append the synthetic approval and enqueue a new turn."""
|
||||
session_id = "session-auto-approve-idle"
|
||||
|
||||
captured_message = {}
|
||||
|
||||
async def fake_append_message_if(session_id, message, predicate):
|
||||
captured_message["msg"] = message
|
||||
return make_session(_USER_ID)
|
||||
|
||||
fake_enqueue = AsyncMock()
|
||||
fake_create_session = AsyncMock()
|
||||
async def test_auto_approve_dispatches_via_queue_helper():
|
||||
"""_run_auto_approve should delegate to run_copilot_turn_via_queue."""
|
||||
fake_dispatch = AsyncMock(return_value=("completed", None))
|
||||
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
new=fake_append_message_if,
|
||||
"backend.copilot.sdk.session_waiter.run_copilot_turn_via_queue",
|
||||
new=fake_dispatch,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS",
|
||||
0,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.executor.utils.enqueue_copilot_turn",
|
||||
new=fake_enqueue,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.stream_registry.create_session",
|
||||
new=fake_create_session,
|
||||
),
|
||||
):
|
||||
await decompose_goal_module._run_auto_approve(
|
||||
session_id=session_id,
|
||||
user_id=_USER_ID,
|
||||
baseline_index=5,
|
||||
)
|
||||
await decompose_goal_module._run_auto_approve("session-idle", _USER_ID)
|
||||
|
||||
assert captured_message["msg"].role == "user"
|
||||
assert captured_message["msg"].content == "Approved. Please build the agent."
|
||||
fake_create_session.assert_awaited_once()
|
||||
fake_enqueue.assert_awaited_once()
|
||||
assert fake_enqueue.await_args is not None
|
||||
enqueue_kwargs = fake_enqueue.await_args.kwargs
|
||||
assert enqueue_kwargs["session_id"] == session_id
|
||||
assert enqueue_kwargs["message"] == "Approved. Please build the agent."
|
||||
assert enqueue_kwargs["is_user_message"] is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_approve_skips_when_user_already_acted():
|
||||
"""If append_message_if returns None (predicate rejected because the
|
||||
user already sent a message), no turn should be enqueued."""
|
||||
fake_append_message_if = AsyncMock(return_value=None)
|
||||
fake_enqueue = AsyncMock()
|
||||
fake_create_session = AsyncMock()
|
||||
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
new=fake_append_message_if,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS",
|
||||
0,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.executor.utils.enqueue_copilot_turn",
|
||||
new=fake_enqueue,
|
||||
),
|
||||
patch(
|
||||
"backend.copilot.stream_registry.create_session",
|
||||
new=fake_create_session,
|
||||
),
|
||||
):
|
||||
await decompose_goal_module._run_auto_approve(
|
||||
session_id="session-acted",
|
||||
user_id=_USER_ID,
|
||||
baseline_index=5,
|
||||
)
|
||||
|
||||
fake_append_message_if.assert_awaited_once()
|
||||
fake_enqueue.assert_not_awaited()
|
||||
fake_create_session.assert_not_awaited()
|
||||
fake_dispatch.assert_awaited_once()
|
||||
call_kwargs = fake_dispatch.await_args.kwargs
|
||||
assert call_kwargs["session_id"] == "session-idle"
|
||||
assert call_kwargs["message"] == "Approved. Please build the agent."
|
||||
assert call_kwargs["timeout"] == 0
|
||||
assert call_kwargs["tool_name"] == "decompose_goal_auto_approve"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_approve_swallows_unexpected_errors():
|
||||
"""A failure inside the task must never propagate — the worker should
|
||||
keep running."""
|
||||
"""A failure inside the task must never propagate."""
|
||||
|
||||
async def boom(*args, **kwargs):
|
||||
raise RuntimeError("kaboom")
|
||||
@@ -573,7 +454,7 @@ async def test_auto_approve_swallows_unexpected_errors():
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
"backend.copilot.sdk.session_waiter.run_copilot_turn_via_queue",
|
||||
new=boom,
|
||||
),
|
||||
patch(
|
||||
@@ -581,19 +462,12 @@ async def test_auto_approve_swallows_unexpected_errors():
|
||||
0,
|
||||
),
|
||||
):
|
||||
# Should not raise.
|
||||
await decompose_goal_module._run_auto_approve(
|
||||
session_id="session-error",
|
||||
user_id=_USER_ID,
|
||||
baseline_index=0,
|
||||
)
|
||||
await decompose_goal_module._run_auto_approve("session-error", None)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_schedule_auto_approve_creates_task(monkeypatch):
|
||||
"""_schedule_auto_approve should add a task to the tracking set and
|
||||
auto-remove it on completion. The baseline passed to _run_auto_approve
|
||||
must be the current message-list length at schedule time."""
|
||||
"""_schedule_auto_approve should add a task to the tracking dict."""
|
||||
monkeypatch.setattr(decompose_goal_module, "AUTO_APPROVE_SERVER_SECONDS", 0)
|
||||
fake_run = AsyncMock()
|
||||
monkeypatch.setattr(decompose_goal_module, "_run_auto_approve", fake_run)
|
||||
@@ -604,10 +478,6 @@ async def test_schedule_auto_approve_creates_task(monkeypatch):
|
||||
)
|
||||
|
||||
session = make_session(_USER_ID)
|
||||
# make_session pre-populates 1 message (guide_read). Add 2 more.
|
||||
session.messages.append(ChatMessage(role="user", content="initial"))
|
||||
session.messages.append(ChatMessage(role="assistant", content="tool call"))
|
||||
expected_baseline = len(session.messages)
|
||||
|
||||
await _REAL_SCHEDULE_AUTO_APPROVE(
|
||||
session_id="session-schedule",
|
||||
@@ -615,12 +485,11 @@ async def test_schedule_auto_approve_creates_task(monkeypatch):
|
||||
session=session,
|
||||
)
|
||||
|
||||
# Wait for the scheduled task to complete.
|
||||
await asyncio.sleep(0)
|
||||
while decompose_goal_module._pending_auto_approvals:
|
||||
await asyncio.sleep(0)
|
||||
|
||||
fake_run.assert_awaited_once_with("session-schedule", _USER_ID, expected_baseline)
|
||||
fake_run.assert_awaited_once_with("session-schedule", _USER_ID)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
|
||||
Reference in New Issue
Block a user