fix(backend): Fix execution manager message consuming pattern (#9829)

We have seen instances where the executor gets stuck in a failing
message-consuming loop when the upstream RabbitMQ broker is down. The
current polling-based message-consuming pattern does not handle this
failure mode well.
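
To illustrate the failure mode, here is a minimal standalone sketch of a `basic_get` polling loop (not the actual manager code; the connection parameters and queue name are placeholders). When the broker is unreachable, every iteration raises, and without a retry limit the loop spins on failures forever:

```python
import time

import pika
from pika.exceptions import AMQPError


def poll_loop(params: pika.ConnectionParameters) -> None:
    """Tight basic_get polling loop; spins on failures when the broker is down."""
    while True:
        try:
            # Re-establish the channel and poll for a single message.
            channel = pika.BlockingConnection(params).channel()
            method, _properties, body = channel.basic_get(
                queue="graph_execution_queue",  # hypothetical queue name
                auto_ack=False,
            )
            if method is None:
                time.sleep(0.2)  # queue empty: idle briefly, then poll again
            else:
                print(f"got message: {body!r}")  # stand-in for real dispatch
                channel.basic_ack(method.delivery_tag)
        except AMQPError:
            # Broker unreachable: the connection attempt raises, and with no
            # retry limit or backoff the loop immediately fails again.
            continue
```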

### Changes 🏗️

* Add a retry limit to the execution manager loop.
* Use `basic_consume` instead of `basic_get` for message consumption (see the
  sketch below this list).
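
For reference, a minimal standalone sketch of the push-based pattern this switches to (the broker address, queue name, prefetch value, and handler are placeholders, not the project's actual wiring):

```python
import pika
from pika.adapters.blocking_connection import BlockingChannel
from pika.spec import Basic, BasicProperties


def on_run_message(
    channel: BlockingChannel,
    method: Basic.Deliver,
    properties: BasicProperties,
    body: bytes,
) -> None:
    # Stand-in for real dispatch; ack only after the work is handed off.
    print(f"received: {body!r}")
    channel.basic_ack(method.delivery_tag)


connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="graph_execution_queue")  # hypothetical queue
channel.basic_qos(prefetch_count=10)  # cap unacked deliveries, e.g. pool size
channel.basic_consume(
    queue="graph_execution_queue",
    on_message_callback=on_run_message,
    auto_ack=False,
)
# Blocks and dispatches callbacks as messages arrive; raises if the
# connection drops, so an outer loop can decide to reconnect or give up.
channel.start_consuming()
```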

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Run agents and cancel them

Zamil Majdy authored on 2025-04-16 17:54:26 +02:00, committed by GitHub.
Parent: c0ee71fb27, commit: 44e3770003.
2 changed files with 61 additions and 28 deletions.


```diff
@@ -12,7 +12,7 @@ from multiprocessing.pool import AsyncResult, Pool
 from typing import TYPE_CHECKING, Any, Generator, TypeVar, cast
 from pika.adapters.blocking_connection import BlockingChannel
-from pika.spec import Basic
+from pika.spec import Basic, BasicProperties
 from redis.lock import Lock as RedisLock
 from backend.blocks.io import AgentOutputBlock
@@ -887,11 +887,29 @@ class ExecutionManager(AppProcess):
         return settings.config.execution_manager_port

     def run(self):
-        while True:
+        retry_count_max = settings.config.execution_manager_loop_max_retry
+        retry_count = 0
+        for retry_count in range(retry_count_max):
             try:
                 self._run()
-            except Exception:
-                logger.exception(f"[{self.service_name}] error in graph executor loop")
+            except Exception as e:
+                if not self.running:
+                    break
+                logger.exception(
+                    f"[{self.service_name}] Error in execution manager: {e}"
+                )
+                if retry_count >= retry_count_max:
+                    logger.error(
+                        f"[{self.service_name}] Max retries reached ({retry_count_max}), exiting..."
+                    )
+                    break
+                else:
+                    logger.info(
+                        f"[{self.service_name}] Retrying execution loop in {retry_count} seconds..."
+                    )
+                    time.sleep(retry_count)

     def _run(self):
         logger.info(f"[{self.service_name}] ⏳ Spawn max-{self.pool_size} workers...")
@@ -903,29 +921,34 @@
         logger.info(f"[{self.service_name}] ⏳ Connecting to Redis...")
         redis.connect()

         # Consume Cancel & Run execution requests.
-        while True:
-            channel = get_execution_queue().get_channel()
-
-            # cancel graph execution requests
-            method_frame, _, body = channel.basic_get(
-                queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
-                auto_ack=True,
-            )
-            if method_frame:
-                self._handle_cancel_message(body)
-
-            # start graph execution requests
-            method_frame, _, body = channel.basic_get(
-                queue=GRAPH_EXECUTION_QUEUE_NAME,
-                auto_ack=False,
-            )
-            if method_frame:
-                self._handle_run_message(channel, method_frame, body)
-            else:
-                time.sleep(0.2)
+        channel = get_execution_queue().get_channel()
+        channel.basic_qos(prefetch_count=self.pool_size)
+        channel.basic_consume(
+            queue=GRAPH_EXECUTION_CANCEL_QUEUE_NAME,
+            on_message_callback=self._handle_cancel_message,
+            auto_ack=True,
+        )
+        channel.basic_consume(
+            queue=GRAPH_EXECUTION_QUEUE_NAME,
+            on_message_callback=self._handle_run_message,
+            auto_ack=False,
+        )
+        logger.info(f"[{self.service_name}] Ready to consume messages...")
+        channel.start_consuming()

-    def _handle_cancel_message(self, body: bytes):
+    def _handle_cancel_message(
+        self,
+        channel: BlockingChannel,
+        method: Basic.Deliver,
+        properties: BasicProperties,
+        body: bytes,
+    ):
+        """
+        Called whenever we receive a CANCEL message from the queue.
+        (With auto_ack=True, message is considered 'acked' automatically.)
+        """
         try:
             request = CancelExecutionEvent.model_validate_json(body)
             graph_exec_id = request.graph_exec_id
@@ -953,9 +976,13 @@
             logger.exception(f"Error handling cancel message: {e}")

     def _handle_run_message(
-        self, channel: BlockingChannel, method_frame: Basic.GetOk, body: bytes
+        self,
+        channel: BlockingChannel,
+        method: Basic.Deliver,
+        properties: BasicProperties,
+        body: bytes,
     ):
-        delivery_tag = method_frame.delivery_tag
+        delivery_tag = method.delivery_tag
         try:
             graph_exec_entry = GraphExecutionEntry.model_validate_json(body)
         except Exception as e:
@@ -983,12 +1010,14 @@

         def _on_run_done(f: Future):
             logger.info(f"[{self.service_name}] Run completed for {graph_exec_id}")
             try:
-                channel.basic_ack(delivery_tag)
                 self.active_graph_runs.pop(graph_exec_id, None)
                 if f.exception():
                     logger.error(
                         f"[{self.service_name}] Execution for {graph_exec_id} failed: {f.exception()}"
                     )
+                    channel.basic_nack(delivery_tag, requeue=False)
+                else:
+                    channel.basic_ack(delivery_tag)
             except Exception as e:
                 logger.error(f"[{self.service_name}] Error acknowledging message: {e}")
```


```diff
@@ -137,6 +137,10 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
         default=8002,
         description="The port for execution manager daemon to run on",
     )
+    execution_manager_loop_max_retry: int = Field(
+        default=5,
+        description="The maximum number of retries for the execution manager loop",
+    )
     execution_scheduler_port: int = Field(
         default=8003,
```
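
Since `Config` builds on pydantic's `BaseSettings`, the new limit should also be settable from the environment. A self-contained sketch of that mechanism, using a stand-in model rather than the project's actual `Config`, and assuming pydantic's default case-insensitive env-var mapping (in pydantic v1 the import is `from pydantic import BaseSettings` instead):

```python
import os

from pydantic import Field
from pydantic_settings import BaseSettings  # pydantic v2 package


class DemoSettings(BaseSettings):
    # Mirrors the field added in this commit.
    execution_manager_loop_max_retry: int = Field(
        default=5,
        description="The maximum number of retries for the execution manager loop",
    )


os.environ["EXECUTION_MANAGER_LOOP_MAX_RETRY"] = "10"
assert DemoSettings().execution_manager_loop_max_retry == 10
```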