Merge branch 'dev' into swiftyos/simplify-local-supabase

This commit is contained in:
Swifty
2025-08-08 15:28:35 +02:00
committed by GitHub
7 changed files with 132 additions and 39 deletions

View File

@@ -41,7 +41,13 @@ from backend.data.user import (
get_user_notification_preference,
update_user_integrations,
)
from backend.util.service import AppService, AppServiceClient, endpoint_to_sync, expose
from backend.util.service import (
AppService,
AppServiceClient,
UnhealthyServiceError,
endpoint_to_sync,
expose,
)
from backend.util.settings import Config
config = Config()
@@ -75,7 +81,7 @@ class DatabaseManager(AppService):
def health_check(self) -> str:
if not db.is_connected():
raise RuntimeError("Database is not connected")
raise UnhealthyServiceError("Database is not connected")
return super().health_check()
@classmethod

View File

@@ -1361,6 +1361,25 @@ class ExecutionManager(AppProcess):
else:
utilization_gauge.set(active_count / self.pool_size)
def _stop_message_consumers(
    self, thread: threading.Thread, client: SyncRabbitMQ, prefix: str
):
    """Stop a RabbitMQ consumer thread and disconnect its client.

    Args:
        thread: Consumer thread expected to exit once its channel stops
            consuming.
        client: RabbitMQ client whose channel the thread is consuming from.
        prefix: Log-message prefix identifying which consumer is stopped.
    """
    try:
        channel = client.get_channel()
        # stop_consuming must be invoked from the connection's own I/O
        # thread, hence add_callback_threadsafe.
        channel.connection.add_callback_threadsafe(channel.stop_consuming)
        # Thread.join(timeout=...) never raises TimeoutError -- it simply
        # returns after the timeout. Check is_alive() to detect a thread
        # that did not finish in time (the original except branch was dead).
        thread.join(timeout=300)
        if thread.is_alive():
            logger.error(
                f"{prefix} ⚠️ Run thread did not finish in time, forcing disconnect"
            )
        client.disconnect()
        logger.info(f"{prefix} ✅ Run client disconnected")
    except Exception as e:
        logger.error(f"{prefix} ⚠️ Error disconnecting run client: {type(e)} {e}")
def cleanup(self):
"""Override cleanup to implement graceful shutdown with active execution waiting."""
prefix = f"[{self.service_name}][on_graph_executor_stop {os.getpid()}]"
@@ -1416,32 +1435,16 @@ class ExecutionManager(AppProcess):
logger.error(f"{prefix} ⚠️ Error during executor shutdown: {type(e)} {e}")
# Disconnect the run execution consumer
try:
run_channel = self.run_client.get_channel()
run_channel.connection.add_callback_threadsafe(
lambda: run_channel.stop_consuming()
)
self.run_thread.join()
run_channel.connection.add_callback_threadsafe(
lambda: self.run_client.disconnect()
)
logger.info(f"{prefix} ✅ Run client disconnected")
except Exception as e:
logger.error(f"{prefix} ⚠️ Error disconnecting run client: {type(e)} {e}")
# Disconnect the cancel execution consumer
try:
cancel_channel = self.cancel_client.get_channel()
cancel_channel.connection.add_callback_threadsafe(
lambda: cancel_channel.stop_consuming()
)
self.cancel_thread.join()
cancel_channel.connection.add_callback_threadsafe(
lambda: self.cancel_client.disconnect()
)
logger.info(f"{prefix} ✅ Cancel client disconnected")
except Exception as e:
logger.error(f"{prefix} ⚠️ Error disconnecting cancel client: {type(e)} {e}")
self._stop_message_consumers(
self.run_thread,
self.run_client,
prefix + " [run-consumer]",
)
self._stop_message_consumers(
self.cancel_thread,
self.cancel_client,
prefix + " [cancel-consumer]",
)
logger.info(f"{prefix} ✅ Finished GraphExec cleanup")

View File

