_windows.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from __future__ import annotations
  2. import os
  3. import sys
  4. from contextlib import suppress
  5. from errno import EACCES
  6. from pathlib import Path
  7. from typing import cast
  8. from ._api import BaseFileLock
  9. from ._util import ensure_directory_exists, raise_on_not_writable_file
if sys.platform == "win32":  # pragma: win32 cover
    # These modules are only imported on Windows, which is why they live inside the platform guard.
    import ctypes
    import msvcrt
    from ctypes import wintypes

    # Windows API constants for reparse point detection
    # Attribute bit tested against the value returned by GetFileAttributesW.
    FILE_ATTRIBUTE_REPARSE_POINT = 0x00000400
    # Value that _is_reparse_point treats as "GetFileAttributesW failed".
    INVALID_FILE_ATTRIBUTES = 0xFFFFFFFF

    # Load kernel32.dll
    # use_last_error=True lets ctypes.get_last_error() report the error code of a failed call.
    _kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)
    # Declare the prototype explicitly: one wide-string argument, unsigned 32-bit result, so the
    # result can be compared directly with INVALID_FILE_ATTRIBUTES.
    _kernel32.GetFileAttributesW.argtypes = [wintypes.LPCWSTR]
    _kernel32.GetFileAttributesW.restype = wintypes.DWORD
  21. def _is_reparse_point(path: str) -> bool:
  22. """
  23. Check if a path is a reparse point (symlink, junction, etc.) on Windows.
  24. :param path: Path to check
  25. :return: True if path is a reparse point, False otherwise
  26. :raises OSError: If GetFileAttributesW fails for reasons other than file-not-found
  27. """
  28. attrs = _kernel32.GetFileAttributesW(path)
  29. if attrs == INVALID_FILE_ATTRIBUTES:
  30. # File doesn't exist yet - that's fine, we'll create it
  31. err = ctypes.get_last_error()
  32. if err == 2: # noqa: PLR2004 # ERROR_FILE_NOT_FOUND
  33. return False
  34. if err == 3: # noqa: PLR2004 # ERROR_PATH_NOT_FOUND
  35. return False
  36. # Some other error - let caller handle it
  37. return False
  38. return bool(attrs & FILE_ATTRIBUTE_REPARSE_POINT)
  39. class WindowsFileLock(BaseFileLock):
  40. """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
  41. def _acquire(self) -> None:
  42. raise_on_not_writable_file(self.lock_file)
  43. ensure_directory_exists(self.lock_file)
  44. # Security check: Refuse to open reparse points (symlinks, junctions)
  45. # This prevents TOCTOU symlink attacks (CVE-TBD)
  46. if _is_reparse_point(self.lock_file):
  47. msg = f"Lock file is a reparse point (symlink/junction): {self.lock_file}"
  48. raise OSError(msg)
  49. flags = (
  50. os.O_RDWR # open for read and write
  51. | os.O_CREAT # create file if not exists
  52. | os.O_TRUNC # truncate file if not empty
  53. )
  54. try:
  55. fd = os.open(self.lock_file, flags, self._context.mode)
  56. except OSError as exception:
  57. if exception.errno != EACCES: # has no access to this lock
  58. raise
  59. else:
  60. try:
  61. msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)
  62. except OSError as exception:
  63. os.close(fd) # close file first
  64. if exception.errno != EACCES: # file is already locked
  65. raise
  66. else:
  67. self._context.lock_file_fd = fd
  68. def _release(self) -> None:
  69. fd = cast("int", self._context.lock_file_fd)
  70. self._context.lock_file_fd = None
  71. msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
  72. os.close(fd)
  73. with suppress(OSError): # Probably another instance of the application hat acquired the file lock.
  74. Path(self.lock_file).unlink()
  75. else: # pragma: win32 no cover
  76. class WindowsFileLock(BaseFileLock):
  77. """Uses the :func:`msvcrt.locking` function to hard lock the lock file on Windows systems."""
  78. def _acquire(self) -> None:
  79. raise NotImplementedError
  80. def _release(self) -> None:
  81. raise NotImplementedError
  82. __all__ = [
  83. "WindowsFileLock",
  84. ]