"""Utility helpers for a YOLO pipeline.

Contains weight loading, detection post-processing and drawing, a Keras data
generator, ground-truth encoding, and average-precision / plotting helpers.
"""
import operator
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence

from config import yolo_config


def load_weights(model, weights_file_path):
    """
    Copy Darknet-format weights from a binary file into a Keras model.

    The model is expected to expose 110 convolution layers named ``conv2d``,
    ``conv2d_1`` ... ``conv2d_109`` and batch-normalization layers named
    ``batch_normalization``, ``batch_normalization_1`` ...  Convolutions 93,
    101 and 109 are treated as output layers: a bias is read for them and no
    batch normalization; for every other convolution the batch-normalization
    parameters are read and the convolution gets a kernel only.

    :param model: Keras model whose layers receive the weights
    :param weights_file_path: path to the binary weights file
    :return: None (prints whether the whole file was consumed)
    """
    conv_layer_size = 110
    conv_output_idxs = [93, 101, 109]
    with open(weights_file_path, 'rb') as file:
        # File header: five int32 words that are skipped (not used afterwards).
        _major, _minor, _revision, _seen, _ = np.fromfile(file, dtype=np.int32, count=5)

        bn_idx = 0
        for conv_idx in range(conv_layer_size):
            conv_layer_name = f'conv2d_{conv_idx}' if conv_idx > 0 else 'conv2d'
            bn_layer_name = f'batch_normalization_{bn_idx}' if bn_idx > 0 else 'batch_normalization'

            conv_layer = model.get_layer(conv_layer_name)
            filters = conv_layer.filters
            kernel_size = conv_layer.kernel_size[0]
            input_dims = conv_layer.input_shape[-1]
            is_output_layer = conv_idx in conv_output_idxs

            # In the file, the bn parameters (or the bias) precede the kernel.
            if not is_output_layer:
                # darknet bn layer weights: [beta, gamma, mean, variance]
                bn_weights = np.fromfile(file, dtype=np.float32, count=4 * filters)
                # tf bn layer weights: [gamma, beta, mean, variance]
                bn_weights = bn_weights.reshape((4, filters))[[1, 0, 2, 3]]
                bn_layer = model.get_layer(bn_layer_name)
                bn_idx += 1
            else:
                conv_bias = np.fromfile(file, dtype=np.float32, count=filters)

            # darknet shape: (out_dim, input_dims, height, width)
            # tf shape: (height, width, input_dims, out_dim)
            conv_shape = (filters, input_dims, kernel_size, kernel_size)
            # np.prod instead of np.product: the latter was removed in NumPy 2.0.
            conv_weights = np.fromfile(file, dtype=np.float32, count=np.prod(conv_shape))
            conv_weights = conv_weights.reshape(conv_shape).transpose([2, 3, 1, 0])

            if not is_output_layer:
                conv_layer.set_weights([conv_weights])
                bn_layer.set_weights(bn_weights)
            else:
                conv_layer.set_weights([conv_weights, conv_bias])

        # Read the tail exactly once: a second read() on the same handle always
        # returns b'' and would make the reported count 0.
        remaining = file.read()
        if not remaining:
            print('all weights read')
        else:
            unread = len(remaining) // np.dtype(np.float32).itemsize
            print(f'failed to read all weights, # of unread weights: {unread}')


def get_detection_data(img, model_outputs, class_names):
    """
    Turn raw model outputs into a DataFrame with one row per detection.

    :param img: target raw image; only its height and width are used
    :param model_outputs: outputs from inference_model. The last element holds
        the number of boxes; the elements before it are (boxes, scores,
        classes). Only the first entry along the leading axis is used.
    :param class_names: list of object class names, indexed by class id
    :return: DataFrame with columns x1, y1, x2, y2, class_name, score, w, h
        (in that order). Box coordinates are multiplied by the image
        width/height, so they are presumably normalised to 0~1 on input.
    """
    num_bboxes = model_outputs[-1][0]
    boxes, scores, classes = [output[0][:num_bboxes] for output in model_outputs[:-1]]

    h, w = img.shape[:2]
    df = pd.DataFrame(boxes, columns=['x1', 'y1', 'x2', 'y2'])
    df[['x1', 'x2']] = (df[['x1', 'x2']] * w).astype('int64')
    df[['y1', 'y2']] = (df[['y1', 'y2']] * h).astype('int64')
    df['class_name'] = np.array(class_names)[classes.astype('int64')]
    df['score'] = scores
    df['w'] = df['x2'] - df['x1']
    df['h'] = df['y2'] - df['y1']

    print(f'# of bboxes: {num_bboxes}')
    return df


