# coding=utf-8
# Copyright 2024 The HuggingFace Inc. team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Image processor class for LLaVa-NeXT."""

import math
from typing import Dict, List, Optional, Union

import numpy as np

from transformers.image_processing_utils import BaseImageProcessor, BatchFeature, get_size_dict, select_best_resolution
from transformers.image_transforms import (
    convert_to_rgb,
    get_resize_output_image_size,
    pad,
    resize,
    to_channel_dimension_format,
)
from transformers.image_utils import (
    OPENAI_CLIP_MEAN,
    OPENAI_CLIP_STD,
    ChannelDimension,
    ImageInput,
    PILImageResampling,
    get_image_size,
    infer_channel_dimension_format,
    is_scaled_image,
    make_list_of_images,
    to_numpy_array,
    valid_images,
    validate_preprocess_arguments,
)
from transformers.utils import TensorType, is_vision_available, logging


logger = logging.get_logger(__name__)


if is_vision_available():
    from PIL import Image


def divide_to_patches(image: np.ndarray, patch_size: int, input_data_format) -> List[np.ndarray]:
    """
    Divides an image into patches of a specified size.

    Patches are taken row by row, left to right. Patches on the bottom/right border are smaller than
    `patch_size` when the image height/width is not a multiple of `patch_size`.

    Args:
        image (`np.ndarray`):
            The input image.
        patch_size (`int`):
            The size of each patch.
        input_data_format (`ChannelDimension` or `str`):
            The channel dimension format of the input image.

    Returns:
        list: A list of np.ndarray representing the patches.
    """
    patches = []
    height, width = get_image_size(image, channel_dim=input_data_format)
    channels_last = input_data_format == ChannelDimension.LAST
    for top in range(0, height, patch_size):
        for left in range(0, width, patch_size):
            if channels_last:
                patch = image[top : top + patch_size, left : left + patch_size]
            else:
                patch = image[:, top : top + patch_size, left : left + patch_size]
            patches.append(patch)

    return patches


def expand_to_square(image: np.ndarray, background_color, input_data_format) -> np.ndarray:
    """
    Expands an image to a square by adding a background color.

    The image is centered along its shorter spatial dimension. An already square image is returned unchanged
    (the same object, not a copy).

    Args:
        image (`np.ndarray`):
            The input image, with a channel dimension.
        background_color:
            Scalar or per-channel sequence used to fill the added area.
        input_data_format (`ChannelDimension` or `str`):
            The channel dimension format of the input image. If `None`, it is inferred from the image.

    Returns:
        `np.ndarray`: The square image, in the same channel dimension format as the input.
    """
    if input_data_format is None:
        input_data_format = infer_channel_dimension_format(image)

    height, width = get_image_size(image, channel_dim=input_data_format)
    if width == height:
        return image

    side = max(height, width)
    top = (side - height) // 2
    left = (side - width) // 2

    if input_data_format == ChannelDimension.LAST:
        result = np.ones((side, side, image.shape[2]), dtype=image.dtype) * background_color
        result[top : top + height, left : left + width] = image
    else:
        fill = background_color
        if np.ndim(fill) > 0:
            # A per-channel color must broadcast along the leading (channel) axis.
            fill = np.reshape(fill, (-1, 1, 1))
        result = np.ones((image.shape[0], side, side), dtype=image.dtype) * fill
        result[:, top : top + height, left : left + width] = image
    return result


def _get_patch_output_size(image, target_resolution, input_data_format):
    """
    Computes the (height, width) an image should be resized to so that it fits inside `target_resolution`
    while keeping its aspect ratio.

    The dimension with the smaller scale factor is set to its target value; the other one is scaled by the same
    factor, rounded up and clamped to its target value.

    Args:
        image (`np.ndarray`):
            The input image.
        target_resolution (`tuple`):
            The target resolution as (height, width).
        input_data_format (`ChannelDimension` or `str`):
            The channel dimension format of the input image.

    Returns:
        `tuple`: The (new_height, new_width) of the resized image.
    """
    original_height, original_width = get_image_size(image, channel_dim=input_data_format)
    target_height, target_width = target_resolution

    scale_w = target_width / original_width
    scale_h = target_height / original_height

    if scale_w < scale_h:
        new_width = target_width
        new_height = min(math.ceil(original_height * scale_w), target_height)
    else:
        new_height = target_height
        new_width = min(math.ceil(original_width * scale_h), target_width)

    return new_height, new_width


