refactor(benchmark/challenge): Set up structure to support more challenge providers

- Move `Challenge`, `ChallengeData`, `load_challenges` to `challenges/builtin.py` and rename to `BuiltinChallenge`, `BuiltinChallengeSpec`, `load_builtin_challenges`
- Create `BaseChallenge` to serve as interface and base class for different challenge implementations
- Create `ChallengeInfo` model to serve as universal challenge info object
- Create `get_challenge_from_source_uri` function in `challenges/__init__.py`
- Replace `ChallengeData` by `ChallengeInfo` everywhere except in `BuiltinChallenge`
- Add strong typing to `task_informations` store in app.py
- Use `call.duration` in `finalize_test_report` and remove `timer` fixture
- Update docstring on `challenges/__init__.py:get_unique_categories`
- Add docstring to `generate_test.py`
This commit is contained in:
Reinier van der Leer
2024-01-09 18:10:45 +01:00
parent 5df2aa7939
commit 7d6476d329
10 changed files with 664 additions and 534 deletions

View File

@@ -2,9 +2,15 @@ import logging
import os
import time
from pathlib import Path
from typing import Optional
from typing import AsyncIterator, Optional
from agent_protocol_client import AgentApi, ApiClient, Configuration, TaskRequestBody
from agent_protocol_client import (
AgentApi,
ApiClient,
Configuration,
Step,
TaskRequestBody,
)
from agbenchmark.agent_interface import get_list_of_file_paths
from agbenchmark.config import AgentBenchmarkConfig
@@ -17,7 +23,7 @@ async def run_api_agent(
config: AgentBenchmarkConfig,
timeout: int,
artifacts_location: Optional[Path] = None,
) -> None:
) -> AsyncIterator[Step]:
configuration = Configuration(host=config.host)
async with ApiClient(configuration) as api_client:
api_instance = AgentApi(api_client)
@@ -34,12 +40,9 @@ async def run_api_agent(
api_instance, artifacts_location, task_id, "artifacts_in"
)
i = 1
while True:
step = await api_instance.execute_agent_task_step(task_id=task_id)
print(f"- step {step.name} ({i}. request)")
i += 1
yield step
if time.time() - start_time > timeout:
raise TimeoutError("Time limit exceeded")

View File

