import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, AsyncIterator, Awaitable, ClassVar, Optional

import pytest
from agbenchmark.config import AgentBenchmarkConfig
from agbenchmark.utils.data_types import Category, DifficultyLevel, EvalResult
from agent_protocol_client import AgentApi, Step
from colorama import Fore, Style
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)


def format_step_output(step: Step, step_num: int, challenge_name: str) -> str:
    """Format a step for concise, informative console output.

    Format: [Challenge] step N: tool_name(args) → result [$cost]
    """
    parts = [f"[{challenge_name}]", f"step {step_num}:"]

    # Get additional_output data
    ao: dict[str, Any] = step.additional_output or {}

    # Get the tool being used in this step
    use_tool = ao.get("use_tool", {})
    tool_name = use_tool.get("name", "")
    tool_args = use_tool.get("arguments", {})

    if tool_name:
        # Format tool call with abbreviated arguments
        args_str = _format_tool_args(tool_name, tool_args)
        parts.append(f"{Fore.CYAN}{tool_name}{Fore.RESET}({args_str})")
    else:
        parts.append(f"{Fore.YELLOW}(no tool){Fore.RESET}")

    # Get result from last action (this step's tool will be executed next iteration)
    last_action = ao.get("last_action", {})
    if last_action:
        result = last_action.get("result", {})
        if isinstance(result, dict):
            if result.get("error"):
                parts.append(f"→ {Fore.RED}error{Fore.RESET}")
            elif result.get("status") == "success":
                parts.append(f"→ {Fore.GREEN}✓{Fore.RESET}")

    # Add cost if available
    cost = ao.get("task_cumulative_cost", 0)
    if cost > 0:
        parts.append(f"{Fore.BLUE}${cost:.3f}{Fore.RESET}")

    return " ".join(parts)


def _format_tool_args(tool_name: str, args: dict) -> str:
    """Format tool arguments for display, keeping it concise."""
    if not args:
        return ""

    # For common tools, show the most relevant argument
    key_args = {
        "read_file": ["filename"],
        "write_file": ["filename"],
        "open_file": ["filename", "file_path"],
        "execute_python": ["filename"],
        "execute_shell": ["command_line"],
        "web_search": ["query"],
        "read_webpage": ["url"],
        "finish": ["reason"],
        "ask_user": ["question"],
        "todo_write": [],  # Skip args for todo_write (too verbose)
    }

    if tool_name in key_args:
        keys = key_args[tool_name]
        if not keys:
            return "..."
        values = [str(args.get(k, ""))[:40] for k in keys if k in args]
        if values:
            return ", ".join(
                f'"{v}"' if " " not in v else f'"{v[:20]}..."' for v in values
            )

    # Default: show first arg value, abbreviated
    first_key = next(iter(args))
    first_val = str(args[first_key])[:30]
    return f'{first_key}="{first_val}"' + (
        "..." if len(str(args[first_key])) > 30 else ""
    )


class ChallengeInfo(BaseModel):
    eval_id: str = ""
    name: str
    task: str
    task_artifacts_dir: Optional[Path] = None
    category: list[Category]
    difficulty: Optional[DifficultyLevel] = None
    description: Optional[str] = None
    dependencies: list[str] = Field(default_factory=list)
    reference_answer: Optional[str]
    source_uri: str
    """Internal reference indicating the source of the challenge specification"""

    available: bool = True
    unavailable_reason: str = ""


class BaseChallenge(ABC):
    """
    The base class and shared interface for all specific challenge
    implementations.
    """

    info: ClassVar[ChallengeInfo]

    @classmethod
    @abstractmethod
    def from_source_uri(cls, source_uri: str) -> type["BaseChallenge"]:
        """
        Construct an individual challenge subclass from a suitable `source_uri`
        (as in `ChallengeInfo.source_uri`).
        """
        ...
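    # A minimal sketch of the subclass contract, for orientation only: real
    # implementations live in the concrete challenge modules, and every value
    # below (class name, task, category member, URI) is made up for
    # illustration.
    #
    #     class ExampleChallenge(BaseChallenge):
    #         info = ChallengeInfo(
    #             name="ExampleChallenge",
    #             task="Write 'hello' to output.txt",
    #             category=[Category.GENERALIST],  # hypothetical member
    #             reference_answer="output.txt contains 'hello'",
    #             source_uri="example/data.json",  # hypothetical path
    #         )
    #
    #         @classmethod
    #         def from_source_uri(cls, source_uri): ...
    #
    #     Concrete subclasses must also implement test_method and
    #     evaluate_task_state, defined below.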
    @abstractmethod
    def test_method(
        self,
        config: AgentBenchmarkConfig,
        request: pytest.FixtureRequest,
        i_attempt: int,
    ) -> None | Awaitable[None]:
        """
        Test method for use by Pytest-based benchmark sessions. Should return
        normally if the challenge passes, and raise a (preferably descriptive)
        error otherwise.
        """
        ...

    @classmethod
    async def run_challenge(
        cls, config: AgentBenchmarkConfig, timeout: int, *, mock: bool = False
    ) -> AsyncIterator[Step]:
        """
        Runs the challenge on the subject agent with the specified timeout.
        Also prints basic challenge and status info to STDOUT.

        Params:
            config: The subject agent's benchmark config.
            timeout: Timeout (seconds) after which to stop the run if not finished.
            mock: Whether to run the agent in mock mode.

        Yields:
            Step: The steps generated by the agent for the challenge task.
        """
        # avoid circular import
        from agbenchmark.agent_api_interface import run_api_agent

        print()
        print(
            f"{Fore.MAGENTA + Style.BRIGHT}{'='*24} "
            f"Starting {cls.info.name} challenge"
            f" {'='*24}{Style.RESET_ALL}"
        )
        print(f"{Fore.CYAN}Timeout:{Fore.RESET} {timeout} seconds")
        print(f"{Fore.CYAN}Task:{Fore.RESET} {cls.info.task}")
        print()

        logger.debug(f"Starting {cls.info.name} challenge run")
        i = 0
        async for step in run_api_agent(
            cls.info.task, config, timeout, cls.info.task_artifacts_dir, mock=mock
        ):
            i += 1
            print(format_step_output(step, i, cls.info.name))
            yield step
        logger.debug(f"Finished {cls.info.name} challenge run")

    @classmethod
    @abstractmethod
    async def evaluate_task_state(
        cls, agent: AgentApi, task_id: str
    ) -> list[EvalResult]:
        ...
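
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the benchmark API: shows the console line
# produced by format_step_output(). A SimpleNamespace stands in for an
# agent_protocol_client.Step, since only `additional_output` is read; all
# field values below are made up for the example.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    fake_step = SimpleNamespace(
        additional_output={
            "use_tool": {
                "name": "write_file",
                "arguments": {"filename": "output.txt", "contents": "hello"},
            },
            "last_action": {"result": {"status": "success"}},
            "task_cumulative_cost": 0.042,
        }
    )
    # Prints (with colors): [WriteFile] step 1: write_file("output.txt") → ✓ $0.042
    print(format_step_output(fake_step, 1, "WriteFile"))  # type: ignore[arg-type]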