Merge branch 'aarushikansal/open-1574-deploy-backend-to-k8s' into aarushikansal/open-1577-helm-ci-linting

This commit is contained in:
Aarushi
2024-07-29 12:43:42 +01:00
committed by GitHub
47 changed files with 2435 additions and 208 deletions

View File

@@ -7,3 +7,60 @@
text-wrap: balance;
}
}
:root {
--background: 180 100% 95%;
--foreground: 180 5% 0%;
--card: 180 50% 90%;
--card-foreground: 180 5% 10%;
--popover: 180 100% 95%;
--popover-foreground: 180 100% 0%;
--primary: 180 76.5% 30%;
--primary-foreground: 0 0% 100%;
--secondary: 180 30% 70%;
--secondary-foreground: 0 0% 0%;
--muted: 142 30% 85%;
--muted-foreground: 180 5% 35%;
--accent: 142 30% 80%;
--accent-foreground: 180 5% 10%;
--destructive: 0 100% 30%;
--destructive-foreground: 180 5% 90%;
--border: 180 30% 50%;
--input: 180 30% 18%;
--ring: 180 76.5% 30%;
--radius: 0.5rem;
}
.dark {
--background: 180 50% 5%;
--foreground: 180 5% 90%;
--card: 180 50% 0%;
--card-foreground: 180 5% 90%;
--popover: 180 50% 5%;
--popover-foreground: 180 5% 90%;
--primary: 180 76.5% 30%;
--primary-foreground: 0 0% 100%;
--secondary: 180 30% 10%;
--secondary-foreground: 0 0% 100%;
--muted: 142 30% 15%;
--muted-foreground: 180 5% 60%;
--accent: 142 30% 15%;
--accent-foreground: 180 5% 90%;
--destructive: 0 100% 30%;
--destructive-foreground: 180 5% 90%;
--border: 180 30% 18%;
--input: 180 30% 18%;
--ring: 180 76.5% 30%;
--radius: 0.5rem;
}
@layer base {
* {
@apply border-border;
}
body {
@apply bg-background text-foreground;
}
}

View File

@@ -1,10 +1,8 @@
import React from 'react';
import type { Metadata } from "next";
import { ThemeProvider as NextThemeProvider } from "next-themes";
import { type ThemeProviderProps } from "next-themes/dist/types";
import { Inter } from "next/font/google";
import Link from "next/link";
import { CubeIcon, Pencil1Icon, ReaderIcon, TimerIcon } from "@radix-ui/react-icons";
import { Pencil1Icon, TimerIcon } from "@radix-ui/react-icons";
import "./globals.css";
@@ -13,6 +11,7 @@ import { Button, buttonVariants } from "@/components/ui/button";
import {
DropdownMenu, DropdownMenuContent, DropdownMenuItem, DropdownMenuTrigger
} from "@/components/ui/dropdown-menu";
import { Providers } from "@/app/providers";
const inter = Inter({ subsets: ["latin"] });
@@ -21,60 +20,57 @@ export const metadata: Metadata = {
description: "Your one stop shop to creating AI Agents",
};
function ThemeProvider({ children, ...props }: ThemeProviderProps) {
return <NextThemeProvider {...props}>{children}</NextThemeProvider>
}
const NavBar = () => (
<nav className="bg-white dark:bg-slate-800 p-4 flex justify-between items-center shadow">
<div className="flex space-x-4">
<Link href="/monitor" className={buttonVariants({ variant: "ghost" })}>
<TimerIcon className="mr-1" /> Monitor
</Link>
<Link href="/build" className={buttonVariants({ variant: "ghost" })}>
<Pencil1Icon className="mr-1" /> Build
</Link>
</div>
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button variant="ghost" className="h-8 w-8 rounded-full">
<Avatar>
<AvatarImage src="https://github.com/shadcn.png" alt="@shadcn" />
<AvatarFallback>CN</AvatarFallback>
</Avatar>
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align="end">
<DropdownMenuItem>Profile</DropdownMenuItem>
<DropdownMenuItem>Settings</DropdownMenuItem>
<DropdownMenuItem>Switch Workspace</DropdownMenuItem>
<DropdownMenuItem>Log out</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
</nav>
<nav className="bg-white dark:bg-slate-800 p-4 flex justify-between items-center shadow">
<div className="flex space-x-4">
<Link href="/monitor" className={buttonVariants({ variant: "ghost" })}>
<TimerIcon className="mr-1" /> Monitor
</Link>
<Link href="/build" className={buttonVariants({ variant: "ghost" })}>
<Pencil1Icon className="mr-1" /> Build
</Link>
</div>
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button variant="ghost" className="h-8 w-8 rounded-full">
<Avatar>
<AvatarImage src="https://github.com/shadcn.png" alt="@shadcn" />
<AvatarFallback>CN</AvatarFallback>
</Avatar>
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align="end">
<DropdownMenuItem>Profile</DropdownMenuItem>
<DropdownMenuItem>Settings</DropdownMenuItem>
<DropdownMenuItem>Switch Workspace</DropdownMenuItem>
<DropdownMenuItem>Log out</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
</nav>
);
export default function RootLayout({
children,
children,
}: Readonly<{
children: React.ReactNode;
}>) {
return (
<html lang="en">
<html lang="en">
<body className={inter.className}>
<ThemeProvider
<Providers
attribute="class"
defaultTheme="light"
// Feel free to remove this line if you want to use the system theme by default
// enableSystem
disableTransitionOnChange
>
<div className="min-h-screen bg-gray-200 text-gray-900">
<NavBar />
<main className="mx-auto p-4">
{children}
</main>
</div>
</ThemeProvider>
>
<div className="min-h-screen bg-gray-200 text-gray-900">
<NavBar />
<main className="mx-auto p-4">
{children}
</main>
</div>
</Providers>
</body>
</html>
</html>
);
}

View File

@@ -0,0 +1,14 @@
'use client'
import * as React from 'react'
import { ThemeProvider as NextThemesProvider } from 'next-themes'
import { ThemeProviderProps } from 'next-themes/dist/types'
import { TooltipProvider } from '@/components/ui/tooltip'
export function Providers({ children, ...props }: ThemeProviderProps) {
return (
<NextThemesProvider {...props}>
<TooltipProvider>{children}</TooltipProvider>
</NextThemesProvider>
)
}

View File

