Compare commits

...

4 Commits

Author SHA1 Message Date
Lluis Agusti
5af6a3ca61 fix(frontend/copilot): clear reconnect timeout on exhaustion, guard against stale fire, remove dead state
Three fixes from review comments:

1. Remove premature `isReconnectScheduledRef` guard from the 30s forced
   timeout callback — the ref is cleared by the reconnect timer before
   `resumeStream()` fires, causing the timeout to bail out while the
   backend is still connecting, leaving the UI stuck in "reconnecting".
   The `sessionEpochRef` check is sufficient to prevent stale fires.

2. Clear `reconnectTimeoutTimerRef` and `reconnectStartedAtRef` as soon
   as the stream transitions to "streaming"/"submitted" — prevents the
   30s timeout from firing and showing a "timed out" toast when the
   reconnect actually succeeded but took a while.

3. Allow resume when hydration completes with an empty message list —
   the `hydratedMessages.length === 0` guard blocked `resumeStream()`
   in the edge case where a just-started turn has no persisted messages
   yet, leaving the UI stuck until the forced timeout.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 19:15:29 +07:00
Lluis Agusti
be6eabce75 fix(frontend/copilot): clear reconnect timeout on exhaustion, guard against stale fire, remove dead state
- Clear reconnectTimeoutTimerRef when RECONNECT_MAX_ATTEMPTS exceeded to
  prevent a second conflicting toast from the 30s timeout
- Guard timeout callback with sessionEpoch and isReconnectScheduled checks
  so it won't fire after the stream has already resumed
- Remove unused reconnectTimedOut state variable (dead code)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 17:11:31 +07:00
Lluis Agusti
fdfb79530e test(frontend/copilot): add tests for splitReasoningAndResponse and isReasoningToolPart
Covers:
- isReasoningToolPart returns true for search/reasoning tools, false for action tools
- splitReasoningAndResponse: all text (no tools) → all response
- splitReasoningAndResponse: text + reasoning tools + text → proper split
- splitReasoningAndResponse: text + action tools + text → all response (no split)
- splitReasoningAndResponse: mixed reasoning + action tools → only reasoning triggers split
- splitReasoningAndResponse: reasoning tools with no text after → all response
- splitReasoningAndResponse: action tool after response text stays visible

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 16:54:16 +07:00
Lluis Agusti
83d25bf18a fix(frontend/copilot): fix streaming reconnect races, hydration ordering, and reasoning split
Three frontend streaming fixes:

1. Split reasoning/response by tool type instead of position — reasoning
   tools (find_block, search_docs, etc.) trigger the split, action tools
   (run_block, run_agent) do not. Prevents agent responses from being
   hidden inside the collapsed reasoning block.

2. Coordinate hydration and resume effects — adds hydrateCompletedRef
   gate so the resume effect waits for hydration to finish before
   reconnecting. Prevents duplicate messages and missing content on
   page reload with an active stream.

3. Fix reconnect race conditions:
   - Add sessionEpochRef counter so stale callbacks from old sessions
     bail out instead of blocking the new session's auto-resume.
   - Add 30s forced timeout on reconnection — if isReconnecting stays
     true for >30s, force the UI back to idle with a toast.
   - Guard onFinish against user-initiated stops to prevent spurious
     reconnect cycles.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-16 16:42:06 +07:00
3 changed files with 287 additions and 34 deletions

View File

