mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
fix(copilot): use Redis flag for cross-process auto-approve cancellation
The cancel endpoint runs in the AgentServer process while the asyncio auto-approve task lives in the CoPilotExecutor process — separate memory. The in-process dict cancel from the previous commit was a no-op across processes. - cancel_auto_approve now SETs a Redis key with TTL as the primary cancel signal, plus best-effort in-process task.cancel() for single-worker. - _run_auto_approve checks the Redis key before firing. If set, skips. - Tests stub get_redis_async with a fake to avoid real Redis connections. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -735,7 +735,7 @@ async def cancel_auto_approve_task(
|
||||
|
||||
from backend.copilot.tools.decompose_goal import cancel_auto_approve
|
||||
|
||||
cancelled = cancel_auto_approve(session_id)
|
||||
cancelled = await cancel_auto_approve(session_id)
|
||||
return CancelSessionResponse(
|
||||
cancelled=cancelled,
|
||||
reason=None if cancelled else "no_pending_auto_approve",
|
||||
|
||||
@@ -6,6 +6,7 @@ from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
from backend.copilot.model import ChatMessage, ChatSession, append_message_if
|
||||
from backend.data.redis_client import get_redis_async
|
||||
|
||||
from .base import BaseTool
|
||||
from .models import (
|
||||
@@ -29,9 +30,14 @@ AUTO_APPROVE_CLIENT_SECONDS = 60
|
||||
AUTO_APPROVE_SERVER_SECONDS = AUTO_APPROVE_CLIENT_SECONDS
|
||||
AUTO_APPROVE_MESSAGE = "Approved. Please build the agent."
|
||||
|
||||
# Pending auto-approve tasks keyed by session_id. The dict allows
|
||||
# cancel_auto_approve() to look up and cancel the task for a specific
|
||||
# session when the user clicks "Modify" in the frontend.
|
||||
# Redis key prefix for cross-process cancel signalling. The cancel
|
||||
# endpoint (AgentServer process) SETs the key; _run_auto_approve
|
||||
# (CoPilotExecutor process) checks it before firing.
|
||||
_CANCEL_KEY_PREFIX = "copilot:cancel_auto_approve:"
|
||||
_CANCEL_KEY_TTL_SECONDS = AUTO_APPROVE_SERVER_SECONDS + 30
|
||||
|
||||
# In-process dict for best-effort cancel when both the cancel call and
|
||||
# the asyncio task happen to live in the same process (single-worker).
|
||||
_pending_auto_approvals: dict[str, asyncio.Task] = {}
|
||||
|
||||
|
||||
@@ -77,6 +83,15 @@ async def _run_auto_approve(
|
||||
try:
|
||||
await asyncio.sleep(AUTO_APPROVE_SERVER_SECONDS)
|
||||
|
||||
# Check the cross-process cancel flag set by cancel_auto_approve().
|
||||
redis = await get_redis_async()
|
||||
if await redis.get(f"{_CANCEL_KEY_PREFIX}{session_id}"):
|
||||
logger.info(
|
||||
"decompose_goal auto-approve skipped (cancelled) for session %s",
|
||||
session_id,
|
||||
)
|
||||
return
|
||||
|
||||
approval = ChatMessage(role="user", content=AUTO_APPROVE_MESSAGE)
|
||||
result = await append_message_if(
|
||||
session_id=session_id,
|
||||
@@ -117,19 +132,35 @@ async def _run_auto_approve(
|
||||
)
|
||||
|
||||
|
||||
def cancel_auto_approve(session_id: str) -> bool:
|
||||
async def cancel_auto_approve(session_id: str) -> bool:
|
||||
"""Cancel the pending auto-approve task for a session.
|
||||
|
||||
Called by the ``/sessions/{session_id}/cancel-auto-approve`` endpoint
|
||||
when the user clicks "Modify" in the build-plan UI. Returns True if a
|
||||
pending task was found and cancelled, False otherwise.
|
||||
when the user clicks "Modify" in the build-plan UI.
|
||||
|
||||
Uses **two** cancellation channels:
|
||||
1. **Redis flag** (cross-process) — the executor checks this before
|
||||
firing. Works even when the cancel endpoint runs in the AgentServer
|
||||
process and the asyncio task lives in the CoPilotExecutor process.
|
||||
2. **In-process task cancel** (best-effort) — if both happen to share
|
||||
the same process, cancels the asyncio task directly.
|
||||
"""
|
||||
redis = await get_redis_async()
|
||||
await redis.set(
|
||||
f"{_CANCEL_KEY_PREFIX}{session_id}",
|
||||
"1",
|
||||
ex=_CANCEL_KEY_TTL_SECONDS,
|
||||
)
|
||||
logger.info(
|
||||
"decompose_goal auto-approve cancel flag set for session %s", session_id
|
||||
)
|
||||
|
||||
# Best-effort in-process cancel (no-op if the task is in another process).
|
||||
task = _pending_auto_approvals.pop(session_id, None)
|
||||
if task is not None and not task.done():
|
||||
task.cancel()
|
||||
logger.info("decompose_goal auto-approve cancelled for session %s", session_id)
|
||||
return True
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def _schedule_auto_approve(
|
||||
@@ -145,8 +176,11 @@ def _schedule_auto_approve(
|
||||
if not session_id:
|
||||
return
|
||||
# Cancel any existing pending approval for this session (e.g. if the
|
||||
# LLM called decompose_goal twice in one turn).
|
||||
cancel_auto_approve(session_id)
|
||||
# LLM called decompose_goal twice in one turn). Best-effort in-process
|
||||
# cancel only — skip the async Redis call here to keep scheduling fast.
|
||||
old_task = _pending_auto_approvals.pop(session_id, None)
|
||||
if old_task is not None and not old_task.done():
|
||||
old_task.cancel()
|
||||
baseline_index = len(session.messages)
|
||||
task = asyncio.create_task(_run_auto_approve(session_id, user_id, baseline_index))
|
||||
_pending_auto_approvals[session_id] = task
|
||||
|
||||
@@ -381,6 +381,24 @@ def test_predicate_handles_messages_with_none_sequence():
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class _FakeRedisNoCancelFlag:
|
||||
"""Stub Redis that reports no cancel flag and ignores writes."""
|
||||
|
||||
async def get(self, key):
|
||||
return None
|
||||
|
||||
async def set(self, key, value, ex=None):
|
||||
pass
|
||||
|
||||
|
||||
def _stub_redis():
|
||||
"""Patch get_redis_async to return a fake Redis (no real connection)."""
|
||||
return patch(
|
||||
"backend.copilot.tools.decompose_goal.get_redis_async",
|
||||
new=AsyncMock(return_value=_FakeRedisNoCancelFlag()),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_approve_fires_when_user_idle():
|
||||
"""When no user message is appended after the baseline sequence, the
|
||||
@@ -397,6 +415,7 @@ async def test_auto_approve_fires_when_user_idle():
|
||||
fake_create_session = AsyncMock()
|
||||
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
new=fake_append_message_if,
|
||||
@@ -440,6 +459,7 @@ async def test_auto_approve_skips_when_user_already_acted():
|
||||
fake_create_session = AsyncMock()
|
||||
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
new=fake_append_message_if,
|
||||
@@ -477,6 +497,7 @@ async def test_auto_approve_swallows_unexpected_errors():
|
||||
raise RuntimeError("kaboom")
|
||||
|
||||
with (
|
||||
_stub_redis(),
|
||||
patch(
|
||||
"backend.copilot.tools.decompose_goal.append_message_if",
|
||||
new=boom,
|
||||
@@ -538,12 +559,26 @@ def test_schedule_auto_approve_no_op_without_session_id():
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_auto_approve_cancels_pending_task(monkeypatch):
|
||||
"""Calling cancel_auto_approve should cancel the pending task and return True."""
|
||||
async def test_cancel_auto_approve_sets_redis_flag_and_cancels_task(monkeypatch):
|
||||
"""cancel_auto_approve should set a Redis cancel flag AND cancel the
|
||||
in-process task. Returns True always (Redis flag is authoritative)."""
|
||||
monkeypatch.setattr(decompose_goal_module, "AUTO_APPROVE_SERVER_SECONDS", 999)
|
||||
fake_run = AsyncMock()
|
||||
monkeypatch.setattr(decompose_goal_module, "_run_auto_approve", fake_run)
|
||||
|
||||
captured_redis_calls: list[tuple] = []
|
||||
|
||||
class FakeRedis:
|
||||
async def set(self, key, value, ex=None):
|
||||
captured_redis_calls.append(("set", key, value, ex))
|
||||
|
||||
async def get(self, key):
|
||||
return None
|
||||
|
||||
monkeypatch.setattr(
|
||||
decompose_goal_module, "get_redis_async", AsyncMock(return_value=FakeRedis())
|
||||
)
|
||||
|
||||
session = make_session(_USER_ID)
|
||||
_REAL_SCHEDULE_AUTO_APPROVE(
|
||||
session_id="session-cancel-test",
|
||||
@@ -552,11 +587,27 @@ async def test_cancel_auto_approve_cancels_pending_task(monkeypatch):
|
||||
)
|
||||
|
||||
assert "session-cancel-test" in decompose_goal_module._pending_auto_approvals
|
||||
result = cancel_auto_approve("session-cancel-test")
|
||||
result = await cancel_auto_approve("session-cancel-test")
|
||||
assert result is True
|
||||
assert "session-cancel-test" not in decompose_goal_module._pending_auto_approvals
|
||||
assert len(captured_redis_calls) == 1
|
||||
assert captured_redis_calls[0][0] == "set"
|
||||
assert "session-cancel-test" in captured_redis_calls[0][1]
|
||||
|
||||
|
||||
def test_cancel_auto_approve_returns_false_for_unknown_session():
|
||||
"""Cancelling a session with no pending task should return False."""
|
||||
assert cancel_auto_approve("nonexistent-session") is False
|
||||
@pytest.mark.asyncio
|
||||
async def test_cancel_auto_approve_returns_true_even_without_in_process_task(
|
||||
monkeypatch,
|
||||
):
|
||||
"""Even if no in-process task exists (e.g. task is in another process),
|
||||
cancel_auto_approve should still set the Redis flag and return True."""
|
||||
|
||||
class FakeRedis:
|
||||
async def set(self, key, value, ex=None):
|
||||
pass
|
||||
|
||||
monkeypatch.setattr(
|
||||
decompose_goal_module, "get_redis_async", AsyncMock(return_value=FakeRedis())
|
||||
)
|
||||
result = await cancel_auto_approve("nonexistent-session")
|
||||
assert result is True
|
||||
|
||||
Reference in New Issue
Block a user