|
import yt_dlp |
|
from ultralytics import YOLO |
|
import time |
|
import os |
|
import logging |
|
|
|
import av |
|
import cv2 |
|
import numpy as np |
|
import streamlit as st |
|
from streamlit_webrtc import WebRtcMode, webrtc_streamer |
|
|
|
from utils.download import download_file |
|
from utils.turn import get_ice_servers |
|
|
|
from PIL import Image, ImageDraw |
|
from transformers import pipeline |
|
|
|
import requests |
|
from io import BytesIO |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Heading text for the page; handed to st.title() further down.
ANALYSIS_TITLE = "YOLO-8 Object Detection Analysis"

# Detector built from the "yolov8n.pt" weights; analyze_frame() uses it for
# tracking and for mapping class ids to label names.
model = YOLO("yolov8n.pt")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def analyze_frame(frame: np.ndarray):
    """Detect and track objects in one RGB frame and publish the results.

    Runs the YOLO tracker on ``frame`` and stores four entries in the
    module-level ``img_container``:

    * ``"input"``: the frame exactly as received,
    * ``"analyzed"``: the annotated frame, in RGB order,
    * ``"analysis_time"``: elapsed time in milliseconds, rounded to 2 places,
    * ``"detections"``: a list of dicts with ``id`` (1-based position in the
      result), ``label``, ``score`` and ``box_coords`` (xyxy, rounded to 2
      places).

    Args:
        frame: Image in RGB channel order, which is what every caller in
            this file passes.

    Returns:
        None. Results are delivered through ``img_container``.
    """
    started = time.perf_counter()
    img_container["input"] = frame

    # Ultralytics interprets numpy input as BGR, so hand it a BGR copy rather
    # than the RGB frame. cvtColor returns a new array, so the caller's frame
    # is left untouched without a separate copy().
    bgr_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)

    # persist=True keeps tracker state between successive calls.
    result = model.track(bgr_frame, persist=True)[0]

    detections = [
        {
            "id": index,
            "label": model.names[int(box.cls)],
            "score": float(box.conf),
            "box_coords": [
                round(value.item(), 2) for value in box.xyxy.flatten()
            ],
        }
        for index, box in enumerate(result.boxes, start=1)
    ]

    # plot() draws on the BGR image it was given; convert back to RGB because
    # the display code renders with channels="RGB".
    annotated = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB)

    elapsed_ms = round((time.perf_counter() - started) * 1000, 2)

    img_container["analysis_time"] = elapsed_ms
    img_container["detections"] = detections
    img_container["analyzed"] = annotated
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Presumably meant to silence FFmpeg console output; the consumer of this
# variable is not visible in this file.
os.environ["FFMPEG_LOG_LEVEL"] = "quiet"

# Let only ERROR-and-above records through the "streamlit" logger.
logging.getLogger("streamlit").setLevel(logging.ERROR)

# Shared hand-off between analyze_frame() (writer) and publish_frame()
# (reader). Every slot starts out as None, meaning "nothing produced yet".
img_container = dict.fromkeys(
    ("input", "analyzed", "analysis_time", "detections")
)

# Module-level logger for this script.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
def video_frame_callback(frame: av.VideoFrame) -> av.VideoFrame:
    """Analyze one frame delivered by the WebRTC streamer.

    The frame is decoded to an ``rgb24`` array and passed to
    ``analyze_frame``; the incoming frame object is returned unchanged.
    """
    analyze_frame(frame.to_ndarray(format="rgb24"))
    return frame
|
|
|
|
|
|
|
# Value later passed to webrtc_streamer() as its rtc_configuration.
ice_servers = get_ice_servers()

# Use the full browser width for the page.
st.set_page_config(layout="wide")
|
|
|
|
|
# Page-wide CSS: padding for the main container and heading typography.
_PAGE_STYLE = """
    <style>
        .main {
            padding: 2rem;
        }
        h1, h2, h3 {
            font-family: 'Arial', sans-serif;
        }
        h1 {
            font-weight: 700;
            font-size: 2.5rem;
        }
        h2 {
            font-weight: 600;
            font-size: 2rem;
        }
        h3 {
            font-weight: 500;
            font-size: 1.5rem;
        }
    </style>
    """

