"""
Utility functions for

- building and importing modules at test time, using a temporary location
- detecting if compilers are present
- determining paths to tests

"""
import atexit
import contextlib
import functools
import glob
import os
import re
import shutil
import subprocess
import sys
import tempfile
import textwrap
from importlib import import_module
from pathlib import Path

import numpy
import pytest
from numpy._utils import asunicode
from numpy.compat import asstr
from numpy.testing import temppath, IS_WASM
#
# Maintaining a temporary module directory
#
  27. _module_dir = None
  28. _module_num = 5403
  29. if sys.platform == "cygwin":
  30. NUMPY_INSTALL_ROOT = Path(__file__).parent.parent.parent
  31. _module_list = list(NUMPY_INSTALL_ROOT.glob("**/*.dll"))
  32. def _cleanup():
  33. global _module_dir
  34. if _module_dir is not None:
  35. try:
  36. sys.path.remove(_module_dir)
  37. except ValueError:
  38. pass
  39. try:
  40. shutil.rmtree(_module_dir)
  41. except OSError:
  42. pass
  43. _module_dir = None
  44. def get_module_dir():
  45. global _module_dir
  46. if _module_dir is None:
  47. _module_dir = tempfile.mkdtemp()
  48. atexit.register(_cleanup)
  49. if _module_dir not in sys.path:
  50. sys.path.insert(0, _module_dir)
  51. return _module_dir
  52. def get_temp_module_name():
  53. # Assume single-threaded, and the module dir usable only by this thread
  54. global _module_num
  55. get_module_dir()
  56. name = "_test_ext_module_%d" % _module_num
  57. _module_num += 1
  58. if name in sys.modules:
  59. # this should not be possible, but check anyway
  60. raise RuntimeError("Temporary module name already in use.")
  61. return name
  62. def _memoize(func):
  63. memo = {}
  64. def wrapper(*a, **kw):
  65. key = repr((a, kw))
  66. if key not in memo:
  67. try:
  68. memo[key] = func(*a, **kw)
  69. except Exception as e:
  70. memo[key] = e
  71. raise
  72. ret = memo[key]
  73. if isinstance(ret, Exception):
  74. raise ret
  75. return ret
  76. wrapper.__name__ = func.__name__
  77. return wrapper
#
# Building modules
#
  81. @_memoize
  82. def build_module(source_files, options=[], skip=[], only=[], module_name=None):
  83. """
  84. Compile and import a f2py module, built from the given files.
  85. """
  86. code = f"import sys; sys.path = {sys.path!r}; import numpy.f2py; numpy.f2py.main()"
  87. d = get_module_dir()
  88. # Copy files
  89. dst_sources = []
  90. f2py_sources = []
  91. for fn in source_files:
  92. if not os.path.isfile(fn):
  93. raise RuntimeError("%s is not a file" % fn)
  94. dst = os.path.join(d, os.path.basename(fn))
  95. shutil.copyfile(fn, dst)
  96. dst_sources.append(dst)
  97. base, ext = os.path.splitext(dst)
  98. if ext in (".f90", ".f", ".c", ".pyf"):
  99. f2py_sources.append(dst)
  100. assert f2py_sources
  101. # Prepare options
  102. if module_name is None:
  103. module_name = get_temp_module_name()
  104. f2py_opts = ["-c", "-m", module_name] + options + f2py_sources
  105. if skip:
  106. f2py_opts += ["skip:"] + skip
  107. if only:
  108. f2py_opts += ["only:"] + only
  109. # Build
  110. cwd = os.getcwd()
  111. try:
  112. os.chdir(d)
  113. cmd = [sys.executable, "-c", code] + f2py_opts
  114. p = subprocess.Popen(cmd,
  115. stdout=subprocess.PIPE,
  116. stderr=subprocess.STDOUT)
  117. out, err = p.communicate()
  118. if p.returncode != 0:
  119. raise RuntimeError("Running f2py failed: %s\n%s" %
  120. (cmd[4:], asunicode(out)))
  121. finally:
  122. os.chdir(cwd)
  123. # Partial cleanup
  124. for fn in dst_sources:
  125. os.unlink(fn)
  126. # Rebase (Cygwin-only)
  127. if sys.platform == "cygwin":
  128. # If someone starts deleting modules after import, this will
  129. # need to change to record how big each module is, rather than
  130. # relying on rebase being able to find that from the files.
  131. _module_list.extend(
  132. glob.glob(os.path.join(d, "{:s}*".format(module_name)))
  133. )
  134. subprocess.check_call(
  135. ["/usr/bin/rebase", "--database", "--oblivious", "--verbose"]
  136. + _module_list
  137. )
  138. # Import
  139. return import_module(module_name)
  140. @_memoize
  141. def build_code(source_code,
  142. options=[],
  143. skip=[],
  144. only=[],
  145. suffix=None,
  146. module_name=None):
  147. """
  148. Compile and import Fortran code using f2py.
  149. """
  150. if suffix is None:
  151. suffix = ".f"
  152. with temppath(suffix=suffix) as path:
  153. with open(path, "w") as f:
  154. f.write(source_code)
  155. return build_module([path],
  156. options=options,
  157. skip=skip,
  158. only=only,
  159. module_name=module_name)
