fix(backend): Add timeout on stopping message consumer on manager

This commit is contained in:
Zamil Majdy
2025-08-08 18:04:10 +07:00
parent 5338ab5b80
commit de7b6b503f

View File

@@ -1361,6 +1361,25 @@ class ExecutionManager(AppProcess):
else:
utilization_gauge.set(active_count / self.pool_size)
def _stop_message_consumers(
self, thread: threading.Thread, client: SyncRabbitMQ, prefix: str
):
try:
channel = client.get_channel()
channel.connection.add_callback_threadsafe(lambda: channel.stop_consuming())
try:
thread.join(timeout=300)
except TimeoutError:
logger.error(
f"{prefix} ⚠️ Run thread did not finish in time, forcing disconnect"
)
client.disconnect()
logger.info(f"{prefix} ✅ Run client disconnected")
except Exception as e:
logger.error(f"{prefix} ⚠️ Error disconnecting run client: {type(e)} {e}")
def cleanup(self):
"""Override cleanup to implement graceful shutdown with active execution waiting."""
prefix = f"[{self.service_name}][on_graph_executor_stop {os.getpid()}]"
@@ -1416,32 +1435,16 @@ class ExecutionManager(AppProcess):
logger.error(f"{prefix} ⚠️ Error during executor shutdown: {type(e)} {e}")
# Disconnect the run execution consumer
try:
run_channel = self.run_client.get_channel()
run_channel.connection.add_callback_threadsafe(
lambda: run_channel.stop_consuming()
)
self.run_thread.join()
run_channel.connection.add_callback_threadsafe(
lambda: self.run_client.disconnect()
)
logger.info(f"{prefix} ✅ Run client disconnected")
except Exception as e:
logger.error(f"{prefix} ⚠️ Error disconnecting run client: {type(e)} {e}")
# Disconnect the cancel execution consumer
try:
cancel_channel = self.cancel_client.get_channel()
cancel_channel.connection.add_callback_threadsafe(
lambda: cancel_channel.stop_consuming()
)
self.cancel_thread.join()
cancel_channel.connection.add_callback_threadsafe(
lambda: self.cancel_client.disconnect()
)
logger.info(f"{prefix} ✅ Cancel client disconnected")
except Exception as e:
logger.error(f"{prefix} ⚠️ Error disconnecting cancel client: {type(e)} {e}")
self._stop_message_consumers(
self.run_thread,
self.run_client,
prefix + " [run-consumer]",
)
self._stop_message_consumers(
self.cancel_thread,
self.cancel_client,
prefix + " [cancel-consumer]",
)
logger.info(f"{prefix} ✅ Finished GraphExec cleanup")