Compare commits

...

9 Commits

Author SHA1 Message Date
Zamil Majdy
11e6fca8c3 fix(copilot): resolve dangling tool spinners when stream finishes
When the backend doesn't emit StreamToolOutputAvailable for all tool
calls before StreamFinish (e.g. SDK built-in tools like WebSearch),
the frontend spinners would spin forever.

Add a useEffect that watches for the streaming→ready transition and
marks any remaining input-available/input-streaming tool parts as
output-available. Extract shared resolveInProgressTools helper used
by both the stop handler (cancelled) and stream-end (completed).
2026-02-20 03:48:20 +07:00
Zamil Majdy
6e737e0b74 style: fix Black formatting on cancel endpoint 2026-02-20 02:46:03 +07:00
Zamil Majdy
5ce002803d fix(copilot): toast when cancel confirmation times out
Check the reason field in the cancel response — if
"cancel_published_not_confirmed", show a non-destructive toast so the
user knows the stop was sent but not yet confirmed by the executor.
2026-02-20 02:31:18 +07:00
Zamil Majdy
f8ad8484ee refactor(copilot): convert stop to plain function declaration
Remove useCallback wrapper per project guidelines — stopRef.current
captures the latest closure on every render regardless.
2026-02-20 02:25:43 +07:00
Zamil Majdy
b6064d0155 fix(copilot): address round-2 PR review and fix tool loading on stop
Backend:
- Add _validate_and_get_session() call to cancel endpoint (404 for
  invalid sessions, consistent with other endpoints)
- Reduce polling max_wait from 10s to 5s (stay below reverse-proxy
  read timeouts)
- Return cancelled=True with reason="cancel_published_not_confirmed"
  on timeout (cancel event IS published, just not yet confirmed)

Frontend:
- Mark in-progress tool parts as output-error on stop so spinners
  clear immediately instead of spinning forever
- Toast on cancel API failure (network error / 5xx)
2026-02-20 02:21:28 +07:00
Zamil Majdy
76e0c96aa9 feat: fix openapi.json 2026-02-20 02:14:07 +07:00
Zamil Majdy
3364a8e415 refactor(copilot): use generated client for cancel API call
Replace raw fetch() with generated postV2CancelSessionTask() and
remove the now-unnecessary dedicated cancel proxy route — the general
/api/proxy handles auth and forwarding.

Toast on cancel failure so the user knows the backend may still be running.
2026-02-20 02:10:10 +07:00
Zamil Majdy
9f4f2749a4 fix(copilot): address PR review comments for cancel endpoint
- Add CancelTaskResponse Pydantic model with typed return annotation
- Handle non-JSON backend responses in cancel proxy route
- Check for "no-token-found" token before forwarding auth header
- Truncate IDs in log messages for consistency
- Add cancel endpoint to openapi.json for frontend codegen
2026-02-20 02:02:14 +07:00
Zamil Majdy
2b0f457985 feat(copilot): wire up stop button to cancel executor tasks
The stop button was completely disconnected — clicking it only aborted the
client-side SSE fetch while the executor kept running indefinitely.

- Add `enqueue_cancel_task()` to publish `CancelCoPilotEvent` to the
  existing RabbitMQ FANOUT exchange that the executor already consumes
- Add `POST /sessions/{session_id}/cancel` endpoint that finds the active
  task, publishes the cancel event, and polls Redis until the task status
  confirms stopped (up to 10s)
- Add Next.js API proxy route for the cancel endpoint
- Wrap the AI SDK's `stop()` to also call the cancel API so the executor
  actually terminates
2026-02-20 01:20:19 +07:00
4 changed files with 195 additions and 2 deletions

View File

