Vivien Chappelier
use LANCZOS downsampling filter
fbe5687
raw
history blame
7.04 kB
import gradio as gr
import os
import torch
import numpy as np
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
from diffusers import DiffusionPipeline
import torchvision.transforms as transforms
from copy import deepcopy
from collections import OrderedDict
import requests
import json
from PIL import Image, ImageEnhance
import base64
import io
import random
import math
class BZHStableSignatureDemo(object):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.pipe = DiffusionPipeline.from_pretrained("stabilityai/sdxl-turbo", torch_dtype=torch.float16, variant="fp16").to("cuda")
try:
print("self.pipe.watermark = ", self.pipe.watermark)
except:
print("no self.pipe.watermark")
# load the patched VQ-VAEs
sd1 = deepcopy(self.pipe.vae.state_dict()) # save initial state dict
self.decoders = decoders = OrderedDict([("no watermark", sd1)])
for name, patched_decoder_ckpt in (
("weak", "models/checkpoint_000.pth.50000"),
("medium", "models/checkpoint_000.pth.150000"),
("strong", "models/checkpoint_000.pth.500000"),
("extreme", "models/checkpoint_000.pth.1500000")):
sd2 = torch.load(patched_decoder_ckpt)['ldm_decoder']
msg = self.pipe.vae.load_state_dict(sd2, strict=False)
print(f"loaded LDM decoder state_dict with message\n{msg}")
print("you should check that the decoder keys are correctly matched")
decoders[name] = sd2
self.decoders = decoders
def generate(self, mode, seed, prompt):
generator = torch.Generator(device=device)
#if seed:
torch.manual_seed(seed)
# load the patched VAE decoder
sd = self.decoders[mode]
self.pipe.vae.load_state_dict(sd, strict=False)
output = self.pipe(prompt, num_inference_steps=4, guidance_scale=0.0, output_type="pil")
return output.images[0] #{ "background": output.images[0], "layers": [], "composite": None }
def attack_detect(self, img, jpeg_compression, downscale, crop, saturation):
#img = img_edit["composite"]
img = img.convert("RGB")
# attack
if downscale != 1:
size = img.size
size = (int(size[0] / downscale), int(size[1] / downscale))
img = img.resize(size, Image.Resampling.LANCZOS)
if crop != 0:
width, height = img.size
area = width * height
log_rmin = math.log(0.5)
log_rmax = math.log(2.0)
for _ in range(10):
target_area = area * (1 - crop)
aspect_ratio = math.exp(random.random() * (log_rmax - log_rmin) + log_rmin)
w = int(round(math.sqrt(target_area * aspect_ratio)))
h = int(round(math.sqrt(target_area / aspect_ratio)))
if 0 < w <= width and 0 < h <= height:
top = random.randint(0, height - h + 1)
left = random.randint(0, width - w + 1)
img = img.crop((left, top, left+w, top+h))
break
converter = ImageEnhance.Color(img)
img = converter.enhance(saturation)
# send to detection API and apply JPEG compression attack
mf = io.BytesIO()
img.save(mf, format='JPEG', quality=jpeg_compression) # includes JPEG attack
b64 = base64.b64encode(mf.getvalue())
data = {
'image': b64.decode('utf8')
}
headers = {}
api_key = os.getenv('BZH_API_KEY')
if api_key:
headers['x-api-key'] = api_key
response = requests.post('https://bzh.imatag.com/bzh/api/v1.0/detect',
json=data, headers=headers)
response.raise_for_status()
data = response.json()
pvalue = data['p-value']
mf.seek(0)
img0 = Image.open(mf) # reload to show JPEG attack
#result = "resolution = %dx%d p-value = %e" % (img.size[0], img.size[1], pvalue))
result = "No watermark detected."
chances = int(1 / pvalue + 1)
rpv = 10**int(math.log10(pvalue))
if pvalue < 1e-3:
result = "Watermark detected with low confidence (p-value<%.0e)" % rpv # (< 1/%d chances of being wrong)" % chances
if pvalue < 1e-9:
result = "Watermark detected with high confidence (p-value<%.0e)" % rpv # (< 1/%d chances of being wrong)" % chances
return (img0, result)
def interface():
prompt = "sailing ship in storm by Rembrandt"
backend = BZHStableSignatureDemo()
decoders = list(backend.decoders.keys())
with gr.Blocks() as demo:
gr.Markdown("""# Watermarked SDXL-Turbo demo
This demo brought to you by [IMATAG](https://www.imatag.com/) presents watermarking of images generated via [StableDiffusion XL Turbo](https://huggingface.co/stabilityai/sdxl-turbo).
Using the method presented in [StableSignature](https://ai.meta.com/blog/stable-signature-watermarking-generative-ai/),
the VAE decoder of StableDiffusion is fine-tuned to produce images including a specific invisible watermark. We combined
this method with a demo version of [IMATAG](https://www.imatag.com/)'s in-house decoder. The watermarking system operates in zero-bit mode for improved robustness.""")
with gr.Row():
inp = gr.Textbox(label="Prompt", value=prompt)
seed = gr.Number(label="Seed", precision=0)
mode = gr.Dropdown(choices=decoders, label="Watermark strength", value="medium")
with gr.Row():
btn1 = gr.Button("Generate")
with gr.Row():
watermarked_image = gr.Image(type="pil", width=512, height=512)
with gr.Column():
gr.Markdown("""With these controls you may alter the generated image before detection. You may also upload your own edited image instead.""")
downscale = gr.Slider(1, 3, value=1, step=0.1, label="Downscale ratio")
crop = gr.Slider(0, 0.9, value=0, step=0.01, label="Random crop ratio")
saturation = gr.Slider(0, 2, value=1, step=0.1, label="Color saturation")
jpeg_compression = gr.Slider(value=100, step=5, label="JPEG quality")
btn2 = gr.Button("Modify & Detect")
with gr.Row():
attacked_image = gr.Image(type="pil", width=256)
detection_label = gr.Label(label="Detection info")
btn1.click(fn=backend.generate, inputs=[mode, seed, inp], outputs=[watermarked_image], api_name="generate")
btn2.click(fn=backend.attack_detect, inputs=[watermarked_image, jpeg_compression, downscale, crop, saturation], outputs=[attacked_image, detection_label], api_name="detect")
return demo
if __name__ == '__main__':
demo = interface()
demo.launch()