Merge branch 'main' into jd/cities_launcher

This commit is contained in:
ablouin-lmg
2024-10-04 11:45:08 -07:00
committed by GitHub
22 changed files with 392 additions and 81 deletions

View File

@@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file.
Changes are grouped by the date they are merged to the main branch of the repository and are ordered from newest to oldest. Dates use the ISO 8601 extended calendar date format, i.e. YYYY-MM-DD.
## 2024-10-03
- Implemented the artifact manager to capture screenshots of in game settings and results for the following games:
- Cities Skylines 2
- Counterstrike 2
@@ -15,6 +16,10 @@ Changes are grouped by the date they are merged to the main branch of the reposi
- Total War Warhammer 3
- Also implemented steam build ID tracking for the same games
## 2024-09-23
- Add screen splitting to Keras Service.
## 2024-9-20
- Add Grid Legends test harness.

View File

@@ -158,9 +158,7 @@ console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg")
)
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -242,10 +242,7 @@ parser.add_argument(
"--kerasPort", dest="keras_port", help="Port for Keras OCR service", required=True
)
args = parser.parse_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg")
)
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -141,8 +141,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -189,7 +189,7 @@ def main():
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
test_start_time, test_end_time = run_benchmark(keras_service)
resolution = read_current_resolution()

View File

@@ -19,7 +19,8 @@ from harness_utils.output import (
DEFAULT_DATE_FORMAT)
from harness_utils.process import terminate_processes
from harness_utils.keras_service import KerasService
from harness_utils.steam import exec_steam_game
from harness_utils.steam import exec_steam_game, get_registry_active_user, get_steam_folder_path, get_build_id
from harness_utils.artifacts import ArtifactManager, ArtifactType
SCRIPT_DIR = Path(__file__).resolve().parent
@@ -27,6 +28,9 @@ LOG_DIR = SCRIPT_DIR.joinpath("run")
PROCESS_NAME = "cs2.exe"
STEAM_GAME_ID = 730
STEAM_USER_ID = get_registry_active_user()
cfg = Path(get_steam_folder_path(), "userdata", str(STEAM_USER_ID), str(STEAM_GAME_ID), "local", "cfg", "cs2_video.txt")
def setup_logging():
"""default logging config"""
LOG_DIR.mkdir(exist_ok=True)
@@ -56,6 +60,7 @@ def run_benchmark(keras_service):
copy_config()
setup_start_time = time.time()
start_game()
am = ArtifactManager(LOG_DIR)
time.sleep(20) # wait for game to load into main menu
result = keras_service.wait_for_word("play", timeout=30, interval=0.1)
@@ -63,6 +68,62 @@ def run_benchmark(keras_service):
logging.info("Did not find the play menu. Did the game load?")
sys.exit(1)
height, width = get_resolution()
location = None
# We check the resolution so we know which screenshot to use for the locate on screen function
match width:
case "1920":
location = gui.locateOnScreen(f"{SCRIPT_DIR}\\screenshots\\settings_1080.png")
case "2560":
location = gui.locateOnScreen(f"{SCRIPT_DIR}\\screenshots\\settings_1440.png")
case "3840":
location = gui.locateOnScreen(f"{SCRIPT_DIR}\\screenshots\\settings_2160.png")
case _:
logging.error("Could not find the settings cog. The game resolution is currently %s, %s. Are you using a standard resolution?", height, width)
sys.exit(1)
click_me = gui.center(location)
gui.moveTo(click_me.x, click_me.y)
gui.mouseDown()
time.sleep(0.2)
gui.mouseUp()
time.sleep(0.2)
if keras_service.wait_for_word(word="brightness", timeout=30, interval=1) is None:
logging.info("Did not find the video settings menu. Did the menu get stuck?")
sys.exit(1)
am.take_screenshot("video.png", ArtifactType.CONFIG_IMAGE, "picture of video settings")
result = keras_service.look_for_word(word="advanced", attempts=10, interval=1)
if not result:
logging.info("Did not find the advanced video menu. Did Keras click correctly?")
sys.exit(1)
gui.moveTo(result["x"], result["y"])
gui.mouseDown()
time.sleep(0.2)
gui.mouseUp()
time.sleep(0.2)
am.take_screenshot("advanced_video_1.png", ArtifactType.CONFIG_IMAGE, "first picture of advanced video settings")
result = keras_service.look_for_word(word="boost", attempts=10, interval=1)
if not result:
logging.info("Did not find the keyword 'Boost' in the advanced video menu. Did Keras click correctly?")
sys.exit(1)
gui.moveTo(result["x"], result["y"])
time.sleep(1)
gui.scroll(-6000000)
time.sleep(1)
if keras_service.wait_for_word(word="particle", timeout=30, interval=1) is None:
logging.info("Did not find the keyword 'Particle' in advanced video menu. Did Keras scroll correctly?")
sys.exit(1)
am.take_screenshot("advanced_video_2.png", ArtifactType.CONFIG_IMAGE, "second picture of advanced video settings")
logging.info('Starting benchmark')
user.press("`")
time.sleep(0.5)
@@ -107,8 +168,13 @@ def run_benchmark(keras_service):
test_end_time = time.time()
logging.info("The console opened. Marking end time.")
time.sleep(10)
# allow time for result screen to populate
time.sleep(8)
am.take_screenshot("result.png", ArtifactType.RESULTS_IMAGE, "benchmark results")
am.copy_file(Path(cfg), ArtifactType.CONFIG_TEXT, "cs2 video config")
logging.info("Run completed. Closing game.")
time.sleep(2)
elapsed_test_time = round((test_end_time - test_start_time), 2)
logging.info("Benchmark took %f seconds", elapsed_test_time)
@@ -125,8 +191,7 @@ def main():
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIR, "screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, end_time = run_benchmark(keras_service)
@@ -134,7 +199,8 @@ def main():
report = {
"resolution": format_resolution(width, height),
"start_time": seconds_to_milliseconds(start_time),
"end_time": seconds_to_milliseconds(end_time)
"end_time": seconds_to_milliseconds(end_time),
"version": get_build_id(STEAM_GAME_ID)
}
write_report_json(LOG_DIR, "report.json", report)

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.0 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.8 KiB

