Files
OpenHands/opendevin/server/agent/manager.py
Jirka Borovec 0c2ebfd6e1 Ruff: use I rule for isort (#1410)
Ruff: use I rule for isort
2024-04-29 15:41:58 -07:00

41 lines
1.2 KiB
Python

import atexit
from opendevin.logger import opendevin_logger as logger
from opendevin.server.session import session_manager
from .agent import AgentUnit
class AgentManager:
sid_to_agent: dict[str, 'AgentUnit'] = {}
def __init__(self):
atexit.register(self.close)
def register_agent(self, sid: str):
"""Registers a new agent.
Args:
sid: The session ID of the agent.
"""
if sid not in self.sid_to_agent:
self.sid_to_agent[sid] = AgentUnit(sid)
return
# TODO: confirm whether the agent is alive
async def dispatch(self, sid: str, action: str | None, data: dict):
"""Dispatches actions to the agent from the client."""
if sid not in self.sid_to_agent:
# self.register_agent(sid) # auto-register agent, may be opened later
logger.error(f'Agent not registered: {sid}')
await session_manager.send_error(sid, 'Agent not registered')
return
await self.sid_to_agent[sid].dispatch(action, data)
def close(self):
logger.info(f'Closing {len(self.sid_to_agent)} agent(s)...')
for sid, agent in self.sid_to_agent.items():
agent.close()