import gradio as gr import os import torch import numpy as np device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') from diffusers import DiffusionPipeline import torchvision.transforms as transforms from copy import deepcopy from collections import OrderedDict import requests import json from PIL import Image, ImageEnhance import base64 import io class BZHStableSignatureDemo(object): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.pipe = DiffusionPipeline.from_pretrained("stabilityai/sdxl-turbo", torch_dtype=torch.float16, variant="fp16").to("cuda") # load the patched VQ-VAEs sd1 = deepcopy(self.pipe.vae.state_dict()) # save initial state dict self.decoders = decoders = OrderedDict([("no watermark", sd1)]) for name, patched_decoder_ckpt in ( ("weak", "models/stable_signature/checkpoint_000.pth.50000"), ("medium", "models/stable_signature/checkpoint_000.pth.150000"), ("strong", "models/stable_signature/checkpoint_000.pth.500000"), ("extreme", "models/stable_signature/checkpoint_000.pth.1500000")): sd2 = torch.load(patched_decoder_ckpt)['ldm_decoder'] msg = self.pipe.vae.load_state_dict(sd2, strict=False) print(f"loaded LDM decoder state_dict with message\n{msg}") print("you should check that the decoder keys are correctly matched") decoders[name] = sd2 self.decoders = decoders def generate(self, mode, seed, prompt): generator = torch.Generator(device=device) if seed: torch.manual_seed(seed) # load the patched VAE decoder sd = self.decoders[mode] self.pipe.vae.load_state_dict(sd, strict=False) output = self.pipe(prompt, num_inference_steps=4, guidance_scale=0.0, output_type="pil") return output.images[0] @staticmethod def pad(img, padding, mode="edge"): npimg = np.asarray(img) nppad = ((padding[1], padding[3]), (padding[0], padding[2]), (0,0)) npimg = np.pad(npimg, nppad, mode=mode) return Image.fromarray(npimg) def attack_detect(self, img, jpeg_compression, downscale, saturation): # attack if downscale != 1: size = img.size size = (int(size[0] / downscale), int(size[1] / downscale)) img = img.resize(size, Image.BICUBIC) converter = ImageEnhance.Color(img) img = converter.enhance(saturation) # send to detection API and apply JPEG compression attack mf = io.BytesIO() img.save(mf, format='JPEG', quality=jpeg_compression) # includes JPEG attack b64 = base64.b64encode(mf.getvalue()) data = { 'image': b64.decode('utf8') } headers = {} api_key = os.environ.get('BZH_API_KEY', None) if api_key: headers['BZH_API_KEY'] = api_key response = requests.post('https://bzh.imatag.com/bzh/api/v1.0/detect', json=data, headers=headers) response.raise_for_status() data = response.json() pvalue = data['p-value'] mf.seek(0) img0 = Image.open(mf) # reload to show JPEG attack #result = "resolution = %dx%d p-value = %e" % (img.size[0], img.size[1], pvalue)) result = "No watermark detected." chances = int(1 / pvalue + 1) if pvalue < 1e-3: result = "Weak watermark detected (< 1/%d chances of being wrong)" % chances if pvalue < 1e-6: result = "Strong watermark detected (< 1/%d chances of being wrong)" % chances return (img0, result) def interface(): prompt = "sailing ship in storm by Rembrandt" backend = BZHStableSignatureDemo() decoders = list(backend.decoders.keys()) with gr.Blocks() as demo: gr.Markdown("""# Watermarked SDXL-Turbo demo This demo presents watermarking of images generated via StableDiffusion XL Turbo. Using the method presented in [StableSignature](https://ai.meta.com/blog/stable-signature-watermarking-generative-ai/), the VAE decoder of StableDiffusion is fine-tuned to produce images including a specific invisible watermark. We combined this method with our in-house decoder which operates in zero-bit mode for improved robustness.""") with gr.Row(): inp = gr.Textbox(label="Prompt", value=prompt) seed = gr.Number(label="Seed", precision=0) mode = gr.Dropdown(choices=decoders, label="Watermark strength", value="medium") with gr.Row(): btn1 = gr.Button("Generate") with gr.Row(): watermarked_image = gr.Image(type="pil", tool="select").style(width=512, height=512) with gr.Column(): downscale = gr.Slider(1, 3, value=1, step=0.1, label="Downscale ratio") saturation = gr.Slider(0, 2, value=1, step=0.1, label="Color saturation") jpeg_compression = gr.Slider(value=100, step=5, label="JPEG quality") btn2 = gr.Button("Attack & Detect") with gr.Row(): attacked_image = gr.Image(type="pil", tool="select").style(width=256) detection_label = gr.Label(label="Detection info") btn1.click(fn=backend.generate, inputs=[mode, seed, inp], outputs=[watermarked_image], api_name="generate") btn2.click(fn=backend.attack_detect, inputs=[watermarked_image, jpeg_compression, downscale, saturation], outputs=[attacked_image, detection_label], api_name="detect") return demo if __name__ == '__main__': demo = interface() demo.launch()