@@ -6,7 +6,12 @@ from enum import Enum
from typing import Optional
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.events import (
EVENT_JOB_ERROR,
EVENT_JOB_EXECUTED,
EVENT_JOB_MAX_INSTANCES,
EVENT_JOB_MISSED,
)
from apscheduler.job import Job as JobObj
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
@@ -31,7 +36,13 @@ from backend.util.cloud_storage import cleanup_expired_files_async
from backend.util.exceptions import NotAuthorizedError, NotFoundError
from backend.util.logging import PrefixFilter
from backend.util.retry import func_retry
from backend.util.service import AppService, AppServiceClient, endpoint_to_async, expose
from backend.util.service import (
AppService,
AppServiceClient,
UnhealthyServiceError,
endpoint_to_async,
expose,
)
from backend.util.settings import Config
@@ -68,11 +79,30 @@ SCHEDULER_OPERATION_TIMEOUT_SECONDS = 300 # 5 minutes for scheduler operations
def job_listener(event):
"""Logs job execution outcomes for better monitoring."""
if event.exception:
logger.error(f"Job {event.job_id} failed.")
logger.error(
f"Job {event.job_id} failed: {type(event.exception).__name__}: {event.exception}"
)
else:
logger.info(f"Job {event.job_id} completed successfully.")
def job_missed_listener(event):
    """Emit a warning when the scheduler misses a job's scheduled run time."""
    warning_text = (
        f"Job {event.job_id} was missed at scheduled time {event.scheduled_run_time}. "
        f"This can happen if the scheduler is overloaded or if previous executions are still running."
    )
    logger.warning(warning_text)
def job_max_instances_listener(event):
    """Emit a warning when a job run is skipped due to the max-instances cap."""
    warning_text = (
        f"Job {event.job_id} execution was SKIPPED - max instances limit reached. "
        f"Previous execution(s) are still running. "
        f"Consider increasing max_instances or check why previous executions are taking too long."
    )
    logger.warning(warning_text)
