import gradio as gr from transformers import AutoModelForCausalLM, AutoProcessor, GenerationConfig from PIL import Image import torch import spaces import json import re from langdetect import detect, LangDetectException from googletrans import Translator # Load the processor and model processor = AutoProcessor.from_pretrained( 'allenai/Molmo-7B-D-0924', trust_remote_code=True, torch_dtype='auto', device_map='auto' ) model = AutoModelForCausalLM.from_pretrained( 'allenai/Molmo-7B-D-0924', trust_remote_code=True, torch_dtype='auto', device_map='auto' ) import json def wrap_json_in_markdown(text): result = [] stack = [] json_start = None in_json = False i = 0 while i < len(text): char = text[i] if char in ['{', '[']: if not in_json: json_start = i in_json = True stack.append(char) else: stack.append(char) elif char in ['}', ']'] and in_json: if not stack: # Unbalanced bracket, reset in_json = False json_start = None else: last = stack.pop() if (last == '{' and char != '}') or (last == '[' and char != ']'): # Mismatched brackets in_json = False json_start = None if in_json and not stack: # Potential end of JSON json_str = text[json_start:i+1] try: # Try to parse the JSON to ensure it's valid parsed = json.loads(json_str) # Wrap in Markdown code block wrapped = f"\n```json\n{json.dumps(parsed, indent=4)}\n```\n" result.append(text[:json_start]) # Append text before JSON result.append(wrapped) # Append wrapped JSON text = text[i+1:] # Update the remaining text i = -1 # Reset index except json.JSONDecodeError: # Not valid JSON, continue searching pass in_json = False json_start = None i += 1 result.append(text) # Append any remaining text return ''.join(result) def decode_unicode_sequences(unicode_seq): """ Decodes a sequence of Unicode escape sequences (e.g., \\u4F60\\u597D) to actual characters. Args: unicode_seq (str): A string containing Unicode escape sequences. Returns: str: The decoded Unicode string. """ # Regular expression to find \uXXXX unicode_escape_pattern = re.compile(r'\\u([0-9a-fA-F]{4})') # Function to replace each \uXXXX with the corresponding character def replace_match(match): hex_value = match.group(1) return chr(int(hex_value, 16)) # Decode all \uXXXX sequences decoded = unicode_escape_pattern.sub(replace_match, unicode_seq) return decoded def is_mandarin(text): """ Detects if the given text is in Mandarin. Args: text (str): The text to check. Returns: bool: True if the text is detected as Mandarin, False otherwise. """ try: lang = detect(text) return lang == 'zh-cn' or lang == 'zh-tw' or lang == 'zh' except LangDetectException: return False def translate_to_english(text, translator): """ Translates the given Mandarin text to English. Args: text (str): The Mandarin text to translate. translator (Translator): An instance of googletrans Translator. Returns: str: The translated English text. """ try: translation = translator.translate(text, src='zh-cn', dest='en') return translation.text except Exception as e: print(f"Translation error: {e}") return text # Return the original text if translation fails def process_text_for_mandarin_unicode(input_string): """ Processes the input string to find Unicode escape sequences representing Mandarin words, translates them to English, and replaces them accordingly. Args: input_string (str): The original string containing Unicode escape sequences. Returns: str: The processed string with translations where applicable. """ # Initialize the translator translator = Translator() # Regular expression to find groups of consecutive \uXXXX sequences unicode_word_pattern = re.compile(r'(?:\\u[0-9a-fA-F]{4})+') # Function to process each matched Unicode word def process_match(match): unicode_seq = match.group(0) decoded_word = decode_unicode_sequences(unicode_seq) if is_mandarin(decoded_word): translated = translate_to_english(decoded_word, translator) return f"{translated} ({decoded_word})" else: # If not Mandarin, return the original sequence return unicode_seq # Substitute all matched Unicode words with their translations if applicable processed_string = unicode_word_pattern.sub(process_match, input_string) return processed_string @spaces.GPU() def process_image_and_text(image, text): # Process the image and text inputs = processor.process( images=[Image.fromarray(image)], text=text ) # Move inputs to the correct device and make a batch of size 1 inputs = {k: v.to(model.device).unsqueeze(0) for k, v in inputs.items()} # Generate output output = model.generate_from_batch( inputs, GenerationConfig(max_new_tokens=1024, stop_strings="<|endoftext|>"), tokenizer=processor.tokenizer ) # Only get generated tokens; decode them to text generated_tokens = output[0, inputs['input_ids'].size(1):] generated_text = processor.tokenizer.decode(generated_tokens, skip_special_tokens=True) generated_text_w_json_wrapper = wrap_json_in_markdown(generated_text) generated_text_w_unicode_mdn = process_text_for_mandarin_unicode(generated_text_w_json_wrapper) return generated_text_w_unicode_mdn def chatbot(image, text, history): if image is None: return history + [("Please upload an image first.", None)] response = process_image_and_text(image, text) history.append({"role": "user", "content": text}) history.append({"role": "assistant", "content": response}) return history # Define the Gradio interface with gr.Blocks() as demo: gr.Markdown("# Image Chatbot with Molmo-7B-D-0924") with gr.Row(): image_input = gr.Image(type="numpy") chatbot_output = gr.Chatbot(type="messages") text_input = gr.Textbox(placeholder="Ask a question about the image...") submit_button = gr.Button("Submit") state = gr.State([]) submit_button.click( chatbot, inputs=[image_input, text_input, state], outputs=[chatbot_output] ) text_input.submit( chatbot, inputs=[image_input, text_input, state], outputs=[chatbot_output] ) demo.launch()