fantos commited on
Commit
ae3e8a6
·
verified ·
1 Parent(s): a733f5b

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +237 -135
utils.py CHANGED
@@ -1,153 +1,255 @@
 
 
1
  import cv2
2
  import numpy as np
3
- from PIL import Image, PngImagePlugin
 
 
 
 
 
4
 
5
- def png_encode(im_name, extra):
6
- """Encode watermark using PNG metadata"""
7
- try:
8
- im = Image.open(im_name)
9
- info = PngImagePlugin.PngInfo()
10
- info.add_text("TXT", extra)
11
- im.save("test.png", pnginfo=info)
12
- return "test.png", "Watermark added successfully"
13
- except Exception as e:
14
- return im_name, f"Error adding watermark: {str(e)}"
15
 
16
- def to_bin(data):
17
- """Convert data to binary format as string"""
18
- if isinstance(data, str):
19
- return ''.join(format(x, '08b') for x in data.encode('utf-8'))
20
- elif isinstance(data, bytes):
21
- return ''.join(format(x, '08b') for x in data)
22
- elif isinstance(data, np.ndarray):
23
- return [format(i, "08b") for i in data]
24
- elif isinstance(data, int) or isinstance(data, np.uint8):
25
- return format(data, "08b")
26
- else:
27
- raise TypeError("Type not supported.")
28
 
29
- def decode(image_name, txt=None):
30
- """Decode watermark from image"""
31
- try:
32
- # First try PNG metadata method
33
  try:
34
- im = Image.open(image_name)
35
- if "TXT" in im.info:
36
- return im.info["TXT"]
37
- except:
38
- pass
39
-
40
- # Steganography method
41
- image = cv2.imread(image_name)
42
- if image is None:
43
- raise ValueError("Could not read image file")
44
-
45
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
46
- binary_data = ""
47
-
48
- # Extract binary data from image
49
- for row in image:
50
- for pixel in row:
51
- r, g, b = to_bin(pixel)
52
- binary_data += r[-1]
53
- binary_data += g[-1]
54
- binary_data += b[-1]
55
 
56
- # Convert binary string to bytes
57
- bytes_data = b''
58
- for i in range(0, len(binary_data), 8):
59
- byte = binary_data[i:i+8]
60
- if len(byte) == 8:
61
- try:
62
- byte_val = int(byte, 2)
63
- bytes_data += bytes([byte_val])
64
- except ValueError:
65
- continue
66
 
67
- # Try to find the end marker in raw bytes
 
68
  try:
69
- end_marker = b'====='
70
- if end_marker in bytes_data:
71
- bytes_data = bytes_data.split(end_marker)[0]
 
 
 
 
 
 
 
 
 
72
 
73
- # Try to find the delimiter in raw bytes
74
- delimiter = b'#####'
75
- if delimiter in bytes_data:
76
- bytes_data = bytes_data.split(delimiter)[0]
77
 
78
- # Decode the bytes to string
79
- decoded_text = bytes_data.decode('utf-8', errors='ignore')
80
- return decoded_text.strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
82
  except Exception as e:
83
- print(f"Decoding error: {e}")
84
- # If regular decoding fails, try character by character
85
- result = ""
86
- current_bytes = bytearray()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
87
 
88
- for byte in bytes_data:
89
- current_bytes.append(byte)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90
  try:
91
- char = current_bytes.decode('utf-8')
92
- result += char
93
  current_bytes = bytearray()
94
- except UnicodeDecodeError:
95
- continue
96
 
97
- if result:
98
- return result.strip()
99
- return "Error: Could not decode watermark"
100
-
101
- except Exception as e:
102
- return f"Error detecting watermark: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
103
 
