Compare commits

..

2 Commits

Author SHA1 Message Date
Abhimanyu Yadav
bea287957d Merge branch 'dev' into abhimanyuyadav/open-2604-create-e2e-tests-for-settings-page 2025-08-11 09:36:22 +05:30
abhi1992002
84d7b05cd2 feat(frontend): enhance SettingsForm with data-testid attributes for improved testing
Added data-testid attributes to various input fields and switches in the SettingsForm component to facilitate end-to-end testing. This includes email, password, confirmation password fields, and notification switches for agent run, block execution failures, continuous agent errors, zero balance, low balance, daily summary, weekly summary, and monthly summary. Also added a data-testid for the cancel button.
2025-08-10 12:14:30 +05:30
19 changed files with 330 additions and 1407 deletions

View File

@@ -33,7 +33,7 @@ from prisma.types import (
AgentNodeExecutionUpdateInput,
AgentNodeExecutionWhereInput,
)
from pydantic import BaseModel, ConfigDict, JsonValue, ValidationError
from pydantic import BaseModel, ConfigDict, JsonValue
from pydantic.fields import Field
from backend.server.v2.store.exceptions import DatabaseError
@@ -59,7 +59,7 @@ from .includes import (
GRAPH_EXECUTION_INCLUDE_WITH_NODES,
graph_execution_include,
)
from .model import GraphExecutionStats, NodeExecutionStats
from .model import GraphExecutionStats
T = TypeVar("T")
@@ -318,30 +318,18 @@ class NodeExecutionResult(BaseModel):
@staticmethod
def from_db(_node_exec: AgentNodeExecution, user_id: Optional[str] = None):
try:
stats = NodeExecutionStats.model_validate(_node_exec.stats or {})
except (ValueError, ValidationError):
stats = NodeExecutionStats()
if stats.cleared_inputs:
input_data: BlockInput = defaultdict()
for name, messages in stats.cleared_inputs.items():
input_data[name] = messages[-1] if messages else ""
elif _node_exec.executionData:
if _node_exec.executionData:
# Execution that has been queued for execution will persist its data.
input_data = type_utils.convert(_node_exec.executionData, dict[str, Any])
else:
# For incomplete execution, executionData will not be yet available.
input_data: BlockInput = defaultdict()
for data in _node_exec.Input or []:
input_data[data.name] = type_utils.convert(data.data, type[Any])
output_data: CompletedBlockOutput = defaultdict(list)
if stats.cleared_outputs:
for name, messages in stats.cleared_outputs.items():
output_data[name].extend(messages)
else:
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
for data in _node_exec.Output or []:
output_data[data.name].append(type_utils.convert(data.data, type[Any]))
graph_execution: AgentGraphExecution | None = _node_exec.GraphExecution
if graph_execution:

View File

@@ -655,9 +655,6 @@ class NodeExecutionStats(BaseModel):
output_token_count: int = 0
extra_cost: int = 0
extra_steps: int = 0
# Moderation fields
cleared_inputs: Optional[dict[str, list[str]]] = None
cleared_outputs: Optional[dict[str, list[str]]] = None
def __iadd__(self, other: "NodeExecutionStats") -> "NodeExecutionStats":
"""Mutate this instance by adding another NodeExecutionStats."""

View File

@@ -27,7 +27,7 @@ from backend.executor.activity_status_generator import (
)
from backend.executor.utils import LogMetadata
from backend.notifications.notifications import queue_notification
from backend.util.exceptions import InsufficientBalanceError, ModerationError
from backend.util.exceptions import InsufficientBalanceError
if TYPE_CHECKING:
from backend.executor import DatabaseManagerClient, DatabaseManagerAsyncClient
@@ -67,7 +67,6 @@ from backend.executor.utils import (
validate_exec,
)
from backend.integrations.creds_manager import IntegrationCredentialsManager
from backend.server.v2.AutoMod.manager import automod_manager
from backend.util import json
from backend.util.clients import (
get_async_execution_event_bus,
@@ -760,22 +759,6 @@ class ExecutionProcessor:
amount=1,
)
# Input moderation
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_inputs(
db_client=get_db_async_client(),
graph_exec=graph_exec,
),
self.node_evaluation_loop,
).result(timeout=30.0):
raise moderation_error
except asyncio.TimeoutError:
log_metadata.warning(
f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
)
# Continue execution without moderation
# ------------------------------------------------------------
# Prepopulate queue ---------------------------------------
# ------------------------------------------------------------
@@ -914,25 +897,6 @@ class ExecutionProcessor:
time.sleep(0.1)
# loop done --------------------------------------------------
# Output moderation
try:
if moderation_error := asyncio.run_coroutine_threadsafe(
automod_manager.moderate_graph_execution_outputs(
db_client=get_db_async_client(),
graph_exec_id=graph_exec.graph_exec_id,
user_id=graph_exec.user_id,
graph_id=graph_exec.graph_id,
),
self.node_evaluation_loop,
).result(timeout=30.0):
raise moderation_error
except asyncio.TimeoutError:
log_metadata.warning(
f"Output moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation and continuing execution"
)
# Continue execution without moderation
# Determine final execution status based on whether there was an error or termination
if cancel.is_set():
execution_status = ExecutionStatus.TERMINATED
@@ -953,12 +917,11 @@ class ExecutionProcessor:
else Exception(f"{e.__class__.__name__}: {e}")
)
known_errors = (InsufficientBalanceError, ModerationError)
known_errors = (InsufficientBalanceError,)
if isinstance(error, known_errors):
execution_stats.error = str(error)
return ExecutionStatus.FAILED
execution_status = ExecutionStatus.FAILED
log_metadata.exception(
f"Failed graph execution {graph_exec.graph_exec_id}: {error}"
)

View File

