mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-04-30 03:00:41 -04:00
Compare commits
4 Commits
master
...
claude/nic
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5af6a3ca61 | ||
|
|
be6eabce75 | ||
|
|
fdfb79530e | ||
|
|
83d25bf18a |
@@ -1,5 +1,29 @@
|
||||
import { describe, expect, it } from "vitest";
|
||||
import { extractWorkspaceArtifacts, filePartToArtifactRef } from "./helpers";
|
||||
import {
|
||||
extractWorkspaceArtifacts,
|
||||
filePartToArtifactRef,
|
||||
isReasoningToolPart,
|
||||
splitReasoningAndResponse,
|
||||
} from "./helpers";
|
||||
import type { MessagePart } from "./helpers";
|
||||
|
||||
function textPart(text: string): MessagePart {
|
||||
return { type: "text", text } as MessagePart;
|
||||
}
|
||||
|
||||
function toolPart(
|
||||
toolName: string,
|
||||
state: string = "output-available",
|
||||
): MessagePart {
|
||||
return {
|
||||
type: `tool-${toolName}`,
|
||||
state,
|
||||
toolCallId: `call-${toolName}`,
|
||||
toolName,
|
||||
args: {},
|
||||
output: "{}",
|
||||
} as unknown as MessagePart;
|
||||
}
|
||||
|
||||
describe("extractWorkspaceArtifacts", () => {
|
||||
it("extracts a single workspace:// link with its markdown title", () => {
|
||||
@@ -101,3 +125,130 @@ describe("filePartToArtifactRef", () => {
|
||||
expect(overridden?.origin).toBe("agent");
|
||||
});
|
||||
});
|
||||
|
||||
describe("isReasoningToolPart", () => {
|
||||
it("returns true for reasoning/search tools", () => {
|
||||
const reasoningTools = [
|
||||
"find_block",
|
||||
"find_agent",
|
||||
"find_library_agent",
|
||||
"search_docs",
|
||||
"get_doc_page",
|
||||
"search_feature_requests",
|
||||
"ask_question",
|
||||
];
|
||||
for (const name of reasoningTools) {
|
||||
expect(isReasoningToolPart(toolPart(name))).toBe(true);
|
||||
}
|
||||
});
|
||||
|
||||
it("returns false for action tools", () => {
|
||||
const actionTools = [
|
||||
"run_block",
|
||||
"run_agent",
|
||||
"create_agent",
|
||||
"edit_agent",
|
||||
"run_mcp_tool",
|
||||
"schedule_agent",
|
||||
"continue_run_block",
|
||||
];
|
||||
for (const name of actionTools) {
|
||||
expect(isReasoningToolPart(toolPart(name))).toBe(false);
|
||||
}
|
||||
});
|
||||
|
||||
it("returns false for text parts", () => {
|
||||
expect(isReasoningToolPart(textPart("hello"))).toBe(false);
|
||||
});
|
||||
});
|
||||
|
||||
describe("splitReasoningAndResponse", () => {
|
||||
it("returns all parts as response when there are no tools", () => {
|
||||
const parts = [textPart("Hello"), textPart("World")];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("splits on reasoning tools — text before goes to reasoning", () => {
|
||||
const parts = [
|
||||
textPart("Let me search..."),
|
||||
toolPart("find_block"),
|
||||
textPart("Here is your answer"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(2);
|
||||
expect(result.response).toHaveLength(1);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Here is your answer",
|
||||
);
|
||||
});
|
||||
|
||||
it("does NOT split on action tools — response before run_block stays visible", () => {
|
||||
const parts = [
|
||||
textPart("Here is my answer"),
|
||||
toolPart("run_block"),
|
||||
textPart("Block finished"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("splits only on reasoning tools when both reasoning and action tools are present", () => {
|
||||
const parts = [
|
||||
textPart("Planning..."),
|
||||
toolPart("search_docs"),
|
||||
textPart("Found it. Running now."),
|
||||
toolPart("run_block"),
|
||||
textPart("Done!"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(2);
|
||||
expect(result.response).toHaveLength(3);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Found it. Running now.",
|
||||
);
|
||||
});
|
||||
|
||||
it("returns all as response when reasoning tools have no text after them", () => {
|
||||
const parts = [
|
||||
textPart("Hello"),
|
||||
toolPart("find_agent"),
|
||||
toolPart("run_block"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toEqual([]);
|
||||
expect(result.response).toEqual(parts);
|
||||
});
|
||||
|
||||
it("handles multiple reasoning tools correctly", () => {
|
||||
const parts = [
|
||||
textPart("Searching..."),
|
||||
toolPart("find_block"),
|
||||
textPart("Found one, searching more..."),
|
||||
toolPart("search_docs"),
|
||||
textPart("Here are the results"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(4);
|
||||
expect(result.response).toHaveLength(1);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"Here are the results",
|
||||
);
|
||||
});
|
||||
|
||||
it("handles action tool after response text without hiding the response", () => {
|
||||
const parts = [
|
||||
toolPart("find_block"),
|
||||
textPart("I found it! Let me run it."),
|
||||
toolPart("run_agent"),
|
||||
];
|
||||
const result = splitReasoningAndResponse(parts);
|
||||
expect(result.reasoning).toHaveLength(1);
|
||||
expect(result.response).toHaveLength(2);
|
||||
expect((result.response[0] as { text: string }).text).toBe(
|
||||
"I found it! Let me run it.",
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -33,6 +33,20 @@ const CUSTOM_TOOL_TYPES = new Set([
|
||||
"tool-create_feature_request",
|
||||
]);
|
||||
|
||||
const REASONING_TOOL_TYPES = new Set([
|
||||
"tool-find_block",
|
||||
"tool-find_agent",
|
||||
"tool-find_library_agent",
|
||||
"tool-search_docs",
|
||||
"tool-get_doc_page",
|
||||
"tool-search_feature_requests",
|
||||
"tool-ask_question",
|
||||
]);
|
||||
|
||||
export function isReasoningToolPart(part: MessagePart): boolean {
|
||||
return REASONING_TOOL_TYPES.has(part.type);
|
||||
}
|
||||
|
||||
const WORKSPACE_FILE_PATTERN =
|
||||
/\/api\/proxy\/api\/workspace\/files\/([a-f0-9-]+)\/download/;
|
||||
const WORKSPACE_URI_PATTERN = /workspace:\/\/([a-f0-9-]+)(?:#([^\s)\]]+))?/g;
|
||||
@@ -126,22 +140,22 @@ export function splitReasoningAndResponse(parts: MessagePart[]): {
|
||||
reasoning: MessagePart[];
|
||||
response: MessagePart[];
|
||||
} {
|
||||
const lastToolIndex = parts.findLastIndex((p) => p.type.startsWith("tool-"));
|
||||
const lastReasoningIndex = parts.findLastIndex((p) => isReasoningToolPart(p));
|
||||
|
||||
if (lastToolIndex === -1) {
|
||||
if (lastReasoningIndex === -1) {
|
||||
return { reasoning: [], response: parts };
|
||||
}
|
||||
|
||||
const hasResponseAfterTools = parts
|
||||
.slice(lastToolIndex + 1)
|
||||
const hasResponseAfterReasoning = parts
|
||||
.slice(lastReasoningIndex + 1)
|
||||
.some((p) => p.type === "text");
|
||||
|
||||
if (!hasResponseAfterTools) {
|
||||
if (!hasResponseAfterReasoning) {
|
||||
return { reasoning: [], response: parts };
|
||||
}
|
||||
|
||||
const rawReasoning = parts.slice(0, lastToolIndex + 1);
|
||||
const rawResponse = parts.slice(lastToolIndex + 1);
|
||||
const rawReasoning = parts.slice(0, lastReasoningIndex + 1);
|
||||
const rawResponse = parts.slice(lastReasoningIndex + 1);
|
||||
|
||||
const reasoning: MessagePart[] = [];
|
||||
const pinnedParts: MessagePart[] = [];
|
||||
|
||||
@@ -27,6 +27,9 @@ const RECONNECT_MAX_ATTEMPTS = 3;
|
||||
/** Minimum time the page must have been hidden to trigger a wake re-sync. */
|
||||
const WAKE_RESYNC_THRESHOLD_MS = 30_000;
|
||||
|
||||
/** Max time (ms) the UI can stay in "reconnecting" state before forcing idle. */
|
||||
const RECONNECT_MAX_DURATION_MS = 30_000;
|
||||
|
||||
interface UseCopilotStreamArgs {
|
||||
sessionId: string | null;
|
||||
hydratedMessages: UIMessage[] | undefined;
|
||||
@@ -122,12 +125,21 @@ export function useCopilotStream({
|
||||
const [isSyncing, setIsSyncing] = useState(false);
|
||||
// Tracks the last time the page was hidden — used to detect sleep/wake gaps.
|
||||
const lastHiddenAtRef = useRef(Date.now());
|
||||
// Monotonic counter that increments on each session switch — async callbacks
|
||||
// from old sessions compare their captured epoch to detect staleness.
|
||||
const sessionEpochRef = useRef(0);
|
||||
// Timestamp when reconnection first started — used to force timeout.
|
||||
const reconnectStartedAtRef = useRef<number | null>(null);
|
||||
// Timer for the forced reconnect timeout.
|
||||
const reconnectTimeoutTimerRef = useRef<ReturnType<typeof setTimeout>>();
|
||||
|
||||
function handleReconnect(sid: string) {
|
||||
if (isReconnectScheduledRef.current || !sid) return;
|
||||
|
||||
const nextAttempt = reconnectAttemptsRef.current + 1;
|
||||
if (nextAttempt > RECONNECT_MAX_ATTEMPTS) {
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
setReconnectExhausted(true);
|
||||
toast({
|
||||
title: "Connection lost",
|
||||
@@ -137,6 +149,26 @@ export function useCopilotStream({
|
||||
return;
|
||||
}
|
||||
|
||||
// Track when reconnection first started for the forced timeout.
|
||||
if (reconnectStartedAtRef.current === null) {
|
||||
reconnectStartedAtRef.current = Date.now();
|
||||
// Schedule a forced timeout — if reconnecting takes longer than
|
||||
// RECONNECT_MAX_DURATION_MS, force the UI back to idle.
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
const capturedEpoch = sessionEpochRef.current;
|
||||
reconnectTimeoutTimerRef.current = setTimeout(() => {
|
||||
if (sessionEpochRef.current !== capturedEpoch) return;
|
||||
setReconnectExhausted(true);
|
||||
reconnectStartedAtRef.current = null;
|
||||
toast({
|
||||
title: "Connection timed out",
|
||||
description:
|
||||
"AutoPilot may still be working. Refresh to check for updates.",
|
||||
variant: "destructive",
|
||||
});
|
||||
}, RECONNECT_MAX_DURATION_MS);
|
||||
}
|
||||
|
||||
isReconnectScheduledRef.current = true;
|
||||
setIsReconnectScheduled(true);
|
||||
reconnectAttemptsRef.current = nextAttempt;
|
||||
@@ -150,8 +182,12 @@ export function useCopilotStream({
|
||||
}
|
||||
|
||||
const delay = RECONNECT_BASE_DELAY_MS * 2 ** (nextAttempt - 1);
|
||||
const capturedEpoch = sessionEpochRef.current;
|
||||
|
||||
reconnectTimerRef.current = setTimeout(() => {
|
||||
// Bail if the session switched while the timer was pending.
|
||||
if (sessionEpochRef.current !== capturedEpoch) return;
|
||||
|
||||
isReconnectScheduledRef.current = false;
|
||||
setIsReconnectScheduled(false);
|
||||
// Strip the stale in-progress assistant message before resuming —
|
||||
@@ -184,7 +220,11 @@ export function useCopilotStream({
|
||||
transport: transport ?? undefined,
|
||||
onFinish: async ({ isDisconnect, isAbort }) => {
|
||||
if (isAbort || !sessionId) return;
|
||||
// User-initiated stops should not trigger reconnection.
|
||||
if (isUserStoppingRef.current) return;
|
||||
|
||||
// The AI SDK rarely sets isDisconnect — treat ANY non-user-initiated
|
||||
// finish as a potential disconnect when the backend stream is active.
|
||||
if (isDisconnect) {
|
||||
handleReconnect(sessionId);
|
||||
return;
|
||||
@@ -419,20 +459,35 @@ export function useCopilotStream({
|
||||
};
|
||||
}, [refetchSession, setMessages]);
|
||||
|
||||
// Hydrate messages from REST API when not actively streaming
|
||||
// Hydrate messages from REST API when not actively streaming.
|
||||
// Sets hydrateCompletedRef so the resume effect knows it's safe to proceed.
|
||||
useEffect(() => {
|
||||
if (!hydratedMessages || hydratedMessages.length === 0) return;
|
||||
if (!hydratedMessages) return;
|
||||
if (status === "streaming" || status === "submitted") return;
|
||||
if (isReconnectScheduled) return;
|
||||
setMessages((prev) => {
|
||||
if (prev.length >= hydratedMessages.length) return prev;
|
||||
return deduplicateMessages(hydratedMessages);
|
||||
});
|
||||
if (hydratedMessages.length > 0) {
|
||||
setMessages((prev) => {
|
||||
if (prev.length >= hydratedMessages.length) return prev;
|
||||
return deduplicateMessages(hydratedMessages);
|
||||
});
|
||||
}
|
||||
hydrateCompletedRef.current = true;
|
||||
// Flush any resume that was waiting for hydration to finish.
|
||||
if (pendingResumeRef.current) {
|
||||
const pendingResume = pendingResumeRef.current;
|
||||
pendingResumeRef.current = null;
|
||||
pendingResume();
|
||||
}
|
||||
}, [hydratedMessages, setMessages, status, isReconnectScheduled]);
|
||||
|
||||
// Track resume state per session
|
||||
const hasResumedRef = useRef<Map<string, boolean>>(new Map());
|
||||
|
||||
// Coordination: hydration must complete before resume fires.
|
||||
// Prevents duplicate messages / missing content from the two effects racing.
|
||||
const hydrateCompletedRef = useRef(false);
|
||||
const pendingResumeRef = useRef<(() => void) | null>(null);
|
||||
|
||||
// Clean up reconnect state on session switch.
|
||||
// Abort the old stream's in-flight fetch and tell the backend to release
|
||||
// its XREAD listeners immediately (fire-and-forget).
|
||||
@@ -441,21 +496,24 @@ export function useCopilotStream({
|
||||
const prevSid = prevStreamSessionRef.current;
|
||||
prevStreamSessionRef.current = sessionId;
|
||||
|
||||
// Increment epoch so stale async callbacks from the old session bail out.
|
||||
sessionEpochRef.current += 1;
|
||||
const currentEpoch = sessionEpochRef.current;
|
||||
|
||||
const isSwitching = Boolean(prevSid && prevSid !== sessionId);
|
||||
if (isSwitching) {
|
||||
// Mark BEFORE stopping so the old stream's async onError (which fires
|
||||
// after the abort) sees the flag and short-circuits the reconnect path.
|
||||
// Without this, the AbortError can queue a reconnect against the new
|
||||
// session's `sessionId` (captured in the fresh onError closure).
|
||||
isUserStoppingRef.current = true;
|
||||
sdkStopRef.current();
|
||||
disconnectSessionStream(prevSid!);
|
||||
// Schedule the reset as a task (not a microtask) so it runs AFTER the
|
||||
// aborted fetch's onError has fired — otherwise the new session would
|
||||
// be stuck with the "user stopping" flag set, preventing auto-resume
|
||||
// when hydration detects an active backend stream.
|
||||
// aborted fetch's onError has fired — but verify the epoch hasn't
|
||||
// changed again (rapid session switches).
|
||||
setTimeout(() => {
|
||||
isUserStoppingRef.current = false;
|
||||
if (sessionEpochRef.current === currentEpoch) {
|
||||
isUserStoppingRef.current = false;
|
||||
}
|
||||
}, 0);
|
||||
} else {
|
||||
isUserStoppingRef.current = false;
|
||||
@@ -463,7 +521,10 @@ export function useCopilotStream({
|
||||
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = undefined;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
reconnectAttemptsRef.current = 0;
|
||||
reconnectStartedAtRef.current = null;
|
||||
isReconnectScheduledRef.current = false;
|
||||
setIsReconnectScheduled(false);
|
||||
setRateLimitMessage(null);
|
||||
@@ -472,9 +533,13 @@ export function useCopilotStream({
|
||||
setReconnectExhausted(false);
|
||||
setIsSyncing(false);
|
||||
hasResumedRef.current.clear();
|
||||
hydrateCompletedRef.current = false;
|
||||
pendingResumeRef.current = null;
|
||||
return () => {
|
||||
clearTimeout(reconnectTimerRef.current);
|
||||
reconnectTimerRef.current = undefined;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
};
|
||||
}, [sessionId]);
|
||||
|
||||
@@ -485,8 +550,18 @@ export function useCopilotStream({
|
||||
prevStatusRef.current = status;
|
||||
|
||||
const wasActive = prev === "streaming" || prev === "submitted";
|
||||
const isNowActive = status === "streaming" || status === "submitted";
|
||||
const isIdle = status === "ready" || status === "error";
|
||||
|
||||
// Clear the forced reconnect timeout as soon as the stream resumes —
|
||||
// otherwise the stale 30s timer can fire mid-stream and show a
|
||||
// "timed out" toast even though reconnection succeeded.
|
||||
if (isNowActive && reconnectStartedAtRef.current !== null) {
|
||||
reconnectStartedAtRef.current = null;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
}
|
||||
|
||||
if (wasActive && isIdle && sessionId && !isReconnectScheduled) {
|
||||
queryClient.invalidateQueries({
|
||||
queryKey: getGetV2GetSessionQueryKey(sessionId),
|
||||
@@ -497,6 +572,9 @@ export function useCopilotStream({
|
||||
if (status === "ready") {
|
||||
reconnectAttemptsRef.current = 0;
|
||||
hasShownDisconnectToast.current = false;
|
||||
reconnectStartedAtRef.current = null;
|
||||
clearTimeout(reconnectTimeoutTimerRef.current);
|
||||
reconnectTimeoutTimerRef.current = undefined;
|
||||
// Intentionally NOT clearing lastSubmittedMsgRef here: keeping the last
|
||||
// submitted text prevents getSendSuppressionReason from allowing a
|
||||
// duplicate POST of the same message immediately after a successful turn
|
||||
@@ -511,10 +589,11 @@ export function useCopilotStream({
|
||||
// Resume an active stream AFTER hydration completes.
|
||||
// IMPORTANT: Only runs when page loads with existing active stream (reconnection).
|
||||
// Does NOT run when new streams start during active conversation.
|
||||
// Gated on hydrateCompletedRef to prevent racing with the hydration effect.
|
||||
useEffect(() => {
|
||||
if (!sessionId) return;
|
||||
if (!hasActiveStream) return;
|
||||
if (!hydratedMessages || hydratedMessages.length === 0) return;
|
||||
if (!hydratedMessages) return;
|
||||
|
||||
// Never resume if currently streaming
|
||||
if (status === "streaming" || status === "submitted") return;
|
||||
@@ -525,21 +604,30 @@ export function useCopilotStream({
|
||||
// Don't resume a stream the user just cancelled
|
||||
if (isUserStoppingRef.current) return;
|
||||
|
||||
// Mark as resumed immediately to prevent race conditions
|
||||
hasResumedRef.current.set(sessionId, true);
|
||||
function doResume() {
|
||||
if (!sessionId || hasResumedRef.current.get(sessionId)) return;
|
||||
if (isUserStoppingRef.current) return;
|
||||
|
||||
// Remove the in-progress assistant message before resuming.
|
||||
// The backend replays the stream from "0-0", so keeping the hydrated
|
||||
// version would cause the old parts to overlap with replayed parts.
|
||||
// Previous turns are preserved; the stream recreates the current turn.
|
||||
setMessages((prev) => {
|
||||
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
|
||||
return prev.slice(0, -1);
|
||||
}
|
||||
return prev;
|
||||
});
|
||||
hasResumedRef.current.set(sessionId, true);
|
||||
|
||||
resumeStreamRef.current();
|
||||
setMessages((prev) => {
|
||||
if (prev.length > 0 && prev[prev.length - 1].role === "assistant") {
|
||||
return prev.slice(0, -1);
|
||||
}
|
||||
return prev;
|
||||
});
|
||||
|
||||
resumeStreamRef.current();
|
||||
}
|
||||
|
||||
// Wait for hydration to complete before resuming to prevent
|
||||
// the two effects from racing (duplicate messages / missing content).
|
||||
if (!hydrateCompletedRef.current) {
|
||||
pendingResumeRef.current = doResume;
|
||||
return;
|
||||
}
|
||||
|
||||
doResume();
|
||||
}, [sessionId, hasActiveStream, hydratedMessages, status, setMessages]);
|
||||
|
||||
// Clear messages when session is null
|
||||
|
||||
Reference in New Issue
Block a user