"""Multi-modal emotion-aware chatbot (video + text) served through Gradio.

A video is analysed three ways -- facial emotion from sampled frames, vocal
emotion from the audio track, and textual emotion from the speech
transcript -- and the results are injected into the system prompt of a
hosted chat model.
"""

import json
import logging
import os
import pickle
import tempfile
import wave
from collections import Counter

import cv2
import gradio as gr
import librosa
import moviepy.editor as mp
import nltk
import numpy as np
import speech_recognition as sr
import tensorflow as tf
from huggingface_hub import InferenceClient
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import load_model, model_from_json
from tensorflow.keras.preprocessing.image import img_to_array, load_img
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.text import tokenizer_from_json

nltk.download('stopwords')
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('wordnet')
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

logger = logging.getLogger(__name__)

# Index -> label mapping applied to the argmax of a model's output.
# NOTE(review): the same order is used for both the text and the image model,
# as in the original code -- confirm it matches how image_model was trained.
EMOTION_LABELS = {
    0: "anger",
    1: "disgust",
    2: "fear",
    3: "joy",
    4: "neutral",
    5: "sadness",
    6: "surprise",
}

# Sequence length the text model input is padded/truncated to.
MAX_TEXT_LEN = 35

# Width/height every face image is resized to before image_model.predict.
IMAGE_SIZE = (48, 48)

# Load the text model
with open('model_architecture_for_text_emotion_updated_json.json', 'r') as json_file:
    model_json = json_file.read()
text_model = model_from_json(model_json)
text_model.load_weights("model_for_text_emotion_updated(1).keras")

# Load the encoder and scaler for audio
with open('encoder.pkl', 'rb') as file:
    encoder = pickle.load(file)
with open('scaler.pkl', 'rb') as file:
    scaler = pickle.load(file)

# Load the tokenizer for text
with open('tokenizer.json') as json_file:
    tokenizer_json = json.load(json_file)
tokenizer = tokenizer_from_json(tokenizer_json)

# Load the audio model
audio_model = load_model('my_model.h5')

# Load the image model
image_model = load_model('model_emotion.h5')

# Initialize NLTK
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))


def preprocess_text(text):
    """Lower-case, tokenize, drop stop words/non-alphanumerics, and lemmatize.

    Args:
        text: Raw input string.

    Returns:
        The surviving lemmatized tokens joined by single spaces.
    """
    tokens = nltk.word_tokenize(text.lower())
    tokens = [word for word in tokens if word.isalnum() and word not in stop_words]
    return ' '.join(lemmatizer.lemmatize(word) for word in tokens)


def extract_features(data, sample_rate):
    """Build a 1-D feature vector from an audio signal.

    The vector is the horizontal concatenation of the time-averaged
    zero-crossing rate, chroma STFT, MFCC, RMS and mel-spectrogram features.

    Args:
        data: Audio samples, passed to librosa as ``y``.
        sample_rate: Sampling rate of ``data``.

    Returns:
        A 1-D numpy array of features. If extraction raises, the error is
        printed to the log and ``np.zeros(1)`` is returned instead; note that
        this fallback does not have the same length as a successful result.
    """
    try:
        stft = np.abs(librosa.stft(data))
        features = [
            np.mean(librosa.feature.zero_crossing_rate(y=data).T, axis=0),
            np.mean(librosa.feature.chroma_stft(S=stft, sr=sample_rate).T, axis=0),
            np.mean(librosa.feature.mfcc(y=data, sr=sample_rate).T, axis=0),
            np.mean(librosa.feature.rms(y=data).T, axis=0),
            np.mean(librosa.feature.melspectrogram(y=data, sr=sample_rate).T, axis=0),
        ]
        # Ensure all features are numpy arrays, then stack them horizontally.
        return np.hstack([np.atleast_1d(feature) for feature in features])
    except Exception as e:
        logger.error("Error extracting features: %s", e)
        return np.zeros(1)  # Default feature array if extraction fails


