remove yolo examples

This commit is contained in:
George Hotz
2023-07-31 10:52:30 -07:00
parent 846923703c
commit cd1f49d466
6 changed files with 0 additions and 1361 deletions

View File

@@ -1,250 +0,0 @@
import sys
import os
import random
import json
import numpy
from PIL import Image
from tinygrad.tensor import Tensor
from tinygrad.nn.optim import SGD
from examples.vgg7_helpers.kinne import KinneDir
from examples.vgg7_helpers.waifu2x import image_load, image_save, Vgg7
# amount of context erased by model
CONTEXT = 7
def get_sample_count(samples_dir):
try:
samples_dir_count_file = open(samples_dir + "/sample_count.txt", "r")
v = samples_dir_count_file.readline()
samples_dir_count_file.close()
return int(v)
except:
return 0
def set_sample_count(samples_dir, sc):
with open(samples_dir + "/sample_count.txt", "w") as file:
file.write(str(sc) + "\n")
if len(sys.argv) < 2:
print("python3 -m examples.vgg7 import MODELJSON MODELDIR")
print(" imports a waifu2x JSON vgg_7 model, i.e. waifu2x/models/vgg_7/art/scale2.0x_model.json")
print(" into a directory of float binaries along with a meta.txt file containing tensor sizes")
print(" weight tensors are ordered in tinygrad/ncnn form, as so: (outC,inC,H,W)")
print(" *this format is used by all other commands in this program*")
print("python3 -m examples.vgg7 execute MODELDIR IMG_IN IMG_OUT")
print(" given an already-nearest-neighbour-scaled image, runs vgg7 on it")
print(" output image has 7 pixels removed on all edges")
print(" do not run on large images, will have *hilarious* RAM use")
print("python3 -m examples.vgg7 execute_full MODELDIR IMG_IN IMG_OUT")
print(" does the 'whole thing' (padding, tiling)")
print(" safe for large images, etc.")
print("python3 -m examples.vgg7 new MODELDIR")
print(" creates a new model (experimental)")
print("python3 -m examples.vgg7 train MODELDIR SAMPLES_DIR ROUNDS ROUNDS_SAVE")
print(" trains a model (experimental)")
print(" (how experimental? well, every time I tried it, it flooded w/ NaNs)")
print(" note: ROUNDS < 0 means 'forever'. ROUNDS_SAVE <= 0 is not a good idea.")
print(" expects roughly execute's input as SAMPLES_DIR/IDXa.png")
print(" expects roughly execute's output as SAMPLES_DIR/IDXb.png")
print(" (i.e. my_samples/0a.png is the first pre-nearest-scaled image,")
print(" my_samples/0b.png is the first original image)")
print(" in addition, SAMPLES_DIR/samples_count.txt indicates sample count")
print(" won't pad or tile, so keep image sizes sane")
print("python3 -m examples.vgg7 samplify IMG_A IMG_B SAMPLES_DIR SIZE")
print(" creates overlapping micropatches (SIZExSIZE w/ 7-pixel border) for training")
print(" maintains/creates samples_count.txt automatically")
print(" unlike training, IMG_A must be exactly half the size of IMG_B")
sys.exit(1)
cmd = sys.argv[1]
vgg7 = Vgg7()
def nansbane(p):
if numpy.isnan(numpy.min(p.numpy())):
raise Exception("A NaN in the model has been detected. This model will not be interacted with to prevent further damage.")
def load_and_save(path, save):
if save:
for v in vgg7.get_parameters():
nansbane(v)
kn = KinneDir(path, save)  # use the path argument rather than relying on the global model variable
kn.parameters(vgg7.get_parameters())
kn.close()
if not save:
for v in vgg7.get_parameters():
nansbane(v)
if cmd == "import":
src = sys.argv[2]
model = sys.argv[3]
vgg7.load_waifu2x_json(json.load(open(src, "rb")))
if not os.path.isdir(model):
os.mkdir(model)
load_and_save(model, True)
elif cmd == "execute":
model = sys.argv[2]
in_file = sys.argv[3]
out_file = sys.argv[4]
load_and_save(model, False)
image_save(out_file, vgg7.forward(Tensor(image_load(in_file))).numpy())
elif cmd == "execute_full":
model = sys.argv[2]
in_file = sys.argv[3]
out_file = sys.argv[4]
load_and_save(model, False)
image_save(out_file, vgg7.forward_tiled(image_load(in_file), 156))
elif cmd == "new":
model = sys.argv[2]
if not os.path.isdir(model):
os.mkdir(model)
load_and_save(model, True)
elif cmd == "train":
model = sys.argv[2]
samples_base = sys.argv[3]
samples_count = get_sample_count(samples_base)
rounds = int(sys.argv[4])
rounds_per_save = int(sys.argv[5])
load_and_save(model, False)
# Initialize sample probabilities.
# This is used to try and get the network to focus on "interesting" samples,
# which works nicely with the microsample system.
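# e.g. a sample whose last observed loss was 0.2 is drawn twice as often as one whose
# last loss was 0.1, since the probabilities are just the per-sample losses normalized to sum to 1.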
sample_probs = None
sample_probs_path = model + "/sample_probs.bin"
try:
# try to read...
sample_probs = numpy.fromfile(sample_probs_path, "<f8")
if sample_probs.shape[0] != samples_count:
print("sample probs size != sample count - initializing")
sample_probs = None
except:
# it's fine
print("sample probs could not be loaded - initializing")
if sample_probs is None:
# This stupidly high amount is used to force an initial pass over all samples
sample_probs = numpy.ones(samples_count) * 1000
print("Training...")
# Adam has a tendency to destroy the state of the network when restarted
# Plus it's slower
optim = SGD(vgg7.get_parameters())
rnum = 0
while True:
# The way the -1 option works is that rnum is never -1.
if rnum == rounds:
break
sample_idx = 0
try:
sample_idx = numpy.random.choice(samples_count, p = sample_probs / sample_probs.sum())
except:
print("exception occurred (PROBABLY value-probabilities-dont-sum-to-1)")
sample_idx = random.randint(0, samples_count - 1)
x_img = image_load(samples_base + "/" + str(sample_idx) + "a.png")
y_img = image_load(samples_base + "/" + str(sample_idx) + "b.png")
sample_x = Tensor(x_img, requires_grad = False)
sample_y = Tensor(y_img, requires_grad = False)
# magic code roughly from readme example
# An explanation, in case anyone else has to go down this path:
# This runs the actual network normally
out = vgg7.forward(sample_x)
# Subtraction determines error here (as this is an image, not classification).
# *Abs is the important bit* - at least for me, anyway.
# The training process seeks to minimize this 'loss' value.
# Minimization of loss *tends towards negative infinity*, so without the abs,
# or without an implicit abs (the mul in the README),
# loss will always go haywire in one direction or another.
# Mean determines how errors are treated.
# Do not use Sum. I tried that. It worked while I was using 1x1 patches...
# Then it went exponential.
# Also, Mean goes *after* abs. I realize this should have been obvious to me.
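# (A tiny illustration: if half the pixels are +0.1 off and half are -0.1 off,
# the signed mean is 0 even though the output is wrong, while the abs mean is 0.1.)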
loss = sample_y.sub(out).abs().mean()
# This is the bit where tinygrad works backward from the loss
optim.zero_grad()
loss.backward()
# And this updates the parameters
optim.step()
# warning: used by sample probability adjuster
loss_indicator = loss.max().numpy()[0]
print("Round " + str(rnum) + " : " + str(loss_indicator))
if (rnum % rounds_per_save) == 0:
print("Saving")
load_and_save(model, True)
sample_probs.astype("<f8", "C").tofile(sample_probs_path)
# Update round state
# Number
rnum = rnum + 1
# Probability management
# there must always be a probability, no matter how slim, even if loss goes to 0
sample_probs[sample_idx] = max(loss_indicator, 1.e-10)
# if we were told to save every round, we already saved
if rounds_per_save != 1:
print("Done with all rounds, saving")
load_and_save(model, True)
sample_probs.astype("<f8", "C").tofile(sample_probs_path)
elif cmd == "samplify":
a_img = sys.argv[2]
b_img = sys.argv[3]
samples_base = sys.argv[4]
sample_size = int(sys.argv[5])
samples_count = get_sample_count(samples_base)
# This bit is interesting because it actually does some work.
# Not much, but some work.
a_img = image_load(a_img)
b_img = image_load(b_img)
# as with the main library body,
# Y X order is used here
# size assertions are checked before pre-upscaling is performed
assert a_img.shape[2] == (b_img.shape[2] // 2)
assert a_img.shape[3] == (b_img.shape[3] // 2)
# pre-upscaling - this matches the sizes (and coordinates)
a_img = a_img.repeat(2, 2).repeat(2, 3)
samples_added = 0
# actual patch extraction
for posy in range(CONTEXT, b_img.shape[2] - (CONTEXT + sample_size - 1), sample_size):
for posx in range(CONTEXT, b_img.shape[3] - (CONTEXT + sample_size - 1), sample_size):
# this is a viable patch location, add it
# note the ranges here:
# + there are always CONTEXT pixels *before* the point
# + with no subtraction at the end, there'd already be a pixel *at* the point,
# as ranges are exclusive
# + additionally, there are sample_size - 1 additional sample pixels
# + additionally, there are CONTEXT additional pixels
# + therefore there are CONTEXT + sample_size pixels *at & after* the point
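# worked example (illustrative numbers): with CONTEXT = 7 and sample_size = 8,
# patch_x below is 22x22 (8 + 2*7) while patch_y is 8x8; running vgg7 on the
# 22x22 input strips the 7-pixel border on each side and yields an 8x8 output
# that lines up exactly with patch_y.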
patch_x = a_img[:, :, posy - CONTEXT : posy + CONTEXT + sample_size, posx - CONTEXT : posx + CONTEXT + sample_size]
patch_y = b_img[:, :, posy : posy + sample_size, posx : posx + sample_size]
image_save(f"{samples_base}/{str(samples_count)}a.png", patch_x)
image_save(f"{samples_base}/{str(samples_count)}b.png", patch_y)
samples_count += 1
samples_added += 1
print(f"Added {str(samples_added)} samples")
set_sample_count(samples_base, samples_count)
else:
print("unknown command")

View File

@@ -1,71 +0,0 @@
from tinygrad.tensor import Tensor
import numpy
import os
# Format Details:
# A KINNE parameter set is stored as a set of files named "snoop_bin_*.bin",
# where the * is a number starting at 0.
# Each file is simply raw little-endian floats,
# as readable by: numpy.fromfile(path, "<f4")
# and as writable by: t.numpy().astype("<f4", "C").tofile(path)
# This format is intended to be extremely simple to get into literally anything.
# It is not intended to be structural or efficient - reloading a network when
# unnecessary is inefficient anyway.
# Ultimately, the idea behind this is as a format that, while it will always
# require code to implement, requires as little code as possible, and therefore
# works as a suitable interchange for any situation.
# To add to the usability of the format, some informal metadata is provided,
# in "meta.txt", which provides human-readable shape information.
# This is intended to help with debugging other implementations of the network,
# by providing concrete human-readable information on tensor shapes.
# It is NOT meant to be read by machines.
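# A minimal round-trip sketch of the raw format described above; _kinne_roundtrip_demo
# is a hypothetical helper added purely for illustration, not part of the format.
def _kinne_roundtrip_demo(t: Tensor, path: str) -> Tensor:
  # write: dump the tensor as raw little-endian float32 bytes
  t.numpy().astype("<f4", "C").tofile(path)
  # read: load the raw floats back and restore the original shape
  return Tensor(numpy.fromfile(path, "<f4")).reshape(shape=t.shape)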
class KinneDir:
"""
A KinneDir is an intermediate object used to save or load a model.
"""
def __init__(self, base: str, save: bool):
"""
Opens a new KINNE directory with the given base path.
If save is true, the directory is created if possible.
(This does not create parents.)
Save being true or false determines if tensors are loaded or saved.
The base path is of the form "models/abc" - no trailing slash.
It is important that if you wish to save in the current directory,
you use ".", not the empty string.
"""
if save and not os.path.isdir(base):
os.mkdir(base)
self.base = base + "/snoop_bin_"
self.next_part_index = 0
self.save = save
if save:
self.metadata = open(base + "/meta.txt", "w")
def parameter(self, t: Tensor):
"""
parameter loads or saves a parameter, given as a tensor.
"""
path = f"{self.base}{self.next_part_index}.bin"
if self.save:
t.numpy().astype("<f4", "C").tofile(path)
self.metadata.write(f"{self.next_part_index}: {t.shape}\n")
else:
t.assign(Tensor(numpy.fromfile(path, "<f4")).reshape(shape=t.shape))
self.next_part_index += 1
def parameters(self, params):
"""
parameters loads or saves a sequence of parameters.
It's intended for easily attaching to an existing model,
assuming that your parameters list orders are consistent.
(In other words, usage with tinygrad.utils.get_parameters isn't advised -
it's too 'implicit'.)
"""
for t in params:
self.parameter(t)
def close(self):
if self.save:
self.metadata.close()

View File

@@ -1,178 +0,0 @@
# Implementation of waifu2x vgg7 in tinygrad.
# Obviously, not developed, supported, etc. by the original waifu2x author(s).
import numpy
from tinygrad.tensor import Tensor
from PIL import Image
# File Formats
# tinygrad convolution tensor input layout is (1,c,y,x) - and therefore the form for all images used in the project
# tinygrad convolution tensor weight layout is (outC,inC,H,W) - this matches NCNN (and therefore KINNE), but not waifu2x json
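# e.g. a 256x256 RGB PNG loads (via image_load below) as a float32 ndarray of shape (1, 3, 256, 256) with values in [0, 1]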
def image_load(path) -> numpy.ndarray:
"""
Loads an image in the shape expected by other functions in this module.
Doesn't Tensor it, in case you need to do further work with it.
"""
# file
na = numpy.array(Image.open(path))
# fix shape
na = numpy.moveaxis(na, [2,0,1], [0,1,2])
# shape is now (3,h,w), add 1
na = na.reshape(1,3,na.shape[1],na.shape[2])
# change type
na = na.astype("float32") / 255.0
return na
def image_save(path, na: numpy.ndarray):
"""
Saves an image of the shape expected by other functions in this module.
However, note this expects a numpy array.
"""
# change type
na = numpy.fmax(numpy.fmin(na * 255.0, 255), 0).astype("uint8")
# shape is now (1,3,h,w), remove 1
na = na.reshape(3,na.shape[2],na.shape[3])
# fix shape
na = numpy.moveaxis(na, [0,1,2], [2,0,1])
# shape is now (h,w,3)
# file
Image.fromarray(na).save(path)
# The Model
class Conv3x3Biased:
"""
A 3x3 convolution layer with some utility functions.
"""
def __init__(self, inC, outC, last = False):
# Massively overstate the weights to get them to be focused on,
# since otherwise the biases overrule everything
self.weight = Tensor.uniform(outC, inC, 3, 3) * 16.0
# Layout-wise, blatant cheat, but serious_mnist does it. I'd guess channels either have to have a size of 1 or whatever the target is?
# Values-wise, entirely different blatant cheat.
# In most cases, use uniform bias, but tiny.
# For the last layer, use just 0.5, constant.
if last:
self.bias = Tensor.zeros(1, outC, 1, 1) + 0.5
else:
self.bias = Tensor.uniform(1, outC, 1, 1)
def forward(self, x):
# You might be thinking, "but what about padding?"
# Answer: Tiling is used to stitch everything back together, though you could pad the image before providing it.
return x.conv2d(self.weight).add(self.bias)
def get_parameters(self) -> list:
return [self.weight, self.bias]
def load_waifu2x_json(self, layer: dict):
# Weights in this file are outChannel,inChannel,X,Y.
# Not outChannel,inChannel,Y,X.
# Therefore, transpose it before assignment.
# I have long since forgotten how I worked this out.
self.weight.assign(Tensor(layer["weight"]).reshape(shape=self.weight.shape).transpose(2, 3))
self.bias.assign(Tensor(layer["bias"]).reshape(shape=self.bias.shape))
class Vgg7:
"""
The 'vgg7' waifu2x network.
Lower quality and slower than even upconv7 (nevermind cunet), but is very easy to implement and test.
"""
def __init__(self):
self.conv1 = Conv3x3Biased(3, 32)
self.conv2 = Conv3x3Biased(32, 32)
self.conv3 = Conv3x3Biased(32, 64)
self.conv4 = Conv3x3Biased(64, 64)
self.conv5 = Conv3x3Biased(64, 128)
self.conv6 = Conv3x3Biased(128, 128)
self.conv7 = Conv3x3Biased(128, 3, True)
def forward(self, x):
"""
Forward pass: Actually runs the network.
Input format: (1, 3, Y, X)
Output format: (1, 3, Y - 14, X - 14)
(the - 14 represents the 7-pixel context border that is lost)
"""
x = self.conv1.forward(x).leakyrelu(0.1)
x = self.conv2.forward(x).leakyrelu(0.1)
x = self.conv3.forward(x).leakyrelu(0.1)
x = self.conv4.forward(x).leakyrelu(0.1)
x = self.conv5.forward(x).leakyrelu(0.1)
x = self.conv6.forward(x).leakyrelu(0.1)
x = self.conv7.forward(x)
return x
def get_parameters(self) -> list:
return self.conv1.get_parameters() + self.conv2.get_parameters() + self.conv3.get_parameters() + self.conv4.get_parameters() + self.conv5.get_parameters() + self.conv6.get_parameters() + self.conv7.get_parameters()
def load_waifu2x_json(self, data: list):
"""
Loads weights from one of the waifu2x JSON files, i.e. waifu2x/models/vgg_7/art/noise0_model.json
data (passed in) is assumed to be the output of json.load or some similar on such a file
"""
self.conv1.load_waifu2x_json(data[0])
self.conv2.load_waifu2x_json(data[1])
self.conv3.load_waifu2x_json(data[2])
self.conv4.load_waifu2x_json(data[3])
self.conv5.load_waifu2x_json(data[4])
self.conv6.load_waifu2x_json(data[5])
self.conv7.load_waifu2x_json(data[6])
def forward_tiled(self, image: numpy.ndarray, tile_size: int) -> numpy.ndarray:
"""
Given an ndarray image as loaded by image_load (NOT a tensor), scales it, pads it, splits it up, forwards the pieces, and reconstitutes it.
Note that you really shouldn't try to run anything not (1, 3, *, *) through this.
"""
# Constant that only really gets repeated a ton here.
context = 7
context2 = context + context
# Notably, numpy is used here because it makes this fine manipulation a lot simpler.
# Scaling first - repeat on axis 2 and axis 3 (Y & X)
image = image.repeat(2, 2).repeat(2, 3)
# Resulting image buffer. This is made before the input is padded,
# since the input has the padded shape right now.
image_out = numpy.zeros(image.shape)
# Padding next. Note that this padding is done on the whole image.
# Padding the tiles would lose critical context, cause seams, etc.
image = numpy.pad(image, [[0, 0], [0, 0], [context, context], [context, context]], mode = "edge")
# Now for tiling.
# The output tile size is the usable output from an input tile (tile_size).
# As such, the tiles overlap.
out_tile_size = tile_size - context2
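# e.g. with tile_size = 156 (the value the vgg7 example passes in), each 156x156 input tile
# yields a usable 142x142 output tile, so the loops below step in 142-pixel strides.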
for out_y in range(0, image_out.shape[2], out_tile_size):
for out_x in range(0, image_out.shape[3], out_tile_size):
# Input is sourced from the same coordinates, but some stuff ought to be
# noted here for future reference:
# + out_x/y's equivalent position w/ the padding is out_x + context.
# + The output, however, is without context. Input needs context.
# + Therefore, the input rectangle is expanded on all sides by context.
# + Therefore, the input position has the context subtracted again.
# + Therefore:
in_y = out_y
in_x = out_x
# not shown: in_w/in_h = tile_size (as opposed to out_tile_size)
# Extract tile.
# Note that numpy will auto-crop this at the bottom-right.
# This will never be a problem, as tiles are specifically chosen within the padded section.
tile = image[:, :, in_y:in_y + tile_size, in_x:in_x + tile_size]
# Extracted tile dimensions -> output dimensions
# This is important because of said cropping, otherwise it'd be interior tile size.
out_h = tile.shape[2] - context2
out_w = tile.shape[3] - context2
# Process tile.
tile_t = Tensor(tile)
tile_fwd_t = self.forward(tile_t)
# Replace tile.
image_out[:, :, out_y:out_y + out_h, out_x:out_x + out_w] = tile_fwd_t.numpy()
return image_out

View File

@@ -1,406 +0,0 @@
# https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg
import sys
import io
import time
import math
import cv2
import numpy as np
from PIL import Image
from tinygrad.tensor import Tensor
from tinygrad.nn import BatchNorm2d, Conv2d
from extra.utils import fetch
def show_labels(prediction, confidence=0.5, num_classes=80):
coco_labels = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names')
coco_labels = coco_labels.decode('utf-8').split('\n')
prediction = prediction.detach().cpu().numpy()
conf_mask = (prediction[:,:,4] > confidence)
prediction *= np.expand_dims(conf_mask, 2)
labels = []
# Iterate over batches
for img_pred in prediction:
max_conf = np.amax(img_pred[:,5:5+num_classes], axis=1)
max_conf_score = np.argmax(img_pred[:,5:5+num_classes], axis=1)
max_conf_score = np.expand_dims(max_conf_score, axis=1)
max_conf = np.expand_dims(max_conf, axis=1)
seq = (img_pred[:,:5], max_conf, max_conf_score)
image_pred = np.concatenate(seq, axis=1)
non_zero_ind = np.nonzero(image_pred[:,4])[0]
assert all(image_pred[non_zero_ind,0] > 0)
image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind),:], (-1, 7))
classes, indexes = np.unique(image_pred_[:, -1], return_index=True)
for index, coco_class in enumerate(classes):
label, probability = coco_labels[int(coco_class)], image_pred_[indexes[index]][4] * 100
print(f"Detected {label} {probability:.2f}")
labels.append(label)
return labels
def add_boxes(img, prediction):
if isinstance(prediction, int): # no predictions
return img
coco_labels = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names')
coco_labels = coco_labels.decode('utf-8').split('\n')
height, width = img.shape[0:2]
scale_factor = 608 / width
prediction[:,[1,3]] -= (608 - scale_factor * width) / 2
prediction[:,[2,4]] -= (608 - scale_factor * height) / 2
for pred in prediction:
corner1 = tuple(pred[1:3].astype(int))
corner2 = tuple(pred[3:5].astype(int))
w = corner2[0] - corner1[0]
h = corner2[1] - corner1[1]
corner2 = (corner2[0] + w, corner2[1] + h)
label = coco_labels[int(pred[-1])]
img = cv2.rectangle(img, corner1, corner2, (255, 0, 0), 2)
t_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_PLAIN, 1 , 1)[0]
c2 = corner1[0] + t_size[0] + 3, corner1[1] + t_size[1] + 4
img = cv2.rectangle(img, corner1, c2, (255, 0, 0), -1)
img = cv2.putText(img, label, (corner1[0], corner1[1] + t_size[1] + 4), cv2.FONT_HERSHEY_PLAIN, 1, [225,255,255], 1)
return img
def bbox_iou(box1, box2):
"""
Returns the IoU of two bounding boxes
IoU: IoU = Area Of Overlap / Area of Union -> How close the predicted bounding box is
to the ground truth bounding box. Higher IoU = Better accuracy
In training, IoU is used to track accuracy; at inference, it is used to remove duplicate bounding boxes.
"""
# Get the coordinates of bounding boxes
b1_x1, b1_y1, b1_x2, b1_y2 = box1[:,0], box1[:,1], box1[:,2], box1[:,3]
b2_x1, b2_y1, b2_x2, b2_y2 = box2[:,0], box2[:,1], box2[:,2], box2[:,3]
# get the coordinates of the intersection rectangle
inter_rect_x1 = np.maximum(b1_x1, b2_x1)
inter_rect_y1 = np.maximum(b1_y1, b2_y1)
# the far corner of the intersection is the *minimum* of the two boxes' far corners
inter_rect_x2 = np.minimum(b1_x2, b2_x2)
inter_rect_y2 = np.minimum(b1_y2, b2_y2)
#Intersection area
inter_area = np.clip(inter_rect_x2 - inter_rect_x1 + 1, 0, 99999) * np.clip(inter_rect_y2 - inter_rect_y1 + 1, 0, 99999)
#Union Area
b1_area = (b1_x2 - b1_x1 + 1)*(b1_y2 - b1_y1 + 1)
b2_area = (b2_x2 - b2_x1 + 1)*(b2_y2 - b2_y1 + 1)
iou = inter_area / (b1_area + b2_area - inter_area)
return iou
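# quick sanity check (illustrative boxes, using the pixel-inclusive +1 convention above):
#   box1 = np.array([[0., 0., 9., 9.]])   -> area 10*10 = 100
#   box2 = np.array([[5., 0., 14., 9.]])  -> area 100, intersection 5*10 = 50
#   bbox_iou(box1, box2)                  -> 50 / (100 + 100 - 50) = 0.333...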
def process_results(prediction, confidence=0.9, num_classes=80, nms_conf=0.4):
prediction = prediction.detach().cpu().numpy()
conf_mask = (prediction[:,:,4] > confidence)
conf_mask = np.expand_dims(conf_mask, 2)
prediction = prediction * conf_mask
# Non max suppression
box_corner = prediction
box_corner[:,:,0] = (prediction[:,:,0] - prediction[:,:,2]/2)
box_corner[:,:,1] = (prediction[:,:,1] - prediction[:,:,3]/2)
box_corner[:,:,2] = (prediction[:,:,0] + prediction[:,:,2]/2)
box_corner[:,:,3] = (prediction[:,:,1] + prediction[:,:,3]/2)
prediction[:,:,:4] = box_corner[:,:,:4]
write = False
# Process img
img_pred = prediction[0]
max_conf = np.amax(img_pred[:,5:5+num_classes], axis=1)
max_conf_score = np.argmax(img_pred[:,5:5+num_classes], axis=1)
max_conf_score = np.expand_dims(max_conf_score, axis=1)
max_conf = np.expand_dims(max_conf, axis=1)
seq = (img_pred[:,:5], max_conf, max_conf_score)
image_pred = np.concatenate(seq, axis=1)
non_zero_ind = np.nonzero(image_pred[:,4])[0]
assert all(image_pred[non_zero_ind,0] > 0)
image_pred_ = np.reshape(image_pred[np.squeeze(non_zero_ind),:], (-1, 7))
if image_pred_.shape[0] == 0:
print("No detections found!")
return 0
for cls in np.unique(image_pred_[:, -1]):
# perform NMS, get the detections with one particular class
cls_mask = image_pred_*np.expand_dims(image_pred_[:, -1] == cls, axis=1)
class_mask_ind = np.squeeze(np.nonzero(cls_mask[:,-2]))
# class_mask_ind = np.nonzero()
image_pred_class = np.reshape(image_pred_[class_mask_ind], (-1, 7))
# sort the detections such that the entry with the maximum objectness
# confidence is at the top
conf_sort_index = np.argsort(image_pred_class[:,4])
image_pred_class = image_pred_class[conf_sort_index]
for i in range(image_pred_class.shape[0]):
# Get the IOUs of all boxes that come after the one we are looking at in the loop
try:
ious = bbox_iou(np.expand_dims(image_pred_class[i], axis=0), image_pred_class[i+1:])
except:
break
# Zero out all the detections that have IoU > threshold
iou_mask = np.expand_dims((ious < nms_conf), axis=1)
image_pred_class[i+1:] *= iou_mask
# Remove the non-zero entries
non_zero_ind = np.squeeze(np.nonzero(image_pred_class[:,4]))
image_pred_class = np.reshape(image_pred_class[non_zero_ind], (-1, 7))
batch_ind = np.array([[0]])
seq = (batch_ind, image_pred_class)
if not write:
output, write = np.concatenate(seq, axis=1), True
else:
out = np.concatenate(seq, axis=1)
output = np.concatenate((output,out))
return output
def infer(model, img):
img = np.array(Image.fromarray(img).resize((608, 608)))
img = img[:,:,::-1].transpose((2,0,1))
img = img[np.newaxis,:,:,:]/255.0
prediction = model.forward(Tensor(img.astype(np.float32)))
return prediction
def parse_cfg(cfg):
# Return a list of blocks
lines = cfg.decode("utf-8").split('\n')
lines = [x for x in lines if len(x) > 0]
lines = [x for x in lines if x[0] != '#']
lines = [x.rstrip().lstrip() for x in lines]
block, blocks = {}, []
for line in lines:
if line[0] == "[":
if len(block) != 0:
blocks.append(block)
block = {}
block["type"] = line[1:-1].rstrip()
else:
key,value = line.split("=")
block[key.rstrip()] = value.lstrip()
blocks.append(block)
return blocks
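# e.g. a cfg fragment like
#   [convolutional]
#   batch_normalize=1
#   filters=32
#   size=3
# parses to [{'type': 'convolutional', 'batch_normalize': '1', 'filters': '32', 'size': '3'}];
# values stay as strings, and for the full file the first block is the [net] hyperparameter section.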
# TODO: Speed up this function, avoid copying stuff from GPU to CPU
def predict_transform(prediction, inp_dim, anchors, num_classes):
batch_size = prediction.shape[0]
stride = inp_dim // prediction.shape[2]
grid_size = inp_dim // stride
bbox_attrs = 5 + num_classes
num_anchors = len(anchors)
prediction = prediction.reshape(shape=(batch_size, bbox_attrs*num_anchors, grid_size*grid_size))
prediction = prediction.transpose(1, 2)
prediction = prediction.reshape(shape=(batch_size, grid_size*grid_size*num_anchors, bbox_attrs))
prediction_cpu = prediction.cpu().numpy()
for i in (0, 1, 4):
prediction_cpu[:,:,i] = 1 / (1 + np.exp(-prediction_cpu[:,:,i]))
# Add the center offsets
grid = np.arange(grid_size)
a, b = np.meshgrid(grid, grid)
x_offset = a.reshape((-1, 1))
y_offset = b.reshape((-1, 1))
x_y_offset = np.concatenate((x_offset, y_offset), 1)
x_y_offset = np.tile(x_y_offset, (1, num_anchors))
x_y_offset = x_y_offset.reshape((-1,2))
x_y_offset = np.expand_dims(x_y_offset, 0)
anchors = [(a[0]/stride, a[1]/stride) for a in anchors]
anchors = np.tile(anchors, (grid_size*grid_size, 1))
anchors = np.expand_dims(anchors, 0)
prediction_cpu[:,:,:2] += x_y_offset
prediction_cpu[:,:,2:4] = np.exp(prediction_cpu[:,:,2:4])*anchors
prediction_cpu[:,:,5:5+num_classes] = 1 / (1 + np.exp(-prediction_cpu[:,:,5:5+num_classes]))
prediction_cpu[:,:,:4] *= stride
return Tensor(prediction_cpu)
class Darknet:
def __init__(self, cfg):
self.blocks = parse_cfg(cfg)
self.net_info, self.module_list = self.create_modules(self.blocks)
print("Modules length:", len(self.module_list))
def create_modules(self, blocks):
net_info = blocks[0] # Info about model hyperparameters
prev_filters, filters = 3, None
output_filters, module_list = [], []
## module
for index, x in enumerate(blocks[1:]):
module_type = x["type"]
module = []
if module_type == "convolutional":
try:
batch_normalize, bias = int(x["batch_normalize"]), False
except:
batch_normalize, bias = 0, True
# layer
activation = x["activation"]
filters = int(x["filters"])
padding = int(x["pad"])
pad = (int(x["size"]) - 1) // 2 if padding else 0
module.append(Conv2d(prev_filters, filters, int(x["size"]), int(x["stride"]), pad, bias=bias))
# BatchNorm2d
if batch_normalize:
module.append(BatchNorm2d(filters, eps=1e-05, track_running_stats=True))
# LeakyReLU activation
if activation == "leaky":
module.append(lambda x: x.leakyrelu(0.1))
elif module_type == "maxpool":
size, stride = int(x["size"]), int(x["stride"])
module.append(lambda x: x.max_pool2d(kernel_size=(size, size), stride=stride))
elif module_type == "upsample":
module.append(lambda x: Tensor(x.cpu().numpy().repeat(2, axis=-2).repeat(2, axis=-1)))
elif module_type == "route":
x["layers"] = x["layers"].split(",")
# Start of route
start = int(x["layers"][0])
# End if it exists
try:
end = int(x["layers"][1])
except:
end = 0
if start > 0: start -= index
if end > 0: end -= index
module.append(lambda x: x)
if end < 0:
filters = output_filters[index + start] + output_filters[index + end]
else:
filters = output_filters[index + start]
# Shortcut corresponds to skip connection
elif module_type == "shortcut":
module.append(lambda x: x)
elif module_type == "yolo":
mask = list(map(int, x["mask"].split(",")))
anchors = [int(a) for a in x["anchors"].split(",")]
anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors), 2)]
module.append([anchors[i] for i in mask])
# Append to module_list
module_list.append(module)
if filters is not None:
prev_filters = filters
output_filters.append(filters)
return (net_info, module_list)
def dump_weights(self):
for i in range(len(self.module_list)):
module_type = self.blocks[i + 1]["type"]
if module_type == "convolutional":
print(self.blocks[i + 1]["type"], "weights", i)
model = self.module_list[i]
conv = model[0]
print(conv.weight.cpu().numpy()[0][0][0])
if conv.bias is not None:
print("biases")
print(conv.bias.shape)
print(conv.bias.cpu().numpy()[0][0:5])
else:
print("None biases for layer", i)
def load_weights(self, url):
weights = np.frombuffer(fetch(url), dtype=np.float32)[5:]
ptr = 0
for i in range(len(self.module_list)):
module_type = self.blocks[i + 1]["type"]
if module_type == "convolutional":
model = self.module_list[i]
try: # we have batchnorm, load conv weights without biases, and batchnorm values
batch_normalize = int(self.blocks[i+1]["batch_normalize"])
except: # no batchnorm, load conv weights + biases
batch_normalize = 0
conv = model[0]
if batch_normalize:
bn = model[1]
# Get the number of weights of batchnorm
num_bn_biases = math.prod(bn.bias.shape)
# Load weights
bn_biases = Tensor(weights[ptr:ptr + num_bn_biases])
ptr += num_bn_biases
bn_weights = Tensor(weights[ptr:ptr+num_bn_biases])
ptr += num_bn_biases
bn_running_mean = Tensor(weights[ptr:ptr+num_bn_biases])
ptr += num_bn_biases
bn_running_var = Tensor(weights[ptr:ptr+num_bn_biases])
ptr += num_bn_biases
# Cast the loaded weights into dims of model weights
bn_biases = bn_biases.reshape(shape=tuple(bn.bias.shape))
bn_weights = bn_weights.reshape(shape=tuple(bn.weight.shape))
bn_running_mean = bn_running_mean.reshape(shape=tuple(bn.running_mean.shape))
bn_running_var = bn_running_var.reshape(shape=tuple(bn.running_var.shape))
# Copy data
bn.bias = bn_biases
bn.weight = bn_weights
bn.running_mean = bn_running_mean
bn.running_var = bn_running_var
else:
# load biases of the conv layer
num_biases = math.prod(conv.bias.shape)
# Load weights
conv_biases = Tensor(weights[ptr: ptr+num_biases])
ptr += num_biases
# Reshape
conv_biases = conv_biases.reshape(shape=tuple(conv.bias.shape))
# Copy
conv.bias = conv_biases
# Load weights for conv layers
num_weights = math.prod(conv.weight.shape)
conv_weights = Tensor(weights[ptr:ptr+num_weights])
ptr += num_weights
conv_weights = conv_weights.reshape(shape=tuple(conv.weight.shape))
conv.weight = conv_weights
def forward(self, x):
modules = self.blocks[1:]
outputs = {} # Cached outputs for route layer
detections, write = None, False
for i, module in enumerate(modules):
module_type = (module["type"])
if module_type == "convolutional" or module_type == "upsample":
for layer in self.module_list[i]:
x = layer(x)
elif module_type == "route":
layers = module["layers"]
layers = [int(a) for a in layers]
if (layers[0]) > 0:
layers[0] = layers[0] - i
if len(layers) == 1:
x = outputs[i + (layers[0])]
else:
if (layers[1]) > 0: layers[1] = layers[1] - i
map1 = outputs[i + layers[0]]
map2 = outputs[i + layers[1]]
x = Tensor(np.concatenate((map1.cpu().numpy(), map2.cpu().numpy()), axis=1))
elif module_type == "shortcut":
from_ = int(module["from"])
x = outputs[i - 1] + outputs[i + from_]
elif module_type == "yolo":
anchors = self.module_list[i][0]
inp_dim = int(self.net_info["height"]) # 416
num_classes = int(module["classes"])
x = predict_transform(x, inp_dim, anchors, num_classes)
if not write:
detections, write = x, True
else:
detections = Tensor(np.concatenate((detections.cpu().numpy(), x.cpu().numpy()), axis=1))
outputs[i] = x
return detections
if __name__ == "__main__":
model = Darknet(fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3.cfg'))
print("Loading weights file (237MB). This might take a while…")
model.load_weights('https://pjreddie.com/media/files/yolov3.weights')
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = "https://github.com/ayooshkathuria/pytorch-yolo-v3/raw/master/dog-cycle-car.png"
if url == 'webcam':
cap = cv2.VideoCapture(0)
cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
while 1:
_ = cap.grab() # discard one frame to circumvent capture buffering
ret, frame = cap.read()
prediction = process_results(infer(model, frame))
img = Image.fromarray(frame[:, :, [2,1,0]])
boxes = add_boxes(np.array(img.resize((608, 608))), prediction)
boxes = cv2.cvtColor(boxes, cv2.COLOR_RGB2BGR)
cv2.imshow('yolo', boxes)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
cap.release()
cv2.destroyAllWindows()
elif url.startswith('http'):
img_stream = io.BytesIO(fetch(url))
img = cv2.imdecode(np.frombuffer(img_stream.read(), np.uint8), 1)
else:
img = cv2.imread(url)
st = time.time()
print('running inference…')
prediction = infer(model, img)
print(f'did inference in {(time.time() - st):.2f}s')
show_labels(prediction)
prediction = process_results(prediction)
boxes = add_boxes(np.array(Image.fromarray(img).resize((608, 608))), prediction)
cv2.imwrite('boxes.jpg', boxes)

View File

@@ -1,17 +0,0 @@
#!/usr/bin/env python3
import os
from ultralytics import YOLO
import onnx
from extra.onnx import get_run_onnx
from tinygrad.tensor import Tensor
os.chdir("/tmp")
if not os.path.isfile("yolov8n-seg.onnx"):
model = YOLO("yolov8n-seg.pt")
model.export(format="onnx", imgsz=[480,640])
onnx_model = onnx.load(open("yolov8n-seg.onnx", "rb"))
# TODO: move get example inputs to onnx
input_shapes = {inp.name:tuple(x.dim_value for x in inp.type.tensor_type.shape.dim) for inp in onnx_model.graph.input}
print(input_shapes)
run_onnx = get_run_onnx(onnx_model)
run_onnx({"images": Tensor.zeros(1,3,480,640)}, debug=True)

View File

@@ -1,439 +0,0 @@
from tinygrad.nn import Conv2d, BatchNorm2d
from tinygrad.tensor import Tensor
import numpy as np
from itertools import chain
from extra.utils import get_child, fetch, download_file
from pathlib import Path
import cv2
from collections import defaultdict
import os
import time, io, sys
from tinygrad.state import safe_load, load_state_dict
#Model architecture from https://github.com/ultralytics/ultralytics/issues/189
#The upsampling class has been taken from this pull request https://github.com/tinygrad/tinygrad/pull/784 by dc-dc-dc. Now 2(?) models use upsampling. (RetinaNet and this)
#Pre processing image functions.
def compute_transform(image, new_shape=(640, 640), auto=False, scaleFill=False, scaleup=True, stride=32):
shape = image.shape[:2] # current shape [height, width]
new_shape = (new_shape, new_shape) if isinstance(new_shape, int) else new_shape
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
r = min(r, 1.0) if not scaleup else r
new_unpad = (int(round(shape[1] * r)), int(round(shape[0] * r)))
dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1]
dw, dh = (np.mod(dw, stride), np.mod(dh, stride)) if auto else (0.0, 0.0)
new_unpad = (new_shape[1], new_shape[0]) if scaleFill else new_unpad
dw /= 2
dh /= 2
image = cv2.resize(image, new_unpad, interpolation=cv2.INTER_LINEAR) if shape[::-1] != new_unpad else image
top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1))
left, right = int(round(dw - 0.1)), int(round(dw + 0.1))
image = cv2.copyMakeBorder(image, top, bottom, left, right, cv2.BORDER_CONSTANT, value=(114, 114, 114))
return image
def preprocess(im, imgsz=640, model_stride=32, model_pt=True):
same_shapes = all(x.shape == im[0].shape for x in im)
auto = same_shapes and model_pt
im = Tensor([compute_transform(x, new_shape=imgsz, auto=auto, stride=model_stride) for x in im])
im = Tensor.stack(im) if im.shape[0] > 1 else im
im = im[..., ::-1].permute(0, 3, 1, 2) # BGR to RGB, BHWC to BCHW, (n, 3, h, w)
im /= 255 # 0 - 255 to 0.0 - 1.0
return im
# Post Processing functions
def box_area(box):
return (box[:, 2] - box[:, 0]) * (box[:, 3] - box[:, 1])
def box_iou(box1, box2):
lt = np.maximum(box1[:, None, :2], box2[:, :2])
rb = np.minimum(box1[:, None, 2:], box2[:, 2:])
wh = np.clip(rb - lt, 0, None)
inter = wh[:, :, 0] * wh[:, :, 1]
area1 = box_area(box1)[:, None]
area2 = box_area(box2)[None, :]
iou = inter / (area1 + area2 - inter)
return iou
def compute_nms(boxes, scores, iou_threshold):
order, keep = scores.argsort()[::-1], []
while order.size > 0:
i = order[0]
keep.append(i)
if order.size == 1:
break
iou = box_iou(boxes[i][None, :], boxes[order[1:]])
inds = np.where(iou.squeeze() <= iou_threshold)[0]
order = order[inds + 1]
return np.array(keep)
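# quick sanity check (illustrative values): for boxes [[0,0,10,10], [1,1,11,11], [20,20,30,30]]
# with scores [.9, .8, .7] and iou_threshold 0.5, this keeps indices [0, 2]:
# box 1 overlaps box 0 with IoU ~0.68 and is suppressed, while box 2 does not overlap at all.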
def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, agnostic=False, max_det=300, nc=0, max_wh=7680):
prediction = prediction[0] if isinstance(prediction, (list, tuple)) else prediction
bs, nc = prediction.shape[0], nc or (prediction.shape[1] - 4)
xc = np.amax(prediction[:, 4:4 + nc], axis=1) > conf_thres
nm = prediction.shape[1] - nc - 4
output = [np.zeros((0, 6 + nm))] * bs
for xi, x in enumerate(prediction):
x = x.swapaxes(0, -1)[xc[xi]]
if not x.shape[0]: continue
box, cls, mask = np.split(x, [4, 4 + nc], axis=1)
conf, j = np.max(cls, axis=1, keepdims=True), np.argmax(cls, axis=1, keepdims=True)
x = np.concatenate((xywh2xyxy(box), conf, j.astype(np.float32), mask), axis=1)
x = x[conf.ravel() > conf_thres]
if not x.shape[0]: continue
x = x[np.argsort(-x[:, 4])]
c = x[:, 5:6] * (0 if agnostic else max_wh)
boxes, scores = x[:, :4] + c, x[:, 4]
i = compute_nms(boxes, scores, iou_thres)[:max_det]
output[xi] = x[i]
return output
def postprocess(preds, img, orig_imgs):
print('copying to CPU now for post processing')
#if you are on CPU, this causes an overflow runtime error; it doesn't seem to make any difference in the predictions though.
# TODO: make non_max_suppression in tinygrad - to make this faster
preds = preds.cpu().numpy() if isinstance(preds, Tensor) else preds
preds = non_max_suppression(prediction=preds, conf_thres=0.25, iou_thres=0.7, agnostic=False, max_det=300)
all_preds = []
for i, pred in enumerate(preds):
orig_img = orig_imgs[i] if isinstance(orig_imgs, list) else orig_imgs
if not isinstance(orig_imgs, Tensor):
pred[:, :4] = scale_boxes(img.shape[2:], pred[:, :4], orig_img.shape)
all_preds.append(pred)
return all_preds
def draw_bounding_boxes_and_save(orig_img_paths, output_img_paths, all_predictions, class_labels, iou_threshold=0.5):
color_dict = {label: tuple((((i+1) * 50) % 256, ((i+1) * 100) % 256, ((i+1) * 150) % 256)) for i, label in enumerate(class_labels)}
font = cv2.FONT_HERSHEY_SIMPLEX
def is_bright_color(color):
r, g, b = color
brightness = (r * 299 + g * 587 + b * 114) / 1000
return brightness > 127
for img_idx, (orig_img_path, output_img_path, predictions) in enumerate(zip(orig_img_paths, output_img_paths, all_predictions)):
predictions = np.array(predictions)
orig_img = cv2.imread(orig_img_path) if not isinstance(orig_img_path, np.ndarray) else cv2.imdecode(orig_img_path, 1)
height, width, _ = orig_img.shape
box_thickness = int((height + width) / 400)
font_scale = (height + width) / 2500
grouped_preds = defaultdict(list)
object_count = defaultdict(int)
for pred_np in predictions:
grouped_preds[int(pred_np[-1])].append(pred_np)
def draw_box_and_label(pred, color):
x1, y1, x2, y2, conf, _ = pred
x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
cv2.rectangle(orig_img, (x1, y1), (x2, y2), color, box_thickness)
label = f"{class_labels[class_id]} {conf:.2f}"
text_size, _ = cv2.getTextSize(label, font, font_scale, 1)
label_y, bg_y = (y1 - 4, y1 - text_size[1] - 4) if y1 - text_size[1] - 4 > 0 else (y1 + text_size[1], y1)
cv2.rectangle(orig_img, (x1, bg_y), (x1 + text_size[0], bg_y + text_size[1]), color, -1)
font_color = (0, 0, 0) if is_bright_color(color) else (255, 255, 255)
cv2.putText(orig_img, label, (x1, label_y), font, font_scale, font_color, 1, cv2.LINE_AA)
for class_id, pred_list in grouped_preds.items():
pred_list = np.array(pred_list)
while len(pred_list) > 0:
max_conf_idx = np.argmax(pred_list[:, 4])
max_conf_pred = pred_list[max_conf_idx]
pred_list = np.delete(pred_list, max_conf_idx, axis=0)
color = color_dict[class_labels[class_id]]
draw_box_and_label(max_conf_pred, color)
object_count[class_labels[class_id]] += 1
iou_scores = box_iou(np.array([max_conf_pred[:4]]), pred_list[:, :4])
low_iou_indices = np.where(iou_scores[0] < iou_threshold)[0]
pred_list = pred_list[low_iou_indices]
for low_conf_pred in pred_list:
draw_box_and_label(low_conf_pred, color)
print(f"Image {img_idx + 1}:")
print("Objects detected:")
for obj, count in object_count.items():
print(f"- {obj}: {count}")
cv2.imwrite(output_img_path, orig_img)
print(f'saved detections at {output_img_path}')
# utility functions for forward pass.
def dist2bbox(distance, anchor_points, xywh=True, dim=-1):
lt, rb = distance.chunk(2, dim)
x1y1 = anchor_points - lt
x2y2 = anchor_points + rb
if xywh:
c_xy = (x1y1 + x2y2) / 2
wh = x2y2 - x1y1
return c_xy.cat(wh, dim=1)
return x1y1.cat(x2y2, dim=1)
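# e.g. (conceptually) for an anchor point at (10, 10) with lt = (2, 3) and rb = (4, 5):
# x1y1 = (8, 7), x2y2 = (14, 15); with xywh=True this yields center (11, 11) and size (6, 8)
# (illustrative numbers, ignoring the batch/anchor tensor layout).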
def make_anchors(feats, strides, grid_cell_offset=0.5):
anchor_points, stride_tensor = [], []
assert feats is not None
for i, stride in enumerate(strides):
_, _, h, w = feats[i].shape
sx = Tensor.arange(w) + grid_cell_offset
sy = Tensor.arange(h) + grid_cell_offset
# this is np.meshgrid but in tinygrad
sx = sx.reshape(1, -1).repeat([h, 1]).reshape(-1)
sy = sy.reshape(-1, 1).repeat([1, w]).reshape(-1)
anchor_points.append(Tensor.stack((sx, sy), -1).reshape(-1, 2))
stride_tensor.append(Tensor.full((h * w), stride))
anchor_points = anchor_points[0].cat(anchor_points[1], anchor_points[2])
stride_tensor = stride_tensor[0].cat(stride_tensor[1], stride_tensor[2]).unsqueeze(1)
return anchor_points, stride_tensor
# this function is from the original implementation
def autopad(k, p=None, d=1): # kernel, padding, dilation
if d > 1:
k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size
if p is None:
p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad
return p
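# e.g. autopad(3) -> 1 and autopad(5) -> 2 ("same" padding); with dilation, autopad(3, None, 2) -> 2,
# since the effective kernel size becomes 2*(3-1)+1 = 5 (illustrative values).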
def clip_boxes(boxes, shape):
boxes[..., [0, 2]] = np.clip(boxes[..., [0, 2]], 0, shape[1]) # x1, x2
boxes[..., [1, 3]] = np.clip(boxes[..., [1, 3]], 0, shape[0]) # y1, y2
return boxes
def scale_boxes(img1_shape, boxes, img0_shape, ratio_pad=None):
gain = ratio_pad if ratio_pad else min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1])
pad = ((img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2)
boxes_np = boxes.numpy() if isinstance(boxes, Tensor) else boxes
boxes_np[..., [0, 2]] -= pad[0]
boxes_np[..., [1, 3]] -= pad[1]
boxes_np[..., :4] /= gain
boxes_np = clip_boxes(boxes_np, img0_shape)
return boxes_np
def xywh2xyxy(x):
xy = x[..., :2] # center x, y
wh = x[..., 2:4] # width, height
xy1 = xy - wh / 2 # top left x, y
xy2 = xy + wh / 2 # bottom right x, y
result = np.concatenate((xy1, xy2), axis=-1)
return Tensor(result) if isinstance(x, Tensor) else result
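# e.g. a center-format box [10, 10, 4, 6] (cx, cy, w, h) becomes [8, 7, 12, 13] (x1, y1, x2, y2); illustrative values.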
def get_variant_multiples(variant):
return {'n':(0.33, 0.25, 2.0), 's':(0.33, 0.50, 2.0), 'm':(0.67, 0.75, 1.5), 'l':(1.0, 1.0, 1.0), 'x':(1, 1.25, 1.0) }.get(variant, None)
def label_predictions(all_predictions):
class_index_count = defaultdict(int)
for predictions in all_predictions:
predictions = np.array(predictions)
for pred_np in predictions:
class_id = int(pred_np[-1])
class_index_count[class_id] += 1
return dict(class_index_count)
#this is taken from https://github.com/tinygrad/tinygrad/pull/784/files by dc-dc-dc (Now 2 models use upsampling)
class Upsample:
def __init__(self, scale_factor:int, mode: str = "nearest") -> None:
assert mode == "nearest" # only mode supported for now
self.mode = mode
self.scale_factor = scale_factor
def __call__(self, x: Tensor) -> Tensor:
assert len(x.shape) > 2 and len(x.shape) <= 5
(b, c), _lens = x.shape[:2], len(x.shape[2:])
tmp = x.reshape([b, c, -1] + [1] * _lens) * Tensor.ones(*[1, 1, 1] + [self.scale_factor] * _lens)
return tmp.reshape(list(x.shape) + [self.scale_factor] * _lens).permute([0, 1] + list(chain.from_iterable([[y+2, y+2+_lens] for y in range(_lens)]))).reshape([b, c] + [x * self.scale_factor for x in x.shape[2:]])
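# e.g. Upsample(2)(Tensor.ones(1, 3, 4, 6)) returns a (1, 3, 8, 12) tensor with every pixel
# repeated 2x2, i.e. nearest-neighbour upsampling (illustrative shapes).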
class Conv_Block():
def __init__(self, c1, c2, kernel_size=1, stride=1, groups=1, dilation=1, padding=None):
self.conv = Conv2d(c1,c2, kernel_size, stride, padding=autopad(kernel_size, padding, dilation), bias=False, groups=groups, dilation=dilation)
self.bn = BatchNorm2d(c2, eps=0.001)
def __call__(self, x):
return self.bn(self.conv(x)).silu()
class Bottleneck:
def __init__(self, c1, c2 , shortcut: bool, g=1, kernels: list = (3,3), channel_factor=0.5):
c_ = int(c2 * channel_factor)
self.cv1 = Conv_Block(c1, c_, kernel_size=kernels[0], stride=1, padding=None)
self.cv2 = Conv_Block(c_, c2, kernel_size=kernels[1], stride=1, padding=None, groups=g)
self.residual = c1 == c2 and shortcut
def __call__(self, x):
return x + self.cv2(self.cv1(x)) if self.residual else self.cv2(self.cv1(x))
class C2f:
def __init__(self, c1, c2, n=1, shortcut=False, g=1, e=0.5):
self.c = int(c2 * e)
self.cv1 = Conv_Block(c1, 2 * self.c, 1,)
self.cv2 = Conv_Block((2 + n) * self.c, c2, 1)
self.bottleneck = [Bottleneck(self.c, self.c, shortcut, g, kernels=[(3, 3), (3, 3)], channel_factor=1.0) for _ in range(n)]
def __call__(self, x):
y = list(self.cv1(x).chunk(2, 1))
y.extend(m(y[-1]) for m in self.bottleneck)
z = y[0]
for i in y[1:]: z = z.cat(i, dim=1)
return self.cv2(z)
class SPPF:
def __init__(self, c1, c2, k=5):
c_ = c1 // 2 # hidden channels
self.cv1 = Conv_Block(c1, c_, 1, 1, padding=None)
self.cv2 = Conv_Block(c_ * 4, c2, 1, 1, padding=None)
# TODO: this pads with 0s, whereas torch function pads with -infinity. This results in a < 2% difference in prediction which does not make a difference visually.
self.maxpool = lambda x : x.pad2d((k // 2, k // 2, k // 2, k // 2)).max_pool2d(kernel_size=k, stride=1)
def __call__(self, x):
x = self.cv1(x)
x2 = self.maxpool(x)
x3 = self.maxpool(x2)
x4 = self.maxpool(x3)
return self.cv2(x.cat(x2, x3, x4, dim=1))
class DFL:
def __init__(self, c1=16):
self.conv = Conv2d(c1, 1, 1, bias=False)
x = Tensor.arange(c1)
self.conv.weight.assign(x.reshape(1, c1, 1, 1))
self.c1 = c1
def __call__(self, x):
b, c, a = x.shape # batch, channels, anchors
return self.conv(x.reshape(b, 4, self.c1, a).transpose(2, 1).softmax(1)).reshape(b, 4, a)
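# i.e. each box side is predicted as a softmax distribution over self.c1 = 16 bins, and the fixed
# arange(16) convolution above collapses that distribution to its expected value (DFL decoding).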
#backbone
class Darknet:
def __init__(self, w, r, d):
self.b1 = [Conv_Block(c1=3, c2= int(64*w), kernel_size=3, stride=2, padding=1), Conv_Block(int(64*w), int(128*w), kernel_size=3, stride=2, padding=1)]
self.b2 = [C2f(c1=int(128*w), c2=int(128*w), n=round(3*d), shortcut=True), Conv_Block(int(128*w), int(256*w), 3, 2, 1), C2f(int(256*w), int(256*w), round(6*d), True)]
self.b3 = [Conv_Block(int(256*w), int(512*w), kernel_size=3, stride=2, padding=1), C2f(int(512*w), int(512*w), round(6*d), True)]
self.b4 = [Conv_Block(int(512*w), int(512*w*r), kernel_size=3, stride=2, padding=1), C2f(int(512*w*r), int(512*w*r), round(3*d), True)]
self.b5 = [SPPF(int(512*w*r), int(512*w*r), 5)]
def return_modules(self):
return [*self.b1, *self.b2, *self.b3, *self.b4, *self.b5]
def __call__(self, x):
x1 = x.sequential(self.b1)
x2 = x1.sequential(self.b2)
x3 = x2.sequential(self.b3)
x4 = x3.sequential(self.b4)
x5 = x4.sequential(self.b5)
return (x2, x3, x5)
#yolo fpn (neck)
class Yolov8NECK:
def __init__(self, w, r, d): #width_multiple, ratio_multiple, depth_multiple
self.up = Upsample(2, mode='nearest')
self.n1 = C2f(c1=int(512*w*(1+r)), c2=int(512*w), n=round(3*d), shortcut=False)
self.n2 = C2f(c1=int(768*w), c2=int(256*w), n=round(3*d), shortcut=False)
self.n3 = Conv_Block(c1=int(256*w), c2=int(256*w), kernel_size=3, stride=2, padding=1)
self.n4 = C2f(c1=int(768*w), c2=int(512*w), n=round(3*d), shortcut=False)
self.n5 = Conv_Block(c1=int(512* w), c2=int(512 * w), kernel_size=3, stride=2, padding=1)
self.n6 = C2f(c1=int(512*w*(1+r)), c2=int(512*w*r), n=round(3*d), shortcut=False)
def return_modules(self):
return [self.n1, self.n2, self.n3, self.n4, self.n5, self.n6]
def __call__(self, p3, p4, p5):
x = self.n1(self.up(p5).cat(p4, dim=1))
head_1 = self.n2(self.up(x).cat(p3, dim=1))
head_2 = self.n4(self.n3(head_1).cat(x, dim=1))
head_3 = self.n6(self.n5(head_2).cat(p5, dim=1))
return [head_1, head_2, head_3]
#task specific head.
class DetectionHead:
def __init__(self, nc=80, filters=()):
self.ch = 16
self.nc = nc # number of classes
self.nl = len(filters)
self.no = nc + self.ch * 4 #
self.stride = [8, 16, 32]
c1 = max(filters[0], self.nc)
c2 = max((filters[0] // 4, self.ch * 4))
self.dfl = DFL(self.ch)
self.cv3 = [[Conv_Block(x, c1, 3), Conv_Block(c1, c1, 3), Conv2d(c1, self.nc, 1)] for x in filters]
self.cv2 = [[Conv_Block(x, c2, 3), Conv_Block(c2, c2, 3), Conv2d(c2, 4 * self.ch, 1)] for x in filters]
def __call__(self, x):
for i in range(self.nl):
x[i] = (x[i].sequential(self.cv2[i]).cat(x[i].sequential(self.cv3[i]), dim=1))
self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
y = [(i.reshape(x[0].shape[0], self.no, -1)) for i in x]
x_cat = y[0].cat(y[1], y[2], dim=2)
box, cls = x_cat[:, :self.ch * 4], x_cat[:, self.ch * 4:]
dbox = dist2bbox(self.dfl(box), self.anchors.unsqueeze(0), xywh=True, dim=1) * self.strides
z = dbox.cat(cls.sigmoid(), dim=1)
return z
class YOLOv8:
def __init__(self, w, r, d, num_classes): #width_multiple, ratio_multiple, depth_multiple
self.net = Darknet(w, r, d)
self.fpn = Yolov8NECK(w, r, d)
self.head = DetectionHead(num_classes, filters=(int(256*w), int(512*w), int(512*w*r)))
def __call__(self, x):
x = self.net(x)
x = self.fpn(*x)
return self.head(x)
def return_all_trainable_modules(self):
backbone_modules = [*range(10)]
yolov8neck_modules = [12, 15, 16, 18, 19, 21]
yolov8_head_weights = [(22, self.head)]
return [*zip(backbone_modules, self.net.return_modules()), *zip(yolov8neck_modules, self.fpn.return_modules()), *yolov8_head_weights]
if __name__ == '__main__':
# usage : python3 yolov8.py "image_URL OR image_path" "v8 variant" (optional, n is default)
if len(sys.argv) < 2:
print("Error: Image URL or path not provided.")
sys.exit(1)
img_path = sys.argv[1]
yolo_variant = sys.argv[2] if len(sys.argv) >= 3 else (print("No variant given, so choosing 'n' as the default. Yolov8 has different variants, you can choose from ['n', 's', 'm', 'l', 'x']") or 'n')
print(f'running inference for YOLO version {yolo_variant}')
output_folder_path = './outputs_yolov8'
if not os.path.exists(output_folder_path):
os.makedirs(output_folder_path)
#absolute image path or URL
image_location = [np.frombuffer(io.BytesIO(fetch(img_path)).read(), np.uint8)]
image = [cv2.imdecode(image_location[0], 1)]
out_paths = [os.path.join(output_folder_path, img_path.split("/")[-1].split('.')[0] + "_output" + '.' + img_path.split("/")[-1].split('.')[1])]
if not isinstance(image[0], np.ndarray):
print('Error in image loading. Check your image file.')
sys.exit(1)
pre_processed_image = preprocess(image)
# Different YOLOv8 variants use different w , r, and d multiples. For a list , refer to this yaml file (the scales section) https://github.com/ultralytics/ultralytics/blob/main/ultralytics/models/v8/yolov8.yaml
depth, width, ratio = get_variant_multiples(yolo_variant)
yolo_infer = YOLOv8(w=width, r=ratio, d=depth, num_classes=80)
weights_location = Path(__file__).parent.parent / "weights" / f'yolov8{yolo_variant}.safetensors'
download_file(f'https://gitlab.com/r3sist/yolov8_weights/-/raw/master/yolov8{yolo_variant}.safetensors', weights_location)
state_dict = safe_load(weights_location)
load_state_dict(yolo_infer, state_dict)
st = time.time()
predictions = yolo_infer(pre_processed_image)
print(f'did inference in {int(round(((time.time() - st) * 1000)))}ms')
post_predictions = postprocess(preds=predictions, img=pre_processed_image, orig_imgs=image)
#v8 and v3 have same 80 class names for Object Detection
class_labels = fetch('https://raw.githubusercontent.com/pjreddie/darknet/master/data/coco.names')
class_labels = class_labels.decode('utf-8').split('\n')
draw_bounding_boxes_and_save(orig_img_paths=image_location, output_img_paths=out_paths, all_predictions=post_predictions, class_labels=class_labels)
# TODO for later:
# 1. Fix SPPF minor difference due to maxpool
# 2. AST exp overflow warning while on cpu
# 3. Make NMS faster
# 4. Add video inference and webcam support