# unsafe_allow_html is needed for the raw <style> element to be emitted.
st.markdown(_PAGE_STYLE, unsafe_allow_html=True)
|
|
|
|
|
# Page heading and tagline.
st.title(ANALYSIS_TITLE)
st.subheader("A Computer Vision Playground")

# Left-aligned pointer to the README, rendered as raw HTML.
_README_BLURB = """
    <div style="text-align: left;">
        <p>See the <a href="https://huggingface.co/spaces/eusholli/sentiment-analyzer/blob/main/README.md"
        target="_blank">README</a> to learn how to use this code to help you start your computer vision exploration.</p>
    </div>
    """
st.markdown(_README_BLURB, unsafe_allow_html=True)
|
|
|
|
|
# Two equal columns: col1 holds the inputs, col2 is filled by analysis_init().
col1, col2 = st.columns(2)

with col1:
    st.header("Input Stream")

    # Empty slots, populated later by analysis_init() and publish_frame().
    input_subheader = st.empty()
    input_placeholder = st.empty()

    st.subheader("Input Options")

    # Webcam input: frames are sent to video_frame_callback; SENDONLY means
    # nothing is streamed back to the browser.
    webrtc_ctx = webrtc_streamer(
        key="input-webcam",
        mode=WebRtcMode.SENDONLY,
        rtc_configuration=ice_servers,
        video_frame_callback=video_frame_callback,
        media_stream_constraints={"video": True, "audio": False},
        async_processing=True,
    )

    # Still image, either uploaded ...
    st.subheader("Upload an Image")
    uploaded_file = st.file_uploader(
        "Choose an image...",
        type=["jpg", "jpeg", "png"],
    )

    # ... or referenced by URL.
    st.subheader("Or Enter Image URL")
    image_url = st.text_input("Image URL")

    # YouTube video, referenced by URL.
    st.subheader("Enter a YouTube URL")
    youtube_url = st.text_input("YouTube URL")

    # Video file, either uploaded ...
    st.subheader("Upload a Video")
    uploaded_video = st.file_uploader(
        "Choose a video...",
        type=["mp4", "avi", "mov", "mkv"],
    )

    # ... or referenced by a direct download URL.
    st.subheader("Or Enter Video Download URL")
    video_url = st.text_input("Video URL")
|
|
|
|
|
# Centered footer linking to the playground README, rendered as raw HTML.
# NOTE(review): placed at top level here; confirm it was not meant to sit
# inside the input column.
_FOOTER_HTML = """
    <div style="text-align: center; margin-top: 2rem;">
        <p>If you want to set up your own computer vision playground see <a href="https://huggingface.co/spaces/eusholli/computer-vision-playground/blob/main/README.md" target="_blank">here</a>.</p>
    </div>
    """
st.markdown(_FOOTER_HTML, unsafe_allow_html=True)
|
|
|
|
|
|
|
|
|
|
|
|
|
def analysis_init():
    """Build the analysis column and expose its widgets as globals.

    Labels the input slot "Input Frame" and, inside ``col2``, creates the
    output image slot, the analysis-time slot, the "show labels" checkbox
    (ticked by default) and the labels slot. The created objects are bound
    to the module-level names ``output_placeholder``, ``analysis_time``,
    ``show_labels`` and ``labels_placeholder``.
    """
    # Only the names assigned below need a global declaration;
    # input_subheader is merely read.
    global analysis_time, show_labels, labels_placeholder, output_placeholder

    with col2:
        st.header("Analysis")
        input_subheader.subheader("Input Frame")

        st.subheader("Output Frame")
        output_placeholder = st.empty()
        analysis_time = st.empty()
        show_labels = st.checkbox("Show the detected labels", value=True)
        labels_placeholder = st.empty()