@@ -18,7 +18,7 @@ from backend.copilot.completion_handler import (
process_operation_success,
)
from backend.copilot.config import ChatConfig
from backend.copilot.executor.utils import enqueue_copilot_task
from backend.copilot.executor.utils import enqueue_cancel_task, enqueue_copilot_task
from backend.copilot.model import (
ChatMessage,
ChatSession,
@@ -132,6 +132,14 @@ class ListSessionsResponse(BaseModel):
total: int
class CancelTaskResponse(BaseModel):
"""Response model for the cancel task endpoint."""
cancelled: bool
task_id: str | None = None
reason: str | None = None
class OperationCompleteRequest(BaseModel):
"""Request model for external completion webhook."""
@@ -314,6 +322,57 @@ async def get_session(
)
@router.post(
"/sessions/{session_id}/cancel",
status_code=200,
)
async def cancel_session_task(
session_id: str,
user_id: Annotated[str | None, Depends(auth.get_user_id)],
) -> CancelTaskResponse:
"""Cancel the active streaming task for a session.
Publishes a cancel event to the executor via RabbitMQ FANOUT, then
polls Redis until the task status flips from ``running`` or a timeout
(5 s) is reached. Returns only after the cancellation is confirmed.
"""
await _validate_and_get_session(session_id, user_id)
active_task, _ = await stream_registry.get_active_task_for_session(
session_id, user_id
)
if not active_task:
return CancelTaskResponse(cancelled=False, reason="no_active_task")
task_id = active_task.task_id
await enqueue_cancel_task(task_id)
logger.info(
f"[CANCEL] Published cancel for task ...{task_id[-8:]} "
f"session ...{session_id[-8:]}"
)
# Poll until the executor confirms the task is no longer running.
# Keep max_wait below typical reverse-proxy read timeouts.
poll_interval = 0.5
max_wait = 5.0
waited = 0.0
while waited < max_wait:
await asyncio.sleep(poll_interval)
waited += poll_interval
task = await stream_registry.get_task(task_id)
if task is None or task.status != "running":
logger.info(
f"[CANCEL] Task ...{task_id[-8:]} confirmed stopped "
f"(status={task.status if task else 'gone'}) after {waited:.1f}s"
)
return CancelTaskResponse(cancelled=True, task_id=task_id)
logger.warning(f"[CANCEL] Task ...{task_id[-8:]} not confirmed after {max_wait}s")
return CancelTaskResponse(
cancelled=True, task_id=task_id, reason="cancel_published_not_confirmed"
)
@router.post(
"/sessions/{session_id}/stream",
)

View File

@@ -205,3 +205,20 @@ async def enqueue_copilot_task(
message=entry.model_dump_json(),
exchange=COPILOT_EXECUTION_EXCHANGE,
)
async def enqueue_cancel_task(task_id: str) -> None:
"""Publish a cancel request for a running CoPilot task.
Sends a ``CancelCoPilotEvent`` to the FANOUT exchange so all executor
pods receive the cancellation signal.
"""
from backend.util.clients import get_async_copilot_queue
event = CancelCoPilotEvent(task_id=task_id)
queue_client = await get_async_copilot_queue()
await queue_client.publish_message(
routing_key="", # FANOUT ignores routing key
message=event.model_dump_json(),
exchange=COPILOT_CANCEL_EXCHANGE,
)

View File

@@ -1,5 +1,6 @@
import {
getGetV2ListSessionsQueryKey,
postV2CancelSessionTask,
useDeleteV2DeleteSession,
useGetV2ListSessions,
} from "@/app/api/__generated__/endpoints/chat/chat";
@@ -8,6 +9,7 @@ import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useChat } from "@ai-sdk/react";
import { useQueryClient } from "@tanstack/react-query";
import type { UIMessage } from "ai";
import { DefaultChatTransport } from "ai";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useChatSession } from "./useChatSession";
@@ -15,6 +17,24 @@ import { useLongRunningToolPolling } from "./hooks/useLongRunningToolPolling";
const STREAM_START_TIMEOUT_MS = 12_000;
/** Mark any in-progress tool parts as completed/errored so spinners stop. */
function resolveInProgressTools(
messages: UIMessage[],
outcome: "completed" | "cancelled",
): UIMessage[] {
return messages.map((msg) => ({
...msg,
parts: msg.parts.map((part) =>
"state" in part &&
(part.state === "input-streaming" || part.state === "input-available")
? outcome === "cancelled"
? { ...part, state: "output-error" as const, errorText: "Cancelled" }
: { ...part, state: "output-available" as const, output: "" }
: part,
),
}));
}
export function useCopilotPage() {
const { isUserLoading, isLoggedIn } = useSupabase();
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
@@ -95,7 +115,7 @@ export function useCopilotPage() {
const {
messages,
sendMessage,
stop,
stop: sdkStop,
status,
error,
setMessages,
@@ -108,6 +128,36 @@ export function useCopilotPage() {
// call resumeStream() manually after hydration + active_stream detection.
});
// Wrap AI SDK's stop() to also cancel the backend executor task.
// sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
// the cancel API to actually stop the executor and wait for confirmation.
async function stop() {
sdkStop();
setMessages((prev) => resolveInProgressTools(prev, "cancelled"));
if (!sessionId) return;
try {
const res = await postV2CancelSessionTask(sessionId);
if (
res.status === 200 &&
"reason" in res.data &&
res.data.reason === "cancel_published_not_confirmed"
) {
toast({
title: "Stop may take a moment",
description:
"The cancel was sent but not yet confirmed. The task should stop shortly.",
});
}
} catch {
toast({
title: "Could not stop the task",
description: "The task may still be running in the background.",
variant: "destructive",
});
}
}
// Abort the stream if the backend doesn't start sending data within 12s.
const stopRef = useRef(stop);
stopRef.current = stop;
@@ -152,6 +202,18 @@ export function useCopilotPage() {
resumeStream();
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
// When the stream finishes, resolve any tool parts still showing spinners.
// This can happen if the backend didn't emit StreamToolOutputAvailable for
// a tool call before sending StreamFinish (e.g. SDK built-in tools).
const prevStatusRef = useRef(status);
useEffect(() => {
const prev = prevStatusRef.current;
prevStatusRef.current = status;
if (prev === "streaming" && status === "ready") {
setMessages((msgs) => resolveInProgressTools(msgs, "completed"));
}
}, [status, setMessages]);
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
// is in progress. When the backend completes, the session data will contain
// the final tool output — this hook detects the change and updates messages.

View File

@@ -1263,6 +1263,44 @@
}
}
},
"/api/chat/sessions/{session_id}/cancel": {
"post": {
"tags": ["v2", "chat", "chat"],
"summary": "Cancel Session Task",
"description": "Cancel the active streaming task for a session.\n\nPublishes a cancel event to the executor via RabbitMQ FANOUT, then\npolls Redis until the task status flips from ``running`` or a timeout\n(5 s) is reached. Returns only after the cancellation is confirmed.",
"operationId": "postV2CancelSessionTask",
"security": [{ "HTTPBearerJWT": [] }],
"parameters": [
{
"name": "session_id",
"in": "path",
"required": true,
"schema": { "type": "string", "title": "Session Id" }
}
],
"responses": {
"200": {
"description": "Successful Response",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/CancelTaskResponse" }
}
}
},
"401": {
"$ref": "#/components/responses/HTTP401NotAuthenticatedError"
},
"422": {
"description": "Validation Error",
"content": {
"application/json": {
"schema": { "$ref": "#/components/schemas/HTTPValidationError" }
}
}
}
}
}
},
"/api/chat/sessions/{session_id}/stream": {
"get": {
"tags": ["v2", "chat", "chat"],
@@ -7537,6 +7575,23 @@
"required": ["file"],
"title": "Body_postV2Upload submission media"
},
"CancelTaskResponse": {
"properties": {
"cancelled": { "type": "boolean", "title": "Cancelled" },
"task_id": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Task Id"
},
"reason": {
"anyOf": [{ "type": "string" }, { "type": "null" }],
"title": "Reason"
}
},
"type": "object",
"required": ["cancelled"],
"title": "CancelTaskResponse",
"description": "Response model for the cancel task endpoint."
},
"ChangelogEntry": {
"properties": {
"version": { "type": "string", "title": "Version" },