# build_tracker.py
from __future__ import annotations

import contextlib
import hashlib
import logging
import os
from collections.abc import Generator
from types import TracebackType

from pip._internal.req.req_install import InstallRequirement
from pip._internal.utils.temp_dir import TempDirectory

  10. logger = logging.getLogger(__name__)
  11. @contextlib.contextmanager
  12. def update_env_context_manager(**changes: str) -> Generator[None, None, None]:
  13. target = os.environ
  14. # Save values from the target and change them.
  15. non_existent_marker = object()
  16. saved_values: dict[str, object | str] = {}
  17. for name, new_value in changes.items():
  18. try:
  19. saved_values[name] = target[name]
  20. except KeyError:
  21. saved_values[name] = non_existent_marker
  22. target[name] = new_value
  23. try:
  24. yield
  25. finally:
  26. # Restore original values in the target.
  27. for name, original_value in saved_values.items():
  28. if original_value is non_existent_marker:
  29. del target[name]
  30. else:
  31. assert isinstance(original_value, str) # for mypy
  32. target[name] = original_value
  33. @contextlib.contextmanager
  34. def get_build_tracker() -> Generator[BuildTracker, None, None]:
  35. root = os.environ.get("PIP_BUILD_TRACKER")
  36. with contextlib.ExitStack() as ctx:
  37. if root is None:
  38. root = ctx.enter_context(TempDirectory(kind="build-tracker")).path
  39. ctx.enter_context(update_env_context_manager(PIP_BUILD_TRACKER=root))
  40. logger.debug("Initialized build tracking at %s", root)
  41. with BuildTracker(root) as tracker:
  42. yield tracker
  43. class TrackerId(str):
  44. """Uniquely identifying string provided to the build tracker."""
  45. class BuildTracker:
  46. """Ensure that an sdist cannot request itself as a setup requirement.
  47. When an sdist is prepared, it identifies its setup requirements in the
  48. context of ``BuildTracker.track()``. If a requirement shows up recursively, this
  49. raises an exception.
  50. This stops fork bombs embedded in malicious packages."""
  51. def __init__(self, root: str) -> None:
  52. self._root = root
  53. self._entries: dict[TrackerId, InstallRequirement] = {}
  54. logger.debug("Created build tracker: %s", self._root)
  55. def __enter__(self) -> BuildTracker:
  56. logger.debug("Entered build tracker: %s", self._root)
  57. return self
  58. def __exit__(
  59. self,
  60. exc_type: type[BaseException] | None,
  61. exc_val: BaseException | None,
  62. exc_tb: TracebackType | None,
  63. ) -> None:
  64. self.cleanup()
  65. def _entry_path(self, key: TrackerId) -> str:
  66. hashed = hashlib.sha224(key.encode()).hexdigest()
  67. return os.path.join(self._root, hashed)
  68. def add(self, req: InstallRequirement, key: TrackerId) -> None:
  69. """Add an InstallRequirement to build tracking."""
  70. # Get the file to write information about this requirement.
  71. entry_path = self._entry_path(key)
  72. # Try reading from the file. If it exists and can be read from, a build
  73. # is already in progress, so a LookupError is raised.
  74. try:
  75. with open(entry_path) as fp:
  76. contents = fp.read()
  77. except FileNotFoundError:
  78. pass
  79. else:
  80. message = f"{req.link} is already being built: {contents}"
  81. raise LookupError(message)
  82. # If we're here, req should really not be building already.
  83. assert key not in self._entries
  84. # Start tracking this requirement.
  85. with open(entry_path, "w", encoding="utf-8") as fp:
  86. fp.write(str(req))
  87. self._entries[key] = req
  88. logger.debug("Added %s to build tracker %r", req, self._root)
  89. def remove(self, req: InstallRequirement, key: TrackerId) -> None:
  90. """Remove an InstallRequirement from build tracking."""
  91. # Delete the created file and the corresponding entry.
  92. os.unlink(self._entry_path(key))
  93. del self._entries[key]
  94. logger.debug("Removed %s from build tracker %r", req, self._root)
  95. def cleanup(self) -> None:
  96. for key, req in list(self._entries.items()):
  97. self.remove(req, key)
  98. logger.debug("Removed build tracker: %r", self._root)
  99. @contextlib.contextmanager
  100. def track(self, req: InstallRequirement, key: str) -> Generator[None, None, None]:
  101. """Ensure that `key` cannot install itself as a setup requirement.
  102. :raises LookupError: If `key` was already provided in a parent invocation of
  103. the context introduced by this method."""
  104. tracker_id = TrackerId(key)
  105. self.add(req, tracker_id)
  106. yield
  107. self.remove(req, tracker_id)