Mirror of https://github.com/Significant-Gravitas/AutoGPT.git, synced 2026-04-08 03:00:28 -04:00
fix(backend/executor): ensure cluster lock release on all execution submission failures (#11281)
## Root Cause
During rolling deployment, execution
`97058338-052a-4528-87f4-98c88416bb7f` got stuck in QUEUED state
because:
1. Pod acquired cluster lock successfully during shutdown
2. Subsequent setup operations failed (ThreadPoolExecutor shutdown,
resource exhaustion, etc.)
3. **No error handling existed** around the critical section after lock
acquisition
4. Cluster lock remained stuck in Redis for 5 minutes (TTL timeout)
5. Other pods couldn't acquire the lock, so the execution stayed queued
until the TTL expired
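The failure mode above can be sketched with a toy in-memory TTL lock. This is a hypothetical stand-in for the Redis-backed cluster lock (the `ClusterLock` class, `_store` dict, and key names here are invented for illustration; only the `try_acquire`/`release` shape mirrors the description):

```python
import time

class ClusterLock:
    """Toy in-memory sketch of a TTL-based cluster lock (hypothetical
    stand-in for the Redis lock used by the executor)."""

    TTL = 300.0  # seconds; mirrors the 5-minute Redis TTL described above

    _store: dict = {}  # key -> (owner, expiry); plays the role of Redis

    def __init__(self, key: str, owner: str):
        self.key = key
        self.owner = owner

    def try_acquire(self) -> str:
        """Return the current owner; it equals self.owner on success."""
        holder = self._store.get(self.key)
        if holder is None or holder[1] <= time.monotonic():
            self._store[self.key] = (self.owner, time.monotonic() + self.TTL)
            return self.owner
        return holder[0]

    def release(self) -> None:
        if self._store.get(self.key, (None, 0))[0] == self.owner:
            del self._store[self.key]

lock_a = ClusterLock("exec:demo", owner="pod-a")
lock_b = ClusterLock("exec:demo", owner="pod-b")
assert lock_a.try_acquire() == "pod-a"  # pod-a wins the lock
assert lock_b.try_acquire() == "pod-a"  # pod-b is blocked...
# ...and stays blocked for the full TTL unless pod-a releases explicitly.
lock_a.release()
assert lock_b.try_acquire() == "pod-b"  # after release, pod-b proceeds
```

If pod-a dies (or errors out) between `try_acquire()` and `release()`, pod-b's `try_acquire()` keeps returning `"pod-a"` until the 300-second expiry, which is exactly the stuck-QUEUED window described above.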
## The Fix
### Problem: Critical Section Not Protected
The original code had no error handling for the entire critical section
after successful lock acquisition:
```python
# Original code - no error handling after lock acquired
current_owner = cluster_lock.try_acquire()
if current_owner != self.executor_id:
return # didn't get lock
# CRITICAL SECTION - any failure here leaves lock stuck
self._execution_locks[graph_exec_id] = cluster_lock # Could fail: memory
logger.info("Acquired cluster lock...") # Could fail: logging
cancel_event = threading.Event() # Could fail: resources
future = self.executor.submit(...) # Could fail: shutdown
self.active_graph_runs[...] = (future, cancel_event) # Could fail: memory
```
### Solution: Wrap Entire Critical Section
Protect ALL operations after successful lock acquisition:
```python
# Fixed code - comprehensive error handling
current_owner = cluster_lock.try_acquire()
if current_owner != self.executor_id:
return # didn't get lock
# Wrap ENTIRE critical section after successful acquisition
try:
self._execution_locks[graph_exec_id] = cluster_lock
logger.info("Acquired cluster lock...")
cancel_event = threading.Event()
future = self.executor.submit(...)
self.active_graph_runs[...] = (future, cancel_event)
except Exception as e:
    logger.warning(f"Failed to set up execution: {type(e).__name__}: {e}")
    # Release cluster lock before requeue
    cluster_lock.release()
    if graph_exec_id in self._execution_locks:
        del self._execution_locks[graph_exec_id]
    _ack_message(reject=True, requeue=True)
    return
```
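The cleanup path can be exercised end to end with a runnable sketch. `FakeLock` and the local `_ack_message` are hypothetical stand-ins for the real cluster lock and message-ack helper; the failure is triggered the same way as in the incident, by submitting to a shut-down `ThreadPoolExecutor`:

```python
from concurrent.futures import ThreadPoolExecutor
import threading

# Hypothetical minimal stand-ins for the real cluster lock and ack hook.
class FakeLock:
    def __init__(self):
        self.released = False
    def release(self):
        self.released = True

requeued = []
def _ack_message(reject=False, requeue=False):
    requeued.append((reject, requeue))

execution_locks = {}
cluster_lock = FakeLock()
executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown()  # simulate a pod mid-shutdown during a rolling deploy

graph_exec_id = "demo-exec"
try:
    execution_locks[graph_exec_id] = cluster_lock
    cancel_event = threading.Event()
    # Raises RuntimeError: cannot schedule new futures after shutdown
    future = executor.submit(lambda: None)
except Exception:
    cluster_lock.release()
    if graph_exec_id in execution_locks:
        del execution_locks[graph_exec_id]
    _ack_message(reject=True, requeue=True)

assert cluster_lock.released            # lock freed immediately, no TTL wait
assert graph_exec_id not in execution_locks
assert requeued == [(True, True)]       # message requeued for another pod
```

Note the membership check before `del`: if the failure happens before the dict assignment, an unguarded `del` would raise `KeyError` inside the `except` block and mask the original error.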
### Why This Comprehensive Approach Works
- **Complete protection**: Any failure in critical section → lock
released
- **Proper cleanup order**: Lock released → message requeued → another
pod can try
- **Uses existing infrastructure**: Leverages established
`_ack_message()` requeue logic
- **Handles all scenarios**: ThreadPoolExecutor shutdown, resource
exhaustion, memory issues, logging failures
## Protected Failure Scenarios
1. **Memory exhaustion**: `_execution_locks` assignment or
`active_graph_runs` assignment
2. **Resource exhaustion**: `threading.Event()` creation fails
3. **ThreadPoolExecutor shutdown**: `executor.submit()` with "cannot
schedule new futures after shutdown"
4. **Logging system failures**: `logger.info()` calls fail
5. **Any unexpected exceptions**: Network issues, disk problems, etc.
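Scenario 3 is straightforward to reproduce in isolation: a `ThreadPoolExecutor` that has been shut down rejects new work with exactly this error:

```python
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=1)
pool.shutdown()  # simulates the pod's executor during shutdown

try:
    pool.submit(lambda: None)
    err = None
except RuntimeError as e:
    err = str(e)

print(err)  # cannot schedule new futures after shutdown
```

Without the `try`/`except` around the critical section, this exception would propagate after the cluster lock was already acquired, leaving it held until the Redis TTL expired.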
## Validation
- ✅ All existing tests pass
- ✅ Maintains exact same success path behavior
- ✅ Comprehensive error handling for all failure points
- ✅ Minimal code change with maximum protection
## Impact
- **Eliminates stuck executions** during pod lifecycle events (rolling
deployments, scaling, crashes)
- **Faster recovery**: Immediate requeue vs 5-minute Redis TTL wait
- **Higher reliability**: Handles ANY failure in the critical section
- **Production-ready**: Comprehensive solution for distributed lock
management
This prevents the exact race condition that caused execution
`97058338-052a-4528-87f4-98c88416bb7f` to be stuck for >300 seconds,
plus many other potential failure scenarios.
---------
Co-authored-by: Claude <noreply@anthropic.com>
```diff
@@ -1555,18 +1555,30 @@ class ExecutionManager(AppProcess):
             )
             _ack_message(reject=True, requeue=True)
             return

-        self._execution_locks[graph_exec_id] = cluster_lock
-
-        logger.info(
-            f"[{self.service_name}] Acquired cluster lock for {graph_exec_id} with executor {self.executor_id}"
-        )
-        cancel_event = threading.Event()
-        future = self.executor.submit(
-            execute_graph, graph_exec_entry, cancel_event, cluster_lock
-        )
-        self.active_graph_runs[graph_exec_id] = (future, cancel_event)
+        # Wrap entire block after successful lock acquisition
+        try:
+            self._execution_locks[graph_exec_id] = cluster_lock
+
+            logger.info(
+                f"[{self.service_name}] Acquired cluster lock for {graph_exec_id} with executor {self.executor_id}"
+            )
+            cancel_event = threading.Event()
+            future = self.executor.submit(
+                execute_graph, graph_exec_entry, cancel_event, cluster_lock
+            )
+            self.active_graph_runs[graph_exec_id] = (future, cancel_event)
+        except Exception as e:
+            logger.warning(
+                f"[{self.service_name}] Failed to setup execution for {graph_exec_id}: {type(e).__name__}: {e}"
+            )
+            # Release cluster lock before requeue
+            cluster_lock.release()
+            if graph_exec_id in self._execution_locks:
+                del self._execution_locks[graph_exec_id]
+            _ack_message(reject=True, requeue=True)
+            return
         self._update_prompt_metrics()

         def _on_run_done(f: Future):
```