_event_loop: asyncio.AbstractEventLoop | None = None
_event_loop_thread: threading.Thread | None = None
@@ -89,7 +119,11 @@ def run_async(coro, timeout: float = SCHEDULER_OPERATION_TIMEOUT_SECONDS):
"""Run a coroutine in the shared event loop and wait for completion."""
loop = get_event_loop()
future = asyncio.run_coroutine_threadsafe(coro, loop)
return future.result(timeout=timeout)
try:
return future.result(timeout=timeout)
except Exception as e:
logger.error(f"Async operation failed: {type(e).__name__}: {e}")
raise
def execute_graph(**kwargs):
@@ -192,7 +226,7 @@ class Scheduler(AppService):
def health_check(self) -> str:
# Thread-safe health check with proper initialization handling
if not hasattr(self, "scheduler"):
raise RuntimeError("Scheduler is still initializing")
raise UnhealthyServiceError("Scheduler is still initializing")
# Check if we're in the middle of cleanup
if self.cleaned_up:
@@ -200,8 +234,7 @@ class Scheduler(AppService):
# Normal operation - check if scheduler is running
if not self.scheduler.running:
logger.error(f"{self.service_name} the scheduler is not running!")
raise RuntimeError("Scheduler is not running")
raise UnhealthyServiceError("Scheduler is not running")
return super().health_check()
@@ -220,7 +253,18 @@ class Scheduler(AppService):
_event_loop_thread.start()
db_schema, db_url = _extract_schema_from_url(os.getenv("DIRECT_URL"))
# Configure executors to limit concurrency without skipping jobs
from apscheduler.executors.pool import ThreadPoolExecutor
self.scheduler = BlockingScheduler(
executors={
"default": ThreadPoolExecutor(max_workers=10), # Max 10 concurrent jobs
},
job_defaults={
"coalesce": True, # Skip redundant missed jobs - just run the latest
"max_instances": 1000, # Effectively unlimited - never drop executions
"misfire_grace_time": None, # No time limit for missed jobs
},
jobstores={
Jobstores.EXECUTION.value: SQLAlchemyJobStore(
engine=create_engine(
@@ -300,6 +344,8 @@ class Scheduler(AppService):
)
self.scheduler.add_listener(job_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
self.scheduler.add_listener(job_missed_listener, EVENT_JOB_MISSED)
self.scheduler.add_listener(job_max_instances_listener, EVENT_JOB_MAX_INSTANCES)
self.scheduler.start()
def cleanup(self):

View File

@@ -879,11 +879,19 @@ async def add_graph_execution(
graph_exec_entry = graph_exec.to_graph_execution_entry()
if nodes_input_masks:
graph_exec_entry.nodes_input_masks = nodes_input_masks
logger.info(
f"Created graph execution #{graph_exec.id} for graph "
f"#{graph_id} with {len(starting_nodes_input)} starting nodes. "
f"Now publishing to execution queue."
)
await queue.publish_message(
routing_key=GRAPH_EXECUTION_ROUTING_KEY,
message=graph_exec_entry.model_dump_json(),
exchange=GRAPH_EXECUTION_EXCHANGE,
)
logger.info(f"Published execution {graph_exec.id} to RabbitMQ queue")
bus = get_async_execution_event_bus()
await bus.publish(graph_exec)

View File

@@ -31,7 +31,13 @@ from backend.util.clients import get_database_manager_async_client
from backend.util.logging import TruncatedLogger
from backend.util.metrics import discord_send_alert
from backend.util.retry import continuous_retry
from backend.util.service import AppService, AppServiceClient, endpoint_to_sync, expose
from backend.util.service import (
AppService,
AppServiceClient,
UnhealthyServiceError,
endpoint_to_sync,
expose,
)
from backend.util.settings import Settings
logger = TruncatedLogger(logging.getLogger(__name__), "[NotificationManager]")
@@ -183,16 +189,24 @@ class NotificationManager(AppService):
def rabbit(self) -> rabbitmq.AsyncRabbitMQ:
"""Access the RabbitMQ service. Will raise if not configured."""
if not hasattr(self, "rabbitmq_service") or not self.rabbitmq_service:
raise RuntimeError("RabbitMQ not configured for this service")
raise UnhealthyServiceError("RabbitMQ not configured for this service")
return self.rabbitmq_service
@property
def rabbit_config(self) -> rabbitmq.RabbitMQConfig:
"""Access the RabbitMQ config. Will raise if not configured."""
if not self.rabbitmq_config:
raise RuntimeError("RabbitMQ not configured for this service")
raise UnhealthyServiceError("RabbitMQ not configured for this service")
return self.rabbitmq_config
def health_check(self) -> str:
    """Report health; raise UnhealthyServiceError unless RabbitMQ is configured and ready."""
    service = getattr(self, "rabbitmq_service", None)
    # Missing or falsy attribute means RabbitMQ was never set up for this service.
    if not service:
        raise UnhealthyServiceError("RabbitMQ not configured for this service")
    if not service.is_ready:
        raise UnhealthyServiceError("RabbitMQ channel is not ready")
    return super().health_check()
@classmethod
def get_port(cls) -> int:
    """Return the port this notification service listens on, from settings."""
    return settings.config.notification_service_port

View File

@@ -41,6 +41,7 @@ from backend.server.external.api import external_app
from backend.server.middleware.security import SecurityHeadersMiddleware
from backend.util import json
from backend.util.cloud_storage import shutdown_cloud_storage_handler
from backend.util.service import UnhealthyServiceError
settings = backend.util.settings.Settings()
logger = logging.getLogger(__name__)
@@ -232,7 +233,7 @@ app.mount("/external-api", external_app)
@app.get(path="/health", tags=["health"], dependencies=[])
async def health():
if not backend.data.db.is_connected():
raise RuntimeError("Database is not connected")
raise UnhealthyServiceError("Database is not connected")
return {"status": "healthy"}

View File

@@ -97,6 +97,20 @@ class RemoteCallError(BaseModel):
args: Optional[Tuple[Any, ...]] = None
class UnhealthyServiceError(ValueError):
    """Raised by service health checks when a service is unhealthy or not ready.

    The message is prefixed with the current service name; unless ``log`` is
    False, it is also written to the module logger at construction time.
    """

    def __init__(
        self, message: str = "Service is unhealthy or not ready", log: bool = True
    ):
        full_message = f"[{get_service_name()}] - {message}"
        super().__init__(full_message)
        self.message = full_message
        if log:
            logger.error(self.message)

    def __str__(self):
        """Return the service-name-prefixed message."""
        return self.message
EXCEPTION_MAPPING = {
e.__name__: e
for e in [
@@ -104,6 +118,7 @@ EXCEPTION_MAPPING = {
RuntimeError,
TimeoutError,
ConnectionError,
UnhealthyServiceError,
*[
ErrorType
for _, ErrorType in inspect.getmembers(exceptions)