fix(platform): chat duplicate messages (#11332)

This commit is contained in:
Swifty
2025-11-06 17:20:46 +01:00
committed by GitHub
parent dcecb17bd1
commit 5559d978d7
8 changed files with 174 additions and 118 deletions

View File

@@ -11,7 +11,9 @@ class ChatConfig(BaseSettings):
"""Configuration for the chat system."""
# OpenAI API Configuration
model: str = Field(default="openai/gpt-5-mini", description="Default model to use")
model: str = Field(
default="qwen/qwen3-235b-a22b-2507", description="Default model to use"
)
api_key: str | None = Field(default=None, description="OpenAI API key")
base_url: str | None = Field(
default="https://openrouter.ai/api/v1",

View File

@@ -1,76 +1,97 @@
# AutoGPT Agent Setup Assistant
Your name is Otto.
You work for AutoGPT as an AI Co-Pilot acting as an AI Forward Deployed Engineer.
You were made by AutoGPT.
You are Otto, an AI Co-Pilot and Forward Deployed Engineer for AutoGPT, an AI Business Automation tool. Your mission is to help users quickly find and set up AutoGPT agents to solve their business problems.
AutoGPT is an AI Business Automation tool; it helps businesses capture the value from AI to accelerate their growth!
Here are the functions available to you:
You help users find and set up AutoGPT agents to solve their business problems. **Bias toward action** - move quickly to get agents running.
<functions>
1. **find_agent** - Search for agents that solve the user's problem
2. **get_agent_details** - Get comprehensive information about the chosen agent
3. **get_required_setup_info** - Verify user has required credentials (MANDATORY before execution)
4. **schedule_agent** - Schedules the agent to run based on a cron
5. **run_agent** - Execute the agent
</functions>
## THE FLOW (Always Follow This Order)
1. **find_agent** → Search for agents that solve their problem
2. **get_agent_details** → Get comprehensive info about chosen agent
3. **get_required_setup_info** → Verify user has required credentials (MANDATORY before next step)
4. **schedule_agent** or **run_agent** → Execute the agent
## MANDATORY WORKFLOW
You must follow these 4 steps in exact order:
1. **find_agent** - Search for agents that solve the user's problem
2. **get_agent_details** - Get comprehensive information about the chosen agent
3. **get_required_setup_info** - Verify user has required credentials (MANDATORY before execution)
4. **schedule_agent** or **run_agent** - Execute the agent
## YOUR APPROACH
### STEP 1: UNDERSTAND THE PROBLEM (Quick)
- One or two targeted questions max
- What business problem are they trying to solve?
**Step 1: Understand the Problem**
- Ask maximum 1-2 targeted questions
- Focus on: What business problem are they solving?
- Move quickly to searching for solutions
### STEP 2: FIND AGENTS
**Step 2: Find Agents**
- Use `find_agent` immediately with relevant keywords
- Suggest the best option based on what you know
- Suggest the best option from search results
- Explain briefly how it solves their problem
- Ask them if they would like to use it, if they do move to step 3
- Ask if they want to use it, then move to step 3
### STEP 3: GET DETAILS
**Step 3: Get Details**
- Use `get_agent_details` on their chosen agent
- Explain what the agent does and its requirements
- Keep explanations brief and outcome-focused
### STEP 4: VERIFY SETUP (CRITICAL)
- **ALWAYS** use `get_required_setup_info` before proceeding
- This checks if user has all required credentials
**Step 4: Verify Setup (CRITICAL)**
- ALWAYS use `get_required_setup_info` before execution
- Tell user what credentials they need (if any)
- Explain credentials are added via the frontend interface
- Explain that credentials are added via the frontend interface
### STEP 5: EXECUTE
- Once credentials verified, use `schedule_agent` for scheduled and triggered runs OR `run_agent` for immediate execution
- Confirm successful setup/run
**Step 5: Execute**
- Use `schedule_agent` for scheduled runs OR `run_agent` for immediate execution
- Confirm successful setup
- Provide clear next steps
## FUNCTION CALL FORMAT
To call a function, use this exact format:
`<function_call>function_name(parameter="value")</function_call>`
## KEY RULES
### What You DON'T Do:
**What You DON'T Do:**
- Don't help with login (frontend handles this)
- Don't help add credentials (frontend handles this)
- Don't skip `get_required_setup_info` (it's mandatory)
- Don't over-explain technical details
- Don't help add credentials (frontend handles this)
- Don't skip `get_required_setup_info` (mandatory before execution)
- Don't ask permission to use functions - just use them
- Don't write responses longer than 3 sentences
- Don't pretend to be ChatGPT
### What You DO:
**What You DO:**
- Act fast - get to agent discovery quickly
- Use tools proactively without asking permission
- Keep explanations short and business-focused
- Use functions proactively
- Keep all responses to maximum 3 sentences
- Always verify credentials before setup/run
- Focus on outcomes and value
- Maintain conversational, concise style
- Do use markdown to make your messages easier to read
### Error Handling:
- If authentication needed → Tell user to sign in via the interface
- If credentials missing → Tell user what's needed and where to add them in the frontend
- If setup fails → Identify issue, provide clear fix
**Error Handling:**
- Authentication needed → "Please sign in via the interface"
- Credentials missing → Tell user what's needed and where to add them
- Setup fails → Identify issue and provide clear fix
## SUCCESS LOOKS LIKE:
- User has an agent running within minutes
- User understands what their agent does
- User knows how to use their agent going forward
- Minimal back-and-forth, maximum action
## RESPONSE STRUCTURE
**Remember: Speed to value. Find agent → Get details → Verify credentials → Run. Keep it simple, keep it moving.**
Before responding, wrap your analysis in <thinking> tags to systematically plan your approach:
- Identify which step of the 4-step mandatory workflow you're currently on
- Extract the key business problem or request from the user's message
- Determine what function call (if any) you need to make next
- Plan your response to stay under the 3-sentence maximum
- Consider what specific keywords or parameters you'll use for any function calls
Example interaction pattern:
```
User: "I need to automate my social media posting"
Otto: Let me find social media automation agents for you. <function_call>find_agent(query="social media posting automation")</function_call> I'll show you the best options once I get the results.
```
Respond conversationally and begin helping them find the right AutoGPT agent for their needs.
KEEP ANSWERS TO 3 SENTENCES

View File

@@ -144,7 +144,11 @@ async def stream_chat(
async def event_generator() -> AsyncGenerator[str, None]:
async for chunk in chat_service.stream_chat_completion(
session_id, message, is_user_message=is_user_message, user_id=user_id
session_id,
message,
is_user_message=is_user_message,
user_id=user_id,
session=session, # Pass pre-fetched session to avoid double-fetch
):
yield chunk.to_sse()

View File

@@ -76,6 +76,7 @@ async def stream_chat_completion(
is_user_message: bool = True,
user_id: str | None = None,
retry_count: int = 0,
session: ChatSession | None = None,
) -> AsyncGenerator[StreamBaseResponse, None]:
"""Main entry point for streaming chat completions with database handling.
@@ -86,6 +87,7 @@ async def stream_chat_completion(
session_id: Chat session ID
user_message: User's input message
user_id: User ID for authentication (None for anonymous)
session: Optional pre-loaded session object (for recursive calls to avoid Redis refetch)
Yields:
StreamBaseResponse objects formatted as SSE
@@ -99,7 +101,18 @@ async def stream_chat_completion(
f"Streaming chat completion for session {session_id} for message {message} and user id {user_id}. Message is user message: {is_user_message}"
)
session = await get_chat_session(session_id, user_id)
# Only fetch from Redis if session not provided (initial call)
if session is None:
session = await get_chat_session(session_id, user_id)
logger.info(
f"Fetched session from Redis: {session.session_id if session else 'None'}, "
f"message_count={len(session.messages) if session else 0}"
)
else:
logger.info(
f"Using provided session object: {session.session_id}, "
f"message_count={len(session.messages)}"
)
if not session:
raise NotFoundError(
@@ -112,12 +125,17 @@ async def stream_chat_completion(
role="user" if is_user_message else "assistant", content=message
)
)
logger.info(
f"Appended message (role={'user' if is_user_message else 'assistant'}), "
f"new message_count={len(session.messages)}"
)
if len(session.messages) > config.max_context_messages:
raise ValueError(f"Max messages exceeded: {config.max_context_messages}")
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
f"Upserting session: {session.session_id} with user id {session.user_id}, "
f"message_count={len(session.messages)}"
)
session = await upsert_chat_session(session)
assert session, "Session not found"
@@ -259,13 +277,15 @@ async def stream_chat_completion(
session_id=session.session_id,
user_id=user_id,
retry_count=retry_count + 1,
session=session,
):
yield chunk
return # Exit after retry to avoid double-saving in finally block
# Normal completion path - save session and handle tool call continuation
logger.info(
f"Upserting session: {session.session_id} with user id {session.user_id}"
f"Normal completion path: session={session.session_id}, "
f"current message_count={len(session.messages)}"
)
# Build the messages list in the correct order
@@ -285,9 +305,13 @@ async def stream_chat_completion(
# Add tool response messages after assistant message
messages_to_save.extend(tool_response_messages)
logger.info(f"Saving {len(tool_response_messages)} tool response messages")
logger.info(
f"Saving {len(tool_response_messages)} tool response messages, "
f"total_to_save={len(messages_to_save)}"
)
session.messages.extend(messages_to_save)
logger.info(f"Extended session messages, new message_count={len(session.messages)}")
await upsert_chat_session(session)
# If we did a tool call, stream the chat completion again to get the next response
@@ -296,7 +320,9 @@ async def stream_chat_completion(
"Tool call executed, streaming chat completion again to get assistant response"
)
async for chunk in stream_chat_completion(
session_id=session.session_id, user_id=user_id
session_id=session.session_id,
user_id=user_id,
session=session, # Pass session object to avoid Redis refetch
):
yield chunk

View File

@@ -26,9 +26,9 @@ export function ChatContainer({
});
const quickActions = [
"Find agents for data analysis",
"Show me automation agents",
"Help me build a workflow",
"Find agents for social media management",
"Show me agents for content creation",
"Help me automate my business",
"What can you help me with?",
];

View File

@@ -154,7 +154,12 @@ export function handleStreamEnd(
deps: HandlerDependencies,
) {
const completedContent = deps.streamingChunksRef.current.join("");
if (completedContent) {
// Only save message if there are uncommitted chunks
// (text_ended already saved if there were tool calls)
if (completedContent.trim()) {
console.log(
"[Stream End] Saving remaining streamed text as assistant message",
);
const assistantMessage: ChatMessageData = {
type: "message",
role: "assistant",
@@ -185,6 +190,8 @@ export function handleStreamEnd(
});
return updated;
});
} else {
console.log("[Stream End] No uncommitted chunks, message already saved");
}
deps.setStreamingChunks([]);
deps.streamingChunksRef.current = [];

View File

@@ -70,7 +70,10 @@ export function useChatSession({
} = useGetV2GetSession(sessionId || "", {
query: {
enabled: !!sessionId,
staleTime: 30000,
staleTime: Infinity, // Never mark as stale
refetchOnMount: false, // Don't refetch on component mount
refetchOnWindowFocus: false, // Don't refetch when window regains focus
refetchOnReconnect: false, // Don't refetch when network reconnects
retry: 1,
},
});

View File

@@ -105,63 +105,6 @@ export function useChatStream() {
eventSourceRef.current = null;
});
eventSource.onmessage = function (event) {
try {
const chunk = JSON.parse(event.data) as StreamChunk;
if (retryCountRef.current > 0) {
retryCountRef.current = 0;
}
onChunk(chunk);
if (chunk.type === "stream_end") {
stopStreaming();
}
} catch (err) {
const parseError =
err instanceof Error
? err
: new Error("Failed to parse stream chunk");
setError(parseError);
}
};
eventSource.onerror = function (_event) {
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
if (retryCountRef.current < MAX_RETRIES) {
retryCountRef.current += 1;
const retryDelay =
INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
toast.info("Connection interrupted", {
description: `Retrying in ${retryDelay / 1000} seconds...`,
});
retryTimeoutRef.current = setTimeout(() => {
sendMessage(sessionId, message, onChunk, isUserMessage).catch(
(_err) => {
// Retry failed
},
);
}, retryDelay);
} else {
const streamError = new Error(
"Stream connection failed after multiple retries",
);
setError(streamError);
toast.error("Connection Failed", {
description:
"Unable to connect to chat service. Please try again.",
});
stopStreaming();
}
};
return new Promise<void>((resolve, reject) => {
const cleanup = () => {
eventSource.removeEventListener("message", messageHandler);
@@ -171,8 +114,18 @@ export function useChatStream() {
const messageHandler = (event: MessageEvent) => {
try {
const chunk = JSON.parse(event.data) as StreamChunk;
if (retryCountRef.current > 0) {
retryCountRef.current = 0;
}
// Call the chunk handler
onChunk(chunk);
// Handle stream lifecycle
if (chunk.type === "stream_end") {
cleanup();
stopStreaming();
resolve();
} else if (chunk.type === "error") {
cleanup();
@@ -180,12 +133,52 @@ export function useChatStream() {
new Error(chunk.message || chunk.content || "Stream error"),
);
}
} catch {}
} catch (err) {
const parseError =
err instanceof Error
? err
: new Error("Failed to parse stream chunk");
setError(parseError);
cleanup();
reject(parseError);
}
};
const errorHandler = () => {
cleanup();
reject(new Error("Stream connection error"));
if (eventSourceRef.current) {
eventSourceRef.current.close();
eventSourceRef.current = null;
}
if (retryCountRef.current < MAX_RETRIES) {
retryCountRef.current += 1;
const retryDelay =
INITIAL_RETRY_DELAY * Math.pow(2, retryCountRef.current - 1);
toast.info("Connection interrupted", {
description: `Retrying in ${retryDelay / 1000} seconds...`,
});
retryTimeoutRef.current = setTimeout(() => {
sendMessage(sessionId, message, onChunk, isUserMessage).catch(
(_err) => {
// Retry failed
},
);
}, retryDelay);
} else {
const streamError = new Error(
"Stream connection failed after multiple retries",
);
setError(streamError);
toast.error("Connection Failed", {
description:
"Unable to connect to chat service. Please try again.",
});
cleanup();
stopStreaming();
reject(streamError);
}
};
eventSource.addEventListener("message", messageHandler);