feat(backend): Enable executing store agents without agent ownership (#9267)

This re-introduces PR #9179 with some fixes.

This PR enables the execution of store agents even if they are not owned
by the user. Key changes include handling store-listed agents in the
`get_graph` logic, improving execution flow, and ensuring
version-specific handling. These updates support more flexible agent
execution.

### Changes 🏗️
(copied from #9179)

- **Graph Retrieval:** Updated `get_graph` to check store listings for
agents not owned by the user.
- **Version Handling:** Added `graph_version` to execution methods for
consistent version-specific execution.
- **Execution Flow:** Refactored `scheduler.py`, `rest_api.py`, and
other modules for clearer logic and better maintainability.
- **Testing:** Updated `test_manager.py` and other test cases to
validate execution of store-listed agents added test for accessing graph

Out-of-scope changes:
- Add logic to pretty-print Pydantic validation error responses to
backend API client in frontend

---------

Co-authored-by: Nicholas Tindle <nicktindle@outlook.com>
Co-authored-by: Nicholas Tindle <nicholas.tindle@agpt.co>
Co-authored-by: Swifty <craigswift13@gmail.com>
Co-authored-by: Zamil Majdy <zamil.majdy@agpt.co>
This commit is contained in:
Reinier van der Leer
2025-01-28 12:00:32 +01:00
committed by GitHub
parent a7a59a26b9
commit 0811e8a990
17 changed files with 447 additions and 46 deletions

View File

@@ -6,7 +6,13 @@ from datetime import datetime, timezone
from typing import Any, Literal, Optional, Type
import prisma
from prisma.models import AgentGraph, AgentGraphExecution, AgentNode, AgentNodeLink
from prisma.models import (
AgentGraph,
AgentGraphExecution,
AgentNode,
AgentNodeLink,
StoreListingVersion,
)
from prisma.types import AgentGraphWhereInput
from pydantic.fields import computed_field
@@ -543,21 +549,35 @@ async def get_graph(
where_clause: AgentGraphWhereInput = {
"id": graph_id,
}
if version is not None:
where_clause["version"] = version
elif not template:
where_clause["isActive"] = True
# TODO: Fix hack workaround to get adding store agents to work
if user_id is not None and not template:
where_clause["userId"] = user_id
graph = await AgentGraph.prisma().find_first(
where=where_clause,
include=AGENT_GRAPH_INCLUDE,
order={"version": "desc"},
)
return GraphModel.from_db(graph, for_export) if graph else None
# For access, the graph must be owned by the user or listed in the store
if graph is None or (
graph.userId != user_id
and not (
await StoreListingVersion.prisma().find_first(
where={
"agentId": graph_id,
"agentVersion": version or graph.version,
"isDeleted": False,
"StoreListing": {"is": {"isApproved": True}},
}
)
)
):
return None
return GraphModel.from_db(graph, for_export)
async def set_graph_active_version(graph_id: str, version: int, user_id: str) -> None:

View File

@@ -8,7 +8,7 @@ import threading
from concurrent.futures import Future, ProcessPoolExecutor
from contextlib import contextmanager
from multiprocessing.pool import AsyncResult, Pool
from typing import TYPE_CHECKING, Any, Generator, TypeVar, cast
from typing import TYPE_CHECKING, Any, Generator, Optional, TypeVar, cast
from redis.lock import Lock as RedisLock
@@ -800,7 +800,7 @@ class ExecutionManager(AppService):
graph_id: str,
data: BlockInput,
user_id: str,
graph_version: int | None = None,
graph_version: Optional[int] = None,
) -> GraphExecutionEntry:
graph: GraphModel | None = self.db_client.get_graph(
graph_id=graph_id, user_id=user_id, version=graph_version

View File

@@ -63,7 +63,10 @@ def execute_graph(**kwargs):
try:
log(f"Executing recurring job for graph #{args.graph_id}")
get_execution_client().add_execution(
args.graph_id, args.input_data, args.user_id
graph_id=args.graph_id,
data=args.input_data,
user_id=args.user_id,
graph_version=args.graph_version,
)
except Exception as e:
logger.exception(f"Error executing graph {args.graph_id}: {e}")

View File

@@ -310,7 +310,8 @@ async def webhook_ingress_generic(
continue
logger.debug(f"Executing graph #{node.graph_id} node #{node.id}")
executor.add_execution(
node.graph_id,
graph_id=node.graph_id,
graph_version=node.graph_version,
data={f"webhook_{webhook_id}_payload": payload},
user_id=webhook.user_id,
)

View File

@@ -1,7 +1,8 @@
import contextlib
import logging
import typing
from typing import Any, Optional
import autogpt_libs.auth.models
import fastapi
import fastapi.responses
import starlette.middleware.cors
@@ -17,6 +18,7 @@ import backend.data.graph
import backend.data.user
import backend.server.routers.v1
import backend.server.v2.library.routes
import backend.server.v2.store.model
import backend.server.v2.store.routes
import backend.util.service
import backend.util.settings
@@ -120,9 +122,27 @@ class AgentServer(backend.util.service.AppProcess):
@staticmethod
async def test_execute_graph(
graph_id: str, node_input: dict[typing.Any, typing.Any], user_id: str
graph_id: str,
node_input: dict[str, Any],
user_id: str,
graph_version: Optional[int] = None,
):
return backend.server.routers.v1.execute_graph(graph_id, node_input, user_id)
return backend.server.routers.v1.execute_graph(
user_id=user_id,
graph_id=graph_id,
graph_version=graph_version,
node_input=node_input,
)
@staticmethod
async def test_get_graph(
graph_id: str,
graph_version: int,
user_id: str,
):
return await backend.server.routers.v1.get_graph(
graph_id, user_id, graph_version
)
@staticmethod
async def test_create_graph(
@@ -152,5 +172,18 @@ class AgentServer(backend.util.service.AppProcess):
async def test_delete_graph(graph_id: str, user_id: str):
return await backend.server.routers.v1.delete_graph(graph_id, user_id)
@staticmethod
async def test_create_store_listing(
request: backend.server.v2.store.model.StoreSubmissionRequest, user_id: str
):
return await backend.server.v2.store.routes.create_submission(request, user_id)
@staticmethod
async def test_review_store_listing(
request: backend.server.v2.store.model.ReviewSubmissionRequest,
user: autogpt_libs.auth.models.User,
):
return await backend.server.v2.store.routes.review_submission(request, user)
def set_test_dependency_overrides(self, overrides: dict):
app.dependency_overrides.update(overrides)

View File

@@ -475,10 +475,11 @@ def execute_graph(
graph_id: str,
node_input: dict[Any, Any],
user_id: Annotated[str, Depends(get_user_id)],
graph_version: Optional[int] = None,
) -> dict[str, Any]: # FIXME: add proper return type
try:
graph_exec = execution_manager_client().add_execution(
graph_id, node_input, user_id=user_id
graph_id, node_input, user_id=user_id, graph_version=graph_version
)
return {"id": graph_exec.graph_exec_id}
except Exception as e:

View File

@@ -91,7 +91,7 @@ async def add_agent_to_library(
# Create a new graph from the template
graph = await backend.data.graph.get_graph(
agent.id, agent.version, template=True, user_id=user_id
agent.id, agent.version, user_id=user_id
)
if not graph:

View File

@@ -325,7 +325,10 @@ async def get_store_submissions(
where = prisma.types.StoreSubmissionWhereInput(user_id=user_id)
# Query submissions from database
submissions = await prisma.models.StoreSubmission.prisma().find_many(
where=where, skip=skip, take=page_size, order=[{"date_submitted": "desc"}]
where=where,
skip=skip,
take=page_size,
order=[{"date_submitted": "desc"}],
)
# Get total count for pagination
@@ -405,9 +408,7 @@ async def delete_store_submission(
)
# Delete the submission
await prisma.models.StoreListing.prisma().delete(
where=prisma.types.StoreListingWhereUniqueInput(id=submission.id)
)
await prisma.models.StoreListing.prisma().delete(where={"id": submission.id})
logger.debug(
f"Successfully deleted submission {submission_id} for user {user_id}"
@@ -504,7 +505,15 @@ async def create_store_submission(
"subHeading": sub_heading,
}
},
}
},
include={"StoreListingVersions": True},
)
store_listing_version_id = (
listing.StoreListingVersions[0].id
if listing.StoreListingVersions is not None
and len(listing.StoreListingVersions) > 0
else None
)
logger.debug(f"Created store listing for agent {agent_id}")
@@ -521,6 +530,7 @@ async def create_store_submission(
status=prisma.enums.SubmissionStatus.PENDING,
runs=0,
rating=0.0,
store_listing_version_id=store_listing_version_id,
)
except (
@@ -799,7 +809,7 @@ async def get_agent(
try:
store_listing_version = (
await prisma.models.StoreListingVersion.prisma().find_unique(
where={"id": store_listing_version_id}, include={"Agent": True}
where={"id": store_listing_version_id}
)
)
@@ -809,15 +819,17 @@ async def get_agent(
detail=f"Store listing version {store_listing_version_id} not found",
)
agent = store_listing_version.Agent
graph = await backend.data.graph.get_graph(
agent.id, agent.version, template=True
)
graph_id = store_listing_version.agentId
graph_version = store_listing_version.agentVersion
graph = await backend.data.graph.get_graph(graph_id, graph_version)
if not graph:
raise fastapi.HTTPException(
status_code=404, detail=f"Agent {agent.id} not found"
status_code=404,
detail=(
f"Agent #{graph_id} not found "
f"for store listing version #{store_listing_version_id}"
),
)
graph.version = 1
@@ -832,3 +844,68 @@ async def get_agent(
raise backend.server.v2.store.exceptions.DatabaseError(
"Failed to fetch agent"
) from e
async def review_store_submission(
store_listing_version_id: str, is_approved: bool, comments: str, reviewer_id: str
) -> prisma.models.StoreListingSubmission:
"""Review a store listing submission."""
try:
store_listing_version = (
await prisma.models.StoreListingVersion.prisma().find_unique(
where={"id": store_listing_version_id},
include={"StoreListing": True},
)
)
if not store_listing_version or not store_listing_version.StoreListing:
raise fastapi.HTTPException(
status_code=404,
detail=f"Store listing version {store_listing_version_id} not found",
)
if is_approved:
await prisma.models.StoreListing.prisma().update(
where={"id": store_listing_version.StoreListing.id},
data={"isApproved": True},
)
submission_status = (
prisma.enums.SubmissionStatus.APPROVED
if is_approved
else prisma.enums.SubmissionStatus.REJECTED
)
update_data: prisma.types.StoreListingSubmissionUpdateInput = {
"Status": submission_status,
"reviewComments": comments,
"Reviewer": {"connect": {"id": reviewer_id}},
"StoreListing": {"connect": {"id": store_listing_version.StoreListing.id}},
}
create_data: prisma.types.StoreListingSubmissionCreateInput = {
**update_data,
"StoreListingVersion": {"connect": {"id": store_listing_version_id}},
}
submission = await prisma.models.StoreListingSubmission.prisma().upsert(
where={"storeListingVersionId": store_listing_version_id},
data={
"create": create_data,
"update": update_data,
},
)
if not submission:
raise fastapi.HTTPException( # FIXME: don't return HTTP exceptions here
status_code=404,
detail=f"Store listing submission {store_listing_version_id} not found",
)
return submission
except Exception as e:
logger.error(f"Could not create store submission review: {str(e)}")
raise backend.server.v2.store.exceptions.DatabaseError(
"Failed to create store submission review"
) from e

View File

@@ -115,6 +115,7 @@ class StoreSubmission(pydantic.BaseModel):
status: prisma.enums.SubmissionStatus
runs: int
rating: float
store_listing_version_id: str | None = None
class StoreSubmissionsResponse(pydantic.BaseModel):
@@ -151,3 +152,9 @@ class StoreReviewCreate(pydantic.BaseModel):
store_listing_version_id: str
score: int
comments: str | None = None
class ReviewSubmissionRequest(pydantic.BaseModel):
store_listing_version_id: str
is_approved: bool
comments: str

View File

@@ -642,3 +642,30 @@ async def download_agent_file(
return fastapi.responses.FileResponse(
tmp_file.name, filename=file_name, media_type="application/json"
)
@router.post(
"/submissions/review/{store_listing_version_id}",
tags=["store", "private"],
)
async def review_submission(
request: backend.server.v2.store.model.ReviewSubmissionRequest,
user: typing.Annotated[
autogpt_libs.auth.models.User,
fastapi.Depends(autogpt_libs.auth.depends.requires_admin_user),
],
):
# Proceed with the review submission logic
try:
submission = await backend.server.v2.store.db.review_store_submission(
store_listing_version_id=request.store_listing_version_id,
is_approved=request.is_approved,
comments=request.comments,
reviewer_id=user.user_id,
)
return submission
except Exception:
raise fastapi.HTTPException(
status_code=500,
detail="An error occurred while creating the store submission review",
)

View File

@@ -8,12 +8,19 @@ from backend.data.user import get_or_create_user
from backend.util.test import SpinTestServer, wait_execution
async def create_test_user() -> User:
test_user_data = {
"sub": "ef3b97d7-1161-4eb4-92b2-10c24fb154c1",
"email": "testuser#example.com",
"name": "Test User",
}
async def create_test_user(alt_user: bool = False) -> User:
if alt_user:
test_user_data = {
"sub": "3e53486c-cf57-477e-ba2a-cb02dc828e1b",
"email": "testuser2@example.com",
"name": "Test User 2",
}
else:
test_user_data = {
"sub": "ef3b97d7-1161-4eb4-92b2-10c24fb154c1",
"email": "testuser@example.com",
"name": "Test User",
}
user = await get_or_create_user(test_user_data)
return user

View File

@@ -0,0 +1,29 @@
/*
Warnings:
- A unique constraint covering the columns `[agentId]` on the table `StoreListing` will be added. If there are existing duplicate values, this will fail.
*/
-- DropIndex
DROP INDEX "StoreListing_agentId_idx";
-- DropIndex
DROP INDEX "StoreListing_isApproved_idx";
-- DropIndex
DROP INDEX "StoreListingVersion_agentId_agentVersion_isApproved_idx";
-- CreateIndex
CREATE INDEX "StoreListing_agentId_owningUserId_idx" ON "StoreListing"("agentId", "owningUserId");
-- CreateIndex
CREATE INDEX "StoreListing_isDeleted_isApproved_idx" ON "StoreListing"("isDeleted", "isApproved");
-- CreateIndex
CREATE INDEX "StoreListing_isDeleted_idx" ON "StoreListing"("isDeleted");
-- CreateIndex
CREATE UNIQUE INDEX "StoreListing_agentId_key" ON "StoreListing"("agentId");
-- CreateIndex
CREATE INDEX "StoreListingVersion_agentId_agentVersion_isDeleted_idx" ON "StoreListingVersion"("agentId", "agentVersion", "isDeleted");

View File

@@ -0,0 +1,8 @@
/*
Warnings:
- A unique constraint covering the columns `[storeListingVersionId]` on the table `StoreListingSubmission` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateIndex
CREATE UNIQUE INDEX "StoreListingSubmission_storeListingVersionId_key" ON "StoreListingSubmission"("storeListingVersionId");

View File

@@ -235,7 +235,7 @@ model AgentGraphExecution {
AgentNodeExecutions AgentNodeExecution[]
// Link to User model
// Link to User model -- Executed by this user
userId String
user User @relation(fields: [userId], references: [id], onDelete: Cascade)
@@ -442,6 +442,8 @@ view Creator {
agent_rating Float
agent_runs Int
is_featured Boolean
// Index or unique are not applied to views
}
view StoreAgent {
@@ -464,11 +466,7 @@ view StoreAgent {
rating Float
versions String[]
@@unique([creator_username, slug])
@@index([creator_username])
@@index([featured])
@@index([categories])
@@index([storeListingVersionId])
// Index or unique are not applied to views
}
view StoreSubmission {
@@ -486,7 +484,7 @@ view StoreSubmission {
agent_id String
agent_version Int
@@index([user_id])
// Index or unique are not applied to views
}
model StoreListing {
@@ -509,9 +507,13 @@ model StoreListing {
StoreListingVersions StoreListingVersion[]
StoreListingSubmission StoreListingSubmission[]
@@index([isApproved])
@@index([agentId])
// Unique index on agentId to ensure only one listing per agent, regardless of number of versions the agent has.
@@unique([agentId])
@@index([agentId, owningUserId])
@@index([owningUserId])
// Used in the view query
@@index([isDeleted, isApproved])
@@index([isDeleted])
}
model StoreListingVersion {
@@ -552,7 +554,7 @@ model StoreListingVersion {
StoreListingReview StoreListingReview[]
@@unique([agentId, agentVersion])
@@index([agentId, agentVersion, isApproved])
@@index([agentId, agentVersion, isDeleted])
}
model StoreListingReview {
@@ -596,6 +598,7 @@ model StoreListingSubmission {
Status SubmissionStatus @default(PENDING)
reviewComments String?
@@unique([storeListingVersionId])
@@index([storeListingId])
}

View File

@@ -1,14 +1,18 @@
from typing import Any
from uuid import UUID
import autogpt_libs.auth.models
import fastapi.exceptions
import pytest
import backend.server.v2.store.model as store
from backend.blocks.basic import AgentInputBlock, AgentOutputBlock, StoreValueBlock
from backend.data.block import BlockSchema
from backend.data.graph import Graph, Link, Node
from backend.data.model import SchemaField
from backend.data.user import DEFAULT_USER_ID
from backend.server.model import CreateGraph
from backend.usecases.sample import create_test_user
from backend.util.test import SpinTestServer
@@ -202,3 +206,92 @@ async def test_clean_graph(server: SpinTestServer):
n for n in created_graph.nodes if n.block_id == AgentInputBlock().id
)
assert input_node.input_default["value"] == ""
@pytest.mark.asyncio(scope="session")
async def test_access_store_listing_graph(server: SpinTestServer):
"""
Test the access of a store listing graph.
"""
graph = Graph(
id="test_clean_graph",
name="Test Clean Graph",
description="Test graph cleaning",
nodes=[
Node(
id="input_node",
block_id=AgentInputBlock().id,
input_default={
"name": "test_input",
"value": "test value",
"description": "Test input description",
},
),
],
links=[],
)
# Create graph and get model
create_graph = CreateGraph(graph=graph)
created_graph = await server.agent_server.test_create_graph(
create_graph, DEFAULT_USER_ID
)
store_submission_request = store.StoreSubmissionRequest(
agent_id=created_graph.id,
agent_version=created_graph.version,
slug="test-slug",
name="Test name",
sub_heading="Test sub heading",
video_url=None,
image_urls=[],
description="Test description",
categories=[],
)
# First we check the graph an not be accessed by a different user
with pytest.raises(fastapi.exceptions.HTTPException) as exc_info:
await server.agent_server.test_get_graph(
created_graph.id,
created_graph.version,
"3e53486c-cf57-477e-ba2a-cb02dc828e1b",
)
assert exc_info.value.status_code == 404
assert "Graph" in str(exc_info.value.detail)
# Now we create a store listing
store_listing = await server.agent_server.test_create_store_listing(
store_submission_request, DEFAULT_USER_ID
)
if isinstance(store_listing, fastapi.responses.JSONResponse):
assert False, "Failed to create store listing"
slv_id = (
store_listing.store_listing_version_id
if store_listing.store_listing_version_id is not None
else None
)
assert slv_id is not None
admin_user = await create_test_user(alt_user=True)
await server.agent_server.test_review_store_listing(
store.ReviewSubmissionRequest(
store_listing_version_id=slv_id,
is_approved=True,
comments="Test comments",
),
autogpt_libs.auth.models.User(
user_id=admin_user.id,
role="admin",
email=admin_user.email,
phone_number="1234567890",
),
)
# Now we check the graph can be accessed by a user that does not own the graph
got_graph = await server.agent_server.test_get_graph(
created_graph.id, created_graph.version, "3e53486c-cf57-477e-ba2a-cb02dc828e1b"
)
assert got_graph is not None

View File

@@ -1,8 +1,11 @@
import logging
import autogpt_libs.auth.models
import fastapi.responses
import pytest
from prisma.models import User
import backend.server.v2.store.model
from backend.blocks.basic import FindInDictionaryBlock, StoreValueBlock
from backend.blocks.maths import CalculatorBlock, Operation
from backend.data import execution, graph
@@ -31,7 +34,10 @@ async def execute_graph(
# --- Test adding new executions --- #
response = await agent_server.test_execute_graph(
test_graph.id, input_data, test_user.id
user_id=test_user.id,
graph_id=test_graph.id,
graph_version=test_graph.version,
node_input=input_data,
)
graph_exec_id = response["id"]
logger.info(f"Created execution with ID: {graph_exec_id}")
@@ -40,7 +46,7 @@ async def execute_graph(
logger.info("Waiting for execution to complete...")
result = await wait_execution(test_user.id, test_graph.id, graph_exec_id)
logger.info(f"Execution completed with {len(result)} results")
assert result and len(result) == num_execs
assert len(result) == num_execs
return graph_exec_id
@@ -287,3 +293,67 @@ async def test_static_input_link_on_graph(server: SpinTestServer):
assert exec_data.status == execution.ExecutionStatus.COMPLETED
assert exec_data.output_data == {"result": [9]}
logger.info("Completed test_static_input_link_on_graph")
@pytest.mark.asyncio(scope="session")
async def test_store_listing_graph(server: SpinTestServer):
logger.info("Starting test_agent_execution")
test_user = await create_test_user()
test_graph = await create_graph(server, create_test_graph(), test_user)
store_submission_request = backend.server.v2.store.model.StoreSubmissionRequest(
agent_id=test_graph.id,
agent_version=test_graph.version,
slug="test-slug",
name="Test name",
sub_heading="Test sub heading",
video_url=None,
image_urls=[],
description="Test description",
categories=[],
)
store_listing = await server.agent_server.test_create_store_listing(
store_submission_request, test_user.id
)
if isinstance(store_listing, fastapi.responses.JSONResponse):
assert False, "Failed to create store listing"
slv_id = (
store_listing.store_listing_version_id
if store_listing.store_listing_version_id is not None
else None
)
assert slv_id is not None
admin_user = await create_test_user(alt_user=True)
await server.agent_server.test_review_store_listing(
backend.server.v2.store.model.ReviewSubmissionRequest(
store_listing_version_id=slv_id,
is_approved=True,
comments="Test comments",
),
autogpt_libs.auth.models.User(
user_id=admin_user.id,
role="admin",
email=admin_user.email,
phone_number="1234567890",
),
)
alt_test_user = admin_user
data = {"node_input": {"input_1": "Hello", "input_2": "World"}}
graph_exec_id = await execute_graph(
server.agent_server,
test_graph,
alt_test_user,
data,
4,
)
await assert_sample_graph_executions(
server.agent_server, test_graph, alt_test_user, graph_exec_id
)
logger.info("Completed test_agent_execution")

View File

@@ -566,7 +566,22 @@ export default class BackendAPI {
let errorDetail;
try {
const errorData = await response.json();
errorDetail = errorData.detail || response.statusText;
if (
Array.isArray(errorData.detail) &&
errorData.detail.length > 0 &&
errorData.detail[0].loc
) {
// This appears to be a Pydantic validation error
const errors = errorData.detail.map(
(err: _PydanticValidationError) => {
const location = err.loc.join(" -> ");
return `${location}: ${err.msg}`;
},
);
errorDetail = errors.join("\n");
} else {
errorDetail = errorData.detail || response.statusText;
}
} catch (e) {
errorDetail = response.statusText;
}
@@ -748,6 +763,13 @@ type WebsocketMessage = {
};
}[keyof WebsocketMessageTypeMap];
type _PydanticValidationError = {
type: string;
loc: string[];
msg: string;
input: any;
};
/* *** HELPER FUNCTIONS *** */
function parseNodeExecutionResultTimestamps(result: any): NodeExecutionResult {