mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-24 03:00:28 -05:00
Compare commits
9 Commits
dev
...
fix/copilo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
11e6fca8c3 | ||
|
|
6e737e0b74 | ||
|
|
5ce002803d | ||
|
|
f8ad8484ee | ||
|
|
b6064d0155 | ||
|
|
76e0c96aa9 | ||
|
|
3364a8e415 | ||
|
|
9f4f2749a4 | ||
|
|
2b0f457985 |
@@ -18,7 +18,7 @@ from backend.copilot.completion_handler import (
|
|||||||
process_operation_success,
|
process_operation_success,
|
||||||
)
|
)
|
||||||
from backend.copilot.config import ChatConfig
|
from backend.copilot.config import ChatConfig
|
||||||
from backend.copilot.executor.utils import enqueue_copilot_task
|
from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_task
|
||||||
from backend.copilot.model import (
|
from backend.copilot.model import (
|
||||||
ChatMessage,
|
ChatMessage,
|
||||||
ChatSession,
|
ChatSession,
|
||||||
@@ -132,6 +132,14 @@ class ListSessionsResponse(BaseModel):
|
|||||||
total: int
|
total: int
|
||||||
|
|
||||||
|
|
||||||
|
class CancelTaskResponse(BaseModel):
|
||||||
|
"""Response model for the cancel task endpoint."""
|
||||||
|
|
||||||
|
cancelled: bool
|
||||||
|
task_id: str | None = None
|
||||||
|
reason: str | None = None
|
||||||
|
|
||||||
|
|
||||||
class OperationCompleteRequest(BaseModel):
|
class OperationCompleteRequest(BaseModel):
|
||||||
"""Request model for external completion webhook."""
|
"""Request model for external completion webhook."""
|
||||||
|
|
||||||
@@ -314,6 +322,57 @@ async def get_session(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@router.post(
|
||||||
|
"/sessions/{session_id}/cancel",
|
||||||
|
status_code=200,
|
||||||
|
)
|
||||||
|
async def cancel_session_task(
|
||||||
|
session_id: str,
|
||||||
|
user_id: Annotated[str | None, Depends(auth.get_user_id)],
|
||||||
|
) -> CancelTaskResponse:
|
||||||
|
"""Cancel the active streaming task for a session.
|
||||||
|
|
||||||
|
Publishes a cancel event to the executor via RabbitMQ FANOUT, then
|
||||||
|
polls Redis until the task status flips from ``running`` or a timeout
|
||||||
|
(5 s) is reached. Returns only after the cancellation is confirmed.
|
||||||
|
"""
|
||||||
|
await _validate_and_get_session(session_id, user_id)
|
||||||
|
|
||||||
|
active_task, _ = await stream_registry.get_active_task_for_session(
|
||||||
|
session_id, user_id
|
||||||
|
)
|
||||||
|
if not active_task:
|
||||||
|
return CancelTaskResponse(cancelled=False, reason="no_active_task")
|
||||||
|
|
||||||
|
task_id = active_task.task_id
|
||||||
|
await enqueue_cancel_task(task_id)
|
||||||
|
logger.info(
|
||||||
|
f"[CANCEL] Published cancel for task ...{task_id[-8:]} "
|
||||||
|
f"session ...{session_id[-8:]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Poll until the executor confirms the task is no longer running.
|
||||||
|
# Keep max_wait below typical reverse-proxy read timeouts.
|
||||||
|
poll_interval = 0.5
|
||||||
|
max_wait = 5.0
|
||||||
|
waited = 0.0
|
||||||
|
while waited < max_wait:
|
||||||
|
await asyncio.sleep(poll_interval)
|
||||||
|
waited += poll_interval
|
||||||
|
task = await stream_registry.get_task(task_id)
|
||||||
|
if task is None or task.status != "running":
|
||||||
|
logger.info(
|
||||||
|
f"[CANCEL] Task ...{task_id[-8:]} confirmed stopped "
|
||||||
|
f"(status={task.status if task else 'gone'}) after {waited:.1f}s"
|
||||||
|
)
|
||||||
|
return CancelTaskResponse(cancelled=True, task_id=task_id)
|
||||||
|
|
||||||
|
logger.warning(f"[CANCEL] Task ...{task_id[-8:]} not confirmed after {max_wait}s")
|
||||||
|
return CancelTaskResponse(
|
||||||
|
cancelled=True, task_id=task_id, reason="cancel_published_not_confirmed"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@router.post(
|
@router.post(
|
||||||
"/sessions/{session_id}/stream",
|
"/sessions/{session_id}/stream",
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -205,3 +205,20 @@ async def enqueue_copilot_task(
|
|||||||
message=entry.model_dump_json(),
|
message=entry.model_dump_json(),
|
||||||
exchange=COPILOT_EXECUTION_EXCHANGE,
|
exchange=COPILOT_EXECUTION_EXCHANGE,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def enqueue_cancel_task(task_id: str) -> None:
|
||||||
|
"""Publish a cancel request for a running CoPilot task.
|
||||||
|
|
||||||
|
Sends a ``CancelCoPilotEvent`` to the FANOUT exchange so all executor
|
||||||
|
pods receive the cancellation signal.
|
||||||
|
"""
|
||||||
|
from backend.util.clients import get_async_copilot_queue
|
||||||
|
|
||||||
|
event = CancelCoPilotEvent(task_id=task_id)
|
||||||
|
queue_client = await get_async_copilot_queue()
|
||||||
|
await queue_client.publish_message(
|
||||||
|
routing_key="", # FANOUT ignores routing key
|
||||||
|
message=event.model_dump_json(),
|
||||||
|
exchange=COPILOT_CANCEL_EXCHANGE,
|
||||||
|
)
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import {
|
import {
|
||||||
getGetV2ListSessionsQueryKey,
|
getGetV2ListSessionsQueryKey,
|
||||||
|
postV2CancelSessionTask,
|
||||||
useDeleteV2DeleteSession,
|
useDeleteV2DeleteSession,
|
||||||
useGetV2ListSessions,
|
useGetV2ListSessions,
|
||||||
} from "@/app/api/__generated__/endpoints/chat/chat";
|
} from "@/app/api/__generated__/endpoints/chat/chat";
|
||||||
@@ -8,6 +9,7 @@ import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
|
|||||||
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
|
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
|
||||||
import { useChat } from "@ai-sdk/react";
|
import { useChat } from "@ai-sdk/react";
|
||||||
import { useQueryClient } from "@tanstack/react-query";
|
import { useQueryClient } from "@tanstack/react-query";
|
||||||
|
import type { UIMessage } from "ai";
|
||||||
import { DefaultChatTransport } from "ai";
|
import { DefaultChatTransport } from "ai";
|
||||||
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
|
||||||
import { useChatSession } from "./useChatSession";
|
import { useChatSession } from "./useChatSession";
|
||||||
@@ -15,6 +17,24 @@ import { useLongRunningToolPolling } from "./hooks/useLongRunningToolPolling";
|
|||||||
|
|
||||||
const STREAM_START_TIMEOUT_MS = 12_000;
|
const STREAM_START_TIMEOUT_MS = 12_000;
|
||||||
|
|
||||||
|
/** Mark any in-progress tool parts as completed/errored so spinners stop. */
|
||||||
|
function resolveInProgressTools(
|
||||||
|
messages: UIMessage[],
|
||||||
|
outcome: "completed" | "cancelled",
|
||||||
|
): UIMessage[] {
|
||||||
|
return messages.map((msg) => ({
|
||||||
|
...msg,
|
||||||
|
parts: msg.parts.map((part) =>
|
||||||
|
"state" in part &&
|
||||||
|
(part.state === "input-streaming" || part.state === "input-available")
|
||||||
|
? outcome === "cancelled"
|
||||||
|
? { ...part, state: "output-error" as const, errorText: "Cancelled" }
|
||||||
|
: { ...part, state: "output-available" as const, output: "" }
|
||||||
|
: part,
|
||||||
|
),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
export function useCopilotPage() {
|
export function useCopilotPage() {
|
||||||
const { isUserLoading, isLoggedIn } = useSupabase();
|
const { isUserLoading, isLoggedIn } = useSupabase();
|
||||||
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
|
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
|
||||||
@@ -95,7 +115,7 @@ export function useCopilotPage() {
|
|||||||
const {
|
const {
|
||||||
messages,
|
messages,
|
||||||
sendMessage,
|
sendMessage,
|
||||||
stop,
|
stop: sdkStop,
|
||||||
status,
|
status,
|
||||||
error,
|
error,
|
||||||
setMessages,
|
setMessages,
|
||||||
@@ -108,6 +128,36 @@ export function useCopilotPage() {
|
|||||||
// call resumeStream() manually after hydration + active_stream detection.
|
// call resumeStream() manually after hydration + active_stream detection.
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Wrap AI SDK's stop() to also cancel the backend executor task.
|
||||||
|
// sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
|
||||||
|
// the cancel API to actually stop the executor and wait for confirmation.
|
||||||
|
async function stop() {
|
||||||
|
sdkStop();
|
||||||
|
setMessages((prev) => resolveInProgressTools(prev, "cancelled"));
|
||||||
|
|
||||||
|
if (!sessionId) return;
|
||||||
|
try {
|
||||||
|
const res = await postV2CancelSessionTask(sessionId);
|
||||||
|
if (
|
||||||
|
res.status === 200 &&
|
||||||
|
"reason" in res.data &&
|
||||||
|
res.data.reason === "cancel_published_not_confirmed"
|
||||||
|
) {
|
||||||
|
toast({
|
||||||
|
title: "Stop may take a moment",
|
||||||
|
description:
|
||||||
|
"The cancel was sent but not yet confirmed. The task should stop shortly.",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
toast({
|
||||||
|
title: "Could not stop the task",
|
||||||
|
description: "The task may still be running in the background.",
|
||||||
|
variant: "destructive",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Abort the stream if the backend doesn't start sending data within 12s.
|
// Abort the stream if the backend doesn't start sending data within 12s.
|
||||||
const stopRef = useRef(stop);
|
const stopRef = useRef(stop);
|
||||||
stopRef.current = stop;
|
stopRef.current = stop;
|
||||||
@@ -152,6 +202,18 @@ export function useCopilotPage() {
|
|||||||
resumeStream();
|
resumeStream();
|
||||||
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
|
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
|
||||||
|
|
||||||
|
// When the stream finishes, resolve any tool parts still showing spinners.
|
||||||
|
// This can happen if the backend didn't emit StreamToolOutputAvailable for
|
||||||
|
// a tool call before sending StreamFinish (e.g. SDK built-in tools).
|
||||||
|
const prevStatusRef = useRef(status);
|
||||||
|
useEffect(() => {
|
||||||
|
const prev = prevStatusRef.current;
|
||||||
|
prevStatusRef.current = status;
|
||||||
|
if (prev === "streaming" && status === "ready") {
|
||||||
|
setMessages((msgs) => resolveInProgressTools(msgs, "completed"));
|
||||||
|
}
|
||||||
|
}, [status, setMessages]);
|
||||||
|
|
||||||
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
|
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
|
||||||
// is in progress. When the backend completes, the session data will contain
|
// is in progress. When the backend completes, the session data will contain
|
||||||
// the final tool output — this hook detects the change and updates messages.
|
// the final tool output — this hook detects the change and updates messages.
|
||||||
|
|||||||
@@ -1263,6 +1263,44 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
"/api/chat/sessions/{session_id}/cancel": {
|
||||||
|
"post": {
|
||||||
|
"tags": ["v2", "chat", "chat"],
|
||||||
|
"summary": "Cancel Session Task",
|
||||||
|
"description": "Cancel the active streaming task for a session.\n\nPublishes a cancel event to the executor via RabbitMQ FANOUT, then\npolls Redis until the task status flips from ``running`` or a timeout\n(5 s) is reached. Returns only after the cancellation is confirmed.",
|
||||||
|
"operationId": "postV2CancelSessionTask",
|
||||||
|
"security": [{ "HTTPBearerJWT": [] }],
|
||||||
|
"parameters": [
|
||||||
|
{
|
||||||
|
"name": "session_id",
|
||||||
|
"in": "path",
|
||||||
|
"required": true,
|
||||||
|
"schema": { "type": "string", "title": "Session Id" }
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"responses": {
|
||||||
|
"200": {
|
||||||
|
"description": "Successful Response",
|
||||||
|
"content": {
|
||||||
|
"application/json": {
|
||||||
|
"schema": { "$ref": "#/components/schemas/CancelTaskResponse" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"401": {
|
||||||
|
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
|
||||||
|
},
|
||||||
|
"422": {
|
||||||
|
"description": "Validation Error",
|
||||||
|
"content": {
|
||||||
|
"application/json": {
|
||||||
|
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
"/api/chat/sessions/{session_id}/stream": {
|
"/api/chat/sessions/{session_id}/stream": {
|
||||||
"get": {
|
"get": {
|
||||||
"tags": ["v2", "chat", "chat"],
|
"tags": ["v2", "chat", "chat"],
|
||||||
@@ -7537,6 +7575,23 @@
|
|||||||
"required": ["file"],
|
"required": ["file"],
|
||||||
"title": "Body_postV2Upload submission media"
|
"title": "Body_postV2Upload submission media"
|
||||||
},
|
},
|
||||||
|
"CancelTaskResponse": {
|
||||||
|
"properties": {
|
||||||
|
"cancelled": { "type": "boolean", "title": "Cancelled" },
|
||||||
|
"task_id": {
|
||||||
|
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||||
|
"title": "Task Id"
|
||||||
|
},
|
||||||
|
"reason": {
|
||||||
|
"anyOf": [{ "type": "string" }, { "type": "null" }],
|
||||||
|
"title": "Reason"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"type": "object",
|
||||||
|
"required": ["cancelled"],
|
||||||
|
"title": "CancelTaskResponse",
|
||||||
|
"description": "Response model for the cancel task endpoint."
|
||||||
|
},
|
||||||
"ChangelogEntry": {
|
"ChangelogEntry": {
|
||||||
"properties": {
|
"properties": {
|
||||||
"version": { "type": "string", "title": "Version" },
|
"version": { "type": "string", "title": "Version" },
|
||||||
|
|||||||
Reference in New Issue
Block a user