# https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
import sys
import io
import time
import math
import cv2
import numpy as np
from PIL import Image
from tinygrad.tensor import Tensor
from tinygrad.nn import BatchNorm2d, Conv2d
from tinygrad.helpers import fetch

def show_labels(prediction, confidence=0.5, num_classes=80):
  coco_labels = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  coco_labels = coco_labels.decode('utf-8').split('\n')
  prediction = prediction.detach().numpy()
  conf_mask = (prediction[:,:,4] > confidence)
  prediction *= np.expand_dims(conf_mask, 2)
  labels = []
  # Iterate over batches
  for img_pred in prediction:
    max_conf = np.amax(img_pred[:,5:5+num_classes], axis=1)
    max_conf_score = np.argmax(img_pred[:,5:5+num_classes], axis=1)
    max_conf_score = np.expand_dims(max_conf_score, axis=1)
    max_conf = np.expand_dims(max_conf, axis=1)
    seq = (img_pred[:,:5], max_conf, max_conf_score)
    image_pred = np.concatenate(seq, axis=1)
    non_zero_ind = np.nonzero(image_pred[:,4])[0]
    assert all(image_pred[non_zero_ind,0] > 0)
    image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind),:], (-1, 7))
    classes, indexes = np.unique(image_pred_[:, -1], return_index=True)
    for index, coco_class in enumerate(classes):
      label, probability = coco_labels[int(coco_class)], image_pred_[indexes[index]][4] * 100
      print(f"Detected {label} {probability:.2f}")
      labels.append(label)
  return labels

def add_boxes(img, prediction):
  if isinstance(prediction, int): # no predictions
    return img
  coco_labels = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names').read_bytes()
  coco_labels = coco_labels.decode('utf-8').split('\n')
  height, width = img.shape[0:2]
  scale_factor = 608 / width
  prediction[:,[1,3]] -= (608 - scale_factor * width) / 2
  prediction[:,[2,4]] -= (608 - scale_factor * height) / 2
  for pred in prediction:
    corner1 = tuple(pred[1:3].astype(int))
    corner2 = tuple(pred[3:5].astype(int))
    w = corner2[0] - corner1[0]
    h = corner2[1] - corner1[1]
    corner2 = (corner2[0] + w, corner2[1] + h)
    label = coco_labels[int(pred[-1])]
    img = cv2.rectangle(img, corner1, corner2, (255, 0, 0), 2)
    t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1, 1)[0]
    c2 = corner1[0] + t_size[0] + 3, corner1[1] + t_size[1] + 4
    img = cv2.rectangle(img, corner1, c2, (255, 0, 0), -1)
    img = cv2.putText(img, label, (corner1[0], corner1[1] + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 1, [225,255,255], 1)
  return img

def bbox_iou(box1, box2):
  """
  Returns the IoU of two bounding boxes.
  IoU = Area of Overlap / Area of Union: how close the predicted bounding box
  is to the ground truth bounding box. Higher IoU means a better match.
  During training it is used to track accuracy; at inference it is used to
  suppress duplicate bounding boxes (non-max suppression).
  """
  # Get the coordinates of the bounding boxes
  b1_x1, b1_y1, b1_x2, b1_y2 = box1[:,0], box1[:,1], box1[:,2], box1[:,3]
  b2_x1, b2_y1, b2_x2, b2_y2 = box2[:,0], box2[:,1], box2[:,2], box2[:,3]
  # Get the coordinates of the intersection rectangle
  # (near corner is the elementwise max, far corner the elementwise min)
  inter_rect_x1 = np.maximum(b1_x1, b2_x1)
  inter_rect_y1 = np.maximum(b1_y1, b2_y1)
  inter_rect_x2 = np.minimum(b1_x2, b2_x2)
  inter_rect_y2 = np.minimum(b1_y2, b2_y2)
  # Intersection area (clipped to zero when the boxes do not overlap)
  inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, 99999) * np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, 99999)
  # Union area
  b1_area = (b1_x2 - b1_x1 + 1)*(b1_y2 - b1_y1 + 1)
  b2_area = (b2_x2 - b2_x1 + 1)*(b2_y2 - b2_y1 + 1)
  iou = inter_area / (b1_area + b2_area - inter_area)
  return iou

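# Quick sanity check for bbox_iou (illustrative, not part of the original
# example). With the inclusive "+1" pixel convention above, two 11x11 boxes
# offset horizontally by 5 pixels intersect in a 6x11 region:
#   bbox_iou(np.array([[0., 0., 10., 10.]]), np.array([[5., 0., 15., 10.]]))
#   -> 66 / (121 + 121 - 66) = 0.375
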
def process_results(prediction, confidence=0.9, num_classes=80, nms_conf=0.4):
  prediction = prediction.detach().numpy()
  conf_mask = (prediction[:,:,4] > confidence)
  conf_mask = np.expand_dims(conf_mask, 2)
  prediction = prediction * conf_mask
  # Non max suppression: convert (center x, center y, w, h) to corner coordinates.
  # Work on a copy so the reads below still see the original center values.
  box_corner = np.copy(prediction)
  box_corner[:,:,0] = (prediction[:,:,0] - prediction[:,:,2]/2)
  box_corner[:,:,1] = (prediction[:,:,1] - prediction[:,:,3]/2)
  box_corner[:,:,2] = (prediction[:,:,0] + prediction[:,:,2]/2)
  box_corner[:,:,3] = (prediction[:,:,1] + prediction[:,:,3]/2)
  prediction[:,:,:4] = box_corner[:,:,:4]
  write = False
  # Process the (single) image in the batch
  img_pred = prediction[0]
  max_conf = np.amax(img_pred[:,5:5+num_classes], axis=1)
  max_conf_score = np.argmax(img_pred[:,5:5+num_classes], axis=1)
  max_conf_score = np.expand_dims(max_conf_score, axis=1)
  max_conf = np.expand_dims(max_conf, axis=1)
  seq = (img_pred[:,:5], max_conf, max_conf_score)
  image_pred = np.concatenate(seq, axis=1)
  non_zero_ind = np.nonzero(image_pred[:,4])[0]
  assert all(image_pred[non_zero_ind,0] > 0)
  image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind),:], (-1, 7))
  if image_pred_.shape[0] == 0:
    print("No detections found!")
    return 0
  for cls in np.unique(image_pred_[:, -1]):
    # Perform NMS on the detections of one particular class
    cls_mask = image_pred_*np.expand_dims(image_pred_[:, -1] == cls, axis=1)
    class_mask_ind = np.squeeze(np.nonzero(cls_mask[:,-2]))
    image_pred_class = np.reshape(image_pred_[class_mask_ind], (-1, 7))
    # Sort the detections such that the entry with the maximum objectness
    # confidence is at the top
    conf_sort_index = np.argsort(image_pred_class[:,4])[::-1]
    image_pred_class = image_pred_class[conf_sort_index]
    for i in range(image_pred_class.shape[0]):
      # Get the IoUs of all boxes that come after the one we are looking at in the loop
      try:
        ious = bbox_iou(np.expand_dims(image_pred_class[i], axis=0), image_pred_class[i+1:])
      except (IndexError, ValueError): # image_pred_class shrinks as entries are removed
        break
      # Zero out all the detections that have IoU > threshold
      iou_mask = np.expand_dims((ious < nms_conf), axis=1)
      image_pred_class[i+1:] *= iou_mask
      # Keep only the non-zero entries
      non_zero_ind = np.squeeze(np.nonzero(image_pred_class[:,4]))
      image_pred_class = np.reshape(image_pred_class[non_zero_ind], (-1, 7))
    # One batch-index column per surviving box (the batch size is 1 here)
    batch_ind = np.zeros((image_pred_class.shape[0], 1))
    seq = (batch_ind, image_pred_class)
    if not write:
      output, write = np.concatenate(seq, axis=1), True
    else:
      out = np.concatenate(seq, axis=1)
      output = np.concatenate((output, out))
  return output

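# Output format note: process_results returns an (N, 8) array with one row per
# kept detection, [batch index, x1, y1, x2, y2, objectness, class confidence,
# class index], or the int 0 when nothing clears the confidence threshold
# (which is what the isinstance(prediction, int) check in add_boxes is for).
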
def infer(model, img):
  # Resize to the 608x608 network input, BGR -> RGB, HWC -> CHW, scale to [0, 1]
  img = np.array(Image.fromarray(img).resize((608, 608)))
  img = img[:,:,::-1].transpose((2,0,1))
  img = img[np.newaxis,:,:,:]/255.0
  prediction = model.forward(Tensor(img.astype(np.float32)))
  return prediction

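# Shape note: for a 608x608 input the returned prediction is (1, 22743, 85),
# where 85 = 4 box coordinates + 1 objectness score + 80 class scores and
# 22743 = (19*19 + 38*38 + 76*76) * 3 anchors across the three yolo layers
# (strides 32, 16 and 8).
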
def parse_cfg(cfg):
  # Return a list of blocks, one dict per [section] of the cfg file
  lines = cfg.decode("utf-8").split('\n')
  lines = [x for x in lines if len(x) > 0]
  lines = [x for x in lines if x[0] != '#']
  lines = [x.strip() for x in lines]
  block, blocks = {}, []
  for line in lines:
    if line[0] == "[":
      if len(block) != 0:
        blocks.append(block)
        block = {}
      block["type"] = line[1:-1].rstrip()
    else:
      key, value = line.split("=")
      block[key.rstrip()] = value.lstrip()
  blocks.append(block)
  return blocks

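# Illustrative example (assumed cfg snippet, not the full yolov3.cfg):
#   parse_cfg(b"[convolutional]\nbatch_normalize=1\nfilters=32\n\n[shortcut]\nfrom=-3\n")
# returns
#   [{'type': 'convolutional', 'batch_normalize': '1', 'filters': '32'},
#    {'type': 'shortcut', 'from': '-3'}]
# Every value stays a string; the consumers below call int() where needed.
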
# TODO: Speed up this function, avoid copying stuff from GPU to CPU
def predict_transform(prediction, inp_dim, anchors, num_classes):
  batch_size = prediction.shape[0]
  stride = inp_dim // prediction.shape[2]
  grid_size = inp_dim // stride
  bbox_attrs = 5 + num_classes
  num_anchors = len(anchors)
  prediction = prediction.reshape(shape=(batch_size, bbox_attrs*num_anchors, grid_size*grid_size))
  prediction = prediction.transpose(1, 2)
  prediction = prediction.reshape(shape=(batch_size, grid_size*grid_size*num_anchors, bbox_attrs))
  prediction_cpu = prediction.numpy()
  # Sigmoid the center coordinates and the objectness score
  for i in (0, 1, 4):
    prediction_cpu[:,:,i] = 1 / (1 + np.exp(-prediction_cpu[:,:,i]))
  # Add the center offsets
  grid = np.arange(grid_size)
  a, b = np.meshgrid(grid, grid)
  x_offset = a.reshape((-1, 1))
  y_offset = b.reshape((-1, 1))
  x_y_offset = np.concatenate((x_offset, y_offset), 1)
  x_y_offset = np.tile(x_y_offset, (1, num_anchors))
  x_y_offset = x_y_offset.reshape((-1, 2))
  x_y_offset = np.expand_dims(x_y_offset, 0)
  # Apply the anchors to the box dimensions
  anchors = [(a[0]/stride, a[1]/stride) for a in anchors]
  anchors = np.tile(anchors, (grid_size*grid_size, 1))
  anchors = np.expand_dims(anchors, 0)
  prediction_cpu[:,:,:2] += x_y_offset
  prediction_cpu[:,:,2:4] = np.exp(prediction_cpu[:,:,2:4])*anchors
  # Sigmoid the class scores and scale the boxes back to input-image pixels
  prediction_cpu[:,:,5:5+num_classes] = 1 / (1 + np.exp(-prediction_cpu[:,:,5:5+num_classes]))
  prediction_cpu[:,:,:4] *= stride
  return Tensor(prediction_cpu)

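# The decode in predict_transform follows the YOLOv3 paper: for raw outputs
# (tx, ty, tw, th, to) at grid cell (cx, cy) with anchor (pw, ph),
#   bx = sigmoid(tx) + cx
#   by = sigmoid(ty) + cy
#   bw = pw * exp(tw)
#   bh = ph * exp(th)
#   objectness = sigmoid(to)
# after which the box coordinates are multiplied by the stride to land in
# input-image pixels.
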
class Darknet:
  def __init__(self, cfg):
    self.blocks = parse_cfg(cfg)
    self.net_info, self.module_list = self.create_modules(self.blocks)
    print("Modules length:", len(self.module_list))

  def create_modules(self, blocks):
    net_info = blocks[0] # Info about model hyperparameters
    prev_filters, filters = 3, None
    output_filters, module_list = [], []
    # Build one module (a list of layers/callables) per block
    for index, x in enumerate(blocks[1:]):
      module_type = x["type"]
      module = []
      if module_type == "convolutional":
        # A missing batch_normalize key means no batchnorm and a conv bias
        batch_normalize = int(x.get("batch_normalize", 0))
        bias = not batch_normalize
        activation = x["activation"]
        filters = int(x["filters"])
        padding = int(x["pad"])
        pad = (int(x["size"]) - 1) // 2 if padding else 0
        module.append(Conv2d(prev_filters, filters, int(x["size"]), int(x["stride"]), pad, bias=bias))
        # BatchNorm2d
        if batch_normalize:
          module.append(BatchNorm2d(filters, eps=1e-05, track_running_stats=True))
        # LeakyReLU activation
        if activation == "leaky":
          module.append(lambda x: x.leakyrelu(0.1))
      elif module_type == "maxpool":
        size, stride = int(x["size"]), int(x["stride"])
        # Bind size/stride as defaults to avoid late binding across iterations
        module.append(lambda x, size=size, stride=stride: x.max_pool2d(kernel_size=(size, size), stride=stride))
      elif module_type == "upsample":
        module.append(lambda x: Tensor(x.numpy().repeat(2, axis=-2).repeat(2, axis=-1)))
      elif module_type == "route":
        x["layers"] = x["layers"].split(",")
        # Start of route
        start = int(x["layers"][0])
        # End, if it exists
        end = int(x["layers"][1]) if len(x["layers"]) > 1 else 0
        if start > 0: start -= index
        if end > 0: end -= index
        module.append(lambda x: x)
        if end < 0:
          filters = output_filters[index + start] + output_filters[index + end]
        else:
          filters = output_filters[index + start]
      # Shortcut corresponds to a skip connection
      elif module_type == "shortcut":
        module.append(lambda x: x)
      elif module_type == "yolo":
        mask = list(map(int, x["mask"].split(",")))
        anchors = [int(a) for a in x["anchors"].split(",")]
        anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors), 2)]
        module.append([anchors[i] for i in mask])
      # Append to module_list
      module_list.append(module)
      if filters is not None:
        prev_filters = filters
      output_filters.append(filters)
    return (net_info, module_list)

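  # Note: each module_list entry above is a plain Python list: Conv2d /
  # BatchNorm2d layers plus activation lambdas for "convolutional" blocks,
  # an identity lambda for route/shortcut blocks, and a bare anchor list for
  # "yolo" blocks (read in forward() rather than called).
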
  def dump_weights(self):
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        print(self.blocks[i + 1]["type"], "weights", i)
        model = self.module_list[i]
        conv = model[0]
        print(conv.weight.numpy()[0][0][0])
        if conv.bias is not None:
          print("biases")
          print(conv.bias.shape)
          print(conv.bias.numpy()[0][0:5])
        else:
          print("no biases for layer", i)

  def load_weights(self, url):
    # Skip the first five 32-bit values: the darknet header (major, minor,
    # revision and a 64-bit "images seen" counter)
    weights = np.frombuffer(fetch(url).read_bytes(), dtype=np.float32)[5:]
    ptr = 0
    for i in range(len(self.module_list)):
      module_type = self.blocks[i + 1]["type"]
      if module_type == "convolutional":
        model = self.module_list[i]
        # With batchnorm: load conv weights without biases, plus the batchnorm
        # values. Without batchnorm: load conv weights and biases.
        batch_normalize = int(self.blocks[i+1].get("batch_normalize", 0))
        conv = model[0]
        if batch_normalize:
          bn = model[1]
          # Get the number of weights of the batchnorm layer
          num_bn_biases = math.prod(bn.bias.shape)
          # Load the weights
          bn_biases = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          bn_weights = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          bn_running_mean = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          bn_running_var = Tensor(weights[ptr:ptr+num_bn_biases])
          ptr += num_bn_biases
          # Cast the loaded weights into the dims of the model weights
          bn_biases = bn_biases.reshape(shape=tuple(bn.bias.shape))
          bn_weights = bn_weights.reshape(shape=tuple(bn.weight.shape))
          bn_running_mean = bn_running_mean.reshape(shape=tuple(bn.running_mean.shape))
          bn_running_var = bn_running_var.reshape(shape=tuple(bn.running_var.shape))
          # Copy the data
          bn.bias = bn_biases
          bn.weight = bn_weights
          bn.running_mean = bn_running_mean
          bn.running_var = bn_running_var
        else:
          # Load the biases of the conv layer
          num_biases = math.prod(conv.bias.shape)
          conv_biases = Tensor(weights[ptr:ptr+num_biases])
          ptr += num_biases
          # Reshape and copy
          conv_biases = conv_biases.reshape(shape=tuple(conv.bias.shape))
          conv.bias = conv_biases
        # Load the weights for the conv layer
        num_weights = math.prod(conv.weight.shape)
        conv_weights = Tensor(weights[ptr:ptr+num_weights])
        ptr += num_weights
        conv_weights = conv_weights.reshape(shape=tuple(conv.weight.shape))
        conv.weight = conv_weights

  def forward(self, x):
    modules = self.blocks[1:]
    outputs = {} # Cached outputs for route layers
    detections, write = None, False
    for i, module in enumerate(modules):
      module_type = module["type"]
      if module_type == "convolutional" or module_type == "upsample":
        for layer in self.module_list[i]:
          x = layer(x)
      elif module_type == "route":
        layers = [int(a) for a in module["layers"]]
        if layers[0] > 0:
          layers[0] -= i
        if len(layers) == 1:
          x = outputs[i + layers[0]]
        else:
          if layers[1] > 0: layers[1] -= i
          map1 = outputs[i + layers[0]]
          map2 = outputs[i + layers[1]]
          x = Tensor(np.concatenate((map1.numpy(), map2.numpy()), axis=1))
      elif module_type == "shortcut":
        from_ = int(module["from"])
        x = outputs[i - 1] + outputs[i + from_]
      elif module_type == "yolo":
        anchors = self.module_list[i][0]
        inp_dim = int(self.net_info["height"]) # 608 for this cfg
        num_classes = int(module["classes"])
        x = predict_transform(x, inp_dim, anchors, num_classes)
        if not write:
          detections, write = x, True
        else:
          detections = Tensor(np.concatenate((detections.numpy(), x.numpy()), axis=1))
      outputs[i] = x
    return detections

if __name__ == "__main__":
  model = Darknet(fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg').read_bytes())
  print("Loading weights file (237MB). This might take a while…")
  model.load_weights('https://pjreddie.com/media/files/yolov3.weights')
  if len(sys.argv) > 1:
    url = sys.argv[1]
  else:
    url = "https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png"
  if url == 'webcam':
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
    while True:
      _ = cap.grab() # discard one frame to circumvent capture buffering
      ret, frame = cap.read()
      prediction = process_results(infer(model, frame))
      img = Image.fromarray(frame[:, :, [2,1,0]])
      boxes = add_boxes(np.array(img.resize((608, 608))), prediction)
      boxes = cv2.cvtColor(boxes, cv2.COLOR_RGB2BGR)
      cv2.imshow('yolo', boxes)
      if cv2.waitKey(1) & 0xFF == ord('q'):
        break
    cap.release()
    cv2.destroyAllWindows()
    sys.exit(0) # the single-image path below expects img to be set
  elif url.startswith('http'):
    img_stream = io.BytesIO(fetch(url).read_bytes())
    img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)
  else:
    img = cv2.imread(url)
  st = time.time()
  print('running inference…')
  prediction = infer(model, img)
  print(f'did inference in {(time.time() - st):.2f}s')
  show_labels(prediction)
  prediction = process_results(prediction)
  boxes = add_boxes(np.array(Image.fromarray(img).resize((608, 608))), prediction)
  cv2.imwrite('boxes.jpg', boxes)