sign_final / app.py
EL GHAFRAOUI AYOUB
C
8331ff3
from ultralytics import YOLO
import time
import numpy as np
import mediapipe as mp
from flask import Flask, request, jsonify, send_file
import uvicorn
from socketio import ASGIApp
import cv2
from flask import Flask, render_template, request, Response, session, redirect, url_for, make_response
from flask_socketio import SocketIO
from flask_socketio import emit
from flask_cors import CORS
from flask_socketio import SocketIO
import yt_dlp as youtube_dl
import uvicorn
import base64
import matplotlib.pyplot as plt
import numpy as np
import base64
from io import BytesIO
from PIL import Image
def plot_base64_image(image_base64):
# Decode base64 string
image_data = base64.b64decode(image_base64)
# Convert bytes to PIL Image
image = Image.open(BytesIO(image_data))
# Convert PIL Image to numpy array
image_array = np.array(image)
# Plot image
plt.imshow(image_array)
plt.axis('off')
plt.show()
# Example usage:
# base64_image = "..." # Your base64 encoded image string
# plot_base64_image(base64_image)
model_object_detection = YOLO("bisindov2.pt")
app = Flask(__name__)
socketio = SocketIO(app, cors_allowed_origins="*")
CORS(app)
app.secret_key = 'flask-sockets-builds'
######################################################
classes_translation = {
"all": "الكل",
"A": "أ",
"B": "ب",
"C": "ج",
"D": "د",
"F": "ف",
"H": "هـ",
"I": "أنا",
"J": "جيم",
"L": "إل",
"M": "إم",
"O": "أو",
"R": "ر",
"T": "ت",
"U": "يو",
"V": "في",
"W": "دبليو",
"Z": "زد",
"additional": "إضافي",
"alcohol": "مدرسة",
"allergy": "حساسية",
"bacon": "لحم المقدد",
"bag": "حقيبة",
"barbecue": "شواء",
"bill": "فاتورة",
"biscuit": "بسكويت",
"bitter": "مر",
"bread": "خبز",
"burger": "برغر",
"bye": "وداعاً",
"cheese": "جبن",
"chicken": "دجاج",
"coke": "كوكاكولا",
"cold": "بارد",
"cost": "تكلفة",
"coupon": "كوبون",
"cup": "كوب",
"dessert": "حلوى",
"drink": "شراب",
"drive": "قيادة",
"eat": "تناول الطعام",
"eggs": "بيض",
"enjoy": "استمتع",
"fork": "شوكة",
"french fries": "بطاطس مقلية",
"fresh": "طازج",
"hello": "مرحبا",
"hot": "ساخن",
"icecream": "آيس كريم",
"ingredients": "مكونات",
"juicy": "عصيري",
"ketchup": "كاتشب",
"lactose": "لاكتوز",
"lettuce": "خس",
"lid": "غطاء",
"manager": "مدير",
"menu": "قائمة الطعام",
"milk": "حليب",
"mustard": "خردل",
"napkin": "منديل",
"no": "لا",
"order": "طلب",
"pepper": "فلفل",
"pickle": "مخلل",
"pizza": "بيتزا",
"please": "من فضلك",
"ready": "جاهز",
"refill": "إعادة ملء",
"repeat": "كرر",
"safe": "آمن",
"salt": "ملح",
"sandwich": "ساندويتش",
"sauce": "صلصة",
"small": "صغير",
"soda": "صودا",
"sorry": "آسف",
"spicy": "حار",
"spoon": "ملعقة",
"straw": "قش",
"sugar": "سكر",
"sweet": "حلو",
"tissues": "مناديل",
"total": "مجموع",
"urgent": "عاجل",
"vegetables": "خضروات",
"warm": "دافئ",
"water": "ماء",
"what": "ماذا",
"yoghurt": "زبادي",
"your": "لك",
"ILoveYou":"أحبك",
"Halo":"مرحبًا"
}
######################################################
class VideoStreaming(object):
def __init__(self):
super(VideoStreaming, self).__init__()
print ("===== Video Streaming =====")
self._preview = False
self._flipH = False
self._detect = False
self._model = False
self._mediaPipe = False
self._confidence = 75.0
self.mp_hands = mp.solutions.hands
self.hands = self.mp_hands.Hands()
@property
def confidence(self):
return self._confidence
@confidence.setter
def confidence(self, value):
self._confidence = int(value)
@property
def preview(self):
return self._preview
@preview.setter
def preview(self, value):
self._preview = bool(value)
@property
def flipH(self):
return self._flipH
@flipH.setter
def flipH(self, value):
self._flipH = bool(value)
@property
def detect(self):
return self._detect
@detect.setter
def detect(self, value):
self._detect = bool(value)
@property
def mediaPipe(self):
return self._mediaPipe
@mediaPipe.setter
def mediaPipe(self, value):
self._mediaPipe = bool(value)
def show(self, url):
print(url)
self._preview = False
self._flipH = False
self._detect = False
self._mediaPipe = False
self._confidence = 75.0
ydl_opts = {
"quiet": True,
"no_warnings": True,
"format": "best",
"forceurl": True,
}
if url == '4':
print("am here with 0 to start cam")
cap = cv2.VideoCapture(0)
else:
ydl = youtube_dl.YoutubeDL(ydl_opts)
info = ydl.extract_info("https://www.youtube.com/watch?v=j4YZBRwVFFo", download=False)
url = info["url"]
cap = cv2.VideoCapture(url)
while True:
if self._preview:
if stop_flag:
print("Process Stopped")
return
grabbed, frame = cap.read()
if not grabbed:
break
if self.flipH:
print("flip part :")
frame = cv2.flip(frame, 1)
if self.detect:
frame_yolo = frame.copy()
results_yolo = model_object_detection.predict(frame_yolo, conf=self._confidence / 100)
frame_yolo, labels = results_yolo[0].plot()
list_labels = []
# labels_confidences
for label in labels:
confidence = label.split(" ")[-1]
label_name = " ".join(label.split(" ")[:-1])
# Translate the label if it exists in the translation dictionary
translated_label = classes_translation.get(label_name, label_name)
list_labels.append(translated_label)
list_labels.append(confidence)
socketio.emit('label', list_labels)
if self.mediaPipe:
# Convert the image to RGB for processing with MediaPipe
image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.hands.process(image)
if results.multi_hand_landmarks:
for hand_landmarks in results.multi_hand_landmarks:
mp.solutions.drawing_utils.draw_landmarks(
frame,
hand_landmarks,
self.mp_hands.HAND_CONNECTIONS,
landmark_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 0, 0), thickness=4, circle_radius=3),
connection_drawing_spec=mp.solutions.drawing_utils.DrawingSpec(color=(255, 255, 255), thickness=2, circle_radius=2),
)
print("frame information in here : ")
frame = cv2.imencode(".jpg", frame)[1].tobytes()
yield (
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n'
)
else:
snap = np.zeros((
1000,
1000
), np.uint8)
label = "Streaming Off"
H, W = snap.shape
font = cv2.FONT_HERSHEY_PLAIN
color = (255, 255, 255)
cv2.putText(snap, label, (W//2 - 100, H//2),
font, 2, color, 2)
frame = cv2.imencode(".jpg", snap)[1].tobytes()
yield (b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
def show1(self, url):
print("url")
self._preview = False
self._flipH = False
self._detect = False
self._mediaPipe = False
self._confidence = 75.0
ydl_opts = {
"quiet": True,
"no_warnings": True,
"format": "best",
"forceurl": True,
}
while True:
# Decoding the Base64 string to get the frame data
frame_bytes = base64.b64decode(url)
# Converting the frame data to an OpenCV image
frame_np = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(frame_np, cv2.IMREAD_COLOR)
# Encode the frame data to bytes
_, frame_encoded = cv2.imencode(".jpg", frame)
frame_bytes = frame_encoded.tobytes()
yield (
b'--frame\r\n'
b'Content-Type: image/jpeg\r\n\r\n' + frame_bytes + b'\r\n'
)
# check_settings()
VIDEO = VideoStreaming()
@app.route('/', methods=['GET', 'POST'])
def homepage():
return render_template('hompage.html')
@app.route('/index', methods=['GET', 'POST'])
def index():
print("index")
global stop_flag
stop_flag = False
if request.method == 'POST':
print("Index post request")
url = request.form['url']
print("index: ", url)
# Create a response object
resp = make_response(redirect(url_for('index')))
# Set a cookie containing the URL
resp.set_cookie('url', url)
return resp
return render_template('index.html')
@app.route('/video_feed')
def video_feed():
#url = session.get('url', None)
#print("video feed: ", url)
# Retrieve the URL from the cookie
url = request.cookies.get('url')
url = '0'
print("video feed: ", url)
if url is None:
return redirect(url_for('homepage'))
print("video feed: ", url)
return Response(VIDEO.show(url), mimetype='multipart/x-mixed-replace; boundary=frame')
# * Button requests
@app.route("/request_preview_switch")
def request_preview_switch():
VIDEO.preview = not VIDEO.preview
print("*"*10, VIDEO.preview)
return "nothing"
@app.route("/request_flipH_switch")
def request_flipH_switch():
VIDEO.flipH = not VIDEO.flipH
print("*"*10, VIDEO.flipH)
return "nothing"
@app.route("/request_run_model_switch")
def request_run_model_switch():
VIDEO.detect = not VIDEO.detect
print("*"*10, VIDEO.detect)
return "nothing"
@app.route("/request_mediapipe_switch")
def request_mediapipe_switch():
VIDEO.mediaPipe = not VIDEO.mediaPipe
print("*"*10, VIDEO.mediaPipe)
return "nothing"
@app.route('/update_slider_value', methods=['POST'])
def update_slider_value():
slider_value = request.form['sliderValue']
VIDEO.confidence = slider_value
return 'OK'
@app.route('/stop_process')
def stop_process():
print("Process stop Request")
global stop_flag
stop_flag = True
return 'Process Stop Request'
@socketio.on('connect')
def test_connect():
print('Connected')
#emit('message', data, broadcast=True)
######################
def preprocess_frame(frame_data):
if frame_data is None:
return None # Return None if frame_data is None
try:
# Convert base64 image string to numpy array
# Split frame_data to extract base64 part
base64_data = frame_data
# Convert base64 image string to numpy array
imgdata = base64.b64decode(base64_data)
imgarray = np.frombuffer(imgdata, np.uint8)
# Decode the image using cv2.imdecode
frame = cv2.imdecode(imgarray, cv2.IMREAD_COLOR)
print("rani dezte hna koolchi mezian")
# Apply image processing here (example: grayscale conversion)
processed_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Convert processed frame back to base64 image string
_, buffer = cv2.imencode('.jpg', processed_frame)
processed_frame_data = base64.b64encode(buffer).decode('utf-8')
#plot_base64_image(processed_frame_data)
return processed_frame_data
except Exception as e:
print("Error processing frame:", e)
return None
#@socketio.on('stream_frame')
#def handle_stream_frame(frame_data):
# processed_frame_data = preprocess_frame(frame_data)
# #emit('receive_frame', processed_frame_data, broadcast=True)
# return Response(VIDEO.show1(frame_data), mimetype='multipart/x-mixed-replace; boundary=frame')
# Route to receive a frame for processing
@app.route('/process_frame', methods=['POST'])
def process_frame():
if 'frame' in request.files:
frame = request.files['frame']
# Process the frame here
# For example, you can save the frame to a file
frame_path = 'result_frame.jpg'
frame.save(frame_path)
# Return the processed frame
return send_file(frame_path, mimetype='image/jpeg')
else:
return 'No frame data received'
if __name__ == '__main__':
socketio.run(app, host="0.0.0.0", allow_unsafe_werkzeug=True,port=7860)