# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
import os
import os.path as osp
import re
import tempfile
from contextlib import contextmanager
from pathlib import Path
from shutil import SameFileError
from typing import Generator, Iterator, Optional, Tuple, Union

import mmengine
from mmengine.utils import has_method
from .base import BaseStorageBackend


class PetrelBackend(BaseStorageBackend):
    """Petrel storage backend (for internal usage).

    PetrelBackend supports reading and writing data to multiple clusters.
    If the file path contains the cluster name, PetrelBackend will read data
    from specified cluster or write data to it. Otherwise, PetrelBackend will
    access the default cluster.

    Args:
        path_mapping (dict, optional): Path mapping dict from local path to
            Petrel path. When ``path_mapping={'src': 'dst'}``, ``src`` in
            ``filepath`` will be replaced by ``dst``. Defaults to None.
        enable_mc (bool, optional): Whether to enable memcached support.
            Defaults to True.
        conf_path (str, optional): Config path of Petrel client. Default: None.
            `New in version 0.3.3`.

    Examples:
        >>> backend = PetrelBackend()
        >>> filepath1 = 'petrel://path/of/file'
        >>> filepath2 = 'cluster-name:petrel://path/of/file'
        >>> backend.get(filepath1)  # get data from default cluster
        >>> backend.get(filepath2)  # get data from 'cluster-name' cluster
    """

    def __init__(self,
                 path_mapping: Optional[dict] = None,
                 enable_mc: bool = True,
                 conf_path: Optional[str] = None):
        # petrel_client is an optional dependency, so it is imported lazily
        # and a readable hint is raised when it is missing.
        try:
            from petrel_client import client
        except ImportError:
            raise ImportError('Please install petrel_client to enable '
                              'PetrelBackend.')

        self._client = client.Client(conf_path=conf_path, enable_mc=enable_mc)
        assert isinstance(path_mapping, dict) or path_mapping is None
        self.path_mapping = path_mapping

    def _map_path(self, filepath: Union[str, Path]) -> str:
        """Map ``filepath`` to a string path whose prefix will be replaced by
        :attr:`self.path_mapping`.

        Args:
            filepath (str or Path): Path to be mapped.

        Returns:
            str: ``filepath`` as a string, with the first occurrence of every
            key of :attr:`self.path_mapping` replaced by its value.
        """
        filepath = str(filepath)
        if self.path_mapping is not None:
            for k, v in self.path_mapping.items():
                # Only the first occurrence of each key is substituted.
                filepath = filepath.replace(k, v, 1)
        return filepath

    def _format_path(self, filepath: str) -> str:
        """Convert a ``filepath`` to standard format of petrel oss.

        If the ``filepath`` is concatenated by ``os.path.join``, in a Windows
        environment, the ``filepath`` will be the format of
        's3://bucket_name\\image.jpg'. By invoking :meth:`_format_path`, the
        above ``filepath`` will be converted to 's3://bucket_name/image.jpg'.

        Args:
            filepath (str): Path to be formatted.

        Returns:
            str: ``filepath`` with every run of backslashes replaced by a
            single forward slash.
        """
        return re.sub(r'\\+', '/', filepath)

    def _replace_prefix(self, filepath: Union[str, Path]) -> str:
        """Replace every 'petrel://' in ``filepath`` with 's3://'.

        Args:
            filepath (str or Path): Path whose scheme should be rewritten.

        Returns:
            str: The rewritten path.
        """
        filepath = str(filepath)
        return filepath.replace('petrel://', 's3://')

    def get(self, filepath: Union[str, Path]) -> bytes:
        """Read bytes from a given ``filepath`` with 'rb' mode.

        Args:
            filepath (str or Path): Path to read data.

        Returns:
            bytes: Return bytes read from filepath.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.get(filepath)
            b'hello world'
        """
        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        value = self._client.Get(filepath)
        return value

    def get_text(
        self,
        filepath: Union[str, Path],
        encoding: str = 'utf-8',
    ) -> str:
        """Read text from a given ``filepath`` with 'r' mode.

        Args:
            filepath (str or Path): Path to read data.
            encoding (str): The encoding format used to open the ``filepath``.
                Defaults to 'utf-8'.

        Returns:
            str: Expected text reading from ``filepath``.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.get_text(filepath)
            'hello world'
        """
        return str(self.get(filepath), encoding=encoding)

    def put(self, obj: bytes, filepath: Union[str, Path]) -> None:
        """Write bytes to a given ``filepath``.

        Args:
            obj (bytes): Data to be saved.
            filepath (str or Path): Path to write data.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.put(b'hello world', filepath)
        """
        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        self._client.put(filepath, obj)

    def put_text(
        self,
        obj: str,
        filepath: Union[str, Path],
        encoding: str = 'utf-8',
    ) -> None:
        """Write text to a given ``filepath``.

        Args:
            obj (str): Data to be written.
            filepath (str or Path): Path to write data.
            encoding (str): The encoding format used to encode the ``obj``.
                Defaults to 'utf-8'.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.put_text('hello world', filepath)
        """
        self.put(bytes(obj, encoding=encoding), filepath)

    def exists(self, filepath: Union[str, Path]) -> bool:
        """Check whether a file path exists.

        Args:
            filepath (str or Path): Path to be checked whether exists.

        Returns:
            bool: Return ``True`` if ``filepath`` exists, ``False`` otherwise.

        Raises:
            NotImplementedError: If the Petrel client lacks the ``contains``
                or ``isdir`` method.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.exists(filepath)
            True
        """
        if not (has_method(self._client, 'contains')
                and has_method(self._client, 'isdir')):
            raise NotImplementedError(
                'Current version of Petrel Python SDK has not supported '
                'the `contains` and `isdir` methods, please use a higher '
                'version or dev branch instead.')

        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        return self._client.contains(filepath) or self._client.isdir(filepath)

    def isdir(self, filepath: Union[str, Path]) -> bool:
        """Check whether a file path is a directory.

        Args:
            filepath (str or Path): Path to be checked whether it is a
                directory.

        Returns:
            bool: Return ``True`` if ``filepath`` points to a directory,
            ``False`` otherwise.

        Raises:
            NotImplementedError: If the Petrel client lacks the ``isdir``
                method.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/dir'
            >>> backend.isdir(filepath)
            True
        """
        if not has_method(self._client, 'isdir'):
            raise NotImplementedError(
                'Current version of Petrel Python SDK has not supported '
                'the `isdir` method, please use a higher version or dev'
                ' branch instead.')

        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        return self._client.isdir(filepath)

    def isfile(self, filepath: Union[str, Path]) -> bool:
        """Check whether a file path is a file.

        Args:
            filepath (str or Path): Path to be checked whether it is a file.

        Returns:
            bool: Return ``True`` if ``filepath`` points to a file, ``False``
            otherwise.

        Raises:
            NotImplementedError: If the Petrel client lacks the ``contains``
                method.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.isfile(filepath)
            True
        """
        if not has_method(self._client, 'contains'):
            raise NotImplementedError(
                'Current version of Petrel Python SDK has not supported '
                'the `contains` method, please use a higher version or '
                'dev branch instead.')

        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        return self._client.contains(filepath)

    def join_path(
        self,
        filepath: Union[str, Path],
        *filepaths: Union[str, Path],
    ) -> str:
        r"""Concatenate all file paths.

        Join one or more filepath components intelligently. The return value
        is the concatenation of filepath and any members of \*filepaths.

        Args:
            filepath (str or Path): Path to be concatenated.
            \*filepaths (str or Path): Further components appended in order;
                leading '/' characters are stripped from each of them.

        Returns:
            str: The result after concatenation.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.join_path(filepath, 'another/path')
            'petrel://path/of/file/another/path'
            >>> backend.join_path(filepath, '/another/path')
            'petrel://path/of/file/another/path'
        """
        filepath = self._format_path(self._map_path(filepath))
        # Drop one trailing slash so joining does not produce '//'.
        if filepath.endswith('/'):
            filepath = filepath[:-1]
        formatted_paths = [filepath]
        for path in filepaths:
            formatted_path = self._format_path(self._map_path(path))
            formatted_paths.append(formatted_path.lstrip('/'))

        return '/'.join(formatted_paths)

    @contextmanager
    def get_local_path(
        self,
        filepath: Union[str, Path],
    ) -> Generator[Union[str, Path], None, None]:
        """Download a file from ``filepath`` to a local temporary directory,
        and return the temporary path.

        ``get_local_path`` is decorated by
        :meth:`contextlib.contextmanager`. It can be called with ``with``
        statement, and when exiting from the ``with`` statement, the
        temporary path will be released.

        Args:
            filepath (str or Path): Download a file from ``filepath``.

        Yields:
            Iterable[str]: Only yield one temporary path.

        Examples:
            >>> backend = PetrelBackend()
            >>> # After exiting from the ``with`` clause,
            >>> # the path will be removed
            >>> filepath = 'petrel://path/of/file'
            >>> with backend.get_local_path(filepath) as path:
            ...     # do something here
        """
        assert self.isfile(filepath)
        # Create the temporary file *before* entering ``try`` so that ``f``
        # is always bound when the cleanup in ``finally`` runs.
        f = tempfile.NamedTemporaryFile(delete=False)
        try:
            # ``with`` closes the handle even if the download fails, so the
            # file can always be removed afterwards (required on Windows).
            with f:
                f.write(self.get(filepath))
            yield f.name
        finally:
            os.remove(f.name)

    def copyfile(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> str:
        """Copy a file src to dst and return the destination file.

        src and dst should have the same prefix. If dst specifies a directory,
        the file will be copied into dst using the base filename from src. If
        dst specifies a file that already exists, it will be replaced.

        Args:
            src (str or Path): A file to be copied.
            dst (str or Path): Copy file to dst.

        Returns:
            str: The destination file.

        Raises:
            SameFileError: If src and dst are the same file, a SameFileError
                will be raised.

        Examples:
            >>> backend = PetrelBackend()
            >>> # dst is a file
            >>> src = 'petrel://path/of/file'
            >>> dst = 'petrel://path/of/file1'
            >>> backend.copyfile(src, dst)
            'petrel://path/of/file1'

            >>> # dst is a directory
            >>> dst = 'petrel://path/of/dir'
            >>> backend.copyfile(src, dst)
            'petrel://path/of/dir/file'
        """
        src = self._format_path(self._map_path(src))
        dst = self._format_path(self._map_path(dst))
        if self.isdir(dst):
            dst = self.join_path(dst, src.split('/')[-1])

        if src == dst:
            raise SameFileError('src and dst should not be same')

        self.put(self.get(src), dst)
        return dst

    def copytree(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> str:
        """Recursively copy an entire directory tree rooted at src to a
        directory named dst and return the destination directory.

        src and dst should have the same prefix.

        Args:
            src (str or Path): A directory to be copied.
            dst (str or Path): Copy directory to dst.

        Returns:
            str: The destination directory.

        Raises:
            FileExistsError: If dst had already existed, a FileExistsError will
                be raised.

        Examples:
            >>> backend = PetrelBackend()
            >>> src = 'petrel://path/of/dir'
            >>> dst = 'petrel://path/of/dir1'
            >>> backend.copytree(src, dst)
            'petrel://path/of/dir1'
        """
        src = self._format_path(self._map_path(src))
        dst = self._format_path(self._map_path(dst))

        if self.exists(dst):
            raise FileExistsError('dst should not exist')

        for path in self.list_dir_or_file(src, list_dir=False, recursive=True):
            src_path = self.join_path(src, path)
            dst_path = self.join_path(dst, path)
            self.put(self.get(src_path), dst_path)

        return dst

    def copyfile_from_local(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> str:
        """Upload a local file src to dst and return the destination file.

        Args:
            src (str or Path): A local file to be copied.
            dst (str or Path): Copy file to dst.

        Returns:
            str: If dst specifies a directory, the file will be copied into dst
            using the base filename from src.

        Examples:
            >>> backend = PetrelBackend()
            >>> # dst is a file
            >>> src = 'path/of/your/file'
            >>> dst = 'petrel://path/of/file1'
            >>> backend.copyfile_from_local(src, dst)
            'petrel://path/of/file1'

            >>> # dst is a directory
            >>> dst = 'petrel://path/of/dir'
            >>> backend.copyfile_from_local(src, dst)
            'petrel://path/of/dir/file'
        """
        dst = self._format_path(self._map_path(dst))
        if self.isdir(dst):
            dst = self.join_path(dst, osp.basename(src))

        with open(src, 'rb') as f:
            self.put(f.read(), dst)

        return dst

    def copytree_from_local(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> str:
        """Recursively copy an entire directory tree rooted at src to a
        directory named dst and return the destination directory.

        Args:
            src (str or Path): A local directory to be copied.
            dst (str or Path): Copy directory to dst.

        Returns:
            str: The destination directory.

        Raises:
            FileExistsError: If dst had already existed, a FileExistsError will
                be raised.

        Examples:
            >>> backend = PetrelBackend()
            >>> src = 'path/of/your/dir'
            >>> dst = 'petrel://path/of/dir1'
            >>> backend.copytree_from_local(src, dst)
            'petrel://path/of/dir1'
        """
        dst = self._format_path(self._map_path(dst))
        if self.exists(dst):
            raise FileExistsError('dst should not exist')

        src = str(src)

        for cur_dir, _, files in os.walk(src):
            for f in files:
                src_path = osp.join(cur_dir, f)
                # ``os.walk`` always yields paths that start with ``src``, so
                # slice that prefix off. ``str.replace(src, '')`` would also
                # erase later occurrences of ``src`` inside the path (e.g.
                # src='a' and file 'a/b/a.txt') and corrupt the target key.
                rel_path = src_path[len(src):]
                dst_path = self.join_path(dst, rel_path)
                self.copyfile_from_local(src_path, dst_path)

        return dst

    def copyfile_to_local(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> Union[str, Path]:
        """Copy the file src to local dst and return the destination file.

        If dst specifies a directory, the file will be copied into dst using
        the base filename from src. If dst specifies a file that already
        exists, it will be replaced.

        Args:
            src (str or Path): A file to be copied.
            dst (str or Path): Copy file to local dst.

        Returns:
            str: If dst specifies a directory, the file will be copied into dst
            using the base filename from src.

        Examples:
            >>> backend = PetrelBackend()
            >>> # dst is a file
            >>> src = 'petrel://path/of/file'
            >>> dst = 'path/of/your/file'
            >>> backend.copyfile_to_local(src, dst)
            'path/of/your/file'

            >>> # dst is a directory
            >>> dst = 'path/of/your/dir'
            >>> backend.copyfile_to_local(src, dst)
            'path/of/your/dir/file'
        """
        if osp.isdir(dst):
            basename = osp.basename(src)
            # Keep the return type aligned with the type of ``dst``.
            if isinstance(dst, str):
                dst = osp.join(dst, basename)
            else:
                assert isinstance(dst, Path)
                dst = dst / basename

        with open(dst, 'wb') as f:
            f.write(self.get(src))

        return dst

    def copytree_to_local(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> Union[str, Path]:
        """Recursively copy an entire directory tree rooted at src to a local
        directory named dst and return the destination directory.

        Args:
            src (str or Path): A directory to be copied.
            dst (str or Path): Copy directory to local dst.

        Returns:
            str: The destination directory.

        Examples:
            >>> backend = PetrelBackend()
            >>> src = 'petrel://path/of/dir'
            >>> dst = 'path/of/your/dir'
            >>> backend.copytree_to_local(src, dst)
            'path/of/your/dir'
        """
        for path in self.list_dir_or_file(src, list_dir=False, recursive=True):
            dst_path = osp.join(dst, path)
            # Make sure the local parent directory exists before writing.
            mmengine.mkdir_or_exist(osp.dirname(dst_path))
            with open(dst_path, 'wb') as f:
                f.write(self.get(self.join_path(src, path)))

        return dst

    def remove(self, filepath: Union[str, Path]) -> None:
        """Remove a file.

        Args:
            filepath (str or Path): Path to be removed.

        Raises:
            NotImplementedError: If the Petrel client lacks the ``delete``
                method.
            FileNotFoundError: If filepath does not exist, an FileNotFoundError
                will be raised.
            IsADirectoryError: If filepath is a directory, an IsADirectoryError
                will be raised.

        Examples:
            >>> backend = PetrelBackend()
            >>> filepath = 'petrel://path/of/file'
            >>> backend.remove(filepath)
        """
        if not has_method(self._client, 'delete'):
            raise NotImplementedError(
                'Current version of Petrel Python SDK has not supported '
                'the `delete` method, please use a higher version or dev '
                'branch instead.')

        if not self.exists(filepath):
            raise FileNotFoundError(f'filepath {filepath} does not exist')

        if self.isdir(filepath):
            raise IsADirectoryError('filepath should be a file')

        filepath = self._map_path(filepath)
        filepath = self._format_path(filepath)
        filepath = self._replace_prefix(filepath)
        self._client.delete(filepath)

    def rmtree(self, dir_path: Union[str, Path]) -> None:
        """Recursively delete a directory tree.

        Args:
            dir_path (str or Path): A directory to be removed.

        Examples:
            >>> backend = PetrelBackend()
            >>> dir_path = 'petrel://path/of/dir'
            >>> backend.rmtree(dir_path)
        """
        for path in self.list_dir_or_file(
                dir_path, list_dir=False, recursive=True):
            filepath = self.join_path(dir_path, path)
            self.remove(filepath)

    def copy_if_symlink_fails(
        self,
        src: Union[str, Path],
        dst: Union[str, Path],
    ) -> bool:
        """Copy src to dst in place of creating a symbolic link.

        Directly copy src to dst because PetrelBackend does not support
        creating a symbolic link.

        Args:
            src (str or Path): A file or directory to be copied.
            dst (str or Path): Copy a file or directory to dst.

        Returns:
            bool: Return False because PetrelBackend does not support create
            a symbolic link.

        Examples:
            >>> backend = PetrelBackend()
            >>> src = 'petrel://path/of/file'
            >>> dst = 'petrel://path/of/your/file'
            >>> backend.copy_if_symlink_fails(src, dst)
            False
            >>> src = 'petrel://path/of/dir'
            >>> dst = 'petrel://path/of/your/dir'
            >>> backend.copy_if_symlink_fails(src, dst)
            False
        """
        if self.isfile(src):
            self.copyfile(src, dst)
        else:
            self.copytree(src, dst)
        return False

    def list_dir_or_file(self,
                         dir_path: Union[str, Path],
                         list_dir: bool = True,
                         list_file: bool = True,
                         suffix: Optional[Union[str, Tuple[str]]] = None,
                         recursive: bool = False) -> Iterator[str]:
        """Scan a directory to find the interested directories or files in
        arbitrary order.

        Note:
            Petrel has no concept of directories but it simulates the directory
            hierarchy in the filesystem through public prefixes. In addition,
            if the returned path ends with '/', it means the path is a public
            prefix which is a logical directory.

        Note:
            :meth:`list_dir_or_file` returns the path relative to ``dir_path``.
            In addition, the returned path of directory will not contains the
            suffix '/' which is consistent with other backends.

        Args:
            dir_path (str | Path): Path of the directory.
            list_dir (bool): List the directories. Defaults to True.
            list_file (bool): List the path of files. Defaults to True.
            suffix (str or tuple[str], optional): File suffix
                that we are interested in. Defaults to None.
            recursive (bool): If set to True, recursively scan the
                directory. Defaults to False.

        Yields:
            Iterable[str]: A relative path to ``dir_path``.

        Raises:
            NotImplementedError: If the Petrel client lacks the ``list``
                method.
            TypeError: If ``suffix`` is given while ``list_dir`` is True, or
                if ``suffix`` is neither a string nor a tuple.

        Examples:
            >>> backend = PetrelBackend()
            >>> dir_path = 'petrel://path/of/dir'
            >>> # list those files and directories in current directory
            >>> for file_path in backend.list_dir_or_file(dir_path):
            ...     print(file_path)
            >>> # only list files
            >>> for file_path in backend.list_dir_or_file(dir_path, list_dir=False):
            ...     print(file_path)
            >>> # only list directories
            >>> for file_path in backend.list_dir_or_file(dir_path, list_file=False):
            ...     print(file_path)
            >>> # only list files ending with specified suffixes
            >>> for file_path in backend.list_dir_or_file(dir_path, suffix='.txt'):
            ...     print(file_path)
            >>> # list all files and directory recursively
            >>> for file_path in backend.list_dir_or_file(dir_path, recursive=True):
            ...     print(file_path)
        """  # noqa: E501
        if not has_method(self._client, 'list'):
            raise NotImplementedError(
                'Current version of Petrel Python SDK has not supported '
                'the `list` method, please use a higher version or dev'
                ' branch instead.')

        dir_path = self._map_path(dir_path)
        dir_path = self._format_path(dir_path)
        dir_path = self._replace_prefix(dir_path)
        if list_dir and suffix is not None:
            raise TypeError(
                '`list_dir` should be False when `suffix` is not None')

        if (suffix is not None) and not isinstance(suffix, (str, tuple)):
            raise TypeError('`suffix` must be a string or tuple of strings')

        # Petrel's simulated directory hierarchy assumes that directory paths
        # should end with `/`
        if not dir_path.endswith('/'):
            dir_path += '/'

        root = dir_path

        def _list_dir_or_file(dir_path, list_dir, list_file, suffix,
                              recursive):
            for path in self._client.list(dir_path):
                # the `self.isdir` is not used here to determine whether path
                # is a directory, because `self.isdir` relies on
                # `self._client.list`
                if path.endswith('/'):  # a directory path
                    next_dir_path = self.join_path(dir_path, path)
                    if list_dir:
                        # get the relative path and exclude the last
                        # character '/'
                        rel_dir = next_dir_path[len(root):-1]
                        yield rel_dir
                    if recursive:
                        yield from _list_dir_or_file(next_dir_path, list_dir,
                                                     list_file, suffix,
                                                     recursive)
                else:  # a file path
                    absolute_path = self.join_path(dir_path, path)
                    rel_path = absolute_path[len(root):]
                    if (suffix is None
                            or rel_path.endswith(suffix)) and list_file:
                        yield rel_path

        return _list_dir_or_file(dir_path, list_dir, list_file, suffix,
                                 recursive)

    def generate_presigned_url(self,
                               url: str,
                               client_method: str = 'get_object',
                               expires_in: int = 3600) -> str:
        """Generate the presigned url of video stream which can be passed to
        mmcv.VideoReader. Now only work on Petrel backend.

        Note:
            Now only work on Petrel backend.

        Args:
            url (str): Url of video stream.
            client_method (str): Method of client, 'get_object' or
                'put_object'. Default: 'get_object'.
            expires_in (int): expires, in seconds. Default: 3600.

        Returns:
            str: Generated presigned url.
        """
        return self._client.generate_presigned_url(url, client_method,
                                                   expires_in)