feat(backend): Improve pyro reliability by adding connection timeout, retry, cleanup, and dynamic connection thread size (#8574)

This commit is contained in:
Zamil Majdy
2024-11-07 11:34:32 +07:00
committed by GitHub
parent b08ad973fa
commit 91edf08540
4 changed files with 45 additions and 5 deletions

View File

@@ -36,7 +36,12 @@ from backend.util import json
from backend.util.decorator import error_logged, time_measured
from backend.util.logging import configure_logging
from backend.util.process import set_service_name
from backend.util.service import AppService, expose, get_service_client
from backend.util.service import (
AppService,
close_service_client,
expose,
get_service_client,
)
from backend.util.settings import Settings
from backend.util.type import convert
@@ -452,6 +457,8 @@ class Executor:
cls.creds_manager.release_all_locks()
logger.info(f"[on_node_executor_stop {cls.pid}] ⏳ Disconnecting Redis...")
redis.disconnect()
logger.info(f"[on_node_executor_stop {cls.pid}] ⏳ Disconnecting DB manager...")
close_service_client(cls.db_client)
logger.info(f"[on_node_executor_stop {cls.pid}] ✅ Finished cleanup")
@classmethod
@@ -537,6 +544,8 @@ class Executor:
prefix = f"[on_graph_executor_stop {cls.pid}]"
logger.info(f"{prefix} ⏳ Terminating node executor pool...")
cls.executor.terminate()
logger.info(f"{prefix} ⏳ Disconnecting DB manager...")
close_service_client(cls.db_client)
logger.info(f"{prefix} ✅ Finished cleanup")
@classmethod

View File

@@ -30,6 +30,7 @@ from typing import (
import Pyro5.api
from pydantic import BaseModel
from Pyro5 import api as pyro
from Pyro5 import config as pyro_config
from backend.data import db, redis
from backend.util.process import AppProcess
@@ -40,7 +41,10 @@ logger = logging.getLogger(__name__)
T = TypeVar("T")
C = TypeVar("C", bound=Callable)
pyro_host = Config().pyro_host
config = Config()
pyro_host = config.pyro_host
pyro_config.MAX_RETRIES = config.pyro_client_comm_retry # type: ignore
pyro_config.COMMTIMEOUT = config.pyro_client_comm_timeout # type: ignore
def expose(func: C) -> C:
@@ -166,8 +170,14 @@ class AppService(AppProcess, ABC):
@conn_retry("Pyro", "Starting Pyro Service")
def __start_pyro(self):
host = Config().pyro_host
daemon = Pyro5.api.Daemon(host=host, port=self.get_port())
conf = Config()
maximum_connection_thread_count = max(
Pyro5.config.THREADPOOL_SIZE,
conf.num_node_workers * conf.num_graph_workers,
)
Pyro5.config.THREADPOOL_SIZE = maximum_connection_thread_count # type: ignore
daemon = Pyro5.api.Daemon(host=conf.pyro_host, port=self.get_port())
self.uri = daemon.register(self, objectId=self.service_name)
logger.info(f"[{self.service_name}] Connected to Pyro; URI = {self.uri}")
daemon.requestLoop()
@@ -182,10 +192,21 @@ class AppService(AppProcess, ABC):
AS = TypeVar("AS", bound=AppService)
class PyroClient:
proxy: Pyro5.api.Proxy
def close_service_client(client: AppService) -> None:
if isinstance(client, PyroClient):
client.proxy._pyroRelease()
else:
raise RuntimeError(f"Client {client.__class__} is not a Pyro client.")
def get_service_client(service_type: Type[AS]) -> AS:
service_name = service_type.service_name
class DynamicClient:
class DynamicClient(PyroClient):
@conn_retry("Pyro", f"Connecting to [{service_name}]")
def __init__(self):
host = os.environ.get(f"{service_name.upper()}_HOST", "localhost")

View File

@@ -69,6 +69,14 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
default="localhost",
description="The default hostname of the Pyro server.",
)
pyro_client_comm_timeout: float = Field(
default=15,
description="The default timeout in seconds, for Pyro client connections.",
)
pyro_client_comm_retry: int = Field(
default=3,
description="The default number of retries for Pyro client connections.",
)
enable_auth: bool = Field(
default=True,
description="If authentication is enabled or not",

View File

@@ -111,6 +111,8 @@ env:
AGENTSERVER_HOST: "autogpt-server.prod-agpt.svc.cluster.local"
EXECUTIONMANAGER_HOST: "autogpt-server-executor.prod-agpt.svc.cluster.local"
DATABASEMANAGER_HOST: "autogpt-server-executor.prod-agpt.svc.cluster.local"
PYRO_CLIENT_COMM_TIMEOUT: 15
PYRO_CLIENT_COMM_RETRY: 3
secrets:
REPLICATE_API_KEY: "AgCPCgcYb+tE8/k45Y7/my4G2jWPCuEMTXJIn1fG1q4x4ZJPFzb43m7Uqtwn23NkmUZ5Qvh8BXedrtHwxapuYzw/P6c7xK66xfLKRbTWtYk4twS3sxPb+pt1FXY4USEjj5yeIFduybkqhE2QfnGoyrbDZ4Bz3AIgnrRD0Ee5m9u5yNZTPmJqZZqg4MRdUBCxCWIJBkW6DCE9nCPAQeNPD6e+lZ1j+/LocT2HX/ZlcsPXCxbn6wkxoyLqA0vUKSG9azS6oLvn0/3Cb01ozG8S2OEAqWIImFqhKGMfGqL6jSZWln43cmQdMTzSzM+HiprA9JHjZqGK7wOV9HZvSR+58IXoJGPBEIM7jIg5KqPjpZY4KFZBp5OiiRRYu+nCbuD+KsY/7ogjPHjbi1rpR8TrtXdzWNmwsTTmjytB/KEqeUpLWOEPgArFPyrNTS5/nmREH7r9jNEhfIRdTlS3IVGGXp/VN8napbNND1GDyzowvF771neq7/zTmfCRCJ4J0gwPNKM5rzOuRW+caEf2qOFBKIldVa/J0PFg5bAgpGL6jhpXHj0Q/+j1s3FA/D2ZebZTPIpKe40It3sWsS/0Qjhbj1GMbL4yUWvGpBSUTk7kZazkaVND1LbhjC+4AolTQdIU4MgW0bkmDn5ZI4a9/dHyLS3lFeYNSQ6vnbz+Id7zB3O0D6/FH8nfAUGL8V+J3eFKMp+G67z+XYH6WGABaNicz41zFBDF5hRax+k/ZziPPlFY0kDc3cAB6pLc"