Compare commits

...

17 Commits

Author SHA1 Message Date
openhands
7a2e7d6270 Revert changes to pyproject.toml 2025-03-20 17:01:50 +00:00
openhands
f5c58adaf7 Refactor: Clean up WebSocket client code and fix logging 2025-03-20 16:49:38 +00:00
Xingyao Wang
c6cb025afe Merge branch 'main' into allow-message-during-client-loading 2025-03-20 12:41:16 -04:00
openhands
cb8214676e Simplify frontend logic by removing unnecessary backend readiness handling 2025-03-19 15:21:48 +00:00
openhands
fdfb7308b8 Move message queueing from frontend to backend 2025-03-19 15:06:16 +00:00
Xingyao Wang
4785de91b0 Merge branch 'main' into allow-message-during-client-loading 2025-03-19 10:59:12 -04:00
Xingyao Wang
effd2b7d06 Merge branch 'main' into allow-message-during-client-loading 2025-03-05 13:41:43 -05:00
openhands
608dd8f2c2 Fix linting issues 2025-03-04 01:59:59 +00:00
openhands
6d0c03509e Simplify backend ready detection and message sending 2025-03-03 22:39:10 +00:00
openhands
3e1070bbe9 Add enhanced logging to debug WebSocket connection and message queuing 2025-03-03 22:36:29 +00:00
openhands
2045350720 Fix message queuing by waiting for backend ready signal 2025-03-03 22:27:54 +00:00
openhands
5f83d4cf9a Add info method to EventLogger 2025-03-03 22:25:43 +00:00
openhands
d5a996a9e1 Update i18n declaration file 2025-02-28 02:27:47 +00:00
openhands
b0d38bbeb8 Merge main into allow-message-during-client-loading and resolve conflicts 2025-02-28 02:26:10 +00:00
openhands
ed50b3ee8f Fix agent response by sending agent state change event 2025-02-27 17:31:17 +00:00
openhands
4e5ed36213 Fix linting issue with queueMessage function order 2025-02-27 16:36:05 +00:00
openhands
9060452af6 Allow sending messages while client is connecting 2025-02-27 16:21:30 +00:00
7 changed files with 201 additions and 19 deletions

View File

