import numpy as np
import pandas as pd
import cv2
import torch
import warnings

from detectron2.config import get_cfg
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor

import ffmpeg
import pytorchvideo
from pytorchvideo.transforms.functional import (
    uniform_temporal_subsample,
    short_side_scale_with_boxes,
    clip_boxes_to_image,
)
from torchvision.transforms._functional_video import normalize
from pytorchvideo.data.ava import AvaLabeledVideoFramePaths
from pytorchvideo.models.hub import slowfast_r50_detection  # Another option is slow_r50_detection

from visualization import VideoVisualizer


# This function takes in an image and generates the bounding boxes for people in the image.
def get_person_bboxes(inp_img, predictor):
    predictions = predictor(inp_img.cpu().detach().numpy())['instances'].to('cpu')
    boxes = predictions.pred_boxes if predictions.has("pred_boxes") else None
    scores = predictions.scores if predictions.has("scores") else None
    classes = np.array(predictions.pred_classes.tolist() if predictions.has("pred_classes") else None)
    predicted_boxes = boxes[np.logical_and(classes == 0, scores > 0.75)].tensor.cpu()  # only person
    return predicted_boxes


def ava_inference_transform(
        clip,
        boxes,
        num_frames=32,        # use 4 if using slow_r50_detection instead of slowfast_r50_detection
        crop_size=256,
        data_mean=[0.45, 0.45, 0.45],
        data_std=[0.225, 0.225, 0.225],
        slow_fast_alpha=4,    # set to None if using slow_r50_detection instead of slowfast_r50_detection
        device='cpu'):
    boxes = np.array(boxes)
    ori_boxes = boxes.copy()

    clip = uniform_temporal_subsample(clip, num_frames)
    # Image [0, 255] -> [0, 1].
    clip = clip.float()
    clip = clip / 255.0

    height, width = clip.shape[2], clip.shape[3]
    # The format of boxes is [x1, y1, x2, y2]. The input boxes are in the
    # range of [0, width] for x and [0, height] for y.
    boxes = clip_boxes_to_image(boxes, height, width)

    # Resize short side to crop_size. Non-local and STRG use 256.
    clip, boxes = short_side_scale_with_boxes(clip, size=crop_size, boxes=boxes)

    # Normalize images by mean and std.
    clip = normalize(clip,
                     np.array(data_mean, dtype=np.float32),
                     np.array(data_std, dtype=np.float32))
    boxes = clip_boxes_to_image(boxes, clip.shape[2], clip.shape[3])

    # In case of SlowFast, generate both pathways.
    if slow_fast_alpha is not None:
        fast_pathway = clip
        # Perform temporal sampling from the fast pathway to build the slow pathway.
        slow_pathway = torch.index_select(
            clip, 1,
            torch.linspace(0, clip.shape[1] - 1, clip.shape[1] // slow_fast_alpha).long())
        clip = [slow_pathway.unsqueeze(0).to(device), fast_pathway.unsqueeze(0).to(device)]

    return clip, torch.from_numpy(boxes), ori_boxes


# Get video info: duration in seconds and frame rate.
def with_opencv(filename):
    video = cv2.VideoCapture(filename)
    frame_count = video.get(cv2.CAP_PROP_FRAME_COUNT)
    fps = video.get(cv2.CAP_PROP_FPS)
    s = round(frame_count / fps)
    video.release()
    return int(s), fps


def slow_fast_train(file_path, gpu=False):
    device = 'cuda' if gpu else 'cpu'
    top_k = 1

    video_model = slowfast_r50_detection(True)  # Another option is slow_r50_detection(True)
    video_model = video_model.eval().to(device)

    cfg = get_cfg()
    cfg.merge_from_file(model_zoo.get_config_file("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml"))
    cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.55  # set threshold for this model
    cfg.MODEL.WEIGHTS = model_zoo.get_checkpoint_url("COCO-Detection/faster_rcnn_R_50_FPN_3x.yaml")
    cfg.MODEL.DEVICE = device
    predictor = DefaultPredictor(cfg)

    # Create an id to label name mapping.
    label_map, allowed_class_ids = AvaLabeledVideoFramePaths.read_label_map('ava_action_list.pbtxt')
    # Create a video visualizer that can plot bounding boxes and visualize actions on bboxes.
    # Shows the top_k predictions in each bounding box.
    video_visualizer = VideoVisualizer(81, label_map, top_k=top_k, mode="thres", thres=0.5)

    # Preprocess video data.
    encoded_vid = pytorchvideo.data.encoded_video.EncodedVideo.from_path(file_path)

    # Video predictions are generated once per second for the whole video.
    total_sec, fps = with_opencv(file_path)
    time_stamp_range = range(0, total_sec)  # time stamps in video for which a clip is sampled
    clip_duration = 1.0  # Duration of clip used for each inference step.
    gif_imgs = []
    xleft, ytop, xright, ybottom = [], [], [], []
    labels = []
    time_frame = []
    scores = []

    for time_stamp in time_stamp_range:
        # Generate a clip centered on the designated time stamp.
        inp_imgs = encoded_vid.get_clip(
            time_stamp - clip_duration / 2.0,
            time_stamp + clip_duration / 2.0)
        inp_imgs = inp_imgs['video']

        # Generate people bbox predictions using Detectron2's off-the-shelf pre-trained predictor.
        # We use the middle image in each clip to generate the bounding boxes.
        inp_img = inp_imgs[:, inp_imgs.shape[1] // 2, :, :]
        inp_img = inp_img.permute(1, 2, 0)

        # Predicted boxes are of the form List[(x_1, y_1, x_2, y_2)].
        predicted_boxes = get_person_bboxes(inp_img, predictor)
        if len(predicted_boxes) == 0:
            print("Skipping clip: no people detected at time stamp:", time_stamp)
            continue

        # Preprocess the clip and bounding boxes for video action recognition.
        inputs, inp_boxes, _ = ava_inference_transform(inp_imgs, predicted_boxes.numpy(), device=device)
        # Prepend the data sample id for each bounding box.
        # For more details refer to RoIAlign in Detectron2.
        inp_boxes = torch.cat([torch.zeros(inp_boxes.shape[0], 1), inp_boxes], dim=1)

        # Generate action predictions for the bounding boxes in the clip.
        # The model takes in the pre-processed video clip and the detected bounding boxes.
        preds = video_model(inputs, inp_boxes.to(device))  # change inputs to inputs.unsqueeze(0).to(device) if using slow_r50
        preds = preds.to('cpu')
        # The model is trained on AVA and AVA labels are 1-indexed, so prepend 0 to convert to 0-index.
        preds = torch.cat([torch.zeros(preds.shape[0], 1), preds], dim=1)

        # Plot predictions on the video and save for later visualization.
        inp_imgs = inp_imgs.permute(1, 2, 3, 0)
        inp_imgs = inp_imgs / 255.0
        out_img_pred = video_visualizer.draw_clip_range(inp_imgs, preds, predicted_boxes)
        gif_imgs += out_img_pred

        # bbox format: (x_left, y_top, x_right, y_bottom)
        predicted_boxes_lst = predicted_boxes.tolist()
        topscores, topclasses = torch.topk(preds, k=1)
        topscores, topclasses = topscores.tolist(), topclasses.tolist()
        topclasses = np.concatenate(topclasses)
        topscores = np.concatenate(topscores)

        # Record the top-1 behavior prediction for each box at this time step.
        for i in range(len(predicted_boxes_lst)):
            xleft.append(predicted_boxes_lst[i][0])
            ytop.append(predicted_boxes_lst[i][1])
            xright.append(predicted_boxes_lst[i][2])
            ybottom.append(predicted_boxes_lst[i][3])
            labels.append(label_map.get(topclasses[i]))
            time_frame.append(time_stamp)
            scores.append(topscores[i])

    print("Finished generating predictions.")

    # Generate the metadata file.
    metadata = pd.DataFrame()
    metadata['frame'] = time_frame  # time stamp of the clip center, in seconds
    metadata['x_left'] = xleft
    metadata['y_top'] = ytop
    metadata['x_right'] = xright
    metadata['y_bottom'] = ybottom
    metadata['label'] = labels
    metadata['confidence'] = scores

    height, width = gif_imgs[0].shape[0], gif_imgs[0].shape[1]
    video_save_path = 'activity_recognition.mp4'
    video = cv2.VideoWriter(video_save_path, cv2.VideoWriter_fourcc(*'mp4v'), int(fps), (width, height))
    for image in gif_imgs:
        img = (255 * image).astype(np.uint8)
        img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)  # visualizer frames are RGB; OpenCV writes BGR
        video.write(img)
    video.release()

    return video_save_path, metadata
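

# Minimal usage sketch (not part of the original pipeline): the input video path
# below is a placeholder, and it assumes 'ava_action_list.pbtxt' and the local
# `visualization` module are available in the working directory, as required by
# slow_fast_train above.
if __name__ == "__main__":
    out_path, metadata = slow_fast_train("example_video.mp4", gpu=torch.cuda.is_available())
    print("Annotated video saved to:", out_path)
    print(metadata.head())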