diff --git a/rnd/autogpt_server/README.md b/rnd/autogpt_server/README.md
index 6aa15c42c1..d77a39ca35 100644
--- a/rnd/autogpt_server/README.md
+++ b/rnd/autogpt_server/README.md
@@ -1,100 +1,102 @@
-# Next Gen AutoGPT
+# AutoGPT Agent Server
-This is a research project into creating the next generation of autogpt, which is an autogpt agent server.
-
-The agent server will enable the creation of composite multi-agent system that utilize AutoGPT Agent as its default agent.
+This is an initial project for creating the next generation of agent execution: an AutoGPT agent server.
+The agent server will enable the creation of composite multi-agent systems that utilize AutoGPT agents and other non-agent components as their primitives.
## Setup
-This setup is for MacOS/Linux.
-To setup the project follow these steps inside the project directory:
+To set up the project, follow these steps inside the project directory:
-1. Enter poetry shell
- ```
+1. Enter the poetry shell
+
+ ```sh
poetry shell
```
-
+
2. Install dependencies
- ```
+
+ ```sh
poetry install
```
+
+3. Generate the Prisma client
-3. Generate prisma client
- ```
+ ```sh
poetry run prisma generate
```
+
+ In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
- In case prisma generates client for the global python installation instead of the virtual environment the current mitigation is to just uninstall the global prisma package:
- ```
+ ```sh
pip uninstall prisma
```
-
- And then run the generation again.
- The path *should* look something like this:
+
+ Then run the generation again. The path *should* look something like this:
`/pypoetry/virtualenvs/autogpt-server-TQIRSwR6-py3.12/bin/prisma`
-4. Migrate the database, be careful because this deletes current data in the database
- ```
+4. Migrate the database. Be careful because this deletes current data in the database.
+
+ ```sh
poetry run prisma migrate dev
```
+
+## Running The Server
-# Running The Server
-
-## Starting the server directly
+### Starting the server directly
Run the following command:
-```
+```sh
poetry run app
```
-## Running the App in the Background
+### Running the App in the Background
-1. Start the server, this starts the server in the background
- ```
+**Note:** this is a Unix feature and can fail on Windows. If it fails, you can run the previous command and manually move the process to the background.
+
+1. Start the server. This starts the server in the background.
+
+ ```sh
poetry run cli start
```
-
- You may need to change the permissions of the file to make it executable
- ```
- chmod +x autogpt_server/cli.py
- ```
+
+2. Stop the server.
-2. Stop the server
- ```
+ ```sh
poetry run cli stop
```
-
+
## Adding Test Data
-1. Start the server using 1 of the above methods
+1. Start the server using one of the above methods.
-2. Run the populate db command
+2. Run the populate DB command:
-```
-poetry run cli test populate-db http://0.0.0.0:8000
-```
-
-This will add a graph, a graph execution and a cron schedule to run the graph every 5 mins
+ ```sh
+ poetry run cli test populate-db http://0.0.0.0:8000
+ ```
+
+ This will add a graph, a graph execution, and a cron schedule to run the graph every 5 minutes.
### Reddit Graph
-There is a command to add the test reddit graph
+There is a command to add the test Reddit graph:
-```
+```sh
poetry run cli test reddit http://0.0.0.0:8000
```
-For help run:
-```
+For help, run:
+
+```sh
poetry run cli test reddit --help
-
```
-# Testing
+## Testing
-To run the tests
-```
+To run the tests:
+
+```sh
poetry run pytest
```
@@ -104,37 +106,37 @@ The current project has the following main modules:
### **blocks**
-This module stores all the Agent Block, a reusable component to build a graph that represents the agent's behavior.
+This module stores all the Agent Blocks, which are reusable components to build a graph that represents the agent's behavior.
### **data**
This module stores the logical model that is persisted in the database.
-This module abstracts the database operation into a function that can be called by the service layer.
-Any code that interacts with Prisma objects or databases should live in this module.
+It abstracts the database operations into functions that can be called by the service layer.
+Any code that interacts with Prisma objects or the database should reside in this module.
The main models are:
* `block`: anything related to the block used in the graph
* `execution`: anything related to graph execution
-* `graph`: anything related to the graph, node, and its relation
+* `graph`: anything related to the graph, its nodes, and their relations
### **execution**
This module stores the business logic of executing the graph.
It currently has the following main modules:
-* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both of the logic.
-* `scheduler`: A service that triggers scheduled graph execution based on cron expression. It will push an execution request to the manager.
+* `manager`: A service that consumes the graph execution queue and executes the graph. It contains both the queue-consumption and the execution logic.
+* `scheduler`: A service that triggers scheduled graph execution based on a cron expression. It pushes an execution request to the manager.
### **server**
This module stores the logic for the server API.
-It stores all the logic used for the API that allows the client to create/execute/monitor the graph and its execution.
-This API service will interact with other services like the ones defined in `manager` and `scheduler`.
+It contains all the logic used for the API that allows the client to create, execute, and monitor the graph and its execution.
+This API service interacts with other services like those defined in `manager` and `scheduler`.
### **utils**
-This module stores the utility functions that are used across the project.
-Currently, it only has two main modules:
+This module stores utility functions that are used across the project.
+Currently, it has two main modules:
* `process`: A module that contains the logic to spawn a new process.
-* `service`: A module that becomes a parent class for all the services in the project.
+* `service`: A module that serves as a parent class for all the services in the project.
## Service Communication
@@ -144,18 +146,18 @@ Currently, there are only 3 active services:
- ExecutionManager (the executor, defined in `manager.py`)
- ExecutionScheduler (the scheduler, defined in `scheduler.py`)
-The service is running in an independent python process and communicates through an IPC.
+The services run in independent Python processes and communicate through IPC.
A communication layer (`service.py`) is created to decouple the communication library from the implementation.
-Currently, the IPC is done using Pyro5 and abstracted in a way that it allows a function that is decorated with an `@expose` function can be called from the different process.
+Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.
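The decorator-gated call pattern described above can be illustrated with a dependency-free sketch. This is **not** the project's `service.py` or Pyro5 itself — `ExampleService` and `dispatch` are hypothetical stand-ins for the proxy/daemon machinery — but it shows the core idea: only methods marked with `@expose` are reachable from another process.

```python
# Hypothetical sketch of the @expose pattern (not the project's service.py):
# only methods marked by the decorator may be invoked by a remote caller.

def expose(func):
    """Mark a method as callable from another process."""
    func._exposed = True
    return func

class ExampleService:
    @expose
    def add(self, a: int, b: int) -> int:
        return a + b

    def _internal(self) -> None:
        # Not exposed: the IPC layer refuses to route calls here.
        pass

def dispatch(service, method_name: str, *args):
    """Stand-in for the IPC layer: route a call to an exposed method only."""
    method = getattr(service, method_name, None)
    if method is None or not getattr(method, "_exposed", False):
        raise PermissionError(f"{method_name} is not exposed")
    return method(*args)

print(dispatch(ExampleService(), "add", 2, 3))  # 5
```

In the real system, Pyro5 plays the role of `dispatch`, marshalling the call across the process boundary; the abstraction in `service.py` keeps that library swappable.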
-## Adding a new Agent Block
+## Adding a New Agent Block
-To add a new agent block, you need to create a new class that inherits from `Block` that provide these information:
-* `input_schema`: the schema of the input data, represented by a pydantic object.
-* `output_schema`: the schema of the output data, represented by a pydantic object.
-* `run` method: the main logic of the block
-* `test_input` & `test_output`: the sample input and output data for the block, this will be used to auto-test the block.
-* You can mock the functions declared in the block using the `test_mock` field for your unit test.
-* If you introduced a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server.
-* Once you finished creating the block, you can test it by running the test using `pytest test/block/test_block.py`.
+To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:
+* `input_schema`: the schema of the input data, represented by a Pydantic object.
+* `output_schema`: the schema of the output data, represented by a Pydantic object.
+* `run` method: the main logic of the block.
+* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block.
+* You can mock the functions declared in the block using the `test_mock` field for your unit tests.
+* If you introduce a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server.
+* Once you finish creating the block, you can test it by running `pytest test/block/test_block.py`.
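The checklist above can be sketched as a minimal hypothetical block. The names here (`EchoBlock`, `EchoInput`, `EchoOutput`) are invented for illustration, and dataclasses stand in for the Pydantic schemas so the sketch runs without extra dependencies; a real block inherits from the `Block` class in the `blocks` package.

```python
from dataclasses import dataclass
from typing import Any, Generator

# Stand-in schemas: the real project represents these as Pydantic models.

@dataclass
class EchoInput:
    text: str

@dataclass
class EchoOutput:
    text: str

class EchoBlock:
    """Hypothetical block showing the fields listed above."""
    input_schema = EchoInput
    output_schema = EchoOutput
    test_input = {"text": "hello"}   # used to auto-test the block
    test_output = {"text": "hello"}

    def run(self, data: dict[str, Any]) -> Generator[tuple[str, Any], None, None]:
        # The server consumes block output as (output_name, value) pairs
        # (see execute_graph_block, which iterates over the block's output).
        yield "text", data["text"]
```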
diff --git a/rnd/autogpt_server/autogpt_server/server/conn_manager.py b/rnd/autogpt_server/autogpt_server/server/conn_manager.py
index b07c881ab3..3d90551664 100644
--- a/rnd/autogpt_server/autogpt_server/server/conn_manager.py
+++ b/rnd/autogpt_server/autogpt_server/server/conn_manager.py
@@ -34,7 +34,7 @@ class ConnectionManager:
graph_id = result.graph_id
if graph_id in self.subscriptions:
message = WsMessage(
- method=Methods.UPDATE,
+ method=Methods.EXECUTION_EVENT,
channel=graph_id,
data=result.model_dump()
).model_dump_json()
diff --git a/rnd/autogpt_server/autogpt_server/server/model.py b/rnd/autogpt_server/autogpt_server/server/model.py
index 2dd0efc001..051bdc88ba 100644
--- a/rnd/autogpt_server/autogpt_server/server/model.py
+++ b/rnd/autogpt_server/autogpt_server/server/model.py
@@ -6,13 +6,24 @@ import pydantic
class Methods(enum.Enum):
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
- UPDATE = "update"
+ EXECUTION_EVENT = "execution_event"
+ GET_BLOCKS = "get_blocks"
+ EXECUTE_BLOCK = "execute_block"
+ GET_GRAPHS = "get_graphs"
+ GET_GRAPH = "get_graph"
+ CREATE_GRAPH = "create_graph"
+ RUN_GRAPH = "run_graph"
+ GET_GRAPH_RUNS = "get_graph_runs"
+ CREATE_SCHEDULED_RUN = "create_scheduled_run"
+ GET_SCHEDULED_RUNS = "get_scheduled_runs"
+ UPDATE_SCHEDULED_RUN = "update_scheduled_run"
+ UPDATE_CONFIG = "update_config"
ERROR = "error"
class WsMessage(pydantic.BaseModel):
method: Methods
- data: typing.Dict[str, typing.Any] | None = None
+ data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None
success: bool | None = None
channel: str | None = None
error: str | None = None
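For illustration, a client-side subscribe request built against this model might look as follows. The field names mirror `WsMessage` above, but the shape of the `data` payload is an assumption for the sketch.

```python
import json

# Hypothetical wire-format sketch for a "subscribe" request; field names
# mirror WsMessage, but the data payload shape is an assumption.
message = {
    "method": "subscribe",
    "data": {"graph_id": "some-graph-id"},
    "success": None,
    "channel": None,
    "error": None,
}
payload = json.dumps(message)
# The server parses incoming text with WsMessage.model_validate_json(...).
```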
diff --git a/rnd/autogpt_server/autogpt_server/server/server.py b/rnd/autogpt_server/autogpt_server/server/server.py
index 85a6439448..af04474ec4 100644
--- a/rnd/autogpt_server/autogpt_server/server/server.py
+++ b/rnd/autogpt_server/autogpt_server/server/server.py
@@ -3,12 +3,18 @@ import uuid
from typing import Annotated, Any, Dict
import uvicorn
-from fastapi import WebSocket
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
-from fastapi import APIRouter, Body, FastAPI, HTTPException
+from fastapi import (
+ APIRouter,
+ Body,
+ FastAPI,
+ HTTPException,
+ WebSocket,
+ WebSocketDisconnect,
+)
from fastapi.middleware.cors import CORSMiddleware
from autogpt_server.data import db, execution, block
@@ -20,11 +26,12 @@ from autogpt_server.data.graph import (
)
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.conn_manager import ConnectionManager
-from autogpt_server.server.ws_api import websocket_router as ws_impl
+import autogpt_server.server.ws_api
from autogpt_server.util.data import get_frontend_path
from autogpt_server.util.service import expose # type: ignore
from autogpt_server.util.service import AppService, get_service_client
from autogpt_server.util.settings import Settings
+from autogpt_server.server.model import WsMessage, Methods
class AgentServer(AppService):
@@ -73,7 +80,7 @@ class AgentServer(AppService):
)
router.add_api_route(
path="/blocks/{block_id}/execute",
- endpoint=self.execute_graph_block,
+ endpoint=self.execute_graph_block, # type: ignore
methods=["POST"],
)
router.add_api_route(
@@ -128,7 +135,7 @@ class AgentServer(AppService):
methods=["POST"],
)
- app.add_exception_handler(500, self.handle_internal_error)
+ app.add_exception_handler(500, self.handle_internal_error) # type: ignore
app.mount(
path="/frontend",
@@ -140,7 +147,7 @@ class AgentServer(AppService):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket): # type: ignore
- await ws_impl(websocket, self.manager)
+ await self.websocket_router(websocket)
uvicorn.run(app, host="0.0.0.0", port=8000)
@@ -153,22 +160,191 @@ class AgentServer(AppService):
return get_service_client(ExecutionScheduler)
@classmethod
- def handle_internal_error(cls, request, exc):
+ def handle_internal_error(cls, request, exc): # type: ignore
return JSONResponse(
content={
- "message": f"{request.url.path} call failure",
- "error": str(exc),
+ "message": f"{request.url.path} call failure", # type: ignore
+ "error": str(exc), # type: ignore
},
status_code=500,
)
- @classmethod
- def get_graph_blocks(cls) -> list[dict[Any, Any]]:
- return [v.to_dict() for v in block.get_blocks().values()]
+ async def websocket_router(self, websocket: WebSocket):
+ await self.manager.connect(websocket)
+ try:
+ while True:
+ data = await websocket.receive_text()
+ message = WsMessage.model_validate_json(data)
+ if message.method == Methods.SUBSCRIBE:
+ await autogpt_server.server.ws_api.handle_subscribe(
+ websocket, self.manager, message
+ )
+
+ elif message.method == Methods.UNSUBSCRIBE:
+ await autogpt_server.server.ws_api.handle_unsubscribe(
+ websocket, self.manager, message
+ )
+ elif message.method == Methods.EXECUTION_EVENT:
+ print("Execution event received")
+ elif message.method == Methods.GET_BLOCKS:
+ data = self.get_graph_blocks()
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.GET_BLOCKS,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+ elif message.method == Methods.EXECUTE_BLOCK:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = self.execute_graph_block(
+ message.data["block_id"], message.data["data"]
+ )
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.EXECUTE_BLOCK,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+ elif message.method == Methods.GET_GRAPHS:
+ data = await self.get_graphs()
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.GET_GRAPHS,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+ print("Get graphs request received")
+ elif message.method == Methods.GET_GRAPH:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = await self.get_graph(message.data["graph_id"])
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.GET_GRAPH,
+ success=True,
+ data=data.model_dump(),
+ ).model_dump_json()
+ )
+ print("Get graph request received")
+ elif message.method == Methods.CREATE_GRAPH:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ graph = Graph.model_validate(message.data)
+ data = await self.create_new_graph(graph)
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.CREATE_GRAPH,
+ success=True,
+ data=data.model_dump(),
+ ).model_dump_json()
+ )
+
+ print("Create graph request received")
+ elif message.method == Methods.RUN_GRAPH:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = await self.execute_graph(
+ message.data["graph_id"], message.data["data"]
+ )
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.RUN_GRAPH,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+
+ print("Run graph request received")
+ elif message.method == Methods.GET_GRAPH_RUNS:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = await self.list_graph_runs(message.data["graph_id"])
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.GET_GRAPH_RUNS,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+
+ print("Get graph runs request received")
+ elif message.method == Methods.CREATE_SCHEDULED_RUN:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = await self.create_schedule(
+ message.data["graph_id"],
+ message.data["cron"],
+ message.data["data"],
+ )
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.CREATE_SCHEDULED_RUN,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+
+ print("Create scheduled run request received")
+ elif message.method == Methods.GET_SCHEDULED_RUNS:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = self.get_execution_schedules(message.data["graph_id"])
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.GET_SCHEDULED_RUNS,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+ print("Get scheduled runs request received")
+ elif message.method == Methods.UPDATE_SCHEDULED_RUN:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = self.update_schedule(
+ message.data["schedule_id"], message.data
+ )
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.UPDATE_SCHEDULED_RUN,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+
+ print("Update scheduled run request received")
+ elif message.method == Methods.UPDATE_CONFIG:
+ assert isinstance(message.data, dict), "Data must be a dictionary"
+ data = self.update_configuration(message.data)
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.UPDATE_CONFIG,
+ success=True,
+ data=data,
+ ).model_dump_json()
+ )
+
+ print("Update config request received")
+ elif message.method == Methods.ERROR:
+ print("Error message received")
+ else:
+ print("Message type is not processed by the server")
+ await websocket.send_text(
+ WsMessage(
+ method=Methods.ERROR,
+ success=False,
+ error="Message type is not processed by the server",
+ ).model_dump_json()
+ )
+
+ except WebSocketDisconnect:
+ self.manager.disconnect(websocket)
+ print("Client Disconnected")
@classmethod
- def execute_graph_block(cls, block_id: str, data: dict[str, Any]) -> list:
- obj = block.get_block(block_id)
+ def get_graph_blocks(cls) -> list[dict[Any, Any]]:
+ return [v.to_dict() for v in block.get_blocks().values()] # type: ignore
+
+ @classmethod
+ def execute_graph_block(
+ cls, block_id: str, data: dict[str, Any]
+ ) -> list[dict[str, Any]]:
+ obj = block.get_block(block_id) # type: ignore
if not obj:
raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
return [{name: data} for name, data in obj.execute(data)]
@@ -252,9 +428,7 @@ class AgentServer(AppService):
@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
execution_result = execution.ExecutionResult(**execution_result_dict)
- self.run_and_wait(
- self.event_queue.put(execution_result)
- )
+ self.run_and_wait(self.event_queue.put(execution_result))
@classmethod
def update_configuration(
@@ -275,12 +449,9 @@ class AgentServer(AppService):
setattr(settings.secrets, key, value) # type: ignore
updated_fields["secrets"].append(key)
settings.save()
- return JSONResponse(
- content={
- "message": "Settings updated successfully",
- "updated_fields": updated_fields,
- },
- status_code=200,
- )
+ return {
+ "message": "Settings updated successfully",
+ "updated_fields": updated_fields,
+ }
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
diff --git a/rnd/autogpt_server/test/server/test_con_manager.py b/rnd/autogpt_server/test/server/test_con_manager.py
index 4cff74cfc9..8d5adac7d2 100644
--- a/rnd/autogpt_server/test/server/test_con_manager.py
+++ b/rnd/autogpt_server/test/server/test_con_manager.py
@@ -84,7 +84,7 @@ async def test_send_execution_result(
mock_websocket.send_text.assert_called_once_with(
WsMessage(
- method=Methods.UPDATE,
+ method=Methods.EXECUTION_EVENT,
channel="test_graph",
data=result.model_dump(),
).model_dump_json()
diff --git a/rnd/autogpt_server/test/server/test_ws_api.py b/rnd/autogpt_server/test/server/test_ws_api.py
index 816737967e..550867caa3 100644
--- a/rnd/autogpt_server/test/server/test_ws_api.py
+++ b/rnd/autogpt_server/test/server/test_ws_api.py
@@ -75,7 +75,8 @@ async def test_websocket_router_invalid_method(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
mock_websocket.receive_text.side_effect = [
- WsMessage(method=Methods.UPDATE).model_dump_json(),
+ WsMessage(method=Methods.EXECUTION_EVENT).model_dump_json(),
WebSocketDisconnect(),
]