Merge branch 'dev' into open-2967-disable-onboarding-redirects

Nicholas Tindle
2026-01-27 21:11:35 -06:00
committed by GitHub
7 changed files with 214 additions and 79 deletions

View File

@@ -73,6 +73,90 @@ langfuse = get_client()
# Used for idempotency across Kubernetes pods - prevents duplicate executions on browser refresh
RUNNING_OPERATION_PREFIX = "chat:running_operation:"
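# Illustrative sketch (not part of this diff): one way a key under this prefix is
# typically claimed for idempotency. The helper name, Redis client, and TTL are
# assumptions, not code from this repository.
async def try_claim_running_operation(redis_client, tool_call_id: str, ttl_seconds: int = 300) -> bool:
    # SET ... NX succeeds only if the key does not already exist, so a second pod
    # (or a browser refresh) cannot start the same operation twice; EX expires the
    # claim so a crashed pod does not block retries forever.
    claimed = await redis_client.set(
        f"{RUNNING_OPERATION_PREFIX}{tool_call_id}", "1", nx=True, ex=ttl_seconds
    )
    return bool(claimed)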
# Default system prompt used when Langfuse is not configured
# This is a snapshot of the "CoPilot Prompt" from Langfuse (version 11)
DEFAULT_SYSTEM_PROMPT = """You are **Otto**, an AI Co-Pilot for AutoGPT and a Forward-Deployed Automation Engineer serving small business owners. Your mission is to help users automate business tasks with AI by delivering tangible value through working automations—not through documentation or lengthy explanations.
Here is everything you know about the current user from previous interactions:
<users_information>
{users_information}
</users_information>
## YOUR CORE MANDATE
You are action-oriented. Your success is measured by:
- **Value Delivery**: Does the user think "wow, that was amazing" or "what was the point"?
- **Demonstrable Proof**: Show working automations, not descriptions of what's possible
- **Time Saved**: Focus on tangible efficiency gains
- **Quality Output**: Deliver results that meet or exceed expectations
## YOUR WORKFLOW
Adapt flexibly to the conversation context. Not every interaction requires all stages:
1. **Explore & Understand**: Learn about the user's business, tasks, and goals. Use `add_understanding` to capture important context that will improve future conversations.
2. **Assess Automation Potential**: Help the user understand whether and how AI can automate their task.
3. **Prepare for AI**: Provide brief, actionable guidance on prerequisites (data, access, etc.).
4. **Discover or Create Agents**:
- **Always check the user's library first** with `find_library_agent` (these may be customized to their needs)
- Search the marketplace with `find_agent` for pre-built automations
- Find reusable components with `find_block`
- Create custom solutions with `create_agent` if nothing suitable exists
- Modify existing library agents with `edit_agent`
5. **Execute**: Run automations immediately, schedule them, or set up webhooks using `run_agent`. Test specific components with `run_block`.
6. **Show Results**: Display outputs using `agent_output`.
## AVAILABLE TOOLS
**Understanding & Discovery:**
- `add_understanding`: Create a memory about the user's business or use cases for future sessions
- `search_docs`: Search platform documentation for specific technical information
- `get_doc_page`: Retrieve full text of a specific documentation page
**Agent Discovery:**
- `find_library_agent`: Search the user's existing agents (CHECK HERE FIRST—these may be customized)
- `find_agent`: Search the marketplace for pre-built automations
- `find_block`: Find pre-written code units that perform specific tasks (agents are built from blocks)
**Agent Creation & Editing:**
- `create_agent`: Create a new automation agent
- `edit_agent`: Modify an agent in the user's library
**Execution & Output:**
- `run_agent`: Run an agent now, schedule it, or set up a webhook trigger
- `run_block`: Test or run a specific block independently
- `agent_output`: View results from previous agent runs
## BEHAVIORAL GUIDELINES
**Be Concise:**
- Target 2-5 short lines maximum
- Make every word count—no repetition or filler
- Use lightweight structure for scannability (bullets, numbered lists, short prompts)
- Avoid jargon (blocks, slugs, cron) unless the user asks
**Be Proactive:**
- Suggest next steps before being asked
- Anticipate needs based on conversation context and user information
- Look for opportunities to expand scope when relevant
- Reveal capabilities through action, not explanation
**Use Tools Effectively:**
- Select the right tool for each task
- **Always check `find_library_agent` before searching the marketplace**
- Use `add_understanding` to capture valuable business context
- When tool calls fail, try alternative approaches
## CRITICAL REMINDER
You are NOT a chatbot. You are NOT documentation. You are a partner who helps busy business owners get value quickly by showing proof through working automations. Bias toward action over explanation."""
# Module-level set to hold strong references to background tasks.
# This prevents asyncio from garbage collecting tasks before they complete.
# Tasks are automatically removed on completion via done_callback.
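# Illustrative sketch (not part of this diff) of the pattern the comment above
# describes; the set and helper names here are assumptions.
_background_tasks_example: set[asyncio.Task] = set()

def _spawn_background_task_example(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)
    # Hold a strong reference so the task is not garbage collected mid-flight,
    # then let the done callback drop it once the task completes.
    _background_tasks_example.add(task)
    task.add_done_callback(_background_tasks_example.discard)
    return task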
@@ -107,12 +191,6 @@ async def _mark_operation_completed(tool_call_id: str) -> None:
logger.warning(f"Failed to delete running operation key {tool_call_id}: {e}")
class LangfuseNotConfiguredError(Exception):
"""Raised when Langfuse is required but not configured."""
pass
def _is_langfuse_configured() -> bool:
"""Check if Langfuse credentials are configured."""
return bool(
@@ -120,6 +198,30 @@ def _is_langfuse_configured() -> bool:
)
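# Illustrative sketch (not part of this diff): the body elided by the hunk header
# above presumably checks the credentials named in the error text further down
# (LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY). The env-var lookup here is a
# hypothetical stand-in for the real settings access.
def _is_langfuse_configured_sketch() -> bool:
    import os  # local import keeps this sketch self-contained
    return bool(os.getenv("LANGFUSE_PUBLIC_KEY") and os.getenv("LANGFUSE_SECRET_KEY"))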
async def _get_system_prompt_template(context: str) -> str:
"""Get the system prompt, trying Langfuse first with fallback to default.
Args:
context: The user context/information to compile into the prompt.
Returns:
The compiled system prompt string.
"""
if _is_langfuse_configured():
try:
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
# Use asyncio.to_thread to avoid blocking the event loop
prompt = await asyncio.to_thread(
langfuse.get_prompt, config.langfuse_prompt_name, cache_ttl_seconds=0
)
return prompt.compile(users_information=context)
except Exception as e:
logger.warning(f"Failed to fetch prompt from Langfuse, using default: {e}")
# Fallback to default prompt
return DEFAULT_SYSTEM_PROMPT.format(users_information=context)
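# Illustrative usage (not part of this diff): with no Langfuse credentials set, the
# helper above resolves to DEFAULT_SYSTEM_PROMPT with the context substituted in.
# The sample context string is hypothetical.
async def _demo_prompt_fallback() -> None:
    text = await _get_system_prompt_template(
        "Runs a two-person bakery; wants to automate weekly supplier orders."
    )
    print(text[:40])  # begins with "You are **Otto**, an AI Co-Pilot..." when the default is used
# e.g. asyncio.run(_demo_prompt_fallback())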
async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
"""Build the full system prompt including business understanding if available.
@@ -128,12 +230,8 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
If "default" and this is the user's first session, will use "onboarding" instead.
Returns:
Tuple of (compiled prompt string, Langfuse prompt object for tracing)
Tuple of (compiled prompt string, business understanding object)
"""
# cache_ttl_seconds=0 disables SDK caching to always get the latest prompt
prompt = langfuse.get_prompt(config.langfuse_prompt_name, cache_ttl_seconds=0)
# If user is authenticated, try to fetch their business understanding
understanding = None
if user_id:
@@ -142,12 +240,13 @@ async def _build_system_prompt(user_id: str | None) -> tuple[str, Any]:
except Exception as e:
logger.warning(f"Failed to fetch business understanding: {e}")
understanding = None
if understanding:
context = format_understanding_for_prompt(understanding)
else:
context = "This is the first time you are meeting the user. Greet them and introduce them to the platform"
compiled = prompt.compile(users_information=context)
compiled = await _get_system_prompt_template(context)
return compiled, understanding
@@ -255,16 +354,6 @@ async def stream_chat_completion(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
# Check if Langfuse is configured - required for chat functionality
if not _is_langfuse_configured():
logger.error("Chat request failed: Langfuse is not configured")
yield StreamError(
errorText="Chat service is not available. Langfuse must be configured "
"with LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables."
)
yield StreamFinish()
return
# Only fetch from Redis if session not provided (initial call)
if session is None:
session = await get_chat_session(session_id, user_id)
@@ -1480,7 +1569,6 @@ async def _yield_tool_call(
tool_name = tool_calls[yield_idx]["function"]["name"]
tool_call_id = tool_calls[yield_idx]["id"]
logger.info(f"Yielding tool call: {tool_calls[yield_idx]}")
# Parse tool call arguments - handle empty arguments gracefully
raw_arguments = tool_calls[yield_idx]["function"]["arguments"]

View File

@@ -35,6 +35,7 @@ export function Chat({
sessionId,
createSession,
showLoader,
startPollingForOperation,
} = useChat({ urlSessionId });
useEffect(() => {
@@ -86,6 +87,7 @@ export function Chat({
initialPrompt={initialPrompt}
className="flex-1"
onStreamingChange={onStreamingChange}
onOperationStarted={startPollingForOperation}
/>
)}
</main>

View File

@@ -16,6 +16,7 @@ export interface ChatContainerProps {
initialPrompt?: string;
className?: string;
onStreamingChange?: (isStreaming: boolean) => void;
onOperationStarted?: () => void;
}
export function ChatContainer({
@@ -24,6 +25,7 @@ export function ChatContainer({
initialPrompt,
className,
onStreamingChange,
onOperationStarted,
}: ChatContainerProps) {
const {
messages,
@@ -38,6 +40,7 @@ export function ChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
});
useEffect(() => {

View File

@@ -22,6 +22,7 @@ export interface HandlerDependencies {
setIsStreamingInitiated: Dispatch<SetStateAction<boolean>>;
setIsRegionBlockedModalOpen: Dispatch<SetStateAction<boolean>>;
sessionId: string;
onOperationStarted?: () => void;
}
export function isRegionBlockedError(chunk: StreamChunk): boolean {
@@ -163,6 +164,11 @@ export function handleToolResponse(
}
return;
}
// Trigger polling when operation_started is received
if (responseMessage.type === "operation_started") {
deps.onOperationStarted?.();
}
deps.setMessages((prev) => {
const toolCallIndex = prev.findIndex(
(msg) => msg.type === "tool_call" && msg.toolId === chunk.tool_id,

View File

@@ -14,16 +14,40 @@ import {
processInitialMessages,
} from "./helpers";
// Helper to generate a deduplication key for a message
function getMessageKey(msg: ChatMessageData): string {
if (msg.type === "message") {
// Don't include timestamp - dedupe by role + content only
// This handles the case where local and server timestamps differ
// Server messages are authoritative, so duplicates from local state are filtered
return `msg:${msg.role}:${msg.content}`;
} else if (msg.type === "tool_call") {
return `toolcall:${msg.toolId}`;
} else if (msg.type === "tool_response") {
return `toolresponse:${(msg as any).toolId}`;
} else if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
return `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`;
} else {
return `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`;
}
}
interface Args {
sessionId: string | null;
initialMessages: SessionDetailResponse["messages"];
initialPrompt?: string;
onOperationStarted?: () => void;
}
export function useChatContainer({
sessionId,
initialMessages,
initialPrompt,
onOperationStarted,
}: Args) {
const [messages, setMessages] = useState<ChatMessageData[]>([]);
const [streamingChunks, setStreamingChunks] = useState<string[]>([]);
@@ -73,13 +97,20 @@ export function useChatContainer({
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
setIsStreamingInitiated(true);
const skipReplay = initialMessages.length > 0;
return subscribeToStream(sessionId, dispatcher, skipReplay);
},
[sessionId, stopStreaming, activeStreams, subscribeToStream],
[
sessionId,
stopStreaming,
activeStreams,
subscribeToStream,
onOperationStarted,
],
);
// Collect toolIds from completed tool results in initialMessages
@@ -130,12 +161,19 @@ export function useChatContainer({
);
// Combine initial messages from backend with local streaming messages,
// then deduplicate to prevent duplicates when polling refreshes initialMessages
// Server messages maintain correct order; only append truly new local messages
const allMessages = useMemo(() => {
const processedInitial = processInitialMessages(initialMessages);
// Filter local messages to remove operation messages for completed tools
const filteredLocalMessages = messages.filter((msg) => {
// Build a set of keys from server messages for deduplication
const serverKeys = new Set<string>();
for (const msg of processedInitial) {
serverKeys.add(getMessageKey(msg));
}
// Filter local messages: remove duplicates and completed operation messages
const newLocalMessages = messages.filter((msg) => {
// Remove operation messages for completed tools
if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
@@ -143,48 +181,17 @@ export function useChatContainer({
) {
const toolId = (msg as any).toolId || (msg as any).toolCallId;
if (toolId && completedToolIds.has(toolId)) {
return false; // Filter out - operation completed
return false;
}
}
return true;
// Remove messages that already exist in server data
const key = getMessageKey(msg);
return !serverKeys.has(key);
});
const combined = [...processedInitial, ...filteredLocalMessages];
// Deduplicate by content+role+timestamp. When initialMessages is refreshed via polling,
// it may contain messages that are also in the local `messages` state.
// Including timestamp prevents dropping legitimate repeated messages (e.g., user sends "yes" twice)
const seen = new Set<string>();
return combined.filter((msg) => {
// Create a key based on type, role, content, and timestamp for deduplication
let key: string;
if (msg.type === "message") {
// Use timestamp (rounded to nearest second) to allow slight variations
// while still catching true duplicates from SSE/polling overlap
const ts = msg.timestamp
? Math.floor(new Date(msg.timestamp).getTime() / 1000)
: "";
key = `msg:${msg.role}:${ts}:${msg.content}`;
} else if (msg.type === "tool_call") {
key = `toolcall:${msg.toolId}`;
} else if (
msg.type === "operation_started" ||
msg.type === "operation_pending" ||
msg.type === "operation_in_progress"
) {
// Dedupe operation messages by toolId or operationId
key = `op:${(msg as any).toolId || (msg as any).operationId || (msg as any).toolCallId || ""}:${msg.toolName}`;
} else {
// For other types, use a combination of type and first few fields
key = `${msg.type}:${JSON.stringify(msg).slice(0, 100)}`;
}
if (seen.has(key)) {
return false;
}
seen.add(key);
return true;
});
}, [initialMessages, messages]);
// Server messages first (correct order), then new local messages
return [...processedInitial, ...newLocalMessages];
}, [initialMessages, messages, completedToolIds]);
async function sendMessage(
content: string,
@@ -217,6 +224,7 @@ export function useChatContainer({
setIsRegionBlockedModalOpen,
sessionId,
setIsStreamingInitiated,
onOperationStarted,
});
try {

View File

@@ -26,6 +26,7 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
claimSession,
clearSession: clearSessionBase,
loadSession,
startPollingForOperation,
} = useChatSession({
urlSessionId,
autoCreate: false,
@@ -94,5 +95,6 @@ export function useChat({ urlSessionId }: UseChatArgs = {}) {
loadSession,
sessionId: sessionIdFromHook,
showLoader,
startPollingForOperation,
};
}

View File

@@ -103,9 +103,14 @@ export function useChatSession({
}
}, [createError, loadError]);
// Track if we should be polling (set by external callers when they receive operation_started via SSE)
const [forcePolling, setForcePolling] = useState(false);
// Track if we've seen server acknowledge the pending operation (to avoid clearing forcePolling prematurely)
const hasSeenServerPendingRef = useRef(false);
// Check if there are any pending operations in the messages
// Must check all operation types: operation_pending, operation_started, operation_in_progress
const hasPendingOperations = useMemo(() => {
const hasPendingOperationsFromServer = useMemo(() => {
if (!messages || messages.length === 0) return false;
const pendingTypes = new Set([
"operation_pending",
@@ -126,6 +131,35 @@ export function useChatSession({
});
}, [messages]);
// Track when the server has acknowledged the pending operation
useEffect(() => {
if (hasPendingOperationsFromServer) {
hasSeenServerPendingRef.current = true;
}
}, [hasPendingOperationsFromServer]);
// Combined: poll if server has pending ops OR if we received operation_started via SSE
const hasPendingOperations = hasPendingOperationsFromServer || forcePolling;
// Clear forcePolling only after server has acknowledged AND completed the operation
useEffect(() => {
if (
forcePolling &&
!hasPendingOperationsFromServer &&
hasSeenServerPendingRef.current
) {
// Server acknowledged the operation and it's now complete
setForcePolling(false);
hasSeenServerPendingRef.current = false;
}
}, [forcePolling, hasPendingOperationsFromServer]);
// Function to trigger polling (called when operation_started is received via SSE)
function startPollingForOperation() {
setForcePolling(true);
hasSeenServerPendingRef.current = false; // Reset for new operation
}
// Refresh sessions list when a pending operation completes
// (hasPendingOperations transitions from true to false)
const prevHasPendingOperationsRef = useRef(hasPendingOperations);
@@ -144,7 +178,8 @@ export function useChatSession({
[hasPendingOperations, sessionId, queryClient],
);
// Poll for updates when there are pending operations (long poll - 10s intervals with backoff)
// Poll for updates when there are pending operations
// Backoff: 2s, 4s, 6s, 8s, 10s, ... up to 30s max
const pollAttemptRef = useRef(0);
const hasPendingOperationsRef = useRef(hasPendingOperations);
hasPendingOperationsRef.current = hasPendingOperations;
@@ -159,27 +194,17 @@ export function useChatSession({
let cancelled = false;
let timeoutId: ReturnType<typeof setTimeout> | null = null;
// Calculate delay with exponential backoff: 10s, 15s, 20s, 25s, 30s (max)
const baseDelay = 10000;
const maxDelay = 30000;
function schedule() {
const delay = Math.min(
baseDelay + pollAttemptRef.current * 5000,
maxDelay,
);
// 2s, 4s, 6s, 8s, 10s, ... 30s (max)
const delay = Math.min((pollAttemptRef.current + 1) * 2000, 30000);
timeoutId = setTimeout(async () => {
if (cancelled) return;
console.info(
`[useChatSession] Polling for pending operation updates (attempt ${pollAttemptRef.current + 1})`,
);
pollAttemptRef.current += 1;
try {
await refetch();
} catch (err) {
console.error("[useChatSession] Poll failed:", err);
} finally {
// Continue polling if still pending and not cancelled
if (!cancelled && hasPendingOperationsRef.current) {
schedule();
}
@@ -329,6 +354,7 @@ export function useChatSession({
refreshSession,
claimSession,
clearSession,
startPollingForOperation,
};
}