View File

@@ -126,8 +126,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -37,8 +37,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
def start_game():

View File

@@ -133,8 +133,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -20,24 +20,25 @@ from harness_utils.output import (
DEFAULT_DATE_FORMAT)
from harness_utils.process import terminate_processes
from harness_utils.keras_service import KerasService
from harness_utils.steam import exec_steam_game
from harness_utils.steam import exec_steam_game, get_build_id
from harness_utils.artifacts import ArtifactManager, ArtifactType
SCRIPT_DIR = Path(__file__).resolve().parent
LOG_DIR = SCRIPT_DIR.joinpath("run")
PROCESS_NAME = "gridlegends.exe"
STEAM_GAME_ID = 1307710
username = os.getlogin()
CONFIG_PATH = f"C:\\Users\\{username}\\Documents\\My Games\\GRID Legends\\hardwaresettings"
CONFIG_FILENAME = "hardware_settings_config.xml"
CONFIG_FULL_PATH = f"{CONFIG_PATH}\\{CONFIG_FILENAME}"
def get_resolution() -> tuple[int]:
"""Gets resolution width and height from local xml file created by game."""
username = os.getlogin()
config_path = f"C:\\Users\\{username}\\Documents\\My Games\\GRID Legends\\hardwaresettings"
config_filename = "hardware_settings_config.xml"
resolution = re.compile(r"<resolution width=\"(\d+)\" height=\"(\d+)\"")
cfg = f"{config_path}\\{config_filename}"
height = 0
width = 0
with open(cfg, encoding="utf-8") as file:
with open(CONFIG_FULL_PATH, encoding="utf-8") as file:
lines = file.readlines()
for line in lines:
height_match = resolution.search(line)
@@ -81,6 +82,8 @@ def run_benchmark(keras_service):
"""Run Grid Legends benchmark"""
setup_start_time = time.time()
start_game()
am = ArtifactManager(LOG_DIR)
time.sleep(20) # wait for game to load to the start screen
if keras_service.wait_for_word(word="press", timeout=80, interval=1) is None:
@@ -110,9 +113,26 @@ def run_benchmark(keras_service):
if keras_service.wait_for_word(word="basic", timeout=30, interval=0.1) is None:
logging.error("Didn't basic video options. Did the menu navigate correctly?")
sys.exit(1)
am.take_screenshot("basic.png", ArtifactType.CONFIG_IMAGE, "picture of basic settings")
user.press("f3")
time.sleep(0.2)
if keras_service.wait_for_word(word="benchmark", timeout=30, interval=0.1) is None:
logging.error("Didn't reach advanced video options. Did the menu navigate correctly?")
sys.exit(1)
am.take_screenshot("advanced_1.png", ArtifactType.CONFIG_IMAGE, "first picture of advanced settings")
user.press("up")
time.sleep(0.2)
if keras_service.wait_for_word(word="shading", timeout=30, interval=0.1) is None:
logging.error("Didn't reach bottom of advanced video settings. Did the menu navigate correctly?")
sys.exit(1)
am.take_screenshot("advanced_2.png", ArtifactType.CONFIG_IMAGE, "second picture of advanced settings")
user.press("down")
time.sleep(0.2)
user.press("enter")
time.sleep(0.2)
@@ -134,6 +154,9 @@ def run_benchmark(keras_service):
test_end_time = time.time() - 2
time.sleep(2)
am.take_screenshot("results.png", ArtifactType.RESULTS_IMAGE, "benchmark results")
am.copy_file(Path(CONFIG_FULL_PATH), ArtifactType.CONFIG_TEXT, "game config")
logging.info("Run completed. Closing game.")
time.sleep(2)
return test_start_time, test_end_time
@@ -142,7 +165,7 @@ def run_benchmark(keras_service):
def main():
"""entry point"""
args = get_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, end_time = run_benchmark(keras_service)
elapsed_test_time = round((end_time - start_time), 2)
logging.info("Benchmark took %f seconds", elapsed_test_time)
@@ -151,7 +174,8 @@ def main():
report = {
"resolution": format_resolution(width, height),
"start_time": seconds_to_milliseconds(start_time),
"end_time": seconds_to_milliseconds(end_time)
"end_time": seconds_to_milliseconds(end_time),
"version": get_build_id(STEAM_GAME_ID)
}
write_report_json(LOG_DIR, "report.json", report)

