CaesarCloudSync
CaesarAI Fixed I think
4b734ff
raw
history blame
8.65 kB
import cv2
import numpy as np
import os
import time
import ffmpeg
class CaesarYolo:
def __init__(self) -> None:
self.CONFIDENCE = 0.5
self.SCORE_THRESHOLD = 0.5
self.IOU_THRESHOLD = 0.5
self.current_dir = os.path.realpath(__file__).replace(f"/CaesarYolo.py","")
config_path = f"{self.current_dir}/cfg/yolov3.cfg"
weights_path = f"{self.current_dir}/weights/yolov3.weights"
self.font_scale = 1
self.thickness = 1
self.LABELS = open(f"{self.current_dir}/data/coco.names").read().strip().split("\n")
self.COLORS = np.random.randint(0, 255, size=(len(self.LABELS), 3), dtype="uint8")
self.net = cv2.dnn.readNetFromDarknet(config_path, weights_path)
self.ln = self.net.getLayerNames()
try:
self.ln = [self.ln[i[0] - 1] for i in self.net.getUnconnectedOutLayers()]
except IndexError:
# in case getUnconnectedOutLayers() returns 1D array when CUDA isn't available
self.ln = [self.ln[i - 1] for i in self.net.getUnconnectedOutLayers()]
@staticmethod
def compress_video(video_full_path, output_file_name, target_size):
# Reference: https://en.wikipedia.org/wiki/Bit_rate#Encoding_bit_rate
min_audio_bitrate = 32000
max_audio_bitrate = 256000
probe = ffmpeg.probe(video_full_path)
# Video duration, in s.
duration = float(probe['format']['duration'])
# Audio bitrate, in bps.
audio_bitrate = float(next((s for s in probe['streams'] if s['codec_type'] == 'audio'), None)['bit_rate'])
# Target total bitrate, in bps.
target_total_bitrate = (target_size * 1024 * 8) / (1.073741824 * duration)
# Target audio bitrate, in bps
if 10 * audio_bitrate > target_total_bitrate:
audio_bitrate = target_total_bitrate / 10
if audio_bitrate < min_audio_bitrate < target_total_bitrate:
audio_bitrate = min_audio_bitrate
elif audio_bitrate > max_audio_bitrate:
audio_bitrate = max_audio_bitrate
# Target video bitrate, in bps.
video_bitrate = target_total_bitrate - audio_bitrate
i = ffmpeg.input(video_full_path)
ffmpeg.output(i, os.devnull,
**{'c:v': 'libx264', 'b:v': video_bitrate, 'pass': 1, 'f': 'mp4'}
).overwrite_output().run()
ffmpeg.output(i, output_file_name,
**{'c:v': 'libx264', 'b:v': video_bitrate, 'pass': 2, 'c:a': 'aac', 'b:a': audio_bitrate}
).overwrite_output().run()
def video_load(self,videofile):
self.video_file = f"{ self.current_dir}/{videofile}"
if self.video_file:
self.cap = cv2.VideoCapture(self.video_file)
_, image = self.cap.read()
h, w = image.shape[:2]
fourcc = cv2.VideoWriter_fourcc(*"XVID")
frames = self.cap.get(cv2.CAP_PROP_FRAME_COUNT)
fps = self.cap.get(cv2.CAP_PROP_FPS)
# calculate duration of the video
self.duration_seconds = round(frames / fps)
self.out = cv2.VideoWriter(f"{self.current_dir}/output.avi", fourcc, 20.0, (w, h))
self.overall_time_taken = []
def caesar_object_detect(self,image,verbose=False):
if self.video_file and image is "video":
_,image = self.cap.read()
try:
h, w = image.shape[:2]
except AttributeError as aex:
return None,None,None
blob = cv2.dnn.blobFromImage(image, 1/255.0, (416, 416), swapRB=True, crop=False)
self.net.setInput(blob)
start = time.perf_counter()
layer_outputs = self.net.forward(self.ln)
time_took = time.perf_counter() - start
if verbose == True:
print("Time took:", time_took)
if self.video_file:
self.overall_time_taken.append(time_took)
time_elapsed = round(sum(self.overall_time_taken),3)
approx_finish = self.duration_seconds *4.6 # seconds
boxes, confidences, class_ids = [], [], []
# loop over each of the layer outputs
for output in layer_outputs:
# loop over each of the object detections
for detection in output:
# extract the class id (label) and confidence (as a probability) of
# the current object detection
scores = detection[5:]
class_id = np.argmax(scores)
confidence = scores[class_id]
# discard weak predictions by ensuring the detected
# probability is greater than the minimum probability
if confidence > self.CONFIDENCE:
# scale the bounding box coordinates back relative to the
# size of the image, keeping in mind that YOLO actually
# returns the center (x, y)-coordinates of the bounding
# box followed by the boxes' width and height
box = detection[:4] * np.array([w, h, w, h])
(centerX, centerY, width, height) = box.astype("int")
# use the center (x, y)-coordinates to derive the top and
# and left corner of the bounding box
x = int(centerX - (width / 2))
y = int(centerY - (height / 2))
# update our list of bounding box coordinates, confidences,
# and class IDs
boxes.append([x, y, int(width), int(height)])
confidences.append(float(confidence))
class_ids.append(class_id)
# perform the non maximum suppression given the scores defined before
idxs = cv2.dnn.NMSBoxes(boxes, confidences, self.SCORE_THRESHOLD, self.IOU_THRESHOLD)
self.font_scale = 1
self.thickness = 1
# ensure at least one detection exists
if len(idxs) > 0:
# loop over the indexes we are keeping
for i in idxs.flatten():
# extract the bounding box coordinates
x, y = boxes[i][0], boxes[i][1]
w, h = boxes[i][2], boxes[i][3]
# draw a bounding box rectangle and label on the image
color = [int(c) for c in self.COLORS[class_ids[i]]]
cv2.rectangle(image, (x, y), (x + w, y + h), color=color, thickness=self.thickness)
text = f"{self.LABELS[class_ids[i]]}: {confidences[i]:.2f}"
# calculate text width & height to draw the transparent boxes as background of the text
(text_width, text_height) = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, fontScale=self.font_scale, thickness=self.thickness)[0]
text_offset_x = x
text_offset_y = y - 5
box_coords = ((text_offset_x, text_offset_y), (text_offset_x + text_width + 2, text_offset_y - text_height))
overlay = image.copy()
cv2.rectangle(overlay, box_coords[0], box_coords[1], color=color, thickness=cv2.FILLED)
# add opacity (transparency to the box)
image = cv2.addWeighted(overlay, 0.6, image, 0.4, 0)
# now put the text (label: confidence %)
cv2.putText(image, text, (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX,
fontScale=self.font_scale, color=(0, 0, 0), thickness=self.thickness)
if self.video_file:
self.out.write(image)
return image,time_elapsed,approx_finish
elif not self.video_file:
return image,0,0
if __name__ == "__main__":
def test():
caesaryolo = CaesarYolo()
caesaryolo.video_load("car-detection.mp4")
while True:
image,time_elapsed,end_time = caesaryolo.caesar_object_detect("video")
if image is not None:
print(round(time_elapsed,3),"out of",end_time)
cv2.imshow("image", image)
if ord("q") == cv2.waitKey(1):
break
else:
break
caesaryolo.cap.release()
cv2.destroyAllWindows()
def convert_avi_to_mp4(avi_file_path, output_name):
os.system(f"ffmpeg -y -i {avi_file_path} {output_name}")
return True
CURRENT_DIR = os.path.realpath(__file__).replace(f"/CaesarYolo.py","")
#convert_avi_to_mp4(,)
import subprocess
#process = subprocess.Popen(ffmpeg_command, stdout = subprocess.PIPE, stderr = subprocess.STDOUT, bufsize = -1)