@@ -5,10 +5,10 @@ import logging
import sys
import time
import uuid
from collections import defaultdict, deque
from collections import deque
from multiprocessing import Process
from pathlib import Path
from typing import Any, Optional
from typing import Optional
import httpx
import psutil
@@ -18,6 +18,7 @@ from fastapi import APIRouter, FastAPI, HTTPException, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Extra, ValidationError
from agbenchmark.challenges import ChallengeInfo
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.processing.report_types_v2 import (
BenchmarkRun,
@@ -27,14 +28,13 @@ from agbenchmark.reports.processing.report_types_v2 import (
TaskInfo,
)
from agbenchmark.schema import TaskEvalRequestBody
from agbenchmark.utils.data_types import ChallengeData
from agbenchmark.utils.utils import write_pretty_json
sys.path.append(str(Path(__file__).parent.parent))
logger = logging.getLogger(__name__)
CHALLENGES: dict[str, ChallengeData] = {}
CHALLENGES: dict[str, ChallengeInfo] = {}
challenges_path = Path(__file__).parent / "challenges"
challenge_spec_files = deque(
glob.glob(
@@ -52,7 +52,7 @@ while challenge_spec_files:
logger.debug(f"Loading {challenge_relpath}...")
try:
challenge_info = ChallengeData.parse_file(challenge_spec_file)
challenge_info = ChallengeInfo.parse_file(challenge_spec_file)
except ValidationError as e:
if logging.getLogger().level == logging.DEBUG:
logger.warning(f"Spec file {challenge_relpath} failed to load:\n{e}")
@@ -68,7 +68,14 @@ while challenge_spec_files:
CHALLENGES[challenge_info.eval_id] = challenge_info
task_informations = defaultdict(dict[str, Any])
class BenchmarkTaskInfo(BaseModel):
task_id: str
start_time: datetime.datetime
challenge_info: ChallengeInfo
task_informations: dict[str, BenchmarkTaskInfo] = {}
def find_agbenchmark_without_uvicorn():
@@ -124,12 +131,8 @@ def stream_output(pipe):
def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
from agbenchmark.agent_api_interface import (
download_agent_artifacts_into_folder,
upload_artifacts,
)
from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace
from agbenchmark.generate_test import create_challenge_from_spec_file
from agbenchmark.agent_api_interface import upload_artifacts
from agbenchmark.challenges import get_challenge_from_source_uri
from agbenchmark.main import run_benchmark
configuration = Configuration(
@@ -231,28 +234,29 @@ def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
}
"""
try:
challenge_info = CHALLENGES[task_eval_request.eval_id]
async with ApiClient(configuration) as api_client:
api_instance = AgentApi(api_client)
task_input = CHALLENGES[task_eval_request.eval_id].task
task_input = challenge_info.task
task_request_body = TaskRequestBody(input=task_input)
task_response = await api_instance.create_agent_task(
task_request_body=task_request_body
)
task_informations[task_response.task_id][
"benchmark_start_time"
] = datetime.datetime.now(datetime.timezone.utc).strftime(
"%Y-%m-%dT%H:%M:%S+00:00"
)
task_informations[task_response.task_id][
"eval_id"
] = task_eval_request.eval_id
await upload_artifacts(
api_instance,
CHALLENGES[task_eval_request.eval_id].spec_file.parent,
task_response.task_id,
"artifacts_in",
task_info = BenchmarkTaskInfo(
task_id=task_response.task_id,
start_time=datetime.datetime.now(datetime.timezone.utc),
challenge_info=challenge_info,
)
task_informations[task_info.task_id] = task_info
if input_artifacts_dir := challenge_info.task_artifacts_dir:
await upload_artifacts(
api_instance,
input_artifacts_dir,
task_response.task_id,
"artifacts_in",
)
return task_response
except ApiException as e:
logger.error(f"Error whilst trying to create a task:\n{e}")
@@ -281,45 +285,39 @@ def setup_fastapi_app(agbenchmark_config: AgentBenchmarkConfig) -> FastAPI:
@router.post("/agent/tasks/{task_id}/evaluations")
async def create_evaluation(task_id: str) -> BenchmarkRun:
challenge_info = CHALLENGES[task_informations[task_id]["eval_id"]]
workspace = agbenchmark_config.temp_folder
task_info = task_informations[task_id]
challenge = get_challenge_from_source_uri(task_info.challenge_info.source_uri)
try:
async with ApiClient(configuration) as api_client:
api_instance = AgentApi(api_client)
await download_agent_artifacts_into_folder(
api_instance, task_id, workspace
eval_results = await challenge.evaluate_task_state(
api_instance, task_id
)
artifact_path = challenge_info.spec_file.parent
copy_challenge_artifacts_into_workspace(
artifact_path, "custom_python", workspace
)
challenge = create_challenge_from_spec_file(challenge_info.spec_file)
scores = challenge.get_scores(workspace)
is_score_100 = 1 in scores["values"]
eval_info = BenchmarkRun(
repository_info=RepositoryInfo(),
run_details=RunDetails(
command=f"agbenchmark --test={challenge_info.name}",
command=f"agbenchmark --test={challenge.info.name}",
benchmark_start_time=(
task_informations[task_id]["benchmark_start_time"]
task_info.start_time.strftime("%Y-%m-%dT%H:%M:%S+00:00")
),
test_name=challenge_info.name,
test_name=challenge.info.name,
),
task_info=TaskInfo(
data_path=str(
challenge_info.spec_file.relative_to(challenges_path.parent)
),
data_path=challenge.info.source_uri,
is_regression=None,
category=[c.value for c in challenge_info.category],
task=challenge_info.task,
answer=challenge_info.ground.answer,
description=challenge_info.info.description,
category=[c.value for c in challenge.info.category],
task=challenge.info.task,
answer=challenge.info.reference_answer or "",
description=challenge.info.description or "",
),
metrics=Metrics(
success=is_score_100,
success=all(e.passed for e in eval_results),
success_percentage=(
100 * sum(e.score for e in eval_results) / len(eval_results)
if eval_results # avoid division by 0
else 0
),
attempted=True,
),
config={},

View File

@@ -3,14 +3,26 @@ import json
import logging
from pathlib import Path
from .base import BaseChallenge, ChallengeInfo
from .builtin import OPTIONAL_CATEGORIES
logger = logging.getLogger(__name__)
def get_challenge_from_source_uri(source_uri: str) -> type[BaseChallenge]:
from .builtin import BuiltinChallenge
provider_prefix = source_uri.split("/", 1)[0]
if provider_prefix == BuiltinChallenge.SOURCE_URI_PREFIX:
return BuiltinChallenge.from_source_uri(source_uri)
raise ValueError(f"Cannot resolve source_uri '{source_uri}'")
def get_unique_categories() -> set[str]:
"""
Find all data.json files in the directory relative to this file and its
subdirectories, read the "category" field from each file, and return a set of unique
categories.
Reads all challenge spec files and returns a set of all their categories.
"""
categories = set()
@@ -30,3 +42,11 @@ def get_unique_categories() -> set[str]:
continue
return categories
__all__ = [
"BaseChallenge",
"ChallengeInfo",
"get_unique_categories",
"OPTIONAL_CATEGORIES",
]

View File

@@ -0,0 +1,99 @@
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import AsyncIterator, ClassVar, Optional
import pytest
from agent_protocol_client import AgentApi, Step
from colorama import Fore, Style
from pydantic import BaseModel, Field
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
logger = logging.getLogger(__name__)
class ChallengeInfo(BaseModel):
    """
    Universal challenge info object, shared by all challenge implementations.

    Holds the information about a challenge that the rest of the benchmark needs,
    independent of the format in which the challenge was originally specified.
    """

    eval_id: str = ""
    name: str
    # The task prompt that is given to the subject agent
    task: str
    # Directory containing the challenge's artifacts, if it has any
    task_artifacts_dir: Optional[Path] = None
    category: list[Category]
    difficulty: Optional[DifficultyLevel] = None
    description: Optional[str] = None
    # Names of other challenges that this challenge depends on
    dependencies: list[str] = Field(default_factory=list)
    # Explicit default, consistent with the other optional fields: the field may be
    # omitted without relying on implicit "Optional means not required" behavior.
    reference_answer: Optional[str] = None

    source_uri: str
    """Internal reference indicating the source of the challenge specification"""
class BaseChallenge(ABC):
    """
    The base class and shared interface for all specific challenge implementations.

    Individual challenges are represented by subclasses rather than by instances:
    the challenge's `ChallengeInfo` is stored on the class (`info`), and the
    constructors and runners below are class methods.
    """

    # Provider-independent information about the challenge
    info: ClassVar[ChallengeInfo]

    @classmethod
    @abstractmethod
    def from_source_uri(cls, source_uri: str) -> type["BaseChallenge"]:
        """
        Construct an individual challenge subclass from a suitable `source_uri` (as in
        `ChallengeInfo.source_uri`).
        """
        ...

    @abstractmethod
    def test_method(
        self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
    ) -> None:
        """
        Test method for use by Pytest-based benchmark sessions. Should return normally
        if the challenge passes, and raise a (preferably descriptive) error otherwise.
        """
        ...

    @classmethod
    async def run_challenge(
        cls, config: AgentBenchmarkConfig, timeout: int
    ) -> AsyncIterator[Step]:
        """
        Runs the challenge on the subject agent with the specified timeout.
        Also prints basic challenge and status info to STDOUT.

        Params:
            config: The subject agent's benchmark config.
            timeout: Timeout (seconds) after which to stop the run if not finished.

        Yields:
            Step: The steps generated by the agent for the challenge task.

        Raises:
            TimeoutError: Propagated from `run_api_agent` when the time limit is
                exceeded.
        """
        # avoid circular import
        from agbenchmark.agent_api_interface import run_api_agent

        print()
        print(
            f"{Fore.MAGENTA + Style.BRIGHT}{'='*24} "
            f"Starting {cls.info.name} challenge"
            f" {'='*24}{Style.RESET_ALL}"
        )
        print(f"{Fore.CYAN}Timeout:{Fore.RESET} {timeout} seconds")
        print(f"{Fore.CYAN}Task:{Fore.RESET} {cls.info.task}")

        print()
        logger.debug(f"Starting {cls.info.name} challenge run")
        i = 0
        # Hand the challenge's artifacts directory to the runner, so that the input
        # artifacts are made available to the agent. Without this, the runner's
        # `artifacts_location` stays None and nothing is uploaded.
        async for step in run_api_agent(
            cls.info.task,
            config,
            timeout,
            artifacts_location=cls.info.task_artifacts_dir,
        ):
            i += 1
            print(f"[{cls.info.name}] - step {step.name} ({i}. request)")
            yield step
        logger.debug(f"Finished {cls.info.name} challenge run")

    @classmethod
    @abstractmethod
    async def evaluate_task_state(
        cls, agent: AgentApi, task_id: str
    ) -> list[EvalResult]:
        """
        Evaluate the state of the task with the given ID on the given agent.

        Params:
            agent: API client for the subject agent.
            task_id: ID of the task (on the agent) to evaluate.

        Returns:
            list[EvalResult]: The results of the evaluation(s) that were performed.
        """
        ...

View File

@@ -0,0 +1,422 @@
from collections import deque
import glob
import json
import logging
import os
import subprocess
import sys
import tempfile
from pathlib import Path
from typing import Any, ClassVar, Iterator, Literal, Optional
import openai
import pytest
from agent_protocol_client import AgentApi, ApiClient, Configuration as ClientConfig
from colorama import Fore, Style
from pydantic import BaseModel, constr, Field, validator
from agbenchmark.agent_api_interface import download_agent_artifacts_into_folder
from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
from agbenchmark.utils.prompts import (
END_PROMPT,
FEW_SHOT_EXAMPLES,
PROMPT_MAP,
SCORING_MAP,
)
from .base import BaseChallenge, ChallengeInfo
logger = logging.getLogger(__name__)
# Read the list stored under the "optional_categories" key of the JSON file that
# sits next to this module.
_optional_categories_file = Path(__file__).parent / "optional_categories.json"
with open(_optional_categories_file) as f:
    OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"]
class BuiltinChallengeSpec(BaseModel):
    """Model of the specification file of a built-in challenge."""

    eval_id: str = ""
    name: str
    # The task prompt that is given to the subject agent
    task: str
    category: list[Category]
    dependencies: list[str]
    # Default timeout (seconds) for a run of this challenge
    cutoff: int

    class Info(BaseModel):
        """Descriptive information about the challenge."""

        difficulty: DifficultyLevel
        description: constr(regex=r"^Tests if the agent can.*")
        side_effects: list[str] = Field(default_factory=list)

    info: Info

    class Ground(BaseModel):
        """The reference data and settings used to evaluate the agent's output."""

        answer: str
        # Strings that the evaluated content must (not) contain
        should_contain: Optional[list[str]] = None
        should_not_contain: Optional[list[str]] = None
        # Output files to evaluate: file names, or extensions starting with "."
        files: list[str]
        case_sensitive: Optional[bool] = True

        class Eval(BaseModel):
            """How the output is evaluated; `scoring`/`template` are LLM-only."""

            type: str
            scoring: Optional[Literal["percentage", "scale", "binary"]]
            template: Optional[Literal["rubric", "reference", "question", "custom"]]
            examples: Optional[str]

            @validator("scoring", "template", always=True)
            def validate_eval_fields(cls, v, values, field):
                """Require the field for eval type 'llm', and forbid it otherwise."""
                is_llm_eval = values.get("type") == "llm"
                if is_llm_eval and v is None:
                    raise ValueError(
                        f"{field.name} must be provided when eval type is 'llm'"
                    )
                if not is_llm_eval and v is not None:
                    raise ValueError(
                        f"{field.name} should only exist when eval type is 'llm'"
                    )
                return v

        eval: Eval

    ground: Ground

    metadata: Optional[dict[str, Any]] = None
    # Location of the file that this spec was loaded from; excluded from exports
    spec_file: Path | None = Field(None, exclude=True)
class BuiltinChallenge(BaseChallenge):
    """
    Base class for AGBenchmark's built-in challenges (challenges/**/*.json).

    All of the logic is present in this class. Individual challenges are created as
    subclasses of `BuiltinChallenge` with challenge-specific values assigned to the
    ClassVars `_spec` etc.

    Dynamically constructing subclasses rather than class instances for the individual
    challenges makes them suitable for collection by Pytest, which will run their
    `test_method` like any regular test item.
    """

    # The parsed spec file of the individual challenge
    _spec: ClassVar[BuiltinChallengeSpec]
    # Path of the challenge's spec file
    CHALLENGE_LOCATION: ClassVar[str]
    # Resolved path of the directory containing the challenge's spec file
    ARTIFACTS_LOCATION: ClassVar[str]

    # First segment of the `source_uri` of every built-in challenge
    SOURCE_URI_PREFIX = "__BUILTIN__"

    @classmethod
    def from_challenge_spec(
        cls, spec: BuiltinChallengeSpec
    ) -> type["BuiltinChallenge"]:
        """
        Create a challenge subclass named `Test{name}` from a parsed challenge spec.

        Raises:
            ValueError: If `spec.spec_file` is not set.
        """
        if not spec.spec_file:
            raise ValueError("spec.spec_file not defined")

        # Always use forward slashes in the URI, whatever the OS, because
        # `from_source_uri` splits the URI on "/".
        relative_spec_path = spec.spec_file.relative_to(Path(__file__).parent)

        challenge_info = ChallengeInfo(
            eval_id=spec.eval_id,
            name=spec.name,
            task=spec.task,
            task_artifacts_dir=spec.spec_file.parent,
            category=spec.category,
            difficulty=spec.info.difficulty,
            description=spec.info.description,
            dependencies=spec.dependencies,
            reference_answer=spec.ground.answer,
            source_uri=f"{cls.SOURCE_URI_PREFIX}/{relative_spec_path.as_posix()}",
        )

        challenge_class_name = f"Test{challenge_info.name}"
        logger.debug(f"Creating {challenge_class_name} from spec: {spec.spec_file}")
        return type(
            challenge_class_name,
            (BuiltinChallenge,),
            {
                "info": challenge_info,
                "_spec": spec,
                "CHALLENGE_LOCATION": str(spec.spec_file),
                "ARTIFACTS_LOCATION": str(spec.spec_file.resolve().parent),
            },
        )

    @classmethod
    def from_challenge_spec_file(cls, spec_file: Path) -> type["BuiltinChallenge"]:
        """Parse the given spec file and create a challenge subclass from it."""
        challenge_spec = BuiltinChallengeSpec.parse_file(spec_file)
        challenge_spec.spec_file = spec_file
        return cls.from_challenge_spec(challenge_spec)

    @classmethod
    def from_source_uri(cls, source_uri: str) -> type["BuiltinChallenge"]:
        """
        Create a challenge subclass from a `source_uri` of the form
        `{SOURCE_URI_PREFIX}/{spec file path relative to this module's directory}`.

        Raises:
            ValueError: If the URI doesn't have the built-in prefix as its first
                segment, or if it doesn't contain a path after the prefix.
        """
        prefix, separator, path = source_uri.partition("/")
        if prefix != cls.SOURCE_URI_PREFIX or not separator or not path:
            raise ValueError(f"Invalid source_uri for BuiltinChallenge: {source_uri}")

        spec_file = Path(__file__).parent / path
        return cls.from_challenge_spec_file(spec_file)

    @pytest.mark.asyncio
    async def test_method(
        self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
    ) -> None:
        """
        Run the challenge on the subject agent and evaluate the resulting task state.

        Stores `timed_out`, `answers` and `scores` in the test node's
        `user_properties`.

        Raises:
            TimeoutError: If the run timed out and there are no results to evaluate.
            ValueError: If there are no results to evaluate for another reason.
            AssertionError: If none of the evaluations passed.
        """
        if os.environ.get("HELICONE_API_KEY"):
            from helicone.lock import HeliconeLockManager

            HeliconeLockManager.write_custom_property("challenge", self.info.name)

        # The spec's cutoff is the default; --nc and --cutoff override it
        timeout = self._spec.cutoff or 60

        if request.config.getoption("--nc"):
            timeout = 100000
        elif cutoff := request.config.getoption("--cutoff"):
            timeout = int(cutoff)  # type: ignore

        task_id = ""
        timed_out = None
        try:
            async for step in self.run_challenge(config, timeout):
                # All steps belong to the same task; remember its ID for evaluation
                if not task_id:
                    task_id = step.task_id
            timed_out = False
        except TimeoutError:
            timed_out = True
        request.node.user_properties.append(("timed_out", timed_out))

        agent_client_config = ClientConfig(host=config.host)
        async with ApiClient(agent_client_config) as api_client:
            api_instance = AgentApi(api_client)
            eval_results = await self.evaluate_task_state(api_instance, task_id)

        if not eval_results:
            if timed_out:
                raise TimeoutError("Timed out, no results to evaluate")
            else:
                raise ValueError("No results to evaluate")

        request.node.user_properties.append(
            (
                "answers",
                [r.result for r in eval_results]
                if request.config.getoption("--keep-answers")
                else None,
            )
        )
        request.node.user_properties.append(("scores", [r.score for r in eval_results]))

        # FIXME: this allows partial failure
        assert any(r.passed for r in eval_results), (
            f"No passed evals: {eval_results}"
            if not timed_out
            else f"Timed out; no passed evals: {eval_results}"
        )

    @classmethod
    async def evaluate_task_state(
        cls, agent: AgentApi, task_id: str
    ) -> list[EvalResult]:
        """
        Download the task's artifacts from the agent into a temporary workspace, add
        the challenge's `custom_python` artifacts (if any), and evaluate the content.
        """
        with tempfile.TemporaryDirectory() as workspace:
            workspace = Path(workspace)
            await download_agent_artifacts_into_folder(agent, task_id, workspace)
            if cls.info.task_artifacts_dir:
                copy_challenge_artifacts_into_workspace(
                    cls.info.task_artifacts_dir, "custom_python", workspace
                )

            # Evaluate fully before the temporary workspace is cleaned up
            return list(cls.evaluate_workspace_content(workspace))

    @classmethod
    def evaluate_workspace_content(cls, workspace: Path) -> Iterator[EvalResult]:
        """
        Evaluate the outputs in the workspace according to the challenge's `ground`.

        Yields one result per output if `should_contain`/`should_not_contain` are
        specified, plus one result for all outputs combined if the eval type is
        "llm". A result counts as passed if its score is greater than 0.9.
        """
        if cls._spec.task == "" and os.getenv("IS_MOCK"):
            yield EvalResult(
                result="This is a mock answer",
                result_source="step_output",
                score=1.0,
                passed=True,
            )
            return

        result_ground = cls._spec.ground
        check_words = bool(
            result_ground.should_contain or result_ground.should_not_contain
        )
        check_with_llm = result_ground.eval.type == "llm"
        if not (check_words or check_with_llm):
            # Nothing would consume the outputs, so don't produce them either
            return

        # `get_outputs_for_eval` returns a single-pass iterator, and the outputs are
        # iterated over more than once below, so collect them in a list first.
        outputs_for_eval = list(cls.get_outputs_for_eval(workspace, result_ground))

        if check_words:
            for source, content in outputs_for_eval:
                score = cls.score_result(content, result_ground)
                if score is not None:
                    print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score)
                    yield EvalResult(
                        result=content,
                        result_source=str(source),
                        score=score,
                        passed=score > 0.9,  # FIXME: arbitrary threshold
                    )

        if check_with_llm:
            combined_results = "\n".join(output[1] for output in outputs_for_eval)
            llm_eval = cls.score_result_with_llm(combined_results, result_ground)
            print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval)
            # Normalize the LLM's score to the range 0-1
            if result_ground.eval.scoring == "percentage":
                score = llm_eval / 100
            elif result_ground.eval.scoring == "scale":
                score = llm_eval / 10
            else:
                score = llm_eval

            yield EvalResult(
                result=combined_results,
                result_source=", ".join(str(res[0]) for res in outputs_for_eval),
                score=score,
                passed=score > 0.9,  # FIXME: arbitrary threshold
            )

    @staticmethod
    def get_outputs_for_eval(
        workspace: str | Path | dict[str, str], ground: BuiltinChallengeSpec.Ground
    ) -> Iterator[tuple[str | Path, str]]:
        """
        Collect the outputs that should be evaluated from the workspace.

        For eval type "python", the files matching `ground.files` are executed and
        their STDOUT is the output; for other types, their content is the output.
        For eval type "pytest", Pytest is additionally run in the workspace.

        Yields:
            tuple: The source of the output (file path relative to the workspace, or
                "pytest"), and the output itself.

        Raises:
            AssertionError: If an executed file or Pytest exits with a non-zero code
                or has "error" in its STDERR.
        """
        if isinstance(workspace, dict):
            workspace = workspace["output"]

        script_dir = workspace

        for file_pattern in ground.files:
            # Check if it is a file extension
            if file_pattern.startswith("."):
                # Find all files with the given extension in the workspace
                matching_files = glob.glob(os.path.join(script_dir, "*" + file_pattern))
            else:
                # Otherwise, it is a specific file
                matching_files = [os.path.join(script_dir, file_pattern)]

            for file_path in matching_files:
                if ground.eval.type == "python":
                    result = subprocess.run(
                        [sys.executable, file_path],
                        cwd=os.path.abspath(workspace),
                        capture_output=True,
                        text=True,
                    )
                    if "error" in result.stderr or result.returncode != 0:
                        print(result.stderr)
                        # Raise explicitly: `assert` statements are skipped with -O
                        raise AssertionError(result.stderr)
                    yield (
                        Path(file_path).relative_to(workspace),
                        f"Output: {result.stdout}\n",
                    )
                else:
                    with open(file_path, "r") as f:
                        yield Path(file_path).relative_to(workspace), f.read()

        if ground.eval.type == "pytest":
            result = subprocess.run(
                [sys.executable, "-m", "pytest"],
                cwd=os.path.abspath(workspace),
                capture_output=True,
                text=True,
            )
            if "error" in result.stderr or result.returncode != 0:
                print(result.stderr)
                raise AssertionError(result.stderr)
            yield "pytest", f"Output: {result.stdout}\n"

    @staticmethod
    def score_result(
        content: str, ground: BuiltinChallengeSpec.Ground
    ) -> float | None:
        """
        Score the content against `ground.should_contain`/`should_not_contain`.

        Returns:
            float | None: 1.0 if the content contains all of the words that it should
                contain and none of the words that it shouldn't, 0.0 otherwise,
                or None if neither list is specified.
        """
        print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content)
        if not (ground.should_contain or ground.should_not_contain):
            return None

        if not ground.case_sensitive:
            content = content.lower()

        # Every word must be checked: only return early on a failed check
        for should_contain_word in ground.should_contain or []:
            if not ground.case_sensitive:
                should_contain_word = should_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}"
                f" - {should_contain_word}:"
            )
            if should_contain_word not in content:
                print(print_content, "False")
                return 0.0
            print(print_content, "True")

        for should_not_contain_word in ground.should_not_contain or []:
            if not ground.case_sensitive:
                should_not_contain_word = should_not_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}"
                f" - {should_not_contain_word}:"
            )
            if should_not_contain_word in content:
                print(print_content, "False")
                return 0.0
            print(print_content, "True")

        return 1.0

    @classmethod
    def score_result_with_llm(
        cls, content: str, ground: BuiltinChallengeSpec.Ground
    ) -> float:
        """
        Have an LLM score the content, using the prompt template and the scoring
        method that are specified in `ground.eval`.

        Returns:
            float: The LLM's reply converted to a number, or 1.0 if the `IS_MOCK`
                environment variable is set.
        """
        if os.getenv("IS_MOCK"):
            return 1.0

        # the validation for this is done in the Eval BaseModel
        scoring = SCORING_MAP[ground.eval.scoring]  # type: ignore
        prompt = PROMPT_MAP[ground.eval.template].format(  # type: ignore
            task=cls._spec.task, scoring=scoring, answer=ground.answer, response=content
        )

        if ground.eval.examples:
            prompt += FEW_SHOT_EXAMPLES.format(examples=ground.eval.examples)

        prompt += END_PROMPT

        answer = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": prompt},
            ],
        )

        return float(answer["choices"][0]["message"]["content"])  # type: ignore
