add dataloader + test

This commit is contained in:
Francis Lata
2024-10-16 15:38:47 -04:00
parent 3d857d758e
commit 4bebe61a9c
9 changed files with 1573 additions and 11 deletions

View File

@@ -359,8 +359,13 @@ def batch_load_unet3d(preprocessed_dataset_dir:Path, batch_size:int=6, val:bool=
def load_retinanet_data(base_dir:Path, val:bool, queue_in:Queue, queue_out:Queue, X:Tensor, Y_boxes:Tensor, Y_labels:Tensor, anchors:np.ndarray):
from extra.datasets.openimages import image_load, prepare_target, random_horizontal_flip, resize
from examples.mlperf.helpers import box_iou, find_matches
import torch
while (data:=queue_in.get()) is not None:
np.random.seed(42)
random.seed(42)
torch.manual_seed(42)
idx, img, ann = data
img_id = img["id"]
img = image_load(base_dir, img["subset"], img["file_name"])
@@ -387,9 +392,9 @@ def batch_load_retinanet(dataset, val:bool, anchors:np.ndarray, base_dir:Path, b
queue_in.put((idx, img, ann))
def _setup_shared_mem(shm_name:str, size:Tuple[int, ...], dtype:dtypes) -> Tuple[shared_memory.SharedMemory, Tensor]:
if os.path.exists(f"/dev/shm/{shm_name}"): os.unlink(f"/dev/shm/{shm_name}")
if os.path.exists(f"/Users/flata/Downloads/shm/{shm_name}"): os.unlink(f"/Users/flata/Downloads/shm/{shm_name}")
shm = shared_memory.SharedMemory(name=shm_name, create=True, size=prod(size))
shm_tensor = Tensor.empty(*size, dtype=dtype, device=f"disk:/dev/shm/{shm_name}")
shm_tensor = Tensor.empty(*size, dtype=dtype, device=f"disk:/Users/flata/Downloads/shm/{shm_name}")
return shm, shm_tensor
image_ids = sorted(dataset.imgs.keys())

View File

@@ -1,10 +1,14 @@
from extra.datasets.kits19 import iterate, preprocess
from examples.mlperf.dataloader import batch_load_unet3d
from test.external.mlperf_retinanet.openimages import get_openimages
from extra.datasets.openimages import download_dataset
from examples.mlperf.dataloader import batch_load_unet3d, batch_load_retinanet
from test.external.mlperf_retinanet.openimages import get_openimages, postprocess_targets
from test.external.mlperf_retinanet.presets import DetectionPresetTrain, DetectionPresetEval
from test.external.mlperf_retinanet.transforms import GeneralizedRCNNTransform
from test.external.mlperf_unet3d.kits19 import PytTrain, PytVal
from tinygrad.helpers import temp
from pathlib import Path
from PIL import Image
from pycocotools.coco import COCO
import json
import nibabel as nib
@@ -12,12 +16,14 @@ import numpy as np
import os
import random
import tempfile
import torch
import unittest
class ExternalTestDatasets(unittest.TestCase):
def _set_seed(self):
np.random.seed(42)
random.seed(42)
torch.manual_seed(42)
class TestKiTS19Dataset(ExternalTestDatasets):
def _create_samples(self, val, num_samples=2):
@@ -54,7 +60,7 @@ class TestKiTS19Dataset(ExternalTestDatasets):
if use_old_dataloader:
dataset = iterate(list(Path(tempfile.gettempdir()).glob("case_*")), preprocessed_dir=preproc_pth, val=val, shuffle=shuffle, bs=batch_size)
else:
dataset = iter(batch_load_unet3d(preproc_pth, batch_size=batch_size, val=val, shuffle=shuffle, seed=seed))
dataset = batch_load_unet3d(preproc_pth, batch_size=batch_size, val=val, shuffle=shuffle, seed=seed)
return iter(dataset)
@@ -80,6 +86,8 @@ class TestKiTS19Dataset(ExternalTestDatasets):
class TestOpenImagesDataset(ExternalTestDatasets):
def _create_samples(self, subset):
self._set_seed()
os.makedirs(Path(base_dir:=tempfile.gettempdir() + "/openimages") / f"{subset}/data", exist_ok=True)
os.makedirs(base_dir / Path(f"{subset}/labels"), exist_ok=True)
@@ -102,16 +110,35 @@ class TestOpenImagesDataset(ExternalTestDatasets):
return base_dir, ann_file
def _create_ref_dataloader(self, subset):
def _create_ref_dataloader(self, subset, batch_size=1):
base_dir, ann_file = self._create_samples(subset)
print(f"{base_dir=} {ann_file=}")
transforms = DetectionPresetTrain("hflip")
dataset = get_openimages(ann_file.stem, base_dir, subset, transforms)
return iter(dataset)
def _create_tinygrad_dataloader(self):
pass
def _create_tinygrad_dataloader(self, subset, anchors, batch_size=1):
base_dir, ann_file = self._create_samples(subset)
dataset = COCO(ann_file)
dataloader = batch_load_retinanet(dataset, subset == "validation", anchors, Path(base_dir), batch_size=batch_size)
return iter(dataloader)
def test_training_set(self):
self._create_ref_dataloader("train")
assert 1==0
img_size, img_mean, img_std, anchors = (800, 800), [0.485, 0.456, 0.406], [0.229, 0.224, 0.225], torch.ones((120087, 4))
tinygrad_dataloader, ref_dataloader = self._create_tinygrad_dataloader("train", anchors.numpy()), self._create_ref_dataloader("train")
transform = GeneralizedRCNNTransform(img_size, img_mean, img_std)
for ((tinygrad_img, tinygrad_boxes, tinygrad_labels, _), (ref_img, ref_tgt)) in zip(tinygrad_dataloader, ref_dataloader):
self._set_seed()
ref_tgt = [ref_tgt]
print(f"{ref_img=} {ref_tgt=}")
ref_img, ref_tgt = transform(ref_img, [ref_tgt])
ref_tgt = postprocess_targets(ref_tgt, anchors.unsqueeze(0))
ref_boxes, ref_labels = ref_tgt[0]["boxes"], ref_tgt[0]["labels"]
np.testing.assert_equal(tinygrad_img.numpy(), ref_img.tensors.transpose(1, 3).numpy())
# print(f"{tinygrad_img.shape=} {tinygrad_boxes.shape=} {tinygrad_labels.shape=}")
# print(f"{ref_boxes.shape=} {ref_labels.shape=} {ref_img.tensors.shape=}")
if __name__ == '__main__':
unittest.main()

197
test/external/mlperf_retinanet/boxes.py vendored Normal file
View File

@@ -0,0 +1,197 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/boxes.py
import torch
from torch import Tensor
from typing import Tuple
import torchvision
from torchvision.extension import _assert_has_ops
def nms(boxes: Tensor, scores: Tensor, iou_threshold: float) -> Tensor:
"""
Performs non-maximum suppression (NMS) on the boxes according
to their intersection-over-union (IoU).
NMS iteratively removes lower scoring boxes which have an
IoU greater than iou_threshold with another (higher scoring)
box.
If multiple boxes have the exact same score and satisfy the IoU
criterion with respect to a reference box, the selected box is
not guaranteed to be the same between CPU and GPU. This is similar
to the behavior of argsort in PyTorch when repeated values are present.
Args:
boxes (Tensor[N, 4])): boxes to perform NMS on. They
are expected to be in ``(x1, y1, x2, y2)`` format with ``0 <= x1 < x2`` and
``0 <= y1 < y2``.
scores (Tensor[N]): scores for each one of the boxes
iou_threshold (float): discards all overlapping boxes with IoU > iou_threshold
Returns:
Tensor: int64 tensor with the indices of the elements that have been kept
by NMS, sorted in decreasing order of scores
"""
_assert_has_ops()
return torch.ops.torchvision.nms(boxes, scores, iou_threshold)
def batched_nms(
boxes: Tensor,
scores: Tensor,
idxs: Tensor,
iou_threshold: float,
) -> Tensor:
"""
Performs non-maximum suppression in a batched fashion.
Each index value correspond to a category, and NMS
will not be applied between elements of different categories.
Args:
boxes (Tensor[N, 4]): boxes where NMS will be performed. They
are expected to be in ``(x1, y1, x2, y2)`` format with ``0 <= x1 < x2`` and
``0 <= y1 < y2``.
scores (Tensor[N]): scores for each one of the boxes
idxs (Tensor[N]): indices of the categories for each one of the boxes.
iou_threshold (float): discards all overlapping boxes with IoU > iou_threshold
Returns:
Tensor: int64 tensor with the indices of the elements that have been kept by NMS, sorted
in decreasing order of scores
"""
# Benchmarks that drove the following thresholds are at
# https://github.com/pytorch/vision/issues/1311#issuecomment-781329339
# Ideally for GPU we'd use a higher threshold
if boxes.numel() > 4_000 and not torchvision._is_tracing():
return _batched_nms_vanilla(boxes, scores, idxs, iou_threshold)
else:
return _batched_nms_coordinate_trick(boxes, scores, idxs, iou_threshold)
@torch.jit._script_if_tracing
def _batched_nms_coordinate_trick(
boxes: Tensor,
scores: Tensor,
idxs: Tensor,
iou_threshold: float,
) -> Tensor:
# strategy: in order to perform NMS independently per class,
# we add an offset to all the boxes. The offset is dependent
# only on the class idx, and is large enough so that boxes
# from different classes do not overlap
if boxes.numel() == 0:
return torch.empty((0,), dtype=torch.int64, device=boxes.device)
max_coordinate = boxes.max()
offsets = idxs.to(boxes) * (max_coordinate + torch.tensor(1).to(boxes))
boxes_for_nms = boxes + offsets[:, None]
keep = nms(boxes_for_nms, scores, iou_threshold)
return keep
@torch.jit._script_if_tracing
def _batched_nms_vanilla(
boxes: Tensor,
scores: Tensor,
idxs: Tensor,
iou_threshold: float,
) -> Tensor:
# Based on Detectron2 implementation, just manually call nms() on each class independently
keep_mask = torch.zeros_like(scores, dtype=torch.bool)
for class_id in torch.unique(idxs):
curr_indices = torch.where(idxs == class_id)[0]
curr_keep_indices = nms(boxes[curr_indices], scores[curr_indices], iou_threshold)
keep_mask[curr_indices[curr_keep_indices]] = True
keep_indices = torch.where(keep_mask)[0]
return keep_indices[scores[keep_indices].sort(descending=True)[1]]
def clip_boxes_to_image(boxes: Tensor, size: Tuple[int, int]) -> Tensor:
"""
Clip boxes so that they lie inside an image of size `size`.
Args:
boxes (Tensor[N, 4]): boxes in ``(x1, y1, x2, y2)`` format
with ``0 <= x1 < x2`` and ``0 <= y1 < y2``.
size (Tuple[height, width]): size of the image
Returns:
Tensor[N, 4]: clipped boxes
"""
dim = boxes.dim()
boxes_x = boxes[..., 0::2]
boxes_y = boxes[..., 1::2]
height, width = size
if torchvision._is_tracing():
boxes_x = torch.max(boxes_x, torch.tensor(0, dtype=boxes.dtype, device=boxes.device))
boxes_x = torch.min(boxes_x, torch.tensor(width, dtype=boxes.dtype, device=boxes.device))
boxes_y = torch.max(boxes_y, torch.tensor(0, dtype=boxes.dtype, device=boxes.device))
boxes_y = torch.min(boxes_y, torch.tensor(height, dtype=boxes.dtype, device=boxes.device))
else:
boxes_x = boxes_x.clamp(min=0, max=width)
boxes_y = boxes_y.clamp(min=0, max=height)
clipped_boxes = torch.stack((boxes_x, boxes_y), dim=dim)
return clipped_boxes.reshape(boxes.shape)
def _upcast(t: Tensor) -> Tensor:
# Protects from numerical overflows in multiplications by upcasting to the equivalent higher type
if t.is_floating_point():
return t if t.dtype in (torch.float32, torch.float64) else t.float()
else:
return t if t.dtype in (torch.int32, torch.int64) else t.int()
def box_area(boxes: Tensor) -> Tensor:
"""
Computes the area of a set of bounding boxes, which are specified by their
(x1, y1, x2, y2) coordinates.
Args:
boxes (Tensor[N, 4]): boxes for which the area will be computed. They
are expected to be in (x1, y1, x2, y2) format with
``0 <= x1 < x2`` and ``0 <= y1 < y2``.
Returns:
Tensor[N]: the area for each box
"""
boxes = _upcast(boxes)
return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
# implementation from https://github.com/kuangliu/torchcv/blob/master/torchcv/utils/box.py
# with slight modifications
def _box_inter_union(boxes1: Tensor, boxes2: Tensor) -> Tuple[Tensor, Tensor]:
area1 = box_area(boxes1)
area2 = box_area(boxes2)
lt = torch.max(boxes1[:, None, :2], boxes2[:, :2]) # [N,M,2]
rb = torch.min(boxes1[:, None, 2:], boxes2[:, 2:]) # [N,M,2]
wh = _upcast(rb - lt).clamp(min=0) # [N,M,2]
inter = wh[:, :, 0] * wh[:, :, 1] # [N,M]
union = area1[:, None] + area2 - inter
return inter, union
def box_iou(boxes1: Tensor, boxes2: Tensor) -> Tensor:
"""
Return intersection-over-union (Jaccard index) between two sets of boxes.
Both sets of boxes are expected to be in ``(x1, y1, x2, y2)`` format with
``0 <= x1 < x2`` and ``0 <= y1 < y2``.
Args:
boxes1 (Tensor[N, 4]): first set of boxes
boxes2 (Tensor[M, 4]): second set of boxes
Returns:
Tensor[N, M]: the NxM matrix containing the pairwise IoU values for every element in boxes1 and boxes2
"""
inter, union = _box_inter_union(boxes1, boxes2)
iou = inter / union
return iou

View File

@@ -0,0 +1,27 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/image_list.py
import torch
from torch import Tensor
from typing import List, Tuple
class ImageList(object):
"""
Structure that holds a list of images (of possibly
varying sizes) as a single tensor.
This works by padding the images to the same size,
and storing in a field the original sizes of each image
"""
def __init__(self, tensors: Tensor, image_sizes: List[Tuple[int, int]]):
"""
Args:
tensors (tensor)
image_sizes (list[tuple[int, int]])
"""
self.tensors = tensors
self.image_sizes = image_sizes
def to(self, device: torch.device) -> 'ImageList':
cast_tensor = self.tensors.to(device)
return ImageList(cast_tensor, self.image_sizes)

View File

@@ -0,0 +1,258 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/coco_utils.py
import copy
import os
from PIL import Image
import random
import numpy as np
import torch
import torch.utils.data
import torchvision
from pycocotools import mask as coco_mask
from pycocotools.coco import COCO
from test.external.mlperf_retinanet import transforms as T
from test.external.mlperf_retinanet.boxes import box_iou
from test.external.mlperf_retinanet.utils import Matcher
def convert_coco_poly_to_mask(segmentations, height, width):
masks = []
for polygons in segmentations:
rles = coco_mask.frPyObjects(polygons, height, width)
mask = coco_mask.decode(rles)
if len(mask.shape) < 3:
mask = mask[..., None]
mask = torch.as_tensor(mask, dtype=torch.uint8)
mask = mask.any(dim=2)
masks.append(mask)
if masks:
masks = torch.stack(masks, dim=0)
else:
masks = torch.zeros((0, height, width), dtype=torch.uint8)
return masks
class ConvertCocoPolysToMask(object):
def __init__(self, filter_iscrowd=True):
self.filter_iscrowd = filter_iscrowd
def __call__(self, image, target):
w, h = image.size
image_id = target["image_id"]
image_id = torch.tensor([image_id])
anno = target["annotations"]
if self.filter_iscrowd:
anno = [obj for obj in anno if obj['iscrowd'] == 0]
boxes = [obj["bbox"] for obj in anno]
# guard against no boxes via resizing
boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
boxes[:, 2:] += boxes[:, :2]
boxes[:, 0::2].clamp_(min=0, max=w)
boxes[:, 1::2].clamp_(min=0, max=h)
classes = [obj["category_id"] for obj in anno]
classes = torch.tensor(classes, dtype=torch.int64)
keypoints = None
if anno and "keypoints" in anno[0]:
keypoints = [obj["keypoints"] for obj in anno]
keypoints = torch.as_tensor(keypoints, dtype=torch.float32)
num_keypoints = keypoints.shape[0]
if num_keypoints:
keypoints = keypoints.view(num_keypoints, -1, 3)
keep = (boxes[:, 3] > boxes[:, 1]) & (boxes[:, 2] > boxes[:, 0])
boxes = boxes[keep]
classes = classes[keep]
target = {}
target["boxes"] = boxes
target["labels"] = classes
target["image_id"] = image_id
# for conversion to coco api
area = torch.tensor([obj["area"] for obj in anno])
iscrowd = torch.tensor([obj["iscrowd"] for obj in anno])
target["area"] = area
target["iscrowd"] = iscrowd
return image, target
def _coco_remove_images_without_annotations(dataset, cat_list=None):
def _has_only_empty_bbox(anno):
return all(any(o <= 1 for o in obj["bbox"][2:]) for obj in anno)
def _count_visible_keypoints(anno):
return sum(sum(1 for v in ann["keypoints"][2::3] if v > 0) for ann in anno)
min_keypoints_per_image = 10
def _has_valid_annotation(anno):
# if it's empty, there is no annotation
if len(anno) == 0:
return False
# if all boxes have close to zero area, there is no annotation
if _has_only_empty_bbox(anno):
return False
# keypoints task have a slight different critera for considering
# if an annotation is valid
if "keypoints" not in anno[0]:
return True
# for keypoint detection tasks, only consider valid images those
# containing at least min_keypoints_per_image
if _count_visible_keypoints(anno) >= min_keypoints_per_image:
return True
return False
assert isinstance(dataset, torchvision.datasets.CocoDetection)
ids = []
for ds_idx, img_id in enumerate(dataset.ids):
ann_ids = dataset.coco.getAnnIds(imgIds=img_id, iscrowd=None)
anno = dataset.coco.loadAnns(ann_ids)
if cat_list:
anno = [obj for obj in anno if obj["category_id"] in cat_list]
if _has_valid_annotation(anno):
ids.append(ds_idx)
dataset = torch.utils.data.Subset(dataset, ids)
return dataset
def convert_to_coco_api(ds):
coco_ds = COCO()
# annotation IDs need to start at 1, not 0, see torchvision issue #1530
ann_id = 1
dataset = {'images': [], 'categories': [], 'annotations': []}
categories = set()
for img_idx in range(len(ds)):
# find better way to get target
# targets = ds.get_annotations(img_idx)
img, targets = ds[img_idx]
image_id = targets["image_id"].item()
img_dict = {}
img_dict['id'] = image_id
img_dict['height'] = img.shape[-2]
img_dict['width'] = img.shape[-1]
dataset['images'].append(img_dict)
bboxes = targets["boxes"]
bboxes[:, 2:] -= bboxes[:, :2]
bboxes = bboxes.tolist()
labels = targets['labels'].tolist()
areas = targets['area'].tolist()
iscrowd = targets['iscrowd'].tolist()
num_objs = len(bboxes)
for i in range(num_objs):
ann = {}
ann['image_id'] = image_id
ann['bbox'] = bboxes[i]
ann['category_id'] = labels[i]
categories.add(labels[i])
ann['area'] = areas[i]
ann['iscrowd'] = iscrowd[i]
ann['id'] = ann_id
dataset['annotations'].append(ann)
ann_id += 1
dataset['categories'] = [{'id': i} for i in sorted(categories)]
coco_ds.dataset = dataset
coco_ds.createIndex()
return coco_ds
def get_coco_api_from_dataset(dataset):
for _ in range(10):
if isinstance(dataset, torchvision.datasets.CocoDetection):
break
if isinstance(dataset, torch.utils.data.Subset):
dataset = dataset.dataset
if isinstance(dataset, torchvision.datasets.CocoDetection):
return dataset.coco
return convert_to_coco_api(dataset)
class CocoDetection(torchvision.datasets.CocoDetection):
def __init__(self, img_folder, ann_file, transforms):
super(CocoDetection, self).__init__(img_folder, ann_file)
self._transforms = transforms
def __getitem__(self, idx):
img, target = super(CocoDetection, self).__getitem__(idx)
image_id = self.ids[idx]
target = dict(image_id=image_id, annotations=target)
if self._transforms is not None:
img, target = self._transforms(img, target)
return img, target
def get_coco(name, root, image_set, transforms, mode='instances'):
anno_file_template = "{}_{}2017.json"
PATHS = {
"train": ("train2017", os.path.join("annotations", anno_file_template.format(mode, "train"))),
"val": ("val2017", os.path.join("annotations", anno_file_template.format(mode, "val"))),
}
t = [ConvertCocoPolysToMask(filter_iscrowd=True)]
if transforms is not None:
t.append(transforms)
transforms = T.Compose(t)
img_folder, ann_file = PATHS[image_set]
img_folder = os.path.join(root, img_folder)
ann_file = os.path.join(root, ann_file)
dataset = CocoDetection(img_folder, ann_file, transforms=transforms)
if image_set == "train":
dataset = _coco_remove_images_without_annotations(dataset)
return dataset
def get_openimages(name, root, image_set, transforms):
PATHS = {
"train": os.path.join(root, "train"),
"val": os.path.join(root, "validation"),
}
t = [ConvertCocoPolysToMask(filter_iscrowd=False)]
if transforms is not None:
t.append(transforms)
transforms = T.Compose(t)
img_folder = os.path.join(PATHS[image_set], "data")
ann_file = os.path.join(PATHS[image_set], "labels", f"{name}.json")
dataset = CocoDetection(img_folder, ann_file, transforms=transforms)
return dataset
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/retinanet.py#L401
# NOTE: this applies the following filtering in https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/retinanet.py#L117
# and https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/retinanet.py#L203 to match with
# tinygrad's dataloader implementation
def postprocess_targets(targets, anchors):
proposal_matcher, matched_idxs = Matcher(0.5, 0.4, allow_low_quality_matches=True), []
for anchors_per_image, targets_per_image in zip(anchors, targets):
if targets_per_image['boxes'].numel() == 0:
matched_idxs.append(torch.full((anchors_per_image.size(0),), -1, dtype=torch.int64,
device=anchors_per_image.device))
continue
match_quality_matrix = box_iou(targets_per_image['boxes'], anchors_per_image)
matched_idxs.append(proposal_matcher(match_quality_matrix))
for targets_per_image, matched_idxs_per_image in zip(targets, matched_idxs):
foreground_idxs_per_image = matched_idxs_per_image >= 0
targets_per_image["boxes"] = targets_per_image["boxes"][matched_idxs_per_image[foreground_idxs_per_image]]
targets_per_image["labels"] = targets_per_image["labels"][matched_idxs_per_image[foreground_idxs_per_image]]
return targets

View File

@@ -0,0 +1,39 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/presets.py
from test.external.mlperf_retinanet import transforms as T
class DetectionPresetTrain:
def __init__(self, data_augmentation, hflip_prob=0.5, mean=(123., 117., 104.)):
if data_augmentation == 'hflip':
self.transforms = T.Compose([
T.RandomHorizontalFlip(p=hflip_prob),
T.ToTensor(),
])
elif data_augmentation == 'ssd':
self.transforms = T.Compose([
T.RandomPhotometricDistort(),
T.RandomZoomOut(fill=list(mean)),
T.RandomIoUCrop(),
T.RandomHorizontalFlip(p=hflip_prob),
T.ToTensor(),
])
elif data_augmentation == 'ssdlite':
self.transforms = T.Compose([
T.RandomIoUCrop(),
T.RandomHorizontalFlip(p=hflip_prob),
T.ToTensor(),
])
else:
raise ValueError(f'Unknown data augmentation policy "{data_augmentation}"')
def __call__(self, img, target):
return self.transforms(img, target)
class DetectionPresetEval:
def __init__(self):
self.transforms = T.ToTensor()
def __call__(self, img, target):
return self.transforms(img, target)

View File

@@ -0,0 +1,83 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/roi_heads.py
import torch
import torchvision
import torch.nn.functional as F
from torch import nn, Tensor
from torchvision.ops import boxes as box_ops
from torchvision.ops import roi_align
from typing import Optional, List, Dict, Tuple
from test.external.mlperf_retinanet.utils import BoxCoder, Matcher
def expand_boxes(boxes, scale):
# type: (Tensor, float) -> Tensor
w_half = (boxes[:, 2] - boxes[:, 0]) * .5
h_half = (boxes[:, 3] - boxes[:, 1]) * .5
x_c = (boxes[:, 2] + boxes[:, 0]) * .5
y_c = (boxes[:, 3] + boxes[:, 1]) * .5
w_half *= scale
h_half *= scale
boxes_exp = torch.zeros_like(boxes)
boxes_exp[:, 0] = x_c - w_half
boxes_exp[:, 2] = x_c + w_half
boxes_exp[:, 1] = y_c - h_half
boxes_exp[:, 3] = y_c + h_half
return boxes_exp
def expand_masks(mask, padding):
# type: (Tensor, int) -> Tuple[Tensor, float]
M = mask.shape[-1]
scale = float(M + 2 * padding) / M
padded_mask = F.pad(mask, (padding,) * 4)
return padded_mask, scale
def paste_mask_in_image(mask, box, im_h, im_w):
# type: (Tensor, Tensor, int, int) -> Tensor
TO_REMOVE = 1
w = int(box[2] - box[0] + TO_REMOVE)
h = int(box[3] - box[1] + TO_REMOVE)
w = max(w, 1)
h = max(h, 1)
# Set shape to [batchxCxHxW]
mask = mask.expand((1, 1, -1, -1))
# Resize mask
mask = F.interpolate(mask, size=(h, w), mode='bilinear', align_corners=False)
mask = mask[0][0]
im_mask = torch.zeros((im_h, im_w), dtype=mask.dtype, device=mask.device)
x_0 = max(box[0], 0)
x_1 = min(box[2] + 1, im_w)
y_0 = max(box[1], 0)
y_1 = min(box[3] + 1, im_h)
im_mask[y_0:y_1, x_0:x_1] = mask[
(y_0 - box[1]):(y_1 - box[1]), (x_0 - box[0]):(x_1 - box[0])
]
return im_mask
def paste_masks_in_image(masks, boxes, img_shape, padding=1):
# type: (Tensor, Tensor, Tuple[int, int], int) -> Tensor
masks, scale = expand_masks(masks, padding=padding)
boxes = expand_boxes(boxes, scale).to(dtype=torch.int64)
im_h, im_w = img_shape
res = [
paste_mask_in_image(m[0], b, im_h, im_w)
for m, b in zip(masks, boxes)
]
if len(res) > 0:
ret = torch.stack(res, dim=0)[:, None]
else:
ret = masks.new_empty((0, 1, im_h, im_w))
return ret

View File

@@ -0,0 +1,517 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/transforms.py
import torch
import torchvision
from torch import nn, Tensor
from torchvision.transforms import functional as F
from torchvision.transforms import transforms as T
from typing import List, Tuple, Dict, Optional
def _flip_coco_person_keypoints(kps, width):
flip_inds = [0, 2, 1, 4, 3, 6, 5, 8, 7, 10, 9, 12, 11, 14, 13, 16, 15]
flipped_data = kps[:, flip_inds]
flipped_data[..., 0] = width - flipped_data[..., 0]
# Maintain COCO convention that if visibility == 0, then x, y = 0
inds = flipped_data[..., 2] == 0
flipped_data[inds] = 0
return flipped_data
################################################################################
# TODO(ahmadki): remove this block, and replace get_image_size with F.get_image_size
# once https://github.com/pytorch/vision/pull/4321 is public
from PIL import Image, ImageOps, ImageEnhance
Image.MAX_IMAGE_PIXELS = None
from typing import Any
try:
import accimage
except ImportError:
accimage = None
@torch.jit.unused
def _is_pil_image(img: Any) -> bool:
if accimage is not None:
return isinstance(img, (Image.Image, accimage.Image))
else:
return isinstance(img, Image.Image)
def get_image_size_tensor(img: Tensor) -> List[int]:
# Returns (w, h) of tensor image
_assert_image_tensor(img)
return [img.shape[-1], img.shape[-2]]
@torch.jit.unused
def get_image_size_pil(img: Any) -> List[int]:
if _is_pil_image(img):
return list(img.size)
raise TypeError("Unexpected type {}".format(type(img)))
def get_image_size(img: Tensor) -> List[int]:
"""Returns the size of an image as [width, height].
Args:
img (PIL Image or Tensor): The image to be checked.
Returns:
List[int]: The image size.
"""
if isinstance(img, torch.Tensor):
return get_image_size_tensor(img)
return get_image_size_pil(img)
def get_image_num_channels_tensor(img: Tensor) -> int:
_assert_image_tensor(img)
if img.ndim == 2:
return 1
elif img.ndim > 2:
return img.shape[-3]
raise TypeError(f"Input ndim should be 2 or more. Got {img.ndim}")
@torch.jit.unused
def get_image_num_channels_pil(img: Any) -> int:
if _is_pil_image(img):
return len(img.getbands())
raise TypeError("Unexpected type {}".format(type(img)))
def get_image_num_channels(img: Tensor) -> int:
if isinstance(img, torch.Tensor):
return get_image_num_channels_tensor(img)
return get_image_num_channels_pil(img)
################################################################################
class Compose(object):
def __init__(self, transforms):
self.transforms = transforms
def __call__(self, image, target):
for t in self.transforms:
image, target = t(image, target)
return image, target
class RandomHorizontalFlip(T.RandomHorizontalFlip):
def forward(self, image: Tensor,
target: Optional[Dict[str, Tensor]] = None) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if torch.rand(1) < self.p:
image = F.hflip(image)
if target is not None:
width, _ = get_image_size(image)
target["boxes"][:, [0, 2]] = width - target["boxes"][:, [2, 0]]
if "masks" in target:
target["masks"] = target["masks"].flip(-1)
if "keypoints" in target:
keypoints = target["keypoints"]
keypoints = _flip_coco_person_keypoints(keypoints, width)
target["keypoints"] = keypoints
return image, target
class ToTensor(nn.Module):
def forward(self, image: Tensor,
target: Optional[Dict[str, Tensor]] = None) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
image = F.to_tensor(image)
return image, target
class RandomIoUCrop(nn.Module):
def __init__(self, min_scale: float = 0.3, max_scale: float = 1.0, min_aspect_ratio: float = 0.5,
max_aspect_ratio: float = 2.0, sampler_options: Optional[List[float]] = None, trials: int = 40):
super().__init__()
# Configuration similar to https://github.com/weiliu89/caffe/blob/ssd/examples/ssd/ssd_coco.py#L89-L174
self.min_scale = min_scale
self.max_scale = max_scale
self.min_aspect_ratio = min_aspect_ratio
self.max_aspect_ratio = max_aspect_ratio
if sampler_options is None:
sampler_options = [0.0, 0.1, 0.3, 0.5, 0.7, 0.9, 1.0]
self.options = sampler_options
self.trials = trials
def forward(self, image: Tensor,
target: Optional[Dict[str, Tensor]] = None) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if target is None:
raise ValueError("The targets can't be None for this transform.")
if isinstance(image, torch.Tensor):
if image.ndimension() not in {2, 3}:
raise ValueError('image should be 2/3 dimensional. Got {} dimensions.'.format(image.ndimension()))
elif image.ndimension() == 2:
image = image.unsqueeze(0)
orig_w, orig_h = get_image_size(image)
while True:
# sample an option
idx = int(torch.randint(low=0, high=len(self.options), size=(1,)))
min_jaccard_overlap = self.options[idx]
if min_jaccard_overlap >= 1.0: # a value larger than 1 encodes the leave as-is option
return image, target
for _ in range(self.trials):
# check the aspect ratio limitations
r = self.min_scale + (self.max_scale - self.min_scale) * torch.rand(2)
new_w = int(orig_w * r[0])
new_h = int(orig_h * r[1])
aspect_ratio = new_w / new_h
if not (self.min_aspect_ratio <= aspect_ratio <= self.max_aspect_ratio):
continue
# check for 0 area crops
r = torch.rand(2)
left = int((orig_w - new_w) * r[0])
top = int((orig_h - new_h) * r[1])
right = left + new_w
bottom = top + new_h
if left == right or top == bottom:
continue
# check for any valid boxes with centers within the crop area
cx = 0.5 * (target["boxes"][:, 0] + target["boxes"][:, 2])
cy = 0.5 * (target["boxes"][:, 1] + target["boxes"][:, 3])
is_within_crop_area = (left < cx) & (cx < right) & (top < cy) & (cy < bottom)
if not is_within_crop_area.any():
continue
# check at least 1 box with jaccard limitations
boxes = target["boxes"][is_within_crop_area]
ious = torchvision.ops.boxes.box_iou(boxes, torch.tensor([[left, top, right, bottom]],
dtype=boxes.dtype, device=boxes.device))
if ious.max() < min_jaccard_overlap:
continue
# keep only valid boxes and perform cropping
target["boxes"] = boxes
target["labels"] = target["labels"][is_within_crop_area]
target["boxes"][:, 0::2] -= left
target["boxes"][:, 1::2] -= top
target["boxes"][:, 0::2].clamp_(min=0, max=new_w)
target["boxes"][:, 1::2].clamp_(min=0, max=new_h)
image = F.crop(image, top, left, new_h, new_w)
return image, target
class RandomZoomOut(nn.Module):
def __init__(self, fill: Optional[List[float]] = None, side_range: Tuple[float, float] = (1., 4.), p: float = 0.5):
super().__init__()
if fill is None:
fill = [0., 0., 0.]
self.fill = fill
self.side_range = side_range
if side_range[0] < 1. or side_range[0] > side_range[1]:
raise ValueError("Invalid canvas side range provided {}.".format(side_range))
self.p = p
@torch.jit.unused
def _get_fill_value(self, is_pil):
# type: (bool) -> int
# We fake the type to make it work on JIT
return tuple(int(x) for x in self.fill) if is_pil else 0
def forward(self, image: Tensor,
target: Optional[Dict[str, Tensor]] = None) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if isinstance(image, torch.Tensor):
if image.ndimension() not in {2, 3}:
raise ValueError('image should be 2/3 dimensional. Got {} dimensions.'.format(image.ndimension()))
elif image.ndimension() == 2:
image = image.unsqueeze(0)
if torch.rand(1) < self.p:
return image, target
orig_w, orig_h = get_image_size(image)
r = self.side_range[0] + torch.rand(1) * (self.side_range[1] - self.side_range[0])
canvas_width = int(orig_w * r)
canvas_height = int(orig_h * r)
r = torch.rand(2)
left = int((canvas_width - orig_w) * r[0])
top = int((canvas_height - orig_h) * r[1])
right = canvas_width - (left + orig_w)
bottom = canvas_height - (top + orig_h)
if torch.jit.is_scripting():
fill = 0
else:
fill = self._get_fill_value(_is_pil_image(image))
image = F.pad(image, [left, top, right, bottom], fill=fill)
if isinstance(image, torch.Tensor):
v = torch.tensor(self.fill, device=image.device, dtype=image.dtype).view(-1, 1, 1)
image[..., :top, :] = image[..., :, :left] = image[..., (top + orig_h):, :] = \
image[..., :, (left + orig_w):] = v
if target is not None:
target["boxes"][:, 0::2] += left
target["boxes"][:, 1::2] += top
return image, target
class RandomPhotometricDistort(nn.Module):
def __init__(self, contrast: Tuple[float] = (0.5, 1.5), saturation: Tuple[float] = (0.5, 1.5),
hue: Tuple[float] = (-0.05, 0.05), brightness: Tuple[float] = (0.875, 1.125), p: float = 0.5):
super().__init__()
self._brightness = T.ColorJitter(brightness=brightness)
self._contrast = T.ColorJitter(contrast=contrast)
self._hue = T.ColorJitter(hue=hue)
self._saturation = T.ColorJitter(saturation=saturation)
self.p = p
def forward(self, image: Tensor,
target: Optional[Dict[str, Tensor]] = None) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if isinstance(image, torch.Tensor):
if image.ndimension() not in {2, 3}:
raise ValueError('image should be 2/3 dimensional. Got {} dimensions.'.format(image.ndimension()))
elif image.ndimension() == 2:
image = image.unsqueeze(0)
r = torch.rand(7)
if r[0] < self.p:
image = self._brightness(image)
contrast_before = r[1] < 0.5
if contrast_before:
if r[2] < self.p:
image = self._contrast(image)
if r[3] < self.p:
image = self._saturation(image)
if r[4] < self.p:
image = self._hue(image)
if not contrast_before:
if r[5] < self.p:
image = self._contrast(image)
if r[6] < self.p:
channels = get_image_num_channels(image)
permutation = torch.randperm(channels)
is_pil = _is_pil_image(image)
if is_pil:
image = F.to_tensor(image)
image = image[..., permutation, :, :]
if is_pil:
image = F.to_pil_image(image)
return image, target
import math
import torch
import torchvision
from torch import nn, Tensor
from typing import List, Tuple, Dict, Optional
from test.external.mlperf_retinanet.image_list import ImageList
from test.external.mlperf_retinanet.roi_heads import paste_masks_in_image
@torch.jit.unused
def _get_shape_onnx(image: Tensor) -> Tensor:
from torch.onnx import operators
return operators.shape_as_tensor(image)[-2:]
@torch.jit.unused
def _fake_cast_onnx(v: Tensor) -> float:
# ONNX requires a tensor but here we fake its type for JIT.
return v
def _resize_image_and_masks(image: Tensor,
target: Optional[Dict[str, Tensor]] = None,
image_size: Optional[Tuple[int, int]] = None,
) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
if torchvision._is_tracing():
im_shape = _get_shape_onnx(image)
else:
im_shape = torch.tensor(image.shape[-2:])
image = torch.nn.functional.interpolate(image[None], size=image_size, scale_factor=None, mode='bilinear',
recompute_scale_factor=None, align_corners=False)[0]
if target is None:
return image, target
if "masks" in target:
mask = target["masks"]
mask = torch.nn.functional.interpolate(mask[:, None].float(), size=image_size, scale_factor=None,
recompute_scale_factor=None)[:, 0].byte()
target["masks"] = mask
return image, target
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/transform.py
class GeneralizedRCNNTransform(nn.Module):
"""
Performs input / target transformation before feeding the data to a GeneralizedRCNN
model.
The transformations it perform are:
- input normalization (mean subtraction and std division)
- input / target resizing to match image_size
It returns a ImageList for the inputs, and a List[Dict[Tensor]] for the targets
"""
def __init__(self, image_size: Optional[Tuple[int, int]],
image_mean: List[float], image_std: List[float],):
super(GeneralizedRCNNTransform, self).__init__()
self.image_size = image_size
self.image_mean = image_mean
self.image_std = image_std
def forward(self,
images: List[Tensor],
targets: Optional[List[Dict[str, Tensor]]] = None
) -> Tuple[ImageList, Optional[List[Dict[str, Tensor]]]]:
images = [img for img in images]
if targets is not None:
# make a copy of targets to avoid modifying it in-place
# once torchscript supports dict comprehension
# this can be simplified as follows
# targets = [{k: v for k,v in t.items()} for t in targets]
targets_copy: List[Dict[str, Tensor]] = []
for t in targets:
data: Dict[str, Tensor] = {}
for k, v in t.items():
data[k] = v
targets_copy.append(data)
targets = targets_copy
for i in range(len(images)):
image = images[i]
target_index = targets[i] if targets is not None else None
if image.dim() != 3:
raise ValueError("images is expected to be a list of 3d tensors "
"of shape [C, H, W], got {}".format(image.shape))
image = self.normalize(image)
image, target_index = self.resize(image, target_index)
images[i] = image
if targets is not None and target_index is not None:
targets[i] = target_index
image_sizes = [img.shape[-2:] for img in images]
images = torch.stack(images)
image_sizes_list: List[Tuple[int, int]] = []
for image_size in image_sizes:
assert len(image_size) == 2
image_sizes_list.append((image_size[0], image_size[1]))
image_list = ImageList(images, image_sizes_list)
return image_list, targets
def normalize(self, image: Tensor) -> Tensor:
if not image.is_floating_point():
raise TypeError(
f"Expected input images to be of floating type (in range [0, 1]), "
f"but found type {image.dtype} instead"
)
dtype, device = image.dtype, image.device
mean = torch.as_tensor(self.image_mean, dtype=dtype, device=device)
std = torch.as_tensor(self.image_std, dtype=dtype, device=device)
return (image - mean[:, None, None]) / std[:, None, None]
def torch_choice(self, k: List[int]) -> int:
"""
Implements `random.choice` via torch ops so it can be compiled with
TorchScript. Remove if https://github.com/pytorch/pytorch/issues/25803
is fixed.
"""
index = int(torch.empty(1).uniform_(0., float(len(k))).item())
return k[index]
def resize(self,
image: Tensor,
target: Optional[Dict[str, Tensor]] = None,
) -> Tuple[Tensor, Optional[Dict[str, Tensor]]]:
h, w = image.shape[-2:]
image, target = _resize_image_and_masks(image, target, self.image_size)
if target is None:
return image, target
bbox = target["boxes"]
bbox = resize_boxes(bbox, (h, w), image.shape[-2:])
target["boxes"] = bbox
if "keypoints" in target:
keypoints = target["keypoints"]
keypoints = resize_keypoints(keypoints, (h, w), image.shape[-2:])
target["keypoints"] = keypoints
return image, target
def postprocess(self,
result: List[Dict[str, Tensor]],
image_shapes: List[Tuple[int, int]],
original_image_sizes: List[Tuple[int, int]]
) -> List[Dict[str, Tensor]]:
if self.training:
return result
for i, (pred, im_s, o_im_s) in enumerate(zip(result, image_shapes, original_image_sizes)):
boxes = pred["boxes"]
boxes = resize_boxes(boxes, im_s, o_im_s)
result[i]["boxes"] = boxes
if "masks" in pred:
masks = pred["masks"]
masks = paste_masks_in_image(masks, boxes, o_im_s)
result[i]["masks"] = masks
if "keypoints" in pred:
keypoints = pred["keypoints"]
keypoints = resize_keypoints(keypoints, im_s, o_im_s)
result[i]["keypoints"] = keypoints
return result
def __repr__(self) -> str:
format_string = self.__class__.__name__ + '('
_indent = '\n '
format_string += "{0}Normalize(mean={1}, std={2})".format(_indent, self.image_mean, self.image_std)
format_string += "{0}Resize(height={1}, width={2}, mode='bilinear')".format(_indent, self.image_size[0],
self.image_size[1])
format_string += '\n)'
return format_string
def resize_keypoints(keypoints: Tensor, original_size: List[int], new_size: List[int]) -> Tensor:
ratios = [
torch.tensor(s, dtype=torch.float32, device=keypoints.device) /
torch.tensor(s_orig, dtype=torch.float32, device=keypoints.device)
for s, s_orig in zip(new_size, original_size)
]
ratio_h, ratio_w = ratios
resized_data = keypoints.clone()
if torch._C._get_tracing_state():
resized_data_0 = resized_data[:, :, 0] * ratio_w
resized_data_1 = resized_data[:, :, 1] * ratio_h
resized_data = torch.stack((resized_data_0, resized_data_1, resized_data[:, :, 2]), dim=2)
else:
resized_data[..., 0] *= ratio_w
resized_data[..., 1] *= ratio_h
return resized_data
def resize_boxes(boxes: Tensor, original_size: List[int], new_size: List[int]) -> Tensor:
ratios = [
torch.tensor(s, dtype=torch.float32, device=boxes.device) /
torch.tensor(s_orig, dtype=torch.float32, device=boxes.device)
for s, s_orig in zip(new_size, original_size)
]
ratio_height, ratio_width = ratios
xmin, ymin, xmax, ymax = boxes.unbind(1)
xmin = xmin * ratio_width
xmax = xmax * ratio_width
ymin = ymin * ratio_height
ymax = ymax * ratio_height
return torch.stack((xmin, ymin, xmax, ymax), dim=1)

409
test/external/mlperf_retinanet/utils.py vendored Normal file
View File

@@ -0,0 +1,409 @@
# https://github.com/mlcommons/training/blob/cdd928d4596c142c15a7d86b2eeadbac718c8da2/single_stage_detector/ssd/model/utils.py
import math
import torch
from collections import OrderedDict
from torch import Tensor, nn
from typing import List, Tuple, Dict
from torchvision.ops.misc import FrozenBatchNorm2d
class IntermediateLayerGetter(nn.ModuleDict):
"""
Module wrapper that returns intermediate layers from a model
It has a strong assumption that the modules have been registered
into the model in the same order as they are used.
This means that one should **not** reuse the same nn.Module
twice in the forward if you want this to work.
Additionally, it is only able to query submodules that are directly
assigned to the model. So if `model` is passed, `model.feature1` can
be returned, but not `model.feature1.layer2`.
Args:
model (nn.Module): model on which we will extract the features
return_layers (Dict[name, new_name]): a dict containing the names
of the modules for which the activations will be returned as
the key of the dict, and the value of the dict is the name
of the returned activation (which the user can specify).
Examples::
>>> m = torchvision.models.resnet18(pretrained=True)
>>> # extract layer1 and layer3, giving as names `feat1` and feat2`
>>> new_m = torchvision.models._utils.IntermediateLayerGetter(m,
>>> {'layer1': 'feat1', 'layer3': 'feat2'})
>>> out = new_m(torch.rand(1, 3, 224, 224))
>>> print([(k, v.shape) for k, v in out.items()])
>>> [('feat1', torch.Size([1, 64, 56, 56])),
>>> ('feat2', torch.Size([1, 256, 14, 14]))]
"""
_version = 2
__annotations__ = {
"return_layers": Dict[str, str],
}
def __init__(self, model: nn.Module, return_layers: Dict[str, str]) -> None:
if not set(return_layers).issubset([name for name, _ in model.named_children()]):
raise ValueError("return_layers are not present in model")
orig_return_layers = return_layers
return_layers = {str(k): str(v) for k, v in return_layers.items()}
layers = OrderedDict()
for name, module in model.named_children():
layers[name] = module
if name in return_layers:
del return_layers[name]
if not return_layers:
break
super(IntermediateLayerGetter, self).__init__(layers)
self.return_layers = orig_return_layers
def forward(self, x):
out = OrderedDict()
for name, module in self.items():
x = module(x)
if name in self.return_layers:
out_name = self.return_layers[name]
out[out_name] = x
return out
@torch.jit._script_if_tracing
def encode_boxes(reference_boxes, proposals, weights):
# type: (torch.Tensor, torch.Tensor, torch.Tensor) -> torch.Tensor
"""
Encode a set of proposals with respect to some
reference boxes
Args:
reference_boxes (Tensor): reference boxes
proposals (Tensor): boxes to be encoded
weights (Tensor[4]): the weights for ``(x, y, w, h)``
"""
# perform some unpacking to make it JIT-fusion friendly
wx = weights[0]
wy = weights[1]
ww = weights[2]
wh = weights[3]
proposals_x1 = proposals[:, 0].unsqueeze(1)
proposals_y1 = proposals[:, 1].unsqueeze(1)
proposals_x2 = proposals[:, 2].unsqueeze(1)
proposals_y2 = proposals[:, 3].unsqueeze(1)
reference_boxes_x1 = reference_boxes[:, 0].unsqueeze(1)
reference_boxes_y1 = reference_boxes[:, 1].unsqueeze(1)
reference_boxes_x2 = reference_boxes[:, 2].unsqueeze(1)
reference_boxes_y2 = reference_boxes[:, 3].unsqueeze(1)
# implementation starts here
ex_widths = proposals_x2 - proposals_x1
ex_heights = proposals_y2 - proposals_y1
ex_ctr_x = proposals_x1 + 0.5 * ex_widths
ex_ctr_y = proposals_y1 + 0.5 * ex_heights
gt_widths = reference_boxes_x2 - reference_boxes_x1
gt_heights = reference_boxes_y2 - reference_boxes_y1
gt_ctr_x = reference_boxes_x1 + 0.5 * gt_widths
gt_ctr_y = reference_boxes_y1 + 0.5 * gt_heights
targets_dx = wx * (gt_ctr_x - ex_ctr_x) / ex_widths
targets_dy = wy * (gt_ctr_y - ex_ctr_y) / ex_heights
targets_dw = ww * torch.log(gt_widths / ex_widths)
targets_dh = wh * torch.log(gt_heights / ex_heights)
targets = torch.cat((targets_dx, targets_dy, targets_dw, targets_dh), dim=1)
return targets
class BoxCoder(object):
"""
This class encodes and decodes a set of bounding boxes into
the representation used for training the regressors.
"""
def __init__(self, weights, bbox_xform_clip=math.log(1000. / 16)):
# type: (Tuple[float, float, float, float], float) -> None
"""
Args:
weights (4-element tuple)
bbox_xform_clip (float)
"""
self.weights = weights
self.bbox_xform_clip = bbox_xform_clip
def encode(self, reference_boxes, proposals):
# type: (List[Tensor], List[Tensor]) -> List[Tensor]
boxes_per_image = [len(b) for b in reference_boxes]
reference_boxes = torch.cat(reference_boxes, dim=0)
proposals = torch.cat(proposals, dim=0)
targets = self.encode_single(reference_boxes, proposals)
return targets.split(boxes_per_image, 0)
def encode_single(self, reference_boxes, proposals):
"""
Encode a set of proposals with respect to some
reference boxes
Args:
reference_boxes (Tensor): reference boxes
proposals (Tensor): boxes to be encoded
"""
dtype = reference_boxes.dtype
device = reference_boxes.device
weights = torch.as_tensor(self.weights, dtype=dtype, device=device)
targets = encode_boxes(reference_boxes, proposals, weights)
return targets
def decode(self, rel_codes, boxes):
# type: (Tensor, List[Tensor]) -> Tensor
assert isinstance(boxes, (list, tuple))
assert isinstance(rel_codes, torch.Tensor)
boxes_per_image = [b.size(0) for b in boxes]
concat_boxes = torch.cat(boxes, dim=0)
box_sum = 0
for val in boxes_per_image:
box_sum += val
if box_sum > 0:
rel_codes = rel_codes.reshape(box_sum, -1)
pred_boxes = self.decode_single(
rel_codes, concat_boxes
)
if box_sum > 0:
pred_boxes = pred_boxes.reshape(box_sum, -1, 4)
return pred_boxes
def decode_single(self, rel_codes, boxes):
"""
From a set of original boxes and encoded relative box offsets,
get the decoded boxes.
Args:
rel_codes (Tensor): encoded boxes
boxes (Tensor): reference boxes.
"""
boxes = boxes.to(rel_codes.dtype)
widths = boxes[:, 2] - boxes[:, 0]
heights = boxes[:, 3] - boxes[:, 1]
ctr_x = boxes[:, 0] + 0.5 * widths
ctr_y = boxes[:, 1] + 0.5 * heights
wx, wy, ww, wh = self.weights
dx = rel_codes[:, 0::4] / wx
dy = rel_codes[:, 1::4] / wy
dw = rel_codes[:, 2::4] / ww
dh = rel_codes[:, 3::4] / wh
# Prevent sending too large values into torch.exp()
dw = torch.clamp(dw, max=self.bbox_xform_clip)
dh = torch.clamp(dh, max=self.bbox_xform_clip)
pred_ctr_x = dx * widths[:, None] + ctr_x[:, None]
pred_ctr_y = dy * heights[:, None] + ctr_y[:, None]
pred_w = torch.exp(dw) * widths[:, None]
pred_h = torch.exp(dh) * heights[:, None]
# Distance from center to box's corner.
c_to_c_h = torch.tensor(0.5, dtype=pred_ctr_y.dtype, device=pred_h.device) * pred_h
c_to_c_w = torch.tensor(0.5, dtype=pred_ctr_x.dtype, device=pred_w.device) * pred_w
pred_boxes1 = pred_ctr_x - c_to_c_w
pred_boxes2 = pred_ctr_y - c_to_c_h
pred_boxes3 = pred_ctr_x + c_to_c_w
pred_boxes4 = pred_ctr_y + c_to_c_h
pred_boxes = torch.stack((pred_boxes1, pred_boxes2, pred_boxes3, pred_boxes4), dim=2).flatten(1)
return pred_boxes
class Matcher(object):
"""
This class assigns to each predicted "element" (e.g., a box) a ground-truth
element. Each predicted element will have exactly zero or one matches; each
ground-truth element may be assigned to zero or more predicted elements.
Matching is based on the MxN match_quality_matrix, that characterizes how well
each (ground-truth, predicted)-pair match. For example, if the elements are
boxes, the matrix may contain box IoU overlap values.
The matcher returns a tensor of size N containing the index of the ground-truth
element m that matches to prediction n. If there is no match, a negative value
is returned.
"""
BELOW_LOW_THRESHOLD = -1
BETWEEN_THRESHOLDS = -2
__annotations__ = {
'BELOW_LOW_THRESHOLD': int,
'BETWEEN_THRESHOLDS': int,
}
def __init__(self, high_threshold, low_threshold, allow_low_quality_matches=False):
# type: (float, float, bool) -> None
"""
Args:
high_threshold (float): quality values greater than or equal to
this value are candidate matches.
low_threshold (float): a lower quality threshold used to stratify
matches into three levels:
1) matches >= high_threshold
2) BETWEEN_THRESHOLDS matches in [low_threshold, high_threshold)
3) BELOW_LOW_THRESHOLD matches in [0, low_threshold)
allow_low_quality_matches (bool): if True, produce additional matches
for predictions that have only low-quality match candidates. See
set_low_quality_matches_ for more details.
"""
self.BELOW_LOW_THRESHOLD = -1
self.BETWEEN_THRESHOLDS = -2
assert low_threshold <= high_threshold
self.high_threshold = high_threshold
self.low_threshold = low_threshold
self.allow_low_quality_matches = allow_low_quality_matches
def __call__(self, match_quality_matrix):
"""
Args:
match_quality_matrix (Tensor[float]): an MxN tensor, containing the
pairwise quality between M ground-truth elements and N predicted elements.
Returns:
matches (Tensor[int64]): an N tensor where N[i] is a matched gt in
[0, M - 1] or a negative value indicating that prediction i could not
be matched.
"""
if match_quality_matrix.numel() == 0:
# empty targets or proposals not supported during training
if match_quality_matrix.shape[0] == 0:
raise ValueError(
"No ground-truth boxes available for one of the images "
"during training")
else:
raise ValueError(
"No proposal boxes available for one of the images "
"during training")
# match_quality_matrix is M (gt) x N (predicted)
# Max over gt elements (dim 0) to find best gt candidate for each prediction
matched_vals, matches = match_quality_matrix.max(dim=0)
if self.allow_low_quality_matches:
all_matches = matches.clone()
else:
all_matches = None
# Assign candidate matches with low quality to negative (unassigned) values
below_low_threshold = matched_vals < self.low_threshold
between_thresholds = (matched_vals >= self.low_threshold) & (
matched_vals < self.high_threshold
)
matches[below_low_threshold] = self.BELOW_LOW_THRESHOLD
matches[between_thresholds] = self.BETWEEN_THRESHOLDS
if self.allow_low_quality_matches:
assert all_matches is not None
self.set_low_quality_matches_(matches, all_matches, match_quality_matrix)
return matches
def set_low_quality_matches_(self, matches, all_matches, match_quality_matrix):
"""
Produce additional matches for predictions that have only low-quality matches.
Specifically, for each ground-truth find the set of predictions that have
maximum overlap with it (including ties); for each prediction in that set, if
it is unmatched, then match it to the ground-truth with which it has the highest
quality value.
"""
# For each gt, find the prediction with which it has highest quality
highest_quality_foreach_gt, _ = match_quality_matrix.max(dim=1)
# Find highest quality match available, even if it is low, including ties
gt_pred_pairs_of_highest_quality = torch.where(
match_quality_matrix == highest_quality_foreach_gt[:, None]
)
# Example gt_pred_pairs_of_highest_quality:
# tensor([[ 0, 39796],
# [ 1, 32055],
# [ 1, 32070],
# [ 2, 39190],
# [ 2, 40255],
# [ 3, 40390],
# [ 3, 41455],
# [ 4, 45470],
# [ 5, 45325],
# [ 5, 46390]])
# Each row is a (gt index, prediction index)
# Note how gt items 1, 2, 3, and 5 each have two ties
pred_inds_to_update = gt_pred_pairs_of_highest_quality[1]
matches[pred_inds_to_update] = all_matches[pred_inds_to_update]
class SSDMatcher(Matcher):
def __init__(self, threshold):
super().__init__(threshold, threshold, allow_low_quality_matches=False)
def __call__(self, match_quality_matrix):
matches = super().__call__(match_quality_matrix)
# For each gt, find the prediction with which it has the highest quality
_, highest_quality_pred_foreach_gt = match_quality_matrix.max(dim=1)
matches[highest_quality_pred_foreach_gt] = torch.arange(highest_quality_pred_foreach_gt.size(0),
dtype=torch.int64,
device=highest_quality_pred_foreach_gt.device)
return matches
def overwrite_eps(model, eps):
"""
This method overwrites the default eps values of all the
FrozenBatchNorm2d layers of the model with the provided value.
This is necessary to address the BC-breaking change introduced
by the bug-fix at pytorch/vision#2933. The overwrite is applied
only when the pretrained weights are loaded to maintain compatibility
with previous versions.
Args:
model (nn.Module): The model on which we perform the overwrite.
eps (float): The new value of eps.
"""
for module in model.modules():
if isinstance(module, FrozenBatchNorm2d):
module.eps = eps
def retrieve_out_channels(model, size):
"""
This method retrieves the number of output channels of a specific model.
Args:
model (nn.Module): The model for which we estimate the out_channels.
It should return a single Tensor or an OrderedDict[Tensor].
size (Tuple[int, int]): The size (wxh) of the input.
Returns:
out_channels (List[int]): A list of the output channels of the model.
"""
in_training = model.training
model.eval()
with torch.no_grad():
# Use dummy data to retrieve the feature map sizes to avoid hard-coding their values
device = next(model.parameters()).device
tmp_img = torch.zeros((1, 3, size[1], size[0]), device=device)
features = model(tmp_img)
if isinstance(features, torch.Tensor):
features = OrderedDict([('0', features)])
out_channels = [x.size(1) for x in features.values()]
if in_training:
model.train()
return out_channels