File size: 8,437 Bytes
fe41391 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 |
# util.py
import inspect
import warnings
import types
import collections
import itertools
from functools import lru_cache, wraps
from typing import Callable, List, Union, Iterable, TypeVar, cast
# The backslash character (code point 92), built with chr() - presumably so the
# source does not need an escaped backslash literal.
_bslash = chr(92)
# Type variable bound to Callable; used in this module to annotate
# replaced_by_pep8() as returning the same callable type it is given.
C = TypeVar("C", bound=Callable)
class __config_flags:
    """
    Internal base class for groups of named boolean flags.

    Subclasses list the flag names they support in ``_all_names`` and hold the
    current value of each flag as a class attribute of the same name. Names
    that also appear in ``_fixed_names`` cannot be changed through
    :meth:`enable` / :meth:`disable`.
    """

    # names of every flag that may be set on this class
    _all_names: List[str] = []
    # names of flags whose value is locked
    _fixed_names: List[str] = []
    # word used in warning and error messages to describe this kind of flag
    _type_desc = "configuration"

    @classmethod
    def _set(cls, dname, value):
        """
        Assign ``value`` to the flag named ``dname``.

        A locked flag is left untouched and a warning is issued instead.
        Raises ``ValueError`` if ``dname`` is not a known flag name.
        """
        if dname in cls._fixed_names:
            # stacklevel=3 skips this frame and the enable()/disable() frame,
            # so the warning is attributed to whoever called enable()/disable()
            warnings.warn(
                f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
                f" and cannot be overridden",
                stacklevel=3,
            )
            return

        if dname not in cls._all_names:
            raise ValueError(f"no such {cls._type_desc} {dname!r}")

        setattr(cls, dname, value)

    @classmethod
    def enable(cls, name):
        """Set the flag called ``name`` to True."""
        return cls._set(name, True)

    @classmethod
    def disable(cls, name):
        """Set the flag called ``name`` to False."""
        return cls._set(name, False)
@lru_cache(maxsize=128)
def col(loc: int, strg: str) -> int:
    """
    Return the column of position ``loc`` within ``strg``, treating newlines
    as line separators. Columns are numbered starting at 1.

    Note: by default, tabs in the input string are expanded before parsing
    begins. See :class:`ParserElement.parse_string` for more information on
    parsing strings containing ``<TAB>`` s, and for suggested ways to keep the
    parsed string, the parse location, and the line and column positions
    consistent with one another.
    """
    # a location strictly inside the string that directly follows a newline
    # is the first column of its line
    if 0 < loc < len(strg) and strg[loc - 1] == "\n":
        return 1

    # otherwise measure the distance back to the preceding newline
    # (rfind gives -1 when there is none, which yields loc + 1)
    return loc - strg.rfind("\n", 0, loc)
@lru_cache(maxsize=128)
def lineno(loc: int, strg: str) -> int:
    """
    Return the line number of position ``loc`` within ``strg``, treating
    newlines as line separators. Lines are numbered starting at 1.

    Note: by default, tabs in the input string are expanded before parsing
    begins. See :class:`ParserElement.parse_string` for more information on
    parsing strings containing ``<TAB>`` s, and for suggested ways to keep the
    parsed string, the parse location, and the line and column positions
    consistent with one another.
    """
    # every newline before loc starts another line
    newlines_before = strg.count("\n", 0, loc)
    return newlines_before + 1
@lru_cache(maxsize=128)
def line(loc: int, strg: str) -> str:
    """
    Return the text of the line that contains position ``loc`` within
    ``strg``, treating newlines as line separators. The returned text does
    not include the terminating newline.
    """
    # first character after the previous newline (0 when there is none,
    # because rfind returns -1)
    start = strg.rfind("\n", 0, loc) + 1
    end = strg.find("\n", loc)
    if end < 0:
        # no newline at or after loc: the line runs to the end of the string
        return strg[start:]
    return strg[start:end]