View File

@@ -78,6 +78,66 @@ Given the configuration and artifacts captured in the above code snippets, the r
Contains class for instancing connection to a Keras Service and provides access to its web API.
### Usage:
Import KerasService
```python
from harness_utils.keras_service import KerasService
```
Instantiate a Keras
```python
# usually your host and port will come from the script arguments
keras_service = KerasService(args.keras_host, args.keras_port)
```
You can look a word on the screen for a number of attempts, or wait for a word to appear on the screen for an amount of time.
```python
# this will send a screenshot every 1 second to keras and look for the word "options" in it
# this function call will block until the word has been found returning True, or None once 30 seconds has passed
result = keras_service.wait_for_word("options", timeout=30, interval=1)
# this will send a screenshot to keras every 1 second ato keras and look for the word "continue" in it
# this function call will block until it has tried 20 times
result = keras_service.look_for_word("continue", attempts=20, interval=1)
```
You can optionally check only half the screen, or 1/4 of the screen. This shrinks the amount of screenshot that Keras has to search for a word which means a faster result time.
First import ScreenSplitConfig, ScreenShotDivideMethod, and ScreenShotQuadrant
```python
from harness_utils.keras_service import KerasService, ScreenSplitConfig, ScreenShotDivideMethod, ScreenShotQuadrant
```
Then create your ScreenSplitConfig object and pass it to the look_for_word or wait_for_word functions.
```python
# this config will split the screen horizontally and look in the top of the screen
ss_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.HORIZONTAL
quadrant=ScreenShotQuadrant.TOP
)
# this one will split the screen vertically and look in the right of the screen
ss_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.VERTICAL
quadrant=ScreenShotQuadrant.RIGHT
)
# ans this will split the screen into 4 and look in the bottom left of the screen
ss_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.QUADRANT
quadrant=ScreenShotQuadrant.BOTTOM_LEFT
)
# pass the config to the function call
result = keras_service.wait_for_word("options", timeout=30, interval=1, split_config=ss_config)
```
## Output
`output.py`

View File

