File size: 4,380 Bytes
ed4e013
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
from typing import  Dict, List, Any
import torch
from diffusers import DPMSolverMultistepScheduler, DiffusionPipeline, StableDiffusionImg2ImgPipeline, StableDiffusionInpaintPipelineLegacy
from PIL import Image
import base64
from io import BytesIO


# choose the compute device once at import time: CUDA when available, CPU otherwise
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

class EndpointHandler():
    """Serve text-to-image, image-to-image and inpainting requests from one checkpoint.

    A single checkpoint is loaded once; the image-to-image and inpainting
    pipelines are then built around the very same VAE, text encoder, tokenizer,
    UNet and scheduler objects as the text-to-image pipeline.
    """

    def __init__(self, path=""):
        """Load the checkpoint at ``path`` and build the three pipelines.

        :param path: location handed to ``DiffusionPipeline.from_pretrained``.
        """
        # Only request half precision when running on CUDA. The module-level
        # `device` can fall back to CPU, where fp16 inference is commonly
        # unsupported or impractically slow, so use full precision there.
        dtype = torch.float16 if device.type == "cuda" else torch.float32

        # load the base (text-to-image) pipeline
        self.txt2img_pipe = DiffusionPipeline.from_pretrained(path, torch_dtype=dtype)
        # disable the safety checker
        self.txt2img_pipe.safety_checker = None
        # use DPMSolverMultistepScheduler, configured from the checkpoint's scheduler
        self.txt2img_pipe.scheduler = DPMSolverMultistepScheduler.from_config(
            self.txt2img_pipe.scheduler.config
        )

        # Components reused by the derived pipelines (same objects, no copies).
        shared_components = dict(
            vae=self.txt2img_pipe.vae,
            text_encoder=self.txt2img_pipe.text_encoder,
            tokenizer=self.txt2img_pipe.tokenizer,
            unet=self.txt2img_pipe.unet,
            scheduler=self.txt2img_pipe.scheduler,
            safety_checker=self.txt2img_pipe.safety_checker,
            feature_extractor=self.txt2img_pipe.feature_extractor,
        )

        # NOTE(review): txt2img_pipe is never moved explicitly; it presumably
        # ends up on `device` because it shares its modules with the two
        # pipelines below, which are moved - confirm.
        self.img2img_pipe = StableDiffusionImg2ImgPipeline(**shared_components).to(device)
        self.inpaint_pipe = StableDiffusionInpaintPipelineLegacy(**shared_components).to(device)

    def __call__(self, data: Any) -> "Image.Image":
        """Run one generation request and return the first generated image.

        The pipeline is selected from the fields present in ``data``:

        * ``image`` and ``mask_image`` -> inpainting
        * ``image`` only               -> image-to-image
        * neither                      -> text-to-image

        :param data: request dictionary. Recognised keys (all are popped from
            ``data``): ``inputs`` (the prompt; the dictionary itself is used when
            the key is absent), ``image`` and ``mask_image`` (base64-encoded
            images, optional), ``num_inference_steps`` (default 25),
            ``guidance_scale`` (default 7.5), ``negative_prompt`` (default None),
            ``height`` / ``width`` (default 512, text-to-image only) and
            ``strength`` (default 0.8, image-to-image and inpainting only).
        :return: ``out.images[0]`` of the selected pipeline, i.e. the image
            object itself, not a base64 string.
        """
        prompt = data.pop("inputs", data)
        encoded_image = data.pop("image", None)
        encoded_mask_image = data.pop("mask_image", None)

        # hyperparameters
        num_inference_steps = data.pop("num_inference_steps", 25)
        guidance_scale = data.pop("guidance_scale", 7.5)
        negative_prompt = data.pop("negative_prompt", None)
        height = data.pop("height", 512)
        width = data.pop("width", 512)
        strength = data.pop("strength", 0.8)

        # keyword arguments common to all three pipelines
        shared_kwargs = dict(
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            num_images_per_prompt=1,
            negative_prompt=negative_prompt,
        )

        # text-to-image: no input image supplied
        if encoded_image is None:
            out = self.txt2img_pipe(prompt, height=height, width=width, **shared_kwargs)
            return out.images[0]

        # Normalise the init image to three channels: uploads such as RGBA,
        # palette or greyscale PNGs would otherwise not match the 3-channel
        # input the pipelines are built for.
        image = self.decode_base64_image(encoded_image).convert("RGB")

        # NOTE(review): `init_image` is the keyword this handler has always
        # used; newer diffusers releases may expect `image` instead - confirm
        # against the pinned diffusers version before changing it.
        if encoded_mask_image is not None:
            # inpainting: the mask is passed through exactly as decoded
            mask_image = self.decode_base64_image(encoded_mask_image)
            out = self.inpaint_pipe(
                prompt,
                init_image=image,
                mask_image=mask_image,
                strength=strength,
                **shared_kwargs,
            )
        else:
            # image-to-image
            out = self.img2img_pipe(
                prompt,
                init_image=image,
                strength=strength,
                **shared_kwargs,
            )

        # return the first generated image
        return out.images[0]

    # helper to decode input image
    def decode_base64_image(self, image_string):
        """Decode a base64 string and open the resulting bytes as an image.

        :param image_string: base64-encoded image file contents.
        :return: the object returned by ``Image.open`` on the decoded bytes.
        """
        raw_bytes = base64.b64decode(image_string)
        return Image.open(BytesIO(raw_bytes))