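"""Streamlit app: ask Qwen2-VL (2B, Instruct) questions about uploaded images and videos.

Images are resized to 512x512 and passed to the model directly; for videos,
only the first frame is extracted and described.
"""
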
import os
import tempfile

import cv2
import streamlit as st
import torch
from PIL import Image
from transformers import AutoProcessor, Qwen2VLForConditionalGeneration

@st.cache_resource
def load_model_and_processor():
    """Load the Qwen2-VL processor and model once; st.cache_resource keeps them across Streamlit reruns."""
    processor = AutoProcessor.from_pretrained("Qwen/Qwen2-VL-2B-Instruct")
    model = Qwen2VLForConditionalGeneration.from_pretrained("Qwen/Qwen2-VL-2B-Instruct")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    model.eval()
    return processor, model, device

def process_image(uploaded_file):
    """Open an uploaded image, force RGB (PNG uploads may carry an alpha channel), and resize."""
    image = Image.open(uploaded_file).convert("RGB")
    image = image.resize((512, 512))
    return image

def process_video(uploaded_file):
    """Write the upload to a temp file, grab its first frame, and return it as a PIL image."""
    tfile = tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1])
    tfile.write(uploaded_file.read())
    tfile.close()  # close before reading so OpenCV can open the file on all platforms
    cap = cv2.VideoCapture(tfile.name)
    ret, frame = cap.read()
    cap.release()
    os.unlink(tfile.name)  # remove the temp file once the frame has been read
    if not ret:
        return None
    # OpenCV decodes frames as BGR; convert to RGB for PIL and the model.
    image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    image = image.resize((512, 512))
    return image
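
# Optional extension (not wired into the UI): describing a video from a single
# frame is limited, so one could sample several evenly spaced frames instead.
# A minimal sketch using the same RGB/resize conventions as process_video:
def sample_video_frames(path, num_frames=4):
    """Return up to `num_frames` evenly spaced RGB frames from the video at `path`."""
    cap = cv2.VideoCapture(path)
    total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frames = []
    if total > 0:
        step = max(1, total // num_frames)
        for idx in range(0, total, step):
            cap.set(cv2.CAP_PROP_POS_FRAMES, idx)
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)).resize((512, 512)))
            if len(frames) >= num_frames:
                break
    cap.release()
    return frames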

def generate_description(processor, model, device, image, user_question):
    """Run a single image plus question through Qwen2-VL and return the decoded answer."""
    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image", "image": image},
                {"type": "text", "text": user_question},
            ],
        }
    ]
    # Build the chat-formatted prompt, then tokenize text and image together.
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(text=[text], images=[image], padding=True, return_tensors="pt")
    inputs = inputs.to(device)
    with torch.no_grad():  # inference only; skip gradient bookkeeping
        generated_ids = model.generate(**inputs, max_new_tokens=512)
    # Strip the prompt tokens so only the newly generated answer is decoded.
    generated_ids_trimmed = [out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)]
    output_text = processor.batch_decode(generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False)
    return output_text[0]
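
# Example usage outside Streamlit (the module name `app` is hypothetical):
#
#   from PIL import Image
#   from app import load_model_and_processor, generate_description
#
#   processor, model, device = load_model_and_processor()
#   img = Image.open("photo.jpg").convert("RGB").resize((512, 512))
#   print(generate_description(processor, model, device, img, "What is in this picture?"))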

def main():
    st.title("Media Description Generator")
    processor, model, device = load_model_and_processor()
    uploaded_files = st.file_uploader(
        "Choose images or videos...",
        type=["jpg", "jpeg", "png", "mp4", "avi", "mov"],
        accept_multiple_files=True,
    )

    if uploaded_files:
        user_question = st.text_input("Ask a question about the images or videos:")
        if user_question and st.button("Generate Descriptions"):
            for uploaded_file in uploaded_files:
                # MIME types look like "image/png" or "video/mp4"; keep the top-level kind.
                file_type = uploaded_file.type.split('/')[0]
                if file_type == 'image':
                    image = process_image(uploaded_file)
                    st.image(image, caption='Uploaded Image.', use_column_width=True)
                elif file_type == 'video':
                    image = process_video(uploaded_file)
                    if image is None:
                        st.error("Failed to read the video file.")
                        continue
                    st.image(image, caption='First Frame of Uploaded Video.', use_column_width=True)
                else:
                    st.error("Unsupported file type.")
                    continue
                with st.spinner("Generating description..."):
                    description = generate_description(processor, model, device, image, user_question)
                st.write("Description:")
                st.write(description)


if __name__ == "__main__":
    main()
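
# To try this locally (assuming the file is saved as app.py):
#   pip install streamlit torch transformers pillow opencv-python
#   streamlit run app.py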