mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-09 22:35:54 -05:00
- Prep work for #8782
- Prep work for #8779

### Changes 🏗️

- refactor(platform): Differentiate graph/node execution events
- fix(platform): Subscribe to execution updates by `graph_exec_id` instead of `graph_id`+`graph_version`
- refactor(backend): Move all execution related models and functions from `.data.graph` to `.data.execution`
- refactor(backend): Reorganize & refactor `.data.execution`
- fix(libs): Remove `load_dotenv` in `.auth.config` to fix test config issues
- dx: Bump version of `black` in pre-commit config to v24.10.0 to match poetry.lock
- Other minor refactoring in both frontend and backend

### Checklist 📋

#### For code changes:

- [x] I have clearly listed my changes in the PR description
- [x] I have made a test plan
- [x] I have tested my changes according to the test plan:
  - Run an agent in the builder
    - [x] -> works normally, node I/O is updated in real time
  - Run an agent in the library
    - [x] -> works normally

---------

Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
54 lines
1.7 KiB
Python
54 lines
1.7 KiB
Python
import logging
|
|
import os
|
|
|
|
import pytest
|
|
from dotenv import load_dotenv
|
|
|
|
from backend.util.logging import configure_logging
|
|
|
|
# Load variables from a .env file into the process environment (python-dotenv).
load_dotenv()

# NOTE: Run the tests with --log-cli-level=INFO to see the logs.
# Set up logging
configure_logging()
logger = logging.getLogger(__name__)

# Reduce Prisma log spam unless PRISMA_DEBUG is set (to any non-empty value).
if not os.environ.get("PRISMA_DEBUG"):
    prisma_logger = logging.getLogger("prisma")
    prisma_logger.setLevel(logging.INFO)
|
|
|
|
|
|
@pytest.fixture(scope="session")
async def server():
    """Session-scoped fixture that yields a running ``SpinTestServer``.

    The server is entered as an async context manager and yielded to the
    tests; leaving the ``async with`` block on fixture teardown exits it.
    """
    # Imported inside the fixture rather than at module level
    # (presumably to defer loading backend modules until the fixture is used).
    from backend.util.test import SpinTestServer

    async with SpinTestServer() as running_server:
        yield running_server
|
|
|
|
|
|
@pytest.fixture(scope="session", autouse=True)
async def graph_cleanup(server):
    """Record graphs created via ``test_create_graph`` and delete them on teardown.

    While the fixture is active, ``server.agent_server.test_create_graph`` is
    replaced by a wrapper that remembers ``(graph.id, user_id)`` for every
    graph it creates. On teardown the original callable is restored and each
    remembered graph with a truthy ``user_id`` is deleted through
    ``server.agent_server.test_delete_graph``; the fixture asserts that each
    deletion reports a positive ``version_counts``.
    """
    tracked_graphs = []  # (graph_id, user_id) pairs, in creation order
    real_create_graph = server.agent_server.test_create_graph

    async def create_graph_wrapper(*args, **kwargs):
        new_graph = await real_create_graph(*args, **kwargs)

        # Prefer the keyword argument; otherwise fall back to the third
        # positional argument.
        # NOTE(review): assumes user_id sits at positional index 2 — confirm
        # against the signature of test_create_graph.
        if "user_id" in kwargs:
            owner_id = kwargs["user_id"]
        elif len(args) > 2:
            owner_id = args[2]
        else:
            owner_id = None

        tracked_graphs.append((new_graph.id, owner_id))
        return new_graph

    try:
        server.agent_server.test_create_graph = create_graph_wrapper
        yield  # Tests run while the tracking wrapper is installed
    finally:
        # Always put the original callable back, even if a test failed.
        server.agent_server.test_create_graph = real_create_graph

        # Delete the created graphs and assert they were deleted
        for graph_id, owner_id in tracked_graphs:
            if not owner_id:
                # No user recorded for this graph, so it is left alone.
                continue
            result = await server.agent_server.test_delete_graph(graph_id, owner_id)
            deleted_versions = result["version_counts"]
            assert deleted_versions > 0, f"Graph {graph_id} was not deleted."
|