@@ -1,13 +1,7 @@
import asyncio
import faulthandler
import io
import logging
import os
import signal
import sys
import threading
import traceback
from datetime import datetime
from enum import Enum
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
@@ -253,140 +247,11 @@ class Scheduler(AppService):
if not self.scheduler.running:
raise UnhealthyServiceError("Scheduler is not running")
# Update health check timestamp for monitoring
self._update_health_check_time()
return await super().health_check()
def _signal_thread_dump_handler(self, signum, frame):
"""Signal handler for SIGUSR2 - dumps threads to stderr even when FastAPI is stuck"""
try:
import sys
sys.stderr.write(f"\n{'='*80}\n")
sys.stderr.write(f"SIGNAL THREAD DUMP - {datetime.now()}\n")
sys.stderr.write(f"Signal: {signum}, PID: {os.getpid()}\n")
sys.stderr.write(f"Total threads: {threading.active_count()}\n")
sys.stderr.write(f"{'='*80}\n")
current_frames = sys._current_frames()
threads = threading.enumerate()
for i, thread in enumerate(threads, 1):
sys.stderr.write(f"\n[{i}] Thread: {thread.name}\n")
sys.stderr.write(f" ID: {thread.ident}, Daemon: {thread.daemon}\n")
thread_frame = (
current_frames.get(thread.ident) if thread.ident else None
)
if thread_frame:
sys.stderr.write(" Stack:\n")
stack = traceback.extract_stack(thread_frame)
for j, (filename, lineno, name, line) in enumerate(stack[-12:]):
indent = " " + (" " * min(j, 8))
short_file = (
filename.split("/")[-1] if "/" in filename else filename
)
sys.stderr.write(f"{indent}{short_file}:{lineno} in {name}()\n")
if line and line.strip():
sys.stderr.write(f"{indent}{line.strip()}\n")
else:
sys.stderr.write(" No frame available\n")
# Scheduler info
sys.stderr.write(f"\n{'='*40}\n")
sys.stderr.write("SCHEDULER STATE:\n")
if hasattr(self, "scheduler") and self.scheduler:
sys.stderr.write(f"Running: {self.scheduler.running}\n")
try:
jobs = self.scheduler.get_jobs()
sys.stderr.write(f"Jobs: {len(jobs)}\n")
except Exception:
sys.stderr.write("Jobs: Error getting jobs\n")
else:
sys.stderr.write("Scheduler: Not initialized\n")
sys.stderr.write(f"{'='*80}\n")
sys.stderr.write("END SIGNAL THREAD DUMP\n")
sys.stderr.write(f"{'='*80}\n\n")
sys.stderr.flush()
except Exception as e:
import sys
sys.stderr.write(f"Error in signal handler: {e}\n")
sys.stderr.flush()
def _start_periodic_thread_dump(self):
"""Start background thread for periodic thread dumps"""
def periodic_dump():
import time
while True:
try:
time.sleep(300) # 5 minutes
# Only dump if we detect potential issues
current_time = time.time()
if hasattr(self, "_last_health_check"):
time_since_health = current_time - self._last_health_check
if time_since_health > 60: # No health check in 60 seconds
logger.warning(
"No health check in 60s, dumping threads for monitoring"
)
self._signal_thread_dump_handler(0, None)
# Also check if scheduler seems stuck
if hasattr(self, "scheduler") and self.scheduler:
try:
jobs = self.scheduler.get_jobs()
# Log periodic status
logger.info(
f"Periodic check: {len(jobs)} active jobs, {threading.active_count()} threads"
)
except Exception as e:
logger.warning(
f"Periodic check failed, dumping threads: {e}"
)
self._signal_thread_dump_handler(0, None)
except Exception as e:
logger.error(f"Error in periodic thread dump: {e}")
# Start daemon thread for periodic monitoring
dump_thread = threading.Thread(
target=periodic_dump, daemon=True, name="PeriodicThreadDump"
)
dump_thread.start()
logger.info("Periodic thread dump monitor started")
def _update_health_check_time(self):
"""Update last health check time for monitoring"""
import time
self._last_health_check = time.time()
def run_service(self):
load_dotenv()
# Enable faulthandler for debugging deadlocks
faulthandler.enable()
# Register SIGUSR1 to dump all thread stacks on demand
faulthandler.register(signal.SIGUSR1, all_threads=True)
# Also register SIGUSR2 for custom thread dump (in case faulthandler doesn't work)
signal.signal(signal.SIGUSR2, self._signal_thread_dump_handler)
# Start periodic thread dump for monitoring
self._start_periodic_thread_dump()
logger.info(
"Faulthandler enabled. Send SIGUSR1 or SIGUSR2 to dump thread stacks. Periodic dumps every 5 minutes."
)
# Initialize the event loop for async jobs
global _event_loop
_event_loop = asyncio.new_event_loop()
@@ -619,102 +484,6 @@ class Scheduler(AppService):
"""Manually trigger cleanup of expired cloud storage files."""
return cleanup_expired_files()
@expose
def debug_thread_dump(self) -> str:
"""Get comprehensive thread dump for debugging deadlocks."""
try:
# Create string buffer to capture thread info
output = io.StringIO()
# Header
output.write(f"SCHEDULER THREAD DUMP - {datetime.now()}\n")
output.write("=" * 80 + "\n")
output.write(f"Process PID: {os.getpid()}\n")
output.write(f"Total threads: {threading.active_count()}\n\n")
# Get all threads with stack traces
current_frames = sys._current_frames()
threads = threading.enumerate()
for i, thread in enumerate(threads, 1):
output.write(f"[{i}/{len(threads)}] Thread: {thread.name}\n")
output.write(f" ID: {thread.ident}\n")
output.write(f" Daemon: {thread.daemon}\n")
output.write(f" Alive: {thread.is_alive()}\n")
# Get target if available (internal attribute)
if hasattr(thread, "_target") and getattr(thread, "_target", None):
output.write(f" Target: {getattr(thread, '_target')}\n")
# Get stack trace
frame = current_frames.get(thread.ident) if thread.ident else None
if frame:
output.write(" Stack trace:\n")
stack = traceback.extract_stack(frame)
for j, (filename, lineno, name, line) in enumerate(stack):
indent = " " + (" " * min(j, 6))
short_file = (
filename.split("/")[-1] if "/" in filename else filename
)
output.write(f"{indent}[{j+1}] {short_file}:{lineno}\n")
output.write(f"{indent} in {name}()\n")
if line and line.strip():
output.write(f"{indent}{line.strip()}\n")
else:
output.write(" ⚠️ No frame available\n")
output.write("\n" + "-" * 60 + "\n")
# Scheduler state info
output.write("\nSCHEDULER STATE:\n")
output.write("=" * 40 + "\n")
if hasattr(self, "scheduler") and self.scheduler:
output.write(f"Scheduler running: {self.scheduler.running}\n")
try:
jobs = self.scheduler.get_jobs()
output.write(f"Active jobs: {len(jobs)}\n")
for job in jobs[:5]: # First 5 jobs
output.write(f" {job.id}: next run {job.next_run_time}\n")
except Exception as e:
output.write(f"Error getting jobs: {e}\n")
else:
output.write("Scheduler not initialized\n")
# Event loop info
output.write("\nEVENT LOOP STATE:\n")
output.write("=" * 40 + "\n")
global _event_loop
if _event_loop:
output.write(f"Event loop running: {_event_loop.is_running()}\n")
try:
import asyncio
tasks = asyncio.all_tasks(_event_loop)
output.write(f"Active tasks: {len(tasks)}\n")
for task in list(tasks)[:5]: # First 5 tasks
output.write(f" {task.get_name()}: {task._state}\n")
except Exception as e:
output.write(f"Error getting tasks: {e}\n")
else:
output.write("Event loop not initialized\n")
output.write("\n" + "=" * 80 + "\n")
output.write("END THREAD DUMP\n")
result = output.getvalue()
output.close()
# Also log that we got a thread dump request
logger.info("Thread dump requested via HTTP endpoint")
return result
except Exception as e:
error_msg = f"Error generating thread dump: {type(e).__name__}: {e}"
logger.error(error_msg)
return error_msg
class SchedulerClient(AppServiceClient):
@classmethod

View File

@@ -1 +0,0 @@
# AutoMod integration for content moderation

View File

