fix(rnd): Fix prisma connection acquisition intermittent error on linux (#7999)

This commit is contained in:
Zamil Majdy
2024-09-05 15:22:25 -05:00
committed by GitHub
parent 79ebc4c13b
commit 2df325d033
9 changed files with 38 additions and 15 deletions

View File

@@ -1,3 +1,4 @@
from multiprocessing import set_start_method
from typing import TYPE_CHECKING
from .util.logging import configure_logging
@@ -11,6 +12,7 @@ def run_processes(*processes: "AppProcess", **kwargs):
Execute all processes in the app. The last process is run in the foreground.
"""
try:
set_start_method("spawn", force=True)
configure_logging()
for process in processes[:-1]:

View File

@@ -1,3 +1,4 @@
import asyncio
import os
from contextlib import asynccontextmanager
from uuid import uuid4
@@ -11,17 +12,33 @@ load_dotenv()
PRISMA_SCHEMA = os.getenv("PRISMA_SCHEMA", "schema.prisma")
os.environ["PRISMA_SCHEMA_PATH"] = PRISMA_SCHEMA
prisma = Prisma(auto_register=True)
prisma, conn_id = Prisma(auto_register=True), ""
async def connect():
if not prisma.is_connected():
await prisma.connect()
async def connect(call_count=0):
global conn_id
if not conn_id:
conn_id = str(uuid4())
try:
print(f"[Prisma-{conn_id}] Acquiring connection..")
if not prisma.is_connected():
await prisma.connect()
print(f"[Prisma-{conn_id}] Connection acquired!")
except Exception as e:
if call_count <= 5:
print(f"[Prisma-{conn_id}] Connection failed: {e}. Retrying now..")
await asyncio.sleep(call_count)
await connect(call_count + 1)
else:
raise e
async def disconnect():
if prisma.is_connected():
print(f"[Prisma-{conn_id}] Releasing connection.")
await prisma.disconnect()
print(f"[Prisma-{conn_id}] Connection released.")
@asynccontextmanager

View File

@@ -621,7 +621,7 @@ class Executor:
class ExecutionManager(AppService):
def __init__(self):
self.use_redis = False
self.use_db = True
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()
self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}

View File

@@ -19,9 +19,9 @@ def log(msg, **kwargs):
class ExecutionScheduler(AppService):
def __init__(self, refresh_interval=10):
self.use_db = True
self.last_check = datetime.min
self.refresh_interval = refresh_interval
self.use_redis = False
@property
def execution_manager_client(self) -> ExecutionManager:

View File

@@ -29,7 +29,6 @@ settings = Settings()
class AgentServer(AppService):
mutex = KeyedMutex()
use_db = False
use_redis = True
_test_dependency_overrides = {}

View File

@@ -1,7 +1,7 @@
import os
import sys
from abc import ABC, abstractmethod
from multiprocessing import Process
from multiprocessing import Process, set_start_method
from typing import Optional
@@ -11,6 +11,7 @@ class AppProcess(ABC):
"""
process: Optional[Process] = None
set_start_method("spawn", force=True)
@abstractmethod
def run(self):

View File

@@ -0,0 +1,7 @@
from tenacity import retry, stop_after_attempt, wait_exponential
conn_retry = retry(
stop=stop_after_attempt(30),
wait=wait_exponential(multiplier=1, min=1, max=30),
reraise=True,
)

View File

@@ -7,17 +7,14 @@ from typing import Any, Callable, Coroutine, Type, TypeVar, cast
from Pyro5 import api as pyro
from Pyro5 import nameserver
from tenacity import retry, stop_after_attempt, wait_exponential
from autogpt_server.data import db
from autogpt_server.data.queue import AsyncEventQueue, AsyncRedisEventQueue
from autogpt_server.util.process import AppProcess
from autogpt_server.util.retry import conn_retry
from autogpt_server.util.settings import Config
logger = logging.getLogger(__name__)
conn_retry = retry(
stop=stop_after_attempt(30), wait=wait_exponential(multiplier=1, min=1, max=30)
)
T = TypeVar("T")
C = TypeVar("C", bound=Callable)
@@ -65,7 +62,7 @@ class PyroNameServer(AppProcess):
class AppService(AppProcess):
shared_event_loop: asyncio.AbstractEventLoop
event_queue: AsyncEventQueue = AsyncRedisEventQueue()
use_db: bool = True
use_db: bool = False
use_redis: bool = False
@classmethod

View File

@@ -25,7 +25,7 @@ requests = "*"
sentry-sdk = "^1.40.4"
[package.extras]
benchmark = ["agbenchmark"]
benchmark = ["agbenchmark @ file:///Users/majdyz/Code/AutoGPT/benchmark"]
[package.source]
type = "directory"
@@ -386,7 +386,7 @@ watchdog = "4.0.0"
webdriver-manager = "^4.0.1"
[package.extras]
benchmark = ["agbenchmark"]
benchmark = ["agbenchmark @ file:///Users/majdyz/Code/AutoGPT/benchmark"]
[package.source]
type = "directory"