|
from functools import wraps |
|
|
|
|
|
class Callback:
    """
    Base class and interface for callback mechanism

    This class can be used directly for monitoring file transfers by
    providing ``callback=Callback(hooks=...)`` (see the ``hooks`` argument,
    below), or subclassed for more specialised behaviour.

    Parameters
    ----------
    size: int (optional)
        Nominal quantity for the value that corresponds to a complete
        transfer, e.g., total number of tiles or total number of
        bytes
    value: int (0)
        Starting internal counter value
    hooks: dict or None
        A dict of named functions to be called on each update. The signature
        of these must be ``f(size, value, **kwargs)``
    **kwargs:
        Extra keyword arguments stored on the instance and passed to every
        hook; keyword arguments given to ``call()`` take precedence.
    """

    def __init__(self, size=None, value=0, hooks=None, **kwargs):
        self.size = size
        self.value = value
        self.hooks = hooks or {}
        self.kw = kwargs

    def __enter__(self):
        return self

    def __exit__(self, *exc_args):
        # Returns None, so an exception raised inside the ``with`` body
        # propagates after the callback has been closed.
        self.close()

    def close(self):
        """Close callback."""

    def branched(self, path_1, path_2, **kwargs):
        """
        Return callback for child transfers

        Use this when the callback is operating at a higher level, e.g., put,
        which may trigger transfers that can also be monitored. The function
        returns a callback that has to be passed to the child method, e.g.,
        put_file, as `callback=` argument.

        The implementation uses `callback.branch` for compatibility.
        When implementing callbacks, it is recommended to override this function
        instead of `branch` and avoid calling `super().branched(...)`.

        Prefer using this function over `branch`.

        Parameters
        ----------
        path_1: str
            Child's source path
        path_2: str
            Child's destination path
        **kwargs:
            Arbitrary keyword arguments

        Returns
        -------
        callback: Callback
            A callback instance to be passed to the child method
        """
        # ``branch`` may mutate the dict to add a "callback" entry; when it
        # does not, fall back to the module-level default callback.
        self.branch(path_1, path_2, kwargs)
        return kwargs.pop("callback", DEFAULT_CALLBACK)

    def branch_coro(self, fn):
        """
        Wraps a coroutine, and pass a new child callback to it.

        The returned coroutine function takes ``(path1, path2, **kwargs)``,
        obtains a child callback from ``branched`` and awaits
        ``fn(path1, path2, callback=child, **kwargs)``. The child is used as
        a context manager for the duration of the awaited call.
        """

        @wraps(fn)
        async def func(path1, path2: str, **kwargs):
            with self.branched(path1, path2, **kwargs) as child:
                return await fn(path1, path2, callback=child, **kwargs)

        return func

    def set_size(self, size):
        """
        Set the internal maximum size attribute

        Usually called if not initially set at instantiation. Note that this
        triggers a ``call()``.

        Parameters
        ----------
        size: int
        """
        self.size = size
        self.call()

    def absolute_update(self, value):
        """
        Set the internal value state

        Triggers ``call()``

        Parameters
        ----------
        value: int
        """
        self.value = value
        self.call()

    def relative_update(self, inc=1):
        """
        Delta increment the internal counter

        Triggers ``call()``

        Parameters
        ----------
        inc: int
        """
        self.value += inc
        self.call()

    def call(self, hook_name=None, **kwargs):
        """
        Execute hook(s) with current state

        Each function is passed the internal size and current value

        Parameters
        ----------
        hook_name: str or None
            If given, execute on this hook
        kwargs: passed on to (all) hook(s)

        Returns
        -------
        The return value of the named hook when ``hook_name`` is given and
        registered; otherwise ``None``.
        """
        if not self.hooks:
            return
        # Per-call keyword arguments override the ones stored at construction.
        kw = {**self.kw, **kwargs}
        if hook_name:
            if hook_name not in self.hooks:
                return
            return self.hooks[hook_name](self.size, self.value, **kw)
        for hook in self.hooks.values():
            hook(self.size, self.value, **kw)

    def wrap(self, iterable):
        """
        Wrap an iterable to call ``relative_update`` on each iteration

        Parameters
        ----------
        iterable: Iterable
            The iterable that is being wrapped

        Yields
        ------
        The items of ``iterable``, unchanged. The counter is incremented
        just before each item is yielded.
        """
        for item in iterable:
            self.relative_update()
            yield item

    def branch(self, path_1, path_2, kwargs):
        """
        Set callbacks for child transfers

        Use this when the callback is operating at a higher level, e.g., put,
        which may trigger transfers that can also be monitored. The passed
        kwargs are to be *mutated* to add ``callback=``, if this class
        supports branching to children.

        Parameters
        ----------
        path_1: str
            Child's source path
        path_2: str
            Child's destination path
        kwargs: dict
            arguments passed to child method, e.g., put_file.

        Returns
        -------
        None
            The base implementation leaves ``kwargs`` untouched.
        """
        return None

    def no_op(self, *_, **__):
        """Accept any arguments and do nothing."""

    def __getattr__(self, item):
        """
        If undefined methods are called on this class, nothing happens

        Double-underscore ("dunder") names are excluded and raise
        ``AttributeError`` as usual: they are probed by protocols such as
        ``copy`` and ``pickle`` (``__deepcopy__``, ``__setstate__``, ...),
        and answering those probes with a do-nothing method makes copies
        come back as ``None`` or without their state.
        """
        if item.startswith("__") and item.endswith("__"):
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {item!r}"
            )
        return self.no_op

    @classmethod
    def as_callback(cls, maybe_callback=None):
        """Transform callback=... into Callback instance

        For the special value of ``None``, return the global instance of
        ``NoOpCallback``. This is an alternative to including
        ``callback=DEFAULT_CALLBACK`` directly in a method signature.
        Any other value is returned unchanged.
        """
        if maybe_callback is None:
            return DEFAULT_CALLBACK
        return maybe_callback