def _predict_text_emotion(text):
    """Return the emotion label the text model assigns to ``text``."""
    cleaned = preprocess_text(text)
    sequences = tokenizer.texts_to_sequences([cleaned])
    padded = pad_sequences(sequences, maxlen=MAX_TEXT_LEN, padding='post', truncating='post')
    prediction = text_model.predict(np.array(padded))
    return EMOTION_LABELS[int(prediction.argmax())]


def _transcribe_wav(wav_path, recognizer):
    """Read the WAV file at ``wav_path`` and transcribe it with ``recognize_google``."""
    with sr.AudioFile(wav_path) as source:
        audio_record = recognizer.record(source)
    return recognizer.recognize_google(audio_record)


def find_emotion_using_text(sample_rate, audio_data, recognizer):
    """Transcribe raw audio samples and classify the emotion of the transcript.

    Args:
        sample_rate: Frame rate written into the temporary WAV header.
        audio_data: Array whose raw bytes are written as the WAV frames. The
            file is declared as 1 channel with a 2-byte sample width, so the
            array should already hold 16-bit mono samples.
        recognizer: A ``speech_recognition.Recognizer`` instance.

    Returns:
        The predicted emotion label (a value of ``EMOTION_LABELS``).

    Raises:
        Whatever the recognizer or the models raise; the temporary file is
        removed in every case.
    """
    with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
        temp_audio_path = temp_audio_file.name
    try:
        with wave.open(temp_audio_path, 'w') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)
            wf.setframerate(sample_rate)
            wf.writeframes(audio_data.tobytes())
        text = _transcribe_wav(temp_audio_path, recognizer)
        return _predict_text_emotion(text)
    finally:
        # Clean up even when transcription or prediction fails.
        os.remove(temp_audio_path)


def predict_emotion(audio_data):
    """Predict an emotion from an audio signal using the audio model.

    Args:
        audio_data: A ``(sample_rate, samples)`` pair. ``samples`` may be 1-D
            or 2-D; 2-D input is averaged over its shorter axis (taken to be
            the channel axis) to obtain a mono signal.

    Returns:
        ``encoder.inverse_transform(one_hot)[0]`` for the predicted class.

    Raises:
        ValueError: If the audio is empty or the feature array has an
            unexpected number of dimensions.
    """
    sample_rate, data = audio_data
    data = np.asarray(data)
    if data.size == 0:
        raise ValueError("Audio data is empty.")

    if data.ndim == 2:
        # Mix channels down to mono instead of interleaving them, which would
        # distort the signal at the given sample rate.
        data = data.mean(axis=int(np.argmin(data.shape)))
    else:
        data = data.flatten()
    data = data.astype(np.float32)

    # Peak-normalize; skip for pure silence to avoid dividing by zero.
    peak = np.max(np.abs(data))
    if peak > 0:
        data = data / peak

    features = extract_features(data, sample_rate)
    features = np.expand_dims(features, axis=0)
    if features.ndim == 3:
        features = np.squeeze(features, axis=2)
    elif features.ndim != 2:
        raise ValueError("Features array has unexpected dimensions.")

    scaled_features = scaler.transform(features)
    scaled_features = np.expand_dims(scaled_features, axis=2)
    prediction = audio_model.predict(scaled_features)
    emotion_index = np.argmax(prediction)

    # Rebuild a one-hot row so the fitted encoder can map it back to a label.
    num_classes = len(encoder.categories_[0])
    emotion_array = np.zeros((1, num_classes))
    emotion_array[0, emotion_index] = 1
    return encoder.inverse_transform(emotion_array)[0]


def preprocess_image(image):
    """Load an image as a 48x48 grayscale batch of one, scaled by 1/255.

    Args:
        image: Anything accepted by Keras ``load_img`` (e.g. a file path).

    Returns:
        A numpy array with a leading batch axis of size 1.
    """
    image = load_img(image, target_size=IMAGE_SIZE, color_mode="grayscale")
    image = img_to_array(image)
    image = np.expand_dims(image, axis=0)
    return image / 255.0


