split execution manager

This commit is contained in:
Aarushi
2024-09-06 14:04:51 +01:00
parent b0a710f75a
commit 2ce1717d13
7 changed files with 34 additions and 12 deletions

View File

@@ -26,10 +26,8 @@ def main(**kwargs):
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer, WebsocketServer
from autogpt_server.util.service import PyroNameServer
run_processes(
PyroNameServer(),
ExecutionManager(),
ExecutionScheduler(),
WebsocketServer(),
@@ -38,5 +36,12 @@ def main(**kwargs):
)
def execution_manager(**kwargs):
from autogpt_server.executor import ExecutionManager
run_processes(ExecutionManager(), **kwargs)
if __name__ == "__main__":
main()

View File

@@ -371,7 +371,7 @@ def validate_exec(
def get_agent_server_client() -> "AgentServer":
from autogpt_server.server.rest_api import AgentServer
return get_service_client(AgentServer, 8004)
return get_service_client(AgentServer, Config().agent_server_port)
class Executor:
@@ -623,7 +623,7 @@ class Executor:
class ExecutionManager(AppService):
def __init__(self):
super().__init__(port=8002)
super().__init__(port=Config().execution_manager_port)
self.use_db = True
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()

View File

@@ -9,6 +9,7 @@ from autogpt_server.data import schedule as model
from autogpt_server.data.block import BlockInput
from autogpt_server.executor.manager import ExecutionManager
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config
logger = logging.getLogger(__name__)
@@ -19,7 +20,7 @@ def log(msg, **kwargs):
class ExecutionScheduler(AppService):
def __init__(self, refresh_interval=10):
super().__init__(port=8003)
super().__init__(port=Config().execution_scheduler_port)
self.use_db = True
self.last_check = datetime.min
self.refresh_interval = refresh_interval
@@ -27,7 +28,7 @@ class ExecutionScheduler(AppService):
@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager, 8002)
return get_service_client(ExecutionManager, Config().execution_manager_port)
def run_service(self):
scheduler = BackgroundScheduler()

View File

@@ -22,7 +22,7 @@ from autogpt_server.server.model import CreateGraph, SetGraphActiveVersion
from autogpt_server.util.auth import get_user_id
from autogpt_server.util.lock import KeyedMutex
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
from autogpt_server.util.settings import Config, Settings
settings = Settings()
@@ -33,7 +33,7 @@ class AgentServer(AppService):
_test_dependency_overrides = {}
def __init__(self, event_queue: AsyncEventQueue | None = None):
super().__init__(port=8004)
super().__init__(port=Config().agent_server_port)
self.event_queue = event_queue or AsyncRedisEventQueue()
@asynccontextmanager
@@ -234,11 +234,11 @@ class AgentServer(AppService):
@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager, 8002)
return get_service_client(ExecutionManager, Config().execution_manager_port)
@property
def execution_scheduler_client(self) -> ExecutionScheduler:
return get_service_client(ExecutionScheduler, 8003)
return get_service_client(ExecutionScheduler, Config().execution_scheduler_port)
@classmethod
def handle_internal_http_error(cls, request: Request, exc: Exception):

View File

@@ -72,6 +72,21 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
extra="allow",
)
execution_manager_port: int = Field(
default=8002,
description="The port for execution manager daemon to run on",
)
execution_scheduler_port: int = Field(
default=8003,
description="The port for execution scheduler daemon to run on",
)
agent_server_port: int = Field(
default=8004,
description="The port for agent server daemon to run on",
)
@classmethod
def settings_customise_sources(
cls,

View File

@@ -13,7 +13,7 @@ async def test_agent_schedule(server: SpinTestServer):
test_user = await create_test_user()
test_graph = await graph.create_graph(create_test_graph(), user_id=test_user.id)
scheduler = get_service_client(ExecutionScheduler, 8002)
scheduler = get_service_client(ExecutionScheduler, 8003)
schedules = scheduler.get_execution_schedules(test_graph.id, test_user.id)
assert len(schedules) == 0

View File

@@ -5,6 +5,7 @@ from autogpt_server.util.service import AppService, expose, get_service_client
class TestService(AppService):
def __init__(self):
super().__init__(port=8005)
self.use_redis = False
def run_service(self):
@@ -29,7 +30,7 @@ class TestService(AppService):
@pytest.mark.asyncio(scope="session")
async def test_service_creation(server):
with TestService():
client = get_service_client(TestService, 8000)
client = get_service_client(TestService, 8005)
assert client.add(5, 3) == 8
assert client.subtract(10, 4) == 6
assert client.fun_with_async(5, 3) == 8