Implement old polling mechanism (#5248)

Implement old polling mechanism

Signed-off-by: Merwane Hamadi <merwanehamadi@gmail.com>
This commit is contained in:
merwanehamadi
2023-09-18 16:23:06 -07:00
committed by GitHub
parent 8923e79b29
commit c09a0e7afa
16 changed files with 325 additions and 182 deletions

View File

@@ -127,5 +127,8 @@ jobs:
echo "Running the following command: ${prefix}agbenchmark --test=WriteFile"
${prefix}agbenchmark --test=WriteFile
sh run_benchmark &
sleep 5
python ../../benchmark/tests/test_web_server.py
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}

View File

@@ -0,0 +1,2 @@
Advanced commands to develop on the forge and the benchmark.
Stability not guaranteed.

View File

@@ -0,0 +1,9 @@
#!/bin/bash
# Start the agbenchmark API server on port 8080, freeing the port first.

# Capture the PID list once so we don't race between the existence check
# and the kill (the original invoked lsof twice, and the bare `if lsof`
# check also leaked the PID list to stdout).
pids=$(lsof -t -i :8080)
if [ -n "$pids" ]; then
    kill $pids
fi

# Resolve the directory containing this script so the --reload-dir path
# works regardless of the caller's working directory.
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Run the benchmark app with auto-reload watching the benchmark sources.
uvicorn agbenchmark.app:app --host localhost --port 8080 --reload --log-level info --reload-dir "$SCRIPT_DIR/../../../benchmark/agbenchmark"

View File

@@ -4,7 +4,6 @@ from dotenv import load_dotenv
load_dotenv()
import forge.sdk.forge_log
forge.sdk.forge_log.setup_logger()

View File

@@ -1,5 +1,7 @@
#!/bin/bash
kill $(lsof -t -i :8080)
# Kill processes using port 8080 if any.
if lsof -t -i :8080; then
kill $(lsof -t -i :8080)
fi
poetry run agbenchmark serve

View File

