JAMESPARK3's picture
Upload folder using huggingface_hub
1380717 verified
raw
history blame
25.6 kB
"""Extensible memoizing collections and decorators."""
# Public names of this module: the cache classes defined below plus the
# `cached` and `cachedmethod` decorators.
__all__ = (
"Cache",
"FIFOCache",
"LFUCache",
"LRUCache",
"MRUCache",
"RRCache",
"TLRUCache",
"TTLCache",
"cached",
"cachedmethod",
)
# Version string of this module.
__version__ = "5.5.0"
import collections
import collections.abc
import functools
import heapq
import random
import time
from . import keys
class _DefaultSize:
    """Stateless stand-in for a size table in which every entry is 1.

    It offers only item lookup, item assignment and ``pop``, and stores
    nothing at all.
    """

    __slots__ = ()

    def __getitem__(self, _):
        """Return 1 regardless of the key."""
        return 1

    def __setitem__(self, _, value):
        """Accept an assignment; the assigned size is expected to be 1."""
        assert value == 1

    def pop(self, _):
        """Return 1 regardless of the key; nothing is actually removed."""
        return 1
class Cache(collections.abc.MutableMapping):
    """Mutable mapping to serve as a simple cache or cache base class."""

    # sentinel meaning "no default was passed" in pop()
    __marker = object()

    # shared size table used while every item counts as size 1
    __size = _DefaultSize()

    def __init__(self, maxsize, getsizeof=None):
        """Create an empty cache.

        `maxsize` is the upper bound for the summed item sizes.  If
        `getsizeof` is given, it is called with a value and must return
        that value's size; otherwise every item has size 1.
        """
        if getsizeof:
            self.getsizeof = getsizeof
        if self.getsizeof is not Cache.getsizeof:
            # a custom size function requires real per-key bookkeeping
            self.__size = {}
        self.__data = {}
        self.__currsize = 0
        self.__maxsize = maxsize

    def __repr__(self):
        fields = (
            self.__class__.__name__,
            repr(self.__data),
            self.__maxsize,
            self.__currsize,
        )
        return "%s(%s, maxsize=%r, currsize=%r)" % fields

    def __getitem__(self, key):
        """Return the value stored for `key`, deferring to `__missing__`
        when the key is absent.
        """
        try:
            value = self.__data[key]
        except KeyError:
            return self.__missing__(key)
        else:
            return value

    def __setitem__(self, key, value):
        """Store `value` under `key`, evicting items via `popitem()` until
        the new value fits.

        Raises `ValueError` if the value alone exceeds `maxsize`.
        """
        limit = self.__maxsize
        newsize = self.getsizeof(value)
        if newsize > limit:
            raise ValueError("value too large")
        data = self.__data
        sizes = self.__size
        # room only has to be made for new keys or for growing values
        if key not in data or sizes[key] < newsize:
            while self.__currsize + newsize > limit:
                self.popitem()
        # the key itself may have been evicted by the loop above
        delta = newsize - sizes[key] if key in data else newsize
        data[key] = value
        sizes[key] = newsize
        self.__currsize += delta

    def __delitem__(self, key):
        """Remove `key` and subtract its size from the current size."""
        released = self.__size.pop(key)
        del self.__data[key]
        self.__currsize -= released

    def __contains__(self, key):
        return key in self.__data

    def __missing__(self, key):
        """Hook invoked by `__getitem__` for absent keys; raises `KeyError`."""
        raise KeyError(key)

    def __iter__(self):
        return iter(self.__data)

    def __len__(self):
        return len(self.__data)

    def get(self, key, default=None):
        """Return the value for `key` if present, else `default`."""
        return self[key] if key in self else default

    def pop(self, key, default=__marker):
        """Remove `key` and return its value.

        If the key is absent, return `default` when one was given and
        raise `KeyError` otherwise.
        """
        if key in self:
            value = self[key]
            del self[key]
            return value
        if default is self.__marker:
            raise KeyError(key)
        return default

    def setdefault(self, key, default=None):
        """Return the value for `key`, storing `default` first if absent."""
        if key in self:
            return self[key]
        self[key] = default
        return default

    @property
    def maxsize(self):
        """The maximum size of the cache."""
        return self.__maxsize

    @property
    def currsize(self):
        """The current size of the cache."""
        return self.__currsize

    @staticmethod
    def getsizeof(value):
        """Return the size of a cache element's value."""
        return 1
