File size: 5,938 Bytes
c97c8cb
 
 
 
 
d6ea99d
254bcc9
c97c8cb
 
 
254bcc9
c97c8cb
254bcc9
 
c97c8cb
3686b46
254bcc9
77dbaf5
c97c8cb
a6def05
 
 
 
 
 
 
 
 
 
 
77dbaf5
4ce7ea7
a6def05
 
 
4ce7ea7
77dbaf5
254bcc9
 
 
 
3686b46
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
254bcc9
c97c8cb
 
 
 
 
 
 
b4bc0d9
 
 
 
 
 
 
c97c8cb
046b29c
c97c8cb
046b29c
 
 
 
 
 
 
c97c8cb
 
3686b46
 
 
254bcc9
3686b46
c97c8cb
046b29c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c97c8cb
046b29c
254bcc9
046b29c
254bcc9
3686b46
 
 
046b29c
 
 
c97c8cb
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# import sys
# import base64
# import logging
# import copy
import base64
from io import BytesIO
from typing import Dict, List, Any

import numpy as np
import requests
import torch
from PIL import Image
from transformers import Blip2Processor, Blip2ForConditionalGeneration, BlipForQuestionAnswering, BitsAndBytesConfig
from transformers import pipeline
        
class EndpointHandler():
    """Visual-question-answering handler for a custom inference endpoint.

    The handler loads a processor and a model once at construction time and
    then answers one ``{"question", "image"}`` request per call.
    """

    def __init__(self, path=""):
        """Load the processor and the model named by ``self.model_name``.

        Args:
            path: Accepted for interface compatibility; it is not used, the
                checkpoint in ``self.model_name`` is loaded instead.
        """
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        self.model_base = "Salesforce/blip2-opt-2.7b"
        self.model_name = "sooh-j/blip2-vizwizqa"

        self.processor = Blip2Processor.from_pretrained(self.model_name)

        # NOTE(review): the checkpoint name and Blip2Processor suggest a
        # BLIP-2 model, while BlipForQuestionAnswering is the BLIP (v1) VQA
        # class -- confirm this is the intended model class.
        if self.device == "cuda":
            # 8-bit bitsandbytes loading needs a GPU. The loader places the
            # weights itself (device_map="auto"), and calling .to() on a
            # quantized model is rejected by transformers, so no .to() here.
            quantization_config = BitsAndBytesConfig(load_in_8bit=True)
            self.model = BlipForQuestionAnswering.from_pretrained(
                self.model_name,
                device_map="auto",
                quantization_config=quantization_config,
            )
        else:
            # CPU fallback: load unquantized weights and place them manually.
            self.model = BlipForQuestionAnswering.from_pretrained(self.model_name)
            self.model.to(self.device)

    @staticmethod
    def _decode_base64(payload):
        """Return the raw bytes of a base64 string.

        A ``data:<mime>;base64,`` prefix (data URL) is stripped when present.
        """
        if payload.startswith("data:"):
            _, separator, encoded = payload.partition(",")
            if separator:
                payload = encoded
        return base64.b64decode(payload)

    @staticmethod
    def _build_prompt(question):
        """Return the text prompt sent to the processor for *question*."""
        return f"Question: {question}, Answer:"

    @classmethod
    def _load_image(cls, payload):
        """Turn a request image payload into something the processor accepts.

        Base64 strings (optionally data URLs) and raw bytes are decoded into
        an RGB ``PIL.Image``. Any other object is returned unchanged, on the
        assumption that it is already an image (for example a ``PIL.Image``).
        """
        if isinstance(payload, str):
            payload = cls._decode_base64(payload)
        if isinstance(payload, (bytes, bytearray)):
            return Image.open(BytesIO(payload)).convert("RGB")
        return payload

    def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Answer a question about an image.

        Args:
            data: Request payload. ``data["inputs"]`` (or ``data`` itself when
                there is no ``"inputs"`` key) must be a mapping with:

                * ``"question"``: the question text.
                * ``"image"``: a base64 string (plain or data URL), raw image
                  bytes, or an already-decoded image object.

                A ``"parameters"`` entry, if present, is currently ignored.

        Returns:
            ``{"text_output": <decoded answer>}``.

        Raises:
            KeyError: if ``"question"`` or ``"image"`` is missing.
        """
        # Read without popping so the caller's dict is left untouched and the
        # payload cannot be lost by a second lookup.
        inputs = data.get("inputs", data)

        question = inputs["question"]
        image = self._load_image(inputs["image"])

        prompt = self._build_prompt(question)
        # NOTE(review): when the model is loaded in 8-bit its non-quantized
        # layers may be float16; if generate() reports a dtype mismatch, cast
        # the processed inputs accordingly -- confirm on a GPU host.
        processed = self.processor(
            images=image, text=prompt, return_tensors="pt"
        ).to(self.device)

        with torch.no_grad():
            out = self.model.generate(**processed)

        text_output = self.processor.decode(out[0], skip_special_tokens=True)
        return {"text_output": text_output}