helpers.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. """Reusable functions and classes for different types of integration tests.
  2. For example ``Archive`` can be used to check the contents of a distribution built
  3. with setuptools, and ``run`` will always try to be as verbose as possible to
  4. facilitate debugging.
  5. """
  6. import os
  7. import subprocess
  8. import tarfile
  9. from pathlib import Path
  10. from zipfile import ZipFile
  11. def run(cmd, env=None):
  12. r = subprocess.run(
  13. cmd,
  14. capture_output=True,
  15. text=True,
  16. encoding="utf-8",
  17. env={**os.environ, **(env or {})},
  18. # ^-- allow overwriting instead of discarding the current env
  19. )
  20. out = r.stdout + "\n" + r.stderr
  21. # pytest omits stdout/err by default, if the test fails they help debugging
  22. print("~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~")
  23. print(f"Command: {cmd}\nreturn code: {r.returncode}\n\n{out}")
  24. if r.returncode == 0:
  25. return out
  26. raise subprocess.CalledProcessError(r.returncode, cmd, r.stdout, r.stderr)
  27. class Archive:
  28. """Compatibility layer for ZipFile/Info and TarFile/Info"""
  29. def __init__(self, filename):
  30. self._filename = filename
  31. if filename.endswith("tar.gz"):
  32. self._obj = tarfile.open(filename, "r:gz")
  33. elif filename.endswith("zip"):
  34. self._obj = ZipFile(filename)
  35. else:
  36. raise ValueError(f"{filename} doesn't seem to be a zip or tar.gz")
  37. def __iter__(self):
  38. if hasattr(self._obj, "infolist"):
  39. return iter(self._obj.infolist())
  40. return iter(self._obj)
  41. def get_name(self, zip_or_tar_info):
  42. if hasattr(zip_or_tar_info, "filename"):
  43. return zip_or_tar_info.filename
  44. return zip_or_tar_info.name
  45. def get_content(self, zip_or_tar_info):
  46. if hasattr(self._obj, "extractfile"):
  47. content = self._obj.extractfile(zip_or_tar_info)
  48. if content is None:
  49. msg = f"Invalid {zip_or_tar_info.name} in {self._filename}"
  50. raise ValueError(msg)
  51. return str(content.read(), "utf-8")
  52. return str(self._obj.read(zip_or_tar_info), "utf-8")
  53. def get_sdist_members(sdist_path):
  54. with tarfile.open(sdist_path, "r:gz") as tar:
  55. files = [Path(f) for f in tar.getnames()]
  56. # remove root folder
  57. relative_files = ("/".join(f.parts[1:]) for f in files)
  58. return {f for f in relative_files if f}
  59. def get_wheel_members(wheel_path):
  60. with ZipFile(wheel_path) as zipfile:
  61. return set(zipfile.namelist())