@@ -1,45 +1,91 @@
"""Allows accessing Keras Service if available."""
import io
import json
import logging
import os
import time
import mss
import cv2
import requests
import numpy as np
from enum import Enum
from dataclasses import dataclass
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
DEFAULT_TIMEOUT = 120.0
class ScreenShotDivideMethod(Enum):
"""split method"""
HORIZONTAL = "horizontal"
VERTICAL = "vertical"
QUADRANT = "quadrant"
NONE = "none"
class ScreenShotQuadrant(Enum):
"""split arguments"""
TOP = 1
BOTTOM = 2
LEFT = 1
RIGHT = 2
TOP_LEFT = 1
TOP_RIGHT = 2
BOTTOM_LEFT = 3
BOTTOM_RIGHT = 4
class FrameDivideException(ValueError):
"""Exception indicating was encountered while trying to divide a frame"""
@dataclass
class ScreenSplitConfig:
"""data class to contain config for taking splitting a screen shot"""
divide_method: ScreenShotDivideMethod
quadrant: ScreenShotQuadrant
class KerasService():
"""Sets up connection to a Keras service and provides methods to use it"""
def __init__(
self,
ip_addr: str,
port: int | str,
screenshot_path: str | os.PathLike,
timeout: float = DEFAULT_TIMEOUT) -> None:
self.ip_addr = ip_addr
self.port = str(port)
self.url = f"http://{ip_addr}:{str(port)}/process"
self.screenshot_path = screenshot_path
self.timeout = timeout
def _capture_screenshot(self) -> None:
def _capture_screenshot(self, split_config: ScreenSplitConfig) -> io.BytesIO:
screenshot = None
with mss.mss() as sct:
monitor_1 = sct.monitors[1] # Identify the display to capture
screen = np.array(sct.grab(monitor_1))
screen = cv2.cvtColor(screen, cv2.COLOR_RGB2GRAY)
cv2.imwrite(str(self.screenshot_path), screen)
screenshot = np.array(sct.grab(monitor_1))
screenshot = cv2.cvtColor(screenshot, cv2.COLOR_RGB2GRAY)
def _query_service(self, word: str, report_file: any) -> any:
if split_config.divide_method == ScreenShotDivideMethod.HORIZONTAL:
screenshot = self._divide_horizontal(
screenshot, split_config.quadrant)
elif split_config.divide_method == ScreenShotDivideMethod.VERTICAL:
screenshot = self._divide_vertical(
screenshot, split_config.quadrant)
elif split_config.divide_method == ScreenShotDivideMethod.QUADRANT:
screenshot = self._divide_in_four(
screenshot, split_config.quadrant)
_, encoded_image = cv2.imencode('.jpg', screenshot)
return io.BytesIO(encoded_image)
def _query_service(self, word: str, image_bytes: io.BytesIO) -> any:
try:
keras_response = requests.post(
self.url,
data={"word": word},
files={"file": report_file},
files={"file": image_bytes},
timeout=self.timeout
)
@@ -53,34 +99,68 @@ class KerasService():
except requests.exceptions.Timeout:
return None
def capture_screenshot_find_word(self, word: str) -> any:
"""Take a screenshot and try to find the given word within it."""
self._capture_screenshot()
with open(self.screenshot_path, "rb") as report_file:
return self._query_service(word, report_file)
def _divide_horizontal(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot horizontally"""
height, _ = screenshot.shape
if quadrant == ScreenShotQuadrant.TOP:
return screenshot[0:int(height/2), :]
if quadrant == ScreenShotQuadrant.BOTTOM:
return screenshot[int(height/2):int(height), :]
raise FrameDivideException(
f"Unrecognized quadrant for horizontal: {quadrant}")
def look_for_word(self, word: str, attempts: int = 1, interval: float = 0.0) -> bool:
"""Takes a screenshot of the monitor and searches for a given word.
Will look for the word at a given time interval until the specified number
of attempts is reached.
Will return early if the query result comes back with a match.
def _divide_vertical(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot vertically"""
_, width = screenshot.shape
if quadrant == quadrant.LEFT:
return screenshot[:, 0:int(width/2)]
if quadrant == quadrant.RIGHT:
return screenshot[:, int(width/2):int(width)]
raise FrameDivideException(
f"Unrecognized quadrant for vertical: {quadrant}")
def _divide_in_four(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot in four quadrants"""
height, width = screenshot.shape
if quadrant == ScreenShotQuadrant.TOP_LEFT:
return screenshot[0:int(height/2), 0:int(width/2)]
if quadrant == ScreenShotQuadrant.TOP_RIGHT:
return screenshot[0:int(height/2), int(width/2):int(width)]
if quadrant == ScreenShotQuadrant.BOTTOM_LEFT:
return screenshot[int(height/2):int(height), 0:int(width/2)]
if quadrant == ScreenShotQuadrant.BOTTOM_RIGHT:
return screenshot[int(height/2):int(height), int(width/2):int(width)]
raise FrameDivideException(
f"Unrecognized quadrant for in four: {quadrant}")
def look_for_word(self, word: str, attempts: int = 1, interval: float = 0.0, split_config: ScreenSplitConfig = None) -> bool:
"""Overload for look_for_word but allows for screen splitting
which will look for a word in only part of the screen
"""
if split_config is None:
split_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.NONE, quadrant=ScreenShotQuadrant.TOP)
for _ in range(attempts):
result = self.capture_screenshot_find_word(word)
image_bytes = self._capture_screenshot(split_config)
result = self._query_service(word, image_bytes)
if result is not None:
return result
time.sleep(interval)
return None
def wait_for_word(self, word: str, interval: float = 0.0, timeout: float = 0.0):
def wait_for_word(self, word: str, interval: float = 0.0, timeout: float = 0.0, split_config: ScreenSplitConfig = None) -> bool:
"""Takes a screenshot of the monitor and searches for a given word.
Will look for the word at a given time interval until the specified timeout
has been exceeded.
Will return early if the query result comes back with a match.
"""
if split_config is None:
split_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.NONE, quadrant=ScreenShotQuadrant.TOP)
search_start_time = time.time()
while time.time() - search_start_time < timeout:
result = self.capture_screenshot_find_word(word)
image_bytes = self._capture_screenshot(split_config)
result = self._query_service(word, image_bytes)
if result is not None:
return result
time.sleep(interval)

