Compare commits

...

2 Commits

Author SHA1 Message Date
openhands
e27efce042 Add info method to EventLogger 2025-03-03 22:14:47 +00:00
openhands
95bf98f21a Fix message queuing during client loading by waiting for backend ready signal 2025-03-03 22:12:44 +00:00
4 changed files with 117 additions and 9 deletions

View File

@@ -14,7 +14,10 @@ import { generateAgentStateChangeEvent } from "#/services/agent-state-service";
import { FeedbackModal } from "../feedback/feedback-modal";
import { useScrollToBottom } from "#/hooks/use-scroll-to-bottom";
import { TypingIndicator } from "./typing-indicator";
import { useWsClient } from "#/context/ws-client-provider";
import {
useWsClient,
WsClientProviderStatus,
} from "#/context/ws-client-provider";
import { Messages } from "./messages";
import { ChatSuggestions } from "./chat-suggestions";
import { ActionSuggestions } from "./action-suggestions";
@@ -34,7 +37,7 @@ function getEntryPoint(
}
export function ChatInterface() {
const { send, isLoadingMessages } = useWsClient();
const { send, isLoadingMessages, status, pendingMessages } = useWsClient();
const dispatch = useDispatch();
const scrollRef = React.useRef<HTMLDivElement>(null);
const { scrollDomToBottom, onChatBodyScroll, hitBottom } =
@@ -54,6 +57,9 @@ export function ChatInterface() {
const params = useParams();
const { mutate: getTrajectory } = useGetTrajectory();
const isClientDisconnected = status === WsClientProviderStatus.DISCONNECTED;
const hasPendingMessages = pendingMessages.length > 0;
const handleSendMessage = async (content: string, files: File[]) => {
if (messages.length === 0) {
posthog.capture("initial_query_submitted", {
@@ -76,7 +82,16 @@ export function ChatInterface() {
const timestamp = new Date().toISOString();
const pending = true;
dispatch(addUserMessage({ content, imageUrls, timestamp, pending }));
send(createChatMessage(content, imageUrls, timestamp));
// Create the chat message
const chatMessage = createChatMessage(content, imageUrls, timestamp);
// Send or queue the message depending on connection status
send(chatMessage);
// Set agent state to RUNNING when a message is sent
send(generateAgentStateChangeEvent(AgentState.RUNNING));
setMessageToSend(null);
};
@@ -131,8 +146,20 @@ export function ChatInterface() {
className="flex flex-col grow overflow-y-auto overflow-x-hidden px-4 pt-4 gap-2"
>
{isLoadingMessages && (
<div className="flex justify-center">
<div className="flex flex-col items-center gap-2">
<LoadingSpinner size="small" />
{isClientDisconnected && (
<div className="text-sm text-neutral-400">
Waiting for client to become ready...
{hasPendingMessages && (
<div className="text-xs text-neutral-500 mt-1">
{pendingMessages.length} message
{pendingMessages.length !== 1 ? "s" : ""} will be sent when
connected
</div>
)}
</div>
)}
</div>
)}
@@ -179,7 +206,7 @@ export function ChatInterface() {
onSubmit={handleSendMessage}
onStop={handleStop}
isDisabled={
curAgentState === AgentState.LOADING ||
// Allow input even when loading, but not during confirmation
curAgentState === AgentState.AWAITING_USER_CONFIRMATION
}
mode={curAgentState === AgentState.RUNNING ? "stop" : "submit"}

View File

@@ -22,10 +22,11 @@ export function AgentStatusBar() {
const { t, i18n } = useTranslation();
const { curAgentState } = useSelector((state: RootState) => state.agent);
const { curStatusMessage } = useSelector((state: RootState) => state.status);
const { status } = useWsClient();
const { status, pendingMessages } = useWsClient();
const { notify } = useNotification();
const [statusMessage, setStatusMessage] = React.useState<string>("");
const hasPendingMessages = pendingMessages.length > 0;
const updateStatusMessage = () => {
let message = curStatusMessage.message || "";
@@ -71,7 +72,13 @@ export function AgentStatusBar() {
React.useEffect(() => {
if (status === WsClientProviderStatus.DISCONNECTED) {
setStatusMessage("Connecting...");
if (hasPendingMessages) {
setStatusMessage(
`Connecting... (${pendingMessages.length} pending message${pendingMessages.length !== 1 ? "s" : ""})`,
);
} else {
setStatusMessage("Connecting...");
}
} else {
setStatusMessage(AGENT_STATUS_MAP[curAgentState].message);
if (notificationStates.includes(curAgentState)) {
@@ -87,7 +94,7 @@ export function AgentStatusBar() {
}
}
}
}, [curAgentState, notify, t]);
}, [curAgentState, status, pendingMessages.length, notify, t]);
return (
<div className="flex flex-col items-center">

View File

@@ -39,6 +39,14 @@ const isMessageAction = (
): event is UserMessageAction | AssistantMessageAction =>
isUserMessage(event) || isAssistantMessage(event);
// Check if an event is an agent state changed observation
const isAgentStateEvent = (event: Record<string, unknown>): boolean =>
isOpenHandsEvent(event) &&
"type" in event &&
event.type === "observation" &&
"observation_id" in event &&
event.observation_id === "agent_state_changed";
export enum WsClientProviderStatus {
CONNECTED,
DISCONNECTED,
@@ -49,15 +57,21 @@ interface UseWsClient {
isLoadingMessages: boolean;
events: Record<string, unknown>[];
send: (event: Record<string, unknown>) => void;
queueMessage: (event: Record<string, unknown>) => void;
pendingMessages: Record<string, unknown>[];
}
const WsClientContext = React.createContext<UseWsClient>({
status: WsClientProviderStatus.DISCONNECTED,
isLoadingMessages: true,
events: [],
pendingMessages: [],
send: () => {
throw new Error("not connected");
},
queueMessage: () => {
throw new Error("not connected");
},
});
interface WsClientProviderProps {
@@ -109,26 +123,50 @@ export function WsClientProvider({
WsClientProviderStatus.DISCONNECTED,
);
const [events, setEvents] = React.useState<Record<string, unknown>[]>([]);
const [pendingMessages, setPendingMessages] = React.useState<
Record<string, unknown>[]
>([]);
const [backendReady, setBackendReady] = React.useState(false);
const lastEventRef = React.useRef<Record<string, unknown> | null>(null);
const messageRateHandler = useRate({ threshold: 250 });
function queueMessage(event: Record<string, unknown>) {
setPendingMessages((prev) => [...prev, event]);
}
function send(event: Record<string, unknown>) {
if (!sioRef.current) {
EventLogger.error("WebSocket is not connected.");
queueMessage(event);
return;
}
if (!backendReady) {
// If backend is not ready yet, queue the message
EventLogger.info("Backend not ready, queueing message");
queueMessage(event);
return;
}
sioRef.current.emit("oh_action", event);
}
function handleConnect() {
setStatus(WsClientProviderStatus.CONNECTED);
// Don't send queued messages yet - wait for backend ready signal
}
function handleMessage(event: Record<string, unknown>) {
if (isOpenHandsEvent(event) && isMessageAction(event)) {
messageRateHandler.record(new Date().getTime());
}
// Check if this is a state change event indicating backend is ready
if (isAgentStateEvent(event)) {
setBackendReady(true);
}
setEvents((prevEvents) => [...prevEvents, event]);
if (!Number.isNaN(parseInt(event.id as string, 10))) {
lastEventRef.current = event;
@@ -139,6 +177,7 @@ export function WsClientProvider({
function handleDisconnect(data: unknown) {
setStatus(WsClientProviderStatus.DISCONNECTED);
setBackendReady(false);
const sio = sioRef.current;
if (!sio) {
return;
@@ -150,11 +189,34 @@ export function WsClientProvider({
function handleError(data: unknown) {
setStatus(WsClientProviderStatus.DISCONNECTED);
setBackendReady(false);
updateStatusWhenErrorMessagePresent(data);
}
// Watch for backend ready state and send queued messages when ready
React.useEffect(() => {
if (backendReady && pendingMessages.length > 0 && sioRef.current) {
// Backend is ready and we have pending messages
EventLogger.info(`Sending ${pendingMessages.length} queued messages`);
pendingMessages.forEach((event) => {
sioRef.current?.emit("oh_action", event);
});
// Also set the agent state to RUNNING if needed
const agentStateEvent = {
action: "change_agent_state",
args: { agent_state: "running" },
};
sioRef.current.emit("oh_action", agentStateEvent);
setPendingMessages([]);
}
}, [backendReady, pendingMessages.length]);
React.useEffect(() => {
lastEventRef.current = null;
setBackendReady(false);
}, [conversationId]);
React.useEffect(() => {
@@ -210,9 +272,11 @@ export function WsClientProvider({
status,
isLoadingMessages: messageRateHandler.isUnderThreshold,
events,
pendingMessages,
send,
queueMessage,
}),
[status, messageRateHandler.isUnderThreshold, events],
[status, messageRateHandler.isUnderThreshold, events, pendingMessages],
);
return <WsClientContext value={value}>{children}</WsClientContext>;

View File

@@ -37,6 +37,16 @@ class EventLogger {
}
}
/**
* Log an info message
* @param info The info message
*/
static info(info: string) {
if (this.isDevMode) {
console.info(info);
}
}
/**
* Log an error message
* @param error The error message