#
# Check if compilers are available at all...
#
  163. _compiler_status = None
  164. def _get_compiler_status():
  165. global _compiler_status
  166. if _compiler_status is not None:
  167. return _compiler_status
  168. _compiler_status = (False, False, False)
  169. if IS_WASM:
  170. # Can't run compiler from inside WASM.
  171. return _compiler_status
  172. # XXX: this is really ugly. But I don't know how to invoke Distutils
  173. # in a safer way...
  174. code = textwrap.dedent(f"""\
  175. import os
  176. import sys
  177. sys.path = {repr(sys.path)}
  178. def configuration(parent_name='',top_path=None):
  179. global config
  180. from numpy.distutils.misc_util import Configuration
  181. config = Configuration('', parent_name, top_path)
  182. return config
  183. from numpy.distutils.core import setup
  184. setup(configuration=configuration)
  185. config_cmd = config.get_config_cmd()
  186. have_c = config_cmd.try_compile('void foo() {{}}')
  187. print('COMPILERS:%%d,%%d,%%d' %% (have_c,
  188. config.have_f77c(),
  189. config.have_f90c()))
  190. sys.exit(99)
  191. """)
  192. code = code % dict(syspath=repr(sys.path))
  193. tmpdir = tempfile.mkdtemp()
  194. try:
  195. script = os.path.join(tmpdir, "setup.py")
  196. with open(script, "w") as f:
  197. f.write(code)
  198. cmd = [sys.executable, "setup.py", "config"]
  199. p = subprocess.Popen(cmd,
  200. stdout=subprocess.PIPE,
  201. stderr=subprocess.STDOUT,
  202. cwd=tmpdir)
  203. out, err = p.communicate()
  204. finally:
  205. shutil.rmtree(tmpdir)
  206. m = re.search(br"COMPILERS:(\d+),(\d+),(\d+)", out)
  207. if m:
  208. _compiler_status = (
  209. bool(int(m.group(1))),
  210. bool(int(m.group(2))),
  211. bool(int(m.group(3))),
  212. )
  213. # Finished
  214. return _compiler_status
  215. def has_c_compiler():
  216. return _get_compiler_status()[0]
  217. def has_f77_compiler():
  218. return _get_compiler_status()[1]
  219. def has_f90_compiler():
  220. return _get_compiler_status()[2]
