initial commit, linter come at me

This commit is contained in:
Nikolas
2024-09-23 14:59:05 -07:00
parent 1171cc5d49
commit 35a5d63034
18 changed files with 151 additions and 54 deletions

View File

@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
Changes are grouped by the date they are merged to the main branch of the repository and are ordered from newest to oldest. Dates use the ISO 8601 extended calendar date format, i.e. YYYY-MM-DD.
## 2024-09-23
- Add screen splitting to Keras Service.
## 2024-9-20
- Add Grid Legends test harness.

View File

@@ -158,9 +158,7 @@ console.setFormatter(formatter)
logging.getLogger("").addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg")
)
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -228,10 +228,7 @@ parser.add_argument(
"--kerasPort", dest="keras_port", help="Port for Keras OCR service", required=True
)
args = parser.parse_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg")
)
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -141,8 +141,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -123,7 +123,7 @@ def main():
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
test_start_time, test_end_time = run_benchmark(keras_service)
resolution = read_current_resolution()

View File

@@ -125,8 +125,7 @@ def main():
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIR, "screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, end_time = run_benchmark(keras_service)

View File

@@ -126,8 +126,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -37,8 +37,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
def start_game():

View File

@@ -133,8 +133,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -142,7 +142,7 @@ def run_benchmark(keras_service):
def main():
"""entry point"""
args = get_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, end_time = run_benchmark(keras_service)
elapsed_test_time = round((end_time - start_time), 2)
logging.info("Benchmark took %f seconds", elapsed_test_time)

View File