def load_builtin_challenges() -> Iterator[type[BuiltinChallenge]]:
    """
    Find the `data.json` spec files in this module's directory and its
    subdirectories, and generate a challenge class for each one that isn't ignored.

    Yields:
        type[BuiltinChallenge]: The challenge class for each loaded spec file.
    """
    logger.info("Loading built-in challenges...")

    challenges_path = os.path.dirname(__file__)
    logger.debug(f"Looking for challenge spec files in {challenges_path}...")

    spec_file_paths = glob.glob(f"{challenges_path}/**/data.json", recursive=True)
    logger.debug(f"Found {len(spec_file_paths)} built-in challenges.")

    n_loaded = 0
    n_ignored = 0
    for spec_file_path in spec_file_paths:
        if _challenge_should_be_ignored(spec_file_path):
            n_ignored += 1
            continue

        challenge = BuiltinChallenge.from_challenge_spec_file(Path(spec_file_path))
        logger.debug(f"Generated test for {challenge.info.name}")
        yield challenge

        # Only counted once the consumer asks for the next challenge
        n_loaded += 1

    logger.info(
        f"Loading built-in challenges complete: loaded {n_loaded}, ignored {n_ignored}."
    )
def _challenge_should_be_ignored(json_file_path: str) -> bool:
    """
    Check whether the spec file at the given path should be skipped, which is the
    case if it is located under `challenges/deprecated` or `challenges/library`.
    """
    # Paths may contain backslashes as separators (e.g. from `glob` on Windows)
    normalized_path = json_file_path.replace("\\", "/")
    return any(
        ignored_dir in normalized_path
        for ignored_dir in ("challenges/deprecated", "challenges/library")
    )

