feat(rnd): Add AutoGPT server scheduling service (#7226)

### Background

Agent executions should be triggerable on a recurring basis.
This PR introduces an `ExecutionScheduler` service: a process responsible for managing execution schedules and triggering agent executions based on predefined cron expressions.

### Changes 🏗️

* Added `scheduler.py` / `ExecutionScheduler` implementation.
* Added scheduler test.
* Added `AgentExecutionSchedule` table and its logical model & prisma queries.
* Moved `add_execution` from API server to `execution_manager`
This commit is contained in:
Zamil Majdy
2024-06-24 06:41:02 +04:00
committed by GitHub
parent d9226888b2
commit 9f1e521857
12 changed files with 474 additions and 124 deletions

View File

@@ -22,8 +22,8 @@ def main(**kwargs):
run_processes(
[
PyroNameServer(),
ExecutionScheduler(),
ExecutionManager(pool_size=5),
ExecutionScheduler(),
AgentServer(),
],
**kwargs

View File

@@ -0,0 +1,89 @@
import json
from datetime import datetime
from typing import Optional, Any
from prisma.models import AgentExecutionSchedule
from autogpt_server.data.db import BaseDbModel
class ExecutionSchedule(BaseDbModel):
    """Logical model of a recurring agent execution schedule.

    Mirrors the prisma ``AgentExecutionSchedule`` row.
    """

    id: str
    agent_id: str
    schedule: str  # cron expression
    is_enabled: bool
    input_data: dict[str, Any]
    last_updated: Optional[datetime] = None

    def __init__(self, is_enabled: Optional[bool] = None, **kwargs):
        # An omitted (or explicit None) flag means the schedule is enabled.
        enabled = True if is_enabled is None else is_enabled
        super().__init__(is_enabled=enabled, **kwargs)

    @staticmethod
    def from_db(schedule: AgentExecutionSchedule):
        """Build the logical model from a prisma row."""
        return ExecutionSchedule(
            id=schedule.id,
            agent_id=schedule.agentGraphId,
            schedule=schedule.schedule,
            is_enabled=schedule.isEnabled,
            # tzinfo is stripped; the schema stores lastUpdated without a
            # time zone, so comparisons are done on naive datetimes.
            last_updated=schedule.lastUpdated.replace(tzinfo=None),
            input_data=json.loads(schedule.inputData),
        )
async def get_active_schedules(last_fetch_time: datetime) -> list[ExecutionSchedule]:
    """Fetch enabled schedules updated after *last_fetch_time*, oldest first.

    NOTE(review): only isEnabled rows are returned, so a caller polling this
    to detect newly-disabled schedules will never see them — confirm whether
    the filter is intended.
    """
    rows = await AgentExecutionSchedule.prisma().find_many(
        where={
            "isEnabled": True,
            "lastUpdated": {"gt": last_fetch_time},
        },
        order={"lastUpdated": "asc"},
    )
    return [ExecutionSchedule.from_db(row) for row in rows]
async def disable_schedule(schedule_id: str):
    """Mark the schedule row identified by *schedule_id* as disabled."""
    payload = {"isEnabled": False}
    await AgentExecutionSchedule.prisma().update(
        where={"id": schedule_id},
        data=payload,
    )
async def get_schedules(agent_id: str) -> list[ExecutionSchedule]:
    """Return the *enabled* schedules attached to the given agent graph."""
    rows = await AgentExecutionSchedule.prisma().find_many(
        where={
            "isEnabled": True,
            "agentGraphId": agent_id,
        },
    )
    return [ExecutionSchedule.from_db(row) for row in rows]
async def add_schedule(schedule: ExecutionSchedule):
    """Persist a new schedule row; input_data is stored JSON-serialized."""
    row = {
        "id": schedule.id,
        "agentGraphId": schedule.agent_id,
        "schedule": schedule.schedule,
        "isEnabled": schedule.is_enabled,
        "inputData": json.dumps(schedule.input_data),
    }
    await AgentExecutionSchedule.prisma().create(data=row)
async def update_schedule(schedule_id: str, is_enabled: bool):
    """Set the enabled flag on an existing schedule row."""
    payload = {"isEnabled": is_enabled}
    await AgentExecutionSchedule.prisma().update(
        where={"id": schedule_id},
        data=payload,
    )

View File

@@ -1,12 +1,12 @@
import asyncio
import logging
import uuid
from concurrent.futures import ProcessPoolExecutor
from typing import Optional, Any
from autogpt_server.data import db
from autogpt_server.data.block import Block, get_block
from autogpt_server.data.graph import Node, get_node, get_node_input
from autogpt_server.data.graph import Node, get_node, get_node_input, get_graph
from autogpt_server.data.execution import (
Execution,
ExecutionQueue,
@@ -21,7 +21,7 @@ logger = logging.getLogger(__name__)
def get_log_prefix(run_id: str, exec_id: str, block_name: str = "-"):
return f"[Execution graph-{run_id}|node-{exec_id}|{block_name}]"
return f"[ExecutionManager] [graph-{run_id}|node-{exec_id}|{block_name}]"
def execute_node(loop: asyncio.AbstractEventLoop, data: Execution) -> Execution | None:
@@ -80,28 +80,40 @@ def execute_node(loop: asyncio.AbstractEventLoop, data: Execution) -> Execution
return None
next_node_input: dict[str, Any] = wait(get_node_input(next_node, run_id))
next_node_block: Block | None = wait(get_block(next_node.block_id))
if not next_node_block:
logger.error(f"{prefix} Error, next block {next_node.block_id} not found.")
is_valid, validation_resp = wait(validate_exec(next_node, next_node_input))
if not is_valid:
logger.warning(f"{prefix} Skipped {next_node_id}: {validation_resp}")
return None
if not set(next_node.input_nodes).issubset(next_node_input):
logger.warning(
f"{prefix} Skipped {next_node_id}-{next_node_block.name}, "
f"missing: {set(next_node.input_nodes) - set(next_node_input)}"
)
return None
if error := next_node_block.input_schema.validate_data(next_node_input):
logger.warning(
f"{prefix} Skipped {next_node_id}-{next_node_block.name}, {error}"
)
return None
logger.warning(f"{prefix} Enqueue next node {next_node_id}-{next_node_block.name}")
logger.warning(f"{prefix} Enqueue next node {next_node_id}-{validation_resp}")
return Execution(run_id=run_id, node_id=next_node_id, data=next_node_input)
async def validate_exec(node: Node, data: dict[str, Any]) -> tuple[bool, str]:
    """Validate *data* as the input for executing *node*.

    Returns:
        ``(True, block_name)`` when the data is valid for the node's block,
        otherwise ``(False, reason)`` describing what is missing/mismatched.
    """
    node_block: Block | None = await get_block(node.block_id)
    if not node_block:
        return False, f"Block for {node.block_id} not found."
    missing = set(node.input_nodes) - set(data)
    if missing:
        return False, f"Input data missing: {missing}"
    error = node_block.input_schema.validate_data(data)
    if error:
        return False, f"Input data doesn't match {node_block.name}: {error}"
    return True, node_block.name
class Executor:
loop: asyncio.AbstractEventLoop
@@ -138,7 +150,7 @@ class ExecutionManager(AppService):
execution = f.result()
if execution:
return self.__add_execution(execution)
return self.add_node_execution(execution)
return None
@@ -155,14 +167,34 @@ class ExecutionManager(AppService):
future.add_done_callback(on_complete_execution) # type: ignore
@expose
def add_execution(self, run_id: str, node_id: str, data: dict[str, Any]) -> str:
try:
execution = Execution(run_id=run_id, node_id=node_id, data=data)
self.__add_execution(execution)
return execution.id
except Exception as e:
raise Exception("Error adding execution ", e)
def add_execution(self, graph_id: str, data: dict[str, Any]) -> dict:
run_id = str(uuid.uuid4())
def __add_execution(self, execution: Execution) -> Execution:
agent = self.run_and_wait(get_graph(graph_id))
if not agent:
raise Exception(f"Agent #{graph_id} not found.")
# Currently, there is no constraint on the number of root nodes in the graph.
for node in agent.starting_nodes:
valid, error = self.run_and_wait(validate_exec(node, data))
if not valid:
raise Exception(error)
executions = []
for node in agent.starting_nodes:
exec_id = self.add_node_execution(
Execution(run_id=run_id, node_id=node.id, data=data)
)
executions.append({
"exec_id": exec_id,
"node_id": node.id,
})
return {
"run_id": run_id,
"executions": executions,
}
def add_node_execution(self, execution: Execution) -> Execution:
self.run_and_wait(enqueue_execution(execution))
return self.queue.add(execution)

View File

@@ -1,23 +1,82 @@
import logging
import time
from autogpt_server.util.service import AppService, expose
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from autogpt_server.data import schedule as model
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.executor.manager import ExecutionManager
logger = logging.getLogger(__name__)
def log(msg, **kwargs):
logger.warning("[ExecutionScheduler] " + msg, **kwargs)
class ExecutionScheduler(AppService):
def __init__(self, refresh_interval: int = 10):
    # NOTE(review): super().__init__() is not called here — confirm that
    # AppService requires no base-class initialization.
    # datetime.min forces the first DB poll to fetch every schedule.
    self.last_check = datetime.min
    # Seconds between DB polls for schedule changes.
    self.refresh_interval = refresh_interval
@property
def execution_manager_client(self):
    # Resolved on every access: returns a (new) RPC client for the
    # ExecutionManager service rather than caching one.
    return get_service_client(ExecutionManager)
def run_service(self):
    """Service main loop: poll the DB and sync schedule jobs into APScheduler.

    Fix: the span contained a stale leftover line from the superseded stub
    (`time.sleep(1)  # This will be replaced...`), which added a redundant
    1-second sleep on every cycle; the loop now sleeps only for the
    configured refresh interval.
    """
    scheduler = BackgroundScheduler()
    scheduler.start()
    while True:
        self.__refresh_jobs_from_db(scheduler)
        time.sleep(self.refresh_interval)
def __refresh_jobs_from_db(self, scheduler: BackgroundScheduler):
    """Sync schedule rows changed since `self.last_check` into *scheduler*.

    Fix: `scheduler.remove_job` raises JobLookupError when the job id is
    unknown (e.g. a schedule disabled before this instance ever added it),
    which would kill the service loop; the removal is now guarded.
    """
    schedules = self.run_and_wait(model.get_active_schedules(self.last_check))
    for schedule in schedules:
        # Advance the high-water mark so already-seen rows are not re-fetched.
        self.last_check = max(self.last_check, schedule.last_updated)
        if not schedule.is_enabled:
            log(f"Removing recurring job {schedule.id}: {schedule.schedule}")
            # Only remove if the job actually exists on this scheduler.
            if scheduler.get_job(schedule.id):
                scheduler.remove_job(schedule.id)
            continue
        log(f"Adding recurring job {schedule.id}: {schedule.schedule}")
        scheduler.add_job(
            self.__execute_agent,
            CronTrigger.from_crontab(schedule.schedule),
            id=schedule.id,
            args=[schedule.agent_id, schedule.input_data],
            replace_existing=True,
        )
def __execute_agent(self, agent_id: str, input_data: dict):
    """APScheduler job callback: trigger one execution of *agent_id*."""
    try:
        log(f"Executing recurring job for agent #{agent_id}")
        # Delegate the actual execution to the ExecutionManager service.
        self.execution_manager_client.add_execution(agent_id, input_data)
    except Exception as e:
        # A failing job must never propagate out of the callback.
        logger.error(f"Error executing agent {agent_id}: {e}")
@expose
def update_schedule(self, schedule_id: str, is_enabled: bool) -> str:
    """Enable or disable a schedule row; returns the schedule id."""
    update = model.update_schedule(schedule_id, is_enabled)
    self.run_and_wait(update)
    return schedule_id
@expose
def add_execution_schedule(self, agent_id: str, cron: str, input_data: dict) -> str:
    """Create and persist a new cron execution schedule; returns its id.

    Fix: the span interleaved stale lines from the replaced stub (an
    unclosed `print(` call and `return "dummy_schedule_id"`), which made
    the method syntactically invalid; only the real implementation remains.
    """
    schedule = model.ExecutionSchedule(
        agent_id=agent_id,
        schedule=cron,
        input_data=input_data,
    )
    self.run_and_wait(model.add_schedule(schedule))
    return schedule.id
@expose
def get_execution_schedules(self, agent_id: str) -> dict[str, str]:
    """Return {schedule_id: cron expression} for the agent's enabled schedules.

    Fix: a superseded stub definition (print + dummy list return) was left
    interleaved above the real one; the duplicate is removed.
    """
    query = model.get_schedules(agent_id)
    schedules: list[model.ExecutionSchedule] = self.run_and_wait(query)
    return {v.id: v.schedule for v in schedules}

View File

@@ -34,66 +34,75 @@ class AgentServer(AppProcess):
router = APIRouter()
router.add_api_route(
path="/blocks",
endpoint=AgentServer.get_agent_blocks,
endpoint=self.get_agent_blocks,
methods=["GET"],
)
router.add_api_route(
path="/agents",
endpoint=AgentServer.get_agents,
endpoint=self.get_agents,
methods=["GET"],
)
router.add_api_route(
path="/agents/{agent_id}",
endpoint=AgentServer.get_agent,
endpoint=self.get_agent,
methods=["GET"],
)
router.add_api_route(
path="/agents",
endpoint=AgentServer.create_agent,
endpoint=self.create_agent,
methods=["POST"],
)
router.add_api_route(
path="/agents/{agent_id}/execute",
endpoint=AgentServer.execute_agent,
endpoint=self.execute_agent,
methods=["POST"],
)
router.add_api_route(
path="/agents/{agent_id}/executions/{run_id}",
endpoint=AgentServer.get_executions,
endpoint=self.get_executions,
methods=["GET"],
)
router.add_api_route(
path="/agents/{agent_id}/schedules",
endpoint=AgentServer.schedule_agent,
endpoint=self.schedule_agent,
methods=["POST"],
)
router.add_api_route(
path="/agents/{agent_id}/schedules",
endpoint=AgentServer.get_execution_schedules,
endpoint=self.get_execution_schedules,
methods=["GET"],
)
router.add_api_route(
path="/agents/schedules/{schedule_id}",
endpoint=self.update_schedule,
methods=["PUT"],
)
app.include_router(router)
uvicorn.run(app, host="0.0.0.0", port=8000)
@staticmethod
async def get_agent_blocks() -> list[dict]:
@property
def execution_manager_client(self) -> ExecutionManager:
return get_service_client(ExecutionManager)
@property
def execution_scheduler_client(self) -> ExecutionScheduler:
return get_service_client(ExecutionScheduler)
async def get_agent_blocks(self) -> list[dict]:
return [v.to_dict() for v in await block.get_blocks()]
@staticmethod
async def get_agents() -> list[str]:
async def get_agents(self) -> list[str]:
return await graph.get_graph_ids()
@staticmethod
async def get_agent(agent_id: str) -> graph.Graph:
async def get_agent(self, agent_id: str) -> graph.Graph:
agent = await graph.get_graph(agent_id)
if not agent:
raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found.")
return agent
@staticmethod
async def create_agent(agent: graph.Graph) -> graph.Graph:
async def create_agent(self, agent: graph.Graph) -> graph.Graph:
agent.id = str(uuid.uuid4())
id_map = {node.id: str(uuid.uuid4()) for node in agent.nodes}
@@ -104,62 +113,36 @@ class AgentServer(AppProcess):
return await graph.create_graph(agent)
async def execute_agent(self, agent_id: str, node_input: dict) -> dict:
    """Start an execution of the agent by delegating to ExecutionManager.

    Returns the manager's response dict ({"run_id": ..., "executions": [...]}).
    Raises:
        HTTPException(400): with the service error message on any failure
        (including unknown agent / invalid input, as reported by the manager).

    Fix: the superseded inline implementation (per-node validation and
    enqueueing, now living in ExecutionManager.add_execution) was left
    interleaved after the delegation; those dead lines are removed.
    """
    try:
        return self.execution_manager_client.add_execution(agent_id, node_input)
    except Exception as e:
        # Errors cross the RPC boundary escaped; unescape for readability.
        msg = e.__str__().encode().decode('unicode_escape')
        raise HTTPException(status_code=400, detail=msg)
@staticmethod
async def get_executions(
agent_id: str,
run_id: str
) -> list[execution.ExecutionResult]:
self, agent_id: str, run_id: str) -> list[execution.ExecutionResult]:
agent = await graph.get_graph(agent_id)
if not agent:
raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found.")
return await execution.get_executions(run_id)
async def schedule_agent(self, agent_id: str, cron: str, input_data: dict) -> dict:
    """Register a recurring cron execution for an existing agent.

    Raises:
        HTTPException(404): when the agent graph does not exist.

    Fix: stale lines from the removed `@staticmethod` version were
    interleaved with the instance-method version; only the latter remains.
    """
    agent = await graph.get_graph(agent_id)
    if not agent:
        raise HTTPException(status_code=404, detail=f"Agent #{agent_id} not found.")
    execution_scheduler = self.execution_scheduler_client
    return {
        "id": execution_scheduler.add_execution_schedule(agent_id, cron, input_data)
    }
def update_schedule(self, schedule_id: str, input_data: dict) -> dict:
    """Toggle a schedule via the scheduler service.

    The request body may carry {"is_enabled": bool}; a missing flag
    defaults to False (i.e. disables the schedule).

    Fix: stale lines of the removed static `get_execution_schedules`
    were interleaved here; the superseded fragment is dropped.
    """
    execution_scheduler = self.execution_scheduler_client
    is_enabled = input_data.get("is_enabled", False)
    execution_scheduler.update_schedule(schedule_id, is_enabled)
    return {"id": schedule_id}

def get_execution_schedules(self, agent_id: str) -> dict[str, str]:
    """Return {schedule_id: cron expression} for the agent's schedules."""
    execution_scheduler = self.execution_scheduler_client
    return execution_scheduler.get_execution_schedules(agent_id)

View File

@@ -15,7 +15,18 @@ from autogpt_server.util.process import AppProcess
logger = logging.getLogger(__name__)
conn_retry = retry(stop=stop_after_delay(5), wait=wait_exponential(multiplier=0.1))
def expose(func: Callable) -> Callable:
    """Pyro-expose *func*, logging and re-wrapping any exception it raises.

    The error is logged on the service side and re-raised with the function
    name embedded so the message survives the RPC boundary.

    Fixes: the stale `expose = pyro.expose` assignment (immediately shadowed
    by this def) is removed, and the wrapper now preserves the wrapped
    function's metadata via functools.wraps — Pyro and debuggers otherwise
    see every exposed method named "wrapper".
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            msg = f"Error in {func.__name__}: {e.__str__()}"
            logger.error(msg)
            # Keep (msg, e) args: callers unpack str(e) for HTTP details.
            raise Exception(msg, e)
    return pyro.expose(wrapper)
class PyroNameServer(AppProcess):
@@ -28,7 +39,6 @@ class PyroNameServer(AppProcess):
class AppService(AppProcess):
shared_event_loop: asyncio.AbstractEventLoop
@classmethod

View File

@@ -33,6 +33,34 @@ doc = ["Sphinx (>=7)", "packaging", "sphinx-autodoc-typehints (>=1.2.0)", "sphin
test = ["anyio[trio]", "coverage[toml] (>=7)", "exceptiongroup (>=1.2.0)", "hypothesis (>=4.0)", "psutil (>=5.9)", "pytest (>=7.0)", "pytest-mock (>=3.6.1)", "trustme", "uvloop (>=0.17)"]
trio = ["trio (>=0.23)"]
[[package]]
name = "apscheduler"
version = "3.10.4"
description = "In-process task scheduler with Cron-like capabilities"
optional = false
python-versions = ">=3.6"
files = [
{file = "APScheduler-3.10.4-py3-none-any.whl", hash = "sha256:fb91e8a768632a4756a585f79ec834e0e27aad5860bac7eaa523d9ccefd87661"},
{file = "APScheduler-3.10.4.tar.gz", hash = "sha256:e6df071b27d9be898e486bc7940a7be50b4af2e9da7c08f0744a96d4bd4cef4a"},
]
[package.dependencies]
pytz = "*"
six = ">=1.4.0"
tzlocal = ">=2.0,<3.dev0 || >=4.dev0"
[package.extras]
doc = ["sphinx", "sphinx-rtd-theme"]
gevent = ["gevent"]
mongodb = ["pymongo (>=3.0)"]
redis = ["redis (>=3.0)"]
rethinkdb = ["rethinkdb (>=2.4.0)"]
sqlalchemy = ["sqlalchemy (>=1.4)"]
testing = ["pytest", "pytest-asyncio", "pytest-cov", "pytest-tornado5"]
tornado = ["tornado (>=4.3)"]
twisted = ["twisted"]
zookeeper = ["kazoo"]
[[package]]
name = "attrs"
version = "23.2.0"
@@ -88,6 +116,21 @@ files = [
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
[[package]]
name = "croniter"
version = "2.0.5"
description = "croniter provides iteration for datetime object with cron like format"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6"
files = [
{file = "croniter-2.0.5-py2.py3-none-any.whl", hash = "sha256:fdbb44920944045cc323db54599b321325141d82d14fa7453bc0699826bbe9ed"},
{file = "croniter-2.0.5.tar.gz", hash = "sha256:f1f8ca0af64212fbe99b1bee125ee5a1b53a9c1b433968d8bca8817b79d237f3"},
]
[package.dependencies]
python-dateutil = "*"
pytz = ">2021.1"
[[package]]
name = "cx-freeze"
version = "7.0.0"
@@ -850,6 +893,24 @@ tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
[[package]]
name = "pytest-asyncio"
version = "0.23.7"
description = "Pytest support for asyncio"
optional = false
python-versions = ">=3.8"
files = [
{file = "pytest_asyncio-0.23.7-py3-none-any.whl", hash = "sha256:009b48127fbe44518a547bddd25611551b0e43ccdbf1e67d12479f569832c20b"},
{file = "pytest_asyncio-0.23.7.tar.gz", hash = "sha256:5f5c72948f4c49e7db4f29f2521d4031f1c27f86e57b046126654083d4770268"},
]
[package.dependencies]
pytest = ">=7.0.0,<9"
[package.extras]
docs = ["sphinx (>=5.3)", "sphinx-rtd-theme (>=1.0)"]
testing = ["coverage (>=6.2)", "hypothesis (>=5.7.1)"]
[[package]]
name = "pytest-watcher"
version = "0.4.2"
@@ -865,6 +926,20 @@ files = [
tomli = {version = ">=2.0.1,<3.0.0", markers = "python_version < \"3.11\""}
watchdog = ">=2.0.0"
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
description = "Extensions to the standard Python datetime module"
optional = false
python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,>=2.7"
files = [
{file = "python-dateutil-2.9.0.post0.tar.gz", hash = "sha256:37dd54208da7e1cd875388217d5e00ebd4179249f90fb72437e91a35459a0ad3"},
{file = "python_dateutil-2.9.0.post0-py2.py3-none-any.whl", hash = "sha256:a8b2bc7bffae282281c8140a97d3aa9c14da0b136dfe83f850eea9a5f7470427"},
]
[package.dependencies]
six = ">=1.5"
[[package]]
name = "python-dotenv"
version = "1.0.1"
@@ -879,6 +954,17 @@ files = [
[package.extras]
cli = ["click (>=5.0)"]
[[package]]
name = "pytz"
version = "2024.1"
description = "World timezone definitions, modern and historical"
optional = false
python-versions = "*"
files = [
{file = "pytz-2024.1-py2.py3-none-any.whl", hash = "sha256:328171f4e3623139da4983451950b28e95ac706e13f3f2630a879749e7a8b319"},
{file = "pytz-2024.1.tar.gz", hash = "sha256:2a29735ea9c18baf14b448846bde5a48030ed267578472d8955cd0e7443a9812"},
]
[[package]]
name = "pyyaml"
version = "6.0.1"
@@ -1115,6 +1201,17 @@ docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments
testing = ["build[virtualenv]", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.9)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
testing-integration = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "packaging (>=23.2)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
[[package]]
name = "six"
version = "1.16.0"
description = "Python 2 and 3 compatibility utilities"
optional = false
python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*"
files = [
{file = "six-1.16.0-py2.py3-none-any.whl", hash = "sha256:8abb2f1d86890a2dfb989f9a77cfcfd3e47c2a354b01111771326f8aa26e0254"},
{file = "six-1.16.0.tar.gz", hash = "sha256:1e61c37477a1626458e36f7b1d82aa5c9b094fa4802892072e49de9c60c4c926"},
]
[[package]]
name = "sniffio"
version = "1.3.1"
@@ -1207,6 +1304,34 @@ files = [
{file = "typing_extensions-4.12.2.tar.gz", hash = "sha256:1a7ead55c7e559dd4dee8856e3a88b41225abfe1ce8df57b7c13915fe121ffb8"},
]
[[package]]
name = "tzdata"
version = "2024.1"
description = "Provider of IANA time zone data"
optional = false
python-versions = ">=2"
files = [
{file = "tzdata-2024.1-py2.py3-none-any.whl", hash = "sha256:9068bc196136463f5245e51efda838afa15aaeca9903f49050dfa2679db4d252"},
{file = "tzdata-2024.1.tar.gz", hash = "sha256:2674120f8d891909751c38abcdfd386ac0a5a1127954fbc332af6b5ceae07efd"},
]
[[package]]
name = "tzlocal"
version = "5.2"
description = "tzinfo object for the local timezone"
optional = false
python-versions = ">=3.8"
files = [
{file = "tzlocal-5.2-py3-none-any.whl", hash = "sha256:49816ef2fe65ea8ac19d19aa7a1ae0551c834303d5014c6d5a62e4cbda8047b8"},
{file = "tzlocal-5.2.tar.gz", hash = "sha256:8d399205578f1a9342816409cc1e46a93ebd5755e39ea2d85334bea911bf0e6e"},
]
[package.dependencies]
tzdata = {version = "*", markers = "platform_system == \"Windows\""}
[package.extras]
devenv = ["check-manifest", "pytest (>=4.3)", "pytest-cov", "pytest-mock (>=3.3)", "zest.releaser"]
[[package]]
name = "uvicorn"
version = "0.30.1"
@@ -1506,4 +1631,4 @@ test = ["pytest (>=6.0.0)", "setuptools (>=65)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "de508427e9804ded3b3139e13f209baa6cc97bc138d83952ad2b129d3aedc4e2"
content-hash = "17f25b61da5f54bb4bb13cecfedda56d23c097aacb95bb213f13ce63ee08c761"

View File

@@ -22,6 +22,9 @@ jsonschema = "^4.22.0"
psutil = "^5.9.8"
pyro5 = "^5.15"
tenacity = "^8.3.0"
apscheduler = "^3.10.4"
croniter = "^2.0.5"
pytest-asyncio = "^0.23.7"
[tool.poetry.group.dev.dependencies]
@@ -80,3 +83,6 @@ runner = "pytest"
runner_args = []
patterns = ["*.py"]
ignore_patterns = []
[tool.pytest.ini_options]
asyncio_mode = "auto"

View File

@@ -15,7 +15,8 @@ model AgentGraph {
name String?
description String?
AgentNodes AgentNode[] @relation("AgentGraphNodes")
AgentNodes AgentNode[] @relation("AgentGraphNodes")
AgentExecutionSchedule AgentExecutionSchedule[]
}
// This model describes a single node in the Agent Graph/Flow (Multi Agent System).
@@ -108,3 +109,20 @@ model FileDefinition {
ReferencedByInputFiles AgentNodeExecution[] @relation("InputFiles")
ReferencedByOutputFiles AgentNodeExecution[] @relation("OutputFiles")
}
// This model describes the recurring execution schedule of an Agent.
model AgentExecutionSchedule {
id String @id
agentGraphId String
AgentGraph AgentGraph @relation(fields: [agentGraphId], references: [id])
schedule String // cron expression
isEnabled Boolean @default(true)
inputData String // JSON serialized object
// @updatedAt sets this value automatically on each update; note that lastUpdated is stored without a time zone.
lastUpdated DateTime @updatedAt
@@index([isEnabled])
}

View File

@@ -1,6 +1,7 @@
import asyncio
import time
import pytest
from autogpt_server.data import block, db, execution, graph
from autogpt_server.executor import ExecutionManager
from autogpt_server.server import AgentServer
@@ -44,28 +45,29 @@ async def create_test_graph() -> graph.Graph:
return test_graph
def execute_agent(test_manager: ExecutionManager, test_graph: graph.Graph, wait_db):
async def execute_agent(test_manager: ExecutionManager, test_graph: graph.Graph):
# --- Test adding new executions --- #
text = "Hello, World!"
input_data = {"input": text}
response = wait_db(AgentServer.execute_agent(test_graph.id, input_data))
agent_server = AgentServer()
response = await agent_server.execute_agent(test_graph.id, input_data)
executions = response["executions"]
run_id = response["run_id"]
assert len(executions) == 2
async def is_execution_completed():
execs = await AgentServer.get_executions(test_graph.id, run_id)
execs = await agent_server.get_executions(test_graph.id, run_id)
return test_manager.queue.empty() and len(execs) == 4
# Wait for the executions to complete
for i in range(10):
if wait_db(is_execution_completed()):
if await is_execution_completed():
break
time.sleep(1)
# Execution queue should be empty
assert wait_db(is_execution_completed())
executions = wait_db(AgentServer.get_executions(test_graph.id, run_id))
assert await is_execution_completed()
executions = await agent_server.get_executions(test_graph.id, run_id)
# Executing ParrotBlock1
exec = executions[0]
@@ -108,14 +110,10 @@ def execute_agent(test_manager: ExecutionManager, test_graph: graph.Graph, wait_
assert exec.node_id == test_graph.nodes[3].id
@pytest.mark.asyncio(scope="session")
async def test_agent_execution():
    """End-to-end: run a test graph through a live ExecutionManager.

    Fix: leftover lines of the superseded synchronous driver (the old
    `def test_agent_execution():`, manual event-loop creation and
    `wait(...)` calls) were interleaved with the async version; only the
    async implementation remains.
    """
    with PyroNameServer():
        time.sleep(0.5)  # give the name server a moment to come up
        with ExecutionManager(1) as test_manager:
            await db.connect()
            test_graph = await create_test_graph()
            await execute_agent(test_manager, test_graph)

View File

@@ -0,0 +1,33 @@
import pytest
import test_manager
from autogpt_server.executor.scheduler import ExecutionScheduler
from autogpt_server.util.service import PyroNameServer, get_service_client
@pytest.mark.asyncio(scope="session")
async def test_agent_schedule():
    """Exercise add / list / disable of execution schedules over RPC."""
    await test_manager.db.connect()
    test_graph = await test_manager.create_test_graph()
    with PyroNameServer():
        with ExecutionScheduler():
            scheduler = get_service_client(ExecutionScheduler)

            # A fresh graph starts with no schedules.
            assert len(scheduler.get_execution_schedules(test_graph.id)) == 0

            schedule_id = scheduler.add_execution_schedule(
                test_graph.id,
                "0 0 * * *",
                {"input": "data"},
            )
            assert schedule_id

            # The new schedule is listed with its cron expression.
            listed = scheduler.get_execution_schedules(test_graph.id)
            assert len(listed) == 1
            assert listed[schedule_id] == "0 0 * * *"

            # Disabling removes it from the (enabled-only) listing.
            scheduler.update_schedule(schedule_id, is_enabled=False)
            assert len(scheduler.get_execution_schedules(test_graph.id)) == 0

View File

@@ -1,5 +1,3 @@
import time
from autogpt_server.util.service import (
AppService,
PyroNameServer,
@@ -30,7 +28,6 @@ class TestService(AppService):
def test_service_creation():
with PyroNameServer():
time.sleep(0.5)
with TestService():
client = get_service_client(TestService)
assert client.add(5, 3) == 8