"""Streamlit app that answers a question about uploaded images or videos.

Each uploaded image (or the first frame of each uploaded video) is resized,
shown to the user, and passed together with the user's question to the
Qwen2-VL instruct model; the generated answer is written to the page.
"""

import os
import tempfile

import cv2
import streamlit as st
import torch
from PIL import Image
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration

MODEL_ID = "Qwen/Qwen2-VL-2B-Instruct"
TARGET_SIZE = (512, 512)


@st.cache_resource
def load_model_and_processor():
    """Load the Qwen2-VL processor and model and move the model to a device.

    Streamlit re-executes the whole script on every widget interaction, so
    the result is cached with ``st.cache_resource``; without it the model
    weights would be reloaded on every rerun.

    Returns:
        A ``(processor, model, device)`` tuple. ``device`` is CUDA when
        available, otherwise CPU.
    """
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    model = Qwen2VLForConditionalGeneration.from_pretrained(MODEL_ID)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    return processor, model, device


def process_image(uploaded_file):
    """Open an uploaded image and resize it to ``TARGET_SIZE``.

    Args:
        uploaded_file: A file-like object accepted by ``PIL.Image.open``.

    Returns:
        The resized ``PIL.Image.Image``.
    """
    image = Image.open(uploaded_file)
    return image.resize(TARGET_SIZE)


def process_video(uploaded_file):
    """Extract the first frame of an uploaded video as a resized RGB image.

    The upload is spooled to a temporary file because ``cv2.VideoCapture``
    reads from a path. The file is closed before OpenCV opens it (so all
    bytes are flushed and the path can be reopened on every platform) and
    is always removed afterwards.

    Args:
        uploaded_file: A file-like object with ``read()``; if it has a
            ``name`` attribute, its extension is kept on the temp file.

    Returns:
        The first frame as a ``PIL.Image.Image`` resized to ``TARGET_SIZE``,
        or ``None`` if no frame could be read.
    """
    # Keep the original extension on the temp file in case the decoder
    # backend uses it when probing the container format.
    suffix = os.path.splitext(getattr(uploaded_file, "name", "") or "")[1]
    tfile = tempfile.NamedTemporaryFile(delete=False, suffix=suffix)
    try:
        with tfile:
            tfile.write(uploaded_file.read())
        cap = cv2.VideoCapture(tfile.name)
        try:
            ret, frame = cap.read()
        finally:
            cap.release()
    finally:
        # delete=False means nothing else will clean this up.
        os.unlink(tfile.name)

    if not ret:
        return None

    # OpenCV decodes to BGR; PIL expects RGB.
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return image.resize(TARGET_SIZE)


def generate_description(processor, model, device, image, user_question):
    """Ask the model ``user_question`` about ``image`` and return its answer.

    Args:
        processor: The processor returned by ``load_model_and_processor``.
        model: The model returned by ``load_model_and_processor``.
        device: The torch device the model lives on.
        image: The PIL image to describe.
        user_question: The question text to send alongside the image.

    Returns:
        The decoded text of the model's reply (prompt tokens removed).
    """
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": user_question},
            ],
        }
    ]
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = processor(
        text=[text], images=[image], padding=True, return_tensors="pt"
    )
    inputs = inputs.to(device)

    generated_ids = model.generate(**inputs, max_new_tokens=512)
    # Slice each output past the length of its input ids so that only the
    # newly generated tokens are decoded.
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return output_text[0]


def main():
    """Render the page: upload media, ask a question, show the answers."""
    st.title("Media Description Generator")
    processor, model, device = load_model_and_processor()

    uploaded_files = st.file_uploader(
        "Choose images or videos...",
        type=["jpg", "jpeg", "png", "mp4", "avi", "mov"],
        accept_multiple_files=True,
    )
    if not uploaded_files:
        return

    user_question = st.text_input("Ask a question about the images or videos:")
    if not user_question:
        return

    for uploaded_file in uploaded_files:
        # Branch on the major part of the type string, e.g. "image/png".
        file_type = uploaded_file.type.split('/')[0]

        if file_type == 'image':
            image = process_image(uploaded_file)
            st.image(image, caption='Uploaded Image.', use_column_width=True)
        elif file_type == 'video':
            image = process_video(uploaded_file)
            if image is None:
                st.error("Failed to read the video file.")
                continue
            st.image(
                image,
                caption='First Frame of Uploaded Video.',
                use_column_width=True,
            )
        else:
            st.error("Unsupported file type.")
            continue

        st.write("Generating description...")
        description = generate_description(
            processor, model, device, image, user_question
        )
        st.write("Description:")
        st.write(description)


if __name__ == "__main__":
    main()