Spaces:
Runtime error
Runtime error
import os | |
import subprocess | |
import concurrent.futures | |
from PIL import Image | |
from tqdm import tqdm | |
from PIL import Image, ExifTags | |
target_resolutions = [ | |
(640, 1632), # 640 * 1632 = 1044480 | |
(704, 1472), # 704 * 1472 = 1036288 | |
(768, 1360), # 768 * 1360 = 1044480 | |
(832, 1248), # 832 * 1248 = 1038336 | |
(896, 1152), | |
(960, 1088), # 960 * 1088 = 1044480 | |
(992, 1056), # 992 * 1056 = 1047552 | |
(1024, 1024), # 1024 * 1024 = 1048576 | |
(1056, 992), # 1056 * 992 = 1047552 | |
(1088, 960), # 1088 * 960 = 1044480 | |
(1152, 896), | |
(1248, 832), # 1248 * 832 = 1038336 | |
(1360, 768), # 1360 * 768 = 1044480 | |
(1472, 704), # 1472 * 704 = 1036288 | |
(1632, 640), # 1632 * 640 = 1044480 | |
# (768, 1360), # 768 * 1360 = 1044480 | |
# (1472, 704), # 1472 * 704 = 1036288 | |
# (1024, 1024), # 1024 * 1024 = 1048576 | |
] | |
# 图像预处理 | |
def apply_exif_orientation(image): | |
try: | |
for orientation in ExifTags.TAGS.keys(): | |
if ExifTags.TAGS[orientation] == 'Orientation': | |
break | |
exif = image._getexif() | |
if exif is not None: | |
exif = dict(exif.items()) | |
orientation_value = exif.get(orientation) | |
if orientation_value == 3: | |
image = image.rotate(180, expand=True) | |
elif orientation_value == 6: | |
image = image.rotate(270, expand=True) | |
elif orientation_value == 8: | |
image = image.rotate(90, expand=True) | |
except (AttributeError, KeyError, IndexError, TypeError): | |
# cases: image don't have getexif | |
pass | |
return image | |
def convert_image_to_jpg(img, img_path): | |
"""Convert an Image object to JPG.""" | |
# Remove extension from original filename and add .jpg | |
base_name = os.path.splitext(img_path)[0] | |
jpg_path = base_name + '.jpg' | |
# Convert image to RGB if it is RGBA (or any other mode) | |
if img.mode != 'RGB': | |
img = img.convert('RGB') | |
img.save(jpg_path, format='JPEG', quality=100) | |
def process_image(img_path): | |
try: | |
if img_path.lower().endswith((".jpg", ".png", ".bmp", ".gif", ".tif", ".tiff", ".jpeg", ".webp")): | |
img = Image.open(img_path) | |
img = apply_exif_orientation(img) # Apply the EXIF orientation | |
# Convert to 'RGB' if it is 'RGBA' or any other mode | |
img = img.convert('RGB') | |
# 计算原图像的宽高比 | |
original_aspect_ratio = img.width / img.height | |
# 找到最接近原图像宽高比的目标分辨率 | |
target_resolution = min(target_resolutions, key=lambda res: abs(original_aspect_ratio - res[0] / res[1])) | |
# 计算新的维度 | |
if img.width / target_resolution[0] < img.height / target_resolution[1]: | |
new_width = target_resolution[0] | |
new_height = int(img.height * target_resolution[0] / img.width) | |
else: | |
new_height = target_resolution[1] | |
new_width = int(img.width * target_resolution[1] / img.height) | |
# 等比缩放图像 | |
img = img.resize((new_width, new_height), Image.LANCZOS) | |
# 计算裁剪的区域 | |
left = int((img.width - target_resolution[0]) / 2) | |
top = int((img.height - target_resolution[1]) / 2) | |
right = int((img.width + target_resolution[0]) / 2) | |
bottom = int((img.height + target_resolution[1]) / 2) | |
# 裁剪图像 | |
img = img.crop((left, top, right, bottom)) | |
# 转换并保存图像为JPG格式 | |
convert_image_to_jpg(img, img_path) | |
except Exception as e: | |
print(f"Error processing image {img_path}: {e}") | |
return None | |
def delete_non_jpg_files(folder_path): | |
"""Delete all non-jpg image files in a directory, but keep txt files.""" | |
for dirpath, dirnames, filenames in os.walk(folder_path): | |
for filename in filenames: | |
if not filename.lower().endswith((".jpg", ".txt")): | |
file_path = os.path.join(dirpath, filename) | |
try: | |
os.remove(file_path) | |
except Exception as e: | |
print(f"Error occurred while deleting file : {file_path}. Error : {str(e)}") | |
def process_images_in_folder(folder_path): | |
""" | |
Process all images in the given folder according to the target resolutions, | |
then delete all non-jpg files except for .txt files. | |
""" | |
processed_files = [] | |
file_list = [os.path.join(dirpath, filename) | |
for dirpath, dirnames, filenames in os.walk(folder_path) | |
for filename in filenames] | |
with concurrent.futures.ThreadPoolExecutor() as executor: | |
list(tqdm(executor.map(process_image, file_list), total=len(file_list))) | |
delete_non_jpg_files(folder_path) | |
return f"Processed images in folder: {folder_path}" | |
# 失败检查 | |
def run_script(folder_path, keywords): | |
keywords = keywords if keywords else "sorry,error" | |
result = subprocess.run( | |
[ | |
'python', './lib/Failed_Tagging_File_Screening.py', | |
'--image_path', folder_path, | |
'--keywords', keywords | |
], | |
capture_output=True, text=True | |
) | |
return result.stdout if result.stdout else "No Output", result.stderr if result.stderr else "No Error" | |