import gradio as gr
import spaces
import os
import json
from PIL import Image
from transformers import AutoProcessor, Idefics2ForConditionalGeneration
from models.conversation import conv_templates
from typing import List

MODEL_ID = "MFuyu/mantis-8b-idefics2-video-eval_8192_lora"
# Placeholder marking where an uploaded image belongs in the message text.
# Assumption: the literal is "<image>" (the Mantis convention).
IMAGE_TOKEN = "<image>"

processor = AutoProcessor.from_pretrained(MODEL_ID)
model = Idefics2ForConditionalGeneration.from_pretrained(MODEL_ID)
conv_template = conv_templates["idefics_2"]

with open("./examples/data_subset.json", "r") as f:
    examples = json.load(f)

# Resolve each example's frame names to paths under ./examples/<video_id>/.
for item in examples:
    video_id = item["images"][0].split("_")[0]
    item["images"] = [os.path.join("./examples", video_id, x) for x in item["images"]]

# Reference video-evaluation prompt. It is not used by the chat flow below.
prompt = (
    "Suppose you are an expert in judging and evaluating the quality of AI-generated videos, \n"
    "please watch the following frames of a given video and see the text prompt for generating the video, \n"
    "then give scores from 7 different dimensions:\n"
    "(1) visual quality, \n"
    "(2) object consistency,\n"
    "(3) dynamic degree,\n"
    "(4) motion smoothness,\n"
    "(5) text-to-video alignment,\n"
    "(6) factual consistency, \n"
    "(7) overall score\n"
    "for each dimension, output a number from [1,2,3], in which '1' stands for 'Bad', "
    "'2' stands for 'Average', '3' stands for 'Good'.\n"
    "Here is an output example: \n"
    "visual quality: 3\n"
    "object consistency: 2 \n"
    "dynamic degree: 2\n"
    "motion smoothness: 1\n"
    "text-to-video alignment: 1\n"
    "factual consistency: 2\n"
    "overall score: 1\n\n"
    "For this item, the text prompt is the beautiful girl, long hair,walk on the sity street, red cloth ,\n"
    "all the frames of video are as follows: \n\n"
)


@spaces.GPU
def generate(text: str, images: List[str], history: List[dict], **kwargs):
    """Generate a reply from the chat history and the image files it refers to.

    `images` holds file paths in the order their placeholders appear in the text.
    Extra keyword arguments are forwarded to `model.generate`.
    """
    global model
    model = model.to("cuda") if model.device.type != "cuda" else model

    user_role = conv_template.roles[0]
    assistant_role = conv_template.roles[1]

    # Convert the flat {"role", "text"} history into Idefics2 chat messages,
    # turning each image placeholder into an {"type": "image"} content entry.
    idefics_2_message = []
    print(history)
    for i, message in enumerate(history):
        if message["role"] == user_role:
            idefics_2_message.append({"role": user_role, "content": []})
            message_text = message["text"]
            if IMAGE_TOKEN in message_text:
                sub_texts = [x.strip() for x in message_text.split(IMAGE_TOKEN)]
                if sub_texts[0]:
                    idefics_2_message[-1]["content"].append({"type": "text", "text": sub_texts[0]})
                for sub_text in sub_texts[1:]:
                    idefics_2_message[-1]["content"].append({"type": "image"})
                    if sub_text:
                        idefics_2_message.append({
                            "role": user_role,
                            "content": [{"type": "text", "text": sub_text}]
                        })
            else:
                idefics_2_message[-1]["content"].append({"type": "text", "text": message_text})
        elif message["role"] == assistant_role:
            # A trailing empty assistant turn is the slot for the reply being generated.
            if i == len(history) - 1 and not message["text"]:
                break
            idefics_2_message.append({
                "role": assistant_role,
                "content": [{"type": "text", "text": message["text"]}]
            })
    if text:
        assert idefics_2_message[-1]["role"] == assistant_role and not idefics_2_message[-1]["content"], "Internal error"
        idefics_2_message.append({
            "role": user_role,
            "content": [{"type": "text", "text": text}]
        })

    print(idefics_2_message)
    chat_prompt = processor.apply_chat_template(idefics_2_message, add_generation_prompt=True)

    # Text-only conversations carry no images, so pass None to the processor.
    pil_images = [Image.open(x) for x in images] if images else None
    inputs = processor(text=chat_prompt, images=pil_images, return_tensors="pt")
    inputs = {k: v.to(model.device) for k, v in inputs.items()}

    # Honor the caller's generation settings; fall back to 1024 new tokens.
    kwargs.setdefault("max_new_tokens", 1024)
    outputs = model.generate(**inputs, **kwargs)
    # Decode only the newly generated tokens, skipping the echoed prompt.
    generated_text = processor.decode(outputs[0, inputs["input_ids"].shape[-1]:], skip_special_tokens=True)
    return generated_text


