SecureWatermark / utils.py
fantos's picture
Update utils.py
ae3e8a6 verified
raw
history blame
10.4 kB
# utils.py
import cv2
import numpy as np
from PIL import Image, PngImagePlugin, ImageDraw
import json
from datetime import datetime
from cryptography.fernet import Fernet
import base64
import hashlib
class WatermarkProcessor:
def __init__(self, encryption_key=None):
"""Initialize with optional encryption key"""
if encryption_key:
self.fernet = Fernet(encryption_key)
else:
key = Fernet.generate_key()
self.fernet = Fernet(key)
def to_bin(self, data):
"""Convert data to binary format as string"""
if isinstance(data, str):
return ''.join(format(x, '08b') for x in data.encode('utf-8'))
elif isinstance(data, bytes):
return ''.join(format(x, '08b') for x in data)
elif isinstance(data, np.ndarray):
return [format(i, "08b") for i in data]
elif isinstance(data, int) or isinstance(data, np.uint8):
return format(data, "08b")
else:
raise TypeError("Type not supported.")
def create_preview(self, image_path, watermark_text, opacity=0.3):
"""Create a preview of watermark on image"""
try:
image = Image.open(image_path)
txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
draw = ImageDraw.Draw(txt_layer)
# Calculate text position
text_width = draw.textlength(watermark_text)
text_x = (image.width - text_width) // 2
text_y = image.height // 2
# Add watermark text
draw.text((text_x, text_y), watermark_text,
fill=(255, 255, 255, int(255 * opacity)))
# Combine layers
preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
return preview
except Exception as e:
return None
def png_encode(self, im_name, extra):
"""Encode watermark using PNG metadata"""
try:
im = Image.open(im_name)
info = PngImagePlugin.PngInfo()
info.add_text("TXT", extra)
im.save("test.png", pnginfo=info)
return "test.png", "Watermark added successfully"
except Exception as e:
return im_name, f"Error adding watermark: {str(e)}"
def encode(self, image_path, watermark_text, metadata=None):
"""Encode watermark using steganography with encryption"""
try:
image = cv2.imread(image_path)
if image is None:
raise ValueError("Could not read image file")
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Prepare watermark data
watermark_data = {
'text': watermark_text,
'timestamp': datetime.now().isoformat(),
'metadata': metadata or {}
}
# Add image hash
image_copy = image.copy() & 0xFE # Clear LSB
watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest()
# Encrypt data
secret_data = json.dumps(watermark_data)
secret_data = f"{secret_data}#####=====" # Add delimiters
binary_secret_data = self.to_bin(secret_data)
# Calculate capacity
n_bytes = image.shape[0] * image.shape[1] * 3 // 8
if len(binary_secret_data) > n_bytes * 8:
return image_path, "Watermark is too large for Image Size"
# Embed data
data_index = 0
for i in range(image.shape[0]):
for j in range(image.shape[1]):
if data_index < len(binary_secret_data):
pixel = image[i, j]
for k in range(3):
if data_index < len(binary_secret_data):
binary_value = format(pixel[k], '08b')
binary_value = binary_value[:-1] + binary_secret_data[data_index]
image[i, j, k] = int(binary_value, 2)
data_index += 1
# Save result
output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png"
cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
return output_path, "Watermark added successfully"
except Exception as e:
return image_path, f"Error in encoding: {str(e)}"
def decode(self, image_path):
"""Decode watermark with improved error handling"""
try:
# Try PNG metadata method first
try:
im = Image.open(image_path)
if "TXT" in im.info:
return im.info["TXT"]
except:
pass
# Steganography method
image = cv2.imread(image_path)
if image is None:
raise ValueError("Could not read image file")
image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Extract binary data
binary_data = ""
for row in image:
for pixel in row:
for value in pixel:
binary_data += format(value, '08b')[-1]
# Convert to bytes with error checking
bytes_data = bytearray()
for i in range(0, len(binary_data), 8):
if i + 8 <= len(binary_data):
try:
byte = int(binary_data[i:i+8], 2)
bytes_data.append(byte)
except ValueError:
continue
# Process the data with robust decoding
try:
# First try UTF-8 decoding with error handling
decoded_data = bytes(bytes_data).decode('utf-8', errors='ignore')
# Clean up decoded data
decoded_data = ''.join(char for char in decoded_data if ord(char) < 128 or ord(char) > 160)
# Find markers
end_marker = "====="
delimiter = "#####"
if end_marker in decoded_data:
parts = decoded_data.split(end_marker)
decoded_data = parts[0] # Take the part before =====
if delimiter in decoded_data:
parts = decoded_data.split(delimiter)
decoded_data = parts[0] # Take the part before #####
# Clean the decoded data
decoded_data = decoded_data.strip()
# Try to parse as JSON
try:
watermark_data = json.loads(decoded_data)
# Verify image hash
image_copy = image.copy() & 0xFE
current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest()
if current_hash != watermark_data.get('image_hash'):
return "Warning: Image has been modified after watermarking"
return json.dumps(watermark_data, indent=2, ensure_ascii=False)
except json.JSONDecodeError:
# If not valid JSON, return cleaned decoded data
return decoded_data
except UnicodeDecodeError:
# Try alternative decoding method
try:
fallback_decoded = ""
current_bytes = bytearray()
for byte in bytes_data:
current_bytes.append(byte)
try:
char = current_bytes.decode('utf-8')
fallback_decoded += char
current_bytes = bytearray()
except UnicodeDecodeError:
if len(current_bytes) >= 4: # Maximum UTF-8 character length
current_bytes = bytearray()
continue
if fallback_decoded:
return fallback_decoded.strip()
else:
return "Error: Could not decode watermark data"
except Exception as e:
return f"Error in fallback decoding: {str(e)}"
except Exception as e:
return f"Error in decoding: {str(e)}"
def analyze_quality(self, original_path, watermarked_path):
"""Analyze watermark quality"""
try:
original = cv2.imread(original_path)
watermarked = cv2.imread(watermarked_path)
if original is None or watermarked is None:
raise ValueError("Could not read image files")
# Calculate PSNR
mse = np.mean((original - watermarked) ** 2)
if mse == 0:
psnr = float('inf')
else:
psnr = 20 * np.log10(255.0 / np.sqrt(mse))
# Calculate histogram similarity
hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
# Count modified pixels
diff = cv2.bitwise_xor(original, watermarked)
modified_pixels = np.count_nonzero(diff)
report = {
'psnr': round(psnr, 2),
'histogram_similarity': round(hist_correlation, 4),
'modified_pixels': modified_pixels,
'image_size': original.shape,
'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
}
return json.dumps(report, indent=2)
except Exception as e:
return f"Error in quality analysis: {str(e)}"