|
|
|
|
|
|
|
|
|
|
|
def publish_frame():
    """Render the current contents of ``img_container`` into the page.

    Items are shown in a fixed order (input frame, analyzed frame, analysis
    time, detections table) and rendering stops at the first item that is
    still ``None``. The table is drawn only while ``show_labels`` is truthy.
    """
    source_image = img_container["input"]
    if source_image is None:
        return
    input_placeholder.image(source_image, channels="RGB")

    annotated_image = img_container["analyzed"]
    if annotated_image is None:
        return
    output_placeholder.image(annotated_image, channels="RGB")

    elapsed_ms = img_container["analysis_time"]
    if elapsed_ms is None:
        return
    analysis_time.text(f"Analysis Time: {elapsed_ms} ms")

    found = img_container["detections"]
    if found is not None and show_labels:
        labels_placeholder.table(found)
|
|
|
|
|
|
|
# Live webcam mode: analyze_frame() runs in the WebRTC frame callback, so
# this loop only has to keep redrawing the most recent results.
if webrtc_ctx.state.playing:
    analysis_init()
    # Re-check the stream state on every pass so the loop ends once the
    # stream stops, instead of spinning forever.
    while webrtc_ctx.state.playing:
        publish_frame()
        time.sleep(0.1)  # pause between redraws
|
|
|
|
|
|
|
# Still-image mode: analyze an uploaded image, or one fetched from a URL.
# An upload takes precedence when both are supplied.
if uploaded_file is not None or image_url:
    analysis_init()

    image = None
    if uploaded_file is not None:
        image = Image.open(uploaded_file)
    else:
        try:
            # Bound the wait so an unresponsive host cannot hang the app, and
            # reject HTTP error responses before trying to decode them.
            response = requests.get(image_url, timeout=10)
            response.raise_for_status()
            image = Image.open(BytesIO(response.content))
        except (requests.RequestException, OSError) as exc:
            # OSError also covers Pillow's "cannot identify image file".
            st.error(f"Could not load image from URL: {exc}")

    if image is not None:
        # Normalize to 3-channel RGB, the layout analyze_frame() expects.
        analyze_frame(np.array(image.convert("RGB")))
        publish_frame()
|
|
|
|
|
|
|
|
|
|
|
def process_video(video_path):
    """Analyze and display every frame of a video, one after another.

    Args:
        video_path: Anything ``cv2.VideoCapture`` accepts; callers in this
            file pass a local file path or a stream URL.

    Returns:
        None. Each frame is passed to ``analyze_frame`` and then shown with
        ``publish_frame``. If the source cannot be opened, no frames are
        processed.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # End of stream or read failure.
                break

            # OpenCV decodes to BGR; the analysis and display code work in RGB.
            analyze_frame(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            publish_frame()
    finally:
        # Release the capture even when analysis or rendering raises.
        cap.release()
|
|
|
|
|
|
|
|
|
|
|
def get_youtube_stream_url(youtube_url):
    """Look up the media URL that yt-dlp reports for ``youtube_url``.

    Metadata is extracted without downloading, requesting the
    ``best[ext=mp4]`` format with quiet output, and the ``"url"`` field of
    the result is returned.
    """
    options = {"format": "best[ext=mp4]", "quiet": True}
    with yt_dlp.YoutubeDL(options) as downloader:
        metadata = downloader.extract_info(youtube_url, download=False)
    return metadata["url"]
|
|
|
|
|
|
|
# YouTube mode: resolve the entered link to a media URL and analyze it
# frame by frame.
if youtube_url:
    analysis_init()
    process_video(get_youtube_stream_url(youtube_url))
|
|
|
|
|
# Video mode: analyze an uploaded video, or one fetched from a download URL.
# An upload takes precedence when both are supplied.
if uploaded_video is not None or video_url:
    analysis_init()

    temp_path = None
    if uploaded_video is not None:
        import tempfile

        # Spool the upload to a private temporary file instead of writing it
        # into the working directory under a client-supplied name. Only the
        # extension of the original name is kept.
        suffix = os.path.splitext(uploaded_video.name)[1]
        with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
            tmp.write(uploaded_video.getbuffer())
        # The file is closed before OpenCV opens it by name.
        temp_path = video_path = tmp.name
    else:
        # download_file is a project helper; assumed to return a path that
        # cv2.VideoCapture can open (TODO confirm).
        video_path = download_file(video_url)

    try:
        process_video(video_path)
    finally:
        # Remove only the temporary file created above.
        if temp_path is not None:
            os.remove(temp_path)
|
|