View File

@@ -18,12 +18,14 @@ from harness_utils.output import (
DEFAULT_LOGGING_FORMAT,
DEFAULT_DATE_FORMAT,
)
from harness_utils.misc import remove_files
from harness_utils.misc import remove_files, press_n_times
from harness_utils.process import terminate_processes
from harness_utils.steam import (
exec_steam_run_command,
get_steamapps_common_path,
get_build_id
)
from harness_utils.artifacts import ArtifactManager, ArtifactType
STEAM_GAME_ID = 1649240
SCRIPT_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
@@ -90,6 +92,7 @@ def run_benchmark() -> tuple[float]:
logging.info("Starting game")
exec_steam_run_command(STEAM_GAME_ID)
setup_start_time = time.time()
am = ArtifactManager(LOG_DIRECTORY)
time.sleep(10)
@@ -105,8 +108,58 @@ def run_benchmark() -> tuple[float]:
logging.info("Could not find prompt to open menu!")
sys.exit(1)
# Navigate to in-game benchmark and start it
navigate_options_menu()
# Navigate to display menu
user.press("esc")
time.sleep(1)
user.press("enter")
time.sleep(1)
user.press("q")
time.sleep(1)
user.press("q")
time.sleep(1)
# Verify that we have navigated to the video settings menu and take a screenshot
if kerasService.wait_for_word(word="aspect", timeout=30, interval=1) is None:
logging.info("Did not find the video settings menu. Did the menu get stuck?")
sys.exit(1)
am.take_screenshot("video.png", ArtifactType.CONFIG_IMAGE, "picture of video settings")
# Navigate to graphics menu
user.press("e")
time.sleep(1)
if kerasService.wait_for_word(word="vsync", timeout=30, interval=1) is None:
logging.info("Did not find the graphics settings menu. Did the menu get stuck?")
sys.exit(1)
am.take_screenshot("graphics_1.png", ArtifactType.CONFIG_IMAGE, "first picture of graphics settings")
# We check for a keyword that indicates DLSS is active because this changes how we navigate the menu
if kerasService.wait_for_word(word="sharpness", timeout=10, interval=1) is None:
logging.info("No DLSS Settings Detected")
# Scroll down graphics menu
press_n_times("down", 15, 0.2)
else:
logging.info("DLSS Settings Detected")
# Scroll down graphics menu
press_n_times("down", 17, 0.2)
if kerasService.wait_for_word(word="volumetric", timeout=30, interval=1) is None:
logging.info("Did not find the keyword 'volumetric'. Did the the menu scroll correctly?")
sys.exit(1)
am.take_screenshot("graphics_2.png", ArtifactType.CONFIG_IMAGE, "second picture of graphics settings")
# Scroll down graphics menu
press_n_times("down", 15, 0.2)
if kerasService.wait_for_word(word="hdr", timeout=30, interval=1) is None:
logging.info("Did not find the keyword 'hdr'. Did the the menu scroll correctly?")
sys.exit(1)
am.take_screenshot("graphics_3.png", ArtifactType.CONFIG_IMAGE, "third picture of graphics settings")
# Launch the benchmark
user.keyDown("tab")
time.sleep(5)
user.keyUp("tab")
setup_end_time = time.time()
elapsed_setup_time = round((setup_end_time - setup_start_time), 2)
@@ -137,6 +190,11 @@ def run_benchmark() -> tuple[float]:
"Results screen was not found! Did harness not wait long enough? Or test was too long?")
sys.exit(1)
# Give results screen time to fill out, then save screenshot and config file
time.sleep(2)
am.take_screenshot("result.png", ArtifactType.RESULTS_IMAGE, "screenshot of benchmark result")
am.copy_file(LOCAL_USER_SETTINGS, ArtifactType.CONFIG_TEXT, "config file")
elapsed_test_time = round((test_end_time - test_start_time), 2)
logging.info("Benchmark took %s seconds", elapsed_test_time)
@@ -156,8 +214,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()
@@ -165,7 +222,8 @@ try:
report = {
"resolution": format_resolution(width, height),
"start_time": seconds_to_milliseconds(start_time),
"end_time": seconds_to_milliseconds(end_time)
"end_time": seconds_to_milliseconds(end_time),
"version": get_build_id(STEAM_GAME_ID)
}
write_report_json(LOG_DIRECTORY, "report.json", report)

