Files
AutoGPT/autogpt_platform/backend/backend/cli.py
Reinier van der Leer de78d062a9 refactor(backend/api): Clean up API file structure (#11629)
We'll soon be needing a more feature-complete external API. To make way
for this, I'm moving some files around so:
- We can more easily create new versions of our external API
- The file structure of our internal API is more homogeneous

These changes are quite opinionated, but IMO in any case they're better
than the chaotic structure we have now.

### Changes 🏗️

- Move `backend/server` -> `backend/api`
- Move `backend/server/routers` + `backend/server/v2` ->
`backend/api/features`
  - Change absolute sibling imports to relative imports
- Move `backend/server/v2/AutoMod` -> `backend/executor/automod`
- Combine `backend/server/routers/analytics_*test.py` ->
`backend/api/features/analytics_test.py`
- Sort OpenAPI spec file

### Checklist 📋

#### For code changes:
- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - CI tests
  - [x] Clicking around in the app -> no obvious breakage
2025-12-20 20:33:10 +00:00

275 lines
6.7 KiB
Python
Executable File

"""
The command line interface for the agent server
"""
import os
import pathlib
import click
import psutil
from backend.util.process import AppProcess
def get_pid_path() -> pathlib.Path:
    """Return the location of the file that records the server's PID.

    The path is ``<home>/.config/agpt/running.tmp``. This function only
    computes the path; it does not create the file or its directory.
    """
    return pathlib.Path.home().joinpath(".config", "agpt", "running.tmp")
def get_pid() -> int | None:
    """Read the server PID recorded in the PID file.

    Returns:
        The recorded PID, or ``None`` when the PID file does not exist or
        its contents cannot be parsed as an integer.
    """
    # EAFP: open the file directly instead of checking for existence first,
    # so a file removed between the check and the read cannot raise. A read
    # never needs to create the parent directory, so no ``makedirs`` here.
    try:
        with open(get_pid_path(), "r", encoding="utf-8") as file:
            raw_pid = file.read()
    except FileNotFoundError:
        return None

    try:
        return int(raw_pid)
    except ValueError:
        # Empty or corrupted PID file: treat it as "no PID recorded".
        return None
def write_pid(pid: int):
    """Store *pid* in the PID file, overwriting any previous contents.

    The directory that holds the PID file is created first if it is missing.
    """
    pid_file = get_pid_path()
    os.makedirs(pid_file.parent, exist_ok=True)
    with open(pid_file, mode="w", encoding="utf-8") as handle:
        handle.write(f"{pid}")
class MainApp(AppProcess):
    """``AppProcess`` subclass whose ``run`` launches the backend app."""

    def run(self):
        """Call the backend's ``app.main`` entry point with ``silent=True``."""
        # Imported inside the method, as in the original, so importing this
        # CLI module does not import the backend application.
        from backend import app as backend_app

        backend_app.main(silent=True)
@click.group()
def main():
    """AutoGPT Server CLI Tool"""
    # Root command group. It does no work of its own; subcommands attach to
    # it through ``@main.command()`` and ``main.add_command``.
@main.command()
def start():
    """
    Starts the server in the background and saves the PID
    """
    pid = get_pid()
    if pid and psutil.pid_exists(pid):
        print("Server is already running")
        # ``exit`` is a helper injected by the ``site`` module and is not
        # guaranteed to exist; raising SystemExit is the portable equivalent.
        raise SystemExit(1)
    elif pid:
        # A PID is recorded but no such process exists: the file is stale.
        print("PID does not exist deleting file")
        os.remove(get_pid_path())

    print("Starting server")
    pid = MainApp().start(background=True, silent=True)
    print(f"Server running in process: {pid}")

    write_pid(pid)
    # ``os._exit`` terminates immediately without flushing stdio buffers, so
    # flush explicitly or the messages above can be lost when stdout is a
    # pipe or file (block-buffered).
    print("done", flush=True)
    os._exit(status=0)
@main.command()
def stop():
    """
    Stops the server
    """
    pid = get_pid()
    if not pid:
        print("Server is not running")
        return

    # The PID file is removed whether or not the process is still alive, so a
    # stale file never blocks a later ``start``.
    os.remove(get_pid_path())

    try:
        process = psutil.Process(pid)
        # Collect the children before terminating the parent; once it is gone
        # they can no longer be discovered through it.
        children = process.children(recursive=True)
    except psutil.NoSuchProcess:
        # Stale PID: the recorded process has already exited.
        print("Server is not running")
        return

    # Children first, then the server process itself.
    for proc in (*children, process):
        try:
            proc.terminate()
        except psutil.NoSuchProcess:
            # Best effort: the process exited on its own in the meantime.
            pass

    print("Server Stopped")
@main.command()
def gen_encrypt_key():
    """
    Generate a new encryption key
    """
    from cryptography.fernet import Fernet

    # Decode the freshly generated key so it is printed as plain text.
    new_key = Fernet.generate_key()
    print(new_key.decode())
@click.group()
def test():
    """
    Group for test commands
    """
    # Container group only: the commands below register themselves on it via
    # ``@test.command()``.
@test.command()
@click.argument("server_address")
def reddit(server_address: str):
    """
    Create an event graph
    """
    # Click calls command callbacks synchronously. An ``async def`` callback
    # would only produce a coroutine object that is never awaited, so the
    # asynchronous work lives in an inner coroutine run with ``asyncio.run``.
    import asyncio

    from backend.usecases.reddit_marketing import create_test_graph
    from backend.util.request import Requests

    async def create_graph() -> None:
        """POST the Reddit marketing test graph and print the new graph ID."""
        test_graph = create_test_graph()
        url = f"{server_address}/graphs"
        headers = {"Content-Type": "application/json"}
        data = test_graph.model_dump_json()
        response = await Requests(trusted_origins=[server_address]).post(
            url, headers=headers, data=data
        )
        graph_id = response.json()["id"]
        print(f"Graph created with ID: {graph_id}")

    asyncio.run(create_graph())
@test.command()
@click.argument("server_address")
def populate_db(server_address: str):
    """
    Populate the database with a sample graph, an execution and a schedule
    """
    # Click calls command callbacks synchronously. An ``async def`` callback
    # would only produce a coroutine object that is never awaited, so the
    # asynchronous work lives in an inner coroutine run with ``asyncio.run``.
    import asyncio

    from backend.usecases.sample import create_test_graph
    from backend.util.request import Requests

    async def populate() -> None:
        """Create the sample graph, then request one execution and a schedule."""
        headers = {"Content-Type": "application/json"}

        response = await Requests(trusted_origins=[server_address]).post(
            f"{server_address}/graphs",
            headers=headers,
            data=create_test_graph().model_dump_json(),
        )
        if response.status != 200:
            # Without a created graph there is nothing to execute or schedule,
            # and claiming the database was populated would be misleading.
            print("Failed to send graph")
            print(f"Response: {response.text()}")
            return

        # Only read the ID once the creation request is known to have worked.
        graph_id = response.json()["id"]

        # Every request must be awaited; an un-awaited ``post`` is never sent.
        execute_url = f"{server_address}/graphs/{graph_id}/execute"
        await Requests(trusted_origins=[server_address]).post(
            execute_url, headers=headers, json={"input": "Hello, World!"}
        )

        schedule_url = f"{server_address}/graphs/{graph_id}/schedules"
        schedule_payload = {
            "graph_id": graph_id,
            "cron": "*/5 * * * *",
            "input_data": {"input": "Hello, World!"},
        }
        await Requests(trusted_origins=[server_address]).post(
            schedule_url, headers=headers, json=schedule_payload
        )

        print("Database populated with: \n- graph\n- execution\n- schedule")

    asyncio.run(populate())
@test.command()
@click.argument("server_address")
def graph(server_address: str):
    """
    Create an event graph
    """
    # Click calls command callbacks synchronously. An ``async def`` callback
    # would only produce a coroutine object that is never awaited, so the
    # asynchronous work lives in an inner coroutine run with ``asyncio.run``.
    import asyncio

    from backend.usecases.sample import create_test_graph
    from backend.util.request import Requests

    async def create_and_execute() -> None:
        """Create the sample graph, print its ID and request one execution."""
        url = f"{server_address}/graphs"
        headers = {"Content-Type": "application/json"}
        data = create_test_graph().model_dump_json()
        response = await Requests(trusted_origins=[server_address]).post(
            url, headers=headers, data=data
        )

        if response.status == 200:
            graph_id = response.json()["id"]
            print(graph_id)
            execute_url = f"{server_address}/graphs/{graph_id}/execute"
            await Requests(trusted_origins=[server_address]).post(
                execute_url, headers=headers, json={"input": "Hello, World!"}
            )
        else:
            print("Failed to send graph")
            print(f"Response: {response.text()}")

    asyncio.run(create_and_execute())
@test.command()
@click.argument("graph_id")
@click.argument("content")
def execute(graph_id: str, content: str):
    """
    Execute a graph, sending CONTENT (a JSON document) as the input data
    """
    # Click calls command callbacks synchronously. An ``async def`` callback
    # would only produce a coroutine object that is never awaited, so the
    # request is issued from an inner coroutine run with ``asyncio.run``.
    import asyncio
    import json

    from backend.util.request import Requests

    # Click passes arguments as strings. Decode the JSON text here so the
    # request body carries the structured value rather than a quoted string.
    try:
        payload = json.loads(content)
    except json.JSONDecodeError as err:
        raise click.BadParameter(
            f"not valid JSON: {err}", param_hint="content"
        ) from err

    async def send_execution() -> None:
        """POST the decoded payload to the graph's execute endpoint."""
        headers = {"Content-Type": "application/json"}
        execute_url = f"http://0.0.0.0:8000/graphs/{graph_id}/execute"
        await Requests(trusted_origins=["http://0.0.0.0:8000"]).post(
            execute_url, headers=headers, json=payload
        )

    asyncio.run(send_execution())
@test.command()
def event():
    """
    Send an event to the running server
    """
    # Placeholder: this implementation only prints a confirmation line and
    # makes no request to the server.
    confirmation = "Event sent"
    print(confirmation)
@test.command()
@click.argument("server_address")
@click.argument("graph_exec_id")
def websocket(server_address: str, graph_exec_id: str):
    """
    Tests the websocket connection.
    """
    import asyncio

    import websockets.asyncio.client

    from backend.api.ws_api import WSMessage, WSMethod, WSSubscribeGraphExecutionRequest

    async def send_message(address: str):
        """Subscribe to the graph execution and print every message received."""
        uri = f"ws://{address}"
        # Named ``connection`` so it does not shadow the enclosing command.
        async with websockets.asyncio.client.connect(uri) as connection:
            msg = WSMessage(
                method=WSMethod.SUBSCRIBE_GRAPH_EXEC,
                data=WSSubscribeGraphExecutionRequest(
                    graph_exec_id=graph_exec_id,
                ).model_dump(),
            ).model_dump_json()
            await connection.send(msg)
            print(f"Sending: {msg}")
            # Runs until the connection fails or the user interrupts.
            while True:
                response = await connection.recv()
                print(f"Response from server: {response}")

    # Announce before starting: the receive loop only ends via an exception,
    # so a message printed after ``asyncio.run`` would never be shown.
    print("Testing WS")
    try:
        asyncio.run(send_message(server_address))
    except (KeyboardInterrupt, InterruptedError):
        # Ctrl-C surfaces as KeyboardInterrupt, not InterruptedError; treat
        # both as a clean, user-requested stop. ``raise SystemExit`` replaces
        # the ``exit`` helper, which only exists when ``site`` is loaded.
        raise SystemExit(0) from None
# Register the ``test`` group as a subcommand of the root ``main`` group.
main.add_command(test)

# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()