def read_annotation_lines(annotation_path, test_size=None, random_seed=5566):
    """
    Read an annotation file into a list of lines, optionally split in two.

    :param annotation_path: path to the annotation text file
    :param test_size: if truthy, forwarded to ``train_test_split``
    :param random_seed: random state used for the split
    :return: the list of lines, or the result of ``train_test_split`` on
        them when ``test_size`` is truthy
    """
    with open(annotation_path) as f:
        lines = f.readlines()
    if test_size:
        return train_test_split(lines, test_size=test_size, random_state=random_seed)
    return lines


def draw_bbox(img, detections, cmap, random_color=True, figsize=(10, 10), show_img=True, show_text=True):
    """
    Draw bounding boxes on a copy of the img.

    :param img: BGR img.
    :param detections: pandas DataFrame containing detections; its columns
        must be ordered x1, y1, x2, y2, class_name, score, w, h
    :param cmap: object colormap, indexed by class name (used when
        random_color is False)
    :param random_color: assign random color for each objects
    :param figsize: matplotlib figure size used when show_img is True
    :param show_img: if True, display the annotated img with matplotlib
    :param show_text: if True, write "<class> <score>" above each box
    :return: the annotated img (a new array; the input is not modified)
    """
    img = np.array(img)
    scale = max(img.shape[0:2]) / 416
    # Clamp to 1 like the text thickness below, so small images still get a
    # visible outline instead of a zero thickness.
    line_width = max(int(2 * scale), 1)

    for _, row in detections.iterrows():
        x1, y1, x2, y2, cls, score, _w, _h = row.values
        # OpenCV wants plain Python ints for point coordinates.
        x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2)
        color = list(np.random.random(size=3) * 255) if random_color else cmap[cls]
        cv2.rectangle(img, (x1, y1), (x2, y2), color, line_width)
        if show_text:
            text = f'{cls} {score:.2f}'
            font = cv2.FONT_HERSHEY_DUPLEX
            font_scale = max(0.3 * scale, 0.3)
            thickness = max(int(1 * scale), 1)
            (text_width, text_height) = cv2.getTextSize(text, font, fontScale=font_scale, thickness=thickness)[0]
            # Filled background behind the label, then the label in white.
            cv2.rectangle(img, (x1 - line_width // 2, y1 - text_height), (x1 + text_width, y1), color, cv2.FILLED)
            cv2.putText(img, text, (x1, y1), font, font_scale, (255, 255, 255), thickness, cv2.LINE_AA)
    if show_img:
        plt.figure(figsize=figsize)
        plt.imshow(img)
        plt.show()
    return img


class DataGenerator(Sequence):
    """
    Generates data for Keras
    ref: https://stanford.edu/~shervine/blog/keras-how-to-generate-data-on-the-fly

    Each annotation line is whitespace separated: the image path (relative to
    ``folder_path``) followed by zero or more boxes, each written as five
    comma-separated numbers (x1, y1, x2, y2 and a fifth value that is used as
    the class id when the ground truth is encoded).
    """

    def __init__(self, annotation_lines, class_name_path, folder_path, max_boxes=100, shuffle=True):
        """
        :param annotation_lines: list of annotation lines (see class docstring)
        :param class_name_path: text file whose number of lines is the number of classes
        :param folder_path: folder the image paths are relative to
        :param max_boxes: maximum number of boxes kept per image
        :param shuffle: reshuffle the sample order after every epoch
        """
        super().__init__()
        self.annotation_lines = annotation_lines
        self.class_name_path = class_name_path
        # Context manager so the file handle is closed deterministically.
        with open(class_name_path) as f:
            self.num_classes = len(f.readlines())
        self.num_gpu = yolo_config['num_gpu']
        self.batch_size = yolo_config['batch_size'] * self.num_gpu
        self.target_img_size = yolo_config['img_size']
        self.anchors = np.array(yolo_config['anchors']).reshape((9, 2))
        self.shuffle = shuffle
        self.indexes = np.arange(len(self.annotation_lines))
        self.folder_path = folder_path
        self.max_boxes = max_boxes
        self.on_epoch_end()

    def __len__(self):
        """number of batches per epoch"""
        return int(np.ceil(len(self.annotation_lines) / self.batch_size))

    def __getitem__(self, index):
        """
        Generate one batch of data

        :param index: batch index
        :return: ([X, *y_tensor, y_bbox], zeros) -- the second element is an
            all-zero array with one entry per sample (presumably a dummy
            target because the loss is computed from the inputs; confirm
            against the training model)
        """
        # Generate indexes of the batch
        idxs = self.indexes[index * self.batch_size:(index + 1) * self.batch_size]

        # Find list of IDs
        lines = [self.annotation_lines[i] for i in idxs]

        # Generate data
        X, y_tensor, y_bbox = self.__data_generation(lines)

        return [X, *y_tensor, y_bbox], np.zeros(len(lines))

    def on_epoch_end(self):
        """Updates indexes after each epoch"""
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, annotation_lines):
        """
        Generates data containing batch_size samples

        :param annotation_lines: annotation lines of the samples in this batch
        :return: (X, y_tensor, y_true_boxes_xywh) where X holds the resized
            images and the other two come from ``preprocess_true_boxes``
        """
        X = np.empty((len(annotation_lines), *self.target_img_size), dtype=np.float32)
        y_bbox = np.empty((len(annotation_lines), self.max_boxes, 5), dtype=np.float32)  # x1y1x2y2

        for i, line in enumerate(annotation_lines):
            img_data, box_data = self.get_data(line)
            X[i] = img_data
            y_bbox[i] = box_data

        y_tensor, y_true_boxes_xywh = preprocess_true_boxes(y_bbox, self.target_img_size[:2], self.anchors, self.num_classes)

        return X, y_tensor, y_true_boxes_xywh

    def get_data(self, annotation_line):
        """
        Load one sample: the resized image and its rescaled boxes.

        :param annotation_line: one annotation line (see class docstring)
        :return: (image_data, box_data); image_data is the image resized to
            the target size with channel order reversed and values divided
            by 255, box_data is a (max_boxes, 5) array, zero padded, with
            box coordinates rescaled to the target size
        :raises FileNotFoundError: if the image cannot be read
        """
        line = annotation_line.split()
        img_path = os.path.join(self.folder_path, line[0])
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f'cannot read image: {img_path}')
        img = img[:, :, ::-1]  # reverse the channel order
        ih, iw = img.shape[:2]
        h, w, _c = self.target_img_size
        boxes = np.array([np.array(list(map(float, box.split(',')))) for box in line[1:]], dtype=np.float32)  # x1y1x2y2
        scale_w, scale_h = w / iw, h / ih
        img = cv2.resize(img, (w, h))
        image_data = np.array(img) / 255.

        # correct boxes coordinates
        box_data = np.zeros((self.max_boxes, 5))
        if len(boxes) > 0:
            # Shuffle first so that truncation to max_boxes keeps a random subset.
            np.random.shuffle(boxes)
            boxes = boxes[:self.max_boxes]
            boxes[:, [0, 2]] = boxes[:, [0, 2]] * scale_w
            boxes[:, [1, 3]] = boxes[:, [1, 3]] * scale_h
            box_data[:len(boxes)] = boxes

        return image_data, box_data


