Spaces:
Sleeping
Sleeping
from ultralytics import YOLO | |
import time | |
import numpy as np | |
import mediapipe as mp | |
from flask import Flask, request, jsonify, send_file | |
import uvicorn | |
from socketio import ASGIApp | |
import cv2 | |
from flask import Flask, render_template, request, Response, session, redirect, url_for, make_response | |
from flask_socketio import SocketIO | |
from flask_socketio import emit | |
from flask_cors import CORS | |
from flask_socketio import SocketIO | |
import yt_dlp as youtube_dl | |
import uvicorn | |
import base64 | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import base64 | |
from io import BytesIO | |
from PIL import Image | |
def plot_base64_image(image_base64): | |
# Decode base64 string | |
image_data = base64.b64decode(image_base64) | |
# Convert bytes to PIL Image | |
image = Image.open(BytesIO(image_data)) | |
# Convert PIL Image to numpy array | |
image_array = np.array(image) | |
# Plot image | |
plt.imshow(image_array) | |
plt.axis('off') | |
plt.show() | |
# Example usage: | |
# base64_image = "..." # Your base64 encoded image string | |
# plot_base64_image(base64_image) | |
model_object_detection = YOLO("bisindov2.pt") | |
app = Flask(__name__) | |
socketio = SocketIO(app, cors_allowed_origins="*") | |
CORS(app) | |
app.secret_key = 'flask-sockets-builds' | |
###################################################### | |
classes_translation = { | |
"all": "الكل", | |
"A": "أ", | |
"B": "ب", | |
"C": "ج", | |
"D": "د", | |
"F": "ف", | |
"H": "هـ", | |
"I": "أنا", | |
"J": "جيم", | |
"L": "إل", | |
"M": "إم", | |
"O": "أو", | |
"R": "ر", | |
"T": "ت", | |
"U": "يو", | |
"V": "في", | |
"W": "دبليو", | |
"Z": "زد", | |
"additional": "إضافي", | |
"alcohol": "مدرسة", | |
"allergy": "حساسية", | |
"bacon": "لحم المقدد", | |
"bag": "حقيبة", | |
"barbecue": "شواء", | |
"bill": "فاتورة", | |
"biscuit": "بسكويت", | |
"bitter": "مر", | |
"bread": "خبز", | |
"burger": "برغر", | |
"bye": "وداعاً", | |
"cheese": "جبن", | |
"chicken": "دجاج", | |
"coke": "كوكاكولا", | |
"cold": "بارد", | |
"cost": "تكلفة", | |
"coupon": "كوبون", | |
"cup": "كوب", | |
"dessert": "حلوى", | |
"drink": "شراب", | |
"drive": "قيادة", | |
"eat": "تناول الطعام", | |
"eggs": "بيض", | |
"enjoy": "استمتع", | |
"fork": "شوكة", | |
"french fries": "بطاطس مقلية", | |
"fresh": "طازج", | |
"hello": "مرحبا", | |
"hot": "ساخن", | |
"icecream": "آيس كريم", | |
"ingredients": "مكونات", | |
"juicy": "عصيري", | |
"ketchup": "كاتشب", | |
"lactose": "لاكتوز", | |
"lettuce": "خس", | |
"lid": "غطاء", | |
"manager": "مدير", | |
"menu": "قائمة الطعام", | |
"milk": "حليب", | |
"mustard": "خردل", | |
"napkin": "منديل", | |
"no": "لا", | |
"order": "طلب", | |
"pepper": "فلفل", | |
"pickle": "مخلل", | |
"pizza": "بيتزا", | |
"please": "من فضلك", | |
"ready": "جاهز", | |
"refill": "إعادة ملء", | |
"repeat": "كرر", | |
"safe": "آمن", | |
"salt": "ملح", | |
"sandwich": "ساندويتش", | |
"sauce": "صلصة", | |
"small": "صغير", | |
"soda": "صودا", | |
"sorry": "آسف", | |
"spicy": "حار", | |
"spoon": "ملعقة", | |
"straw": "قش", | |
"sugar": "سكر", | |
"sweet": "حلو", | |
"tissues": "مناديل", | |
"total": "مجموع", | |
"urgent": "عاجل", | |
"vegetables": "خضروات", | |
"warm": "دافئ", | |
"water": "ماء", | |
"what": "ماذا", | |
"yoghurt": "زبادي", | |
"your": "لك", | |
"ILoveYou":"أحبك", | |
"Halo":"مرحبًا" | |
} | |
###################################################### | |
class VideoStreaming(object): | |
def __init__(self): | |
super(VideoStreaming, self).__init__() | |
print ("===== Video Streaming =====") | |
self._preview = False | |
self._flipH = False | |
self._detect = False | |
self._model = False | |
self._mediaPipe = False | |
self._confidence = 75.0 | |
self.mp_hands = mp.solutions.hands | |
self.hands = self.mp_hands.Hands() | |
def confidence(self): | |
return self._confidence | |
def confidence(self, value): | |
self._confidence = int(value) | |
def preview(self): | |
return self._preview | |
def preview(self, value): | |
self._preview = bool(value) | |
def flipH(self): | |
return self._flipH | |
def flipH(self, value): | |
self._flipH = bool(value) | |
def detect(self): | |
return self._detect | |
def detect(self, value): | |
self._detect = bool(value) | |
def mediaPipe(self): | |
return self._mediaPipe | |
def mediaPipe(self, value): | |
self._mediaPipe = bool(value) | |
def show(self, url): | |
print(url) | |
self._preview = False | |
self._flipH = False | |
self._detect = False | |
self._mediaPipe = False | |
self._confidence = 75.0 | |
ydl_opts = { | |
"quiet": True, | |
"no_warnings": True, | |
"format": "best", | |
"forceurl": True, | |
} | |
if url == '4': | |
print("am here with 0 to start cam") | |
cap = cv2.VideoCapture(0) | |
else: | |
ydl = youtube_dl.YoutubeDL(ydl_opts) | |
info = ydl.extract_info("https://www.youtube.com/watch?v=j4YZBRwVFFo", download=False) | |
url = info["url"] | |
cap = cv2.VideoCapture(url) | |
while True: | |
if self._preview: | |
if stop_flag: | |
print("Process Stopped") | |
return | |
grabbed, frame = cap.read() | |
if not grabbed: | |
break | |
if self.flipH: | |
print("flip part :") | |
frame = cv2.flip(frame, 1) | |
if self.detect: | |
frame_yolo = frame.copy() | |
results_yolo = model_object_detection.predict(frame_yolo, conf=self._confidence / 100) | |
frame_yolo, labels = results_yolo[0].plot() | |
list_labels = [] | |
# labels_confidences | |
for label in labels: | |
confidence = label.split(" ")[-1] | |
label_name = " ".join(label.split(" ")[:-1]) | |
# Translate the label if it exists in the translation dictionary | |
translated_label = classes_translation.get(label_name, label_name) | |
list_labels.append(translated_label) | |
list_labels.append(confidence) | |
socketio.emit('label', list_labels) | |
if self.mediaPipe: | |
# Convert the image to RGB for processing with MediaPipe | |
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
results = self.hands.process(image) | |
if results.multi_hand_landmarks: | |
for hand_landmarks in results.multi_hand_landmarks: | |
mp.solutions.drawing_utils.draw_landmarks( | |
frame, | |
hand_landmarks, | |
self.mp_hands.HAND_CONNECTIONS, | |
landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 0, 0), thickness=4, circle_radius=3), | |
connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 255, 255), thickness=2, circle_radius=2), | |
) | |
print("frame information in here : ") | |
frame = cv2.imencode(".jpg", frame)[1].tobytes() | |
yield ( | |
b'--frame\r\n' | |
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n' | |
) | |
else: | |
snap = np.zeros(( | |
1000, | |
1000 | |
), np.uint8) | |
label = "Streaming Off" | |
H, W = snap.shape | |
font = cv2.FONT_HERSHEY_PLAIN | |
color = (255, 255, 255) | |
cv2.putText(snap, label, (W//2 - 100, H//2), | |
font, 2, color, 2) | |
frame = cv2.imencode(".jpg", snap)[1].tobytes() | |
yield (b'--frame\r\n' | |
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n') | |
def show1(self, url): | |
print("url") | |
self._preview = False | |
self._flipH = False | |
self._detect = False | |
self._mediaPipe = False | |
self._confidence = 75.0 | |
ydl_opts = { | |
"quiet": True, | |
"no_warnings": True, | |
"format": "best", | |
"forceurl": True, | |
} | |
while True: | |
# Decoding the Base64 string to get the frame data | |
frame_bytes = base64.b64decode(url) | |
# Converting the frame data to an OpenCV image | |
frame_np = np.frombuffer(frame_bytes, np.uint8) | |
frame = cv2.imdecode(frame_np, cv2.IMREAD_COLOR) | |
# Encode the frame data to bytes | |
_, frame_encoded = cv2.imencode(".jpg", frame) | |
frame_bytes = frame_encoded.tobytes() | |
yield ( | |
b'--frame\r\n' | |
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n' | |
) | |
# check_settings() | |
VIDEO = VideoStreaming() | |
def homepage(): | |
return render_template('hompage.html') | |
def index(): | |
print("index") | |
global stop_flag | |
stop_flag = False | |
if request.method == 'POST': | |
print("Index post request") | |
url = request.form['url'] | |
print("index: ", url) | |
# Create a response object | |
resp = make_response(redirect(url_for('index'))) | |
# Set a cookie containing the URL | |
resp.set_cookie('url', url) | |
return resp | |
return render_template('index.html') | |
def video_feed(): | |
#url = session.get('url', None) | |
#print("video feed: ", url) | |
# Retrieve the URL from the cookie | |
url = request.cookies.get('url') | |
url = '0' | |
print("video feed: ", url) | |
if url is None: | |
return redirect(url_for('homepage')) | |
print("video feed: ", url) | |
return Response(VIDEO.show(url), mimetype='multipart/x-mixed-replace; boundary=frame') | |
# * Button requests | |
def request_preview_switch(): | |
VIDEO.preview = not VIDEO.preview | |
print("*"*10, VIDEO.preview) | |
return "nothing" | |
def request_flipH_switch(): | |
VIDEO.flipH = not VIDEO.flipH | |
print("*"*10, VIDEO.flipH) | |
return "nothing" | |
def request_run_model_switch(): | |
VIDEO.detect = not VIDEO.detect | |
print("*"*10, VIDEO.detect) | |
return "nothing" | |
def request_mediapipe_switch(): | |
VIDEO.mediaPipe = not VIDEO.mediaPipe | |
print("*"*10, VIDEO.mediaPipe) | |
return "nothing" | |
def update_slider_value(): | |
slider_value = request.form['sliderValue'] | |
VIDEO.confidence = slider_value | |
return 'OK' | |
def stop_process(): | |
print("Process stop Request") | |
global stop_flag | |
stop_flag = True | |
return 'Process Stop Request' | |
def test_connect(): | |
print('Connected') | |
#emit('message', data, broadcast=True) | |
###################### | |
def preprocess_frame(frame_data): | |
if frame_data is None: | |
return None # Return None if frame_data is None | |
try: | |
# Convert base64 image string to numpy array | |
# Split frame_data to extract base64 part | |
base64_data = frame_data | |
# Convert base64 image string to numpy array | |
imgdata = base64.b64decode(base64_data) | |
imgarray = np.frombuffer(imgdata, np.uint8) | |
# Decode the image using cv2.imdecode | |
frame = cv2.imdecode(imgarray, cv2.IMREAD_COLOR) | |
print("rani dezte hna koolchi mezian") | |
# Apply image processing here (example: grayscale conversion) | |
processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) | |
# Convert processed frame back to base64 image string | |
_, buffer = cv2.imencode('.jpg', processed_frame) | |
processed_frame_data = base64.b64encode(buffer).decode('utf-8') | |
#plot_base64_image(processed_frame_data) | |
return processed_frame_data | |
except Exception as e: | |
print("Error processing frame:", e) | |
return None | |
#@socketio.on('stream_frame') | |
#def handle_stream_frame(frame_data): | |
# processed_frame_data = preprocess_frame(frame_data) | |
# #emit('receive_frame', processed_frame_data, broadcast=True) | |
# return Response(VIDEO.show1(frame_data), mimetype='multipart/x-mixed-replace; boundary=frame') | |
# Route to receive a frame for processing | |
def process_frame(): | |
if 'frame' in request.files: | |
frame = request.files['frame'] | |
# Process the frame here | |
# For example, you can save the frame to a file | |
frame_path = 'result_frame.jpg' | |
frame.save(frame_path) | |
# Return the processed frame | |
return send_file(frame_path, mimetype='image/jpeg') | |
else: | |
return 'No frame data received' | |
if __name__ == '__main__': | |
socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True,port=7860) | |