class FIFOCache(Cache):
    """First In First Out (FIFO) cache implementation."""

    def __init__(self, maxsize, getsizeof=None):
        Cache.__init__(self, maxsize, getsizeof)
        # keys in eviction order; the values are unused
        self.__order = collections.OrderedDict()

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item and place its key at the end of the order."""
        cache_setitem(self, key, value)
        order = self.__order
        if key in order:
            order.move_to_end(key)
        else:
            order[key] = None

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item and drop its key from the order."""
        cache_delitem(self, key)
        del self.__order[key]

    def popitem(self):
        """Remove and return the `(key, value)` pair first inserted."""
        keys_in_order = iter(self.__order)
        try:
            key = next(keys_in_order)
        except StopIteration:
            raise KeyError("%s is empty" % type(self).__name__) from None
        return (key, self.pop(key))
class LFUCache(Cache):
    """Least Frequently Used (LFU) cache implementation."""

    def __init__(self, maxsize, getsizeof=None):
        Cache.__init__(self, maxsize, getsizeof)
        # Uses are counted downwards, so the key with the highest count
        # is the one that has been used the least.
        self.__counter = collections.Counter()

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        """Return the value for `key` and record one more use of it."""
        value = cache_getitem(self, key)
        if key in self:  # __missing__ may not store item
            self.__counter[key] -= 1
        return value

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item and record one more use of its key."""
        cache_setitem(self, key, value)
        self.__counter[key] -= 1

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item together with its use count."""
        cache_delitem(self, key)
        del self.__counter[key]

    def popitem(self):
        """Remove and return the `(key, value)` pair least frequently used."""
        try:
            # unpacking fails with ValueError when the counter is empty
            ((key, _count),) = self.__counter.most_common(1)
        except ValueError:
            raise KeyError("%s is empty" % type(self).__name__) from None
        return (key, self.pop(key))
class LRUCache(Cache):
    """Least Recently Used (LRU) cache implementation."""

    def __init__(self, maxsize, getsizeof=None):
        Cache.__init__(self, maxsize, getsizeof)
        # keys ordered from least to most recently used
        self.__order = collections.OrderedDict()

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        """Return the value for `key` and mark the key as just used."""
        value = cache_getitem(self, key)
        if key in self:  # __missing__ may not store item
            self.__update(key)
        return value

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item and mark its key as just used."""
        cache_setitem(self, key, value)
        self.__update(key)

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item and drop its key from the usage order."""
        cache_delitem(self, key)
        del self.__order[key]

    def popitem(self):
        """Remove and return the `(key, value)` pair least recently used."""
        keys_in_order = iter(self.__order)
        try:
            key = next(keys_in_order)
        except StopIteration:
            raise KeyError("%s is empty" % type(self).__name__) from None
        return (key, self.pop(key))

    def __update(self, key):
        """Move `key` to the end of the usage order, adding it if new."""
        order = self.__order
        if key in order:
            order.move_to_end(key)
        else:
            order[key] = None