def preprocess_true_boxes(true_boxes, input_shape, anchors, num_classes):
    """
    Preprocess true boxes to training input format

    Parameters
    ----------
    true_boxes: array, shape=(bs, max boxes per img, 5)
        Absolute x_min, y_min, x_max, y_max, class_id relative to input_shape.
        Rows whose width is not positive are treated as padding and skipped.
    input_shape: array-like, hw, multiples of 32
    anchors: array, shape=(N, 2), (9, wh)
    num_classes: int

    Returns
    -------
    y_true: list of 3 arrays, one per stage (strides 8, 16, 32), each of
        shape (bs, grid_h, grid_w, 3, 5 + num_classes). For the cell and
        anchor responsible for a box it stores absolute centre xy, absolute
        wh, confidence 1 and a one-hot class vector.
    y_true_boxes_xywh: array, shape=(bs, max boxes per img, 4), absolute
        centre xy and wh of every input row
    """
    num_stages = 3  # default setting for yolo, tiny yolo will be 2
    anchor_mask = [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    strides = (8, 16, 32)
    bbox_per_grid = 3
    true_boxes = np.array(true_boxes, dtype='float32')
    true_boxes_abs = np.array(true_boxes, dtype='float32')
    input_shape = np.array(input_shape, dtype='int32')
    true_boxes_xy = (true_boxes_abs[..., 0:2] + true_boxes_abs[..., 2:4]) // 2  # (bs, max boxes, 2)
    true_boxes_wh = true_boxes_abs[..., 2:4] - true_boxes_abs[..., 0:2]  # (bs, max boxes, 2)

    # Normalize x, y, w, h relative to img size -> (0~1)
    true_boxes[..., 0:2] = true_boxes_xy / input_shape[::-1]  # xy
    true_boxes[..., 2:4] = true_boxes_wh / input_shape[::-1]  # wh

    bs = true_boxes.shape[0]
    grid_sizes = [input_shape // strides[stage] for stage in range(num_stages)]
    # e.g. for a 416x416 input:
    # [(?, 52, 52, 3, 5+num_classes) (?, 26, 26, 3, 5+num_classes) (?, 13, 13, 3, 5+num_classes)]
    y_true = [np.zeros((bs,
                        grid_sizes[s][0],
                        grid_sizes[s][1],
                        bbox_per_grid,
                        5 + num_classes), dtype='float32')
              for s in range(num_stages)]

    y_true_boxes_xywh = np.concatenate((true_boxes_xy, true_boxes_wh), axis=-1)

    # Expand dim to apply broadcasting.
    anchors = np.expand_dims(anchors, 0)  # (1, 9, 2)
    anchor_maxes = anchors / 2.  # (1, 9, 2)
    anchor_mins = -anchor_maxes  # (1, 9, 2)
    valid_mask = true_boxes_wh[..., 0] > 0  # (bs, max boxes)

    for batch_idx in range(bs):
        # Discard zero rows, but remember the original row index of every
        # kept box: the anchor assignment below is computed on the filtered
        # boxes and must be written back using the unfiltered index.
        valid_idxs = np.flatnonzero(valid_mask[batch_idx])
        if valid_idxs.size == 0:
            continue
        wh = true_boxes_wh[batch_idx, valid_idxs]  # (# of bbox, 2)
        wh = np.expand_dims(wh, -2)  # (# of bbox, 1, 2)
        box_maxes = wh / 2.  # (# of bbox, 1, 2)
        box_mins = -box_maxes  # (# of bbox, 1, 2)

        # Compute IoU between each anchors and true boxes for responsibility
        # assignment (boxes and anchors are both centred at the origin).
        intersect_mins = np.maximum(box_mins, anchor_mins)  # (# of bbox, 9, 2)
        intersect_maxes = np.minimum(box_maxes, anchor_maxes)
        intersect_wh = np.maximum(intersect_maxes - intersect_mins, 0.)
        intersect_area = np.prod(intersect_wh, axis=-1)  # (# of bbox, 9)
        box_area = wh[..., 0] * wh[..., 1]  # (# of bbox, 1)
        anchor_area = anchors[..., 0] * anchors[..., 1]  # (1, 9)
        iou = intersect_area / (box_area + anchor_area - intersect_area)  # (# of bbox, 9)

        # Find best anchor for each true box
        best_anchors = np.argmax(iou, axis=-1)  # (# of bbox,)

        for box_idx, best_anchor in zip(valid_idxs, best_anchors):
            for stage in range(num_stages):
                if best_anchor not in anchor_mask[stage]:
                    continue
                x_offset = true_boxes[batch_idx, box_idx, 0] * grid_sizes[stage][1]
                y_offset = true_boxes[batch_idx, box_idx, 1] * grid_sizes[stage][0]
                # Grid Index
                grid_col = np.floor(x_offset).astype('int32')
                grid_row = np.floor(y_offset).astype('int32')
                anchor_idx = anchor_mask[stage].index(best_anchor)
                class_idx = true_boxes[batch_idx, box_idx, 4].astype('int32')
                cell = y_true[stage][batch_idx, grid_row, grid_col, anchor_idx]
                cell[:2] = true_boxes_xy[batch_idx, box_idx, :]  # abs xy
                cell[2:4] = true_boxes_wh[batch_idx, box_idx, :]  # abs wh
                cell[4] = 1  # confidence
                cell[5 + class_idx] = 1  # one-hot encoding

    return y_true, y_true_boxes_xywh


def voc_ap(rec, prec):
    """
    Calculate the AP given the recall and precision array

    1st) We compute a version of the measured precision/recall curve with
         precision monotonically decreasing
    2nd) We compute the AP as the area under this curve by numerical
         integration.

    --- Official matlab code VOC2012---
    mrec=[0 ; rec ; 1];
    mpre=[0 ; prec ; 0];
    for i=numel(mpre)-1:-1:1
        mpre(i)=max(mpre(i),mpre(i+1));
    end
    i=find(mrec(2:end)~=mrec(1:end-1))+1;
    ap=sum((mrec(i)-mrec(i-1)).*mpre(i));

    :param rec: recall values (any iterable; it is not modified)
    :param prec: precision values (any iterable; it is not modified)
    :return: (ap, mrec, mpre) where mrec / mpre are the padded recall list
        and the padded, monotonically decreasing precision list
    """
    # Build new padded lists rather than inserting into the caller's lists.
    mrec = [0.0, *rec, 1.0]
    mpre = [0.0, *prec, 0.0]

    # Make the precision monotonically decreasing (goes from the end to the
    # beginning).
    #   matlab: for i=numel(mpre)-1:-1:1
    #               mpre(i)=max(mpre(i),mpre(i+1));
    # matlab indexes start in 1 but python in 0, and range excludes its end,
    # hence range(len(mpre) - 2, -1, -1).
    for i in range(len(mpre) - 2, -1, -1):
        mpre[i] = max(mpre[i], mpre[i + 1])

    # Indexes where the recall changes.
    #   matlab: i=find(mrec(2:end)~=mrec(1:end-1))+1;
    i_list = [i for i in range(1, len(mrec)) if mrec[i] != mrec[i - 1]]

    # The Average Precision (AP) is the area under the curve (numerical
    # integration).
    #   matlab: ap=sum((mrec(i)-mrec(i-1)).*mpre(i));
    ap = 0.0
    for i in i_list:
        ap += (mrec[i] - mrec[i - 1]) * mpre[i]
    return ap, mrec, mpre


def draw_plot_func(dictionary, n_classes, window_title, plot_title, x_label, output_path, to_show, plot_color, true_p_bar):
    """
    Draw plot using Matplotlib: a horizontal bar chart with one bar per key.

    :param dictionary: mapping from class name to the value to plot
    :param n_classes: number of bars (expected to equal len(dictionary))
    :param window_title: title of the figure window
    :param plot_title: title drawn above the plot
    :param x_label: label of the x axis
    :param output_path: file the figure is saved to
    :param to_show: currently not consulted; the figure is always shown
    :param plot_color: bar / text color for the plain (non TP/FP) chart
    :param true_p_bar: "" for a plain chart, otherwise a mapping from class
        name to its true-positive count, which splits each bar into a false
        positive and a true positive part
    :return: None
    """
    # sort the dictionary by increasing value, into a list of tuples
    # (so the largest bar is the last one)
    sorted_dic_by_value = sorted(dictionary.items(), key=operator.itemgetter(1))
    print(sorted_dic_by_value)
    # unpacking the list of tuples into two lists
    sorted_keys, sorted_values = zip(*sorted_dic_by_value)

    if true_p_bar != "":
        # Special case to draw in:
        #   - green -> TP: True Positives (object detected and matches ground-truth)
        #   - red -> FP: False Positives (object detected but does not match ground-truth)
        #   - pink -> FN: False Negatives (object not detected but present in the ground-truth)
        fp_sorted = []
        tp_sorted = []
        for key in sorted_keys:
            fp_sorted.append(dictionary[key] - true_p_bar[key])
            tp_sorted.append(true_p_bar[key])
        plt.barh(range(n_classes), fp_sorted, align='center', color='crimson', label='False Positive')
        plt.barh(range(n_classes), tp_sorted, align='center', color='forestgreen', label='True Positive', left=fp_sorted)
        # add legend
        plt.legend(loc='lower right')
        # Write number on side of bar
        fig = plt.gcf()  # gcf - get current figure
        axes = plt.gca()
        r = fig.canvas.get_renderer()
        for i, val in enumerate(sorted_values):
            fp_val = fp_sorted[i]
            tp_val = tp_sorted[i]
            fp_str_val = " " + str(fp_val)
            tp_str_val = fp_str_val + " " + str(tp_val)
            # trick to paint multicolor with offset:
            # first paint everything and then repaint the first number
            t = plt.text(val, i, tp_str_val, color='forestgreen', va='center', fontweight='bold')
            plt.text(val, i, fp_str_val, color='crimson', va='center', fontweight='bold')
            if i == (len(sorted_values) - 1):  # largest bar
                adjust_axes(r, t, fig, axes)
    else:
        plt.barh(range(n_classes), sorted_values, color=plot_color)
        # Write number on side of bar
        fig = plt.gcf()  # gcf - get current figure
        axes = plt.gca()
        r = fig.canvas.get_renderer()
        for i, val in enumerate(sorted_values):
            str_val = " " + str(val)  # add a space before
            if val < 1.0:
                str_val = " {0:.2f}".format(val)
            t = plt.text(val, i, str_val, color=plot_color, va='center', fontweight='bold')
            # re-set axes to show number inside the figure
            if i == (len(sorted_values) - 1):  # largest bar
                adjust_axes(r, t, fig, axes)
    # set window title
    # (FigureCanvas.set_window_title was removed from Matplotlib; the manager
    # method works on old and new versions alike.)
    manager = getattr(fig.canvas, 'manager', None)
    if manager is not None:
        manager.set_window_title(window_title)
    # write classes in y axis
    tick_font_size = 12
    plt.yticks(range(n_classes), sorted_keys, fontsize=tick_font_size)
    # Re-scale height accordingly
    init_height = fig.get_figheight()
    # compute the matrix height in points and inches
    dpi = fig.dpi
    height_pt = n_classes * (tick_font_size * 1.4)  # 1.4 (some spacing)
    height_in = height_pt / dpi
    # compute the required figure height
    top_margin = 0.15  # in percentage of the figure height
    bottom_margin = 0.05  # in percentage of the figure height
    figure_height = height_in / (1 - top_margin - bottom_margin)
    # set new height
    if figure_height > init_height:
        fig.set_figheight(figure_height)

    # set plot title
    plt.title(plot_title, fontsize=14)
    # set axis titles
    plt.xlabel(x_label, fontsize='large')
    # adjust size of window
    fig.tight_layout()
    # save the plot
    fig.savefig(output_path)
    # show image
    # NOTE(review): shown unconditionally and never closed explicitly;
    # `to_show` is ignored. Confirm whether that is intended.
    plt.show()


def adjust_axes(r, t, fig, axes):
    """
    Plot - adjust axes: widen the x axis so a text label fits in the figure.

    :param r: renderer used to measure the text
    :param t: matplotlib text object to make room for
    :param fig: figure containing the axes
    :param axes: axes whose upper x limit is scaled
    :return: None
    """
    # get text width for re-scaling
    bb = t.get_window_extent(renderer=r)
    text_width_inches = bb.width / fig.dpi
    # get axis width in inches
    current_fig_width = fig.get_figwidth()
    new_fig_width = current_fig_width + text_width_inches
    proportion = new_fig_width / current_fig_width
    # get axis limit
    x_lim = axes.get_xlim()
    axes.set_xlim([x_lim[0], x_lim[1] * proportion])


def read_txt_to_list(path):
    """
    Read a text file into a list of lines.

    :param path: path to the text file
    :return: list of lines with leading/trailing whitespace (including the
        trailing newline) removed
    """
    with open(path) as f:
        return [line.strip() for line in f]