bluestarburst's picture
Upload folder using huggingface_hub
ddb7519
#!/usr/bin/env python3
"""Compress images in a folder to a maximum megapixel size."""
import argparse
import asyncio
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from glob import iglob
from multiprocessing import cpu_count
from queue import Queue
from PIL import Image, ImageFile, ImageOps
# Prevent errors from halting the script.
ImageFile.LOAD_TRUNCATED_IMAGES = True
Image.warnings.simplefilter("error", Image.DecompressionBombWarning)
VERSION = "2.0"
SHORT_DESCRIPTION = "Compress images in a directory."
SUPPORTED_EXTENSIONS = [".jpg", ".jpeg", ".png", ".webp"]
def get_args(**parser_kwargs):
"""Get command-line options."""
parser = argparse.ArgumentParser(**parser_kwargs)
parser.add_argument(
"--img_dir",
type=str,
default="input",
help="path to image directory (default: 'input')",
)
parser.add_argument(
"--out_dir",
type=str,
default=None,
help="path to output directory (default: IMG_DIR)",
)
parser.add_argument(
"--max_mp",
type=float,
default=1.5,
help="maximum megapixels (default: 1.5)",
)
parser.add_argument(
"--quality",
type=int,
default=95,
help="save quality (default: 95, range: 0-100, suggested: 90+)",
)
parser.add_argument(
"--overwrite",
action="store_true",
default=False,
help="overwrite files in output directory",
)
parser.add_argument(
"--noresize",
action="store_true",
default=False,
help="do not resize, just fix orientation",
)
parser.add_argument(
"--delete",
action="store_true",
default=False,
help="delete original files after processing",
)
args = parser.parse_args()
args.out_dir = args.out_dir or args.img_dir
args.max_mp = args.max_mp * 1024000
return args
def images(img_dir):
"""Return each image in the input directory."""
for file in iglob(f"{img_dir}/*.*"):
if file.lower().endswith(tuple(SUPPORTED_EXTENSIONS)):
yield file
def inline(msg, newline=False):
"""Print a message on the same line."""
msg = f"\r{msg}"
msg += " " * (79 - len(msg))
print(msg, end="\n" if newline else "", flush=True)
def launch_workers(queue, args):
"""Launch a pool of workers."""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
tasks = [loop.create_task(worker(queue, args)) for _ in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
async def open_img(path):
"""Open an image."""
loop = asyncio.get_running_loop()
try:
return await loop.run_in_executor(None, Image.open, path)
except Exception as err:
inline(f"[!] Error Opening: {path} - {err}", True)
return None
def oversize(img, max_mp):
"""Check if an image is larger than the maximum size."""
return (img.width * img.height) > max_mp
async def process(image, args):
"""Process an image."""
outfile = image.replace(args.img_dir, args.out_dir).replace(
os.path.splitext(image)[1], ".webp"
)
if args.overwrite or not os.path.exists(outfile):
img = await open_img(image)
if img:
newimg = transpose(img)
if not args.noresize and oversize(newimg, args.max_mp):
newimg = shrink(newimg, args)
if newimg != img:
await save_img(newimg, outfile, args)
if args.delete and outfile != image:
os.remove(image)
def slow_save(path, args, img):
"""Save an image."""
try:
img.save(path, "webp", quality=args.quality)
inline(f"[+] Compressed: {path}")
except Exception as err:
inline(f"[!] Error Saving: {path} - {err}", True)
async def save_img(img, path, args):
"""Save an image."""
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, slow_save, path, args, img)
def scan_path(queue, args):
"""Scan the input directory for images."""
inline("[*] Scanning for images...", True)
for image in images(args.img_dir):
inline(f"[+] {image}")
queue.put(image)
def shrink(img, args):
"""Shrink an image."""
hw = img.size
ratio = args.max_mp / (hw[0]*hw[1])
newhw = (int(hw[0]*ratio**0.5), int(hw[1]*ratio**0.5))
try:
return img.resize(newhw, Image.BICUBIC)
except Exception as err:
inline(f"[!] Error Shrinking: {img.filename} - {err}", True)
return img
def start_compression(queue, args):
"""Start the compression process."""
inline("[*] Compressing images...", True)
inline("[-] (scanning...)")
with ThreadPoolExecutor() as executor:
workers = {
executor.submit(launch_workers, queue, args): None
for _ in range(cpu_count())
}
for _ in as_completed(workers):
pass
inline("[!] Done!", True)
def transpose(img):
"""Transpose an image."""
try:
return ImageOps.exif_transpose(img)
except Exception as err:
inline(f"[!] Error Transposing: {img.filename} - {err}", True)
return img
async def worker(queue, args):
"""Handle images from the queue until they're gone."""
while not queue.empty():
image = queue.get()
await process(image, args)
def main():
"""Run the program."""
queue = Queue()
args = get_args(description=SHORT_DESCRIPTION)
inline(f"[>] Image Compression Utility v{VERSION}", True)
scan_path(queue, args)
start_compression(queue, args)
if __name__ == "__main__":
main()