fix(backend/scheduler): Unbreak Scheduler.get_execution_schedules (#9919)

- Resolves #9918
- Follow-up fix for #9914

### Changes 🏗️

- In `get_graph_execution_schedules`, skip jobs when their kwargs can't
be parsed as `GraphExecutionJobArgs`
- Rename methods of `Scheduler` to clarify their scope (scheduled
*graph* executions)

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Go to `/library/agents/[id]` (which calls `GET /api/schedules`)
    - [x] -> `GET /api/schedules` request returns HTTP 200
This commit is contained in:
Reinier van der Leer
2025-05-08 13:26:51 +02:00
committed by GitHub
parent 1ad6c76f9c
commit 433b76b539
2 changed files with 32 additions and 24 deletions

View File

@@ -13,7 +13,7 @@ from apscheduler.triggers.cron import CronTrigger
from autogpt_libs.utils.cache import thread_cached
from dotenv import load_dotenv
from prisma.enums import NotificationType
from pydantic import BaseModel
from pydantic import BaseModel, ValidationError
from sqlalchemy import MetaData, create_engine
from backend.data.block import BlockInput
@@ -72,7 +72,7 @@ def get_notification_client():
def execute_graph(**kwargs):
args = ExecutionJobArgs(**kwargs)
args = GraphExecutionJobArgs(**kwargs)
try:
log(f"Executing recurring job for graph #{args.graph_id}")
execution_utils.add_graph_execution(
@@ -140,7 +140,7 @@ class Jobstores(Enum):
WEEKLY_NOTIFICATIONS = "weekly_notifications"
class ExecutionJobArgs(BaseModel):
class GraphExecutionJobArgs(BaseModel):
graph_id: str
input_data: BlockInput
user_id: str
@@ -148,14 +148,16 @@ class ExecutionJobArgs(BaseModel):
cron: str
class ExecutionJobInfo(ExecutionJobArgs):
class GraphExecutionJobInfo(GraphExecutionJobArgs):
id: str
name: str
next_run_time: str
@staticmethod
def from_db(job_args: ExecutionJobArgs, job_obj: JobObj) -> "ExecutionJobInfo":
return ExecutionJobInfo(
def from_db(
job_args: GraphExecutionJobArgs, job_obj: JobObj
) -> "GraphExecutionJobInfo":
return GraphExecutionJobInfo(
id=job_obj.id,
name=job_obj.name,
next_run_time=job_obj.next_run_time.isoformat(),
@@ -269,15 +271,15 @@ class Scheduler(AppService):
self.scheduler.shutdown(wait=False)
@expose
def add_execution_schedule(
def add_graph_execution_schedule(
self,
graph_id: str,
graph_version: int,
cron: str,
input_data: BlockInput,
user_id: str,
) -> ExecutionJobInfo:
job_args = ExecutionJobArgs(
) -> GraphExecutionJobInfo:
job_args = GraphExecutionJobArgs(
graph_id=graph_id,
input_data=input_data,
user_id=user_id,
@@ -292,40 +294,46 @@ class Scheduler(AppService):
jobstore=Jobstores.EXECUTION.value,
)
log(f"Added job {job.id} with cron schedule '{cron}' input data: {input_data}")
return ExecutionJobInfo.from_db(job_args, job)
return GraphExecutionJobInfo.from_db(job_args, job)
@expose
def delete_schedule(self, schedule_id: str, user_id: str) -> ExecutionJobInfo:
def delete_graph_execution_schedule(
self, schedule_id: str, user_id: str
) -> GraphExecutionJobInfo:
job = self.scheduler.get_job(schedule_id, jobstore=Jobstores.EXECUTION.value)
if not job:
log(f"Job {schedule_id} not found.")
raise ValueError(f"Job #{schedule_id} not found.")
job_args = ExecutionJobArgs(**job.kwargs)
job_args = GraphExecutionJobArgs(**job.kwargs)
if job_args.user_id != user_id:
raise ValueError("User ID does not match the job's user ID.")
log(f"Deleting job {schedule_id}")
job.remove()
return ExecutionJobInfo.from_db(job_args, job)
return GraphExecutionJobInfo.from_db(job_args, job)
@expose
def get_execution_schedules(
def get_graph_execution_schedules(
self, graph_id: str | None = None, user_id: str | None = None
) -> list[ExecutionJobInfo]:
) -> list[GraphExecutionJobInfo]:
jobs: list[JobObj] = self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value)
schedules = []
for job in self.scheduler.get_jobs(jobstore=Jobstores.EXECUTION.value):
logger.info(
for job in jobs:
logger.debug(
f"Found job {job.id} with cron schedule {job.trigger} and args {job.kwargs}"
)
job_args = ExecutionJobArgs(**job.kwargs)
try:
job_args = GraphExecutionJobArgs.model_validate(job.kwargs)
except ValidationError:
continue
if (
job.next_run_time is not None
and (graph_id is None or job_args.graph_id == graph_id)
and (user_id is None or job_args.user_id == user_id)
):
schedules.append(ExecutionJobInfo.from_db(job_args, job))
schedules.append(GraphExecutionJobInfo.from_db(job_args, job))
return schedules
@expose
@@ -346,6 +354,6 @@ class SchedulerClient(AppServiceClient):
def get_service_type(cls):
return Scheduler
add_execution_schedule = endpoint_to_async(Scheduler.add_execution_schedule)
delete_schedule = endpoint_to_async(Scheduler.delete_schedule)
get_execution_schedules = endpoint_to_async(Scheduler.get_execution_schedules)
add_execution_schedule = endpoint_to_async(Scheduler.add_graph_execution_schedule)
delete_schedule = endpoint_to_async(Scheduler.delete_graph_execution_schedule)
get_execution_schedules = endpoint_to_async(Scheduler.get_graph_execution_schedules)

View File

@@ -773,7 +773,7 @@ class ScheduleCreationRequest(pydantic.BaseModel):
async def create_schedule(
user_id: Annotated[str, Depends(get_user_id)],
schedule: ScheduleCreationRequest,
) -> scheduler.ExecutionJobInfo:
) -> scheduler.GraphExecutionJobInfo:
graph = await graph_db.get_graph(
schedule.graph_id, schedule.graph_version, user_id=user_id
)
@@ -813,7 +813,7 @@ async def delete_schedule(
async def get_execution_schedules(
user_id: Annotated[str, Depends(get_user_id)],
graph_id: str | None = None,
) -> list[scheduler.ExecutionJobInfo]:
) -> list[scheduler.GraphExecutionJobInfo]:
return await execution_scheduler_client().get_execution_schedules(
user_id=user_id,
graph_id=graph_id,