"""Streamlit app that answers a question about uploaded images or videos.

Each uploaded image (or the first frame of each uploaded video) is resized
and passed, together with the user's question, to the Qwen2-VL-2B-Instruct
model; the generated text is rendered on the page.
"""

import os
import tempfile

import streamlit as st
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration
from PIL import Image
import torch
import cv2

# Hugging Face checkpoint used for both the processor and the model.
MODEL_ID = "Qwen/Qwen2-VL-2B-Instruct"
# Every image / video frame is resized to this (width, height) before use.
IMAGE_SIZE = (512, 512)
# Upper bound on the number of tokens generated per answer.
MAX_NEW_TOKENS = 512


@st.cache_resource
def load_model_and_processor():
    """Load the processor and model and move the model to the best device.

    Streamlit re-executes the script on every widget interaction, so the
    result is cached with ``st.cache_resource`` to avoid reloading the
    checkpoint on each rerun.

    Returns:
        A ``(processor, model, device)`` tuple, where ``device`` is CUDA when
        ``torch.cuda.is_available()`` and CPU otherwise.
    """
    processor = AutoProcessor.from_pretrained(MODEL_ID)
    model = Qwen2VLForConditionalGeneration.from_pretrained(MODEL_ID)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    return processor, model, device


def process_image(uploaded_file):
    """Open an uploaded image and resize it to ``IMAGE_SIZE``.

    Args:
        uploaded_file: A path or binary file-like object accepted by
            ``PIL.Image.open``.

    Returns:
        The resized ``PIL.Image.Image``.
    """
    image = Image.open(uploaded_file)
    return image.resize(IMAGE_SIZE)


def process_video(uploaded_file):
    """Extract the first frame of an uploaded video as a resized RGB image.

    The upload is spooled to a temporary file because ``cv2.VideoCapture`` is
    opened from a filesystem path here. The temporary file is closed before
    OpenCV reads it (so all bytes are on disk) and removed afterwards.

    Args:
        uploaded_file: A binary file-like object exposing ``read()``. If it
            has a ``name`` attribute, its extension is reused as the
            temporary file's suffix.

    Returns:
        The first frame as a ``PIL.Image.Image`` resized to ``IMAGE_SIZE``,
        or ``None`` when no frame could be read.
    """
    suffix = os.path.splitext(getattr(uploaded_file, "name", "") or "")[1]
    tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False)
    try:
        # Leaving the ``with`` block flushes and closes the file, so the
        # whole upload is on disk before OpenCV opens the path.
        with tmp:
            tmp.write(uploaded_file.read())

        cap = cv2.VideoCapture(tmp.name)
        try:
            ret, frame = cap.read()
        finally:
            cap.release()
    finally:
        # Best-effort cleanup: a failed delete must not mask the real result.
        try:
            os.unlink(tmp.name)
        except OSError:
            pass

    if not ret:
        return None

    # OpenCV returns frames in BGR channel order; PIL expects RGB.
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    return image.resize(IMAGE_SIZE)


def generate_description(processor, model, device, image, user_question):
    """Ask the model ``user_question`` about ``image`` and return its answer.

    Args:
        processor: Processor returned by ``load_model_and_processor``.
        model: Model returned by ``load_model_and_processor``.
        device: Device the model lives on; the inputs are moved to it.
        image: The image to describe.
        user_question: The text prompt accompanying the image.

    Returns:
        The decoded text generated for the single prompt.
    """
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image",
                    "image": image,
                },
                {"type": "text", "text": user_question},
            ],
        }
    ]
    text = processor.apply_chat_template(
        messages, tokenize=False, add_generation_prompt=True
    )
    inputs = processor(
        text=[text], images=[image], padding=True, return_tensors="pt"
    )
    inputs = inputs.to(device)

    generated_ids = model.generate(**inputs, max_new_tokens=MAX_NEW_TOKENS)
    # Drop the first len(input) ids of each output sequence so only the
    # newly generated part is decoded (assumes the output starts with the
    # prompt ids).
    generated_ids_trimmed = [
        out_ids[len(in_ids):]
        for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_text = processor.batch_decode(
        generated_ids_trimmed,
        skip_special_tokens=True,
        clean_up_tokenization_spaces=False,
    )
    return output_text[0]


def main():
    """Render the page: upload media, ask a question, show the answers."""
    st.title("Media Description Generator")
    processor, model, device = load_model_and_processor()

    uploaded_files = st.file_uploader(
        "Choose images or videos...",
        type=["jpg", "jpeg", "png", "mp4", "avi", "mov"],
        accept_multiple_files=True,
    )
    if not uploaded_files:
        return

    user_question = st.text_input("Ask a question about the images or videos:")
    if not user_question:
        return

    # Generation only runs on an explicit click, not on every rerun.
    if not st.button("Generate Descriptions"):
        return

    for uploaded_file in uploaded_files:
        # The MIME type's top-level part ("image" / "video") picks the path;
        # a missing type falls through to the unsupported branch.
        file_type = (uploaded_file.type or "").split("/")[0]

        if file_type == "image":
            image = process_image(uploaded_file)
            st.image(image, caption="Uploaded Image.", use_column_width=True)
            st.write("Generating description...")
        elif file_type == "video":
            image = process_video(uploaded_file)
            if image is None:
                st.error("Failed to read the video file.")
                continue
            st.image(
                image,
                caption="First Frame of Uploaded Video.",
                use_column_width=True,
            )
            st.write("Generating description...")
        else:
            st.error("Unsupported file type.")
            continue

        description = generate_description(
            processor, model, device, image, user_question
        )
        st.write("Description:")
        st.write(description)


if __name__ == "__main__":
    main()