104
- def encode(image_name, secret_data, txt=None):
105
- """Encode watermark using steganography"""
106
- try:
107
- image = cv2.imread(image_name)
108
- if image is None:
109
- raise ValueError("Could not read image file")
110
-
111
- image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
112
-
113
- # Calculate maximum bytes that can be encoded
114
- n_bytes = image.shape[0] * image.shape[1] * 3 // 8
115
-
116
- # Prepare the data with delimiters
117
- secret_data = str(secret_data)
118
- complete_data = secret_data + "#####" + "====="
119
-
120
- # Convert to binary
121
- binary_secret_data = to_bin(complete_data)
122
-
123
- # Check if the data can fit in the image
124
- if len(binary_secret_data) > n_bytes * 8:
125
- return image_name, "Watermark is too large for Image Size"
126
-
127
- data_index = 0
128
- binary_len = len(binary_secret_data)
129
-
130
- # Embed the data
131
- for i in range(image.shape[0]):
132
- for j in range(image.shape[1]):
133
- if data_index < binary_len:
134
- pixel = image[i, j]
135
- for color_channel in range(3):
136
- if data_index < binary_len:
137
- # Get the binary value of the pixel
138
- binary_value = format(pixel[color_channel], '08b')
139
- # Replace the least significant bit
140
- binary_value = binary_value[:-1] + binary_secret_data[data_index]
141
- # Update the pixel value
142
- image[i, j, color_channel] = int(binary_value, 2)
143
- data_index += 1
144
- else:
145
- break
146
-
147
- # Save the result
148
- output_path = "watermarked_image.png"
149
- cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
150
- return output_path, "Watermark embedded successfully"
151
-
152
- except Exception as e:
153
- return image_name, f"Error encoding watermark: {str(e)}"
 
1
+ # utils.py
2
+
3
  import cv2
4
  import numpy as np
5
+ from PIL import Image, PngImagePlugin, ImageDraw
6
+ import json
7
+ from datetime import datetime
8
+ from cryptography.fernet import Fernet
9
+ import base64
10
+ import hashlib
11
 
12
+ class WatermarkProcessor:
13
+ def __init__(self, encryption_key=None):
14
+ """Initialize with optional encryption key"""
15
+ if encryption_key:
16
+ self.fernet = Fernet(encryption_key)
17
+ else:
18
+ key = Fernet.generate_key()
19
+ self.fernet = Fernet(key)
 
 
20
 
21
+ def to_bin(self, data):
22
+ """Convert data to binary format as string"""
23
+ if isinstance(data, str):
24
+ return ''.join(format(x, '08b') for x in data.encode('utf-8'))
25
+ elif isinstance(data, bytes):
26
+ return ''.join(format(x, '08b') for x in data)
27
+ elif isinstance(data, np.ndarray):
28
+ return [format(i, "08b") for i in data]
29
+ elif isinstance(data, int) or isinstance(data, np.uint8):
30
+ return format(data, "08b")
31
+ else:
32
+ raise TypeError("Type not supported.")
33
 
34
+ def create_preview(self, image_path, watermark_text, opacity=0.3):
35
+ """Create a preview of watermark on image"""
 
 
36
  try:
37
+ image = Image.open(image_path)
38
+ txt_layer = Image.new('RGBA', image.size, (255, 255, 255, 0))
39
+ draw = ImageDraw.Draw(txt_layer)
40
+
41
+ # Calculate text position
42
+ text_width = draw.textlength(watermark_text)
43
+ text_x = (image.width - text_width) // 2
44
+ text_y = image.height // 2
45
+
46
+ # Add watermark text
47
+ draw.text((text_x, text_y), watermark_text,
48
+ fill=(255, 255, 255, int(255 * opacity)))
49
+
50
+ # Combine layers
51
+ preview = Image.alpha_composite(image.convert('RGBA'), txt_layer)
52
+ return preview
53
+ except Exception as e:
54
+ return None
 
 
 
55
 
