Spaces:
Sleeping
Sleeping
# utils.py | |
import cv2 | |
import numpy as np | |
from PIL import Image, PngImagePlugin, ImageDraw | |
import json | |
from datetime import datetime | |
from cryptography.fernet import Fernet | |
import base64 | |
import hashlib | |
class WatermarkProcessor: | |
def __init__(self, encryption_key=None): | |
"""Initialize with optional encryption key""" | |
if encryption_key: | |
self.fernet = Fernet(encryption_key) | |
else: | |
key = Fernet.generate_key() | |
self.fernet = Fernet(key) | |
def to_bin(self, data): | |
"""Convert data to binary format as string""" | |
if isinstance(data, str): | |
return ''.join(format(x, '08b') for x in data.encode('utf-8')) | |
elif isinstance(data, bytes): | |
return ''.join(format(x, '08b') for x in data) | |
elif isinstance(data, np.ndarray): | |
return [format(i, "08b") for i in data] | |
elif isinstance(data, int) or isinstance(data, np.uint8): | |
return format(data, "08b") | |
else: | |
raise TypeError("Type not supported.") | |
def create_preview(self, image_path, watermark_text, opacity=0.3): | |
"""Create a preview of watermark on image""" | |
try: | |
image = Image.open(image_path) | |
txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0)) | |
draw = ImageDraw.Draw(txt_layer) | |
# Calculate text position | |
text_width = draw.textlength(watermark_text) | |
text_x = (image.width - text_width) // 2 | |
text_y = image.height // 2 | |
# Add watermark text | |
draw.text((text_x, text_y), watermark_text, | |
fill=(255, 255, 255, int(255 * opacity))) | |
# Combine layers | |
preview = Image.alpha_composite(image.convert('RGBA'), txt_layer) | |
return preview | |
except Exception as e: | |
return None | |
def png_encode(self, im_name, extra): | |
"""Encode watermark using PNG metadata""" | |
try: | |
im = Image.open(im_name) | |
info = PngImagePlugin.PngInfo() | |
info.add_text("TXT", extra) | |
im.save("test.png", pnginfo=info) | |
return "test.png", "Watermark added successfully" | |
except Exception as e: | |
return im_name, f"Error adding watermark: {str(e)}" | |
def encode(self, image_path, watermark_text, metadata=None): | |
"""Encode watermark using steganography with encryption""" | |
try: | |
image = cv2.imread(image_path) | |
if image is None: | |
raise ValueError("Could not read image file") | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
# Prepare watermark data | |
watermark_data = { | |
'text': watermark_text, | |
'timestamp': datetime.now().isoformat(), | |
'metadata': metadata or {} | |
} | |
# Add image hash | |
image_copy = image.copy() & 0xFE # Clear LSB | |
watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest() | |
# Encrypt data | |
secret_data = json.dumps(watermark_data) | |
secret_data = f"{secret_data}#####=====" # Add delimiters | |
binary_secret_data = self.to_bin(secret_data) | |
# Calculate capacity | |
n_bytes = image.shape[0] * image.shape[1] * 3 // 8 | |
if len(binary_secret_data) > n_bytes * 8: | |
return image_path, "Watermark is too large for Image Size" | |
# Embed data | |
data_index = 0 | |
for i in range(image.shape[0]): | |
for j in range(image.shape[1]): | |
if data_index < len(binary_secret_data): | |
pixel = image[i, j] | |
for k in range(3): | |
if data_index < len(binary_secret_data): | |
binary_value = format(pixel[k], '08b') | |
binary_value = binary_value[:-1] + binary_secret_data[data_index] | |
image[i, j, k] = int(binary_value, 2) | |
data_index += 1 | |
# Save result | |
output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png" | |
cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR)) | |
return output_path, "Watermark added successfully" | |
except Exception as e: | |
return image_path, f"Error in encoding: {str(e)}" | |
def decode(self, image_path): | |
"""Decode watermark with improved error handling""" | |
try: | |
# Try PNG metadata method first | |
try: | |
im = Image.open(image_path) | |
if "TXT" in im.info: | |
return im.info["TXT"] | |
except: | |
pass | |
# Steganography method | |
image = cv2.imread(image_path) | |
if image is None: | |
raise ValueError("Could not read image file") | |
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) | |
# Extract binary data | |
binary_data = "" | |
for row in image: | |
for pixel in row: | |
for value in pixel: | |
binary_data += format(value, '08b')[-1] | |
# Convert to bytes with error checking | |
bytes_data = bytearray() | |
for i in range(0, len(binary_data), 8): | |
if i + 8 <= len(binary_data): | |
try: | |
byte = int(binary_data[i:i+8], 2) | |
bytes_data.append(byte) | |
except ValueError: | |
continue | |
# Process the data with robust decoding | |
try: | |
# First try UTF-8 decoding with error handling | |
decoded_data = bytes(bytes_data).decode('utf-8', errors='ignore') | |
# Clean up decoded data | |
decoded_data = ''.join(char for char in decoded_data if ord(char) < 128 or ord(char) > 160) | |
# Find markers | |
end_marker = "=====" | |
delimiter = "#####" | |
if end_marker in decoded_data: | |
parts = decoded_data.split(end_marker) | |
decoded_data = parts[0] # Take the part before ===== | |
if delimiter in decoded_data: | |
parts = decoded_data.split(delimiter) | |
decoded_data = parts[0] # Take the part before ##### | |
# Clean the decoded data | |
decoded_data = decoded_data.strip() | |
# Try to parse as JSON | |
try: | |
watermark_data = json.loads(decoded_data) | |
# Verify image hash | |
image_copy = image.copy() & 0xFE | |
current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest() | |
if current_hash != watermark_data.get('image_hash'): | |
return "Warning: Image has been modified after watermarking" | |
return json.dumps(watermark_data, indent=2, ensure_ascii=False) | |
except json.JSONDecodeError: | |
# If not valid JSON, return cleaned decoded data | |
return decoded_data | |
except UnicodeDecodeError: | |
# Try alternative decoding method | |
try: | |
fallback_decoded = "" | |
current_bytes = bytearray() | |
for byte in bytes_data: | |
current_bytes.append(byte) | |
try: | |
char = current_bytes.decode('utf-8') | |
fallback_decoded += char | |
current_bytes = bytearray() | |
except UnicodeDecodeError: | |
if len(current_bytes) >= 4: # Maximum UTF-8 character length | |
current_bytes = bytearray() | |
continue | |
if fallback_decoded: | |
return fallback_decoded.strip() | |
else: | |
return "Error: Could not decode watermark data" | |
except Exception as e: | |
return f"Error in fallback decoding: {str(e)}" | |
except Exception as e: | |
return f"Error in decoding: {str(e)}" | |
def analyze_quality(self, original_path, watermarked_path): | |
"""Analyze watermark quality""" | |
try: | |
original = cv2.imread(original_path) | |
watermarked = cv2.imread(watermarked_path) | |
if original is None or watermarked is None: | |
raise ValueError("Could not read image files") | |
# Calculate PSNR | |
mse = np.mean((original - watermarked) ** 2) | |
if mse == 0: | |
psnr = float('inf') | |
else: | |
psnr = 20 * np.log10(255.0 / np.sqrt(mse)) | |
# Calculate histogram similarity | |
hist_original = cv2.calcHist([original], [0], None, [256], [0, 256]) | |
hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256]) | |
hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL) | |
# Count modified pixels | |
diff = cv2.bitwise_xor(original, watermarked) | |
modified_pixels = np.count_nonzero(diff) | |
report = { | |
'psnr': round(psnr, 2), | |
'histogram_similarity': round(hist_correlation, 4), | |
'modified_pixels': modified_pixels, | |
'image_size': original.shape, | |
'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100 | |
} | |
return json.dumps(report, indent=2) | |
except Exception as e: | |
return f"Error in quality analysis: {str(e)}" |