util.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. # util.py
  2. import contextlib
  3. import re
  4. from functools import lru_cache, wraps
  5. import inspect
  6. import itertools
  7. import types
  8. from typing import Callable, Union, Iterable, TypeVar, cast, Any
  9. import warnings
  10. _bslash = chr(92)
  11. C = TypeVar("C", bound=Callable)
  12. class __config_flags:
  13. """Internal class for defining compatibility and debugging flags"""
  14. _all_names: list[str] = []
  15. _fixed_names: list[str] = []
  16. _type_desc = "configuration"
  17. @classmethod
  18. def _set(cls, dname, value):
  19. if dname in cls._fixed_names:
  20. warnings.warn(
  21. f"{cls.__name__}.{dname} {cls._type_desc} is {str(getattr(cls, dname)).upper()}"
  22. f" and cannot be overridden",
  23. stacklevel=3,
  24. )
  25. return
  26. if dname in cls._all_names:
  27. setattr(cls, dname, value)
  28. else:
  29. raise ValueError(f"no such {cls._type_desc} {dname!r}")
  30. enable = classmethod(lambda cls, name: cls._set(name, True))
  31. disable = classmethod(lambda cls, name: cls._set(name, False))
  32. @lru_cache(maxsize=128)
  33. def col(loc: int, strg: str) -> int:
  34. """
  35. Returns current column within a string, counting newlines as line separators.
  36. The first column is number 1.
  37. Note: the default parsing behavior is to expand tabs in the input string
  38. before starting the parsing process. See
  39. :meth:`ParserElement.parse_string` for more
  40. information on parsing strings containing ``<TAB>`` s, and suggested
  41. methods to maintain a consistent view of the parsed string, the parse
  42. location, and line and column positions within the parsed string.
  43. """
  44. s = strg
  45. return 1 if 0 < loc < len(s) and s[loc - 1] == "\n" else loc - s.rfind("\n", 0, loc)
  46. @lru_cache(maxsize=128)
  47. def lineno(loc: int, strg: str) -> int:
  48. """Returns current line number within a string, counting newlines as line separators.
  49. The first line is number 1.
  50. Note - the default parsing behavior is to expand tabs in the input string
  51. before starting the parsing process. See :meth:`ParserElement.parse_string`
  52. for more information on parsing strings containing ``<TAB>`` s, and
  53. suggested methods to maintain a consistent view of the parsed string, the
  54. parse location, and line and column positions within the parsed string.
  55. """
  56. return strg.count("\n", 0, loc) + 1
  57. @lru_cache(maxsize=128)
  58. def line(loc: int, strg: str) -> str:
  59. """
  60. Returns the line of text containing loc within a string, counting newlines as line separators.
  61. """
  62. last_cr = strg.rfind("\n", 0, loc)
  63. next_cr = strg.find("\n", loc)
  64. return strg[last_cr + 1 : next_cr] if next_cr >= 0 else strg[last_cr + 1 :]
  65. class _UnboundedCache:
  66. def __init__(self):
  67. cache = {}
  68. cache_get = cache.get
  69. self.not_in_cache = not_in_cache = object()
  70. def get(_, key):
  71. return cache_get(key, not_in_cache)
  72. def set_(_, key, value):
  73. cache[key] = value
  74. def clear(_):
  75. cache.clear()
  76. self.size = None
  77. self.get = types.MethodType(get, self)
  78. self.set = types.MethodType(set_, self)
  79. self.clear = types.MethodType(clear, self)
  80. class _FifoCache:
  81. def __init__(self, size):
  82. cache = {}
  83. self.size = size
  84. self.not_in_cache = not_in_cache = object()
  85. cache_get = cache.get
  86. cache_pop = cache.pop
  87. def get(_, key):
  88. return cache_get(key, not_in_cache)
  89. def set_(_, key, value):
  90. cache[key] = value
  91. while len(cache) > size:
  92. # pop oldest element in cache by getting the first key
  93. cache_pop(next(iter(cache)))
  94. def clear(_):
  95. cache.clear()
  96. self.get = types.MethodType(get, self)
  97. self.set = types.MethodType(set_, self)
  98. self.clear = types.MethodType(clear, self)
  99. class LRUMemo:
  100. """
  101. A memoizing mapping that retains `capacity` deleted items
  102. The memo tracks retained items by their access order; once `capacity` items
  103. are retained, the least recently used item is discarded.
  104. """
  105. def __init__(self, capacity):
  106. self._capacity = capacity
  107. self._active = {}
  108. self._memory = {}
  109. def __getitem__(self, key):
  110. try:
  111. return self._active[key]
  112. except KeyError:
  113. self._memory[key] = self._memory.pop(key)
  114. return self._memory[key]
  115. def __setitem__(self, key, value):
  116. self._memory.pop(key, None)
  117. self._active[key] = value
  118. def __delitem__(self, key):
  119. try:
  120. value = self._active.pop(key)
  121. except KeyError:
  122. pass
  123. else:
  124. oldest_keys = list(self._memory)[: -(self._capacity + 1)]
  125. for key_to_delete in oldest_keys:
  126. self._memory.pop(key_to_delete)
  127. self._memory[key] = value
  128. def clear(self):
  129. self._active.clear()
  130. self._memory.clear()
  131. class UnboundedMemo(dict):
  132. """
  133. A memoizing mapping that retains all deleted items
  134. """
  135. def __delitem__(self, key):
  136. pass
  137. def _escape_regex_range_chars(s: str) -> str:
  138. # escape these chars: ^-[]
  139. for c in r"\^-[]":
  140. s = s.replace(c, _bslash + c)
  141. s = s.replace("\n", r"\n")
  142. s = s.replace("\t", r"\t")
  143. return str(s)
  144. class _GroupConsecutive:
  145. """
  146. Used as a callable `key` for itertools.groupby to group
  147. characters that are consecutive:
  148. .. testcode::
  149. from itertools import groupby
  150. from pyparsing.util import _GroupConsecutive
  151. grouped = groupby("abcdejkmpqrs", key=_GroupConsecutive())
  152. for index, group in grouped:
  153. print(tuple([index, list(group)]))
  154. prints:
  155. .. testoutput::
  156. (0, ['a', 'b', 'c', 'd', 'e'])
  157. (1, ['j', 'k'])
  158. (2, ['m'])
  159. (3, ['p', 'q', 'r', 's'])
  160. """
  161. def __init__(self) -> None:
  162. self.prev = 0
  163. self.counter = itertools.count()
  164. self.value = -1
  165. def __call__(self, char: str) -> int:
  166. c_int = ord(char)
  167. self.prev, prev = c_int, self.prev
  168. if c_int - prev > 1:
  169. self.value = next(self.counter)
  170. return self.value
  171. def _collapse_string_to_ranges(
  172. s: Union[str, Iterable[str]], re_escape: bool = True
  173. ) -> str:
  174. r"""
  175. Take a string or list of single-character strings, and return
  176. a string of the consecutive characters in that string collapsed
  177. into groups, as might be used in a regular expression '[a-z]'
  178. character set::
  179. 'a' -> 'a' -> '[a]'
  180. 'bc' -> 'bc' -> '[bc]'
  181. 'defgh' -> 'd-h' -> '[d-h]'
  182. 'fdgeh' -> 'd-h' -> '[d-h]'
  183. 'jklnpqrtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  184. Duplicates get collapsed out::
  185. 'aaa' -> 'a' -> '[a]'
  186. 'bcbccb' -> 'bc' -> '[bc]'
  187. 'defghhgf' -> 'd-h' -> '[d-h]'
  188. 'jklnpqrjjjtu' -> 'j-lnp-rtu' -> '[j-lnp-rtu]'
  189. Spaces are preserved::
  190. 'ab c' -> ' a-c' -> '[ a-c]'
  191. Characters that are significant when defining regex ranges
  192. get escaped::
  193. 'acde[]-' -> r'\-\[\]ac-e' -> r'[\-\[\]ac-e]'
  194. """
  195. # Developer notes:
  196. # - Do not optimize this code assuming that the given input string
  197. # or internal lists will be short (such as in loading generators into
  198. # lists to make it easier to find the last element); this method is also
  199. # used to generate regex ranges for character sets in the pyparsing.unicode
  200. # classes, and these can be _very_ long lists of strings
  201. def escape_re_range_char(c: str) -> str:
  202. return "\\" + c if c in r"\^-][" else c
  203. def no_escape_re_range_char(c: str) -> str:
  204. return c
  205. if not re_escape:
  206. escape_re_range_char = no_escape_re_range_char
  207. ret = []
  208. # reduce input string to remove duplicates, and put in sorted order
  209. s_chars: list[str] = sorted(set(s))
  210. if len(s_chars) > 2:
  211. # find groups of characters that are consecutive (can be collapsed
  212. # down to "<first>-<last>")
  213. for _, chars in itertools.groupby(s_chars, key=_GroupConsecutive()):
  214. # _ is unimportant, is just used to identify groups
  215. # chars is an iterator of one or more consecutive characters
  216. # that comprise the current group
  217. first = last = next(chars)
  218. with contextlib.suppress(ValueError):
  219. *_, last = chars
  220. if first == last:
  221. # there was only a single char in this group
  222. ret.append(escape_re_range_char(first))
  223. elif last == chr(ord(first) + 1):
  224. # there were only 2 characters in this group
  225. # 'a','b' -> 'ab'
  226. ret.append(f"{escape_re_range_char(first)}{escape_re_range_char(last)}")
  227. else:
  228. # there were > 2 characters in this group, make into a range
  229. # 'c','d','e' -> 'c-e'
  230. ret.append(
  231. f"{escape_re_range_char(first)}-{escape_re_range_char(last)}"
  232. )
  233. else:
  234. # only 1 or 2 chars were given to form into groups
  235. # 'a' -> ['a']
  236. # 'bc' -> ['b', 'c']
  237. # 'dg' -> ['d', 'g']
  238. # no need to list them with "-", just return as a list
  239. # (after escaping)
  240. ret = [escape_re_range_char(c) for c in s_chars]
  241. return "".join(ret)
  242. def _flatten(ll: Iterable) -> list:
  243. ret = []
  244. for i in ll:
  245. # Developer notes:
  246. # - do not collapse this section of code, isinstance checks are done
  247. # in optimal order
  248. if isinstance(i, str):
  249. ret.append(i)
  250. elif isinstance(i, Iterable):
  251. ret.extend(_flatten(i))
  252. else:
  253. ret.append(i)
  254. return ret
  255. def make_compressed_re(
  256. word_list: Iterable[str],
  257. max_level: int = 2,
  258. *,
  259. non_capturing_groups: bool = True,
  260. _level: int = 1,
  261. ) -> str:
  262. """
  263. Create a regular expression string from a list of words, collapsing by common
  264. prefixes and optional suffixes.
  265. Calls itself recursively to build nested sublists for each group of suffixes
  266. that have a shared prefix.
  267. """
  268. def get_suffixes_from_common_prefixes(namelist: list[str]):
  269. if len(namelist) > 1:
  270. for prefix, suffixes in itertools.groupby(namelist, key=lambda s: s[:1]):
  271. yield prefix, sorted([s[1:] for s in suffixes], key=len, reverse=True)
  272. else:
  273. yield namelist[0][0], [namelist[0][1:]]
  274. if _level == 1:
  275. if not word_list:
  276. raise ValueError("no words given to make_compressed_re()")
  277. if "" in word_list:
  278. raise ValueError("word list cannot contain empty string")
  279. else:
  280. # internal recursive call, just return empty string if no words
  281. if not word_list:
  282. return ""
  283. # dedupe the word list
  284. word_list = list({}.fromkeys(word_list))
  285. if max_level == 0:
  286. if any(len(wd) > 1 for wd in word_list):
  287. return "|".join(
  288. sorted([re.escape(wd) for wd in word_list], key=len, reverse=True)
  289. )
  290. else:
  291. return f"[{''.join(_escape_regex_range_chars(wd) for wd in word_list)}]"
  292. ret = []
  293. sep = ""
  294. ncgroup = "?:" if non_capturing_groups else ""
  295. for initial, suffixes in get_suffixes_from_common_prefixes(sorted(word_list)):
  296. ret.append(sep)
  297. sep = "|"
  298. initial = re.escape(initial)
  299. trailing = ""
  300. if "" in suffixes:
  301. trailing = "?"
  302. suffixes.remove("")
  303. if len(suffixes) > 1:
  304. if all(len(s) == 1 for s in suffixes):
  305. ret.append(
  306. f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
  307. )
  308. else:
  309. if _level < max_level:
  310. suffix_re = make_compressed_re(
  311. sorted(suffixes),
  312. max_level,
  313. non_capturing_groups=non_capturing_groups,
  314. _level=_level + 1,
  315. )
  316. ret.append(f"{initial}({ncgroup}{suffix_re}){trailing}")
  317. else:
  318. if all(len(s) == 1 for s in suffixes):
  319. ret.append(
  320. f"{initial}[{''.join(_escape_regex_range_chars(s) for s in suffixes)}]{trailing}"
  321. )
  322. else:
  323. suffixes.sort(key=len, reverse=True)
  324. ret.append(
  325. f"{initial}({ncgroup}{'|'.join(re.escape(s) for s in suffixes)}){trailing}"
  326. )
  327. else:
  328. if suffixes:
  329. suffix = re.escape(suffixes[0])
  330. if len(suffix) > 1 and trailing:
  331. ret.append(f"{initial}({ncgroup}{suffix}){trailing}")
  332. else:
  333. ret.append(f"{initial}{suffix}{trailing}")
  334. else:
  335. ret.append(initial)
  336. return "".join(ret)
  337. def replaced_by_pep8(compat_name: str, fn: C) -> C:
  338. # Unwrap staticmethod/classmethod
  339. fn = getattr(fn, "__func__", fn)
  340. # (Presence of 'self' arg in signature is used by explain_exception() methods, so we take
  341. # some extra steps to add it if present in decorated function.)
  342. if ["self"] == list(inspect.signature(fn).parameters)[:1]:
  343. @wraps(fn)
  344. def _inner(self, *args, **kwargs):
  345. warnings.warn(
  346. f"{compat_name!r} deprecated - use {fn.__name__!r}",
  347. DeprecationWarning,
  348. stacklevel=2,
  349. )
  350. return fn(self, *args, **kwargs)
  351. else:
  352. @wraps(fn)
  353. def _inner(*args, **kwargs):
  354. warnings.warn(
  355. f"{compat_name!r} deprecated - use {fn.__name__!r}",
  356. DeprecationWarning,
  357. stacklevel=2,
  358. )
  359. return fn(*args, **kwargs)
  360. _inner.__doc__ = f"""
  361. .. deprecated:: 3.0.0
  362. Use :class:`{fn.__name__}` instead
  363. """
  364. _inner.__name__ = compat_name
  365. _inner.__annotations__ = fn.__annotations__
  366. if isinstance(fn, types.FunctionType):
  367. _inner.__kwdefaults__ = fn.__kwdefaults__ # type: ignore [attr-defined]
  368. elif isinstance(fn, type) and hasattr(fn, "__init__"):
  369. _inner.__kwdefaults__ = fn.__init__.__kwdefaults__ # type: ignore [misc,attr-defined]
  370. else:
  371. _inner.__kwdefaults__ = None # type: ignore [attr-defined]
  372. _inner.__qualname__ = fn.__qualname__
  373. return cast(C, _inner)
  374. def deprecate_argument(
  375. kwargs: dict[str, Any], arg_name: str, default_value=None, *, new_name: str = ""
  376. ) -> Any:
  377. def to_pep8_name(s: str, _re_sub_pattern=re.compile(r"([a-z])([A-Z])")) -> str:
  378. s = _re_sub_pattern.sub(r"\1_\2", s)
  379. return s.lower()
  380. if arg_name in kwargs:
  381. new_name = new_name or to_pep8_name(arg_name)
  382. warnings.warn(
  383. f"{arg_name!r} argument is deprecated, use {new_name!r}",
  384. category=DeprecationWarning,
  385. stacklevel=3,
  386. )
  387. else:
  388. kwargs[arg_name] = default_value
  389. return kwargs[arg_name]