56
+ def png_encode(self, im_name, extra):
57
+ """Encode watermark using PNG metadata"""
58
+ try:
59
+ im = Image.open(im_name)
60
+ info = PngImagePlugin.PngInfo()
61
+ info.add_text("TXT", extra)
62
+ im.save("test.png", pnginfo=info)
63
+ return "test.png", "Watermark added successfully"
64
+ except Exception as e:
65
+ return im_name, f"Error adding watermark: {str(e)}"
66
 
67
+ def encode(self, image_path, watermark_text, metadata=None):
68
+ """Encode watermark using steganography with encryption"""
69
  try:
70
+ image = cv2.imread(image_path)
71
+ if image is None:
72
+ raise ValueError("Could not read image file")
73
+
74
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
75
+
76
+ # Prepare watermark data
77
+ watermark_data = {
78
+ 'text': watermark_text,
79
+ 'timestamp': datetime.now().isoformat(),
80
+ 'metadata': metadata or {}
81
+ }
82
 
83
+ # Add image hash
84
+ image_copy = image.copy() & 0xFE # Clear LSB
85
+ watermark_data['image_hash'] = hashlib.sha256(image_copy.tobytes()).hexdigest()
 
86
 
87
+ # Encrypt data
88
+ secret_data = json.dumps(watermark_data)
89
+ secret_data = f"{secret_data}#####=====" # Add delimiters
90
+ binary_secret_data = self.to_bin(secret_data)
91
+
92
+ # Calculate capacity
93
+ n_bytes = image.shape[0] * image.shape[1] * 3 // 8
94
+ if len(binary_secret_data) > n_bytes * 8:
95
+ return image_path, "Watermark is too large for Image Size"
96
+
97
+ # Embed data
98
+ data_index = 0
99
+ for i in range(image.shape[0]):
100
+ for j in range(image.shape[1]):
101
+ if data_index < len(binary_secret_data):
102
+ pixel = image[i, j]
103
+ for k in range(3):
104
+ if data_index < len(binary_secret_data):
105
+ binary_value = format(pixel[k], '08b')
106
+ binary_value = binary_value[:-1] + binary_secret_data[data_index]
107
+ image[i, j, k] = int(binary_value, 2)
108
+ data_index += 1
109
+
110
+ # Save result
111
+ output_path = "watermarked_" + datetime.now().strftime("%Y%m%d_%H%M%S") + ".png"
112
+ cv2.imwrite(output_path, cv2.cvtColor(image, cv2.COLOR_RGB2BGR))
113
+ return output_path, "Watermark added successfully"
114
 
115
  except Exception as e:
116
+ return image_path, f"Error in encoding: {str(e)}"
117
+
118
+ def decode(self, image_path):
119
+ """Decode watermark with improved error handling"""
120
+ try:
121
+ # Try PNG metadata method first
122
+ try:
123
+ im = Image.open(image_path)
124
+ if "TXT" in im.info:
125
+ return im.info["TXT"]
126
+ except:
127
+ pass
128
+
129
+ # Steganography method
130
+ image = cv2.imread(image_path)
131
+ if image is None:
132
+ raise ValueError("Could not read image file")
133
+
134
+ image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
135
+
136
+ # Extract binary data
137
+ binary_data = ""
138
+ for row in image:
139
+ for pixel in row:
140
+ for value in pixel:
141
+ binary_data += format(value, '08b')[-1]
142
 