class LlavaNextImageProcessor(BaseImageProcessor):
    r"""
    Constructs a LLaVa-NeXT image processor. Based on [`CLIPImageProcessor`] with incorporation of additional techniques
    for processing high resolution images as explained in the [LLaVa paper](https://arxiv.org/abs/2310.03744).

    Args:
        do_resize (`bool`, *optional*, defaults to `True`):
            Whether to resize the image's (height, width) dimensions to the specified `size`. Can be overridden by
            `do_resize` in the `preprocess` method.
        size (`Dict[str, int]` *optional*, defaults to `{"shortest_edge": 224}`):
            Size of the image after resizing. The shortest edge of the image is resized to size["shortest_edge"], with
            the longest edge resized to keep the input aspect ratio. Can be overridden by `size` in the `preprocess`
            method.
        image_grid_pinpoints (`List` *optional*, defaults to `[[336, 672], [672, 336], [672, 672], [1008, 336], [336, 1008]]`):
            A list of possible resolutions to use for processing high resolution images. The best resolution is selected
            based on the original size of the image. Can be overridden by `image_grid_pinpoints` in the `preprocess`
            method.
        resample (`PILImageResampling`, *optional*, defaults to `Resampling.BICUBIC`):
            Resampling filter to use if resizing the image. Can be overridden by `resample` in the `preprocess` method.
        do_center_crop (`bool`, *optional*, defaults to `True`):
            Whether to center crop the image to the specified `crop_size`. Can be overridden by `do_center_crop` in the
            `preprocess` method.
        crop_size (`Dict[str, int]` *optional*, defaults to 224):
            Size of the output image after applying `center_crop`. Can be overridden by `crop_size` in the `preprocess`
            method.
        do_rescale (`bool`, *optional*, defaults to `True`):
            Whether to rescale the image by the specified scale `rescale_factor`. Can be overridden by `do_rescale` in
            the `preprocess` method.
        rescale_factor (`int` or `float`, *optional*, defaults to `1/255`):
            Scale factor to use if rescaling the image. Can be overridden by `rescale_factor` in the `preprocess`
            method.
        do_normalize (`bool`, *optional*, defaults to `True`):
            Whether to normalize the image. Can be overridden by `do_normalize` in the `preprocess` method.
        image_mean (`float` or `List[float]`, *optional*, defaults to `[0.48145466, 0.4578275, 0.40821073]`):
            Mean to use if normalizing the image. This is a float or list of floats the length of the number of
            channels in the image. Can be overridden by the `image_mean` parameter in the `preprocess` method.
        image_std (`float` or `List[float]`, *optional*, defaults to `[0.26862954, 0.26130258, 0.27577711]`):
            Standard deviation to use if normalizing the image. This is a float or list of floats the length of the
            number of channels in the image. Can be overridden by the `image_std` parameter in the `preprocess` method.
        do_convert_rgb (`bool`, *optional*, defaults to `True`):
            Whether to convert the image to RGB.
    """

    model_input_names = ["pixel_values"]

    def __init__(
        self,
        do_resize: bool = True,
        size: Optional[Dict[str, int]] = None,
        image_grid_pinpoints: Optional[List] = None,
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        do_center_crop: bool = True,
        crop_size: Optional[Dict[str, int]] = None,
        do_rescale: bool = True,
        rescale_factor: Union[int, float] = 1 / 255,
        do_normalize: bool = True,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = True,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        size = size if size is not None else {"shortest_edge": 224}
        size = get_size_dict(size, default_to_square=False)
        image_grid_pinpoints = (
            image_grid_pinpoints
            if image_grid_pinpoints is not None
            else [[336, 672], [672, 336], [672, 672], [1008, 336], [336, 1008]]
        )
        crop_size = crop_size if crop_size is not None else {"height": 224, "width": 224}
        crop_size = get_size_dict(crop_size, default_to_square=True, param_name="crop_size")

        self.do_resize = do_resize
        self.size = size
        self.image_grid_pinpoints = image_grid_pinpoints
        self.resample = resample
        self.do_center_crop = do_center_crop
        self.crop_size = crop_size
        self.do_rescale = do_rescale
        self.rescale_factor = rescale_factor
        self.do_normalize = do_normalize
        self.image_mean = image_mean if image_mean is not None else OPENAI_CLIP_MEAN
        self.image_std = image_std if image_std is not None else OPENAI_CLIP_STD
        self.do_convert_rgb = do_convert_rgb

    # Copied from transformers.models.clip.image_processing_clip.CLIPImageProcessor.resize with CLIP->LLaVa
    def resize(
        self,
        image: np.ndarray,
        size: Dict[str, int],
        resample: PILImageResampling = PILImageResampling.BICUBIC,
        data_format: Optional[Union[str, ChannelDimension]] = None,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        **kwargs,
    ) -> np.ndarray:
        """
        Resize an image. The shortest edge of the image is resized to size["shortest_edge"], with the longest edge
        resized to keep the input aspect ratio.

        Args:
            image (`np.ndarray`):
                Image to resize.
            size (`Dict[str, int]`):
                Size of the output image. Must contain either `"shortest_edge"` or both `"height"` and `"width"`.
            resample (`PILImageResampling`, *optional*, defaults to `PILImageResampling.BICUBIC`):
                Resampling filter to use when resizing the image.
            data_format (`str` or `ChannelDimension`, *optional*):
                The channel dimension format of the image. If not provided, it will be the same as the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format of the input image. If not provided, it will be inferred.

        Returns:
            `np.ndarray`: The resized image.

        Raises:
            ValueError: If `size` contains neither `"shortest_edge"` nor both `"height"` and `"width"`.
        """
        default_to_square = True
        if "shortest_edge" in size:
            size = size["shortest_edge"]
            default_to_square = False
        elif "height" in size and "width" in size:
            size = (size["height"], size["width"])
        else:
            raise ValueError("Size must contain either 'shortest_edge' or 'height' and 'width'.")

        output_size = get_resize_output_image_size(
            image,
            size=size,
            default_to_square=default_to_square,
            input_data_format=input_data_format,
        )

        return resize(
            image,
            size=output_size,
            resample=resample,
            data_format=data_format,
            input_data_format=input_data_format,
            **kwargs,
        )

    def _preprocess(
        self,
        images: ImageInput,
        do_resize: bool = None,
        size: Dict[str, int] = None,
        resample: PILImageResampling = None,
        do_center_crop: bool = None,
        crop_size: Dict[str, int] = None,
        do_rescale: bool = None,
        rescale_factor: float = None,
        do_normalize: bool = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
    ) -> List[np.ndarray]:
        """
        Preprocess an image or batch of images. Copy of the `preprocess` method from `CLIPImageProcessor`.

        Unlike `preprocess`, the flags are used exactly as passed here; they do not fall back to the values stored
        on the processor.

        Args:
            images (`ImageInput`):
                Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If
                passing in images with pixel values between 0 and 1, set `do_rescale=False`.
            do_resize (`bool`, *optional*):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*):
                Size of the image after resizing. Shortest edge of the image is resized to size["shortest_edge"], with
                the longest edge resized to keep the input aspect ratio.
            resample (`int`, *optional*):
                Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only
                has an effect if `do_resize` is set to `True`.
            do_center_crop (`bool`, *optional*):
                Whether to center crop the image.
            crop_size (`Dict[str, int]`, *optional*):
                Size of the center crop. Only has an effect if `do_center_crop` is set to `True`.
            do_rescale (`bool`, *optional*):
                Whether to rescale the image.
            rescale_factor (`float`, *optional*):
                Rescale factor to rescale the image by if `do_rescale` is set to `True`.
            do_normalize (`bool`, *optional*):
                Whether to normalize the image.
            image_mean (`float` or `List[float]`, *optional*):
                Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`.
            image_std (`float` or `List[float]`, *optional*):
                Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to
                `True`.
            data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
                The channel dimension format for the output image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - Unset: Use the channel dimension format of the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format for the input image. If unset, the channel dimension format is inferred
                from the input image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - `"none"` or `ChannelDimension.NONE`: image in (height, width) format.

        Returns:
            `List[np.ndarray]`: The processed images, one array per input image, in `data_format`.
        """
        images = make_list_of_images(images)

        if do_resize:
            images = [
                self.resize(image=image, size=size, resample=resample, input_data_format=input_data_format)
                for image in images
            ]

        if do_center_crop:
            images = [
                self.center_crop(image=image, size=crop_size, input_data_format=input_data_format) for image in images
            ]

        if do_rescale:
            images = [
                self.rescale(image=image, scale=rescale_factor, input_data_format=input_data_format)
                for image in images
            ]

        if do_normalize:
            images = [
                self.normalize(image=image, mean=image_mean, std=image_std, input_data_format=input_data_format)
                for image in images
            ]

        images = [
            to_channel_dimension_format(image, data_format, input_channel_dim=input_data_format) for image in images
        ]

        return images

    def _resize_for_patching(
        self, image: np.ndarray, target_resolution: tuple, resample, input_data_format: ChannelDimension
    ) -> np.ndarray:
        """
        Resizes an image to a target resolution while maintaining aspect ratio.

        Args:
            image (np.ndarray):
                The input image.
            target_resolution (tuple):
                The target resolution (height, width) of the image.
            resample (`PILImageResampling`):
                Resampling filter to use if resizing the image.
            input_data_format (`ChannelDimension` or `str`):
                The channel dimension format of the input image.

        Returns:
            np.ndarray: The resized image. It fits inside `target_resolution` and is not padded yet.
        """
        new_height, new_width = _get_patch_output_size(image, target_resolution, input_data_format)

        # Resize the image
        resized_image = resize(image, (new_height, new_width), resample=resample, input_data_format=input_data_format)

        return resized_image

    def _pad_for_patching(
        self, image: np.ndarray, target_resolution: tuple, input_data_format: ChannelDimension
    ) -> np.ndarray:
        """
        Pad an image to a target resolution while maintaining aspect ratio.

        Args:
            image (np.ndarray):
                The image to pad, typically the output of `_resize_for_patching`.
            target_resolution (tuple):
                The target resolution (height, width) of the padded image.
            input_data_format (`ChannelDimension` or `str`):
                The channel dimension format of the input image.

        Returns:
            np.ndarray: The padded image.
        """
        target_height, target_width = target_resolution
        new_height, new_width = _get_patch_output_size(image, target_resolution, input_data_format)

        # When the missing amount is odd, symmetric padding would leave the image one pixel short of the target,
        # so the remainder is added on the bottom/right side.
        paste_x, extra_x = divmod(target_width - new_width, 2)
        paste_y, extra_y = divmod(target_height - new_height, 2)

        padded_image = pad(
            image,
            padding=((paste_y, paste_y + extra_y), (paste_x, paste_x + extra_x)),
            input_data_format=input_data_format,
        )

        return padded_image

    def get_image_patches(
        self,
        image: np.ndarray,
        grid_pinpoints,
        size: tuple,
        patch_size: int,
        resample: PILImageResampling,
        data_format: ChannelDimension,
        input_data_format: ChannelDimension,
    ) -> List[np.ndarray]:
        """
        Process an image with variable resolutions by dividing it into patches.

        Args:
            image (np.ndarray):
                The input image to be processed.
            grid_pinpoints (List):
                A list of possible resolutions.
            size (`tuple`):
                Size to resize the original image to.
            patch_size (`int`):
                Size of the patches to divide the image into.
            resample (`PILImageResampling`):
                Resampling filter to use if resizing the image.
            data_format (`ChannelDimension` or `str`):
                The channel dimension format for the output image.
            input_data_format (`ChannelDimension` or `str`):
                The channel dimension format of the input image.

        Returns:
            List[np.ndarray]: A list of NumPy arrays containing the processed image patches. The first element is
            the whole image resized to `size`; the remaining elements are the patches.

        Raises:
            ValueError: If `grid_pinpoints` is not a list.
        """
        if not isinstance(grid_pinpoints, list):
            raise ValueError("grid_pinpoints must be a list of possible resolutions.")

        possible_resolutions = grid_pinpoints

        image_size = get_image_size(image, channel_dim=input_data_format)
        best_resolution = select_best_resolution(image_size, possible_resolutions)
        resized_image = self._resize_for_patching(
            image, best_resolution, resample=resample, input_data_format=input_data_format
        )
        padded_image = self._pad_for_patching(resized_image, best_resolution, input_data_format=input_data_format)

        patches = divide_to_patches(padded_image, patch_size=patch_size, input_data_format=input_data_format)

        # make sure that all patches are in the requested output data format
        patches = [
            to_channel_dimension_format(patch, channel_dim=data_format, input_channel_dim=input_data_format)
            for patch in patches
        ]

        logger.debug(
            "patches: image.shape=%s padded_image.shape=%s | len(patches)=%d given image_size=%s, "
            "best_resolution=%s, patch_size=%s | input_data_format=%s",
            image.shape,
            padded_image.shape,
            len(patches),
            image_size,
            best_resolution,
            patch_size,
            input_data_format,
        )

        resized_original_image = resize(
            image,
            size=size,
            resample=resample,
            data_format=data_format,
            input_data_format=input_data_format,
        )

        image_patches = [resized_original_image] + patches

        return image_patches

    def preprocess(
        self,
        images: ImageInput,
        do_resize: bool = None,
        size: Dict[str, int] = None,
        image_grid_pinpoints: List = None,
        resample: PILImageResampling = None,
        do_center_crop: bool = None,
        crop_size: Dict[str, int] = None,
        do_rescale: bool = None,
        rescale_factor: float = None,
        do_normalize: bool = None,
        image_mean: Optional[Union[float, List[float]]] = None,
        image_std: Optional[Union[float, List[float]]] = None,
        do_convert_rgb: bool = None,
        return_tensors: Optional[Union[str, TensorType]] = None,
        data_format: Optional[ChannelDimension] = ChannelDimension.FIRST,
        input_data_format: Optional[Union[str, ChannelDimension]] = None,
        concat_images: bool = True,
    ):
        """
        Preprocess an image or batch of images into patch-based pixel values.

        Args:
            images (`ImageInput`):
                Image to preprocess. Expects a single or batch of images with pixel values ranging from 0 to 255. If
                passing in images with pixel values between 0 and 1, set `do_rescale=False`.
            do_resize (`bool`, *optional*, defaults to `self.do_resize`):
                Whether to resize the image.
            size (`Dict[str, int]`, *optional*, defaults to `self.size`):
                Size of the image after resizing. Shortest edge of the image is resized to size["shortest_edge"], with
                the longest edge resized to keep the input aspect ratio.
            image_grid_pinpoints (`List` *optional*, defaults to `self.image_grid_pinpoints`):
                A list of possible resolutions to use for processing high resolution images. The best resolution is
                selected based on the original size of the image.
            resample (`int`, *optional*, defaults to `self.resample`):
                Resampling filter to use if resizing the image. This can be one of the enum `PILImageResampling`. Only
                has an effect if `do_resize` is set to `True`.
            do_center_crop (`bool`, *optional*, defaults to `self.do_center_crop`):
                Whether to center crop the image.
            crop_size (`Dict[str, int]`, *optional*, defaults to `self.crop_size`):
                Size of the center crop. Only has an effect if `do_center_crop` is set to `True`.
            do_rescale (`bool`, *optional*, defaults to `self.do_rescale`):
                Whether to rescale the image.
            rescale_factor (`float`, *optional*, defaults to `self.rescale_factor`):
                Rescale factor to rescale the image by if `do_rescale` is set to `True`.
            do_normalize (`bool`, *optional*, defaults to `self.do_normalize`):
                Whether to normalize the image.
            image_mean (`float` or `List[float]`, *optional*, defaults to `self.image_mean`):
                Image mean to use for normalization. Only has an effect if `do_normalize` is set to `True`.
            image_std (`float` or `List[float]`, *optional*, defaults to `self.image_std`):
                Image standard deviation to use for normalization. Only has an effect if `do_normalize` is set to
                `True`.
            do_convert_rgb (`bool`, *optional*, defaults to `self.do_convert_rgb`):
                Whether to convert the image to RGB.
            return_tensors (`str` or `TensorType`, *optional*):
                The type of tensors to return. Can be one of:
                - Unset: Return a list of `np.ndarray`.
                - `TensorType.TENSORFLOW` or `'tf'`: Return a batch of type `tf.Tensor`.
                - `TensorType.PYTORCH` or `'pt'`: Return a batch of type `torch.Tensor`.
                - `TensorType.NUMPY` or `'np'`: Return a batch of type `np.ndarray`.
                - `TensorType.JAX` or `'jax'`: Return a batch of type `jax.numpy.ndarray`.
            data_format (`ChannelDimension` or `str`, *optional*, defaults to `ChannelDimension.FIRST`):
                The channel dimension format for the output image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - Unset: Use the channel dimension format of the input image.
            input_data_format (`ChannelDimension` or `str`, *optional*):
                The channel dimension format for the input image. If unset, the channel dimension format is inferred
                from the input image. Can be one of:
                - `"channels_first"` or `ChannelDimension.FIRST`: image in (num_channels, height, width) format.
                - `"channels_last"` or `ChannelDimension.LAST`: image in (height, width, num_channels) format.
                - `"none"` or `ChannelDimension.NONE`: image in (height, width) format.
            concat_images (`bool`, *optional*, defaults to `True`):
                If `True`, the per-image patch arrays are concatenated along the first axis into a single
                `pixel_values` array. If `False`, `pixel_values` is a list with one patch array per input image.

        Returns:
            `BatchFeature`: A batch with the keys `pixel_values` and `image_sizes`, where `image_sizes` holds the
            (height, width) of every input image before patching.

        Raises:
            ValueError: If `images` contains an unsupported image type.
        """
        do_resize = do_resize if do_resize is not None else self.do_resize
        size = size if size is not None else self.size
        size = get_size_dict(size, param_name="size", default_to_square=False)
        image_grid_pinpoints = image_grid_pinpoints if image_grid_pinpoints is not None else self.image_grid_pinpoints
        resample = resample if resample is not None else self.resample
        do_center_crop = do_center_crop if do_center_crop is not None else self.do_center_crop
        crop_size = crop_size if crop_size is not None else self.crop_size
        crop_size = get_size_dict(crop_size, param_name="crop_size", default_to_square=True)
        do_rescale = do_rescale if do_rescale is not None else self.do_rescale
        rescale_factor = rescale_factor if rescale_factor is not None else self.rescale_factor
        do_normalize = do_normalize if do_normalize is not None else self.do_normalize
        image_mean = image_mean if image_mean is not None else self.image_mean
        image_std = image_std if image_std is not None else self.image_std
        do_convert_rgb = do_convert_rgb if do_convert_rgb is not None else self.do_convert_rgb

        images = make_list_of_images(images)

        if not valid_images(images):
            raise ValueError(
                "Invalid image type. Must be of type PIL.Image.Image, numpy.ndarray, "
                "torch.Tensor, tf.Tensor or jax.ndarray."
            )

        validate_preprocess_arguments(
            do_rescale=do_rescale,
            rescale_factor=rescale_factor,
            do_normalize=do_normalize,
            image_mean=image_mean,
            image_std=image_std,
            do_center_crop=do_center_crop,
            crop_size=crop_size,
            do_resize=do_resize,
            size=size,
            resample=resample,
        )

        if do_convert_rgb:
            images = [convert_to_rgb(image) for image in images]

        # All transformations expect numpy arrays.
        images = [to_numpy_array(image) for image in images]

        if is_scaled_image(images[0]) and do_rescale:
            logger.warning_once(
                "It looks like you are trying to rescale already rescaled images. If the input"
                " images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again."
            )

        if input_data_format is None:
            # We assume that all images have the same channel dimension format.
            input_data_format = infer_channel_dimension_format(images[0])

        # The base (whole-image) view is resized to a fixed size. `size` may be given either as
        # {"height", "width"} or as {"shortest_edge"}; both forms are accepted by `resize` above.
        if "height" in size and "width" in size:
            base_size = (size["height"], size["width"])
        else:
            base_size = (size["shortest_edge"], size["shortest_edge"])

        new_images = []
        image_sizes = [get_image_size(image, channel_dim=input_data_format) for image in images]
        for image in images:
            # convert image into a list of patches
            # we intentionally use the same data format as the input data format
            image_patches = self.get_image_patches(
                image,
                image_grid_pinpoints,
                size=base_size,
                patch_size=crop_size["height"],
                resample=resample,
                data_format=input_data_format,
                input_data_format=input_data_format,
            )

            # preprocess patches
            pixel_values = self._preprocess(
                image_patches,
                do_resize=do_resize,
                size=size,
                resample=resample,
                do_center_crop=do_center_crop,
                crop_size=crop_size,
                do_rescale=do_rescale,
                rescale_factor=rescale_factor,
                do_normalize=do_normalize,
                image_mean=image_mean,
                image_std=image_std,
                data_format=data_format,
                input_data_format=input_data_format,
            )
            pixel_values = np.array(pixel_values)
            new_images.append(pixel_values)

        if concat_images:
            logger.debug("Number of processed images: %d", len(new_images))
            pixel_values = np.concatenate(new_images, axis=0)
            data = {"pixel_values": pixel_values, "image_sizes": image_sizes}
        else:
            data = {"pixel_values": new_images, "image_sizes": image_sizes}

        return BatchFeature(data=data, tensor_type=return_tensors)