Mirror of https://github.com/Significant-Gravitas/AutoGPT.git
Synced 2026-04-08 03:00:28 -04:00

Compare commits: 2 commits on branch fix/schedu...
| Author | SHA1 | Date |
|---|---|---|
| abhimanyuy | bea287957d | |
| abhimanyuy | 84d7b05cd2 | |
@@ -33,7 +33,7 @@ from prisma.types import (
     AgentNodeExecutionUpdateInput,
     AgentNodeExecutionWhereInput,
 )
-from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
+from pydantic import BaseModel, ConfigDict, JsonValue
 from pydantic.fields import Field
 
 from backend.server.v2.store.exceptions import DatabaseError
@@ -59,7 +59,7 @@ from .includes import (
     GRAPH_EXECUTION_INCLUDE_WITH_NODES,
     graph_execution_include,
 )
-from .model import GraphExecutionStats, NodeExecutionStats
+from .model import GraphExecutionStats
 
 T = TypeVar("T")
 
@@ -318,30 +318,18 @@ class NodeExecutionResult(BaseModel):
 
     @staticmethod
     def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None):
-        try:
-            stats = NodeExecutionStats.model_validate(_node_exec.stats or {})
-        except (ValueError, ValidationError):
-            stats = NodeExecutionStats()
-
-        if stats.cleared_inputs:
-            input_data: BlockInput = defaultdict()
-            for name, messages in stats.cleared_inputs.items():
-                input_data[name] = messages[-1] if messages else ""
-        elif _node_exec.executionData:
+        if _node_exec.executionData:
             # Execution that has been queued for execution will persist its data.
             input_data = type_utils.convert(_node_exec.executionData, dict[str, Any])
         else:
             # For incomplete execution, executionData will not be yet available.
             input_data: BlockInput = defaultdict()
             for data in _node_exec.Input or []:
                 input_data[data.name] = type_utils.convert(data.data, type[Any])
 
         output_data: CompletedBlockOutput = defaultdict(list)
 
-        if stats.cleared_outputs:
-            for name, messages in stats.cleared_outputs.items():
-                output_data[name].extend(messages)
-        else:
-            for data in _node_exec.Output or []:
-                output_data[data.name].append(type_utils.convert(data.data, type[Any]))
+        for data in _node_exec.Output or []:
+            output_data[data.name].append(type_utils.convert(data.data, type[Any]))
 
         graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution
         if graph_execution:
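A side note on the pattern kept above (illustrative, not part of the diff): `defaultdict()` called with no default factory behaves like a plain `dict` and still raises `KeyError` on missing keys, which is easy to misread next to `defaultdict(list)`:

```python
from collections import defaultdict

# No factory: missing keys raise KeyError, exactly like a plain dict
input_data = defaultdict()
input_data["a"] = 1        # fine
# input_data["missing"]    # would raise KeyError

# With a factory: missing keys are auto-created as empty lists
output_data = defaultdict(list)
output_data["results"].append("ok")  # no KeyError
```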
@@ -655,9 +655,6 @@ class NodeExecutionStats(BaseModel):
     output_token_count: int = 0
     extra_cost: int = 0
     extra_steps: int = 0
-    # Moderation fields
-    cleared_inputs: Optional[dict[str, list[str]]] = None
-    cleared_outputs: Optional[dict[str, list[str]]] = None
 
     def __iadd__(self, other: "NodeExecutionStats") -> "NodeExecutionStats":
        """Mutate this instance by adding another NodeExecutionStats."""
@@ -27,7 +27,7 @@ from backend.executor.activity_status_generator import (
 )
 from backend.executor.utils import LogMetadata
 from backend.notifications.notifications import queue_notification
-from backend.util.exceptions import InsufficientBalanceError, ModerationError
+from backend.util.exceptions import InsufficientBalanceError
 
 if TYPE_CHECKING:
     from backend.executor import DatabaseManagerClient, DatabaseManagerAsyncClient
@@ -67,7 +67,6 @@ from backend.executor.utils import (
     validate_exec,
 )
 from backend.integrations.creds_manager import IntegrationCredentialsManager
-from backend.server.v2.AutoMod.manager import automod_manager
 from backend.util import json
 from backend.util.clients import (
     get_async_execution_event_bus,
@@ -760,22 +759,6 @@ class ExecutionProcessor:
                 amount=1,
             )
 
-            # Input moderation
-            try:
-                if moderation_error := asyncio.run_coroutine_threadsafe(
-                    automod_manager.moderate_graph_execution_inputs(
-                        db_client=get_db_async_client(),
-                        graph_exec=graph_exec,
-                    ),
-                    self.node_evaluation_loop,
-                ).result(timeout=30.0):
-                    raise moderation_error
-            except asyncio.TimeoutError:
-                log_metadata.warning(
-                    f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
-                )
-                # Continue execution without moderation
-
             # ------------------------------------------------------------
             # Pre‑populate queue ---------------------------------------
             # ------------------------------------------------------------
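For context on the block removed above (a standalone sketch, not the project's code): `asyncio.run_coroutine_threadsafe(...).result(timeout=...)` is the standard way to call a coroutine on another thread's event loop from synchronous code and bound the wait. `result()` raises a timeout error when the deadline expires, which is what the removed `except` branch relied on:

```python
import asyncio
import threading

# A worker event loop running in its own thread (the role of
# self.node_evaluation_loop in the removed code above).
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def check(value: str) -> str | None:
    await asyncio.sleep(0.1)  # stand-in for an async moderation API call
    return None if value == "ok" else "rejected"

# Submit the coroutine to the other thread's loop; wait at most 5 seconds.
future = asyncio.run_coroutine_threadsafe(check("ok"), loop)
try:
    outcome = future.result(timeout=5.0)
    print("moderation outcome:", outcome)
except TimeoutError:  # concurrent.futures.TimeoutError on Python < 3.11
    print("timed out; proceed without moderation")
```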
@@ -914,25 +897,6 @@ class ExecutionProcessor:
                 time.sleep(0.1)
 
         # loop done --------------------------------------------------
 
-        # Output moderation
-        try:
-            if moderation_error := asyncio.run_coroutine_threadsafe(
-                automod_manager.moderate_graph_execution_outputs(
-                    db_client=get_db_async_client(),
-                    graph_exec_id=graph_exec.graph_exec_id,
-                    user_id=graph_exec.user_id,
-                    graph_id=graph_exec.graph_id,
-                ),
-                self.node_evaluation_loop,
-            ).result(timeout=30.0):
-                raise moderation_error
-        except asyncio.TimeoutError:
-            log_metadata.warning(
-                f"Output moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
-            )
-            # Continue execution without moderation
-
         # Determine final execution status based on whether there was an error or termination
         if cancel.is_set():
             execution_status = ExecutionStatus.TERMINATED
@@ -953,12 +917,11 @@ class ExecutionProcessor:
                 else Exception(f"{e.__class__.__name__}: {e}")
             )
 
-            known_errors = (InsufficientBalanceError, ModerationError)
+            known_errors = (InsufficientBalanceError,)
             if isinstance(error, known_errors):
                 execution_stats.error = str(error)
                 return ExecutionStatus.FAILED
 
             execution_status = ExecutionStatus.FAILED
             log_metadata.exception(
                 f"Failed graph execution {graph_exec.graph_exec_id}: {error}"
             )
@@ -1,13 +1,7 @@
 import asyncio
-import faulthandler
-import io
 import logging
 import os
-import signal
-import sys
-import threading
-import traceback
 from datetime import datetime
 from enum import Enum
 from typing import Optional
 from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@@ -253,140 +247,11 @@ class Scheduler(AppService):
         if not self.scheduler.running:
             raise UnhealthyServiceError("Scheduler is not running")
 
-        # Update health check timestamp for monitoring
-        self._update_health_check_time()
-
         return await super().health_check()
 
-    def _signal_thread_dump_handler(self, signum, frame):
-        """Signal handler for SIGUSR2 - dumps threads to stderr even when FastAPI is stuck"""
-        try:
-            import sys
-
-            sys.stderr.write(f"\n{'='*80}\n")
-            sys.stderr.write(f"SIGNAL THREAD DUMP - {datetime.now()}\n")
-            sys.stderr.write(f"Signal: {signum}, PID: {os.getpid()}\n")
-            sys.stderr.write(f"Total threads: {threading.active_count()}\n")
-            sys.stderr.write(f"{'='*80}\n")
-
-            current_frames = sys._current_frames()
-            threads = threading.enumerate()
-
-            for i, thread in enumerate(threads, 1):
-                sys.stderr.write(f"\n[{i}] Thread: {thread.name}\n")
-                sys.stderr.write(f"  ID: {thread.ident}, Daemon: {thread.daemon}\n")
-
-                thread_frame = (
-                    current_frames.get(thread.ident) if thread.ident else None
-                )
-                if thread_frame:
-                    sys.stderr.write("  Stack:\n")
-                    stack = traceback.extract_stack(thread_frame)
-
-                    for j, (filename, lineno, name, line) in enumerate(stack[-12:]):
-                        indent = "    " + ("  " * min(j, 8))
-                        short_file = (
-                            filename.split("/")[-1] if "/" in filename else filename
-                        )
-                        sys.stderr.write(f"{indent}{short_file}:{lineno} in {name}()\n")
-                        if line and line.strip():
-                            sys.stderr.write(f"{indent}  → {line.strip()}\n")
-                else:
-                    sys.stderr.write("  No frame available\n")
-
-            # Scheduler info
-            sys.stderr.write(f"\n{'='*40}\n")
-            sys.stderr.write("SCHEDULER STATE:\n")
-            if hasattr(self, "scheduler") and self.scheduler:
-                sys.stderr.write(f"Running: {self.scheduler.running}\n")
-                try:
-                    jobs = self.scheduler.get_jobs()
-                    sys.stderr.write(f"Jobs: {len(jobs)}\n")
-                except Exception:
-                    sys.stderr.write("Jobs: Error getting jobs\n")
-            else:
-                sys.stderr.write("Scheduler: Not initialized\n")
-
-            sys.stderr.write(f"{'='*80}\n")
-            sys.stderr.write("END SIGNAL THREAD DUMP\n")
-            sys.stderr.write(f"{'='*80}\n\n")
-            sys.stderr.flush()
-
-        except Exception as e:
-            import sys
-
-            sys.stderr.write(f"Error in signal handler: {e}\n")
-            sys.stderr.flush()
-
-    def _start_periodic_thread_dump(self):
-        """Start background thread for periodic thread dumps"""
-
-        def periodic_dump():
-            import time
-
-            while True:
-                try:
-                    time.sleep(300)  # 5 minutes
-
-                    # Only dump if we detect potential issues
-                    current_time = time.time()
-                    if hasattr(self, "_last_health_check"):
-                        time_since_health = current_time - self._last_health_check
-                        if time_since_health > 60:  # No health check in 60 seconds
-                            logger.warning(
-                                "No health check in 60s, dumping threads for monitoring"
-                            )
-                            self._signal_thread_dump_handler(0, None)
-
-                    # Also check if scheduler seems stuck
-                    if hasattr(self, "scheduler") and self.scheduler:
-                        try:
-                            jobs = self.scheduler.get_jobs()
-                            # Log periodic status
-                            logger.info(
-                                f"Periodic check: {len(jobs)} active jobs, {threading.active_count()} threads"
-                            )
-                        except Exception as e:
-                            logger.warning(
-                                f"Periodic check failed, dumping threads: {e}"
-                            )
-                            self._signal_thread_dump_handler(0, None)
-
-                except Exception as e:
-                    logger.error(f"Error in periodic thread dump: {e}")
-
-        # Start daemon thread for periodic monitoring
-        dump_thread = threading.Thread(
-            target=periodic_dump, daemon=True, name="PeriodicThreadDump"
-        )
-        dump_thread.start()
-        logger.info("Periodic thread dump monitor started")
-
-    def _update_health_check_time(self):
-        """Update last health check time for monitoring"""
-        import time
-
-        self._last_health_check = time.time()
-
     def run_service(self):
         load_dotenv()
 
-        # Enable faulthandler for debugging deadlocks
-        faulthandler.enable()
-
-        # Register SIGUSR1 to dump all thread stacks on demand
-        faulthandler.register(signal.SIGUSR1, all_threads=True)
-
-        # Also register SIGUSR2 for custom thread dump (in case faulthandler doesn't work)
-        signal.signal(signal.SIGUSR2, self._signal_thread_dump_handler)
-
-        # Start periodic thread dump for monitoring
-        self._start_periodic_thread_dump()
-
-        logger.info(
-            "Faulthandler enabled. Send SIGUSR1 or SIGUSR2 to dump thread stacks. Periodic dumps every 5 minutes."
-        )
-
         # Initialize the event loop for async jobs
         global _event_loop
         _event_loop = asyncio.new_event_loop()
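As background on the debugging hooks removed above (a minimal standalone sketch, not this service's code): the standard-library `faulthandler` module can dump every thread's stack on a signal, which is what made `kill -USR1 <pid>` useful against a wedged process:

```python
import faulthandler
import os
import signal

faulthandler.enable()  # also dump stacks on fatal errors (segfault, etc.)
faulthandler.register(signal.SIGUSR1, all_threads=True)  # dump on SIGUSR1

print(f"PID {os.getpid()}: run `kill -USR1 {os.getpid()}` to dump thread stacks")
signal.pause()  # keep the process alive to receive the signal (Unix only)
```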
@@ -619,102 +484,6 @@ class Scheduler(AppService):
         """Manually trigger cleanup of expired cloud storage files."""
         return cleanup_expired_files()
 
-    @expose
-    def debug_thread_dump(self) -> str:
-        """Get comprehensive thread dump for debugging deadlocks."""
-        try:
-            # Create string buffer to capture thread info
-            output = io.StringIO()
-
-            # Header
-            output.write(f"SCHEDULER THREAD DUMP - {datetime.now()}\n")
-            output.write("=" * 80 + "\n")
-            output.write(f"Process PID: {os.getpid()}\n")
-            output.write(f"Total threads: {threading.active_count()}\n\n")
-
-            # Get all threads with stack traces
-            current_frames = sys._current_frames()
-            threads = threading.enumerate()
-
-            for i, thread in enumerate(threads, 1):
-                output.write(f"[{i}/{len(threads)}] Thread: {thread.name}\n")
-                output.write(f"  ID: {thread.ident}\n")
-                output.write(f"  Daemon: {thread.daemon}\n")
-                output.write(f"  Alive: {thread.is_alive()}\n")
-
-                # Get target if available (internal attribute)
-                if hasattr(thread, "_target") and getattr(thread, "_target", None):
-                    output.write(f"  Target: {getattr(thread, '_target')}\n")
-
-                # Get stack trace
-                frame = current_frames.get(thread.ident) if thread.ident else None
-                if frame:
-                    output.write("  Stack trace:\n")
-                    stack = traceback.extract_stack(frame)
-
-                    for j, (filename, lineno, name, line) in enumerate(stack):
-                        indent = "    " + ("  " * min(j, 6))
-                        short_file = (
-                            filename.split("/")[-1] if "/" in filename else filename
-                        )
-                        output.write(f"{indent}[{j+1}] {short_file}:{lineno}\n")
-                        output.write(f"{indent}    in {name}()\n")
-                        if line and line.strip():
-                            output.write(f"{indent}    → {line.strip()}\n")
-                else:
-                    output.write("  ⚠️ No frame available\n")
-
-                output.write("\n" + "-" * 60 + "\n")
-
-            # Scheduler state info
-            output.write("\nSCHEDULER STATE:\n")
-            output.write("=" * 40 + "\n")
-            if hasattr(self, "scheduler") and self.scheduler:
-                output.write(f"Scheduler running: {self.scheduler.running}\n")
-                try:
-                    jobs = self.scheduler.get_jobs()
-                    output.write(f"Active jobs: {len(jobs)}\n")
-                    for job in jobs[:5]:  # First 5 jobs
-                        output.write(f"  {job.id}: next run {job.next_run_time}\n")
-                except Exception as e:
-                    output.write(f"Error getting jobs: {e}\n")
-            else:
-                output.write("Scheduler not initialized\n")
-
-            # Event loop info
-            output.write("\nEVENT LOOP STATE:\n")
-            output.write("=" * 40 + "\n")
-            global _event_loop
-            if _event_loop:
-                output.write(f"Event loop running: {_event_loop.is_running()}\n")
-                try:
-                    import asyncio
-
-                    tasks = asyncio.all_tasks(_event_loop)
-                    output.write(f"Active tasks: {len(tasks)}\n")
-                    for task in list(tasks)[:5]:  # First 5 tasks
-                        output.write(f"  {task.get_name()}: {task._state}\n")
-                except Exception as e:
-                    output.write(f"Error getting tasks: {e}\n")
-            else:
-                output.write("Event loop not initialized\n")
-
-            output.write("\n" + "=" * 80 + "\n")
-            output.write("END THREAD DUMP\n")
-
-            result = output.getvalue()
-            output.close()
-
-            # Also log that we got a thread dump request
-            logger.info("Thread dump requested via HTTP endpoint")
-
-            return result
-
-        except Exception as e:
-            error_msg = f"Error generating thread dump: {type(e).__name__}: {e}"
-            logger.error(error_msg)
-            return error_msg
-
 
 class SchedulerClient(AppServiceClient):
     @classmethod
@@ -1 +0,0 @@
-# AutoMod integration for content moderation
@@ -1,353 +0,0 @@
-import asyncio
-import json
-import logging
-from typing import TYPE_CHECKING, Any, Literal
-
-if TYPE_CHECKING:
-    from backend.executor import DatabaseManagerAsyncClient
-
-from autogpt_libs.feature_flag.client import is_feature_enabled
-from pydantic import ValidationError
-
-from backend.data.execution import ExecutionStatus
-from backend.server.v2.AutoMod.models import (
-    AutoModRequest,
-    AutoModResponse,
-    ModerationConfig,
-)
-from backend.util.exceptions import ModerationError
-from backend.util.request import Requests
-from backend.util.settings import Settings
-
-logger = logging.getLogger(__name__)
-
-
-class AutoModManager:
-
-    def __init__(self):
-        self.config = self._load_config()
-
-    def _load_config(self) -> ModerationConfig:
-        """Load AutoMod configuration from settings"""
-        settings = Settings()
-        return ModerationConfig(
-            enabled=settings.config.automod_enabled,
-            api_url=settings.config.automod_api_url,
-            api_key=settings.secrets.automod_api_key,
-            timeout=settings.config.automod_timeout,
-            retry_attempts=settings.config.automod_retry_attempts,
-            retry_delay=settings.config.automod_retry_delay,
-            fail_open=settings.config.automod_fail_open,
-        )
-
-    async def moderate_graph_execution_inputs(
-        self, db_client: "DatabaseManagerAsyncClient", graph_exec, timeout: int = 10
-    ) -> Exception | None:
-        """
-        Complete input moderation flow for graph execution
-        Returns: error_if_failed (None means success)
-        """
-        if not self.config.enabled:
-            return None
-
-        # Check if AutoMod feature is enabled for this user
-        if not is_feature_enabled("AutoMod", graph_exec.user_id, default=False):
-            logger.debug(f"AutoMod feature not enabled for user {graph_exec.user_id}")
-            return None
-
-        # Get graph model and collect all inputs
-        graph_model = await db_client.get_graph(
-            graph_exec.graph_id,
-            user_id=graph_exec.user_id,
-            version=graph_exec.graph_version,
-        )
-
-        if not graph_model or not graph_model.nodes:
-            return None
-
-        all_inputs = []
-        for node in graph_model.nodes:
-            if node.input_default:
-                all_inputs.extend(str(v) for v in node.input_default.values() if v)
-            if (masks := graph_exec.nodes_input_masks) and (mask := masks.get(node.id)):
-                all_inputs.extend(str(v) for v in mask.values() if v)
-
-        if not all_inputs:
-            return None
-
-        # Combine all content and moderate directly
-        content = " ".join(all_inputs)
-
-        # Run moderation
-        logger.warning(
-            f"Moderating inputs for graph execution {graph_exec.graph_exec_id}"
-        )
-        try:
-            moderation_passed = await self._moderate_content(
-                content,
-                {
-                    "user_id": graph_exec.user_id,
-                    "graph_id": graph_exec.graph_id,
-                    "graph_exec_id": graph_exec.graph_exec_id,
-                    "moderation_type": "execution_input",
-                },
-            )
-
-            if not moderation_passed:
-                logger.warning(
-                    f"Moderation failed for graph execution {graph_exec.graph_exec_id}"
-                )
-                # Update node statuses for frontend display before raising error
-                await self._update_failed_nodes_for_moderation(
-                    db_client, graph_exec.graph_exec_id, "input"
-                )
-
-                return ModerationError(
-                    message="Execution failed due to input content moderation",
-                    user_id=graph_exec.user_id,
-                    graph_exec_id=graph_exec.graph_exec_id,
-                    moderation_type="input",
-                )
-
-            return None
-
-        except asyncio.TimeoutError:
-            logger.warning(
-                f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation"
-            )
-            return None  # Bypass moderation on timeout
-        except Exception as e:
-            logger.warning(f"Input moderation execution failed: {e}")
-            return ModerationError(
-                message="Execution failed due to input content moderation error",
-                user_id=graph_exec.user_id,
-                graph_exec_id=graph_exec.graph_exec_id,
-                moderation_type="input",
-            )
-
-    async def moderate_graph_execution_outputs(
-        self,
-        db_client: "DatabaseManagerAsyncClient",
-        graph_exec_id: str,
-        user_id: str,
-        graph_id: str,
-        timeout: int = 10,
-    ) -> Exception | None:
-        """
-        Complete output moderation flow for graph execution
-        Returns: error_if_failed (None means success)
-        """
-        if not self.config.enabled:
-            return None
-
-        # Check if AutoMod feature is enabled for this user
-        if not is_feature_enabled("AutoMod", user_id, default=False):
-            logger.debug(f"AutoMod feature not enabled for user {user_id}")
-            return None
-
-        # Get completed executions and collect outputs
-        completed_executions = await db_client.get_node_executions(
-            graph_exec_id, statuses=[ExecutionStatus.COMPLETED], include_exec_data=True
-        )
-
-        if not completed_executions:
-            return None
-
-        all_outputs = []
-        for exec_entry in completed_executions:
-            if exec_entry.output_data:
-                all_outputs.extend(str(v) for v in exec_entry.output_data.values() if v)
-
-        if not all_outputs:
-            return None
-
-        # Combine all content and moderate directly
-        content = " ".join(all_outputs)
-
-        # Run moderation
-        logger.warning(f"Moderating outputs for graph execution {graph_exec_id}")
-        try:
-            moderation_passed = await self._moderate_content(
-                content,
-                {
-                    "user_id": user_id,
-                    "graph_id": graph_id,
-                    "graph_exec_id": graph_exec_id,
-                    "moderation_type": "execution_output",
-                },
-            )
-
-            if not moderation_passed:
-                logger.warning(f"Moderation failed for graph execution {graph_exec_id}")
-                # Update node statuses for frontend display before raising error
-                await self._update_failed_nodes_for_moderation(
-                    db_client, graph_exec_id, "output"
-                )
-
-                return ModerationError(
-                    message="Execution failed due to output content moderation",
-                    user_id=user_id,
-                    graph_exec_id=graph_exec_id,
-                    moderation_type="output",
-                )
-
-            return None
-
-        except asyncio.TimeoutError:
-            logger.warning(
-                f"Output moderation timed out for graph execution {graph_exec_id}, bypassing moderation"
-            )
-            return None  # Bypass moderation on timeout
-        except Exception as e:
-            logger.warning(f"Output moderation execution failed: {e}")
-            return ModerationError(
-                message="Execution failed due to output content moderation error",
-                user_id=user_id,
-                graph_exec_id=graph_exec_id,
-                moderation_type="output",
-            )
-
-    async def _update_failed_nodes_for_moderation(
-        self,
-        db_client: "DatabaseManagerAsyncClient",
-        graph_exec_id: str,
-        moderation_type: Literal["input", "output"],
-    ):
-        """Update node execution statuses for frontend display when moderation fails"""
-        # Import here to avoid circular imports
-        from backend.executor.manager import send_async_execution_update
-
-        if moderation_type == "input":
-            # For input moderation, mark queued/running/incomplete nodes as failed
-            target_statuses = [
-                ExecutionStatus.QUEUED,
-                ExecutionStatus.RUNNING,
-                ExecutionStatus.INCOMPLETE,
-            ]
-        else:
-            # For output moderation, mark completed nodes as failed
-            target_statuses = [ExecutionStatus.COMPLETED]
-
-        # Get the executions that need to be updated
-        executions_to_update = await db_client.get_node_executions(
-            graph_exec_id, statuses=target_statuses, include_exec_data=True
-        )
-
-        if not executions_to_update:
-            return
-
-        # Prepare database update tasks
-        exec_updates = []
-        for exec_entry in executions_to_update:
-            # Collect all input and output names to clear
-            cleared_inputs = {}
-            cleared_outputs = {}
-
-            if exec_entry.input_data:
-                for name in exec_entry.input_data.keys():
-                    cleared_inputs[name] = ["Failed due to content moderation"]
-
-            if exec_entry.output_data:
-                for name in exec_entry.output_data.keys():
-                    cleared_outputs[name] = ["Failed due to content moderation"]
-
-            # Add update task to list
-            exec_updates.append(
-                db_client.update_node_execution_status(
-                    exec_entry.node_exec_id,
-                    status=ExecutionStatus.FAILED,
-                    stats={
-                        "error": "Failed due to content moderation",
-                        "cleared_inputs": cleared_inputs,
-                        "cleared_outputs": cleared_outputs,
-                    },
-                )
-            )
-
-        # Execute all database updates in parallel
-        updated_execs = await asyncio.gather(*exec_updates)
-
-        # Send all websocket updates in parallel
-        await asyncio.gather(
-            *[
-                send_async_execution_update(updated_exec)
-                for updated_exec in updated_execs
-            ]
-        )
-
-    async def _moderate_content(self, content: str, metadata: dict[str, Any]) -> bool:
-        """Moderate content using AutoMod API
-
-        Returns:
-            True: Content approved or timeout occurred
-            False: Content rejected by moderation
-
-        Raises:
-            asyncio.TimeoutError: When moderation times out (should be bypassed)
-        """
-        try:
-            request_data = AutoModRequest(
-                type="text",
-                content=content,
-                metadata=metadata,
-            )
-
-            response = await self._make_request(request_data)
-
-            if response.success and response.status == "approved":
-                logger.debug(
-                    f"Content approved for {metadata.get('graph_exec_id', 'unknown')}"
-                )
-                return True
-            else:
-                reasons = [r.reason for r in response.moderation_results if r.reason]
-                error_msg = f"Content rejected by AutoMod: {'; '.join(reasons)}"
-                logger.warning(f"Content rejected: {error_msg}")
-                return False
-
-        except asyncio.TimeoutError:
-            # Re-raise timeout to be handled by calling methods
-            logger.warning(
-                f"AutoMod API timeout for {metadata.get('graph_exec_id', 'unknown')}"
-            )
-            raise
-        except Exception as e:
-            logger.error(f"AutoMod moderation error: {e}")
-            return self.config.fail_open
-
-    async def _make_request(self, request_data: AutoModRequest) -> AutoModResponse:
-        """Make HTTP request to AutoMod API using the standard request utility"""
-        url = f"{self.config.api_url}/moderate"
-        headers = {
-            "Content-Type": "application/json",
-            "X-API-Key": self.config.api_key,
-        }
-
-        # Create requests instance with timeout and retry configuration
-        requests = Requests(
-            extra_headers=headers,
-            retry_max_wait=float(self.config.timeout),
-        )
-
-        try:
-            response = await requests.post(
-                url, json=request_data.model_dump(), timeout=self.config.timeout
-            )
-
-            response_data = response.json()
-            return AutoModResponse.model_validate(response_data)
-
-        except asyncio.TimeoutError:
-            # Re-raise timeout error to be caught by _moderate_content
-            raise
-        except (json.JSONDecodeError, ValidationError) as e:
-            raise Exception(f"Invalid response from AutoMod API: {e}")
-        except Exception as e:
-            # Check if this is an aiohttp timeout that we should convert
-            if "timeout" in str(e).lower():
-                raise asyncio.TimeoutError(f"AutoMod API request timed out: {e}")
-            raise Exception(f"AutoMod API request failed: {e}")
-
-
-# Global instance
-automod_manager = AutoModManager()
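The deleted `_moderate_content` above illustrates a fail-open/fail-closed switch: infrastructure errors fall back to `config.fail_open` rather than blocking execution, while timeouts are re-raised so the caller can choose to bypass. A distilled sketch of that decision shape (hypothetical names, not the project's API):

```python
async def run_check(call_api, fail_open: bool) -> bool:
    """Return True to allow content, False to block it."""
    try:
        return await call_api()  # normal path: use the API's verdict
    except TimeoutError:
        raise                    # let the caller decide whether to bypass
    except Exception:
        return fail_open         # infrastructure error: allow only if fail-open
```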
@@ -1,57 +0,0 @@
-from typing import Any, Dict, List, Optional
-
-from pydantic import BaseModel, Field
-
-
-class AutoModRequest(BaseModel):
-    """Request model for AutoMod API"""
-
-    type: str = Field(..., description="Content type - 'text', 'image', 'video'")
-    content: str = Field(..., description="The content to moderate")
-    metadata: Optional[Dict[str, Any]] = Field(
-        default=None, description="Additional context about the content"
-    )
-
-
-class ModerationResult(BaseModel):
-    """Individual moderation result"""
-
-    decision: str = Field(
-        ..., description="Moderation decision: 'approved', 'rejected', 'flagged'"
-    )
-    reason: Optional[str] = Field(default=None, description="Reason for the decision")
-
-
-class AutoModResponse(BaseModel):
-    """Response model for AutoMod API"""
-
-    success: bool = Field(..., description="Whether the request was successful")
-    status: str = Field(
-        ..., description="Overall status: 'approved', 'rejected', 'flagged', 'pending'"
-    )
-    moderation_results: List[ModerationResult] = Field(
-        default_factory=list, description="List of moderation results"
-    )
-
-
-class ModerationConfig(BaseModel):
-    """Configuration for AutoMod integration"""
-
-    enabled: bool = Field(default=True, description="Whether moderation is enabled")
-    api_url: str = Field(default="", description="AutoMod API base URL")
-    api_key: str = Field(..., description="AutoMod API key")
-    timeout: int = Field(default=30, description="Request timeout in seconds")
-    retry_attempts: int = Field(default=3, description="Number of retry attempts")
-    retry_delay: float = Field(
-        default=1.0, description="Delay between retries in seconds"
-    )
-    fail_open: bool = Field(
-        default=False,
-        description="If True, allow execution to continue if moderation fails",
-    )
-    moderate_inputs: bool = Field(
-        default=True, description="Whether to moderate block inputs"
-    )
-    moderate_outputs: bool = Field(
-        default=True, description="Whether to moderate block outputs"
-    )
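These deleted models are plain pydantic v2 schemas; for reference, a request would have been built and a response validated roughly like this (illustrative values only):

```python
request = AutoModRequest(
    type="text",
    content="hello world",
    metadata={"graph_exec_id": "abc123"},
)
payload = request.model_dump()  # dict ready to serialize as the HTTP body

response = AutoModResponse.model_validate(
    {"success": True, "status": "approved", "moderation_results": []}
)
assert response.status == "approved"
```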
@@ -33,33 +33,6 @@ class InsufficientBalanceError(ValueError):
         return self.message
 
 
-class ModerationError(ValueError):
-    """Content moderation failure during execution"""
-
-    user_id: str
-    message: str
-    graph_exec_id: str
-    moderation_type: str
-
-    def __init__(
-        self,
-        message: str,
-        user_id: str,
-        graph_exec_id: str,
-        moderation_type: str = "content",
-    ):
-        super().__init__(message)
-        self.args = (message, user_id, graph_exec_id, moderation_type)
-        self.message = message
-        self.user_id = user_id
-        self.graph_exec_id = graph_exec_id
-        self.moderation_type = moderation_type
-
-    def __str__(self):
-        """Used to display the error message in the frontend, because we str() the error when sending the execution update"""
-        return self.message
-
-
 class GraphValidationError(ValueError):
     """Structured validation error for graph validation failures"""
 
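One detail worth noting in the deleted class: reassigning `self.args` to the full argument tuple keeps the exception picklable, which matters when errors cross process or RPC boundaries. A minimal sketch of the same pattern (hypothetical class, not project code):

```python
import pickle

class StructuredError(ValueError):
    def __init__(self, message: str, code: str):
        super().__init__(message)
        # Store all constructor args so pickle can reconstruct the
        # exception via cls(*self.args) without losing fields.
        self.args = (message, code)
        self.message = message
        self.code = code

err = pickle.loads(pickle.dumps(StructuredError("boom", "E42")))
assert err.code == "E42"
```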
@@ -295,32 +295,6 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
         description="Maximum file size in MB for file uploads (1-1024 MB)",
     )
 
-    # AutoMod configuration
-    automod_enabled: bool = Field(
-        default=False,
-        description="Whether AutoMod content moderation is enabled",
-    )
-    automod_api_url: str = Field(
-        default="",
-        description="AutoMod API base URL - Make sure it ends in /api",
-    )
-    automod_timeout: int = Field(
-        default=30,
-        description="Timeout in seconds for AutoMod API requests",
-    )
-    automod_retry_attempts: int = Field(
-        default=3,
-        description="Number of retry attempts for AutoMod API requests",
-    )
-    automod_retry_delay: float = Field(
-        default=1.0,
-        description="Delay between retries for AutoMod API requests in seconds",
-    )
-    automod_fail_open: bool = Field(
-        default=False,
-        description="If True, allow execution to continue if AutoMod fails",
-    )
-
     @field_validator("platform_base_url", "frontend_base_url")
     @classmethod
     def validate_platform_base_url(cls, v: str, info: ValidationInfo) -> str:
@@ -521,10 +495,6 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
     apollo_api_key: str = Field(default="", description="Apollo API Key")
     smartlead_api_key: str = Field(default="", description="SmartLead API Key")
     zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
-
-    # AutoMod API credentials
-    automod_api_key: str = Field(default="", description="AutoMod API key")
-
     ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
     ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")
     # Add more secret fields as needed
autogpt_platform/backend/poetry.lock (generated): 2 changes
@@ -6737,4 +6737,4 @@ cffi = ["cffi (>=1.11)"]
 [metadata]
 lock-version = "2.1"
 python-versions = ">=3.10,<3.13"
-content-hash = "795414d7ce8f288ea6c65893268b5c29a7c9a60ad75cde28ac7bcdb65f426dfe"
+content-hash = "05e2b99bd6dc5a74a89df0e1e504853b66bd519062159b0de5fbedf6a1f4d986"
@@ -10,7 +10,6 @@ packages = [{ include = "backend", format = "sdist" }]
 [tool.poetry.dependencies]
 python = ">=3.10,<3.13"
 aio-pika = "^9.5.5"
-aiohttp = "^3.10.0"
 aiodns = "^3.5.0"
 anthropic = "^0.59.0"
 apscheduler = "^3.11.0"
@@ -1,353 +0,0 @@
-#!/usr/bin/env python3
-"""
-Unified scheduler debugging tool
-- Test deployment
-- Collect thread dumps (signal-based, works when FastAPI is stuck)
-- Monitor periodic dumps
-"""
-import subprocess
-import sys
-import time
-from datetime import datetime
-
-import requests
-
-
-def find_scheduler_pod():
-    """Find the running scheduler pod"""
-    result = subprocess.run(
-        "kubectl get pods -n dev-agpt --no-headers".split(),
-        capture_output=True,
-        text=True,
-    )
-
-    for line in result.stdout.split("\n"):
-        if "scheduler-server" in line and "Running" in line:
-            return line.split()[0]
-    return None
-
-
-def test_deployment():
-    """Test if the deployment has debugging enabled"""
-    print("🧪 TESTING SCHEDULER DEBUG DEPLOYMENT")
-    print("=" * 50)
-
-    pod_name = find_scheduler_pod()
-    if not pod_name:
-        print("❌ No scheduler pod found")
-        return False
-
-    print(f"📍 Pod: {pod_name}")
-
-    # Check if faulthandler is enabled
-    print("🔍 Checking faulthandler setup...")
-    log_result = subprocess.run(
-        f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
-        capture_output=True,
-        text=True,
-    )
-
-    faulthandler_enabled = "Faulthandler enabled" in log_result.stdout
-    periodic_enabled = "Periodic thread dump monitor started" in log_result.stdout
-
-    if faulthandler_enabled:
-        print("✅ Faulthandler is enabled")
-    else:
-        print("❌ Faulthandler not found in logs")
-
-    if periodic_enabled:
-        print("✅ Periodic monitoring is enabled")
-    else:
-        print("❌ Periodic monitoring not found in logs")
-
-    # Test signal sending
-    print("\n📡 Testing signal delivery...")
-    signal_result = subprocess.run(
-        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
-        capture_output=True,
-        text=True,
-    )
-
-    if signal_result.returncode == 0:
-        print("✅ Signal sent successfully")
-        time.sleep(2)
-
-        # Check for thread dump in logs
-        new_logs = subprocess.run(
-            f"kubectl logs -n dev-agpt {pod_name} --tail=20".split(),
-            capture_output=True,
-            text=True,
-        )
-        if "SIGNAL THREAD DUMP" in new_logs.stdout:
-            print("✅ Thread dump appeared in logs!")
-        else:
-            print("⚠️ No thread dump found (might take a moment)")
-    else:
-        print(f"❌ Signal failed: {signal_result.stderr}")
-
-    # Test HTTP API (should work when not stuck)
-    print("\n🌐 Testing HTTP API...")
-    pf_process = None
-    try:
-        pf_process = subprocess.Popen(
-            f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        )
-        time.sleep(2)
-
-        response = requests.get("http://localhost:8003/debug_thread_dump", timeout=10)
-        if response.status_code == 200:
-            print("✅ HTTP API working")
-            print(f"   Thread count found: {'Total threads:' in response.text}")
-        else:
-            print(f"⚠️ HTTP API returned: {response.status_code}")
-
-    except Exception as e:
-        print(f"⚠️ HTTP API failed: {e}")
-    finally:
-        if pf_process:
-            try:
-                pf_process.terminate()
-                pf_process.wait()
-            except Exception:
-                pass
-
-    success = faulthandler_enabled and signal_result.returncode == 0
-    print(
-        f"\n{'✅ DEPLOYMENT TEST PASSED' if success else '❌ DEPLOYMENT TEST FAILED'}"
-    )
-    return success
-
-
-def collect_thread_dump():
-    """Collect comprehensive thread dump (works even when scheduler is stuck)"""
-    print("🚨 COLLECTING THREAD DUMP FROM SCHEDULER")
-    print("=" * 60)
-
-    pod_name = find_scheduler_pod()
-    if not pod_name:
-        print("❌ No scheduler pod found")
-        return False
-
-    print(f"📍 Pod: {pod_name}")
-    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-
-    # Send both signals for maximum coverage
-    print("📡 Sending signals for thread dumps...")
-
-    # SIGUSR1 (faulthandler)
-    result1 = subprocess.run(
-        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR1 1".split(),
-        capture_output=True,
-        text=True,
-    )
-    print(f"   SIGUSR1: {'✅' if result1.returncode == 0 else '❌'}")
-
-    time.sleep(1)
-
-    # SIGUSR2 (custom handler)
-    result2 = subprocess.run(
-        f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
-        capture_output=True,
-        text=True,
-    )
-    print(f"   SIGUSR2: {'✅' if result2.returncode == 0 else '❌'}")
-
-    time.sleep(3)  # Give signals time to execute
-
-    # Collect logs with thread dumps
-    print("📋 Collecting logs...")
-    log_result = subprocess.run(
-        f"kubectl logs -n dev-agpt {pod_name} --tail=500".split(),
-        capture_output=True,
-        text=True,
-    )
-
-    # Save everything
-    dump_file = f"THREAD_DUMP_{timestamp}.txt"
-    with open(dump_file, "w") as f:
-        f.write("SCHEDULER THREAD DUMP COLLECTION\n")
-        f.write(f"Timestamp: {datetime.now()}\n")
-        f.write(f"Pod: {pod_name}\n")
-        f.write("=" * 80 + "\n\n")
-        f.write("FULL LOGS (last 500 lines):\n")
-        f.write("-" * 40 + "\n")
-        f.write(log_result.stdout)
-
-    print(f"💾 Full dump saved: {dump_file}")
-
-    # Extract and show thread dump preview
-    lines = log_result.stdout.split("\n")
-    thread_dumps = []
-    in_dump = False
-    current_dump = []
-
-    for line in lines:
-        if any(
-            marker in line
-            for marker in ["SIGNAL THREAD DUMP", "Fatal Python error", "Thread 0x"]
-        ):
-            if current_dump:
-                thread_dumps.append(current_dump)
-            current_dump = [line]
-            in_dump = True
-        elif in_dump and (
-            "END SIGNAL THREAD DUMP" in line or "Current thread 0x" in line
-        ):
-            current_dump.append(line)
-            thread_dumps.append(current_dump)
-            current_dump = []
-            in_dump = False
-        elif in_dump:
-            current_dump.append(line)
-
-    if current_dump:
-        thread_dumps.append(current_dump)
-
-    if thread_dumps:
-        print(f"\n🔍 FOUND {len(thread_dumps)} THREAD DUMP(S):")
-        print("-" * 50)
-
-        # Show the most recent/complete dump
-        latest_dump = thread_dumps[-1]
-        for i, line in enumerate(latest_dump[:50]):  # First 50 lines
-            print(line)
-
-        if len(latest_dump) > 50:
-            print("... (truncated, see full dump in file)")
-
-        # Create separate file with just thread dumps
-        clean_dump_file = f"CLEAN_THREAD_DUMP_{timestamp}.txt"
-        with open(clean_dump_file, "w") as f:
-            f.write(f"EXTRACTED THREAD DUMPS - {datetime.now()}\n")
-            f.write("=" * 60 + "\n\n")
-            for i, dump in enumerate(thread_dumps, 1):
-                f.write(f"DUMP #{i}:\n")
-                f.write("-" * 30 + "\n")
-                f.write("\n".join(dump))
-                f.write("\n\n")
-
-        print(f"🎯 Clean thread dumps saved: {clean_dump_file}")
-
-    else:
-        print("⚠️ No thread dumps found in logs")
-        print("Recent log lines:")
-        for line in lines[-10:]:
-            print(f"   {line}")
-
-    # Try HTTP backup (will fail if scheduler is stuck, but worth trying)
-    print("\n🌐 Attempting HTTP backup...")
-    pf_process = None
-    try:
-        pf_process = subprocess.Popen(
-            f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
-            stdout=subprocess.PIPE,
-            stderr=subprocess.PIPE,
-        )
-        time.sleep(2)
-
-        response = requests.get("http://localhost:8003/debug_thread_dump", timeout=5)
-        if response.status_code == 200:
-            http_file = f"HTTP_THREAD_DUMP_{timestamp}.txt"
-            with open(http_file, "w") as f:
-                f.write(response.text)
-            print(f"✅ HTTP backup saved: {http_file}")
-        else:
-            print(f"⚠️ HTTP failed: {response.status_code}")
-
-    except Exception as e:
-        print(f"⚠️ HTTP failed (expected if stuck): {e}")
-    finally:
-        if pf_process:
-            try:
-                pf_process.terminate()
-                pf_process.wait()
-            except Exception:
-                pass
-
-    print("\n✅ COLLECTION COMPLETE!")
-    return len(thread_dumps) > 0
-
-
-def monitor_periodic_dumps(duration_minutes=10):
-    """Monitor periodic thread dumps for a specified duration"""
-    print(f"👁️ MONITORING PERIODIC DUMPS FOR {duration_minutes} MINUTES")
-    print("=" * 50)
-
-    pod_name = find_scheduler_pod()
-    if not pod_name:
-        print("❌ No scheduler pod found")
-        return
-
-    print(f"📍 Pod: {pod_name}")
-    print("⏰ Watching for periodic status messages and thread dumps...")
-
-    start_time = time.time()
-    end_time = start_time + (duration_minutes * 60)
-
-    # Get current log position (for reference, not used currently)
-    # Could be used for tracking new vs old logs if needed
-
-    while time.time() < end_time:
-        try:
-            # Get new logs
-            current_logs = subprocess.run(
-                f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
-                capture_output=True,
-                text=True,
-            )
-
-            for line in current_logs.stdout.split("\n"):
-                if "Periodic check:" in line:
-                    print(f"📊 {line}")
-                elif "SIGNAL THREAD DUMP" in line:
-                    print(f"🚨 Thread dump detected: {line}")
-                elif "No health check" in line:
-                    print(f"⚠️ Health issue: {line}")
-
-            time.sleep(30)  # Check every 30 seconds
-
-        except KeyboardInterrupt:
-            print("\n⏹️ Monitoring stopped by user")
-            break
-        except Exception as e:
-            print(f"Error during monitoring: {e}")
-            break
-
-    print("\n✅ MONITORING COMPLETE")
-
-
-def main():
-    if len(sys.argv) < 2:
-        print("🔧 SCHEDULER DEBUG TOOL")
-        print("=" * 30)
-        print("Usage:")
-        print("  python scheduler_debug.py test          - Test deployment")
-        print("  python scheduler_debug.py collect       - Collect thread dump")
-        print("  python scheduler_debug.py monitor [min] - Monitor periodic dumps")
-        print("  python scheduler_debug.py all           - Run test + collect")
-        return
-
-    command = sys.argv[1].lower()
-
-    if command == "test":
-        test_deployment()
-    elif command == "collect":
-        collect_thread_dump()
-    elif command == "monitor":
-        duration = int(sys.argv[2]) if len(sys.argv) > 2 else 10
-        monitor_periodic_dumps(duration)
-    elif command == "all":
-        print("Running complete debugging sequence...\n")
-        if test_deployment():
-            print("\n" + "=" * 50)
-            collect_thread_dump()
-        else:
-            print("❌ Test failed, skipping collection")
-    else:
-        print(f"❌ Unknown command: {command}")
-
-
-if __name__ == "__main__":
-    main()
@@ -44,7 +44,7 @@ export const SettingsForm = ({
           <FormItem>
             <FormLabel>Email</FormLabel>
             <FormControl>
-              <Input {...field} type="email" />
+              <Input {...field} type="email" data-testid="settings-email" />
             </FormControl>
             <FormMessage />
           </FormItem>
@@ -62,6 +62,7 @@ export const SettingsForm = ({
                   {...field}
                   type="password"
                   placeholder="************"
+                  data-testid="settings-password"
                 />
               </FormControl>
               <FormMessage />
@@ -80,6 +81,7 @@ export const SettingsForm = ({
                   {...field}
                   type="password"
                   placeholder="************"
+                  data-testid="settings-confirm-password"
                 />
               </FormControl>
               <FormMessage />
@@ -117,6 +119,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-agent-run"
                 />
               </FormControl>
             </FormItem>
@@ -141,6 +144,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-block-execution-failed"
                 />
               </FormControl>
             </FormItem>
@@ -164,6 +168,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-continuous-agent-error"
                 />
               </FormControl>
             </FormItem>
@@ -193,6 +198,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-zero-balance"
                 />
               </FormControl>
             </FormItem>
@@ -216,6 +222,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-low-balance"
                 />
               </FormControl>
             </FormItem>
@@ -243,6 +250,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-daily-summary"
                 />
               </FormControl>
             </FormItem>
@@ -264,6 +272,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-weekly-summary"
                 />
               </FormControl>
             </FormItem>
@@ -285,6 +294,7 @@ export const SettingsForm = ({
                 <Switch
                   checked={field.value}
                   onCheckedChange={field.onChange}
+                  data-testid="settings-notify-on-monthly-summary"
                 />
               </FormControl>
             </FormItem>
@@ -300,6 +310,7 @@ export const SettingsForm = ({
             type="button"
             onClick={onCancel}
             disabled={form.formState.isSubmitting}
+            data-testid="settings-cancel"
           >
             Cancel
           </Button>
@@ -56,10 +56,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
 
   return (
     <div className="w-full min-w-[800px] px-4 sm:px-8">
-      <h1
-        data-testid="profile-info-form-title"
-        className="font-circular mb-6 text-[28px] font-normal text-neutral-900 dark:text-neutral-100 sm:mb-8 sm:text-[35px]"
-      >
+      <h1 className="font-circular mb-6 text-[28px] font-normal text-neutral-900 dark:text-neutral-100 sm:mb-8 sm:text-[35px]">
         Profile
       </h1>
 
@@ -95,18 +92,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
 
       <form className="space-y-4 sm:space-y-6" onSubmit={submitForm}>
         <div className="w-full">
-          <label
-            htmlFor="displayName"
-            className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
-          >
+          <label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
             Display name
           </label>
           <div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
             <input
               type="text"
-              id="displayName"
               name="displayName"
-              data-testid="profile-info-form-display-name"
               defaultValue={profileData.name}
               placeholder="Enter your display name"
               className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -122,17 +114,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
         </div>
 
         <div className="w-full">
-          <label
-            htmlFor="handle"
-            className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
-          >
+          <label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
             Handle
           </label>
           <div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
             <input
               type="text"
               name="handle"
-              id="handle"
               defaultValue={profileData.username}
               placeholder="@username"
               className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -148,16 +136,12 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
         </div>
 
         <div className="w-full">
-          <label
-            htmlFor="bio"
-            className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
-          >
+          <label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
            Bio
          </label>
          <div className="h-[220px] rounded-2xl border border-slate-200 py-2.5 pl-4 pr-4 dark:border-slate-700 dark:bg-slate-800">
            <textarea
              name="bio"
-              id="bio"
              defaultValue={profileData.description}
              placeholder="Tell us about yourself..."
              className="font-circular h-full w-full resize-none border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -185,17 +169,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
           const link = profileData.links[linkNum - 1];
           return (
             <div key={linkNum} className="w-full">
-              <label
-                htmlFor={`link${linkNum}`}
-                className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
-              >
+              <label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
                 Link {linkNum}
               </label>
               <div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
                 <input
                   type="text"
                   name={`link${linkNum}`}
-                  id={`link${linkNum}`}
                   placeholder="https://"
                   defaultValue={link || ""}
                   className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -219,8 +199,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
       <Separator />
 
       <div className="flex h-[50px] items-center justify-end gap-3 py-8">
-        {/* FRONTEND-TODO: Need to fix it */}
-        {/* <Button
+        <Button
           type="button"
           variant="secondary"
           className="font-circular h-[50px] rounded-[35px] bg-neutral-200 px-6 py-3 text-base font-medium text-neutral-800 transition-colors hover:bg-neutral-300 dark:border-neutral-700 dark:bg-neutral-700 dark:text-neutral-200 dark:hover:border-neutral-600 dark:hover:bg-neutral-600"
@@ -229,7 +208,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
           }}
         >
           Cancel
-        </Button> */}
+        </Button>
         <Button
           type="submit"
           disabled={isSubmitting}
@@ -13,8 +13,15 @@ const MILLISECONDS_PER_MINUTE = SECONDS_PER_MINUTE * MILLISECONDS_PER_SECOND;
 const MILLISECONDS_PER_HOUR = MINUTES_PER_HOUR * MILLISECONDS_PER_MINUTE;
 const MILLISECONDS_PER_DAY = HOURS_PER_DAY * MILLISECONDS_PER_HOUR;
 const MILLISECONDS_PER_WEEK = DAYS_PER_WEEK * MILLISECONDS_PER_DAY;
 
 // Display constants
 const SHORT_DURATION_THRESHOLD_SECONDS = 5;
 
+// State sanity limits - keep only most recent executions to prevent unbounded growth
+const MAX_ACTIVE_EXECUTIONS_IN_STATE = 200; // Most important - these are running
+const MAX_RECENT_COMPLETIONS_IN_STATE = 100;
+const MAX_RECENT_FAILURES_IN_STATE = 100;
+
 export function formatTimeAgo(dateStr: string): string {
   const date = new Date(dateStr);
   const now = new Date();
@@ -203,15 +210,17 @@ export function categorizeExecutions(
   );
 
   // Filter and limit each category to prevent unbounded state growth
-  const activeExecutions = enrichedExecutions.filter(isActiveExecution);
+  const activeExecutions = enrichedExecutions
+    .filter(isActiveExecution)
+    .slice(0, MAX_ACTIVE_EXECUTIONS_IN_STATE);
 
-  const recentCompletions = enrichedExecutions.filter((execution) =>
-    isRecentCompletion(execution, oneWeekAgo),
-  );
+  const recentCompletions = enrichedExecutions
+    .filter((execution) => isRecentCompletion(execution, oneWeekAgo))
+    .slice(0, MAX_RECENT_COMPLETIONS_IN_STATE);
 
-  const recentFailures = enrichedExecutions.filter((execution) =>
-    isRecentFailure(execution, oneWeekAgo),
-  );
+  const recentFailures = enrichedExecutions
+    .filter((execution) => isRecentFailure(execution, oneWeekAgo))
+    .slice(0, MAX_RECENT_FAILURES_IN_STATE);
 
   return {
     activeExecutions,
@@ -254,11 +263,20 @@ export function addExecutionToCategory(
   const newState = { ...state };
 
   if (isActiveExecution(execution)) {
-    newState.activeExecutions = [execution, ...newState.activeExecutions];
+    newState.activeExecutions = [execution, ...newState.activeExecutions].slice(
+      0,
+      MAX_ACTIVE_EXECUTIONS_IN_STATE,
+    );
   } else if (isRecentCompletion(execution, oneWeekAgo)) {
-    newState.recentCompletions = [execution, ...newState.recentCompletions];
+    newState.recentCompletions = [
+      execution,
+      ...newState.recentCompletions,
+    ].slice(0, MAX_RECENT_COMPLETIONS_IN_STATE);
   } else if (isRecentFailure(execution, oneWeekAgo)) {
-    newState.recentFailures = [execution, ...newState.recentFailures];
+    newState.recentFailures = [execution, ...newState.recentFailures].slice(
+      0,
+      MAX_RECENT_FAILURES_IN_STATE,
+    );
  }
 
  return newState;
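The capping added above is done with `Array.prototype.slice` after each insert; the equivalent idea in Python (shown only as an analogue, since this file is TypeScript) is a `deque` with `maxlen`, which evicts the oldest entries automatically:

```python
from collections import deque

MAX_ACTIVE = 200
active = deque(maxlen=MAX_ACTIVE)  # oldest items are dropped automatically

for i in range(250):
    active.appendleft(i)  # newest first, like [execution, ...previous]

assert len(active) == MAX_ACTIVE
```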
@@ -1,145 +0,0 @@
-import { Locator, Page } from "@playwright/test";
-import { BasePage } from "./base.page";
-import { getSelectors } from "../utils/selectors";
-
-export class ProfileFormPage extends BasePage {
-  constructor(page: Page) {
-    super(page);
-  }
-
-  private getId(id: string | RegExp): Locator {
-    const { getId } = getSelectors(this.page);
-    return getId(id);
-  }
-
-  private async hideFloatingWidgets(): Promise<void> {
-    await this.page.addStyleTag({
-      content: `
-        [data-tally-open] { display: none !important; }
-      `,
-    });
-  }
-
-  // Locators
-  title(): Locator {
-    return this.getId("profile-info-form-title");
-  }
-
-  displayNameField(): Locator {
-    const { getField } = getSelectors(this.page);
-    return getField("Display name");
-  }
-
-  handleField(): Locator {
-    const { getField } = getSelectors(this.page);
-    return getField("Handle");
-  }
-
-  bioField(): Locator {
-    const { getField } = getSelectors(this.page);
-    return getField("Bio");
-  }
-
-  linkField(index: number): Locator {
-    this.assertValidLinkIndex(index);
-    const { getField } = getSelectors(this.page);
-    return getField(`Link ${index}`);
-  }
-
-  cancelButton(): Locator {
-    const { getButton } = getSelectors(this.page);
-    return getButton("Cancel");
-  }
-
-  saveButton(): Locator {
-    const { getButton } = getSelectors(this.page);
-    return getButton("Save changes");
-  }
-
-  // State
-  async isLoaded(): Promise<boolean> {
-    try {
-      await this.title().waitFor({ state: "visible", timeout: 10_000 });
-      await this.displayNameField().waitFor({
-        state: "visible",
-        timeout: 10_000,
-      });
-      await this.handleField().waitFor({ state: "visible", timeout: 10_000 });
-      return true;
-    } catch {
-      return false;
-    }
-  }
-
-  // Actions
-  async setDisplayName(name: string): Promise<void> {
-    await this.displayNameField().fill(name);
-  }
-
-  async getDisplayName(): Promise<string> {
-    return this.displayNameField().inputValue();
-  }
-
-  async setHandle(handle: string): Promise<void> {
-    await this.handleField().fill(handle);
-  }
-
-  async getHandle(): Promise<string> {
-    return this.handleField().inputValue();
-  }
-
-  async setBio(bio: string): Promise<void> {
-    await this.bioField().fill(bio);
-  }
-
-  async getBio(): Promise<string> {
-    return this.bioField().inputValue();
-  }
-
-  async setLink(index: number, url: string): Promise<void> {
-    await this.linkField(index).fill(url);
-  }
-
-  async getLink(index: number): Promise<string> {
-    return this.linkField(index).inputValue();
-  }
-
-  async setLinks(links: Array<string | undefined>): Promise<void> {
-    for (let i = 1; i <= 5; i++) {
-      const val = links[i - 1] ?? "";
-      await this.setLink(i, val);
-    }
-  }
-
-  async clickCancel(): Promise<void> {
-    await this.cancelButton().click();
-  }
-
-  async clickSave(): Promise<void> {
-    await this.saveButton().click();
-  }
-
-  async saveChanges(): Promise<void> {
-    await this.hideFloatingWidgets();
-    await this.clickSave();
-    await this.waitForSaveComplete();
-  }
-
-  async waitForSaveComplete(timeoutMs: number = 15_000): Promise<void> {
-    const { getButton } = getSelectors(this.page);
-    await getButton("Save changes").waitFor({
-      state: "attached",
-      timeout: timeoutMs,
-    });
-    await getButton("Save changes").waitFor({
-      state: "visible",
-      timeout: timeoutMs,
-    });
-  }
-
-  private assertValidLinkIndex(index: number) {
-    if (index < 1 || index > 5) {
-      throw new Error(`Link index must be between 1 and 5. Received: ${index}`);
-    }
-  }
-}
autogpt_platform/frontend/src/tests/pages/settings.page.ts (new file, 182 lines)
@@ -0,0 +1,182 @@
import { expect, Locator, Page } from "@playwright/test";
import { BasePage } from "./base.page";
import { getSelectors } from "../utils/selectors";

export async function getSwitchState(toggle: Locator): Promise<boolean> {
  const ariaChecked = await toggle.getAttribute("aria-checked");
  if (ariaChecked === "true") return true;
  if (ariaChecked === "false") return false;

  const dataState = await toggle.getAttribute("data-state");
  if (dataState === "checked") return true;
  if (dataState === "unchecked") return false;

  const dataChecked = await toggle.getAttribute("data-checked");
  return dataChecked !== null;
}
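// Editor's note (not part of the commit): the three attribute checks above
// cover common switch implementations. `aria-checked` is the ARIA contract for
// role="switch"; Radix UI switch primitives expose data-state="checked" or
// "unchecked"; and a bare `data-checked` attribute is used by some other
// component libraries, where its mere presence signals the ON state.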
type ToggleId =
  | "settings-notify-on-agent-run"
  | "settings-notify-on-block-execution-failed"
  | "settings-notify-on-continuous-agent-error"
  | "settings-notify-on-zero-balance"
  | "settings-notify-on-low-balance"
  | "settings-notify-on-daily-summary"
  | "settings-notify-on-weekly-summary"
  | "settings-notify-on-monthly-summary";

export const TOGGLE_IDS: ReadonlyArray<ToggleId> = [
  "settings-notify-on-agent-run",
  "settings-notify-on-block-execution-failed",
  "settings-notify-on-continuous-agent-error",
  "settings-notify-on-zero-balance",
  "settings-notify-on-low-balance",
  "settings-notify-on-daily-summary",
  "settings-notify-on-weekly-summary",
  "settings-notify-on-monthly-summary",
];

export class SettingsPage extends BasePage {
  static readonly path = "/profile/settings";

  constructor(page: Page) {
    super(page);
  }

  async goto(): Promise<void> {
    await this.page.goto(SettingsPage.path);
    await this.isLoaded();
  }

  async isLoaded(): Promise<boolean> {
    try {
      await this.page.waitForLoadState("domcontentloaded");
      const header = this.page.getByRole("heading", { name: "My account" });
      const email = this.getEmailInput();

      await Promise.all([
        header.waitFor({ state: "visible" }),
        email.waitFor({ state: "visible" }),
      ]);

      return true;
    } catch {
      return false;
    }
  }

  getEmailInput(): Locator {
    return this.page.getByTestId("settings-email");
  }

  getPasswordInput(): Locator {
    return this.page.getByTestId("settings-password");
  }

  getConfirmPasswordInput(): Locator {
    return this.page.getByTestId("settings-confirm-password");
  }

  getToggle(id: ToggleId): Locator {
    return this.page.getByTestId(id);
  }

  getCancelButton(): Locator {
    return this.page.getByTestId("settings-cancel");
  }

  getSaveButton(): Locator {
    return this.page.getByRole("button", { name: /Save changes|Saving\.\.\./ });
  }

  async setEmail(value: string): Promise<void> {
    const input = this.getEmailInput();
    await input.waitFor({ state: "visible" });
    await input.fill(value);
    await expect(input).toHaveValue(value);
  }

  async setPassword(value: string): Promise<void> {
    const input = this.getPasswordInput();
    await input.waitFor({ state: "visible" });
    await input.fill(value);
    await expect(input).toHaveValue(value);
  }

  async setConfirmPassword(value: string): Promise<void> {
    const input = this.getConfirmPasswordInput();
    await input.waitFor({ state: "visible" });
    await input.fill(value);
    await expect(input).toHaveValue(value);
  }

  private async getSwitchState(toggle: Locator): Promise<boolean> {
    const ariaChecked = await toggle.getAttribute("aria-checked");
    if (ariaChecked === "true") return true;
    if (ariaChecked === "false") return false;

    const dataState = await toggle.getAttribute("data-state");
    if (dataState === "checked") return true;
    if (dataState === "unchecked") return false;

    const dataChecked = await toggle.getAttribute("data-checked");
    return dataChecked !== null;
  }

  private async setSwitchState(toggle: Locator, desired: boolean): Promise<void> {
    await toggle.waitFor({ state: "visible" });
    const current = await this.getSwitchState(toggle);
    if (current !== desired) {
      await toggle.click();
      await expect
        .poll(async () => this.getSwitchState(toggle))
        .toBe(desired);
    }
  }

  async toggle(id: ToggleId): Promise<void> {
    await this.getToggle(id).click();
  }

  async enable(id: ToggleId): Promise<void> {
    await this.setSwitchState(this.getToggle(id), true);
  }

  async disable(id: ToggleId): Promise<void> {
    await this.setSwitchState(this.getToggle(id), false);
  }

  async cancelChanges(): Promise<void> {
    const btn = this.getCancelButton();
    await btn.waitFor({ state: "visible" });
    await btn.click();
    await this.getEmailInput().waitFor({ state: "visible" });
  }

  async saveChanges(): Promise<void> {
    const btn = this.getSaveButton();
    await btn.waitFor({ state: "visible" });
    await btn.click();
    await this.waitForSaveComplete();
  }

  async waitForSaveComplete(): Promise<void> {
    const { getText } = getSelectors(this.page);
    const toast = getText("Successfully updated settings");

    await Promise.race([
      toast.waitFor({ state: "visible" }),
      expect(this.getSaveButton()).toHaveText("Save changes"),
    ]);
  }

  async getEmailValue(): Promise<string> {
    return this.getEmailInput().inputValue();
  }

  async expectValidationError(text: string | RegExp): Promise<void> {
    const { getText } = getSelectors(this.page);
    await expect(getText(text)).toBeVisible();
  }
}

export async function navigateToSettings(page: Page): Promise<SettingsPage> {
  const settings = new SettingsPage(page);
  await settings.goto();
  return settings;
}
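Editor's note on `waitForSaveComplete` above: it races the success toast against the Save button's label settling back to "Save changes", so whichever signal appears first unblocks the test. This covers both the toast-driven and label-driven completion paths without resorting to a fixed sleep.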
@@ -1,108 +0,0 @@
import test, { expect } from "@playwright/test";
import { ProfileFormPage } from "./pages/profile-form.page";
import { LoginPage } from "./pages/login.page";
import { hasUrl } from "./utils/assertion";
import { TEST_CREDENTIALS } from "./credentials";

test.describe("Profile Form", () => {
  let profileFormPage: ProfileFormPage;

  test.beforeEach(async ({ page }) => {
    profileFormPage = new ProfileFormPage(page);

    const loginPage = new LoginPage(page);
    await loginPage.goto();
    await loginPage.login(TEST_CREDENTIALS.email, TEST_CREDENTIALS.password);
    await hasUrl(page, "/marketplace");
  });

  test("redirects to login when user is not authenticated", async ({
    browser,
  }) => {
    const context = await browser.newContext();
    const page = await context.newPage();

    try {
      await page.goto("/profile");
      await hasUrl(page, "/login");
    } finally {
      await page.close();
      await context.close();
    }
  });

  test("can save profile changes successfully", async ({ page }) => {
    await profileFormPage.navbar.clickProfileLink();

    await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();
    await hasUrl(page, new RegExp("/profile"));

    const suffix = Date.now().toString().slice(-6);
    const newDisplayName = `E2E Name ${suffix}`;
    const newHandle = `e2euser${suffix}`;
    const newBio = `E2E bio ${suffix}`;
    const newLinks = [
      `https://example.com/${suffix}/1`,
      `https://example.com/${suffix}/2`,
      `https://example.com/${suffix}/3`,
      `https://example.com/${suffix}/4`,
      `https://example.com/${suffix}/5`,
    ];

    await profileFormPage.setDisplayName(newDisplayName);
    await profileFormPage.setHandle(newHandle);
    await profileFormPage.setBio(newBio);
    await profileFormPage.setLinks(newLinks);
    await profileFormPage.saveChanges();

    expect(await profileFormPage.getDisplayName()).toBe(newDisplayName);
    expect(await profileFormPage.getHandle()).toBe(newHandle);
    expect(await profileFormPage.getBio()).toBe(newBio);
    for (let i = 1; i <= 5; i++) {
      expect(await profileFormPage.getLink(i)).toBe(newLinks[i - 1]);
    }

    await page.reload();
    await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();

    expect(await profileFormPage.getDisplayName()).toBe(newDisplayName);
    expect(await profileFormPage.getHandle()).toBe(newHandle);
    expect(await profileFormPage.getBio()).toBe(newBio);
    for (let i = 1; i <= 5; i++) {
      expect(await profileFormPage.getLink(i)).toBe(newLinks[i - 1]);
    }
  });

  // Currently we are not using hook form inside the profile form, so the
  // Cancel button does not work as expected; once that's fixed, we can
  // unskip this test.
  test.skip("can cancel profile changes", async ({ page }) => {
    await profileFormPage.navbar.clickProfileLink();

    await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();
    await hasUrl(page, new RegExp("/profile"));

    const originalDisplayName = await profileFormPage.getDisplayName();
    const originalHandle = await profileFormPage.getHandle();
    const originalBio = await profileFormPage.getBio();
    const originalLinks: string[] = [];
    for (let i = 1; i <= 5; i++) {
      originalLinks.push(await profileFormPage.getLink(i));
    }

    const suffix = `${Date.now().toString().slice(-6)}_cancel`;
    await profileFormPage.setDisplayName(`Tmp Name ${suffix}`);
    await profileFormPage.setHandle(`tmpuser${suffix}`);
    await profileFormPage.setBio(`Tmp bio ${suffix}`);
    for (let i = 1; i <= 5; i++) {
      await profileFormPage.setLink(i, `https://tmp.example/${suffix}/${i}`);
    }

    await profileFormPage.clickCancel();

    expect(await profileFormPage.getDisplayName()).toBe(originalDisplayName);
    expect(await profileFormPage.getHandle()).toBe(originalHandle);
    expect(await profileFormPage.getBio()).toBe(originalBio);
    for (let i = 1; i <= 5; i++) {
      expect(await profileFormPage.getLink(i)).toBe(originalLinks[i - 1]);
    }
  });
});
autogpt_platform/frontend/src/tests/settings.spec.ts (new file, 91 lines)
@@ -0,0 +1,91 @@
import test, { expect } from "@playwright/test";
import { LoginPage } from "./pages/login.page";
import { TEST_CREDENTIALS } from "./credentials";
import { hasFieldValue, hasUrl } from "./utils/assertion";
import {
  navigateToSettings,
  getSwitchState,
  TOGGLE_IDS,
} from "./pages/settings.page";

test.describe("Settings", () => {
  test.beforeEach(async ({ page }) => {
    await page.goto("/login");
    const loginPage = new LoginPage(page);
    await loginPage.login(TEST_CREDENTIALS.email, TEST_CREDENTIALS.password);
    await hasUrl(page, "/marketplace");
  });

  test("settings page redirects to login when not authenticated", async ({
    browser,
  }) => {
    const context = await browser.newContext();
    const page = await context.newPage();
    await page.goto("/profile/settings");
    await hasUrl(page, /\/login/);
    await context.close();
  });

  test("user can successfully update settings", async ({ page }) => {
    const settings = await navigateToSettings(page);

    for (const id of TOGGLE_IDS) {
      const state = await getSwitchState(settings.getToggle(id));
      expect.soft(state, `Initial state of ${id}`).toBe(true);
    }

    // Turn all toggles OFF, change email/password, save
    for (const id of TOGGLE_IDS) {
      await settings.disable(id);
    }
    const tempEmail = "temp+e2e@example.com";
    await settings.setEmail(tempEmail);
    await settings.setPassword("temporarypassword123");
    await settings.setConfirmPassword("temporarypassword123");
    await settings.saveChanges();

    // Reload and verify the changed email/password and OFF states persisted
    await settings.goto();
    for (const id of TOGGLE_IDS) {
      const state = await getSwitchState(settings.getToggle(id));
      expect.soft(state, `Persisted OFF state of ${id}`).toBe(false);
    }
    await hasFieldValue(settings.getEmailInput(), tempEmail);
    await hasFieldValue(settings.getPasswordInput(), "temporarypassword123");

    // Restore the original test email and password
    await settings.setEmail(TEST_CREDENTIALS.email);
    await settings.setPassword(TEST_CREDENTIALS.password);
    await settings.setConfirmPassword(TEST_CREDENTIALS.password);
    for (const id of TOGGLE_IDS) {
      await settings.enable(id);
    }
    await settings.saveChanges();

    // Reload and verify the restored email/password and ON states persisted
    await settings.goto();
    for (const id of TOGGLE_IDS) {
      const state = await getSwitchState(settings.getToggle(id));
      expect.soft(state, `Persisted ON state of ${id}`).toBe(true);
    }
    await hasFieldValue(settings.getEmailInput(), TEST_CREDENTIALS.email);
    await hasFieldValue(settings.getPasswordInput(), TEST_CREDENTIALS.password);
  });

  test("user can cancel changes", async ({ page }) => {
    const settings = await navigateToSettings(page);

    const initialEmail = await settings.getEmailValue();
    await settings.setEmail("settings+cancel@example.com");

    await settings.cancelChanges();
    await hasFieldValue(settings.getEmailInput(), initialEmail);
  });

  test("settings form shows validation errors for invalid inputs", async ({
    page,
  }) => {
    const settings = await navigateToSettings(page);

    await settings.setEmail("invalid-email");
    await settings.setPassword("short");
    await settings.setConfirmPassword("short");

    await settings.saveChanges();

    await settings.expectValidationError("Invalid email");
    await settings.expectValidationError(
      "String must contain at least 12 character(s)",
    );
  });
});
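Editor's note: assuming the repository's standard Playwright setup under `autogpt_platform/frontend`, the new suite can presumably be run in isolation with:

npx playwright test src/tests/settings.spec.ts

The exact invocation depends on the project's Playwright config and package.json scripts.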