Compare commits

...

3 Commits

Author SHA1 Message Date
Zamil Majdy
5f802b6502 Merge branch 'dev' into fix/scheduler-thread-dump-debugging 2025-08-11 18:36:37 +04:00
Zamil Majdy
c5c206b6b2 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into fix/scheduler-thread-dump-debugging 2025-08-10 20:12:10 +07:00
Zamil Majdy
6888dc2f93 fix(backend): resolve scheduler deadlock and improve health checks (#10589)
## Summary
Fix a critical deadlock issue where scheduler pods would freeze completely
and become unresponsive to health checks, causing pod restarts and stuck
QUEUED executions.

## Root Cause Analysis
The scheduler used `BlockingScheduler`, which occupies the main thread.
When concurrent jobs deadlocked in the async event loop, the entire
process froze, unable to respond to health checks or process any
requests.
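
To make the failure mode concrete, here is a minimal sketch using
APScheduler's public API (the job body and schedule are illustrative,
not the service's actual wiring):

```python
from apscheduler.schedulers.background import BackgroundScheduler

def run_graph_execution():
    ...  # hypothetical job body; if it deadlocks, its worker thread hangs

# Before (problematic): BlockingScheduler.start() takes over the calling
# thread, so a wedged job leaves no free main thread for health checks.
#
#   from apscheduler.schedulers.blocking import BlockingScheduler
#   scheduler = BlockingScheduler()
#   scheduler.add_job(run_graph_execution, "interval", seconds=30)
#   scheduler.start()  # never returns

# After (this PR): jobs run on worker threads and start() returns
# immediately, keeping the main thread free.
scheduler = BackgroundScheduler()
scheduler.add_job(run_graph_execution, "interval", seconds=30)
scheduler.start()
```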

From crash analysis:
- At 01:18:00, two jobs started executing concurrently
- At 01:18:01.482, last successful health check
- Process froze completely: no more logs until the pod was killed at
01:18:46
- Execution `8174c459-c975-4308-bc01-331ba67f26ab` was created in DB but
never published to RabbitMQ

## Changes Made

### Core Deadlock Fix
- **Switch from BlockingScheduler to BackgroundScheduler**: Prevents
main thread blocking, allows health checks to work even if scheduler
jobs deadlock
- **Make all health_check methods async**: Makes health checks
completely independent of thread pools and more resilient to blocking
operations (see the sketch after this list)
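
A minimal sketch of the async health check shape, assuming the FastAPI
app the scheduler already runs (endpoint name and wiring here are
illustrative; the real base method lives in `backend/util/service.py`):

```python
from apscheduler.schedulers.background import BackgroundScheduler
from fastapi import FastAPI
from fastapi.responses import JSONResponse

app = FastAPI()
scheduler = BackgroundScheduler()

@app.get("/health")
async def health_check():
    # An async route runs directly on the event loop instead of FastAPI's
    # thread pool, so it can still answer while scheduler worker threads
    # are deadlocked.
    if not scheduler.running:
        return JSONResponse({"status": "unhealthy"}, status_code=503)
    return {"status": "ok"}
```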

### Enhanced Monitoring & Debugging
- **Add execution timing**: Track and log how long each graph execution
takes to create and publish
- **Warn on slow operations**: Alert when operations take >10 seconds,
indicating resource contention
- **Enhanced error logging**: Include elapsed time and exception types
in error messages
- **Better APScheduler event listeners**: Add listeners for missed jobs
and max instances with actionable messages (sketched below)
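
A rough sketch of such listeners, using APScheduler's documented event
constants (the log wording is illustrative, not the PR's exact
messages):

```python
import logging

from apscheduler.events import EVENT_JOB_MAX_INSTANCES, EVENT_JOB_MISSED
from apscheduler.schedulers.background import BackgroundScheduler

logger = logging.getLogger(__name__)
scheduler = BackgroundScheduler()

def on_job_missed(event):
    # Fired when a run was skipped because its scheduled time has passed.
    logger.warning(
        f"Job {event.job_id} missed its run at {event.scheduled_run_time}; "
        "the scheduler may be backlogged or the process was frozen"
    )

def on_max_instances(event):
    # Fired when a run was skipped because a previous run is still going.
    logger.warning(
        f"Job {event.job_id} hit max_instances; an earlier run is likely "
        "stuck or unusually slow"
    )

scheduler.add_listener(on_job_missed, EVENT_JOB_MISSED)
scheduler.add_listener(on_max_instances, EVENT_JOB_MAX_INSTANCES)
```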

### Files Modified
- `backend/executor/scheduler.py` - Switch to BackgroundScheduler, async
health_check, timing monitoring
- `backend/util/service.py` - Base async health_check method
- `backend/executor/database.py` - Async health_check override
- `backend/notifications/notifications.py` - Async health_check override

## Test Plan
- [x] All existing tests pass (914 passed, 1 failure due to an unrelated
connection issue)
- [x] Scheduler starts correctly with BackgroundScheduler
- [x] Health checks respond properly under load
- [x] Enhanced logging provides visibility into execution timing

## Impact
- **Prevents pod freezes**: Scheduler remains responsive even when jobs
deadlock
- **Better observability**: Clear visibility into slow operations and
failures
- **No dropped executions**: Jobs won't get stuck in QUEUED state due to
process freezes
- **Faster incident response**: Health checks and logs provide
actionable debugging info

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-authored-by: Claude <noreply@anthropic.com>
2025-08-10 20:09:43 +07:00
2 changed files with 584 additions and 0 deletions

View File: backend/executor/scheduler.py

@@ -1,7 +1,13 @@
import asyncio
import faulthandler
import io
import logging
import os
import signal
import sys
import threading
import traceback
from datetime import datetime
from enum import Enum
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@@ -247,11 +253,140 @@ class Scheduler(AppService):
if not self.scheduler.running:
raise UnhealthyServiceError("Scheduler is not running")
# Update health check timestamp for monitoring
self._update_health_check_time()
return await super().health_check()
def _signal_thread_dump_handler(self, signum, frame):
"""Signal handler for SIGUSR2 - dumps threads to stderr even when FastAPI is stuck"""
try:
import sys
sys.stderr.write(f"\n{'='*80}\n")
sys.stderr.write(f"SIGNAL THREAD DUMP - {datetime.now()}\n")
sys.stderr.write(f"Signal: {signum}, PID: {os.getpid()}\n")
sys.stderr.write(f"Total threads: {threading.active_count()}\n")
sys.stderr.write(f"{'='*80}\n")
current_frames = sys._current_frames()
threads = threading.enumerate()
for i, thread in enumerate(threads, 1):
sys.stderr.write(f"\n[{i}] Thread: {thread.name}\n")
sys.stderr.write(f" ID: {thread.ident}, Daemon: {thread.daemon}\n")
thread_frame = (
current_frames.get(thread.ident) if thread.ident else None
)
if thread_frame:
sys.stderr.write(" Stack:\n")
stack = traceback.extract_stack(thread_frame)
for j, (filename, lineno, name, line) in enumerate(stack[-12:]):
indent = " " + (" " * min(j, 8))
short_file = (
filename.split("/")[-1] if "/" in filename else filename
)
sys.stderr.write(f"{indent}{short_file}:{lineno} in {name}()\n")
if line and line.strip():
sys.stderr.write(f"{indent}{line.strip()}\n")
else:
sys.stderr.write(" No frame available\n")
# Scheduler info
sys.stderr.write(f"\n{'='*40}\n")
sys.stderr.write("SCHEDULER STATE:\n")
if hasattr(self, "scheduler") and self.scheduler:
sys.stderr.write(f"Running: {self.scheduler.running}\n")
try:
jobs = self.scheduler.get_jobs()
sys.stderr.write(f"Jobs: {len(jobs)}\n")
except Exception:
sys.stderr.write("Jobs: Error getting jobs\n")
else:
sys.stderr.write("Scheduler: Not initialized\n")
sys.stderr.write(f"{'='*80}\n")
sys.stderr.write("END SIGNAL THREAD DUMP\n")
sys.stderr.write(f"{'='*80}\n\n")
sys.stderr.flush()
except Exception as e:
import sys
sys.stderr.write(f"Error in signal handler: {e}\n")
sys.stderr.flush()
def _start_periodic_thread_dump(self):
"""Start background thread for periodic thread dumps"""
def periodic_dump():
import time
while True:
try:
time.sleep(300) # 5 minutes
# Only dump if we detect potential issues
current_time = time.time()
if hasattr(self, "_last_health_check"):
time_since_health = current_time - self._last_health_check
if time_since_health > 60: # No health check in 60 seconds
logger.warning(
"No health check in 60s, dumping threads for monitoring"
)
self._signal_thread_dump_handler(0, None)
# Also check if scheduler seems stuck
if hasattr(self, "scheduler") and self.scheduler:
try:
jobs = self.scheduler.get_jobs()
# Log periodic status
logger.info(
f"Periodic check: {len(jobs)} active jobs, {threading.active_count()} threads"
)
except Exception as e:
logger.warning(
f"Periodic check failed, dumping threads: {e}"
)
self._signal_thread_dump_handler(0, None)
except Exception as e:
logger.error(f"Error in periodic thread dump: {e}")
# Start daemon thread for periodic monitoring
dump_thread = threading.Thread(
target=periodic_dump, daemon=True, name="PeriodicThreadDump"
)
dump_thread.start()
logger.info("Periodic thread dump monitor started")
def _update_health_check_time(self):
"""Update last health check time for monitoring"""
import time
self._last_health_check = time.time()
def run_service(self):
load_dotenv()
# Enable faulthandler for debugging deadlocks
faulthandler.enable()
# Register SIGUSR1 to dump all thread stacks on demand
faulthandler.register(signal.SIGUSR1, all_threads=True)
# Also register SIGUSR2 for custom thread dump (in case faulthandler doesn't work)
signal.signal(signal.SIGUSR2, self._signal_thread_dump_handler)
# Start periodic thread dump for monitoring
self._start_periodic_thread_dump()
logger.info(
"Faulthandler enabled. Send SIGUSR1 or SIGUSR2 to dump thread stacks. Periodic dumps every 5 minutes."
)
# Initialize the event loop for async jobs
global _event_loop
_event_loop = asyncio.new_event_loop()
@@ -484,6 +619,102 @@ class Scheduler(AppService):
"""Manually trigger cleanup of expired cloud storage files."""
return cleanup_expired_files()
@expose
def debug_thread_dump(self) -> str:
"""Get comprehensive thread dump for debugging deadlocks."""
try:
# Create string buffer to capture thread info
output = io.StringIO()
# Header
output.write(f"SCHEDULER THREAD DUMP - {datetime.now()}\n")
output.write("=" * 80 + "\n")
output.write(f"Process PID: {os.getpid()}\n")
output.write(f"Total threads: {threading.active_count()}\n\n")
# Get all threads with stack traces
current_frames = sys._current_frames()
threads = threading.enumerate()
for i, thread in enumerate(threads, 1):
output.write(f"[{i}/{len(threads)}] Thread: {thread.name}\n")
output.write(f" ID: {thread.ident}\n")
output.write(f" Daemon: {thread.daemon}\n")
output.write(f" Alive: {thread.is_alive()}\n")
# Get target if available (internal attribute)
if hasattr(thread, "_target") and getattr(thread, "_target", None):
output.write(f" Target: {getattr(thread, '_target')}\n")
# Get stack trace
frame = current_frames.get(thread.ident) if thread.ident else None
if frame:
output.write(" Stack trace:\n")
stack = traceback.extract_stack(frame)
for j, (filename, lineno, name, line) in enumerate(stack):
indent = " " + (" " * min(j, 6))
short_file = (
filename.split("/")[-1] if "/" in filename else filename
)
output.write(f"{indent}[{j+1}] {short_file}:{lineno}\n")
output.write(f"{indent} in {name}()\n")
if line and line.strip():
output.write(f"{indent}{line.strip()}\n")
else:
output.write(" ⚠️ No frame available\n")
output.write("\n" + "-" * 60 + "\n")
# Scheduler state info
output.write("\nSCHEDULER STATE:\n")
output.write("=" * 40 + "\n")
if hasattr(self, "scheduler") and self.scheduler:
output.write(f"Scheduler running: {self.scheduler.running}\n")
try:
jobs = self.scheduler.get_jobs()
output.write(f"Active jobs: {len(jobs)}\n")
for job in jobs[:5]: # First 5 jobs
output.write(f" {job.id}: next run {job.next_run_time}\n")
except Exception as e:
output.write(f"Error getting jobs: {e}\n")
else:
output.write("Scheduler not initialized\n")
# Event loop info
output.write("\nEVENT LOOP STATE:\n")
output.write("=" * 40 + "\n")
global _event_loop
if _event_loop:
output.write(f"Event loop running: {_event_loop.is_running()}\n")
try:
import asyncio
tasks = asyncio.all_tasks(_event_loop)
output.write(f"Active tasks: {len(tasks)}\n")
for task in list(tasks)[:5]: # First 5 tasks
output.write(f" {task.get_name()}: {task._state}\n")
except Exception as e:
output.write(f"Error getting tasks: {e}\n")
else:
output.write("Event loop not initialized\n")
output.write("\n" + "=" * 80 + "\n")
output.write("END THREAD DUMP\n")
result = output.getvalue()
output.close()
# Also log that we got a thread dump request
logger.info("Thread dump requested via HTTP endpoint")
return result
except Exception as e:
error_msg = f"Error generating thread dump: {type(e).__name__}: {e}"
logger.error(error_msg)
return error_msg
class SchedulerClient(AppServiceClient):
@classmethod

View File: scheduler_debug.py (new file)

@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
Unified scheduler debugging tool
- Test deployment
- Collect thread dumps (signal-based, works when FastAPI is stuck)
- Monitor periodic dumps
"""
import subprocess
import sys
import time
from datetime import datetime
import requests
def find_scheduler_pod():
"""Find the running scheduler pod"""
result = subprocess.run(
"kubectl get pods -n dev-agpt --no-headers".split(),
capture_output=True,
text=True,
)
for line in result.stdout.split("\n"):
if "scheduler-server" in line and "Running" in line:
return line.split()[0]
return None
def test_deployment():
"""Test if the deployment has debugging enabled"""
print("🧪 TESTING SCHEDULER DEBUG DEPLOYMENT")
print("=" * 50)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return False
print(f"📍 Pod: {pod_name}")
# Check if faulthandler is enabled
print("🔍 Checking faulthandler setup...")
log_result = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
capture_output=True,
text=True,
)
faulthandler_enabled = "Faulthandler enabled" in log_result.stdout
periodic_enabled = "Periodic thread dump monitor started" in log_result.stdout
if faulthandler_enabled:
print("✅ Faulthandler is enabled")
else:
print("❌ Faulthandler not found in logs")
if periodic_enabled:
print("✅ Periodic monitoring is enabled")
else:
print("❌ Periodic monitoring not found in logs")
# Test signal sending
print("\\n📡 Testing signal delivery...")
signal_result = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
capture_output=True,
text=True,
)
if signal_result.returncode == 0:
print("✅ Signal sent successfully")
time.sleep(2)
# Check for thread dump in logs
new_logs = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=20".split(),
capture_output=True,
text=True,
)
if "SIGNAL THREAD DUMP" in new_logs.stdout:
print("✅ Thread dump appeared in logs!")
else:
print("⚠️ No thread dump found (might take a moment)")
else:
print(f"❌ Signal failed: {signal_result.stderr}")
# Test HTTP API (should work when not stuck)
print("\\n🌐 Testing HTTP API...")
pf_process = None
try:
pf_process = subprocess.Popen(
f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
time.sleep(2)
response = requests.get("http://localhost:8003/debug_thread_dump", timeout=10)
if response.status_code == 200:
print("✅ HTTP API working")
print(f" Thread count found: {'Total threads:' in response.text}")
else:
print(f"⚠️ HTTP API returned: {response.status_code}")
except Exception as e:
print(f"⚠️ HTTP API failed: {e}")
finally:
if pf_process:
try:
pf_process.terminate()
pf_process.wait()
except Exception:
pass
success = faulthandler_enabled and signal_result.returncode == 0
print(
f"\\n{'✅ DEPLOYMENT TEST PASSED' if success else '❌ DEPLOYMENT TEST FAILED'}"
)
return success
def collect_thread_dump():
"""Collect comprehensive thread dump (works even when scheduler is stuck)"""
print("🚨 COLLECTING THREAD DUMP FROM SCHEDULER")
print("=" * 60)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return False
print(f"📍 Pod: {pod_name}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Send both signals for maximum coverage
print("📡 Sending signals for thread dumps...")
# SIGUSR1 (faulthandler)
result1 = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR1 1".split(),
capture_output=True,
text=True,
)
print(f" SIGUSR1: {'' if result1.returncode == 0 else ''}")
time.sleep(1)
# SIGUSR2 (custom handler)
result2 = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
capture_output=True,
text=True,
)
print(f" SIGUSR2: {'' if result2.returncode == 0 else ''}")
time.sleep(3) # Give signals time to execute
# Collect logs with thread dumps
print("📋 Collecting logs...")
log_result = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=500".split(),
capture_output=True,
text=True,
)
# Save everything
dump_file = f"THREAD_DUMP_{timestamp}.txt"
with open(dump_file, "w") as f:
f.write("SCHEDULER THREAD DUMP COLLECTION\\n")
f.write(f"Timestamp: {datetime.now()}\\n")
f.write(f"Pod: {pod_name}\\n")
f.write("=" * 80 + "\\n\\n")
f.write("FULL LOGS (last 500 lines):\\n")
f.write("-" * 40 + "\\n")
f.write(log_result.stdout)
print(f"💾 Full dump saved: {dump_file}")
# Extract and show thread dump preview
lines = log_result.stdout.split("\n")
thread_dumps = []
in_dump = False
current_dump = []
for line in lines:
if any(
marker in line
for marker in ["SIGNAL THREAD DUMP", "Fatal Python error", "Thread 0x"]
):
if current_dump:
thread_dumps.append(current_dump)
current_dump = [line]
in_dump = True
elif in_dump and (
"END SIGNAL THREAD DUMP" in line or "Current thread 0x" in line
):
current_dump.append(line)
thread_dumps.append(current_dump)
current_dump = []
in_dump = False
elif in_dump:
current_dump.append(line)
if current_dump:
thread_dumps.append(current_dump)
if thread_dumps:
print(f"\\n🔍 FOUND {len(thread_dumps)} THREAD DUMP(S):")
print("-" * 50)
# Show the most recent/complete dump
latest_dump = thread_dumps[-1]
for i, line in enumerate(latest_dump[:50]): # First 50 lines
print(line)
if len(latest_dump) > 50:
print("... (truncated, see full dump in file)")
# Create separate file with just thread dumps
clean_dump_file = f"CLEAN_THREAD_DUMP_{timestamp}.txt"
with open(clean_dump_file, "w") as f:
f.write(f"EXTRACTED THREAD DUMPS - {datetime.now()}\\n")
f.write("=" * 60 + "\\n\\n")
for i, dump in enumerate(thread_dumps, 1):
f.write(f"DUMP #{i}:\\n")
f.write("-" * 30 + "\\n")
f.write("\\n".join(dump))
f.write("\\n\\n")
print(f"🎯 Clean thread dumps saved: {clean_dump_file}")
else:
print("⚠️ No thread dumps found in logs")
print("Recent log lines:")
for line in lines[-10:]:
print(f" {line}")
# Try HTTP backup (will fail if scheduler is stuck, but worth trying)
print("\\n🌐 Attempting HTTP backup...")
pf_process = None
try:
pf_process = subprocess.Popen(
f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
time.sleep(2)
response = requests.get("http://localhost:8003/debug_thread_dump", timeout=5)
if response.status_code == 200:
http_file = f"HTTP_THREAD_DUMP_{timestamp}.txt"
with open(http_file, "w") as f:
f.write(response.text)
print(f"✅ HTTP backup saved: {http_file}")
else:
print(f"⚠️ HTTP failed: {response.status_code}")
except Exception as e:
print(f"⚠️ HTTP failed (expected if stuck): {e}")
finally:
if pf_process:
try:
pf_process.terminate()
pf_process.wait()
except Exception:
pass
print("\\n✅ COLLECTION COMPLETE!")
return len(thread_dumps) > 0
def monitor_periodic_dumps(duration_minutes=10):
"""Monitor periodic thread dumps for a specified duration"""
print(f"👁️ MONITORING PERIODIC DUMPS FOR {duration_minutes} MINUTES")
print("=" * 50)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return
print(f"📍 Pod: {pod_name}")
print("⏰ Watching for periodic status messages and thread dumps...")
start_time = time.time()
end_time = start_time + (duration_minutes * 60)
# Get current log position (for reference, not used currently)
# Could be used for tracking new vs old logs if needed
while time.time() < end_time:
try:
# Get new logs
current_logs = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
capture_output=True,
text=True,
)
for line in current_logs.stdout.split("\n"):
if "Periodic check:" in line:
print(f"📊 {line}")
elif "SIGNAL THREAD DUMP" in line:
print(f"🚨 Thread dump detected: {line}")
elif "No health check" in line:
print(f"⚠️ Health issue: {line}")
time.sleep(30) # Check every 30 seconds
except KeyboardInterrupt:
print("\\n⏹ Monitoring stopped by user")
break
except Exception as e:
print(f"Error during monitoring: {e}")
break
print("\\n✅ MONITORING COMPLETE")
def main():
if len(sys.argv) < 2:
print("🔧 SCHEDULER DEBUG TOOL")
print("=" * 30)
print("Usage:")
print(" python scheduler_debug.py test - Test deployment")
print(" python scheduler_debug.py collect - Collect thread dump")
print(" python scheduler_debug.py monitor [min] - Monitor periodic dumps")
print(" python scheduler_debug.py all - Run test + collect")
return
command = sys.argv[1].lower()
if command == "test":
test_deployment()
elif command == "collect":
collect_thread_dump()
elif command == "monitor":
duration = int(sys.argv[2]) if len(sys.argv) > 2 else 10
monitor_periodic_dumps(duration)
elif command == "all":
print("Running complete debugging sequence...\\n")
if test_deployment():
print("\\n" + "=" * 50)
collect_thread_dump()
else:
print("❌ Test failed, skipping collection")
else:
print(f"❌ Unknown command: {command}")
if __name__ == "__main__":
main()