fix(backend/executor): prevent infinite requeueing of malformed messages (#10746)

### Changes 🏗️

This PR fixes an infinite loop issue in the execution manager where
malformed or unparseable messages would be continuously requeued,
causing high CPU usage and preventing the system from processing
legitimate messages.

**Key changes:**
- Modified `_ack_message()` function to accept explicit `requeue`
parameter
- Set `requeue=False` for malformed/unparseable messages that cannot be
fixed by retrying
- Set `requeue=False` for duplicate execution attempts (graph already
running)
- Kept `requeue=True` for legitimate failures that may succeed on retry
(e.g., temporary resource constraints, network issues)

**Technical details:**
The previous implementation always set `requeue=True` when rejecting
messages with `basic_nack()`. This caused problematic messages to be
immediately re-delivered to the consumer, creating an infinite loop for:
1. Messages with invalid JSON that cannot be parsed
2. Messages for executions that are already running (duplicates)

These scenarios will never succeed regardless of how many times they're
retried, so they should be rejected without requeueing to prevent
resource exhaustion.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - [x] Verified malformed messages are rejected without requeue
- [x] Confirmed duplicate execution messages are rejected without
requeue
- [x] Ensured legitimate failures (shutdown, pool full) still requeue
properly
- [x] Tested that normal message processing continues to work correctly
This commit is contained in:
Zamil Majdy
2025-08-26 15:34:58 +04:00
committed by GitHub
parent 8a68e03eb1
commit c0172c93aa

View File

@@ -1394,14 +1394,14 @@ class ExecutionManager(AppProcess):
delivery_tag = method.delivery_tag
@func_retry
def _ack_message(reject: bool = False):
def _ack_message(reject: bool, requeue: bool):
"""Acknowledge or reject the message based on execution status."""
# Connection can be lost, so always get a fresh channel
channel = self.run_client.get_channel()
if reject:
channel.connection.add_callback_threadsafe(
lambda: channel.basic_nack(delivery_tag, requeue=True)
lambda: channel.basic_nack(delivery_tag, requeue=requeue)
)
else:
channel.connection.add_callback_threadsafe(
@@ -1413,13 +1413,13 @@ class ExecutionManager(AppProcess):
logger.info(
f"[{self.service_name}] Rejecting new execution during shutdown"
)
_ack_message(reject=True)
_ack_message(reject=True, requeue=True)
return
# Check if we can accept more runs
self._cleanup_completed_runs()
if len(self.active_graph_runs) >= self.pool_size:
_ack_message(reject=True)
_ack_message(reject=True, requeue=True)
return
try:
@@ -1428,7 +1428,7 @@ class ExecutionManager(AppProcess):
logger.error(
f"[{self.service_name}] Could not parse run message: {e}, body={body}"
)
_ack_message(reject=True)
_ack_message(reject=True, requeue=False)
return
graph_exec_id = graph_exec_entry.graph_exec_id
@@ -1440,7 +1440,7 @@ class ExecutionManager(AppProcess):
logger.error(
f"[{self.service_name}] Graph {graph_exec_id} already running; rejecting duplicate run."
)
_ack_message(reject=True)
_ack_message(reject=True, requeue=False)
return
cancel_event = threading.Event()
@@ -1456,9 +1456,9 @@ class ExecutionManager(AppProcess):
logger.error(
f"[{self.service_name}] Execution for {graph_exec_id} failed: {type(exec_error)} {exec_error}"
)
_ack_message(reject=True)
_ack_message(reject=True, requeue=True)
else:
_ack_message(reject=False)
_ack_message(reject=False, requeue=False)
except BaseException as e:
logger.exception(
f"[{self.service_name}] Error in run completion callback: {e}"