|
|
|
|
|
class NoOpCallback(Callback):
    """
    Callback variant whose ``call`` does exactly nothing
    """

    def call(self, *args, **kwargs):
        """Accept any arguments, run no hooks, and return ``None``."""
        return
|
|
|
|
|
class DotPrinterCallback(Callback):
    """
    Simple example Callback implementation

    Almost identical to Callback with a hook that prints a char; here we
    demonstrate how the outer layer may print "#" and the inner layer "."

    Parameters
    ----------
    chr_to_print: str
        Character printed (without a trailing newline) on every ``call()``
    **kwargs:
        Forwarded to the ``Callback`` constructor
    """

    def __init__(self, chr_to_print="#", **kwargs):
        self.chr = chr_to_print
        super().__init__(**kwargs)

    def branch(self, path_1, path_2, kwargs):
        """Mutate kwargs to add new instance with different print char"""
        kwargs["callback"] = DotPrinterCallback(".")

    def call(self, *args, **kwargs):
        """Just outputs a character

        Positional and keyword arguments are accepted and ignored, so that
        a caller passing a hook name positionally (as the base ``call``
        allows) does not raise ``TypeError``.
        """
        print(self.chr, end="")
|
|
|
|
|
class TqdmCallback(Callback):
    """
    A callback to display a progress bar using tqdm

    Parameters
    ----------
    tqdm_kwargs : dict, (optional)
        Any argument accepted by the tqdm constructor.
        See the `tqdm doc <https://tqdm.github.io/docs/tqdm/#__init__>`_.
        Will be forwarded to `tqdm_cls`.
    tqdm_cls: (optional)
        subclass of `tqdm.tqdm`. If not passed, it will default to `tqdm.tqdm`.

    Examples
    --------
    >>> import fsspec
    >>> from fsspec.callbacks import TqdmCallback
    >>> fs = fsspec.filesystem("memory")
    >>> path2distant_data = "/your-path"
    >>> fs.upload(
            ".",
            path2distant_data,
            recursive=True,
            callback=TqdmCallback(),
        )

    You can forward args to tqdm using the ``tqdm_kwargs`` parameter.

    >>> fs.upload(
            ".",
            path2distant_data,
            recursive=True,
            callback=TqdmCallback(tqdm_kwargs={"desc": "Your tqdm description"}),
        )

    You can also customize the progress bar by passing a subclass of `tqdm`.

    .. code-block:: python

        class TqdmFormat(tqdm):
            '''Provides a `total_time` format parameter'''
            @property
            def format_dict(self):
                d = super().format_dict
                total_time = d["elapsed"] * (d["total"] or 0) / max(d["n"], 1)
                d.update(total_time=self.format_interval(total_time) + " in total")
                return d

    >>> with TqdmCallback(
            tqdm_kwargs={
                "desc": "desc",
                "bar_format": "{total_time}: {percentage:.0f}%|{bar}{r_bar}",
            },
            tqdm_cls=TqdmFormat,
        ) as callback:
            fs.upload(".", path2distant_data, recursive=True, callback=callback)
    """

    def __init__(self, tqdm_kwargs=None, *args, **kwargs):
        """
        Store the tqdm configuration; the bar itself is created lazily.

        Raises
        ------
        ImportError
            If tqdm is not installed.
        """
        # Set before anything that can raise: ``__del__`` runs even when
        # ``__init__`` fails, and ``close()`` reads ``self.tqdm``. Without
        # this, the inherited ``__getattr__`` would hand back a no-op method
        # in place of the missing attribute and ``close()`` would fail.
        self.tqdm = None

        try:
            from tqdm import tqdm

        except ImportError as exce:
            raise ImportError(
                "Using TqdmCallback requires tqdm to be installed"
            ) from exce

        self._tqdm_cls = kwargs.pop("tqdm_cls", tqdm)
        self._tqdm_kwargs = tqdm_kwargs or {}
        super().__init__(*args, **kwargs)

    def call(self, *args, **kwargs):
        """Create the progress bar on first use and sync it with our state.

        All arguments are accepted and ignored; hooks are not executed.
        """
        if self.tqdm is None:
            self.tqdm = self._tqdm_cls(total=self.size, **self._tqdm_kwargs)
        # The size may have been set or changed after the bar was created.
        self.tqdm.total = self.size
        # Advance the bar by the gap between our value and the bar's counter.
        self.tqdm.update(self.value - self.tqdm.n)

    def close(self):
        """Close the progress bar, if one was created, and forget it."""
        if self.tqdm is not None:
            self.tqdm.close()
            self.tqdm = None

    def __del__(self):
        return self.close()
|
|
|
|
|
# One shared do-nothing callback instance, bound to both the public name and
# its underscore-prefixed alias.
_DEFAULT_CALLBACK = NoOpCallback()
DEFAULT_CALLBACK = _DEFAULT_CALLBACK
|
|