{
+ const { sessionId, abortController } = stream;
+
+ try {
+ const url = `/api/chat/sessions/${sessionId}/stream`;
+ const body = JSON.stringify({
+ message,
+ is_user_message: isUserMessage,
+ context: context || null,
+ });
+
+ const response = await fetch(url, {
+ method: "POST",
+ headers: {
+ "Content-Type": "application/json",
+ Accept: "text/event-stream",
+ },
+ body,
+ signal: abortController.signal,
+ });
+
+ if (!response.ok) {
+ const errorText = await response.text();
+ throw new Error(errorText || `HTTP ${response.status}`);
+ }
+
+ if (!response.body) {
+ throw new Error("Response body is null");
+ }
+
+ const reader = response.body.getReader();
+ const decoder = new TextDecoder();
+ let buffer = "";
+
+ while (true) {
+ const { done, value } = await reader.read();
+
+ if (done) {
+ notifySubscribers(stream, { type: "stream_end" });
+ stream.status = "completed";
+ return;
+ }
+
+ buffer += decoder.decode(value, { stream: true });
+ const lines = buffer.split("\n");
+ buffer = lines.pop() || "";
+
+ for (const line of lines) {
+ const data = parseSSELine(line);
+ if (data !== null) {
+ if (data === "[DONE]") {
+ notifySubscribers(stream, { type: "stream_end" });
+ stream.status = "completed";
+ return;
+ }
+
+ try {
+ const rawChunk = JSON.parse(data) as
+ | StreamChunk
+ | VercelStreamChunk;
+ const chunk = normalizeStreamChunk(rawChunk);
+ if (!chunk) continue;
+
+ notifySubscribers(stream, chunk);
+
+ if (chunk.type === "stream_end") {
+ stream.status = "completed";
+ return;
+ }
+
+ if (chunk.type === "error") {
+ stream.status = "error";
+ stream.error = new Error(
+ chunk.message || chunk.content || "Stream error",
+ );
+ return;
+ }
+ } catch (err) {
+ console.warn("[StreamExecutor] Failed to parse SSE chunk:", err);
+ }
+ }
+ }
+ }
+ } catch (err) {
+ if (err instanceof Error && err.name === "AbortError") {
+ notifySubscribers(stream, { type: "stream_end" });
+ stream.status = "completed";
+ return;
+ }
+
+ if (retryCount < MAX_RETRIES) {
+ const retryDelay = INITIAL_RETRY_DELAY * Math.pow(2, retryCount);
+ console.log(
+ `[StreamExecutor] Retrying in ${retryDelay}ms (attempt ${retryCount + 1}/${MAX_RETRIES})`,
+ );
+ await new Promise((resolve) => setTimeout(resolve, retryDelay));
+ return executeStream(
+ stream,
+ message,
+ isUserMessage,
+ context,
+ retryCount + 1,
+ );
+ }
+
+ stream.status = "error";
+ stream.error = err instanceof Error ? err : new Error("Stream failed");
+ notifySubscribers(stream, {
+ type: "error",
+ message: stream.error.message,
+ });
+ }
+}
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/stream-utils.ts b/autogpt_platform/frontend/src/components/contextual/Chat/stream-utils.ts
new file mode 100644
index 0000000000..4100926e79
--- /dev/null
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/stream-utils.ts
@@ -0,0 +1,84 @@
+import type { ToolArguments, ToolResult } from "@/types/chat";
+import type { StreamChunk, VercelStreamChunk } from "./chat-types";
+
+const LEGACY_STREAM_TYPES = new Set<StreamChunk["type"]>([
+ "text_chunk",
+ "text_ended",
+ "tool_call",
+ "tool_call_start",
+ "tool_response",
+ "login_needed",
+ "need_login",
+ "credentials_needed",
+ "error",
+ "usage",
+ "stream_end",
+]);
+
+export function isLegacyStreamChunk(
+ chunk: StreamChunk | VercelStreamChunk,
+): chunk is StreamChunk {
+ return LEGACY_STREAM_TYPES.has(chunk.type as StreamChunk["type"]);
+}
+
+export function normalizeStreamChunk(
+ chunk: StreamChunk | VercelStreamChunk,
+): StreamChunk | null {
+ if (isLegacyStreamChunk(chunk)) return chunk;
+
+ switch (chunk.type) {
+ case "text-delta":
+ return { type: "text_chunk", content: chunk.delta };
+ case "text-end":
+ return { type: "text_ended" };
+ case "tool-input-available":
+ return {
+ type: "tool_call_start",
+ tool_id: chunk.toolCallId,
+ tool_name: chunk.toolName,
+ arguments: chunk.input as ToolArguments,
+ };
+ case "tool-output-available":
+ return {
+ type: "tool_response",
+ tool_id: chunk.toolCallId,
+ tool_name: chunk.toolName,
+ result: chunk.output as ToolResult,
+ success: chunk.success ?? true,
+ };
+ case "usage":
+ return {
+ type: "usage",
+ promptTokens: chunk.promptTokens,
+ completionTokens: chunk.completionTokens,
+ totalTokens: chunk.totalTokens,
+ };
+ case "error":
+ return {
+ type: "error",
+ message: chunk.errorText,
+ code: chunk.code,
+ details: chunk.details,
+ };
+ case "finish":
+ return { type: "stream_end" };
+ case "start":
+ case "text-start":
+ return null;
+ case "tool-input-start":
+ return {
+ type: "tool_call_start",
+ tool_id: chunk.toolCallId,
+ tool_name: chunk.toolName,
+ arguments: {},
+ };
+ }
+}
+
+export const MAX_RETRIES = 3;
+export const INITIAL_RETRY_DELAY = 1000;
+
+export function parseSSELine(line: string): string | null {
+ if (line.startsWith("data: ")) return line.slice(6);
+ return null;
+}
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts
index cf629a287c..124301abc4 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts
@@ -2,7 +2,6 @@
import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
import { useEffect, useRef, useState } from "react";
-import { toast } from "sonner";
import { useChatSession } from "./useChatSession";
import { useChatStream } from "./useChatStream";
@@ -27,6 +26,7 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
claimSession,
clearSession: clearSessionBase,
loadSession,
+ startPollingForOperation,
} = useChatSession({
urlSessionId,
autoCreate: false,
@@ -67,38 +67,16 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
],
);
- useEffect(() => {
- if (isLoading || isCreating) {
- const timer = setTimeout(() => {
- setShowLoader(true);
- }, 300);
- return () => clearTimeout(timer);
- } else {
+ useEffect(
+ function showLoaderWithDelay() {
+ if (isLoading || isCreating) {
+ const timer = setTimeout(() => setShowLoader(true), 300);
+ return () => clearTimeout(timer);
+ }
setShowLoader(false);
- }
- }, [isLoading, isCreating]);
-
- useEffect(function monitorNetworkStatus() {
- function handleOnline() {
- toast.success("Connection restored", {
- description: "You're back online",
- });
- }
-
- function handleOffline() {
- toast.error("You're offline", {
- description: "Check your internet connection",
- });
- }
-
- window.addEventListener("online", handleOnline);
- window.addEventListener("offline", handleOffline);
-
- return () => {
- window.removeEventListener("online", handleOnline);
- window.removeEventListener("offline", handleOffline);
- };
- }, []);
+ },
+ [isLoading, isCreating],
+ );
function clearSession() {
clearSessionBase();
@@ -117,5 +95,6 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
loadSession,
sessionId: sessionIdFromHook,
showLoader,
+ startPollingForOperation,
};
}
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatDrawer.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatDrawer.ts
deleted file mode 100644
index 62e1a5a569..0000000000
--- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatDrawer.ts
+++ /dev/null
@@ -1,17 +0,0 @@
-"use client";
-
-import { create } from "zustand";
-
-interface ChatDrawerState {
- isOpen: boolean;
- open: () => void;
- close: () => void;
- toggle: () => void;
-}
-
-export const useChatDrawer = create<ChatDrawerState>((set) => ({
- isOpen: false,
- open: () => set({ isOpen: true }),
- close: () => set({ isOpen: false }),
- toggle: () => set((state) => ({ isOpen: !state.isOpen })),
-}));
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
index 553e348f79..936a49936c 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts
@@ -1,6 +1,7 @@
import {
getGetV2GetSessionQueryKey,
getGetV2GetSessionQueryOptions,
+ getGetV2ListSessionsQueryKey,
postV2CreateSession,
useGetV2GetSession,
usePatchV2SessionAssignUser,
@@ -58,6 +59,7 @@ export function useChatSession({
query: {
enabled: !!sessionId,
select: okData,
+ staleTime: 0,
retry: shouldRetrySessionLoad,
retryDelay: getSessionRetryDelay,
},
@@ -101,6 +103,125 @@ export function useChatSession({
}
}, [createError, loadError]);
+ // Track if we should be polling (set by external callers when they receive operation_started via SSE)
+ const [forcePolling, setForcePolling] = useState(false);
+ // Track if we've seen server acknowledge the pending operation (to avoid clearing forcePolling prematurely)
+ const hasSeenServerPendingRef = useRef(false);
+
+ // Check if there are any pending operations in the messages
+ // Must check all operation types: operation_pending, operation_started, operation_in_progress
+ const hasPendingOperationsFromServer = useMemo(() => {
+ if (!messages || messages.length === 0) return false;
+ const pendingTypes = new Set([
+ "operation_pending",
+ "operation_in_progress",
+ "operation_started",
+ ]);
+ return messages.some((msg) => {
+ if (msg.role !== "tool" || !msg.content) return false;
+ try {
+ const content =
+ typeof msg.content === "string"
+ ? JSON.parse(msg.content)
+ : msg.content;
+ return pendingTypes.has(content?.type);
+ } catch {
+ return false;
+ }
+ });
+ }, [messages]);
+
+ // Track when server has acknowledged the pending operation
+ useEffect(() => {
+ if (hasPendingOperationsFromServer) {
+ hasSeenServerPendingRef.current = true;
+ }
+ }, [hasPendingOperationsFromServer]);
+
+ // Combined: poll if server has pending ops OR if we received operation_started via SSE
+ const hasPendingOperations = hasPendingOperationsFromServer || forcePolling;
+
+ // Clear forcePolling only after server has acknowledged AND completed the operation
+ useEffect(() => {
+ if (
+ forcePolling &&
+ !hasPendingOperationsFromServer &&
+ hasSeenServerPendingRef.current
+ ) {
+ // Server acknowledged the operation and it's now complete
+ setForcePolling(false);
+ hasSeenServerPendingRef.current = false;
+ }
+ }, [forcePolling, hasPendingOperationsFromServer]);
+
+ // Function to trigger polling (called when operation_started is received via SSE)
+ function startPollingForOperation() {
+ setForcePolling(true);
+ hasSeenServerPendingRef.current = false; // Reset for new operation
+ }
+
+ // Refresh sessions list when a pending operation completes
+ // (hasPendingOperations transitions from true to false)
+ const prevHasPendingOperationsRef = useRef(hasPendingOperations);
+ useEffect(
+ function refreshSessionsListOnOperationComplete() {
+ const wasHasPending = prevHasPendingOperationsRef.current;
+ prevHasPendingOperationsRef.current = hasPendingOperations;
+
+ // Only invalidate when transitioning from pending to not pending
+ if (wasHasPending && !hasPendingOperations && sessionId) {
+ queryClient.invalidateQueries({
+ queryKey: getGetV2ListSessionsQueryKey(),
+ });
+ }
+ },
+ [hasPendingOperations, sessionId, queryClient],
+ );
+
+ // Poll for updates when there are pending operations
+ // Backoff: 2s, 4s, 6s, 8s, 10s, ... up to 30s max
+ const pollAttemptRef = useRef(0);
+ const hasPendingOperationsRef = useRef(hasPendingOperations);
+ hasPendingOperationsRef.current = hasPendingOperations;
+
+ useEffect(
+ function pollForPendingOperations() {
+ if (!sessionId || !hasPendingOperations) {
+ pollAttemptRef.current = 0;
+ return;
+ }
+
+ let cancelled = false;
+ let timeoutId: ReturnType<typeof setTimeout> | null = null;
+
+ function schedule() {
+ // 2s, 4s, 6s, 8s, 10s, ... 30s (max)
+ const delay = Math.min((pollAttemptRef.current + 1) * 2000, 30000);
+ timeoutId = setTimeout(async () => {
+ if (cancelled) return;
+ pollAttemptRef.current += 1;
+ try {
+ await refetch();
+ } catch (err) {
+ console.error("[useChatSession] Poll failed:", err);
+ } finally {
+ if (!cancelled && hasPendingOperationsRef.current) {
+ schedule();
+ }
+ }
+ }, delay);
+ }
+
+ schedule();
+
+ return () => {
+ cancelled = true;
+ if (timeoutId) clearTimeout(timeoutId);
+ };
+ },
+ [sessionId, hasPendingOperations, refetch],
+ );
+
async function createSession() {
try {
setError(null);
@@ -227,11 +348,13 @@ export function useChatSession({
isCreating,
error,
isSessionNotFound: isNotFoundError(loadError),
+ hasPendingOperations,
createSession,
loadSession,
refreshSession,
claimSession,
clearSession,
+ startPollingForOperation,
};
}
diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatStream.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatStream.ts
index 903c19cd30..5a9f637457 100644
--- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatStream.ts
+++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChatStream.ts
@@ -1,543 +1,110 @@
-import type { ToolArguments, ToolResult } from "@/types/chat";
-import { useCallback, useEffect, useRef, useState } from "react";
+"use client";
+
+import { useEffect, useRef, useState } from "react";
import { toast } from "sonner";
+import { useChatStore } from "./chat-store";
+import type { StreamChunk } from "./chat-types";
-const MAX_RETRIES = 3;
-const INITIAL_RETRY_DELAY = 1000;
-
-export interface StreamChunk {
- type:
- | "text_chunk"
- | "text_ended"
- | "tool_call"
- | "tool_call_start"
- | "tool_response"
- | "login_needed"
- | "need_login"
- | "credentials_needed"
- | "error"
- | "usage"
- | "stream_end";
- timestamp?: string;
- content?: string;
- message?: string;
- code?: string;
- details?: Record<string, unknown>;
- tool_id?: string;
- tool_name?: string;
- arguments?: ToolArguments;
- result?: ToolResult;
- success?: boolean;
- idx?: number;
- session_id?: string;
- agent_info?: {
- graph_id: string;
- name: string;
- trigger_type: string;
- };
- provider?: string;
- provider_name?: string;
- credential_type?: string;
- scopes?: string[];
- title?: string;
- [key: string]: unknown;
-}
-
-type VercelStreamChunk =
- | { type: "start"; messageId: string }
- | { type: "finish" }
- | { type: "text-start"; id: string }
- | { type: "text-delta"; id: string; delta: string }
- | { type: "text-end"; id: string }
- | { type: "tool-input-start"; toolCallId: string; toolName: string }
- | {
- type: "tool-input-available";
- toolCallId: string;
- toolName: string;
- input: ToolArguments;
- }
- | {
- type: "tool-output-available";
- toolCallId: string;
- toolName?: string;
- output: ToolResult;
- success?: boolean;
- }
- | {
- type: "usage";
- promptTokens: number;
- completionTokens: number;
- totalTokens: number;
- }
- | {
- type: "error";
- errorText: string;
- code?: string;
- details?: Record<string, unknown>;
- };
-
-const LEGACY_STREAM_TYPES = new Set([
- "text_chunk",
- "text_ended",
- "tool_call",
- "tool_call_start",
- "tool_response",
- "login_needed",
- "need_login",
- "credentials_needed",
- "error",
- "usage",
- "stream_end",
-]);
-
-function isLegacyStreamChunk(
- chunk: StreamChunk | VercelStreamChunk,
-): chunk is StreamChunk {
- return LEGACY_STREAM_TYPES.has(chunk.type as StreamChunk["type"]);
-}
-
-function normalizeStreamChunk(
- chunk: StreamChunk | VercelStreamChunk,
-): StreamChunk | null {
- if (isLegacyStreamChunk(chunk)) {
- return chunk;
- }
- switch (chunk.type) {
- case "text-delta":
- return { type: "text_chunk", content: chunk.delta };
- case "text-end":
- return { type: "text_ended" };
- case "tool-input-available":
- return {
- type: "tool_call_start",
- tool_id: chunk.toolCallId,
- tool_name: chunk.toolName,
- arguments: chunk.input,
- };
- case "tool-output-available":
- return {
- type: "tool_response",
- tool_id: chunk.toolCallId,
- tool_name: chunk.toolName,
- result: chunk.output,
- success: chunk.success ?? true,
- };
- case "usage":
- return {
- type: "usage",
- promptTokens: chunk.promptTokens,
- completionTokens: chunk.completionTokens,
- totalTokens: chunk.totalTokens,
- };
- case "error":
- return {
- type: "error",
- message: chunk.errorText,
- code: chunk.code,
- details: chunk.details,
- };
- case "finish":
- return { type: "stream_end" };
- case "start":
- case "text-start":
- return null;
- case "tool-input-start":
- const toolInputStart = chunk as Extract<
- VercelStreamChunk,
- { type: "tool-input-start" }
- >;
- return {
- type: "tool_call_start",
- tool_id: toolInputStart.toolCallId,
- tool_name: toolInputStart.toolName,
- arguments: {},
- };
- }
-}
+export type { StreamChunk } from "./chat-types";
export function useChatStream() {
const [isStreaming, setIsStreaming] = useState(false);
const [error, setError] = useState<Error | null>(null);
- const retryCountRef = useRef(0);
- const retryTimeoutRef = useRef<ReturnType<typeof setTimeout> | null>(null);
- const abortControllerRef = useRef<AbortController | null>(null);
const currentSessionIdRef = useRef<string | null>(null);
- const requestStartTimeRef = useRef<number | null>(null);
-
- const stopStreaming = useCallback(
- (sessionId?: string, force: boolean = false) => {
- console.log("[useChatStream] stopStreaming called", {
- hasAbortController: !!abortControllerRef.current,
- isAborted: abortControllerRef.current?.signal.aborted,
- currentSessionId: currentSessionIdRef.current,
- requestedSessionId: sessionId,
- requestStartTime: requestStartTimeRef.current,
- timeSinceStart: requestStartTimeRef.current
- ? Date.now() - requestStartTimeRef.current
- : null,
- force,
- stack: new Error().stack,
- });
-
- if (
- sessionId &&
- currentSessionIdRef.current &&
- currentSessionIdRef.current !== sessionId
- ) {
- console.log(
- "[useChatStream] Session changed, aborting previous stream",
- {
- oldSessionId: currentSessionIdRef.current,
- newSessionId: sessionId,
- },
- );
- }
-
- const controller = abortControllerRef.current;
- if (controller) {
- const timeSinceStart = requestStartTimeRef.current
- ? Date.now() - requestStartTimeRef.current
- : null;
-
- if (!force && timeSinceStart !== null && timeSinceStart < 100) {
- console.log(
- "[useChatStream] Request just started (<100ms), skipping abort to prevent race condition",
- {
- timeSinceStart,
- },
- );
- return;
- }
-
- try {
- const signal = controller.signal;
-
- if (
- signal &&
- typeof signal.aborted === "boolean" &&
- !signal.aborted
- ) {
- console.log("[useChatStream] Aborting stream");
- controller.abort();
- } else {
- console.log(
- "[useChatStream] Stream already aborted or signal invalid",
- );
- }
- } catch (error) {
- if (error instanceof Error && error.name === "AbortError") {
- console.log(
- "[useChatStream] AbortError caught (expected during cleanup)",
- );
- } else {
- console.warn("[useChatStream] Error aborting stream:", error);
- }
- } finally {
- abortControllerRef.current = null;
- requestStartTimeRef.current = null;
- }
- }
- if (retryTimeoutRef.current) {
- clearTimeout(retryTimeoutRef.current);
- retryTimeoutRef.current = null;
- }
- setIsStreaming(false);
- },
- [],
+ const onChunkCallbackRef = useRef<((chunk: StreamChunk) => void) | null>(
+ null,
);
+ const stopStream = useChatStore((s) => s.stopStream);
+ const unregisterActiveSession = useChatStore(
+ (s) => s.unregisterActiveSession,
+ );
+ const isSessionActive = useChatStore((s) => s.isSessionActive);
+ const onStreamComplete = useChatStore((s) => s.onStreamComplete);
+ const getCompletedStream = useChatStore((s) => s.getCompletedStream);
+ const registerActiveSession = useChatStore((s) => s.registerActiveSession);
+ const startStream = useChatStore((s) => s.startStream);
+ const getStreamStatus = useChatStore((s) => s.getStreamStatus);
+
+ function stopStreaming(sessionId?: string) {
+ const targetSession = sessionId || currentSessionIdRef.current;
+ if (targetSession) {
+ stopStream(targetSession);
+ unregisterActiveSession(targetSession);
+ }
+ setIsStreaming(false);
+ }
+
useEffect(() => {
- console.log("[useChatStream] Component mounted");
- return () => {
- const sessionIdAtUnmount = currentSessionIdRef.current;
- console.log(
- "[useChatStream] Component unmounting, calling stopStreaming",
- {
- sessionIdAtUnmount,
- },
- );
- stopStreaming(undefined, false);
+ return function cleanup() {
+ const sessionId = currentSessionIdRef.current;
+ if (sessionId && !isSessionActive(sessionId)) {
+ stopStream(sessionId);
+ }
currentSessionIdRef.current = null;
+ onChunkCallbackRef.current = null;
};
- }, [stopStreaming]);
+ }, []);
- const sendMessage = useCallback(
- async (
- sessionId: string,
- message: string,
- onChunk: (chunk: StreamChunk) => void,
- isUserMessage: boolean = true,
- context?: { url: string; content: string },
- isRetry: boolean = false,
- ) => {
- console.log("[useChatStream] sendMessage called", {
- sessionId,
- message: message.substring(0, 50),
- isUserMessage,
- isRetry,
- stack: new Error().stack,
- });
+ useEffect(() => {
+ const unsubscribe = onStreamComplete(
+ function handleStreamComplete(completedSessionId) {
+ if (completedSessionId !== currentSessionIdRef.current) return;
- const previousSessionId = currentSessionIdRef.current;
- stopStreaming(sessionId, true);
- currentSessionIdRef.current = sessionId;
-
- const abortController = new AbortController();
- abortControllerRef.current = abortController;
- requestStartTimeRef.current = Date.now();
- console.log("[useChatStream] Created new AbortController", {
- sessionId,
- previousSessionId,
- requestStartTime: requestStartTimeRef.current,
- });
-
- if (abortController.signal.aborted) {
- console.warn(
- "[useChatStream] AbortController was aborted before request started",
- );
- requestStartTimeRef.current = null;
- return Promise.reject(new Error("Request aborted"));
- }
-
- if (!isRetry) {
- retryCountRef.current = 0;
- }
- setIsStreaming(true);
- setError(null);
-
- try {
- const url = `/api/chat/sessions/${sessionId}/stream`;
- const body = JSON.stringify({
- message,
- is_user_message: isUserMessage,
- context: context || null,
- });
-
- const response = await fetch(url, {
- method: "POST",
- headers: {
- "Content-Type": "application/json",
- Accept: "text/event-stream",
- },
- body,
- signal: abortController.signal,
- });
-
- console.info("[useChatStream] Stream response", {
- sessionId,
- status: response.status,
- ok: response.ok,
- contentType: response.headers.get("content-type"),
- });
-
- if (!response.ok) {
- const errorText = await response.text();
- console.warn("[useChatStream] Stream response error", {
- sessionId,
- status: response.status,
- errorText,
- });
- throw new Error(errorText || `HTTP ${response.status}`);
- }
-
- if (!response.body) {
- console.warn("[useChatStream] Response body is null", { sessionId });
- throw new Error("Response body is null");
- }
-
- const reader = response.body.getReader();
- const decoder = new TextDecoder();
- let buffer = "";
- let receivedChunkCount = 0;
- let firstChunkAt: number | null = null;
- let loggedLineCount = 0;
-
- return new Promise<void>((resolve, reject) => {
- let didDispatchStreamEnd = false;
-
- function dispatchStreamEnd() {
- if (didDispatchStreamEnd) return;
- didDispatchStreamEnd = true;
- onChunk({ type: "stream_end" });
- }
-
- const cleanup = () => {
- reader.cancel().catch(() => {
- // Ignore cancel errors
- });
- };
-
- async function readStream() {
- try {
- while (true) {
- const { done, value } = await reader.read();
-
- if (done) {
- cleanup();
- console.info("[useChatStream] Stream closed", {
- sessionId,
- receivedChunkCount,
- timeSinceStart: requestStartTimeRef.current
- ? Date.now() - requestStartTimeRef.current
- : null,
- });
- dispatchStreamEnd();
- retryCountRef.current = 0;
- stopStreaming();
- resolve();
- return;
- }
-
- buffer += decoder.decode(value, { stream: true });
- const lines = buffer.split("\n");
- buffer = lines.pop() || "";
-
- for (const line of lines) {
- if (line.startsWith("data: ")) {
- const data = line.slice(6);
- if (loggedLineCount < 3) {
- console.info("[useChatStream] Raw stream line", {
- sessionId,
- data:
- data.length > 300 ? `${data.slice(0, 300)}...` : data,
- });
- loggedLineCount += 1;
- }
- if (data === "[DONE]") {
- cleanup();
- console.info("[useChatStream] Stream done marker", {
- sessionId,
- receivedChunkCount,
- timeSinceStart: requestStartTimeRef.current
- ? Date.now() - requestStartTimeRef.current
- : null,
- });
- dispatchStreamEnd();
- retryCountRef.current = 0;
- stopStreaming();
- resolve();
- return;
- }
-
- try {
- const rawChunk = JSON.parse(data) as
- | StreamChunk
- | VercelStreamChunk;
- const chunk = normalizeStreamChunk(rawChunk);
- if (!chunk) {
- continue;
- }
-
- if (!firstChunkAt) {
- firstChunkAt = Date.now();
- console.info("[useChatStream] First stream chunk", {
- sessionId,
- chunkType: chunk.type,
- timeSinceStart: requestStartTimeRef.current
- ? firstChunkAt - requestStartTimeRef.current
- : null,
- });
- }
- receivedChunkCount += 1;
-
- // Call the chunk handler
- onChunk(chunk);
-
- // Handle stream lifecycle
- if (chunk.type === "stream_end") {
- didDispatchStreamEnd = true;
- cleanup();
- console.info("[useChatStream] Stream end chunk", {
- sessionId,
- receivedChunkCount,
- timeSinceStart: requestStartTimeRef.current
- ? Date.now() - requestStartTimeRef.current
- : null,
- });
- retryCountRef.current = 0;
- stopStreaming();
- resolve();
- return;
- } else if (chunk.type === "error") {
- cleanup();
- reject(
- new Error(
- chunk.message || chunk.content || "Stream error",
- ),
- );
- return;
- }
- } catch (err) {
- // Skip invalid JSON lines
- console.warn("Failed to parse SSE chunk:", err, data);
- }
- }
- }
- }
- } catch (err) {
- if (err instanceof Error && err.name === "AbortError") {
- cleanup();
- dispatchStreamEnd();
- stopStreaming();
- resolve();
- return;
- }
-
- const streamError =
- err instanceof Error ? err : new Error("Failed to read stream");
-
- if (retryCountRef.current < MAX_RETRIES) {
- retryCountRef.current += 1;
- const retryDelay =
- INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
-
- toast.info("Connection interrupted", {
- description: `Retrying in ${retryDelay / 1000} seconds...`,
- });
-
- retryTimeoutRef.current = setTimeout(() => {
- sendMessage(
- sessionId,
- message,
- onChunk,
- isUserMessage,
- context,
- true,
- ).catch((_err) => {
- // Retry failed
- });
- }, retryDelay);
- } else {
- setError(streamError);
- toast.error("Connection Failed", {
- description:
- "Unable to connect to chat service. Please try again.",
- });
- cleanup();
- dispatchStreamEnd();
- retryCountRef.current = 0;
- stopStreaming();
- reject(streamError);
- }
- }
- }
-
- readStream();
- });
- } catch (err) {
- if (err instanceof Error && err.name === "AbortError") {
- setIsStreaming(false);
- return Promise.resolve();
- }
- const streamError =
- err instanceof Error ? err : new Error("Failed to start stream");
- setError(streamError);
setIsStreaming(false);
- throw streamError;
+ const completed = getCompletedStream(completedSessionId);
+ if (completed?.error) {
+ setError(completed.error);
+ }
+ unregisterActiveSession(completedSessionId);
+ },
+ );
+
+ return unsubscribe;
+ }, []);
+
+ async function sendMessage(
+ sessionId: string,
+ message: string,
+ onChunk: (chunk: StreamChunk) => void,
+ isUserMessage: boolean = true,
+ context?: { url: string; content: string },
+ ) {
+ const previousSessionId = currentSessionIdRef.current;
+ if (previousSessionId && previousSessionId !== sessionId) {
+ stopStreaming(previousSessionId);
+ }
+
+ currentSessionIdRef.current = sessionId;
+ onChunkCallbackRef.current = onChunk;
+ setIsStreaming(true);
+ setError(null);
+
+ registerActiveSession(sessionId);
+
+ try {
+ await startStream(sessionId, message, isUserMessage, context, onChunk);
+
+ const status = getStreamStatus(sessionId);
+ if (status === "error") {
+ const completed = getCompletedStream(sessionId);
+ if (completed?.error) {
+ setError(completed.error);
+ toast.error("Connection Failed", {
+ description: "Unable to connect to chat service. Please try again.",
+ });
+ throw completed.error;
+ }
}
- },
- [stopStreaming],
- );
+ } catch (err) {
+ const streamError =
+ err instanceof Error ? err : new Error("Failed to start stream");
+ setError(streamError);
+ throw streamError;
+ } finally {
+ setIsStreaming(false);
+ }
+ }
return {
isStreaming,
diff --git a/autogpt_platform/frontend/src/components/layout/Navbar/components/Wallet/Wallet.tsx b/autogpt_platform/frontend/src/components/layout/Navbar/components/Wallet/Wallet.tsx
index 0a3c7de6c8..4a25c84f92 100644
--- a/autogpt_platform/frontend/src/components/layout/Navbar/components/Wallet/Wallet.tsx
+++ b/autogpt_platform/frontend/src/components/layout/Navbar/components/Wallet/Wallet.tsx
@@ -255,13 +255,18 @@ export function Wallet() {
(notification: WebSocketNotification) => {
if (
notification.type !== "onboarding" ||
- notification.event !== "step_completed" ||
- !walletRef.current
+ notification.event !== "step_completed"
) {
return;
}
- // Only trigger confetti for tasks that are in groups
+ // Always refresh credits when any onboarding step completes
+ fetchCredits();
+
+ // Only trigger confetti for tasks that are in displayed groups
+ if (!walletRef.current) {
+ return;
+ }
const taskIds = groups
.flatMap((group) => group.tasks)
.map((task) => task.id);
@@ -274,7 +279,6 @@ export function Wallet() {
return;
}
- fetchCredits();
party.confetti(walletRef.current, {
count: 30,
spread: 120,
@@ -284,7 +288,7 @@ export function Wallet() {
modules: [fadeOut],
});
},
- [fetchCredits, fadeOut],
+ [fetchCredits, fadeOut, groups],
);
// WebSocket setup for onboarding notifications
diff --git a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
index 82c03bc9f1..2d583d2062 100644
--- a/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
+++ b/autogpt_platform/frontend/src/lib/autogpt-server-api/types.ts
@@ -1003,6 +1003,7 @@ export type OnboardingStep =
| "AGENT_INPUT"
| "CONGRATS"
// First Wins
+ | "VISIT_COPILOT"
| "GET_RESULTS"
| "MARKETPLACE_VISIT"
| "MARKETPLACE_ADD_AGENT"
diff --git a/autogpt_platform/frontend/src/providers/posthog/posthog-provider.tsx b/autogpt_platform/frontend/src/providers/posthog/posthog-provider.tsx
new file mode 100644
index 0000000000..674f6c55eb
--- /dev/null
+++ b/autogpt_platform/frontend/src/providers/posthog/posthog-provider.tsx
@@ -0,0 +1,72 @@
+"use client";
+
+import { useSupabase } from "@/lib/supabase/hooks/useSupabase";
+import { environment } from "@/services/environment";
+import { PostHogProvider as PHProvider } from "@posthog/react";
+import { usePathname, useSearchParams } from "next/navigation";
+import posthog from "posthog-js";
+import { ReactNode, useEffect, useRef } from "react";
+
+export function PostHogProvider({ children }: { children: ReactNode }) {
+ const isPostHogEnabled = environment.isPostHogEnabled();
+ const postHogCredentials = environment.getPostHogCredentials();
+
+ useEffect(() => {
+ if (postHogCredentials.key) {
+ posthog.init(postHogCredentials.key, {
+ api_host: postHogCredentials.host,
+ defaults: "2025-11-30",
+ capture_pageview: false,
+ capture_pageleave: true,
+ autocapture: true,
+ });
+ }
+ }, []);
+
+ if (!isPostHogEnabled) return <>{children}</>;
+
+ return <PHProvider client={posthog}>{children}</PHProvider>;
+}
+
+export function PostHogUserTracker() {
+ const { user, isUserLoading } = useSupabase();
+ const previousUserIdRef = useRef<string | null>(null);
+ const isPostHogEnabled = environment.isPostHogEnabled();
+
+ useEffect(() => {
+ if (isUserLoading || !isPostHogEnabled) return;
+
+ if (user) {
+ if (previousUserIdRef.current !== user.id) {
+ posthog.identify(user.id, {
+ email: user.email,
+ ...(user.user_metadata?.name && { name: user.user_metadata.name }),
+ });
+ previousUserIdRef.current = user.id;
+ }
+ } else if (previousUserIdRef.current !== null) {
+ posthog.reset();
+ previousUserIdRef.current = null;
+ }
+ }, [user, isUserLoading, isPostHogEnabled]);
+
+ return null;
+}
+
+export function PostHogPageViewTracker() {
+ const pathname = usePathname();
+ const searchParams = useSearchParams();
+ const isPostHogEnabled = environment.isPostHogEnabled();
+
+ useEffect(() => {
+ if (pathname && isPostHogEnabled) {
+ let url = window.origin + pathname;
+ if (searchParams && searchParams.toString()) {
+ url = url + `?${searchParams.toString()}`;
+ }
+ posthog.capture("$pageview", { $current_url: url });
+ }
+ }, [pathname, searchParams, isPostHogEnabled]);
+
+ return null;
+}
diff --git a/autogpt_platform/frontend/src/services/environment/index.ts b/autogpt_platform/frontend/src/services/environment/index.ts
index cdd5b421b5..f19bc417e3 100644
--- a/autogpt_platform/frontend/src/services/environment/index.ts
+++ b/autogpt_platform/frontend/src/services/environment/index.ts
@@ -76,6 +76,13 @@ function getPreviewStealingDev() {
return branch;
}
+function getPostHogCredentials() {
+ return {
+ key: process.env.NEXT_PUBLIC_POSTHOG_KEY,
+ host: process.env.NEXT_PUBLIC_POSTHOG_HOST,
+ };
+}
+
function isProductionBuild() {
return process.env.NODE_ENV === "production";
}
@@ -116,6 +123,13 @@ function areFeatureFlagsEnabled() {
return process.env.NEXT_PUBLIC_LAUNCHDARKLY_ENABLED === "enabled";
}
+function isPostHogEnabled() {
+ const inCloud = isCloud();
+ const key = process.env.NEXT_PUBLIC_POSTHOG_KEY;
+ const host = process.env.NEXT_PUBLIC_POSTHOG_HOST;
+ return Boolean(inCloud && key && host);
+}
+
export const environment = {
// Generic
getEnvironmentStr,
@@ -128,6 +142,7 @@ export const environment = {
getSupabaseUrl,
getSupabaseAnonKey,
getPreviewStealingDev,
+ getPostHogCredentials,
// Assertions
isServerSide,
isClientSide,
@@ -138,5 +153,6 @@ export const environment = {
isCloud,
isLocal,
isVercelPreview,
+ isPostHogEnabled,
areFeatureFlagsEnabled,
};
diff --git a/autogpt_platform/frontend/src/services/network-status/NetworkStatusMonitor.tsx b/autogpt_platform/frontend/src/services/network-status/NetworkStatusMonitor.tsx
new file mode 100644
index 0000000000..7552bbf78c
--- /dev/null
+++ b/autogpt_platform/frontend/src/services/network-status/NetworkStatusMonitor.tsx
@@ -0,0 +1,8 @@
+"use client";
+
+import { useNetworkStatus } from "./useNetworkStatus";
+
+export function NetworkStatusMonitor() {
+ useNetworkStatus();
+ return null;
+}
diff --git a/autogpt_platform/frontend/src/services/network-status/useNetworkStatus.ts b/autogpt_platform/frontend/src/services/network-status/useNetworkStatus.ts
new file mode 100644
index 0000000000..472a6e0e90
--- /dev/null
+++ b/autogpt_platform/frontend/src/services/network-status/useNetworkStatus.ts
@@ -0,0 +1,28 @@
+"use client";
+
+import { useEffect } from "react";
+import { toast } from "sonner";
+
+export function useNetworkStatus() {
+ useEffect(function monitorNetworkStatus() {
+ function handleOnline() {
+ toast.success("Connection restored", {
+ description: "You're back online",
+ });
+ }
+
+ function handleOffline() {
+ toast.error("You're offline", {
+ description: "Check your internet connection",
+ });
+ }
+
+ window.addEventListener("online", handleOnline);
+ window.addEventListener("offline", handleOffline);
+
+ return function cleanup() {
+ window.removeEventListener("online", handleOnline);
+ window.removeEventListener("offline", handleOffline);
+ };
+ }, []);
+}
diff --git a/autogpt_platform/frontend/src/services/storage/session-storage.ts b/autogpt_platform/frontend/src/services/storage/session-storage.ts
index 8404da571c..1be82c98fb 100644
--- a/autogpt_platform/frontend/src/services/storage/session-storage.ts
+++ b/autogpt_platform/frontend/src/services/storage/session-storage.ts
@@ -3,6 +3,7 @@ import { environment } from "../environment";
export enum SessionKey {
CHAT_SENT_INITIAL_PROMPTS = "chat_sent_initial_prompts",
+ CHAT_INITIAL_PROMPTS = "chat_initial_prompts",
}
function get(key: SessionKey) {
diff --git a/autogpt_platform/frontend/src/tests/pages/login.page.ts b/autogpt_platform/frontend/src/tests/pages/login.page.ts
index 9082cc6219..adcb8d908b 100644
--- a/autogpt_platform/frontend/src/tests/pages/login.page.ts
+++ b/autogpt_platform/frontend/src/tests/pages/login.page.ts
@@ -37,9 +37,13 @@ export class LoginPage {
this.page.on("load", (page) => console.log(`ℹ️ Now at URL: ${page.url()}`));
// Start waiting for navigation before clicking
+ // Wait for redirect to marketplace, onboarding, library, or copilot (new landing pages)
const leaveLoginPage = this.page
.waitForURL(
- (url) => /^\/(marketplace|onboarding(\/.*)?)?$/.test(url.pathname),
+ (url: URL) =>
+ /^\/(marketplace|onboarding(\/.*)?|library|copilot)?$/.test(
+ url.pathname,
+ ),
{ timeout: 10_000 },
)
.catch((reason) => {
diff --git a/autogpt_platform/frontend/src/tests/utils/signup.ts b/autogpt_platform/frontend/src/tests/utils/signup.ts
index 7c8fdbe01b..192a9129b9 100644
--- a/autogpt_platform/frontend/src/tests/utils/signup.ts
+++ b/autogpt_platform/frontend/src/tests/utils/signup.ts
@@ -36,14 +36,16 @@ export async function signupTestUser(
const signupButton = getButton("Sign up");
await signupButton.click();
- // Wait for successful signup - could redirect to onboarding or marketplace
+ // Wait for successful signup - could redirect to various pages depending on onboarding state
try {
- // Wait for either onboarding or marketplace redirect
- await Promise.race([
- page.waitForURL(/\/onboarding/, { timeout: 15000 }),
- page.waitForURL(/\/marketplace/, { timeout: 15000 }),
- ]);
+ // Wait for redirect to onboarding, marketplace, copilot, or library
+ // Use a single waitForURL with a callback to avoid Promise.race race conditions
+ await page.waitForURL(
+ (url: URL) =>
+ /\/(onboarding|marketplace|copilot|library)/.test(url.pathname),
+ { timeout: 15000 },
+ );
} catch (error) {
console.error(
"❌ Timeout waiting for redirect, current URL:",
@@ -54,14 +56,19 @@ export async function signupTestUser(
const currentUrl = page.url();
- // Handle onboarding or marketplace redirect
+ // Handle onboarding redirect if needed
if (currentUrl.includes("/onboarding") && ignoreOnboarding) {
await page.goto("http://localhost:3000/marketplace");
await page.waitForLoadState("domcontentloaded", { timeout: 10000 });
}
- // Verify we're on the expected final page
- if (ignoreOnboarding || currentUrl.includes("/marketplace")) {
+ // Verify we're on an expected final page and user is authenticated
+ if (currentUrl.includes("/copilot") || currentUrl.includes("/library")) {
+ // For copilot/library landing pages, just verify user is authenticated
+ await page
+ .getByTestId("profile-popout-menu-trigger")
+ .waitFor({ state: "visible", timeout: 10000 });
+ } else if (ignoreOnboarding || currentUrl.includes("/marketplace")) {
// Verify we're on marketplace
await page
.getByText(
diff --git a/backend/blocks/video/__init__.py b/backend/blocks/video/__init__.py
deleted file mode 100644
index fd95ef9a58..0000000000
--- a/backend/blocks/video/__init__.py
+++ /dev/null
@@ -1 +0,0 @@
-# Video editing blocks