class _UnboundedCache:
    """
    Cache with no size limit.

    ``get`` returns the ``not_in_cache`` sentinel for a missing key, so that
    ``None`` can be stored as a real value. ``size`` is always ``None``.
    """

    def __init__(self):
        store = {}
        lookup = store.get
        missing = object()

        # get/set/clear are closures over ``store`` that are bound to the
        # instance below; the first parameter (the instance) is unused
        def get(_, key):
            return lookup(key, missing)

        def set_(_, key, value):
            store[key] = value

        def clear(_):
            store.clear()

        self.not_in_cache = missing
        self.size = None
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class _FifoCache:
    """
    Cache holding at most ``size`` entries, evicting the oldest entry first.

    ``get`` returns the ``not_in_cache`` sentinel for a missing key, so that
    ``None`` can be stored as a real value. Storing a new value under a key
    that is already cached updates it in place and keeps its original
    position in the eviction order. A ``size`` of zero (or less) caches
    nothing.
    """

    def __init__(self, size):
        self.not_in_cache = not_in_cache = object()
        cache = {}
        cache_get = cache.get
        cache_pop = cache.pop
        # treat a negative size like zero so the eviction loop always ends
        limit = max(size, 0)

        # get/set/clear are closures over ``cache`` that are bound to the
        # instance below; the first parameter (the instance) is unused
        def get(_, key):
            return cache_get(key, not_in_cache)

        def set_(_, key, value):
            cache[key] = value
            # dicts preserve insertion order, so the first key is the oldest.
            # Evicting by order (rather than through a separate ring of keys)
            # means re-setting an existing key can never evict the value that
            # was just stored, and duplicate keys never take up extra slots.
            while len(cache) > limit:
                cache_pop(next(iter(cache)))

        def clear(_):
            cache.clear()

        self.size = size
        self.get = types.MethodType(get, self)
        self.set = types.MethodType(set_, self)
        self.clear = types.MethodType(clear, self)
class LRUMemo:
    """
    A memoizing mapping that retains `capacity` deleted items.

    Items that have been set and not yet deleted are "active" and are never
    discarded. Deleting an item moves it into a bounded memory instead of
    dropping it. The memo tracks retained items by their access order; once
    `capacity` items are retained, the least recently used item is discarded.
    A `capacity` of zero (or less) retains nothing.
    """

    def __init__(self, capacity):
        self._capacity = capacity
        # items currently set
        self._active = {}
        # deleted items, ordered from least to most recently used
        self._memory = collections.OrderedDict()

    def __getitem__(self, key):
        """Return the value for ``key``; raises ``KeyError`` if it is neither active nor retained."""
        try:
            return self._active[key]
        except KeyError:
            # reading a retained item marks it as most recently used;
            # move_to_end raises KeyError when the key is unknown
            self._memory.move_to_end(key)
            return self._memory[key]

    def __setitem__(self, key, value):
        """Make ``key`` active with ``value``, replacing any retained copy."""
        self._memory.pop(key, None)
        self._active[key] = value

    def __delitem__(self, key):
        """
        Move an active item into the retained memory.

        Deleting a key that is not active is silently ignored.
        """
        try:
            value = self._active.pop(key)
        except KeyError:
            return

        if self._capacity <= 0:
            # nothing may be retained; without this guard the loop below would
            # call popitem() on an empty OrderedDict and raise KeyError
            return

        # make room for the new entry by dropping least recently used items
        while len(self._memory) >= self._capacity:
            self._memory.popitem(last=False)
        self._memory[key] = value

    def clear(self):
        """Forget all active and retained items."""
        self._active.clear()
        self._memory.clear()
class UnboundedMemo(dict):
    """
    A memoizing mapping that retains all deleted items.

    Behaves like a plain ``dict`` except that ``del memo[key]`` does nothing,
    so an entry stays available after it has been "deleted".
    """

    def __delitem__(self, key):
        # intentionally a no-op: the entry is kept (and an unknown key is ignored)
        return None
def _escape_regex_range_chars(s: str) -> str:
    """
    Return ``s`` with a backslash placed before each backslash, ``^``, ``-``,
    ``[`` and ``]``, and with literal newline and tab characters replaced by
    the two-character sequences ``\\n`` and ``\\t``.
    """
    # the backslash goes first so that the backslashes added for the other
    # characters are not themselves doubled
    for special in (_bslash, "^", "-", "[", "]"):
        s = s.replace(special, _bslash + special)

    for raw, shown in (("\n", r"\n"), ("\t", r"\t")):
        s = s.replace(raw, shown)

    return str(s)