View File

@@ -38,8 +38,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
def get_run_game_id_command(game_id: int) -> str:

View File

@@ -18,7 +18,7 @@ from harness_utils.output import (
DEFAULT_LOGGING_FORMAT,
DEFAULT_DATE_FORMAT)
from harness_utils.process import terminate_processes
from harness_utils.keras_service import KerasService
from harness_utils.keras_service import KerasService, ScreenShotDivideMethod, ScreenShotQuadrant, ScreenSplitConfig
from harness_utils.steam import exec_steam_game
from harness_utils.artifacts import ArtifactManager, ArtifactType
@@ -46,25 +46,29 @@ def start_game():
return exec_steam_game(STEAM_GAME_ID, game_params=["-nolauncher"])
def run_benchmark():
def run_benchmark(keras_service, am):
"""Start game via Steam and enter fullscreen mode"""
setup_start_time = time.time()
start_game()
time.sleep(10)
args = get_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
am = ArtifactManager(LOG_DIR)
ss_config = ScreenSplitConfig(
divide_method=ScreenShotDivideMethod.HORIZONTAL,
quadrant=ScreenShotQuadrant.TOP
)
if keras_service.wait_for_word(word="options", timeout=30, interval=1) is None:
if keras_service.wait_for_word(word="options", timeout=30, interval=1, split_config=ss_config) is None:
logging.info("Did not find the options menu. Did the game launch correctly?")
sys.exit(1)
logging.info("found options")
user.press("up")
time.sleep(0.2)
time.sleep(0.5)
user.press("up")
time.sleep(0.2)
time.sleep(0.5)
user.press("up")
time.sleep(0.2)
time.sleep(0.5)
user.press("enter")
time.sleep(1)
@@ -72,12 +76,14 @@ def run_benchmark():
logging.info("Did not find the graphics menu. Did the menu get stuck?")
sys.exit(1)
logging.info("found graphics")
user.press("down")
time.sleep(0.2)
time.sleep(0.5)
user.press("down")
time.sleep(0.2)
time.sleep(0.5)
user.press("down")
time.sleep(0.2)
time.sleep(0.5)
user.press("enter")
time.sleep(1)
@@ -143,10 +149,18 @@ def run_benchmark():
write_report_json(LOG_DIR, "report.json", report)
def main():
"""entry point"""
setup_logging()
args = get_args()
keras_service = KerasService(args.keras_host, args.keras_port)
am = ArtifactManager(LOG_DIR)
run_benchmark(keras_service, am)
if __name__ == "__main__":
try:
setup_logging()
run_benchmark()
main()
except Exception as ex:
logging.error("Something went wrong running the benchmark!")
logging.exception(ex)

