Mirror of https://github.com/Significant-Gravitas/AutoGPT.git, synced 2026-01-09 15:17:59 -05:00
feat(rnd): add FastAPI support to existing project outline (#7165)
### Background

###### Project Outline

Currently, the project mainly consists of these components:

*agent_api*
A component that will expose API endpoints for the creation & execution of agents. This component will make connections to the database to persist and read the agents. It will also trigger the agent execution by pushing its execution request to the `ExecutionQueue`.

*agent_executor*
A component that will execute the agents. This component will be a pool of processes/threads that will consume the `ExecutionQueue` and execute the agent accordingly. The result and progress of its execution will be persisted in the database.

###### How to test

Execute `poetry run app`, then open the Swagger page at `http://localhost:8000/docs`. There is one API route that triggers the execution of a dummy slow task; fire it a few times and watch the `agent_executor` run the slow tasks concurrently through its worker pool. The pool size is currently set to `5` (hardcoded in `app.py`, the code entry point).

##### Changes 🏗️

* Initialized FastAPI for the AutoGPT server project.
* Reduced the number of queues to 1 and abstracted it into the `ExecutionQueue` class.
* Reduced the number of main components to two: `api` and `executor`.
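For a scripted version of the manual test above, a minimal sketch using only the standard library (the endpoint path and response shape come from `server.py` below; a server already running on the default uvicorn port 8000 is assumed):

```python
# Fire the execute endpoint a few times against a locally running server
# (started with `poetry run app`); the executor logs should interleave.
import urllib.request

for i in range(3):
    req = urllib.request.Request(
        f"http://localhost:8000/agents/dummy_agent_{i}/execute", method="POST"
    )
    with urllib.request.urlopen(req) as resp:
        print(resp.read().decode())  # {"execution_id": "...", "agent_id": "..."}
```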
#### rnd/autogpt_server/README.md (new file, +20)
```markdown
# Next Gen AutoGPT

This is a research project into creating the next generation of AutoGPT, which is an AutoGPT agent server.

The agent server will enable the creation of composite multi-agent systems that utilize the AutoGPT Agent as their default agent.

## Project Outline

Currently, the project mainly consists of these components:

*agent_api*
A component that will expose API endpoints for the creation & execution of agents.
This component will make connections to the database to persist and read the agents.
It will also trigger the agent execution by pushing its execution request to the ExecutionQueue.

*agent_executor*
A component that will execute the agents.
This component will be a pool of processes/threads that will consume the ExecutionQueue and execute the agent accordingly.
The result and progress of its execution will be persisted in the database.
```
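The hand-off between the two components is simply the shared queue. A minimal illustration using the names introduced in this commit (assuming the package is installed, e.g. via `poetry install`):

```python
# Illustration of the queue hand-off between agent_api and agent_executor.
from autogpt_server.data import ExecutionQueue

queue = ExecutionQueue()
execution_id = queue.add("dummy_agent_1")  # what the API route does on POST
execution = queue.get()                    # what the executor loop consumes
assert execution.execution_id == execution_id
assert execution.data == "dummy_agent_1"
```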
#### rnd/autogpt_server/autogpt_server/agent_api/__init__.py (new file, +1)
```python
from .server import start_server  # noqa
```
#### rnd/autogpt_server/autogpt_server/agent_api/server.py (new file, +39)
```python
import uvicorn
from fastapi import FastAPI, APIRouter

from autogpt_server.data import ExecutionQueue


class AgentServer:

    def __init__(self, queue: ExecutionQueue):
        self.app = FastAPI(
            title="AutoGPT Agent Server",
            description=(
                "This server is used to execute agents that are created by the "
                "AutoGPT system."
            ),
            summary="AutoGPT Agent Server",
            version="0.1",
        )
        self.execution_queue = queue

        # Define the API routes
        self.router = APIRouter()
        self.router.add_api_route(
            path="/agents/{agent_id}/execute",
            endpoint=self.execute_agent,
            methods=["POST"],
        )
        self.app.include_router(self.router)

    def execute_agent(self, agent_id: str):
        # Enqueue the execution and return its id; the actual work
        # happens asynchronously in agent_executor.
        execution_id = self.execution_queue.add(agent_id)
        return {"execution_id": execution_id, "agent_id": agent_id}


def start_server(queue: ExecutionQueue, use_uvicorn: bool = True):
    app = AgentServer(queue).app
    if use_uvicorn:
        uvicorn.run(app)
    return app
```
#### rnd/autogpt_server/autogpt_server/agent_executor/__init__.py (new file, +1)

```python
from .executor import start_executors  # noqa
```
#### rnd/autogpt_server/autogpt_server/agent_executor/executor.py (new file, +42)
```python
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process

from autogpt_server.data import ExecutionQueue

logger = logging.getLogger(__name__)


class AgentExecutor:

    # TODO: Replace this with an actual Agent Execution.
    @staticmethod
    def __execute(id: str, data: str) -> None:
        logger.warning(f"Executor processing started, execution_id: {id}, data: {data}")
        for i in range(5):
            logger.warning(
                f"Executor processing step {i}, execution_id: {id}, data: {data}"
            )
            time.sleep(1)
        logger.warning(
            f"Executor processing completed, execution_id: {id}, data: {data}"
        )

    @staticmethod
    def start_executor(pool_size: int, queue: ExecutionQueue) -> None:
        with ThreadPoolExecutor(max_workers=pool_size) as executor:
            while True:
                execution = queue.get()  # blocks until an item arrives
                if not execution:
                    time.sleep(1)
                    continue
                executor.submit(
                    AgentExecutor.__execute,
                    execution.execution_id,
                    execution.data,
                )


def start_executors(pool_size: int, queue: ExecutionQueue) -> None:
    # Run the consumer loop in its own process so it does not block the API.
    executor_process = Process(
        target=AgentExecutor.start_executor, args=(pool_size, queue)
    )
    executor_process.start()
```
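Note that the commit message describes a "pool of Python processes", while this iteration spawns a single worker process holding a thread pool. A process-pool variant (a sketch, not part of this commit; `start_process_executor` and `_execute` are hypothetical names) would swap in `concurrent.futures.ProcessPoolExecutor`, whose submitted callables must be module-level so they can be pickled:

```python
# Sketch: process-pool variant of the executor loop (not in this commit).
import time
from concurrent.futures import ProcessPoolExecutor

from autogpt_server.data import ExecutionQueue


def _execute(execution_id: str, data: str) -> None:
    # Placeholder work, mirroring the dummy slow task above.
    print(f"executing {execution_id}: {data}")
    time.sleep(5)


def start_process_executor(pool_size: int, queue: ExecutionQueue) -> None:
    with ProcessPoolExecutor(max_workers=pool_size) as pool:
        while True:
            execution = queue.get()  # blocks until an item is available
            pool.submit(_execute, execution.execution_id, execution.data)
```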
#### rnd/autogpt_server/autogpt_server/app.py (new file, +13)
```python
from autogpt_server.agent_api import start_server
from autogpt_server.agent_executor import start_executors
from autogpt_server.data import ExecutionQueue


def main() -> None:
    queue = ExecutionQueue()
    start_executors(5, queue)  # pool size is hardcoded here for now
    start_server(queue)


if __name__ == "__main__":
    main()
```
#### rnd/autogpt_server/autogpt_server/data.py (new file, +36)
```python
import uuid
from multiprocessing import Queue


class Execution:
    """Data model for an execution of an Agent"""

    def __init__(self, execution_id: str, data: str):
        self.execution_id = execution_id
        self.data = data


# TODO: This shared class couples the api & executor onto one machine.
# Replace it with a persistent & remote-hosted queue.
# One very likely candidate would be persisted Redis (Redis Queue).
# That would also open the possibility of using it for other purposes like
# caching, an execution engine broker (like Celery), user session management, etc.
class ExecutionQueue:
    """
    Queue for managing the execution of agents.
    This will be shared between different processes.
    """

    def __init__(self):
        self.queue = Queue()

    def add(self, data: str) -> str:
        execution_id = uuid.uuid4()
        self.queue.put(Execution(str(execution_id), data))
        return str(execution_id)

    def get(self) -> Execution | None:
        # Note: multiprocessing.Queue.get() blocks until an item is available.
        return self.queue.get()

    def empty(self) -> bool:
        return self.queue.empty()
```
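The TODO above points toward a Redis-backed replacement. A minimal sketch of what that could look like, assuming the `redis` package (not a dependency of this commit) and a reachable Redis server; `RedisExecutionQueue` is a hypothetical name:

```python
# Hypothetical Redis-backed ExecutionQueue (not part of this commit).
import json
import uuid

import redis


class RedisExecutionQueue:
    def __init__(self, url: str = "redis://localhost:6379/0", key: str = "executions"):
        self.client = redis.Redis.from_url(url)
        self.key = key

    def add(self, data: str) -> str:
        execution_id = str(uuid.uuid4())
        payload = json.dumps({"execution_id": execution_id, "data": data})
        self.client.rpush(self.key, payload)  # append to the shared list
        return execution_id

    def get(self, timeout: int = 1) -> dict | None:
        # BLPOP blocks for up to `timeout` seconds, returning None on timeout.
        item = self.client.blpop(self.key, timeout=timeout)
        if item is None:
            return None
        _, raw = item
        return json.loads(raw)
```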
#### rnd/autogpt_server/poetry.lock (generated, new file, +1151)

File diff suppressed because it is too large.
#### rnd/autogpt_server/pyproject.toml (new file, +22)
```toml
[tool.poetry]
name = "autogpt_server"
version = "0.1.0"
description = ""
authors = ["SwiftyOS <craigswift13@gmail.com>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.10"
click = "^8.1.7"
pydantic = "^2.7.1"
pytest = "^8.2.1"
uvicorn = "^0.30.1"
fastapi = "^0.111.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
app = "autogpt_server.app:main"
```
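The `[tool.poetry.scripts]` entry is what makes `poetry run app` from the testing instructions resolve to `autogpt_server.app:main`.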
#### rnd/autogpt_server/test/test_app.py (new file, +30)
```python
import pytest
from fastapi.testclient import TestClient

from autogpt_server.agent_api import start_server
from autogpt_server.agent_executor import start_executors
from autogpt_server.data import ExecutionQueue


@pytest.fixture
def client():
    execution_queue = ExecutionQueue()
    start_executors(5, execution_queue)
    return TestClient(start_server(execution_queue, use_uvicorn=False))


def test_execute_agent(client):
    # Assert API is working
    response = client.post("/agents/dummy_agent_1/execute")
    assert response.status_code == 200

    # Assert response is correct
    data = response.json()
    exec_id = data["execution_id"]
    agent_id = data["agent_id"]
    assert agent_id == "dummy_agent_1"
    assert isinstance(exec_id, str)
    assert len(exec_id) == 36  # canonical UUID4 string length

    # TODO: Add assertion that the executor is executed after some time.
    # Add this when db integration is done.
```