143
+ # Convert to bytes with error checking
144
+ bytes_data = bytearray()
145
+ for i in range(0, len(binary_data), 8):
146
+ if i + 8 <= len(binary_data):
147
+ try:
148
+ byte = int(binary_data[i:i+8], 2)
149
+ bytes_data.append(byte)
150
+ except ValueError:
151
+ continue
152
+
153
+ # Process the data with robust decoding
154
+ try:
155
+ # First try UTF-8 decoding with error handling
156
+ decoded_data = bytes(bytes_data).decode('utf-8', errors='ignore')
157
+
158
+ # Clean up decoded data
159
+ decoded_data = ''.join(char for char in decoded_data if ord(char) < 128 or ord(char) > 160)
160
+
161
+ # Find markers
162
+ end_marker = "====="
163
+ delimiter = "#####"
164
+
165
+ if end_marker in decoded_data:
166
+ parts = decoded_data.split(end_marker)
167
+ decoded_data = parts[0] # Take the part before =====
168
+
169
+ if delimiter in decoded_data:
170
+ parts = decoded_data.split(delimiter)
171
+ decoded_data = parts[0] # Take the part before #####
172
+
173
+ # Clean the decoded data
174
+ decoded_data = decoded_data.strip()
175
+
176
+ # Try to parse as JSON
177
+ try:
178
+ watermark_data = json.loads(decoded_data)
179
+ # Verify image hash
180
+ image_copy = image.copy() & 0xFE
181
+ current_hash = hashlib.sha256(image_copy.tobytes()).hexdigest()
182
+
183
+ if current_hash != watermark_data.get('image_hash'):
184
+ return "Warning: Image has been modified after watermarking"
185
+
186
+ return json.dumps(watermark_data, indent=2, ensure_ascii=False)
187
+ except json.JSONDecodeError:
188
+ # If not valid JSON, return cleaned decoded data
189
+ return decoded_data
190
+
191
+ except UnicodeDecodeError:
192
+ # Try alternative decoding method
193
  try:
194
+ fallback_decoded = ""
 
195
  current_bytes = bytearray()
 
 
196
 
197
+ for byte in bytes_data:
198
+ current_bytes.append(byte)
199
+ try:
200
+ char = current_bytes.decode('utf-8')
201
+ fallback_decoded += char
202
+ current_bytes = bytearray()
203
+ except UnicodeDecodeError:
204
+ if len(current_bytes) >= 4: # Maximum UTF-8 character length
205
+ current_bytes = bytearray()
206
+ continue
207
+
208
+ if fallback_decoded:
209
+ return fallback_decoded.strip()
210
+ else:
211
+ return "Error: Could not decode watermark data"
212
+
213
+ except Exception as e:
214
+ return f"Error in fallback decoding: {str(e)}"
215
+
216
+ except Exception as e:
217
+ return f"Error in decoding: {str(e)}"
218
 
219
+ def analyze_quality(self, original_path, watermarked_path):
220
+ """Analyze watermark quality"""
221
+ try:
222
+ original = cv2.imread(original_path)
223
+ watermarked = cv2.imread(watermarked_path)
224
+
225
+ if original is None or watermarked is None:
226
+ raise ValueError("Could not read image files")
227
+
228
+ # Calculate PSNR
229
+ mse = np.mean((original - watermarked) ** 2)
230
+ if mse == 0:
231
+ psnr = float('inf')
232
+ else:
233
+ psnr = 20 * np.log10(255.0 / np.sqrt(mse))
234
+
235
+ # Calculate histogram similarity
236
+ hist_original = cv2.calcHist([original], [0], None, [256], [0, 256])
237
+ hist_watermarked = cv2.calcHist([watermarked], [0], None, [256], [0, 256])
238
+ hist_correlation = cv2.compareHist(hist_original, hist_watermarked, cv2.HISTCMP_CORREL)
239
+
240
+ # Count modified pixels
241
+ diff = cv2.bitwise_xor(original, watermarked)
242
+ modified_pixels = np.count_nonzero(diff)
243
+
244
+ report = {
245
+ 'psnr': round(psnr, 2),
246
+ 'histogram_similarity': round(hist_correlation, 4),
247
+ 'modified_pixels': modified_pixels,
248
+ 'image_size': original.shape,
249
+ 'quality_score': round((psnr / 50) * 100, 2) if psnr != float('inf') else 100
250
+ }
251
+
252
+ return json.dumps(report, indent=2)
253
+
254
+ except Exception as e:
255
+ return f"Error in quality analysis: {str(e)}"