class MRUCache(Cache):
    """Most Recently Used (MRU) cache implementation."""

    def __init__(self, maxsize, getsizeof=None):
        """Create an empty cache, emitting a `DeprecationWarning`."""
        import warnings

        warnings.warn("MRUCache is deprecated", DeprecationWarning, stacklevel=2)
        Cache.__init__(self, maxsize, getsizeof)
        # keys ordered from most to least recently used
        self.__order = collections.OrderedDict()

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        """Return the value for `key` and mark the key as just used."""
        value = cache_getitem(self, key)
        if key in self:  # __missing__ may not store item
            self.__update(key)
        return value

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item and mark its key as just used."""
        cache_setitem(self, key, value)
        self.__update(key)

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item and drop its key from the usage order."""
        cache_delitem(self, key)
        del self.__order[key]

    def popitem(self):
        """Remove and return the `(key, value)` pair most recently used."""
        keys_in_order = iter(self.__order)
        try:
            key = next(keys_in_order)
        except StopIteration:
            raise KeyError("%s is empty" % type(self).__name__) from None
        return (key, self.pop(key))

    def __update(self, key):
        """Move `key` to the front of the usage order, adding it if new."""
        order = self.__order
        if key in order:
            order.move_to_end(key, last=False)
        else:
            order[key] = None
class RRCache(Cache):
    """Random Replacement (RR) cache implementation."""

    def __init__(self, maxsize, choice=random.choice, getsizeof=None):
        """Create an empty cache.

        `choice` is called with a list of the cache's keys and must
        return the key to evict; it defaults to `random.choice`.
        `maxsize` and `getsizeof` are passed on to `Cache`.
        """
        Cache.__init__(self, maxsize, getsizeof)
        self.__choice = choice

    @property
    def choice(self):
        """The `choice` function used by the cache."""
        return self.__choice

    def popitem(self):
        """Remove and return a random `(key, value)` pair.

        Raises `KeyError` if the cache is empty.
        """
        candidates = list(self)
        if not candidates:
            # Never pass an empty sequence to `choice`: only
            # `random.choice` is known to signal that with IndexError.
            # Other callables (e.g. `min`) raise ValueError instead, which
            # would escape from here and from the inherited `clear()`.
            raise KeyError("%s is empty" % type(self).__name__)
        try:
            key = self.__choice(candidates)
        except IndexError:
            # kept for backward compatibility with the previous behavior
            raise KeyError("%s is empty" % type(self).__name__) from None
        return (key, self.pop(key))
class _TimedCache(Cache):
    """Base class for time aware cache implementations.

    The methods below call `self.expire(time)`, which is not defined in
    this class and has to be supplied by subclasses.
    """

    class _Timer:
        """Wrapper around a timer callable that can "freeze" time.

        While the wrapper is entered as a context manager (possibly
        nested), calling it returns the time captured on the outermost
        entry instead of asking the wrapped timer again.
        """

        def __init__(self, timer):
            self.__timer = timer
            self.__nesting = 0

        def __call__(self):
            """Return the frozen time if entered, else the current time."""
            return self.__time if self.__nesting != 0 else self.__timer()

        def __enter__(self):
            """Freeze time on the outermost entry and return that time."""
            if self.__nesting == 0:
                self.__time = self.__timer()
            frozen = self.__time
            self.__nesting += 1
            return frozen

        def __exit__(self, *exc):
            self.__nesting -= 1

        def __reduce__(self):
            # rebuilt from the wrapped timer alone
            return _TimedCache._Timer, (self.__timer,)

        def __getattr__(self, name):
            # unknown attributes are looked up on the wrapped timer
            return getattr(self.__timer, name)

    def __init__(self, maxsize, timer=time.monotonic, getsizeof=None):
        """Create an empty cache whose clock is the callable `timer`."""
        Cache.__init__(self, maxsize, getsizeof)
        self.__timer = _TimedCache._Timer(timer)

    def __repr__(self, cache_repr=Cache.__repr__):
        """Expire outdated items, then return the base representation."""
        with self.__timer as now:
            self.expire(now)
            return cache_repr(self)

    def __len__(self, cache_len=Cache.__len__):
        """Expire outdated items, then return the number of items."""
        with self.__timer as now:
            self.expire(now)
            return cache_len(self)

    @property
    def currsize(self):
        with self.__timer as now:
            self.expire(now)
            return super().currsize

    @property
    def timer(self):
        """The timer function used by the cache."""
        return self.__timer

    def clear(self):
        """Expire outdated items, then remove everything that is left."""
        with self.__timer as now:
            self.expire(now)
            Cache.clear(self)

    def get(self, *args, **kwargs):
        """Same as `Cache.get`, executed with time frozen."""
        with self.__timer:
            return Cache.get(self, *args, **kwargs)

    def pop(self, *args, **kwargs):
        """Same as `Cache.pop`, executed with time frozen."""
        with self.__timer:
            return Cache.pop(self, *args, **kwargs)

    def setdefault(self, *args, **kwargs):
        """Same as `Cache.setdefault`, executed with time frozen."""
        with self.__timer:
            return Cache.setdefault(self, *args, **kwargs)
