fix(backend): Avoid releasing lock that is no longer owned by the current thread (#9878)

There are instances of node executions that were failed and end up stuck
in the RUNNING status due to the execution failed to release the lock:
```
2025-04-24 20:53:31,573 INFO  [ExecutionManager|uid:25eba2d1-e9c1-44bc-88c7-43e0f4fbad5a|gid:01f8c315-c163-4dd1-a8a0-d396477c5a9f|nid:f8bf84ae-b1f0-4434-8f04-80f43852bc30]|geid:2e1b35c6-0d2f-4e97-adea-f6fe0d9965d0|neid:590b29ea-63ee-4e24-a429-de5a3e191e72|-] Failed node execution 590b29ea-63ee-4e24-a429-de5a3e191e72: Cannot release a lock that's no longer owned
```

### Changes 🏗️

Check the ownership of the lock before releasing.

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  <!-- Put your test plan here: -->
  - [x] Existing CI tests.

(cherry picked from commit ef022720d5)
This commit is contained in:
Zamil Majdy
2025-04-25 10:39:10 +03:00
parent 4ddb206f86
commit 96a173a85f
3 changed files with 5 additions and 5 deletions

View File

@@ -31,7 +31,7 @@ class RedisKeyedMutex:
try:
yield
finally:
if lock.locked():
if lock.locked() and lock.owned():
lock.release()
def acquire(self, key: Any) -> "RedisLock":

View File

@@ -267,7 +267,7 @@ def execute_node(
raise e
finally:
# Ensure credentials are released even if execution fails
if creds_lock and creds_lock.locked():
if creds_lock and creds_lock.locked() and creds_lock.owned():
try:
creds_lock.release()
except Exception as e:
@@ -1115,7 +1115,7 @@ def synchronized(key: str, timeout: int = 60):
lock.acquire()
yield
finally:
if lock.locked():
if lock.locked() and lock.owned():
lock.release()

View File

@@ -93,7 +93,7 @@ class IntegrationCredentialsManager:
fresh_credentials = oauth_handler.refresh_tokens(credentials)
self.store.update_creds(user_id, fresh_credentials)
if _lock and _lock.locked():
if _lock and _lock.locked() and _lock.owned():
_lock.release()
credentials = fresh_credentials
@@ -145,7 +145,7 @@ class IntegrationCredentialsManager:
try:
yield
finally:
if lock.locked():
if lock.locked() and lock.owned():
lock.release()
def release_all_locks(self):