import os, zipfile, shutil, subprocess, shlex, sys # noqa from urllib.parse import urlparse import re import logging def load_file_from_url( url: str, model_dir: str, file_name: str | None = None, overwrite: bool = False, progress: bool = True, ) -> str: """Download a file from `url` into `model_dir`, using the file present if possible. Returns the path to the downloaded file. """ os.makedirs(model_dir, exist_ok=True) if not file_name: parts = urlparse(url) file_name = os.path.basename(parts.path) cached_file = os.path.abspath(os.path.join(model_dir, file_name)) # Overwrite if os.path.exists(cached_file): if overwrite or os.path.getsize(cached_file) == 0: remove_files(cached_file) # Download if not os.path.exists(cached_file): logger.info(f'Downloading: "{url}" to {cached_file}\n') from torch.hub import download_url_to_file download_url_to_file(url, cached_file, progress=progress) else: logger.debug(cached_file) return cached_file def friendly_name(file: str): if file.startswith("http"): file = urlparse(file).path file = os.path.basename(file) model_name, extension = os.path.splitext(file) return model_name, extension def download_manager( url: str, path: str, extension: str = "", overwrite: bool = False, progress: bool = True, ): url = url.strip() name, ext = friendly_name(url) name += ext if not extension else f".{extension}" if url.startswith("http"): filename = load_file_from_url( url=url, model_dir=path, file_name=name, overwrite=overwrite, progress=progress, ) else: filename = path return filename def remove_files(file_list): if isinstance(file_list, str): file_list = [file_list] for file in file_list: if os.path.exists(file): os.remove(file) def remove_directory_contents(directory_path): """ Removes all files and subdirectories within a directory. Parameters: directory_path (str): Path to the directory whose contents need to be removed. """ if os.path.exists(directory_path): for filename in os.listdir(directory_path): file_path = os.path.join(directory_path, filename) try: if os.path.isfile(file_path): os.remove(file_path) elif os.path.isdir(file_path): shutil.rmtree(file_path) except Exception as e: logger.error(f"Failed to delete {file_path}. Reason: {e}") logger.info(f"Content in '{directory_path}' removed.") else: logger.error(f"Directory '{directory_path}' does not exist.") # Create directory if not exists def create_directories(directory_path): if isinstance(directory_path, str): directory_path = [directory_path] for one_dir_path in directory_path: if not os.path.exists(one_dir_path): os.makedirs(one_dir_path) logger.debug(f"Directory '{one_dir_path}' created.") def setup_logger(name_log): logger = logging.getLogger(name_log) logger.setLevel(logging.INFO) _default_handler = logging.StreamHandler() # Set sys.stderr as stream. _default_handler.flush = sys.stderr.flush logger.addHandler(_default_handler) logger.propagate = False handlers = logger.handlers for handler in handlers: formatter = logging.Formatter("[%(levelname)s] >> %(message)s") handler.setFormatter(formatter) # logger.handlers return logger logger = setup_logger("ss") logger.setLevel(logging.INFO)