class TTLCache(_TimedCache):
    """LRU Cache implementation with per-item time-to-live (TTL) value."""

    class _Link:
        """Node of the circular doubly linked list of expiration times."""

        __slots__ = ("key", "expires", "next", "prev")

        def __init__(self, key=None, expires=None):
            self.key = key
            self.expires = expires

        def __reduce__(self):
            # only key and expiration time are kept; the list pointers
            # are re-created in TTLCache.__setstate__
            return TTLCache._Link, (self.key, self.expires)

        def unlink(self):
            """Take this node out of the list by joining its neighbours."""
            successor = self.next
            predecessor = self.prev
            predecessor.next = successor
            successor.prev = predecessor

    def __init__(self, maxsize, ttl, timer=time.monotonic, getsizeof=None):
        """Create an empty cache whose items expire `ttl` time units
        (as measured by `timer`) after they were last stored.
        """
        _TimedCache.__init__(self, maxsize, timer, getsizeof)
        # sentinel node of the expiration list, initially linked to itself
        sentinel = TTLCache._Link()
        sentinel.prev = sentinel.next = sentinel
        self.__root = sentinel
        # key -> link; the ordering of this dict tracks recency of use
        self.__links = collections.OrderedDict()
        self.__ttl = ttl

    def __contains__(self, key):
        """Return whether `key` is stored and has not expired yet."""
        try:
            link = self.__links[key]  # no reordering
        except KeyError:
            return False
        return self.timer() < link.expires

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        """Return the value for `key`; an expired key is handed to
        `__missing__` like an absent one.
        """
        try:
            link = self.__getlink(key)
        except KeyError:
            expired = False
        else:
            expired = not (self.timer() < link.expires)
        if expired:
            return self.__missing__(key)
        return cache_getitem(self, key)

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item and (re)start its time-to-live."""
        with self.timer as now:
            self.expire(now)
            cache_setitem(self, key, value)
        try:
            link = self.__getlink(key)
        except KeyError:
            link = TTLCache._Link(key)
            self.__links[key] = link
        else:
            link.unlink()
        link.expires = now + self.__ttl
        # append the link at the tail of the expiration list
        root = self.__root
        tail = root.prev
        link.next = root
        link.prev = tail
        tail.next = link
        root.prev = link

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item; raises `KeyError` if it had already expired."""
        cache_delitem(self, key)
        link = self.__links.pop(key)
        link.unlink()
        if not (self.timer() < link.expires):
            raise KeyError(key)

    def __iter__(self):
        """Iterate over the keys that have not expired, in list order."""
        root = self.__root
        node = root.next
        while node is not root:
            # "freeze" time for iterator access
            with self.timer as now:
                if now < node.expires:
                    yield node.key
            node = node.next

    def __setstate__(self, state):
        """Restore the instance dict and rebuild the expiration list."""
        self.__dict__.update(state)
        root = self.__root
        root.prev = root.next = root
        # re-link all items, ordered by their expiration time
        for link in sorted(self.__links.values(), key=lambda obj: obj.expires):
            tail = root.prev
            link.next = root
            link.prev = tail
            tail.next = link
            root.prev = link
        self.expire(self.timer())

    @property
    def ttl(self):
        """The time-to-live value of the cache's items."""
        return self.__ttl

    def expire(self, time=None):
        """Remove expired items from the cache and return an iterable of the
        expired `(key, value)` pairs.
        """
        if time is None:
            time = self.timer()
        root = self.__root
        links = self.__links
        cache_getitem = Cache.__getitem__
        cache_delitem = Cache.__delitem__
        removed = []
        node = root.next
        # walk from the head of the list until the first live item
        while node is not root and not (time < node.expires):
            removed.append((node.key, cache_getitem(self, node.key)))
            cache_delitem(self, node.key)
            del links[node.key]
            following = node.next
            node.unlink()
            node = following
        return removed

    def popitem(self):
        """Remove and return the `(key, value)` pair least recently used that
        has not already expired.
        """
        with self.timer as now:
            self.expire(now)
            try:
                key = next(iter(self.__links))
            except StopIteration:
                raise KeyError("%s is empty" % type(self).__name__) from None
            return (key, self.pop(key))

    def __getlink(self, key):
        """Return the link for `key` and mark the key as recently used."""
        link = self.__links[key]
        self.__links.move_to_end(key)
        return link