def enable_next_image(uploaded_images, image):
    uploaded_images.append(image)
    return uploaded_images, gr.MultimodalTextbox(value=None, interactive=False)


def add_message(history, message):
    """Append uploaded files and text to the chat history, then clear the input box."""
    if message["files"]:
        for file in message["files"]:
            history.append([(file,), None])
    if message["text"]:
        history.append([message["text"], None])
    return history, gr.MultimodalTextbox(value=None)


def print_like_dislike(x: gr.LikeData):
    print(x.index, x.value, x.liked)


def get_chat_history(history):
    """Collect the text turns of the Gradio history as {"role", "text"} dicts."""
    chat_history = []
    user_role = conv_template.roles[0]
    assistant_role = conv_template.roles[1]
    for i, message in enumerate(history):
        if isinstance(message[0], str):
            chat_history.append({"role": user_role, "text": message[0]})
            if i != len(history) - 1:
                assert message[1], "The bot message is not provided, internal error"
                chat_history.append({"role": assistant_role, "text": message[1]})
            else:
                assert not message[1], "the bot message internal error, get: {}".format(message[1])
                chat_history.append({"role": assistant_role, "text": ""})
    return chat_history


def get_chat_images(history):
    """Collect every uploaded file path in the Gradio history, in order."""
    images = []
    for message in history:
        if isinstance(message[0], tuple):
            images.extend(message[0])
    return images


def bot(history):
    # Gather the text and images of the current (unanswered) turn, newest first.
    cur_messages = {"text": "", "images": []}
    for message in history[::-1]:
        if message[1]:
            break
        if isinstance(message[0], str):
            cur_messages["text"] = message[0] + " " + cur_messages["text"]
        elif isinstance(message[0], tuple):
            cur_messages["images"].extend(message[0])
    cur_messages["text"] = cur_messages["text"].strip()
    cur_messages["images"] = cur_messages["images"][::-1]
    if not cur_messages["text"]:
        raise gr.Error("Please enter a message")

    # Make the number of placeholders match the number of uploaded images.
    num_images = len(cur_messages["images"])
    num_tokens = cur_messages["text"].count(IMAGE_TOKEN)
    if num_tokens < num_images:
        gr.Warning("More images were uploaded than there are placeholders in the text. The missing placeholders will be appended to the text.")
        cur_messages["text"] += f" {IMAGE_TOKEN}" * (num_images - num_tokens)
        history[-1][0] = cur_messages["text"]
    elif num_tokens > num_images:
        gr.Warning("Fewer images were uploaded than there are placeholders in the text. The extra placeholders will be removed from the text.")
        # Reverse the string so that replace() drops the last occurrences first.
        cur_messages["text"] = cur_messages["text"][::-1].replace(IMAGE_TOKEN[::-1], "", num_tokens - num_images)[::-1]
        history[-1][0] = cur_messages["text"]

    chat_history = get_chat_history(history)
    chat_images = get_chat_images(history)

    generation_kwargs = {
        "max_new_tokens": 4096,
        "num_beams": 1,
        "do_sample": False
    }

    response = generate(None, chat_images, chat_history, **generation_kwargs)
    # The Chatbot component expects the updated history, not the bare reply string.
    history[-1][1] = response
    return history


