import re
import logging
from typing import List, Optional, Union

import numpy as np
import torch

from transformers.feature_extraction_utils import BatchFeature
from transformers.image_utils import ImageInput, is_valid_image
from transformers.processing_utils import ProcessorMixin
from transformers.tokenization_utils_base import (
    PaddingStrategy,
    PreTokenizedInput,
    TextInput,
    TruncationStrategy,
)
from transformers.utils import TensorType

logger = logging.getLogger(__name__)


# Copied from transformers.models.idefics2.processing_idefics2.is_url
def is_url(val) -> bool:
    return isinstance(val, str) and val.startswith("http")


# Copied from transformers.models.idefics2.processing_idefics2.is_image_or_image_url
def is_image_or_image_url(elem):
    return is_url(elem) or is_valid_image(elem)


def _is_str_or_image(elem):
    return isinstance(elem, str) or is_image_or_image_url(elem)


class MonoProcessor(ProcessorMixin):
    attributes = ["image_processor", "tokenizer"]
    image_processor_class = "CLIPImageProcessor"
    # tokenizer_class = ("BartTokenizer", "BartTokenizerFast")
    tokenizer_class = ("Qwen2Tokenizer", "Qwen2TokenizerFast")

    def __init__(
        self,
        image_processor=None,
        tokenizer=None,
    ):
        if image_processor is None:
            raise ValueError("You need to specify an `image_processor`.")
        if tokenizer is None:
            raise ValueError("You need to specify a `tokenizer`.")

        tokens_to_add = {
            "additional_special_tokens": tokenizer.additional_special_tokens
            + ["", "", "", ""]
            + [f"<loc_{x}>" for x in range(1000)]
            + [
                "", "", "", "", "",
                "", "", "", "", "",
                "", "", "", "", "",
                "", "", "", "", "",
            ]
        }
        tokenizer.add_special_tokens(tokens_to_add)

        # Florence-2 style task tokens mapped to the post-processing applied to
        # the generated answer for that task.
        self.tasks_answer_post_processing_type = {
            "<OCR>": "pure_text",
            "<OCR_WITH_REGION>": "ocr",
            "<CAPTION>": "pure_text",
            "<DETAILED_CAPTION>": "pure_text",
            "<MORE_DETAILED_CAPTION>": "pure_text",
            "<OD>": "description_with_bboxes",
            "<DENSE_REGION_CAPTION>": "description_with_bboxes",
            "<CAPTION_TO_PHRASE_GROUNDING>": "phrase_grounding",
            "<REFERRING_EXPRESSION_SEGMENTATION>": "polygons",
            "<REGION_TO_SEGMENTATION>": "polygons",
            "<OPEN_VOCABULARY_DETECTION>": "description_with_bboxes_or_polygons",
            "<REGION_TO_CATEGORY>": "pure_text",
            "<REGION_TO_DESCRIPTION>": "pure_text",
            "<REGION_TO_OCR>": "pure_text",
            "<REGION_PROPOSAL>": "bboxes",
        }

        self.task_prompts_without_inputs = {
            "<OCR>": "What is the text in the image?",
            "<OCR_WITH_REGION>": "What is the text in the image, with regions?",
            "<CAPTION>": "What does the image describe?",
            "<DETAILED_CAPTION>": "Describe in detail what is shown in the image.",
            "<MORE_DETAILED_CAPTION>": "Describe with a paragraph what is shown in the image.",
            "<OD>": "Locate the objects with category name in the image.",
            "<DENSE_REGION_CAPTION>": "Locate the objects in the image, with their descriptions.",
            "<REGION_PROPOSAL>": "Locate the region proposals in the image.",
        }

        self.task_prompts_with_input = {
            "<CAPTION_TO_PHRASE_GROUNDING>": "Locate the phrases in the caption: {input}",
            "<REFERRING_EXPRESSION_SEGMENTATION>": "Locate {input} in the image with mask",
            "<REGION_TO_SEGMENTATION>": "What is the polygon mask of region {input}",
            "<OPEN_VOCABULARY_DETECTION>": "Locate {input} in the image.",
            "<REGION_TO_CATEGORY>": "What is the region {input}?",
            "<REGION_TO_DESCRIPTION>": "What does the region {input} describe?",
            "<REGION_TO_OCR>": "What text is in the region {input}?",
        }

        super().__init__(image_processor, tokenizer)

    def construct_prompts(self, text):
        # replace the task tokens with the task prompts if a task token is in the text
        if isinstance(text, str):
            for task_token, task_prompt in self.task_prompts_without_inputs.items():
                if task_token in text:
                    return task_prompt
            return text

        prompts = []
        for _text in text:
            # 1. fixed task prompts without additional inputs
            for task_token, task_prompt in self.task_prompts_without_inputs.items():
                if task_token in _text:
                    assert (
                        _text == task_token
                    ), f"Task token {task_token} should be the only token in the text."
                    _text = task_prompt
                    break
            # 2. task prompts with additional inputs
            for task_token, task_prompt in self.task_prompts_with_input.items():
                if task_token in _text:
                    _text = task_prompt.format(input=_text.replace(task_token, ""))
                    break
            prompts.append(_text)
        return prompts
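    # Illustrative expansion performed by `construct_prompts` (a sketch assuming
    # the Florence-2 style task tokens defined above; note that `__call__` below
    # currently skips prompt construction):
    #
    #     processor.construct_prompts(["<OD>"])
    #     # -> ["Locate the objects with category name in the image."]
    #     processor.construct_prompts(["<CAPTION_TO_PHRASE_GROUNDING> a red car"])
    #     # -> ["Locate the phrases in the caption:  a red car"]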
    def __call__(
        self,
        text: Union[
            TextInput, PreTokenizedInput, List[TextInput], List[PreTokenizedInput]
        ] = None,
        images: ImageInput = None,
        tokenize_newline_separately: bool = True,
        padding: Union[bool, str, PaddingStrategy] = False,
        truncation: Union[bool, str, TruncationStrategy] = None,
        max_length=None,
        return_tensors: Optional[Union[str, TensorType]] = TensorType.PYTORCH,
        do_resize: bool = None,
        size=None,
        do_normalize: bool = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        data_format: Optional["ChannelDimension"] = "channels_first",  # noqa: F821
        input_data_format: Optional[
            Union[str, "ChannelDimension"]  # noqa: F821
        ] = None,
        resample: "PILImageResampling" = None,  # noqa: F821
        do_convert_rgb: bool = None,
        do_thumbnail: bool = None,
        do_align_long_axis: bool = None,
        do_rescale: bool = None,
    ) -> BatchFeature:
        return_token_type_ids = False

        if text is None:
            logger.warning("You are using Florence-2 without a text prompt.")
            text = ""

        if isinstance(text, List) and isinstance(images, List):
            if len(images) < len(text):
                raise ValueError(
                    f"Received {len(images)} images for {len(text)} prompts. "
                    "Each prompt should be associated with an image."
                )
        if _is_str_or_image(text):
            text = [text]
        elif isinstance(text, list) and _is_str_or_image(text[0]):
            pass

        if images is not None:
            pixel_values = self.image_processor(
                images,
                size=size,
                do_resize=do_resize,
                do_normalize=do_normalize,
                return_tensors=return_tensors,
                image_mean=image_mean,
                image_std=image_std,
                input_data_format=input_data_format,
                data_format=data_format,
                resample=resample,
                do_convert_rgb=do_convert_rgb,
            )["pixel_values"]

        # text = self.construct_prompts(text)
        inputs = self.tokenizer(
            text,
            return_tensors=return_tensors,
            padding=padding,
            max_length=max_length,
            truncation=truncation,
            return_token_type_ids=return_token_type_ids,
        )

        if images is not None:
            # print(inputs)
            # add IMAGE_TOKEN: prepend the image placeholder id (-200) to every
            # sequence so the model knows where to splice in the image features
            inputs_with_image = [
                torch.cat((torch.tensor([-200]), b), dim=0)
                for b in inputs["input_ids"]
            ]
            # inputs["input_ids"] = torch.stack(inputs_with_image)
            inputs["input_ids"] = inputs_with_image
            return_data = {**inputs, "pixel_values": pixel_values}
        else:
            return_data = {**inputs, "pixel_values": None}

        if return_token_type_ids:
            labels = inputs["input_ids"].masked_fill(
                inputs["token_type_ids"] == 0, -100
            )
            return_data.update({"labels": labels})
        return BatchFeature(data=return_data)
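    # Sketch of the batch layout produced by `__call__` for one image/text pair
    # (illustrative; exact shapes depend on the CLIP image processor config):
    #
    #     out = processor(text="Describe the image.", images=image)
    #     out["pixel_values"]      # tensor of shape (1, 3, H, W)
    #     out["input_ids"]         # list of 1-D tensors, each starting with -200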
""" return self.tokenizer.decode(*args, **kwargs) @property # Copied from transformers.models.clip.processing_clip.CLIPProcessor.model_input_names with CLIP->Florence2 def model_input_names(self): tokenizer_input_names = self.tokenizer.model_input_names image_processor_input_names = self.image_processor.model_input_names return list(dict.fromkeys(tokenizer_input_names + image_processor_input_names))