fix(platform/copilot): bypass Vercel SSE proxy, refactor hook architecture (#12254)

## Summary

Reliability, architecture, and UX improvements for the CoPilot SSE
streaming pipeline.

### Frontend

- **SSE proxy bypass**: Connect directly to the Python backend for SSE
streams, avoiding the Next.js serverless proxy and its 800s Vercel
function timeout ceiling
- **Hook refactor**: Decompose the 490-line `useCopilotPage` monolith
into focused domain modules:
- `helpers.ts` — pure functions (`deduplicateMessages`,
`resolveInProgressTools`)
- `store.ts` — Zustand store for shared UI state (`sessionToDelete`,
drawer open/close)
- `useCopilotStream.ts` — SSE transport, `useChat` wrapper,
reconnect/resume logic, stop+cancel
  - `useCopilotPage.ts` — thin orchestrator (~160 lines)
- **ChatMessagesContainer refactor**: Split 525-line monolith into
sub-components:
  - `helpers.ts` — pure text parsing (markers, workspace URLs)
- `components/ThinkingIndicator.tsx` — ScaleLoader animation + cycling
phrases with pulse
- `components/MessagePartRenderer.tsx` — tool dispatch switch +
workspace media
- **Stop UX fixes**:
- Guard `isReconnecting` and resume effect with `isUserStoppingRef` so
the input unlocks immediately after explicit stop (previously stuck
until page refresh)
- Inject cancellation marker locally in `stop()` so "You manually
stopped this chat" shows instantly
- **Thinking indicator polish**: Replace MorphingBlob SVG with
ScaleLoader (16px), fix initial dark circle flash via
`animation-fill-mode: backwards`, smooth `animate-pulse` text instead of
shimmer gradient
- **ChatSidebar consolidation**: Reads `sessionToDelete` from Zustand
store instead of duplicating delete state/mutation locally
- **Auth error handling**: `getAuthHeaders()` throws on failure instead
of silently returning empty headers; 401 errors show user-facing toast
- **Stale closure fix**: Use refs for reconnect guards to avoid stale
closures during rapid reconnect cycles
- **Session switch resume**: Clear `hasResumedRef` on session switch so
returning to a session with an active stream auto-reconnects
- **Target session cache invalidation**: Invalidate the target session's
React Query cache on switch so `active_stream` is accurate for resume
- **Dedup hardening**: Content-fingerprint dedup resets on non-assistant
messages, preventing legitimate repeated responses from being dropped
- **Marker prefixes**: Hex-suffixed markers (`[__COPILOT_ERROR_f7a1__]`)
to prevent LLM false-positives
- **Code style**: Remove unnecessary `useCallback` wrappers per project
convention, replace unsafe `as` cast with runtime type guard

### Backend (minimal)

- **Faster heartbeat**: 10s → 3s interval to keep SSE alive through
proxies/LBs
- **Faster stall detection**: SSE subscriber queue timeout 30s → 10s
- **Marker prefixes**: Matching hex-suffixed prefixes for error/system
markers

## Test plan

- [ ] Verify SSE streams connect directly to backend (no Next.js proxy
in network tab)
- [ ] Verify reconnect works on transient disconnects (up to 3 attempts
with backoff)
- [ ] Verify auth failure shows user-facing toast
- [ ] Verify switching sessions and switching back shows messages and
resumes active stream
- [ ] Verify deleting a chat from sidebar works (shared Zustand state)
- [ ] Verify mobile drawer delete works (shared Zustand state)
- [ ] Verify thinking indicator shows ScaleLoader + pulsing text, no
dark circle flash
- [ ] Verify stopping a stream immediately unlocks the input and shows
"You manually stopped this chat"
- [ ] Verify marker prefix parsing still works with hex-suffixed
prefixes
- [ ] `pnpm format && pnpm lint && pnpm types` pass

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Ubbe
2026-03-03 19:56:24 +08:00
committed by GitHub
parent 1c51dd18aa
commit 9442c648a4
17 changed files with 910 additions and 622 deletions

View File

@@ -487,7 +487,7 @@ async def stream_chat_post(
)
while True:
try:
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0)
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=10.0)
chunks_yielded += 1
if not first_chunk_yielded:
@@ -640,7 +640,7 @@ async def resume_session_stream(
try:
while True:
try:
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=30.0)
chunk = await asyncio.wait_for(subscriber_queue.get(), timeout=10.0)
if chunk_count < 3:
logger.info(
"Resume stream chunk",

View File

@@ -134,13 +134,15 @@ class CapturedTranscript:
_SDK_CWD_PREFIX = WORKSPACE_PREFIX
# Special message prefixes for text-based markers (parsed by frontend)
COPILOT_ERROR_PREFIX = "[COPILOT_ERROR]" # Renders as ErrorCard
COPILOT_SYSTEM_PREFIX = "[COPILOT_SYSTEM]" # Renders as system info message
# Special message prefixes for text-based markers (parsed by frontend).
# The hex suffix makes accidental LLM generation of these strings virtually
# impossible, avoiding false-positive marker detection in normal conversation.
COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]" # Renders as ErrorCard
COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]" # Renders as system info message
# Heartbeat interval — keep SSE alive through proxies/LBs during tool execution.
# IMPORTANT: Must be less than frontend timeout (12s in useCopilotPage.ts)
_HEARTBEAT_INTERVAL = 10.0 # seconds
_HEARTBEAT_INTERVAL = 3.0 # seconds
# Appended to the system prompt to inform the agent about available tools.

View File

@@ -1,3 +1,4 @@
import logging
import uuid
from datetime import UTC, datetime
from os import getenv
@@ -12,12 +13,34 @@ from backend.blocks.firecrawl.scrape import FirecrawlScrapeBlock
from backend.blocks.io import AgentInputBlock, AgentOutputBlock
from backend.blocks.llm import AITextGeneratorBlock
from backend.copilot.model import ChatSession
from backend.data import db as db_module
from backend.data.db import prisma
from backend.data.graph import Graph, Link, Node, create_graph
from backend.data.model import APIKeyCredentials
from backend.data.user import get_or_create_user
from backend.integrations.credentials_store import IntegrationCredentialsStore
_logger = logging.getLogger(__name__)
async def _ensure_db_connected() -> None:
    """Ensure the Prisma connection is alive on the current event loop.

    On Python 3.11, the httpx transport inside Prisma can reference a stale
    (closed) event loop when session-scoped async fixtures are evaluated long
    after the initial ``server`` fixture connected Prisma. A cheap health-check
    followed by a reconnect fixes this without affecting other fixtures.
    """
    try:
        # Cheap liveness probe; raises when the underlying transport's event
        # loop is closed or the connection is otherwise broken.
        await prisma.query_raw("SELECT 1")
    except Exception:
        _logger.info("Prisma connection stale; reconnecting")
        try:
            # Best-effort teardown — the old connection may already be dead,
            # in which case disconnect itself can raise.
            await db_module.disconnect()
        except Exception:
            pass
        await db_module.connect()