class TLRUCache(_TimedCache):
    """Time aware Least Recently Used (TLRU) cache implementation."""

    @functools.total_ordering
    class _Item:
        """Heap entry for one key, ordered by its expiration time."""

        __slots__ = ("key", "expires", "removed")

        def __init__(self, key=None, expires=None):
            self.key = key
            self.expires = expires
            # set once the entry has been superseded or deleted
            self.removed = False

        def __lt__(self, other):
            return self.expires < other.expires

    def __init__(self, maxsize, ttu, timer=time.monotonic, getsizeof=None):
        """Create an empty cache.

        `ttu` is called as `ttu(key, value, now)` whenever an item is
        stored and must return that item's expiration time.
        """
        _TimedCache.__init__(self, maxsize, timer, getsizeof)
        # key -> item; the ordering of this dict tracks recency of use
        self.__items = collections.OrderedDict()
        # heap of items ordered by expiration time
        self.__order = []
        self.__ttu = ttu

    def __contains__(self, key):
        """Return whether `key` is stored and has not expired yet."""
        try:
            item = self.__items[key]  # no reordering
        except KeyError:
            return False
        return self.timer() < item.expires

    def __getitem__(self, key, cache_getitem=Cache.__getitem__):
        """Return the value for `key`; an expired key is handed to
        `__missing__` like an absent one.
        """
        try:
            item = self.__getitem(key)
        except KeyError:
            expired = False
        else:
            expired = not (self.timer() < item.expires)
        if expired:
            return self.__missing__(key)
        return cache_getitem(self, key)

    def __setitem__(self, key, value, cache_setitem=Cache.__setitem__):
        """Store the item unless `ttu` reports it as already expired."""
        with self.timer as now:
            expires = self.__ttu(key, value, now)
            if not (now < expires):
                return  # skip expired items
            self.expire(now)
            cache_setitem(self, key, value)
        # removing an existing item would break the heap structure, so
        # only mark it as removed for now
        try:
            previous = self.__getitem(key)
        except KeyError:
            pass
        else:
            previous.removed = True
        item = TLRUCache._Item(key, expires)
        self.__items[key] = item
        heapq.heappush(self.__order, item)

    def __delitem__(self, key, cache_delitem=Cache.__delitem__):
        """Delete the item; raises `KeyError` if it had already expired."""
        with self.timer as now:
            # no self.expire() for performance reasons, e.g. self.clear() [#67]
            cache_delitem(self, key)
        item = self.__items.pop(key)
        item.removed = True
        if not (now < item.expires):
            raise KeyError(key)

    def __iter__(self):
        """Iterate over the keys of items that are neither expired nor
        marked as removed, in the heap's storage order.
        """
        for entry in self.__order:
            # "freeze" time for iterator access
            with self.timer as now:
                if now < entry.expires and not entry.removed:
                    yield entry.key

    @property
    def ttu(self):
        """The local time-to-use function used by the cache."""
        return self.__ttu

    def expire(self, time=None):
        """Remove expired items from the cache and return an iterable of the
        expired `(key, value)` pairs.
        """
        if time is None:
            time = self.timer()
        items = self.__items
        heap = self.__order
        # clean up the heap if too many items are marked as removed
        if len(heap) > len(items) * 2:
            heap = [entry for entry in heap if not entry.removed]
            self.__order = heap
            heapq.heapify(heap)
        cache_getitem = Cache.__getitem__
        cache_delitem = Cache.__delitem__
        removed = []
        while heap and (heap[0].removed or not (time < heap[0].expires)):
            entry = heapq.heappop(heap)
            if entry.removed:
                # stale heap entry; its key has been handled already
                continue
            removed.append((entry.key, cache_getitem(self, entry.key)))
            cache_delitem(self, entry.key)
            del items[entry.key]
        return removed

    def popitem(self):
        """Remove and return the `(key, value)` pair least recently used that
        has not already expired.
        """
        with self.timer as now:
            self.expire(now)
            try:
                key = next(iter(self.__items))
            except StopIteration:
                raise KeyError("%s is empty" % self.__class__.__name__) from None
            return (key, self.pop(key))

    def __getitem(self, key):
        """Return the item for `key` and mark the key as recently used."""
        item = self.__items[key]
        self.__items.move_to_end(key)
        return item
