File size: 3,781 Bytes
2fdce3c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
#!/workspace/kohya_ss/venv/bin/python

"""
invisible-watermark is the utility to add hidden watermark and extract watermark from image

embed watermark:  ./invisible-watermark -v -a encode -t bytes -m dwtDct -w 'hello' -o ./test_vectors/wm.png ./test_vectors/original.jpg

decode watermark: ./invisible-watermark -v -a decode -t bytes -m dwtDct -l 40 ./test_vectors/wm.png
"""

import argparse
import os
import shutil
import sys
import time
import uuid
import cv2
import base64
import time
import pprint

from imwatermark import WatermarkEncoder, WatermarkDecoder

pp = pprint.PrettyPrinter(indent=2)


class CustomArgParseFormatter(
    argparse.ArgumentDefaultsHelpFormatter,
    argparse.RawDescriptionHelpFormatter):
  pass


def main():
    description = __doc__.format()
    parser = argparse.ArgumentParser(description=description,
                                   formatter_class=CustomArgParseFormatter)

    parser.add_argument('input',
                        help='The path of input')
    parser.add_argument('-a', '--action', required=True, help='encode|decode')
    parser.add_argument('-t', '--type', default='bits', help='bytes|b16|bits|uuid|ipv4')
    parser.add_argument('-m', '--method', default='maxDct', help='dwtDct|dwtDctSvd|rivaGan')
    parser.add_argument('-w', '--watermark', default='', help='embedded string')
    parser.add_argument('-l', '--length', default=0, type=int,
                        help='watermark bits length, required for bytes|b16|bits watermark')
    parser.add_argument('-o', '--output', required=False,
                        help='The path of output')
    parser.add_argument('-v', '--verbose', required=False, default=False,
                        action='store_true', help='print info')
    args = parser.parse_args()
    action = args.action
    inputFile = args.input

    if action == 'encode':
        if not args.output:
            sys.stderr.write('output is required. exiting...\n')
            sys.exit(1)
        if not args.watermark:
            sys.stderr.write('watermark is required. exiting...\n')
            sys.exit(1)

        encoder = WatermarkEncoder()
        wmType = args.type
        if args.method == 'rivaGan':
            WatermarkEncoder.loadModel()
        bgr = cv2.imread(args.input)
        if args.type == 'bytes':
            wm = args.watermark.encode('utf-8')
        elif args.type == 'b16':
            wm = args.watermark.upper().encode('utf-8')
        elif args.type == 'bits':
            wm = [int(c) % 2 for c in args.watermark]
        else:
            wm = args.watermark
        encoder.set_watermark(wmType, wm)
        start = time.time()
        bgr_encoded = encoder.encode(bgr, args.method)
        if args.verbose:
            print('watermark length:', encoder.get_length())
            print('encode time ms:', (time.time()-start) * 1000)
        cv2.imwrite(args.output, bgr_encoded)
    elif action == 'decode':
        if args.type in ['bytes', 'bits', 'b16']:
            if args.length <= 0:
                sys.stderr.write('length is required for bytes watermark decoding\n')
                sys.exit(1)
            wmType = args.type
            decoder = WatermarkDecoder(wmType, args.length)
        else:
            decoder = WatermarkDecoder(args.type)
        if args.method == 'rivaGan':
            WatermarkDecoder.loadModel()
        bgr = cv2.imread(args.input)
        start = time.time()
        wm = decoder.decode(bgr, args.method)
        if args.verbose:
            print('decode time ms:', (time.time()-start) * 1000)
        if args.type in ['bytes', 'b16']:
            wm = wm.decode('utf-8')
        print(wm)
    else:
        sys.stderr.write('unknown action\n')
        sys.exit(1)

if __name__ == '__main__':
    sys.exit(main())