Release v0.4.1 (#4686)

Co-authored-by: Reinier van der Leer <github@pwuts.nl>
Co-authored-by: Nicholas Tindle <nick@ntindle.com>
Co-authored-by: Nicholas Tindle <nicktindle@outlook.com>
Co-authored-by: k-boikov <64261260+k-boikov@users.noreply.github.com>
Co-authored-by: merwanehamadi <merwanehamadi@gmail.com>
Co-authored-by: Merwane Hamadi <merwanehamadi@gmail.com>
Co-authored-by: Richard Beales <rich@richbeales.net>
Co-authored-by: Luke K <2609441+lc0rp@users.noreply.github.com>
Co-authored-by: Luke K (pr-0f3t) <2609441+lc0rp@users.noreply.github.com>
Co-authored-by: Erik Peterson <e@eriklp.com>
Co-authored-by: Auto-GPT-Bot <github-bot@agpt.co>
Co-authored-by: Benny van der Lans <49377421+bfalans@users.noreply.github.com>
Co-authored-by: Jan <jan-github@phobia.de>
Co-authored-by: Robin Richtsfeld <robin.richtsfeld@gmail.com>
Co-authored-by: Marc Bornträger <marc.borntraeger@gmail.com>
Co-authored-by: Stefan Ayala <stefanayala3266@gmail.com>
Co-authored-by: javableu <45064273+javableu@users.noreply.github.com>
Co-authored-by: DGdev91 <DGdev91@users.noreply.github.com>
Co-authored-by: Kinance <kinance@gmail.com>
Co-authored-by: digger yu <digger-yu@outlook.com>
Co-authored-by: David <scenaristeur@gmail.com>
Co-authored-by: gravelBridge <john.tian31@gmail.com>
Fix Python CI "update cassettes" step (#4591)
fix CI (#4596)
Fix inverted logic for deny_command (#4563)
fix current_score.json generation (#4601)
Fix duckduckgo rate limiting (#4592)
Fix debug code challenge (#4632)
Fix issues with information retrieval challenge a (#4622)
fix issues with env configuration and .env.template (#4630)
Fix prompt issue causing 'No Command' issues and challenge to fail (#4623)
Fix benchmark logs (#4653)
Fix typo in docs/setup.md (#4613)
Fix run.sh shebang (#4561)
Fix autogpt docker image not working because missing prompt_settings (#4680)
Fix execute_command coming from plugins (#4730)
Authored by: Luke K (pr-0f3t)
Date: 2023-06-19 12:41:40 -04:00
Committed by: GitHub
Parent: 25a7957bb8
Commit: abb397e442
142 changed files with 3185 additions and 2562 deletions

View File

@@ -1,26 +1,24 @@
import pytest
from autogpt.agent import Agent
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import run_interaction_loop
CYCLE_COUNT = 2
@requires_api_key("OPENAI_API_KEY")
@pytest.mark.vcr
@challenge
@challenge()
def test_browse_website(
browser_agent: Agent,
patched_api_requestor: None,
monkeypatch: pytest.MonkeyPatch,
level_to_run: int,
challenge_name: str,
) -> None:
file_path = browser_agent.workspace.get_path("browse_website.txt")
run_interaction_loop(monkeypatch, browser_agent, CYCLE_COUNT)
run_interaction_loop(
monkeypatch, browser_agent, CYCLE_COUNT, challenge_name, level_to_run
)
# content = read_file(file_path, config)
content = open(file_path, encoding="utf-8").read()

View File

@@ -0,0 +1,42 @@
from typing import List
import pytest
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
CYCLE_COUNT_PER_LEVEL = [1, 1]
EXPECTED_OUTPUTS_PER_LEVEL = [
{"hello_world.txt": ["Hello World"]},
{"hello_world_1.txt": ["Hello World"], "hello_world_2.txt": ["Hello World"]},
]
@challenge()
def test_write_file(
file_system_agents: List[Agent],
patched_api_requestor: None,
monkeypatch: pytest.MonkeyPatch,
level_to_run: int,
challenge_name: str,
) -> None:
file_system_agent = file_system_agents[level_to_run - 1]
run_interaction_loop(
monkeypatch,
file_system_agent,
CYCLE_COUNT_PER_LEVEL[level_to_run - 1],
challenge_name,
level_to_run,
)
expected_outputs = EXPECTED_OUTPUTS_PER_LEVEL[level_to_run - 1]
for file_name, expected_lines in expected_outputs.items():
file_path = get_workspace_path(file_system_agent, file_name)
content = read_file(file_path, file_system_agent)
for expected_line in expected_lines:
assert (
expected_line in content
), f"Expected '{expected_line}' in file {file_name}, but it was not found"

View File

@@ -3,6 +3,7 @@ from typing import Optional
class Challenge:
BEAT_CHALLENGES = False
DEFAULT_CHALLENGE_NAME = "default_challenge_name"
def __init__(
self,
@@ -10,7 +11,7 @@ class Challenge:
category: str,
max_level: int,
is_new_challenge: bool,
max_level_beaten: Optional[int],
max_level_beaten: Optional[int] = None,
level_to_run: Optional[int] = None,
) -> None:
self.name = name

View File

@@ -0,0 +1,89 @@
import os
from functools import wraps
from typing import Any, Callable, Optional
import pytest
from flaky import flaky # type: ignore
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.challenge_utils import create_challenge
from tests.challenges.challenge_decorator.score_utils import (
get_scores,
update_new_score,
)
from tests.utils import requires_api_key
MAX_LEVEL_TO_IMPROVE_ON = (
1 # we will attempt to beat 1 level above the current level for now.
)
CHALLENGE_FAILED_MESSAGE = "Challenges can sometimes fail randomly, please run this test again and if it fails reach out to us on https://discord.gg/autogpt in the 'challenges' channel to let us know the challenge you're struggling with."
def challenge(
max_runs: int = 2, min_passes: int = 1, api_key: str = "OPENAI_API_KEY"
) -> Callable[[Callable[..., Any]], Callable[..., None]]:
def decorator(func: Callable[..., Any]) -> Callable[..., None]:
@requires_api_key(api_key)
@pytest.mark.vcr
@flaky(max_runs=max_runs, min_passes=min_passes)
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> None:
run_remaining = MAX_LEVEL_TO_IMPROVE_ON if Challenge.BEAT_CHALLENGES else 1
original_error: Optional[Exception] = None
while run_remaining > 0:
current_score, new_score, new_score_location = get_scores()
level_to_run = (
kwargs["level_to_run"] if "level_to_run" in kwargs else None
)
challenge = create_challenge(
func, current_score, Challenge.BEAT_CHALLENGES, level_to_run
)
if challenge.level_to_run is not None:
kwargs["level_to_run"] = challenge.level_to_run
kwargs["challenge_name"] = challenge.name
try:
func(*args, **kwargs)
challenge.succeeded = True
except AssertionError as err:
original_error = AssertionError(
f"{CHALLENGE_FAILED_MESSAGE}\n{err}"
)
challenge.succeeded = False
except Exception as err:
original_error = err
challenge.succeeded = False
else:
challenge.skipped = True
if os.environ.get("CI") == "true":
new_max_level_beaten = get_new_max_level_beaten(
challenge, Challenge.BEAT_CHALLENGES
)
update_new_score(
new_score_location, new_score, challenge, new_max_level_beaten
)
if challenge.level_to_run is None:
pytest.skip("This test has not been unlocked yet.")
if not challenge.succeeded:
if Challenge.BEAT_CHALLENGES or challenge.is_new_challenge:
pytest.xfail(str(original_error))
if original_error:
raise original_error
run_remaining -= 1
return wrapper
return decorator
def get_new_max_level_beaten(
challenge: Challenge, beat_challenges: bool
) -> Optional[int]:
if challenge.succeeded:
return challenge.level_to_run
if challenge.skipped:
return challenge.max_level_beaten
# Challenge failed
return challenge.max_level_beaten if beat_challenges else None
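
(Not part of the diff: a minimal usage sketch of the new decorator above. A challenge test opts in with @challenge() and may override the flaky-retry parameters; the test name and agent fixture below are hypothetical, not from this changeset.)

import pytest

from autogpt.agent import Agent
from tests.challenges.challenge_decorator.challenge_decorator import challenge


@challenge(max_runs=3, min_passes=1)  # allow one extra flaky run (defaults: max_runs=2, min_passes=1)
def test_example_challenge(  # hypothetical test
    example_agent: Agent,  # hypothetical fixture
    patched_api_requestor: None,
    monkeypatch: pytest.MonkeyPatch,
    level_to_run: int,
    challenge_name: str,
) -> None:
    ...  # drive the agent and assert on its workspace output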

View File

@@ -1,7 +1,7 @@
import os
from typing import Any, Callable, Dict, Optional, Tuple
from tests.integration.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.challenge import Challenge
CHALLENGE_PREFIX = "test_"

View File

@@ -2,7 +2,7 @@ import json
import os
from typing import Any, Dict, Optional, Tuple
from tests.integration.challenges.challenge_decorator.challenge import Challenge
from tests.challenges.challenge_decorator.challenge import Challenge
CURRENT_SCORE_LOCATION = "../current_score"
NEW_SCORE_LOCATION = "../new_score"

View File

@@ -5,9 +5,8 @@ from _pytest.config import Config
from _pytest.config.argparsing import Parser
from _pytest.fixtures import FixtureRequest
from tests.integration.challenges.challenge_decorator.challenge import Challenge
from tests.integration.conftest import BASE_VCR_CONFIG
from tests.vcr.vcr_filter import before_record_response
from tests.challenges.challenge_decorator.challenge import Challenge
from tests.vcr import before_record_response
def before_record_response_filter_errors(
@@ -21,9 +20,9 @@ def before_record_response_filter_errors(
@pytest.fixture(scope="module")
def vcr_config() -> Dict[str, Any]:
def vcr_config(get_base_vcr_config: Dict[str, Any]) -> Dict[str, Any]:
# this fixture is called by the pytest-recording vcr decorator.
return BASE_VCR_CONFIG | {
return get_base_vcr_config | {
"before_record_response": before_record_response_filter_errors,
}
@@ -52,6 +51,11 @@ def level_to_run(request: FixtureRequest) -> int:
return request.config.option.level
@pytest.fixture
def challenge_name() -> str:
return Challenge.DEFAULT_CHALLENGE_NAME
@pytest.fixture(autouse=True)
def check_beat_challenges(request: FixtureRequest) -> None:
Challenge.BEAT_CHALLENGES = request.config.getoption("--beat-challenges")

View File

@@ -5,20 +5,20 @@
"max_level_beaten": 1
},
"write_file": {
"max_level": 1,
"max_level": 2,
"max_level_beaten": 1
}
},
"debug_code": {
"debug_code_challenge_a": {
"max_level": 1,
"max_level": 2,
"max_level_beaten": 1
}
},
"information_retrieval": {
"information_retrieval_challenge_a": {
"max_level": 3,
"max_level_beaten": 1
"max_level_beaten": null
},
"information_retrieval_challenge_b": {
"max_level": 1,
@@ -42,7 +42,11 @@
},
"memory_challenge_c": {
"max_level": 5,
"max_level_beaten": 1
"max_level_beaten": null
},
"memory_challenge_d": {
"max_level": 5,
"max_level_beaten": null
}
}
}

View File

@@ -2,18 +2,12 @@
from typing import List, Optional
def two_sum(nums: List, target: int) -> Optional[int]:
def two_sum(nums: List, target: int) -> Optional[List[int]]:
seen = {}
for i, num in enumerate(nums):
typo
complement = target - num
if complement in seen:
return [seen[complement], i]
seen[num] = i
return None
# Example usage:
nums = [2, 7, 11, 15]
target = 9
result = two_sum(nums, target)
print(result) # Output: [0, 1]

View File

@@ -0,0 +1,31 @@
# mypy: ignore-errors
from code import two_sum
from typing import List
def test_two_sum(nums: List, target: int, expected_result: List[int]) -> None:
result = two_sum(nums, target)
print(result)
assert (
result == expected_result
), f"AssertionError: Expected the output to be {expected_result}"
if __name__ == "__main__":
# test the trivial case with the first two numbers
nums = [2, 7, 11, 15]
target = 9
expected_result = [0, 1]
test_two_sum(nums, target, expected_result)
# test for ability to use zero and the same number twice
nums = [2, 7, 0, 15, 12, 0]
target = 0
expected_result = [2, 5]
test_two_sum(nums, target, expected_result)
# test for first and last index usage and negative numbers
nums = [-6, 7, 11, 4]
target = -2
expected_result = [0, 3]
test_two_sum(nums, target, expected_result)

View File

@@ -0,0 +1,56 @@
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.execute_code import execute_python_file
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import (
copy_file_into_workspace,
get_workspace_path,
run_interaction_loop,
)
CYCLE_COUNT = 5
EXPECTED_VALUES = ["[0, 1]", "[2, 5]", "[0, 3]"]
DIRECTORY_PATH = Path(__file__).parent / "data"
CODE_FILE_PATH = "code.py"
TEST_FILE_PATH = "test.py"
@challenge()
def test_debug_code_challenge_a(
debug_code_agents: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
level_to_run: int,
challenge_name: str,
) -> None:
"""
Test whether the agent can debug a simple code snippet.
:param debug_code_agent: The agent to test.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:patched_api_requestor: Sends api requests to our API CI pipeline
:level_to_run: The level to run.
"""
debug_code_agent = debug_code_agents[level_to_run - 1]
copy_file_into_workspace(debug_code_agent, DIRECTORY_PATH, CODE_FILE_PATH)
copy_file_into_workspace(debug_code_agent, DIRECTORY_PATH, TEST_FILE_PATH)
run_interaction_loop(
monkeypatch, debug_code_agent, CYCLE_COUNT, challenge_name, level_to_run
)
output = execute_python_file(
get_workspace_path(debug_code_agent, TEST_FILE_PATH), debug_code_agent
)
assert "error" not in output.lower(), f"Errors found in output: {output}!"
for expected_value in EXPECTED_VALUES:
assert (
expected_value in output
), f"Expected output to contain {expected_value}, but it was not found in {output}!"

View File

@@ -1,27 +1,24 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
CYCLE_COUNT = 3
EXPECTED_REVENUES = [["81"], ["81"], ["81", "53", "24", "21", "11", "7", "4", "3", "2"]]
from autogpt.agent import Agent
OUTPUT_LOCATION = "output.txt"
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_information_retrieval_challenge_a(
information_retrieval_agents: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: None,
config: Config,
patched_api_requestor: MockerFixture,
level_to_run: int,
challenge_name: str,
) -> None:
"""
Test the challenge_a function in a given agent by mocking user inputs and checking the output file content.
@@ -30,10 +27,16 @@ def test_information_retrieval_challenge_a(
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
"""
information_retrieval_agent = information_retrieval_agents[level_to_run - 1]
run_interaction_loop(monkeypatch, information_retrieval_agent, CYCLE_COUNT)
run_interaction_loop(
monkeypatch,
information_retrieval_agent,
CYCLE_COUNT,
challenge_name,
level_to_run,
)
file_path = str(information_retrieval_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
file_path = get_workspace_path(information_retrieval_agent, OUTPUT_LOCATION)
content = read_file(file_path, information_retrieval_agent)
expected_revenues = EXPECTED_REVENUES[level_to_run - 1]
for revenue in expected_revenues:
assert (

View File

@@ -1,28 +1,24 @@
import contextlib
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
CYCLE_COUNT = 3
OUTPUT_LOCATION = "2010_nobel_prize_winners.txt"
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_information_retrieval_challenge_b(
get_nobel_prize_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: None,
patched_api_requestor: MockerFixture,
level_to_run: int,
config: Config,
challenge_name: str,
) -> None:
"""
Test the challenge_b function in a given agent by mocking user inputs and checking the output file content.
@@ -31,16 +27,19 @@ def test_information_retrieval_challenge_b(
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:param patched_api_requestor: APIRequestor Patch to override the openai.api_requestor module for testing.
:param level_to_run: The level to run.
:param config: The config object.
"""
with contextlib.suppress(SystemExit):
run_interaction_loop(monkeypatch, get_nobel_prize_agent, CYCLE_COUNT)
run_interaction_loop(
monkeypatch,
get_nobel_prize_agent,
CYCLE_COUNT,
challenge_name,
level_to_run,
)
file_path = get_workspace_path(get_nobel_prize_agent, OUTPUT_LOCATION)
file_path = str(
get_nobel_prize_agent.workspace.get_path("2010_nobel_prize_winners.txt")
)
content = read_file(file_path, config)
content = read_file(file_path, get_nobel_prize_agent)
assert "Andre Geim" in content, "Expected the file to contain Andre Geim"
assert (
"Konstantin Novoselov" in content

View File

@@ -1,26 +1,23 @@
import pytest
import yaml
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
CYCLE_COUNT = 3
OUTPUT_LOCATION = "kube.yaml"
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_kubernetes_template_challenge_a(
kubernetes_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
config: Config,
patched_api_requestor: MockerFixture,
level_to_run: int,
challenge_name: str,
) -> None:
"""
Test the challenge_a function in a given agent by mocking user inputs
@@ -29,13 +26,14 @@ def test_kubernetes_template_challenge_a(
Args:
kubernetes_agent (Agent)
monkeypatch (pytest.MonkeyPatch)
config (Config)
level_to_run (int)
"""
run_interaction_loop(monkeypatch, kubernetes_agent, CYCLE_COUNT)
run_interaction_loop(
monkeypatch, kubernetes_agent, CYCLE_COUNT, challenge_name, level_to_run
)
file_path = str(kubernetes_agent.workspace.get_path("kube.yaml"))
content = read_file(file_path, config)
file_path = get_workspace_path(kubernetes_agent, OUTPUT_LOCATION)
content = read_file(file_path, kubernetes_agent)
for word in ["apiVersion", "kind", "metadata", "spec"]:
assert word in content, f"Expected the file to contain {word}"

View File

@@ -1,24 +1,21 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
OUTPUT_LOCATION = "output.txt"
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_memory_challenge_a(
memory_management_agent: Agent,
patched_api_requestor: None,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
challenge_name: str,
) -> None:
"""
The agent reads a file containing a task_id. Then, it reads a series of other files.
@@ -27,17 +24,21 @@ def test_memory_challenge_a(
memory_management_agent (Agent)
patched_api_requestor (MockerFixture)
monkeypatch (pytest.MonkeyPatch)
config (Config)
level_to_run (int)
"""
task_id = "2314"
create_instructions_files(memory_management_agent, level_to_run, task_id, config)
create_instructions_files(memory_management_agent, level_to_run, task_id)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
run_interaction_loop(
monkeypatch,
memory_management_agent,
level_to_run + 2,
challenge_name,
level_to_run,
)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
file_path = get_workspace_path(memory_management_agent, OUTPUT_LOCATION)
content = read_file(file_path, memory_management_agent)
assert task_id in content, f"Expected the file to contain {task_id}"
@@ -45,7 +46,6 @@ def create_instructions_files(
memory_management_agent: Agent,
num_files: int,
task_id: str,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
@@ -59,8 +59,8 @@ def create_instructions_files(
for i in range(1, num_files + 1):
content = generate_content(i, task_id, base_filename, num_files)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
file_path = get_workspace_path(memory_management_agent, file_name)
write_to_file(file_path, content, memory_management_agent)
def generate_content(

View File

@@ -1,26 +1,26 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import (
generate_noise,
get_workspace_path,
run_interaction_loop,
)
from tests.integration.challenges.utils import generate_noise, run_interaction_loop
from tests.utils import requires_api_key
NOISE = 1000
OUTPUT_LOCATION = "output.txt"
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_memory_challenge_b(
memory_management_agent: Agent,
patched_api_requestor: None,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
challenge_name: str,
) -> None:
"""
The agent reads a series of files, each containing a task_id and noise. After reading 'n' files,
@@ -33,12 +33,18 @@ def test_memory_challenge_b(
level_to_run (int)
"""
task_ids = [str(i * 1111) for i in range(1, level_to_run + 1)]
create_instructions_files(memory_management_agent, level_to_run, task_ids, config)
create_instructions_files(memory_management_agent, level_to_run, task_ids)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
run_interaction_loop(
monkeypatch,
memory_management_agent,
level_to_run + 2,
challenge_name,
level_to_run,
)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
file_path = get_workspace_path(memory_management_agent, OUTPUT_LOCATION)
content = read_file(file_path, memory_management_agent)
for task_id in task_ids:
assert task_id in content, f"Expected the file to contain {task_id}"
@@ -47,7 +53,6 @@ def create_instructions_files(
memory_management_agent: Agent,
level: int,
task_ids: list,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
@@ -62,8 +67,9 @@ def create_instructions_files(
for i in range(1, level + 1):
content = generate_content(i, task_ids, base_filename, level)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
file_path = get_workspace_path(memory_management_agent, file_name)
write_to_file(file_path, content, memory_management_agent)
def generate_content(index: int, task_ids: list, base_filename: str, level: int) -> str:

View File

@@ -1,27 +1,26 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import (
generate_noise,
get_workspace_path,
run_interaction_loop,
)
from tests.integration.challenges.utils import generate_noise, run_interaction_loop
from tests.utils import requires_api_key
NOISE = 1000
NOISE = 1200
OUTPUT_LOCATION = "output.txt"
# @pytest.mark.vcr
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
@challenge()
def test_memory_challenge_c(
memory_management_agent: Agent,
patched_api_requestor: None,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
challenge_name: str,
) -> None:
"""
Instead of reading task Ids from files as with the previous challenges, the agent now must remember
@@ -32,31 +31,37 @@ def test_memory_challenge_c(
memory_management_agent (Agent)
patched_api_requestor (MockerFixture)
monkeypatch (pytest.MonkeyPatch)
config (Config)
level_to_run (int)
"""
silly_phrases = [
"The purple elephant danced on a rainbow while eating a taco.",
"The sneaky toaster stole my socks and ran away to Hawaii.",
"My pet rock sings better than Beyoncé on Tuesdays.",
"The giant hamster rode a unicycle through the crowded mall.",
"The talking tree gave me a high-five and then flew away.",
"I have a collection of invisible hats that I wear on special occasions.",
"The flying spaghetti monster stole my sandwich and left a note saying 'thanks for the snack!'",
"My imaginary friend is a dragon who loves to play video games.",
"I once saw a cloud shaped like a giant chicken eating a pizza.",
"The ninja unicorn disguised itself as a potted plant and infiltrated the office.",
"The purple elephant danced on a rainbow while eating a taco",
"The sneaky toaster stole my socks and ran away to Hawaii",
"My pet rock sings better than Beyoncé on Tuesdays",
"The giant hamster rode a unicycle through the crowded mall",
"The talking tree gave me a high-five and then flew away",
"I have a collection of invisible hats that I wear on special occasions",
"The flying spaghetti monster stole my sandwich and left a note saying 'thanks for the snack'",
"My imaginary friend is a dragon who loves to play video games",
"I once saw a cloud shaped like a giant chicken eating a pizza",
"The ninja unicorn disguised itself as a potted plant and infiltrated the office",
]
level_silly_phrases = silly_phrases[:level_to_run]
create_instructions_files(
memory_management_agent, level_to_run, level_silly_phrases, config=config
memory_management_agent,
level_to_run,
level_silly_phrases,
)
run_interaction_loop(monkeypatch, memory_management_agent, level_to_run + 2)
file_path = str(memory_management_agent.workspace.get_path("output.txt"))
content = read_file(file_path, config)
run_interaction_loop(
monkeypatch,
memory_management_agent,
level_to_run + 2,
challenge_name,
level_to_run,
)
file_path = get_workspace_path(memory_management_agent, OUTPUT_LOCATION)
content = read_file(file_path, agent=memory_management_agent)
for phrase in level_silly_phrases:
assert phrase in content, f"Expected the file to contain {phrase}"
@@ -65,7 +70,6 @@ def create_instructions_files(
memory_management_agent: Agent,
level: int,
task_ids: list,
config: Config,
base_filename: str = "instructions_",
) -> None:
"""
@@ -80,8 +84,8 @@ def create_instructions_files(
for i in range(1, level + 1):
content = generate_content(i, task_ids, base_filename, level)
file_name = f"{base_filename}{i}.txt"
file_path = str(memory_management_agent.workspace.get_path(file_name))
write_to_file(file_path, content, config)
file_path = get_workspace_path(memory_management_agent, file_name)
write_to_file(file_path, content, memory_management_agent)
def generate_content(

View File

@@ -0,0 +1,241 @@
import json
from typing import Dict
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file, write_to_file
from tests.challenges.challenge_decorator.challenge_decorator import challenge
from tests.challenges.utils import get_workspace_path, run_interaction_loop
LEVEL_CURRENTLY_BEATEN = 1
MAX_LEVEL = 5
OUTPUT_LOCATION = "output.txt"
@challenge()
def test_memory_challenge_d(
memory_management_agent: Agent,
patched_api_requestor: MockerFixture,
monkeypatch: pytest.MonkeyPatch,
level_to_run: int,
challenge_name: str,
) -> None:
"""
The agent is given a series of events and must remember the respective beliefs of the characters.
Args:
memory_management_agent (Agent)
user_selected_level (int)
"""
sally_anne_test_phrases = [
"Sally has a marble (marble A) and she puts it in her basket (basket S), then leaves the room. Anne moves marble A from Sally's basket (basket S) to her own basket (basket A).",
"Sally gives a new marble (marble B) to Bob who is outside with her. Bob goes into the room and places marble B into Anne's basket (basket A). Anne tells Bob to tell Sally that he lost the marble b. Bob leaves the room and speaks to Sally about the marble B. Meanwhile, after Bob left the room, Anne moves marble A into the green box, but tells Charlie to tell Sally that marble A is under the sofa. Charlie leaves the room and speaks to Sally about the marble A as instructed by Anne.",
"Sally gives a new marble (marble C) to Charlie who is outside with her. Charlie enters the room and exchanges marble C with marble B in Anne's basket (basket A). Anne tells Charlie to tell Sally that he put marble C into the red box. Charlie leaves the room and speak to Sally about marble C as instructed by Anne. Meanwhile, after Charlie leaves the room, Bob enters into the room and moves marble A from the green box to under the sofa, but tells Anne to tell Sally that marble A is in the green box. Anne leaves the room and speak to Sally about the marble A as instructed by Bob",
"Sally gives a new marble (marble D) to Anne. Anne gives the marble to Charlie. Charlie enters the room and gives marble D to Bob. Bob tells Charlie to tell Sally that he put marble D under the sofa. Bob put marble D under the sofa Charlie leaves the room and speaks to Sally about marble D. Meanwhile, after Charlie leaves the room, Bob takes marble A from under the sofa and places it in the blue box.",
"Sally gives a new marble (marble E) to Charlie who is outside with her. Charlie enters the room and places marble E in the red box. Anne, who is already in the room, takes marble E from the red box, and hides it under the sofa. Then Anne leaves the room and tells Sally that marble E is in the green box. Meanwhile, after Anne leaves the room, Charlie who re-enters the room takes marble D from under the sofa and places it in his own basket (basket C).",
]
level_sally_anne_test_phrases = sally_anne_test_phrases[:level_to_run]
create_instructions_files(
memory_management_agent, level_to_run, level_sally_anne_test_phrases
)
run_interaction_loop(
monkeypatch,
memory_management_agent,
level_to_run + 2,
challenge_name,
level_to_run,
)
file_path = get_workspace_path(memory_management_agent, OUTPUT_LOCATION)
content = read_file(file_path, memory_management_agent)
check_beliefs(content, level_to_run)
def check_beliefs(content: str, level: int) -> None:
# Define the expected beliefs for each level
expected_beliefs = {
1: {
"Sally": {
"marble A": "basket S",
},
"Anne": {
"marble A": "basket A",
},
},
2: {
"Sally": {
"marble A": "sofa", # Because Charlie told her
"marble B": "lost", # Because Bob told her
},
"Anne": {
"marble A": "green box", # Because she moved it there
"marble B": "basket A", # Because Bob put it there and she was in the room
},
"Bob": {
"marble B": "basket A", # Last place he put it
},
"Charlie": {
"marble A": "sofa", # Because Anne told him to tell Sally so
},
},
3: {
"Sally": {
"marble A": "green box", # Because Anne told her
"marble C": "red box", # Because Charlie told her
},
"Anne": {
"marble A": "sofa", # Because Bob moved it there and told her
"marble B": "basket A", # Because Charlie exchanged marble C with marble B in her basket
"marble C": "basket A", # Because Charlie exchanged marble C with marble B in her basket
},
"Bob": {
"marble A": "sofa", # Because he moved it there
"marble B": "basket A",
# Because Charlie exchanged marble C with marble B in Anne's basket, and he was in the room
"marble C": "basket A",
# Because Charlie exchanged marble C with marble B in Anne's basket, and he was in the room
},
"Charlie": {
"marble A": "sofa", # Last place he knew it was
"marble B": "basket A", # Because he exchanged marble C with marble B in Anne's basket
"marble C": "red box", # Because Anne told him to tell Sally so
},
},
4: {
"Sally": {
"marble A": "green box", # Because Anne told her in the last conversation
"marble C": "red box", # Because Charlie told her
"marble D": "sofa", # Because Charlie told her
},
"Anne": {
"marble A": "blue box", # Because Bob moved it there, and she was not in the room to see
"marble B": "basket A", # Last place she knew it was
"marble C": "basket A", # Last place she knew it was
"marble D": "sofa", # Because Bob moved it there, and she was in the room to see
},
"Bob": {
"marble A": "blue box", # Because he moved it there
"marble B": "basket A", # Last place he knew it was
"marble C": "basket A", # Last place he knew it was
"marble D": "sofa", # Because he moved it there
},
"Charlie": {
"marble A": "sofa", # Last place he knew it was
"marble B": "basket A", # Last place he knew it was
"marble C": "red box", # Last place he knew it was
"marble D": "sofa", # Because Bob told him to tell Sally so
},
},
5: {
"Sally": {
"marble A": "green box", # Because Anne told her in the last level
"marble C": "red box", # Because Charlie told her
"marble D": "sofa", # Because Charlie told her
"marble E": "green box", # Because Anne told her
},
"Anne": {
"marble A": "blue box", # Last place she knew it was
"marble B": "basket A", # Last place she knew it was
"marble C": "basket A", # Last place she knew it was
"marble D": "basket C", # Last place she knew it was
"marble E": "sofa", # Because she moved it there
},
"Charlie": {
"marble A": "blue box", # Last place he knew it was
"marble B": "basket A", # Last place he knew it was
"marble C": "basket A", # Last place he knew it was
"marble D": "basket C", # Because he moved it there
"marble E": "red box", # Last place he knew it was
},
"Bob": {
"marble A": "blue box", # Last place he knew it was
"marble C": "red box", # Last place he knew it was
"marble D": "sofa", # Last place he knew it was
},
},
}
# Extract the beliefs from the AI's response
ai_beliefs = extract_beliefs(content)
# Check the AI's beliefs against the expected beliefs
for character, belief in expected_beliefs[level].items():
for marble, location in belief.items():
ai_belief = ai_beliefs.get(character, {}).get(marble, "")
assert (
location in ai_belief
), f"For {character}'s {marble}, expected '{location}' to be in '{ai_belief}'"
def extract_beliefs(content: str) -> Dict[str, Dict[str, str]]:
"""Extract the beliefs of each character from the AI's output."""
# Parse the JSON content
content_dict = json.loads(content)
beliefs = content_dict.get("beliefs", {})
return beliefs
def create_instructions_files(
memory_management_agent: Agent,
level: int,
test_phrases: list,
base_filename: str = "instructions_",
) -> None:
"""
Creates a series of instructions files for the memory challenge.
Args:
level:
memory_management_agent (Agent)
test_phrases (list)
base_filename (str, optional)
"""
for i in range(1, level + 1):
content = generate_content(i, test_phrases, base_filename, level)
file_name = f"{base_filename}{i}.txt"
file_path = get_workspace_path(memory_management_agent, file_name)
write_to_file(file_path, content, memory_management_agent)
def generate_content(
index: int, test_phrases: list, base_filename: str, level: int
) -> str:
"""
Args:
index: int
test_phrases: list
base_filename: str
level: int
Returns: str
"""
phrase = test_phrases[index - 1]
if index != level:
return f"The current event is '{phrase}'.\nRead the file {base_filename}{index + 1}.txt using the read_file command."
else:
return f"""
This event is '{phrase}'
rules:
1. Everyone inside the room see and know what are the actions of the people inside the room.
2. Everyone outside the room do not see and do not know the actions of the people inside the room.
3. Only write about the marbles that are present in the level
4. Marbles names are marble A, marble B, marble C, marble D, ...
5. Believed_location_of_the_specific_marble describes where is the marble like drawer, basket S, sofa, ...
6. Do not use expression like <Anne's basket> use <basket A> instead.
7. All characters believe what they're being told by other characters.
Instructions:
I) Write the following information in the file output.txt in JSON format:
1. The respective beliefs of the characters (which means where every marble is according to character x, y, z. Character x should say where it believes every marble it is aware exist is)
The format should be as follows:
{{
"beliefs": {{
"<character_name>": {{
"<marble_name>": "<believed_location_of_the_specific_marble>",
...
}},
...
}},
}}
Example of output (only use this to understand and learn how to use the format. Nothing else):
{{"beliefs": {{"Sally": {{"marble A": "basket A"}}, "Bob": {{"marble B": "basket S"}}, "Anne": {{"marble A": "green box"}}, "Charlie": {{"marble B": "sofa"}}}}
II) The file output.txt has not been created yet. You need to create it. After that, use the task_complete command.
"""

tests/challenges/utils.py (new file, 76 lines)
View File

@@ -0,0 +1,76 @@
import contextlib
import random
import shutil
from pathlib import Path
from typing import Any, Generator
import pytest
from autogpt.agent import Agent
from autogpt.log_cycle.log_cycle import LogCycleHandler
def generate_noise(noise_size: int) -> str:
random.seed(42)
return "".join(
random.choices(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789",
k=noise_size,
)
)
def setup_mock_input(monkeypatch: pytest.MonkeyPatch, cycle_count: int) -> None:
"""
Sets up the mock input for testing.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:param cycle_count: The number of cycles to mock.
"""
input_sequence = ["y"] * (cycle_count) + ["EXIT"]
def input_generator() -> Generator[str, None, None]:
"""
Creates a generator that yields input strings from the given sequence.
"""
yield from input_sequence
gen = input_generator()
monkeypatch.setattr("autogpt.utils.session.prompt", lambda _: next(gen))
def run_interaction_loop(
monkeypatch: pytest.MonkeyPatch,
agent: Agent,
cycle_count: int,
challenge_name: str,
level_to_run: int,
) -> None:
setup_mock_input(monkeypatch, cycle_count)
setup_mock_log_cycle_agent_name(monkeypatch, challenge_name, level_to_run)
with contextlib.suppress(SystemExit):
agent.start_interaction_loop()
def setup_mock_log_cycle_agent_name(
monkeypatch: pytest.MonkeyPatch, challenge_name: str, level_to_run: int
) -> None:
def mock_get_agent_short_name(*args: Any, **kwargs: Any) -> str:
return f"{challenge_name}_level_{level_to_run}"
monkeypatch.setattr(
LogCycleHandler, "get_agent_short_name", mock_get_agent_short_name
)
def get_workspace_path(agent: Agent, file_name: str) -> str:
return str(agent.workspace.get_path(file_name))
def copy_file_into_workspace(
agent: Agent, directory_path: Path, file_path: str
) -> None:
workspace_code_file_path = get_workspace_path(agent, file_path)
code_file_path = directory_path / file_path
shutil.copy(code_file_path, workspace_code_file_path)
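
(Not part of the diff: a minimal sketch of how the new helpers above combine inside a challenge test; the file names, cycle count, and helper name are illustrative.)

from pathlib import Path

import pytest

from autogpt.agent import Agent
from tests.challenges.utils import (
    copy_file_into_workspace,
    get_workspace_path,
    run_interaction_loop,
)

DATA_DIR = Path(__file__).parent / "data"  # illustrative location of fixture files


def exercise_agent(agent: Agent, monkeypatch: pytest.MonkeyPatch) -> str:
    # Seed the workspace, run the mocked interaction loop, and return the output path.
    copy_file_into_workspace(agent, DATA_DIR, "input.txt")  # illustrative file
    run_interaction_loop(monkeypatch, agent, 3, "example_challenge", 1)
    return get_workspace_path(agent, "output.txt")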

View File

@@ -26,12 +26,8 @@ def recursive_sort_dict(data: dict) -> dict:
cwd = os.getcwd() # get current working directory
new_score_filename_pattern = os.path.join(
cwd, "tests/integration/challenges/new_score_*.json"
)
current_score_filename = os.path.join(
cwd, "tests/integration/challenges/current_score.json"
)
new_score_filename_pattern = os.path.join(cwd, "tests/challenges/new_score_*.json")
current_score_filename = os.path.join(cwd, "tests/challenges/current_score.json")
merged_data: Dict[str, Any] = {}
for filename in glob.glob(new_score_filename_pattern):
@@ -44,4 +40,5 @@ for filename in glob.glob(new_score_filename_pattern):
sorted_data = recursive_sort_dict(merged_data)
with open(current_score_filename, "w") as f_current:
json.dump(sorted_data, f_current, indent=4)
json_data = json.dumps(sorted_data, indent=4)
f_current.write(json_data + "\n")

View File

@@ -1,22 +1,26 @@
import os
from pathlib import Path
from tempfile import TemporaryDirectory
import pytest
import yaml
from pytest_mock import MockerFixture
from autogpt.agent.agent import Agent
from autogpt.commands.command import CommandRegistry
from autogpt.config.ai_config import AIConfig
from autogpt.config.config import Config
from autogpt.llm.api_manager import ApiManager
from autogpt.logs import TypingConsoleHandler
from autogpt.memory.vector import get_memory
from autogpt.prompts.prompt import DEFAULT_TRIGGERING_PROMPT
from autogpt.workspace import Workspace
pytest_plugins = ["tests.integration.agent_factory", "tests.integration.memory.utils"]
PROXY = os.environ.get("PROXY")
@pytest.fixture()
def vcr_cassette_dir(request):
test_name = os.path.splitext(request.node.name)[0]
return os.path.join("tests/Auto-GPT-test-cassettes", test_name)
pytest_plugins = [
"tests.integration.agent_factory",
"tests.integration.memory.utils",
"tests.vcr",
]
@pytest.fixture()
@@ -30,9 +34,25 @@ def workspace(workspace_root: Path) -> Workspace:
return Workspace(workspace_root, restrict_to_workspace=True)
@pytest.fixture
def temp_plugins_config_file():
"""Create a plugins_config.yaml file in a temp directory so that it doesn't mess with existing ones"""
config_directory = TemporaryDirectory()
config_file = os.path.join(config_directory.name, "plugins_config.yaml")
with open(config_file, "w+") as f:
f.write(yaml.dump({}))
yield config_file
@pytest.fixture()
def config(mocker: MockerFixture, workspace: Workspace) -> Config:
def config(
temp_plugins_config_file: str, mocker: MockerFixture, workspace: Workspace
) -> Config:
config = Config()
config.plugins_dir = "tests/unit/data/test_plugins"
config.plugins_config_file = temp_plugins_config_file
config.load_plugins_config()
# Do a little setup and teardown since the config object is a singleton
mocker.patch.multiple(
@@ -48,3 +68,44 @@ def api_manager() -> ApiManager:
if ApiManager in ApiManager._instances:
del ApiManager._instances[ApiManager]
return ApiManager()
@pytest.fixture(autouse=True)
def patch_emit(monkeypatch):
# convert plain_output to a boolean
if bool(os.environ.get("PLAIN_OUTPUT")):
def quick_emit(self, record: str):
print(self.format(record))
monkeypatch.setattr(TypingConsoleHandler, "emit", quick_emit)
@pytest.fixture
def agent(config: Config, workspace: Workspace) -> Agent:
ai_config = AIConfig(
ai_name="Base",
ai_role="A base AI",
ai_goals=[],
)
command_registry = CommandRegistry()
ai_config.command_registry = command_registry
config.set_memory_backend("json_file")
memory_json_file = get_memory(config, init=True)
system_prompt = ai_config.construct_full_prompt()
return Agent(
ai_name=ai_config.ai_name,
memory=memory_json_file,
command_registry=command_registry,
ai_config=ai_config,
config=config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
workspace_directory=workspace.root,
)

View File

@@ -59,7 +59,8 @@ def browser_agent(agent_test_config, memory_none: NoMemory, workspace: Workspace
ai_name="",
memory=memory_none,
command_registry=command_registry,
config=ai_config,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
@@ -70,49 +71,45 @@ def browser_agent(agent_test_config, memory_none: NoMemory, workspace: Workspace
@pytest.fixture
def writer_agent(agent_test_config, memory_none: NoMemory, workspace: Workspace):
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.file_operations")
command_registry.import_commands("autogpt.app")
command_registry.import_commands("autogpt.commands.task_statuses")
def file_system_agents(
agent_test_config, memory_json_file: NoMemory, workspace: Workspace
):
agents = []
command_registry = get_command_registry(agent_test_config)
ai_config = AIConfig(
ai_name="write_to_file-GPT",
ai_role="an AI designed to use the write_to_file command to write 'Hello World' into a file named \"hello_world.txt\" and then use the task_complete command to complete the task.",
ai_goals=[
"Use the write_to_file command to write 'Hello World' into a file named \"hello_world.txt\".",
"Use the task_complete command to complete the task.",
"Do not use any other commands.",
],
)
ai_config.command_registry = command_registry
ai_goals = [
"Write 'Hello World' into a file named \"hello_world.txt\".",
'Write \'Hello World\' into 2 files named "hello_world_1.txt"and "hello_world_2.txt".',
]
triggering_prompt = (
"Determine which next command to use, and respond using the"
" format specified above:"
)
system_prompt = ai_config.construct_full_prompt()
agent = Agent(
ai_name="",
memory=memory_none,
command_registry=command_registry,
config=ai_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=triggering_prompt,
workspace_directory=workspace.root,
)
return agent
for ai_goal in ai_goals:
ai_config = AIConfig(
ai_name="File System Agent",
ai_role="an AI designed to manage a file system.",
ai_goals=[ai_goal],
)
ai_config.command_registry = command_registry
system_prompt = ai_config.construct_full_prompt()
Config().set_continuous_mode(False)
agents.append(
Agent(
ai_name="File System Agent",
memory=memory_json_file,
command_registry=command_registry,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
workspace_directory=workspace.root,
)
)
return agents
@pytest.fixture
def memory_management_agent(agent_test_config, memory_json_file, workspace: Workspace):
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.file_operations")
command_registry.import_commands("autogpt.app")
command_registry.import_commands("autogpt.commands.task_statuses")
command_registry = get_command_registry(agent_test_config)
ai_config = AIConfig(
ai_name="Follow-Instructions-GPT",
@@ -127,10 +124,11 @@ def memory_management_agent(agent_test_config, memory_json_file, workspace: Work
system_prompt = ai_config.construct_full_prompt()
agent = Agent(
ai_name="",
ai_name="Follow-Instructions-GPT",
memory=memory_json_file,
command_registry=command_registry,
config=ai_config,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
@@ -145,19 +143,12 @@ def information_retrieval_agents(
agent_test_config, memory_json_file, workspace: Workspace
):
agents = []
command_registry = CommandRegistry()
enabled_command_categories = [
x
for x in COMMAND_CATEGORIES
if x not in agent_test_config.disabled_command_categories
]
command_registry = get_command_registry(agent_test_config)
for command_category in enabled_command_categories:
command_registry.import_commands(command_category)
ai_goals = [
"Write to a file called output.txt tesla's revenue in 2022 after searching for 'tesla revenue 2022'.",
"Write to a file called output.txt tesla's revenue in 2022.",
"Write to a file called output.txt tesla's revenue every year since its creation.",
"Write to a file called output.txt containing tesla's revenue in 2022 after searching for 'tesla revenue 2022'.",
"Write to a file called output.txt containing tesla's revenue in 2022.",
"Write to a file called output.txt containing tesla's revenue every year since its creation.",
]
for ai_goal in ai_goals:
ai_config = AIConfig(
@@ -173,7 +164,8 @@ def information_retrieval_agents(
ai_name="Information Retrieval Agent",
memory=memory_json_file,
command_registry=command_registry,
config=ai_config,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
@@ -184,7 +176,9 @@ def information_retrieval_agents(
@pytest.fixture
def kubernetes_agent(memory_json_file, workspace: Workspace):
def kubernetes_agent(
agent_test_config: Config, memory_json_file: NoMemory, workspace: Workspace
) -> Agent:
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.file_operations")
command_registry.import_commands("autogpt.app")
@@ -205,7 +199,8 @@ def kubernetes_agent(memory_json_file, workspace: Workspace):
ai_name="Kubernetes-Demo",
memory=memory_json_file,
command_registry=command_registry,
config=ai_config,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
@@ -238,7 +233,8 @@ def get_nobel_prize_agent(agent_test_config, memory_json_file, workspace: Worksp
ai_name="Get-PhysicsNobelPrize",
memory=memory_json_file,
command_registry=command_registry,
config=ai_config,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
@@ -249,38 +245,57 @@ def get_nobel_prize_agent(agent_test_config, memory_json_file, workspace: Worksp
@pytest.fixture
def debug_code_agent(agent_test_config, memory_json_file, workspace: Workspace):
command_registry = CommandRegistry()
command_registry.import_commands("autogpt.commands.file_operations")
command_registry.import_commands("autogpt.commands.execute_code")
command_registry.import_commands("autogpt.commands.improve_code")
command_registry.import_commands("autogpt.app")
command_registry.import_commands("autogpt.commands.task_statuses")
ai_config = AIConfig(
ai_name="Debug Code Agent",
ai_role="an autonomous agent that specializes in debugging python code",
ai_goals=[
"1-Run the code in the file named 'code.py' using the execute_code command.",
"2-Read code.py to understand why the code is not working as expected.",
"3-Modify code.py to fix the error.",
"Repeat step 1, 2 and 3 until the code is working as expected. When you're done use the task_complete command.",
"Do not use any other commands than execute_python_file and write_file",
def debug_code_agents(agent_test_config, memory_json_file, workspace: Workspace):
agents = []
goals = [
[
"1- Run test.py using the execute_python_file command.",
"2- Read code.py using the read_file command.",
"3- Modify code.py using the write_to_file command."
"Repeat step 1, 2 and 3 until test.py runs without errors.",
],
)
ai_config.command_registry = command_registry
[
"1- Run test.py.",
"2- Read code.py.",
"3- Modify code.py."
"Repeat step 1, 2 and 3 until test.py runs without errors.",
],
["1- Make test.py run without errors."],
]
system_prompt = ai_config.construct_full_prompt()
Config().set_continuous_mode(False)
agent = Agent(
ai_name="Debug Code Agent",
memory=memory_json_file,
command_registry=command_registry,
config=ai_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
workspace_directory=workspace.root,
)
for goal in goals:
ai_config = AIConfig(
ai_name="Debug Code Agent",
ai_role="an autonomous agent that specializes in debugging python code",
ai_goals=goal,
)
command_registry = get_command_registry(agent_test_config)
ai_config.command_registry = command_registry
system_prompt = ai_config.construct_full_prompt()
Config().set_continuous_mode(False)
agents.append(
Agent(
ai_name="Debug Code Agent",
memory=memory_json_file,
command_registry=command_registry,
ai_config=ai_config,
config=agent_test_config,
next_action_count=0,
system_prompt=system_prompt,
triggering_prompt=DEFAULT_TRIGGERING_PROMPT,
workspace_directory=workspace.root,
)
)
return agents
return agent
def get_command_registry(agent_test_config):
command_registry = CommandRegistry()
enabled_command_categories = [
x
for x in COMMAND_CATEGORIES
if x not in agent_test_config.disabled_command_categories
]
for command_category in enabled_command_categories:
command_registry.import_commands(command_category)
return command_registry

View File

@@ -1,29 +0,0 @@
import pytest
from autogpt.agent import Agent
from autogpt.commands.file_operations import read_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 3
@requires_api_key("OPENAI_API_KEY")
@pytest.mark.vcr
@challenge
def test_write_file(
writer_agent: Agent,
patched_api_requestor: None,
monkeypatch: pytest.MonkeyPatch,
config: Config,
level_to_run: int,
) -> None:
file_path = str(writer_agent.workspace.get_path("hello_world.txt"))
run_interaction_loop(monkeypatch, writer_agent, CYCLE_COUNT)
content = read_file(file_path, config)
assert content == "Hello World", f"Expected 'Hello World', got {content}"

View File

@@ -1,73 +0,0 @@
import os
from functools import wraps
from typing import Any, Callable, Optional
import pytest
from tests.integration.challenges.challenge_decorator.challenge import Challenge
from tests.integration.challenges.challenge_decorator.challenge_utils import (
create_challenge,
)
from tests.integration.challenges.challenge_decorator.score_utils import (
get_scores,
update_new_score,
)
MAX_LEVEL_TO_IMPROVE_ON = (
1 # we will attempt to beat 1 level above the current level for now.
)
def challenge(func: Callable[..., Any]) -> Callable[..., None]:
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> None:
run_remaining = MAX_LEVEL_TO_IMPROVE_ON if Challenge.BEAT_CHALLENGES else 1
original_error = None
while run_remaining > 0:
current_score, new_score, new_score_location = get_scores()
level_to_run = kwargs["level_to_run"] if "level_to_run" in kwargs else None
challenge = create_challenge(
func, current_score, Challenge.BEAT_CHALLENGES, level_to_run
)
if challenge.level_to_run is not None:
kwargs["level_to_run"] = challenge.level_to_run
try:
func(*args, **kwargs)
challenge.succeeded = True
except AssertionError as err:
original_error = err
challenge.succeeded = False
else:
challenge.skipped = True
if os.environ.get("CI") == "true":
new_max_level_beaten = get_new_max_level_beaten(
challenge, Challenge.BEAT_CHALLENGES
)
update_new_score(
new_score_location, new_score, challenge, new_max_level_beaten
)
if challenge.level_to_run is None:
pytest.skip("This test has not been unlocked yet.")
if not challenge.succeeded:
if Challenge.BEAT_CHALLENGES or challenge.is_new_challenge:
# xfail
pytest.xfail("Challenge failed")
if original_error:
raise original_error
raise AssertionError("Challenge failed")
run_remaining -= 1
return wrapper
def get_new_max_level_beaten(
challenge: Challenge, beat_challenges: bool
) -> Optional[int]:
if challenge.succeeded:
return challenge.level_to_run
if challenge.skipped:
return challenge.max_level_beaten
# Challenge failed
return challenge.max_level_beaten if beat_challenges else None

View File

@@ -1,30 +0,0 @@
# mypy: ignore-errors
# we need a new line at the top of the file to avoid a syntax error
def test_two_sum(nums, target, expected_result):
# These tests are appended to the two_sum file so we can ignore this error for now
result = two_sum(nums, target)
print(result)
assert (
result == expected_result
), f"AssertionError: Expected the output to be {expected_result}"
# test the trivial case with the first two numbers
nums = [2, 7, 11, 15]
target = 9
expected_result = [0, 1]
test_two_sum(nums, target, expected_result)
# test for ability to use zero and the same number twice
nums = [2, 7, 0, 15, 12, 0]
target = 0
expected_result = [2, 5]
test_two_sum(nums, target, expected_result)
# test for first and last index usage and negative numbers
nums = [-6, 7, 11, 4]
target = -2
expected_result = [0, 3]
test_two_sum(nums, target, expected_result)

View File

@@ -1,51 +0,0 @@
from pathlib import Path
import pytest
from pytest_mock import MockerFixture
from autogpt.agent import Agent
from autogpt.commands.execute_code import execute_python_file
from autogpt.commands.file_operations import append_to_file, write_to_file
from autogpt.config import Config
from tests.integration.challenges.challenge_decorator.challenge_decorator import (
challenge,
)
from tests.integration.challenges.utils import run_interaction_loop
from tests.utils import requires_api_key
CYCLE_COUNT = 5
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
@challenge
def test_debug_code_challenge_a(
debug_code_agent: Agent,
monkeypatch: pytest.MonkeyPatch,
patched_api_requestor: MockerFixture,
config: Config,
level_to_run: int,
) -> None:
"""
Test whether the agent can debug a simple code snippet.
:param debug_code_agent: The agent to test.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:patched_api_requestor: Sends api requests to our API CI pipeline
:config: The config object for the agent.
:level_to_run: The level to run.
"""
file_path = str(debug_code_agent.workspace.get_path("code.py"))
code_file_path = Path(__file__).parent / "data" / "two_sum.py"
test_file_path = Path(__file__).parent / "data" / "two_sum_tests.py"
write_to_file(file_path, code_file_path.read_text(), config)
run_interaction_loop(monkeypatch, debug_code_agent, CYCLE_COUNT)
append_to_file(file_path, test_file_path.read_text(), config)
output = execute_python_file(file_path, config)
assert "error" not in output.lower(), f"Errors found in output: {output}!"

View File

@@ -1,44 +0,0 @@
import contextlib
import random
from typing import Generator
import pytest
from autogpt.agent import Agent
def generate_noise(noise_size: int) -> str:
random.seed(42)
return "".join(
random.choices(
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789",
k=noise_size,
)
)
def setup_mock_input(monkeypatch: pytest.MonkeyPatch, cycle_count: int) -> None:
"""
Sets up the mock input for testing.
:param monkeypatch: pytest's monkeypatch utility for modifying builtins.
:param cycle_count: The number of cycles to mock.
"""
input_sequence = ["y"] * (cycle_count) + ["EXIT"]
def input_generator() -> Generator[str, None, None]:
"""
Creates a generator that yields input strings from the given sequence.
"""
yield from input_sequence
gen = input_generator()
monkeypatch.setattr("builtins.input", lambda _: next(gen))
def run_interaction_loop(
monkeypatch: pytest.MonkeyPatch, agent: Agent, cycle_count: int
) -> None:
setup_mock_input(monkeypatch, cycle_count)
with contextlib.suppress(SystemExit):
agent.start_interaction_loop()

View File

@@ -1,56 +0,0 @@
import os
import openai.api_requestor
import pytest
from pytest_mock import MockerFixture
from tests.conftest import PROXY
from tests.vcr.vcr_filter import before_record_request, before_record_response
BASE_VCR_CONFIG = {
"record_mode": "new_episodes",
"before_record_request": before_record_request,
"before_record_response": before_record_response,
"filter_headers": [
"Authorization",
"X-OpenAI-Client-User-Agent",
"User-Agent",
],
"match_on": ["method", "body"],
}
@pytest.fixture(scope="session")
def vcr_config():
# this fixture is called by the pytest-recording vcr decorator.
return BASE_VCR_CONFIG
def patch_api_base(requestor):
new_api_base = f"{PROXY}/v1"
requestor.api_base = new_api_base
return requestor
@pytest.fixture
def patched_api_requestor(mocker: MockerFixture):
original_init = openai.api_requestor.APIRequestor.__init__
original_validate_headers = openai.api_requestor.APIRequestor._validate_headers
def patched_init(requestor, *args, **kwargs):
original_init(requestor, *args, **kwargs)
patch_api_base(requestor)
def patched_validate_headers(self, supplied_headers):
headers = original_validate_headers(self, supplied_headers)
headers["AGENT-MODE"] = os.environ.get("AGENT_MODE")
headers["AGENT-TYPE"] = os.environ.get("AGENT_TYPE")
return headers
if PROXY:
mocker.patch("openai.api_requestor.APIRequestor.__init__", new=patched_init)
mocker.patch.object(
openai.api_requestor.APIRequestor,
"_validate_headers",
new=patched_validate_headers,
)

View File

@@ -1,32 +0,0 @@
"""Unit tests for the commands module"""
from unittest.mock import MagicMock, patch
import pytest
from autogpt.app import list_agents, start_agent
from tests.utils import requires_api_key
@pytest.mark.vcr
@pytest.mark.integration_test
@requires_api_key("OPENAI_API_KEY")
def test_make_agent(patched_api_requestor, config) -> None:
"""Test that an agent can be created"""
# Use the mock agent manager to avoid creating a real agent
with patch("openai.ChatCompletion.create") as mock:
response = MagicMock()
# del response.error
response.choices[0].messages[0].content = "Test message"
response.usage.prompt_tokens = 1
response.usage.completion_tokens = 1
mock.return_value = response
start_agent(
"Test Agent", "chat", "Hello, how are you?", config, "gpt-3.5-turbo"
)
agents = list_agents(config)
assert "List of agents:\n0: chat" == agents
start_agent(
"Test Agent 2", "write", "Hello, how are you?", config, "gpt-3.5-turbo"
)
agents = list_agents(config)
assert "List of agents:\n0: chat\n1: write" == agents

View File

@@ -1,23 +1,25 @@
import os
import random
import string
import tempfile
from typing import Callable
import pytest
from pytest_mock import MockerFixture
import autogpt.commands.execute_code as sut # system under testing
from autogpt.agent.agent import Agent
from autogpt.config import Config
@pytest.fixture
def config_allow_execute(config: Config, mocker: MockerFixture):
yield mocker.patch.object(config, "execute_local_commands", True)
def random_code(random_string) -> Callable:
return f"print('Hello {random_string}!')"
@pytest.fixture
def python_test_file(config: Config, random_string):
def python_test_file(config: Config, random_code: str) -> Callable:
temp_file = tempfile.NamedTemporaryFile(dir=config.workspace_path, suffix=".py")
temp_file.write(str.encode(f"print('Hello {random_string}!')"))
temp_file.write(str.encode(random_code))
temp_file.flush()
yield temp_file.name
@@ -29,22 +31,98 @@ def random_string():
return "".join(random.choice(string.ascii_lowercase) for _ in range(10))
def test_execute_python_file(python_test_file: str, random_string: str, config):
result: str = sut.execute_python_file(python_test_file, config)
def test_execute_python_file(python_test_file: str, random_string: str, agent: Agent):
result: str = sut.execute_python_file(python_test_file, agent=agent)
assert result.replace("\r", "") == f"Hello {random_string}!\n"
def test_execute_python_file_invalid(config):
def test_execute_python_code(random_code: str, random_string: str, agent: Agent):
ai_name = agent.ai_name
result: str = sut.execute_python_code(random_code, "test_code", agent=agent)
assert result.replace("\r", "") == f"Hello {random_string}!\n"
# Check that the code is stored
destination = os.path.join(
agent.config.workspace_path, ai_name, "executed_code", "test_code.py"
)
with open(destination) as f:
assert f.read() == random_code
def test_execute_python_code_overwrites_file(
random_code: str, random_string: str, agent: Agent
):
ai_name = agent.ai_name
destination = os.path.join(
agent.config.workspace_path, ai_name, "executed_code", "test_code.py"
)
os.makedirs(os.path.dirname(destination), exist_ok=True)
with open(destination, "w+") as f:
f.write("This will be overwritten")
sut.execute_python_code(random_code, "test_code.py", agent=agent)
# Check that the file is updated with the new code
with open(destination) as f:
assert f.read() == random_code
def test_execute_python_file_invalid(agent: Agent):
assert all(
s in sut.execute_python_file("not_python", config).lower()
s in sut.execute_python_file("not_python", agent).lower()
for s in ["error:", "invalid", ".py"]
)
def test_execute_python_file_not_found(agent: Agent):
assert all(
s in sut.execute_python_file("notexist.py", config).lower()
for s in ["error:", "does not exist"]
s in sut.execute_python_file("notexist.py", agent).lower()
for s in [
"python: can't open file 'notexist.py'",
"[errno 2] no such file or directory",
]
)
def test_execute_shell(config_allow_execute, random_string, config):
result = sut.execute_shell(f"echo 'Hello {random_string}!'", config)
def test_execute_shell(random_string: str, agent: Agent):
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert f"Hello {random_string}!" in result
def test_execute_shell_local_commands_not_allowed(random_string: str, agent: Agent):
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert f"Hello {random_string}!" in result
def test_execute_shell_denylist_should_deny(agent: Agent, random_string: str):
agent.config.shell_denylist = ["echo"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Error:" in result and "not allowed" in result
def test_execute_shell_denylist_should_allow(agent: Agent, random_string: str):
agent.config.shell_denylist = ["cat"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Hello" in result and random_string in result
assert "Error" not in result
def test_execute_shell_allowlist_should_deny(agent: Agent, random_string: str):
agent.config.shell_command_control = sut.ALLOWLIST_CONTROL
agent.config.shell_allowlist = ["cat"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Error:" in result and "not allowed" in result
def test_execute_shell_allowlist_should_allow(agent: Agent, random_string: str):
agent.config.shell_command_control = sut.ALLOWLIST_CONTROL
agent.config.shell_allowlist = ["echo"]
result = sut.execute_shell(f"echo 'Hello {random_string}!'", agent)
assert "Hello" in result and random_string in result
assert "Error" not in result

View File

@@ -6,6 +6,7 @@ from unittest.mock import patch
import pytest
from PIL import Image
from autogpt.agent.agent import Agent
from autogpt.commands.image_gen import generate_image, generate_image_with_sd_webui
from tests.utils import requires_api_key
@@ -18,10 +19,10 @@ def image_size(request):
@requires_api_key("OPENAI_API_KEY")
@pytest.mark.vcr
def test_dalle(config, workspace, image_size, patched_api_requestor):
def test_dalle(agent: Agent, workspace, image_size, patched_api_requestor):
"""Test DALL-E image generation."""
generate_and_validate(
config,
agent,
workspace,
image_provider="dalle",
image_size=image_size,
@@ -36,10 +37,10 @@ def test_dalle(config, workspace, image_size, patched_api_requestor):
"image_model",
["CompVis/stable-diffusion-v1-4", "stabilityai/stable-diffusion-2-1"],
)
def test_huggingface(config, workspace, image_size, image_model):
def test_huggingface(agent: Agent, workspace, image_size, image_model):
"""Test HuggingFace image generation."""
generate_and_validate(
config,
agent,
workspace,
image_provider="huggingface",
image_size=image_size,
@@ -48,10 +49,10 @@ def test_huggingface(config, workspace, image_size, image_model):
@pytest.mark.xfail(reason="SD WebUI call does not work.")
def test_sd_webui(config, workspace, image_size):
def test_sd_webui(agent: Agent, workspace, image_size):
"""Test SD WebUI image generation."""
generate_and_validate(
config,
agent,
workspace,
image_provider="sd_webui",
image_size=image_size,
@@ -59,11 +60,11 @@ def test_sd_webui(config, workspace, image_size):
@pytest.mark.xfail(reason="SD WebUI call does not work.")
def test_sd_webui_negative_prompt(config, workspace, image_size):
def test_sd_webui_negative_prompt(agent: Agent, workspace, image_size):
gen_image = functools.partial(
generate_image_with_sd_webui,
prompt="astronaut riding a horse",
config=config,
agent=agent,
size=image_size,
extra={"seed": 123},
)
@@ -87,7 +88,7 @@ def lst(txt):
def generate_and_validate(
config,
agent: Agent,
workspace,
image_size,
image_provider,
@@ -95,11 +96,11 @@ def generate_and_validate(
**kwargs,
):
"""Generate an image and validate the output."""
config.image_provider = image_provider
config.huggingface_image_model = hugging_face_image_model
agent.config.image_provider = image_provider
agent.config.huggingface_image_model = hugging_face_image_model
prompt = "astronaut riding a horse"
image_path = lst(generate_image(prompt, config, image_size, **kwargs))
image_path = lst(generate_image(prompt, agent, image_size, **kwargs))
assert image_path.exists()
with Image.open(image_path) as img:
assert img.size == (image_size, image_size)
@@ -120,7 +121,7 @@ def generate_and_validate(
)
@pytest.mark.parametrize("delay", [10, 0])
def test_huggingface_fail_request_with_delay(
config, workspace, image_size, image_model, return_text, delay
agent: Agent, workspace, image_size, image_model, return_text, delay
):
return_text = return_text.replace("[model]", image_model).replace(
"[delay]", str(delay)
@@ -138,13 +139,13 @@ def test_huggingface_fail_request_with_delay(
mock_post.return_value.ok = False
mock_post.return_value.text = return_text
config.image_provider = "huggingface"
config.huggingface_image_model = image_model
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = image_model
prompt = "astronaut riding a horse"
with patch("time.sleep") as mock_sleep:
# Verify request fails.
result = generate_image(prompt, config, image_size)
result = generate_image(prompt, agent, image_size)
assert result == "Error creating image."
# Verify retry was called with delay if delay is in return_text
@@ -154,8 +155,8 @@ def test_huggingface_fail_request_with_delay(
mock_sleep.assert_not_called()
def test_huggingface_fail_request_with_delay(mocker, config):
config.huggingface_api_token = "1"
def test_huggingface_fail_request_with_delay(mocker, agent: Agent):
agent.config.huggingface_api_token = "1"
# Mock requests.post
mock_post = mocker.patch("requests.post")
@@ -166,10 +167,10 @@ def test_huggingface_fail_request_with_delay(mocker, config):
# Mock time.sleep
mock_sleep = mocker.patch("time.sleep")
config.image_provider = "huggingface"
config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
result = generate_image("astronaut riding a horse", config, 512)
result = generate_image("astronaut riding a horse", agent, 512)
assert result == "Error creating image."
@@ -177,8 +178,8 @@ def test_huggingface_fail_request_with_delay(mocker, config):
mock_sleep.assert_called_with(0)
def test_huggingface_fail_request_no_delay(mocker, config):
config.huggingface_api_token = "1"
def test_huggingface_fail_request_no_delay(mocker, agent: Agent):
agent.config.huggingface_api_token = "1"
# Mock requests.post
mock_post = mocker.patch("requests.post")
@@ -191,10 +192,10 @@ def test_huggingface_fail_request_no_delay(mocker, config):
# Mock time.sleep
mock_sleep = mocker.patch("time.sleep")
config.image_provider = "huggingface"
config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
result = generate_image("astronaut riding a horse", config, 512)
result = generate_image("astronaut riding a horse", agent, 512)
assert result == "Error creating image."
@@ -202,8 +203,8 @@ def test_huggingface_fail_request_no_delay(mocker, config):
mock_sleep.assert_not_called()
def test_huggingface_fail_request_bad_json(mocker, config):
config.huggingface_api_token = "1"
def test_huggingface_fail_request_bad_json(mocker, agent: Agent):
agent.config.huggingface_api_token = "1"
# Mock requests.post
mock_post = mocker.patch("requests.post")
@@ -214,10 +215,10 @@ def test_huggingface_fail_request_bad_json(mocker, config):
# Mock time.sleep
mock_sleep = mocker.patch("time.sleep")
config.image_provider = "huggingface"
config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
result = generate_image("astronaut riding a horse", config, 512)
result = generate_image("astronaut riding a horse", agent, 512)
assert result == "Error creating image."
@@ -225,28 +226,28 @@ def test_huggingface_fail_request_bad_json(mocker, config):
mock_sleep.assert_not_called()
def test_huggingface_fail_request_bad_image(mocker, config):
config.huggingface_api_token = "1"
def test_huggingface_fail_request_bad_image(mocker, agent: Agent):
agent.config.huggingface_api_token = "1"
# Mock requests.post
mock_post = mocker.patch("requests.post")
mock_post.return_value.status_code = 200
config.image_provider = "huggingface"
config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
result = generate_image("astronaut riding a horse", config, 512)
result = generate_image("astronaut riding a horse", agent, 512)
assert result == "Error creating image."
def test_huggingface_fail_missing_api_token(mocker, config):
config.image_provider = "huggingface"
config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
def test_huggingface_fail_missing_api_token(mocker, agent: Agent):
agent.config.image_provider = "huggingface"
agent.config.huggingface_image_model = "CompVis/stable-diffusion-v1-4"
# Mock requests.post to raise ValueError
mock_post = mocker.patch("requests.post", side_effect=ValueError)
# Verify request raises an error.
with pytest.raises(ValueError):
generate_image("astronaut riding a horse", config, 512)
generate_image("astronaut riding a horse", agent, 512)

View File

@@ -1,71 +0,0 @@
import pytest
from autogpt.config import Config
from autogpt.plugins import scan_plugins
PLUGINS_TEST_DIR = "tests/unit/data/test_plugins"
PLUGIN_TEST_OPENAI = "https://weathergpt.vercel.app/"
@pytest.fixture
def mock_config_denylist_allowlist_check():
class MockConfig:
"""Mock config object for testing the denylist_allowlist_check function"""
plugins_denylist = ["BadPlugin"]
plugins_allowlist = ["GoodPlugin"]
authorise_key = "y"
exit_key = "n"
return MockConfig()
@pytest.fixture
def config_with_plugins():
"""Mock config object for testing the scan_plugins function"""
# Test that the function returns the correct number of plugins
cfg = Config()
cfg.plugins_dir = PLUGINS_TEST_DIR
cfg.plugins_openai = ["https://weathergpt.vercel.app/"]
return cfg
@pytest.fixture
def mock_config_openai_plugin():
"""Mock config object for testing the scan_plugins function"""
class MockConfig:
"""Mock config object for testing the scan_plugins function"""
plugins_dir = PLUGINS_TEST_DIR
plugins_openai = [PLUGIN_TEST_OPENAI]
plugins_denylist = ["AutoGPTPVicuna"]
plugins_allowlist = [PLUGIN_TEST_OPENAI]
return MockConfig()
def test_scan_plugins_openai(mock_config_openai_plugin):
# Test that the function returns the correct number of plugins
result = scan_plugins(mock_config_openai_plugin, debug=True)
assert len(result) == 1
@pytest.fixture
def mock_config_generic_plugin():
"""Mock config object for testing the scan_plugins function"""
# Test that the function returns the correct number of plugins
class MockConfig:
plugins_dir = PLUGINS_TEST_DIR
plugins_openai = []
plugins_denylist = []
plugins_allowlist = ["AutoGPTPVicuna"]
return MockConfig()
def test_scan_plugins_generic(mock_config_generic_plugin):
# Test that the function returns the correct number of plugins
result = scan_plugins(mock_config_generic_plugin, debug=True)
assert len(result) == 1
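A minimal usage sketch of what these (now removed) fixtures exercised, assuming a Config object exposes the same attributes (values hypothetical):
cfg = Config()
cfg.plugins_dir = PLUGINS_TEST_DIR  # directory scanned for generic (zipped) plugins
cfg.plugins_openai = [PLUGIN_TEST_OPENAI]  # OpenAI plugin manifests to load
cfg.plugins_denylist = []
cfg.plugins_allowlist = ["AutoGPTPVicuna", PLUGIN_TEST_OPENAI]
plugins = scan_plugins(cfg, debug=True)  # returns the list of loaded plugin instances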

View File

@@ -11,7 +11,7 @@ from tests.utils import requires_api_key
@requires_api_key("OPENAI_API_KEY")
def test_generate_aiconfig_automatic_default(patched_api_requestor):
user_inputs = [""]
with patch("builtins.input", side_effect=user_inputs):
with patch("autogpt.utils.session.prompt", side_effect=user_inputs):
ai_config = prompt_user()
assert isinstance(ai_config, AIConfig)
@@ -44,7 +44,7 @@ def test_generate_aiconfig_automatic_fallback(patched_api_requestor):
"",
"",
]
with patch("builtins.input", side_effect=user_inputs):
with patch("autogpt.utils.session.prompt", side_effect=user_inputs):
ai_config = prompt_user()
assert isinstance(ai_config, AIConfig)
@@ -65,7 +65,7 @@ def test_prompt_user_manual_mode(patched_api_requestor):
"",
"",
]
with patch("builtins.input", side_effect=user_inputs):
with patch("autogpt.utils.session.prompt", side_effect=user_inputs):
ai_config = prompt_user()
assert isinstance(ai_config, AIConfig)

View File

@@ -1,14 +1,18 @@
import pytest
from pytest_mock import MockerFixture
from autogpt.agent.agent import Agent
from autogpt.commands.web_selenium import browse_website
from autogpt.config import Config
from tests.utils import requires_api_key
def test_browse_website(config: Config, patched_api_requestor: MockerFixture):
@pytest.mark.vcr
@requires_api_key("OPENAI_API_KEY")
def test_browse_website(agent: Agent, patched_api_requestor: MockerFixture):
url = "https://barrel-roll.com"
question = "How to execute a barrel roll"
response = browse_website(url, question, config)
response = browse_website(url, question, agent)
assert "Error" in response
# Sanity check that the response is not too long
assert len(response) < 200

View File

@@ -1,74 +0,0 @@
# Date: 2023-5-13
# Author: Generated by GoCodeo.
import pytest
from autogpt.commands.analyze_code import analyze_code
from autogpt.config import Config
@pytest.fixture
def mock_call_ai_function(mocker):
return mocker.patch("autogpt.commands.analyze_code.call_ai_function")
class TestAnalyzeCode:
def test_positive_analyze_code(self, mock_call_ai_function):
# Positive Test
mock_call_ai_function.return_value = ["Suggestion 1", "Suggestion 2"]
code = "def example_function():\n pass"
config = Config()
result = analyze_code(code, config)
assert result == ["Suggestion 1", "Suggestion 2"]
mock_call_ai_function.assert_called_once_with(
"def analyze_code(code: str) -> list[str]:",
[code],
"Analyzes the given code and returns a list of suggestions for improvements.",
config=config,
)
def test_negative_analyze_code(
self,
mock_call_ai_function,
config: Config,
):
# Negative Test
mock_call_ai_function.return_value = []
code = "def example_function():\n pass"
result = analyze_code(code, config)
assert result == []
mock_call_ai_function.assert_called_once_with(
"def analyze_code(code: str) -> list[str]:",
[code],
"Analyzes the given code and returns a list of suggestions for improvements.",
config=config,
)
def test_error_analyze_code(self, mock_call_ai_function, config: Config):
# Error Test
mock_call_ai_function.side_effect = Exception("Error occurred")
code = "def example_function():\n pass"
with pytest.raises(Exception):
result = analyze_code(code, config)
mock_call_ai_function.assert_called_once_with(
"def analyze_code(code: str) -> list[str]:",
[code],
"Analyzes the given code and returns a list of suggestions for improvements.",
config=config,
)
def test_edge_analyze_code_empty_code(
self,
mock_call_ai_function,
config: Config,
):
# Edge Test
mock_call_ai_function.return_value = ["Suggestion 1", "Suggestion 2"]
code = ""
result = analyze_code(code, config)
assert result == ["Suggestion 1", "Suggestion 2"]
mock_call_ai_function.assert_called_once_with(
"def analyze_code(code: str) -> list[str]:",
[code],
"Analyzes the given code and returns a list of suggestions for improvements.",
config=config,
)

View File

@@ -1,56 +0,0 @@
# Date: 2023-5-13
# Author: Generated by GoCodeo.
import json
from unittest.mock import MagicMock, patch
import pytest
from autogpt.commands.audio_text import read_audio
class TestReadAudio:
@patch("requests.post")
def test_positive_read_audio(self, mock_post, config):
# Positive Test
audio_data = b"test_audio_data"
mock_response = MagicMock()
mock_response.content.decode.return_value = json.dumps(
{"text": "Hello, world!"}
)
mock_post.return_value = mock_response
config.huggingface_api_token = "testing-token"
result = read_audio(audio_data, config)
assert result == "The audio says: Hello, world!"
mock_post.assert_called_once_with(
f"https://api-inference.huggingface.co/models/{config.huggingface_audio_to_text_model}",
headers={"Authorization": f"Bearer {config.huggingface_api_token}"},
data=audio_data,
)
@patch("requests.post")
def test_negative_read_audio(self, mock_post, config):
# Negative Test
audio_data = b"test_audio_data"
mock_response = MagicMock()
mock_response.content.decode.return_value = json.dumps({"text": ""})
mock_post.return_value = mock_response
config.huggingface_api_token = "testing-token"
result = read_audio(audio_data, config)
assert result == "The audio says: "
mock_post.assert_called_once_with(
f"https://api-inference.huggingface.co/models/{config.huggingface_audio_to_text_model}",
headers={"Authorization": f"Bearer {config.huggingface_api_token}"},
data=audio_data,
)
def test_error_read_audio(self, config):
# Error Test
config.huggingface_api_token = None
with pytest.raises(ValueError):
read_audio(b"test_audio_data", config)
def test_edge_read_audio_empty_audio(self, config):
# Edge Test
with pytest.raises(ValueError):
read_audio(b"", config)

View File

@@ -1,55 +0,0 @@
# Date: 2023-5-13
# Author: Generated by GoCodeo.
from unittest.mock import mock_open, patch
import pytest
from autogpt.commands.audio_text import read_audio_from_file
from autogpt.config import Config
@pytest.fixture
def mock_read_audio(mocker):
return mocker.patch("autogpt.commands.audio_text.read_audio")
class TestReadAudioFromFile:
def test_positive_read_audio_from_file(self, mock_read_audio):
# Positive test
mock_read_audio.return_value = "This is a sample text."
mock_file_data = b"Audio data"
m = mock_open(read_data=mock_file_data)
with patch("builtins.open", m):
result = read_audio_from_file("test_audio.wav", Config())
assert result == "This is a sample text."
m.assert_called_once_with("test_audio.wav", "rb")
def test_negative_read_audio_from_file(self, mock_read_audio):
# Negative test
mock_read_audio.return_value = "This is a sample text."
mock_file_data = b"Audio data"
m = mock_open(read_data=mock_file_data)
with patch("builtins.open", m):
result = read_audio_from_file("test_audio.wav", Config())
assert result != "Incorrect text."
m.assert_called_once_with("test_audio.wav", "rb")
def test_error_read_audio_from_file(self):
# Error test
with pytest.raises(FileNotFoundError):
read_audio_from_file("non_existent_file.wav", Config())
def test_edge_empty_audio_file(self, mock_read_audio):
# Edge test
mock_read_audio.return_value = ""
mock_file_data = b""
m = mock_open(read_data=mock_file_data)
with patch("builtins.open", m):
result = read_audio_from_file("empty_audio.wav", Config())
assert result == ""
m.assert_called_once_with("empty_audio.wav", "rb")

View File

@@ -0,0 +1,274 @@
"""This is the Test plugin for Auto-GPT."""
from typing import Any, Dict, List, Optional, Tuple, TypeVar
from auto_gpt_plugin_template import AutoGPTPluginTemplate
PromptGenerator = TypeVar("PromptGenerator")
class AutoGPTGuanaco(AutoGPTPluginTemplate):
"""
This is a plugin for Auto-GPT.
"""
def __init__(self):
super().__init__()
self._name = "Auto-GPT-Guanaco"
self._version = "0.1.0"
self._description = "This is a Guanaco local model plugin."
def can_handle_on_response(self) -> bool:
"""This method is called to check that the plugin can
handle the on_response method.
Returns:
bool: True if the plugin can handle the on_response method."""
return False
def on_response(self, response: str, *args, **kwargs) -> str:
"""This method is called when a response is received from the model."""
if len(response):
print("OMG OMG It's Alive!")
else:
print("Is it alive?")
def can_handle_post_prompt(self) -> bool:
"""This method is called to check that the plugin can
handle the post_prompt method.
Returns:
bool: True if the plugin can handle the post_prompt method."""
return False
def post_prompt(self, prompt: PromptGenerator) -> PromptGenerator:
"""This method is called just after the generate_prompt is called,
but actually before the prompt is generated.
Args:
prompt (PromptGenerator): The prompt generator.
Returns:
PromptGenerator: The prompt generator.
"""
def can_handle_on_planning(self) -> bool:
"""This method is called to check that the plugin can
handle the on_planning method.
Returns:
bool: True if the plugin can handle the on_planning method."""
return False
def on_planning(
self, prompt: PromptGenerator, messages: List[str]
) -> Optional[str]:
"""This method is called before the planning chat completeion is done.
Args:
prompt (PromptGenerator): The prompt generator.
messages (List[str]): The list of messages.
"""
def can_handle_post_planning(self) -> bool:
"""This method is called to check that the plugin can
handle the post_planning method.
Returns:
bool: True if the plugin can handle the post_planning method."""
return False
def post_planning(self, response: str) -> str:
"""This method is called after the planning chat completeion is done.
Args:
response (str): The response.
Returns:
str: The resulting response.
"""
def can_handle_pre_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the pre_instruction method.
Returns:
bool: True if the plugin can handle the pre_instruction method."""
return False
def pre_instruction(self, messages: List[str]) -> List[str]:
"""This method is called before the instruction chat is done.
Args:
messages (List[str]): The list of context messages.
Returns:
List[str]: The resulting list of messages.
"""
def can_handle_on_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the on_instruction method.
Returns:
bool: True if the plugin can handle the on_instruction method."""
return False
def on_instruction(self, messages: List[str]) -> Optional[str]:
"""This method is called when the instruction chat is done.
Args:
messages (List[str]): The list of context messages.
Returns:
Optional[str]: The resulting message.
"""
def can_handle_post_instruction(self) -> bool:
"""This method is called to check that the plugin can
handle the post_instruction method.
Returns:
bool: True if the plugin can handle the post_instruction method."""
return False
def post_instruction(self, response: str) -> str:
"""This method is called after the instruction chat is done.
Args:
response (str): The response.
Returns:
str: The resulting response.
"""
def can_handle_pre_command(self) -> bool:
"""This method is called to check that the plugin can
handle the pre_command method.
Returns:
bool: True if the plugin can handle the pre_command method."""
return False
def pre_command(
self, command_name: str, arguments: Dict[str, Any]
) -> Tuple[str, Dict[str, Any]]:
"""This method is called before the command is executed.
Args:
command_name (str): The command name.
arguments (Dict[str, Any]): The arguments.
Returns:
Tuple[str, Dict[str, Any]]: The command name and the arguments.
"""
def can_handle_post_command(self) -> bool:
"""This method is called to check that the plugin can
handle the post_command method.
Returns:
bool: True if the plugin can handle the post_command method."""
return False
def post_command(self, command_name: str, response: str) -> str:
"""This method is called after the command is executed.
Args:
command_name (str): The command name.
response (str): The response.
Returns:
str: The resulting response.
"""
def can_handle_chat_completion(
self,
messages: list[Dict[Any, Any]],
model: str,
temperature: float,
max_tokens: int,
) -> bool:
"""This method is called to check that the plugin can
handle the chat_completion method.
Args:
messages (Dict[Any, Any]): The messages.
model (str): The model name.
temperature (float): The temperature.
max_tokens (int): The max tokens.
Returns:
bool: True if the plugin can handle the chat_completion method."""
return False
def handle_chat_completion(
self,
messages: list[Dict[Any, Any]],
model: str,
temperature: float,
max_tokens: int,
) -> str:
"""This method is called when the chat completion is done.
Args:
messages (Dict[Any, Any]): The messages.
model (str): The model name.
temperature (float): The temperature.
max_tokens (int): The max tokens.
Returns:
str: The resulting response.
"""
def can_handle_text_embedding(self, text: str) -> bool:
"""This method is called to check that the plugin can
handle the text_embedding method.
Args:
text (str): The text to be converted to an embedding.
Returns:
bool: True if the plugin can handle the text_embedding method."""
return False
def handle_text_embedding(self, text: str) -> list:
"""This method is called when the chat completion is done.
Args:
text (str): The text to be converted to an embedding.
Returns:
list: The text embedding.
"""
def can_handle_user_input(self, user_input: str) -> bool:
"""This method is called to check that the plugin can
handle the user_input method.
Args:
user_input (str): The user input.
Returns:
bool: True if the plugin can handle the user_input method."""
return False
def user_input(self, user_input: str) -> str:
"""This method is called to request user input to the user.
Args:
user_input (str): The question or prompt to ask the user.
Returns:
str: The user input.
"""
def can_handle_report(self) -> bool:
"""This method is called to check that the plugin can
handle the report method.
Returns:
bool: True if the plugin can handle the report method."""
return False
def report(self, message: str) -> None:
"""This method is called to report a message to the user.
Args:
message (str): The message to report.
"""

View File

@@ -4,28 +4,30 @@ import pytest
from autogpt.agent import Agent
from autogpt.config import AIConfig
from autogpt.config.config import Config
@pytest.fixture
def agent():
def agent(config: Config):
ai_name = "Test AI"
memory = MagicMock()
next_action_count = 0
command_registry = MagicMock()
config = AIConfig()
ai_config = AIConfig(ai_name=ai_name)
system_prompt = "System prompt"
triggering_prompt = "Triggering prompt"
workspace_directory = "workspace_directory"
agent = Agent(
ai_name,
memory,
next_action_count,
command_registry,
config,
system_prompt,
triggering_prompt,
workspace_directory,
ai_name=ai_name,
memory=memory,
next_action_count=next_action_count,
command_registry=command_registry,
ai_config=ai_config,
config=config,
system_prompt=system_prompt,
triggering_prompt=triggering_prompt,
workspace_directory=workspace_directory,
)
return agent
@@ -36,7 +38,7 @@ def test_agent_initialization(agent: Agent):
assert agent.history.messages == []
assert agent.next_action_count == 0
assert agent.command_registry == agent.command_registry
assert agent.config == agent.config
assert agent.ai_config == agent.ai_config
assert agent.system_prompt == "System prompt"
assert agent.triggering_prompt == "Triggering prompt"

View File

@@ -1,8 +1,9 @@
from unittest.mock import MagicMock, patch
import pytest
from pytest_mock import MockerFixture
from autogpt.llm.api_manager import COSTS, ApiManager
from autogpt.llm.api_manager import OPEN_AI_MODELS, ApiManager
api_manager = ApiManager()
@@ -14,16 +15,17 @@ def reset_api_manager():
@pytest.fixture(autouse=True)
def mock_costs():
with patch.dict(
COSTS,
{
"gpt-3.5-turbo": {"prompt": 0.002, "completion": 0.002},
"text-embedding-ada-002": {"prompt": 0.0004, "completion": 0},
},
clear=True,
):
yield
def mock_costs(mocker: MockerFixture):
mocker.patch.multiple(
OPEN_AI_MODELS["gpt-3.5-turbo"],
prompt_token_cost=0.0013,
completion_token_cost=0.0025,
)
mocker.patch.multiple(
OPEN_AI_MODELS["text-embedding-ada-002"],
prompt_token_cost=0.0004,
)
yield
class TestApiManager:
@@ -87,15 +89,15 @@ class TestApiManager:
assert api_manager.get_total_prompt_tokens() == 10
assert api_manager.get_total_completion_tokens() == 20
assert api_manager.get_total_cost() == (10 * 0.002 + 20 * 0.002) / 1000
assert api_manager.get_total_cost() == (10 * 0.0013 + 20 * 0.0025) / 1000
def test_getter_methods(self):
"""Test the getter methods for total tokens, cost, and budget."""
api_manager.update_cost(60, 120, "gpt-3.5-turbo")
api_manager.update_cost(600, 1200, "gpt-3.5-turbo")
api_manager.set_total_budget(10.0)
assert api_manager.get_total_prompt_tokens() == 60
assert api_manager.get_total_completion_tokens() == 120
assert api_manager.get_total_cost() == (60 * 0.002 + 120 * 0.002) / 1000
assert api_manager.get_total_prompt_tokens() == 600
assert api_manager.get_total_completion_tokens() == 1200
assert api_manager.get_total_cost() == (600 * 0.0013 + 1200 * 0.0025) / 1000
assert api_manager.get_total_budget() == 10.0
@staticmethod
@@ -107,7 +109,7 @@ class TestApiManager:
assert api_manager.get_total_budget() == total_budget
@staticmethod
def test_update_cost():
def test_update_cost_completion_model():
"""Test if updating the cost works correctly."""
prompt_tokens = 50
completion_tokens = 100
@@ -115,9 +117,24 @@ class TestApiManager:
api_manager.update_cost(prompt_tokens, completion_tokens, model)
assert api_manager.get_total_prompt_tokens() == 50
assert api_manager.get_total_completion_tokens() == 100
assert api_manager.get_total_cost() == (50 * 0.002 + 100 * 0.002) / 1000
assert api_manager.get_total_prompt_tokens() == prompt_tokens
assert api_manager.get_total_completion_tokens() == completion_tokens
assert (
api_manager.get_total_cost()
== (prompt_tokens * 0.0013 + completion_tokens * 0.0025) / 1000
)
@staticmethod
def test_update_cost_embedding_model():
"""Test if updating the cost works correctly."""
prompt_tokens = 1337
model = "text-embedding-ada-002"
api_manager.update_cost(prompt_tokens, 0, model)
assert api_manager.get_total_prompt_tokens() == prompt_tokens
assert api_manager.get_total_completion_tokens() == 0
assert api_manager.get_total_cost() == (prompt_tokens * 0.0004) / 1000
@staticmethod
def test_get_models():

View File

@@ -3,6 +3,7 @@
# Dependencies:
# pip install pytest-mock
from autogpt.agent.agent import Agent
from autogpt.commands.web_requests import scrape_links
"""
@@ -42,14 +43,14 @@ class TestScrapeLinks:
provided with a valid url that returns a webpage with hyperlinks.
"""
def test_valid_url_with_hyperlinks(self, config):
def test_valid_url_with_hyperlinks(self, agent: Agent):
url = "https://www.google.com"
result = scrape_links(url, config=config)
result = scrape_links(url, agent=agent)
assert len(result) > 0
assert isinstance(result, list)
assert isinstance(result[0], str)
def test_valid_url(self, mocker, config):
def test_valid_url(self, mocker, agent: Agent):
"""Test that the function returns correctly formatted hyperlinks when given a valid url."""
# Mock the requests.get() function to return a response with sample HTML containing hyperlinks
mock_response = mocker.Mock()
@@ -60,12 +61,12 @@ class TestScrapeLinks:
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function with a valid URL
result = scrape_links("https://www.example.com", config)
result = scrape_links("https://www.example.com", agent)
# Assert that the function returns correctly formatted hyperlinks
assert result == ["Google (https://www.google.com)"]
def test_invalid_url(self, mocker, config):
def test_invalid_url(self, mocker, agent: Agent):
"""Test that the function returns "error" when given an invalid url."""
# Mock the requests.get() function to return an HTTP error response
mock_response = mocker.Mock()
@@ -73,12 +74,12 @@ class TestScrapeLinks:
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function with an invalid URL
result = scrape_links("https://www.invalidurl.com", config)
result = scrape_links("https://www.invalidurl.com", agent)
# Assert that the function returns "error"
assert "Error:" in result
def test_no_hyperlinks(self, mocker, config):
def test_no_hyperlinks(self, mocker, agent: Agent):
"""Test that the function returns an empty list when the html contains no hyperlinks."""
# Mock the requests.get() function to return a response with sample HTML containing no hyperlinks
mock_response = mocker.Mock()
@@ -87,12 +88,12 @@ class TestScrapeLinks:
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function with a URL containing no hyperlinks
result = scrape_links("https://www.example.com", config)
result = scrape_links("https://www.example.com", agent)
# Assert that the function returns an empty list
assert result == []
def test_scrape_links_with_few_hyperlinks(self, mocker, config):
def test_scrape_links_with_few_hyperlinks(self, mocker, agent: Agent):
"""Test that scrape_links() correctly extracts and formats hyperlinks from a sample HTML containing a few hyperlinks."""
mock_response = mocker.Mock()
mock_response.status_code = 200
@@ -108,7 +109,7 @@ class TestScrapeLinks:
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function being tested
result = scrape_links("https://www.example.com", config)
result = scrape_links("https://www.example.com", agent)
# Assert that the function returns a list of formatted hyperlinks
assert isinstance(result, list)

View File

@@ -3,6 +3,7 @@
import pytest
import requests
from autogpt.agent.agent import Agent
from autogpt.commands.web_requests import scrape_text
"""
@@ -42,7 +43,7 @@ Additional aspects:
class TestScrapeText:
def test_scrape_text_with_valid_url(self, mocker, config):
def test_scrape_text_with_valid_url(self, mocker, agent: Agent):
"""Tests that scrape_text() returns the expected text when given a valid URL."""
# Mock the requests.get() method to return a response with expected text
expected_text = "This is some sample text"
@@ -57,14 +58,14 @@ class TestScrapeText:
# Call the function with a valid URL and assert that it returns the
# expected text
url = "http://www.example.com"
assert scrape_text(url, config) == expected_text
assert scrape_text(url, agent) == expected_text
def test_invalid_url(self, config):
def test_invalid_url(self, agent: Agent):
"""Tests that an error is raised when an invalid url is provided."""
url = "invalidurl.com"
pytest.raises(ValueError, scrape_text, url, config)
pytest.raises(ValueError, scrape_text, url, agent)
def test_unreachable_url(self, mocker, config):
def test_unreachable_url(self, mocker, agent: Agent):
"""Test that scrape_text returns an error message when an invalid or unreachable url is provided."""
# Mock the requests.get() method to raise an exception
mocker.patch(
@@ -74,10 +75,10 @@ class TestScrapeText:
# Call the function with an invalid URL and assert that it returns an error
# message
url = "http://thiswebsitedoesnotexist.net/"
error_message = scrape_text(url, config)
error_message = scrape_text(url, agent)
assert "Error:" in error_message
def test_no_text(self, mocker, config):
def test_no_text(self, mocker, agent: Agent):
"""Test that scrape_text returns an empty string when the html page contains no text to be scraped."""
# Mock the requests.get() method to return a response with no text
mock_response = mocker.Mock()
@@ -87,20 +88,20 @@ class TestScrapeText:
# Call the function with a valid URL and assert that it returns an empty string
url = "http://www.example.com"
assert scrape_text(url, config) == ""
assert scrape_text(url, agent) == ""
def test_http_error(self, mocker, config):
def test_http_error(self, mocker, agent: Agent):
"""Test that scrape_text returns an error message when the response status code is an http error (>=400)."""
# Mock the requests.get() method to return a response with a 404 status code
mocker.patch("requests.Session.get", return_value=mocker.Mock(status_code=404))
# Call the function with a URL
result = scrape_text("https://www.example.com", config)
result = scrape_text("https://www.example.com", agent)
# Check that the function returns an error message
assert result == "Error: HTTP 404 error"
def test_scrape_text_with_html_tags(self, mocker, config):
def test_scrape_text_with_html_tags(self, mocker, agent: Agent):
"""Test that scrape_text() properly handles HTML tags."""
# Create a mock response object with HTML containing tags
html = "<html><body><p>This is <b>bold</b> text.</p></body></html>"
@@ -110,7 +111,7 @@ class TestScrapeText:
mocker.patch("requests.Session.get", return_value=mock_response)
# Call the function with a URL
result = scrape_text("https://www.example.com", config)
result = scrape_text("https://www.example.com", agent)
# Check that the function properly handles HTML tags
assert result == "This is bold text."

View File

@@ -20,9 +20,7 @@ def test_initial_values(config: Config):
assert config.continuous_mode == False
assert config.speak_mode == False
assert config.fast_llm_model == "gpt-3.5-turbo"
assert config.smart_llm_model == "gpt-4"
assert config.fast_token_limit == 4000
assert config.smart_token_limit == 8000
assert config.smart_llm_model == "gpt-3.5-turbo"
def test_set_continuous_mode(config: Config):
@@ -81,34 +79,6 @@ def test_set_smart_llm_model(config: Config):
config.set_smart_llm_model(smart_llm_model)
def test_set_fast_token_limit(config: Config):
"""
Test if the set_fast_token_limit() method updates the fast_token_limit attribute.
"""
# Store token limit to reset it after the test
fast_token_limit = config.fast_token_limit
config.set_fast_token_limit(5000)
assert config.fast_token_limit == 5000
# Reset token limit
config.set_fast_token_limit(fast_token_limit)
def test_set_smart_token_limit(config: Config):
"""
Test if the set_smart_token_limit() method updates the smart_token_limit attribute.
"""
# Store token limit to reset it after the test
smart_token_limit = config.smart_token_limit
config.set_smart_token_limit(9000)
assert config.smart_token_limit == 9000
# Reset token limit
config.set_smart_token_limit(smart_token_limit)
def test_set_debug_mode(config: Config):
"""
Test if the set_debug_mode() method updates the debug_mode attribute.

View File

@@ -0,0 +1,24 @@
from autogpt.agent import Agent
from autogpt.app import execute_command
def check_plan():
return "hi"
def test_execute_command_plugin(agent: Agent):
"""Test that executing a command that came from a plugin works as expected"""
agent.ai_config.prompt_generator.add_command(
"check_plan",
"Read the plan.md with the next goals to achieve",
{},
check_plan,
)
command_name = "check_plan"
arguments = {}
command_result = execute_command(
command_name=command_name,
arguments=arguments,
agent=agent,
)
assert command_result == "hi"

View File

@@ -12,7 +12,7 @@ import pytest
from pytest_mock import MockerFixture
import autogpt.commands.file_operations as file_ops
from autogpt.config import Config
from autogpt.agent.agent import Agent
from autogpt.memory.vector.memory_item import MemoryItem
from autogpt.memory.vector.utils import Embedding
from autogpt.utils import readable_file_size
@@ -42,7 +42,7 @@ def mock_MemoryItem_from_text(mocker: MockerFixture, mock_embedding: Embedding):
@pytest.fixture()
def test_file_path(config, workspace: Workspace):
def test_file_path(workspace: Workspace):
return workspace.get_path("test_file.txt")
@@ -55,22 +55,22 @@ def test_file(test_file_path: Path):
@pytest.fixture()
def test_file_with_content_path(test_file: TextIOWrapper, file_content, config):
def test_file_with_content_path(test_file: TextIOWrapper, file_content, agent: Agent):
test_file.write(file_content)
test_file.close()
file_ops.log_operation(
"write", test_file.name, config, file_ops.text_checksum(file_content)
"write", test_file.name, agent, file_ops.text_checksum(file_content)
)
return Path(test_file.name)
@pytest.fixture()
def test_directory(config, workspace: Workspace):
def test_directory(workspace: Workspace):
return workspace.get_path("test_directory")
@pytest.fixture()
def test_nested_file(config, workspace: Workspace):
def test_nested_file(workspace: Workspace):
return workspace.get_path("nested/test_file.txt")
@@ -117,7 +117,7 @@ def test_file_operations_state(test_file: TextIOWrapper):
assert file_ops.file_operations_state(test_file.name) == expected_state
def test_is_duplicate_operation(config: Config, mocker: MockerFixture):
def test_is_duplicate_operation(agent: Agent, mocker: MockerFixture):
# Prepare a fake state dictionary for the function to use
state = {
"path/to/file1.txt": "checksum1",
@@ -128,42 +128,48 @@ def test_is_duplicate_operation(config: Config, mocker: MockerFixture):
# Test cases with write operations
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file1.txt", config, "checksum1"
"write", "path/to/file1.txt", agent.config, "checksum1"
)
is True
)
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file1.txt", config, "checksum2"
"write", "path/to/file1.txt", agent.config, "checksum2"
)
is False
)
assert (
file_ops.is_duplicate_operation(
"write", "path/to/file3.txt", config, "checksum3"
"write", "path/to/file3.txt", agent.config, "checksum3"
)
is False
)
# Test cases with append operations
assert (
file_ops.is_duplicate_operation(
"append", "path/to/file1.txt", config, "checksum1"
"append", "path/to/file1.txt", agent.config, "checksum1"
)
is False
)
# Test cases with delete operations
assert (
file_ops.is_duplicate_operation("delete", "path/to/file1.txt", config) is False
file_ops.is_duplicate_operation(
"delete", "path/to/file1.txt", config=agent.config
)
is False
)
assert (
file_ops.is_duplicate_operation("delete", "path/to/file3.txt", config) is True
file_ops.is_duplicate_operation(
"delete", "path/to/file3.txt", config=agent.config
)
is True
)
# Test logging a file operation
def test_log_operation(config: Config):
file_ops.log_operation("log_test", "path/to/test", config)
with open(config.file_logger_path, "r", encoding="utf-8") as f:
def test_log_operation(agent: Agent):
file_ops.log_operation("log_test", "path/to/test", agent=agent)
with open(agent.config.file_logger_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: path/to/test\n" in content
@@ -175,104 +181,120 @@ def test_text_checksum(file_content: str):
assert checksum != different_checksum
def test_log_operation_with_checksum(config: Config):
file_ops.log_operation("log_test", "path/to/test", config, checksum="ABCDEF")
with open(config.file_logger_path, "r", encoding="utf-8") as f:
def test_log_operation_with_checksum(agent: Agent):
file_ops.log_operation("log_test", "path/to/test", agent=agent, checksum="ABCDEF")
with open(agent.config.file_logger_path, "r", encoding="utf-8") as f:
content = f.read()
assert f"log_test: path/to/test #ABCDEF\n" in content
@pytest.mark.parametrize(
"max_length, overlap, content, expected",
[
(
4,
1,
"abcdefghij",
["abcd", "defg", "ghij"],
),
(
4,
0,
"abcdefghijkl",
["abcd", "efgh", "ijkl"],
),
(
4,
0,
"abcdefghijklm",
["abcd", "efgh", "ijkl", "m"],
),
(
4,
0,
"abcdefghijk",
["abcd", "efgh", "ijk"],
),
],
)
# Test splitting a file into chunks
def test_split_file(max_length, overlap, content, expected):
assert (
list(file_ops.split_file(content, max_length=max_length, overlap=overlap))
== expected
)
def test_read_file(
mock_MemoryItem_from_text,
test_file_with_content_path: Path,
file_content,
config: Config,
agent: Agent,
):
content = file_ops.read_file(test_file_with_content_path, config)
content = file_ops.read_file(test_file_with_content_path, agent=agent)
assert content.replace("\r", "") == file_content
def test_write_to_file(test_file_path: Path, config):
def test_read_file_not_found(agent: Agent):
filename = "does_not_exist.txt"
content = file_ops.read_file(filename, agent=agent)
assert "Error:" in content and filename in content and "no such file" in content
def test_write_to_file(test_file_path: Path, agent: Agent):
new_content = "This is new content.\n"
file_ops.write_to_file(str(test_file_path), new_content, config)
file_ops.write_to_file(str(test_file_path), new_content, agent=agent)
with open(test_file_path, "r", encoding="utf-8") as f:
content = f.read()
assert content == new_content
def test_write_file_logs_checksum(test_file_path: Path, config):
def test_write_file_logs_checksum(test_file_path: Path, agent: Agent):
new_content = "This is new content.\n"
new_checksum = file_ops.text_checksum(new_content)
file_ops.write_to_file(str(test_file_path), new_content, config)
with open(config.file_logger_path, "r", encoding="utf-8") as f:
file_ops.write_to_file(str(test_file_path), new_content, agent=agent)
with open(agent.config.file_logger_path, "r", encoding="utf-8") as f:
log_entry = f.read()
assert log_entry == f"write: {test_file_path} #{new_checksum}\n"
def test_write_file_fails_if_content_exists(test_file_path: Path, config):
def test_write_file_fails_if_content_exists(test_file_path: Path, agent: Agent):
new_content = "This is new content.\n"
file_ops.log_operation(
"write",
str(test_file_path),
config,
agent=agent,
checksum=file_ops.text_checksum(new_content),
)
result = file_ops.write_to_file(str(test_file_path), new_content, config)
result = file_ops.write_to_file(str(test_file_path), new_content, agent=agent)
assert result == "Error: File has already been updated."
def test_write_file_succeeds_if_content_different(
test_file_with_content_path: Path, config
test_file_with_content_path: Path, agent: Agent
):
new_content = "This is different content.\n"
result = file_ops.write_to_file(
str(test_file_with_content_path), new_content, config
str(test_file_with_content_path), new_content, agent=agent
)
assert result == "File written to successfully."
def test_append_to_file(test_nested_file: Path, config):
append_text = "This is appended text.\n"
file_ops.write_to_file(test_nested_file, append_text, config)
# Update file testing
def test_replace_in_file_all_occurrences(test_file, test_file_path, agent: Agent):
old_content = "This is a test file.\n we test file here\na test is needed"
expected_content = (
"This is a update file.\n we update file here\na update is needed"
)
test_file.write(old_content)
test_file.close()
file_ops.replace_in_file(test_file_path, "test", "update", agent=agent)
with open(test_file_path) as f:
new_content = f.read()
print(new_content)
print(expected_content)
assert new_content == expected_content
file_ops.append_to_file(test_nested_file, append_text, config)
def test_replace_in_file_one_occurrence(test_file, test_file_path, agent: Agent):
old_content = "This is a test file.\n we test file here\na test is needed"
expected_content = "This is a test file.\n we update file here\na test is needed"
test_file.write(old_content)
test_file.close()
file_ops.replace_in_file(
test_file_path, "test", "update", agent=agent, occurrence_index=1
)
with open(test_file_path) as f:
new_content = f.read()
assert new_content == expected_content
def test_replace_in_file_multiline_old_text(test_file, test_file_path, agent: Agent):
old_content = "This is a multi_line\ntest for testing\nhow well this function\nworks when the input\nis multi-lined"
expected_content = "This is a multi_line\nfile. succeeded test\nis multi-lined"
test_file.write(old_content)
test_file.close()
file_ops.replace_in_file(
test_file_path,
"\ntest for testing\nhow well this function\nworks when the input\n",
"\nfile. succeeded test\n",
agent=agent,
)
with open(test_file_path) as f:
new_content = f.read()
assert new_content == expected_content
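# Taken together, these tests imply a replace_in_file signature along the lines of
#   replace_in_file(filename, old_text, new_text, agent, occurrence_index=None)
# (a hypothetical sketch inferred from the calls above, not the actual implementation):
# with occurrence_index omitted every occurrence of old_text is replaced, while
# occurrence_index=1 replaces only the second (0-indexed) occurrence.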
def test_append_to_file(test_nested_file: Path, agent: Agent):
append_text = "This is appended text.\n"
file_ops.write_to_file(test_nested_file, append_text, agent=agent)
file_ops.append_to_file(test_nested_file, append_text, agent=agent)
with open(test_nested_file, "r") as f:
content_after = f.read()
@@ -280,11 +302,13 @@ def test_append_to_file(test_nested_file: Path, config):
assert content_after == append_text + append_text
def test_append_to_file_uses_checksum_from_appended_file(test_file_path: Path, config):
def test_append_to_file_uses_checksum_from_appended_file(
test_file_path: Path, agent: Agent
):
append_text = "This is appended text.\n"
file_ops.append_to_file(test_file_path, append_text, config)
file_ops.append_to_file(test_file_path, append_text, config)
with open(config.file_logger_path, "r", encoding="utf-8") as f:
file_ops.append_to_file(test_file_path, append_text, agent=agent)
file_ops.append_to_file(test_file_path, append_text, agent=agent)
with open(agent.config.file_logger_path, "r", encoding="utf-8") as f:
log_contents = f.read()
digest = hashlib.md5()
@@ -298,25 +322,25 @@ def test_append_to_file_uses_checksum_from_appended_file(test_file_path: Path, c
)
def test_delete_file(test_file_with_content_path: Path, config):
result = file_ops.delete_file(str(test_file_with_content_path), config)
def test_delete_file(test_file_with_content_path: Path, agent: Agent):
result = file_ops.delete_file(str(test_file_with_content_path), agent=agent)
assert result == "File deleted successfully."
assert os.path.exists(test_file_with_content_path) is False
def test_delete_missing_file(config):
def test_delete_missing_file(agent: Agent):
filename = "path/to/file/which/does/not/exist"
# confuse the log
file_ops.log_operation("write", filename, config, checksum="fake")
file_ops.log_operation("write", filename, agent=agent, checksum="fake")
try:
os.remove(filename)
except FileNotFoundError as err:
assert str(err) in file_ops.delete_file(filename, config)
assert str(err) in file_ops.delete_file(filename, agent=agent)
return
assert False, f"Failed to test delete_file; {filename} not expected to exist"
def test_list_files(workspace: Workspace, test_directory: Path, config):
def test_list_files(workspace: Workspace, test_directory: Path, agent: Agent):
# Case 1: Create files A and B, search for A, and ensure we don't return A and B
file_a = workspace.get_path("file_a.txt")
file_b = workspace.get_path("file_b.txt")
@@ -334,7 +358,7 @@ def test_list_files(workspace: Workspace, test_directory: Path, config):
with open(os.path.join(test_directory, file_a.name), "w") as f:
f.write("This is file A in the subdirectory.")
files = file_ops.list_files(str(workspace.root), config)
files = file_ops.list_files(str(workspace.root), agent=agent)
assert file_a.name in files
assert file_b.name in files
assert os.path.join(Path(test_directory).name, file_a.name) in files
@@ -347,17 +371,17 @@ def test_list_files(workspace: Workspace, test_directory: Path, config):
# Case 2: Search for a file that does not exist and make sure we don't throw
non_existent_file = "non_existent_file.txt"
files = file_ops.list_files("", config)
files = file_ops.list_files("", agent=agent)
assert non_existent_file not in files
def test_download_file(workspace: Workspace, config):
def test_download_file(workspace: Workspace, agent: Agent):
url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.2.2.tar.gz"
local_name = workspace.get_path("auto-gpt.tar.gz")
size = 365023
readable_size = readable_file_size(size)
assert (
file_ops.download_file(url, local_name, config)
file_ops.download_file(url, local_name, agent=agent)
== f'Successfully downloaded and locally stored file: "{local_name}"! (Size: {readable_size})'
)
assert os.path.isfile(local_name) is True
@@ -365,10 +389,10 @@ def test_download_file(workspace: Workspace, config):
url = "https://github.com/Significant-Gravitas/Auto-GPT/archive/refs/tags/v0.0.0.tar.gz"
assert "Got an HTTP Error whilst trying to download file" in file_ops.download_file(
url, local_name, config
url, local_name, agent=agent
)
url = "https://thiswebsiteiswrong.hmm/v0.0.0.tar.gz"
assert "Failed to establish a new connection:" in file_ops.download_file(
url, local_name, config
url, local_name, agent=agent
)
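
The hunks above migrate every file-operation helper from a bare config argument to an agent keyword. A minimal sketch of the resulting call pattern, assuming an already-constructed Agent and Workspace like the fixtures these tests use (the fixture wiring itself is not part of this diff):

from autogpt.commands import file_operations as file_ops

def exercise_file_ops(agent, workspace):
    # Write, append, list, then delete, passing the agent instead of a Config.
    path = workspace.get_path("example.txt")
    file_ops.write_to_file(path, "hello\n", agent=agent)
    file_ops.append_to_file(path, "world\n", agent=agent)
    assert "example.txt" in file_ops.list_files(str(workspace.root), agent=agent)
    assert file_ops.delete_file(str(path), agent=agent) == "File deleted successfully."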

View File

@@ -1,12 +1,15 @@
from datetime import datetime
from pytest_mock import MockerFixture
from autogpt.agent.agent import Agent
from autogpt.config import AIConfig
from autogpt.config.config import Config
from autogpt.llm.chat import create_chat_completion
from autogpt.log_cycle.log_cycle import LogCycleHandler
def test_get_self_feedback(mocker):
def test_get_self_feedback(config: Config, mocker: MockerFixture):
# Define a sample thoughts dictionary
thoughts = {
"reasoning": "Sample reasoning.",
@@ -32,7 +35,8 @@ def test_get_self_feedback(mocker):
agent_mock = mocker.MagicMock(spec=Agent)
# Mock the config attribute of the Agent instance
agent_mock.config = AIConfig()
agent_mock.config = config
agent_mock.ai_config = AIConfig()
# Mock the log_cycle_handler attribute of the Agent instance
agent_mock.log_cycle_handler = LogCycleHandler()
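
Note the mocked agent now carries two separate attributes: config holds the global runtime Config, while ai_config holds the per-agent AIConfig, where the old test set only a single AIConfig. A sketch of the minimal mock this test shape relies on, assuming pytest-mock-style mocking:

from unittest.mock import MagicMock

from autogpt.agent.agent import Agent
from autogpt.config import AIConfig
from autogpt.log_cycle.log_cycle import LogCycleHandler

def make_agent_mock(config):
    agent_mock = MagicMock(spec=Agent)        # spec keeps attribute access honest
    agent_mock.config = config                # global runtime Config
    agent_mock.ai_config = AIConfig()         # per-agent name/role/goals
    agent_mock.log_cycle_handler = LogCycleHandler()
    return agent_mock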

View File

@@ -2,6 +2,7 @@ import pytest
from git.exc import GitCommandError
from git.repo.base import Repo
from autogpt.agent.agent import Agent
from autogpt.commands.git_operations import clone_repository
@@ -10,7 +11,7 @@ def mock_clone_from(mocker):
return mocker.patch.object(Repo, "clone_from")
def test_clone_auto_gpt_repository(workspace, mock_clone_from, config):
def test_clone_auto_gpt_repository(workspace, mock_clone_from, agent: Agent):
mock_clone_from.return_value = None
repo = "github.com/Significant-Gravitas/Auto-GPT.git"
@@ -20,16 +21,16 @@ def test_clone_auto_gpt_repository(workspace, mock_clone_from, config):
expected_output = f"Cloned {url} to {clone_path}"
clone_result = clone_repository(url=url, clone_path=clone_path, config=config)
clone_result = clone_repository(url=url, clone_path=clone_path, agent=agent)
assert clone_result == expected_output
mock_clone_from.assert_called_once_with(
url=f"{scheme}{config.github_username}:{config.github_api_key}@{repo}",
url=f"{scheme}{agent.config.github_username}:{agent.config.github_api_key}@{repo}",
to_path=clone_path,
)
def test_clone_repository_error(workspace, mock_clone_from, config):
def test_clone_repository_error(workspace, mock_clone_from, agent: Agent):
url = "https://github.com/this-repository/does-not-exist.git"
clone_path = str(workspace.get_path("does-not-exist"))
@@ -37,6 +38,6 @@ def test_clone_repository_error(workspace, mock_clone_from, config):
"clone", "fatal: repository not found", ""
)
result = clone_repository(url=url, clone_path=clone_path, config=config)
result = clone_repository(url=url, clone_path=clone_path, agent=agent)
assert "Error: " in result

View File

@@ -3,6 +3,7 @@ import json
import pytest
from googleapiclient.errors import HttpError
from autogpt.agent.agent import Agent
from autogpt.commands.google_search import (
google_official_search,
google_search,
@@ -39,13 +40,13 @@ def test_safe_google_results_invalid_input():
],
)
def test_google_search(
query, num_results, expected_output, return_value, mocker, config
query, num_results, expected_output, return_value, mocker, agent: Agent
):
mock_ddg = mocker.Mock()
mock_ddg.return_value = return_value
mocker.patch("autogpt.commands.google_search.DDGS.text", mock_ddg)
actual_output = google_search(query, config, num_results=num_results)
actual_output = google_search(query, agent=agent, num_results=num_results)
expected_output = safe_google_results(expected_output)
assert actual_output == expected_output
@@ -79,10 +80,15 @@ def mock_googleapiclient(mocker):
],
)
def test_google_official_search(
query, num_results, expected_output, search_results, mock_googleapiclient, config
query,
num_results,
expected_output,
search_results,
mock_googleapiclient,
agent: Agent,
):
mock_googleapiclient.return_value = search_results
actual_output = google_official_search(query, config, num_results=num_results)
actual_output = google_official_search(query, agent=agent, num_results=num_results)
assert actual_output == safe_google_results(expected_output)
@@ -113,7 +119,7 @@ def test_google_official_search_errors(
mock_googleapiclient,
http_code,
error_msg,
config,
agent: Agent,
):
class resp:
def __init__(self, _status, _reason):
@@ -130,5 +136,5 @@ def test_google_official_search_errors(
)
mock_googleapiclient.side_effect = error
actual_output = google_official_search(query, config, num_results=num_results)
actual_output = google_official_search(query, agent=agent, num_results=num_results)
assert actual_output == safe_google_results(expected_output)
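
Both search commands follow the same migration: the query comes first, the agent is a keyword argument, and num_results stays keyword-only in these calls. A minimal sketch, assuming the relevant API keys are present on agent.config for the official variant:

from autogpt.commands.google_search import google_official_search, google_search

def search_example(agent):
    # DuckDuckGo-backed search; no API key needed.
    ddg_results = google_search("Auto-GPT", agent=agent, num_results=3)
    # Google Custom Search variant; requires a Google API key and engine ID in the config.
    api_results = google_official_search("Auto-GPT", agent=agent, num_results=3)
    return ddg_results, api_results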

View File

@@ -1,71 +0,0 @@
from unittest import TestCase
from autogpt.json_utils.json_fix_llm import fix_and_parse_json
class TestParseJson(TestCase):
def test_valid_json(self):
"""Test that a valid JSON string is parsed correctly."""
json_str = '{"name": "John", "age": 30, "city": "New York"}'
obj = fix_and_parse_json(json_str)
self.assertEqual(obj, {"name": "John", "age": 30, "city": "New York"})
def test_invalid_json_minor(self):
"""Test that an invalid JSON string can not be fixed without gpt"""
json_str = '{"name": "John", "age": 30, "city": "New York",}'
with self.assertRaises(Exception):
fix_and_parse_json(json_str, try_to_fix_with_gpt=False)
def test_invalid_json_major_with_gpt(self):
"""Test that an invalid JSON string raises an error when try_to_fix_with_gpt is False"""
json_str = 'BEGIN: "name": "John" - "age": 30 - "city": "New York" :END'
with self.assertRaises(Exception):
fix_and_parse_json(json_str, try_to_fix_with_gpt=False)
def test_invalid_json_major_without_gpt(self):
"""Test that a REALLY invalid JSON string raises an error when try_to_fix_with_gpt is False"""
json_str = 'BEGIN: "name": "John" - "age": 30 - "city": "New York" :END'
# Assert that this raises an exception:
with self.assertRaises(Exception):
fix_and_parse_json(json_str, try_to_fix_with_gpt=False)
def test_invalid_json_leading_sentence_with_gpt(self):
"""Test that a REALLY invalid JSON string raises an error when try_to_fix_with_gpt is False"""
json_str = """I suggest we start by browsing the repository to find any issues that we can fix.
{
"command": {
"name": "browse_website",
"args":{
"url": "https://github.com/Torantulino/Auto-GPT"
}
},
"thoughts":
{
"text": "I suggest we start browsing the repository to find any issues that we can fix.",
"reasoning": "Browsing the repository will give us an idea of the current state of the codebase and identify any issues that we can address to improve the repo.",
"plan": "- Look through the repository to find any issues.\n- Investigate any issues to determine what needs to be fixed\n- Identify possible solutions to fix the issues\n- Open Pull Requests with fixes",
"criticism": "I should be careful while browsing so as not to accidentally introduce any new bugs or issues.",
"speak": "I will start browsing the repository to find any issues we can fix."
}
}"""
good_obj = {
"command": {
"name": "browse_website",
"args": {"url": "https://github.com/Torantulino/Auto-GPT"},
},
"thoughts": {
"text": "I suggest we start browsing the repository to find any issues that we can fix.",
"reasoning": "Browsing the repository will give us an idea of the current state of the codebase and identify any issues that we can address to improve the repo.",
"plan": "- Look through the repository to find any issues.\n- Investigate any issues to determine what needs to be fixed\n- Identify possible solutions to fix the issues\n- Open Pull Requests with fixes",
"criticism": "I should be careful while browsing so as not to accidentally introduce any new bugs or issues.",
"speak": "I will start browsing the repository to find any issues we can fix.",
},
}
# # Assert that this can be fixed with GPT
# self.assertEqual(fix_and_parse_json(json_str), good_obj)
# Assert that trying to fix this without GPT raises an exception
with self.assertRaises(Exception):
fix_and_parse_json(json_str, try_to_fix_with_gpt=False)

View File

@@ -1,114 +0,0 @@
# Generated by CodiumAI
from autogpt.json_utils.json_fix_llm import (
fix_and_parse_json,
fix_json_using_multiple_techniques,
)
"""
Code Analysis
Objective:
- The objective of the function is to fix a given JSON string to make it parseable and fully compliant with two techniques.
Inputs:
- The function takes in a string called 'assistant_reply', which is the JSON string to be fixed.
Flow:
- The function first calls the 'fix_and_parse_json' function to parse and print the Assistant response.
- If the parsed JSON is an empty dictionary, the function calls the 'attempt_to_fix_json_by_finding_outermost_brackets' function to fix the JSON string.
- If the parsed JSON is not an empty dictionary, the function returns the parsed JSON.
- If the parsed JSON is an empty dictionary and cannot be fixed, the function logs an error and returns an empty dictionary.
Outputs:
- The main output of the function is a dictionary containing the fixed JSON string.
Additional aspects:
- The function uses two techniques to fix the JSON string: parsing and finding outermost brackets.
- The function logs an error if the JSON string cannot be fixed and returns an empty dictionary.
- The function uses the 'CFG' object to determine whether to speak the error message or not.
"""
class TestFixJsonUsingMultipleTechniques:
# Tests that the function successfully fixes and parses a JSON string that is already compliant with both techniques.
def test_fix_and_parse_json_happy_path(self):
# Happy path test case where the JSON string is already compliant with both techniques
json_string = '{"text": "Hello world", "confidence": 0.9}'
expected_output = {"text": "Hello world", "confidence": 0.9}
assert fix_json_using_multiple_techniques(json_string) == expected_output
# Tests that the function successfully fixes and parses a JSON string that contains only whitespace characters.
# @requires_api_key("OPEN_API_KEY")
def test_fix_and_parse_json_whitespace(self, mocker):
# Happy path test case where the JSON string contains only whitespace characters
json_string = " \n\t "
# mock try_ai_fix to avoid calling the AI model:
mocker.patch("autogpt.json_utils.json_fix_llm.try_ai_fix", return_value={})
expected_output = {}
assert fix_json_using_multiple_techniques(json_string) == expected_output
# Tests that the function successfully converts a string with arrays to an array
def test_fix_and_parse_json_array(self):
# Happy path test case where the JSON string contains an array of string
json_string = '[ "Add type hints", "Move docstrings", "Consider using" ]'
expected_output = ["Add type hints", "Move docstrings", "Consider using"]
assert fix_json_using_multiple_techniques(json_string) == expected_output
# Tests that the function returns an empty dictionary when the JSON string is not parseable and cannot be fixed using either technique.
# @requires_api_key("OPEN_API_KEY")
def test_fix_and_parse_json_can_not(self, mocker):
# Edge case test case where the JSON string is not parseable and cannot be fixed using either technique
json_string = "This is not a JSON string"
# mock try_ai_fix to avoid calling the AI model:
mocker.patch("autogpt.json_utils.json_fix_llm.try_ai_fix", return_value={})
expected_output = {}
# Use the actual function name in the test
result = fix_json_using_multiple_techniques(json_string)
assert result == expected_output
# Tests that the function returns an empty dictionary when the JSON string is empty.
# @requires_api_key("OPEN_API_KEY")
def test_fix_and_parse_json_empty_string(self, mocker):
# Arrange
json_string = ""
# Act
# mock try_ai_fix to avoid calling the AI model:
mocker.patch("autogpt.json_utils.json_fix_llm.try_ai_fix", return_value={})
result = fix_and_parse_json(json_string)
# Assert
assert result == {}
# Tests that the function successfully fixes and parses a JSON string that contains escape characters.
def test_fix_and_parse_json_escape_characters(self):
# Arrange
json_string = '{"text": "This is a \\"test\\" string."}'
# Act
result = fix_json_using_multiple_techniques(json_string)
# Assert
assert result == {"text": 'This is a "test" string.'}
# Tests that the function successfully fixes and parses a JSON string that contains nested objects or arrays.
def test_fix_and_parse_json_nested_objects(self):
# Arrange
json_string = '{"person": {"name": "John", "age": 30}, "hobbies": ["reading", "swimming"]}'
# Act
result = fix_json_using_multiple_techniques(json_string)
# Assert
assert result == {
"person": {"name": "John", "age": 30},
"hobbies": ["reading", "swimming"],
}
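
The Code Analysis block above describes the two-stage repair flow: a plain parse first, then bracket extraction, then an optional GPT-based fix. Condensing the happy-path cases above into a usage sketch, with no GPT fallback involved:

from autogpt.json_utils.json_fix_llm import fix_json_using_multiple_techniques

# Already-compliant JSON parses straight through.
assert fix_json_using_multiple_techniques(
    '{"text": "Hello world", "confidence": 0.9}'
) == {"text": "Hello world", "confidence": 0.9}

# Top-level arrays and escaped quotes are handled as well.
assert fix_json_using_multiple_techniques('["a", "b"]') == ["a", "b"]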

View File

@@ -1,128 +0,0 @@
from unittest.mock import patch
import pytest
from openai.error import APIError, RateLimitError
from autogpt.llm import utils as llm_utils
@pytest.fixture(params=[RateLimitError, APIError])
def error(request):
if request.param == APIError:
return request.param("Error", http_status=502)
else:
return request.param("Error")
def error_factory(error_instance, error_count, retry_count, warn_user=True):
class RaisesError:
def __init__(self):
self.count = 0
@llm_utils.retry_openai_api(
num_retries=retry_count, backoff_base=0.001, warn_user=warn_user
)
def __call__(self):
self.count += 1
if self.count <= error_count:
raise error_instance
return self.count
return RaisesError()
def test_retry_open_api_no_error(capsys):
@llm_utils.retry_openai_api()
def f():
return 1
result = f()
assert result == 1
output = capsys.readouterr()
assert output.out == ""
assert output.err == ""
@pytest.mark.parametrize(
"error_count, retry_count, failure",
[(2, 10, False), (2, 2, False), (10, 2, True), (3, 2, True), (1, 0, True)],
ids=["passing", "passing_edge", "failing", "failing_edge", "failing_no_retries"],
)
def test_retry_open_api_passing(capsys, error, error_count, retry_count, failure):
call_count = min(error_count, retry_count) + 1
raises = error_factory(error, error_count, retry_count)
if failure:
with pytest.raises(type(error)):
raises()
else:
result = raises()
assert result == call_count
assert raises.count == call_count
output = capsys.readouterr()
if error_count and retry_count:
if type(error) == RateLimitError:
assert "Reached rate limit, passing..." in output.out
assert "Please double check" in output.out
if type(error) == APIError:
assert "API Bad gateway" in output.out
else:
assert output.out == ""
def test_retry_open_api_rate_limit_no_warn(capsys):
error_count = 2
retry_count = 10
raises = error_factory(RateLimitError, error_count, retry_count, warn_user=False)
result = raises()
call_count = min(error_count, retry_count) + 1
assert result == call_count
assert raises.count == call_count
output = capsys.readouterr()
assert "Reached rate limit, passing..." in output.out
assert "Please double check" not in output.out
def test_retry_openapi_other_api_error(capsys):
error_count = 2
retry_count = 10
raises = error_factory(APIError("Error", http_status=500), error_count, retry_count)
with pytest.raises(APIError):
raises()
call_count = 1
assert raises.count == call_count
output = capsys.readouterr()
assert output.out == ""
def test_check_model(api_manager):
"""
Test if check_model() returns original model when valid.
Test if check_model() returns gpt-3.5-turbo when model is invalid.
"""
with patch("openai.Model.list") as mock_list_models:
# Test when correct model is returned
mock_list_models.return_value = {"data": [{"id": "gpt-4"}]}
result = llm_utils.check_model("gpt-4", "smart_llm_model")
assert result == "gpt-4"
# Reset api manager models
api_manager.models = None
# Test when incorrect model is returned
mock_list_models.return_value = {"data": [{"id": "gpt-3.5-turbo"}]}
result = llm_utils.check_model("gpt-4", "fast_llm_model")
assert result == "gpt-3.5-turbo"
# Reset api manager models
api_manager.models = None
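
The tests removed here exercised the retry_openai_api backoff decorator. For reference, a sketch of the usage pattern they covered, with the parameters taken from the calls above; this assumes the decorator is still importable from autogpt.llm.utils in this release, which the diff does not confirm:

from openai.error import RateLimitError

from autogpt.llm import utils as llm_utils

attempts = {"count": 0}

@llm_utils.retry_openai_api(num_retries=3, backoff_base=0.001, warn_user=False)
def flaky_call():
    # Fails twice with a rate-limit error, then succeeds on the third attempt.
    attempts["count"] += 1
    if attempts["count"] <= 2:
        raise RateLimitError("Error")
    return attempts["count"]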

View File

@@ -0,0 +1,25 @@
from unittest.mock import MagicMock
from pytest_mock import MockerFixture
from autogpt.agent.agent import Agent
from autogpt.app import list_agents, start_agent
def test_make_agent(agent: Agent, mocker: MockerFixture) -> None:
"""Test that an agent can be created"""
mock = mocker.patch("openai.ChatCompletion.create")
response = MagicMock()
response.choices[0].message.content = "Test message"
response.usage.prompt_tokens = 1
response.usage.completion_tokens = 1
del response.error
mock.return_value = response
start_agent("Test Agent", "chat", "Hello, how are you?", agent, "gpt-3.5-turbo")
agents = list_agents(agent)
assert "List of agents:\n0: chat" == agents
start_agent("Test Agent 2", "write", "Hello, how are you?", agent, "gpt-3.5-turbo")
agents = list_agents(agent.config)
assert "List of agents:\n0: chat\n1: write" == agents

View File

@@ -0,0 +1,145 @@
import math
import time
from unittest.mock import MagicMock
import pytest
from autogpt.agent import Agent
from autogpt.config import AIConfig
from autogpt.config.config import Config
from autogpt.llm.base import ChatSequence, Message
from autogpt.llm.providers.openai import OPEN_AI_CHAT_MODELS
from autogpt.llm.utils import count_string_tokens
from autogpt.memory.message_history import MessageHistory
@pytest.fixture
def agent(config: Config):
ai_name = "Test AI"
memory = MagicMock()
next_action_count = 0
command_registry = MagicMock()
ai_config = AIConfig(ai_name=ai_name)
system_prompt = "System prompt"
triggering_prompt = "Triggering prompt"
workspace_directory = "workspace_directory"
agent = Agent(
ai_name=ai_name,
memory=memory,
next_action_count=next_action_count,
command_registry=command_registry,
ai_config=ai_config,
config=config,
system_prompt=system_prompt,
triggering_prompt=triggering_prompt,
workspace_directory=workspace_directory,
)
return agent
def test_message_history_batch_summary(mocker, agent):
config = Config()
history = MessageHistory(agent)
model = config.fast_llm_model
message_tlength = 0
message_count = 0
# Setting the mock output and inputs
mock_summary_text = "I executed browse_website command for each of the websites returned from Google search, but none of them have any job openings."
mock_summary = mocker.patch(
"autogpt.memory.message_history.create_chat_completion",
return_value=mock_summary_text,
)
system_prompt = 'You are AIJobSearcher, an AI designed to search for job openings for software engineer role\nYour decisions must always be made independently without seeking user assistance. Play to your strengths as an LLM and pursue simple strategies with no legal complications.\n\nGOALS:\n\n1. Find any job openings for software engineers online\n2. Go through each of the websites and job openings to summarize their requirements and URL, and skip that if you already visit the website\n\nIt takes money to let you run. Your API budget is $5.000\n\nConstraints:\n1. ~4000 word limit for short term memory. Your short term memory is short, so immediately save important information to files.\n2. If you are unsure how you previously did something or want to recall past events, thinking about similar events will help you remember.\n3. No user assistance\n4. Exclusively use the commands listed in double quotes e.g. "command name"\n\nCommands:\n1. google_search: Google Search, args: "query": "<query>"\n2. browse_website: Browse Website, args: "url": "<url>", "question": "<what_you_want_to_find_on_website>"\n3. task_complete: Task Complete (Shutdown), args: "reason": "<reason>"\n\nResources:\n1. Internet access for searches and information gathering.\n2. Long Term memory management.\n3. GPT-3.5 powered Agents for delegation of simple tasks.\n4. File output.\n\nPerformance Evaluation:\n1. Continuously review and analyze your actions to ensure you are performing to the best of your abilities.\n2. Constructively self-criticize your big-picture behavior constantly.\n3. Reflect on past decisions and strategies to refine your approach.\n4. Every command has a cost, so be smart and efficient. Aim to complete tasks in the least number of steps.\n5. Write all code to a file.\n\nYou should only respond in JSON format as described below \nResponse Format: \n{\n "thoughts": {\n "text": "thought",\n "reasoning": "reasoning",\n "plan": "- short bulleted\\n- list that conveys\\n- long-term plan",\n "criticism": "constructive self-criticism",\n "speak": "thoughts summary to say to user"\n },\n "command": {\n "name": "command name",\n "args": {\n "arg name": "value"\n }\n }\n} \nEnsure the response can be parsed by Python json.loads'
message_sequence = ChatSequence.for_model(
model,
[
Message("system", system_prompt),
Message("system", f"The current time and date is {time.strftime('%c')}"),
],
)
insertion_index = len(message_sequence)
user_input = "Determine which next command to use, and respond using the format specified above:'"
user_input_msg = Message("user", user_input)
history.append(user_input_msg)
# mock a response from the AI
assistant_reply = '{\n "thoughts": {\n "text": "I will use the \'google_search\' command to find more websites with job openings for software engineering manager role.",\n "reasoning": "Since the previous website did not provide any relevant information, I will use the \'google_search\' command to find more websites with job openings for software engineer role.",\n "plan": "- Use \'google_search\' command to find more websites with job openings for software engineer role",\n "criticism": "I need to ensure that I am able to extract the relevant information from each website and job opening.",\n "speak": "I will now use the \'google_search\' command to find more websites with job openings for software engineer role."\n },\n "command": {\n "name": "google_search",\n "args": {\n "query": "software engineer job openings"\n }\n }\n}'
msg = Message("assistant", assistant_reply, "ai_response")
history.append(msg)
message_tlength += count_string_tokens(str(msg), config.fast_llm_model)
message_count += 1
# mock some websites returned from google search command in the past
result = "Command google_search returned: ["
for i in range(50):
result += "http://www.job" + str(i) + ".com,"
result += "]"
msg = Message("system", result, "action_result")
history.append(msg)
message_tlength += count_string_tokens(str(msg), config.fast_llm_model)
message_count += 1
user_input = "Determine which next command to use, and respond using the format specified above:'"
user_input_msg = Message("user", user_input)
history.append(user_input_msg)
# mock a number of AI responses and action results from past browse_website commands; the thoughts part is not needed, as the summarization code discards it anyway
for i in range(50):
assistant_reply = (
'{\n "command": {\n "name": "browse_website",\n "args": {\n "url": "https://www.job'
+ str(i)
+ '.com",\n "question": "software engineer"\n }\n }\n}'
)
msg = Message("assistant", assistant_reply, "ai_response")
history.append(msg)
message_tlength += count_string_tokens(str(msg), config.fast_llm_model)
message_count += 1
result = (
"Command browse_website returned: Answer gathered from website: The text in job"
+ str(i)
+ " does not provide information on specific job requirements or a job URL.]",
)
msg = Message("system", result, "action_result")
history.append(msg)
message_tlength += count_string_tokens(str(msg), config.fast_llm_model)
message_count += 1
user_input = "Determine which next command to use, and respond using the format specified above:'"
user_input_msg = Message("user", user_input)
history.append(user_input_msg)
# only take the last cycle of the message history, trim the rest of previous messages, and generate a summary for them
for cycle in reversed(list(history.per_cycle())):
messages_to_add = [msg for msg in cycle if msg is not None]
message_sequence.insert(insertion_index, *messages_to_add)
break
# count the expected token length of the trimmed message by reducing the token length of messages in the last cycle
for message in messages_to_add:
if message.role != "user":
message_tlength -= count_string_tokens(str(message), config.fast_llm_model)
message_count -= 1
# test the main trim_message function
new_summary_message, trimmed_messages = history.trim_messages(
current_message_chain=list(message_sequence),
)
expected_call_count = math.ceil(
message_tlength / (OPEN_AI_CHAT_MODELS.get(config.fast_llm_model).max_tokens)
)
# Expecting 2 batches because the combined token length exceeds the model's max token count
assert mock_summary.call_count == expected_call_count # 2 at the time of writing
# Expecting 100 messages because there are 50 pairs of ai_response and action_result, based on the range set above
assert len(trimmed_messages) == message_count # 100 at the time of writing
assert new_summary_message == Message(
role="system",
content="This reminds you of these events from your past: \n"
+ mock_summary_text,
type=None,
)
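
The final assertions lean on a small piece of arithmetic: the number of summarization batches equals the trimmed token total divided by the model's context size, rounded up. A worked example, assuming fast_llm_model is gpt-3.5-turbo with a 4096-token window (the exact totals vary per run):

import math

message_tlength = 7500   # illustrative token total of the trimmed messages
max_tokens = 4096        # assumed context size for gpt-3.5-turbo
expected_call_count = math.ceil(message_tlength / max_tokens)
assert expected_call_count == 2   # two summary batches, as the comment above expects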

View File

@@ -1,10 +1,61 @@
import pytest
import os
from autogpt.plugins import denylist_allowlist_check, inspect_zip_for_modules
import yaml
from autogpt.config.config import Config
from autogpt.plugins import inspect_zip_for_modules, scan_plugins
from autogpt.plugins.plugin_config import PluginConfig
PLUGINS_TEST_DIR = "tests/unit/data/test_plugins"
PLUGIN_TEST_ZIP_FILE = "Auto-GPT-Plugin-Test-master.zip"
PLUGIN_TEST_INIT_PY = "Auto-GPT-Plugin-Test-master/src/auto_gpt_vicuna/__init__.py"
PLUGIN_TEST_OPENAI = "https://weathergpt.vercel.app/"
def test_scan_plugins_openai(config: Config):
config.plugins_openai = [PLUGIN_TEST_OPENAI]
plugins_config = config.plugins_config
plugins_config.plugins[PLUGIN_TEST_OPENAI] = PluginConfig(
name=PLUGIN_TEST_OPENAI, enabled=True
)
# Test that the function returns the correct number of plugins
result = scan_plugins(config, debug=True)
assert len(result) == 1
def test_scan_plugins_generic(config: Config):
# Test that the function returns the correct number of plugins
plugins_config = config.plugins_config
plugins_config.plugins["auto_gpt_guanaco"] = PluginConfig(
name="auto_gpt_guanaco", enabled=True
)
plugins_config.plugins["AutoGPTPVicuna"] = PluginConfig(
name="AutoGPTPVicuna", enabled=True
)
result = scan_plugins(config, debug=True)
plugin_class_names = [plugin.__class__.__name__ for plugin in result]
assert len(result) == 2
assert "AutoGPTGuanaco" in plugin_class_names
assert "AutoGPTPVicuna" in plugin_class_names
def test_scan_plugins_not_enabled(config: Config):
# Test that the function returns the correct number of plugins
plugins_config = config.plugins_config
plugins_config.plugins["auto_gpt_guanaco"] = PluginConfig(
name="auto_gpt_guanaco", enabled=True
)
plugins_config.plugins["auto_gpt_vicuna"] = PluginConfig(
name="auto_gptp_vicuna", enabled=False
)
result = scan_plugins(config, debug=True)
plugin_class_names = [plugin.__class__.__name__ for plugin in result]
assert len(result) == 1
assert "AutoGPTGuanaco" in plugin_class_names
assert "AutoGPTPVicuna" not in plugin_class_names
def test_inspect_zip_for_modules():
@@ -12,62 +63,49 @@ def test_inspect_zip_for_modules():
assert result == [PLUGIN_TEST_INIT_PY]
@pytest.fixture
def mock_config_denylist_allowlist_check():
class MockConfig:
"""Mock config object for testing the denylist_allowlist_check function"""
def test_create_base_config(config: Config):
"""Test the backwards-compatibility shim to convert old plugin allow/deny list to a config file"""
config.plugins_allowlist = ["a", "b"]
config.plugins_denylist = ["c", "d"]
plugins_denylist = ["BadPlugin"]
plugins_allowlist = ["GoodPlugin"]
authorise_key = "y"
exit_key = "n"
os.remove(config.plugins_config_file)
plugins_config = config.load_plugins_config()
return MockConfig()
# Check the structure of the plugins config data
assert len(plugins_config.plugins) == 4
assert plugins_config.get("a").enabled
assert plugins_config.get("b").enabled
assert not plugins_config.get("c").enabled
assert not plugins_config.get("d").enabled
# Check the saved config file
with open(config.plugins_config_file, "r") as saved_config_file:
saved_config = yaml.load(saved_config_file, Loader=yaml.FullLoader)
assert saved_config == {
"a": {"enabled": True, "config": {}},
"b": {"enabled": True, "config": {}},
"c": {"enabled": False, "config": {}},
"d": {"enabled": False, "config": {}},
}
def test_denylist_allowlist_check_denylist(
mock_config_denylist_allowlist_check, monkeypatch
):
# Test that the function returns False when the plugin is in the denylist
monkeypatch.setattr("builtins.input", lambda _: "y")
assert not denylist_allowlist_check(
"BadPlugin", mock_config_denylist_allowlist_check
)
def test_load_config(config: Config):
"""Test that the plugin config is loaded correctly from the plugins_config.yaml file"""
# Create a test config and write it to disk
test_config = {
"a": {"enabled": True, "config": {"api_key": "1234"}},
"b": {"enabled": False, "config": {}},
}
with open(config.plugins_config_file, "w+") as f:
f.write(yaml.dump(test_config))
# Load the config from disk
plugins_config = config.load_plugins_config()
def test_denylist_allowlist_check_allowlist(
mock_config_denylist_allowlist_check, monkeypatch
):
# Test that the function returns True when the plugin is in the allowlist
monkeypatch.setattr("builtins.input", lambda _: "y")
assert denylist_allowlist_check("GoodPlugin", mock_config_denylist_allowlist_check)
def test_denylist_allowlist_check_user_input_yes(
mock_config_denylist_allowlist_check, monkeypatch
):
# Test that the function returns True when the user inputs "y"
monkeypatch.setattr("builtins.input", lambda _: "y")
assert denylist_allowlist_check(
"UnknownPlugin", mock_config_denylist_allowlist_check
)
def test_denylist_allowlist_check_user_input_no(
mock_config_denylist_allowlist_check, monkeypatch
):
# Test that the function returns False when the user inputs "n"
monkeypatch.setattr("builtins.input", lambda _: "n")
assert not denylist_allowlist_check(
"UnknownPlugin", mock_config_denylist_allowlist_check
)
def test_denylist_allowlist_check_user_input_invalid(
mock_config_denylist_allowlist_check, monkeypatch
):
# Test that the function returns False when the user inputs an invalid value
monkeypatch.setattr("builtins.input", lambda _: "invalid")
assert not denylist_allowlist_check(
"UnknownPlugin", mock_config_denylist_allowlist_check
)
# Check that the loaded config is equal to the test config
assert len(plugins_config.plugins) == 2
assert plugins_config.get("a").enabled
assert plugins_config.get("a").config == {"api_key": "1234"}
assert not plugins_config.get("b").enabled
assert plugins_config.get("b").config == {}

View File

@@ -49,25 +49,17 @@ def test_url_validation_succeeds(url):
assert dummy_method(url) == url
bad_protocol_data = (
("htt://example.com"),
("httppp://example.com"),
(" https://example.com"),
@pytest.mark.parametrize(
"url,expected_error",
[
("htt://example.com", "Invalid URL format"),
("httppp://example.com", "Invalid URL format"),
(" https://example.com", "Invalid URL format"),
("http://?query=q", "Missing Scheme or Network location"),
],
)
@pytest.mark.parametrize("url", bad_protocol_data)
def test_url_validation_fails_bad_protocol(url):
with raises(ValueError, match="Invalid URL format"):
dummy_method(url)
missing_loc = (("http://?query=q"),)
@pytest.mark.parametrize("url", missing_loc)
def test_url_validation_fails_bad_protocol(url):
with raises(ValueError, match="Missing Scheme or Network location"):
def test_url_validation_fails_invalid_url(url, expected_error):
with raises(ValueError, match=expected_error):
dummy_method(url)
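
The two overlapping URL tests are folded into one table-driven test, where each row pairs a bad URL with the error message fragment it should raise. The same pattern in isolation, with a stand-in validator so the sketch is self-contained (check below is not the project's real helper):

import re

import pytest

def check(url: str) -> str:
    # Stand-in for the decorated validator under test.
    if not re.match(r"^https?://", url):
        raise ValueError("Invalid URL format")
    if not re.match(r"^https?://[^/?#]+", url):
        raise ValueError("Missing Scheme or Network location")
    return url

@pytest.mark.parametrize(
    "url,expected_error",
    [
        ("htt://example.com", "Invalid URL format"),
        ("http://?query=q", "Missing Scheme or Network location"),
    ],
)
def test_check_rejects_bad_urls(url, expected_error):
    with pytest.raises(ValueError, match=expected_error):
        check(url)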

View File

@@ -4,6 +4,7 @@ from unittest.mock import patch
import pytest
import requests
from autogpt.json_utils.utilities import extract_json_from_response, validate_json
from autogpt.utils import (
get_bulletin_from_web,
get_current_git_branch,
@@ -14,6 +15,37 @@ from autogpt.utils import (
from tests.utils import skip_in_ci
@pytest.fixture
def valid_json_response() -> dict:
return {
"thoughts": {
"text": "My task is complete. I will use the 'task_complete' command to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason 'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks before shutting down.",
"speak": "",
},
"command": {
"name": "task_complete",
"args": {"reason": "Task complete: retrieved Tesla's revenue in 2022."},
},
}
@pytest.fixture
def invalid_json_response() -> dict:
return {
"thoughts": {
"text": "My task is complete. I will use the 'task_complete' command to shut down.",
"reasoning": "I will use the 'task_complete' command because it allows me to shut down and signal that my task is complete.",
"plan": "I will use the 'task_complete' command with the reason 'Task complete: retrieved Tesla's revenue in 2022.' to shut down.",
"criticism": "I need to ensure that I have completed all necessary tasks before shutting down.",
"speak": "",
},
"command": {"name": "", "args": {}},
}
def test_validate_yaml_file_valid():
with open("valid_test_file.yaml", "w") as f:
f.write("setting: value")
@@ -153,5 +185,23 @@ def test_get_current_git_branch_failure(mock_repo):
assert branch_name == ""
if __name__ == "__main__":
pytest.main()
def test_validate_json_valid(valid_json_response):
assert validate_json(valid_json_response)
def test_validate_json_invalid(invalid_json_response):
assert not validate_json(invalid_json_response)
def test_extract_json_from_response(valid_json_response: dict):
emulated_response_from_openai = str(valid_json_response)
assert (
extract_json_from_response(emulated_response_from_openai) == valid_json_response
)
def test_extract_json_from_response_wrapped_in_code_block(valid_json_response: dict):
emulated_response_from_openai = "```" + str(valid_json_response) + "```"
assert (
extract_json_from_response(emulated_response_from_openai) == valid_json_response
)
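
The new fixtures feed two helpers: validate_json, which checks a response dict against the expected response schema, and extract_json_from_response, which also tolerates replies wrapped in a code block. A compact usage sketch built from the same shapes as the fixtures above:

from autogpt.json_utils.utilities import extract_json_from_response, validate_json

response = {
    "thoughts": {
        "text": "Done.",
        "reasoning": "The goal is met.",
        "plan": "Shut down.",
        "criticism": "None.",
        "speak": "",
    },
    "command": {"name": "task_complete", "args": {"reason": "Done."}},
}

assert validate_json(response)
# The extractor accepts the raw dict repr, with or without a code fence around it.
assert extract_json_from_response(str(response)) == response
assert extract_json_from_response("```" + str(response) + "```") == response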

View File

@@ -0,0 +1,71 @@
import os
import openai.api_requestor
import pytest
from pytest_mock import MockerFixture
from .vcr_filter import PROXY, before_record_request, before_record_response
DEFAULT_RECORD_MODE = "new_episodes"
BASE_VCR_CONFIG = {
"before_record_request": before_record_request,
"before_record_response": before_record_response,
"filter_headers": [
"Authorization",
"X-OpenAI-Client-User-Agent",
"User-Agent",
],
"match_on": ["method", "body"],
}
@pytest.fixture(scope="session")
def vcr_config(get_base_vcr_config):
return get_base_vcr_config
@pytest.fixture(scope="session")
def get_base_vcr_config(request):
record_mode = request.config.getoption("--record-mode", default="new_episodes")
config = BASE_VCR_CONFIG
if record_mode is None:
config["record_mode"] = DEFAULT_RECORD_MODE
return config
@pytest.fixture()
def vcr_cassette_dir(request):
test_name = os.path.splitext(request.node.name)[0]
return os.path.join("tests/Auto-GPT-test-cassettes", test_name)
def patch_api_base(requestor):
new_api_base = f"{PROXY}/v1"
requestor.api_base = new_api_base
return requestor
@pytest.fixture
def patched_api_requestor(mocker: MockerFixture):
original_init = openai.api_requestor.APIRequestor.__init__
original_validate_headers = openai.api_requestor.APIRequestor._validate_headers
def patched_init(requestor, *args, **kwargs):
original_init(requestor, *args, **kwargs)
patch_api_base(requestor)
def patched_validate_headers(self, supplied_headers):
headers = original_validate_headers(self, supplied_headers)
headers["AGENT-MODE"] = os.environ.get("AGENT_MODE")
headers["AGENT-TYPE"] = os.environ.get("AGENT_TYPE")
return headers
if PROXY:
mocker.patch("openai.api_requestor.APIRequestor.__init__", new=patched_init)
mocker.patch.object(
openai.api_requestor.APIRequestor,
"_validate_headers",
new=patched_validate_headers,
)
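
This conftest wires up pytest-vcr: vcr_config supplies the recording options, vcr_cassette_dir routes cassettes into tests/Auto-GPT-test-cassettes, and patched_api_requestor reroutes OpenAI traffic through the proxy when PROXY is set. A sketch of how an individual test would typically opt in; the test name and body are illustrative:

import pytest

@pytest.mark.vcr
def test_chat_roundtrip(patched_api_requestor) -> None:
    # Any OpenAI request made here is recorded to a cassette on the first run
    # and replayed afterwards, matched on HTTP method and request body.
    ...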

View File

@@ -1,8 +1,9 @@
import json
import os
import re
from typing import Any, Dict, List
from tests.conftest import PROXY
PROXY = os.environ.get("PROXY")
REPLACEMENTS: List[Dict[str, str]] = [
{