import gradio as gr
from gradio.data_classes import FileData  # needed by the history-parsing helpers below
from transformers import LlavaProcessor, LlavaForConditionalGeneration, TextIteratorStreamer
from threading import Thread
import time
from PIL import Image
import torch
import cv2
import spaces

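# Load the 0.5B LLaVA-Interleave checkpoint and its processor; weights are kept in fp16 on the GPU.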
model_id = "llava-hf/llava-interleave-qwen-0.5b-hf"

processor = LlavaProcessor.from_pretrained(model_id)

model = LlavaForConditionalGeneration.from_pretrained(model_id, torch_dtype=torch.float16)
model.to("cuda")


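# Expand a "<video>" placeholder into one "<image>" token per sampled frame.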
def replace_video_with_images(text, frames):
  return text.replace("<video>", "<image>" * frames)

def sample_frames(video_file, num_frames):
    # uniformly sample roughly `num_frames` frames from the video as PIL images
    video = cv2.VideoCapture(video_file)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    interval = max(total_frames // num_frames, 1)
    frames = []
    for i in range(total_frames):
        ret, frame = video.read()
        if not ret:
            continue
        if i % interval == 0:
            frames.append(Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)))
    video.release()
    return frames

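# Main chat callback: collects the files from the current turn (or from earlier turns),
# builds the LLaVA chat prompt, and streams the model's reply token by token.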
@spaces.GPU
def bot_streaming(message, history):

  if message.files:
    if len(message.files) == 1:
      image = [message.files[0].path]
    # interleaved images or video
    elif len(message.files) > 1:
      image = [msg.path for msg in message.files]
  else:
    # no files in this turn: fall back to files uploaded earlier in the conversation
    def has_file_data(lst):
      return any(isinstance(item, FileData) for sublist in lst if isinstance(sublist, tuple) for item in sublist)

    def extract_paths(lst):
      return [item.path for sublist in lst if isinstance(sublist, tuple) for item in sublist if isinstance(item, FileData)]

    latest_text_only_index = -1

    for i, item in enumerate(history):
      if all(isinstance(sub_item, str) for sub_item in item):
        latest_text_only_index = i

    image = [path for i, item in enumerate(history) if i < latest_text_only_index and has_file_data(item) for path in extract_paths(item)]

  if not image:
    raise gr.Error("You need to upload an image or video for LLaVA to work.")

  video_extensions = ("avi", "mp4", "mov", "mkv", "flv", "wmv", "mjpeg")
  image_extensions = tuple(Image.registered_extensions().keys())

  if len(image) == 1:
    if image[0].endswith(video_extensions):
        # a single video: sample frames and add one <image> token per sampled frame
        image = sample_frames(image[0], 12)
        image_tokens = "<image>" * len(image)
        prompt = f"<|im_start|>user {image_tokens}\n{message.text}<|im_end|><|im_start|>assistant"
    elif image[0].endswith(image_extensions):
        image = Image.open(image[0]).convert("RGB")
        prompt = f"<|im_start|>user <image>\n{message.text}<|im_end|><|im_start|>assistant"
    else:
        raise gr.Error("Unsupported file type, please upload an image or a video.")

  elif len(image) > 1:
    image_list = []
    user_prompt = message.text

    for img in image:
      if img.endswith(image_extensions):
        img = Image.open(img).convert("RGB")
        image_list.append(img)

      elif img.endswith(video_extensions):
        # videos in a multi-file turn are sampled more sparsely to keep the context short
        frames = sample_frames(img, 6)
        for frame in frames:
          image_list.append(frame)

    toks = "<image>" * len(image_list)
    prompt = "<|im_start|>user" + toks + f"\n{user_prompt}<|im_end|><|im_start|>assistant"

    image = image_list


  inputs = processor(text=prompt, images=image, return_tensors="pt").to("cuda", torch.float16)

  # skip_prompt=True makes the streamer yield only newly generated text,
  # so the prompt does not need to be stripped from the output by hand
  streamer = TextIteratorStreamer(processor.tokenizer, skip_prompt=True, skip_special_tokens=True)
  generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100)

  # run generation in a background thread and stream tokens as they arrive
  thread = Thread(target=model.generate, kwargs=generation_kwargs)
  thread.start()

  buffer = ""
  for new_text in streamer:
    buffer += new_text
    time.sleep(0.01)
    yield buffer


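# Multimodal chat UI; the examples cover single images, paired images, and videos.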
demo = gr.ChatInterface(
    fn=bot_streaming,
    title="LLaVA Interleave",
    examples=[
        {"text": "The input contains two videos, are the cats in this video and this video doing the same thing?", "files": ["./cats_1.mp4", "./cats_2.mp4"]},
        {"text": "There are two images in the input. What is the relationship between this image and this image?", "files": ["./bee.jpg", "./depth-bee.png"]},
        {"text": "What are these cats doing?", "files": ["./cats.mp4"]},
        {"text": "What is on the flower?", "files": ["./bee.jpg"]},
        {"text": "How to make this pastry?", "files": ["./baklava.png"]},
    ],
    textbox=gr.MultimodalTextbox(file_count="multiple"),
    description="Try [LLaVA Interleave](https://huggingface.co/docs/transformers/main/en/model_doc/llava) in this demo (more specifically, the [Qwen-1.5-0.5B variant](https://huggingface.co/llava-hf/llava-interleave-qwen-0.5b-hf)). Upload an image or a video and start chatting about it, or simply try one of the examples below. If you don't upload an image or a video, you will receive an error.",
    stop_btn="Stop Generation",
    multimodal=True,
)
demo.launch(debug=True)