feat(copilot): wire up stop button to cancel executor tasks

The stop button was completely disconnected — clicking it only aborted the
client-side SSE fetch while the executor kept running indefinitely.

- Add `enqueue_cancel_task()` to publish `CancelCoPilotEvent` to the
  existing RabbitMQ FANOUT exchange that the executor already consumes
- Add `POST /sessions/{session_id}/cancel` endpoint that finds the active
  task, publishes the cancel event, and polls Redis until the task status
  shows it has stopped (up to 10 s)
- Add Next.js API proxy route for the cancel endpoint
- Wrap the AI SDK's `stop()` to also call the cancel API so the executor
  actually terminates
This commit is contained in:
Zamil Majdy
2026-02-20 01:20:19 +07:00
parent 0b151f64e8
commit 2b0f457985
4 changed files with 124 additions and 2 deletions

View File

@@ -18,7 +18,7 @@ from backend.copilot.completion_handler import (
process_operation_success,
)
from backend.copilot.config import ChatConfig
from backend.copilot.executor.utils import enqueue_copilot_task
from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_task
from backend.copilot.model import (
ChatMessage,
ChatSession,
@@ -314,6 +314,49 @@ async def get_session(
)
@router.post(
    "/sessions/{session_id}/cancel",
    status_code=200,
)
async def cancel_session_task(
    session_id: str,
    user_id: Annotated[str | None, Depends(auth.get_user_id)],
):
    """Cancel the active streaming task for a session.

    Publishes a cancel event to the executor via RabbitMQ FANOUT, then
    polls Redis until the task status flips from ``running`` or a timeout
    (10 s) is reached. Returns only after the cancellation is confirmed
    or the timeout expires.

    Args:
        session_id: Chat session whose active task should be cancelled.
        user_id: Authenticated user id (or None); scopes the task lookup.

    Returns:
        A dict with ``cancelled: bool`` plus ``task_id`` and/or ``reason``
        (``no_active_task`` or ``timeout``).
    """
    active_task, _ = await stream_registry.get_active_task_for_session(
        session_id, user_id
    )
    if not active_task:
        return {"cancelled": False, "reason": "no_active_task"}

    task_id = active_task.task_id
    await enqueue_cancel_task(task_id)
    logger.info(f"[CANCEL] Published cancel for task {task_id} session {session_id}")

    # Poll until the executor confirms the task is no longer running.
    # Check BEFORE sleeping so a fast cancellation doesn't always pay a
    # full poll interval, and track a monotonic deadline so time spent in
    # get_task() counts toward the 10 s budget (a fixed `waited +=
    # poll_interval` counter would let the real wall time drift past it).
    poll_interval = 0.5
    max_wait = 10.0
    loop = asyncio.get_running_loop()
    start = loop.time()
    deadline = start + max_wait
    while True:
        task = await stream_registry.get_task(task_id)
        if task is None or task.status != "running":
            elapsed = loop.time() - start
            logger.info(
                f"[CANCEL] Task {task_id} confirmed stopped "
                f"(status={task.status if task else 'gone'}) after {elapsed:.1f}s"
            )
            return {"cancelled": True, "task_id": task_id}
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        await asyncio.sleep(min(poll_interval, remaining))

    logger.warning(f"[CANCEL] Task {task_id} still running after {max_wait}s")
    return {"cancelled": False, "task_id": task_id, "reason": "timeout"}
@router.post(
"/sessions/{session_id}/stream",
)

View File

@@ -205,3 +205,20 @@ async def enqueue_copilot_task(
message=entry.model_dump_json(),
exchange=COPILOT_EXECUTION_EXCHANGE,
)
async def enqueue_cancel_task(task_id: str) -> None:
    """Publish a cancel request for a running CoPilot task.

    Emits a ``CancelCoPilotEvent`` on the FANOUT cancel exchange so every
    executor pod sees the cancellation signal, whichever one owns the task.

    Args:
        task_id: Identifier of the CoPilot task to cancel.
    """
    # Function-scope import, matching the module's existing lazy-import style
    # (presumably to avoid a circular import at load time — confirm).
    from backend.util.clients import get_async_copilot_queue

    publisher = await get_async_copilot_queue()
    cancel_event = CancelCoPilotEvent(task_id=task_id)
    await publisher.publish_message(
        routing_key="",  # FANOUT ignores routing key
        message=cancel_event.model_dump_json(),
        exchange=COPILOT_CANCEL_EXCHANGE,
    )

View File

@@ -95,7 +95,7 @@ export function useCopilotPage() {
const {
messages,
sendMessage,
stop,
stop: sdkStop,
status,
error,
setMessages,
@@ -108,6 +108,21 @@ export function useCopilotPage() {
// call resumeStream() manually after hydration + active_stream detection.
});
// Wrap AI SDK's stop() so stopping also cancels the backend executor task.
// sdkStop() aborts the SSE fetch immediately (instant UI feedback); the
// cancel API call that follows tells the executor to actually terminate.
const stop = useCallback(async () => {
  sdkStop();
  if (!sessionId) return;
  // Best-effort: the SSE stream is already aborted, so a failed cancel
  // request is swallowed rather than surfaced to the user.
  await fetch(`/api/chat/sessions/${sessionId}/cancel`, {
    method: "POST",
  }).catch(() => {});
}, [sdkStop, sessionId]);
// Abort the stream if the backend doesn't start sending data within 12s.
const stopRef = useRef(stop);
stopRef.current = stop;

View File

@@ -0,0 +1,47 @@
import { environment } from "@/services/environment";
import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
/**
 * Proxy route: forwards a session-cancel request to the backend API,
 * attaching the server-side auth token, and relays the backend's
 * response (body and status) back to the browser.
 */
export async function POST(
  _request: NextRequest,
  { params }: { params: Promise<{ sessionId: string }> },
) {
  const { sessionId } = await params;
  try {
    const token = await getServerAuthToken();
    const backendUrl = environment.getAGPTServerBaseUrl();
    const cancelUrl = new URL(
      `/api/chat/sessions/${sessionId}/cancel`,
      backendUrl,
    );

    const headers: Record<string, string> = {
      "Content-Type": "application/json",
    };
    if (token) {
      headers["Authorization"] = `Bearer ${token}`;
    }

    const response = await fetch(cancelUrl.toString(), {
      method: "POST",
      headers,
    });

    // Relay the backend body verbatim. Using response.json() here would
    // throw on a non-JSON body (e.g. an HTML 502 page from an ingress),
    // and the catch below would then mask the real upstream status with
    // a generic 500 proxy_error.
    const body = await response.text();
    return new Response(body, {
      status: response.status,
      headers: {
        "Content-Type":
          response.headers.get("Content-Type") ?? "application/json",
      },
    });
  } catch (error) {
    console.error("Cancel proxy error:", error);
    return new Response(
      JSON.stringify({
        cancelled: false,
        reason: "proxy_error",
        detail: error instanceof Error ? error.message : String(error),
      }),
      { status: 500, headers: { "Content-Type": "application/json" } },
    );
  }
}