fix(backend/executor): Make executor continuously running and retrying message consumption (#9999)

The executor can sometimes end up dangling: it stops consuming execution messages, but the process is never fully killed. This PR avoids that scenario by continuously retrying message consumption whenever it stops.

### Changes 🏗️

Introduced a `continuous_retry` decorator and applied it to the executor's message consumption (a simplified sketch of the pattern follows below).
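
For context, a minimal self-contained sketch of the pattern (the real implementation is in the `backend.util.retry` diff below; `consume_messages` is a hypothetical stand-in for the blocking RabbitMQ consumer):

```python
import logging
import time
from functools import wraps

logger = logging.getLogger(__name__)


def continuous_retry(*, retry_delay: float = 1.0):
    """Re-invoke the wrapped function forever, pausing between failures."""

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    logger.exception(
                        "%s failed with %s; retrying in %.2fs",
                        func.__name__,
                        exc,
                        retry_delay,
                    )
                    time.sleep(retry_delay)

        return wrapper

    return decorator


@continuous_retry(retry_delay=1.0)
def consume_messages():
    # Hypothetical stand-in for the blocking consume loop; raising on exit
    # makes the decorator reconnect and retry indefinitely.
    raise RuntimeError("consumer loop exited unexpectedly")
```

Because the wrapper only returns when the wrapped function returns normally, a consumer that raises whenever its consume loop exits is restarted forever, which is what prevents the executor from dangling.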

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run executor service and execute some agents.
Zamil Majdy
2025-05-22 13:11:08 +01:00
parent 5e7b66da90
commit 793d056d81
2 changed files with 43 additions and 19 deletions


@@ -67,7 +67,7 @@ from backend.util.decorator import error_logged, time_measured
from backend.util.file import clean_exec_files
from backend.util.logging import TruncatedLogger, configure_logging
from backend.util.process import AppProcess, set_service_name
from backend.util.retry import func_retry
from backend.util.retry import continuous_retry, func_retry
from backend.util.service import get_service_client
from backend.util.settings import Settings
@@ -938,8 +938,6 @@ class ExecutionManager(AppProcess):
        self.pool_size = settings.config.num_graph_workers
        self.running = True
        self.active_graph_runs: dict[str, tuple[Future, threading.Event]] = {}
        signal.signal(signal.SIGTERM, lambda sig, frame: self._on_sigterm())
        signal.signal(signal.SIGINT, lambda sig, frame: self._on_sigterm())

    def run(self):
        pool_size_gauge.set(self.pool_size)
@@ -965,22 +963,29 @@ class ExecutionManager(AppProcess):
logger.info(f"[{self.service_name}] ⏳ Connecting to Redis...")
redis.connect()
threading.Thread(
target=lambda: self._consume_execution_cancel(),
daemon=True,
).start()
self._consume_execution_run()
@continuous_retry()
def _consume_execution_cancel(self):
cancel_client = SyncRabbitMQ(create_execution_queue_config())
cancel_client.connect()
cancel_channel = cancel_client.get_channel()
logger.info(f"[{self.service_name}] ⏳ Starting cancel message consumer...")
threading.Thread(
target=lambda: (
cancel_channel.basic_consume(
queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
on_message_callback=self._handle_cancel_message,
auto_ack=True,
),
cancel_channel.start_consuming(),
),
daemon=True,
).start()
cancel_channel.basic_consume(
queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
on_message_callback=self._handle_cancel_message,
auto_ack=True,
)
cancel_channel.start_consuming()
raise RuntimeError(f"❌ cancel message consumer is stopped: {cancel_channel}")
@continuous_retry()
def _consume_execution_run(self):
run_client = SyncRabbitMQ(create_execution_queue_config())
run_client.connect()
run_channel = run_client.get_channel()
@@ -992,6 +997,7 @@ class ExecutionManager(AppProcess):
        )
        logger.info(f"[{self.service_name}] ⏳ Starting to consume run messages...")
        run_channel.start_consuming()
        raise RuntimeError(f"❌ run message consumer is stopped: {run_channel}")

    def _handle_cancel_message(
        self,
@@ -1090,10 +1096,6 @@ class ExecutionManager(AppProcess):
        super().cleanup()
        self._on_cleanup()

    def _on_sigterm(self):
        llprint(f"[{self.service_name}] ⚠️ GraphExec SIGTERM received")
        self._on_cleanup(log=llprint)

    def _on_cleanup(self, log=logger.info):
        prefix = f"[{self.service_name}][on_graph_executor_stop {os.getpid()}]"
        log(f"{prefix} ⏳ Shutting down service loop...")
@@ -1110,7 +1112,7 @@ class ExecutionManager(AppProcess):
        redis.disconnect()
        log(f"{prefix} ✅ Finished GraphExec cleanup")
        exit(0)
        sys.exit(0)


# ------- UTILITIES ------- #


@@ -2,6 +2,7 @@ import asyncio
import logging
import os
import threading
import time
from functools import wraps
from uuid import uuid4
@@ -80,3 +81,24 @@ func_retry = retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=1, max=30),
)


def continuous_retry(*, retry_delay: float = 1.0):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    logger.exception(
                        "%s failed with %s — retrying in %.2f s",
                        func.__name__,
                        exc,
                        retry_delay,
                    )
                    time.sleep(retry_delay)

        return wrapper

    return decorator
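
A toy demonstration of the decorator's behavior (a hedged sketch assuming `continuous_retry` as defined above is importable; `flaky_consumer` and its attempt counter are illustrative, not part of this PR):

```python
attempts = 0


@continuous_retry(retry_delay=0.5)
def flaky_consumer() -> str:
    """Fails twice, then succeeds, to show the retry loop in action."""
    global attempts
    attempts += 1
    if attempts < 3:
        raise ConnectionError("broker connection dropped")
    return "consumed until shutdown"


print(flaky_consumer())  # logs two tracebacks, then prints the result
```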