@@ -13,7 +13,10 @@ import { generateAgentStateChangeEvent } from "#/services/agent-state-service";
import { FeedbackModal } from "../feedback/feedback-modal";
import { useScrollToBottom } from "#/hooks/use-scroll-to-bottom";
import { TypingIndicator } from "./typing-indicator";
import { useWsClient } from "#/context/ws-client-provider";
import {
useWsClient,
WsClientProviderStatus,
} from "#/context/ws-client-provider";
import { Messages } from "./messages";
import { ChatSuggestions } from "./chat-suggestions";
import { ActionSuggestions } from "./action-suggestions";
@@ -34,7 +37,7 @@ function getEntryPoint(
}
export function ChatInterface() {
const { send, isLoadingMessages } = useWsClient();
const { send, isLoadingMessages, status, pendingMessages } = useWsClient();
const dispatch = useDispatch();
const scrollRef = React.useRef<HTMLDivElement>(null);
const { scrollDomToBottom, onChatBodyScroll, hitBottom } =
@@ -54,6 +57,9 @@ export function ChatInterface() {
const params = useParams();
const { mutate: getTrajectory } = useGetTrajectory();
const isClientDisconnected = status === WsClientProviderStatus.DISCONNECTED;
const hasPendingMessages = pendingMessages.length > 0;
const handleSendMessage = async (content: string, files: File[]) => {
if (messages.length === 0) {
posthog.capture("initial_query_submitted", {
@@ -76,7 +82,15 @@ export function ChatInterface() {
const timestamp = new Date().toISOString();
const pending = true;
dispatch(addUserMessage({ content, imageUrls, timestamp, pending }));
send(createChatMessage(content, imageUrls, timestamp));
// Create and send the chat message
const chatMessage = createChatMessage(content, imageUrls, timestamp);
send(chatMessage);
// Send the agent state change event immediately
// The backend will handle the ordering and queueing
send(generateAgentStateChangeEvent(AgentState.RUNNING));
setMessageToSend(null);
};
@@ -131,8 +145,20 @@ export function ChatInterface() {
className="flex flex-col grow overflow-y-auto overflow-x-hidden px-4 pt-4 gap-2"
>
{isLoadingMessages && (
<div className="flex justify-center">
<div className="flex flex-col items-center gap-2">
<LoadingSpinner size="small" />
{isClientDisconnected && (
<div className="text-sm text-neutral-400">
Waiting for client to become ready...
{hasPendingMessages && (
<div className="text-xs text-neutral-500 mt-1">
{pendingMessages.length} message
{pendingMessages.length !== 1 ? "s" : ""} will be sent when
connected
</div>
)}
</div>
)}
</div>
)}
@@ -179,7 +205,7 @@ export function ChatInterface() {
onSubmit={handleSendMessage}
onStop={handleStop}
isDisabled={
curAgentState === AgentState.LOADING ||
// Allow input even when loading, but not during confirmation
curAgentState === AgentState.AWAITING_USER_CONFIRMATION
}
mode={curAgentState === AgentState.RUNNING ? "stop" : "submit"}

View File

@@ -22,10 +22,11 @@ export function AgentStatusBar() {
const { t, i18n } = useTranslation();
const { curAgentState } = useSelector((state: RootState) => state.agent);
const { curStatusMessage } = useSelector((state: RootState) => state.status);
const { status } = useWsClient();
const { status, pendingMessages } = useWsClient();
const { notify } = useNotification();
const [statusMessage, setStatusMessage] = React.useState<string>("");
const hasPendingMessages = pendingMessages.length > 0;
const updateStatusMessage = () => {
let message = curStatusMessage.message || "";
@@ -71,7 +72,13 @@ export function AgentStatusBar() {
React.useEffect(() => {
if (status === WsClientProviderStatus.DISCONNECTED) {
setStatusMessage("Connecting...");
if (hasPendingMessages) {
setStatusMessage(
`Connecting... (${pendingMessages.length} pending message${pendingMessages.length !== 1 ? "s" : ""})`,
);
} else {
setStatusMessage("Connecting...");
}
} else {
setStatusMessage(AGENT_STATUS_MAP[curAgentState].message);
if (notificationStates.includes(curAgentState)) {
@@ -87,7 +94,7 @@ export function AgentStatusBar() {
}
}
}
}, [curAgentState, notify, t]);
}, [curAgentState, status, pendingMessages.length, notify, t]);
return (
<div className="flex flex-col items-center">

View File

@@ -49,12 +49,14 @@ interface UseWsClient {
isLoadingMessages: boolean;
events: Record<string, unknown>[];
send: (event: Record<string, unknown>) => void;
pendingMessages: Record<string, unknown>[];
}
const WsClientContext = React.createContext<UseWsClient>({
status: WsClientProviderStatus.DISCONNECTED,
isLoadingMessages: true,
events: [],
pendingMessages: [],
send: () => {
throw new Error("not connected");
},
@@ -109,26 +111,43 @@ export function WsClientProvider({
WsClientProviderStatus.DISCONNECTED,
);
const [events, setEvents] = React.useState<Record<string, unknown>[]>([]);
const [pendingMessages, setPendingMessages] = React.useState<
Record<string, unknown>[]
>([]);
const lastEventRef = React.useRef<Record<string, unknown> | null>(null);
const messageRateHandler = useRate({ threshold: 250 });
// Private function to queue messages for later sending
const queueMessage = (event: Record<string, unknown>) => {
EventLogger.info(`Queueing message: ${JSON.stringify(event)}`);
setPendingMessages((prev) => [...prev, event]);
};
function send(event: Record<string, unknown>) {
if (!sioRef.current) {
EventLogger.error("WebSocket is not connected.");
EventLogger.info("WebSocket is not connected, queueing message");
queueMessage(event);
return;
}
// Send the message to the backend
EventLogger.info(`Sending message: ${JSON.stringify(event)}`);
sioRef.current.emit("oh_action", event);
}
function handleConnect() {
setStatus(WsClientProviderStatus.CONNECTED);
EventLogger.info(
`WebSocket connected. Pending messages: ${pendingMessages.length}`,
);
}
function handleMessage(event: Record<string, unknown>) {
if (isOpenHandsEvent(event) && isMessageAction(event)) {
messageRateHandler.record(new Date().getTime());
}
setEvents((prevEvents) => [...prevEvents, event]);
if (!Number.isNaN(parseInt(event.id as string, 10))) {
lastEventRef.current = event;
@@ -145,14 +164,39 @@ export function WsClientProvider({
}
sio.io.opts.query = sio.io.opts.query || {};
sio.io.opts.query.latest_event_id = lastEventRef.current?.id;
EventLogger.info(
`WebSocket disconnected. Latest event ID: ${lastEventRef.current?.id}`,
);
updateStatusWhenErrorMessagePresent(data);
}
function handleError(data: unknown) {
setStatus(WsClientProviderStatus.DISCONNECTED);
EventLogger.error(`WebSocket connection error: ${JSON.stringify(data)}`);
updateStatusWhenErrorMessagePresent(data);
}
// Process any pending messages when the WebSocket connects
React.useEffect(() => {
if (
status === WsClientProviderStatus.CONNECTED &&
pendingMessages.length > 0 &&
sioRef.current
) {
// We're connected and have pending messages
EventLogger.info(
`Connected! Sending ${pendingMessages.length} queued messages`,
);
pendingMessages.forEach((event) => {
sioRef.current?.emit("oh_action", event);
});
setPendingMessages([]);
EventLogger.info("All queued messages sent, queue cleared");
}
}, [status, pendingMessages.length]);
React.useEffect(() => {
lastEventRef.current = null;
}, [conversationId]);
@@ -210,9 +254,10 @@ export function WsClientProvider({
status,
isLoadingMessages: messageRateHandler.isUnderThreshold,
events,
pendingMessages,
send,
}),
[status, messageRateHandler.isUnderThreshold, events],
[status, messageRateHandler.isUnderThreshold, events, pendingMessages],
);
return <WsClientContext value={value}>{children}</WsClientContext>;

View File

@@ -1,18 +1,28 @@
/* eslint-disable no-console */
/**
* A utility class for logging events. This class will only log events in development mode.
* A utility class for logging events. This class will log events in development mode
* and can be forced to log in any environment by setting FORCE_LOGGING to true.
*/
class EventLogger {
static isDevMode = process.env.NODE_ENV === "development";
static FORCE_LOGGING = false; // Set to false for production, true only for debugging
static shouldLog() {
return this.isDevMode || this.FORCE_LOGGING;
}
/**
* Format and log a message event
* @param event The raw event object
*/
static message(event: MessageEvent) {
if (this.isDevMode) {
console.warn(JSON.stringify(JSON.parse(event.data.toString()), null, 2));
if (this.shouldLog()) {
console.warn(
"[OpenHands]",
JSON.stringify(JSON.parse(event.data.toString()), null, 2),
);
}
}
@@ -22,8 +32,8 @@ class EventLogger {
* @param name The name of the event
*/
static event(event: Event, name?: string) {
if (this.isDevMode) {
console.warn(name || "EVENT", event);
if (this.shouldLog()) {
console.warn("[OpenHands]", name || "EVENT", event);
}
}
@@ -32,8 +42,18 @@ class EventLogger {
* @param warning The warning message
*/
static warning(warning: string) {
if (this.isDevMode) {
console.warn(warning);
if (this.shouldLog()) {
console.warn("[OpenHands]", warning);
}
}
/**
* Log an info message
* @param info The info message
*/
static info(info: string) {
if (this.shouldLog()) {
console.info("[OpenHands]", info);
}
}
@@ -42,8 +62,8 @@ class EventLogger {
* @param error The error message
*/
static error(error: string) {
if (this.isDevMode) {
console.error(error);
if (this.shouldLog()) {
console.error("[OpenHands]", error);
}
}
}

View File

@@ -330,6 +330,15 @@ class StandaloneConversationManager(ConversationManager):
session = self._local_agent_loops_by_sid.get(sid)
if session:
# Check if the session is ready to process actions
if not session.is_ready():
logger.info(
f'Session not ready, queueing action: {data}',
extra={'session_id': sid},
)
session.queue_action(data)
return
await session.dispatch(data)
return

View File

@@ -1,7 +1,9 @@
import asyncio
import time
from copy import deepcopy
from dataclasses import field
from logging import LoggerAdapter
from typing import List
import socketio
@@ -12,6 +14,7 @@ from openhands.core.config.condenser_config import (
)
from openhands.core.const.guide_url import TROUBLESHOOTING_URL
from openhands.core.logger import OpenHandsLoggerAdapter
from openhands.core.logger import openhands_logger as logger
from openhands.core.schema import AgentState
from openhands.events.action import MessageAction, NullAction
from openhands.events.event import Event, EventSource
@@ -43,6 +46,9 @@ class Session:
file_store: FileStore
user_id: str | None
logger: LoggerAdapter
_pending_actions: List[dict] = []
_is_ready: bool = False
_ready_event: asyncio.Event = field(default_factory=asyncio.Event)
def __init__(
self,
@@ -70,6 +76,16 @@ class Session:
self.config = deepcopy(config)
self.loop = asyncio.get_event_loop()
self.user_id = user_id
self._pending_actions = []
self._is_ready = False
self._ready_event = asyncio.Event()
# Subscribe to agent state changes to detect when the agent is ready
self.agent_session.event_stream.subscribe(
EventStreamSubscriber.SERVER,
self._on_agent_state_change,
f'{self.sid}_state_change',
)
async def close(self):
if self.sio:
@@ -86,6 +102,11 @@ class Session:
async def initialize_agent(
self, settings: Settings, initial_message: MessageAction | None
):
# Reset the ready state when initializing a new agent
self._is_ready = False
self._ready_event.clear()
# Set the agent state to LOADING
self.agent_session.event_stream.add_event(
AgentStateChangedObservation('', AgentState.LOADING),
EventSource.ENVIRONMENT,
@@ -279,3 +300,55 @@ class Session:
asyncio.run_coroutine_threadsafe(
self._send_status_message(msg_type, id, message), self.loop
)
def is_ready(self) -> bool:
"""Check if the session is ready to process actions."""
return self._is_ready
def queue_action(self, action_data: dict):
"""Queue an action to be processed when the session is ready."""
logger.info(f'Queueing action for session {self.sid}: {action_data}')
self._pending_actions.append(action_data)
# Start a task to process the queue when the session becomes ready
asyncio.run_coroutine_threadsafe(self._process_queue_when_ready(), self.loop)
async def _process_queue_when_ready(self):
"""Process the queue of actions when the session becomes ready."""
if not self._ready_event.is_set():
try:
# Wait for the session to become ready
await asyncio.wait_for(self._ready_event.wait(), timeout=60)
except asyncio.TimeoutError:
logger.warning(
f'Timeout waiting for session {self.sid} to become ready'
)
return
# Process all pending actions
if self._pending_actions:
logger.info(
f'Processing {len(self._pending_actions)} queued actions for session {self.sid}'
)
# Process all pending actions
for action_data in self._pending_actions:
logger.info(f'Processing queued action: {action_data}')
await self.dispatch(action_data)
# Clear the queue
self._pending_actions = []
def _on_agent_state_change(self, event: Event):
"""Handle agent state change events to detect when the agent is ready."""
if isinstance(event, AgentStateChangedObservation):
# Check if the agent state indicates it's ready
if event.agent_state in ['idle', 'ready']:
logger.info(f'Agent for session {self.sid} is now ready')
self._is_ready = True
self._ready_event.set()
# Process any pending actions
asyncio.run_coroutine_threadsafe(
self._process_queue_when_ready(), self.loop
)

View File

@@ -99,6 +99,7 @@ reportlab = "*"
[tool.coverage.run]
concurrency = ["gevent"]
[tool.poetry.group.runtime.dependencies]
jupyterlab = "*"
notebook = "*"
@@ -127,6 +128,7 @@ ignore = ["D1"]
[tool.ruff.lint.pydocstyle]
convention = "google"
[tool.poetry.group.evaluation.dependencies]
streamlit = "*"
whatthepatch = "*"