@@ -12,11 +12,9 @@ import toml
from helicone.lock import HeliconeLockManager
from agbenchmark.app import app
from agbenchmark.reports.ReportManager import SingletonReportManager
from agbenchmark.utils.data_types import AgentBenchmarkConfig
from .reports.ReportManager import ReportManager
from .utils.data_types import AgentBenchmarkConfig
BENCHMARK_START_TIME_DT = datetime.now(timezone.utc)
BENCHMARK_START_TIME = BENCHMARK_START_TIME_DT.strftime("%Y-%m-%dT%H:%M:%S+00:00")
TEMP_FOLDER_ABS_PATH = Path.cwd() / "agbenchmark_config" / "temp_folder"
@@ -26,50 +24,6 @@ CHALLENGES_ALREADY_BEATEN = (
UPDATES_JSON_PATH = Path.cwd() / "agbenchmark_config" / "updates.json"
def get_agent_benchmark_config() -> AgentBenchmarkConfig:
    """Load the agent benchmark config from ``agbenchmark_config/config.json``.

    Returns:
        AgentBenchmarkConfig: the parsed configuration, with its
        ``agent_benchmark_config_path`` attribute set to the source path.

    Raises:
        json.JSONDecodeError: if the config file is not valid JSON.
        OSError: if the config file cannot be opened.
    """
    agent_benchmark_config_path = str(Path.cwd() / "agbenchmark_config" / "config.json")
    try:
        with open(agent_benchmark_config_path, "r") as f:
            agent_benchmark_config = AgentBenchmarkConfig(**json.load(f))
            # Remember where the config came from so report paths can be
            # resolved relative to it later.
            agent_benchmark_config.agent_benchmark_config_path = (
                agent_benchmark_config_path
            )
            return agent_benchmark_config
    except json.JSONDecodeError:
        # Bug fix: the old message named "benchmark_config.json", which is
        # not the file actually read (config.json); report the real path.
        print(f"Error: {agent_benchmark_config_path} is not a valid JSON file.")
        raise
def get_report_managers() -> tuple[ReportManager, ReportManager, ReportManager]:
    """Construct the three report managers for the current benchmark run.

    Returns a (regression, info, internal-info) triple, all stamped with
    the shared benchmark start time.
    """
    config = get_agent_benchmark_config()

    # Tests that consistently pass are considered regression tests.
    regression = ReportManager(
        config.get_regression_reports_path(), BENCHMARK_START_TIME_DT
    )

    # User-facing reporting information.
    report_file = (
        config.get_reports_path(benchmark_start_time=BENCHMARK_START_TIME_DT)
        / "report.json"
    )
    info = ReportManager(str(report_file), BENCHMARK_START_TIME_DT)

    # Internal db stand-in used to track the pass/fail rate.
    internal = ReportManager(
        config.get_success_rate_path(), BENCHMARK_START_TIME_DT
    )

    return regression, info, internal
(REGRESSION_MANAGER, INFO_MANAGER, INTERNAL_INFO_MANAGER) = get_report_managers()
if os.environ.get("HELICONE_API_KEY"):
HeliconeLockManager.write_custom_property(
"benchmark_start_time", BENCHMARK_START_TIME
@@ -122,6 +76,9 @@ def run_benchmark(
) -> int:
"""Start the benchmark tests. If a category flag is provided, run the categories with that mark."""
# Check if configuration file exists and is not empty
initialize_updates_file()
SingletonReportManager()
agent_benchmark_config_path = str(Path.cwd() / "agbenchmark_config" / "config.json")
try:
with open(agent_benchmark_config_path, "r") as f:
@@ -214,7 +171,8 @@ def run_benchmark(
current_dir = Path(__file__).resolve().parent
print(f"Current directory: {current_dir}")
pytest_args.extend((str(current_dir), "--cache-clear"))
return pytest.main(pytest_args)
exit_code = pytest.main(pytest_args)
SingletonReportManager().clear_instance()
@click.group(invoke_without_command=True)
@@ -226,7 +184,7 @@ def run_benchmark(
multiple=True,
help="Skips preventing the tests from this category from running",
)
@click.option("--test", help="Specific test to run")
@click.option("--test", multiple=True, help="Specific test to run")
@click.option("--maintain", is_flag=True, help="Runs only regression tests")
@click.option("--improve", is_flag=True, help="Run only non-regression tests")
@click.option(
@@ -314,6 +272,9 @@ def version():
print(f"Benchmark Tool Version {version}")
from pathlib import Path
def serve():
import uvicorn
@@ -321,5 +282,18 @@ def serve():
uvicorn.run(app, host="0.0.0.0", port=8080)
def initialize_updates_file():
    """Reset ``agbenchmark_config/updates.json`` to an empty JSON array.

    The file is (re)created unconditionally; only the log message differs
    depending on whether it already existed. The original duplicated the
    identical write in both branches of the if/else — collapsed here.
    """
    existed = os.path.exists(UPDATES_JSON_PATH)
    with open(UPDATES_JSON_PATH, "w") as file:
        json.dump([], file, indent=2)
    if existed:
        print("Initialized updates.json by overwriting with an empty array")
    else:
        print("Created updates.json and initialized it with an empty array")
if __name__ == "__main__":
cli()

View File

@@ -1,18 +1,11 @@
import os
import platform
import queue
import select
import shutil
import subprocess
import sys
import time
from threading import Thread
from typing import Any, List
from typing import List
import psutil
from dotenv import load_dotenv
from agbenchmark.utils.data_types import AgentBenchmarkConfig
from agbenchmark.execute_sub_process import execute_subprocess
load_dotenv()
@@ -22,82 +15,12 @@ HELICONE_GRAPHQL_LOGS = (
)
def run_linux_env(process: Any, start_time: float, timeout: float) -> None:
    # Poll a child process on POSIX systems, echoing its stdout until it
    # exits or exceeds `timeout` seconds; on timeout, kill the whole
    # process tree.
    # NOTE(review): this is a busy-wait (select timeout of 0, no sleep),
    # so it spins a CPU core while the child runs — confirm acceptable.
    while True:
        try:
            # This checks if there's data to be read from stdout without blocking.
            if process.stdout and select.select([process.stdout], [], [], 0)[0]:
                output = process.stdout.readline()
                print(output.strip())
        except Exception as e:
            # Best-effort output echo: any read error is ignored and the
            # loop continues polling.
            continue
        # Check if process has ended, has no more output, or exceeded timeout
        if process.poll() is not None or (time.time() - start_time > timeout):
            break
    if time.time() - start_time > timeout:
        print("The Python function has exceeded the time limit and was terminated.")
        # Kill children before the parent so no orphans survive.
        parent = psutil.Process(process.pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()
    else:
        print("The Python function has finished running.")
def enqueue_output(out: Any, my_queue: Any) -> None:
    """Drain *out* line by line into *my_queue*, closing *out* at EOF."""
    while True:
        line = out.readline()
        if line == b"":
            # readline() returns an empty bytes object at end of stream.
            break
        my_queue.put(line)
    out.close()
def run_windows_env(process: Any, start_time: float, timeout: float) -> None:
    # Windows has no select() on pipes, so the child's stdout is read by a
    # daemon thread feeding a queue, which this loop polls instead.
    my_queue: Any = queue.Queue()
    thread = Thread(target=enqueue_output, args=(process.stdout, my_queue))
    thread.daemon = True  # don't block interpreter exit on the reader thread
    thread.start()

    while True:
        try:
            output = my_queue.get_nowait().strip()
            print(output)
        except queue.Empty:
            pass
        # Stop when the child exits or the timeout elapses.
        if process.poll() is not None or (time.time() - start_time > timeout):
            break
    if time.time() - start_time > timeout:
        print("The Python function has exceeded the time limit and was terminated.")
        process.terminate()
def run_agent(task: str, timeout: int, agent_config: AgentBenchmarkConfig) -> None:
def run_agent(task: str, timeout: int) -> None:
print(f"Running agbenchmark/benchmarks.py with timeout {timeout}")
command = [sys.executable, "-m", "agbenchmark_config.benchmarks", str(task)]
process = subprocess.Popen(
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
bufsize=1,
)
start_time = time.time()
if platform.system() == "Windows":
run_windows_env(process, start_time, timeout)
else:
run_linux_env(process, start_time, timeout)
process.wait()
if process.returncode != 0:
print(f"The agent timed out")
execute_subprocess(command, timeout)
def get_list_of_file_paths(

View File

@@ -1,8 +1,9 @@
import json
import os
import sys
from typing import Any, List, Optional
from typing import Any, Optional
import psutil
from fastapi import FastAPI
from fastapi import (
HTTPException as FastAPIHTTPException, # Import HTTPException from FastAPI
@@ -10,11 +11,11 @@ from fastapi import (
from fastapi import Request, Response
from fastapi.middleware.cors import CORSMiddleware
# from agbenchmark.app import app
from agbenchmark.execute_sub_process import execute_subprocess
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from fastapi import FastAPI
from pydantic import BaseModel
from pydantic import BaseModel, Extra
# Change the current working directory to the benchmark path
# home_path = find_absolute_benchmark_path()
@@ -22,12 +23,45 @@ from pydantic import BaseModel
general_command = ["poetry", "run", "agbenchmark", "start", "--backend"]
import psutil
def find_agbenchmark_without_uvicorn():
    """Return PIDs of processes mentioning "agbenchmark" but not "uvicorn".

    Used to spot stray benchmark runs so they are not confused with the
    API server process itself.

    Returns:
        list[int]: matching process IDs (possibly empty).
    """
    pids = []
    for process in psutil.process_iter(
        attrs=[
            "pid",
            "cmdline",
            "name",
            "username",
            "status",
            "cpu_percent",
            "memory_info",
            "create_time",
            "cwd",
            "connections",
        ]
    ):
        try:
            # Concatenate the string form of every requested attribute and
            # substring-search that blob. Only the values are needed — the
            # original iterated .items() and discarded the keys.
            full_info = " ".join(str(v) for v in process.info.values())
            if "agbenchmark" in full_info and "uvicorn" not in full_info:
                pids.append(process.info["pid"])
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            # Processes can vanish or deny access mid-iteration; skip them.
            pass
    return pids
class CreateReportRequest(BaseModel):
tests: Optional[List[str]] = []
category: Optional[str] = []
test: str = None
test_run_id: str = None
# category: Optional[str] = []
mock: Optional[bool] = False
class Config:
extra = Extra.forbid # this will forbid any extra fields
updates_list = []
@@ -50,25 +84,30 @@ app.add_middleware(
)
def stream_output(pipe):
    """Forward each chunk read from *pipe* to stdout verbatim."""
    reader = iter(pipe)
    while True:
        try:
            chunk = next(reader)
        except StopIteration:
            break
        # Chunks already carry their own newlines; don't add another.
        print(chunk, end="")
@app.post("/reports")
def run_single_test(body: CreateReportRequest) -> Any:
from agbenchmark.__main__ import run_benchmark
pids = find_agbenchmark_without_uvicorn()
print(f"pids already running with agbenchmark: {pids}")
print(body.dict())
# it's a hack because other parts of the code are using sys.argv
sys.argv = [sys.argv[0]]
sys.argv.append("start")
if body.category:
sys.argv.append(f"--category={body.category}")
for body_test in body.tests:
sys.argv.append(f"--test={body_test}")
categories = None
if body.category:
categories = tuple([body.category])
print(os.getcwd())
command_options = ["agbenchmark"]
# if body.category:
# sys.argv.append(f"--category={body.category}")
command_options.append(f"--test={body.test}")
if body.mock:
command_options.append("--mock")
run_benchmark(category=categories, mock=body.mock, test=tuple(body.tests))
execute_subprocess(command_options, 200)
import json
from pathlib import Path
print("finished running")
# List all folders in the current working directory
path_reports = Path.cwd() / "agbenchmark_config" / "reports"
folders = [folder for folder in path_reports.iterdir() if folder.is_dir()]
@@ -82,6 +121,7 @@ def run_single_test(body: CreateReportRequest) -> Any:
# Read report.json from this folder
if last_folder:
report_path = last_folder / "report.json"
print(report_path)
if report_path.exists():
with report_path.open() as file:
data = json.load(file)

View File

@@ -17,7 +17,7 @@
},
"info": {
"difficulty": "basic",
"description": "s ability to generate content based on the content of 2 files.",
"description": "ability to generate content based on the content of 2 files.",
"side_effects": []
}
}

View File

@@ -0,0 +1,79 @@
import platform
import queue
import select
import subprocess
import time
from threading import Thread
from typing import Any
import psutil
def run_linux_env(process: Any, start_time: float, timeout: float) -> None:
    # Poll a child process on POSIX systems, echoing its stdout until it
    # exits or exceeds `timeout` seconds; on timeout, kill the whole
    # process tree.
    # NOTE(review): this is a busy-wait (select timeout of 0, no sleep),
    # so it spins a CPU core while the child runs — confirm acceptable.
    while True:
        try:
            # This checks if there's data to be read from stdout without blocking.
            if process.stdout and select.select([process.stdout], [], [], 0)[0]:
                output = process.stdout.readline()
                print(output.strip())
        except Exception as e:
            # Best-effort output echo: any read error is ignored and the
            # loop continues polling.
            continue
        # Check if process has ended, has no more output, or exceeded timeout
        if process.poll() is not None or (time.time() - start_time > timeout):
            break
    if time.time() - start_time > timeout:
        print("The Python function has exceeded the time limit and was terminated.")
        # Kill children before the parent so no orphans survive.
        parent = psutil.Process(process.pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()
    else:
        print("The Python function has finished running.")
def enqueue_output(out: Any, my_queue: Any) -> None:
    """Drain *out* line by line into *my_queue*, closing *out* at EOF."""
    while True:
        line = out.readline()
        if line == b"":
            # readline() returns an empty bytes object at end of stream.
            break
        my_queue.put(line)
    out.close()
def run_windows_env(process: Any, start_time: float, timeout: float) -> None:
    # Windows has no select() on pipes, so the child's stdout is read by a
    # daemon thread feeding a queue, which this loop polls instead.
    my_queue: Any = queue.Queue()
    thread = Thread(target=enqueue_output, args=(process.stdout, my_queue))
    thread.daemon = True  # don't block interpreter exit on the reader thread
    thread.start()

    while True:
        try:
            output = my_queue.get_nowait().strip()
            print(output)
        except queue.Empty:
            pass
        # Stop when the child exits or the timeout elapses.
        if process.poll() is not None or (time.time() - start_time > timeout):
            break
    if time.time() - start_time > timeout:
        print("The Python function has exceeded the time limit and was terminated.")
        process.terminate()
def execute_subprocess(command, timeout):
    """Run *command*, streaming its output, enforcing a wall-clock limit.

    Dispatches to the platform-specific polling loop (Windows lacks
    select() on pipes), waits for the child, and logs a warning if it
    exited unsuccessfully.

    Args:
        command: argv-style list passed to subprocess.Popen (shell=False).
        timeout: wall-clock limit in seconds enforced by the polling loop.
    """
    process = subprocess.Popen(
        command,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,  # interleave stderr with stdout
        universal_newlines=True,
        bufsize=1,  # line-buffered so output streams promptly
    )
    start_time = time.time()

    if platform.system() == "Windows":
        run_windows_env(process, start_time, timeout)
    else:
        run_linux_env(process, start_time, timeout)
    process.wait()
    if process.returncode != 0:
        # Bug fix: a non-zero exit is not necessarily a timeout (the child
        # may have crashed); report the actual return code instead of the
        # old blanket "The agent timed out" message.
        print(f"The agent exited with non-zero return code: {process.returncode}")

View File

@@ -10,7 +10,7 @@ from typing import Any, Dict, Optional
import pytest
from agbenchmark.__main__ import CHALLENGES_ALREADY_BEATEN, UPDATES_JSON_PATH
from agbenchmark.__main__ import CHALLENGES_ALREADY_BEATEN
from agbenchmark.agent_api_interface import append_updates_file
from agbenchmark.agent_protocol_client.models.step import Step
from agbenchmark.utils.challenge import Challenge
@@ -218,18 +218,4 @@ def challenge_should_be_ignored(json_file):
return "challenges/deprecated" in json_file or "challenges/library" in json_file
def initialize_updates_file():
    """Reset ``agbenchmark_config/updates.json`` to an empty JSON array.

    The file is (re)created unconditionally; only the log message differs
    depending on whether it already existed. The original duplicated the
    identical write in both branches of the if/else — collapsed here.
    """
    existed = os.path.exists(UPDATES_JSON_PATH)
    with open(UPDATES_JSON_PATH, "w") as file:
        json.dump([], file, indent=2)
    if existed:
        print("Initialized updates.json by overwriting with an empty array")
    else:
        print("Created updates.json and initialized it with an empty array")
initialize_updates_file()
generate_tests()

View File

@@ -1,3 +1,4 @@
import copy
import json
import os
import sys
@@ -11,6 +12,48 @@ from agbenchmark.utils.data_types import AgentBenchmarkConfig
from agbenchmark.utils.utils import get_highest_success_difficulty
class SingletonReportManager:
    """Process-wide holder for the three ReportManager instances.

    The managers are exposed as *class* attributes (REGRESSION_MANAGER,
    INFO_MANAGER, INTERNAL_INFO_MANAGER), so any caller can reach them via
    ``SingletonReportManager().<name>`` after the first construction.
    """

    # The single shared instance; None until first construction.
    instance = None

    def __new__(cls):
        # Imported lazily, presumably to avoid a circular import at module
        # load time — TODO confirm.
        from agbenchmark.reports.agent_benchmark_config import (
            get_agent_benchmark_config,
        )

        if not cls.instance:
            cls.instance = super(SingletonReportManager, cls).__new__(cls)

            agent_benchmark_config = get_agent_benchmark_config()
            # NOTE(review): naive local time, unlike the timezone-aware
            # timestamps used elsewhere in the benchmark — confirm intended.
            benchmark_start_time_dt = (
                datetime.now()
            )  # or any logic to fetch the datetime

            # Make the Managers class attributes
            cls.REGRESSION_MANAGER = ReportManager(
                agent_benchmark_config.get_regression_reports_path(),
                benchmark_start_time_dt,
            )
            cls.INFO_MANAGER = ReportManager(
                str(
                    agent_benchmark_config.get_reports_path(benchmark_start_time_dt)
                    / "report.json"
                ),
                benchmark_start_time_dt,
            )
            cls.INTERNAL_INFO_MANAGER = ReportManager(
                agent_benchmark_config.get_success_rate_path(), benchmark_start_time_dt
            )

        return cls.instance

    @classmethod
    def clear_instance(cls):
        # Drop the singleton and its managers so the next construction
        # rebuilds fresh reports (used between benchmark runs).
        cls.instance = None
        cls.REGRESSION_MANAGER = None
        cls.INFO_MANAGER = None
        cls.INTERNAL_INFO_MANAGER = None
class ReportManager:
"""Abstracts interaction with the regression tests file"""
@@ -81,7 +124,7 @@ class ReportManager:
"highest_difficulty": get_highest_success_difficulty(self.tests),
"total_cost": self.get_total_costs(),
},
"tests": self.tests,
"tests": copy.copy(self.tests),
"config": {
k: v for k, v in json.loads(config.json()).items() if v is not None
},
@@ -105,6 +148,7 @@ class ReportManager:
cost = test_data["metrics"].get(
"cost", 0
) # gets the cost or defaults to 0 if cost is missing
if cost is not None: # check if cost is not None
all_costs_none = False
total_cost += cost # add cost to total

View File

@@ -0,0 +1,18 @@
import json
from pathlib import Path
from agbenchmark.utils.data_types import AgentBenchmarkConfig
def get_agent_benchmark_config() -> AgentBenchmarkConfig:
    """Load the agent benchmark config from ``agbenchmark_config/config.json``.

    Returns:
        AgentBenchmarkConfig: the parsed configuration, with its
        ``agent_benchmark_config_path`` attribute set to the source path.

    Raises:
        json.JSONDecodeError: if the config file is not valid JSON.
        OSError: if the config file cannot be opened.
    """
    agent_benchmark_config_path = str(Path.cwd() / "agbenchmark_config" / "config.json")
    try:
        with open(agent_benchmark_config_path, "r") as f:
            agent_benchmark_config = AgentBenchmarkConfig(**json.load(f))
            # Remember where the config came from so report paths can be
            # resolved relative to it later.
            agent_benchmark_config.agent_benchmark_config_path = (
                agent_benchmark_config_path
            )
            return agent_benchmark_config
    except json.JSONDecodeError:
        # Bug fix: the old message named "benchmark_config.json", which is
        # not the file actually read (config.json); report the real path.
        print(f"Error: {agent_benchmark_config_path} is not a valid JSON file.")
        raise

View File

@@ -3,13 +3,9 @@ import os
import sys
from typing import Any, Dict
from agbenchmark.__main__ import (
CHALLENGES_ALREADY_BEATEN,
INFO_MANAGER,
INTERNAL_INFO_MANAGER,
REGRESSION_MANAGER,
get_agent_benchmark_config,
)
from agbenchmark.__main__ import CHALLENGES_ALREADY_BEATEN
from agbenchmark.reports.agent_benchmark_config import get_agent_benchmark_config
from agbenchmark.reports.ReportManager import SingletonReportManager
from agbenchmark.utils.data_types import DifficultyLevel
from agbenchmark.utils.get_data_from_helicone import get_data_from_helicone
from agbenchmark.utils.utils import calculate_success_percentage
@@ -21,12 +17,16 @@ def get_previous_test_results(
agent_tests: dict[str, list[bool]] = {}
mock = os.getenv("IS_MOCK") # Check if --mock is in sys.argv
prev_test_results = INTERNAL_INFO_MANAGER.tests.get(test_name, [])
prev_test_results = SingletonReportManager().INTERNAL_INFO_MANAGER.tests.get(
test_name, []
)
if not mock:
# only add if it's an actual test
prev_test_results.append(info_details["metrics"]["success"])
INTERNAL_INFO_MANAGER.add_test(test_name, prev_test_results)
SingletonReportManager().INTERNAL_INFO_MANAGER.add_test(
test_name, prev_test_results
)
# can calculate success rate regardless of mock
info_details["metrics"]["success_%"] = calculate_success_percentage(
@@ -45,7 +45,7 @@ def update_regression_tests(
if len(prev_test_results) >= 3 and prev_test_results[-3:] == [True, True, True]:
# if the last 3 tests were successful, add to the regression tests
info_details["is_regression"] = True
REGRESSION_MANAGER.add_test(test_name, test_details)
SingletonReportManager().REGRESSION_MANAGER.add_test(test_name, test_details)
def generate_single_call_report(
@@ -95,7 +95,7 @@ def generate_single_call_report(
info_details["metrics"]["success"] = True
else:
if not mock: # don't remove if it's a mock test
REGRESSION_MANAGER.remove_test(test_name)
SingletonReportManager().REGRESSION_MANAGER.remove_test(test_name)
info_details["metrics"]["fail_reason"] = str(call.excinfo.value)
if call.excinfo.typename == "Skipped":
info_details["metrics"]["attempted"] = False
@@ -146,7 +146,7 @@ def finalize_reports(item: Any, challenge_data: dict[str, Any]) -> None:
nested_test_info, nested_test_name
)
INFO_MANAGER.add_test(test_name, info_details)
SingletonReportManager().INFO_MANAGER.add_test(test_name, info_details)
def update_challenges_already_beaten(
@@ -171,6 +171,6 @@ def update_challenges_already_beaten(
def session_finish(suite_reports: dict) -> None:
agent_benchmark_config = get_agent_benchmark_config()
INTERNAL_INFO_MANAGER.save()
INFO_MANAGER.end_info_report(agent_benchmark_config)
REGRESSION_MANAGER.save()
SingletonReportManager().INTERNAL_INFO_MANAGER.save()
SingletonReportManager().INFO_MANAGER.end_info_report(agent_benchmark_config)
SingletonReportManager().REGRESSION_MANAGER.save()

View File

View File

@@ -0,0 +1,64 @@
import threading
import time
import unittest
import requests
class TestAPIRequests(unittest.TestCase):
    """Integration tests against a locally running agbenchmark API server.

    Assumes a server is already listening on port 8080 (see the serve
    scripts); these tests make real HTTP requests and are not hermetic.
    """

    # Base URL of the server under test.
    URL = "http://localhost:8080"

    def test_post_correct_then_incorrect_test_name(self):
        """A valid test name yields results; an unknown one yields none."""
        payload1 = {"test": "WriteFile", "mock": True}
        # First POST request
        response1 = requests.post(self.URL + "/reports", json=payload1)
        self.assertEqual(response1.status_code, 200)
        # Here you might want to check other aspects of the response, e.g., response1.json()
        print(response1.json())
        self.assertNotEqual(response1.json()["tests"], {})
        # "TestWriteFile" is not a registered benchmark name, so the report
        # should come back with an empty tests mapping.
        payload2 = {"test": "TestWriteFile", "mock": True}

        # Second POST request
        response2 = requests.post(self.URL + "/reports", json=payload2)
        print(response2.json())
        self.assertEqual(response2.json()["tests"], {})
        assert response1.json() != {}

        # Here you might want to check other aspects of the response, e.g., response2.json()

    def test_invalid_payload(self):
        """Unknown fields are rejected (the request model forbids extras)."""
        invalid_payload = {"invalid_key": "value"}
        response = requests.post(self.URL + "/reports", json=invalid_payload)
        # FastAPI/pydantic validation failures return 422 Unprocessable Entity.
        self.assertEqual(response.status_code, 422)

    def test_post_report_and_poll_updates(self):
        """Start a report run, then poll /updates until data arrives."""
        payload1 = {"test": "WriteFile", "mock": True}
        last_update_time = int(time.time())
        # First POST request in a separate thread so polling can overlap it.
        threading.Thread(target=self.send_post_request, args=(payload1,)).start()
        # Give a short time to ensure POST request is initiated before GET requests start
        # NOTE(review): no sleep actually follows the comment above; the
        # first GET may race the POST — the 5x1s retry loop below papers
        # over it. Confirm whether an explicit sleep was intended.

        # Start GET requests
        for _ in range(5):
            # get the current UNIX time
            response = requests.get(
                f"{self.URL}/updates?last_update_time={last_update_time}"
            )
            if response.status_code == 200 and response.json():
                print("Received a non-empty response:", response.json())
                break
            time.sleep(1)  # wait for 1 second before the next request
        else:
            # for/else: runs only if the loop never broke, i.e. no update
            # ever arrived within the retry budget.
            self.fail("No updates received")

    def send_post_request(self, payload):
        # Helper executed on a background thread by the polling test.
        response = requests.post(f"{self.URL}/reports", json=payload)
        if response.status_code == 200:
            print(response.json())


if __name__ == "__main__":
    unittest.main()