fix(agent server): Updated function names and type checking (#7185)

* fix agent server

* renamed functions

* simplified dir naming
This commit is contained in:
Swifty
2024-06-04 11:02:38 +02:00
committed by GitHub
parent 246bc850a1
commit e3a5663a05
10 changed files with 50 additions and 55 deletions

View File

@@ -1 +0,0 @@
from .server import start_server # noqa

View File

@@ -1 +0,0 @@
from .executor import start_executors # noqa

View File

@@ -1,42 +0,0 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from autogpt_server.data import ExecutionQueue
logger = logging.getLogger(__name__)
class AgentExecutor:
# TODO: Replace this by an actual Agent Execution.
def __execute(id: str, data: str) -> None:
logger.warning(f"Executor processing started, execution_id: {id}, data: {data}")
for i in range(5):
logger.warning(
f"Executor processing step {i}, execution_id: {id}, data: {data}"
)
time.sleep(1)
logger.warning(
f"Executor processing completed, execution_id: {id}, data: {data}"
)
def start_executor(pool_size: int, queue: ExecutionQueue) -> None:
with ThreadPoolExecutor(max_workers=pool_size) as executor:
while True:
execution = queue.get()
if not execution:
time.sleep(1)
continue
executor.submit(
AgentExecutor.__execute,
execution.execution_id,
execution.data,
)
def start_executors(pool_size: int, queue: ExecutionQueue) -> None:
executor_process = Process(
target=AgentExecutor.start_executor, args=(pool_size, queue)
)
executor_process.start()

View File

@@ -1,11 +1,11 @@
from autogpt_server.agent_api import start_server
from autogpt_server.agent_executor import start_executors
from autogpt_server.server import start_server
from autogpt_server.executor import start_executor_manager
from autogpt_server.data import ExecutionQueue
def main() -> None:
queue = ExecutionQueue()
start_executors(5, queue)
start_executor_manager(5, queue)
start_server(queue)

View File

@@ -22,7 +22,7 @@ class ExecutionQueue:
"""
def __init__(self):
self.queue = Queue()
self.queue: Queue[Execution] = Queue()
def add(self, data: str) -> str:
execution_id = uuid.uuid4()

View File

@@ -0,0 +1 @@
from .executor import start_executor_manager # type: ignore # noqa

View File

@@ -0,0 +1,38 @@
import logging
import time
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Process
from autogpt_server.data import Execution, ExecutionQueue
logger = logging.getLogger(__name__)
# TODO: Replace this by an actual Agent Execution.
def execute_node(id: str, data: str) -> None:
logger.warning(f"Executor processing started, execution_id: {id}, data: {data}")
for i in range(5):
logger.warning(
f"Executor processing step {i}, execution_id: {id}, data: {data}"
)
time.sleep(1)
logger.warning(f"Executor processing completed, execution_id: {id}, data: {data}")
def start_executor(pool_size: int, queue: ExecutionQueue) -> None:
with ThreadPoolExecutor(max_workers=pool_size) as executor:
while True:
execution: Execution | None = queue.get()
if not execution:
time.sleep(1)
continue
executor.submit(
execute_node,
execution.execution_id,
execution.data,
) # type: ignore
def start_executor_manager(pool_size: int, queue: ExecutionQueue) -> None:
executor_process = Process(target=start_executor, args=(pool_size, queue))
executor_process.start()

View File

@@ -0,0 +1 @@
from .server import start_server # type: ignore # noqa

View File

@@ -5,7 +5,6 @@ from autogpt_server.data import ExecutionQueue
class AgentServer:
def __init__(self, queue: ExecutionQueue):
self.app = FastAPI(
title="AutoGPT Agent Server",

View File

@@ -1,19 +1,19 @@
import pytest
from autogpt_server.data import ExecutionQueue
from autogpt_server.agent_api import start_server
from autogpt_server.agent_executor import start_executors
from fastapi.testclient import TestClient
from autogpt_server.server import start_server
from autogpt_server.executor import start_executor_manager
from autogpt_server.data import ExecutionQueue
@pytest.fixture
def client():
execution_queue = ExecutionQueue()
start_executors(5, execution_queue)
start_executor_manager(5, execution_queue)
return TestClient(start_server(execution_queue, use_uvicorn=False))
def test_execute_agent(client):
def test_execute_agent(client: TestClient):
# Assert API is working
response = client.post("/agents/dummy_agent_1/execute")
assert response.status_code == 200