@@ -1,45 +1,90 @@
"""Allows accessing Keras Service if available."""
import io
import json
import logging
import os
import time
import mss
import cv2
import requests
import numpy as np
from enum import Enum
from dataclasses import dataclass
logging.getLogger("requests").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)
DEFAULT_TIMEOUT = 120.0
class ScreenShotDivideMethod(Enum):
"""split method"""
horizontal = "horizontal"
vertical = "vertical"
quadrant = "quadrant"
none = "none"
class ScreenShotQuadrant(Enum):
"""split arguments"""
TOP = 1
BOTTOM = 2
LEFT = 1
RIGHT = 2
TOP_LEFT = 1
TOP_RIGHT = 2
BOTTOM_LEFT = 3
BOTTOM_RIGHT = 4
class FrameDivideException(ValueError):
"""Exception indicating was encountered while trying to divide a frame"""
@dataclass
class ScreenSplitConfig:
"""data class to contain config for taking splitting a screen shot"""
divide_method: ScreenShotDivideMethod
quadrant: ScreenShotQuadrant
class KerasService():
"""Sets up connection to a Keras service and provides methods to use it"""
def __init__(
self,
ip_addr: str,
port: int | str,
screenshot_path: str | os.PathLike,
timeout: float = DEFAULT_TIMEOUT) -> None:
self.ip_addr = ip_addr
self.port = str(port)
self.url = f"http://{ip_addr}:{str(port)}/process"
self.screenshot_path = screenshot_path
self.timeout = timeout
def _capture_screenshot(self) -> None:
def _capture_screenshot(self, split_config: ScreenSplitConfig) -> io.BytesIO:
screenshot = None
with mss.mss() as sct:
monitor_1 = sct.monitors[1] # Identify the display to capture
screen = np.array(sct.grab(monitor_1))
screen = cv2.cvtColor(screen, cv2.COLOR_RGB2GRAY)
cv2.imwrite(str(self.screenshot_path), screen)
screenshot = np.array(sct.grab(monitor_1))
screenshot = cv2.cvtColor(screenshot, cv2.COLOR_RGB2GRAY)
def _query_service(self, word: str, report_file: any) -> any:
if split_config.divide_method == ScreenShotDivideMethod.horizontal:
screenshot = self._divide_horizontal(screenshot)
elif split_config.divide_method == ScreenShotDivideMethod.vertical:
screenshot = self._divide_vertical(screenshot)
elif split_config.divide_method == ScreenShotDivideMethod.quadrant:
screenshot = self._divide_in_four(screenshot)
elif split_config.divide_method == ScreenShotDivideMethod.none:
screenshot = screenshot
_, encoded_image = cv2.imencode('.jpg', screenshot)
return io.BytesIO(encoded_image)
def _query_service(self, word: str, image_bytes: io.BytesIO) -> any:
try:
keras_response = requests.post(
self.url,
data={"word": word},
files={"file": report_file},
files={"file": image_bytes},
timeout=self.timeout
)
@@ -53,11 +98,56 @@ class KerasService():
except requests.exceptions.Timeout:
return None
def capture_screenshot_find_word(self, word: str) -> any:
"""Take a screenshot and try to find the given word within it."""
self._capture_screenshot()
with open(self.screenshot_path, "rb") as report_file:
return self._query_service(word, report_file)
def _divide_horizontal(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot horizontally"""
height, _, _ = screenshot.shape
if quadrant == ScreenShotQuadrant.TOP:
return screenshot[0:int(height/2), :]
elif quadrant == ScreenShotQuadrant.BOTTOM:
return screenshot[int(height/2):int(height), :]
else:
raise FrameDivideException(
f"Unrecognized quadrant for horizontal: {self.quadrant}")
def _divide_vertical(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot vertically"""
_, width, _ = screenshot.shape
if self.quadrant == quadrant.LEFT:
return screenshot[:, 0:int(width/2)]
elif self.quadrant == quadrant.RIGHT:
return screenshot[:, int(width/2):int(width)]
else:
raise FrameDivideException(
f"Unrecognized quadrant for vertical: {self.quadrant}")
def _divide_in_four(self, screenshot, quadrant: ScreenShotQuadrant):
"""divide the screenshot in four quadrants"""
height, width, _ = screenshot.shape
if self.quadrant == ScreenShotQuadrant.TOP_LEFT:
return screenshot[0:int(height/2), 0:int(width/2)]
elif self.quadrant == ScreenShotQuadrant.TOP_RIGHT:
return screenshot[0:int(height/2), int(width/2):int(width)]
elif self.quadrant == ScreenShotQuadrant.BOTTOM_LEFT:
return screenshot[int(height/2):int(height), 0:int(width/2)]
elif self.quadrant == ScreenShotQuadrant.BOTTOM_RIGHT:
return screenshot[int(height/2):int(height), int(width/2):int(width)]
else:
raise FrameDivideException(
f"Unrecognized quadrant for in four: {self.quadrant}")
def _look_for_word(
self, word: str,
attempts: int = 1,
interval: float = 0.0,
split_config: ScreenSplitConfig = None) -> bool:
"""implementation of look for word"""
for _ in range(attempts):
image_bytes = self._capture_screenshot(split_config)
result = self._query_service(word, image_bytes)
if result is not None:
return result
time.sleep(interval)
return None
def look_for_word(self, word: str, attempts: int = 1, interval: float = 0.0) -> bool:
"""Takes a screenshot of the monitor and searches for a given word.
@@ -65,23 +155,40 @@ class KerasService():
of attempts is reached.
Will return early if the query result comes back with a match.
"""
for _ in range(attempts):
result = self.capture_screenshot_find_word(word)
return self._look_for_word(word, attempts, interval, ScreenSplitConfig(divide_method=ScreenShotDivideMethod.none))
def look_for_word(self, word: str, attempts: int = 1, interval: float = 0.0, split_config: ScreenSplitConfig = None) -> bool:
"""Overload for look_for_word but allows for screen splitting
which will look for a word in only part of the screen
"""
return self._look_for_word(word, attempts, interval, split_config)
def _wait_for_word(
self,
word: str,
interval: float,
timeout: float,
split_config: ScreenSplitConfig) -> bool:
"""implementation of wait for word"""
search_start_time = time.time()
while time.time() - search_start_time < timeout:
image_bytes = self._capture_screenshot(split_config)
result = self._query_service(word, image_bytes)
if result is not None:
return result
time.sleep(interval)
return None
def wait_for_word(self, word: str, interval: float = 0.0, timeout: float = 0.0):
def wait_for_word(self, word: str, interval: float = 0.0, timeout: float = 0.0) -> bool:
"""Takes a screenshot of the monitor and searches for a given word.
Will look for the word at a given time interval until the specified timeout
has been exceeded.
Will return early if the query result comes back with a match.
"""
search_start_time = time.time()
while time.time() - search_start_time < timeout:
result = self.capture_screenshot_find_word(word)
if result is not None:
return result
time.sleep(interval)
return None
return self._wait_for_word(word, interval, timeout, ScreenSplitConfig(divide_method=ScreenShotDivideMethod.none))
def wait_for_word(self, word: str, interval: float = 0.0, timeout: float = 0.0, split_config: ScreenSplitConfig = None) -> bool:
"""Overload for wait_for_word but allows for screen splitting
which will look for a word in only part of the screen
"""
return self._wait_for_word(word, interval, timeout, split_config)

View File

@@ -156,8 +156,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -38,8 +38,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
def get_run_game_id_command(game_id: int) -> str:

View File

@@ -52,7 +52,7 @@ def run_benchmark():
start_game()
args = get_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
am = ArtifactManager(LOG_DIR)
if keras_service.wait_for_word(word="options", timeout=30, interval=1) is None:

View File

@@ -61,7 +61,7 @@ def console_command(command):
def run_benchmark(keras_host, keras_port):
"""Starts the benchmark"""
keras_service = KerasService(keras_host, keras_port, str(LOG_DIR.joinpath("screenshot.jpg")))
keras_service = KerasService(keras_host, keras_port)
copy_benchmarkfiles()
copy_benchmarksave()
start_game()

View File

@@ -126,8 +126,7 @@ console.setFormatter(formatter)
logging.getLogger('').addHandler(console)
args = get_args()
kerasService = KerasService(
args.keras_host, args.keras_port, os.path.join(LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, end_time = run_benchmark()

View File

@@ -167,7 +167,7 @@ def main():
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
keras_service = KerasService(args.keras_host, args.keras_port, LOG_DIR.joinpath("screenshot.jpg"))
keras_service = KerasService(args.keras_host, args.keras_port)
start_time, endtime = run_benchmark(keras_service)
height, width = read_current_resolution()
report = {

View File

@@ -172,8 +172,7 @@ parser.add_argument("--kerasHost", dest="keras_host",
parser.add_argument("--kerasPort", dest="keras_port",
help="Port for Keras OCR service", required=True)
args = parser.parse_args()
kerasService = KerasService(args.keras_host, args.keras_port, os.path.join(
LOG_DIRECTORY, "screenshot.jpg"))
kerasService = KerasService(args.keras_host, args.keras_port)
try:
start_time, endtime = run_benchmark()