|
import asyncio |
|
import asyncio.events |
|
import functools |
|
import inspect |
|
import io |
|
import numbers |
|
import os |
|
import re |
|
import threading |
|
from contextlib import contextmanager |
|
from glob import has_magic |
|
from typing import TYPE_CHECKING, Iterable |
|
|
|
from .callbacks import DEFAULT_CALLBACK |
|
from .exceptions import FSTimeoutError |
|
from .implementations.local import LocalFileSystem, make_path_posix, trailing_sep |
|
from .spec import AbstractBufferedFile, AbstractFileSystem |
|
from .utils import glob_translate, is_exception, other_paths |
|
|
|
private = re.compile("_[^_]") |
|
iothread = [None] |
|
loop = [None] |
|
_lock = None |
|
get_running_loop = asyncio.get_running_loop |
|
|
|
|
|
def get_lock(): |
|
"""Allocate or return a threading lock. |
|
|
|
The lock is allocated on first use to allow setting one lock per forked process. |
|
""" |
|
global _lock |
|
if not _lock: |
|
_lock = threading.Lock() |
|
return _lock |
|
|
|
|
|
def reset_lock(): |
|
"""Reset the global lock. |
|
|
|
This should be called only on the init of a forked process to reset the lock to |
|
None, enabling the new forked process to get a new lock. |
|
""" |
|
global _lock |
|
|
|
iothread[0] = None |
|
loop[0] = None |
|
_lock = None |
|
|
|
|
|
async def _runner(event, coro, result, timeout=None): |
|
timeout = timeout if timeout else None |
|
if timeout is not None: |
|
coro = asyncio.wait_for(coro, timeout=timeout) |
|
try: |
|
result[0] = await coro |
|
except Exception as ex: |
|
result[0] = ex |
|
finally: |
|
event.set() |
|
|
|
|
|
def sync(loop, func, *args, timeout=None, **kwargs): |
|
""" |
|
Make loop run coroutine until it returns. Runs in other thread |
|
|
|
Examples |
|
-------- |
|
>>> fsspec.asyn.sync(fsspec.asyn.get_loop(), func, *args, |
|
timeout=timeout, **kwargs) |
|
""" |
|
timeout = timeout if timeout else None |
|
|
|
|
|
if loop is None or loop.is_closed(): |
|
raise RuntimeError("Loop is not running") |
|
try: |
|
loop0 = asyncio.events.get_running_loop() |
|
if loop0 is loop: |
|
raise NotImplementedError("Calling sync() from within a running loop") |
|
except NotImplementedError: |
|
raise |
|
except RuntimeError: |
|
pass |
|
coro = func(*args, **kwargs) |
|
result = [None] |
|
event = threading.Event() |
|
asyncio.run_coroutine_threadsafe(_runner(event, coro, result, timeout), loop) |
|
while True: |
|
|
|
if event.wait(1): |
|
break |
|
if timeout is not None: |
|
timeout -= 1 |
|
if timeout < 0: |
|
raise FSTimeoutError |
|
|
|
return_result = result[0] |
|
if isinstance(return_result, asyncio.TimeoutError): |
|
|
|
raise FSTimeoutError from return_result |
|
elif isinstance(return_result, BaseException): |
|
raise return_result |
|
else: |
|
return return_result |
|
|
|
|
|
def sync_wrapper(func, obj=None): |
|
"""Given a function, make so can be called in blocking contexts |
|
|
|
Leave obj=None if defining within a class. Pass the instance if attaching |
|
as an attribute of the instance. |
|
""" |
|
|
|
@functools.wraps(func) |
|
def wrapper(*args, **kwargs): |
|
self = obj or args[0] |
|
return sync(self.loop, func, *args, **kwargs) |
|
|
|
return wrapper |
|
|
|
|
|
@contextmanager |
|
def _selector_policy(): |
|
original_policy = asyncio.get_event_loop_policy() |
|
try: |
|
if os.name == "nt" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): |
|
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) |
|
|
|
yield |
|
finally: |
|
asyncio.set_event_loop_policy(original_policy) |
|
|
|
|
|
def get_loop(): |
|
"""Create or return the default fsspec IO loop |
|
|
|
The loop will be running on a separate thread. |
|
""" |
|
if loop[0] is None: |
|
with get_lock(): |
|
|
|
|
|
if loop[0] is None: |
|
with _selector_policy(): |
|
loop[0] = asyncio.new_event_loop() |
|
th = threading.Thread(target=loop[0].run_forever, name="fsspecIO") |
|
th.daemon = True |
|
th.start() |
|
iothread[0] = th |
|
return loop[0] |
|
|
|
|
|
if TYPE_CHECKING: |
|
import resource |
|
|
|
ResourceError = resource.error |
|
else: |
|
try: |
|
import resource |
|
except ImportError: |
|
resource = None |
|
ResourceError = OSError |
|
else: |
|
ResourceError = getattr(resource, "error", OSError) |
|
|
|
_DEFAULT_BATCH_SIZE = 128 |
|
_NOFILES_DEFAULT_BATCH_SIZE = 1280 |
|
|
|
|
|
def _get_batch_size(nofiles=False): |
|
from fsspec.config import conf |
|
|
|
if nofiles: |
|
if "nofiles_gather_batch_size" in conf: |
|
return conf["nofiles_gather_batch_size"] |
|
else: |
|
if "gather_batch_size" in conf: |
|
return conf["gather_batch_size"] |
|
if nofiles: |
|
return _NOFILES_DEFAULT_BATCH_SIZE |
|
if resource is None: |
|
return _DEFAULT_BATCH_SIZE |
|
|
|
try: |
|
soft_limit, _ = resource.getrlimit(resource.RLIMIT_NOFILE) |
|
except (ImportError, ValueError, ResourceError): |
|
return _DEFAULT_BATCH_SIZE |
|
|
|
if soft_limit == resource.RLIM_INFINITY: |
|
return -1 |
|
else: |
|
return soft_limit // 8 |
|
|
|
|
|
def running_async() -> bool: |
|
"""Being executed by an event loop?""" |
|
try: |
|
asyncio.get_running_loop() |
|
return True |
|
except RuntimeError: |
|
return False |
|
|
|
|
|
async def _run_coros_in_chunks( |
|
coros, |
|
batch_size=None, |
|
callback=DEFAULT_CALLBACK, |
|
timeout=None, |
|
return_exceptions=False, |
|
nofiles=False, |
|
): |
|
"""Run the given coroutines in chunks. |
|
|
|
Parameters |
|
---------- |
|
coros: list of coroutines to run |
|
batch_size: int or None |
|
Number of coroutines to submit/wait on simultaneously. |
|
If -1, then it will not be any throttling. If |
|
None, it will be inferred from _get_batch_size() |
|
callback: fsspec.callbacks.Callback instance |
|
Gets a relative_update when each coroutine completes |
|
timeout: number or None |
|
If given, each coroutine times out after this time. Note that, since |
|
there are multiple batches, the total run time of this function will in |
|
general be longer |
|
return_exceptions: bool |
|
Same meaning as in asyncio.gather |
|
nofiles: bool |
|
If inferring the batch_size, does this operation involve local files? |
|
If yes, you normally expect smaller batches. |
|
""" |
|
|
|
if batch_size is None: |
|
batch_size = _get_batch_size(nofiles=nofiles) |
|
|
|
if batch_size == -1: |
|
batch_size = len(coros) |
|
|
|
assert batch_size > 0 |
|
|
|
async def _run_coro(coro, i): |
|
try: |
|
return await asyncio.wait_for(coro, timeout=timeout), i |
|
except Exception as e: |
|
if not return_exceptions: |
|
raise |
|
return e, i |
|
finally: |
|
callback.relative_update(1) |
|
|
|
i = 0 |
|
n = len(coros) |
|
results = [None] * n |
|
pending = set() |
|
|
|
while pending or i < n: |
|
while len(pending) < batch_size and i < n: |
|
pending.add(asyncio.ensure_future(_run_coro(coros[i], i))) |
|
i += 1 |
|
|
|
if not pending: |
|
break |
|
|
|
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED) |
|
while done: |
|
result, k = await done.pop() |
|
results[k] = result |
|
|
|
return results |
|
|
|
|
|
|
|
async_methods = [ |
|
"_ls", |
|
"_cat_file", |
|
"_get_file", |
|
"_put_file", |
|
"_rm_file", |
|
"_cp_file", |
|
"_pipe_file", |
|
"_expand_path", |
|
"_info", |
|
"_isfile", |
|
"_isdir", |
|
"_exists", |
|
"_walk", |
|
"_glob", |
|
"_find", |
|
"_du", |
|
"_size", |
|
"_mkdir", |
|
"_makedirs", |
|
] |
|
|
|
|
|
class AsyncFileSystem(AbstractFileSystem): |
|
"""Async file operations, default implementations |
|
|
|
Passes bulk operations to asyncio.gather for concurrent operation. |
|
|
|
Implementations that have concurrent batch operations and/or async methods |
|
should inherit from this class instead of AbstractFileSystem. Docstrings are |
|
copied from the un-underscored method in AbstractFileSystem, if not given. |
|
""" |
|
|
|
|
|
|
|
|
|
async_impl = True |
|
mirror_sync_methods = True |
|
disable_throttling = False |
|
|
|
def __init__(self, *args, asynchronous=False, loop=None, batch_size=None, **kwargs): |
|
self.asynchronous = asynchronous |
|
self._pid = os.getpid() |
|
if not asynchronous: |
|
self._loop = loop or get_loop() |
|
else: |
|
self._loop = None |
|
self.batch_size = batch_size |
|
super().__init__(*args, **kwargs) |
|
|
|
@property |
|
def loop(self): |
|
if self._pid != os.getpid(): |
|
raise RuntimeError("This class is not fork-safe") |
|
return self._loop |
|
|
|
async def _rm_file(self, path, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _rm(self, path, recursive=False, batch_size=None, **kwargs): |
|
|
|
batch_size = batch_size or self.batch_size |
|
path = await self._expand_path(path, recursive=recursive) |
|
return await _run_coros_in_chunks( |
|
[self._rm_file(p, **kwargs) for p in reversed(path)], |
|
batch_size=batch_size, |
|
nofiles=True, |
|
) |
|
|
|
async def _cp_file(self, path1, path2, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _copy( |
|
self, |
|
path1, |
|
path2, |
|
recursive=False, |
|
on_error=None, |
|
maxdepth=None, |
|
batch_size=None, |
|
**kwargs, |
|
): |
|
if on_error is None and recursive: |
|
on_error = "ignore" |
|
elif on_error is None: |
|
on_error = "raise" |
|
|
|
if isinstance(path1, list) and isinstance(path2, list): |
|
|
|
|
|
paths1 = path1 |
|
paths2 = path2 |
|
else: |
|
source_is_str = isinstance(path1, str) |
|
paths1 = await self._expand_path( |
|
path1, maxdepth=maxdepth, recursive=recursive |
|
) |
|
if source_is_str and (not recursive or maxdepth is not None): |
|
|
|
paths1 = [ |
|
p for p in paths1 if not (trailing_sep(p) or await self._isdir(p)) |
|
] |
|
if not paths1: |
|
return |
|
|
|
source_is_file = len(paths1) == 1 |
|
dest_is_dir = isinstance(path2, str) and ( |
|
trailing_sep(path2) or await self._isdir(path2) |
|
) |
|
|
|
exists = source_is_str and ( |
|
(has_magic(path1) and source_is_file) |
|
or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1)) |
|
) |
|
paths2 = other_paths( |
|
paths1, |
|
path2, |
|
exists=exists, |
|
flatten=not source_is_str, |
|
) |
|
|
|
batch_size = batch_size or self.batch_size |
|
coros = [self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths1, paths2)] |
|
result = await _run_coros_in_chunks( |
|
coros, batch_size=batch_size, return_exceptions=True, nofiles=True |
|
) |
|
|
|
for ex in filter(is_exception, result): |
|
if on_error == "ignore" and isinstance(ex, FileNotFoundError): |
|
continue |
|
raise ex |
|
|
|
async def _pipe_file(self, path, value, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _pipe(self, path, value=None, batch_size=None, **kwargs): |
|
if isinstance(path, str): |
|
path = {path: value} |
|
batch_size = batch_size or self.batch_size |
|
return await _run_coros_in_chunks( |
|
[self._pipe_file(k, v, **kwargs) for k, v in path.items()], |
|
batch_size=batch_size, |
|
nofiles=True, |
|
) |
|
|
|
async def _process_limits(self, url, start, end): |
|
"""Helper for "Range"-based _cat_file""" |
|
size = None |
|
suff = False |
|
if start is not None and start < 0: |
|
|
|
if end is None: |
|
end = -start |
|
start = "" |
|
suff = True |
|
else: |
|
size = size or (await self._info(url))["size"] |
|
start = size + start |
|
elif start is None: |
|
start = 0 |
|
if not suff: |
|
if end is not None and end < 0: |
|
if start is not None: |
|
size = size or (await self._info(url))["size"] |
|
end = size + end |
|
elif end is None: |
|
end = "" |
|
if isinstance(end, numbers.Integral): |
|
end -= 1 |
|
return f"bytes={start}-{end}" |
|
|
|
async def _cat_file(self, path, start=None, end=None, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _cat( |
|
self, path, recursive=False, on_error="raise", batch_size=None, **kwargs |
|
): |
|
paths = await self._expand_path(path, recursive=recursive) |
|
coros = [self._cat_file(path, **kwargs) for path in paths] |
|
batch_size = batch_size or self.batch_size |
|
out = await _run_coros_in_chunks( |
|
coros, batch_size=batch_size, nofiles=True, return_exceptions=True |
|
) |
|
if on_error == "raise": |
|
ex = next(filter(is_exception, out), False) |
|
if ex: |
|
raise ex |
|
if ( |
|
len(paths) > 1 |
|
or isinstance(path, list) |
|
or paths[0] != self._strip_protocol(path) |
|
): |
|
return { |
|
k: v |
|
for k, v in zip(paths, out) |
|
if on_error != "omit" or not is_exception(v) |
|
} |
|
else: |
|
return out[0] |
|
|
|
async def _cat_ranges( |
|
self, |
|
paths, |
|
starts, |
|
ends, |
|
max_gap=None, |
|
batch_size=None, |
|
on_error="return", |
|
**kwargs, |
|
): |
|
"""Get the contents of byte ranges from one or more files |
|
|
|
Parameters |
|
---------- |
|
paths: list |
|
A list of of filepaths on this filesystems |
|
starts, ends: int or list |
|
Bytes limits of the read. If using a single int, the same value will be |
|
used to read all the specified files. |
|
""" |
|
|
|
if max_gap is not None: |
|
|
|
raise NotImplementedError |
|
if not isinstance(paths, list): |
|
raise TypeError |
|
if not isinstance(starts, Iterable): |
|
starts = [starts] * len(paths) |
|
if not isinstance(ends, Iterable): |
|
ends = [ends] * len(paths) |
|
if len(starts) != len(paths) or len(ends) != len(paths): |
|
raise ValueError |
|
coros = [ |
|
self._cat_file(p, start=s, end=e, **kwargs) |
|
for p, s, e in zip(paths, starts, ends) |
|
] |
|
batch_size = batch_size or self.batch_size |
|
return await _run_coros_in_chunks( |
|
coros, batch_size=batch_size, nofiles=True, return_exceptions=True |
|
) |
|
|
|
async def _put_file(self, lpath, rpath, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _put( |
|
self, |
|
lpath, |
|
rpath, |
|
recursive=False, |
|
callback=DEFAULT_CALLBACK, |
|
batch_size=None, |
|
maxdepth=None, |
|
**kwargs, |
|
): |
|
"""Copy file(s) from local. |
|
|
|
Copies a specific file or tree of files (if recursive=True). If rpath |
|
ends with a "/", it will be assumed to be a directory, and target files |
|
will go within. |
|
|
|
The put_file method will be called concurrently on a batch of files. The |
|
batch_size option can configure the amount of futures that can be executed |
|
at the same time. If it is -1, then all the files will be uploaded concurrently. |
|
The default can be set for this instance by passing "batch_size" in the |
|
constructor, or for all instances by setting the "gather_batch_size" key |
|
in ``fsspec.config.conf``, falling back to 1/8th of the system limit . |
|
""" |
|
if isinstance(lpath, list) and isinstance(rpath, list): |
|
|
|
|
|
rpaths = rpath |
|
lpaths = lpath |
|
else: |
|
source_is_str = isinstance(lpath, str) |
|
if source_is_str: |
|
lpath = make_path_posix(lpath) |
|
fs = LocalFileSystem() |
|
lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth) |
|
if source_is_str and (not recursive or maxdepth is not None): |
|
|
|
lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))] |
|
if not lpaths: |
|
return |
|
|
|
source_is_file = len(lpaths) == 1 |
|
dest_is_dir = isinstance(rpath, str) and ( |
|
trailing_sep(rpath) or await self._isdir(rpath) |
|
) |
|
|
|
rpath = self._strip_protocol(rpath) |
|
exists = source_is_str and ( |
|
(has_magic(lpath) and source_is_file) |
|
or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath)) |
|
) |
|
rpaths = other_paths( |
|
lpaths, |
|
rpath, |
|
exists=exists, |
|
flatten=not source_is_str, |
|
) |
|
|
|
is_dir = {l: os.path.isdir(l) for l in lpaths} |
|
rdirs = [r for l, r in zip(lpaths, rpaths) if is_dir[l]] |
|
file_pairs = [(l, r) for l, r in zip(lpaths, rpaths) if not is_dir[l]] |
|
|
|
await asyncio.gather(*[self._makedirs(d, exist_ok=True) for d in rdirs]) |
|
batch_size = batch_size or self.batch_size |
|
|
|
coros = [] |
|
callback.set_size(len(file_pairs)) |
|
for lfile, rfile in file_pairs: |
|
put_file = callback.branch_coro(self._put_file) |
|
coros.append(put_file(lfile, rfile, **kwargs)) |
|
|
|
return await _run_coros_in_chunks( |
|
coros, batch_size=batch_size, callback=callback |
|
) |
|
|
|
async def _get_file(self, rpath, lpath, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _get( |
|
self, |
|
rpath, |
|
lpath, |
|
recursive=False, |
|
callback=DEFAULT_CALLBACK, |
|
maxdepth=None, |
|
**kwargs, |
|
): |
|
"""Copy file(s) to local. |
|
|
|
Copies a specific file or tree of files (if recursive=True). If lpath |
|
ends with a "/", it will be assumed to be a directory, and target files |
|
will go within. Can submit a list of paths, which may be glob-patterns |
|
and will be expanded. |
|
|
|
The get_file method will be called concurrently on a batch of files. The |
|
batch_size option can configure the amount of futures that can be executed |
|
at the same time. If it is -1, then all the files will be uploaded concurrently. |
|
The default can be set for this instance by passing "batch_size" in the |
|
constructor, or for all instances by setting the "gather_batch_size" key |
|
in ``fsspec.config.conf``, falling back to 1/8th of the system limit . |
|
""" |
|
if isinstance(lpath, list) and isinstance(rpath, list): |
|
|
|
|
|
rpaths = rpath |
|
lpaths = lpath |
|
else: |
|
source_is_str = isinstance(rpath, str) |
|
|
|
source_not_trailing_sep = source_is_str and not trailing_sep(rpath) |
|
rpath = self._strip_protocol(rpath) |
|
rpaths = await self._expand_path( |
|
rpath, recursive=recursive, maxdepth=maxdepth |
|
) |
|
if source_is_str and (not recursive or maxdepth is not None): |
|
|
|
rpaths = [ |
|
p for p in rpaths if not (trailing_sep(p) or await self._isdir(p)) |
|
] |
|
if not rpaths: |
|
return |
|
|
|
lpath = make_path_posix(lpath) |
|
source_is_file = len(rpaths) == 1 |
|
dest_is_dir = isinstance(lpath, str) and ( |
|
trailing_sep(lpath) or LocalFileSystem().isdir(lpath) |
|
) |
|
|
|
exists = source_is_str and ( |
|
(has_magic(rpath) and source_is_file) |
|
or (not has_magic(rpath) and dest_is_dir and source_not_trailing_sep) |
|
) |
|
lpaths = other_paths( |
|
rpaths, |
|
lpath, |
|
exists=exists, |
|
flatten=not source_is_str, |
|
) |
|
|
|
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] |
|
batch_size = kwargs.pop("batch_size", self.batch_size) |
|
|
|
coros = [] |
|
callback.set_size(len(lpaths)) |
|
for lpath, rpath in zip(lpaths, rpaths): |
|
get_file = callback.branch_coro(self._get_file) |
|
coros.append(get_file(rpath, lpath, **kwargs)) |
|
return await _run_coros_in_chunks( |
|
coros, batch_size=batch_size, callback=callback |
|
) |
|
|
|
async def _isfile(self, path): |
|
try: |
|
return (await self._info(path))["type"] == "file" |
|
except: |
|
return False |
|
|
|
async def _isdir(self, path): |
|
try: |
|
return (await self._info(path))["type"] == "directory" |
|
except OSError: |
|
return False |
|
|
|
async def _size(self, path): |
|
return (await self._info(path)).get("size", None) |
|
|
|
async def _sizes(self, paths, batch_size=None): |
|
batch_size = batch_size or self.batch_size |
|
return await _run_coros_in_chunks( |
|
[self._size(p) for p in paths], batch_size=batch_size |
|
) |
|
|
|
async def _exists(self, path, **kwargs): |
|
try: |
|
await self._info(path, **kwargs) |
|
return True |
|
except FileNotFoundError: |
|
return False |
|
|
|
async def _info(self, path, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _ls(self, path, detail=True, **kwargs): |
|
raise NotImplementedError |
|
|
|
async def _walk(self, path, maxdepth=None, on_error="omit", **kwargs): |
|
if maxdepth is not None and maxdepth < 1: |
|
raise ValueError("maxdepth must be at least 1") |
|
|
|
path = self._strip_protocol(path) |
|
full_dirs = {} |
|
dirs = {} |
|
files = {} |
|
|
|
detail = kwargs.pop("detail", False) |
|
try: |
|
listing = await self._ls(path, detail=True, **kwargs) |
|
except (FileNotFoundError, OSError) as e: |
|
if on_error == "raise": |
|
raise |
|
elif callable(on_error): |
|
on_error(e) |
|
if detail: |
|
yield path, {}, {} |
|
else: |
|
yield path, [], [] |
|
return |
|
|
|
for info in listing: |
|
|
|
|
|
pathname = info["name"].rstrip("/") |
|
name = pathname.rsplit("/", 1)[-1] |
|
if info["type"] == "directory" and pathname != path: |
|
|
|
full_dirs[name] = pathname |
|
dirs[name] = info |
|
elif pathname == path: |
|
|
|
files[""] = info |
|
else: |
|
files[name] = info |
|
|
|
if detail: |
|
yield path, dirs, files |
|
else: |
|
yield path, list(dirs), list(files) |
|
|
|
if maxdepth is not None: |
|
maxdepth -= 1 |
|
if maxdepth < 1: |
|
return |
|
|
|
for d in dirs: |
|
async for _ in self._walk( |
|
full_dirs[d], maxdepth=maxdepth, detail=detail, **kwargs |
|
): |
|
yield _ |
|
|
|
async def _glob(self, path, maxdepth=None, **kwargs): |
|
if maxdepth is not None and maxdepth < 1: |
|
raise ValueError("maxdepth must be at least 1") |
|
|
|
import re |
|
|
|
seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,) |
|
ends_with_sep = path.endswith(seps) |
|
path = self._strip_protocol(path) |
|
append_slash_to_dirname = ends_with_sep or path.endswith( |
|
tuple(sep + "**" for sep in seps) |
|
) |
|
idx_star = path.find("*") if path.find("*") >= 0 else len(path) |
|
idx_qmark = path.find("?") if path.find("?") >= 0 else len(path) |
|
idx_brace = path.find("[") if path.find("[") >= 0 else len(path) |
|
|
|
min_idx = min(idx_star, idx_qmark, idx_brace) |
|
|
|
detail = kwargs.pop("detail", False) |
|
|
|
if not has_magic(path): |
|
if await self._exists(path, **kwargs): |
|
if not detail: |
|
return [path] |
|
else: |
|
return {path: await self._info(path, **kwargs)} |
|
else: |
|
if not detail: |
|
return [] |
|
else: |
|
return {} |
|
elif "/" in path[:min_idx]: |
|
min_idx = path[:min_idx].rindex("/") |
|
root = path[: min_idx + 1] |
|
depth = path[min_idx + 1 :].count("/") + 1 |
|
else: |
|
root = "" |
|
depth = path[min_idx + 1 :].count("/") + 1 |
|
|
|
if "**" in path: |
|
if maxdepth is not None: |
|
idx_double_stars = path.find("**") |
|
depth_double_stars = path[idx_double_stars:].count("/") + 1 |
|
depth = depth - depth_double_stars + maxdepth |
|
else: |
|
depth = None |
|
|
|
allpaths = await self._find( |
|
root, maxdepth=depth, withdirs=True, detail=True, **kwargs |
|
) |
|
|
|
pattern = glob_translate(path + ("/" if ends_with_sep else "")) |
|
pattern = re.compile(pattern) |
|
|
|
out = { |
|
p: info |
|
for p, info in sorted(allpaths.items()) |
|
if pattern.match( |
|
( |
|
p + "/" |
|
if append_slash_to_dirname and info["type"] == "directory" |
|
else p |
|
) |
|
) |
|
} |
|
|
|
if detail: |
|
return out |
|
else: |
|
return list(out) |
|
|
|
async def _du(self, path, total=True, maxdepth=None, **kwargs): |
|
sizes = {} |
|
|
|
for f in await self._find(path, maxdepth=maxdepth, **kwargs): |
|
info = await self._info(f) |
|
sizes[info["name"]] = info["size"] |
|
if total: |
|
return sum(sizes.values()) |
|
else: |
|
return sizes |
|
|
|
async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): |
|
path = self._strip_protocol(path) |
|
out = {} |
|
detail = kwargs.pop("detail", False) |
|
|
|
|
|
|
|
if withdirs and path != "" and await self._isdir(path): |
|
out[path] = await self._info(path) |
|
|
|
|
|
async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs): |
|
if withdirs: |
|
files.update(dirs) |
|
out.update({info["name"]: info for name, info in files.items()}) |
|
if not out and (await self._isfile(path)): |
|
|
|
|
|
out[path] = {} |
|
names = sorted(out) |
|
if not detail: |
|
return names |
|
else: |
|
return {name: out[name] for name in names} |
|
|
|
async def _expand_path(self, path, recursive=False, maxdepth=None): |
|
if maxdepth is not None and maxdepth < 1: |
|
raise ValueError("maxdepth must be at least 1") |
|
|
|
if isinstance(path, str): |
|
out = await self._expand_path([path], recursive, maxdepth) |
|
else: |
|
out = set() |
|
path = [self._strip_protocol(p) for p in path] |
|
for p in path: |
|
if has_magic(p): |
|
bit = set(await self._glob(p, maxdepth=maxdepth)) |
|
out |= bit |
|
if recursive: |
|
|
|
|
|
|
|
if maxdepth is not None and maxdepth <= 1: |
|
continue |
|
out |= set( |
|
await self._expand_path( |
|
list(bit), |
|
recursive=recursive, |
|
maxdepth=maxdepth - 1 if maxdepth is not None else None, |
|
) |
|
) |
|
continue |
|
elif recursive: |
|
rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True)) |
|
out |= rec |
|
if p not in out and (recursive is False or (await self._exists(p))): |
|
|
|
out.add(p) |
|
if not out: |
|
raise FileNotFoundError(path) |
|
return sorted(out) |
|
|
|
async def _mkdir(self, path, create_parents=True, **kwargs): |
|
pass |
|
|
|
async def _makedirs(self, path, exist_ok=False): |
|
pass |
|
|
|
async def open_async(self, path, mode="rb", **kwargs): |
|
if "b" not in mode or kwargs.get("compression"): |
|
raise ValueError |
|
raise NotImplementedError |
|
|
|
|
|
def mirror_sync_methods(obj): |
|
"""Populate sync and async methods for obj |
|
|
|
For each method will create a sync version if the name refers to an async method |
|
(coroutine) and there is no override in the child class; will create an async |
|
method for the corresponding sync method if there is no implementation. |
|
|
|
Uses the methods specified in |
|
- async_methods: the set that an implementation is expected to provide |
|
- default_async_methods: that can be derived from their sync version in |
|
AbstractFileSystem |
|
- AsyncFileSystem: async-specific default coroutines |
|
""" |
|
from fsspec import AbstractFileSystem |
|
|
|
for method in async_methods + dir(AsyncFileSystem): |
|
if not method.startswith("_"): |
|
continue |
|
smethod = method[1:] |
|
if private.match(method): |
|
isco = inspect.iscoroutinefunction(getattr(obj, method, None)) |
|
unsync = getattr(getattr(obj, smethod, False), "__func__", None) |
|
is_default = unsync is getattr(AbstractFileSystem, smethod, "") |
|
if isco and is_default: |
|
mth = sync_wrapper(getattr(obj, method), obj=obj) |
|
setattr(obj, smethod, mth) |
|
if not mth.__doc__: |
|
mth.__doc__ = getattr( |
|
getattr(AbstractFileSystem, smethod, None), "__doc__", "" |
|
) |
|
|
|
|
|
class FSSpecCoroutineCancel(Exception): |
|
pass |
|
|
|
|
|
def _dump_running_tasks( |
|
printout=True, cancel=True, exc=FSSpecCoroutineCancel, with_task=False |
|
): |
|
import traceback |
|
|
|
tasks = [t for t in asyncio.tasks.all_tasks(loop[0]) if not t.done()] |
|
if printout: |
|
[task.print_stack() for task in tasks] |
|
out = [ |
|
{ |
|
"locals": task._coro.cr_frame.f_locals, |
|
"file": task._coro.cr_frame.f_code.co_filename, |
|
"firstline": task._coro.cr_frame.f_code.co_firstlineno, |
|
"linelo": task._coro.cr_frame.f_lineno, |
|
"stack": traceback.format_stack(task._coro.cr_frame), |
|
"task": task if with_task else None, |
|
} |
|
for task in tasks |
|
] |
|
if cancel: |
|
for t in tasks: |
|
cbs = t._callbacks |
|
t.cancel() |
|
asyncio.futures.Future.set_exception(t, exc) |
|
asyncio.futures.Future.cancel(t) |
|
[cb[0](t) for cb in cbs] |
|
try: |
|
t._coro.throw(exc) |
|
except exc: |
|
pass |
|
return out |
|
|
|
|
|
class AbstractAsyncStreamedFile(AbstractBufferedFile): |
|
|
|
|
|
|
|
async def read(self, length=-1): |
|
""" |
|
Return data from cache, or fetch pieces as necessary |
|
|
|
Parameters |
|
---------- |
|
length: int (-1) |
|
Number of bytes to read; if <0, all remaining bytes. |
|
""" |
|
length = -1 if length is None else int(length) |
|
if self.mode != "rb": |
|
raise ValueError("File not in read mode") |
|
if length < 0: |
|
length = self.size - self.loc |
|
if self.closed: |
|
raise ValueError("I/O operation on closed file.") |
|
if length == 0: |
|
|
|
return b"" |
|
out = await self._fetch_range(self.loc, self.loc + length) |
|
self.loc += len(out) |
|
return out |
|
|
|
async def write(self, data): |
|
""" |
|
Write data to buffer. |
|
|
|
Buffer only sent on flush() or if buffer is greater than |
|
or equal to blocksize. |
|
|
|
Parameters |
|
---------- |
|
data: bytes |
|
Set of bytes to be written. |
|
""" |
|
if self.mode not in {"wb", "ab"}: |
|
raise ValueError("File not in write mode") |
|
if self.closed: |
|
raise ValueError("I/O operation on closed file.") |
|
if self.forced: |
|
raise ValueError("This file has been force-flushed, can only close") |
|
out = self.buffer.write(data) |
|
self.loc += out |
|
if self.buffer.tell() >= self.blocksize: |
|
await self.flush() |
|
return out |
|
|
|
async def close(self): |
|
"""Close file |
|
|
|
Finalizes writes, discards cache |
|
""" |
|
if getattr(self, "_unclosable", False): |
|
return |
|
if self.closed: |
|
return |
|
if self.mode == "rb": |
|
self.cache = None |
|
else: |
|
if not self.forced: |
|
await self.flush(force=True) |
|
|
|
if self.fs is not None: |
|
self.fs.invalidate_cache(self.path) |
|
self.fs.invalidate_cache(self.fs._parent(self.path)) |
|
|
|
self.closed = True |
|
|
|
async def flush(self, force=False): |
|
if self.closed: |
|
raise ValueError("Flush on closed file") |
|
if force and self.forced: |
|
raise ValueError("Force flush cannot be called more than once") |
|
if force: |
|
self.forced = True |
|
|
|
if self.mode not in {"wb", "ab"}: |
|
|
|
return |
|
|
|
if not force and self.buffer.tell() < self.blocksize: |
|
|
|
return |
|
|
|
if self.offset is None: |
|
|
|
self.offset = 0 |
|
try: |
|
await self._initiate_upload() |
|
except: |
|
self.closed = True |
|
raise |
|
|
|
if await self._upload_chunk(final=force) is not False: |
|
self.offset += self.buffer.seek(0, 2) |
|
self.buffer = io.BytesIO() |
|
|
|
async def __aenter__(self): |
|
return self |
|
|
|
async def __aexit__(self, exc_type, exc_val, exc_tb): |
|
await self.close() |
|
|
|
async def _fetch_range(self, start, end): |
|
raise NotImplementedError |
|
|
|
async def _initiate_upload(self): |
|
pass |
|
|
|
async def _upload_chunk(self, final=False): |
|
raise NotImplementedError |
|
|