import datetime import logging import os import platform import subprocess import time from pathlib import Path import re import glob import random import cv2 import numpy as np import torch import torchvision logger = logging.getLogger(__name__) def git_describe(path=Path(__file__).parent): # path must be a directory # return human-readable git description, i.e. v5.0-5-g3e25f1e https://git-scm.com/docs/git-describe s = f'git -C {path} describe --tags --long --always' try: return subprocess.check_output(s, shell=True, stderr=subprocess.STDOUT).decode()[:-1] except subprocess.CalledProcessError as e: return '' # not a git repository def date_modified(path=__file__): # return human-readable file modification date, i.e. '2021-3-26' t = datetime.datetime.fromtimestamp(Path(path).stat().st_mtime) return f'{t.year}-{t.month}-{t.day}' def select_device(device='', batch_size=None): # device = 'cpu' or '0' or '0,1,2,3' s = f'YOLOPv2 🚀 {git_describe() or date_modified()} torch {torch.__version__} ' # string cpu = device.lower() == 'cpu' if cpu: os.environ['CUDA_VISIBLE_DEVICES'] = '-1' # force torch.cuda.is_available() = False elif device: # non-cpu device requested os.environ['CUDA_VISIBLE_DEVICES'] = device # set environment variable assert torch.cuda.is_available(), f'CUDA unavailable, invalid device {device} requested' # check availability cuda = not cpu and torch.cuda.is_available() if cuda: n = torch.cuda.device_count() if n > 1 and batch_size: # check that batch_size is compatible with device_count assert batch_size % n == 0, f'batch-size {batch_size} not multiple of GPU count {n}' space = ' ' * len(s) for i, d in enumerate(device.split(',') if device else range(n)): p = torch.cuda.get_device_properties(i) s += f"{'' if i == 0 else space}CUDA:{d} ({p.name}, {p.total_memory / 1024 ** 2}MB)\n" # bytes to MB else: s += 'CPU\n' logger.info(s.encode().decode('ascii', 'ignore') if platform.system() == 'Windows' else s) # emoji-safe return torch.device('cuda:0' if cuda else 'cpu') def time_synchronized(): # pytorch-accurate time if torch.cuda.is_available(): torch.cuda.synchronize() return time.time() def plot_one_box(x, img, color=None, label=None, line_thickness=3): # Plots one bounding box on image img tl = line_thickness or round(0.002 * (img.shape[0] + img.shape[1]) / 2) + 1 # line/font thickness color = color or [random.randint(0, 255) for _ in range(3)] c1, c2 = (int(x[0]), int(x[1])), (int(x[2]), int(x[3])) cv2.rectangle(img, c1, c2, [0,255,255], thickness=2, lineType=cv2.LINE_AA) if label: tf = max(tl - 1, 1) # font thickness t_size = cv2.getTextSize(label, 0, fontScale=tl / 3, thickness=tf)[0] c2 = c1[0] + t_size[0], c1[1] - t_size[1] - 3 class SegmentationMetric(object): ''' imgLabel [batch_size, height(144), width(256)] confusionMatrix [[0(TN),1(FP)], [2(FN),3(TP)]] ''' def __init__(self, numClass): self.numClass = numClass self.confusionMatrix = np.zeros((self.numClass,)*2) def pixelAccuracy(self): # return all class overall pixel accuracy # acc = (TP + TN) / (TP + TN + FP + TN) acc = np.diag(self.confusionMatrix).sum() / self.confusionMatrix.sum() return acc def lineAccuracy(self): Acc = np.diag(self.confusionMatrix) / (self.confusionMatrix.sum(axis=1) + 1e-12) return Acc[1] def classPixelAccuracy(self): # return each category pixel accuracy(A more accurate way to call it precision) # acc = (TP) / TP + FP classAcc = np.diag(self.confusionMatrix) / (self.confusionMatrix.sum(axis=0) + 1e-12) return classAcc def meanPixelAccuracy(self): classAcc = self.classPixelAccuracy() meanAcc = np.nanmean(classAcc) return meanAcc def meanIntersectionOverUnion(self): # Intersection = TP Union = TP + FP + FN # IoU = TP / (TP + FP + FN) intersection = np.diag(self.confusionMatrix) union = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(self.confusionMatrix) IoU = intersection / union IoU[np.isnan(IoU)] = 0 mIoU = np.nanmean(IoU) return mIoU def IntersectionOverUnion(self): intersection = np.diag(self.confusionMatrix) union = np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(self.confusionMatrix) IoU = intersection / union IoU[np.isnan(IoU)] = 0 return IoU[1] def genConfusionMatrix(self, imgPredict, imgLabel): # remove classes from unlabeled pixels in gt image and predict # print(imgLabel.shape) mask = (imgLabel >= 0) & (imgLabel < self.numClass) label = self.numClass * imgLabel[mask] + imgPredict[mask] count = np.bincount(label, minlength=self.numClass**2) confusionMatrix = count.reshape(self.numClass, self.numClass) return confusionMatrix def Frequency_Weighted_Intersection_over_Union(self): # FWIOU = [(TP+FN)/(TP+FP+TN+FN)] *[TP / (TP + FP + FN)] freq = np.sum(self.confusionMatrix, axis=1) / np.sum(self.confusionMatrix) iu = np.diag(self.confusionMatrix) / ( np.sum(self.confusionMatrix, axis=1) + np.sum(self.confusionMatrix, axis=0) - np.diag(self.confusionMatrix)) FWIoU = (freq[freq > 0] * iu[freq > 0]).sum() return FWIoU def addBatch(self, imgPredict, imgLabel): assert imgPredict.shape == imgLabel.shape self.confusionMatrix += self.genConfusionMatrix(imgPredict, imgLabel) def reset(self): self.confusionMatrix = np.zeros((self.numClass, self.numClass)) class AverageMeter(object): """Computes and stores the average and current value""" def __init__(self): self.reset() def reset(self): self.val = 0 self.avg = 0 self.sum = 0 self.count = 0 def update(self, val, n=1): self.val = val self.sum += val * n self.count += n self.avg = self.sum / self.count if self.count != 0 else 0 def _make_grid(nx=20, ny=20): yv, xv = torch.meshgrid([torch.arange(ny), torch.arange(nx)]) return torch.stack((xv, yv), 2).view((1, 1, ny, nx, 2)).float() def split_for_trace_model(pred = None, anchor_grid = None): z = [] st = [8,16,32] for i in range(3): bs, _, ny, nx = pred[i].shape pred[i] = pred[i].view(bs, 3, 85, ny, nx).permute(0, 1, 3, 4, 2).contiguous() y = pred[i].sigmoid() gr = _make_grid(nx, ny).to(pred[i].device) y[..., 0:2] = (y[..., 0:2] * 2. - 0.5 + gr) * st[i] # xy y[..., 2:4] = (y[..., 2:4] * 2) ** 2 * anchor_grid[i] # wh z.append(y.view(bs, -1, 85)) pred = torch.cat(z, 1) return pred def show_seg_result(img, result, palette=None,is_demo=False): if palette is None: palette = np.random.randint( 0, 255, size=(3, 3)) palette[0] = [0, 0, 0] palette[1] = [0, 255, 0] palette[2] = [255, 0, 0] palette = np.array(palette) assert palette.shape[0] == 3 # len(classes) assert palette.shape[1] == 3 assert len(palette.shape) == 2 if not is_demo: color_seg = np.zeros((result.shape[0], result.shape[1], 3), dtype=np.uint8) for label, color in enumerate(palette): color_seg[result == label, :] = color else: color_area = np.zeros((result[0].shape[0], result[0].shape[1], 3), dtype=np.uint8) color_area[result[0] == 1] = [0, 255, 0] color_area[result[1] ==1] = [255, 0, 0] color_seg = color_area # convert to BGR color_seg = color_seg[..., ::-1] # print(color_seg.shape) color_mask = np.mean(color_seg, 2) img[color_mask != 0] = img[color_mask != 0] * 0.5 + color_seg[color_mask != 0] * 0.5 # img = img * 0.5 + color_seg * 0.5 #img = img.astype(np.uint8) #img = cv2.resize(img, (1280,720), interpolation=cv2.INTER_LINEAR) return def increment_path(path, exist_ok=True, sep=''): # Increment path, i.e. runs/exp --> runs/exp{sep}0, runs/exp{sep}1 etc. path = Path(path) # os-agnostic if (path.exists() and exist_ok) or (not path.exists()): return str(path) else: dirs = glob.glob(f"{path}{sep}*") # similar paths matches = [re.search(rf"%s{sep}(\d+)" % path.stem, d) for d in dirs] i = [int(m.groups()[0]) for m in matches if m] # indices n = max(i) + 1 if i else 2 # increment number return f"{path}{sep}{n}" # update path def scale_coords(img1_shape, coords, img0_shape, ratio_pad=None): # Rescale coords (xyxy) from img1_shape to img0_shape if ratio_pad is None: # calculate from img0_shape gain = min(img1_shape[0] / img0_shape[0], img1_shape[1] / img0_shape[1]) # gain = old / new pad = (img1_shape[1] - img0_shape[1] * gain) / 2, (img1_shape[0] - img0_shape[0] * gain) / 2 # wh padding else: gain = ratio_pad[0][0] pad = ratio_pad[1] coords[:, [0, 2]] -= pad[0] # x padding coords[:, [1, 3]] -= pad[1] # y padding coords[:, :4] /= gain clip_coords(coords, img0_shape) return coords def clip_coords(boxes, img_shape): # Clip bounding xyxy bounding boxes to image shape (height, width) boxes[:, 0].clamp_(0, img_shape[1]) # x1 boxes[:, 1].clamp_(0, img_shape[0]) # y1 boxes[:, 2].clamp_(0, img_shape[1]) # x2 boxes[:, 3].clamp_(0, img_shape[0]) # y2 def set_logging(rank=-1): logging.basicConfig( format="%(message)s", level=logging.INFO if rank in [-1, 0] else logging.WARN) def xywh2xyxy(x): # Convert nx4 boxes from [x, y, w, h] to [x1, y1, x2, y2] where xy1=top-left, xy2=bottom-right y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = x[:, 0] - x[:, 2] / 2 # top left x y[:, 1] = x[:, 1] - x[:, 3] / 2 # top left y y[:, 2] = x[:, 0] + x[:, 2] / 2 # bottom right x y[:, 3] = x[:, 1] + x[:, 3] / 2 # bottom right y return y def xyxy2xywh(x): # Convert nx4 boxes from [x1, y1, x2, y2] to [x, y, w, h] where xy1=top-left, xy2=bottom-right y = x.clone() if isinstance(x, torch.Tensor) else np.copy(x) y[:, 0] = (x[:, 0] + x[:, 2]) / 2 # x center y[:, 1] = (x[:, 1] + x[:, 3]) / 2 # y center y[:, 2] = x[:, 2] - x[:, 0] # width y[:, 3] = x[:, 3] - x[:, 1] # height return y def non_max_suppression(prediction, conf_thres=0.25, iou_thres=0.45, classes=None, agnostic=False, multi_label=False, labels=()): """Runs Non-Maximum Suppression (NMS) on inference results Returns: list of detections, on (n,6) tensor per image [xyxy, conf, cls] """ nc = prediction.shape[2] - 5 # number of classes xc = prediction[..., 4] > conf_thres # candidates # Settings min_wh, max_wh = 2, 4096 # (pixels) minimum and maximum box width and height max_det = 300 # maximum number of detections per image max_nms = 30000 # maximum number of boxes into torchvision.ops.nms() time_limit = 10.0 # seconds to quit after redundant = True # require redundant detections multi_label &= nc > 1 # multiple labels per box (adds 0.5ms/img) merge = False # use merge-NMS t = time.time() output = [torch.zeros((0, 6), device=prediction.device)] * prediction.shape[0] for xi, x in enumerate(prediction): # image index, image inference # Apply constraints # x[((x[..., 2:4] < min_wh) | (x[..., 2:4] > max_wh)).any(1), 4] = 0 # width-height x = x[xc[xi]] # confidence # Cat apriori labels if autolabelling if labels and len(labels[xi]): l = labels[xi] v = torch.zeros((len(l), nc + 5), device=x.device) v[:, :4] = l[:, 1:5] # box v[:, 4] = 1.0 # conf v[range(len(l)), l[:, 0].long() + 5] = 1.0 # cls x = torch.cat((x, v), 0) # If none remain process next image if not x.shape[0]: continue # Compute conf x[:, 5:] *= x[:, 4:5] # conf = obj_conf * cls_conf # Box (center x, center y, width, height) to (x1, y1, x2, y2) box = xywh2xyxy(x[:, :4]) # Detections matrix nx6 (xyxy, conf, cls) if multi_label: i, j = (x[:, 5:] > conf_thres).nonzero(as_tuple=False).T x = torch.cat((box[i], x[i, j + 5, None], j[:, None].float()), 1) else: # best class only conf, j = x[:, 5:].max(1, keepdim=True) x = torch.cat((box, conf, j.float()), 1)[conf.view(-1) > conf_thres] # Filter by class if classes is not None: x = x[(x[:, 5:6] == torch.tensor(classes, device=x.device)).any(1)] # Apply finite constraint # if not torch.isfinite(x).all(): # x = x[torch.isfinite(x).all(1)] # Check shape n = x.shape[0] # number of boxes if not n: # no boxes continue elif n > max_nms: # excess boxes x = x[x[:, 4].argsort(descending=True)[:max_nms]] # sort by confidence # Batched NMS c = x[:, 5:6] * (0 if agnostic else max_wh) # classes boxes, scores = x[:, :4] + c, x[:, 4] # boxes (offset by class), scores i = torchvision.ops.nms(boxes, scores, iou_thres) # NMS if i.shape[0] > max_det: # limit detections i = i[:max_det] if merge and (1 < n < 3E3): # Merge NMS (boxes merged using weighted mean) # update boxes as boxes(i,4) = weights(i,n) * boxes(n,4) iou = box_iou(boxes[i], boxes) > iou_thres # iou matrix weights = iou * scores[None] # box weights x[i, :4] = torch.mm(weights, x[:, :4]).float() / weights.sum(1, keepdim=True) # merged boxes if redundant: i = i[iou.sum(1) > 1] # require redundancy output[xi] = x[i] if (time.time() - t) > time_limit: print(f'WARNING: NMS time limit {time_limit}s exceeded') break # time limit exceeded return output def box_iou(box1, box2): # https://github.com/pytorch/vision/blob/master/torchvision/ops/boxes.py """ Return intersection-over-union (Jaccard index) of boxes. Both sets of boxes are expected to be in (x1, y1, x2, y2) format. Arguments: box1 (Tensor[N, 4]) box2 (Tensor[M, 4]) Returns: iou (Tensor[N, M]): the NxM matrix containing the pairwise IoU values for every element in boxes1 and boxes2 """ def box_area(box): # box = 4xn return (box[2] - box[0]) * (box[3] - box[1]) area1 = box_area(box1.T) area2 = box_area(box2.T) # inter(N,M) = (rb(N,M,2) - lt(N,M,2)).clamp(0).prod(2) inter = (torch.min(box1[:, None, 2:], box2[:, 2:]) - torch.max(box1[:, None, :2], box2[:, :2])).clamp(0).prod(2) return inter / (area1[:, None] + area2 - inter) # iou = inter / (area1 + area2 - inter) class LoadImages: # for inference def __init__(self, path, img_size=640, stride=32): p = str(Path(path).absolute()) # os-agnostic absolute path if '*' in p: files = sorted(glob.glob(p, recursive=True)) # glob elif os.path.isdir(p): files = sorted(glob.glob(os.path.join(p, '*.*'))) # dir elif os.path.isfile(p): files = [p] # files else: raise Exception(f'ERROR: {p} does not exist') img_formats = ['bmp', 'jpg', 'jpeg', 'png', 'tif', 'tiff', 'dng', 'webp', 'mpo'] # acceptable image suffixes vid_formats = ['mov', 'avi', 'mp4', 'mpg', 'mpeg', 'm4v', 'wmv', 'mkv'] # acceptable video suffixes images = [x for x in files if x.split('.')[-1].lower() in img_formats] videos = [x for x in files if x.split('.')[-1].lower() in vid_formats] ni, nv = len(images), len(videos) self.img_size = img_size self.stride = stride self.files = images + videos self.nf = ni + nv # number of files self.video_flag = [False] * ni + [True] * nv self.mode = 'image' if any(videos): self.new_video(videos[0]) # new video else: self.cap = None assert self.nf > 0, f'No images or videos found in {p}. ' \ f'Supported formats are:\nimages: {img_formats}\nvideos: {vid_formats}' def __iter__(self): self.count = 0 return self def __next__(self): if self.count == self.nf: raise StopIteration path = self.files[self.count] if self.video_flag[self.count]: # Read video self.mode = 'video' ret_val, img0 = self.cap.read() if not ret_val: self.count += 1 self.cap.release() if self.count == self.nf: # last video raise StopIteration else: path = self.files[self.count] self.new_video(path) ret_val, img0 = self.cap.read() self.frame += 1 print(f'video {self.count + 1}/{self.nf} ({self.frame}/{self.nframes}) {path}: ', end='') else: # Read image self.count += 1 img0 = cv2.imread(path) # BGR assert img0 is not None, 'Image Not Found ' + path #print(f'image {self.count}/{self.nf} {path}: ', end='') # Padded resize img0 = cv2.resize(img0, (1280,720), interpolation=cv2.INTER_LINEAR) img = letterbox(img0, self.img_size, stride=self.stride)[0] # Convert img = img[:, :, ::-1].transpose(2, 0, 1) # BGR to RGB, to 3x416x416 img = np.ascontiguousarray(img) return path, img, img0, self.cap def new_video(self, path): self.frame = 0 self.cap = cv2.VideoCapture(path) self.nframes = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT)) def __len__(self): return self.nf # number of files def letterbox(img, new_shape=(640, 640), color=(114, 114, 114), auto=True, scaleFill=False, scaleup=True, stride=32): # Resize and pad image while meeting stride-multiple constraints shape = img.shape[:2] # current shape [height, width] if isinstance(new_shape, int): new_shape = (new_shape, new_shape) #print(sem_img.shape) # Scale ratio (new / old) r = min(new_shape[0] / shape[0], new_shape[1] / shape[1]) if not scaleup: # only scale down, do not scale up (for better test mAP) r = min(r, 1.0) # Compute padding ratio = r, r # width, height ratios new_unpad = int(round(shape[1] * r)), int(round(shape[0] * r)) dw, dh = new_shape[1] - new_unpad[0], new_shape[0] - new_unpad[1] # wh padding if auto: # minimum rectangle dw, dh = np.mod(dw, stride), np.mod(dh, stride) # wh padding elif scaleFill: # stretch dw, dh = 0.0, 0.0 new_unpad = (new_shape[1], new_shape[0]) ratio = new_shape[1] / shape[1], new_shape[0] / shape[0] # width, height ratios dw /= 2 # divide padding into 2 sides dh /= 2 if shape[::-1] != new_unpad: # resize img = cv2.resize(img, new_unpad, interpolation=cv2.INTER_LINEAR) top, bottom = int(round(dh - 0.1)), int(round(dh + 0.1)) left, right = int(round(dw - 0.1)), int(round(dw + 0.1)) img = cv2.copyMakeBorder(img, top, bottom, left, right, cv2.BORDER_CONSTANT, value=color) # add border return img, ratio, (dw, dh) def driving_area_mask(seg = None): da_predict = seg[:, :, 12:372,:] da_seg_mask = torch.nn.functional.interpolate(da_predict, scale_factor=2, mode='bilinear') _, da_seg_mask = torch.max(da_seg_mask, 1) da_seg_mask = da_seg_mask.int().squeeze().cpu().numpy() return da_seg_mask def lane_line_mask(ll = None): ll_predict = ll[:, :, 12:372,:] ll_seg_mask = torch.nn.functional.interpolate(ll_predict, scale_factor=2, mode='bilinear') ll_seg_mask = torch.round(ll_seg_mask).squeeze(1) ll_seg_mask = ll_seg_mask.int().squeeze().cpu().numpy() return ll_seg_mask