Spaces:
Running
Running
"""Extensible memoizing collections and decorators.""" | |
__all__ = ( | |
"Cache", | |
"FIFOCache", | |
"LFUCache", | |
"LRUCache", | |
"MRUCache", | |
"RRCache", | |
"TLRUCache", | |
"TTLCache", | |
"cached", | |
"cachedmethod", | |
) | |
__version__ = "5.5.0" | |
import collections | |
import collections.abc | |
import functools | |
import heapq | |
import random | |
import time | |
from . import keys | |
class _DefaultSize: | |
__slots__ = () | |
def __getitem__(self, _): | |
return 1 | |
def __setitem__(self, _, value): | |
assert value == 1 | |
def pop(self, _): | |
return 1 | |
class Cache(collections.abc.MutableMapping): | |
"""Mutable mapping to serve as a simple cache or cache base class.""" | |
__marker = object() | |
__size = _DefaultSize() | |
def __init__(self, maxsize, getsizeof=None): | |
if getsizeof: | |
self.getsizeof = getsizeof | |
if self.getsizeof is not Cache.getsizeof: | |
self.__size = dict() | |
self.__data = dict() | |
self.__currsize = 0 | |
self.__maxsize = maxsize | |
def __repr__(self): | |
return "%s(%s, maxsize=%r, currsize=%r)" % ( | |
self.__class__.__name__, | |
repr(self.__data), | |
self.__maxsize, | |
self.__currsize, | |
) | |
def __getitem__(self, key): | |
try: | |
return self.__data[key] | |
except KeyError: | |
return self.__missing__(key) | |
def __setitem__(self, key, value): | |
maxsize = self.__maxsize | |
size = self.getsizeof(value) | |
if size > maxsize: | |
raise ValueError("value too large") | |
if key not in self.__data or self.__size[key] < size: | |
while self.__currsize + size > maxsize: | |
self.popitem() | |
if key in self.__data: | |
diffsize = size - self.__size[key] | |
else: | |
diffsize = size | |
self.__data[key] = value | |
self.__size[key] = size | |
self.__currsize += diffsize | |
def __delitem__(self, key): | |
size = self.__size.pop(key) | |
del self.__data[key] | |
self.__currsize -= size | |
def __contains__(self, key): | |
return key in self.__data | |
def __missing__(self, key): | |
raise KeyError(key) | |
def __iter__(self): | |
return iter(self.__data) | |
def __len__(self): | |
return len(self.__data) | |
def get(self, key, default=None): | |
if key in self: | |
return self[key] | |
else: | |
return default | |
def pop(self, key, default=__marker): | |
if key in self: | |
value = self[key] | |
del self[key] | |
elif default is self.__marker: | |
raise KeyError(key) | |
else: | |
value = default | |
return value | |
def setdefault(self, key, default=None): | |
if key in self: | |
value = self[key] | |
else: | |
self[key] = value = default | |
return value | |
def maxsize(self): | |
"""The maximum size of the cache.""" | |
return self.__maxsize | |
def currsize(self): | |
"""The current size of the cache.""" | |
return self.__currsize | |
def getsizeof(value): | |
"""Return the size of a cache element's value.""" | |
return 1 | |
class FIFOCache(Cache): | |
"""First In First Out (FIFO) cache implementation.""" | |
def __init__(self, maxsize, getsizeof=None): | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__order = collections.OrderedDict() | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
cache_setitem(self, key, value) | |
try: | |
self.__order.move_to_end(key) | |
except KeyError: | |
self.__order[key] = None | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
cache_delitem(self, key) | |
del self.__order[key] | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair first inserted.""" | |
try: | |
key = next(iter(self.__order)) | |
except StopIteration: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
class LFUCache(Cache): | |
"""Least Frequently Used (LFU) cache implementation.""" | |
def __init__(self, maxsize, getsizeof=None): | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__counter = collections.Counter() | |
def __getitem__(self, key, cache_getitem=Cache.__getitem__): | |
value = cache_getitem(self, key) | |
if key in self: # __missing__ may not store item | |
self.__counter[key] -= 1 | |
return value | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
cache_setitem(self, key, value) | |
self.__counter[key] -= 1 | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
cache_delitem(self, key) | |
del self.__counter[key] | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair least frequently used.""" | |
try: | |
((key, _),) = self.__counter.most_common(1) | |
except ValueError: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
class LRUCache(Cache): | |
"""Least Recently Used (LRU) cache implementation.""" | |
def __init__(self, maxsize, getsizeof=None): | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__order = collections.OrderedDict() | |
def __getitem__(self, key, cache_getitem=Cache.__getitem__): | |
value = cache_getitem(self, key) | |
if key in self: # __missing__ may not store item | |
self.__update(key) | |
return value | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
cache_setitem(self, key, value) | |
self.__update(key) | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
cache_delitem(self, key) | |
del self.__order[key] | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair least recently used.""" | |
try: | |
key = next(iter(self.__order)) | |
except StopIteration: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
def __update(self, key): | |
try: | |
self.__order.move_to_end(key) | |
except KeyError: | |
self.__order[key] = None | |
class MRUCache(Cache): | |
"""Most Recently Used (MRU) cache implementation.""" | |
def __init__(self, maxsize, getsizeof=None): | |
from warnings import warn | |
warn("MRUCache is deprecated", DeprecationWarning, stacklevel=2) | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__order = collections.OrderedDict() | |
def __getitem__(self, key, cache_getitem=Cache.__getitem__): | |
value = cache_getitem(self, key) | |
if key in self: # __missing__ may not store item | |
self.__update(key) | |
return value | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
cache_setitem(self, key, value) | |
self.__update(key) | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
cache_delitem(self, key) | |
del self.__order[key] | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair most recently used.""" | |
try: | |
key = next(iter(self.__order)) | |
except StopIteration: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
def __update(self, key): | |
try: | |
self.__order.move_to_end(key, last=False) | |
except KeyError: | |
self.__order[key] = None | |
class RRCache(Cache): | |
"""Random Replacement (RR) cache implementation.""" | |
def __init__(self, maxsize, choice=random.choice, getsizeof=None): | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__choice = choice | |
def choice(self): | |
"""The `choice` function used by the cache.""" | |
return self.__choice | |
def popitem(self): | |
"""Remove and return a random `(key, value)` pair.""" | |
try: | |
key = self.__choice(list(self)) | |
except IndexError: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
class _TimedCache(Cache): | |
"""Base class for time aware cache implementations.""" | |
class _Timer: | |
def __init__(self, timer): | |
self.__timer = timer | |
self.__nesting = 0 | |
def __call__(self): | |
if self.__nesting == 0: | |
return self.__timer() | |
else: | |
return self.__time | |
def __enter__(self): | |
if self.__nesting == 0: | |
self.__time = time = self.__timer() | |
else: | |
time = self.__time | |
self.__nesting += 1 | |
return time | |
def __exit__(self, *exc): | |
self.__nesting -= 1 | |
def __reduce__(self): | |
return _TimedCache._Timer, (self.__timer,) | |
def __getattr__(self, name): | |
return getattr(self.__timer, name) | |
def __init__(self, maxsize, timer=time.monotonic, getsizeof=None): | |
Cache.__init__(self, maxsize, getsizeof) | |
self.__timer = _TimedCache._Timer(timer) | |
def __repr__(self, cache_repr=Cache.__repr__): | |
with self.__timer as time: | |
self.expire(time) | |
return cache_repr(self) | |
def __len__(self, cache_len=Cache.__len__): | |
with self.__timer as time: | |
self.expire(time) | |
return cache_len(self) | |
def currsize(self): | |
with self.__timer as time: | |
self.expire(time) | |
return super().currsize | |
def timer(self): | |
"""The timer function used by the cache.""" | |
return self.__timer | |
def clear(self): | |
with self.__timer as time: | |
self.expire(time) | |
Cache.clear(self) | |
def get(self, *args, **kwargs): | |
with self.__timer: | |
return Cache.get(self, *args, **kwargs) | |
def pop(self, *args, **kwargs): | |
with self.__timer: | |
return Cache.pop(self, *args, **kwargs) | |
def setdefault(self, *args, **kwargs): | |
with self.__timer: | |
return Cache.setdefault(self, *args, **kwargs) | |
class TTLCache(_TimedCache): | |
"""LRU Cache implementation with per-item time-to-live (TTL) value.""" | |
class _Link: | |
__slots__ = ("key", "expires", "next", "prev") | |
def __init__(self, key=None, expires=None): | |
self.key = key | |
self.expires = expires | |
def __reduce__(self): | |
return TTLCache._Link, (self.key, self.expires) | |
def unlink(self): | |
next = self.next | |
prev = self.prev | |
prev.next = next | |
next.prev = prev | |
def __init__(self, maxsize, ttl, timer=time.monotonic, getsizeof=None): | |
_TimedCache.__init__(self, maxsize, timer, getsizeof) | |
self.__root = root = TTLCache._Link() | |
root.prev = root.next = root | |
self.__links = collections.OrderedDict() | |
self.__ttl = ttl | |
def __contains__(self, key): | |
try: | |
link = self.__links[key] # no reordering | |
except KeyError: | |
return False | |
else: | |
return self.timer() < link.expires | |
def __getitem__(self, key, cache_getitem=Cache.__getitem__): | |
try: | |
link = self.__getlink(key) | |
except KeyError: | |
expired = False | |
else: | |
expired = not (self.timer() < link.expires) | |
if expired: | |
return self.__missing__(key) | |
else: | |
return cache_getitem(self, key) | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
with self.timer as time: | |
self.expire(time) | |
cache_setitem(self, key, value) | |
try: | |
link = self.__getlink(key) | |
except KeyError: | |
self.__links[key] = link = TTLCache._Link(key) | |
else: | |
link.unlink() | |
link.expires = time + self.__ttl | |
link.next = root = self.__root | |
link.prev = prev = root.prev | |
prev.next = root.prev = link | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
cache_delitem(self, key) | |
link = self.__links.pop(key) | |
link.unlink() | |
if not (self.timer() < link.expires): | |
raise KeyError(key) | |
def __iter__(self): | |
root = self.__root | |
curr = root.next | |
while curr is not root: | |
# "freeze" time for iterator access | |
with self.timer as time: | |
if time < curr.expires: | |
yield curr.key | |
curr = curr.next | |
def __setstate__(self, state): | |
self.__dict__.update(state) | |
root = self.__root | |
root.prev = root.next = root | |
for link in sorted(self.__links.values(), key=lambda obj: obj.expires): | |
link.next = root | |
link.prev = prev = root.prev | |
prev.next = root.prev = link | |
self.expire(self.timer()) | |
def ttl(self): | |
"""The time-to-live value of the cache's items.""" | |
return self.__ttl | |
def expire(self, time=None): | |
"""Remove expired items from the cache and return an iterable of the | |
expired `(key, value)` pairs. | |
""" | |
if time is None: | |
time = self.timer() | |
root = self.__root | |
curr = root.next | |
links = self.__links | |
expired = [] | |
cache_delitem = Cache.__delitem__ | |
cache_getitem = Cache.__getitem__ | |
while curr is not root and not (time < curr.expires): | |
expired.append((curr.key, cache_getitem(self, curr.key))) | |
cache_delitem(self, curr.key) | |
del links[curr.key] | |
next = curr.next | |
curr.unlink() | |
curr = next | |
return expired | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair least recently used that | |
has not already expired. | |
""" | |
with self.timer as time: | |
self.expire(time) | |
try: | |
key = next(iter(self.__links)) | |
except StopIteration: | |
raise KeyError("%s is empty" % type(self).__name__) from None | |
else: | |
return (key, self.pop(key)) | |
def __getlink(self, key): | |
value = self.__links[key] | |
self.__links.move_to_end(key) | |
return value | |
class TLRUCache(_TimedCache): | |
"""Time aware Least Recently Used (TLRU) cache implementation.""" | |
class _Item: | |
__slots__ = ("key", "expires", "removed") | |
def __init__(self, key=None, expires=None): | |
self.key = key | |
self.expires = expires | |
self.removed = False | |
def __lt__(self, other): | |
return self.expires < other.expires | |
def __init__(self, maxsize, ttu, timer=time.monotonic, getsizeof=None): | |
_TimedCache.__init__(self, maxsize, timer, getsizeof) | |
self.__items = collections.OrderedDict() | |
self.__order = [] | |
self.__ttu = ttu | |
def __contains__(self, key): | |
try: | |
item = self.__items[key] # no reordering | |
except KeyError: | |
return False | |
else: | |
return self.timer() < item.expires | |
def __getitem__(self, key, cache_getitem=Cache.__getitem__): | |
try: | |
item = self.__getitem(key) | |
except KeyError: | |
expired = False | |
else: | |
expired = not (self.timer() < item.expires) | |
if expired: | |
return self.__missing__(key) | |
else: | |
return cache_getitem(self, key) | |
def __setitem__(self, key, value, cache_setitem=Cache.__setitem__): | |
with self.timer as time: | |
expires = self.__ttu(key, value, time) | |
if not (time < expires): | |
return # skip expired items | |
self.expire(time) | |
cache_setitem(self, key, value) | |
# removing an existing item would break the heap structure, so | |
# only mark it as removed for now | |
try: | |
self.__getitem(key).removed = True | |
except KeyError: | |
pass | |
self.__items[key] = item = TLRUCache._Item(key, expires) | |
heapq.heappush(self.__order, item) | |
def __delitem__(self, key, cache_delitem=Cache.__delitem__): | |
with self.timer as time: | |
# no self.expire() for performance reasons, e.g. self.clear() [#67] | |
cache_delitem(self, key) | |
item = self.__items.pop(key) | |
item.removed = True | |
if not (time < item.expires): | |
raise KeyError(key) | |
def __iter__(self): | |
for curr in self.__order: | |
# "freeze" time for iterator access | |
with self.timer as time: | |
if time < curr.expires and not curr.removed: | |
yield curr.key | |
def ttu(self): | |
"""The local time-to-use function used by the cache.""" | |
return self.__ttu | |
def expire(self, time=None): | |
"""Remove expired items from the cache and return an iterable of the | |
expired `(key, value)` pairs. | |
""" | |
if time is None: | |
time = self.timer() | |
items = self.__items | |
order = self.__order | |
# clean up the heap if too many items are marked as removed | |
if len(order) > len(items) * 2: | |
self.__order = order = [item for item in order if not item.removed] | |
heapq.heapify(order) | |
expired = [] | |
cache_delitem = Cache.__delitem__ | |
cache_getitem = Cache.__getitem__ | |
while order and (order[0].removed or not (time < order[0].expires)): | |
item = heapq.heappop(order) | |
if not item.removed: | |
expired.append((item.key, cache_getitem(self, item.key))) | |
cache_delitem(self, item.key) | |
del items[item.key] | |
return expired | |
def popitem(self): | |
"""Remove and return the `(key, value)` pair least recently used that | |
has not already expired. | |
""" | |
with self.timer as time: | |
self.expire(time) | |
try: | |
key = next(iter(self.__items)) | |
except StopIteration: | |
raise KeyError("%s is empty" % self.__class__.__name__) from None | |
else: | |
return (key, self.pop(key)) | |
def __getitem(self, key): | |
value = self.__items[key] | |
self.__items.move_to_end(key) | |
return value | |
_CacheInfo = collections.namedtuple( | |
"CacheInfo", ["hits", "misses", "maxsize", "currsize"] | |
) | |
def cached(cache, key=keys.hashkey, lock=None, info=False): | |
"""Decorator to wrap a function with a memoizing callable that saves | |
results in a cache. | |
""" | |
def decorator(func): | |
if info: | |
hits = misses = 0 | |
if isinstance(cache, Cache): | |
def getinfo(): | |
nonlocal hits, misses | |
return _CacheInfo(hits, misses, cache.maxsize, cache.currsize) | |
elif isinstance(cache, collections.abc.Mapping): | |
def getinfo(): | |
nonlocal hits, misses | |
return _CacheInfo(hits, misses, None, len(cache)) | |
else: | |
def getinfo(): | |
nonlocal hits, misses | |
return _CacheInfo(hits, misses, 0, 0) | |
if cache is None: | |
def wrapper(*args, **kwargs): | |
nonlocal misses | |
misses += 1 | |
return func(*args, **kwargs) | |
def cache_clear(): | |
nonlocal hits, misses | |
hits = misses = 0 | |
cache_info = getinfo | |
elif lock is None: | |
def wrapper(*args, **kwargs): | |
nonlocal hits, misses | |
k = key(*args, **kwargs) | |
try: | |
result = cache[k] | |
hits += 1 | |
return result | |
except KeyError: | |
misses += 1 | |
v = func(*args, **kwargs) | |
try: | |
cache[k] = v | |
except ValueError: | |
pass # value too large | |
return v | |
def cache_clear(): | |
nonlocal hits, misses | |
cache.clear() | |
hits = misses = 0 | |
cache_info = getinfo | |
else: | |
def wrapper(*args, **kwargs): | |
nonlocal hits, misses | |
k = key(*args, **kwargs) | |
try: | |
with lock: | |
result = cache[k] | |
hits += 1 | |
return result | |
except KeyError: | |
with lock: | |
misses += 1 | |
v = func(*args, **kwargs) | |
# in case of a race, prefer the item already in the cache | |
try: | |
with lock: | |
return cache.setdefault(k, v) | |
except ValueError: | |
return v # value too large | |
def cache_clear(): | |
nonlocal hits, misses | |
with lock: | |
cache.clear() | |
hits = misses = 0 | |
def cache_info(): | |
with lock: | |
return getinfo() | |
else: | |
if cache is None: | |
def wrapper(*args, **kwargs): | |
return func(*args, **kwargs) | |
def cache_clear(): | |
pass | |
elif lock is None: | |
def wrapper(*args, **kwargs): | |
k = key(*args, **kwargs) | |
try: | |
return cache[k] | |
except KeyError: | |
pass # key not found | |
v = func(*args, **kwargs) | |
try: | |
cache[k] = v | |
except ValueError: | |
pass # value too large | |
return v | |
def cache_clear(): | |
cache.clear() | |
else: | |
def wrapper(*args, **kwargs): | |
k = key(*args, **kwargs) | |
try: | |
with lock: | |
return cache[k] | |
except KeyError: | |
pass # key not found | |
v = func(*args, **kwargs) | |
# in case of a race, prefer the item already in the cache | |
try: | |
with lock: | |
return cache.setdefault(k, v) | |
except ValueError: | |
return v # value too large | |
def cache_clear(): | |
with lock: | |
cache.clear() | |
cache_info = None | |
wrapper.cache = cache | |
wrapper.cache_key = key | |
wrapper.cache_lock = lock | |
wrapper.cache_clear = cache_clear | |
wrapper.cache_info = cache_info | |
return functools.update_wrapper(wrapper, func) | |
return decorator | |
def cachedmethod(cache, key=keys.methodkey, lock=None): | |
"""Decorator to wrap a class or instance method with a memoizing | |
callable that saves results in a cache. | |
""" | |
def decorator(method): | |
if lock is None: | |
def wrapper(self, *args, **kwargs): | |
c = cache(self) | |
if c is None: | |
return method(self, *args, **kwargs) | |
k = key(self, *args, **kwargs) | |
try: | |
return c[k] | |
except KeyError: | |
pass # key not found | |
v = method(self, *args, **kwargs) | |
try: | |
c[k] = v | |
except ValueError: | |
pass # value too large | |
return v | |
def clear(self): | |
c = cache(self) | |
if c is not None: | |
c.clear() | |
else: | |
def wrapper(self, *args, **kwargs): | |
c = cache(self) | |
if c is None: | |
return method(self, *args, **kwargs) | |
k = key(self, *args, **kwargs) | |
try: | |
with lock(self): | |
return c[k] | |
except KeyError: | |
pass # key not found | |
v = method(self, *args, **kwargs) | |
# in case of a race, prefer the item already in the cache | |
try: | |
with lock(self): | |
return c.setdefault(k, v) | |
except ValueError: | |
return v # value too large | |
def clear(self): | |
c = cache(self) | |
if c is not None: | |
with lock(self): | |
c.clear() | |
wrapper.cache = cache | |
wrapper.cache_key = key | |
wrapper.cache_lock = lock | |
wrapper.cache_clear = clear | |
return functools.update_wrapper(wrapper, method) | |
return decorator | |