Files
AutoGPT/rnd/autogpt_server/test/executor/test_scheduler.py
Zamil Majdy 9f1e521857 feat(rnd): Add AutoGPT server scheduling service (#7226)
### Background

Agent execution should be able to be triggered in a recurring manner.
This PR introduced an ExecutionScheduling service, a process responsible for managing the execution schedule and triggering its execution based on a predefined cron expression.

### Changes 🏗️

* Added `scheduler.py` / `ExecutionScheduler` implementation.
* Added scheduler test.
* Added `AgentExecutionSchedule` table and its logical model & prisma queries.
* Moved `add_execution` from API server to `execution_manager`
2024-06-24 09:41:02 +07:00

34 lines
1.1 KiB
Python

import pytest
import test_manager
from autogpt_server.executor.scheduler import ExecutionScheduler
from autogpt_server.util.service import PyroNameServer, get_service_client
@pytest.mark.asyncio(scope="session")
async def test_agent_schedule():
await test_manager.db.connect()
test_graph = await test_manager.create_test_graph()
with PyroNameServer():
with ExecutionScheduler():
scheduler = get_service_client(ExecutionScheduler)
schedules = scheduler.get_execution_schedules(test_graph.id)
assert len(schedules) == 0
schedule_id = scheduler.add_execution_schedule(
test_graph.id,
"0 0 * * *",
{"input": "data"}
)
assert schedule_id
schedules = scheduler.get_execution_schedules(test_graph.id)
assert len(schedules) == 1
assert schedules[schedule_id] == "0 0 * * *"
scheduler.update_schedule(schedule_id, is_enabled=False)
schedules = scheduler.get_execution_schedules(test_graph.id)
assert len(schedules) == 0