From fb86fcb67dc2f63d787e28d2ce403e8c6418d5e9 Mon Sep 17 00:00:00 2001
From: anvyle
Date: Fri, 10 Apr 2026 16:34:46 +0200
Subject: [PATCH] feat(copilot): add server-side auto-approve fallback for decompose_goal

The decompose_goal countdown was purely client-side: if the user closed
the tab before the timer ran out, the agent never got built. Add a
server-side timer that fires the same approval message even when no
client is connected.

- backend/copilot/model.py: add append_message_if helper that appends a
  message inside the session lock only if a predicate is satisfied. Used
  by the auto-approve task to no-op when the user has already acted.
- backend/copilot/tools/decompose_goal.py: when the tool returns,
  schedule a fire-and-forget asyncio task (same _background_tasks
  pattern as agent_browser.py) that sleeps 90s, re-checks the session,
  and if no user message has appeared since, appends "Approved. Please
  build the agent." and enqueues a new copilot turn. Stays in process;
  restart-resilience is a documented follow-up.
- backend/copilot/tools/models.py: expose auto_approve_seconds on
  TaskDecompositionResponse so the frontend countdown is sourced from
  the backend instead of a hard-coded constant.
- frontend DecomposeGoal.tsx: seed secondsLeft from
  output.auto_approve_seconds with a 60s fallback for older sessions.
- Regenerate openapi.json with the new field.
- Tests: 9 new unit tests covering the predicate, the auto-approve flow
  (idle / user-acted / errors swallowed) and _schedule_auto_approve.