def _collapse_string_to_ranges(
    s: Union[str, Iterable[str]], re_escape: bool = True
) -> str:
    """
    Collapse the characters of ``s`` into a compact string of characters and
    ``first-last`` ranges.

    Duplicates are removed and the characters are sorted. When more than three
    distinct characters remain, each run of consecutive code points is reduced
    to its endpoints: a single character stays as it is, a run of exactly two
    characters is written as those two characters, and a longer run is written
    as ``first-last``. With three or fewer characters they are simply emitted
    in sorted order.

    When ``re_escape`` is true (the default), each emitted backslash, ``^``,
    ``-``, ``]`` or ``[`` is preceded by a backslash.
    """
    if re_escape:

        def render(ch):
            return "\\" + ch if ch in r"\^-][" else ch

    else:

        def render(ch):
            return ch

    ordered = "".join(sorted(set(s)))

    # too short to be worth collapsing
    if len(ordered) <= 3:
        return "".join(render(ch) for ch in ordered)

    # single pass collecting [first, last] for each run; a new run starts
    # whenever the code point jumps by more than one from the previous character
    runs = []
    prev_code = 0
    for ch in ordered:
        code = ord(ch)
        if not runs or code - prev_code > 1:
            runs.append([ch, ch])
        else:
            runs[-1][1] = ch
        prev_code = code

    pieces = []
    for first, last in runs:
        if first == last:
            pieces.append(render(first))
            continue
        # two adjacent characters need no dash between them
        sep = "" if ord(last) == ord(first) + 1 else "-"
        pieces.append(f"{render(first)}{sep}{render(last)}")
    return "".join(pieces)
def _flatten(ll: list) -> list:
    """
    Return a new flat list holding the items of ``ll`` in order, with every
    nested ``list`` (at any depth) replaced by its own flattened contents.
    Only ``list`` instances are expanded; other containers are kept as items.
    """
    flat: list = []
    for item in ll:
        flat += _flatten(item) if isinstance(item, list) else [item]
    return flat
def replaced_by_pep8(compat_name: str, fn: C) -> C:
    """
    Build a backward-compatibility alias called ``compat_name`` that forwards
    every call to ``fn``.

    ``fn`` may be a function, a class, or a staticmethod/classmethod object
    (which is unwrapped to its underlying function first). The returned
    wrapper carries ``compat_name`` as its ``__name__``, a "Deprecated" note
    as its docstring, and the annotations, keyword-only defaults and
    ``__qualname__`` of ``fn``.
    """
    # In a future version, uncomment the code in the internal _inner() functions
    # to begin emitting DeprecationWarnings.

    # Unwrap staticmethod/classmethod
    fn = getattr(fn, "__func__", fn)

    # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
    # some extra steps to add it if present in decorated function.)
    # The parameter list is sliced rather than indexed so that a callable
    # taking no parameters does not raise IndexError.
    if ["self"] == list(inspect.signature(fn).parameters)[:1]:

        @wraps(fn)
        def _inner(self, *args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(self, *args, **kwargs)

    else:

        @wraps(fn)
        def _inner(*args, **kwargs):
            # warnings.warn(
            #     f"Deprecated - use {fn.__name__}", DeprecationWarning, stacklevel=2
            # )
            return fn(*args, **kwargs)

    _inner.__doc__ = f"""Deprecated - use :class:`{fn.__name__}`"""
    _inner.__name__ = compat_name
    # not every callable carries __annotations__ (e.g. classes on older Pythons)
    _inner.__annotations__ = getattr(fn, "__annotations__", {})
    if isinstance(fn, types.FunctionType):
        _inner.__kwdefaults__ = fn.__kwdefaults__
    elif isinstance(fn, type) and hasattr(fn, "__init__"):
        # a class that relies on the inherited object.__init__ exposes a slot
        # wrapper there, which has no __kwdefaults__ attribute
        _inner.__kwdefaults__ = getattr(fn.__init__, "__kwdefaults__", None)
    else:
        _inner.__kwdefaults__ = None
    _inner.__qualname__ = fn.__qualname__
    return cast(C, _inner)
|