diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py index a4b53f48ec..5c2fd4e35f 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py +++ b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py @@ -3,9 +3,8 @@ import asyncio import logging from typing import Any -from uuid import uuid4 -from backend.copilot.model import ChatMessage, ChatSession, append_message_if +from backend.copilot.model import ChatSession from backend.data.redis_client import get_redis_async from .base import BaseTool @@ -93,44 +92,12 @@ def needs_build_plan_approval(session: ChatSession) -> bool: return True -def _no_user_action_since(baseline_index: int): - """Predicate: returns True iff no ``role == "user"`` message exists at - or after ``baseline_index`` in the session message list. - - Why an index instead of ``ChatMessage.sequence``: ``_save_session_to_db`` - persists messages with auto-assigned sequences in the DB but does NOT - write those sequences back onto the in-memory ``ChatMessage`` objects, - and ``cache_chat_session`` writes the in-memory copy to Redis. So when - this predicate later loads the session from cache, freshly-appended - messages have ``sequence=None``, which would falsely register as 0 and - miss them entirely — the predicate would treat the user's manual - "Approved" as if it never happened, and the auto-approve would fire a - duplicate after the agent build had already completed. Indices are - monotonic and require no DB-side bookkeeping. - """ - - def _check(session: ChatSession) -> bool: - for m in session.messages[baseline_index:]: - if m.role == "user": - return False - return True - - return _check - - -async def _run_auto_approve( - session_id: str, - user_id: str | None, - baseline_index: int, -) -> None: - """Wait the server-side timeout and inject a synthetic approval if the - user has not acted in the meantime. +async def _run_auto_approve(session_id: str, user_id: str | None) -> None: + """Wait the server-side timeout and dispatch the approval via + ``run_copilot_turn_via_queue`` — the canonical helper that queues the + message if a turn is already in flight, or starts a new turn if idle. Cancelled when the user clicks "Modify" (via ``cancel_auto_approve``). - - Limitation: this lives in the executor process; if the worker restarts - during the wait, the pending approval is lost (the user falls back to - manual approve). Restart-resilience would need a Redis-backed scheduler. """ try: await asyncio.sleep(AUTO_APPROVE_SERVER_SECONDS) @@ -144,45 +111,22 @@ async def _run_auto_approve( ) return - approval = ChatMessage(role="user", content=AUTO_APPROVE_MESSAGE) - result = await append_message_if( - session_id=session_id, - message=approval, - predicate=_no_user_action_since(baseline_index), - ) - if result is None: - # User already acted (or the session is gone) — nothing to do. - return + from backend.copilot.sdk.session_waiter import run_copilot_turn_via_queue - # Local imports avoid a circular dependency between this module and - # the executor / API stream registry packages. - from backend.copilot import stream_registry - from backend.copilot.executor.utils import enqueue_copilot_turn - - turn_id = str(uuid4()) - await stream_registry.create_session( + outcome, result = await run_copilot_turn_via_queue( session_id=session_id, user_id=user_id or "", - tool_call_id="chat_stream", - tool_name="chat", - turn_id=turn_id, + message=AUTO_APPROVE_MESSAGE, + timeout=0, + tool_call_id="auto_approve", + tool_name="decompose_goal_auto_approve", + ) + logger.info( + "decompose_goal auto-approve fired for session %s (outcome=%s, queued=%s)", + session_id, + outcome, + result.queued, ) - try: - await enqueue_copilot_turn( - session_id=session_id, - user_id=user_id, - message=AUTO_APPROVE_MESSAGE, - turn_id=turn_id, - is_user_message=True, - ) - except Exception: - # If enqueueing fails, mark the session completed so it doesn't - # stay stuck in "running" state in the stream registry forever. - await stream_registry.mark_session_completed( - session_id, error_message="Auto-approve enqueue failed" - ) - raise - logger.info("decompose_goal auto-approve fired for session %s", session_id) except asyncio.CancelledError: raise except Exception: @@ -226,13 +170,7 @@ async def cancel_auto_approve(session_id: str) -> bool: async def _schedule_auto_approve( session_id: str | None, user_id: str | None, session: ChatSession ) -> None: - """Schedule the fire-and-forget auto-approve task for this session. - - The baseline is the current message-list length: any message that - arrives at or after this index is "after the decomposition", so a - user message there means the user (or a follow-up turn) has acted - and the auto-approve should be skipped. - """ + """Schedule the fire-and-forget auto-approve task for this session.""" if not session_id: return # Cancel any existing pending approval for this session (e.g. if the @@ -244,8 +182,7 @@ async def _schedule_auto_approve( # the new auto-approve task isn't incorrectly suppressed. redis = await get_redis_async() await redis.delete(f"{_CANCEL_KEY_PREFIX}{session_id}") - baseline_index = len(session.messages) - task = asyncio.create_task(_run_auto_approve(session_id, user_id, baseline_index)) + task = asyncio.create_task(_run_auto_approve(session_id, user_id)) _pending_auto_approvals[session_id] = task # Only remove from dict if this task is still the current one — a # cancelled old task's callback must not clobber a newly-scheduled one. diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py index de0ed8dcad..df68392d57 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py +++ b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py @@ -14,7 +14,6 @@ from .decompose_goal import ( AUTO_APPROVE_CLIENT_SECONDS, DEFAULT_ACTION, DecomposeGoalTool, - _no_user_action_since, cancel_auto_approve, needs_build_plan_approval, ) @@ -292,60 +291,6 @@ async def test_response_includes_created_at(tool: DecomposeGoalTool, session): assert before <= result.created_at <= after -# --------------------------------------------------------------------------- -# Predicate: _no_user_action_since -# --------------------------------------------------------------------------- - - -def test_predicate_passes_when_no_user_messages_after_baseline(): - session = make_session(_USER_ID) - # Two pre-existing messages (indices 0, 1). - session.messages.append(ChatMessage(role="user", content="initial")) - session.messages.append(ChatMessage(role="assistant", content="tool call")) - # Tool result lands at index 2 — this is what the executor appends after - # _execute returns. baseline_index was captured at 2 inside _execute. - session.messages.append(ChatMessage(role="tool", content="{...}")) - assert _no_user_action_since(2)(session) is True - - -def test_predicate_rejects_when_user_message_after_baseline(): - session = make_session(_USER_ID) - session.messages.append(ChatMessage(role="user", content="initial")) - session.messages.append(ChatMessage(role="assistant", content="tool call")) - session.messages.append(ChatMessage(role="tool", content="{...}")) - session.messages.append(ChatMessage(role="user", content="Approved")) - assert _no_user_action_since(2)(session) is False - - -def test_predicate_ignores_assistant_messages_after_baseline(): - """Only user messages count as 'user action' — assistant messages are - just the LLM continuing on its own.""" - session = make_session(_USER_ID) - session.messages.append(ChatMessage(role="user", content="initial")) - session.messages.append(ChatMessage(role="assistant", content="tool call")) - session.messages.append(ChatMessage(role="tool", content="{...}")) - session.messages.append(ChatMessage(role="assistant", content="summary")) - assert _no_user_action_since(2)(session) is True - - -def test_predicate_handles_messages_with_none_sequence(): - """Regression: the previous sequence-based predicate ignored messages - whose sequence was None (which is what cached/in-memory messages have - until they're round-tripped through the DB), causing the auto-approve - to fire after the user had already manually approved. The new - index-based predicate must catch user messages regardless of sequence. - """ - session = make_session(_USER_ID) - session.messages.append(ChatMessage(role="user", content="initial")) - session.messages.append(ChatMessage(role="assistant", content="tool call")) - session.messages.append(ChatMessage(role="tool", content="{...}")) - # Sequence intentionally None — the cache often returns this state. - user_msg = ChatMessage(role="user", content="Approved", sequence=None) - session.messages.append(user_msg) - assert user_msg.sequence is None - assert _no_user_action_since(2)(session) is False - - # --------------------------------------------------------------------------- # needs_build_plan_approval — build-tool approval gate # --------------------------------------------------------------------------- @@ -448,7 +393,7 @@ def test_needs_approval_case_insensitive(): # --------------------------------------------------------------------------- -# Server-side auto-approve task — full flow +# Server-side auto-approve task — uses run_copilot_turn_via_queue # --------------------------------------------------------------------------- @@ -474,98 +419,34 @@ def _stub_redis(): @pytest.mark.asyncio -async def test_auto_approve_fires_when_user_idle(): - """When no user message is appended after the baseline sequence, the - task should append the synthetic approval and enqueue a new turn.""" - session_id = "session-auto-approve-idle" - - captured_message = {} - - async def fake_append_message_if(session_id, message, predicate): - captured_message["msg"] = message - return make_session(_USER_ID) - - fake_enqueue = AsyncMock() - fake_create_session = AsyncMock() +async def test_auto_approve_dispatches_via_queue_helper(): + """_run_auto_approve should delegate to run_copilot_turn_via_queue.""" + fake_dispatch = AsyncMock(return_value=("completed", None)) with ( _stub_redis(), patch( - "backend.copilot.tools.decompose_goal.append_message_if", - new=fake_append_message_if, + "backend.copilot.sdk.session_waiter.run_copilot_turn_via_queue", + new=fake_dispatch, ), patch( "backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS", 0, ), - patch( - "backend.copilot.executor.utils.enqueue_copilot_turn", - new=fake_enqueue, - ), - patch( - "backend.copilot.stream_registry.create_session", - new=fake_create_session, - ), ): - await decompose_goal_module._run_auto_approve( - session_id=session_id, - user_id=_USER_ID, - baseline_index=5, - ) + await decompose_goal_module._run_auto_approve("session-idle", _USER_ID) - assert captured_message["msg"].role == "user" - assert captured_message["msg"].content == "Approved. Please build the agent." - fake_create_session.assert_awaited_once() - fake_enqueue.assert_awaited_once() - assert fake_enqueue.await_args is not None - enqueue_kwargs = fake_enqueue.await_args.kwargs - assert enqueue_kwargs["session_id"] == session_id - assert enqueue_kwargs["message"] == "Approved. Please build the agent." - assert enqueue_kwargs["is_user_message"] is True - - -@pytest.mark.asyncio -async def test_auto_approve_skips_when_user_already_acted(): - """If append_message_if returns None (predicate rejected because the - user already sent a message), no turn should be enqueued.""" - fake_append_message_if = AsyncMock(return_value=None) - fake_enqueue = AsyncMock() - fake_create_session = AsyncMock() - - with ( - _stub_redis(), - patch( - "backend.copilot.tools.decompose_goal.append_message_if", - new=fake_append_message_if, - ), - patch( - "backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS", - 0, - ), - patch( - "backend.copilot.executor.utils.enqueue_copilot_turn", - new=fake_enqueue, - ), - patch( - "backend.copilot.stream_registry.create_session", - new=fake_create_session, - ), - ): - await decompose_goal_module._run_auto_approve( - session_id="session-acted", - user_id=_USER_ID, - baseline_index=5, - ) - - fake_append_message_if.assert_awaited_once() - fake_enqueue.assert_not_awaited() - fake_create_session.assert_not_awaited() + fake_dispatch.assert_awaited_once() + call_kwargs = fake_dispatch.await_args.kwargs + assert call_kwargs["session_id"] == "session-idle" + assert call_kwargs["message"] == "Approved. Please build the agent." + assert call_kwargs["timeout"] == 0 + assert call_kwargs["tool_name"] == "decompose_goal_auto_approve" @pytest.mark.asyncio async def test_auto_approve_swallows_unexpected_errors(): - """A failure inside the task must never propagate — the worker should - keep running.""" + """A failure inside the task must never propagate.""" async def boom(*args, **kwargs): raise RuntimeError("kaboom") @@ -573,7 +454,7 @@ async def test_auto_approve_swallows_unexpected_errors(): with ( _stub_redis(), patch( - "backend.copilot.tools.decompose_goal.append_message_if", + "backend.copilot.sdk.session_waiter.run_copilot_turn_via_queue", new=boom, ), patch( @@ -581,19 +462,12 @@ async def test_auto_approve_swallows_unexpected_errors(): 0, ), ): - # Should not raise. - await decompose_goal_module._run_auto_approve( - session_id="session-error", - user_id=_USER_ID, - baseline_index=0, - ) + await decompose_goal_module._run_auto_approve("session-error", None) @pytest.mark.asyncio async def test_schedule_auto_approve_creates_task(monkeypatch): - """_schedule_auto_approve should add a task to the tracking set and - auto-remove it on completion. The baseline passed to _run_auto_approve - must be the current message-list length at schedule time.""" + """_schedule_auto_approve should add a task to the tracking dict.""" monkeypatch.setattr(decompose_goal_module, "AUTO_APPROVE_SERVER_SECONDS", 0) fake_run = AsyncMock() monkeypatch.setattr(decompose_goal_module, "_run_auto_approve", fake_run) @@ -604,10 +478,6 @@ async def test_schedule_auto_approve_creates_task(monkeypatch): ) session = make_session(_USER_ID) - # make_session pre-populates 1 message (guide_read). Add 2 more. - session.messages.append(ChatMessage(role="user", content="initial")) - session.messages.append(ChatMessage(role="assistant", content="tool call")) - expected_baseline = len(session.messages) await _REAL_SCHEDULE_AUTO_APPROVE( session_id="session-schedule", @@ -615,12 +485,11 @@ async def test_schedule_auto_approve_creates_task(monkeypatch): session=session, ) - # Wait for the scheduled task to complete. await asyncio.sleep(0) while decompose_goal_module._pending_auto_approvals: await asyncio.sleep(0) - fake_run.assert_awaited_once_with("session-schedule", _USER_ID, expected_baseline) + fake_run.assert_awaited_once_with("session-schedule", _USER_ID) @pytest.mark.asyncio