def get_images(video_folder: str):
    """
    video folder contains images files like {video_folder_name}_00.jpg, {video_folder_name}_01.jpg, ...
    """
    images = []
    for file in os.listdir(video_folder):
        if file.endswith(".jpg"):
            images.append(Image.open(os.path.join(video_folder, file)))
    # Sort frames by the numeric suffix in their file names.
    images.sort(key=lambda x: int(x.filename.split("_")[-1].split(".")[0]))
    return images


def build_demo():
    with gr.Blocks() as demo:

        gr.Markdown("""
        # Mantis
        Mantis is a multimodal conversational AI model that can chat with users about images and text. It's optimized for multi-image reasoning, where interleaved text and images can be used to generate responses.

        ### [Paper](https://arxiv.org/abs/2405.01483) | [Github](https://github.com/TIGER-AI-Lab/Mantis) | [Models](https://huggingface.co/collections/TIGER-Lab/mantis-6619b0834594c878cdb1d6e4) | [Dataset](https://huggingface.co/datasets/TIGER-Lab/Mantis-Instruct) | [Website](https://tiger-ai-lab.github.io/Mantis/)
        """)

        gr.Markdown(f"""## Chat with Mantis
        Mantis supports an interleaved text-image input format, where you can simply use the placeholder `{IMAGE_TOKEN}` to indicate the position of uploaded images.
        The model is optimized for multi-image reasoning, while preserving the ability to chat about text and images in a single conversation.
        (The model currently being served is [🤗 {MODEL_ID}](https://huggingface.co/{MODEL_ID}))
        """)

        chatbot = gr.Chatbot(line_breaks=True)
        chat_input = gr.MultimodalTextbox(
            interactive=True,
            file_types=["image"],
            placeholder=f"Enter message or upload images. Please use {IMAGE_TOKEN} to indicate the position of uploaded images",
            show_label=True,
        )

        chat_msg = chat_input.submit(add_message, [chatbot, chat_input], [chatbot, chat_input])

        # Advanced options, currently disabled:
        # with gr.Accordion(label='Advanced options', open=False):
        #     temperature = gr.Slider(
        #         label='Temperature', minimum=0.1, maximum=2.0, step=0.1, value=0.2, interactive=True
        #     )
        #     top_p = gr.Slider(
        #         label='Top-p', minimum=0.05, maximum=1.0, step=0.05, value=1.0, interactive=True
        #     )

        bot_msg = chat_msg.success(bot, chatbot, chatbot, api_name="bot_response")

        chatbot.like(print_like_dislike, None, None)

        with gr.Row():
            send_button = gr.Button("Send")
            clear_button = gr.ClearButton([chatbot, chat_input])

        send_button.click(
            add_message, [chatbot, chat_input], [chatbot, chat_input]
        ).then(
            bot, chatbot, chatbot, api_name="bot_response"
        )

        # Hidden fields that let gr.Examples carry the example id and reference answer.
        dummy_id = gr.Textbox("dummy_id", label="dummy_id", visible=False)
        dummy_output = gr.Textbox("dummy_output", label="dummy_output", visible=False)

        gr.Examples(
            examples=[
                [
                    item["id"],
                    {
                        "text": item["conversations"][0]["value"],
                        "files": item["images"]
                    },
                    item["conversations"][1]["value"]
                ] for item in examples
            ],
            inputs=[dummy_id, chat_input, dummy_output],
        )

        gr.Markdown("""
        ## Citation
        ```
        @article{jiang2024mantis,
          title={MANTIS: Interleaved Multi-Image Instruction Tuning},
          author={Jiang, Dongfu and He, Xuan and Zeng, Huaye and Wei, Cong and Ku, Max and Liu, Qian and Chen, Wenhu},
          journal={arXiv preprint arXiv:2405.01483},
          year={2024}
        }
        ```""")
    return demo


if __name__ == "__main__":
    demo = build_demo()
    demo.launch()