Co-Authored-By: Claude Sonnet 4.6 --- .../backend/backend/copilot/model.py | 45 +++- .../backend/copilot/tools/decompose_goal.py | 112 ++++++++- .../copilot/tools/decompose_goal_test.py | 236 +++++++++++++++++- .../backend/backend/copilot/tools/models.py | 8 + .../tools/DecomposeGoal/DecomposeGoal.tsx | 32 ++- .../copilot/tools/DecomposeGoal/helpers.tsx | 1 + .../frontend/src/app/api/openapi.json | 6 + 7 files changed, 424 insertions(+), 16 deletions(-) diff --git a/autogpt_platform/backend/backend/copilot/model.py b/autogpt_platform/backend/backend/copilot/model.py index 9bb7964b93..4fc703063f 100644 --- a/autogpt_platform/backend/backend/copilot/model.py +++ b/autogpt_platform/backend/backend/copilot/model.py @@ -2,7 +2,7 @@ import asyncio import logging import uuid from datetime import UTC, datetime -from typing import Any, Self, cast +from typing import Any, Callable, Self, cast from weakref import WeakValueDictionary from openai.types.chat import ( @@ -677,6 +677,49 @@ async def append_and_save_message(session_id: str, message: ChatMessage) -> Chat return session +async def append_message_if( + session_id: str, + message: ChatMessage, + predicate: Callable[["ChatSession"], bool], +) -> "ChatSession | None": + """Atomically append a message iff ``predicate(session)`` returns True. + + Used by fire-and-forget tasks that need to no-op if the session state + has moved on while they were waiting (e.g. the decompose_goal server-side + auto-approve timer: skip the approval if the user has already sent a + message). The predicate runs inside the session lock, so the check and + the append are one atomic operation — no race with concurrent appends. + + Returns the updated session on append, or ``None`` if the predicate + rejected, the session no longer exists, or the append failed. 
+ """ + lock = await _get_session_lock(session_id) + + async with lock: + session = await get_chat_session(session_id) + if session is None or not predicate(session): + return None + + session.messages.append(message) + existing_message_count = await chat_db().get_next_sequence(session_id) + + try: + await _save_session_to_db(session, existing_message_count) + except Exception as e: + logger.error( + f"append_message_if: failed to persist message to " + f"session {session_id}: {e}" + ) + return None + + try: + await cache_chat_session(session) + except Exception as e: + logger.warning(f"Cache write failed for session {session_id}: {e}") + + return session + + async def create_chat_session(user_id: str, *, dry_run: bool) -> ChatSession: """Create a new chat session and persist it. diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py index 525ca1448e..8c143c928d 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py +++ b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py @@ -1,9 +1,11 @@ """DecomposeGoalTool - Breaks agent-building goals into sub-instructions.""" +import asyncio import logging from typing import Any +from uuid import uuid4 -from backend.copilot.model import ChatSession +from backend.copilot.model import ChatMessage, ChatSession, append_message_if from .base import BaseTool from .models import ( @@ -20,6 +22,111 @@ MAX_STEPS = 8 DEFAULT_ACTION = "add_block" VALID_ACTIONS = {"add_block", "connect_blocks", "configure", "add_input", "add_output"} +# Auto-approve countdown — single source of truth for both client and server. +# The frontend reads ``auto_approve_seconds`` from the tool response and runs +# the visible countdown. The server fallback runs slightly longer to absorb +# network latency / SSE round-trip when the client also sends "Approved". 
+AUTO_APPROVE_CLIENT_SECONDS = 60 +AUTO_APPROVE_SERVER_GRACE_SECONDS = 30 +AUTO_APPROVE_SERVER_SECONDS = ( + AUTO_APPROVE_CLIENT_SECONDS + AUTO_APPROVE_SERVER_GRACE_SECONDS +) +AUTO_APPROVE_MESSAGE = "Approved. Please build the agent." + +# Fire-and-forget tasks held to keep them alive and self-clean on completion. +# Same pattern as ``backend/copilot/tools/agent_browser.py``. +_auto_approve_tasks: set[asyncio.Task] = set() + + +def _no_user_action_since(baseline_sequence: int): + """Predicate: returns True iff no user message has been appended after + the message at ``baseline_sequence``.""" + + def _check(session: ChatSession) -> bool: + for m in session.messages: + if m.role == "user" and (m.sequence or 0) > baseline_sequence: + return False + return True + + return _check + + +async def _run_auto_approve( + session_id: str, + user_id: str | None, + baseline_sequence: int, +) -> None: + """Wait the server-side timeout and inject a synthetic approval if the + user has not acted in the meantime. + + Limitation: this lives in the executor process; if the worker restarts + during the wait, the pending approval is lost (the user falls back to + manual approve). Restart-resilience would need a Redis-backed scheduler. + + Modify-mode caveat: clicking "Modify" stops the *client* timer, not this + one. Users have ``AUTO_APPROVE_SERVER_SECONDS`` total to finish editing + and click Approve, otherwise the server fires the default approval. A + follow-up should add an explicit cancel endpoint. + """ + try: + await asyncio.sleep(AUTO_APPROVE_SERVER_SECONDS) + + approval = ChatMessage(role="user", content=AUTO_APPROVE_MESSAGE) + result = await append_message_if( + session_id=session_id, + message=approval, + predicate=_no_user_action_since(baseline_sequence), + ) + if result is None: + # User already acted (or the session is gone) — nothing to do. + return + + # Local imports avoid a circular dependency between this module and + # the executor / API stream registry packages. 
+ from backend.copilot import stream_registry + from backend.copilot.executor.utils import enqueue_copilot_turn + + turn_id = str(uuid4()) + await stream_registry.create_session( + session_id=session_id, + user_id=user_id or "", + tool_call_id="chat_stream", + tool_name="chat", + turn_id=turn_id, + ) + await enqueue_copilot_turn( + session_id=session_id, + user_id=user_id, + message=AUTO_APPROVE_MESSAGE, + turn_id=turn_id, + is_user_message=True, + ) + logger.info("decompose_goal auto-approve fired for session %s", session_id) + except asyncio.CancelledError: + raise + except Exception: + logger.exception( + "decompose_goal auto-approve task failed for session %s", + session_id, + ) + + +def _schedule_auto_approve( + session_id: str | None, user_id: str | None, session: ChatSession +) -> None: + """Schedule the fire-and-forget auto-approve task for this session.""" + if not session_id: + return + baseline_sequence = max( + (m.sequence or 0 for m in session.messages), + default=0, + ) + task = asyncio.create_task( + _run_auto_approve(session_id, user_id, baseline_sequence) + ) + _auto_approve_tasks.add(task) + task.add_done_callback(_auto_approve_tasks.discard) + class DecomposeGoalTool(BaseTool): """Tool for decomposing an agent goal into sub-instructions.""" @@ -135,11 +242,14 @@ class DecomposeGoalTool(BaseTool): ) ) + _schedule_auto_approve(session_id, user_id, session) + return TaskDecompositionResponse( message=f"Here's the plan to build your agent ({len(decomposition_steps)} steps):", goal=goal, steps=decomposition_steps, step_count=len(decomposition_steps), requires_approval=True, + auto_approve_seconds=AUTO_APPROVE_CLIENT_SECONDS, session_id=session_id, ) diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py index 0994eac13b..50984bc58f 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py +++ 
b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py @@ -1,11 +1,26 @@ """Unit tests for DecomposeGoalTool.""" +import asyncio +from unittest.mock import AsyncMock, patch + import pytest +from backend.copilot.model import ChatMessage + +from . import decompose_goal as decompose_goal_module from ._test_data import make_session -from .decompose_goal import DEFAULT_ACTION, MAX_STEPS, DecomposeGoalTool +from .decompose_goal import ( + AUTO_APPROVE_CLIENT_SECONDS, + DEFAULT_ACTION, + MAX_STEPS, + DecomposeGoalTool, + _no_user_action_since, +) from .models import ErrorResponse, TaskDecompositionResponse +# Captured before the autouse fixture stubs the real scheduler. +_REAL_SCHEDULE_AUTO_APPROVE = decompose_goal_module._schedule_auto_approve + _USER_ID = "test-user-decompose-goal" _VALID_STEPS = [ @@ -19,6 +34,18 @@ _VALID_STEPS = [ ] +@pytest.fixture(autouse=True) +def _stub_auto_approve_scheduler(): + """The existing happy-path tests don't have a database; stub the + fire-and-forget scheduler so they don't kick off real timers that try to + hit Redis/Postgres. 
Tests that exercise auto-approve override this with + their own patches inside the test body.""" + with patch.object( + decompose_goal_module, "_schedule_auto_approve", lambda *a, **kw: None + ): + yield + + @pytest.fixture() def tool() -> DecomposeGoalTool: return DecomposeGoalTool() @@ -253,3 +280,210 @@ async def test_step_ids_are_sequential(tool: DecomposeGoalTool, session): assert isinstance(result, TaskDecompositionResponse) for i, step in enumerate(result.steps): assert step.step_id == f"step_{i + 1}" + + +# --------------------------------------------------------------------------- +# auto_approve_seconds field +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_response_includes_auto_approve_seconds(tool: DecomposeGoalTool, session): + """The response carries the countdown so the frontend has a single + source of truth instead of a hard-coded constant.""" + result = await tool._execute( + user_id=_USER_ID, + session=session, + goal="Build agent", + steps=_VALID_STEPS, + ) + assert isinstance(result, TaskDecompositionResponse) + assert result.auto_approve_seconds == AUTO_APPROVE_CLIENT_SECONDS + + +# --------------------------------------------------------------------------- +# Predicate: _no_user_action_since +# --------------------------------------------------------------------------- + + +def test_predicate_passes_when_no_user_messages_after_baseline(): + session = make_session(_USER_ID) + session.messages.append( + ChatMessage(role="assistant", content="tool call", sequence=5) + ) + assert _no_user_action_since(5)(session) is True + + +def test_predicate_rejects_when_user_message_after_baseline(): + session = make_session(_USER_ID) + session.messages.append( + ChatMessage(role="assistant", content="tool call", sequence=5) + ) + session.messages.append( + ChatMessage(role="user", content="user replied", sequence=6) + ) + assert _no_user_action_since(5)(session) is False + + +def 
test_predicate_ignores_assistant_messages_after_baseline(): + """Only user messages count as 'user action' — assistant messages are + just the LLM continuing on its own.""" + session = make_session(_USER_ID) + session.messages.append( + ChatMessage(role="assistant", content="more stuff", sequence=6) + ) + assert _no_user_action_since(5)(session) is True + + +# --------------------------------------------------------------------------- +# Server-side auto-approve task — full flow +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_auto_approve_fires_when_user_idle(): + """When no user message is appended after the baseline sequence, the + task should append the synthetic approval and enqueue a new turn.""" + session_id = "session-auto-approve-idle" + + captured_message = {} + + async def fake_append_message_if(session_id, message, predicate): + captured_message["msg"] = message + return make_session(_USER_ID) + + fake_enqueue = AsyncMock() + fake_create_session = AsyncMock() + + with ( + patch( + "backend.copilot.tools.decompose_goal.append_message_if", + new=fake_append_message_if, + ), + patch( + "backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS", + 0, + ), + patch( + "backend.copilot.executor.utils.enqueue_copilot_turn", + new=fake_enqueue, + ), + patch( + "backend.copilot.stream_registry.create_session", + new=fake_create_session, + ), + ): + await decompose_goal_module._run_auto_approve( + session_id=session_id, + user_id=_USER_ID, + baseline_sequence=5, + ) + + assert captured_message["msg"].role == "user" + assert captured_message["msg"].content == "Approved. Please build the agent." + fake_create_session.assert_awaited_once() + fake_enqueue.assert_awaited_once() + assert fake_enqueue.await_args is not None + enqueue_kwargs = fake_enqueue.await_args.kwargs + assert enqueue_kwargs["session_id"] == session_id + assert enqueue_kwargs["message"] == "Approved. 
Please build the agent." + assert enqueue_kwargs["is_user_message"] is True + + +@pytest.mark.asyncio +async def test_auto_approve_skips_when_user_already_acted(): + """If append_message_if returns None (predicate rejected because the + user already sent a message), no turn should be enqueued.""" + fake_append_message_if = AsyncMock(return_value=None) + fake_enqueue = AsyncMock() + fake_create_session = AsyncMock() + + with ( + patch( + "backend.copilot.tools.decompose_goal.append_message_if", + new=fake_append_message_if, + ), + patch( + "backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS", + 0, + ), + patch( + "backend.copilot.executor.utils.enqueue_copilot_turn", + new=fake_enqueue, + ), + patch( + "backend.copilot.stream_registry.create_session", + new=fake_create_session, + ), + ): + await decompose_goal_module._run_auto_approve( + session_id="session-acted", + user_id=_USER_ID, + baseline_sequence=5, + ) + + fake_append_message_if.assert_awaited_once() + fake_enqueue.assert_not_awaited() + fake_create_session.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_auto_approve_swallows_unexpected_errors(): + """A failure inside the task must never propagate — the worker should + keep running.""" + + async def boom(*args, **kwargs): + raise RuntimeError("kaboom") + + with ( + patch( + "backend.copilot.tools.decompose_goal.append_message_if", + new=boom, + ), + patch( + "backend.copilot.tools.decompose_goal.AUTO_APPROVE_SERVER_SECONDS", + 0, + ), + ): + # Should not raise. 
+ await decompose_goal_module._run_auto_approve( + session_id="session-error", + user_id=_USER_ID, + baseline_sequence=0, + ) + + +@pytest.mark.asyncio +async def test_schedule_auto_approve_creates_task(monkeypatch): + """_schedule_auto_approve should add a task to the tracking set and + auto-remove it on completion.""" + monkeypatch.setattr(decompose_goal_module, "AUTO_APPROVE_SERVER_SECONDS", 0) + fake_run = AsyncMock() + monkeypatch.setattr(decompose_goal_module, "_run_auto_approve", fake_run) + + session = make_session(_USER_ID) + session.messages.append( + ChatMessage(role="assistant", content="tool call", sequence=3) + ) + + _REAL_SCHEDULE_AUTO_APPROVE( + session_id="session-schedule", + user_id=_USER_ID, + session=session, + ) + + # Wait for the scheduled task to complete. + await asyncio.sleep(0) + while decompose_goal_module._auto_approve_tasks: + await asyncio.sleep(0) + + fake_run.assert_awaited_once_with("session-schedule", _USER_ID, 3) + + +def test_schedule_auto_approve_no_op_without_session_id(): + """Empty session id should be a no-op (defensive).""" + session = make_session(_USER_ID) + decompose_goal_module._schedule_auto_approve( + session_id=None, user_id=_USER_ID, session=session + ) + assert len(decompose_goal_module._auto_approve_tasks) == 0 diff --git a/autogpt_platform/backend/backend/copilot/tools/models.py b/autogpt_platform/backend/backend/copilot/tools/models.py index 63d63d3965..e9eb7ca4ed 100644 --- a/autogpt_platform/backend/backend/copilot/tools/models.py +++ b/autogpt_platform/backend/backend/copilot/tools/models.py @@ -729,6 +729,14 @@ class TaskDecompositionResponse(ToolResponseBase): default=0, description="Number of steps (auto-derived from steps list)" ) requires_approval: bool = True + auto_approve_seconds: int = Field( + default=60, + description=( + "Seconds the client should count down before auto-approving. 
" + "Kept in sync with the server-side fallback timer, which runs a " + "grace period longer to absorb network latency." + ), + ) @model_validator(mode="after") def sync_step_count(self) -> "TaskDecompositionResponse": diff --git a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/DecomposeGoal/DecomposeGoal.tsx b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/DecomposeGoal/DecomposeGoal.tsx index 74082a8277..f257a16023 100644 --- a/autogpt_platform/frontend/src/app/(platform)/copilot/tools/DecomposeGoal/DecomposeGoal.tsx +++ b/autogpt_platform/frontend/src/app/(platform)/copilot/tools/DecomposeGoal/DecomposeGoal.tsx @@ -27,7 +27,9 @@ import { ToolIcon, } from "./helpers"; -const COUNTDOWN_SECONDS = 60; +// Fallback used only if the backend response omits auto_approve_seconds +// (older sessions). The authoritative value comes from the tool output. +const FALLBACK_COUNTDOWN_SECONDS = 60; const RADIUS = 15; const CIRCUMFERENCE = 2 * Math.PI * RADIUS; @@ -62,7 +64,13 @@ export function DecomposeGoalTool({ part, isLastMessage }: Props) { isDecompositionOutput(output) && output.requires_approval; - const [secondsLeft, setSecondsLeft] = useState(COUNTDOWN_SECONDS); + // Authoritative countdown comes from the backend tool response so the + // server-side fallback timer and the client are guaranteed to agree. + const countdownSeconds = + (output && isDecompositionOutput(output) && output.auto_approve_seconds) || + FALLBACK_COUNTDOWN_SECONDS; + + const [secondsLeft, setSecondsLeft] = useState(countdownSeconds); // timerActive becomes false when the user clicks Modify — stops countdown and auto-approve. 
const [timerActive, setTimerActive] = useState(true); const [isEditing, setIsEditing] = useState(false); @@ -153,7 +161,7 @@ export function DecomposeGoalTool({ part, isLastMessage }: Props) { } }, [secondsLeft, timerActive, showActions]); // approve reads refs only — safe to omit - const progress = secondsLeft / COUNTDOWN_SECONDS; + const progress = secondsLeft / countdownSeconds; const dashOffset = CIRCUMFERENCE * (1 - progress); const stepCount = isEditing ? editableSteps.length @@ -263,10 +271,10 @@ export function DecomposeGoalTool({ part, isLastMessage }: Props) { ) : ( <> - {/* Timer button — same ghost style as Modify, ring wraps the number inline */} - - | -