@@ -1,5 +1,29 @@
import { describe, expect, it } from "vitest";
import { extractWorkspaceArtifacts, filePartToArtifactRef } from "./helpers";
import {
extractWorkspaceArtifacts,
filePartToArtifactRef,
isReasoningToolPart,
splitReasoningAndResponse,
} from "./helpers";
import type { MessagePart } from "./helpers";
function textPart(text: string): MessagePart {
return { type: "text", text } as MessagePart;
}
function toolPart(
toolName: string,
state: string = "output-available",
): MessagePart {
return {
type: `tool-${toolName}`,
state,
toolCallId: `call-${toolName}`,
toolName,
args: {},
output: "{}",
} as unknown as MessagePart;
}
describe("extractWorkspaceArtifacts", () => {
it("extracts a single workspace:// link with its markdown title", () => {
@@ -101,3 +125,130 @@ describe("filePartToArtifactRef", () => {
expect(overridden?.origin).toBe("agent");
});
});
describe("isReasoningToolPart", () => {
it("returns true for reasoning/search tools", () => {
const reasoningTools = [
"find_block",
"find_agent",
"find_library_agent",
"search_docs",
"get_doc_page",
"search_feature_requests",
"ask_question",
];
for (const name of reasoningTools) {
expect(isReasoningToolPart(toolPart(name))).toBe(true);
}
});
it("returns false for action tools", () => {
const actionTools = [
"run_block",
"run_agent",
"create_agent",
"edit_agent",
"run_mcp_tool",
"schedule_agent",
"continue_run_block",
];
for (const name of actionTools) {
expect(isReasoningToolPart(toolPart(name))).toBe(false);
}
});
it("returns false for text parts", () => {
expect(isReasoningToolPart(textPart("hello"))).toBe(false);
});
});
describe("splitReasoningAndResponse", () => {
it("returns all parts as response when there are no tools", () => {
const parts = [textPart("Hello"), textPart("World")];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("splits on reasoning tools — text before goes to reasoning", () => {
const parts = [
textPart("Let me search..."),
toolPart("find_block"),
textPart("Here is your answer"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(2);
expect(result.response).toHaveLength(1);
expect((result.response[0] as { text: string }).text).toBe(
"Here is your answer",
);
});
it("does NOT split on action tools — response before run_block stays visible", () => {
const parts = [
textPart("Here is my answer"),
toolPart("run_block"),
textPart("Block finished"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("splits only on reasoning tools when both reasoning and action tools are present", () => {
const parts = [
textPart("Planning..."),
toolPart("search_docs"),
textPart("Found it. Running now."),
toolPart("run_block"),
textPart("Done!"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(2);
expect(result.response).toHaveLength(3);
expect((result.response[0] as { text: string }).text).toBe(
"Found it. Running now.",
);
});
it("returns all as response when reasoning tools have no text after them", () => {
const parts = [
textPart("Hello"),
toolPart("find_agent"),
toolPart("run_block"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toEqual([]);
expect(result.response).toEqual(parts);
});
it("handles multiple reasoning tools correctly", () => {
const parts = [
textPart("Searching..."),
toolPart("find_block"),
textPart("Found one, searching more..."),
toolPart("search_docs"),
textPart("Here are the results"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(4);
expect(result.response).toHaveLength(1);
expect((result.response[0] as { text: string }).text).toBe(
"Here are the results",
);
});
it("handles action tool after response text without hiding the response", () => {
const parts = [
toolPart("find_block"),
textPart("I found it! Let me run it."),
toolPart("run_agent"),
];
const result = splitReasoningAndResponse(parts);
expect(result.reasoning).toHaveLength(1);
expect(result.response).toHaveLength(2);
expect((result.response[0] as { text: string }).text).toBe(
"I found it! Let me run it.",
);
});
});

View File

@@ -33,6 +33,20 @@ const CUSTOM_TOOL_TYPES = new Set([
"tool-create_feature_request",
]);
const REASONING_TOOL_TYPES = new Set([
"tool-find_block",
"tool-find_agent",
"tool-find_library_agent",
"tool-search_docs",
"tool-get_doc_page",
"tool-search_feature_requests",
"tool-ask_question",
]);
export function isReasoningToolPart(part: MessagePart): boolean {
return REASONING_TOOL_TYPES.has(part.type);
}
const WORKSPACE_FILE_PATTERN =
/\/api\/proxy\/api\/workspace\/files\/([a-f0-9-]+)\/download/;
const WORKSPACE_URI_PATTERN = /workspace:\/\/([a-f0-9-]+)(?:#([^\s)\]]+))?/g;
@@ -126,22 +140,22 @@ export function splitReasoningAndResponse(parts: MessagePart[]): {
reasoning: MessagePart[];
response: MessagePart[];
} {
const lastToolIndex = parts.findLastIndex((p) => p.type.startsWith("tool-"));
const lastReasoningIndex = parts.findLastIndex((p) => isReasoningToolPart(p));
if (lastToolIndex === -1) {
if (lastReasoningIndex === -1) {
return { reasoning: [], response: parts };
}
const hasResponseAfterTools = parts
.slice(lastToolIndex + 1)
const hasResponseAfterReasoning = parts
.slice(lastReasoningIndex + 1)
.some((p) => p.type === "text");
if (!hasResponseAfterTools) {
if (!hasResponseAfterReasoning) {
return { reasoning: [], response: parts };
}
const rawReasoning = parts.slice(0, lastToolIndex + 1);
const rawResponse = parts.slice(lastToolIndex + 1);
const rawReasoning = parts.slice(0, lastReasoningIndex + 1);
const rawResponse = parts.slice(lastReasoningIndex + 1);
const reasoning: MessagePart[] = [];
const pinnedParts: MessagePart[] = [];

View File

@@ -27,6 +27,9 @@ const RECONNECT_MAX_ATTEMPTS = 3;
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
/** Max time (ms) the UI can stay in "reconnecting" state before forcing idle. */
const RECONNECT_MAX_DURATION_MS = 30_000;
interface UseCopilotStreamArgs {
sessionId: string | null;
hydratedMessages: UIMessage[] | undefined;
@@ -122,12 +125,21 @@ export function useCopilotStream({
const [isSyncing, setIsSyncing] = useState(false);
// Tracks the last time the page was hidden — used to detect sleep/wake gaps.
const lastHiddenAtRef = useRef(Date.now());
// Monotonic counter that increments on each session switch — async callbacks
// from old sessions compare their captured epoch to detect staleness.
const sessionEpochRef = useRef(0);
// Timestamp when reconnection first started — used to force timeout.
const reconnectStartedAtRef = useRef<number | null>(null);
// Timer for the forced reconnect timeout.
const reconnectTimeoutTimerRef = useRef<ReturnType<typeof setTimeout>>();
function handleReconnect(sid: string) {
if (isReconnectScheduledRef.current || !sid) return;
const nextAttempt = reconnectAttemptsRef.current + 1;
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
setReconnectExhausted(true);
toast({
title: "Connection lost",
@@ -137,6 +149,26 @@ export function useCopilotStream({
return;
}
// Track when reconnection first started for the forced timeout.
if (reconnectStartedAtRef.current === null) {
reconnectStartedAtRef.current = Date.now();
// Schedule a forced timeout — if reconnecting takes longer than
// RECONNECT_MAX_DURATION_MS, force the UI back to idle.
clearTimeout(reconnectTimeoutTimerRef.current);
const capturedEpoch = sessionEpochRef.current;
reconnectTimeoutTimerRef.current = setTimeout(() => {
if (sessionEpochRef.current !== capturedEpoch) return;
setReconnectExhausted(true);
reconnectStartedAtRef.current = null;
toast({
title: "Connection timed out",
description:
"AutoPilot may still be working. Refresh to check for updates.",
variant: "destructive",
});
}, RECONNECT_MAX_DURATION_MS);
}
isReconnectScheduledRef.current = true;
setIsReconnectScheduled(true);
reconnectAttemptsRef.current = nextAttempt;
@@ -150,8 +182,12 @@ export function useCopilotStream({
}
const delay = RECONNECT_BASE_DELAY_MS * 2 ** (nextAttempt - 1);
const capturedEpoch = sessionEpochRef.current;
reconnectTimerRef.current = setTimeout(() => {
// Bail if the session switched while the timer was pending.
if (sessionEpochRef.current !== capturedEpoch) return;
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
// Strip the stale in-progress assistant message before resuming —
@@ -184,7 +220,11 @@ export function useCopilotStream({
transport: transport ?? undefined,
onFinish: async ({ isDisconnect, isAbort }) => {
if (isAbort || !sessionId) return;
// User-initiated stops should not trigger reconnection.
if (isUserStoppingRef.current) return;
// The AI SDK rarely sets isDisconnect — treat ANY non-user-initiated
// finish as a potential disconnect when the backend stream is active.
if (isDisconnect) {
handleReconnect(sessionId);
return;
@@ -419,20 +459,35 @@ export function useCopilotStream({
};
}, [refetchSession, setMessages]);
// Hydrate messages from REST API when not actively streaming
// Hydrate messages from REST API when not actively streaming.
// Sets hydrateCompletedRef so the resume effect knows it's safe to proceed.
useEffect(() => {
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (!hydratedMessages) return;
if (status === "streaming" || status === "submitted") return;
if (isReconnectScheduled) return;
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return deduplicateMessages(hydratedMessages);
});
if (hydratedMessages.length > 0) {
setMessages((prev) => {
if (prev.length >= hydratedMessages.length) return prev;
return deduplicateMessages(hydratedMessages);
});
}
hydrateCompletedRef.current = true;
// Flush any resume that was waiting for hydration to finish.
if (pendingResumeRef.current) {
const pendingResume = pendingResumeRef.current;
pendingResumeRef.current = null;
pendingResume();
}
}, [hydratedMessages, setMessages, status, isReconnectScheduled]);
// Track resume state per session
const hasResumedRef = useRef<Map<string, boolean>>(new Map());
// Coordination: hydration must complete before resume fires.
// Prevents duplicate messages / missing content from the two effects racing.
const hydrateCompletedRef = useRef(false);
const pendingResumeRef = useRef<(() => void) | null>(null);
// Clean up reconnect state on session switch.
// Abort the old stream's in-flight fetch and tell the backend to release
// its XREAD listeners immediately (fire-and-forget).
@@ -441,21 +496,24 @@ export function useCopilotStream({
const prevSid = prevStreamSessionRef.current;
prevStreamSessionRef.current = sessionId;
// Increment epoch so stale async callbacks from the old session bail out.
sessionEpochRef.current += 1;
const currentEpoch = sessionEpochRef.current;
const isSwitching = Boolean(prevSid && prevSid !== sessionId);
if (isSwitching) {
// Mark BEFORE stopping so the old stream's async onError (which fires
// after the abort) sees the flag and short-circuits the reconnect path.
// Without this, the AbortError can queue a reconnect against the new
// session's `sessionId` (captured in the fresh onError closure).
isUserStoppingRef.current = true;
sdkStopRef.current();
disconnectSessionStream(prevSid!);
// Schedule the reset as a task (not a microtask) so it runs AFTER the
// aborted fetch's onError has fired — otherwise the new session would
// be stuck with the "user stopping" flag set, preventing auto-resume
// when hydration detects an active backend stream.
// aborted fetch's onError has fired — but verify the epoch hasn't
// changed again (rapid session switches).
setTimeout(() => {
isUserStoppingRef.current = false;
if (sessionEpochRef.current === currentEpoch) {
isUserStoppingRef.current = false;
}
}, 0);
} else {
isUserStoppingRef.current = false;
@@ -463,7 +521,10 @@ export function useCopilotStream({
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
reconnectAttemptsRef.current = 0;
reconnectStartedAtRef.current = null;
isReconnectScheduledRef.current = false;
setIsReconnectScheduled(false);
setRateLimitMessage(null);
@@ -472,9 +533,13 @@ export function useCopilotStream({
setReconnectExhausted(false);
setIsSyncing(false);
hasResumedRef.current.clear();
hydrateCompletedRef.current = false;
pendingResumeRef.current = null;
return () => {
clearTimeout(reconnectTimerRef.current);
reconnectTimerRef.current = undefined;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
};
}, [sessionId]);
@@ -485,8 +550,18 @@ export function useCopilotStream({
prevStatusRef.current = status;
const wasActive = prev === "streaming" || prev === "submitted";
const isNowActive = status === "streaming" || status === "submitted";
const isIdle = status === "ready" || status === "error";
// Clear the forced reconnect timeout as soon as the stream resumes —
// otherwise the stale 30s timer can fire mid-stream and show a
// "timed out" toast even though reconnection succeeded.
if (isNowActive && reconnectStartedAtRef.current !== null) {
reconnectStartedAtRef.current = null;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
}
if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
queryClient.invalidateQueries({
queryKey: getGetV2GetSessionQueryKey(sessionId),
@@ -497,6 +572,9 @@ export function useCopilotStream({
if (status === "ready") {
reconnectAttemptsRef.current = 0;
hasShownDisconnectToast.current = false;
reconnectStartedAtRef.current = null;
clearTimeout(reconnectTimeoutTimerRef.current);
reconnectTimeoutTimerRef.current = undefined;
// Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last
// submitted text prevents getSendSuppressionReason from allowing a
// duplicate POST of the same message immediately after a successful turn
@@ -511,10 +589,11 @@ export function useCopilotStream({
// Resume an active stream AFTER hydration completes.
// IMPORTANT: Only runs when page loads with existing active stream (reconnection).
// Does NOT run when new streams start during active conversation.
// Gated on hydrateCompletedRef to prevent racing with the hydration effect.
useEffect(() => {
if (!sessionId) return;
if (!hasActiveStream) return;
if (!hydratedMessages || hydratedMessages.length === 0) return;
if (!hydratedMessages) return;
// Never resume if currently streaming
if (status === "streaming" || status === "submitted") return;
@@ -525,21 +604,30 @@ export function useCopilotStream({
// Don't resume a stream the user just cancelled
if (isUserStoppingRef.current) return;
// Mark as resumed immediately to prevent race conditions
hasResumedRef.current.set(sessionId, true);
function doResume() {
if (!sessionId || hasResumedRef.current.get(sessionId)) return;
if (isUserStoppingRef.current) return;
// Remove the in-progress assistant message before resuming.
// The backend replays the stream from "0-0", so keeping the hydrated
// version would cause the old parts to overlap with replayed parts.
// Previous turns are preserved; the stream recreates the current turn.
setMessages((prev) => {
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
return prev.slice(0, -1);
}
return prev;
});
hasResumedRef.current.set(sessionId, true);
resumeStreamRef.current();
setMessages((prev) => {
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
return prev.slice(0, -1);
}
return prev;
});
resumeStreamRef.current();
}
// Wait for hydration to complete before resuming to prevent
// the two effects from racing (duplicate messages / missing content).
if (!hydrateCompletedRef.current) {
pendingResumeRef.current = doResume;
return;
}
doResume();
}, [sessionId, hasActiveStream, hydratedMessages, status, setMessages]);
// Clear messages when session is null