def make_session(user_id: str):
return ChatSession(
@@ -43,6 +66,8 @@ async def setup_test_data(server):
Depends on ``server`` to ensure Prisma is connected.
"""
await _ensure_db_connected()
# 1. Create a test user
user_data = {
"sub": f"test-user-{uuid.uuid4()}",
@@ -164,6 +189,8 @@ async def setup_llm_test_data(server):
Depends on ``server`` to ensure Prisma is connected.
"""
await _ensure_db_connected()
key = getenv("OPENAI_API_KEY")
if not key:
return pytest.skip("OPENAI_API_KEY is not set")
@@ -330,6 +357,8 @@ async def setup_firecrawl_test_data(server):
Depends on ``server`` to ensure Prisma is connected.
"""
await _ensure_db_connected()
# 1. Create a test user
user_data = {
"sub": f"test-user-{uuid.uuid4()}",

View File

@@ -1,188 +1,15 @@
import { getGetWorkspaceDownloadFileByIdUrl } from "@/app/api/__generated__/endpoints/workspace/workspace";
import {
Conversation,
ConversationContent,
ConversationScrollButton,
} from "@/components/ai-elements/conversation";
import {
Message,
MessageContent,
MessageResponse,
} from "@/components/ai-elements/message";
import { Message, MessageContent } from "@/components/ai-elements/message";
import { LoadingSpinner } from "@/components/atoms/LoadingSpinner/LoadingSpinner";
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
import { ToolUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { useEffect, useState } from "react";
import { CreateAgentTool } from "../../tools/CreateAgent/CreateAgent";
import { EditAgentTool } from "../../tools/EditAgent/EditAgent";
import {
CreateFeatureRequestTool,
SearchFeatureRequestsTool,
} from "../../tools/FeatureRequests/FeatureRequests";
import { FindAgentsTool } from "../../tools/FindAgents/FindAgents";
import { FindBlocksTool } from "../../tools/FindBlocks/FindBlocks";
import { RunAgentTool } from "../../tools/RunAgent/RunAgent";
import { RunBlockTool } from "../../tools/RunBlock/RunBlock";
import { SearchDocsTool } from "../../tools/SearchDocs/SearchDocs";
import { GenericTool } from "../../tools/GenericTool/GenericTool";
import { ViewAgentOutputTool } from "../../tools/ViewAgentOutput/ViewAgentOutput";
import { UIDataTypes, UIMessage, UITools } from "ai";
import { MessagePartRenderer } from "./components/MessagePartRenderer";
import { ThinkingIndicator } from "./components/ThinkingIndicator";
// ---------------------------------------------------------------------------
// Special text parsing (error markers, workspace URLs, etc.)
// ---------------------------------------------------------------------------
// Special message prefixes for text-based markers (set by backend)
const COPILOT_ERROR_PREFIX = "[COPILOT_ERROR]";
const COPILOT_SYSTEM_PREFIX = "[COPILOT_SYSTEM]";
type MarkerType = "error" | "system" | null;
/**
* Parse special markers from message content (error, system).
*
* Detects markers added by the backend for special rendering:
* - `[COPILOT_ERROR] message` → ErrorCard
* - `[COPILOT_SYSTEM] message` → System info message
*
* Returns marker type, marker text, and cleaned text.
*/
function parseSpecialMarkers(text: string): {
markerType: MarkerType;
markerText: string;
cleanText: string;
} {
// Check for error marker
const errorMatch = text.match(
new RegExp(`\\${COPILOT_ERROR_PREFIX}\\s*(.+?)$`, "s"),
);
if (errorMatch) {
return {
markerType: "error",
markerText: errorMatch[1].trim(),
cleanText: text.replace(errorMatch[0], "").trim(),
};
}
// Check for system marker
const systemMatch = text.match(
new RegExp(`\\${COPILOT_SYSTEM_PREFIX}\\s*(.+?)$`, "s"),
);
if (systemMatch) {
return {
markerType: "system",
markerText: systemMatch[1].trim(),
cleanText: text.replace(systemMatch[0], "").trim(),
};
}
return { markerType: null, markerText: "", cleanText: text };
}
/**
* Resolve workspace:// URLs in markdown text to proxy download URLs.
*
* Handles both image syntax `![alt](workspace://id#mime)` and regular link
* syntax `[text](workspace://id)`. For images the MIME type hash fragment is
* inspected so that videos can be rendered with a `<video>` element via the
* custom img component.
*/
function resolveWorkspaceUrls(text: string): string {
// Handle image links: ![alt](workspace://id#mime)
let resolved = text.replace(
/!\[([^\]]*)\]\(workspace:\/\/([^)#\s]+)(?:#([^)#\s]*))?\)/g,
(_match, alt: string, fileId: string, mimeHint?: string) => {
const apiPath = getGetWorkspaceDownloadFileByIdUrl(fileId);
const url = `/api/proxy${apiPath}`;
if (mimeHint?.startsWith("video/")) {
return `![video:${alt || "Video"}](${url})`;
}
return `![${alt || "Image"}](${url})`;
},
);
// Handle regular links: [text](workspace://id) — without the leading "!"
// These are blocked by Streamdown's rehype-harden sanitizer because
// "workspace://" is not in the allowed URL-scheme whitelist, which causes
// "[blocked]" to appear next to the link text.
// Use an absolute URL so Streamdown's "Copy link" button copies the full
// URL (including host) rather than just the path.
resolved = resolved.replace(
/(?<!!)\[([^\]]*)\]\(workspace:\/\/([^)#\s]+)(?:#[^)#\s]*)?\)/g,
(_match, linkText: string, fileId: string) => {
const apiPath = getGetWorkspaceDownloadFileByIdUrl(fileId);
const origin =
typeof window !== "undefined" ? window.location.origin : "";
const url = `${origin}/api/proxy${apiPath}`;
return `[${linkText || "Download file"}](${url})`;
},
);
return resolved;
}
/**
* Custom img component for Streamdown that renders <video> elements
* for workspace video files (detected via "video:" alt-text prefix).
* Falls back to <video> when an <img> fails to load for workspace files.
*/
function WorkspaceMediaImage(props: React.JSX.IntrinsicElements["img"]) {
const { src, alt, ...rest } = props;
const [imgFailed, setImgFailed] = useState(false);
const isWorkspace = src?.includes("/workspace/files/") ?? false;
if (!src) return null;
if (alt?.startsWith("video:") || (imgFailed && isWorkspace)) {
return (
<span className="my-2 inline-block">
<video
controls
className="h-auto max-w-full rounded-md border border-zinc-200"
preload="metadata"
>
<source src={src} />
Your browser does not support the video tag.
</video>
</span>
);
}
return (
// eslint-disable-next-line @next/next/no-img-element
<img
src={src}
alt={alt || "Image"}
className="h-auto max-w-full rounded-md border border-zinc-200"
loading="lazy"
onError={() => {
if (isWorkspace) setImgFailed(true);
}}
{...rest}
/>
);
}
/** Stable components override for Streamdown (avoids re-creating on every render). */
const STREAMDOWN_COMPONENTS = { img: WorkspaceMediaImage };
const THINKING_PHRASES = [
"Thinking...",
"Considering this...",
"Working through this...",
"Analyzing your request...",
"Reasoning...",
"Looking into it...",
"Processing your request...",
"Mulling this over...",
"Piecing it together...",
"On it...",
];
function getRandomPhrase() {
return THINKING_PHRASES[Math.floor(Math.random() * THINKING_PHRASES.length)];
}
interface ChatMessagesContainerProps {
interface Props {
messages: UIMessage<unknown, UIDataTypes, UITools>[];
status: string;
error: Error | undefined;
@@ -190,15 +17,13 @@ interface ChatMessagesContainerProps {
headerSlot?: React.ReactNode;
}
export const ChatMessagesContainer = ({
export function ChatMessagesContainer({
messages,
status,
error,
isLoading,
headerSlot,
}: ChatMessagesContainerProps) => {
const [thinkingPhrase, setThinkingPhrase] = useState(getRandomPhrase);
}: Props) {
const lastMessage = messages[messages.length - 1];
// Determine if something is visibly "in-flight" in the last assistant message:
@@ -230,12 +55,6 @@ export const ChatMessagesContainer = ({
const showThinking =
status === "submitted" || (status === "streaming" && !hasInflight);
useEffect(() => {
if (showThinking) {
setThinkingPhrase(getRandomPhrase());
}
}, [showThinking]);
return (
<Conversation className="min-h-0 flex-1">
<ConversationContent className="flex flex-1 flex-col gap-6 px-3 py-6">
@@ -262,134 +81,16 @@ export const ChatMessagesContainer = ({
"group-[.is-assistant]:bg-transparent group-[.is-assistant]:text-slate-900"
}
>
{message.parts.map((part, i) => {
switch (part.type) {
case "text": {
// Check for special markers (error, system)
const { markerType, markerText, cleanText } =
parseSpecialMarkers(part.text);
if (markerType === "error") {
return (
<ErrorCard
key={`${message.id}-${i}`}
responseError={{ message: markerText }}
context="execution"
/>
);
}
if (markerType === "system") {
return (
<div
key={`${message.id}-${i}`}
className="my-2 rounded-lg bg-neutral-100 px-3 py-2 text-sm italic text-neutral-600"
>
{markerText}
</div>
);
}
return (
<MessageResponse
key={`${message.id}-${i}`}
components={STREAMDOWN_COMPONENTS}
>
{resolveWorkspaceUrls(cleanText)}
</MessageResponse>
);
}
case "tool-find_block":
return (
<FindBlocksTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-find_agent":
case "tool-find_library_agent":
return (
<FindAgentsTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-search_docs":
case "tool-get_doc_page":
return (
<SearchDocsTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-run_block":
return (
<RunBlockTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-run_agent":
case "tool-schedule_agent":
return (
<RunAgentTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-create_agent":
return (
<CreateAgentTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-edit_agent":
return (
<EditAgentTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-view_agent_output":
return (
<ViewAgentOutputTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-search_feature_requests":
return (
<SearchFeatureRequestsTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
case "tool-create_feature_request":
return (
<CreateFeatureRequestTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
default:
// Render a generic tool indicator for SDK built-in
// tools (Read, Glob, Grep, etc.) or any unrecognized tool
if (part.type.startsWith("tool-")) {
return (
<GenericTool
key={`${message.id}-${i}`}
part={part as ToolUIPart}
/>
);
}
return null;
}
})}
{message.parts.map((part, i) => (
<MessagePartRenderer
key={`${message.id}-${i}`}
part={part}
messageID={message.id}
partIndex={i}
/>
))}
{isLastAssistant && showThinking && (
<span className="inline-block animate-shimmer bg-gradient-to-r from-neutral-400 via-neutral-600 to-neutral-400 bg-[length:200%_100%] bg-clip-text text-transparent">
{thinkingPhrase}
</span>
<ThinkingIndicator active={showThinking} />
)}
</MessageContent>
</Message>
@@ -398,9 +99,7 @@ export const ChatMessagesContainer = ({
{showThinking && lastMessage?.role !== "assistant" && (
<Message from="assistant">
<MessageContent className="text-[1rem] leading-relaxed">
<span className="inline-block animate-shimmer bg-gradient-to-r from-neutral-400 via-neutral-600 to-neutral-400 bg-[length:200%_100%] bg-clip-text text-transparent">
{thinkingPhrase}
</span>
<ThinkingIndicator active={showThinking} />
</MessageContent>
</Message>
)}
@@ -419,4 +118,4 @@ export const ChatMessagesContainer = ({
<ConversationScrollButton />
</Conversation>
);
};
}

View File

@@ -0,0 +1,153 @@
import { MessageResponse } from "@/components/ai-elements/message";
import { ErrorCard } from "@/components/molecules/ErrorCard/ErrorCard";
import { ExclamationMarkIcon } from "@phosphor-icons/react";
import { ToolUIPart, UIDataTypes, UIMessage, UITools } from "ai";
import { useState } from "react";
import { CreateAgentTool } from "../../../tools/CreateAgent/CreateAgent";
import { EditAgentTool } from "../../../tools/EditAgent/EditAgent";
import {
CreateFeatureRequestTool,
SearchFeatureRequestsTool,
} from "../../../tools/FeatureRequests/FeatureRequests";
import { FindAgentsTool } from "../../../tools/FindAgents/FindAgents";
import { FindBlocksTool } from "../../../tools/FindBlocks/FindBlocks";
import { GenericTool } from "../../../tools/GenericTool/GenericTool";
import { RunAgentTool } from "../../../tools/RunAgent/RunAgent";
import { RunBlockTool } from "../../../tools/RunBlock/RunBlock";
import { SearchDocsTool } from "../../../tools/SearchDocs/SearchDocs";
import { ViewAgentOutputTool } from "../../../tools/ViewAgentOutput/ViewAgentOutput";
import { parseSpecialMarkers, resolveWorkspaceUrls } from "../helpers";
/**
 * Custom img component for Streamdown.
 *
 * Workspace video files (signalled by a "video:" alt-text prefix) are
 * rendered with a <video> element; if an <img> for a workspace file fails
 * to load, we also fall back to <video>.
 */
function WorkspaceMediaImage(props: React.JSX.IntrinsicElements["img"]) {
  const { src, alt, ...rest } = props;
  // Tracks a failed <img> load for a workspace file so we can retry as video.
  const [fallbackToVideo, setFallbackToVideo] = useState(false);
  const isWorkspaceFile = src?.includes("/workspace/files/") ?? false;

  if (!src) return null;

  const renderAsVideo =
    alt?.startsWith("video:") || (fallbackToVideo && isWorkspaceFile);

  if (renderAsVideo) {
    return (
      <span className="my-2 inline-block">
        <video
          controls
          className="h-auto max-w-full rounded-md border border-zinc-200"
          preload="metadata"
        >
          <source src={src} />
          Your browser does not support the video tag.
        </video>
      </span>
    );
  }

  return (
    // eslint-disable-next-line @next/next/no-img-element
    <img
      src={src}
      alt={alt || "Image"}
      className="h-auto max-w-full rounded-md border border-zinc-200"
      loading="lazy"
      onError={() => {
        if (isWorkspaceFile) setFallbackToVideo(true);
      }}
      {...rest}
    />
  );
}

/** Stable components override for Streamdown (avoids re-creating on every render). */
const STREAMDOWN_COMPONENTS = { img: WorkspaceMediaImage };
interface Props {
  part: UIMessage<unknown, UIDataTypes, UITools>["parts"][number];
  messageID: string;
  partIndex: number;
}

/** Renders a text part, honouring backend error/system markers. */
function renderTextPart(text: string, key: string) {
  const { markerType, markerText, cleanText } = parseSpecialMarkers(text);

  if (markerType === "error") {
    const normalized = markerText.toLowerCase();
    // The backend reports a user-initiated stop as one of these two
    // cancellation messages; render them as a soft notice, not an error card.
    const wasStoppedByUser =
      normalized === "operation cancelled" ||
      normalized === "execution stopped by user";
    if (wasStoppedByUser) {
      return (
        <div
          key={key}
          className="my-2 flex items-center gap-1 rounded-lg bg-neutral-200/50 px-3 py-2 text-sm text-neutral-600"
        >
          <ExclamationMarkIcon size={16} /> You manually stopped this chat
        </div>
      );
    }
    return (
      <ErrorCard
        key={key}
        responseError={{ message: markerText }}
        context="execution"
      />
    );
  }

  if (markerType === "system") {
    return (
      <div
        key={key}
        className="my-2 rounded-lg bg-neutral-100 px-3 py-2 text-sm italic text-neutral-600"
      >
        {markerText}
      </div>
    );
  }

  return (
    <MessageResponse key={key} components={STREAMDOWN_COMPONENTS}>
      {resolveWorkspaceUrls(cleanText)}
    </MessageResponse>
  );
}

/**
 * Renders a single chat message part: marked-up text, a dedicated tool
 * card, or a generic indicator for unrecognized tool parts.
 */
export function MessagePartRenderer({ part, messageID, partIndex }: Props) {
  const key = `${messageID}-${partIndex}`;

  switch (part.type) {
    case "text":
      return renderTextPart(part.text, key);
    case "tool-find_block":
      return <FindBlocksTool key={key} part={part as ToolUIPart} />;
    case "tool-find_agent":
    case "tool-find_library_agent":
      return <FindAgentsTool key={key} part={part as ToolUIPart} />;
    case "tool-search_docs":
    case "tool-get_doc_page":
      return <SearchDocsTool key={key} part={part as ToolUIPart} />;
    case "tool-run_block":
      return <RunBlockTool key={key} part={part as ToolUIPart} />;
    case "tool-run_agent":
    case "tool-schedule_agent":
      return <RunAgentTool key={key} part={part as ToolUIPart} />;
    case "tool-create_agent":
      return <CreateAgentTool key={key} part={part as ToolUIPart} />;
    case "tool-edit_agent":
      return <EditAgentTool key={key} part={part as ToolUIPart} />;
    case "tool-view_agent_output":
      return <ViewAgentOutputTool key={key} part={part as ToolUIPart} />;
    case "tool-search_feature_requests":
      return <SearchFeatureRequestsTool key={key} part={part as ToolUIPart} />;
    case "tool-create_feature_request":
      return <CreateFeatureRequestTool key={key} part={part as ToolUIPart} />;
    default:
      // SDK built-in tools (Read, Glob, Grep, …) and anything unrecognized
      // still get a generic tool indicator instead of disappearing.
      if (part.type.startsWith("tool-")) {
        return <GenericTool key={key} part={part as ToolUIPart} />;
      }
      return null;
  }
}

View File

@@ -0,0 +1,93 @@
import { useEffect, useRef, useState } from "react";
import { ScaleLoader } from "../../ScaleLoader/ScaleLoader";
// Rotating status phrases shown while the assistant is "thinking".
const THINKING_PHRASES = [
  "Thinking...",
  "Considering this...",
  "Working through this...",
  "Analyzing your request...",
  "Reasoning...",
  "Looking into it...",
  "Processing your request...",
  "Mulling this over...",
  "Piecing it together...",
  "On it...",
  "Connecting the dots...",
  "Exploring possibilities...",
  "Weighing options...",
  "Diving deeper...",
  "Gathering thoughts...",
  "Almost there...",
  "Figuring this out...",
  "Putting it together...",
  "Running through ideas...",
  "Wrapping my head around this...",
];

// How long each phrase stays on screen before cycling to the next one.
const PHRASE_CYCLE_MS = 6_000;
// Fade-out time before the phrase swap; presumably matched to the
// `transition-opacity duration-300` class on the phrase wrapper — confirm
// if that class ever changes.
const FADE_DURATION_MS = 300;

/**
 * Cycles through thinking phrases sequentially with a fade-out/in transition.
 * Returns the current phrase and whether it's visible (for opacity).
 *
 * @param active - Cycling only runs while true; the phrase resets to the
 *   first entry when `active` flips from false to true.
 */
function useCyclingPhrase(active: boolean) {
  // Current index into THINKING_PHRASES; kept in a ref so the interval
  // callback always reads the latest value without re-subscribing.
  const indexRef = useRef(0);
  const [phrase, setPhrase] = useState(THINKING_PHRASES[0]);
  const [visible, setVisible] = useState(true);
  // Pending fade timeout, so cleanup can cancel a half-finished transition.
  const fadeTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);

  // Reset to the first phrase when thinking restarts
  const prevActive = useRef(active);
  useEffect(() => {
    if (active && !prevActive.current) {
      indexRef.current = 0;
      setPhrase(THINKING_PHRASES[0]);
      setVisible(true);
    }
    prevActive.current = active;
  }, [active]);

  useEffect(() => {
    if (!active) return;
    const id = setInterval(() => {
      // Fade out first, then swap the phrase and fade back in.
      setVisible(false);
      fadeTimeoutRef.current = setTimeout(() => {
        indexRef.current = (indexRef.current + 1) % THINKING_PHRASES.length;
        setPhrase(THINKING_PHRASES[indexRef.current]);
        setVisible(true);
      }, FADE_DURATION_MS);
    }, PHRASE_CYCLE_MS);
    return () => {
      // Cancel both timers so a stale timeout can't fire after unmount
      // or after `active` flips false.
      clearInterval(id);
      if (fadeTimeoutRef.current) {
        clearTimeout(fadeTimeoutRef.current);
        fadeTimeoutRef.current = null;
      }
    };
  }, [active]);

  return { phrase, visible };
}
interface Props {
  active: boolean;
}

/**
 * Animated "thinking" indicator: a small ScaleLoader beside a pulsing
 * phrase that cycles while `active` is true.
 */
export function ThinkingIndicator({ active }: Props) {
  const { phrase, visible } = useCyclingPhrase(active);
  const fadeStyle = { opacity: visible ? 1 : 0 };

  return (
    <span className="inline-flex items-center gap-1.5 text-neutral-500">
      <ScaleLoader size={16} />
      <span className="transition-opacity duration-300" style={fadeStyle}>
        <span className="animate-pulse [animation-duration:1.5s]">
          {phrase}
        </span>
      </span>
    </span>
  );
}

View File

@@ -0,0 +1,92 @@
import { getGetWorkspaceDownloadFileByIdUrl } from "@/app/api/__generated__/endpoints/workspace/workspace";
// Special message prefixes for text-based markers (set by backend).
// The hex suffix makes it virtually impossible for an LLM to accidentally
// produce these strings in normal conversation.
const COPILOT_ERROR_PREFIX = "[__COPILOT_ERROR_f7a1__]";
const COPILOT_SYSTEM_PREFIX = "[__COPILOT_SYSTEM_e3b0__]";

export type MarkerType = "error" | "system" | null;

/** Escape all regex special characters in a string. */
function escapeRegExp(s: string): string {
  return s.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
}

/** Builds a marker regex: prefix, optional whitespace, then the payload. */
function buildMarkerRegExp(prefix: string): RegExp {
  // "s" (dotAll) so the payload may span multiple lines.
  return new RegExp(`${escapeRegExp(prefix)}\\s*(.+?)$`, "s");
}

// Pre-compiled marker patterns (avoids re-creating on every call / render).
// Order matters: error markers are checked before system markers.
const MARKER_PATTERNS: ReadonlyArray<{
  markerType: Exclude<MarkerType, null>;
  pattern: RegExp;
}> = [
  { markerType: "error", pattern: buildMarkerRegExp(COPILOT_ERROR_PREFIX) },
  { markerType: "system", pattern: buildMarkerRegExp(COPILOT_SYSTEM_PREFIX) },
];

/**
 * Parse special backend markers (error / system) out of message text.
 *
 * @returns The detected marker type (or null), the marker's payload text,
 *   and the remaining text with the whole marker removed.
 */
export function parseSpecialMarkers(text: string): {
  markerType: MarkerType;
  markerText: string;
  cleanText: string;
} {
  for (const { markerType, pattern } of MARKER_PATTERNS) {
    const match = text.match(pattern);
    if (match) {
      return {
        markerType,
        markerText: match[1].trim(),
        cleanText: text.replace(match[0], "").trim(),
      };
    }
  }
  return { markerType: null, markerText: "", cleanText: text };
}
/**
 * Resolve workspace:// URLs in markdown text to proxy download URLs.
 *
 * Handles both image syntax `![alt](workspace://id#mime)` and regular link
 * syntax `[text](workspace://id)`. For images the MIME type hash fragment is
 * inspected so that videos can be rendered with a `<video>` element via the
 * custom img component.
 */
export function resolveWorkspaceUrls(text: string): string {
  // Image syntax: ![alt](workspace://id#mime). The MIME hash fragment lets
  // us tag videos so the custom img component renders a <video> element.
  const withImages = text.replace(
    /!\[([^\]]*)\]\(workspace:\/\/([^)#\s]+)(?:#([^)#\s]*))?\)/g,
    (_full, altText: string, fileId: string, mimeHint?: string) => {
      const downloadPath = getGetWorkspaceDownloadFileByIdUrl(fileId);
      const proxied = `/api/proxy${downloadPath}`;
      return mimeHint?.startsWith("video/")
        ? `![video:${altText || "Video"}](${proxied})`
        : `![${altText || "Image"}](${proxied})`;
    },
  );

  // Plain link syntax: [text](workspace://id) — without the leading "!".
  // These are blocked by Streamdown's rehype-harden sanitizer because
  // "workspace://" is not in the allowed URL-scheme whitelist, which causes
  // "[blocked]" to appear next to the link text.
  // Use an absolute URL so Streamdown's "Copy link" button copies the full
  // URL (including host) rather than just the path.
  return withImages.replace(
    /(?<!!)\[([^\]]*)\]\(workspace:\/\/([^)#\s]+)(?:#[^)#\s]*)?\)/g,
    (_full, label: string, fileId: string) => {
      const downloadPath = getGetWorkspaceDownloadFileByIdUrl(fileId);
      const origin =
        typeof window !== "undefined" ? window.location.origin : "";
      const absolute = `${origin}/api/proxy${downloadPath}`;
      return `[${label || "Download file"}](${absolute})`;
    },
  );
}

View File

@@ -27,17 +27,14 @@ import { DotsThree, PlusCircleIcon, PlusIcon } from "@phosphor-icons/react";
import { useQueryClient } from "@tanstack/react-query";
import { motion } from "framer-motion";
import { parseAsString, useQueryState } from "nuqs";
import { useState } from "react";
import { useCopilotUIStore } from "../../store";
import { DeleteChatDialog } from "../DeleteChatDialog/DeleteChatDialog";
export function ChatSidebar() {
const { state } = useSidebar();
const isCollapsed = state === "collapsed";
const [sessionId, setSessionId] = useQueryState("sessionId", parseAsString);
const [sessionToDelete, setSessionToDelete] = useState<{
id: string;
title: string | null | undefined;
} | null>(null);
const { sessionToDelete, setSessionToDelete } = useCopilotUIStore();
const queryClient = useQueryClient();
@@ -48,11 +45,9 @@ export function ChatSidebar() {
useDeleteV2DeleteSession({
mutation: {
onSuccess: () => {
// Invalidate sessions list to refetch
queryClient.invalidateQueries({
queryKey: getGetV2ListSessionsQueryKey(),
});
// If we deleted the current session, clear selection
if (sessionToDelete?.id === sessionId) {
setSessionId(null);
}
@@ -86,8 +81,8 @@ export function ChatSidebar() {
id: string,
title: string | null | undefined,
) {
e.stopPropagation(); // Prevent session selection
if (isDeleting) return; // Prevent double-click during deletion
e.stopPropagation();
if (isDeleting) return;
setSessionToDelete({ id, title });
}

View File

@@ -17,6 +17,7 @@
left: 0;
top: 0;
animation: animloader 2s linear infinite;
animation-fill-mode: backwards;
}
.loader::after {

View File

@@ -0,0 +1,59 @@
import type { UIMessage } from "ai";
/** Mark any in-progress tool parts as completed/errored so spinners stop. */
export function resolveInProgressTools(
  messages: UIMessage[],
  outcome: "completed" | "cancelled",
): UIMessage[] {
  // Finalizes one part: only parts still awaiting tool output are touched.
  const finalizePart = (part: UIMessage["parts"][number]) => {
    const inProgress =
      "state" in part &&
      (part.state === "input-streaming" || part.state === "input-available");
    if (!inProgress) return part;
    return outcome === "cancelled"
      ? { ...part, state: "output-error" as const, errorText: "Cancelled" }
      : { ...part, state: "output-available" as const, output: "" };
  };

  return messages.map((msg) => ({
    ...msg,
    parts: msg.parts.map(finalizePart),
  }));
}
/**
 * Deduplicate messages by ID and by consecutive content fingerprint.
 *
 * ID dedup catches exact duplicates within the same source.
 * Content dedup only compares each assistant message to its **immediate
 * predecessor** — this catches hydration/stream boundary duplicates (where
 * the same content appears under different IDs) without accidentally
 * removing legitimately repeated assistant responses that are far apart.
 */
export function deduplicateMessages(messages: UIMessage[]): UIMessage[] {
  const seenIds = new Set<string>();
  let previousAssistantFingerprint = "";

  // A cheap content fingerprint: text for text parts, toolCallId for tool
  // parts, joined so part order matters.
  const fingerprintOf = (msg: UIMessage): string =>
    msg.parts
      .map(
        (p) =>
          ("text" in p && p.text) || ("toolCallId" in p && p.toolCallId) || "",
      )
      .join("|");

  const kept: UIMessage[] = [];
  for (const msg of messages) {
    if (seenIds.has(msg.id)) continue;
    seenIds.add(msg.id);

    if (msg.role !== "assistant") {
      // Reset on non-assistant messages so that identical assistant responses
      // separated by a user message (e.g. "Done!" → user → "Done!") are kept.
      previousAssistantFingerprint = "";
      kept.push(msg);
      continue;
    }

    const fingerprint = fingerprintOf(msg);
    // Only drop if this assistant message is identical to the previous one.
    if (fingerprint && fingerprint === previousAssistantFingerprint) continue;
    if (fingerprint) previousAssistantFingerprint = fingerprint;
    kept.push(msg);
  }
  return kept;
}

View File

@@ -116,10 +116,12 @@ export function convertChatSessionMessagesToUiMessages(
output: "",
});
} else {
// Active stream exists: Skip incomplete tool calls during hydration.
// The resume stream will deliver them fresh with proper SDK state.
// This prevents "No tool invocation found" errors on page refresh.
continue;
parts.push({
type: `tool-${toolName}`,
toolCallId,
state: "input-available",
input,
});
}
}
}

View File

@@ -0,0 +1,22 @@
import { create } from "zustand";
/** Session targeted for deletion by the confirmation dialog. */
export interface DeleteTarget {
  id: string;
  title: string | null | undefined;
}

/** Shared CoPilot UI state consumed by the page, sidebar, and mobile drawer. */
interface CopilotUIState {
  sessionToDelete: DeleteTarget | null;
  setSessionToDelete: (target: DeleteTarget | null) => void;
  isDrawerOpen: boolean;
  setDrawerOpen: (open: boolean) => void;
}

export const useCopilotUIStore = create<CopilotUIState>((update) => ({
  sessionToDelete: null,
  setSessionToDelete(target) {
    update({ sessionToDelete: target });
  },
  isDrawerOpen: false,
  setDrawerOpen(open) {
    update({ isDrawerOpen: open });
  },
}));

View File

@@ -25,10 +25,12 @@ export function useChatSession() {
},
});
// When the user navigates away from a session, invalidate its query cache.
// Invalidate query cache on session switch.
// useChat destroys its Chat instance on id change, so messages are lost.
// Invalidating ensures the next visit fetches fresh data from the API
// instead of hydrating from stale cache that's missing recent messages.
// We invalidate BOTH the old session (stale after leaving) and the new
// session (may have been cached before the user sent their last message,
// so active_stream and messages could be outdated). This guarantees a
// fresh fetch that gives the resume effect accurate hasActiveStream state.
const prevSessionIdRef = useRef(sessionId);
useEffect(() => {
@@ -39,15 +41,21 @@ export function useChatSession() {
queryKey: getGetV2GetSessionQueryKey(prev),
});
}
if (sessionId) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
});
}
}, [sessionId, queryClient]);
// Expose active_stream info so the caller can trigger manual resume
// after hydration completes (rather than relying on AI SDK's built-in
// resume which fires before hydration).
const hasActiveStream = useMemo(() => {
if (sessionQuery.isFetching) return false;
if (sessionQuery.data?.status !== 200) return false;
return !!sessionQuery.data.data.active_stream;
}, [sessionQuery.data, sessionId]);
}, [sessionQuery.data, sessionQuery.isFetching, sessionId]);
// Memoize so the effect in useCopilotPage doesn't infinite-loop on a new
// array reference every render. Re-derives only when query data changes.

View File

@@ -1,62 +1,25 @@
import {
getGetV2GetSessionQueryKey,
getGetV2ListSessionsQueryKey,
postV2CancelSessionTask,
useDeleteV2DeleteSession,
useGetV2ListSessions,
} from "@/app/api/__generated__/endpoints/chat/chat";
import { toast } from "@/components/molecules/Toast/use-toast";
import { useBreakpoint } from "@/lib/hooks/useBreakpoint";
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useChat } from "@ai-sdk/react";
import { useQueryClient } from "@tanstack/react-query";
import { DefaultChatTransport } from "ai";
import type { UIMessage } from "ai";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { useEffect, useState } from "react";
import { useCopilotUIStore } from "./store";
import { useChatSession } from "./useChatSession";
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_DELAY_MS = 30_000;
const RECONNECT_MAX_ATTEMPTS = 5;
/** Mark any in-progress tool parts as completed/errored so spinners stop. */
function resolveInProgressTools(
  messages: UIMessage[],
  outcome: "completed" | "cancelled",
): UIMessage[] {
  return messages.map((msg) => {
    const parts = msg.parts.map((part) => {
      // A part is "in progress" while its tool input is streaming or queued.
      const isPending =
        "state" in part &&
        (part.state === "input-streaming" || part.state === "input-available");
      if (!isPending) return part;
      if (outcome === "cancelled") {
        return { ...part, state: "output-error" as const, errorText: "Cancelled" };
      }
      return { ...part, state: "output-available" as const, output: "" };
    });
    return { ...msg, parts };
  });
}
/** Simple ID-based deduplication - trust backend for correctness */
function deduplicateMessages(messages: UIMessage[]): UIMessage[] {
  const seen = new Set<string>();
  const unique: UIMessage[] = [];
  for (const msg of messages) {
    if (seen.has(msg.id)) continue;
    seen.add(msg.id);
    unique.push(msg);
  }
  return unique;
}
import { useCopilotStream } from "./useCopilotStream";
export function useCopilotPage() {
const { isUserLoading, isLoggedIn } = useSupabase();
const [isDrawerOpen, setIsDrawerOpen] = useState(false);
const [pendingMessage, setPendingMessage] = useState<string | null>(null);
const [sessionToDelete, setSessionToDelete] = useState<{
id: string;
title: string | null | undefined;
} | null>(null);
const queryClient = useQueryClient();
const { sessionToDelete, setSessionToDelete, isDrawerOpen, setDrawerOpen } =
useCopilotUIStore();
const {
sessionId,
setSessionId,
@@ -69,6 +32,22 @@ export function useCopilotPage() {
refetchSession,
} = useChatSession();
const {
messages,
sendMessage,
stop,
status,
error,
isReconnecting,
isUserStoppingRef,
} = useCopilotStream({
sessionId,
hydratedMessages,
hasActiveStream,
refetchSession,
});
// --- Delete session ---
const { mutate: deleteSessionMutation, isPending: isDeleting } =
useDeleteV2DeleteSession({
mutation: {
@@ -93,224 +72,12 @@ export function useCopilotPage() {
},
});
// --- Responsive ---
const breakpoint = useBreakpoint();
const isMobile =
breakpoint === "base" || breakpoint === "sm" || breakpoint === "md";
const transport = useMemo(
() =>
sessionId
? new DefaultChatTransport({
api: `/api/chat/sessions/${sessionId}/stream`,
prepareSendMessagesRequest: ({ messages }) => {
const last = messages[messages.length - 1];
return {
body: {
message: (
last.parts?.map((p) => (p.type === "text" ? p.text : "")) ??
[]
).join(""),
is_user_message: last.role === "user",
context: null,
},
};
},
// Resume: GET goes to the same URL as POST (backend uses
// method to distinguish). Override the default formula which
// would append /{chatId}/stream to the existing path.
prepareReconnectToStreamRequest: () => ({
api: `/api/chat/sessions/${sessionId}/stream`,
}),
})
: null,
[sessionId],
);
// Reconnect state
const [reconnectAttempts, setReconnectAttempts] = useState(0);
const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
const hasShownDisconnectToast = useRef(false);
// Consolidated reconnect logic
function handleReconnect(sid: string) {
  if (!sid || isReconnectScheduled) return;

  const attempt = reconnectAttempts + 1;
  if (attempt > RECONNECT_MAX_ATTEMPTS) {
    // Out of retries: surface a terminal error instead of looping forever.
    toast({
      title: "Connection lost",
      description: "Unable to reconnect. Please refresh the page.",
      variant: "destructive",
    });
    return;
  }

  setIsReconnectScheduled(true);
  setReconnectAttempts(attempt);

  // Only show the disconnect toast once per disconnect episode.
  if (!hasShownDisconnectToast.current) {
    hasShownDisconnectToast.current = true;
    toast({
      title: "Connection lost",
      description: "Reconnecting...",
    });
  }

  // Exponential backoff, capped at the configured maximum delay.
  const backoff = Math.min(
    RECONNECT_BASE_DELAY_MS * 2 ** reconnectAttempts,
    RECONNECT_MAX_DELAY_MS,
  );
  reconnectTimerRef.current = setTimeout(() => {
    setIsReconnectScheduled(false);
    resumeStream();
  }, backoff);
}
const {
messages: rawMessages,
sendMessage,
stop: sdkStop,
status,
error,
setMessages,
resumeStream,
} = useChat({
id: sessionId ?? undefined,
transport: transport ?? undefined,
onFinish: async ({ isDisconnect, isAbort }) => {
if (isAbort || !sessionId) return;
if (isDisconnect) {
handleReconnect(sessionId);
return;
}
// Check if backend executor is still running after clean close
const result = await refetchSession();
const backendActive =
result.data?.status === 200 && !!result.data.data.active_stream;
if (backendActive) {
handleReconnect(sessionId);
}
},
onError: (error) => {
if (!sessionId) return;
// Only reconnect on network errors (not HTTP errors)
const isNetworkError =
error.name === "TypeError" || error.name === "AbortError";
if (isNetworkError) {
handleReconnect(sessionId);
}
},
});
// Deduplicate messages continuously to prevent duplicates when resuming streams
const messages = useMemo(
() => deduplicateMessages(rawMessages),
[rawMessages],
);
// Wrap AI SDK's stop() to also cancel the backend executor task.
// sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
// the cancel API to actually stop the executor and wait for confirmation.
async function stop() {
  sdkStop();
  // Settle any spinners immediately — the stream is already aborted.
  setMessages((prev) => resolveInProgressTools(prev, "cancelled"));
  if (!sessionId) return;

  try {
    const res = await postV2CancelSessionTask(sessionId);
    const publishedButUnconfirmed =
      res.status === 200 &&
      "reason" in res.data &&
      res.data.reason === "cancel_published_not_confirmed";
    if (publishedButUnconfirmed) {
      toast({
        title: "Stop may take a moment",
        description:
          "The cancel was sent but not yet confirmed. The task should stop shortly.",
      });
    }
  } catch {
    toast({
      title: "Could not stop the task",
      description: "The task may still be running in the background.",
      variant: "destructive",
    });
  }
}
// Hydrate messages from REST API when not actively streaming
useEffect(() => {
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (status === "streaming" || status === "submitted") return;
if (isReconnectScheduled) return;
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return deduplicateMessages(hydratedMessages);
});
}, [hydratedMessages, setMessages, status, isReconnectScheduled]);
// Track resume state per session
const hasResumedRef = useRef<Map<string, boolean>>(new Map());
// Clean up reconnect state on session switch
useEffect(() => {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
setReconnectAttempts(0);
setIsReconnectScheduled(false);
hasShownDisconnectToast.current = false;
prevStatusRef.current = status; // Reset to avoid cross-session state bleeding
}, [sessionId, status]);
// Invalidate session cache when stream completes
const prevStatusRef = useRef(status);
useEffect(() => {
const prev = prevStatusRef.current;
prevStatusRef.current = status;
const wasActive = prev === "streaming" || prev === "submitted";
const isIdle = status === "ready" || status === "error";
if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
});
if (status === "ready") {
setReconnectAttempts(0);
hasShownDisconnectToast.current = false;
}
}
}, [status, sessionId, queryClient, isReconnectScheduled]);
// Resume an active stream AFTER hydration completes.
// IMPORTANT: Only runs when page loads with existing active stream (reconnection).
// Does NOT run when new streams start during active conversation.
useEffect(() => {
if (!sessionId) return;
if (!hasActiveStream) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;
// Never resume if currently streaming
if (status === "streaming" || status === "submitted") return;
// Only resume once per session
if (hasResumedRef.current.get(sessionId)) return;
// Mark as resumed immediately to prevent race conditions
hasResumedRef.current.set(sessionId, true);
resumeStream();
}, [sessionId, hasActiveStream, hydratedMessages, status, resumeStream]);
// Clear messages when session is null
useEffect(() => {
if (!sessionId) setMessages([]);
}, [sessionId, setMessages]);
// --- Send pending message after session creation ---
useEffect(() => {
if (!sessionId || !pendingMessage) return;
const msg = pendingMessage;
@@ -322,6 +89,8 @@ export function useCopilotPage() {
const trimmed = message.trim();
if (!trimmed) return;
isUserStoppingRef.current = false;
if (sessionId) {
sendMessage({ text: trimmed });
return;
@@ -331,6 +100,7 @@ export function useCopilotPage() {
await createSession();
}
// --- Session list (for mobile drawer & sidebar) ---
const { data: sessionsResponse, isLoading: isLoadingSessions } =
useGetV2ListSessions(
{ limit: 50 },
@@ -340,58 +110,52 @@ export function useCopilotPage() {
const sessions =
sessionsResponse?.status === 200 ? sessionsResponse.data.sessions : [];
// --- Mobile drawer handlers ---
function handleOpenDrawer() {
setIsDrawerOpen(true);
setDrawerOpen(true);
}
function handleCloseDrawer() {
setIsDrawerOpen(false);
setDrawerOpen(false);
}
function handleDrawerOpenChange(open: boolean) {
setIsDrawerOpen(open);
setDrawerOpen(open);
}
function handleSelectSession(id: string) {
setSessionId(id);
if (isMobile) setIsDrawerOpen(false);
if (isMobile) setDrawerOpen(false);
}
function handleNewChat() {
setSessionId(null);
if (isMobile) setIsDrawerOpen(false);
if (isMobile) setDrawerOpen(false);
}
const handleDeleteClick = useCallback(
(id: string, title: string | null | undefined) => {
if (isDeleting) return;
setSessionToDelete({ id, title });
},
[isDeleting],
);
// --- Delete handlers ---
function handleDeleteClick(id: string, title: string | null | undefined) {
if (isDeleting) return;
setSessionToDelete({ id, title });
}
const handleConfirmDelete = useCallback(() => {
function handleConfirmDelete() {
if (sessionToDelete) {
deleteSessionMutation({ sessionId: sessionToDelete.id });
}
}, [sessionToDelete, deleteSessionMutation]);
}
const handleCancelDelete = useCallback(() => {
function handleCancelDelete() {
if (!isDeleting) {
setSessionToDelete(null);
}
}, [isDeleting]);
// True while reconnecting or backend has active stream but we haven't connected yet
const isReconnecting =
isReconnectScheduled ||
(hasActiveStream && status !== "streaming" && status !== "submitted");
}
return {
sessionId,
messages,
status,
error: isReconnecting ? undefined : error,
error,
stop,
isReconnecting,
isLoadingSession,

View File

@@ -0,0 +1,367 @@
import {
getGetV2GetSessionQueryKey,
postV2CancelSessionTask,
} from "@/app/api/__generated__/endpoints/chat/chat";
import { toast } from "@/components/molecules/Toast/use-toast";
import { getWebSocketToken } from "@/lib/supabase/actions";
import { environment } from "@/services/environment";
import { useChat } from "@ai-sdk/react";
import { useQueryClient } from "@tanstack/react-query";
import { DefaultChatTransport } from "ai";
import type { UIMessage } from "ai";
import { useEffect, useMemo, useRef, useState } from "react";
import { deduplicateMessages, resolveInProgressTools } from "./helpers";
const RECONNECT_BASE_DELAY_MS = 1_000;
const RECONNECT_MAX_ATTEMPTS = 3;
/** Fetch a fresh JWT for direct backend requests (same pattern as WebSocket). */
async function getAuthHeaders(): Promise<Record<string, string>> {
  const result = await getWebSocketToken();
  if (result.error || !result.token) {
    // Throw rather than returning empty headers so the caller's error path
    // (onError toast) fires instead of silently sending an unauthed request.
    console.warn("[Copilot] Failed to get auth token:", result.error);
    throw new Error("Authentication failed — please sign in again.");
  }
  return { Authorization: `Bearer ${result.token}` };
}
/** Inputs the stream hook needs from the session layer. */
interface UseCopilotStreamArgs {
  // Current chat session ID; null when no session is selected.
  sessionId: string | null;
  // Messages hydrated from the REST API; undefined before the first fetch.
  hydratedMessages: UIMessage[] | undefined;
  // Whether the backend reports an active stream for this session.
  hasActiveStream: boolean;
  // Refetches the session query; used to check active_stream after a clean close.
  refetchSession: () => Promise<{ data?: unknown }>;
}
/**
 * SSE transport + `useChat` wrapper for the CoPilot chat page.
 *
 * Owns the full streaming lifecycle:
 * - direct-to-backend SSE transport (bypassing the Next.js serverless proxy)
 * - exponential-backoff reconnects after disconnects and network errors
 * - resume of an active backend stream after REST hydration completes
 * - stop handling that aborts the SSE fetch AND cancels the backend task
 *
 * Returns deduplicated messages plus the status flags the page consumes.
 */
export function useCopilotStream({
  sessionId,
  hydratedMessages,
  hasActiveStream,
  refetchSession,
}: UseCopilotStreamArgs) {
  const queryClient = useQueryClient();

  // Connect directly to the Python backend for SSE, bypassing the Next.js
  // serverless proxy. This eliminates the Vercel 800s function timeout that
  // was the primary cause of stream disconnections on long-running tasks.
  // Auth uses the same server-action token pattern as the WebSocket connection.
  const transport = useMemo(
    () =>
      sessionId
        ? new DefaultChatTransport({
            api: `${environment.getAGPTServerBaseUrl()}/api/chat/sessions/${sessionId}/stream`,
            // POST body: flatten the last message's text parts into a single
            // string for the backend; attach a fresh token per request.
            prepareSendMessagesRequest: async ({ messages }) => {
              const last = messages[messages.length - 1];
              return {
                body: {
                  message: (
                    last.parts?.map((p) => (p.type === "text" ? p.text : "")) ??
                    []
                  ).join(""),
                  is_user_message: last.role === "user",
                  context: null,
                },
                headers: await getAuthHeaders(),
              };
            },
            // Resume (GET) goes to the same URL as POST; backend distinguishes
            // by HTTP method. Fresh token on every reconnect attempt.
            prepareReconnectToStreamRequest: async () => ({
              api: `${environment.getAGPTServerBaseUrl()}/api/chat/sessions/${sessionId}/stream`,
              headers: await getAuthHeaders(),
            }),
          })
        : null,
    [sessionId],
  );

  // Reconnect state — use refs for values read inside callbacks to avoid
  // stale closures when multiple reconnect cycles fire in quick succession.
  const reconnectAttemptsRef = useRef(0);
  const isReconnectScheduledRef = useRef(false);
  // Mirrored in state so effects/render logic can react to scheduling changes.
  const [isReconnectScheduled, setIsReconnectScheduled] = useState(false);
  const reconnectTimerRef = useRef<ReturnType<typeof setTimeout>>();
  const hasShownDisconnectToast = useRef(false);

  // Set when the user explicitly clicks stop — prevents onError from
  // triggering a reconnect cycle for the resulting AbortError.
  const isUserStoppingRef = useRef(false);

  // Schedule a resume attempt with exponential backoff (1s, 2s, 4s, ...).
  // No-op while a reconnect is already scheduled; gives up with a terminal
  // toast after RECONNECT_MAX_ATTEMPTS.
  function handleReconnect(sid: string) {
    if (isReconnectScheduledRef.current || !sid) return;
    const nextAttempt = reconnectAttemptsRef.current + 1;
    if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
      toast({
        title: "Connection lost",
        description: "Unable to reconnect. Please refresh the page.",
        variant: "destructive",
      });
      return;
    }
    isReconnectScheduledRef.current = true;
    setIsReconnectScheduled(true);
    reconnectAttemptsRef.current = nextAttempt;
    // Only show the disconnect toast once per disconnect episode.
    if (!hasShownDisconnectToast.current) {
      hasShownDisconnectToast.current = true;
      toast({
        title: "Connection lost",
        description: "Reconnecting...",
      });
    }
    const delay = RECONNECT_BASE_DELAY_MS * 2 ** (nextAttempt - 1);
    reconnectTimerRef.current = setTimeout(() => {
      isReconnectScheduledRef.current = false;
      setIsReconnectScheduled(false);
      resumeStream();
    }, delay);
  }

  const {
    messages: rawMessages,
    sendMessage,
    stop: sdkStop,
    status,
    error,
    setMessages,
    resumeStream,
  } = useChat({
    id: sessionId ?? undefined,
    transport: transport ?? undefined,
    onFinish: async ({ isDisconnect, isAbort }) => {
      if (isAbort || !sessionId) return;
      if (isDisconnect) {
        handleReconnect(sessionId);
        return;
      }
      // Check if backend executor is still running after clean close.
      // refetchSession returns `data?: unknown`, so narrow structurally.
      const result = await refetchSession();
      const d = result.data;
      const backendActive =
        d != null &&
        typeof d === "object" &&
        "status" in d &&
        d.status === 200 &&
        "data" in d &&
        d.data != null &&
        typeof d.data === "object" &&
        "active_stream" in d.data &&
        !!d.data.active_stream;
      if (backendActive) {
        handleReconnect(sessionId);
      }
    },
    onError: (error) => {
      if (!sessionId) return;
      // Detect authentication failures (from getAuthHeaders or 401 responses)
      const isAuthError =
        error.message.includes("Authentication failed") ||
        error.message.includes("Unauthorized") ||
        error.message.includes("Not authenticated") ||
        error.message.toLowerCase().includes("401");
      if (isAuthError) {
        toast({
          title: "Authentication error",
          description: "Your session may have expired. Please sign in again.",
          variant: "destructive",
        });
        return;
      }
      // Only reconnect on network errors (not HTTP errors), and never
      // reconnect when the user explicitly stopped the stream.
      if (isUserStoppingRef.current) return;
      const isNetworkError =
        error.name === "TypeError" || error.name === "AbortError";
      if (isNetworkError) {
        handleReconnect(sessionId);
      }
    },
  });

  // Deduplicate messages continuously to prevent duplicates when resuming streams
  const messages = useMemo(
    () => deduplicateMessages(rawMessages),
    [rawMessages],
  );

  // Wrap AI SDK's stop() to also cancel the backend executor task.
  // sdkStop() aborts the SSE fetch instantly (UI feedback), then we fire
  // the cancel API to actually stop the executor and wait for confirmation.
  async function stop() {
    isUserStoppingRef.current = true;
    sdkStop();
    // Resolve pending tool calls and inject a cancellation marker so the UI
    // shows "You manually stopped this chat" immediately (the backend writes
    // the same marker to the DB, but the SSE connection is already aborted).
    // Marker must match COPILOT_ERROR_PREFIX in ChatMessagesContainer/helpers.ts.
    setMessages((prev) => {
      const resolved = resolveInProgressTools(prev, "cancelled");
      const last = resolved[resolved.length - 1];
      if (last?.role === "assistant") {
        return [
          ...resolved.slice(0, -1),
          {
            ...last,
            parts: [
              ...last.parts,
              {
                type: "text" as const,
                text: "[__COPILOT_ERROR_f7a1__] Operation cancelled",
              },
            ],
          },
        ];
      }
      return resolved;
    });
    if (!sessionId) return;
    try {
      const res = await postV2CancelSessionTask(sessionId);
      if (
        res.status === 200 &&
        "reason" in res.data &&
        res.data.reason === "cancel_published_not_confirmed"
      ) {
        toast({
          title: "Stop may take a moment",
          description:
            "The cancel was sent but not yet confirmed. The task should stop shortly.",
        });
      }
    } catch {
      toast({
        title: "Could not stop the task",
        description: "The task may still be running in the background.",
        variant: "destructive",
      });
    }
  }

  // Hydrate messages from REST API when not actively streaming
  useEffect(() => {
    if (!hydratedMessages || hydratedMessages.length === 0) return;
    if (status === "streaming" || status === "submitted") return;
    if (isReconnectScheduled) return;
    // Only adopt hydrated messages when they add something new; never shrink.
    setMessages((prev) => {
      if (prev.length >= hydratedMessages.length) return prev;
      return deduplicateMessages(hydratedMessages);
    });
  }, [hydratedMessages, setMessages, status, isReconnectScheduled]);

  // Track resume state per session
  const hasResumedRef = useRef<Map<string, boolean>>(new Map());

  // Clean up reconnect state on session switch
  useEffect(() => {
    clearTimeout(reconnectTimerRef.current);
    reconnectTimerRef.current = undefined;
    reconnectAttemptsRef.current = 0;
    isReconnectScheduledRef.current = false;
    setIsReconnectScheduled(false);
    hasShownDisconnectToast.current = false;
    isUserStoppingRef.current = false;
    // Clearing lets the resume effect fire again for the new session.
    hasResumedRef.current.clear();
    return () => {
      clearTimeout(reconnectTimerRef.current);
      reconnectTimerRef.current = undefined;
    };
  }, [sessionId]);

  // Invalidate session cache when stream completes
  const prevStatusRef = useRef(status);
  useEffect(() => {
    const prev = prevStatusRef.current;
    prevStatusRef.current = status;
    const wasActive = prev === "streaming" || prev === "submitted";
    const isIdle = status === "ready" || status === "error";
    if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
      queryClient.invalidateQueries({
        queryKey: getGetV2GetSessionQueryKey(sessionId),
      });
      // A clean finish resets the backoff counters for the next episode.
      if (status === "ready") {
        reconnectAttemptsRef.current = 0;
        hasShownDisconnectToast.current = false;
      }
    }
  }, [status, sessionId, queryClient, isReconnectScheduled]);

  // Resume an active stream AFTER hydration completes.
  // IMPORTANT: Only runs when page loads with existing active stream (reconnection).
  // Does NOT run when new streams start during active conversation.
  useEffect(() => {
    if (!sessionId) return;
    if (!hasActiveStream) return;
    if (!hydratedMessages || hydratedMessages.length === 0) return;
    // Never resume if currently streaming
    if (status === "streaming" || status === "submitted") return;
    // Only resume once per session
    if (hasResumedRef.current.get(sessionId)) return;
    // Don't resume a stream the user just cancelled
    if (isUserStoppingRef.current) return;
    // Mark as resumed immediately to prevent race conditions
    hasResumedRef.current.set(sessionId, true);
    // Remove the in-progress assistant message before resuming.
    // The backend replays the stream from "0-0", so keeping the hydrated
    // version would cause the old parts to overlap with replayed parts.
    // Previous turns are preserved; the stream recreates the current turn.
    setMessages((prev) => {
      if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
        return prev.slice(0, -1);
      }
      return prev;
    });
    resumeStream();
  }, [
    sessionId,
    hasActiveStream,
    hydratedMessages,
    status,
    resumeStream,
    setMessages,
  ]);

  // Clear messages when session is null
  useEffect(() => {
    if (!sessionId) setMessages([]);
  }, [sessionId, setMessages]);

  // Reset the user-stop flag once the backend confirms the stream is no
  // longer active — this prevents the flag from staying stale forever.
  useEffect(() => {
    if (!hasActiveStream && isUserStoppingRef.current) {
      isUserStoppingRef.current = false;
    }
  }, [hasActiveStream]);

  // True while reconnecting or backend has active stream but we haven't connected yet.
  // Suppressed when the user explicitly stopped — the backend may take a moment
  // to clear active_stream but the UI should be responsive immediately.
  const isReconnecting =
    !isUserStoppingRef.current &&
    (isReconnectScheduled ||
      (hasActiveStream && status !== "streaming" && status !== "submitted"));

  return {
    messages,
    sendMessage,
    stop,
    status,
    // Errors are suppressed during reconnect and user-stop so the UI stays calm.
    error: isReconnecting || isUserStoppingRef.current ? undefined : error,
    isReconnecting,
    isUserStoppingRef,
  };
}

View File

@@ -3,6 +3,8 @@ import { getServerAuthToken } from "@/lib/autogpt-server-api/helpers";
import { NextRequest } from "next/server";
import { normalizeSSEStream, SSE_HEADERS } from "../../../sse-helpers";
// Legacy SSE proxy fallback. Primary transport is direct backend SSE.
// See useCopilotStream.ts for active transport logic.
export const maxDuration = 800;
const DEBUG_SSE_TIMEOUT_MS = process.env.NEXT_PUBLIC_SSE_TIMEOUT_MS

View File

@@ -180,7 +180,7 @@ const config = {
"accordion-down": "accordion-down 0.2s ease-out",
"accordion-up": "accordion-up 0.2s ease-out",
"fade-in": "fade-in 0.2s ease-out",
shimmer: "shimmer 2s ease-in-out infinite",
shimmer: "shimmer 4s ease-in-out infinite",
loader: "loader 1s infinite",
},
transitionDuration: {