feat(rnd): Add code-formatter & linting on AutoGPT server (#7458)

### Background

Add formatter & linter commands.
Tools: ruff --> isort --> black --> pyright.

### Changes 🏗️

Introduced:
* `poetry run format`
* `poetry run lint`

`poetry run lint` will be executed on CI.
This commit is contained in:
Zamil Majdy
2024-07-17 14:54:29 +04:00
committed by GitHub
parent 9e22409d66
commit 78b84289cb
40 changed files with 799 additions and 598 deletions

View File

@@ -111,6 +111,9 @@ jobs:
- name: Run Database Migrations
run: poetry run prisma migrate dev --name updates
- name: Run Linter
run: poetry run lint
- name: Run pytest with coverage
run: |
poetry run pytest -vv \

View File

@@ -64,6 +64,26 @@ To run the tests:
poetry run pytest
```
## Development
### Formatting & Linting
An auto-formatter and a linter are set up in the project. To run them:
Install:
```sh
poetry install --with dev
```
Format the code:
```sh
poetry run format
```
Lint the code:
```sh
poetry run lint
```
## Project Outline
The current project has the following main modules:

View File

@@ -1,4 +1,5 @@
from multiprocessing import freeze_support, set_start_method
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server import AgentServer
from autogpt_server.util.process import AppProcess

View File

@@ -1,7 +1,8 @@
import os
import glob
import importlib
import os
from pathlib import Path
from autogpt_server.data.block import Block
# Dynamically load all modules under autogpt_server.blocks
@@ -18,9 +19,6 @@ for module in modules:
AVAILABLE_MODULES.append(module)
# Load all Block instances from the available modules
AVAILABLE_BLOCKS = {
block.id: block
for block in [v() for v in Block.__subclasses__()]
}
AVAILABLE_BLOCKS = {block.id: block for block in [v() for v in Block.__subclasses__()]}
__all__ = ["AVAILABLE_MODULES", "AVAILABLE_BLOCKS"]

View File

@@ -7,8 +7,6 @@ from typing import TYPE_CHECKING, Iterator
from autogpt.agents.agent import Agent, AgentSettings
from autogpt.app.config import ConfigBuilder
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.data.model import BlockSecret, SchemaField, SecretField
from forge.agent.components import AgentComponent
from forge.agent.protocols import CommandProvider
from forge.command import command
@@ -21,6 +19,9 @@ from forge.llm.providers.schema import ModelProviderName
from forge.models.json_schema import JSONSchema
from pydantic import Field, SecretStr
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.data.model import BlockSecret, SchemaField, SecretField
if TYPE_CHECKING:
from autogpt.app.config import AppConfig
@@ -76,7 +77,6 @@ class BlockAgent(Agent):
class AutoGPTAgentBlock(Block):
class Input(BlockSchema):
task: str = Field()
task: str = SchemaField(
description="Task description for the agent.",
placeholder="Calculate and use Output command",
@@ -110,7 +110,7 @@ class AutoGPTAgentBlock(Block):
input_schema=AutoGPTAgentBlock.Input,
output_schema=AutoGPTAgentBlock.Output,
test_input={
"task": "Make calculations and use output command to output the result.",
"task": "Make calculations and use output command to output the result",
"input": "5 + 3",
"openai_api_key": "openai_api_key",
"enabled_components": [OutputComponent.__name__],
@@ -141,18 +141,17 @@ class AutoGPTAgentBlock(Block):
@staticmethod
def get_result(agent: BlockAgent) -> str:
# Execute agent
error: Exception | None = None
for tries in range(3):
try:
proposal = asyncio.run(agent.propose_action())
break
result = asyncio.run(agent.execute(proposal))
return str(result)
except Exception as e:
if tries == 2:
raise e
error = e
result = asyncio.run(agent.execute(proposal))
return str(result)
raise error or Exception("Failed to get result")
def run(self, input_data: Input) -> BlockOutput:
# Set up configuration

View File

@@ -1,22 +1,23 @@
from autogpt_server.data.block import Block, BlockSchema, BlockOutput
from typing import Any
from pydantic import Field
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
class ValueBlock(Block):
"""
This block allows you to provide a constant value as a block, in a stateless manner.
The common use-case is simply pass the `input` data, it will `output` the same data.
But this will not retain the state, once it is executed, the output is consumed.
But this will not retain the state, once it is executed, the output is consumed.
To retain the state, you can feed the `output` to the `data` input, so that the data
is retained in the block for the next execution. You can then trigger the block by
feeding the `input` pin with any data, and the block will produce value of `data`.
Ex:
<constant_data> <any_trigger>
|| ||
|| ||
=====> `data` `input`
|| \\ //
|| ValueBlock
@@ -25,10 +26,15 @@ class ValueBlock(Block):
"""
class Input(BlockSchema):
input: Any = Field(description="Trigger the block to produce the output. "
"The value is only used when `data` is None.")
data: Any = Field(description="The constant data to be retained in the block. "
"This value is passed as `output`.", default=None)
input: Any = Field(
description="Trigger the block to produce the output. "
"The value is only used when `data` is None."
)
data: Any = Field(
description="The constant data to be retained in the block. "
"This value is passed as `output`.",
default=None,
)
class Output(BlockSchema):
output: Any

View File

@@ -1,7 +1,7 @@
import re
import os
import re
from typing import Type
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.util.test import execute_block_test
@@ -43,7 +43,7 @@ class BlockInstallationBlock(Block):
else:
yield "error", "No UUID found in the code."
return
block_dir = os.path.dirname(__file__)
file_path = f"{block_dir}/{file_name}.py"
module_name = f"autogpt_server.blocks.{file_name}"

View File

@@ -1,7 +1,8 @@
from enum import Enum
import requests
from enum import Enum
from autogpt_server.data.block import Block, BlockSchema, BlockOutput
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
class HttpMethod(Enum):

View File

@@ -86,18 +86,19 @@ class LlmCallBlock(Block):
"prompt": "User prompt",
},
test_output=("response", {"key1": "key1Value", "key2": "key2Value"}),
test_mock={"llm_call": lambda *args, **kwargs: json.dumps({
"key1": "key1Value",
"key2": "key2Value",
})},
test_mock={
"llm_call": lambda *args, **kwargs: json.dumps(
{
"key1": "key1Value",
"key2": "key2Value",
}
)
},
)
@staticmethod
def llm_call(
api_key: str,
model: LlmModel,
prompt: list[dict],
json_format: bool
api_key: str, model: LlmModel, prompt: list[dict], json_format: bool
) -> str:
provider = model.metadata.provider
@@ -145,16 +146,17 @@ class LlmCallBlock(Block):
if input_data.expected_format:
expected_format = [
f'"{k}": "{v}"' for k, v in
input_data.expected_format.items()
f'"{k}": "{v}"' for k, v in input_data.expected_format.items()
]
format_prompt = ",\n ".join(expected_format)
sys_prompt = trim_prompt(f"""
sys_prompt = trim_prompt(
f"""
|Reply in json format:
|{{
| {format_prompt}
| {format_prompt}
|}}
""")
"""
)
prompt.append({"role": "system", "content": sys_prompt})
prompt.append({"role": "user", "content": input_data.prompt})
@@ -173,8 +175,8 @@ class LlmCallBlock(Block):
retry_prompt = ""
model = input_data.model
api_key = (
input_data.api_key.get_secret_value() or
LlmApiKeys[model.metadata.provider].get_secret_value()
input_data.api_key.get_secret_value()
or LlmApiKeys[model.metadata.provider].get_secret_value()
)
for retry_count in range(input_data.retry):
@@ -196,7 +198,8 @@ class LlmCallBlock(Block):
yield "response", {"response": response_text}
return
retry_prompt = trim_prompt(f"""
retry_prompt = trim_prompt(
f"""
|This is your previous error response:
|--
|{response_text}
@@ -206,7 +209,8 @@ class LlmCallBlock(Block):
|--
|{parsed_error}
|--
""")
"""
)
prompt.append({"role": "user", "content": retry_prompt})
except Exception as e:
logger.error(f"Error calling LLM: {e}")
@@ -236,11 +240,12 @@ class TextSummarizerBlock(Block):
test_input={"text": "Lorem ipsum..." * 100},
test_output=("summary", "Final summary of a long text"),
test_mock={
"llm_call": lambda input_data:
{"final_summary": "Final summary of a long text"}
if "final_summary" in input_data.expected_format
else {"summary": "Summary of a chunk of text"}
}
"llm_call": lambda input_data: (
{"final_summary": "Final summary of a long text"}
if "final_summary" in input_data.expected_format
else {"summary": "Summary of a chunk of text"}
)
},
)
def run(self, input_data: Input) -> BlockOutput:
@@ -252,9 +257,7 @@ class TextSummarizerBlock(Block):
def _run(self, input_data: Input) -> BlockOutput:
chunks = self._split_text(
input_data.text,
input_data.max_tokens,
input_data.chunk_overlap
input_data.text, input_data.max_tokens, input_data.chunk_overlap
)
summaries = []
@@ -272,7 +275,7 @@ class TextSummarizerBlock(Block):
chunk_size = max_tokens - overlap
for i in range(0, len(words), chunk_size):
chunk = " ".join(words[i:i + max_tokens])
chunk = " ".join(words[i : i + max_tokens])
chunks.append(chunk)
return chunks
@@ -288,12 +291,14 @@ class TextSummarizerBlock(Block):
def _summarize_chunk(self, chunk: str, input_data: Input) -> str:
prompt = f"Summarize the following text concisely:\n\n{chunk}"
llm_response = self.llm_call(LlmCallBlock.Input(
prompt=prompt,
api_key=input_data.api_key,
model=input_data.model,
expected_format={"summary": "The summary of the given text."}
))
llm_response = self.llm_call(
LlmCallBlock.Input(
prompt=prompt,
api_key=input_data.api_key,
model=input_data.model,
expected_format={"summary": "The summary of the given text."},
)
)
return llm_response["summary"]
@@ -301,25 +306,33 @@ class TextSummarizerBlock(Block):
combined_text = " ".join(summaries)
if len(combined_text.split()) <= input_data.max_tokens:
prompt = ("Provide a final, concise summary of the following summaries:\n\n"
+ combined_text)
prompt = (
"Provide a final, concise summary of the following summaries:\n\n"
+ combined_text
)
llm_response = self.llm_call(LlmCallBlock.Input(
prompt=prompt,
api_key=input_data.api_key,
model=input_data.model,
expected_format={
"final_summary": "The final summary of all provided summaries."
}
))
llm_response = self.llm_call(
LlmCallBlock.Input(
prompt=prompt,
api_key=input_data.api_key,
model=input_data.model,
expected_format={
"final_summary": "The final summary of all provided summaries."
},
)
)
return llm_response["final_summary"]
else:
# If combined summaries are still too long, recursively summarize
return self._run(TextSummarizerBlock.Input(
text=combined_text,
api_key=input_data.api_key,
model=input_data.model,
max_tokens=input_data.max_tokens,
chunk_overlap=input_data.chunk_overlap
)).send(None)[1] # Get the first yielded value
return self._run(
TextSummarizerBlock.Input(
text=combined_text,
api_key=input_data.api_key,
model=input_data.model,
max_tokens=input_data.max_tokens,
chunk_overlap=input_data.chunk_overlap,
)
).send(None)[
1
] # Get the first yielded value

View File

@@ -1,13 +1,12 @@
from datetime import datetime, timedelta, timezone
import praw
from typing import Any
from pydantic import BaseModel, Field
from datetime import datetime, timezone
from typing import Iterator
import praw
from pydantic import BaseModel, Field
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
from autogpt_server.util.mock import MockObject
from autogpt_server.data.model import BlockSecret, SecretField
from autogpt_server.util.mock import MockObject
class RedditCredentials(BaseModel):
@@ -54,15 +53,14 @@ class RedditGetPostsBlock(Block):
)
last_minutes: int | None = Field(
description="Post time to stop minutes ago while fetching posts",
default=None
default=None,
)
last_post: str | None = Field(
description="Post ID to stop when reached while fetching posts",
default=None
default=None,
)
post_limit: int | None = Field(
description="Number of posts to fetch",
default=10
description="Number of posts to fetch", default=10
)
class Output(BlockSchema):
@@ -86,10 +84,18 @@ class RedditGetPostsBlock(Block):
"post_limit": 2,
},
test_output=[
("post", RedditPost(
id="id1", subreddit="subreddit", title="title1", body="body1")),
("post", RedditPost(
id="id2", subreddit="subreddit", title="title2", body="body2")),
(
"post",
RedditPost(
id="id1", subreddit="subreddit", title="title1", body="body1"
),
),
(
"post",
RedditPost(
id="id2", subreddit="subreddit", title="title2", body="body2"
),
),
],
test_mock={
"get_posts": lambda _: [
@@ -97,7 +103,7 @@ class RedditGetPostsBlock(Block):
MockObject(id="id2", title="title2", selftext="body2"),
MockObject(id="id3", title="title2", selftext="body2"),
]
}
},
)
@staticmethod
@@ -111,8 +117,7 @@ class RedditGetPostsBlock(Block):
for post in self.get_posts(input_data):
if input_data.last_minutes:
post_datetime = datetime.fromtimestamp(
post.created_utc,
tz=timezone.utc
post.created_utc, tz=timezone.utc
)
time_difference = current_time - post_datetime
if time_difference.total_seconds() / 60 > input_data.last_minutes:
@@ -125,15 +130,14 @@ class RedditGetPostsBlock(Block):
id=post.id,
subreddit=input_data.subreddit,
title=post.title,
body=post.selftext
body=post.selftext,
)
class RedditPostCommentBlock(Block):
class Input(BlockSchema):
creds: RedditCredentials = Field(
description="Reddit credentials",
default=RedditCredentials()
description="Reddit credentials", default=RedditCredentials()
)
data: RedditComment = Field(description="Reddit comment")
@@ -147,7 +151,7 @@ class RedditPostCommentBlock(Block):
output_schema=RedditPostCommentBlock.Output,
test_input={"data": {"post_id": "id", "comment": "comment"}},
test_output=[("comment_id", "dummy_comment_id")],
test_mock={"reply_post": lambda creds, comment: "dummy_comment_id"}
test_mock={"reply_post": lambda creds, comment: "dummy_comment_id"},
)
@staticmethod
@@ -155,7 +159,7 @@ class RedditPostCommentBlock(Block):
client = get_praw(creds)
submission = client.submission(id=comment.post_id)
comment = submission.reply(comment.comment)
return comment.id
return comment.id # type: ignore
def run(self, input_data: Input) -> BlockOutput:
yield "comment_id", self.reply_post(input_data.creds, input_data.data)

View File

@@ -3,7 +3,7 @@ from urllib.parse import quote
import requests
from autogpt_server.data.block import Block, BlockSchema, BlockOutput
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
class GetRequest:
@@ -37,7 +37,7 @@ class WikipediaSummaryBlock(Block, GetRequest):
topic = input_data.topic
url = f"https://en.wikipedia.org/api/rest_v1/page/summary/{topic}"
response = self.get_request(url, json=True)
yield "summary", response['extract']
yield "summary", response["extract"]
except requests.exceptions.HTTPError as http_err:
yield "error", f"HTTP error occurred: {http_err}"

View File

@@ -1,8 +1,9 @@
import re
import json
import re
from typing import Any
from pydantic import Field
from autogpt_server.data.block import Block, BlockOutput, BlockSchema
@@ -44,12 +45,12 @@ class TextMatcherBlock(Block):
flags = flags | re.IGNORECASE
if input_data.dot_all:
flags = flags | re.DOTALL
if isinstance(input_data.text, str):
text = input_data.text
else:
text = json.dumps(input_data.text)
if re.search(input_data.match, text, flags=flags):
yield "positive", output
else:
@@ -93,7 +94,7 @@ class TextParserBlock(Block):
flags = flags | re.IGNORECASE
if input_data.dot_all:
flags = flags | re.DOTALL
if isinstance(input_data.text, str):
text = input_data.text
else:
@@ -108,13 +109,9 @@ class TextParserBlock(Block):
class TextFormatterBlock(Block):
class Input(BlockSchema):
texts: list[str] = Field(
description="Texts (list) to format",
default=[]
)
texts: list[str] = Field(description="Texts (list) to format", default=[])
named_texts: dict[str, str] = Field(
description="Texts (dict) to format",
default={}
description="Texts (dict) to format", default={}
)
format: str = Field(
description="Template to format the text using `texts` and `named_texts`",

View File

@@ -108,7 +108,9 @@ def reddit(server_address: str):
Create an event graph
"""
import requests
from autogpt_server.usecases.reddit_marketing import create_test_graph
test_graph = create_test_graph()
url = f"{server_address}/graphs"
headers = {"Content-Type": "application/json"}
@@ -127,7 +129,9 @@ def populate_db(server_address: str):
Create an event graph
"""
import requests
from autogpt_server.usecases.sample import create_test_graph
test_graph = create_test_graph()
url = f"{server_address}/graphs"
headers = {"Content-Type": "application/json"}
@@ -161,7 +165,9 @@ def graph(server_address: str):
Create an event graph
"""
import requests
from autogpt_server.usecases.sample import create_test_graph
url = f"{server_address}/graphs"
headers = {"Content-Type": "application/json"}
data = create_test_graph().model_dump_json()
@@ -213,9 +219,6 @@ def websocket(server_address: str, graph_id: str):
import websockets
from autogpt_server.server.ws_api import ExecutionSubscription, Methods, WsMessage
import websockets
from autogpt_server.server.ws_api import ExecutionSubscription, Methods, WsMessage
async def send_message(server_address: str):

View File

@@ -1,4 +1,5 @@
from uuid import uuid4
from prisma import Prisma
from pydantic import BaseModel, Field, field_validator

View File

@@ -103,7 +103,7 @@ EXECUTION_RESULT_INCLUDE = {
async def create_graph_execution(
graph_id: str, graph_version: int, node_ids: list[str], data: dict[str, Any]
graph_id: str, graph_version: int, node_ids: list[str], data: dict[str, Any]
) -> tuple[str, list[ExecutionResult]]:
"""
Create a new AgentGraphExecution record.
@@ -185,9 +185,9 @@ async def upsert_execution_input(
async def upsert_execution_output(
node_exec_id: str,
output_name: str,
output_data: Any,
node_exec_id: str,
output_name: str,
output_data: Any,
) -> None:
"""
Insert AgentNodeExecutionInputOutput record for as one of AgentNodeExecution.Output.
@@ -202,8 +202,7 @@ async def upsert_execution_output(
async def update_execution_status(
node_exec_id: str,
status: ExecutionStatus
node_exec_id: str, status: ExecutionStatus
) -> ExecutionResult:
now = datetime.now(tz=timezone.utc)
data = {

View File

@@ -5,9 +5,9 @@ from typing import Any, Callable, ClassVar, Optional, TypeVar
from pydantic import Field, GetCoreSchemaHandler
from pydantic_core import (
CoreSchema,
core_schema,
PydanticUndefined,
PydanticUndefinedType,
core_schema,
)
from autogpt_server.util.settings import Secrets

View File

@@ -1,5 +1,5 @@
from datetime import datetime
from typing import Optional, Any
from typing import Any, Optional
from prisma.models import AgentGraphExecutionSchedule
@@ -15,14 +15,9 @@ class ExecutionSchedule(BaseDbModel):
input_data: dict[str, Any]
last_updated: Optional[datetime] = None
def __init__(
self,
is_enabled: Optional[bool] = None,
**kwargs
):
if is_enabled is None:
is_enabled = True
super().__init__(is_enabled=is_enabled, **kwargs)
def __init__(self, is_enabled: Optional[bool] = None, **kwargs):
kwargs["is_enabled"] = (is_enabled is None) or is_enabled
super().__init__(**kwargs)
@staticmethod
def from_db(schedule: AgentGraphExecutionSchedule):
@@ -39,22 +34,15 @@ class ExecutionSchedule(BaseDbModel):
async def get_active_schedules(last_fetch_time: datetime) -> list[ExecutionSchedule]:
query = AgentGraphExecutionSchedule.prisma().find_many(
where={
"isEnabled": True,
"lastUpdated": {"gt": last_fetch_time}
},
order={"lastUpdated": "asc"}
where={"isEnabled": True, "lastUpdated": {"gt": last_fetch_time}},
order={"lastUpdated": "asc"},
)
return [
ExecutionSchedule.from_db(schedule)
for schedule in await query
]
return [ExecutionSchedule.from_db(schedule) for schedule in await query]
async def disable_schedule(schedule_id: str):
await AgentGraphExecutionSchedule.prisma().update(
where={"id": schedule_id},
data={"isEnabled": False}
where={"id": schedule_id}, data={"isEnabled": False}
)
@@ -65,10 +53,7 @@ async def get_schedules(graph_id: str) -> list[ExecutionSchedule]:
"agentGraphId": graph_id,
},
)
return [
ExecutionSchedule.from_db(schedule)
for schedule in await query
]
return [ExecutionSchedule.from_db(schedule) for schedule in await query]
async def add_schedule(schedule: ExecutionSchedule) -> ExecutionSchedule:
@@ -87,6 +72,5 @@ async def add_schedule(schedule: ExecutionSchedule) -> ExecutionSchedule:
async def update_schedule(schedule_id: str, is_enabled: bool):
await AgentGraphExecutionSchedule.prisma().update(
where={"id": schedule_id},
data={"isEnabled": is_enabled}
where={"id": schedule_id}, data={"isEnabled": is_enabled}
)

View File

@@ -5,4 +5,3 @@ __all__ = [
"ExecutionManager",
"ExecutionScheduler",
]

View File

@@ -1,25 +1,25 @@
import asyncio
import logging
from concurrent.futures import ProcessPoolExecutor
from typing import Any, Coroutine, Generator, TypeVar, TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Coroutine, Generator, TypeVar
if TYPE_CHECKING:
from autogpt_server.server.server import AgentServer
from autogpt_server.data import db
from autogpt_server.data.block import Block, get_block
from autogpt_server.data.execution import ExecutionQueue, ExecutionStatus
from autogpt_server.data.execution import NodeExecution as Execution
from autogpt_server.data.execution import (
create_graph_execution,
get_node_execution_input,
merge_execution_input,
parse_execution_output,
update_execution_status,
upsert_execution_output,
upsert_execution_input,
NodeExecution as Execution,
ExecutionStatus,
ExecutionQueue,
upsert_execution_output,
)
from autogpt_server.data.graph import Link, Node, get_node, get_graph, Graph
from autogpt_server.data.graph import Graph, Link, Node, get_graph, get_node
from autogpt_server.util.service import AppService, expose, get_service_client
logger = logging.getLogger(__name__)
@@ -34,9 +34,7 @@ ExecutionStream = Generator[Execution, None, None]
def execute_node(
loop: asyncio.AbstractEventLoop,
api_client: "AgentServer",
data: Execution
loop: asyncio.AbstractEventLoop, api_client: "AgentServer", data: Execution
) -> ExecutionStream:
"""
Execute a node in the graph. This will trigger a block execution on a node,
@@ -59,7 +57,7 @@ def execute_node(
def wait(f: Coroutine[T, Any, T]) -> T:
return loop.run_until_complete(f)
def update_execution(status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
@@ -105,16 +103,16 @@ def execute_node(
def enqueue_next_nodes(
api_client: "AgentServer",
loop: asyncio.AbstractEventLoop,
node: Node,
output: tuple[str, Any],
graph_exec_id: str,
prefix: str,
api_client: "AgentServer",
loop: asyncio.AbstractEventLoop,
node: Node,
output: tuple[str, Any],
graph_exec_id: str,
prefix: str,
) -> list[Execution]:
def wait(f: Coroutine[T, Any, T]) -> T:
return loop.run_until_complete(f)
def execution_update(node_exec_id: str, status: ExecutionStatus):
api_client.send_execution_update(
wait(update_execution_status(node_exec_id, status)).model_dump()
@@ -134,12 +132,14 @@ def enqueue_next_nodes(
logger.error(f"{prefix} Error, next node {next_node_id} not found.")
return
next_node_exec_id = wait(upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
data=next_data
))
next_node_exec_id = wait(
upsert_execution_input(
node_id=next_node_id,
graph_exec_id=graph_exec_id,
input_name=next_input_name,
data=next_data,
)
)
next_node_input = wait(get_node_execution_input(next_node_exec_id))
is_valid, validation_msg = validate_exec(next_node, next_node_input)
@@ -160,7 +160,8 @@ def enqueue_next_nodes(
)
return [
execution for link in node.output_links
execution
for link in node.output_links
if (execution := update_execution_result(link))
]
@@ -183,8 +184,8 @@ def validate_exec(node: Node, data: dict[str, Any]) -> tuple[bool, str]:
error_message = f"Input data missing for {node_block.name}:"
input_fields_from_schema = node_block.input_schema.get_required_fields() # type: ignore
if not input_fields_from_schema.issubset(data): # type: ignore
input_fields_from_schema = node_block.input_schema.get_required_fields()
if not input_fields_from_schema.issubset(data):
return False, f"{error_message} {input_fields_from_schema - set(data)}"
input_fields_from_nodes = {link.sink_name for link in node.input_links}
@@ -201,6 +202,7 @@ def validate_exec(node: Node, data: dict[str, Any]) -> tuple[bool, str]:
def get_agent_server_client() -> "AgentServer":
from autogpt_server.server.server import AgentServer
return get_service_client(AgentServer)
@@ -296,9 +298,8 @@ class ExecutionManager(AppService):
}
def add_node_execution(self, execution: Execution) -> Execution:
res = self.run_and_wait(update_execution_status(
execution.node_exec_id,
ExecutionStatus.QUEUED
))
res = self.run_and_wait(
update_execution_status(execution.node_exec_id, ExecutionStatus.QUEUED)
)
self.agent_server_client.send_execution_update(res.model_dump())
return self.queue.add(execution)

View File

@@ -1,14 +1,14 @@
import logging
import time
from datetime import datetime
from typing import Any
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
from autogpt_server.data import schedule as model
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.executor.manager import ExecutionManager
from autogpt_server.util.service import AppService, expose, get_service_client
logger = logging.getLogger(__name__)

View File

@@ -3,7 +3,8 @@ from typing import Dict, Set
from fastapi import WebSocket
from autogpt_server.data import execution
from autogpt_server.server.model import WsMessage, Methods
from autogpt_server.server.model import Methods, WsMessage
class ConnectionManager:
def __init__(self):
@@ -36,7 +37,7 @@ class ConnectionManager:
message = WsMessage(
method=Methods.EXECUTION_EVENT,
channel=graph_id,
data=result.model_dump()
data=result.model_dump(),
).model_dump_json()
for connection in self.subscriptions[graph_id]:
await connection.send_text(message)

View File

@@ -17,7 +17,8 @@ from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
import autogpt_server.server.ws_api
from autogpt_server.data import block, db, execution, graph as graph_db
from autogpt_server.data import block, db, execution
from autogpt_server.data import graph as graph_db
from autogpt_server.executor import ExecutionManager, ExecutionScheduler
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import (
@@ -27,8 +28,7 @@ from autogpt_server.server.model import (
WsMessage,
)
from autogpt_server.util.data import get_frontend_path
from autogpt_server.util.service import expose # type: ignore
from autogpt_server.util.service import AppService, get_service_client
from autogpt_server.util.service import AppService, expose, get_service_client
from autogpt_server.util.settings import Settings
@@ -530,7 +530,7 @@ class AgentServer(AppService):
self, graph_id: str, node_input: dict[Any, Any]
) -> dict[Any, Any]:
try:
return self.execution_manager_client.add_execution(graph_id, node_input) # type: ignore
return self.execution_manager_client.add_execution(graph_id, node_input)
except Exception as e:
msg = e.__str__().encode().decode("unicode_escape")
raise HTTPException(status_code=400, detail=msg)

View File

@@ -1,8 +1,8 @@
from fastapi import WebSocket, WebSocketDisconnect
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import ExecutionSubscription, WsMessage, Methods
from autogpt_server.server.model import ExecutionSubscription, Methods, WsMessage
async def websocket_router(websocket: WebSocket, manager: ConnectionManager):
await manager.connect(websocket)

View File

@@ -1,14 +1,13 @@
from pathlib import Path
from autogpt_server.blocks.llm import LlmCallBlock
from autogpt_server.blocks.basic import ValueBlock
from autogpt_server.blocks.block import BlockInstallationBlock
from autogpt_server.blocks.http import HttpRequestBlock
from autogpt_server.blocks.text import TextParserBlock, TextFormatterBlock
from autogpt_server.data.graph import Graph, Node, Link, create_graph
from autogpt_server.blocks.llm import LlmCallBlock
from autogpt_server.blocks.text import TextFormatterBlock, TextParserBlock
from autogpt_server.data.graph import Graph, Link, Node, create_graph
from autogpt_server.util.test import SpinTestServer, wait_execution
sample_block_modules = {
"ai": "Block that calls the AI model to generate text.",
"basic": "Block that does basic operations.",
@@ -32,7 +31,7 @@ def create_test_graph() -> Graph:
TextFormatterBlock (input query)
||
v
HttpRequestBlock (browse)
HttpRequestBlock (browse)
||
v
------> ValueBlock===============
@@ -50,9 +49,7 @@ def create_test_graph() -> Graph:
------ BlockInstallationBlock ======
"""
# ======= Nodes ========= #
input_data = Node(
block_id=ValueBlock().id
)
input_data = Node(block_id=ValueBlock().id)
input_text_formatter = Node(
block_id=TextFormatterBlock().id,
input_default={
@@ -83,7 +80,7 @@ Here is the information I get to write a Python code for that:
Here is your previous attempt:
{previous_attempt}
""",
"named_texts_#_previous_attempt": "No previous attempt found."
"named_texts_#_previous_attempt": "No previous attempt found.",
},
)
code_gen_llm_call = Node(
@@ -134,81 +131,77 @@ Here are a couple of sample of the Block class implementation:
code_text_parser,
block_installation,
]
# ======= Links ========= #
links = [
Link(
source_id=input_data.id,
sink_id=input_text_formatter.id,
source_name="output",
sink_name="named_texts_#_query"),
sink_name="named_texts_#_query",
),
Link(
source_id=input_text_formatter.id,
sink_id=search_http_request.id,
source_name="output",
sink_name="body_#_query"),
sink_name="body_#_query",
),
Link(
source_id=search_http_request.id,
sink_id=search_result_constant.id,
source_name="response_#_reply",
sink_name="input"),
sink_name="input",
),
Link( # Loopback for constant block
source_id=search_result_constant.id,
sink_id=search_result_constant.id,
source_name="output",
sink_name="data"
sink_name="data",
),
Link(
source_id=search_result_constant.id,
sink_id=prompt_text_formatter.id,
source_name="output",
sink_name="named_texts_#_search_result"
sink_name="named_texts_#_search_result",
),
Link(
source_id=input_data.id,
sink_id=prompt_text_formatter.id,
source_name="output",
sink_name="named_texts_#_query"
sink_name="named_texts_#_query",
),
Link(
source_id=prompt_text_formatter.id,
sink_id=code_gen_llm_call.id,
source_name="output",
sink_name="prompt"
sink_name="prompt",
),
Link(
source_id=code_gen_llm_call.id,
sink_id=code_text_parser.id,
source_name="response_#_response",
sink_name="text"
sink_name="text",
),
Link(
source_id=code_text_parser.id,
sink_id=block_installation.id,
source_name="positive",
sink_name="code"
sink_name="code",
),
Link(
source_id=block_installation.id,
sink_id=prompt_text_formatter.id,
source_name="error",
sink_name="named_texts_#_previous_attempt"
sink_name="named_texts_#_previous_attempt",
),
Link( # Re-trigger search result.
source_id=block_installation.id,
sink_id=search_result_constant.id,
source_name="error",
sink_name="input"
sink_name="input",
),
]
# ======= Graph ========= #
return Graph(
name="BlockAutoGen",
@@ -225,7 +218,13 @@ async def block_autogen_agent():
input_data = {"input": "Write me a block that writes a string into a file."}
response = await server.agent_server.execute_graph(test_graph.id, input_data)
print(response)
result = await wait_execution(test_manager, test_graph.id, response["id"], 10, 1200)
result = await wait_execution(
exec_manager=test_manager,
graph_id=test_graph.id,
graph_exec_id=response["id"],
num_execs=10,
timeout=1200,
)
print(result)

View File

@@ -1,17 +1,35 @@
from autogpt_server.data.graph import Graph, Link, Node, create_graph
from autogpt_server.blocks.llm import LlmCallBlock
from autogpt_server.blocks.reddit import (
RedditGetPostsBlock,
RedditPostCommentBlock,
)
from autogpt_server.blocks.reddit import RedditGetPostsBlock, RedditPostCommentBlock
from autogpt_server.blocks.text import TextFormatterBlock, TextMatcherBlock
from autogpt_server.data.graph import Graph, Link, Node, create_graph
from autogpt_server.util.test import SpinTestServer, wait_execution
def create_test_graph() -> Graph:
# /--- post_id -----------\ /--- post_id ---\
# subreddit --> RedditGetPostsBlock ---- post_body -------- TextFormatterBlock ----- LlmCallBlock / TextRelevancy --- relevant/not -- TextMatcherBlock -- Yes {postid, text} --- RedditPostCommentBlock
# \--- post_title -------/ \--- marketing_text ---/ -- No
"""
subreddit
||
v
RedditGetPostsBlock (post_id, post_title, post_body)
// || \\
post_id post_title post_body
|| || ||
v v v
TextFormatterBlock (format)
||
v
LlmCallBlock / TextRelevancy
|| || ||
post_id is_relevant marketing_text
|| || ||
v v v
TextMatcherBlock
|| ||
positive negative
||
v
RedditPostCommentBlock
"""
# Hardcoded inputs
reddit_get_post_input = {
"post_limit": 3,
@@ -26,7 +44,8 @@ Based on the following post, write your marketing comment:
}
llm_call_input = {
"sys_prompt": """
You are an expert at marketing, and have been tasked with picking Reddit posts that are relevant to your product.
You are an expert at marketing.
You have been tasked with picking Reddit posts that are relevant to your product.
The product you are marketing is: Auto-GPT an autonomous AI agent utilizing GPT model.
You reply the post that you find it relevant to be replied with marketing text.
Make sure to only comment on a relevant post.
@@ -130,4 +149,5 @@ async def reddit_marketing_agent():
if __name__ == "__main__":
import asyncio
asyncio.run(reddit_marketing_agent())

View File

@@ -1,4 +1,4 @@
from autogpt_server.blocks.basic import ValueBlock, PrintingBlock
from autogpt_server.blocks.basic import PrintingBlock, ValueBlock
from autogpt_server.blocks.text import TextFormatterBlock
from autogpt_server.data import graph
from autogpt_server.data.graph import create_graph
@@ -30,19 +30,19 @@ def create_test_graph() -> graph.Graph:
source_id=nodes[0].id,
sink_id=nodes[2].id,
source_name="output",
sink_name="texts_$_1"
sink_name="texts_$_1",
),
graph.Link(
source_id=nodes[1].id,
sink_id=nodes[2].id,
source_name="output",
sink_name="texts_$_2"
sink_name="texts_$_2",
),
graph.Link(
source_id=nodes[2].id,
sink_id=nodes[3].id,
source_name="output",
sink_name="text"
sink_name="text",
),
]
@@ -67,4 +67,5 @@ async def sample_agent():
if __name__ == "__main__":
import asyncio
asyncio.run(sample_agent())

View File

@@ -1,4 +1,5 @@
import json
from fastapi.encoders import jsonable_encoder

View File

@@ -1,7 +1,7 @@
import os
import sys
from abc import ABC, abstractmethod
from multiprocessing import Process, freeze_support, set_start_method
from multiprocessing import Process, set_start_method
from typing import Optional

View File

@@ -1,10 +1,9 @@
import time
import asyncio
import logging
import threading
import time
from abc import abstractmethod
from typing import Any, Callable, Type, TypeVar, cast, Coroutine
from typing import Any, Callable, Coroutine, Type, TypeVar, cast
from Pyro5 import api as pyro
from Pyro5 import nameserver

View File

@@ -1,6 +1,7 @@
import json
import os
from typing import Any, Dict, Generic, Set, Tuple, Type, TypeVar
from pydantic import BaseModel, Field, PrivateAttr
from pydantic_settings import (
BaseSettings,
@@ -57,27 +58,28 @@ class Config(UpdateTrackingModel["Config"], BaseSettings):
@classmethod
def settings_customise_sources(
cls,
settings_cls: Type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
cls,
settings_cls: Type[BaseSettings],
init_settings: PydanticBaseSettingsSource,
env_settings: PydanticBaseSettingsSource,
dotenv_settings: PydanticBaseSettingsSource,
file_secret_settings: PydanticBaseSettingsSource,
) -> Tuple[PydanticBaseSettingsSource, ...]:
return (JsonConfigSettingsSource(settings_cls),)
class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
"""Secrets for the server."""
openai_api_key: str = Field(default="", description="OpenAI API key")
anthropic_api_key: str = Field(default="", description="Anthropic API key")
groq_api_key: str = Field(default="", description="Groq API key")
reddit_client_id: str = Field(default="", description="Reddit client ID")
reddit_client_secret: str = Field(default="", description="Reddit client secret")
reddit_username: str = Field(default="", description="Reddit username")
reddit_password: str = Field(default="", description="Reddit password")
# Add more secret fields as needed
model_config = SettingsConfigDict(

View File

@@ -37,17 +37,21 @@ class SpinTestServer:
async def wait_execution(
exec_manager: ExecutionManager,
graph_id: str,
graph_exec_id: str,
num_execs: int,
timeout: int = 20,
exec_manager: ExecutionManager,
graph_id: str,
graph_exec_id: str,
num_execs: int,
timeout: int = 20,
) -> list:
async def is_execution_completed():
execs = await AgentServer().get_run_execution_results(graph_id, graph_exec_id)
return exec_manager.queue.empty() and len(execs) == num_execs and all(
v.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]
for v in execs
return (
exec_manager.queue.empty()
and len(execs) == num_execs
and all(
v.status in [ExecutionStatus.COMPLETED, ExecutionStatus.FAILED]
for v in execs
)
)
# Wait for the executions to complete
@@ -98,7 +102,8 @@ def execute_block_test(block: Block):
log(f"{prefix} {mark} comparing `{data}` vs `{expected_data}`")
if not is_matching:
raise ValueError(
f"{prefix}: wrong output {data} vs {expected_data}")
f"{prefix}: wrong output {data} vs {expected_data}"
)
compare(output_data, ex_output_data)
compare(output_name, ex_output_name)

View File

@@ -0,0 +1,23 @@
import os
import subprocess
directory = os.path.dirname(os.path.realpath(__file__))
def run(*command: str) -> None:
print(f">>>>> Running poetry run {' '.join(command)}")
subprocess.run(["poetry", "run"] + list(command), cwd=directory, check=True)
def lint():
run("ruff", "check", ".", "--exit-zero")
run("isort", "--diff", "--check", "--profile", "black", ".")
run("black", "--diff", "--check", ".")
run("pyright")
def format():
run("ruff", "check", "--fix", ".")
run("isort", "--profile", "black", ".")
run("black", ".")
run("pyright", ".")

File diff suppressed because it is too large Load Diff

View File

@@ -18,7 +18,6 @@ pydantic = "^2.7.2"
pytest = "^8.2.1"
uvicorn = { extras = ["standard"], version = "^0.30.1" }
fastapi = "^0.109.0"
ruff = "^0.4.8"
flake8 = "^7.0.0"
jsonschema = "^4.22.0"
psutil = "^5.9.8"
@@ -42,6 +41,10 @@ poethepoet = "^0.26.1"
httpx = "^0.27.0"
pytest-watcher = "^0.4.2"
requests = "^2.32.3"
ruff = "^0.5.2"
pyright = "^1.1.371"
isort = "^5.13.2"
black = "^24.4.2"
[build-system]
requires = ["poetry-core"]
@@ -50,6 +53,8 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
app = "autogpt_server.app:main"
cli = "autogpt_server.cli:main"
format = "code:format"
lint = "code:lint"
# https://poethepoet.natn.io/index.html
[tool.poe]

View File

@@ -87,7 +87,9 @@ setup(
"excludes": ["readability.compat.two"],
"include_files": [
# source, destination in the bundle
# (../frontend, example_files) would also work but you'd need to load the frontend differently in the data.py to correctly get the path when frozen
# (../frontend, example_files) would also work, but you'd need to load
# the frontend differently in the data.py to correctly
# get the path when frozen.
("../example_files", "example_files"),
],
},
@@ -101,7 +103,6 @@ setup(
"applications_shortcut": True,
"volume_label": "AutoGPTServer",
"background": "builtin-arrow",
"license": {
"default-language": "en_US",
"licenses": {"en_US": license_file},
@@ -112,7 +113,8 @@ setup(
"Disagree",
"Print",
"Save",
"If you agree, click Agree to continue the installation. If you do not agree, click Disagree to cancel the installation.",
"If you agree, click Agree to continue the installation. If "
"you do not agree, click Disagree to cancel the installation.",
]
},
},

View File

@@ -3,8 +3,8 @@ import pytest
from autogpt_server.data import execution, graph
from autogpt_server.executor import ExecutionManager
from autogpt_server.server import AgentServer
from autogpt_server.util.test import SpinTestServer, wait_execution
from autogpt_server.usecases.sample import create_test_graph
from autogpt_server.util.test import SpinTestServer, wait_execution
async def execute_graph(test_manager: ExecutionManager, test_graph: graph.Graph) -> str:

View File

@@ -1,9 +1,10 @@
import pytest
from autogpt_server.data import db, graph
from autogpt_server.executor import ExecutionScheduler
from autogpt_server.usecases.sample import create_test_graph
from autogpt_server.util.service import get_service_client
from autogpt_server.util.test import SpinTestServer
from autogpt_server.usecases.sample import create_test_graph
@pytest.mark.asyncio(scope="session")

View File

@@ -6,7 +6,7 @@ from fastapi import WebSocket
from autogpt_server.data.execution import ExecutionResult, ExecutionStatus
from autogpt_server.server.conn_manager import ConnectionManager
from autogpt_server.server.model import WsMessage, Methods
from autogpt_server.server.model import Methods, WsMessage
@pytest.fixture

View File

@@ -75,7 +75,6 @@ async def test_websocket_router_invalid_method(
mock_websocket: AsyncMock, mock_manager: AsyncMock
) -> None:
mock_websocket.receive_text.side_effect = [
WsMessage(method=Methods.EXECUTION_EVENT).model_dump_json(),
WebSocketDisconnect(),
]

View File

@@ -23,6 +23,7 @@ class TestService(AppService):
def fun_with_async(self, a: int, b: int) -> int:
async def add_async(a: int, b: int) -> int:
return a + b
return self.run_and_wait(add_async(a, b))