_exceptions.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. from __future__ import annotations
  2. import contextlib
  3. import inspect
  4. import os
  5. import re
  6. from typing import (
  7. TYPE_CHECKING,
  8. Any,
  9. )
  10. import warnings
  11. if TYPE_CHECKING:
  12. from collections.abc import Generator
  13. from types import FrameType
  14. @contextlib.contextmanager
  15. def rewrite_exception(old_name: str, new_name: str) -> Generator[None]:
  16. """
  17. Rewrite the message of an exception.
  18. """
  19. try:
  20. yield
  21. except Exception as err:
  22. if not err.args:
  23. raise
  24. msg = str(err.args[0])
  25. msg = msg.replace(old_name, new_name)
  26. args: tuple[Any, ...] = (msg,)
  27. if len(err.args) > 1:
  28. args = args + err.args[1:]
  29. err.args = args
  30. raise
  31. def find_stack_level() -> int:
  32. """
  33. Find the first place in the stack that is not inside pandas
  34. (tests notwithstanding).
  35. """
  36. import pandas as pd
  37. pkg_dir = os.path.dirname(pd.__file__)
  38. test_dir = os.path.join(pkg_dir, "tests")
  39. # https://stackoverflow.com/questions/17407119/python-inspect-stack-is-slow
  40. frame: FrameType | None = inspect.currentframe()
  41. try:
  42. n = 0
  43. while frame:
  44. filename = inspect.getfile(frame)
  45. if filename.startswith(pkg_dir) and not filename.startswith(test_dir):
  46. frame = frame.f_back
  47. n += 1
  48. else:
  49. break
  50. finally:
  51. # See note in
  52. # https://docs.python.org/3/library/inspect.html#inspect.Traceback
  53. del frame
  54. return n
  55. @contextlib.contextmanager
  56. def rewrite_warning(
  57. target_message: str,
  58. target_category: type[Warning],
  59. new_message: str,
  60. new_category: type[Warning] | None = None,
  61. ) -> Generator[None]:
  62. """
  63. Rewrite the message of a warning.
  64. Parameters
  65. ----------
  66. target_message : str
  67. Warning message to match.
  68. target_category : Warning
  69. Warning type to match.
  70. new_message : str
  71. New warning message to emit.
  72. new_category : Warning or None, default None
  73. New warning type to emit. When None, will be the same as target_category.
  74. """
  75. if new_category is None:
  76. new_category = target_category
  77. with warnings.catch_warnings(record=True) as record:
  78. yield
  79. if len(record) > 0:
  80. match = re.compile(target_message)
  81. for warning in record:
  82. if warning.category is target_category and re.search(
  83. match, str(warning.message)
  84. ):
  85. category = new_category
  86. message: Warning | str = new_message
  87. else:
  88. category, message = warning.category, warning.message
  89. warnings.warn_explicit(
  90. message=message,
  91. category=category,
  92. filename=warning.filename,
  93. lineno=warning.lineno,
  94. )