some typing cleanups

This commit is contained in:
Francis Lata
2025-02-28 14:47:29 +00:00
parent dc394e8214
commit 87bfa77f4a
2 changed files with 8 additions and 8 deletions

View File

@@ -1,5 +1,5 @@
import os, random, pickle, queue
from typing import List, Tuple, Optional, Dict
from typing import List, Optional
from pathlib import Path
from multiprocessing import Queue, Process, shared_memory, connection, Lock, cpu_count
@@ -351,8 +351,8 @@ def batch_load_unet3d(preprocessed_dataset_dir:Path, batch_size:int=6, val:bool=
### RetinaNet
def load_retinanet_data(base_dir:Path, val:bool, queue_in:Queue, queue_out:Queue,
imgs:Tensor, boxes:Tensor, labels:Tensor, matches:Optional[Tensor] = None,
anchors:Optional[Tensor] = None, seed:Optional[int] = None):
imgs:Tensor, boxes:Tensor, labels:Tensor, matches:Optional[Tensor]=None,
anchors:Optional[Tensor]=None, seed:Optional[int]=None):
from extra.datasets.openimages import image_load, random_horizontal_flip, resize
from examples.mlperf.helpers import box_iou, find_matches, generate_anchors
import torch
@@ -401,7 +401,7 @@ def batch_load_retinanet(dataset, val:bool, base_dir:Path, batch_size:int=32, sh
queue_in.put((idx, img, tgt))
def _setup_shared_mem(shm_name:str, size:Tuple[int, ...], dtype:dtypes) -> Tuple[shared_memory.SharedMemory, Tensor]:
def _setup_shared_mem(shm_name:str, size:tuple[int, ...], dtype:dtypes) -> tuple[shared_memory.SharedMemory, Tensor]:
if os.path.exists(f"/dev/shm/{shm_name}"): os.unlink(f"/dev/shm/{shm_name}")
shm = shared_memory.SharedMemory(name=shm_name, create=True, size=prod(size))
shm_tensor = Tensor.empty(*size, dtype=dtype, device=f"disk:/dev/shm/{shm_name}")

View File

@@ -1,6 +1,6 @@
from collections import OrderedDict
import unicodedata
from typing import Optional, Tuple, List
from typing import Optional
import math
import numpy as np
from tinygrad.nn import state
@@ -258,7 +258,7 @@ def find_matches(match_quality_matrix:np.ndarray, high_threshold:float=0.5, low_
def box_iou(boxes1:np.ndarray, boxes2:np.ndarray) -> np.ndarray:
def _box_area(boxes:np.ndarray) -> np.ndarray: return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
def _box_inter_union(boxes1:np.ndarray, boxes2:np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
def _box_inter_union(boxes1:np.ndarray, boxes2:np.ndarray) -> tuple[np.ndarray, np.ndarray]:
area1, area2 = _box_area(boxes1), _box_area(boxes2)
lt, rb = np.maximum(boxes1[:, None, :2], boxes2[:, :2]), np.minimum(boxes1[:, None, 2:], boxes2[:, 2:])
wh = np.clip(rb - lt, a_min=0, a_max=None)
@@ -269,8 +269,8 @@ def box_iou(boxes1:np.ndarray, boxes2:np.ndarray) -> np.ndarray:
inter, union = _box_inter_union(boxes1, boxes2)
return inter / union
def generate_anchors(input_size:Tuple[int, int], batch_size:int = 1, scales:Optional[Tuple[Tensor, ...]] = None, aspect_ratios:Optional[Tuple[Tensor, ...]] = None) -> List[np.ndarray]:
def _compute_grid_sizes(input_size:Tuple[int, int]) -> np.ndarray:
def generate_anchors(input_size:tuple[int, int], batch_size:int = 1, scales:Optional[tuple[Tensor, ...]] = None, aspect_ratios:Optional[tuple[Tensor, ...]] = None) -> list[np.ndarray]:
def _compute_grid_sizes(input_size:tuple[int, int]) -> np.ndarray:
return np.ceil(np.array(input_size)[None, :] / 2 ** np.arange(3, 8)[:, None])
scales = tuple((i, int(i * 2 ** (1/3)), int(i * 2 ** (2/3))) for i in 2 ** np.arange(5, 10)) if scales is None else scales