From e10c4ee4cd94cae926dfdac3656743b90455a93b Mon Sep 17 00:00:00 2001 From: Zamil Majdy Date: Wed, 10 Jul 2024 12:31:52 +0400 Subject: [PATCH 1/3] fix(doc): Fix Auto GPT server Running The Server doc (#7360) --- rnd/autogpt_server/README.md | 144 ++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 71 deletions(-) diff --git a/rnd/autogpt_server/README.md b/rnd/autogpt_server/README.md index 6aa15c42c1..d77a39ca35 100644 --- a/rnd/autogpt_server/README.md +++ b/rnd/autogpt_server/README.md @@ -1,100 +1,102 @@ -# Next Gen AutoGPT +# AutoGPT Agent Server -This is a research project into creating the next generation of autogpt, which is an autogpt agent server. - -The agent server will enable the creation of composite multi-agent system that utilize AutoGPT Agent as its default agent. +This is an initial project for creating the next generation of agent execution, which is an AutoGPT agent server. +The agent server will enable the creation of composite multi-agent systems that utilize AutoGPT agents and other non-agent components as its primitives. ## Setup -This setup is for MacOS/Linux. -To setup the project follow these steps inside the project directory: +To set up the project, follow these steps inside the project directory: -1. Enter poetry shell - ``` +1. Enter the poetry shell + + ```sh poetry shell ``` - + 2. Install dependencies - ``` + + ```sh poetry install ``` + +3. Generate the Prisma client -3. Generate prisma client - ``` + ```sh poetry run prisma generate ``` + + In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package: - In case prisma generates client for the global python installation instead of the virtual environment the current mitigation is to just uninstall the global prisma package: - ``` + ```sh pip uninstall prisma ``` - - And then run the generation again. - The path *should* look something like this: + + Then run the generation again. The path *should* look something like this: `/pypoetry/virtualenvs/autogpt-server-TQIRSwR6-py3.12/bin/prisma` -4. Migrate the database, be careful because this deletes current data in the database - ``` +4. Migrate the database. Be careful because this deletes current data in the database. + + ```sh poetry run prisma migrate dev ``` + +## Running The Server -# Running The Server - -## Starting the server directly +### Starting the server directly Run the following command: -``` +```sh poetry run app ``` -## Running the App in the Background +### Running the App in the Background -1. Start the server, this starts the server in the background - ``` +**Note: this is a Unix feature and can fail on Windows. If it fails, you can run the previous command and manually move the process to the background.* + +1. Start the server. This starts the server in the background. + + ```sh poetry run cli start ``` - - You may need to change the permissions of the file to make it executable - ``` - chmod +x autogpt_server/cli.py - ``` + +2. Stop the server. -2. Stop the server - ``` + ```sh poetry run cli stop ``` - + ## Adding Test Data -1. Start the server using 1 of the above methods +1. Start the server using one of the above methods. -2. Run the populate db command +2. Run the populate DB command: -``` -poetry run cli test populate-db http://0.0.0.0:8000 -``` - -This will add a graph, a graph execution and a cron schedule to run the graph every 5 mins + ```sh + poetry run cli test populate-db http://0.0.0.0:8000 + ``` + + This will add a graph, a graph execution, and a cron schedule to run the graph every 5 minutes. ### Reddit Graph -There is a command to add the test reddit graph +There is a command to add the test Reddit graph: -``` +```sh poetry run cli test reddit http://0.0.0.0:8000 ``` -For help run: -``` +For help, run: + +```sh poetry run cli test reddit --help - ``` -# Testing +## Testing -To run the tests -``` +To run the tests: + +```sh poetry run pytest ``` @@ -104,37 +106,37 @@ The current project has the following main modules: ### **blocks** -This module stores all the Agent Block, a reusable component to build a graph that represents the agent's behavior. +This module stores all the Agent Blocks, which are reusable components to build a graph that represents the agent's behavior. ### **data** This module stores the logical model that is persisted in the database. -This module abstracts the database operation into a function that can be called by the service layer. -Any code that interacts with Prisma objects or databases should live in this module. +It abstracts the database operations into functions that can be called by the service layer. +Any code that interacts with Prisma objects or the database should reside in this module. The main models are: * `block`: anything related to the block used in the graph * `execution`: anything related to the execution graph execution -* `graph`: anything related to the graph, node, and its relation +* `graph`: anything related to the graph, node, and its relations ### **execution** This module stores the business logic of executing the graph. It currently has the following main modules: -* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both of the logic. -* `scheduler`: A service that triggers scheduled graph execution based on cron expression. It will push an execution request to the manager. +* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both pieces of logic. +* `scheduler`: A service that triggers scheduled graph execution based on a cron expression. It pushes an execution request to the manager. ### **server** This module stores the logic for the server API. -It stores all the logic used for the API that allows the client to create/execute/monitor the graph and its execution. -This API service will interact with other services like the ones defined in `manager` and `scheduler`. +It contains all the logic used for the API that allows the client to create, execute, and monitor the graph and its execution. +This API service interacts with other services like those defined in `manager` and `scheduler`. ### **utils** -This module stores the utility functions that are used across the project. -Currently, it only has two main modules: +This module stores utility functions that are used across the project. +Currently, it has two main modules: * `process`: A module that contains the logic to spawn a new process. -* `service`: A module that becomes a parent class for all the services in the project. +* `service`: A module that serves as a parent class for all the services in the project. ## Service Communication @@ -144,18 +146,18 @@ Currently, there are only 3 active services: - ExecutionManager (the executor, defined in `manager.py`) - ExecutionScheduler (the scheduler, defined in `scheduler.py`) -The service is running in an independent python process and communicates through an IPC. +The services run in independent Python processes and communicate through an IPC. A communication layer (`service.py`) is created to decouple the communication library from the implementation. -Currently, the IPC is done using Pyro5 and abstracted in a way that it allows a function that is decorated with an `@expose` function can be called from the different process. +Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process. -## Adding a new Agent Block +## Adding a New Agent Block -To add a new agent block, you need to create a new class that inherits from `Block` that provide these information: -* `input_schema`: the schema of the input data, represented by a pydantic object. -* `output_schema`: the schema of the output data, represented by a pydantic object. -* `run` method: the main logic of the block -* `test_input` & `test_output`: the sample input and output data for the block, this will be used to auto-test the block. -* You can mock the functions declared in the block using the `test_mock` field for your unit test. -* If you introduced a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server. -* Once you finished creating the block, you can test it by running the test using `pytest test/block/test_block.py`. +To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information: +* `input_schema`: the schema of the input data, represented by a Pydantic object. +* `output_schema`: the schema of the output data, represented by a Pydantic object. +* `run` method: the main logic of the block. +* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block. +* You can mock the functions declared in the block using the `test_mock` field for your unit tests. +* If you introduce a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server. +* Once you finish creating the block, you can test it by running `pytest test/block/test_block.py`. From f94e81f48b15cc916f2f3ca7f6a884b18e2f5617 Mon Sep 17 00:00:00 2001 From: Swifty Date: Wed, 10 Jul 2024 11:01:12 +0200 Subject: [PATCH 2/3] feat(builder) Add save Agent functionality (#7361) Add save functionality --- rnd/autogpt_builder/src/components/Flow.tsx | 130 +++++++++++--------- 1 file changed, 73 insertions(+), 57 deletions(-) diff --git a/rnd/autogpt_builder/src/components/Flow.tsx b/rnd/autogpt_builder/src/components/Flow.tsx index 4138294176..c4dd1431b0 100644 --- a/rnd/autogpt_builder/src/components/Flow.tsx +++ b/rnd/autogpt_builder/src/components/Flow.tsx @@ -34,34 +34,34 @@ type CustomNodeData = { block_id: string; }; -const Sidebar: React.FC<{isOpen: boolean, availableNodes: Block[], addNode: (id: string, name: string) => void}> = - ({isOpen, availableNodes, addNode}) => { - const [searchQuery, setSearchQuery] = useState(''); +const Sidebar: React.FC<{ isOpen: boolean, availableNodes: Block[], addNode: (id: string, name: string) => void }> = + ({ isOpen, availableNodes, addNode }) => { + const [searchQuery, setSearchQuery] = useState(''); - if (!isOpen) return null; + if (!isOpen) return null; - const filteredNodes = availableNodes.filter(node => - node.name.toLowerCase().includes(searchQuery.toLowerCase()) - ); + const filteredNodes = availableNodes.filter(node => + node.name.toLowerCase().includes(searchQuery.toLowerCase()) + ); - return ( -
-

Nodes

- setSearchQuery(e.target.value)} - /> - {filteredNodes.map((node) => ( -
- {node.name} - -
- ))} -
- ); -}; + return ( +
+

Nodes

+ setSearchQuery(e.target.value)} + /> + {filteredNodes.map((node) => ( +
+ {node.name} + +
+ ))} +
+ ); + }; const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ flowID, @@ -261,8 +261,7 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ return inputData; }; - - const runAgent = async () => { + const saveAgent = async () => { try { console.log("All nodes before formatting:", nodes); const blockIdToNodeIdMap = {}; @@ -326,6 +325,20 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ setNodes(updatedNodes); + return newAgentId; + } catch (error) { + console.error('Error running agent:', error); + } + }; + + const runAgent = async () => { + try { + const newAgentId = await saveAgent(); + if (!newAgentId) { + console.error('Error saving agent'); + return; + } + const executeData = await api.executeFlow(newAgentId); const runId = executeData.id; @@ -348,25 +361,25 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({ }; -const updateNodesWithExecutionData = (executionData: any[]) => { - setNodes((nds) => - nds.map((node) => { - const nodeExecution = executionData.find((exec) => exec.node_id === node.id); - if (nodeExecution) { - return { - ...node, - data: { - ...node.data, - status: nodeExecution.status, - output_data: nodeExecution.output_data, - isPropertiesOpen: true, - }, - }; - } - return node; - }) - ); -}; + const updateNodesWithExecutionData = (executionData: any[]) => { + setNodes((nds) => + nds.map((node) => { + const nodeExecution = executionData.find((exec) => exec.node_id === node.id); + if (nodeExecution) { + return { + ...node, + data: { + ...node.data, + status: nodeExecution.status, + output_data: nodeExecution.output_data, + isPropertiesOpen: true, + }, + }; + } + return node; + }) + ); + }; const toggleSidebar = () => setIsSidebarOpen(!isSidebarOpen); @@ -394,19 +407,22 @@ const updateNodesWithExecutionData = (executionData: any[]) => { nodeTypes={nodeTypes} >
- setAgentName(e.target.value)} + setAgentName(e.target.value)} /> - setAgentDescription(e.target.value)} + setAgentDescription(e.target.value)} /> - +
{/* Added gap for spacing */} + + +
From 3789b004799f4eb547c15652d5afd72034455cb3 Mon Sep 17 00:00:00 2001 From: Swifty Date: Wed, 10 Jul 2024 11:54:18 +0200 Subject: [PATCH 3/3] feat(autogpt_server): Expose rest api via websocket (#7350) * Add in websocket event types * adding in api endpoints * Updated ws messages --- .../autogpt_server/server/conn_manager.py | 2 +- .../autogpt_server/server/model.py | 15 +- .../autogpt_server/server/server.py | 219 ++++++++++++++++-- .../test/server/test_con_manager.py | 2 +- rnd/autogpt_server/test/server/test_ws_api.py | 3 +- 5 files changed, 212 insertions(+), 29 deletions(-) diff --git a/rnd/autogpt_server/autogpt_server/server/conn_manager.py b/rnd/autogpt_server/autogpt_server/server/conn_manager.py index b07c881ab3..3d90551664 100644 --- a/rnd/autogpt_server/autogpt_server/server/conn_manager.py +++ b/rnd/autogpt_server/autogpt_server/server/conn_manager.py @@ -34,7 +34,7 @@ class ConnectionManager: graph_id = result.graph_id if graph_id in self.subscriptions: message = WsMessage( - method=Methods.UPDATE, + method=Methods.EXECUTION_EVENT, channel=graph_id, data=result.model_dump() ).model_dump_json() diff --git a/rnd/autogpt_server/autogpt_server/server/model.py b/rnd/autogpt_server/autogpt_server/server/model.py index 2dd0efc001..051bdc88ba 100644 --- a/rnd/autogpt_server/autogpt_server/server/model.py +++ b/rnd/autogpt_server/autogpt_server/server/model.py @@ -6,13 +6,24 @@ import pydantic class Methods(enum.Enum): SUBSCRIBE = "subscribe" UNSUBSCRIBE = "unsubscribe" - UPDATE = "update" + EXECUTION_EVENT = "execution_event" + GET_BLOCKS = "get_blocks" + EXECUTE_BLOCK = "execute_block" + GET_GRAPHS = "get_graphs" + GET_GRAPH = "get_graph" + CREATE_GRAPH = "create_graph" + RUN_GRAPH = "run_graph" + GET_GRAPH_RUNS = "get_graph_runs" + CREATE_SCHEDULED_RUN = "create_scheduled_run" + GET_SCHEDULED_RUNS = "get_scheduled_runs" + UPDATE_SCHEDULED_RUN = "update_scheduled_run" + UPDATE_CONFIG = "update_config" ERROR = "error" class WsMessage(pydantic.BaseModel): method: Methods - data: typing.Dict[str, typing.Any] | None = None + data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None success: bool | None = None channel: str | None = None error: str | None = None diff --git a/rnd/autogpt_server/autogpt_server/server/server.py b/rnd/autogpt_server/autogpt_server/server/server.py index 85a6439448..af04474ec4 100644 --- a/rnd/autogpt_server/autogpt_server/server/server.py +++ b/rnd/autogpt_server/autogpt_server/server/server.py @@ -3,12 +3,18 @@ import uuid from typing import Annotated, Any, Dict import uvicorn -from fastapi import WebSocket from fastapi.responses import JSONResponse from fastapi.staticfiles import StaticFiles from contextlib import asynccontextmanager -from fastapi import APIRouter, Body, FastAPI, HTTPException +from fastapi import ( + APIRouter, + Body, + FastAPI, + HTTPException, + WebSocket, + WebSocketDisconnect, +) from fastapi.middleware.cors import CORSMiddleware from autogpt_server.data import db, execution, block @@ -20,11 +26,12 @@ from autogpt_server.data.graph import ( ) from autogpt_server.executor import ExecutionManager, ExecutionScheduler from autogpt_server.server.conn_manager import ConnectionManager -from autogpt_server.server.ws_api import websocket_router as ws_impl +import autogpt_server.server.ws_api from autogpt_server.util.data import get_frontend_path from autogpt_server.util.service import expose # type: ignore from autogpt_server.util.service import AppService, get_service_client from autogpt_server.util.settings import Settings +from autogpt_server.server.model import WsMessage, Methods class AgentServer(AppService): @@ -73,7 +80,7 @@ class AgentServer(AppService): ) router.add_api_route( path="/blocks/{block_id}/execute", - endpoint=self.execute_graph_block, + endpoint=self.execute_graph_block, # type: ignore methods=["POST"], ) router.add_api_route( @@ -128,7 +135,7 @@ class AgentServer(AppService): methods=["POST"], ) - app.add_exception_handler(500, self.handle_internal_error) + app.add_exception_handler(500, self.handle_internal_error) # type: ignore app.mount( path="/frontend", @@ -140,7 +147,7 @@ class AgentServer(AppService): @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): # type: ignore - await ws_impl(websocket, self.manager) + await self.websocket_router(websocket) uvicorn.run(app, host="0.0.0.0", port=8000) @@ -153,22 +160,191 @@ class AgentServer(AppService): return get_service_client(ExecutionScheduler) @classmethod - def handle_internal_error(cls, request, exc): + def handle_internal_error(cls, request, exc): # type: ignore return JSONResponse( content={ - "message": f"{request.url.path} call failure", - "error": str(exc), + "message": f"{request.url.path} call failure", # type: ignore + "error": str(exc), # type: ignore }, status_code=500, ) - @classmethod - def get_graph_blocks(cls) -> list[dict[Any, Any]]: - return [v.to_dict() for v in block.get_blocks().values()] + async def websocket_router(self, websocket: WebSocket): + await self.manager.connect(websocket) + try: + while True: + data = await websocket.receive_text() + message = WsMessage.model_validate_json(data) + if message.method == Methods.SUBSCRIBE: + await autogpt_server.server.ws_api.handle_subscribe( + websocket, self.manager, message + ) + + elif message.method == Methods.UNSUBSCRIBE: + await autogpt_server.server.ws_api.handle_unsubscribe( + websocket, self.manager, message + ) + elif message.method == Methods.EXECUTION_EVENT: + print("Execution event received") + elif message.method == Methods.GET_BLOCKS: + data = self.get_graph_blocks() + await websocket.send_text( + WsMessage( + method=Methods.GET_BLOCKS, + success=True, + data=data, + ).model_dump_json() + ) + elif message.method == Methods.EXECUTE_BLOCK: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = self.execute_graph_block( + message.data["block_id"], message.data["data"] + ) + await websocket.send_text( + WsMessage( + method=Methods.EXECUTE_BLOCK, + success=True, + data=data, + ).model_dump_json() + ) + elif message.method == Methods.GET_GRAPHS: + data = await self.get_graphs() + await websocket.send_text( + WsMessage( + method=Methods.GET_GRAPHS, + success=True, + data=data, + ).model_dump_json() + ) + print("Get graphs request received") + elif message.method == Methods.GET_GRAPH: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = await self.get_graph(message.data["graph_id"]) + await websocket.send_text( + WsMessage( + method=Methods.GET_GRAPH, + success=True, + data=data.model_dump(), + ).model_dump_json() + ) + print("Get graph request received") + elif message.method == Methods.CREATE_GRAPH: + assert isinstance(message.data, dict), "Data must be a dictionary" + graph = Graph.model_validate(message.data) + data = await self.create_new_graph(graph) + await websocket.send_text( + WsMessage( + method=Methods.CREATE_GRAPH, + success=True, + data=data.model_dump(), + ).model_dump_json() + ) + + print("Create graph request received") + elif message.method == Methods.RUN_GRAPH: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = await self.execute_graph( + message.data["graph_id"], message.data["data"] + ) + await websocket.send_text( + WsMessage( + method=Methods.RUN_GRAPH, + success=True, + data=data, + ).model_dump_json() + ) + + print("Run graph request received") + elif message.method == Methods.GET_GRAPH_RUNS: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = await self.list_graph_runs(message.data["graph_id"]) + await websocket.send_text( + WsMessage( + method=Methods.GET_GRAPH_RUNS, + success=True, + data=data, + ).model_dump_json() + ) + + print("Get graph runs request received") + elif message.method == Methods.CREATE_SCHEDULED_RUN: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = await self.create_schedule( + message.data["graph_id"], + message.data["cron"], + message.data["data"], + ) + await websocket.send_text( + WsMessage( + method=Methods.CREATE_SCHEDULED_RUN, + success=True, + data=data, + ).model_dump_json() + ) + + print("Create scheduled run request received") + elif message.method == Methods.GET_SCHEDULED_RUNS: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = self.get_execution_schedules(message.data["graph_id"]) + await websocket.send_text( + WsMessage( + method=Methods.GET_SCHEDULED_RUNS, + success=True, + data=data, + ).model_dump_json() + ) + print("Get scheduled runs request received") + elif message.method == Methods.UPDATE_SCHEDULED_RUN: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = self.update_schedule( + message.data["schedule_id"], message.data + ) + await websocket.send_text( + WsMessage( + method=Methods.UPDATE_SCHEDULED_RUN, + success=True, + data=data, + ).model_dump_json() + ) + + print("Update scheduled run request received") + elif message.method == Methods.UPDATE_CONFIG: + assert isinstance(message.data, dict), "Data must be a dictionary" + data = self.update_configuration(message.data) + await websocket.send_text( + WsMessage( + method=Methods.UPDATE_CONFIG, + success=True, + data=data, + ).model_dump_json() + ) + + print("Update config request received") + elif message.method == Methods.ERROR: + print("Error message received") + else: + print("Message type is not processed by the server") + await websocket.send_text( + WsMessage( + method=Methods.ERROR, + success=False, + error="Message type is not processed by the server", + ).model_dump_json() + ) + + except WebSocketDisconnect: + self.manager.disconnect(websocket) + print("Client Disconnected") @classmethod - def execute_graph_block(cls, block_id: str, data: dict[str, Any]) -> list: - obj = block.get_block(block_id) + def get_graph_blocks(cls) -> list[dict[Any, Any]]: + return [v.to_dict() for v in block.get_blocks().values()] # type: ignore + + @classmethod + def execute_graph_block( + cls, block_id: str, data: dict[str, Any] + ) -> list[dict[str, Any]]: + obj = block.get_block(block_id) # type: ignore if not obj: raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.") return [{name: data} for name, data in obj.execute(data)] @@ -252,9 +428,7 @@ class AgentServer(AppService): @expose def send_execution_update(self, execution_result_dict: dict[Any, Any]): execution_result = execution.ExecutionResult(**execution_result_dict) - self.run_and_wait( - self.event_queue.put(execution_result) - ) + self.run_and_wait(self.event_queue.put(execution_result)) @classmethod def update_configuration( @@ -275,12 +449,9 @@ class AgentServer(AppService): setattr(settings.secrets, key, value) # type: ignore updated_fields["secrets"].append(key) settings.save() - return JSONResponse( - content={ - "message": "Settings updated successfully", - "updated_fields": updated_fields, - }, - status_code=200, - ) + return { + "message": "Settings updated successfully", + "updated_fields": updated_fields, + } except Exception as e: raise HTTPException(status_code=400, detail=str(e)) diff --git a/rnd/autogpt_server/test/server/test_con_manager.py b/rnd/autogpt_server/test/server/test_con_manager.py index 4cff74cfc9..8d5adac7d2 100644 --- a/rnd/autogpt_server/test/server/test_con_manager.py +++ b/rnd/autogpt_server/test/server/test_con_manager.py @@ -84,7 +84,7 @@ async def test_send_execution_result( mock_websocket.send_text.assert_called_once_with( WsMessage( - method=Methods.UPDATE, + method=Methods.EXECUTION_EVENT, channel="test_graph", data=result.model_dump(), ).model_dump_json() diff --git a/rnd/autogpt_server/test/server/test_ws_api.py b/rnd/autogpt_server/test/server/test_ws_api.py index 816737967e..550867caa3 100644 --- a/rnd/autogpt_server/test/server/test_ws_api.py +++ b/rnd/autogpt_server/test/server/test_ws_api.py @@ -75,7 +75,8 @@ async def test_websocket_router_invalid_method( mock_websocket: AsyncMock, mock_manager: AsyncMock ) -> None: mock_websocket.receive_text.side_effect = [ - WsMessage(method=Methods.UPDATE).model_dump_json(), + + WsMessage(method=Methods.EXECUTION_EVENT).model_dump_json(), WebSocketDisconnect(), ]