# Record type returned by the `cache_info()` function that `cached`
# attaches to wrapped functions.
_CacheInfo = collections.namedtuple("CacheInfo", "hits misses maxsize currsize")
def cached(cache, key=keys.hashkey, lock=None, info=False):
    """Return a decorator that memoizes a function's results in `cache`.

    `cache` is the mapping used for storage; passing `None` disables
    caching.  `key` is called with the function's arguments to compute
    the cache key.  If `lock` is given, it is used as a context manager
    around every access to the cache.  If `info` is true, hits and
    misses are counted and reported by the wrapper's `cache_info()`.

    The wrapper gets the attributes `cache`, `cache_key`, `cache_lock`,
    `cache_clear` and `cache_info` (the latter is `None` unless `info`
    is true).
    """

    def decorator(func):
        if info:
            hits = misses = 0

            # pick the statistics reporter matching the kind of cache
            if isinstance(cache, Cache):

                def getinfo():
                    return _CacheInfo(hits, misses, cache.maxsize, cache.currsize)

            elif isinstance(cache, collections.abc.Mapping):

                def getinfo():
                    return _CacheInfo(hits, misses, None, len(cache))

            else:

                def getinfo():
                    return _CacheInfo(hits, misses, 0, 0)

            if cache is None:

                def wrapper(*args, **kwargs):
                    nonlocal misses
                    misses += 1
                    return func(*args, **kwargs)

                def cache_clear():
                    nonlocal hits, misses
                    hits = misses = 0

                cache_info = getinfo

            elif lock is None:

                def wrapper(*args, **kwargs):
                    nonlocal hits, misses
                    ckey = key(*args, **kwargs)
                    try:
                        found = cache[ckey]
                    except KeyError:
                        misses += 1
                    else:
                        hits += 1
                        return found
                    computed = func(*args, **kwargs)
                    try:
                        cache[ckey] = computed
                    except ValueError:
                        pass  # value too large
                    return computed

                def cache_clear():
                    nonlocal hits, misses
                    cache.clear()
                    hits = misses = 0

                cache_info = getinfo

            else:

                def wrapper(*args, **kwargs):
                    nonlocal hits, misses
                    ckey = key(*args, **kwargs)
                    try:
                        with lock:
                            found = cache[ckey]
                            hits += 1
                            return found
                    except KeyError:
                        with lock:
                            misses += 1
                    # the function itself runs without holding the lock
                    computed = func(*args, **kwargs)
                    # in case of a race, prefer the item already in the cache
                    try:
                        with lock:
                            return cache.setdefault(ckey, computed)
                    except ValueError:
                        return computed  # value too large

                def cache_clear():
                    nonlocal hits, misses
                    with lock:
                        cache.clear()
                        hits = misses = 0

                def cache_info():
                    with lock:
                        return getinfo()

        else:
            if cache is None:

                def wrapper(*args, **kwargs):
                    return func(*args, **kwargs)

                def cache_clear():
                    pass

            elif lock is None:

                def wrapper(*args, **kwargs):
                    ckey = key(*args, **kwargs)
                    try:
                        return cache[ckey]
                    except KeyError:
                        pass  # key not found
                    computed = func(*args, **kwargs)
                    try:
                        cache[ckey] = computed
                    except ValueError:
                        pass  # value too large
                    return computed

                def cache_clear():
                    cache.clear()

            else:

                def wrapper(*args, **kwargs):
                    ckey = key(*args, **kwargs)
                    try:
                        with lock:
                            return cache[ckey]
                    except KeyError:
                        pass  # key not found
                    # the function itself runs without holding the lock
                    computed = func(*args, **kwargs)
                    # in case of a race, prefer the item already in the cache
                    try:
                        with lock:
                            return cache.setdefault(ckey, computed)
                    except ValueError:
                        return computed  # value too large

                def cache_clear():
                    with lock:
                        cache.clear()

            cache_info = None

        # expose the configuration and helpers on the wrapper
        wrapper.cache = cache
        wrapper.cache_key = key
        wrapper.cache_lock = lock
        wrapper.cache_clear = cache_clear
        wrapper.cache_info = cache_info

        return functools.update_wrapper(wrapper, func)

    return decorator
