Merge branch 'master' into zamilmajdy/simplify-ai-block

This commit is contained in:
Zamil Majdy
2024-07-10 13:58:55 +04:00
committed by GitHub
7 changed files with 358 additions and 157 deletions

View File

@@ -34,34 +34,34 @@ type CustomNodeData = {
block_id: string;
};
const Sidebar: React.FC<{isOpen: boolean, availableNodes: Block[], addNode: (id: string, name: string) => void}> =
({isOpen, availableNodes, addNode}) => {
const [searchQuery, setSearchQuery] = useState('');
const Sidebar: React.FC<{ isOpen: boolean, availableNodes: Block[], addNode: (id: string, name: string) => void }> =
({ isOpen, availableNodes, addNode }) => {
const [searchQuery, setSearchQuery] = useState('');
if (!isOpen) return null;
if (!isOpen) return null;
const filteredNodes = availableNodes.filter(node =>
node.name.toLowerCase().includes(searchQuery.toLowerCase())
);
const filteredNodes = availableNodes.filter(node =>
node.name.toLowerCase().includes(searchQuery.toLowerCase())
);
return (
<div className={`sidebar dark-theme ${isOpen ? 'open' : ''}`}>
<h3>Nodes</h3>
<Input
type="text"
placeholder="Search nodes..."
value={searchQuery}
onChange={(e) => setSearchQuery(e.target.value)}
/>
{filteredNodes.map((node) => (
<div key={node.id} className="sidebarNodeRowStyle dark-theme">
<span>{node.name}</span>
<Button onClick={() => addNode(node.id, node.name)}>Add</Button>
</div>
))}
</div>
);
};
return (
<div className={`sidebar dark-theme ${isOpen ? 'open' : ''}`}>
<h3>Nodes</h3>
<Input
type="text"
placeholder="Search nodes..."
value={searchQuery}
onChange={(e) => setSearchQuery(e.target.value)}
/>
{filteredNodes.map((node) => (
<div key={node.id} className="sidebarNodeRowStyle dark-theme">
<span>{node.name}</span>
<Button onClick={() => addNode(node.id, node.name)}>Add</Button>
</div>
))}
</div>
);
};
const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
flowID,
@@ -261,8 +261,7 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
return inputData;
};
const runAgent = async () => {
const saveAgent = async () => {
try {
console.log("All nodes before formatting:", nodes);
const blockIdToNodeIdMap = {};
@@ -326,6 +325,20 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
setNodes(updatedNodes);
return newAgentId;
} catch (error) {
console.error('Error running agent:', error);
}
};
const runAgent = async () => {
try {
const newAgentId = await saveAgent();
if (!newAgentId) {
console.error('Error saving agent');
return;
}
const executeData = await api.executeFlow(newAgentId);
const runId = executeData.id;
@@ -348,25 +361,25 @@ const FlowEditor: React.FC<{ flowID?: string; className?: string }> = ({
};
const updateNodesWithExecutionData = (executionData: any[]) => {
setNodes((nds) =>
nds.map((node) => {
const nodeExecution = executionData.find((exec) => exec.node_id === node.id);
if (nodeExecution) {
return {
...node,
data: {
...node.data,
status: nodeExecution.status,
output_data: nodeExecution.output_data,
isPropertiesOpen: true,
},
};
}
return node;
})
);
};
const updateNodesWithExecutionData = (executionData: any[]) => {
setNodes((nds) =>
nds.map((node) => {
const nodeExecution = executionData.find((exec) => exec.node_id === node.id);
if (nodeExecution) {
return {
...node,
data: {
...node.data,
status: nodeExecution.status,
output_data: nodeExecution.output_data,
isPropertiesOpen: true,
},
};
}
return node;
})
);
};
const toggleSidebar = () => setIsSidebarOpen(!isSidebarOpen);
@@ -394,19 +407,22 @@ const updateNodesWithExecutionData = (executionData: any[]) => {
nodeTypes={nodeTypes}
>
<div style={{ position: 'absolute', right: 10, zIndex: 4 }}>
<Input
type="text"
placeholder="Agent Name"
value={agentName}
onChange={(e) => setAgentName(e.target.value)}
<Input
type="text"
placeholder="Agent Name"
value={agentName}
onChange={(e) => setAgentName(e.target.value)}
/>
<Input
type="text"
placeholder="Agent Description"
value={agentDescription}
onChange={(e) => setAgentDescription(e.target.value)}
<Input
type="text"
placeholder="Agent Description"
value={agentDescription}
onChange={(e) => setAgentDescription(e.target.value)}
/>
<Button onClick={runAgent}>Run Agent</Button>
<div style={{ display: 'flex', flexDirection: 'column', gap: '10px' }}> {/* Added gap for spacing */}
<Button onClick={saveAgent}>Save Agent</Button>
<Button onClick={runAgent}>Save & Run Agent</Button>
</div>
</div>
</ReactFlow>
</div>

View File

@@ -1,100 +1,102 @@
# Next Gen AutoGPT
# AutoGPT Agent Server
This is a research project into creating the next generation of autogpt, which is an autogpt agent server.
The agent server will enable the creation of composite multi-agent system that utilize AutoGPT Agent as its default agent.
This is an initial project for creating the next generation of agent execution, which is an AutoGPT agent server.
The agent server will enable the creation of composite multi-agent systems that utilize AutoGPT agents and other non-agent components as its primitives.
## Setup
This setup is for MacOS/Linux.
To setup the project follow these steps inside the project directory:
To set up the project, follow these steps inside the project directory:
1. Enter poetry shell
```
1. Enter the poetry shell
```sh
poetry shell
```
2. Install dependencies
```
```sh
poetry install
```
3. Generate the Prisma client
3. Generate prisma client
```
```sh
poetry run prisma generate
```
In case Prisma generates the client for the global Python installation instead of the virtual environment, the current mitigation is to just uninstall the global Prisma package:
In case prisma generates client for the global python installation instead of the virtual environment the current mitigation is to just uninstall the global prisma package:
```
```sh
pip uninstall prisma
```
And then run the generation again.
The path *should* look something like this:
Then run the generation again. The path *should* look something like this:
`<some path>/pypoetry/virtualenvs/autogpt-server-TQIRSwR6-py3.12/bin/prisma`
4. Migrate the database, be careful because this deletes current data in the database
```
4. Migrate the database. Be careful because this deletes current data in the database.
```sh
poetry run prisma migrate dev
```
## Running The Server
# Running The Server
## Starting the server directly
### Starting the server directly
Run the following command:
```
```sh
poetry run app
```
## Running the App in the Background
### Running the App in the Background
1. Start the server, this starts the server in the background
```
**Note: this is a Unix feature and can fail on Windows. If it fails, you can run the previous command and manually move the process to the background.*
1. Start the server. This starts the server in the background.
```sh
poetry run cli start
```
You may need to change the permissions of the file to make it executable
```
chmod +x autogpt_server/cli.py
```
2. Stop the server.
2. Stop the server
```
```sh
poetry run cli stop
```
## Adding Test Data
1. Start the server using 1 of the above methods
1. Start the server using one of the above methods.
2. Run the populate db command
2. Run the populate DB command:
```
poetry run cli test populate-db http://0.0.0.0:8000
```
This will add a graph, a graph execution and a cron schedule to run the graph every 5 mins
```sh
poetry run cli test populate-db http://0.0.0.0:8000
```
This will add a graph, a graph execution, and a cron schedule to run the graph every 5 minutes.
### Reddit Graph
There is a command to add the test reddit graph
There is a command to add the test Reddit graph:
```
```sh
poetry run cli test reddit http://0.0.0.0:8000
```
For help run:
```
For help, run:
```sh
poetry run cli test reddit --help
```
# Testing
## Testing
To run the tests
```
To run the tests:
```sh
poetry run pytest
```
@@ -104,37 +106,37 @@ The current project has the following main modules:
### **blocks**
This module stores all the Agent Block, a reusable component to build a graph that represents the agent's behavior.
This module stores all the Agent Blocks, which are reusable components to build a graph that represents the agent's behavior.
### **data**
This module stores the logical model that is persisted in the database.
This module abstracts the database operation into a function that can be called by the service layer.
Any code that interacts with Prisma objects or databases should live in this module.
It abstracts the database operations into functions that can be called by the service layer.
Any code that interacts with Prisma objects or the database should reside in this module.
The main models are:
* `block`: anything related to the block used in the graph
* `execution`: anything related to the execution graph execution
* `graph`: anything related to the graph, node, and its relation
* `graph`: anything related to the graph, node, and its relations
### **execution**
This module stores the business logic of executing the graph.
It currently has the following main modules:
* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both of the logic.
* `scheduler`: A service that triggers scheduled graph execution based on cron expression. It will push an execution request to the manager.
* `manager`: A service that consumes the queue of the graph execution and executes the graph. It contains both pieces of logic.
* `scheduler`: A service that triggers scheduled graph execution based on a cron expression. It pushes an execution request to the manager.
### **server**
This module stores the logic for the server API.
It stores all the logic used for the API that allows the client to create/execute/monitor the graph and its execution.
This API service will interact with other services like the ones defined in `manager` and `scheduler`.
It contains all the logic used for the API that allows the client to create, execute, and monitor the graph and its execution.
This API service interacts with other services like those defined in `manager` and `scheduler`.
### **utils**
This module stores the utility functions that are used across the project.
Currently, it only has two main modules:
This module stores utility functions that are used across the project.
Currently, it has two main modules:
* `process`: A module that contains the logic to spawn a new process.
* `service`: A module that becomes a parent class for all the services in the project.
* `service`: A module that serves as a parent class for all the services in the project.
## Service Communication
@@ -144,18 +146,18 @@ Currently, there are only 3 active services:
- ExecutionManager (the executor, defined in `manager.py`)
- ExecutionScheduler (the scheduler, defined in `scheduler.py`)
The service is running in an independent python process and communicates through an IPC.
The services run in independent Python processes and communicate through an IPC.
A communication layer (`service.py`) is created to decouple the communication library from the implementation.
Currently, the IPC is done using Pyro5 and abstracted in a way that it allows a function that is decorated with an `@expose` function can be called from the different process.
Currently, the IPC is done using Pyro5 and abstracted in a way that allows a function decorated with `@expose` to be called from a different process.
## Adding a new Agent Block
## Adding a New Agent Block
To add a new agent block, you need to create a new class that inherits from `Block` that provide these information:
* `input_schema`: the schema of the input data, represented by a pydantic object.
* `output_schema`: the schema of the output data, represented by a pydantic object.
* `run` method: the main logic of the block
* `test_input` & `test_output`: the sample input and output data for the block, this will be used to auto-test the block.
* You can mock the functions declared in the block using the `test_mock` field for your unit test.
* If you introduced a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server.
* Once you finished creating the block, you can test it by running the test using `pytest test/block/test_block.py`.
To add a new agent block, you need to create a new class that inherits from `Block` and provides the following information:
* `input_schema`: the schema of the input data, represented by a Pydantic object.
* `output_schema`: the schema of the output data, represented by a Pydantic object.
* `run` method: the main logic of the block.
* `test_input` & `test_output`: the sample input and output data for the block, which will be used to auto-test the block.
* You can mock the functions declared in the block using the `test_mock` field for your unit tests.
* If you introduce a new module under the `blocks` package, you need to import the module in `blocks/__init__.py` to make it available to the server.
* Once you finish creating the block, you can test it by running `pytest test/block/test_block.py`.

View File

@@ -34,7 +34,7 @@ class ConnectionManager:
graph_id = result.graph_id
if graph_id in self.subscriptions:
message = WsMessage(
method=Methods.UPDATE,
method=Methods.EXECUTION_EVENT,
channel=graph_id,
data=result.model_dump()
).model_dump_json()

View File

@@ -6,13 +6,24 @@ import pydantic
class Methods(enum.Enum):
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
UPDATE = "update"
EXECUTION_EVENT = "execution_event"
GET_BLOCKS = "get_blocks"
EXECUTE_BLOCK = "execute_block"
GET_GRAPHS = "get_graphs"
GET_GRAPH = "get_graph"
CREATE_GRAPH = "create_graph"
RUN_GRAPH = "run_graph"
GET_GRAPH_RUNS = "get_graph_runs"
CREATE_SCHEDULED_RUN = "create_scheduled_run"
GET_SCHEDULED_RUNS = "get_scheduled_runs"
UPDATE_SCHEDULED_RUN = "update_scheduled_run"
UPDATE_CONFIG = "update_config"
ERROR = "error"
class WsMessage(pydantic.BaseModel):
method: Methods
data: typing.Dict[str, typing.Any] | None = None
data: typing.Dict[str, typing.Any] | list[typing.Any] | None = None
success: bool | None = None
channel: str | None = None
error: str | None = None

View File

@@ -3,12 +3,18 @@ import uuid
from typing import Annotated, Any, Dict
import uvicorn
from fastapi import WebSocket
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from contextlib import asynccontextmanager
from fastapi import APIRouter, Body, FastAPI, HTTPException
from fastapi import (
APIRouter,
Body,
FastAPI,
HTTPException,
WebSocket,
WebSocketDisconnect,
)
from fastapi.middleware.cors import CORSMiddleware
from autogpt_server.data import db, execution, block
@@ -20,11 +26,12 @@ from autogpt_server.data.graph import (
)
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.ws_api import websocket_router as ws_impl
import autogpt_server.server.ws_api
from autogpt_server.util.data import get_frontend_path
from autogpt_server.util.service import expose # type: ignore
from autogpt_server.util.service import AppService, get_service_client
from autogpt_server.util.settings import Settings
from autogpt_server.server.model import WsMessage, Methods
class AgentServer(AppService):
@@ -73,7 +80,7 @@ class AgentServer(AppService):
)
router.add_api_route(
path="/blocks/{block_id}/execute",
endpoint=self.execute_graph_block,
endpoint=self.execute_graph_block, # type: ignore
methods=["POST"],
)
router.add_api_route(
@@ -128,7 +135,7 @@ class AgentServer(AppService):
methods=["POST"],
)
app.add_exception_handler(500, self.handle_internal_error)
app.add_exception_handler(500, self.handle_internal_error) # type: ignore
app.mount(
path="/frontend",
@@ -140,7 +147,7 @@ class AgentServer(AppService):
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket): # type: ignore
await ws_impl(websocket, self.manager)
await self.websocket_router(websocket)
uvicorn.run(app, host="0.0.0.0", port=8000)
@@ -153,22 +160,191 @@ class AgentServer(AppService):
return get_service_client(ExecutionScheduler)
@classmethod
def handle_internal_error(cls, request, exc):
def handle_internal_error(cls, request, exc): # type: ignore
return JSONResponse(
content={
"message": f"{request.url.path} call failure",
"error": str(exc),
"message": f"{request.url.path} call failure", # type: ignore
"error": str(exc), # type: ignore
},
status_code=500,
)
@classmethod
def get_graph_blocks(cls) -> list[dict[Any, Any]]:
return [v.to_dict() for v in block.get_blocks().values()]
async def websocket_router(self, websocket: WebSocket):
await self.manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
message = WsMessage.model_validate_json(data)
if message.method == Methods.SUBSCRIBE:
await autogpt_server.server.ws_api.handle_subscribe(
websocket, self.manager, message
)
elif message.method == Methods.UNSUBSCRIBE:
await autogpt_server.server.ws_api.handle_unsubscribe(
websocket, self.manager, message
)
elif message.method == Methods.EXECUTION_EVENT:
print("Execution event received")
elif message.method == Methods.GET_BLOCKS:
data = self.get_graph_blocks()
await websocket.send_text(
WsMessage(
method=Methods.GET_BLOCKS,
success=True,
data=data,
).model_dump_json()
)
elif message.method == Methods.EXECUTE_BLOCK:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = self.execute_graph_block(
message.data["block_id"], message.data["data"]
)
await websocket.send_text(
WsMessage(
method=Methods.EXECUTE_BLOCK,
success=True,
data=data,
).model_dump_json()
)
elif message.method == Methods.GET_GRAPHS:
data = await self.get_graphs()
await websocket.send_text(
WsMessage(
method=Methods.GET_GRAPHS,
success=True,
data=data,
).model_dump_json()
)
print("Get graphs request received")
elif message.method == Methods.GET_GRAPH:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = await self.get_graph(message.data["graph_id"])
await websocket.send_text(
WsMessage(
method=Methods.GET_GRAPH,
success=True,
data=data.model_dump(),
).model_dump_json()
)
print("Get graph request received")
elif message.method == Methods.CREATE_GRAPH:
assert isinstance(message.data, dict), "Data must be a dictionary"
graph = Graph.model_validate(message.data)
data = await self.create_new_graph(graph)
await websocket.send_text(
WsMessage(
method=Methods.CREATE_GRAPH,
success=True,
data=data.model_dump(),
).model_dump_json()
)
print("Create graph request received")
elif message.method == Methods.RUN_GRAPH:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = await self.execute_graph(
message.data["graph_id"], message.data["data"]
)
await websocket.send_text(
WsMessage(
method=Methods.RUN_GRAPH,
success=True,
data=data,
).model_dump_json()
)
print("Run graph request received")
elif message.method == Methods.GET_GRAPH_RUNS:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = await self.list_graph_runs(message.data["graph_id"])
await websocket.send_text(
WsMessage(
method=Methods.GET_GRAPH_RUNS,
success=True,
data=data,
).model_dump_json()
)
print("Get graph runs request received")
elif message.method == Methods.CREATE_SCHEDULED_RUN:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = await self.create_schedule(
message.data["graph_id"],
message.data["cron"],
message.data["data"],
)
await websocket.send_text(
WsMessage(
method=Methods.CREATE_SCHEDULED_RUN,
success=True,
data=data,
).model_dump_json()
)
print("Create scheduled run request received")
elif message.method == Methods.GET_SCHEDULED_RUNS:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = self.get_execution_schedules(message.data["graph_id"])
await websocket.send_text(
WsMessage(
method=Methods.GET_SCHEDULED_RUNS,
success=True,
data=data,
).model_dump_json()
)
print("Get scheduled runs request received")
elif message.method == Methods.UPDATE_SCHEDULED_RUN:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = self.update_schedule(
message.data["schedule_id"], message.data
)
await websocket.send_text(
WsMessage(
method=Methods.UPDATE_SCHEDULED_RUN,
success=True,
data=data,
).model_dump_json()
)
print("Update scheduled run request received")
elif message.method == Methods.UPDATE_CONFIG:
assert isinstance(message.data, dict), "Data must be a dictionary"
data = self.update_configuration(message.data)
await websocket.send_text(
WsMessage(
method=Methods.UPDATE_CONFIG,
success=True,
data=data,
).model_dump_json()
)
print("Update config request received")
elif message.method == Methods.ERROR:
print("Error message received")
else:
print("Message type is not processed by the server")
await websocket.send_text(
WsMessage(
method=Methods.ERROR,
success=False,
error="Message type is not processed by the server",
).model_dump_json()
)
except WebSocketDisconnect:
self.manager.disconnect(websocket)
print("Client Disconnected")
@classmethod
def execute_graph_block(cls, block_id: str, data: dict[str, Any]) -> list:
obj = block.get_block(block_id)
def get_graph_blocks(cls) -> list[dict[Any, Any]]:
return [v.to_dict() for v in block.get_blocks().values()] # type: ignore
@classmethod
def execute_graph_block(
cls, block_id: str, data: dict[str, Any]
) -> list[dict[str, Any]]:
obj = block.get_block(block_id) # type: ignore
if not obj:
raise HTTPException(status_code=404, detail=f"Block #{block_id} not found.")
return [{name: data} for name, data in obj.execute(data)]
@@ -252,9 +428,7 @@ class AgentServer(AppService):
@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
execution_result = execution.ExecutionResult(**execution_result_dict)
self.run_and_wait(
self.event_queue.put(execution_result)
)
self.run_and_wait(self.event_queue.put(execution_result))
@classmethod
def update_configuration(
@@ -275,12 +449,9 @@ class AgentServer(AppService):
setattr(settings.secrets, key, value) # type: ignore
updated_fields["secrets"].append(key)
settings.save()
return JSONResponse(
content={
"message": "Settings updated successfully",
"updated_fields": updated_fields,
},
status_code=200,
)
return {
"message": "Settings updated successfully",
"updated_fields": updated_fields,
}
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

View File

@@ -84,7 +84,7 @@ async def test_send_execution_result(
mock_websocket.send_text.assert_called_once_with(
WsMessage(
method=Methods.UPDATE,
method=Methods.EXECUTION_EVENT,
channel="test_graph",
data=result.model_dump(),
).model_dump_json()

View File

@@ -75,7 +75,8 @@ async def test_websocket_router_invalid_method(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(method=Methods.UPDATE).model_dump_json(),
WsMessage(method=Methods.EXECUTION_EVENT).model_dump_json(),
WebSocketDisconnect(),
]