@@ -1,353 +0,0 @@
import asyncio
import json
import logging
from typing import TYPE_CHECKING, Any, Literal
if TYPE_CHECKING:
from backend.executor import DatabaseManagerAsyncClient
from autogpt_libs.feature_flag.client import is_feature_enabled
from pydantic import ValidationError
from backend.data.execution import ExecutionStatus
from backend.server.v2.AutoMod.models import (
AutoModRequest,
AutoModResponse,
ModerationConfig,
)
from backend.util.exceptions import ModerationError
from backend.util.request import Requests
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
class AutoModManager:
def __init__(self):
self.config = self._load_config()
def _load_config(self) -> ModerationConfig:
"""Load AutoMod configuration from settings"""
settings = Settings()
return ModerationConfig(
enabled=settings.config.automod_enabled,
api_url=settings.config.automod_api_url,
api_key=settings.secrets.automod_api_key,
timeout=settings.config.automod_timeout,
retry_attempts=settings.config.automod_retry_attempts,
retry_delay=settings.config.automod_retry_delay,
fail_open=settings.config.automod_fail_open,
)
async def moderate_graph_execution_inputs(
self, db_client: "DatabaseManagerAsyncClient", graph_exec, timeout: int = 10
) -> Exception | None:
"""
Complete input moderation flow for graph execution
Returns: error_if_failed (None means success)
"""
if not self.config.enabled:
return None
# Check if AutoMod feature is enabled for this user
if not is_feature_enabled("AutoMod", graph_exec.user_id, default=False):
logger.debug(f"AutoMod feature not enabled for user {graph_exec.user_id}")
return None
# Get graph model and collect all inputs
graph_model = await db_client.get_graph(
graph_exec.graph_id,
user_id=graph_exec.user_id,
version=graph_exec.graph_version,
)
if not graph_model or not graph_model.nodes:
return None
all_inputs = []
for node in graph_model.nodes:
if node.input_default:
all_inputs.extend(str(v) for v in node.input_default.values() if v)
if (masks := graph_exec.nodes_input_masks) and (mask := masks.get(node.id)):
all_inputs.extend(str(v) for v in mask.values() if v)
if not all_inputs:
return None
# Combine all content and moderate directly
content = " ".join(all_inputs)
# Run moderation
logger.warning(
f"Moderating inputs for graph execution {graph_exec.graph_exec_id}"
)
try:
moderation_passed = await self._moderate_content(
content,
{
"user_id": graph_exec.user_id,
"graph_id": graph_exec.graph_id,
"graph_exec_id": graph_exec.graph_exec_id,
"moderation_type": "execution_input",
},
)
if not moderation_passed:
logger.warning(
f"Moderation failed for graph execution {graph_exec.graph_exec_id}"
)
# Update node statuses for frontend display before raising error
await self._update_failed_nodes_for_moderation(
db_client, graph_exec.graph_exec_id, "input"
)
return ModerationError(
message="Execution failed due to input content moderation",
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
moderation_type="input",
)
return None
except asyncio.TimeoutError:
logger.warning(
f"Input moderation timed out for graph execution {graph_exec.graph_exec_id}, bypassing moderation"
)
return None # Bypass moderation on timeout
except Exception as e:
logger.warning(f"Input moderation execution failed: {e}")
return ModerationError(
message="Execution failed due to input content moderation error",
user_id=graph_exec.user_id,
graph_exec_id=graph_exec.graph_exec_id,
moderation_type="input",
)
async def moderate_graph_execution_outputs(
self,
db_client: "DatabaseManagerAsyncClient",
graph_exec_id: str,
user_id: str,
graph_id: str,
timeout: int = 10,
) -> Exception | None:
"""
Complete output moderation flow for graph execution
Returns: error_if_failed (None means success)
"""
if not self.config.enabled:
return None
# Check if AutoMod feature is enabled for this user
if not is_feature_enabled("AutoMod", user_id, default=False):
logger.debug(f"AutoMod feature not enabled for user {user_id}")
return None
# Get completed executions and collect outputs
completed_executions = await db_client.get_node_executions(
graph_exec_id, statuses=[ExecutionStatus.COMPLETED], include_exec_data=True
)
if not completed_executions:
return None
all_outputs = []
for exec_entry in completed_executions:
if exec_entry.output_data:
all_outputs.extend(str(v) for v in exec_entry.output_data.values() if v)
if not all_outputs:
return None
# Combine all content and moderate directly
content = " ".join(all_outputs)
# Run moderation
logger.warning(f"Moderating outputs for graph execution {graph_exec_id}")
try:
moderation_passed = await self._moderate_content(
content,
{
"user_id": user_id,
"graph_id": graph_id,
"graph_exec_id": graph_exec_id,
"moderation_type": "execution_output",
},
)
if not moderation_passed:
logger.warning(f"Moderation failed for graph execution {graph_exec_id}")
# Update node statuses for frontend display before raising error
await self._update_failed_nodes_for_moderation(
db_client, graph_exec_id, "output"
)
return ModerationError(
message="Execution failed due to output content moderation",
user_id=user_id,
graph_exec_id=graph_exec_id,
moderation_type="output",
)
return None
except asyncio.TimeoutError:
logger.warning(
f"Output moderation timed out for graph execution {graph_exec_id}, bypassing moderation"
)
return None # Bypass moderation on timeout
except Exception as e:
logger.warning(f"Output moderation execution failed: {e}")
return ModerationError(
message="Execution failed due to output content moderation error",
user_id=user_id,
graph_exec_id=graph_exec_id,
moderation_type="output",
)
async def _update_failed_nodes_for_moderation(
self,
db_client: "DatabaseManagerAsyncClient",
graph_exec_id: str,
moderation_type: Literal["input", "output"],
):
"""Update node execution statuses for frontend display when moderation fails"""
# Import here to avoid circular imports
from backend.executor.manager import send_async_execution_update
if moderation_type == "input":
# For input moderation, mark queued/running/incomplete nodes as failed
target_statuses = [
ExecutionStatus.QUEUED,
ExecutionStatus.RUNNING,
ExecutionStatus.INCOMPLETE,
]
else:
# For output moderation, mark completed nodes as failed
target_statuses = [ExecutionStatus.COMPLETED]
# Get the executions that need to be updated
executions_to_update = await db_client.get_node_executions(
graph_exec_id, statuses=target_statuses, include_exec_data=True
)
if not executions_to_update:
return
# Prepare database update tasks
exec_updates = []
for exec_entry in executions_to_update:
# Collect all input and output names to clear
cleared_inputs = {}
cleared_outputs = {}
if exec_entry.input_data:
for name in exec_entry.input_data.keys():
cleared_inputs[name] = ["Failed due to content moderation"]
if exec_entry.output_data:
for name in exec_entry.output_data.keys():
cleared_outputs[name] = ["Failed due to content moderation"]
# Add update task to list
exec_updates.append(
db_client.update_node_execution_status(
exec_entry.node_exec_id,
status=ExecutionStatus.FAILED,
stats={
"error": "Failed due to content moderation",
"cleared_inputs": cleared_inputs,
"cleared_outputs": cleared_outputs,
},
)
)
# Execute all database updates in parallel
updated_execs = await asyncio.gather(*exec_updates)
# Send all websocket updates in parallel
await asyncio.gather(
*[
send_async_execution_update(updated_exec)
for updated_exec in updated_execs
]
)
async def _moderate_content(self, content: str, metadata: dict[str, Any]) -> bool:
"""Moderate content using AutoMod API
Returns:
True: Content approved or timeout occurred
False: Content rejected by moderation
Raises:
asyncio.TimeoutError: When moderation times out (should be bypassed)
"""
try:
request_data = AutoModRequest(
type="text",
content=content,
metadata=metadata,
)
response = await self._make_request(request_data)
if response.success and response.status == "approved":
logger.debug(
f"Content approved for {metadata.get('graph_exec_id', 'unknown')}"
)
return True
else:
reasons = [r.reason for r in response.moderation_results if r.reason]
error_msg = f"Content rejected by AutoMod: {'; '.join(reasons)}"
logger.warning(f"Content rejected: {error_msg}")
return False
except asyncio.TimeoutError:
# Re-raise timeout to be handled by calling methods
logger.warning(
f"AutoMod API timeout for {metadata.get('graph_exec_id', 'unknown')}"
)
raise
except Exception as e:
logger.error(f"AutoMod moderation error: {e}")
return self.config.fail_open
async def _make_request(self, request_data: AutoModRequest) -> AutoModResponse:
"""Make HTTP request to AutoMod API using the standard request utility"""
url = f"{self.config.api_url}/moderate"
headers = {
"Content-Type": "application/json",
"X-API-Key": self.config.api_key,
}
# Create requests instance with timeout and retry configuration
requests = Requests(
extra_headers=headers,
retry_max_wait=float(self.config.timeout),
)
try:
response = await requests.post(
url, json=request_data.model_dump(), timeout=self.config.timeout
)
response_data = response.json()
return AutoModResponse.model_validate(response_data)
except asyncio.TimeoutError:
# Re-raise timeout error to be caught by _moderate_content
raise
except (json.JSONDecodeError, ValidationError) as e:
raise Exception(f"Invalid response from AutoMod API: {e}")
except Exception as e:
# Check if this is an aiohttp timeout that we should convert
if "timeout" in str(e).lower():
raise asyncio.TimeoutError(f"AutoMod API request timed out: {e}")
raise Exception(f"AutoMod API request failed: {e}")
# Global instance
automod_manager = AutoModManager()

