Compare commits

...

1 Commits

Author SHA1 Message Date
openhands
3c3fbd6e6c Move Redis polling to dedicated thread
- Add threading module and thread control event
- Replace asyncio task with daemon thread for Redis polling
- Use asyncio.run_coroutine_threadsafe for message processing
- Improve error handling and cleanup
2024-12-31 21:34:50 +00:00

View File

@@ -1,5 +1,6 @@
import asyncio
import json
import threading
import time
from dataclasses import dataclass, field
from uuid import uuid4
@@ -44,7 +45,8 @@ class SessionManager:
_local_agent_loops_by_sid: dict[str, Session] = field(default_factory=dict)
local_connection_id_to_session_id: dict[str, str] = field(default_factory=dict)
_last_alive_timestamps: dict[str, float] = field(default_factory=dict)
_redis_listen_task: asyncio.Task | None = None
_redis_listen_thread: threading.Thread | None = None
_redis_listen_stop_event: threading.Event = field(default_factory=threading.Event)
_session_is_running_checks: dict[str, _SessionIsRunningCheck] = field(
default_factory=dict
)
@@ -63,14 +65,18 @@ class SessionManager:
async def __aenter__(self):
redis_client = self._get_redis_client()
if redis_client:
self._redis_listen_task = asyncio.create_task(self._redis_subscribe())
self._redis_listen_stop_event.clear()
self._redis_listen_thread = threading.Thread(target=self._run_redis_subscribe)
self._redis_listen_thread.daemon = True
self._redis_listen_thread.start()
self._cleanup_task = asyncio.create_task(self._cleanup_detached_conversations())
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if self._redis_listen_task:
self._redis_listen_task.cancel()
self._redis_listen_task = None
if self._redis_listen_thread:
self._redis_listen_stop_event.set()
self._redis_listen_thread.join()
self._redis_listen_thread = None
if self._cleanup_task:
self._cleanup_task.cancel()
self._cleanup_task = None
@@ -79,31 +85,32 @@ class SessionManager:
redis_client = getattr(self.sio.manager, 'redis', None)
return redis_client
async def _redis_subscribe(self):
def _run_redis_subscribe(self):
"""
We use a redis backchannel to send actions between server nodes
We use a redis backchannel to send actions between server nodes.
This method runs in a separate thread.
"""
logger.debug('_redis_subscribe')
redis_client = self._get_redis_client()
pubsub = redis_client.pubsub()
await pubsub.subscribe('oh_event')
while should_continue():
pubsub.subscribe('oh_event')
while not self._redis_listen_stop_event.is_set() and should_continue():
try:
message = await pubsub.get_message(
message = pubsub.get_message(
ignore_subscribe_messages=True, timeout=5
)
if message:
await self._process_message(message)
except asyncio.CancelledError:
return
except Exception:
try:
asyncio.get_running_loop()
logger.warning(
'error_reading_from_redis', exc_info=True, stack_info=True
# Schedule the message processing in the event loop
asyncio.run_coroutine_threadsafe(
self._process_message(message),
asyncio.get_event_loop()
)
except RuntimeError:
return # Loop has been shut down
except Exception:
logger.warning(
'error_reading_from_redis', exc_info=True, stack_info=True
)
time.sleep(1) # Avoid tight loop on error
async def _process_message(self, message: dict):
data = json.loads(message['data'])