def predict_emotion_from_image(image):
    """Return the emotion label the image model predicts for ``image``."""
    prediction = image_model.predict(preprocess_image(image))
    return EMOTION_LABELS[int(np.argmax(prediction))]


def process_video(video_path):
    """Predict the dominant facial emotion across sampled frames of a video.

    Every ``int(fps)``-th frame (at least every frame when the reported FPS
    is below 1 or unavailable) is converted to grayscale, resized to 48x48,
    scaled by 1/255 and classified with the image model.

    Args:
        video_path: Path of the video file to open with OpenCV.

    Returns:
        The most frequent predicted emotion label.

    Raises:
        ValueError: If no frame could be read from the video.
    """
    cap = cv2.VideoCapture(video_path)
    predictions = []
    try:
        frame_rate = cap.get(cv2.CAP_PROP_FPS)
        # Guard against an FPS of 0 (or NaN), which would otherwise make the
        # modulo below raise ZeroDivisionError.
        step = max(1, int(frame_rate)) if frame_rate and frame_rate > 0 else 1
        frame_count = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            # Process every nth frame (to speed up processing)
            if frame_count % step == 0:
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                frame = cv2.resize(frame, IMAGE_SIZE)
                frame = np.expand_dims(img_to_array(frame), axis=0) / 255.0
                prediction = image_model.predict(frame)
                predictions.append(int(np.argmax(prediction)))
            frame_count += 1
    finally:
        cap.release()

    if not predictions:
        raise ValueError("No frames could be read from the video.")

    most_common_emotion = Counter(predictions).most_common(1)[0][0]
    return EMOTION_LABELS[most_common_emotion]


def process_audio_from_video(video_path):
    """Extract a video's audio, transcribe it, and predict text/audio emotions.

    Args:
        video_path: Path of the video file to open with moviepy.

    Returns:
        A ``(text_emotion, audio_emotion, text)`` tuple, where ``text`` is the
        transcript returned by ``recognize_google``.

    Raises:
        ValueError: If the video has no audio track.
        Whatever the recognizer or the models raise; the clip is closed and
        the temporary WAV file removed in every case.
    """
    video = mp.VideoFileClip(video_path)
    try:
        audio = video.audio
        if audio is None:
            raise ValueError("Video has no audio track.")

        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as temp_audio_file:
            temp_audio_path = temp_audio_file.name
        try:
            audio.write_audiofile(temp_audio_path)
            text = _transcribe_wav(temp_audio_path, sr.Recognizer())
        finally:
            os.remove(temp_audio_path)

        text_emotion = _predict_text_emotion(text)
        # Read the samples while the clip (and its audio reader) is still open.
        audio_emotion = predict_emotion((audio.fps, np.array(audio.to_soundarray())))
    finally:
        video.close()

    return text_emotion, audio_emotion, text


def transcribe_and_predict_video(video):
    """Run frame, audio and transcript emotion detection on a video.

    Args:
        video: Path of the video file.

    Returns:
        A dict with the keys ``text_emotion``, ``audio_emotion``,
        ``image_emotion`` and ``extracted_text``.
    """
    image_emotion = process_video(video)  # Emotion from video frames
    logger.info("Image processing done.")
    text_emotion, audio_emotion, extracted_text = process_audio_from_video(video)
    logger.info("Audio processing done.")
    return {
        "text_emotion": text_emotion,
        "audio_emotion": audio_emotion,
        "image_emotion": image_emotion,
        "extracted_text": extracted_text,
    }


# Load Zephyr-7B Model
MODEL_NAME = "HuggingFaceH4/zephyr-7b-beta"
client = InferenceClient(MODEL_NAME)


