Mirror of https://github.com/Significant-Gravitas/AutoGPT.git (synced 2026-01-21 04:57:58 -05:00)
Compare commits
3 Commits
make-old-w ... fix/schedu
| Author | SHA1 | Date |
|---|---|---|
|  | 5f802b6502 |  |
|  | c5c206b6b2 |  |
|  | 6888dc2f93 |  |
@@ -1,7 +1,13 @@
 import asyncio
+import faulthandler
+import io
 import logging
 import os
+import signal
+import sys
 import threading
+import traceback
+from datetime import datetime
 from enum import Enum
 from typing import Optional
 from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@@ -247,11 +253,140 @@ class Scheduler(AppService):
         if not self.scheduler.running:
             raise UnhealthyServiceError("Scheduler is not running")
 
+        # Update health check timestamp for monitoring
+        self._update_health_check_time()
+
         return await super().health_check()
 
+    def _signal_thread_dump_handler(self, signum, frame):
+        """Signal handler for SIGUSR2 - dumps threads to stderr even when FastAPI is stuck"""
+        try:
+            import sys
+
+            sys.stderr.write(f"\n{'='*80}\n")
+            sys.stderr.write(f"SIGNAL THREAD DUMP - {datetime.now()}\n")
+            sys.stderr.write(f"Signal: {signum}, PID: {os.getpid()}\n")
+            sys.stderr.write(f"Total threads: {threading.active_count()}\n")
+            sys.stderr.write(f"{'='*80}\n")
+
+            current_frames = sys._current_frames()
+            threads = threading.enumerate()
+
+            for i, thread in enumerate(threads, 1):
+                sys.stderr.write(f"\n[{i}] Thread: {thread.name}\n")
+                sys.stderr.write(f"    ID: {thread.ident}, Daemon: {thread.daemon}\n")
+
+                thread_frame = (
+                    current_frames.get(thread.ident) if thread.ident else None
+                )
+                if thread_frame:
+                    sys.stderr.write("    Stack:\n")
+                    stack = traceback.extract_stack(thread_frame)
+
+                    for j, (filename, lineno, name, line) in enumerate(stack[-12:]):
+                        indent = "      " + ("  " * min(j, 8))
+                        short_file = (
+                            filename.split("/")[-1] if "/" in filename else filename
+                        )
+                        sys.stderr.write(f"{indent}{short_file}:{lineno} in {name}()\n")
+                        if line and line.strip():
+                            sys.stderr.write(f"{indent}  → {line.strip()}\n")
+                else:
+                    sys.stderr.write("    No frame available\n")
+
+            # Scheduler info
+            sys.stderr.write(f"\n{'='*40}\n")
+            sys.stderr.write("SCHEDULER STATE:\n")
+            if hasattr(self, "scheduler") and self.scheduler:
+                sys.stderr.write(f"Running: {self.scheduler.running}\n")
+                try:
+                    jobs = self.scheduler.get_jobs()
+                    sys.stderr.write(f"Jobs: {len(jobs)}\n")
+                except Exception:
+                    sys.stderr.write("Jobs: Error getting jobs\n")
+            else:
+                sys.stderr.write("Scheduler: Not initialized\n")
+
+            sys.stderr.write(f"{'='*80}\n")
+            sys.stderr.write("END SIGNAL THREAD DUMP\n")
+            sys.stderr.write(f"{'='*80}\n\n")
+            sys.stderr.flush()
+
+        except Exception as e:
+            import sys
+
+            sys.stderr.write(f"Error in signal handler: {e}\n")
+            sys.stderr.flush()
+
+    def _start_periodic_thread_dump(self):
+        """Start background thread for periodic thread dumps"""
+
+        def periodic_dump():
+            import time
+
+            while True:
+                try:
+                    time.sleep(300)  # 5 minutes
+
+                    # Only dump if we detect potential issues
+                    current_time = time.time()
+                    if hasattr(self, "_last_health_check"):
+                        time_since_health = current_time - self._last_health_check
+                        if time_since_health > 60:  # No health check in 60 seconds
+                            logger.warning(
+                                "No health check in 60s, dumping threads for monitoring"
+                            )
+                            self._signal_thread_dump_handler(0, None)
+
+                    # Also check if scheduler seems stuck
+                    if hasattr(self, "scheduler") and self.scheduler:
+                        try:
+                            jobs = self.scheduler.get_jobs()
+                            # Log periodic status
+                            logger.info(
+                                f"Periodic check: {len(jobs)} active jobs, {threading.active_count()} threads"
+                            )
+                        except Exception as e:
+                            logger.warning(
+                                f"Periodic check failed, dumping threads: {e}"
+                            )
+                            self._signal_thread_dump_handler(0, None)
+
+                except Exception as e:
+                    logger.error(f"Error in periodic thread dump: {e}")
+
+        # Start daemon thread for periodic monitoring
+        dump_thread = threading.Thread(
+            target=periodic_dump, daemon=True, name="PeriodicThreadDump"
+        )
+        dump_thread.start()
+        logger.info("Periodic thread dump monitor started")
+
+    def _update_health_check_time(self):
+        """Update last health check time for monitoring"""
+        import time
+
+        self._last_health_check = time.time()
+
     def run_service(self):
         load_dotenv()
+
+        # Enable faulthandler for debugging deadlocks
+        faulthandler.enable()
+
+        # Register SIGUSR1 to dump all thread stacks on demand
+        faulthandler.register(signal.SIGUSR1, all_threads=True)
+
+        # Also register SIGUSR2 for custom thread dump (in case faulthandler doesn't work)
+        signal.signal(signal.SIGUSR2, self._signal_thread_dump_handler)
+
+        # Start periodic thread dump for monitoring
+        self._start_periodic_thread_dump()
+
+        logger.info(
+            "Faulthandler enabled. Send SIGUSR1 or SIGUSR2 to dump thread stacks. Periodic dumps every 5 minutes."
+        )
 
         # Initialize the event loop for async jobs
         global _event_loop
         _event_loop = asyncio.new_event_loop()
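Note (not part of the diff): the run_service() additions above rely on two standard-library hooks: faulthandler.register(), which makes the interpreter print every thread's C-level stack to stderr when the chosen signal arrives, and signal.signal(), which installs a plain Python handler. A minimal standalone sketch of that same pattern is shown below; the handler body is a placeholder, not the Scheduler method from the diff.

#!/usr/bin/env python3
# Standalone sketch of the faulthandler/signal pattern used above (illustrative only).
import faulthandler
import os
import signal
import sys
import threading
import time


def custom_dump_handler(signum, frame):
    # Placeholder for a custom handler such as Scheduler._signal_thread_dump_handler.
    sys.stderr.write(f"Got signal {signum}; {threading.active_count()} threads alive\n")
    sys.stderr.flush()


if __name__ == "__main__":
    faulthandler.enable()                                    # dump stacks on fatal errors
    faulthandler.register(signal.SIGUSR1, all_threads=True)  # SIGUSR1 -> interpreter stack dump
    signal.signal(signal.SIGUSR2, custom_dump_handler)       # SIGUSR2 -> Python handler
    print(f"PID {os.getpid()}: send SIGUSR1 or SIGUSR2 to this process to test")
    while True:
        time.sleep(60)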
@@ -484,6 +619,102 @@ class Scheduler(AppService):
         """Manually trigger cleanup of expired cloud storage files."""
         return cleanup_expired_files()
 
+    @expose
+    def debug_thread_dump(self) -> str:
+        """Get comprehensive thread dump for debugging deadlocks."""
+        try:
+            # Create string buffer to capture thread info
+            output = io.StringIO()
+
+            # Header
+            output.write(f"SCHEDULER THREAD DUMP - {datetime.now()}\n")
+            output.write("=" * 80 + "\n")
+            output.write(f"Process PID: {os.getpid()}\n")
+            output.write(f"Total threads: {threading.active_count()}\n\n")
+
+            # Get all threads with stack traces
+            current_frames = sys._current_frames()
+            threads = threading.enumerate()
+
+            for i, thread in enumerate(threads, 1):
+                output.write(f"[{i}/{len(threads)}] Thread: {thread.name}\n")
+                output.write(f"  ID: {thread.ident}\n")
+                output.write(f"  Daemon: {thread.daemon}\n")
+                output.write(f"  Alive: {thread.is_alive()}\n")
+
+                # Get target if available (internal attribute)
+                if hasattr(thread, "_target") and getattr(thread, "_target", None):
+                    output.write(f"  Target: {getattr(thread, '_target')}\n")
+
+                # Get stack trace
+                frame = current_frames.get(thread.ident) if thread.ident else None
+                if frame:
+                    output.write("  Stack trace:\n")
+                    stack = traceback.extract_stack(frame)
+
+                    for j, (filename, lineno, name, line) in enumerate(stack):
+                        indent = "    " + ("  " * min(j, 6))
+                        short_file = (
+                            filename.split("/")[-1] if "/" in filename else filename
+                        )
+                        output.write(f"{indent}[{j+1}] {short_file}:{lineno}\n")
+                        output.write(f"{indent}    in {name}()\n")
+                        if line and line.strip():
+                            output.write(f"{indent}    → {line.strip()}\n")
+                else:
+                    output.write("  ⚠️ No frame available\n")
+
+                output.write("\n" + "-" * 60 + "\n")
+
+            # Scheduler state info
+            output.write("\nSCHEDULER STATE:\n")
+            output.write("=" * 40 + "\n")
+            if hasattr(self, "scheduler") and self.scheduler:
+                output.write(f"Scheduler running: {self.scheduler.running}\n")
+                try:
+                    jobs = self.scheduler.get_jobs()
+                    output.write(f"Active jobs: {len(jobs)}\n")
+                    for job in jobs[:5]:  # First 5 jobs
+                        output.write(f"  {job.id}: next run {job.next_run_time}\n")
+                except Exception as e:
+                    output.write(f"Error getting jobs: {e}\n")
+            else:
+                output.write("Scheduler not initialized\n")
+
+            # Event loop info
+            output.write("\nEVENT LOOP STATE:\n")
+            output.write("=" * 40 + "\n")
+            global _event_loop
+            if _event_loop:
+                output.write(f"Event loop running: {_event_loop.is_running()}\n")
+                try:
+                    import asyncio
+
+                    tasks = asyncio.all_tasks(_event_loop)
+                    output.write(f"Active tasks: {len(tasks)}\n")
+                    for task in list(tasks)[:5]:  # First 5 tasks
+                        output.write(f"  {task.get_name()}: {task._state}\n")
+                except Exception as e:
+                    output.write(f"Error getting tasks: {e}\n")
+            else:
+                output.write("Event loop not initialized\n")
+
+            output.write("\n" + "=" * 80 + "\n")
+            output.write("END THREAD DUMP\n")
+
+            result = output.getvalue()
+            output.close()
+
+            # Also log that we got a thread dump request
+            logger.info("Thread dump requested via HTTP endpoint")
+
+            return result
+
+        except Exception as e:
+            error_msg = f"Error generating thread dump: {type(e).__name__}: {e}"
+            logger.error(error_msg)
+            return error_msg
+
 
 class SchedulerClient(AppServiceClient):
     @classmethod
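Note (not part of the diff): once the scheduler pod's service port is forwarded locally, the exposed debug_thread_dump method can be fetched over HTTP, mirroring the request the new scheduler_debug.py script below makes. A minimal sketch, assuming a `kubectl port-forward ... 8003:8003` session is already running:

# Sketch: fetch the exposed thread dump over HTTP (assumes port 8003 is forwarded locally,
# as scheduler_debug.py does below).
import requests

resp = requests.get("http://localhost:8003/debug_thread_dump", timeout=10)
resp.raise_for_status()
print(resp.text)  # thread, scheduler, and event-loop state as plain text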
autogpt_platform/backend/scheduler_debug.py (new executable file, 353 lines)
@@ -0,0 +1,353 @@
#!/usr/bin/env python3
"""
Unified scheduler debugging tool
- Test deployment
- Collect thread dumps (signal-based, works when FastAPI is stuck)
- Monitor periodic dumps
"""
import subprocess
import sys
import time
from datetime import datetime

import requests

def find_scheduler_pod():
    """Find the running scheduler pod"""
    result = subprocess.run(
        "kubectl get pods -n dev-agpt --no-headers".split(),
        capture_output=True,
        text=True,
    )

    for line in result.stdout.split("\n"):
        if "scheduler-server" in line and "Running" in line:
            return line.split()[0]
    return None

def test_deployment():
    """Test if the deployment has debugging enabled"""
    print("🧪 TESTING SCHEDULER DEBUG DEPLOYMENT")
    print("=" * 50)

    pod_name = find_scheduler_pod()
    if not pod_name:
        print("❌ No scheduler pod found")
        return False

    print(f"📍 Pod: {pod_name}")

    # Check if faulthandler is enabled
    print("🔍 Checking faulthandler setup...")
    log_result = subprocess.run(
        f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
        capture_output=True,
        text=True,
    )

    faulthandler_enabled = "Faulthandler enabled" in log_result.stdout
    periodic_enabled = "Periodic thread dump monitor started" in log_result.stdout

    if faulthandler_enabled:
        print("✅ Faulthandler is enabled")
    else:
        print("❌ Faulthandler not found in logs")

    if periodic_enabled:
        print("✅ Periodic monitoring is enabled")
    else:
        print("❌ Periodic monitoring not found in logs")

    # Test signal sending
    print("\n📡 Testing signal delivery...")
    signal_result = subprocess.run(
        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
        capture_output=True,
        text=True,
    )

    if signal_result.returncode == 0:
        print("✅ Signal sent successfully")
        time.sleep(2)

        # Check for thread dump in logs
        new_logs = subprocess.run(
            f"kubectl logs -n dev-agpt {pod_name} --tail=20".split(),
            capture_output=True,
            text=True,
        )
        if "SIGNAL THREAD DUMP" in new_logs.stdout:
            print("✅ Thread dump appeared in logs!")
        else:
            print("⚠️ No thread dump found (might take a moment)")
    else:
        print(f"❌ Signal failed: {signal_result.stderr}")

    # Test HTTP API (should work when not stuck)
    print("\n🌐 Testing HTTP API...")
    pf_process = None
    try:
        pf_process = subprocess.Popen(
            f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        time.sleep(2)

        response = requests.get("http://localhost:8003/debug_thread_dump", timeout=10)
        if response.status_code == 200:
            print("✅ HTTP API working")
            print(f"   Thread count found: {'Total threads:' in response.text}")
        else:
            print(f"⚠️ HTTP API returned: {response.status_code}")

    except Exception as e:
        print(f"⚠️ HTTP API failed: {e}")
    finally:
        if pf_process:
            try:
                pf_process.terminate()
                pf_process.wait()
            except Exception:
                pass

    success = faulthandler_enabled and signal_result.returncode == 0
    print(
        f"\n{'✅ DEPLOYMENT TEST PASSED' if success else '❌ DEPLOYMENT TEST FAILED'}"
    )
    return success

def collect_thread_dump():
    """Collect comprehensive thread dump (works even when scheduler is stuck)"""
    print("🚨 COLLECTING THREAD DUMP FROM SCHEDULER")
    print("=" * 60)

    pod_name = find_scheduler_pod()
    if not pod_name:
        print("❌ No scheduler pod found")
        return False

    print(f"📍 Pod: {pod_name}")
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    # Send both signals for maximum coverage
    print("📡 Sending signals for thread dumps...")

    # SIGUSR1 (faulthandler)
    result1 = subprocess.run(
        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR1 1".split(),
        capture_output=True,
        text=True,
    )
    print(f"   SIGUSR1: {'✅' if result1.returncode == 0 else '❌'}")

    time.sleep(1)

    # SIGUSR2 (custom handler)
    result2 = subprocess.run(
        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
        capture_output=True,
        text=True,
    )
    print(f"   SIGUSR2: {'✅' if result2.returncode == 0 else '❌'}")

    time.sleep(3)  # Give signals time to execute

    # Collect logs with thread dumps
    print("📋 Collecting logs...")
    log_result = subprocess.run(
        f"kubectl logs -n dev-agpt {pod_name} --tail=500".split(),
        capture_output=True,
        text=True,
    )

    # Save everything
    dump_file = f"THREAD_DUMP_{timestamp}.txt"
    with open(dump_file, "w") as f:
        f.write("SCHEDULER THREAD DUMP COLLECTION\n")
        f.write(f"Timestamp: {datetime.now()}\n")
        f.write(f"Pod: {pod_name}\n")
        f.write("=" * 80 + "\n\n")
        f.write("FULL LOGS (last 500 lines):\n")
        f.write("-" * 40 + "\n")
        f.write(log_result.stdout)

    print(f"💾 Full dump saved: {dump_file}")

    # Extract and show thread dump preview
    lines = log_result.stdout.split("\n")
    thread_dumps = []
    in_dump = False
    current_dump = []

    for line in lines:
        if any(
            marker in line
            for marker in ["SIGNAL THREAD DUMP", "Fatal Python error", "Thread 0x"]
        ):
            if current_dump:
                thread_dumps.append(current_dump)
            current_dump = [line]
            in_dump = True
        elif in_dump and (
            "END SIGNAL THREAD DUMP" in line or "Current thread 0x" in line
        ):
            current_dump.append(line)
            thread_dumps.append(current_dump)
            current_dump = []
            in_dump = False
        elif in_dump:
            current_dump.append(line)

    if current_dump:
        thread_dumps.append(current_dump)

    if thread_dumps:
        print(f"\n🔍 FOUND {len(thread_dumps)} THREAD DUMP(S):")
        print("-" * 50)

        # Show the most recent/complete dump
        latest_dump = thread_dumps[-1]
        for i, line in enumerate(latest_dump[:50]):  # First 50 lines
            print(line)

        if len(latest_dump) > 50:
            print("... (truncated, see full dump in file)")

        # Create separate file with just thread dumps
        clean_dump_file = f"CLEAN_THREAD_DUMP_{timestamp}.txt"
        with open(clean_dump_file, "w") as f:
            f.write(f"EXTRACTED THREAD DUMPS - {datetime.now()}\n")
            f.write("=" * 60 + "\n\n")
            for i, dump in enumerate(thread_dumps, 1):
                f.write(f"DUMP #{i}:\n")
                f.write("-" * 30 + "\n")
                f.write("\n".join(dump))
                f.write("\n\n")

        print(f"🎯 Clean thread dumps saved: {clean_dump_file}")

    else:
        print("⚠️ No thread dumps found in logs")
        print("Recent log lines:")
        for line in lines[-10:]:
            print(f"   {line}")

    # Try HTTP backup (will fail if scheduler is stuck, but worth trying)
    print("\n🌐 Attempting HTTP backup...")
    pf_process = None
    try:
        pf_process = subprocess.Popen(
            f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        time.sleep(2)

        response = requests.get("http://localhost:8003/debug_thread_dump", timeout=5)
        if response.status_code == 200:
            http_file = f"HTTP_THREAD_DUMP_{timestamp}.txt"
            with open(http_file, "w") as f:
                f.write(response.text)
            print(f"✅ HTTP backup saved: {http_file}")
        else:
            print(f"⚠️ HTTP failed: {response.status_code}")

    except Exception as e:
        print(f"⚠️ HTTP failed (expected if stuck): {e}")
    finally:
        if pf_process:
            try:
                pf_process.terminate()
                pf_process.wait()
            except Exception:
                pass

    print("\n✅ COLLECTION COMPLETE!")
    return len(thread_dumps) > 0

def monitor_periodic_dumps(duration_minutes=10):
    """Monitor periodic thread dumps for a specified duration"""
    print(f"👁️ MONITORING PERIODIC DUMPS FOR {duration_minutes} MINUTES")
    print("=" * 50)

    pod_name = find_scheduler_pod()
    if not pod_name:
        print("❌ No scheduler pod found")
        return

    print(f"📍 Pod: {pod_name}")
    print("⏰ Watching for periodic status messages and thread dumps...")

    start_time = time.time()
    end_time = start_time + (duration_minutes * 60)

    # Get current log position (for reference, not used currently)
    # Could be used for tracking new vs old logs if needed

    while time.time() < end_time:
        try:
            # Get new logs
            current_logs = subprocess.run(
                f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
                capture_output=True,
                text=True,
            )

            for line in current_logs.stdout.split("\n"):
                if "Periodic check:" in line:
                    print(f"📊 {line}")
                elif "SIGNAL THREAD DUMP" in line:
                    print(f"🚨 Thread dump detected: {line}")
                elif "No health check" in line:
                    print(f"⚠️ Health issue: {line}")

            time.sleep(30)  # Check every 30 seconds

        except KeyboardInterrupt:
            print("\n⏹️ Monitoring stopped by user")
            break
        except Exception as e:
            print(f"Error during monitoring: {e}")
            break

    print("\n✅ MONITORING COMPLETE")

def main():
    if len(sys.argv) < 2:
        print("🔧 SCHEDULER DEBUG TOOL")
        print("=" * 30)
        print("Usage:")
        print("  python scheduler_debug.py test          - Test deployment")
        print("  python scheduler_debug.py collect       - Collect thread dump")
        print("  python scheduler_debug.py monitor [min] - Monitor periodic dumps")
        print("  python scheduler_debug.py all           - Run test + collect")
        return

    command = sys.argv[1].lower()

    if command == "test":
        test_deployment()
    elif command == "collect":
        collect_thread_dump()
    elif command == "monitor":
        duration = int(sys.argv[2]) if len(sys.argv) > 2 else 10
        monitor_periodic_dumps(duration)
    elif command == "all":
        print("Running complete debugging sequence...\n")
        if test_deployment():
            print("\n" + "=" * 50)
            collect_thread_dump()
        else:
            print("❌ Test failed, skipping collection")
    else:
        print(f"❌ Unknown command: {command}")


if __name__ == "__main__":
    main()
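Note (not part of the diff): the script is normally driven through the CLI shown in main(), but the same helpers can be imported and sequenced from another Python script. A small sketch mirroring the "all" command, assuming the file is importable as scheduler_debug from the working directory:

# Sketch: driving the debug helpers programmatically instead of via the CLI.
import scheduler_debug

if scheduler_debug.test_deployment():      # verify faulthandler and signal delivery first
    scheduler_debug.collect_thread_dump()  # then capture and save the thread dumps
else:
    print("Deployment test failed; skipping collection")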