From b2dab8afad90c8bf2b6954a141aaa1ff5ad895ca Mon Sep 17 00:00:00 2001 From: anvyle Date: Tue, 14 Apr 2026 20:34:58 +0200 Subject: [PATCH] fix(copilot): use Redis flag for cross-process auto-approve cancellation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The cancel endpoint runs in the AgentServer process while the asyncio auto-approve task lives in the CoPilotExecutor process — separate memory. The in-process dict cancel from the previous commit was a no-op across processes. - cancel_auto_approve now SETs a Redis key with TTL as the primary cancel signal, plus best-effort in-process task.cancel() for single-worker. - _run_auto_approve checks the Redis key before firing. If set, skips. - Tests stub get_redis_async with a fake to avoid real Redis connections. Co-Authored-By: Claude Sonnet 4.6 --- .../backend/api/features/chat/routes.py | 2 +- .../backend/copilot/tools/decompose_goal.py | 56 +++++++++++++---- .../copilot/tools/decompose_goal_test.py | 63 +++++++++++++++++-- 3 files changed, 103 insertions(+), 18 deletions(-) diff --git a/autogpt_platform/backend/backend/api/features/chat/routes.py b/autogpt_platform/backend/backend/api/features/chat/routes.py index bfc9d45bfe..28507bf5fa 100644 --- a/autogpt_platform/backend/backend/api/features/chat/routes.py +++ b/autogpt_platform/backend/backend/api/features/chat/routes.py @@ -735,7 +735,7 @@ async def cancel_auto_approve_task( from backend.copilot.tools.decompose_goal import cancel_auto_approve - cancelled = cancel_auto_approve(session_id) + cancelled = await cancel_auto_approve(session_id) return CancelSessionResponse( cancelled=cancelled, reason=None if cancelled else "no_pending_auto_approve", diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py index 2ac82fc9b8..185cedacee 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py +++ b/autogpt_platform/backend/backend/copilot/tools/decompose_goal.py @@ -6,6 +6,7 @@ from typing import Any from uuid import uuid4 from backend.copilot.model import ChatMessage, ChatSession, append_message_if +from backend.data.redis_client import get_redis_async from .base import BaseTool from .models import ( @@ -29,9 +30,14 @@ AUTO_APPROVE_CLIENT_SECONDS = 60 AUTO_APPROVE_SERVER_SECONDS = AUTO_APPROVE_CLIENT_SECONDS AUTO_APPROVE_MESSAGE = "Approved. Please build the agent." -# Pending auto-approve tasks keyed by session_id. The dict allows -# cancel_auto_approve() to look up and cancel the task for a specific -# session when the user clicks "Modify" in the frontend. +# Redis key prefix for cross-process cancel signalling. The cancel +# endpoint (AgentServer process) SETs the key; _run_auto_approve +# (CoPilotExecutor process) checks it before firing. +_CANCEL_KEY_PREFIX = "copilot:cancel_auto_approve:" +_CANCEL_KEY_TTL_SECONDS = AUTO_APPROVE_SERVER_SECONDS + 30 + +# In-process dict for best-effort cancel when both the cancel call and +# the asyncio task happen to live in the same process (single-worker). _pending_auto_approvals: dict[str, asyncio.Task] = {} @@ -77,6 +83,15 @@ async def _run_auto_approve( try: await asyncio.sleep(AUTO_APPROVE_SERVER_SECONDS) + # Check the cross-process cancel flag set by cancel_auto_approve(). + redis = await get_redis_async() + if await redis.get(f"{_CANCEL_KEY_PREFIX}{session_id}"): + logger.info( + "decompose_goal auto-approve skipped (cancelled) for session %s", + session_id, + ) + return + approval = ChatMessage(role="user", content=AUTO_APPROVE_MESSAGE) result = await append_message_if( session_id=session_id, @@ -117,19 +132,35 @@ async def _run_auto_approve( ) -def cancel_auto_approve(session_id: str) -> bool: +async def cancel_auto_approve(session_id: str) -> bool: """Cancel the pending auto-approve task for a session. Called by the ``/sessions/{session_id}/cancel-auto-approve`` endpoint - when the user clicks "Modify" in the build-plan UI. Returns True if a - pending task was found and cancelled, False otherwise. + when the user clicks "Modify" in the build-plan UI. + + Uses **two** cancellation channels: + 1. **Redis flag** (cross-process) — the executor checks this before + firing. Works even when the cancel endpoint runs in the AgentServer + process and the asyncio task lives in the CoPilotExecutor process. + 2. **In-process task cancel** (best-effort) — if both happen to share + the same process, cancels the asyncio task directly. """ + redis = await get_redis_async() + await redis.set( + f"{_CANCEL_KEY_PREFIX}{session_id}", + "1", + ex=_CANCEL_KEY_TTL_SECONDS, + ) + logger.info( + "decompose_goal auto-approve cancel flag set for session %s", session_id + ) + + # Best-effort in-process cancel (no-op if the task is in another process). task = _pending_auto_approvals.pop(session_id, None) if task is not None and not task.done(): task.cancel() - logger.info("decompose_goal auto-approve cancelled for session %s", session_id) - return True - return False + + return True def _schedule_auto_approve( @@ -145,8 +176,11 @@ def _schedule_auto_approve( if not session_id: return # Cancel any existing pending approval for this session (e.g. if the - # LLM called decompose_goal twice in one turn). - cancel_auto_approve(session_id) + # LLM called decompose_goal twice in one turn). Best-effort in-process + # cancel only — skip the async Redis call here to keep scheduling fast. + old_task = _pending_auto_approvals.pop(session_id, None) + if old_task is not None and not old_task.done(): + old_task.cancel() baseline_index = len(session.messages) task = asyncio.create_task(_run_auto_approve(session_id, user_id, baseline_index)) _pending_auto_approvals[session_id] = task diff --git a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py index 1498f78104..514114b43c 100644 --- a/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py +++ b/autogpt_platform/backend/backend/copilot/tools/decompose_goal_test.py @@ -381,6 +381,24 @@ def test_predicate_handles_messages_with_none_sequence(): # --------------------------------------------------------------------------- +class _FakeRedisNoCancelFlag: + """Stub Redis that reports no cancel flag and ignores writes.""" + + async def get(self, key): + return None + + async def set(self, key, value, ex=None): + pass + + +def _stub_redis(): + """Patch get_redis_async to return a fake Redis (no real connection).""" + return patch( + "backend.copilot.tools.decompose_goal.get_redis_async", + new=AsyncMock(return_value=_FakeRedisNoCancelFlag()), + ) + + @pytest.mark.asyncio async def test_auto_approve_fires_when_user_idle(): """When no user message is appended after the baseline sequence, the @@ -397,6 +415,7 @@ async def test_auto_approve_fires_when_user_idle(): fake_create_session = AsyncMock() with ( + _stub_redis(), patch( "backend.copilot.tools.decompose_goal.append_message_if", new=fake_append_message_if, @@ -440,6 +459,7 @@ async def test_auto_approve_skips_when_user_already_acted(): fake_create_session = AsyncMock() with ( + _stub_redis(), patch( "backend.copilot.tools.decompose_goal.append_message_if", new=fake_append_message_if, @@ -477,6 +497,7 @@ async def test_auto_approve_swallows_unexpected_errors(): raise RuntimeError("kaboom") with ( + _stub_redis(), patch( "backend.copilot.tools.decompose_goal.append_message_if", new=boom, @@ -538,12 +559,26 @@ def test_schedule_auto_approve_no_op_without_session_id(): @pytest.mark.asyncio -async def test_cancel_auto_approve_cancels_pending_task(monkeypatch): - """Calling cancel_auto_approve should cancel the pending task and return True.""" +async def test_cancel_auto_approve_sets_redis_flag_and_cancels_task(monkeypatch): + """cancel_auto_approve should set a Redis cancel flag AND cancel the + in-process task. Returns True always (Redis flag is authoritative).""" monkeypatch.setattr(decompose_goal_module, "AUTO_APPROVE_SERVER_SECONDS", 999) fake_run = AsyncMock() monkeypatch.setattr(decompose_goal_module, "_run_auto_approve", fake_run) + captured_redis_calls: list[tuple] = [] + + class FakeRedis: + async def set(self, key, value, ex=None): + captured_redis_calls.append(("set", key, value, ex)) + + async def get(self, key): + return None + + monkeypatch.setattr( + decompose_goal_module, "get_redis_async", AsyncMock(return_value=FakeRedis()) + ) + session = make_session(_USER_ID) _REAL_SCHEDULE_AUTO_APPROVE( session_id="session-cancel-test", @@ -552,11 +587,27 @@ async def test_cancel_auto_approve_cancels_pending_task(monkeypatch): ) assert "session-cancel-test" in decompose_goal_module._pending_auto_approvals - result = cancel_auto_approve("session-cancel-test") + result = await cancel_auto_approve("session-cancel-test") assert result is True assert "session-cancel-test" not in decompose_goal_module._pending_auto_approvals + assert len(captured_redis_calls) == 1 + assert captured_redis_calls[0][0] == "set" + assert "session-cancel-test" in captured_redis_calls[0][1] -def test_cancel_auto_approve_returns_false_for_unknown_session(): - """Cancelling a session with no pending task should return False.""" - assert cancel_auto_approve("nonexistent-session") is False +@pytest.mark.asyncio +async def test_cancel_auto_approve_returns_true_even_without_in_process_task( + monkeypatch, +): + """Even if no in-process task exists (e.g. task is in another process), + cancel_auto_approve should still set the Redis flag and return True.""" + + class FakeRedis: + async def set(self, key, value, ex=None): + pass + + monkeypatch.setattr( + decompose_goal_module, "get_redis_async", AsyncMock(return_value=FakeRedis()) + ) + result = await cancel_auto_approve("nonexistent-session") + assert result is True