lock.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
"""Locking functionality that works across processes."""
  2. from __future__ import annotations
  3. import logging
  4. import os
  5. from abc import ABC, abstractmethod
  6. from contextlib import contextmanager, suppress
  7. from pathlib import Path
  8. from threading import Lock, RLock
  9. from filelock import FileLock, Timeout
  10. LOGGER = logging.getLogger(__name__)
  11. class _CountedFileLock(FileLock):
  12. def __init__(self, lock_file) -> None:
  13. parent = os.path.dirname(lock_file)
  14. with suppress(OSError):
  15. os.makedirs(parent, exist_ok=True)
  16. super().__init__(lock_file)
  17. self.count = 0
  18. self.thread_safe = RLock()
  19. def acquire(self, timeout=None, poll_interval=0.05):
  20. if not self.thread_safe.acquire(timeout=-1 if timeout is None else timeout):
  21. raise Timeout(self.lock_file)
  22. if self.count == 0:
  23. try:
  24. super().acquire(timeout, poll_interval)
  25. except BaseException:
  26. self.thread_safe.release()
  27. raise
  28. self.count += 1
  29. def release(self, force=False): # noqa: FBT002
  30. with self.thread_safe:
  31. if self.count > 0:
  32. if self.count == 1:
  33. super().release(force=force)
  34. self.count -= 1
  35. if self.count == 0:
  36. # if we have no more users of this lock, release the thread lock
  37. self.thread_safe.release()
  38. _lock_store = {}
  39. _store_lock = Lock()
  40. class PathLockBase(ABC):
  41. def __init__(self, folder) -> None:
  42. path = Path(folder)
  43. self.path = path.resolve() if path.exists() else path
  44. def __repr__(self) -> str:
  45. return f"{self.__class__.__name__}({self.path})"
  46. def __truediv__(self, other):
  47. return type(self)(self.path / other)
  48. @abstractmethod
  49. def __enter__(self):
  50. raise NotImplementedError
  51. @abstractmethod
  52. def __exit__(self, exc_type, exc_val, exc_tb):
  53. raise NotImplementedError
  54. @abstractmethod
  55. @contextmanager
  56. def lock_for_key(self, name, no_block=False): # noqa: FBT002
  57. raise NotImplementedError
  58. @abstractmethod
  59. @contextmanager
  60. def non_reentrant_lock_for_key(self, name):
  61. raise NotImplementedError
  62. class ReentrantFileLock(PathLockBase):
  63. def __init__(self, folder) -> None:
  64. super().__init__(folder)
  65. self._lock = None
  66. def _create_lock(self, name=""):
  67. lock_file = str(self.path / f"{name}.lock")
  68. with _store_lock:
  69. if lock_file not in _lock_store:
  70. _lock_store[lock_file] = _CountedFileLock(lock_file)
  71. return _lock_store[lock_file]
  72. @staticmethod
  73. def _del_lock(lock):
  74. if lock is not None:
  75. with _store_lock, lock.thread_safe:
  76. if lock.count == 0:
  77. _lock_store.pop(lock.lock_file, None)
  78. def __del__(self) -> None:
  79. self._del_lock(self._lock)
  80. def __enter__(self):
  81. self._lock = self._create_lock()
  82. self._lock_file(self._lock)
  83. def __exit__(self, exc_type, exc_val, exc_tb):
  84. self._release(self._lock)
  85. self._del_lock(self._lock)
  86. self._lock = None
  87. def _lock_file(self, lock, no_block=False): # noqa: FBT002
  88. # multiple processes might be trying to get a first lock... so we cannot check if this directory exist without
  89. # a lock, but that lock might then become expensive, and it's not clear where that lock should live.
  90. # Instead here we just ignore if we fail to create the directory.
  91. with suppress(OSError):
  92. os.makedirs(str(self.path), exist_ok=True)
  93. try:
  94. lock.acquire(0.0001)
  95. except Timeout:
  96. if no_block:
  97. raise
  98. LOGGER.debug("lock file %s present, will block until released", lock.lock_file)
  99. lock.release() # release the acquire try from above
  100. lock.acquire()
  101. @staticmethod
  102. def _release(lock):
  103. lock.release()
  104. @contextmanager
  105. def lock_for_key(self, name, no_block=False): # noqa: FBT002
  106. lock = self._create_lock(name)
  107. try:
  108. try:
  109. self._lock_file(lock, no_block)
  110. yield
  111. finally:
  112. self._release(lock)
  113. finally:
  114. self._del_lock(lock)
  115. lock = None
  116. @contextmanager
  117. def non_reentrant_lock_for_key(self, name):
  118. with _CountedFileLock(str(self.path / f"{name}.lock")):
  119. yield
  120. class NoOpFileLock(PathLockBase):
  121. def __enter__(self):
  122. raise NotImplementedError
  123. def __exit__(self, exc_type, exc_val, exc_tb):
  124. raise NotImplementedError
  125. @contextmanager
  126. def lock_for_key(self, name, no_block=False): # noqa: ARG002, FBT002
  127. yield
  128. @contextmanager
  129. def non_reentrant_lock_for_key(self, name): # noqa: ARG002
  130. yield
  131. __all__ = [
  132. "NoOpFileLock",
  133. "ReentrantFileLock",
  134. "Timeout",
  135. ]