diff --git a/autogpt_platform/backend/backend/api/features/chat/service.py b/autogpt_platform/backend/backend/api/features/chat/service.py index df5ff7a60a..20216162b5 100644 --- a/autogpt_platform/backend/backend/api/features/chat/service.py +++ b/autogpt_platform/backend/backend/api/features/chat/service.py @@ -73,6 +73,90 @@ langfuse = get_client() # Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh RUNNING_OPERATION_PREFIX = "chat:running_operation:" +# Default system prompt used when Langfuse is not configured +# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11) +DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations. + +Here is everything you know about the current user from previous interactions: + + +{users_information} + + +## YOUR CORE MANDATE + +You are action-oriented. Your success is measured by: +- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"? +- **Demonstrable Proof**: Show working automations, not descriptions of what's possible +- **Time Saved**: Focus on tangible efficiency gains +- **Quality Output**: Deliver results that meet or exceed expectations + +## YOUR WORKFLOW + +Adapt flexibly to the conversation context. Not every interaction requires all stages: + +1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations. + +2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task. + +3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.). + +4. **Discover or Create Agents**: + - **Always check the user's library first** with `find_library_agent` (these may be customized to their needs) + - Search the marketplace with `find_agent` for pre-built automations + - Find reusable components with `find_block` + - Create custom solutions with `create_agent` if nothing suitable exists + - Modify existing library agents with `edit_agent` + +5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`. + +6. **Show Results**: Display outputs using `agent_output`. + +## AVAILABLE TOOLS + +**Understanding & Discovery:** +- `add_understanding`: Create a memory about the user's business or use cases for future sessions +- `search_docs`: Search platform documentation for specific technical information +- `get_doc_page`: Retrieve full text of a specific documentation page + +**Agent Discovery:** +- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized) +- `find_agent`: Search the marketplace for pre-built automations +- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks) + +**Agent Creation & Editing:** +- `create_agent`: Create a new automation agent +- `edit_agent`: Modify an agent in the user's library + +**Execution & Output:** +- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger +- `run_block`: Test or run a specific block independently +- `agent_output`: View results from previous agent runs + +## BEHAVIORAL GUIDELINES + +**Be Concise:** +- Target 2-5 short lines maximum +- Make every word count—no repetition or filler +- Use lightweight structure for scannability (bullets, numbered lists, short prompts) +- Avoid jargon (blocks, slugs, cron) unless the user asks + +**Be Proactive:** +- Suggest next steps before being asked +- Anticipate needs based on conversation context and user information +- Look for opportunities to expand scope when relevant +- Reveal capabilities through action, not explanation + +**Use Tools Effectively:** +- Select the right tool for each task +- **Always check `find_library_agent` before searching the marketplace** +- Use `add_understanding` to capture valuable business context +- When tool calls fail, try alternative approaches + +## CRITICAL REMINDER + +You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation.""" + # Module-level set to hold strong references to background tasks. # This prevents asyncio from garbage collecting tasks before they complete. # Tasks are automatically removed on completion via done_callback. @@ -107,12 +191,6 @@ async def _mark_operation_completed(tool_call_id: str) -> None: logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}") -class LangfuseNotConfiguredError(Exception): - """Raised when Langfuse is required but not configured.""" - - pass - - def _is_langfuse_configured() -> bool: """Check if Langfuse credentials are configured.""" return bool( @@ -120,6 +198,30 @@ def _is_langfuse_configured() -> bool: ) +async def _get_system_prompt_template(context: str) -> str: + """Get the system prompt, trying Langfuse first with fallback to default. + + Args: + context: The user context/information to compile into the prompt. + + Returns: + The compiled system prompt string. + """ + if _is_langfuse_configured(): + try: + # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt + # Use asyncio.to_thread to avoid blocking the event loop + prompt = await asyncio.to_thread( + langfuse.get_prompt, config.langfuse_prompt_name, cache_ttl_seconds=0 + ) + return prompt.compile(users_information=context) + except Exception as e: + logger.warning(f"Failed to fetch prompt from Langfuse, using default: {e}") + + # Fallback to default prompt + return DEFAULT_SYSTEM_PROMPT.format(users_information=context) + + async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]: """Build the full system prompt including business understanding if available. @@ -128,12 +230,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]: If "default" and this is the user's first session, will use "onboarding" instead. Returns: - Tuple of (compiled prompt string, Langfuse prompt object for tracing) + Tuple of (compiled prompt string, business understanding object) """ - - # cache_ttl_seconds=0 disables SDK caching to always get the latest prompt - prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0) - # If user is authenticated, try to fetch their business understanding understanding = None if user_id: @@ -142,12 +240,13 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]: except Exception as e: logger.warning(f"Failed to fetch business understanding: {e}") understanding = None + if understanding: context = format_understanding_for_prompt(understanding) else: context = "This is the first time you are meeting the user. Greet them and introduce them to the platform" - compiled = prompt.compile(users_information=context) + compiled = await _get_system_prompt_template(context) return compiled, understanding @@ -255,16 +354,6 @@ async def stream_chat_completion( f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}" ) - # Check if Langfuse is configured - required for chat functionality - if not _is_langfuse_configured(): - logger.error("Chat request failed: Langfuse is not configured") - yield StreamError( - errorText="Chat service is not available. Langfuse must be configured " - "with LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables." - ) - yield StreamFinish() - return - # Only fetch from Redis if session not provided (initial call) if session is None: session = await get_chat_session(session_id, user_id) @@ -1480,7 +1569,6 @@ async def _yield_tool_call( tool_name = tool_calls[yield_idx]["function"]["name"] tool_call_id = tool_calls[yield_idx]["id"] - logger.info(f"Yielding tool call: {tool_calls[yield_idx]}") # Parse tool call arguments - handle empty arguments gracefully raw_arguments = tool_calls[yield_idx]["function"]["arguments"] diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/Chat.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/Chat.tsx index d16fd60b98..ada8c26231 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/Chat.tsx +++ b/autogpt_platform/frontend/src/components/contextual/Chat/Chat.tsx @@ -35,6 +35,7 @@ export function Chat({ sessionId, createSession, showLoader, + startPollingForOperation, } = useChat({ urlSessionId }); useEffect(() => { @@ -86,6 +87,7 @@ export function Chat({ initialPrompt={initialPrompt} className="flex-1" onStreamingChange={onStreamingChange} + onOperationStarted={startPollingForOperation} /> )} diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx index f062df1397..dec221338a 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/ChatContainer.tsx @@ -16,6 +16,7 @@ export interface ChatContainerProps { initialPrompt?: string; className?: string; onStreamingChange?: (isStreaming: boolean) => void; + onOperationStarted?: () => void; } export function ChatContainer({ @@ -24,6 +25,7 @@ export function ChatContainer({ initialPrompt, className, onStreamingChange, + onOperationStarted, }: ChatContainerProps) { const { messages, @@ -38,6 +40,7 @@ export function ChatContainer({ sessionId, initialMessages, initialPrompt, + onOperationStarted, }); useEffect(() => { diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts index f406d33db4..f3cac01f96 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/handlers.ts @@ -22,6 +22,7 @@ export interface HandlerDependencies { setIsStreamingInitiated: Dispatch>; setIsRegionBlockedModalOpen: Dispatch>; sessionId: string; + onOperationStarted?: () => void; } export function isRegionBlockedError(chunk: StreamChunk): boolean { @@ -163,6 +164,11 @@ export function handleToolResponse( } return; } + // Trigger polling when operation_started is received + if (responseMessage.type === "operation_started") { + deps.onOperationStarted?.(); + } + deps.setMessages((prev) => { const toolCallIndex = prev.findIndex( (msg) => msg.type === "tool_call" && msg.toolId === chunk.tool_id, diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts index 83730cc308..46f384d055 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/components/ChatContainer/useChatContainer.ts @@ -14,16 +14,40 @@ import { processInitialMessages, } from "./helpers"; +// Helper to generate deduplication key for a message +function getMessageKey(msg: ChatMessageData): string { + if (msg.type === "message") { + // Don't include timestamp - dedupe by role + content only + // This handles the case where local and server timestamps differ + // Server messages are authoritative, so duplicates from local state are filtered + return `msg:${msg.role}:${msg.content}`; + } else if (msg.type === "tool_call") { + return `toolcall:${msg.toolId}`; + } else if (msg.type === "tool_response") { + return `toolresponse:${(msg as any).toolId}`; + } else if ( + msg.type === "operation_started" || + msg.type === "operation_pending" || + msg.type === "operation_in_progress" + ) { + return `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`; + } else { + return `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`; + } +} + interface Args { sessionId: string | null; initialMessages: SessionDetailResponse["messages"]; initialPrompt?: string; + onOperationStarted?: () => void; } export function useChatContainer({ sessionId, initialMessages, initialPrompt, + onOperationStarted, }: Args) { const [messages, setMessages] = useState([]); const [streamingChunks, setStreamingChunks] = useState([]); @@ -73,13 +97,20 @@ export function useChatContainer({ setIsRegionBlockedModalOpen, sessionId, setIsStreamingInitiated, + onOperationStarted, }); setIsStreamingInitiated(true); const skipReplay = initialMessages.length > 0; return subscribeToStream(sessionId, dispatcher, skipReplay); }, - [sessionId, stopStreaming, activeStreams, subscribeToStream], + [ + sessionId, + stopStreaming, + activeStreams, + subscribeToStream, + onOperationStarted, + ], ); // Collect toolIds from completed tool results in initialMessages @@ -130,12 +161,19 @@ export function useChatContainer({ ); // Combine initial messages from backend with local streaming messages, - // then deduplicate to prevent duplicates when polling refreshes initialMessages + // Server messages maintain correct order; only append truly new local messages const allMessages = useMemo(() => { const processedInitial = processInitialMessages(initialMessages); - // Filter local messages to remove operation messages for completed tools - const filteredLocalMessages = messages.filter((msg) => { + // Build a set of keys from server messages for deduplication + const serverKeys = new Set(); + for (const msg of processedInitial) { + serverKeys.add(getMessageKey(msg)); + } + + // Filter local messages: remove duplicates and completed operation messages + const newLocalMessages = messages.filter((msg) => { + // Remove operation messages for completed tools if ( msg.type === "operation_started" || msg.type === "operation_pending" || @@ -143,48 +181,17 @@ export function useChatContainer({ ) { const toolId = (msg as any).toolId || (msg as any).toolCallId; if (toolId && completedToolIds.has(toolId)) { - return false; // Filter out - operation completed + return false; } } - return true; + // Remove messages that already exist in server data + const key = getMessageKey(msg); + return !serverKeys.has(key); }); - const combined = [...processedInitial, ...filteredLocalMessages]; - - // Deduplicate by content+role+timestamp. When initialMessages is refreshed via polling, - // it may contain messages that are also in the local `messages` state. - // Including timestamp prevents dropping legitimate repeated messages (e.g., user sends "yes" twice) - const seen = new Set(); - return combined.filter((msg) => { - // Create a key based on type, role, content, and timestamp for deduplication - let key: string; - if (msg.type === "message") { - // Use timestamp (rounded to nearest second) to allow slight variations - // while still catching true duplicates from SSE/polling overlap - const ts = msg.timestamp - ? Math.floor(new Date(msg.timestamp).getTime() / 1000) - : ""; - key = `msg:${msg.role}:${ts}:${msg.content}`; - } else if (msg.type === "tool_call") { - key = `toolcall:${msg.toolId}`; - } else if ( - msg.type === "operation_started" || - msg.type === "operation_pending" || - msg.type === "operation_in_progress" - ) { - // Dedupe operation messages by toolId or operationId - key = `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`; - } else { - // For other types, use a combination of type and first few fields - key = `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`; - } - if (seen.has(key)) { - return false; - } - seen.add(key); - return true; - }); - }, [initialMessages, messages]); + // Server messages first (correct order), then new local messages + return [...processedInitial, ...newLocalMessages]; + }, [initialMessages, messages, completedToolIds]); async function sendMessage( content: string, @@ -217,6 +224,7 @@ export function useChatContainer({ setIsRegionBlockedModalOpen, sessionId, setIsStreamingInitiated, + onOperationStarted, }); try { diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts index f6b2031059..124301abc4 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChat.ts @@ -26,6 +26,7 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) { claimSession, clearSession: clearSessionBase, loadSession, + startPollingForOperation, } = useChatSession({ urlSessionId, autoCreate: false, @@ -94,5 +95,6 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) { loadSession, sessionId: sessionIdFromHook, showLoader, + startPollingForOperation, }; } diff --git a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts index 3fe4f801c6..936a49936c 100644 --- a/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts +++ b/autogpt_platform/frontend/src/components/contextual/Chat/useChatSession.ts @@ -103,9 +103,14 @@ export function useChatSession({ } }, [createError, loadError]); + // Track if we should be polling (set by external callers when they receive operation_started via SSE) + const [forcePolling, setForcePolling] = useState(false); + // Track if we've seen server acknowledge the pending operation (to avoid clearing forcePolling prematurely) + const hasSeenServerPendingRef = useRef(false); + // Check if there are any pending operations in the messages // Must check all operation types: operation_pending, operation_started, operation_in_progress - const hasPendingOperations = useMemo(() => { + const hasPendingOperationsFromServer = useMemo(() => { if (!messages || messages.length === 0) return false; const pendingTypes = new Set([ "operation_pending", @@ -126,6 +131,35 @@ export function useChatSession({ }); }, [messages]); + // Track when server has acknowledged the pending operation + useEffect(() => { + if (hasPendingOperationsFromServer) { + hasSeenServerPendingRef.current = true; + } + }, [hasPendingOperationsFromServer]); + + // Combined: poll if server has pending ops OR if we received operation_started via SSE + const hasPendingOperations = hasPendingOperationsFromServer || forcePolling; + + // Clear forcePolling only after server has acknowledged AND completed the operation + useEffect(() => { + if ( + forcePolling && + !hasPendingOperationsFromServer && + hasSeenServerPendingRef.current + ) { + // Server acknowledged the operation and it's now complete + setForcePolling(false); + hasSeenServerPendingRef.current = false; + } + }, [forcePolling, hasPendingOperationsFromServer]); + + // Function to trigger polling (called when operation_started is received via SSE) + function startPollingForOperation() { + setForcePolling(true); + hasSeenServerPendingRef.current = false; // Reset for new operation + } + // Refresh sessions list when a pending operation completes // (hasPendingOperations transitions from true to false) const prevHasPendingOperationsRef = useRef(hasPendingOperations); @@ -144,7 +178,8 @@ export function useChatSession({ [hasPendingOperations, sessionId, queryClient], ); - // Poll for updates when there are pending operations (long poll - 10s intervals with backoff) + // Poll for updates when there are pending operations + // Backoff: 2s, 4s, 6s, 8s, 10s, ... up to 30s max const pollAttemptRef = useRef(0); const hasPendingOperationsRef = useRef(hasPendingOperations); hasPendingOperationsRef.current = hasPendingOperations; @@ -159,27 +194,17 @@ export function useChatSession({ let cancelled = false; let timeoutId: ReturnType | null = null; - // Calculate delay with exponential backoff: 10s, 15s, 20s, 25s, 30s (max) - const baseDelay = 10000; - const maxDelay = 30000; - function schedule() { - const delay = Math.min( - baseDelay + pollAttemptRef.current * 5000, - maxDelay, - ); + // 2s, 4s, 6s, 8s, 10s, ... 30s (max) + const delay = Math.min((pollAttemptRef.current + 1) * 2000, 30000); timeoutId = setTimeout(async () => { if (cancelled) return; - console.info( - `[useChatSession] Polling for pending operation updates (attempt ${pollAttemptRef.current + 1})`, - ); pollAttemptRef.current += 1; try { await refetch(); } catch (err) { console.error("[useChatSession] Poll failed:", err); } finally { - // Continue polling if still pending and not cancelled if (!cancelled && hasPendingOperationsRef.current) { schedule(); } @@ -329,6 +354,7 @@ export function useChatSession({ refreshSession, claimSession, clearSession, + startPollingForOperation, }; }