Spaces:
Running
Running
import json | |
import pathlib | |
import time | |
from typing import cast | |
import click | |
import filetype | |
from tqdm import tqdm | |
from watchdog.events import FileSystemEvent, FileSystemEventHandler | |
from watchdog.observers import Observer | |
from ..bg import remove | |
from ..session_factory import new_session | |
from ..sessions import sessions_names | |
def p_command( | |
model: str, | |
extras: str, | |
input: pathlib.Path, | |
output: pathlib.Path, | |
watch: bool, | |
delete_input: bool, | |
**kwargs, | |
) -> None: | |
""" | |
Command-line interface (CLI) program for performing background removal on images in a folder. | |
This program takes a folder as input and uses a specified model to remove the background from the images in the folder. | |
It provides various options for configuration, such as choosing the model, enabling alpha matting, setting trimap thresholds, erode size, etc. | |
Additional options include outputting only the mask and post-processing the mask. | |
The program can also watch the input folder for changes and automatically process new images. | |
The resulting images with the background removed are saved in the specified output folder. | |
Parameters: | |
model (str): The name of the model to use for background removal. | |
extras (str): Additional options in JSON format. | |
input (pathlib.Path): The path to the input folder. | |
output (pathlib.Path): The path to the output folder. | |
watch (bool): Whether to watch the input folder for changes. | |
delete_input (bool): Whether to delete the input file after processing. | |
**kwargs: Additional keyword arguments. | |
Returns: | |
None | |
""" | |
try: | |
kwargs.update(json.loads(extras)) | |
except Exception: | |
pass | |
session = new_session(model, **kwargs) | |
def process(each_input: pathlib.Path) -> None: | |
try: | |
mimetype = filetype.guess(each_input) | |
if mimetype is None: | |
return | |
if mimetype.mime.find("image") < 0: | |
return | |
each_output = (output / each_input.name).with_suffix(".png") | |
each_output.parents[0].mkdir(parents=True, exist_ok=True) | |
if not each_output.exists(): | |
each_output.write_bytes( | |
cast( | |
bytes, | |
remove(each_input.read_bytes(), session=session, **kwargs), | |
) | |
) | |
if watch: | |
print( | |
f"processed: {each_input.absolute()} -> {each_output.absolute()}" | |
) | |
if delete_input: | |
each_input.unlink() | |
except Exception as e: | |
print(e) | |
inputs = list(input.glob("**/*")) | |
if not watch: | |
inputs_tqdm = tqdm(inputs) | |
for each_input in inputs_tqdm: | |
if not each_input.is_dir(): | |
process(each_input) | |
if watch: | |
should_watch = True | |
observer = Observer() | |
class EventHandler(FileSystemEventHandler): | |
def on_any_event(self, event: FileSystemEvent) -> None: | |
src_path = cast(str, event.src_path) | |
if ( | |
not ( | |
event.is_directory or event.event_type in ["deleted", "closed"] | |
) | |
and pathlib.Path(src_path).exists() | |
): | |
if src_path.endswith("stop.txt"): | |
nonlocal should_watch | |
should_watch = False | |
pathlib.Path(src_path).unlink() | |
return | |
process(pathlib.Path(src_path)) | |
event_handler = EventHandler() | |
observer.schedule(event_handler, str(input), recursive=False) | |
observer.start() | |
try: | |
while should_watch: | |
time.sleep(1) | |
finally: | |
observer.stop() | |
observer.join() | |