Splitting rest services into routes

This commit is contained in:
SwiftyOS
2024-09-16 16:56:09 +02:00
parent 5a7193cfb7
commit 3d4aca9fcc
5 changed files with 150 additions and 0 deletions

View File

@@ -0,0 +1,108 @@
from fastapi import APIRouter, Depends, HTTPException
from typing import Annotated, Any, Dict, List
from autogpt_server.data import graph as graph_db
from autogpt_server.server.model import CreateGraph, SetGraphActiveVersion
from autogpt_server.server.utils import get_user_id
router = APIRouter()
@router.get("/graphs")
async def get_graphs(user_id: Annotated[str, Depends(get_user_id)]) -> List[graph_db.GraphMeta]:
# Stub implementation
return []
@router.get("/templates")
async def get_templates() -> List[graph_db.GraphMeta]:
# Stub implementation
return []
@router.post("/graphs")
async def create_new_graph(create_graph: CreateGraph, user_id: Annotated[str, Depends(get_user_id)]) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.post("/templates")
async def create_new_template(create_graph: CreateGraph, user_id: Annotated[str, Depends(get_user_id)]) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.get("/graphs/{graph_id}")
async def get_graph(graph_id: str, user_id: Annotated[str, Depends(get_user_id)], version: int | None = None) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.get("/templates/{graph_id}")
async def get_template(graph_id: str, version: int | None = None) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.put("/graphs/{graph_id}")
async def update_graph(graph_id: str, graph: graph_db.Graph, user_id: Annotated[str, Depends(get_user_id)]) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.put("/templates/{graph_id}")
async def update_template(graph_id: str, graph: graph_db.Graph, user_id: Annotated[str, Depends(get_user_id)]) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.get("/graphs/{graph_id}/versions")
async def get_graph_all_versions(graph_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> List[graph_db.Graph]:
# Stub implementation
return []
@router.get("/templates/{graph_id}/versions")
async def get_template_all_versions(graph_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> List[graph_db.Graph]:
# Stub implementation
return []
@router.get("/graphs/{graph_id}/versions/{version}")
async def get_graph_version(graph_id: str, version: int, user_id: Annotated[str, Depends(get_user_id)]) -> graph_db.Graph:
# Stub implementation
return graph_db.Graph()
@router.put("/graphs/{graph_id}/versions/active")
async def set_graph_active_version(graph_id: str, request_body: SetGraphActiveVersion, user_id: Annotated[str, Depends(get_user_id)]):
# Stub implementation
pass
@router.get("/graphs/{graph_id}/input_schema")
async def get_graph_input_schema(graph_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> List[graph_db.InputSchemaItem]:
# Stub implementation
return []
@router.post("/graphs/{graph_id}/execute")
async def execute_graph(graph_id: str, node_input: Dict[Any, Any], user_id: Annotated[str, Depends(get_user_id)]) -> Dict[str, Any]:
# Stub implementation
return {}
@router.get("/graphs/{graph_id}/executions")
async def list_graph_runs(graph_id: str, user_id: Annotated[str, Depends(get_user_id)], graph_version: int | None = None) -> List[str]:
# Stub implementation
return []
@router.get("/graphs/{graph_id}/executions/{graph_exec_id}")
async def get_graph_run_node_execution_results(graph_id: str, graph_exec_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> List[Any]:
# Stub implementation
return []
@router.post("/graphs/{graph_id}/executions/{graph_exec_id}/stop")
async def stop_graph_run(graph_id: str, graph_exec_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> List[Any]:
# Stub implementation
return []
@router.post("/graphs/{graph_id}/schedules")
async def create_schedule(graph_id: str, cron: str, input_data: Dict[Any, Any], user_id: Annotated[str, Depends(get_user_id)]) -> Dict[Any, Any]:
# Stub implementation
return {}
@router.get("/graphs/{graph_id}/schedules")
async def get_execution_schedules(graph_id: str, user_id: Annotated[str, Depends(get_user_id)]) -> Dict[str, str]:
# Stub implementation
return {}
@router.put("/graphs/schedules/{schedule_id}")
async def update_schedule(schedule_id: str, input_data: Dict[Any, Any], user_id: Annotated[str, Depends(get_user_id)]) -> Dict[Any, Any]:
# Stub implementation
return {}

View File

@@ -0,0 +1,17 @@
from fastapi import APIRouter
from autogpt_server.server.rest_api import AgentServer
from typing import Any
router = APIRouter()
@router.get("/blocks")
async def get_graph_blocks():
return AgentServer.get_graph_blocks()
@router.get("/blocks/costs")
async def get_graph_block_costs():
return AgentServer.get_graph_block_costs()
@router.post("/blocks/{block_id}/execute")
async def execute_graph_block(block_id: str, data: dict[str, Any]):
return AgentServer.execute_graph_block(block_id, data)

View File

@@ -0,0 +1,25 @@
from fastapi import APIRouter
router = APIRouter()
@router.get("/")
async def root():
return {"message": "Welcome to the Autogpt Server API"}
@router.post("/auth/user")
async def get_or_create_user_route():
# Stub implementation
return {"message": "User created or retrieved successfully"}
@router.get("/credits")
async def get_user_credits():
# Stub implementation
return {"credits": 100} # Replace with actual credit retrieval logic
@router.post("/settings")
async def update_configuration(updated_settings: dict):
# Stub implementation
return {
"message": "Settings updated successfully",
"updated_fields": {"config": [], "secrets": []}
} # Replace with actual configuration update logic