fix(copilot): RPC DataError reconstruction, chat stream reconnection

Fix two issues:

1. RPC DataError deserialization crash: When the database-manager
   returns a 400 for a Prisma DataError/UniqueViolationError, the
   client-side reconstruction crashes because DataError.__init__
   expects a dict but exc.args only contains a string message.
   Wrap the string in the expected dict structure so the exception
   is properly caught by callers (e.g. workspace file overwrites).

2. Chat stream reconnection on page refresh: The AI SDK's built-in
   resume:true fires before message hydration completes, causing
   hydrated messages to overwrite the resumed stream. Replace with
   manual resumeStream() called after hydration + active_stream
   detection. Show the stop button immediately when an active stream
   is detected (isReconnecting flag) and prevent sending new messages
   until reconnected.
This commit is contained in:
Zamil Majdy
2026-02-19 15:50:51 +08:00
parent efb4b3b518
commit ecfe4e6a7a
6 changed files with 98 additions and 5 deletions

View File

@@ -599,6 +599,15 @@ def get_service_client(
if error_response and error_response.type in EXCEPTION_MAPPING:
exception_class = EXCEPTION_MAPPING[error_response.type]
args = error_response.args or [str(e)]
# Prisma DataError subclasses expect a dict `data` arg,
# but RPC serialization only preserves the string message
# from exc.args. Wrap it in the expected structure so
# the constructor doesn't crash on `.get()`.
if issubclass(exception_class, DataError):
msg = str(args[0]) if args else str(e)
raise exception_class({"user_facing_error": {"message": msg}})
raise exception_class(*args)
# Otherwise categorize by HTTP status code

View File

@@ -6,6 +6,7 @@ from unittest.mock import Mock
import httpx
import pytest
from prisma.errors import DataError, UniqueViolationError
from backend.util.service import (
AppService,
@@ -447,6 +448,39 @@ class TestHTTPErrorRetryBehavior:
assert "Invalid parameter value" in str(exc_info.value)
def test_prisma_data_error_reconstructed_correctly(self):
"""Test that DataError subclasses (e.g. UniqueViolationError) are
reconstructed without crashing.
Prisma's DataError.__init__ expects a dict `data` arg with
a 'user_facing_error' key. RPC serialization only preserves the
string message via exc.args, so the client must wrap it in the
expected dict structure.
"""
for exc_type in [DataError, UniqueViolationError]:
mock_response = Mock()
mock_response.status_code = 400
mock_response.json.return_value = {
"type": exc_type.__name__,
"args": ["Unique constraint failed on the fields: (`path`)"],
}
mock_response.raise_for_status.side_effect = httpx.HTTPStatusError(
"400 Bad Request", request=Mock(), response=mock_response
)
client = get_service_client(ServiceTestClient)
with pytest.raises(exc_type) as exc_info:
client._handle_call_method_response( # type: ignore[attr-defined]
response=mock_response, method_name="test_method"
)
# The exception should have the message preserved
assert "Unique constraint" in str(exc_info.value)
# And should have the expected data structure (not crash)
assert hasattr(exc_info.value, "data")
assert isinstance(exc_info.value.data, dict)
def test_client_error_status_codes_coverage(self):
"""Test that various 4xx status codes are all wrapped as HTTPClientError."""
client_error_codes = [400, 401, 403, 404, 405, 409, 422, 429]

View File

@@ -23,6 +23,7 @@ export function CopilotPage() {
status,
error,
stop,
isReconnecting,
createSession,
onSend,
isLoadingSession,
@@ -71,6 +72,7 @@ export function CopilotPage() {
sessionId={sessionId}
isLoadingSession={isLoadingSession}
isCreatingSession={isCreatingSession}
isReconnecting={isReconnecting}
onCreateSession={createSession}
onSend={onSend}
onStop={stop}

View File

@@ -14,6 +14,8 @@ export interface ChatContainerProps {
sessionId: string | null;
isLoadingSession: boolean;
isCreatingSession: boolean;
/** True when backend has an active stream but we haven't reconnected yet. */
isReconnecting?: boolean;
onCreateSession: () => void | Promise<string>;
onSend: (message: string) => void | Promise<void>;
onStop: () => void;
@@ -26,11 +28,13 @@ export const ChatContainer = ({
sessionId,
isLoadingSession,
isCreatingSession,
isReconnecting,
onCreateSession,
onSend,
onStop,
headerSlot,
}: ChatContainerProps) => {
const isBusy = status === "streaming" || !!isReconnecting;
const inputLayoutId = "copilot-2-chat-input";
return (
@@ -56,8 +60,8 @@ export const ChatContainer = ({
<ChatInput
inputId="chat-input-session"
onSend={onSend}
disabled={status === "streaming"}
isStreaming={status === "streaming"}
disabled={isBusy}
isStreaming={isBusy}
onStop={onStop}
placeholder="What else can I help with?"
/>

View File

@@ -50,6 +50,14 @@ export function useChatSession() {
);
}, [sessionQuery.data, sessionId]);
// Expose active_stream info so the caller can trigger manual resume
// after hydration completes (rather than relying on AI SDK's built-in
// resume which fires before hydration).
const hasActiveStream = useMemo(() => {
if (sessionQuery.data?.status !== 200) return false;
return !!sessionQuery.data.data.active_stream;
}, [sessionQuery.data]);
const { mutateAsync: createSessionMutation, isPending: isCreatingSession } =
usePostV2CreateSession({
mutation: {
@@ -102,6 +110,7 @@ export function useChatSession() {
sessionId,
setSessionId,
hydratedMessages,
hasActiveStream,
isLoadingSession: sessionQuery.isLoading,
createSession,
isCreatingSession,

View File

@@ -29,6 +29,7 @@ export function useCopilotPage() {
sessionId,
setSessionId,
hydratedMessages,
hasActiveStream,
isLoadingSession,
createSession,
isCreatingSession,
@@ -91,10 +92,20 @@ export function useCopilotPage() {
[sessionId],
);
const { messages, sendMessage, stop, status, error, setMessages } = useChat({
const {
messages,
sendMessage,
stop,
status,
error,
setMessages,
resumeStream,
} = useChat({
id: sessionId ?? undefined,
transport: transport ?? undefined,
resume: true,
// Don't use resume: true — it fires before hydration completes, causing
// the hydrated messages to overwrite the resumed stream. Instead we
// call resumeStream() manually after hydration + active_stream detection.
});
// Abort the stream if the backend doesn't start sending data within 12s.
@@ -115,13 +126,31 @@ export function useCopilotPage() {
return () => clearTimeout(timer);
}, [status]);
// Hydrate messages from the REST session endpoint.
// Skip hydration while streaming to avoid overwriting the live stream.
useEffect(() => {
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (status === "streaming" || status === "submitted") return;
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return hydratedMessages;
});
}, [hydratedMessages, setMessages]);
}, [hydratedMessages, setMessages, status]);
// Resume an active stream AFTER hydration completes.
// The backend returns active_stream info when a task is still running.
// We wait for hydration so the AI SDK has the conversation history
// before the resumed stream appends the in-progress assistant message.
const hasResumedRef = useRef<string | null>(null);
useEffect(() => {
if (!hasActiveStream || !sessionId) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (status === "streaming" || status === "submitted") return;
// Only resume once per session to avoid re-triggering after stream ends
if (hasResumedRef.current === sessionId) return;
hasResumedRef.current = sessionId;
resumeStream();
}, [hasActiveStream, sessionId, hydratedMessages, status, resumeStream]);
// Poll session endpoint when a long-running tool (create_agent, edit_agent)
// is in progress. When the backend completes, the session data will contain
@@ -204,12 +233,18 @@ export function useCopilotPage() {
}
}, [isDeleting]);
// True while we know the backend has an active stream but haven't
// reconnected yet. Used to disable the send button and show stop UI.
const isReconnecting =
hasActiveStream && status !== "streaming" && status !== "submitted";
return {
sessionId,
messages,
status,
error,
stop,
isReconnecting,
isLoadingSession,
isCreatingSession,
isUserLoading,