def respond(video, text_input, history):
    """Stream a chatbot reply for a video, a text message, or both.

    This is a generator: every yielded value is the reply accumulated so far
    (or a single error/notice string), so it can drive a streaming UI.

    Args:
        video: A video file path, an object with a ``name`` attribute holding
            the path, or a falsy value when no video was supplied.
        text_input: The user's typed message, or a falsy value.
        history: Previous turns as a list of ``(user, assistant)`` pairs, or
            ``None`` for an empty conversation. Entries that are not
            2-element lists/tuples are ignored.

    Yields:
        The progressively growing reply text, or an error message.
    """
    system_prompt = "You are a chatbot that can analyze emotions from videos and respond accordingly."
    logger.debug("respond called with video=%r text_input=%r history=%r", video, text_input, history)

    # Errors are yielded (not returned): a generator's return value never
    # reaches the caller iterating over it.
    if history is None:
        history = []
    if not isinstance(history, list):
        yield "Error: Chat history is not in the correct format."
        return

    user_parts = []

    # Handle video input safely
    if video:
        video_path = video if isinstance(video, str) else getattr(video, "name", None)
        if not video_path:
            yield "Error: Invalid video input."
            return
        try:
            result = transcribe_and_predict_video(video_path)
        except Exception as e:
            yield f"Error processing video: {str(e)}"
            return
        logger.debug("Video analysis result: %r", result)
        system_prompt += (
            "\n\nDetected Emotions:\n"
            f"- Text Emotion: {result['text_emotion']}\n"
            f"- Audio Emotion: {result['audio_emotion']}\n"
            f"- Image Emotion: {result['image_emotion']}\n\n"
            f"Extracted Speech: {result['extracted_text']}\n"
        )
        if result["extracted_text"]:
            user_parts.append(result["extracted_text"])

    if text_input and text_input.strip():
        user_parts.append(text_input.strip())

    if not user_parts:
        yield "Please upload a video or enter a message."
        return

    # System prompt first, then earlier turns, then the current user turn.
    messages = [{"role": "system", "content": system_prompt}]
    for val in history:
        if isinstance(val, (list, tuple)) and len(val) == 2:
            if val[0]:
                messages.append({"role": "user", "content": val[0]})
            if val[1]:
                messages.append({"role": "assistant", "content": val[1]})
    messages.append({"role": "user", "content": "\n\n".join(user_parts)})
    logger.debug("Final messages sent to chatbot: %r", messages)

    response = ""
    try:
        for message in client.chat_completion(
            messages,
            max_tokens=512,
            stream=True,
            temperature=0.7,
            top_p=0.95,
        ):
            delta = message.choices[0].delta if message.choices else None
            # delta.content may be None on some chunks; treat it as empty.
            token = (delta.content if delta else None) or ""
            response += token
            yield response
    except Exception as e:
        logger.error("Chatbot error: %s", e)
        yield f"Error: {str(e)}"


# Define ChatGPT-style UI
with gr.Blocks(theme="soft") as demo:
    gr.Markdown("# 📹🎤💬 Multi-Modal Chatbot (Video + Text)")
    chatbot = gr.Chatbot(label="ChatGPT-Like Chat")
    video_input = gr.Video(label="Upload Video (Optional)")
    text_input = gr.Textbox(label="Enter Text (Optional)", placeholder="Type your message here...")
    submit_button = gr.Button("Submit")
    clear_button = gr.Button("Clear Chat")

    def clear_chat():
        """Reset the chat history, the video input and the text box."""
        return [], None, ""

    def process_input(video, text, history=None):
        """Stream the reply into the chat history shown by the Chatbot.

        The Chatbot's current value is received as ``history`` and a new
        ``(user, assistant)`` pair is appended and updated as tokens arrive.
        """
        history = list(history or [])
        user_message = text.strip() if text and text.strip() else None
        for partial in respond(video, text, history):
            yield history + [(user_message, partial)]

    submit_button.click(
        process_input,
        inputs=[video_input, text_input, chatbot],
        outputs=[chatbot],
    )
    clear_button.click(clear_chat, outputs=[chatbot, video_input, text_input])

# Launch chatbot
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    demo.launch()