View File

@@ -10,6 +10,7 @@ from typing import Generator
import pytest
from agbenchmark.challenges import OPTIONAL_CATEGORIES, BaseChallenge
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.ReportManager import RegressionTestsTracker
from agbenchmark.reports.reports import (
@@ -17,7 +18,6 @@ from agbenchmark.reports.reports import (
initialize_test_report,
session_finish,
)
from agbenchmark.utils.challenge import OPTIONAL_CATEGORIES, Challenge
from agbenchmark.utils.data_types import Category
GLOBAL_TIMEOUT = (
@@ -149,24 +149,6 @@ def mock(request: pytest.FixtureRequest) -> bool:
return request.config.getoption("--mock")
@pytest.fixture(autouse=True, scope="function")
def timer(request: pytest.FixtureRequest) -> Generator[None, None, None]:
"""
Pytest fixture that times the execution of each test.
At the start of each test, it records the current time.
After the test function completes, it calculates the run time and adds it to
the test node's `user_properties`.
Args:
request: The `pytest.FixtureRequest` object through which the run time is stored
in the test node's `user_properties`.
"""
start_time = time.time()
yield
run_time = time.time() - start_time
request.node.user_properties.append(("run_time", run_time))
def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
"""
Pytest hook that is called when a test report is being generated.
@@ -176,12 +158,12 @@ def pytest_runtest_makereport(item: pytest.Item, call: pytest.CallInfo) -> None:
item: The test item for which the report is being generated.
call: The call object from which the test result is retrieved.
"""
challenge: type[Challenge] = item.cls # type: ignore
challenge: type[BaseChallenge] = item.cls # type: ignore
if call.when == "setup":
test_name = item.nodeid.split("::")[1]
item.user_properties.append(("test_name", test_name))
initialize_test_report(item, challenge.data)
initialize_test_report(item, challenge.info)
if call.when == "call":
finalize_test_report(item, call, agbenchmark_config)
@@ -254,7 +236,7 @@ def pytest_collection_modifyitems(
challenge = item.cls
challenge_name = item.cls.__name__
if not issubclass(challenge, Challenge):
if not issubclass(challenge, BaseChallenge):
item.warn(
pytest.PytestCollectionWarning(
f"Non-challenge item collected: {challenge}"
@@ -264,7 +246,7 @@ def pytest_collection_modifyitems(
continue
# --test: remove the test from the set if it's not specifically selected
if selected_tests and challenge.data.name not in selected_tests:
if selected_tests and challenge.info.name not in selected_tests:
items.remove(item)
continue
@@ -272,8 +254,8 @@ def pytest_collection_modifyitems(
# --maintain -> only challenges expected to be passed (= regression tests)
# --improve -> only challenges that so far are not passed (reliably)
# --explore -> only challenges that have never been passed
is_regression_test = rt_tracker.has_regression_test(challenge.data.name)
has_been_passed = challenges_beaten_in_the_past.get(challenge.data.name, False)
is_regression_test = rt_tracker.has_regression_test(challenge.info.name)
has_been_passed = challenges_beaten_in_the_past.get(challenge.info.name, False)
if (
(config.getoption("--maintain") and not is_regression_test)
or (config.getoption("--improve") and is_regression_test)
@@ -282,7 +264,7 @@ def pytest_collection_modifyitems(
items.remove(item)
continue
dependencies = challenge.data.dependencies
dependencies = challenge.info.dependencies
if (
config.getoption("--test")
or config.getoption("--no-dep")
@@ -300,7 +282,7 @@ def pytest_collection_modifyitems(
]
# Set category markers
challenge_categories = set(c.value for c in challenge.data.category)
challenge_categories = set(c.value for c in challenge.info.category)
for category in challenge_categories:
item.add_marker(category)

View File

@@ -1,75 +1,24 @@
import glob
"""
AGBenchmark's test discovery endpoint for Pytest.
This module is picked up by Pytest's *_test.py file matching pattern, and all challenge
classes in the module that conform to the `Test*` pattern are collected.
"""
import importlib
import logging
import os
from collections import deque
from pathlib import Path
from agbenchmark.utils.challenge import Challenge
from agbenchmark.utils.data_types import ChallengeData
DATA_CATEGORY = {}
from agbenchmark.challenges.builtin import load_builtin_challenges
logger = logging.getLogger(__name__)
DATA_CATEGORY = {}
def create_challenge_from_spec_file(spec_file: Path) -> type[Challenge]:
challenge = Challenge.from_challenge_spec(spec_file)
DATA_CATEGORY[challenge.data.name] = challenge.data.category[0].value
return challenge
def create_challenge_from_spec_file_path(spec_file_path: str) -> type[Challenge]:
spec_file = Path(spec_file_path).resolve()
return create_challenge_from_spec_file(spec_file)
def load_challenges() -> None:
logger.info("Loading challenges...")
challenges_path = os.path.join(os.path.dirname(__file__), "challenges")
logger.debug(f"Looking for challenges in {challenges_path}...")
json_files = deque(
glob.glob(
f"{challenges_path}/**/data.json",
recursive=True,
)
)
logger.debug(f"Found {len(json_files)} challenges.")
logger.debug(f"Sample path: {json_files[0]}")
loaded, ignored = 0, 0
while json_files:
# Take and remove the first element from json_files
json_file = json_files.popleft()
if challenge_should_be_ignored(json_file):
ignored += 1
continue
challenge_info = ChallengeData.parse_file(json_file)
challenge_class = create_challenge_from_spec_file_path(json_file)
logger.debug(f"Generated test for {challenge_info.name}")
_add_challenge_to_module(challenge_class)
loaded += 1
logger.info(f"Loading challenges complete: loaded {loaded}, ignored {ignored}.")
def challenge_should_be_ignored(json_file_path: str):
return (
"challenges/deprecated" in json_file_path
or "challenges/library" in json_file_path
)
def _add_challenge_to_module(challenge: type[Challenge]):
# Load challenges and attach them to this module
for challenge in load_builtin_challenges():
# Attach the Challenge class to this module so it can be discovered by pytest
module = importlib.import_module(__name__)
setattr(module, f"{challenge.__name__}", challenge)
setattr(module, challenge.__name__, challenge)
load_challenges()
# Build a map of challenge names and their primary category
DATA_CATEGORY[challenge.info.name] = challenge.info.category[0].value

View File

@@ -6,10 +6,11 @@ from pathlib import Path
import pytest
from agbenchmark.challenges import ChallengeInfo
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.reports.processing.report_types import Metrics, Test
from agbenchmark.reports.ReportManager import SingletonReportManager
from agbenchmark.utils.data_types import ChallengeData, DifficultyLevel
from agbenchmark.utils.data_types import DifficultyLevel
from agbenchmark.utils.get_data_from_helicone import get_data_from_helicone
from agbenchmark.utils.utils import calculate_success_percentage
@@ -53,10 +54,9 @@ def update_regression_tests(
def initialize_test_report(
item: pytest.Item,
challenge_info: ChallengeData,
challenge_info: ChallengeInfo,
):
difficulty = challenge_info.info.difficulty
difficulty = challenge_info.difficulty
if isinstance(difficulty, DifficultyLevel):
difficulty = difficulty.value
@@ -66,14 +66,14 @@ def initialize_test_report(
# item.test_name = test_name
test_info = dict(item.user_properties).get("info_details") or Test(
data_path=str(challenge_info.spec_file),
data_path=challenge_info.source_uri,
is_regression=False,
category=[c.value for c in challenge_info.category],
task=challenge_info.task,
answer=challenge_info.ground.answer,
description=challenge_info.info.description,
answer=challenge_info.reference_answer or "",
description=challenge_info.description or "",
metrics=Metrics(
difficulty=challenge_info.info.difficulty.value,
difficulty=difficulty,
attempted=False,
),
)
@@ -89,25 +89,24 @@ def finalize_test_report(
item: pytest.Item, call: pytest.CallInfo, config: AgentBenchmarkConfig
) -> None:
user_properties: dict = dict(item.user_properties)
run_time = user_properties.get("run_time")
info_details: Test = user_properties.get("info_details", {})
test_name: str = user_properties.get("test_name", "")
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
if call:
logger.debug(f"Finalizing report with CallInfo: {vars(call)}")
if call.excinfo is None:
info_details.metrics.success = True
else:
if not mock: # don't remove if it's a mock test
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
info_details.metrics.fail_reason = str(call.excinfo.value)
if call.excinfo.typename == "Skipped":
info_details.metrics.attempted = False
info_details.metrics.attempted = True
info_details.metrics.run_time = f"{str(round(call.duration, 3))} seconds"
info_details.reached_cutoff = user_properties.get("timed_out", False)
logger.debug(f"Finalizing report with CallInfo: {vars(call)}")
if call.excinfo is None:
info_details.metrics.success = True
else:
if not mock: # don't remove if it's a mock test
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
info_details.metrics.fail_reason = str(call.excinfo.value)
if call.excinfo.typename == "Skipped":
info_details.metrics.attempted = False
info_details.metrics.attempted = True
info_details.metrics.run_time = f"{str(round(call.duration, 3))} seconds"
info_details.reached_cutoff = user_properties.get("timed_out", False)
prev_test_results: list[bool] = get_and_update_success_history(
test_name, info_details
@@ -116,19 +115,18 @@ def finalize_test_report(
update_regression_tests(prev_test_results, info_details, test_name)
if info_details and test_name:
if run_time is not None:
cost = None
if "--mock" not in sys.argv and os.environ.get("HELICONE_API_KEY"):
logger.debug("Getting cost from Helicone")
cost = get_data_from_helicone(test_name)
logger.debug(f"Cost: {cost}")
cost = None
if "--mock" not in sys.argv and os.environ.get("HELICONE_API_KEY"):
logger.debug("Getting cost from Helicone")
cost = get_data_from_helicone(test_name)
logger.debug(f"Cost: {cost}")
info_details.metrics.cost = cost
info_details.metrics.cost = cost
if "--mock" not in sys.argv:
update_challenges_already_beaten(
config.challenges_already_beaten_file, info_details, test_name
)
if "--mock" not in sys.argv:
update_challenges_already_beaten(
config.challenges_already_beaten_file, info_details, test_name
)
SingletonReportManager().INFO_MANAGER.add_test_report(test_name, info_details)

View File

@@ -1,272 +0,0 @@
import glob
import json
import logging
import math
import os
import subprocess
import sys
from abc import ABC
from pathlib import Path
from typing import Any, ClassVar, List
import openai
import pytest
from colorama import Fore, Style
from agbenchmark.agent_api_interface import run_api_agent
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import ChallengeData, Ground
from agbenchmark.utils.prompts import (
END_PROMPT,
FEW_SHOT_EXAMPLES,
PROMPT_MAP,
SCORING_MAP,
)
logger = logging.getLogger(__name__)

# Read the "optional_categories" list from the JSON file that sits in the
# sibling "challenges" directory. This happens once, when the module is imported.
_optional_categories_file = (
    Path(__file__).parent.parent / "challenges" / "optional_categories.json"
)
with open(_optional_categories_file) as f:
    OPTIONAL_CATEGORIES: list[str] = json.load(f)["optional_categories"]
class Challenge(ABC):
    """The parent class to all specific challenges classes.

    Defines helper methods for running a challenge and scoring its output.
    Concrete challenge classes are generated at runtime by
    `from_challenge_spec`, which fills in `data`, `CHALLENGE_LOCATION` and
    `ARTIFACTS_LOCATION`.
    """

    data: ChallengeData
    CHALLENGE_LOCATION: ClassVar[str]
    ARTIFACTS_LOCATION: ClassVar[str]
    # Shared by all subclasses on purpose; keyed by class name in `get_scores`.
    scores: ClassVar[dict[str, Any]] = {}  # this is for suites

    @staticmethod
    def from_challenge_spec(spec_file: Path) -> type["Challenge"]:
        """Create a `Challenge` subclass from a challenge spec file.

        The spec is parsed with `ChallengeData.parse_file`; the generated
        class is named `Test<challenge name>`.

        Args:
            spec_file: Path of the challenge spec to load.

        Returns:
            A new subclass of `Challenge` with `data`, `CHALLENGE_LOCATION`
            (the spec path) and `ARTIFACTS_LOCATION` (the spec's resolved
            parent directory) set.
        """
        challenge_data = ChallengeData.parse_file(spec_file)

        challenge_class_name = f"Test{challenge_data.name}"
        logger.debug(f"Creating {challenge_class_name} from spec: {spec_file}")
        return type(
            challenge_class_name,
            (Challenge,),
            {
                "data": challenge_data,
                "CHALLENGE_LOCATION": str(spec_file),
                "ARTIFACTS_LOCATION": str(spec_file.resolve().parent),
            },
        )

    # Define test method within the dynamically created class
    @pytest.mark.asyncio
    async def test_method(
        self, config: AgentBenchmarkConfig, request: pytest.FixtureRequest
    ) -> None:
        """Run the challenge and assert that at least one score equals 1.

        The timeout is the challenge's `cutoff` (60 if falsy), overridden by
        `--nc` (100000) or `--cutoff`. Scores are stored on `request.node`;
        answers are only kept there when `--keep-answers` is given.
        """
        if os.environ.get("HELICONE_API_KEY"):
            from helicone.lock import HeliconeLockManager

            HeliconeLockManager.write_custom_property("challenge", self.data.name)

        timeout = self.data.cutoff or 60

        if request.config.getoption("--nc"):
            timeout = 100000
        elif cutoff := request.config.getoption("--cutoff"):
            timeout = int(cutoff)

        await self.run_challenge(config, timeout)

        scores = self.get_scores(config.temp_folder)
        request.node.answers = (
            scores["answers"] if request.config.getoption("--keep-answers") else None
        )
        # NOTE: `scores` is the same dict object that `get_scores` stored in
        # `Challenge.scores`, so the answers are removed there as well.
        del scores["answers"]  # remove answers from scores
        request.node.scores = scores  # store scores in request.node
        is_score_100 = 1 in scores["values"]

        assert is_score_100

    async def run_challenge(self, config: AgentBenchmarkConfig, cutoff: int) -> None:
        """Run the agent on this challenge's task, then copy hidden artifacts.

        Does nothing when the challenge has no task.

        Args:
            config: Benchmark configuration; `config.temp_folder` receives the
                "custom_python" artifacts.
            cutoff: Timeout value forwarded to `run_api_agent`.
        """
        from agbenchmark.agent_interface import copy_challenge_artifacts_into_workspace

        if not self.data.task:
            return

        print(
            f"{Fore.MAGENTA + Style.BRIGHT}{'='*24} "
            f"Starting {self.data.name} challenge"
            f" {'='*24}{Style.RESET_ALL}"
        )
        print(f"{Fore.BLACK}Task: {self.data.task}{Fore.RESET}")

        await run_api_agent(
            self.data.task, config, cutoff, Path(self.ARTIFACTS_LOCATION)
        )

        # hidden files are added after the agent runs. Hidden files can be python test files.
        # We copy them in the temporary folder to make it easy to import the code produced by the agent
        artifact_paths = [
            self.ARTIFACTS_LOCATION,
            str(Path(self.CHALLENGE_LOCATION).parent),
        ]
        for path in artifact_paths:
            copy_challenge_artifacts_into_workspace(
                path, "custom_python", config.temp_folder
            )

    @staticmethod
    def _run_and_capture(command: list[str], workspace: str | Path) -> str:
        """Run `command` inside `workspace` and return its formatted stdout.

        Raises:
            AssertionError: if stderr contains "error" or the exit code is
                non-zero. Raised explicitly (rather than via `assert`) so the
                check is not stripped when Python runs with `-O`.
        """
        result = subprocess.run(
            command,
            cwd=os.path.abspath(workspace),
            capture_output=True,
            text=True,
        )
        if "error" in result.stderr or result.returncode != 0:
            print(result.stderr)
            raise AssertionError(result.stderr)
        return f"Output: {result.stdout}\n"

    @staticmethod
    def get_artifacts_out(
        workspace: str | Path | dict[str, str], ground: Ground
    ) -> List[str]:
        """Collect the outputs that should be evaluated for a challenge.

        For every entry of `ground.files` (an entry starting with "." is
        treated as a file extension and globbed, anything else as a file
        name), the matching files are either executed (`eval.type` is
        "python") or read as text. If `eval.type` is "pytest", pytest is
        additionally run in the workspace once all patterns are processed.

        Args:
            workspace: Directory to look in; a dict is reduced to its
                "output" entry.
            ground: Ground-truth spec providing `files` and `eval.type`.

        Returns:
            File contents and/or "Output: ..." strings of executed commands.

        Raises:
            AssertionError: if an executed command fails.
        """
        if isinstance(workspace, dict):
            workspace = workspace["output"]

        script_dir = workspace
        files_contents = []

        for file_pattern in ground.files:
            # Check if it is a file extension
            if file_pattern.startswith("."):
                # Find all files with the given extension in the workspace
                matching_files = glob.glob(os.path.join(script_dir, "*" + file_pattern))
            else:
                # Otherwise, it is a specific file
                matching_files = [os.path.join(script_dir, file_pattern)]

            for file_path in matching_files:
                if ground.eval.type == "python":
                    files_contents.append(
                        Challenge._run_and_capture(
                            [sys.executable, file_path], workspace
                        )
                    )
                else:
                    with open(file_path, "r") as f:
                        files_contents.append(f.read())

        # The original used a `for ... else` here; as the loop never breaks,
        # this always runs after all file patterns have been handled.
        if ground.eval.type == "pytest":
            files_contents.append(
                Challenge._run_and_capture([sys.executable, "-m", "pytest"], workspace)
            )

        return files_contents

    @staticmethod
    def scoring(content: str, ground: Ground) -> float:
        """Score `content` against the should/should-not-contain word lists.

        Returns 0.0 as soon as a required word is missing or a forbidden word
        is present, and 1.0 otherwise. When `ground.case_sensitive` is falsy,
        both the content and the words are compared in lower case.
        """
        print(f"{Fore.BLUE}Scoring content:{Style.RESET_ALL}", content)

        # Lower-case the content once instead of on every loop iteration.
        case_sensitive = getattr(ground, "case_sensitive", True)
        if not case_sensitive:
            content = content.lower()

        for should_contain_word in ground.should_contain or []:
            if not case_sensitive:
                should_contain_word = should_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should exist{Style.RESET_ALL}"
                f" - {should_contain_word}:"
            )
            if should_contain_word not in content:
                print(print_content, "False")
                return 0.0
            print(print_content, "True")

        for should_not_contain_word in ground.should_not_contain or []:
            if not case_sensitive:
                should_not_contain_word = should_not_contain_word.lower()
            print_content = (
                f"{Fore.BLUE}Word that should not exist{Style.RESET_ALL}"
                f" - {should_not_contain_word}:"
            )
            if should_not_contain_word in content:
                print(print_content, "False")
                return 0.0
            print(print_content, "True")

        return 1.0

    @classmethod
    def llm_eval(cls, content: str, ground: Ground) -> float:
        """Ask the "gpt-4" chat model to score `content` and return its reply as a float.

        Returns 1.0 without calling the API when the IS_MOCK environment
        variable is set.

        Raises:
            ValueError: if the model's reply cannot be parsed by `float()`.
        """
        openai.api_key = os.getenv("OPENAI_API_KEY")
        if os.getenv("IS_MOCK"):
            return 1.0

        # the validation for this is done in the Eval BaseModel
        scoring = SCORING_MAP[ground.eval.scoring]  # type: ignore
        prompt = PROMPT_MAP[ground.eval.template].format(  # type: ignore
            task=cls.data.task, scoring=scoring, answer=ground.answer, response=content
        )

        if ground.eval.examples:
            prompt += FEW_SHOT_EXAMPLES.format(examples=ground.eval.examples)

        prompt += END_PROMPT

        answer = openai.ChatCompletion.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": prompt},
            ],
        )

        return float(answer["choices"][0]["message"]["content"])  # type: ignore

    @classmethod
    def get_scores(cls, workspace: Path) -> dict[str, Any]:
        """Evaluate the challenge output found in `workspace`.

        Any exception raised while scoring is printed and swallowed, so the
        returned data may be partial.

        Returns:
            A dict with the keys "values" (list of scores), "scores_obj",
            "percentage" and "answers". The same dict is also stored in
            `cls.scores` under the class name.
        """
        scores = []
        scores_dict: Any = {}
        percentage = None
        answers = {}
        try:
            if cls.data.task == "" and os.getenv("IS_MOCK"):
                scores = [1.0]
                answers = {"mock": "This is a mock answer"}
            elif isinstance(cls.data.ground, Ground):
                files_contents = cls.get_artifacts_out(workspace, cls.data.ground)
                answers = {"answer": files_contents}
                for file_content in files_contents:
                    score = cls.scoring(file_content, cls.data.ground)
                    print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", score)
                    scores.append(score)

                if cls.data.ground.eval.type == "llm":
                    llm_eval = cls.llm_eval("\n".join(files_contents), cls.data.ground)
                    if cls.data.ground.eval.scoring == "percentage":
                        scores.append(math.ceil(llm_eval / 100))
                    elif cls.data.ground.eval.scoring == "scale":
                        scores.append(math.ceil(llm_eval / 10))
                    print(f"{Fore.GREEN}Your score is:{Style.RESET_ALL}", llm_eval)

                    # NOTE(review): the raw LLM score is appended in addition
                    # to the normalised one above - confirm this is intended.
                    scores.append(llm_eval)
        except Exception as e:
            # Best effort: report the problem and return whatever was scored.
            print("Error getting scores", e)

        scores_data = {
            "values": scores,
            "scores_obj": scores_dict,
            "percentage": percentage,
            "answers": answers,
        }

        cls.scores[cls.__name__] = scores_data

        return scores_data

    def get_dummy_scores(self, test_name: str, scores: dict[str, Any]) -> int | None:
        """Placeholder that currently always returns 1.

        TODO: once this works, return 1 only if 1 is among
        `scores["scores_obj"][test_name]`, and None otherwise. That logic
        used to sit (unreachable) after the unconditional return.
        """
        return 1  # remove this once this works

View File

@@ -1,8 +1,7 @@
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional
from typing import Literal
from pydantic import BaseModel, Field, constr, validator
from pydantic import BaseModel
class DifficultyLevel(Enum):
@@ -29,87 +28,19 @@ DIFFICULTY_MAP = {
STRING_DIFFICULTY_MAP = {e.value: DIFFICULTY_MAP[e] for e in DifficultyLevel}
class Info(BaseModel):
    # Challenge metadata: difficulty, a description that must start with
    # "Tests if the agent can", and a list of side effects.
    difficulty: DifficultyLevel
    description: constr(regex=r"^Tests if the agent can.*")
    side_effects: List[str]

    @validator("difficulty", pre=True)
    def difficulty_to_enum(cls: "Info", v: str | DifficultyLevel) -> DifficultyLevel:
        """Coerce `v` into a DifficultyLevel, matching strings case-insensitively."""
        if isinstance(v, DifficultyLevel):
            return v

        level = None
        if isinstance(v, str):
            try:
                level = DifficultyLevel(v.lower())
            except ValueError:
                # Unknown difficulty name; reported by the error below.
                level = None

        if level is None:
            raise ValueError(f"Cannot convert {v} to DifficultyLevel.")
        return level
class Eval(BaseModel):
    # Evaluation settings. `scoring` and `template` are mandatory for the
    # "llm" type and forbidden for every other type (see validators below).
    type: str
    scoring: Optional[str]
    template: Optional[str]
    examples: Optional[str]

    @validator("scoring", "template", always=True)
    def validate_eval_fields(cls, v, values, field):
        """Require the field for type "llm" and reject it for any other type."""
        is_llm = values.get("type") == "llm"
        if is_llm and v is None:
            raise ValueError(f"{field.name} must be provided when type is 'llm'")
        if not is_llm and v is not None:
            raise ValueError(f"{field.name} should only exist when type is 'llm'")
        return v

    @validator("scoring")
    def validate_scoring(cls, v):
        """Accept only None or one of the known scoring modes."""
        if v is None or v in ["percentage", "scale", "binary"]:
            return v
        raise ValueError(
            "scoring must be either 'percentage', 'scale', or 'binary'"
        )

    @validator("template")
    def validate_template(cls, v):
        """Accept only None or one of the known prompt templates."""
        if v is None or v in ["rubric", "reference", "question", "custom"]:
            return v
        raise ValueError(
            "template must be either 'rubric', 'reference', 'question', or 'custom'"
        )
class Ground(BaseModel):
    # Ground-truth data for a challenge.
    # answer: reference answer text.
    answer: str
    # Words the evaluated content must / must not contain (both optional).
    should_contain: Optional[List[str]] = None
    should_not_contain: Optional[List[str]] = None
    # Files to evaluate; presumably file names or ".ext" patterns - verify
    # against the code that consumes this field.
    files: List[str]
    # Whether word matching is case-sensitive; defaults to True.
    case_sensitive: Optional[bool] = True
    # Evaluation settings (see Eval).
    eval: Eval
class Category(str, Enum):
    # Challenge categories. Members are also `str` instances, so they compare
    # equal to their string values.
    DATA = "data"
    GENERALIST = "general"
    CODING = "coding"
    SCRAPE_SYNTHESIZE = "scrape_synthesize"
    WEB = "web"
    GAIA_1 = "GAIA_1"
    GAIA_2 = "GAIA_2"
    GAIA_3 = "GAIA_3"
class ChallengeData(BaseModel):
    # Full specification of a challenge.
    eval_id: str = ""  # defaults to an empty string
    name: str
    category: List[Category]
    task: str  # task text for the challenge
    dependencies: List[str]
    cutoff: int  # presumably a time limit for the run - TODO confirm unit
    ground: Ground
    info: Info
    metadata: Optional[Dict[str, Any]] = None
    # Optional path of the spec file; excluded when the model is exported.
    spec_file: Path | None = Field(None, exclude=True)
class EvalResult(BaseModel):
    # Outcome of a single evaluation.
    result: str
    # Either the literal "step_output" or any other string naming the source
    # of `result`.
    result_source: Literal["step_output"] | str
    score: float
    passed: bool