#
# Building with distutils
#
  224. @_memoize
  225. def build_module_distutils(source_files, config_code, module_name, **kw):
  226. """
  227. Build a module via distutils and import it.
  228. """
  229. d = get_module_dir()
  230. # Copy files
  231. dst_sources = []
  232. for fn in source_files:
  233. if not os.path.isfile(fn):
  234. raise RuntimeError("%s is not a file" % fn)
  235. dst = os.path.join(d, os.path.basename(fn))
  236. shutil.copyfile(fn, dst)
  237. dst_sources.append(dst)
  238. # Build script
  239. config_code = textwrap.dedent(config_code).replace("\n", "\n ")
  240. code = fr"""
  241. import os
  242. import sys
  243. sys.path = {repr(sys.path)}
  244. def configuration(parent_name='',top_path=None):
  245. from numpy.distutils.misc_util import Configuration
  246. config = Configuration('', parent_name, top_path)
  247. {config_code}
  248. return config
  249. if __name__ == "__main__":
  250. from numpy.distutils.core import setup
  251. setup(configuration=configuration)
  252. """
  253. script = os.path.join(d, get_temp_module_name() + ".py")
  254. dst_sources.append(script)
  255. with open(script, "wb") as f:
  256. f.write(code.encode('latin1'))
  257. # Build
  258. cwd = os.getcwd()
  259. try:
  260. os.chdir(d)
  261. cmd = [sys.executable, script, "build_ext", "-i"]
  262. p = subprocess.Popen(cmd,
  263. stdout=subprocess.PIPE,
  264. stderr=subprocess.STDOUT)
  265. out, err = p.communicate()
  266. if p.returncode != 0:
  267. raise RuntimeError("Running distutils build failed: %s\n%s" %
  268. (cmd[4:], asstr(out)))
  269. finally:
  270. os.chdir(cwd)
  271. # Partial cleanup
  272. for fn in dst_sources:
  273. os.unlink(fn)
  274. # Import
  275. __import__(module_name)
  276. return sys.modules[module_name]
#
# Unittest convenience
#
  280. class F2PyTest:
  281. code = None
  282. sources = None
  283. options = []
  284. skip = []
  285. only = []
  286. suffix = ".f"
  287. module = None
  288. @property
  289. def module_name(self):
  290. cls = type(self)
  291. return f'_{cls.__module__.rsplit(".",1)[-1]}_{cls.__name__}_ext_module'
  292. def setup_method(self):
  293. if sys.platform == "win32":
  294. pytest.skip("Fails with MinGW64 Gfortran (Issue #9673)")
  295. if self.module is not None:
  296. return
  297. # Check compiler availability first
  298. if not has_c_compiler():
  299. pytest.skip("No C compiler available")
  300. codes = []
  301. if self.sources:
  302. codes.extend(self.sources)
  303. if self.code is not None:
  304. codes.append(self.suffix)
  305. needs_f77 = False
  306. needs_f90 = False
  307. needs_pyf = False
  308. for fn in codes:
  309. if str(fn).endswith(".f"):
  310. needs_f77 = True
  311. elif str(fn).endswith(".f90"):
  312. needs_f90 = True
  313. elif str(fn).endswith(".pyf"):
  314. needs_pyf = True
  315. if needs_f77 and not has_f77_compiler():
  316. pytest.skip("No Fortran 77 compiler available")
  317. if needs_f90 and not has_f90_compiler():
  318. pytest.skip("No Fortran 90 compiler available")
  319. if needs_pyf and not (has_f90_compiler() or has_f77_compiler()):
  320. pytest.skip("No Fortran compiler available")
  321. # Build the module
  322. if self.code is not None:
  323. self.module = build_code(
  324. self.code,
  325. options=self.options,
  326. skip=self.skip,
  327. only=self.only,
  328. suffix=self.suffix,
  329. module_name=self.module_name,
  330. )
  331. if self.sources is not None:
  332. self.module = build_module(
  333. self.sources,
  334. options=self.options,
  335. skip=self.skip,
  336. only=self.only,
  337. module_name=self.module_name,
  338. )
#
# Helper functions
#
  342. def getpath(*a):
  343. # Package root
  344. d = Path(numpy.f2py.__file__).parent.resolve()
  345. return d.joinpath(*a)
  346. @contextlib.contextmanager
  347. def switchdir(path):
  348. curpath = Path.cwd()
  349. os.chdir(path)
  350. try:
  351. yield
  352. finally:
  353. os.chdir(curpath)