Add type hints to runtime directory (#7491)

Co-authored-by: openhands <openhands@all-hands.dev>
This commit is contained in:
Graham Neubig
2025-04-03 13:51:31 -04:00
committed by GitHub
parent 47515ea876
commit a828318494
5 changed files with 39 additions and 23 deletions

View File

@@ -1,7 +1,7 @@
import docker
def stop_all_containers(prefix: str):
def stop_all_containers(prefix: str) -> None:
docker_client = docker.from_env()
try:
containers = docker_client.containers.list(all=True)

View File

@@ -1,8 +1,17 @@
from typing import Protocol
from openhands.storage.files import FileStore
class SupportsFilesystemOperations(Protocol):
def write(self, path: str, contents: str | bytes) -> None: ...
def read(self, path: str) -> str: ...
def list(self, path: str) -> list[str]: ...
def delete(self, path: str) -> None: ...
class E2BFileStore(FileStore):
def __init__(self, filesystem):
def __init__(self, filesystem: SupportsFilesystemOperations) -> None:
self.filesystem = filesystem
def write(self, path: str, contents: str | bytes) -> None:

View File

@@ -38,7 +38,7 @@ def _output_error(error_msg: str) -> bool:
return False
def _is_valid_filename(file_name) -> bool:
def _is_valid_filename(file_name: str) -> bool:
if not file_name or not isinstance(file_name, str) or not file_name.strip():
return False
invalid_chars = '<>:"/\\|?*'
@@ -53,7 +53,7 @@ def _is_valid_filename(file_name) -> bool:
return True
def _is_valid_path(path) -> bool:
def _is_valid_path(path: str) -> bool:
if not path or not isinstance(path, str):
return False
try:
@@ -62,7 +62,7 @@ def _is_valid_path(path) -> bool:
return False
def _create_paths(file_name) -> bool:
def _create_paths(file_name: str) -> bool:
try:
dirname = os.path.dirname(file_name)
if dirname:
@@ -81,7 +81,7 @@ def _check_current_file(file_path: str | None = None) -> bool:
return True
def _clamp(value, min_value, max_value):
def _clamp(value: int, min_value: int, max_value: int) -> int:
return max(min_value, min(value, max_value))
@@ -112,10 +112,15 @@ def _lint_file(file_path: str) -> tuple[str | None, int | None]:
def _print_window(
file_path, targeted_line, window, return_str=False, ignore_window=False
):
file_path: str | None,
targeted_line: int,
window: int,
return_str: bool = False,
ignore_window: bool = False,
) -> str:
global CURRENT_LINE
_check_current_file(file_path)
if not _check_current_file(file_path) or file_path is None:
return ''
with open(file_path) as file:
content = file.read()
@@ -166,9 +171,10 @@ def _print_window(
return output
else:
print(output)
return ''
def _cur_file_header(current_file, total_lines) -> str:
def _cur_file_header(current_file: str | None, total_lines: int) -> str:
if not current_file:
return ''
return f'[File: {os.path.abspath(current_file)} ({total_lines} lines total)]\n'
@@ -229,7 +235,8 @@ def goto_line(line_number: int) -> None:
line_number: int: The line number to move to.
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
if not _check_current_file():
return
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
@@ -238,7 +245,6 @@ def goto_line(line_number: int) -> None:
return
CURRENT_LINE = _clamp(line_number, 1, total_lines)
output = _cur_file_header(CURRENT_FILE, total_lines)
output += _print_window(
CURRENT_FILE, CURRENT_LINE, WINDOW, return_str=True, ignore_window=False
@@ -253,8 +259,8 @@ def scroll_down() -> None:
None
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
if not _check_current_file():
return
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
CURRENT_LINE = _clamp(CURRENT_LINE + WINDOW, 1, total_lines)
@@ -272,8 +278,8 @@ def scroll_up() -> None:
None
"""
global CURRENT_FILE, CURRENT_LINE, WINDOW
_check_current_file()
if not _check_current_file():
return
with open(str(CURRENT_FILE)) as file:
total_lines = max(1, sum(1 for _ in file))
CURRENT_LINE = _clamp(CURRENT_LINE - WINDOW, 1, total_lines)

View File

@@ -19,6 +19,7 @@ Note:
"""
import base64
from typing import Any
import docx
import PyPDF2
@@ -103,7 +104,7 @@ def _base64_video(file_path: str, frame_interval: int = 10) -> list[str]:
return base64_frames
def _prepare_image_messages(task: str, base64_image: str):
def _prepare_image_messages(task: str, base64_image: str) -> list[dict[str, Any]]:
return [
{
'role': 'user',

View File

@@ -9,22 +9,22 @@ from openai import OpenAI
# NOTE: we need to get env vars inside functions because they will be set in IPython
# AFTER the agentskills is imported (the case for DockerRuntime)
# ==================================================================================================
def _get_openai_api_key():
def _get_openai_api_key() -> str:
return os.getenv('OPENAI_API_KEY', os.getenv('SANDBOX_ENV_OPENAI_API_KEY', ''))
def _get_openai_base_url():
def _get_openai_base_url() -> str:
return os.getenv('OPENAI_BASE_URL', 'https://api.openai.com/v1')
def _get_openai_model():
def _get_openai_model() -> str:
return os.getenv('OPENAI_MODEL', 'gpt-4o')
def _get_max_token():
return os.getenv('MAX_TOKEN', 500)
def _get_max_token() -> int:
return int(os.getenv('MAX_TOKEN', '500'))
def _get_openai_client():
def _get_openai_client() -> OpenAI:
client = OpenAI(api_key=_get_openai_api_key(), base_url=_get_openai_base_url())
return client