def cachedmethod(cache, key=keys.methodkey, lock=None):
    """Return a decorator that memoizes a method's results per object.

    `cache` is called with the object (`self`) and returns the mapping to
    use, or `None` to skip caching for that call.  `key` is called with
    `self` and the method's arguments to compute the cache key.  If
    `lock` is given, it is called with `self` and the result is used as
    a context manager around every access to the cache.

    The wrapper gets the attributes `cache`, `cache_key`, `cache_lock`
    and `cache_clear`.
    """

    def decorator(method):
        if lock is None:

            def wrapper(self, *args, **kwargs):
                store = cache(self)
                if store is None:
                    return method(self, *args, **kwargs)
                ckey = key(self, *args, **kwargs)
                try:
                    return store[ckey]
                except KeyError:
                    pass  # key not found
                computed = method(self, *args, **kwargs)
                try:
                    store[ckey] = computed
                except ValueError:
                    pass  # value too large
                return computed

            def clear(self):
                store = cache(self)
                if store is not None:
                    store.clear()

        else:

            def wrapper(self, *args, **kwargs):
                store = cache(self)
                if store is None:
                    return method(self, *args, **kwargs)
                ckey = key(self, *args, **kwargs)
                try:
                    with lock(self):
                        return store[ckey]
                except KeyError:
                    pass  # key not found
                # the method itself runs without holding the lock
                computed = method(self, *args, **kwargs)
                # in case of a race, prefer the item already in the cache
                try:
                    with lock(self):
                        return store.setdefault(ckey, computed)
                except ValueError:
                    return computed  # value too large

            def clear(self):
                store = cache(self)
                if store is not None:
                    with lock(self):
                        store.clear()

        # expose the configuration and helpers on the wrapper
        wrapper.cache = cache
        wrapper.cache_key = key
        wrapper.cache_lock = lock
        wrapper.cache_clear = clear

        return functools.update_wrapper(wrapper, method)

    return decorator