View File

@@ -1,57 +0,0 @@
from typing import Any, Dict, List, Optional
from pydantic import BaseModel, Field
class AutoModRequest(BaseModel):
"""Request model for AutoMod API"""
type: str = Field(..., description="Content type - 'text', 'image', 'video'")
content: str = Field(..., description="The content to moderate")
metadata: Optional[Dict[str, Any]] = Field(
default=None, description="Additional context about the content"
)
class ModerationResult(BaseModel):
"""Individual moderation result"""
decision: str = Field(
..., description="Moderation decision: 'approved', 'rejected', 'flagged'"
)
reason: Optional[str] = Field(default=None, description="Reason for the decision")
class AutoModResponse(BaseModel):
"""Response model for AutoMod API"""
success: bool = Field(..., description="Whether the request was successful")
status: str = Field(
..., description="Overall status: 'approved', 'rejected', 'flagged', 'pending'"
)
moderation_results: List[ModerationResult] = Field(
default_factory=list, description="List of moderation results"
)
class ModerationConfig(BaseModel):
"""Configuration for AutoMod integration"""
enabled: bool = Field(default=True, description="Whether moderation is enabled")
api_url: str = Field(default="", description="AutoMod API base URL")
api_key: str = Field(..., description="AutoMod API key")
timeout: int = Field(default=30, description="Request timeout in seconds")
retry_attempts: int = Field(default=3, description="Number of retry attempts")
retry_delay: float = Field(
default=1.0, description="Delay between retries in seconds"
)
fail_open: bool = Field(
default=False,
description="If True, allow execution to continue if moderation fails",
)
moderate_inputs: bool = Field(
default=True, description="Whether to moderate block inputs"
)
moderate_outputs: bool = Field(
default=True, description="Whether to moderate block outputs"
)

View File

@@ -33,33 +33,6 @@ class InsufficientBalanceError(ValueError):
return self.message
class ModerationError(ValueError):
"""Content moderation failure during execution"""
user_id: str
message: str
graph_exec_id: str
moderation_type: str
def __init__(
self,
message: str,
user_id: str,
graph_exec_id: str,
moderation_type: str = "content",
):
super().__init__(message)
self.args = (message, user_id, graph_exec_id, moderation_type)
self.message = message
self.user_id = user_id
self.graph_exec_id = graph_exec_id
self.moderation_type = moderation_type
def __str__(self):
"""Used to display the error message in the frontend, because we str() the error when sending the execution update"""
return self.message
class GraphValidationError(ValueError):
"""Structured validation error for graph validation failures"""

View File

@@ -295,32 +295,6 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
description="Maximum file size in MB for file uploads (1-1024 MB)",
)
# AutoMod configuration
automod_enabled: bool = Field(
default=False,
description="Whether AutoMod content moderation is enabled",
)
automod_api_url: str = Field(
default="",
description="AutoMod API base URL - Make sure it ends in /api",
)
automod_timeout: int = Field(
default=30,
description="Timeout in seconds for AutoMod API requests",
)
automod_retry_attempts: int = Field(
default=3,
description="Number of retry attempts for AutoMod API requests",
)
automod_retry_delay: float = Field(
default=1.0,
description="Delay between retries for AutoMod API requests in seconds",
)
automod_fail_open: bool = Field(
default=False,
description="If True, allow execution to continue if AutoMod fails",
)
@field_validator("platform_base_url", "frontend_base_url")
@classmethod
def validate_platform_base_url(cls, v: str, info: ValidationInfo) -> str:
@@ -521,10 +495,6 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
apollo_api_key: str = Field(default="", description="Apollo API Key")
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
# AutoMod API credentials
automod_api_key: str = Field(default="", description="AutoMod API key")
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")
# Add more secret fields as needed

View File

