acquire.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. """Bootstrap."""
  2. from __future__ import annotations
  3. import logging
  4. import sys
  5. from operator import eq, lt
  6. from pathlib import Path
  7. from subprocess import PIPE, CalledProcessError, Popen
  8. from .bundle import from_bundle
  9. from .periodic_update import add_wheel_to_update_log
  10. from .util import Version, Wheel, discover_wheels
# Module-level logger, named after this module's import path.
LOGGER = logging.getLogger(__name__)
  12. def get_wheel( # noqa: PLR0913
  13. distribution,
  14. version,
  15. for_py_version,
  16. search_dirs,
  17. download,
  18. app_data,
  19. do_periodic_update,
  20. env,
  21. ):
  22. """Get a wheel with the given distribution-version-for_py_version trio, by using the extra search dir + download."""
  23. # not all wheels are compatible with all python versions, so we need to py version qualify it
  24. wheel = None
  25. if not download or version != Version.bundle:
  26. # 1. acquire from bundle
  27. wheel = from_bundle(distribution, version, for_py_version, search_dirs, app_data, do_periodic_update, env)
  28. if download and wheel is None and version != Version.embed:
  29. # 2. download from the internet
  30. wheel = download_wheel(
  31. distribution=distribution,
  32. version_spec=Version.as_version_spec(version),
  33. for_py_version=for_py_version,
  34. search_dirs=search_dirs,
  35. app_data=app_data,
  36. to_folder=app_data.house,
  37. env=env,
  38. )
  39. if wheel is not None and app_data.can_update:
  40. add_wheel_to_update_log(wheel, for_py_version, app_data)
  41. return wheel
  42. def download_wheel(distribution, version_spec, for_py_version, search_dirs, app_data, to_folder, env): # noqa: PLR0913
  43. to_download = f"{distribution}{version_spec or ''}"
  44. LOGGER.debug("download wheel %s %s to %s", to_download, for_py_version, to_folder)
  45. cmd = [
  46. sys.executable,
  47. "-m",
  48. "pip",
  49. "download",
  50. "--progress-bar",
  51. "off",
  52. "--disable-pip-version-check",
  53. "--only-binary=:all:",
  54. "--no-deps",
  55. "--python-version",
  56. for_py_version,
  57. "-d",
  58. str(to_folder),
  59. to_download,
  60. ]
  61. # pip has no interface in python - must be a new sub-process
  62. env = pip_wheel_env_run(search_dirs, app_data, env)
  63. process = Popen(cmd, env=env, stdout=PIPE, stderr=PIPE, universal_newlines=True, encoding="utf-8")
  64. out, err = process.communicate()
  65. if process.returncode != 0:
  66. kwargs = {"output": out, "stderr": err}
  67. raise CalledProcessError(process.returncode, cmd, **kwargs)
  68. result = _find_downloaded_wheel(distribution, version_spec, for_py_version, to_folder, out)
  69. LOGGER.debug("downloaded wheel %s", result.name)
  70. return result
  71. def _find_downloaded_wheel(distribution, version_spec, for_py_version, to_folder, out):
  72. for line in out.splitlines():
  73. stripped_line = line.lstrip()
  74. for marker in ("Saved ", "File was already downloaded "):
  75. if stripped_line.startswith(marker):
  76. return Wheel(Path(stripped_line[len(marker) :]).absolute())
  77. # if for some reason the output does not match fallback to the latest version with that spec
  78. return find_compatible_in_house(distribution, version_spec, for_py_version, to_folder)
  79. def find_compatible_in_house(distribution, version_spec, for_py_version, in_folder):
  80. wheels = discover_wheels(in_folder, distribution, None, for_py_version)
  81. start, end = 0, len(wheels)
  82. if version_spec is not None and version_spec:
  83. if version_spec.startswith("<"):
  84. from_pos, op = 1, lt
  85. elif version_spec.startswith("=="):
  86. from_pos, op = 2, eq
  87. else:
  88. raise ValueError(version_spec)
  89. version = Wheel.as_version_tuple(version_spec[from_pos:])
  90. start = next((at for at, w in enumerate(wheels) if op(w.version_tuple, version)), len(wheels))
  91. return None if start == end else wheels[start]
  92. def pip_wheel_env_run(search_dirs, app_data, env):
  93. env = env.copy()
  94. env.update({"PIP_USE_WHEEL": "1", "PIP_USER": "0", "PIP_NO_INPUT": "1", "PYTHONIOENCODING": "utf-8"})
  95. wheel = get_wheel(
  96. distribution="pip",
  97. version=None,
  98. for_py_version=f"{sys.version_info.major}.{sys.version_info.minor}",
  99. search_dirs=search_dirs,
  100. download=False,
  101. app_data=app_data,
  102. do_periodic_update=False,
  103. env=env,
  104. )
  105. if wheel is None:
  106. msg = "could not find the embedded pip"
  107. raise RuntimeError(msg)
  108. env["PYTHONPATH"] = str(wheel.path)
  109. return env
# Names exported by a star-import of this module; find_compatible_in_house and the underscore-prefixed helper are
# not listed.
__all__ = [
    "download_wheel",
    "get_wheel",
    "pip_wheel_env_run",
]