mirror of
https://github.com/All-Hands-AI/OpenHands.git
synced 2026-04-29 03:00:45 -04:00
Compare commits
17 Commits
enyst/cli-
...
allow-mess
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7a2e7d6270 | ||
|
|
f5c58adaf7 | ||
|
|
c6cb025afe | ||
|
|
cb8214676e | ||
|
|
fdfb7308b8 | ||
|
|
4785de91b0 | ||
|
|
effd2b7d06 | ||
|
|
608dd8f2c2 | ||
|
|
6d0c03509e | ||
|
|
3e1070bbe9 | ||
|
|
2045350720 | ||
|
|
5f83d4cf9a | ||
|
|
d5a996a9e1 | ||
|
|
b0d38bbeb8 | ||
|
|
ed50b3ee8f | ||
|
|
4e5ed36213 | ||
|
|
9060452af6 |
@@ -13,7 +13,10 @@ import { generateAgentStateChangeEvent } from "#/services/agent-state-service";
|
||||
import { FeedbackModal } from "../feedback/feedback-modal";
|
||||
import { useScrollToBottom } from "#/hooks/use-scroll-to-bottom";
|
||||
import { TypingIndicator } from "./typing-indicator";
|
||||
import { useWsClient } from "#/context/ws-client-provider";
|
||||
import {
|
||||
useWsClient,
|
||||
WsClientProviderStatus,
|
||||
} from "#/context/ws-client-provider";
|
||||
import { Messages } from "./messages";
|
||||
import { ChatSuggestions } from "./chat-suggestions";
|
||||
import { ActionSuggestions } from "./action-suggestions";
|
||||
@@ -34,7 +37,7 @@ function getEntryPoint(
|
||||
}
|
||||
|
||||
export function ChatInterface() {
|
||||
const { send, isLoadingMessages } = useWsClient();
|
||||
const { send, isLoadingMessages, status, pendingMessages } = useWsClient();
|
||||
const dispatch = useDispatch();
|
||||
const scrollRef = React.useRef<HTMLDivElement>(null);
|
||||
const { scrollDomToBottom, onChatBodyScroll, hitBottom } =
|
||||
@@ -54,6 +57,9 @@ export function ChatInterface() {
|
||||
const params = useParams();
|
||||
const { mutate: getTrajectory } = useGetTrajectory();
|
||||
|
||||
const isClientDisconnected = status === WsClientProviderStatus.DISCONNECTED;
|
||||
const hasPendingMessages = pendingMessages.length > 0;
|
||||
|
||||
const handleSendMessage = async (content: string, files: File[]) => {
|
||||
if (messages.length === 0) {
|
||||
posthog.capture("initial_query_submitted", {
|
||||
@@ -76,7 +82,15 @@ export function ChatInterface() {
|
||||
const timestamp = new Date().toISOString();
|
||||
const pending = true;
|
||||
dispatch(addUserMessage({ content, imageUrls, timestamp, pending }));
|
||||
send(createChatMessage(content, imageUrls, timestamp));
|
||||
|
||||
// Create and send the chat message
|
||||
const chatMessage = createChatMessage(content, imageUrls, timestamp);
|
||||
send(chatMessage);
|
||||
|
||||
// Send the agent state change event immediately
|
||||
// The backend will handle the ordering and queueing
|
||||
send(generateAgentStateChangeEvent(AgentState.RUNNING));
|
||||
|
||||
setMessageToSend(null);
|
||||
};
|
||||
|
||||
@@ -131,8 +145,20 @@ export function ChatInterface() {
|
||||
className="flex flex-col grow overflow-y-auto overflow-x-hidden px-4 pt-4 gap-2"
|
||||
>
|
||||
{isLoadingMessages && (
|
||||
<div className="flex justify-center">
|
||||
<div className="flex flex-col items-center gap-2">
|
||||
<LoadingSpinner size="small" />
|
||||
{isClientDisconnected && (
|
||||
<div className="text-sm text-neutral-400">
|
||||
Waiting for client to become ready...
|
||||
{hasPendingMessages && (
|
||||
<div className="text-xs text-neutral-500 mt-1">
|
||||
{pendingMessages.length} message
|
||||
{pendingMessages.length !== 1 ? "s" : ""} will be sent when
|
||||
connected
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
|
||||
@@ -179,7 +205,7 @@ export function ChatInterface() {
|
||||
onSubmit={handleSendMessage}
|
||||
onStop={handleStop}
|
||||
isDisabled={
|
||||
curAgentState === AgentState.LOADING ||
|
||||
// Allow input even when loading, but not during confirmation
|
||||
curAgentState === AgentState.AWAITING_USER_CONFIRMATION
|
||||
}
|
||||
mode={curAgentState === AgentState.RUNNING ? "stop" : "submit"}
|
||||
|
||||
@@ -22,10 +22,11 @@ export function AgentStatusBar() {
|
||||
const { t, i18n } = useTranslation();
|
||||
const { curAgentState } = useSelector((state: RootState) => state.agent);
|
||||
const { curStatusMessage } = useSelector((state: RootState) => state.status);
|
||||
const { status } = useWsClient();
|
||||
const { status, pendingMessages } = useWsClient();
|
||||
const { notify } = useNotification();
|
||||
|
||||
const [statusMessage, setStatusMessage] = React.useState<string>("");
|
||||
const hasPendingMessages = pendingMessages.length > 0;
|
||||
|
||||
const updateStatusMessage = () => {
|
||||
let message = curStatusMessage.message || "";
|
||||
@@ -71,7 +72,13 @@ export function AgentStatusBar() {
|
||||
|
||||
React.useEffect(() => {
|
||||
if (status === WsClientProviderStatus.DISCONNECTED) {
|
||||
setStatusMessage("Connecting...");
|
||||
if (hasPendingMessages) {
|
||||
setStatusMessage(
|
||||
`Connecting... (${pendingMessages.length} pending message${pendingMessages.length !== 1 ? "s" : ""})`,
|
||||
);
|
||||
} else {
|
||||
setStatusMessage("Connecting...");
|
||||
}
|
||||
} else {
|
||||
setStatusMessage(AGENT_STATUS_MAP[curAgentState].message);
|
||||
if (notificationStates.includes(curAgentState)) {
|
||||
@@ -87,7 +94,7 @@ export function AgentStatusBar() {
|
||||
}
|
||||
}
|
||||
}
|
||||
}, [curAgentState, notify, t]);
|
||||
}, [curAgentState, status, pendingMessages.length, notify, t]);
|
||||
|
||||
return (
|
||||
<div className="flex flex-col items-center">
|
||||
|
||||
@@ -49,12 +49,14 @@ interface UseWsClient {
|
||||
isLoadingMessages: boolean;
|
||||
events: Record<string, unknown>[];
|
||||
send: (event: Record<string, unknown>) => void;
|
||||
pendingMessages: Record<string, unknown>[];
|
||||
}
|
||||
|
||||
const WsClientContext = React.createContext<UseWsClient>({
|
||||
status: WsClientProviderStatus.DISCONNECTED,
|
||||
isLoadingMessages: true,
|
||||
events: [],
|
||||
pendingMessages: [],
|
||||
send: () => {
|
||||
throw new Error("not connected");
|
||||
},
|
||||
@@ -109,26 +111,43 @@ export function WsClientProvider({
|
||||
WsClientProviderStatus.DISCONNECTED,
|
||||
);
|
||||
const [events, setEvents] = React.useState<Record<string, unknown>[]>([]);
|
||||
const [pendingMessages, setPendingMessages] = React.useState<
|
||||
Record<string, unknown>[]
|
||||
>([]);
|
||||
const lastEventRef = React.useRef<Record<string, unknown> | null>(null);
|
||||
|
||||
const messageRateHandler = useRate({ threshold: 250 });
|
||||
|
||||
// Private function to queue messages for later sending
|
||||
const queueMessage = (event: Record<string, unknown>) => {
|
||||
EventLogger.info(`Queueing message: ${JSON.stringify(event)}`);
|
||||
setPendingMessages((prev) => [...prev, event]);
|
||||
};
|
||||
|
||||
function send(event: Record<string, unknown>) {
|
||||
if (!sioRef.current) {
|
||||
EventLogger.error("WebSocket is not connected.");
|
||||
EventLogger.info("WebSocket is not connected, queueing message");
|
||||
queueMessage(event);
|
||||
return;
|
||||
}
|
||||
|
||||
// Send the message to the backend
|
||||
EventLogger.info(`Sending message: ${JSON.stringify(event)}`);
|
||||
sioRef.current.emit("oh_action", event);
|
||||
}
|
||||
|
||||
function handleConnect() {
|
||||
setStatus(WsClientProviderStatus.CONNECTED);
|
||||
EventLogger.info(
|
||||
`WebSocket connected. Pending messages: ${pendingMessages.length}`,
|
||||
);
|
||||
}
|
||||
|
||||
function handleMessage(event: Record<string, unknown>) {
|
||||
if (isOpenHandsEvent(event) && isMessageAction(event)) {
|
||||
messageRateHandler.record(new Date().getTime());
|
||||
}
|
||||
|
||||
setEvents((prevEvents) => [...prevEvents, event]);
|
||||
if (!Number.isNaN(parseInt(event.id as string, 10))) {
|
||||
lastEventRef.current = event;
|
||||
@@ -145,14 +164,39 @@ export function WsClientProvider({
|
||||
}
|
||||
sio.io.opts.query = sio.io.opts.query || {};
|
||||
sio.io.opts.query.latest_event_id = lastEventRef.current?.id;
|
||||
EventLogger.info(
|
||||
`WebSocket disconnected. Latest event ID: ${lastEventRef.current?.id}`,
|
||||
);
|
||||
updateStatusWhenErrorMessagePresent(data);
|
||||
}
|
||||
|
||||
function handleError(data: unknown) {
|
||||
setStatus(WsClientProviderStatus.DISCONNECTED);
|
||||
EventLogger.error(`WebSocket connection error: ${JSON.stringify(data)}`);
|
||||
updateStatusWhenErrorMessagePresent(data);
|
||||
}
|
||||
|
||||
// Process any pending messages when the WebSocket connects
|
||||
React.useEffect(() => {
|
||||
if (
|
||||
status === WsClientProviderStatus.CONNECTED &&
|
||||
pendingMessages.length > 0 &&
|
||||
sioRef.current
|
||||
) {
|
||||
// We're connected and have pending messages
|
||||
EventLogger.info(
|
||||
`Connected! Sending ${pendingMessages.length} queued messages`,
|
||||
);
|
||||
|
||||
pendingMessages.forEach((event) => {
|
||||
sioRef.current?.emit("oh_action", event);
|
||||
});
|
||||
|
||||
setPendingMessages([]);
|
||||
EventLogger.info("All queued messages sent, queue cleared");
|
||||
}
|
||||
}, [status, pendingMessages.length]);
|
||||
|
||||
React.useEffect(() => {
|
||||
lastEventRef.current = null;
|
||||
}, [conversationId]);
|
||||
@@ -210,9 +254,10 @@ export function WsClientProvider({
|
||||
status,
|
||||
isLoadingMessages: messageRateHandler.isUnderThreshold,
|
||||
events,
|
||||
pendingMessages,
|
||||
send,
|
||||
}),
|
||||
[status, messageRateHandler.isUnderThreshold, events],
|
||||
[status, messageRateHandler.isUnderThreshold, events, pendingMessages],
|
||||
);
|
||||
|
||||
return <WsClientContext value={value}>{children}</WsClientContext>;
|
||||
|
||||
@@ -1,18 +1,28 @@
|
||||
/* eslint-disable no-console */
|
||||
|
||||
/**
|
||||
* A utility class for logging events. This class will only log events in development mode.
|
||||
* A utility class for logging events. This class will log events in development mode
|
||||
* and can be forced to log in any environment by setting FORCE_LOGGING to true.
|
||||
*/
|
||||
class EventLogger {
|
||||
static isDevMode = process.env.NODE_ENV === "development";
|
||||
|
||||
static FORCE_LOGGING = false; // Set to false for production, true only for debugging
|
||||
|
||||
static shouldLog() {
|
||||
return this.isDevMode || this.FORCE_LOGGING;
|
||||
}
|
||||
|
||||
/**
|
||||
* Format and log a message event
|
||||
* @param event The raw event object
|
||||
*/
|
||||
static message(event: MessageEvent) {
|
||||
if (this.isDevMode) {
|
||||
console.warn(JSON.stringify(JSON.parse(event.data.toString()), null, 2));
|
||||
if (this.shouldLog()) {
|
||||
console.warn(
|
||||
"[OpenHands]",
|
||||
JSON.stringify(JSON.parse(event.data.toString()), null, 2),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -22,8 +32,8 @@ class EventLogger {
|
||||
* @param name The name of the event
|
||||
*/
|
||||
static event(event: Event, name?: string) {
|
||||
if (this.isDevMode) {
|
||||
console.warn(name || "EVENT", event);
|
||||
if (this.shouldLog()) {
|
||||
console.warn("[OpenHands]", name || "EVENT", event);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,8 +42,18 @@ class EventLogger {
|
||||
* @param warning The warning message
|
||||
*/
|
||||
static warning(warning: string) {
|
||||
if (this.isDevMode) {
|
||||
console.warn(warning);
|
||||
if (this.shouldLog()) {
|
||||
console.warn("[OpenHands]", warning);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Log an info message
|
||||
* @param info The info message
|
||||
*/
|
||||
static info(info: string) {
|
||||
if (this.shouldLog()) {
|
||||
console.info("[OpenHands]", info);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -42,8 +62,8 @@ class EventLogger {
|
||||
* @param error The error message
|
||||
*/
|
||||
static error(error: string) {
|
||||
if (this.isDevMode) {
|
||||
console.error(error);
|
||||
if (this.shouldLog()) {
|
||||
console.error("[OpenHands]", error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -330,6 +330,15 @@ class StandaloneConversationManager(ConversationManager):
|
||||
|
||||
session = self._local_agent_loops_by_sid.get(sid)
|
||||
if session:
|
||||
# Check if the session is ready to process actions
|
||||
if not session.is_ready():
|
||||
logger.info(
|
||||
f'Session not ready, queueing action: {data}',
|
||||
extra={'session_id': sid},
|
||||
)
|
||||
session.queue_action(data)
|
||||
return
|
||||
|
||||
await session.dispatch(data)
|
||||
return
|
||||
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
import asyncio
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from dataclasses import field
|
||||
from logging import LoggerAdapter
|
||||
from typing import List
|
||||
|
||||
import socketio
|
||||
|
||||
@@ -12,6 +14,7 @@ from openhands.core.config.condenser_config import (
|
||||
)
|
||||
from openhands.core.const.guide_url import TROUBLESHOOTING_URL
|
||||
from openhands.core.logger import OpenHandsLoggerAdapter
|
||||
from openhands.core.logger import openhands_logger as logger
|
||||
from openhands.core.schema import AgentState
|
||||
from openhands.events.action import MessageAction, NullAction
|
||||
from openhands.events.event import Event, EventSource
|
||||
@@ -43,6 +46,9 @@ class Session:
|
||||
file_store: FileStore
|
||||
user_id: str | None
|
||||
logger: LoggerAdapter
|
||||
_pending_actions: List[dict] = []
|
||||
_is_ready: bool = False
|
||||
_ready_event: asyncio.Event = field(default_factory=asyncio.Event)
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -70,6 +76,16 @@ class Session:
|
||||
self.config = deepcopy(config)
|
||||
self.loop = asyncio.get_event_loop()
|
||||
self.user_id = user_id
|
||||
self._pending_actions = []
|
||||
self._is_ready = False
|
||||
self._ready_event = asyncio.Event()
|
||||
|
||||
# Subscribe to agent state changes to detect when the agent is ready
|
||||
self.agent_session.event_stream.subscribe(
|
||||
EventStreamSubscriber.SERVER,
|
||||
self._on_agent_state_change,
|
||||
f'{self.sid}_state_change',
|
||||
)
|
||||
|
||||
async def close(self):
|
||||
if self.sio:
|
||||
@@ -86,6 +102,11 @@ class Session:
|
||||
async def initialize_agent(
|
||||
self, settings: Settings, initial_message: MessageAction | None
|
||||
):
|
||||
# Reset the ready state when initializing a new agent
|
||||
self._is_ready = False
|
||||
self._ready_event.clear()
|
||||
|
||||
# Set the agent state to LOADING
|
||||
self.agent_session.event_stream.add_event(
|
||||
AgentStateChangedObservation('', AgentState.LOADING),
|
||||
EventSource.ENVIRONMENT,
|
||||
@@ -279,3 +300,55 @@ class Session:
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._send_status_message(msg_type, id, message), self.loop
|
||||
)
|
||||
|
||||
def is_ready(self) -> bool:
|
||||
"""Check if the session is ready to process actions."""
|
||||
return self._is_ready
|
||||
|
||||
def queue_action(self, action_data: dict):
|
||||
"""Queue an action to be processed when the session is ready."""
|
||||
logger.info(f'Queueing action for session {self.sid}: {action_data}')
|
||||
self._pending_actions.append(action_data)
|
||||
|
||||
# Start a task to process the queue when the session becomes ready
|
||||
asyncio.run_coroutine_threadsafe(self._process_queue_when_ready(), self.loop)
|
||||
|
||||
async def _process_queue_when_ready(self):
|
||||
"""Process the queue of actions when the session becomes ready."""
|
||||
if not self._ready_event.is_set():
|
||||
try:
|
||||
# Wait for the session to become ready
|
||||
await asyncio.wait_for(self._ready_event.wait(), timeout=60)
|
||||
except asyncio.TimeoutError:
|
||||
logger.warning(
|
||||
f'Timeout waiting for session {self.sid} to become ready'
|
||||
)
|
||||
return
|
||||
|
||||
# Process all pending actions
|
||||
if self._pending_actions:
|
||||
logger.info(
|
||||
f'Processing {len(self._pending_actions)} queued actions for session {self.sid}'
|
||||
)
|
||||
|
||||
# Process all pending actions
|
||||
for action_data in self._pending_actions:
|
||||
logger.info(f'Processing queued action: {action_data}')
|
||||
await self.dispatch(action_data)
|
||||
|
||||
# Clear the queue
|
||||
self._pending_actions = []
|
||||
|
||||
def _on_agent_state_change(self, event: Event):
|
||||
"""Handle agent state change events to detect when the agent is ready."""
|
||||
if isinstance(event, AgentStateChangedObservation):
|
||||
# Check if the agent state indicates it's ready
|
||||
if event.agent_state in ['idle', 'ready']:
|
||||
logger.info(f'Agent for session {self.sid} is now ready')
|
||||
self._is_ready = True
|
||||
self._ready_event.set()
|
||||
|
||||
# Process any pending actions
|
||||
asyncio.run_coroutine_threadsafe(
|
||||
self._process_queue_when_ready(), self.loop
|
||||
)
|
||||
|
||||
@@ -99,6 +99,7 @@ reportlab = "*"
|
||||
[tool.coverage.run]
|
||||
concurrency = ["gevent"]
|
||||
|
||||
|
||||
[tool.poetry.group.runtime.dependencies]
|
||||
jupyterlab = "*"
|
||||
notebook = "*"
|
||||
@@ -127,6 +128,7 @@ ignore = ["D1"]
|
||||
[tool.ruff.lint.pydocstyle]
|
||||
convention = "google"
|
||||
|
||||
|
||||
[tool.poetry.group.evaluation.dependencies]
|
||||
streamlit = "*"
|
||||
whatthepatch = "*"
|
||||
|
||||
Reference in New Issue
Block a user