View File

@@ -61,7 +61,7 @@ def console_command(command):
def run_benchmark(keras_host, keras_port):
"""Starts the benchmark"""
keras_service = KerasService(keras_host, keras_port, str(LOG_DIR.joinpath("screenshot.jpg")))
keras_service = KerasService(keras_host, keras_port)
copy_benchmarkfiles()
copy_benchmarksave()
start_game()

View File

@@ -126,8 +126,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -221,7 +221,7 @@ def main():
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, endtime = run_benchmark(keras_service)
height, width = read_current_resolution()
report = {

View File

@@ -2,6 +2,7 @@
from argparse import ArgumentParser
import logging
import os
from pathlib import Path
import time
import sys
import pyautogui as gui
@@ -19,14 +20,20 @@ from harness_utils.output import (
DEFAULT_LOGGING_FORMAT,
DEFAULT_DATE_FORMAT
)
from harness_utils.steam import get_app_install_location
from harness_utils.steam import get_app_install_location, get_build_id
from harness_utils.keras_service import KerasService
from harness_utils.artifacts import ArtifactManager, ArtifactType
SCRIPT_DIRECTORY = os.path.dirname(os.path.realpath(__file__))
LOG_DIRECTORY = os.path.join(SCRIPT_DIRECTORY, "run")
PROCESS_NAME = "Warhammer3.exe"
STEAM_GAME_ID = 1142710
APPDATA = os.getenv("APPDATA")
CONFIG_LOCATION = f"{APPDATA}\\The Creative Assembly\\Warhammer3\\scripts"
CONFIG_FILENAME = "preferences.script.txt"
CONFIG_FULL_PATH = f"{CONFIG_LOCATION}\\{CONFIG_FILENAME}"
user.FAILSAFE = False
@@ -63,6 +70,7 @@ def run_benchmark():
start_game()
setup_start_time = time.time()
time.sleep(5)
am = ArtifactManager(LOG_DIRECTORY)
result = kerasService.look_for_word("warning", attempts=10, interval=5)
if not result:
@@ -84,6 +92,8 @@ def run_benchmark():
gui.mouseUp()
time.sleep(2)
am.take_screenshot("main.png", ArtifactType.CONFIG_IMAGE, "picture of basic settings")
result = kerasService.look_for_word("ad", attempts=10, interval=1)
if not result:
logging.info("Did not find the advanced menu. Did the game skip the intros?")
@@ -96,6 +106,8 @@ def run_benchmark():
gui.mouseUp()
time.sleep(0.5)
am.take_screenshot("advanced.png", ArtifactType.CONFIG_IMAGE, "picture of advanced settings")
result = kerasService.look_for_word("bench", attempts=10, interval=1)
if not result:
logging.info("Did not find the benchmark menu. Did the game skip the intros?")
@@ -144,6 +156,9 @@ def run_benchmark():
# Wait 5 seconds for benchmark info
time.sleep(5)
am.take_screenshot("result.png", ArtifactType.RESULTS_IMAGE, "benchmark results")
am.copy_file(Path(CONFIG_FULL_PATH), ArtifactType.RESULTS_TEXT, "preferences.script.txt")
# End the run
elapsed_test_time = round(test_end_time - test_start_time, 2)
logging.info("Benchmark took %f seconds", elapsed_test_time)
@@ -172,8 +187,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, endtime = run_benchmark()
@@ -181,7 +195,8 @@ try:
report = {
"resolution": format_resolution(width, height),
"start_time": seconds_to_milliseconds(start_time),
"end_time": seconds_to_milliseconds(endtime)
"end_time": seconds_to_milliseconds(endtime),
"version": get_build_id(STEAM_GAME_ID)
}
write_report_json(LOG_DIRECTORY, "report.json", report)