@@ -6737,4 +6737,4 @@ cffi = ["cffi (>=1.11)"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "795414d7ce8f288ea6c65893268b5c29a7c9a60ad75cde28ac7bcdb65f426dfe"
content-hash = "05e2b99bd6dc5a74a89df0e1e504853b66bd519062159b0de5fbedf6a1f4d986"

View File

@@ -10,7 +10,6 @@ packages = [{ include = "backend", format = "sdist" }]
[tool.poetry.dependencies]
python = ">=3.10,<3.13"
aio-pika = "^9.5.5"
aiohttp = "^3.10.0"
aiodns = "^3.5.0"
anthropic = "^0.59.0"
apscheduler = "^3.11.0"

View File

@@ -1,353 +0,0 @@
#!/usr/bin/env python3
"""
Unified scheduler debugging tool
- Test deployment
- Collect thread dumps (signal-based, works when FastAPI is stuck)
- Monitor periodic dumps
"""
import subprocess
import sys
import time
from datetime import datetime
import requests
def find_scheduler_pod():
"""Find the running scheduler pod"""
result = subprocess.run(
"kubectl get pods -n dev-agpt --no-headers".split(),
capture_output=True,
text=True,
)
for line in result.stdout.split("\n"):
if "scheduler-server" in line and "Running" in line:
return line.split()[0]
return None
def test_deployment():
"""Test if the deployment has debugging enabled"""
print("🧪 TESTING SCHEDULER DEBUG DEPLOYMENT")
print("=" * 50)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return False
print(f"📍 Pod: {pod_name}")
# Check if faulthandler is enabled
print("🔍 Checking faulthandler setup...")
log_result = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
capture_output=True,
text=True,
)
faulthandler_enabled = "Faulthandler enabled" in log_result.stdout
periodic_enabled = "Periodic thread dump monitor started" in log_result.stdout
if faulthandler_enabled:
print("✅ Faulthandler is enabled")
else:
print("❌ Faulthandler not found in logs")
if periodic_enabled:
print("✅ Periodic monitoring is enabled")
else:
print("❌ Periodic monitoring not found in logs")
# Test signal sending
print("\\n📡 Testing signal delivery...")
signal_result = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
capture_output=True,
text=True,
)
if signal_result.returncode == 0:
print("✅ Signal sent successfully")
time.sleep(2)
# Check for thread dump in logs
new_logs = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=20".split(),
capture_output=True,
text=True,
)
if "SIGNAL THREAD DUMP" in new_logs.stdout:
print("✅ Thread dump appeared in logs!")
else:
print("⚠️ No thread dump found (might take a moment)")
else:
print(f"❌ Signal failed: {signal_result.stderr}")
# Test HTTP API (should work when not stuck)
print("\\n🌐 Testing HTTP API...")
pf_process = None
try:
pf_process = subprocess.Popen(
f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
time.sleep(2)
response = requests.get("http://localhost:8003/debug_thread_dump", timeout=10)
if response.status_code == 200:
print("✅ HTTP API working")
print(f" Thread count found: {'Total threads:' in response.text}")
else:
print(f"⚠️ HTTP API returned: {response.status_code}")
except Exception as e:
print(f"⚠️ HTTP API failed: {e}")
finally:
if pf_process:
try:
pf_process.terminate()
pf_process.wait()
except Exception:
pass
success = faulthandler_enabled and signal_result.returncode == 0
print(
f"\\n{'✅ DEPLOYMENT TEST PASSED' if success else '❌ DEPLOYMENT TEST FAILED'}"
)
return success
def collect_thread_dump():
"""Collect comprehensive thread dump (works even when scheduler is stuck)"""
print("🚨 COLLECTING THREAD DUMP FROM SCHEDULER")
print("=" * 60)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return False
print(f"📍 Pod: {pod_name}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Send both signals for maximum coverage
print("📡 Sending signals for thread dumps...")
# SIGUSR1 (faulthandler)
result1 = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR1 1".split(),
capture_output=True,
text=True,
)
print(f" SIGUSR1: {'' if result1.returncode == 0 else ''}")
time.sleep(1)
# SIGUSR2 (custom handler)
result2 = subprocess.run(
f"kubectl exec -n dev-agpt {pod_name} -- kill -USR2 1".split(),
capture_output=True,
text=True,
)
print(f" SIGUSR2: {'' if result2.returncode == 0 else ''}")
time.sleep(3) # Give signals time to execute
# Collect logs with thread dumps
print("📋 Collecting logs...")
log_result = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=500".split(),
capture_output=True,
text=True,
)
# Save everything
dump_file = f"THREAD_DUMP_{timestamp}.txt"
with open(dump_file, "w") as f:
f.write("SCHEDULER THREAD DUMP COLLECTION\\n")
f.write(f"Timestamp: {datetime.now()}\\n")
f.write(f"Pod: {pod_name}\\n")
f.write("=" * 80 + "\\n\\n")
f.write("FULL LOGS (last 500 lines):\\n")
f.write("-" * 40 + "\\n")
f.write(log_result.stdout)
print(f"💾 Full dump saved: {dump_file}")
# Extract and show thread dump preview
lines = log_result.stdout.split("\\n")
thread_dumps = []
in_dump = False
current_dump = []
for line in lines:
if any(
marker in line
for marker in ["SIGNAL THREAD DUMP", "Fatal Python error", "Thread 0x"]
):
if current_dump:
thread_dumps.append(current_dump)
current_dump = [line]
in_dump = True
elif in_dump and (
"END SIGNAL THREAD DUMP" in line or "Current thread 0x" in line
):
current_dump.append(line)
thread_dumps.append(current_dump)
current_dump = []
in_dump = False
elif in_dump:
current_dump.append(line)
if current_dump:
thread_dumps.append(current_dump)
if thread_dumps:
print(f"\\n🔍 FOUND {len(thread_dumps)} THREAD DUMP(S):")
print("-" * 50)
# Show the most recent/complete dump
latest_dump = thread_dumps[-1]
for i, line in enumerate(latest_dump[:50]): # First 50 lines
print(line)
if len(latest_dump) > 50:
print("... (truncated, see full dump in file)")
# Create separate file with just thread dumps
clean_dump_file = f"CLEAN_THREAD_DUMP_{timestamp}.txt"
with open(clean_dump_file, "w") as f:
f.write(f"EXTRACTED THREAD DUMPS - {datetime.now()}\\n")
f.write("=" * 60 + "\\n\\n")
for i, dump in enumerate(thread_dumps, 1):
f.write(f"DUMP #{i}:\\n")
f.write("-" * 30 + "\\n")
f.write("\\n".join(dump))
f.write("\\n\\n")
print(f"🎯 Clean thread dumps saved: {clean_dump_file}")
else:
print("⚠️ No thread dumps found in logs")
print("Recent log lines:")
for line in lines[-10:]:
print(f" {line}")
# Try HTTP backup (will fail if scheduler is stuck, but worth trying)
print("\\n🌐 Attempting HTTP backup...")
pf_process = None
try:
pf_process = subprocess.Popen(
f"kubectl port-forward -n dev-agpt {pod_name} 8003:8003".split(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
time.sleep(2)
response = requests.get("http://localhost:8003/debug_thread_dump", timeout=5)
if response.status_code == 200:
http_file = f"HTTP_THREAD_DUMP_{timestamp}.txt"
with open(http_file, "w") as f:
f.write(response.text)
print(f"✅ HTTP backup saved: {http_file}")
else:
print(f"⚠️ HTTP failed: {response.status_code}")
except Exception as e:
print(f"⚠️ HTTP failed (expected if stuck): {e}")
finally:
if pf_process:
try:
pf_process.terminate()
pf_process.wait()
except Exception:
pass
print("\\n✅ COLLECTION COMPLETE!")
return len(thread_dumps) > 0
def monitor_periodic_dumps(duration_minutes=10):
"""Monitor periodic thread dumps for a specified duration"""
print(f"👁️ MONITORING PERIODIC DUMPS FOR {duration_minutes} MINUTES")
print("=" * 50)
pod_name = find_scheduler_pod()
if not pod_name:
print("❌ No scheduler pod found")
return
print(f"📍 Pod: {pod_name}")
print("⏰ Watching for periodic status messages and thread dumps...")
start_time = time.time()
end_time = start_time + (duration_minutes * 60)
# Get current log position (for reference, not used currently)
# Could be used for tracking new vs old logs if needed
while time.time() < end_time:
try:
# Get new logs
current_logs = subprocess.run(
f"kubectl logs -n dev-agpt {pod_name} --tail=50".split(),
capture_output=True,
text=True,
)
for line in current_logs.stdout.split("\\n"):
if "Periodic check:" in line:
print(f"📊 {line}")
elif "SIGNAL THREAD DUMP" in line:
print(f"🚨 Thread dump detected: {line}")
elif "No health check" in line:
print(f"⚠️ Health issue: {line}")
time.sleep(30) # Check every 30 seconds
except KeyboardInterrupt:
print("\\n⏹ Monitoring stopped by user")
break
except Exception as e:
print(f"Error during monitoring: {e}")
break
print("\\n✅ MONITORING COMPLETE")
def main():
if len(sys.argv) < 2:
print("🔧 SCHEDULER DEBUG TOOL")
print("=" * 30)
print("Usage:")
print(" python scheduler_debug.py test - Test deployment")
print(" python scheduler_debug.py collect - Collect thread dump")
print(" python scheduler_debug.py monitor [min] - Monitor periodic dumps")
print(" python scheduler_debug.py all - Run test + collect")
return
command = sys.argv[1].lower()
if command == "test":
test_deployment()
elif command == "collect":
collect_thread_dump()
elif command == "monitor":
duration = int(sys.argv[2]) if len(sys.argv) > 2 else 10
monitor_periodic_dumps(duration)
elif command == "all":
print("Running complete debugging sequence...\\n")
if test_deployment():
print("\\n" + "=" * 50)
collect_thread_dump()
else:
print("❌ Test failed, skipping collection")
else:
print(f"❌ Unknown command: {command}")
if __name__ == "__main__":
main()

View File

@@ -44,7 +44,7 @@ export const SettingsForm = ({
<FormItem>
<FormLabel>Email</FormLabel>
<FormControl>
<Input {...field} type="email" />
<Input {...field} type="email" data-testid="settings-email" />
</FormControl>
<FormMessage />
</FormItem>
@@ -62,6 +62,7 @@ export const SettingsForm = ({
{...field}
type="password"
placeholder="************"
data-testid="settings-password"
/>
</FormControl>
<FormMessage />
@@ -80,6 +81,7 @@ export const SettingsForm = ({
{...field}
type="password"
placeholder="************"
data-testid="settings-confirm-password"
/>
</FormControl>
<FormMessage />
@@ -117,6 +119,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-agent-run"
/>
</FormControl>
</FormItem>
@@ -141,6 +144,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-block-execution-failed"
/>
</FormControl>
</FormItem>
@@ -164,6 +168,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-continuous-agent-error"
/>
</FormControl>
</FormItem>
@@ -193,6 +198,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-zero-balance"
/>
</FormControl>
</FormItem>
@@ -216,6 +222,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-low-balance"
/>
</FormControl>
</FormItem>
@@ -243,6 +250,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-daily-summary"
/>
</FormControl>
</FormItem>
@@ -264,6 +272,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-weekly-summary"
/>
</FormControl>
</FormItem>
@@ -285,6 +294,7 @@ export const SettingsForm = ({
<Switch
checked={field.value}
onCheckedChange={field.onChange}
data-testid="settings-notify-on-monthly-summary"
/>
</FormControl>
</FormItem>
@@ -300,6 +310,7 @@ export const SettingsForm = ({
type="button"
onClick={onCancel}
disabled={form.formState.isSubmitting}
data-testid="settings-cancel"
>
Cancel
</Button>

View File

@@ -56,10 +56,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
return (
<div className="w-full min-w-[800px] px-4 sm:px-8">
<h1
data-testid="profile-info-form-title"
className="font-circular mb-6 text-[28px] font-normal text-neutral-900 dark:text-neutral-100 sm:mb-8 sm:text-[35px]"
>
<h1 className="font-circular mb-6 text-[28px] font-normal text-neutral-900 dark:text-neutral-100 sm:mb-8 sm:text-[35px]">
Profile
</h1>
@@ -95,18 +92,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
<form className="space-y-4 sm:space-y-6" onSubmit={submitForm}>
<div className="w-full">
<label
htmlFor="displayName"
className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
>
<label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
Display name
</label>
<div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
<input
type="text"
id="displayName"
name="displayName"
data-testid="profile-info-form-display-name"
defaultValue={profileData.name}
placeholder="Enter your display name"
className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -122,17 +114,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
</div>
<div className="w-full">
<label
htmlFor="handle"
className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
>
<label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
Handle
</label>
<div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
<input
type="text"
name="handle"
id="handle"
defaultValue={profileData.username}
placeholder="@username"
className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -148,16 +136,12 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
</div>
<div className="w-full">
<label
htmlFor="bio"
className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
>
<label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
Bio
</label>
<div className="h-[220px] rounded-2xl border border-slate-200 py-2.5 pl-4 pr-4 dark:border-slate-700 dark:bg-slate-800">
<textarea
name="bio"
id="bio"
defaultValue={profileData.description}
placeholder="Tell us about yourself..."
className="font-circular h-full w-full resize-none border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -185,17 +169,13 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
const link = profileData.links[linkNum - 1];
return (
<div key={linkNum} className="w-full">
<label
htmlFor={`link${linkNum}`}
className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300"
>
<label className="font-circular mb-1.5 block text-base font-normal leading-tight text-neutral-700 dark:text-neutral-300">
Link {linkNum}
</label>
<div className="rounded-[55px] border border-slate-200 px-4 py-2.5 dark:border-slate-700 dark:bg-slate-800">
<input
type="text"
name={`link${linkNum}`}
id={`link${linkNum}`}
placeholder="https://"
defaultValue={link || ""}
className="font-circular w-full border-none bg-transparent text-base font-normal text-neutral-900 placeholder:text-neutral-400 focus:outline-none dark:text-white dark:placeholder:text-neutral-500"
@@ -219,8 +199,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
<Separator />
<div className="flex h-[50px] items-center justify-end gap-3 py-8">
{/* FRONTEND-TODO: Need to fix it */}
{/* <Button
<Button
type="button"
variant="secondary"
className="font-circular h-[50px] rounded-[35px] bg-neutral-200 px-6 py-3 text-base font-medium text-neutral-800 transition-colors hover:bg-neutral-300 dark:border-neutral-700 dark:bg-neutral-700 dark:text-neutral-200 dark:hover:border-neutral-600 dark:hover:bg-neutral-600"
@@ -229,7 +208,7 @@ export function ProfileInfoForm({ profile }: { profile: ProfileDetails }) {
}}
>
Cancel
</Button> */}
</Button>
<Button
type="submit"
disabled={isSubmitting}

View File

@@ -13,8 +13,15 @@ const MILLISECONDS_PER_MINUTE = SECONDS_PER_MINUTE * MILLISECONDS_PER_SECOND;
const MILLISECONDS_PER_HOUR = MINUTES_PER_HOUR * MILLISECONDS_PER_MINUTE;
const MILLISECONDS_PER_DAY = HOURS_PER_DAY * MILLISECONDS_PER_HOUR;
const MILLISECONDS_PER_WEEK = DAYS_PER_WEEK * MILLISECONDS_PER_DAY;
// Display constants
const SHORT_DURATION_THRESHOLD_SECONDS = 5;
// State sanity limits - keep only most recent executions to prevent unbounded growth
const MAX_ACTIVE_EXECUTIONS_IN_STATE = 200; // Most important - these are running
const MAX_RECENT_COMPLETIONS_IN_STATE = 100;
const MAX_RECENT_FAILURES_IN_STATE = 100;
export function formatTimeAgo(dateStr: string): string {
const date = new Date(dateStr);
const now = new Date();
@@ -203,15 +210,17 @@ export function categorizeExecutions(
);
// Filter and limit each category to prevent unbounded state growth
const activeExecutions = enrichedExecutions.filter(isActiveExecution);
const activeExecutions = enrichedExecutions
.filter(isActiveExecution)
.slice(0, MAX_ACTIVE_EXECUTIONS_IN_STATE);
const recentCompletions = enrichedExecutions.filter((execution) =>
isRecentCompletion(execution, oneWeekAgo),
);
const recentCompletions = enrichedExecutions
.filter((execution) => isRecentCompletion(execution, oneWeekAgo))
.slice(0, MAX_RECENT_COMPLETIONS_IN_STATE);
const recentFailures = enrichedExecutions.filter((execution) =>
isRecentFailure(execution, oneWeekAgo),
);
const recentFailures = enrichedExecutions
.filter((execution) => isRecentFailure(execution, oneWeekAgo))
.slice(0, MAX_RECENT_FAILURES_IN_STATE);
return {
activeExecutions,
@@ -254,11 +263,20 @@ export function addExecutionToCategory(
const newState = { ...state };
if (isActiveExecution(execution)) {
newState.activeExecutions = [execution, ...newState.activeExecutions];
newState.activeExecutions = [execution, ...newState.activeExecutions].slice(
0,
MAX_ACTIVE_EXECUTIONS_IN_STATE,
);
} else if (isRecentCompletion(execution, oneWeekAgo)) {
newState.recentCompletions = [execution, ...newState.recentCompletions];
newState.recentCompletions = [
execution,
...newState.recentCompletions,
].slice(0, MAX_RECENT_COMPLETIONS_IN_STATE);
} else if (isRecentFailure(execution, oneWeekAgo)) {
newState.recentFailures = [execution, ...newState.recentFailures];
newState.recentFailures = [execution, ...newState.recentFailures].slice(
0,
MAX_RECENT_FAILURES_IN_STATE,
);
}
return newState;

View File

@@ -1,145 +0,0 @@
import { Locator, Page } from "@playwright/test";
import { BasePage } from "./base.page";
import { getSelectors } from "../utils/selectors";
export class ProfileFormPage extends BasePage {
constructor(page: Page) {
super(page);
}
private getId(id: string | RegExp): Locator {
const { getId } = getSelectors(this.page);
return getId(id);
}
private async hideFloatingWidgets(): Promise<void> {
await this.page.addStyleTag({
content: `
[data-tally-open] { display: none !important; }
`,
});
}
// Locators
title(): Locator {
return this.getId("profile-info-form-title");
}
displayNameField(): Locator {
const { getField } = getSelectors(this.page);
return getField("Display name");
}
handleField(): Locator {
const { getField } = getSelectors(this.page);
return getField("Handle");
}
bioField(): Locator {
const { getField } = getSelectors(this.page);
return getField("Bio");
}
linkField(index: number): Locator {
this.assertValidLinkIndex(index);
const { getField } = getSelectors(this.page);
return getField(`Link ${index}`);
}
cancelButton(): Locator {
const { getButton } = getSelectors(this.page);
return getButton("Cancel");
}
saveButton(): Locator {
const { getButton } = getSelectors(this.page);
return getButton("Save changes");
}
// State
async isLoaded(): Promise<boolean> {
try {
await this.title().waitFor({ state: "visible", timeout: 10_000 });
await this.displayNameField().waitFor({
state: "visible",
timeout: 10_000,
});
await this.handleField().waitFor({ state: "visible", timeout: 10_000 });
return true;
} catch {
return false;
}
}
// Actions
async setDisplayName(name: string): Promise<void> {
await this.displayNameField().fill(name);
}
async getDisplayName(): Promise<string> {
return this.displayNameField().inputValue();
}
async setHandle(handle: string): Promise<void> {
await this.handleField().fill(handle);
}
async getHandle(): Promise<string> {
return this.handleField().inputValue();
}
async setBio(bio: string): Promise<void> {
await this.bioField().fill(bio);
}
async getBio(): Promise<string> {
return this.bioField().inputValue();
}
async setLink(index: number, url: string): Promise<void> {
await this.linkField(index).fill(url);
}
async getLink(index: number): Promise<string> {
return this.linkField(index).inputValue();
}
async setLinks(links: Array<string | undefined>): Promise<void> {
for (let i = 1; i <= 5; i++) {
const val = links[i - 1] ?? "";
await this.setLink(i, val);
}
}
async clickCancel(): Promise<void> {
await this.cancelButton().click();
}
async clickSave(): Promise<void> {
await this.saveButton().click();
}
async saveChanges(): Promise<void> {
await this.hideFloatingWidgets();
await this.clickSave();
await this.waitForSaveComplete();
}
async waitForSaveComplete(timeoutMs: number = 15_000): Promise<void> {
const { getButton } = getSelectors(this.page);
await getButton("Save changes").waitFor({
state: "attached",
timeout: timeoutMs,
});
await getButton("Save changes").waitFor({
state: "visible",
timeout: timeoutMs,
});
}
private assertValidLinkIndex(index: number) {
if (index < 1 || index > 5) {
throw new Error(`Link index must be between 1 and 5. Received: ${index}`);
}
}
}

View File

@@ -0,0 +1,182 @@
import { expect, Locator, Page } from "@playwright/test";
import { BasePage } from "./base.page";
import { getSelectors } from "../utils/selectors";
export async function getSwitchState(toggle: Locator): Promise<boolean> {
const ariaChecked = await toggle.getAttribute("aria-checked");
if (ariaChecked === "true") return true;
if (ariaChecked === "false") return false;
const dataState = await toggle.getAttribute("data-state");
if (dataState === "checked") return true;
if (dataState === "unchecked") return false;
const dataChecked = await toggle.getAttribute("data-checked");
return dataChecked !== null;
}
type ToggleId =
| "settings-notify-on-agent-run"
| "settings-notify-on-block-execution-failed"
| "settings-notify-on-continuous-agent-error"
| "settings-notify-on-zero-balance"
| "settings-notify-on-low-balance"
| "settings-notify-on-daily-summary"
| "settings-notify-on-weekly-summary"
| "settings-notify-on-monthly-summary";
export const TOGGLE_IDS: ReadonlyArray<ToggleId> = [
"settings-notify-on-agent-run",
"settings-notify-on-block-execution-failed",
"settings-notify-on-continuous-agent-error",
"settings-notify-on-zero-balance",
"settings-notify-on-low-balance",
"settings-notify-on-daily-summary",
"settings-notify-on-weekly-summary",
"settings-notify-on-monthly-summary",
];
export class SettingsPage extends BasePage {
static readonly path = "/profile/settings";
constructor(page: Page) {
super(page);
}
async goto(): Promise<void> {
await this.page.goto(SettingsPage.path);
await this.isLoaded();
}
async isLoaded(): Promise<boolean> {
try {
await this.page.waitForLoadState("domcontentloaded");
const header = this.page.getByRole("heading", { name: "My account" });
const email = this.getEmailInput();
await Promise.all([
header.waitFor({ state: "visible" }),
email.waitFor({ state: "visible" }),
]);
return true;
} catch {
return false;
}
}
getEmailInput(): Locator {
return this.page.getByTestId("settings-email");
}
getPasswordInput(): Locator {
return this.page.getByTestId("settings-password");
}
getConfirmPasswordInput(): Locator {
return this.page.getByTestId("settings-confirm-password");
}
getToggle(id: ToggleId): Locator {
return this.page.getByTestId(id);
}
getCancelButton(): Locator {
return this.page.getByTestId("settings-cancel");
}
getSaveButton(): Locator {
return this.page.getByRole("button", { name: /Save changes|Saving\.\.\./ });
}
async setEmail(value: string): Promise<void> {
const input = this.getEmailInput();
await input.waitFor({ state: "visible" });
await input.fill(value);
await expect(input).toHaveValue(value);
}
async setPassword(value: string): Promise<void> {
const input = this.getPasswordInput();
await input.waitFor({ state: "visible" });
await input.fill(value);
await expect(input).toHaveValue(value);
}
async setConfirmPassword(value: string): Promise<void> {
const input = this.getConfirmPasswordInput();
await input.waitFor({ state: "visible" });
await input.fill(value);
await expect(input).toHaveValue(value);
}
private async getSwitchState(toggle: Locator): Promise<boolean> {
const ariaChecked = await toggle.getAttribute("aria-checked");
if (ariaChecked === "true") return true;
if (ariaChecked === "false") return false;
const dataState = await toggle.getAttribute("data-state");
if (dataState === "checked") return true;
if (dataState === "unchecked") return false;
const dataChecked = await toggle.getAttribute("data-checked");
return dataChecked !== null;
}
private async setSwitchState(toggle: Locator, desired: boolean): Promise<void> {
await toggle.waitFor({ state: "visible" });
const current = await this.getSwitchState(toggle);
if (current !== desired) {
await toggle.click();
await expect
.poll(async () => this.getSwitchState(toggle))
.toBe(desired);
}
}
async toggle(id: ToggleId): Promise<void> {
await this.getToggle(id).click();
}
async enable(id: ToggleId): Promise<void> {
await this.setSwitchState(this.getToggle(id), true);
}
async disable(id: ToggleId): Promise<void> {
await this.setSwitchState(this.getToggle(id), false);
}
async cancelChanges(): Promise<void> {
const btn = this.getCancelButton();
await btn.waitFor({ state: "visible" });
await btn.click();
await this.getEmailInput().waitFor({ state: "visible" });
}
async saveChanges(): Promise<void> {
const btn = this.getSaveButton();
await btn.waitFor({ state: "visible" });
await btn.click();
await this.waitForSaveComplete();
}
async waitForSaveComplete(): Promise<void> {
const { getText } = getSelectors(this.page);
const toast = getText("Successfully updated settings");
await Promise.race([
toast.waitFor({ state: "visible" }),
expect(this.getSaveButton()).toHaveText("Save changes"),
]);
}
async getEmailValue(): Promise<string> {
return this.getEmailInput().inputValue();
}
async expectValidationError(text: string | RegExp): Promise<void> {
const { getText } = getSelectors(this.page);
await expect(getText(text)).toBeVisible();
}
}
export async function navigateToSettings(page: Page): Promise<SettingsPage> {
const settings = new SettingsPage(page);
await settings.goto();
return settings;
}

View File

@@ -1,108 +0,0 @@
import test, { expect } from "@playwright/test";
import { ProfileFormPage } from "./pages/profile-form.page";
import { LoginPage } from "./pages/login.page";
import { hasUrl } from "./utils/assertion";
import { TEST_CREDENTIALS } from "./credentials";
test.describe("Profile Form", () => {
let profileFormPage: ProfileFormPage;
test.beforeEach(async ({ page }) => {
profileFormPage = new ProfileFormPage(page);
const loginPage = new LoginPage(page);
await loginPage.goto();
await loginPage.login(TEST_CREDENTIALS.email, TEST_CREDENTIALS.password);
await hasUrl(page, "/marketplace");
});
test("redirects to login when user is not authenticated", async ({
browser,
}) => {
const context = await browser.newContext();
const page = await context.newPage();
try {
await page.goto("/profile");
await hasUrl(page, "/login");
} finally {
await page.close();
await context.close();
}
});
test("can save profile changes successfully", async ({ page }) => {
await profileFormPage.navbar.clickProfileLink();
await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();
await hasUrl(page, new RegExp("/profile"));
const suffix = Date.now().toString().slice(-6);
const newDisplayName = `E2E Name ${suffix}`;
const newHandle = `e2euser${suffix}`;
const newBio = `E2E bio ${suffix}`;
const newLinks = [
`https://example.com/${suffix}/1`,
`https://example.com/${suffix}/2`,
`https://example.com/${suffix}/3`,
`https://example.com/${suffix}/4`,
`https://example.com/${suffix}/5`,
];
await profileFormPage.setDisplayName(newDisplayName);
await profileFormPage.setHandle(newHandle);
await profileFormPage.setBio(newBio);
await profileFormPage.setLinks(newLinks);
await profileFormPage.saveChanges();
expect(await profileFormPage.getDisplayName()).toBe(newDisplayName);
expect(await profileFormPage.getHandle()).toBe(newHandle);
expect(await profileFormPage.getBio()).toBe(newBio);
for (let i = 1; i <= 5; i++) {
expect(await profileFormPage.getLink(i)).toBe(newLinks[i - 1]);
}
await page.reload();
await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();
expect(await profileFormPage.getDisplayName()).toBe(newDisplayName);
expect(await profileFormPage.getHandle()).toBe(newHandle);
expect(await profileFormPage.getBio()).toBe(newBio);
for (let i = 1; i <= 5; i++) {
expect(await profileFormPage.getLink(i)).toBe(newLinks[i - 1]);
}
});
// Currently we are not using hook form inside the profile form, so cancel button is not working as expected, once that's fixed, we can unskip this test
test.skip("can cancel profile changes", async ({ page }) => {
await profileFormPage.navbar.clickProfileLink();
await expect(profileFormPage.isLoaded()).resolves.toBeTruthy();
await hasUrl(page, new RegExp("/profile"));
const originalDisplayName = await profileFormPage.getDisplayName();
const originalHandle = await profileFormPage.getHandle();
const originalBio = await profileFormPage.getBio();
const originalLinks: string[] = [];
for (let i = 1; i <= 5; i++) {
originalLinks.push(await profileFormPage.getLink(i));
}
const suffix = `${Date.now().toString().slice(-6)}_cancel`;
await profileFormPage.setDisplayName(`Tmp Name ${suffix}`);
await profileFormPage.setHandle(`tmpuser${suffix}`);
await profileFormPage.setBio(`Tmp bio ${suffix}`);
for (let i = 1; i <= 5; i++) {
await profileFormPage.setLink(i, `https://tmp.example/${suffix}/${i}`);
}
await profileFormPage.clickCancel();
expect(await profileFormPage.getDisplayName()).toBe(originalDisplayName);
expect(await profileFormPage.getHandle()).toBe(originalHandle);
expect(await profileFormPage.getBio()).toBe(originalBio);
for (let i = 1; i <= 5; i++) {
expect(await profileFormPage.getLink(i)).toBe(originalLinks[i - 1]);
}
});
});

View File

@@ -0,0 +1,91 @@
import test, { expect } from "@playwright/test";
import { LoginPage } from "./pages/login.page";
import { TEST_CREDENTIALS } from "./credentials";
import { hasFieldValue, hasUrl } from "./utils/assertion";
import { navigateToSettings, getSwitchState, TOGGLE_IDS } from "./pages/settings.page";
test.describe("Settings", () => {
test.beforeEach(async ({ page }) => {
await page.goto("/login");
const loginPage = new LoginPage(page);
await loginPage.login(TEST_CREDENTIALS.email, TEST_CREDENTIALS.password);
await hasUrl(page, "/marketplace");
});
test("settings page redirects to login when not authenticated", async ({ browser }) => {
const context = await browser.newContext();
const page = await context.newPage();
await page.goto("/profile/settings");
await hasUrl(page, /\/login/);
await context.close();
});
test("user can successfully update settings", async ({ page }) => {
const settings = await navigateToSettings(page);
for (const id of TOGGLE_IDS) {
const state = await getSwitchState(settings.getToggle(id));
expect.soft(state, `Initial state of ${id}`).toBe(true);
}
// Turn all ON, change email/password, save
for (const id of TOGGLE_IDS) {
await settings.disable(id);
}
const tempEmail = `temp+e2e@example.com`;
await settings.setEmail(tempEmail);
await settings.setPassword("temporarypassword123");
await settings.setConfirmPassword("temporarypassword123");
await settings.saveChanges();
// Reload and verify change email/password and OFF persisted
await settings.goto();
for (const id of TOGGLE_IDS) {
const state = await getSwitchState(settings.getToggle(id));
expect.soft(state, `Persisted OFF state of ${id}`).toBe(false);
}
await hasFieldValue(settings.getEmailInput(), tempEmail);
await hasFieldValue(settings.getPasswordInput(), "temporarypassword123");
// Restore original test email and password
await settings.setEmail(TEST_CREDENTIALS.email);
await settings.setPassword(TEST_CREDENTIALS.password);
await settings.setConfirmPassword(TEST_CREDENTIALS.password);
for (const id of TOGGLE_IDS) {
await settings.enable(id);
}
await settings.saveChanges();
// Reload and verify change email/password and ON persisted
await settings.goto();
for (const id of TOGGLE_IDS) {
const state = await getSwitchState(settings.getToggle(id));
expect.soft(state, `Persisted ON state of ${id}`).toBe(true);
}
await hasFieldValue(settings.getEmailInput(), TEST_CREDENTIALS.email);
await hasFieldValue(settings.getPasswordInput(), TEST_CREDENTIALS.password);
});
test("user can cancel changes", async ({ page }) => {
const settings = await navigateToSettings(page);
const initialEmail = await settings.getEmailValue();
await settings.setEmail("settings+cancel@example.com");
await settings.cancelChanges();
await hasFieldValue(settings.getEmailInput(), initialEmail);
});
test("settings form shows validation errors for invalid inputs", async ({ page }) => {
const settings = await navigateToSettings(page);
await settings.setEmail("invalid-email");
await settings.setPassword("short");
await settings.setConfirmPassword("short");
await settings.saveChanges();
await settings.expectValidationError("Invalid email");
await settings.expectValidationError("String must contain at least 12 character(s)");
});
});