@@ -15,7 +15,7 @@ import ReactFlow, {
import 'reactflow/dist/style.css';
import CustomNode from './CustomNode';
import './flow.css';
import AutoGPTServerAPI, { Block, Graph, ObjectSchema } from '@/lib/autogpt-server-api';
import AutoGPTServerAPI, { Block, Graph, NodeExecutionResult, ObjectSchema } from '@/lib/autogpt-server-api';
import { Button } from './ui/button';
import { Input } from './ui/input';
import { ChevronRight, ChevronLeft } from "lucide-react";
@@ -440,7 +440,7 @@ const FlowEditor: React.FC<{
const updateNodesWithExecutionData = (executionData: any[]) => {
const updateNodesWithExecutionData = (executionData: NodeExecutionResult[]) => {
setNodes((nds) =>
nds.map((node) => {
const nodeExecution = executionData.find((exec) => exec.node_id === node.data.backend_id);

View File

@@ -182,7 +182,9 @@ export default class AutoGPTServerAPI {
}
}
sendWebSocketMessage(method: string, data: any) {
sendWebSocketMessage<M extends keyof WebsocketMessageTypeMap>(
method: M, data: WebsocketMessageTypeMap[M]
) {
if (this.socket && this.socket.readyState === WebSocket.OPEN) {
this.socket.send(JSON.stringify({ method, data }));
} else {
@@ -190,7 +192,9 @@ export default class AutoGPTServerAPI {
}
}
onWebSocketMessage(method: string, handler: (data: any) => void) {
onWebSocketMessage<M extends keyof WebsocketMessageTypeMap>(
method: M, handler: (data: WebsocketMessageTypeMap[M]) => void
) {
this.messageHandlers[method] = handler;
}
@@ -198,7 +202,7 @@ export default class AutoGPTServerAPI {
this.sendWebSocketMessage('subscribe', { graph_id: graphId });
}
runGraph(graphId: string, data: any = {}) {
runGraph(graphId: string, data: WebsocketMessageTypeMap["run_graph"]["data"] = {}) {
this.sendWebSocketMessage('run_graph', { graph_id: graphId, data });
}
}
@@ -212,3 +216,9 @@ type GraphCreateRequestBody = {
} | {
graph: GraphCreatable;
}
type WebsocketMessageTypeMap = {
subscribe: { graph_id: string; };
run_graph: { graph_id: string; data: { [key: string]: any }; };
execution_event: NodeExecutionResult;
}

View File

@@ -15,21 +15,65 @@ const config = {
},
},
extend: {
fontFamily: {
sans: ['var(--font-geist-sans)'],
mono: ['var(--font-geist-mono)']
},
colors: {
border: 'hsl(var(--border))',
input: 'hsl(var(--input))',
ring: 'hsl(var(--ring))',
background: 'hsl(var(--background))',
foreground: 'hsl(var(--foreground))',
primary: {
DEFAULT: 'hsl(var(--primary))',
foreground: 'hsl(var(--primary-foreground))'
},
secondary: {
DEFAULT: 'hsl(var(--secondary))',
foreground: 'hsl(var(--secondary-foreground))'
},
destructive: {
DEFAULT: 'hsl(var(--destructive))',
foreground: 'hsl(var(--destructive-foreground))'
},
muted: {
DEFAULT: 'hsl(var(--muted))',
foreground: 'hsl(var(--muted-foreground))'
},
accent: {
DEFAULT: 'hsl(var(--accent))',
foreground: 'hsl(var(--accent-foreground))'
},
popover: {
DEFAULT: 'hsl(var(--popover))',
foreground: 'hsl(var(--popover-foreground))'
},
card: {
DEFAULT: 'hsl(var(--card))',
foreground: 'hsl(var(--card-foreground))'
}
},
borderRadius: {
lg: 'var(--radius)',
md: 'calc(var(--radius) - 2px)',
sm: 'calc(var(--radius) - 4px)'
},
keyframes: {
"accordion-down": {
from: { height: "0" },
to: { height: "var(--radix-accordion-content-height)" },
},
"accordion-up": {
from: { height: "var(--radix-accordion-content-height)" },
to: { height: "0" },
'accordion-down': {
from: { height: '0' },
to: { height: 'var(--radix-accordion-content-height)' }
},
'accordion-up': {
from: { height: 'var(--radix-accordion-content-height)' },
to: { height: '0' }
}
},
animation: {
"accordion-down": "accordion-down 0.2s ease-out",
"accordion-up": "accordion-up 0.2s ease-out",
},
},
'accordion-down': 'accordion-down 0.2s ease-out',
'accordion-up': 'accordion-up 0.2s ease-out'
}
}
},
plugins: [require("tailwindcss-animate")],
} satisfies Config;

View File

@@ -28,14 +28,13 @@ def run_processes(processes: list[AppProcess], **kwargs):
def main(**kwargs):
settings = get_config_and_secrets()
set_start_method("spawn", force=True)
freeze_support()
run_processes(
[
PyroNameServer(),
ExecutionManager(pool_size=settings.config.num_workers),
ExecutionManager(),
ExecutionScheduler(),
AgentServer(),
],

View File

@@ -0,0 +1,101 @@
from enum import Enum
from typing import Any
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.data.model import SchemaField
class ComparisonOperator(Enum):
EQUAL = "=="
NOT_EQUAL = "!="
GREATER_THAN = ">"
LESS_THAN = "<"
GREATER_THAN_OR_EQUAL = ">="
LESS_THAN_OR_EQUAL = "<="
class ConditionBlock(Block):
class Input(BlockSchema):
value1: Any = SchemaField(
description="Enter the first value for comparison",
placeholder="For example: 10 or 'hello' or True",
)
operator: ComparisonOperator = SchemaField(
description="Choose the comparison operator",
placeholder="Select an operator",
)
value2: Any = SchemaField(
description="Enter the second value for comparison",
placeholder="For example: 20 or 'world' or False",
)
yes_value: Any = SchemaField(
description="(Optional) Value to output if the condition is true. If not provided, value1 will be used.",
placeholder="Leave empty to use value1, or enter a specific value",
default=None,
)
no_value: Any = SchemaField(
description="(Optional) Value to output if the condition is false. If not provided, value1 will be used.",
placeholder="Leave empty to use value1, or enter a specific value",
default=None,
)
class Output(BlockSchema):
result: bool = SchemaField(
description="The result of the condition evaluation (True or False)"
)
yes_output: Any = SchemaField(
description="The output value if the condition is true"
)
no_output: Any = SchemaField(
description="The output value if the condition is false"
)
def __init__(self):
super().__init__(
id="715696a0-e1da-45c8-b209-c2fa9c3b0be6",
input_schema=ConditionBlock.Input,
output_schema=ConditionBlock.Output,
description="Handles conditional logic based on comparison operators",
test_input={
"value1": 10,
"operator": ComparisonOperator.GREATER_THAN.value,
"value2": 5,
"yes_value": "Greater",
"no_value": "Not greater",
},
test_output=[
("result", True),
("yes_output", "Greater"),
],
)
def run(self, input_data: Input) -> BlockOutput:
value1 = input_data.value1
operator = input_data.operator
value2 = input_data.value2
yes_value = input_data.yes_value if input_data.yes_value is not None else value1
no_value = input_data.no_value if input_data.no_value is not None else value1
comparison_funcs = {
ComparisonOperator.EQUAL: lambda a, b: a == b,
ComparisonOperator.NOT_EQUAL: lambda a, b: a != b,
ComparisonOperator.GREATER_THAN: lambda a, b: a > b,
ComparisonOperator.LESS_THAN: lambda a, b: a < b,
ComparisonOperator.GREATER_THAN_OR_EQUAL: lambda a, b: a >= b,
ComparisonOperator.LESS_THAN_OR_EQUAL: lambda a, b: a <= b,
}
try:
result = comparison_funcs[operator](value1, value2)
yield "result", result
if result:
yield "yes_output", yes_value
else:
yield "no_output", no_value
except Exception:
yield "result", None
yield "yes_output", None
yield "no_output", None

View File

@@ -2,7 +2,7 @@ from collections import defaultdict
from datetime import datetime, timezone
from enum import Enum
from multiprocessing import Manager
from typing import Any
from typing import Any, Generic, TypeVar
from prisma.models import (
AgentGraphExecution,
@@ -16,6 +16,11 @@ from autogpt_server.data.block import BlockData, BlockInput, CompletedBlockOutpu
from autogpt_server.util import json
class GraphExecution(BaseModel):
graph_exec_id: str
start_node_execs: list["NodeExecution"]
class NodeExecution(BaseModel):
graph_exec_id: str
node_exec_id: str
@@ -31,7 +36,10 @@ class ExecutionStatus(str, Enum):
FAILED = "FAILED"
class ExecutionQueue:
T = TypeVar("T")
class ExecutionQueue(Generic[T]):
"""
Queue for managing the execution of agents.
This will be shared between different processes
@@ -40,11 +48,11 @@ class ExecutionQueue:
def __init__(self):
self.queue = Manager().Queue()
def add(self, execution: NodeExecution) -> NodeExecution:
def add(self, execution: T) -> T:
self.queue.put(execution)
return execution
def get(self) -> NodeExecution:
def get(self) -> T:
return self.queue.get()
def empty(self) -> bool:
@@ -146,14 +154,15 @@ async def upsert_execution_input(
node_id: str,
graph_exec_id: str,
input_name: str,
data: Any,
) -> str:
input_data: Any,
) -> tuple[str, BlockInput]:
"""
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Input.
If there is no AgentNodeExecution that has no `input_name` as input, create new one.
Returns:
The id of the created or existing AgentNodeExecution.
* The id of the created or existing AgentNodeExecution.
* Dict of node input data, key is the input name, value is the input data.
"""
existing_execution = await AgentNodeExecution.prisma().find_first(
where={ # type: ignore
@@ -162,18 +171,25 @@ async def upsert_execution_input(
"Input": {"every": {"name": {"not": input_name}}},
},
order={"addedTime": "asc"},
include={"Input": True},
)
json_data = json.dumps(data)
json_input_data = json.dumps(input_data)
if existing_execution:
await AgentNodeExecutionInputOutput.prisma().create(
data={
"name": input_name,
"data": json_data,
"data": json_input_data,
"referencedByInputExecId": existing_execution.id,
}
)
return existing_execution.id
return existing_execution.id, {
**{
input_data.name: json.loads(input_data.data)
for input_data in existing_execution.Input or []
},
input_name: input_data,
}
else:
result = await AgentNodeExecution.prisma().create(
@@ -181,10 +197,10 @@ async def upsert_execution_input(
"agentNodeId": node_id,
"agentGraphExecutionId": graph_exec_id,
"executionStatus": ExecutionStatus.INCOMPLETE,
"Input": {"create": {"name": input_name, "data": json_data}},
"Input": {"create": {"name": input_name, "data": json_input_data}},
}
)
return result.id
return result.id, {input_name: input_data}
async def upsert_execution_output(
@@ -245,26 +261,6 @@ async def get_execution_results(graph_exec_id: str) -> list[ExecutionResult]:
return res
async def get_node_execution_input(node_exec_id: str) -> BlockInput:
"""
Get execution node input data from the previous node execution result.
Returns:
dictionary of input data, key is the input name, value is the input data.
"""
execution = await AgentNodeExecution.prisma().find_unique_or_raise(
where={"id": node_exec_id},
include=EXECUTION_RESULT_INCLUDE, # type: ignore
)
if not execution.AgentNode:
raise ValueError(f"Node {execution.agentNodeId} not found.")
return {
input_data.name: json.loads(input_data.data)
for input_data in execution.Input or []
}
LIST_SPLIT = "_$_"
DICT_SPLIT = "_#_"
OBJC_SPLIT = "_@_"

View File

@@ -1,6 +1,7 @@
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import Future, ProcessPoolExecutor, TimeoutError
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Coroutine, Generator, TypeVar
if TYPE_CHECKING:
@@ -8,11 +9,12 @@ if TYPE_CHECKING:
from autogpt_server.data import db
from autogpt_server.data.block import Block, BlockData, BlockInput, get_block
from autogpt_server.data.execution import ExecutionQueue, ExecutionStatus
from autogpt_server.data.execution import NodeExecution as Execution
from autogpt_server.data.execution import (
ExecutionQueue,
ExecutionStatus,
GraphExecution,
NodeExecution,
create_graph_execution,
get_node_execution_input,
merge_execution_input,
parse_execution_output,
update_execution_status,
@@ -21,20 +23,21 @@ from autogpt_server.data.execution import (
)
from autogpt_server.data.graph import Graph, Link, Node, get_graph, get_node
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Config
logger = logging.getLogger(__name__)
def get_log_prefix(graph_eid: str, node_eid: str, block_name: str = "-"):
return f"[ExecutionManager] [graph-{graph_eid}|node-{node_eid}|{block_name}]"
return f"[ExecutionManager][graph-eid-{graph_eid}|node-eid-{node_eid}|{block_name}]"
T = TypeVar("T")
ExecutionStream = Generator[Execution, None, None]
ExecutionStream = Generator[NodeExecution, None, None]
def execute_node(
loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: Execution
loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: NodeExecution
) -> ExecutionStream:
"""
Execute a node in the graph. This will trigger a block execution on a node,
@@ -58,9 +61,8 @@ def execute_node(
return loop.run_until_complete(f)
def update_execution(status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
)
exec_update = wait(update_execution_status(node_exec_id, status))
api_client.send_execution_update(exec_update.model_dump())
node = wait(get_node(node_id))
if not node:
@@ -89,7 +91,7 @@ def execute_node(
wait(upsert_execution_output(node_exec_id, output_name, output_data))
update_execution(ExecutionStatus.COMPLETED)
for execution in enqueue_next_nodes(
for execution in _enqueue_next_nodes(
api_client=api_client,
loop=loop,
node=node,
@@ -107,23 +109,31 @@ def execute_node(
raise e
def enqueue_next_nodes(
@contextmanager
def synchronized(api_client: "AgentServer", key: Any):
api_client.acquire_lock(key)
try:
yield
finally:
api_client.release_lock(key)
def _enqueue_next_nodes(
api_client: "AgentServer",
loop: asyncio.AbstractEventLoop,
node: Node,
output: BlockData,
graph_exec_id: str,
prefix: str,
) -> list[Execution]:
) -> list[NodeExecution]:
def wait(f: Coroutine[T, Any, T]) -> T:
return loop.run_until_complete(f)
def execution_update(node_exec_id: str, status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
)
exec_update = wait(update_execution_status(node_exec_id, status))
api_client.send_execution_update(exec_update.model_dump())
def update_execution_result(node_link: Link) -> Execution | None:
def register_next_execution(node_link: Link) -> NodeExecution | None:
next_output_name = node_link.source_name
next_input_name = node_link.sink_name
next_node_id = node_link.sink_id
@@ -137,18 +147,21 @@ def enqueue_next_nodes(
logger.error(f"{prefix} Error, next node {next_node_id} not found.")
return
next_node_exec_id = wait(
upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
data=next_data,
# Upserting execution input includes reading the existing input pins in the node
# which then either updating the existing execution input or creating a new one.
# While reading, we should avoid any other process to add input to the same node.
with synchronized(api_client, ("upsert_input", next_node_id, graph_exec_id)):
next_node_exec_id, next_node_input = wait(
upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
input_data=next_data,
)
)
)
next_node_input = wait(get_node_execution_input(next_node_exec_id))
next_node_input, validation_msg = validate_exec(next_node, next_node_input)
suffix = f"{next_output_name}~{next_input_name}#{next_node_id}:{validation_msg}"
suffix = f"{next_output_name}>{next_input_name}~{next_node_id}:{validation_msg}"
if not next_node_input:
logger.warning(f"{prefix} Skipped queueing {suffix}")
@@ -157,7 +170,7 @@ def enqueue_next_nodes(
# Input is complete, enqueue the execution.
logger.warning(f"{prefix} Enqueued {suffix}")
execution_update(next_node_exec_id, ExecutionStatus.QUEUED)
return Execution(
return NodeExecution(
graph_exec_id=graph_exec_id,
node_exec_id=next_node_exec_id,
node_id=next_node.id,
@@ -167,7 +180,7 @@ def enqueue_next_nodes(
return [
execution
for link in node.output_links
if (execution := update_execution_result(link))
if (execution := register_next_execution(link))
]
@@ -228,44 +241,118 @@ def get_agent_server_client() -> "AgentServer":
class Executor:
loop: asyncio.AbstractEventLoop
"""
This class contains event handlers for the process pool executor events.
The main events are:
on_node_executor_start: Initialize the process that executes the node.
on_node_execution: Execution logic for a node.
on_graph_executor_start: Initialize the process that executes the graph.
on_graph_execution: Execution logic for a graph.
The execution flow:
1. Graph execution request is added to the queue.
2. Graph executor loop picks the request from the queue.
3. Graph executor loop submits the graph execution request to the executor pool.
[on_graph_execution]
4. Graph executor initialize the node execution queue.
5. Graph executor adds the starting nodes to the node execution queue.
6. Graph executor waits for all nodes to be executed.
[on_node_execution]
7. Node executor picks the node execution request from the queue.
8. Node executor executes the node.
9. Node executor enqueues the next executed nodes to the node execution queue.
"""
@classmethod
def on_executor_start(cls):
def on_node_executor_start(cls):
cls.loop = asyncio.new_event_loop()
cls.loop.run_until_complete(db.connect())
cls.agent_server_client = get_agent_server_client()
@classmethod
def on_start_execution(cls, q: ExecutionQueue, data: Execution) -> bool:
def on_node_execution(cls, q: ExecutionQueue[NodeExecution], data: NodeExecution):
prefix = get_log_prefix(data.graph_exec_id, data.node_exec_id)
try:
logger.warning(f"{prefix} Start execution")
logger.warning(f"{prefix} Start node execution")
for execution in execute_node(cls.loop, cls.agent_server_client, data):
q.add(execution)
return True
logger.warning(f"{prefix} Finished node execution")
except Exception as e:
logger.exception(f"{prefix} Error: {e}")
return False
logger.exception(f"{prefix} Failed node execution: {e}")
@classmethod
def on_graph_executor_start(cls):
cls.pool_size = Config().num_node_workers
cls.executor = ProcessPoolExecutor(
max_workers=cls.pool_size,
initializer=cls.on_node_executor_start,
)
logger.warning(f"Graph executor started with max-{cls.pool_size} node workers.")
@classmethod
def on_graph_execution(cls, graph_data: GraphExecution):
prefix = get_log_prefix(graph_data.graph_exec_id, "*")
logger.warning(f"{prefix} Start graph execution")
try:
queue = ExecutionQueue[NodeExecution]()
for node_exec in graph_data.start_node_execs:
queue.add(node_exec)
futures: dict[str, Future] = {}
while not queue.empty():
execution = queue.get()
# Avoid parallel execution of the same node.
fut = futures.get(execution.node_id)
if fut and not fut.done():
cls.wait_future(fut)
logger.warning(f"{prefix} Re-enqueueing {execution.node_id}")
queue.add(execution)
continue
futures[execution.node_id] = cls.executor.submit(
cls.on_node_execution, queue, execution
)
# Avoid terminating graph execution when some nodes are still running.
while queue.empty() and futures:
for node_id, future in list(futures.items()):
if future.done():
del futures[node_id]
elif queue.empty():
cls.wait_future(future)
logger.warning(f"{prefix} Finished graph execution")
except Exception as e:
logger.exception(f"{prefix} Failed graph execution: {e}")
@classmethod
def wait_future(cls, future: Future):
try:
future.result(timeout=3)
except TimeoutError:
# Avoid being blocked by long-running node, by not waiting its completion.
pass
class ExecutionManager(AppService):
def __init__(self, pool_size: int):
self.pool_size = pool_size
self.queue = ExecutionQueue()
def __init__(self):
self.pool_size = Config().num_graph_workers
self.queue = ExecutionQueue[GraphExecution]()
def run_service(self):
with ProcessPoolExecutor(
max_workers=self.pool_size,
initializer=Executor.on_executor_start,
initializer=Executor.on_graph_executor_start,
) as executor:
logger.warning(f"Execution manager started with {self.pool_size} workers.")
logger.warning(
f"Execution manager started with max-{self.pool_size} graph workers."
)
while True:
executor.submit(
Executor.on_start_execution,
self.queue,
self.queue.get(),
)
executor.submit(Executor.on_graph_execution, self.queue.get())
@property
def agent_server_client(self) -> "AgentServer":
@@ -292,32 +379,26 @@ class ExecutionManager(AppService):
nodes_input=nodes_input,
)
)
executions: list[BlockInput] = []
starting_node_execs = []
for node_exec in node_execs:
self.add_node_execution(
Execution(
starting_node_execs.append(
NodeExecution(
graph_exec_id=node_exec.graph_exec_id,
node_exec_id=node_exec.node_exec_id,
node_id=node_exec.node_id,
data=node_exec.input_data,
)
)
executions.append(
{
"id": node_exec.node_exec_id,
"node_id": node_exec.node_id,
}
exec_update = self.run_and_wait(
update_execution_status(node_exec.node_exec_id, ExecutionStatus.QUEUED)
)
self.agent_server_client.send_execution_update(exec_update.model_dump())
return {
"id": graph_exec_id,
"executions": executions,
}
def add_node_execution(self, execution: Execution) -> Execution:
res = self.run_and_wait(
update_execution_status(execution.node_exec_id, ExecutionStatus.QUEUED)
graph_exec = GraphExecution(
graph_exec_id=graph_exec_id,
start_node_execs=starting_node_execs,
)
self.agent_server_client.send_execution_update(res.model_dump())
return self.queue.add(execution)
self.queue.add(graph_exec)
return {"id": graph_exec_id}

View File

@@ -15,12 +15,16 @@ from fastapi import (
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import autogpt_server.server.ws_api
from autogpt_server.data import block, db, execution
from autogpt_server.data import block, db
from autogpt_server.data import graph as graph_db
from autogpt_server.data.block import BlockInput, CompletedBlockOutput
from autogpt_server.data.execution import (
ExecutionResult,
get_execution_results,
list_executions,
)
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import (
@@ -29,18 +33,19 @@ from autogpt_server.server.model import (
SetGraphActiveVersion,
WsMessage,
)
from autogpt_server.util.data import get_frontend_path
from autogpt_server.util.lock import KeyedMutex
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
class AgentServer(AppService):
event_queue: asyncio.Queue[execution.ExecutionResult] = asyncio.Queue()
event_queue: asyncio.Queue[ExecutionResult] = asyncio.Queue()
manager = ConnectionManager()
mutex = KeyedMutex()
async def event_broadcaster(self):
while True:
event: execution.ExecutionResult = await self.event_queue.get()
event: ExecutionResult = await self.event_queue.get()
await self.manager.send_execution_result(event)
@asynccontextmanager
@@ -546,17 +551,17 @@ class AgentServer(AppService):
status_code=404, detail=f"Agent #{graph_id}{rev} not found."
)
return await execution.list_executions(graph_id, graph_version)
return await list_executions(graph_id, graph_version)
@classmethod
async def get_run_execution_results(
cls, graph_id: str, run_id: str
) -> list[execution.ExecutionResult]:
) -> list[ExecutionResult]:
graph = await graph_db.get_graph(graph_id)
if not graph:
raise HTTPException(status_code=404, detail=f"Graph #{graph_id} not found.")
return await execution.get_execution_results(run_id)
return await get_execution_results(run_id)
async def create_schedule(
self, graph_id: str, cron: str, input_data: dict[Any, Any]
@@ -585,14 +590,32 @@ class AgentServer(AppService):
@expose
def send_execution_update(self, execution_result_dict: dict[Any, Any]):
execution_result = execution.ExecutionResult(**execution_result_dict)
execution_result = ExecutionResult(**execution_result_dict)
self.run_and_wait(self.event_queue.put(execution_result))
@expose
def acquire_lock(self, key: Any):
self.mutex.lock(key)
@expose
def release_lock(self, key: Any):
self.mutex.unlock(key)
@classmethod
def update_configuration(
cls,
updated_settings: Annotated[
Dict[str, Any], Body(examples=[{"config": {"num_workers": 10}}])
Dict[str, Any],
Body(
examples=[
{
"config": {
"num_graph_workers": 10,
"num_node_workers": 10,
}
}
]
),
],
):
settings = Settings()

View File

@@ -0,0 +1,32 @@
from threading import Lock
from typing import Any
from expiringdict import ExpiringDict
class KeyedMutex:
"""
This class provides a mutex that can be locked and unlocked by a specific key.
It uses an ExpiringDict to automatically clear the mutex after a specified timeout,
in case the key is not unlocked for a specified duration, to prevent memory leaks.
"""
def __init__(self):
self.locks: dict[Any, Lock] = ExpiringDict(max_len=6000, max_age_seconds=60)
self.locks_lock = Lock()
def lock(self, key: Any):
with self.locks_lock:
if key not in self.locks:
self.locks[key] = (lock := Lock())
else:
lock = self.locks[key]
lock.acquire()
def unlock(self, key: Any):
with self.locks_lock:
if key in self.locks:
lock = self.locks.pop(key)
else:
return
lock.release()

View File

@@ -1,5 +1,6 @@
import asyncio
import logging
import os
import threading
import time
from abc import abstractmethod
@@ -16,6 +17,8 @@ logger = logging.getLogger(__name__)
conn_retry = retry(stop=stop_after_delay(5), wait=wait_exponential(multiplier=0.1))
T = TypeVar("T")
host = os.environ.get("PYRO_HOST", "localhost")
def expose(func: Callable) -> Callable:
def wrapper(*args, **kwargs):
@@ -33,7 +36,7 @@ class PyroNameServer(AppProcess):
def run(self):
try:
print("Starting NameServer loop")
nameserver.start_ns_loop()
nameserver.start_ns_loop(host=host, port=9090)
except KeyboardInterrupt:
print("Shutting down NameServer")
@@ -51,11 +54,11 @@ class AppService(AppProcess):
while True:
time.sleep(10)
def run_async(self, coro: Coroutine[T, Any, T]):
def __run_async(self, coro: Coroutine[T, Any, T]):
return asyncio.run_coroutine_threadsafe(coro, self.shared_event_loop)
def run_and_wait(self, coro: Coroutine[T, Any, T]) -> T:
future = self.run_async(coro)
future = self.__run_async(coro)
return future.result()
def run(self):
@@ -77,15 +80,14 @@ class AppService(AppProcess):
@conn_retry
def __start_pyro(self):
daemon = pyro.Daemon()
ns = pyro.locate_ns()
daemon = pyro.Daemon(host=host)
ns = pyro.locate_ns(host=host, port=9090)
uri = daemon.register(self)
ns.register(self.service_name, uri)
logger.warning(f"Service [{self.service_name}] Ready. Object URI = {uri}")
daemon.requestLoop()
def __start_async_loop(self):
# asyncio.set_event_loop(self.shared_event_loop)
self.shared_event_loop.run_forever()

View File

@@ -41,8 +41,17 @@ class UpdateTrackingModel(BaseModel, Generic[T]):
class Config(UpdateTrackingModel["Config"], BaseSettings):
"""Config for the server."""
num_workers: int = Field(
default=9, ge=1, le=100, description="Number of workers to use for execution."
num_graph_workers: int = Field(
default=1,
ge=1,
le=100,
description="Maximum number of workers to use for graph execution.",
)
num_node_workers: int = Field(
default=1,
ge=1,
le=100,
description="Maximum number of workers to use for node execution within a single graph.",
)
# Add more configuration fields as needed

View File

@@ -13,7 +13,7 @@ log = print
class SpinTestServer:
def __init__(self):
self.name_server = PyroNameServer()
self.exec_manager = ExecutionManager(1)
self.exec_manager = ExecutionManager()
self.agent_server = AgentServer()
self.scheduler = ExecutionScheduler()

View File

@@ -1,3 +1,4 @@
{
"num_workers": 5
}
"num_graph_workers": 10,
"num_node_workers": 10
}

View File

@@ -0,0 +1,8 @@
/*
Warnings:
- A unique constraint covering the columns `[referencedByInputExecId,referencedByOutputExecId,name]` on the table `AgentNodeExecutionInputOutput` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateIndex
CREATE UNIQUE INDEX "AgentNodeExecutionInputOutput_referencedByInputExecId_referencedByOutputExecId_name_key" ON "AgentNodeExecutionInputOutput"("referencedByInputExecId", "referencedByOutputExecId", "name");

View File

@@ -1,3 +1,3 @@
# Please do not edit this file manually
# It should be added in your version-control system (i.e. Git)
provider = "sqlite"
provider = "sqlite"

View File

@@ -25,7 +25,7 @@ requests = "*"
sentry-sdk = "^1.40.4"
[package.extras]
benchmark = ["agbenchmark @ file:///workspaces/AutoGPT/benchmark"]
benchmark = ["agbenchmark"]
[package.source]
type = "directory"
@@ -329,7 +329,7 @@ watchdog = "4.0.0"
webdriver-manager = "^4.0.1"
[package.extras]
benchmark = ["agbenchmark @ file:///workspaces/AutoGPT/benchmark"]
benchmark = ["agbenchmark"]
[package.source]
type = "directory"
@@ -1179,6 +1179,20 @@ files = [
[package.extras]
test = ["pytest (>=6)"]
[[package]]
name = "expiringdict"
version = "1.2.2"
description = "Dictionary with auto-expiring values for caching purposes"
optional = false
python-versions = "*"
files = [
{file = "expiringdict-1.2.2-py3-none-any.whl", hash = "sha256:09a5d20bc361163e6432a874edd3179676e935eb81b925eccef48d409a8a45e8"},
{file = "expiringdict-1.2.2.tar.gz", hash = "sha256:300fb92a7e98f15b05cf9a856c1415b3bc4f2e132be07daa326da6414c23ee09"},
]
[package.extras]
tests = ["coverage", "coveralls", "dill", "mock", "nose"]
[[package]]
name = "fastapi"
version = "0.109.2"
@@ -6348,4 +6362,4 @@ test = ["big-O", "importlib-resources", "jaraco.functools", "jaraco.itertools",
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "422edccf59dc5cdcc1cb0f6991228b2f58ef807a934d1aeda9643fa0cad27780"
content-hash = "9013ff78344cb68878809bd7220453879c32c9291e39d99321dbcc9a7359855c"

View File

@@ -0,0 +1,8 @@
/*
Warnings:
- A unique constraint covering the columns `[referencedByInputExecId,referencedByOutputExecId,name]` on the table `AgentNodeExecutionInputOutput` will be added. If there are existing duplicate values, this will fail.
*/
-- CreateIndex
CREATE UNIQUE INDEX "AgentNodeExecutionInputOutput_referencedByInputExecId_refer_key" ON "AgentNodeExecutionInputOutput"("referencedByInputExecId", "referencedByOutputExecId", "name");

View File

@@ -127,6 +127,9 @@ model AgentNodeExecutionInputOutput {
ReferencedByInputExec AgentNodeExecution? @relation("AgentNodeExecutionInput", fields: [referencedByInputExecId], references: [id])
referencedByOutputExecId String?
ReferencedByOutputExec AgentNodeExecution? @relation("AgentNodeExecutionOutput", fields: [referencedByOutputExecId], references: [id])
// Input and Output pin names are unique for each AgentNodeExecution.
@@unique([referencedByInputExecId, referencedByOutputExecId, name])
}
// This model describes the recurring execution schedule of an Agent.
@@ -145,4 +148,4 @@ model AgentGraphExecutionSchedule {
lastUpdated DateTime @updatedAt
@@index([isEnabled])
}
}

View File

@@ -38,6 +38,7 @@ youtube-transcript-api = "^0.6.2"
ollama = "^0.3.0"
feedparser = "^6.0.11"
python-dotenv = "^1.0.1"
expiringdict = "^1.2.2"
[tool.poetry.group.dev.dependencies]
cx-freeze = { git = "https://github.com/ntindle/cx_Freeze.git", rev = "main", develop = true }

View File

@@ -127,6 +127,9 @@ model AgentNodeExecutionInputOutput {
ReferencedByInputExec AgentNodeExecution? @relation("AgentNodeExecutionInput", fields: [referencedByInputExecId], references: [id])
referencedByOutputExecId String?
ReferencedByOutputExec AgentNodeExecution? @relation("AgentNodeExecutionOutput", fields: [referencedByOutputExecId], references: [id])
// Input and Output pin names are unique for each AgentNodeExecution.
@@unique([referencedByInputExecId, referencedByOutputExecId, name])
}
// This model describes the recurring execution schedule of an Agent.

View File

@@ -37,7 +37,7 @@ async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id:
assert exec.graph_exec_id == graph_exec_id
assert exec.output_data == {"output": ["Hello, World!"]}
assert exec.input_data == {"input": text}
assert exec.node_id == test_graph.nodes[0].id
assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]
# Executing ConstantBlock2
exec = executions[1]
@@ -45,7 +45,7 @@ async def assert_sample_graph_executions(test_graph: graph.Graph, graph_exec_id:
assert exec.graph_exec_id == graph_exec_id
assert exec.output_data == {"output": ["Hello, World!"]}
assert exec.input_data == {"input": text}
assert exec.node_id == test_graph.nodes[1].id
assert exec.node_id in [test_graph.nodes[0].id, test_graph.nodes[1].id]
# Executing TextFormatterBlock
exec = executions[2]

View File

@@ -7,4 +7,4 @@ type: application
version: 0.1.0
appVersion: "1.16.0"
appVersion: "1.0.0"

View File

@@ -4,6 +4,10 @@ metadata:
name: {{ include "autogpt-server.fullname" . }}
labels:
{{- include "autogpt-server.labels" . | nindent 4 }}
{{- with .Values.service.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
spec:
type: {{ .Values.service.type }}
ports:

View File

@@ -2,7 +2,7 @@
image:
repository: us-east1-docker.pkg.dev/agpt-dev/agpt-server-dev/agpt-server-dev
pullPolicy: IfNotPresent
pullPolicy: Always
tag: "latest"
serviceAccount:
@@ -11,9 +11,11 @@ serviceAccount:
name: "dev-agpt-server-sa"
service:
type: LoadBalancer
port: 80
type: ClusterIP
port: 8000
targetPort: 8000
annotations:
cloud.google.com/neg: '{"ingress": true}'
ingress:
enabled: true
@@ -22,12 +24,21 @@ ingress:
kubernetes.io/ingress.class: gce
kubernetes.io/ingress.global-static-ip-name: "agpt-dev-agpt-server-ip"
networking.gke.io/managed-certificates: "autogpt-server-cert"
kubernetes.io/ingress.allow-http: "true"
hosts:
- host: server.agpt.co
- host: dev-server.agpt.co
paths:
- path: /*
pathType: ImplementationSpecific
tls: []
- path: /
pathType: Prefix
backend:
service:
name: autogpt-server
port: 8000
defaultBackend:
service:
name: autogpt-server
port:
number: 8000
resources:
requests:
@@ -37,7 +48,24 @@ resources:
cpu: 500m
memory: 512Mi
domain: "server.agpt.co"
livenessProbe:
httpGet:
path: /docs
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 6
readinessProbe:
httpGet:
path: /docs
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
timeoutSeconds: 5
failureThreshold: 6
domain: "dev-server.agpt.co"
cloudSqlProxy:
image:
@@ -51,4 +79,5 @@ cloudSqlProxy:
cpu: "1"
env:
APP_ENV: "dev"
APP_ENV: "dev"
PYRO_HOST: "0.0.0.0"

View File

@@ -1,6 +1,6 @@
# base values, environment specific variables should be specified/overwritten in environment values
replicaCount: 2
replicaCount: 1
image:
repository: us-east1-docker.pkg.dev/agpt-dev/agpt-server-dev/agpt-server-dev

View File

@@ -43,4 +43,13 @@ role_bindings = {
"roles/iam.workloadIdentityUser" = [
"serviceAccount:dev-agpt-server-sa@agpt-dev.iam.gserviceaccount.com"
]
"roles/compute.networkUser" = [
"serviceAccount:dev-agpt-server-sa@agpt-dev.iam.gserviceaccount.com"
],
"roles/container.hostServiceAgentUser" = [
"serviceAccount:dev-agpt-server-sa@agpt-dev.iam.gserviceaccount.com"
]
}
pods_ip_cidr_range = "10.1.0.0/16"
services_ip_cidr_range = "10.2.0.0/20"

View File

@@ -30,11 +30,13 @@ module "static_ips" {
module "networking" {
source = "./modules/networking"
project_id = var.project_id
region = var.region
network_name = var.network_name
subnet_name = var.subnet_name
subnet_cidr = var.subnet_cidr
project_id = var.project_id
region = var.region
network_name = var.network_name
subnet_name = var.subnet_name
subnet_cidr = var.subnet_cidr
pods_ip_cidr_range = var.pods_ip_cidr_range
services_ip_cidr_range = var.services_ip_cidr_range
}
module "gke_cluster" {

View File

@@ -26,5 +26,10 @@ resource "google_container_cluster" "primary" {
network = var.network
subnetwork = var.subnetwork
ip_allocation_policy {
cluster_secondary_range_name = "pods"
services_secondary_range_name = "services"
}
}

View File

@@ -8,5 +8,15 @@ resource "google_compute_subnetwork" "subnet" {
ip_cidr_range = var.subnet_cidr
region = var.region
network = google_compute_network.vpc_network.self_link
secondary_ip_range {
range_name = "pods"
ip_cidr_range = var.pods_ip_cidr_range
}
secondary_ip_range {
range_name = "services"
ip_cidr_range = var.services_ip_cidr_range
}
}

View File

@@ -18,4 +18,12 @@ variable "subnet_cidr" {
description = "The CIDR range for the subnet"
}
variable "pods_ip_cidr_range" {
description = "The IP address range for pods"
default = "10.1.0.0/16"
}
variable "services_ip_cidr_range" {
description = "The IP address range for services"
default = "10.2.0.0/20"
}

View File

@@ -1,6 +1,5 @@
resource "google_compute_address" "static_ip" {
resource "google_compute_global_address" "static_ip" {
count = length(var.ip_names)
name = "${var.project_id}-${var.ip_names[count.index]}"
region = var.region
address_type = "EXTERNAL"
}

View File

@@ -1,9 +1,9 @@
output "ip_addresses" {
description = "Map of created static IP addresses"
value = { for i, ip in google_compute_address.static_ip : var.ip_names[i] => ip.address }
value = { for i, ip in google_compute_global_address.static_ip : var.ip_names[i] => ip.address }
}
output "ip_names" {
description = "List of full names of the created static IP addresses"
value = google_compute_address.static_ip[*].name
value = google_compute_global_address.static_ip[*].name
}

View File

@@ -99,3 +99,15 @@ variable "role_bindings" {
type = map(list(string))
default = {}
}
variable "pods_ip_cidr_range" {
description = "The IP address range for pods"
type = string
default = "10.1.0.0/16"
}
variable "services_ip_cidr_range" {
description = "The IP address range for services"
type = string
default = "10.2.0.0/20"
}

7
rnd/market/.env.example Normal file
View File

@@ -0,0 +1,7 @@
RUN_ENV=local
DB_USER=marketplace
DB_PASS=pass123
DB_NAME=marketplace
DB_PORT=5432
DATABASE_URL=postgresql://${DB_USER}:${DB_PASS}@localhost:${DB_PORT}/${DB_NAME}
SENTRY_DSN=https://sentry.io

6
rnd/market/.gitignore vendored Normal file
View File

@@ -0,0 +1,6 @@
database.db
database.db-journal
build/
config.json
secrets/*
!secrets/.gitkeep

14
rnd/market/README.md Normal file
View File

@@ -0,0 +1,14 @@
# AutoGPT Agent Marketplace
## Overview
AutoGPT Agent Marketplace is an open-source platform for autonomous AI agents. This project aims to create a user-friendly, accessible marketplace where users can discover, utilize, and contribute to a diverse ecosystem of AI solutions.
## Vision
Our vision is to empower users with customizable and free AI agents, fostering an open-source community that drives innovation in AI automation across various industries.
# Key Features
- Agent Discovery and Search
- Agent Listings with Detailed Information
- User Profiles
- Data Protection and Compliance

View File

@@ -0,0 +1,16 @@
version: "3"
services:
postgres:
image: ankane/pgvector:latest
environment:
POSTGRES_USER: ${DB_USER}
POSTGRES_PASSWORD: ${DB_PASS}
POSTGRES_DB: ${DB_NAME}
PGUSER: ${DB_USER}
healthcheck:
test: pg_isready -U $$POSTGRES_USER -d $$POSTGRES_DB
interval: 10s
timeout: 5s
retries: 5
ports:
- "${DB_PORT}:5432"

27
rnd/market/linter.py Normal file
View File

@@ -0,0 +1,27 @@
import os
import subprocess
directory = os.path.dirname(os.path.realpath(__file__))
def run(*command: str) -> None:
print(f">>>>> Running poetry run {' '.join(command)}")
subprocess.run(["poetry", "run"] + list(command), cwd=directory, check=True)
def lint():
try:
run("ruff", "check", ".", "--exit-zero")
run("isort", "--diff", "--check", "--profile", "black", ".")
run("black", "--diff", "--check", ".")
run("pyright")
except subprocess.CalledProcessError as e:
print("Lint failed, try running `poetry run format` to fix the issues: ", e)
raise e
def format():
run("ruff", "check", "--fix", ".")
run("isort", "--profile", "black", ".")
run("black", ".")
run("pyright", ".")

View File

60
rnd/market/market/app.py Normal file
View File

@@ -0,0 +1,60 @@
from contextlib import asynccontextmanager
import os
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
from prisma import Prisma
import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration
from sentry_sdk.integrations.fastapi import FastApiIntegration
from sentry_sdk.integrations.starlette import StarletteIntegration
from market.routes import agents
load_dotenv()
if os.environ.get("SENTRY_DSN"):
sentry_sdk.init(
dsn=os.environ.get("SENTRY_DSN"),
# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for performance monitoring.
traces_sample_rate=1.0,
# Set profiles_sample_rate to 1.0 to profile 100%
# of sampled transactions.
# We recommend adjusting this value in production.
profiles_sample_rate=1.0,
enable_tracing=True,
environment=os.environ.get("RUN_ENV", default="CLOUD").lower(),
integrations=[
StarletteIntegration(transaction_style="url"),
FastApiIntegration(transaction_style="url"),
AsyncioIntegration(),
],
)
db_client = Prisma(auto_register=True)
@asynccontextmanager
async def lifespan(app: FastAPI):
await db_client.connect()
yield
await db_client.disconnect()
app = FastAPI(
title="Marketplace API",
description=(
"AutoGPT Marketplace API is a service that allows users to share AI agents."
),
summary="Maketplace API",
version="0.1",
lifespan=lifespan,
)
# Add gzip middleware to compress responses
app.add_middleware(GZipMiddleware, minimum_size=1000)
app.include_router(agents.router, prefix="/market/agents", tags=["agents"])

View File

@@ -0,0 +1,3 @@
from fastapi import APIRouter
router = APIRouter()

1491
rnd/market/poetry.lock generated Normal file

File diff suppressed because it is too large Load Diff

45
rnd/market/pyproject.toml Normal file
View File

@@ -0,0 +1,45 @@
[tool.poetry]
name = "market"
version = "0.1.0"
description = ""
authors = ["SwiftyOS <craigswift13@gmail.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
prisma = "^0.12.0"
python-dotenv = "^1.0.1"
uvicorn = "^0.30.3"
fastapi = "^0.111.1"
sentry-sdk = {extras = ["fastapi"], version = "^2.11.0"}
[tool.poetry.group.dev.dependencies]
pytest = "^8.2.1"
pytest-asyncio = "^0.23.7"
pytest-watcher = "^0.4.2"
requests = "^2.32.3"
ruff = "^0.5.2"
pyright = "^1.1.371"
isort = "^5.13.2"
black = "^24.4.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
format = "linter:format"
lint = "linter:lint"
[tool.pytest-watcher]
now = false
clear = true
delay = 0.2
runner = "pytest"
runner_args = []
patterns = ["*.py"]
ignore_patterns = []
[tool.pytest.ini_options]
asyncio_mode = "auto"

78
rnd/market/schema.prisma Normal file
View File

@@ -0,0 +1,78 @@
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
generator client {
provider = "prisma-client-py"
recursive_type_depth = 5
interface = "asyncio"
}
// This model describes the Agent Graph/Flow (Multi Agent System).
model AgentGraph {
id String @default(uuid())
version Int @default(1)
name String?
description String?
isActive Boolean @default(true)
isTemplate Boolean @default(false)
AgentNodes AgentNode[]
@@id(name: "graphVersionId", [id, version])
}
// This model describes a single node in the Agent Graph/Flow (Multi Agent System).
model AgentNode {
id String @id @default(uuid())
agentBlockId String
AgentBlock AgentBlock @relation(fields: [agentBlockId], references: [id])
agentGraphId String
agentGraphVersion Int @default(1)
AgentGraph AgentGraph @relation(fields: [agentGraphId, agentGraphVersion], references: [id, version])
// List of consumed input, that the parent node should provide.
Input AgentNodeLink[] @relation("AgentNodeSink")
// List of produced output, that the child node should be executed.
Output AgentNodeLink[] @relation("AgentNodeSource")
// JSON serialized dict[str, str] containing predefined input values.
constantInput String @default("{}")
// JSON serialized dict[str, str] containing the node metadata.
metadata String @default("{}")
}
// This model describes the link between two AgentNodes.
model AgentNodeLink {
id String @id @default(uuid())
// Output of a node is connected to the source of the link.
agentNodeSourceId String
AgentNodeSource AgentNode @relation("AgentNodeSource", fields: [agentNodeSourceId], references: [id])
sourceName String
// Input of a node is connected to the sink of the link.
agentNodeSinkId String
AgentNodeSink AgentNode @relation("AgentNodeSink", fields: [agentNodeSinkId], references: [id])
sinkName String
}
// This model describes a component that will be executed by the AgentNode.
model AgentBlock {
id String @id @default(uuid())
name String @unique
// We allow a block to have multiple types of input & output.
// Serialized object-typed `jsonschema` with top-level properties as input/output name.
inputSchema String
outputSchema String
// Prisma requires explicit back-references.
ReferencedByAgentNode AgentNode[]
}