test_windows.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Windows specific tests."""
  6. import ctypes
  7. import datetime
  8. import glob
  9. import os
  10. import platform
  11. import re
  12. import shutil
  13. import signal
  14. import subprocess
  15. import sys
  16. import time
  17. import warnings
  18. from unittest import mock
  19. import psutil
  20. from psutil import WINDOWS
  21. from psutil.tests import GITHUB_ACTIONS
  22. from psutil.tests import HAS_BATTERY
  23. from psutil.tests import IS_64BIT
  24. from psutil.tests import PYPY
  25. from psutil.tests import TOLERANCE_DISK_USAGE
  26. from psutil.tests import TOLERANCE_SYS_MEM
  27. from psutil.tests import PsutilTestCase
  28. from psutil.tests import pytest
  29. from psutil.tests import retry_on_failure
  30. from psutil.tests import sh
  31. from psutil.tests import spawn_subproc
  32. from psutil.tests import terminate
  33. if WINDOWS and not PYPY:
  34. with warnings.catch_warnings():
  35. warnings.simplefilter("ignore")
  36. import win32api # requires "pip install pywin32"
  37. import win32con
  38. import win32process
  39. import wmi # requires "pip install wmi" / "make install-pydeps-test"
  40. if WINDOWS:
  41. from psutil._pswindows import convert_oserror
  42. cext = psutil._psplatform.cext
  @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  @pytest.mark.skipif(PYPY, reason="pywin32 not available on PYPY")
  class WindowsTestCase(PsutilTestCase):
      """Base class for the test cases below.

      It adds no behavior of its own; it only carries the two skip
      markers (not on Windows / running on PYPY) so that subclasses
      inherit them.
      """
  def powershell(cmd):
      """Run `cmd` through powershell.exe and return the result of `sh()`.

      Currently not used, but available just in case. If powershell.exe
      cannot be located on PATH, `pytest.skip()` is invoked instead.

      Usage:

      >>> powershell(
          "Get-CIMInstance Win32_PageFileUsage | Select AllocatedBaseSize")
      """
      if shutil.which("powershell.exe") is None:
          return pytest.skip("powershell.exe not available")
      flags = (
          "-ExecutionPolicy Bypass -NoLogo -NonInteractive "
          "-NoProfile -WindowStyle Hidden"
      )
      return sh(f'powershell.exe {flags} -Command "{cmd}"')
  def wmic(path, what, converter=int):
      """Query `wmic path <path> get <what>` and return the parsed value.

      Currently not used, but available just in case. The first output
      line (the header) is discarded. With `converter=None` the remaining
      text is returned as a string; otherwise `converter` is applied to
      it, or to each whitespace-separated field (returned as a tuple)
      when `what` contains a comma.

      Usage:

      >>> wmic("Win32_OperatingSystem", "FreePhysicalMemory")
      2134124534
      """
      raw = sh(f"wmic path {path} get {what}").strip()
      lines = raw.splitlines()
      value = "".join(lines[1:]).strip()  # everything but the header line
      if converter is None:
          return value
      if "," not in what:
          return converter(value)
      return tuple(map(converter, value.split()))
  73. # ===================================================================
  74. # System APIs
  75. # ===================================================================
  class TestCpuAPIs(WindowsTestCase):
      """Check psutil's CPU count / frequency against other Windows sources."""

      @pytest.mark.skipif(
          'NUMBER_OF_PROCESSORS' not in os.environ,
          reason="NUMBER_OF_PROCESSORS env var is not available",
      )
      def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
          # Will likely fail on many-cores systems:
          # https://stackoverflow.com/questions/31209256
          env_count = int(os.environ['NUMBER_OF_PROCESSORS'])
          assert env_count == psutil.cpu_count()

      def test_cpu_count_vs_GetSystemInfo(self):
          # Will likely fail on many-cores systems:
          # https://stackoverflow.com/questions/31209256
          assert win32api.GetSystemInfo()[5] == psutil.cpu_count()

      def test_cpu_count_logical_vs_wmi(self):
          # Add up the logical processors of every Win32_Processor entry.
          logical = 0
          for cpu in wmi.WMI().Win32_Processor():
              logical += cpu.NumberOfLogicalProcessors
          assert psutil.cpu_count() == logical

      def test_cpu_count_cores_vs_wmi(self):
          # Add up the cores of every Win32_Processor entry.
          cores = 0
          for cpu in wmi.WMI().Win32_Processor():
              cores += cpu.NumberOfCores
          assert psutil.cpu_count(logical=False) == cores

      def test_cpu_count_vs_cpu_times(self):
          logical = psutil.cpu_count()
          per_cpu = psutil.cpu_times(percpu=True)
          assert logical == len(per_cpu)

      def test_cpu_freq(self):
          # Only the first Win32_Processor entry is compared.
          first_cpu = wmi.WMI().Win32_Processor()[0]
          assert first_cpu.CurrentClockSpeed == psutil.cpu_freq().current
          assert first_cpu.MaxClockSpeed == psutil.cpu_freq().max
  class TestSystemAPIs(WindowsTestCase):
      """Compare psutil's system-wide APIs against ipconfig, WMI and pywin32."""

      def test_nic_names(self):
          # Every NIC name reported by psutil (pseudo-interfaces aside)
          # must show up verbatim in the `ipconfig /all` output.
          out = sh('ipconfig /all')
          nics = psutil.net_io_counters(pernic=True).keys()
          for nic in nics:
              if "pseudo-interface" in nic.replace(' ', '-').lower():
                  continue
              if nic not in out:
                  return pytest.fail(
                      f"{nic!r} nic wasn't found in 'ipconfig /all' output"
                  )

      def test_total_phymem(self):
          w = wmi.WMI().Win32_ComputerSystem()[0]
          assert int(w.TotalPhysicalMemory) == psutil.virtual_memory().total

      def test_free_phymem(self):
          w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
          assert (
              abs(int(w.AvailableBytes) - psutil.virtual_memory().free)
              < TOLERANCE_SYS_MEM
          )

      def test_total_swapmem(self):
          w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
          assert (
              int(w.CommitLimit) - psutil.virtual_memory().total
              == psutil.swap_memory().total
          )
          if psutil.swap_memory().total == 0:
              assert psutil.swap_memory().free == 0
              assert psutil.swap_memory().used == 0

      def test_percent_swapmem(self):
          if psutil.swap_memory().total > 0:
              w = wmi.WMI().Win32_PerfRawData_PerfOS_PagingFile(Name="_Total")[0]
              # calculate swap usage to percent
              percentSwap = int(w.PercentUsage) * 100 / int(w.PercentUsage_Base)
              # exact percent may change but should be reasonable
              # assert within +/- 5% and between 0 and 100%
              assert psutil.swap_memory().percent >= 0
              assert abs(psutil.swap_memory().percent - percentSwap) < 5
              assert psutil.swap_memory().percent <= 100

      # @pytest.mark.skipif(wmi is None, reason="wmi module is not installed")
      # def test__UPTIME(self):
      #     # _UPTIME constant is not public but it is used internally
      #     # as value to return for pid 0 creation time.
      #     # WMI behaves the same.
      #     w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
      #     p = psutil.Process(0)
      #     wmic_create = str(w.CreationDate.split('.')[0])
      #     psutil_create = time.strftime("%Y%m%d%H%M%S",
      #                                   time.localtime(p.create_time()))

      # Note: this test is not very reliable
      @retry_on_failure()
      def test_pids(self):
          # Note: this test might fail if the OS is starting/killing
          # other processes in the meantime
          w = wmi.WMI().Win32_Process()
          wmi_pids = {x.ProcessId for x in w}
          psutil_pids = set(psutil.pids())
          assert wmi_pids == psutil_pids

      @retry_on_failure()
      def test_disks(self):
          # For each psutil partition find the WMI logical disk with the
          # same device id and compare total / free space. The inner
          # loop's `else` fires only when no WMI disk matched at all.
          ps_parts = psutil.disk_partitions(all=True)
          wmi_parts = wmi.WMI().Win32_LogicalDisk()
          for ps_part in ps_parts:
              for wmi_part in wmi_parts:
                  if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                      if not ps_part.mountpoint:
                          # this is usually a CD-ROM with no disk inserted
                          break
                      if 'cdrom' in ps_part.opts:
                          break
                      if ps_part.mountpoint.startswith('A:'):
                          break  # floppy
                      try:
                          usage = psutil.disk_usage(ps_part.mountpoint)
                      except FileNotFoundError:
                          # usually this is the floppy
                          break
                      assert usage.total == int(wmi_part.Size)
                      wmi_free = int(wmi_part.FreeSpace)
                      # 10 MB tolerance. Free space is compared with a
                      # tolerance only: an exact-equality assert placed
                      # before this check would make it unreachable.
                      if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                          return pytest.fail(
                              f"psutil={usage.free}, wmi={wmi_free}"
                          )
                      break
              else:
                  return pytest.fail(f"can't find partition {ps_part!r}")

      @retry_on_failure()
      def test_disk_usage(self):
          for disk in psutil.disk_partitions():
              if 'cdrom' in disk.opts:
                  continue
              sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
              psutil_value = psutil.disk_usage(disk.mountpoint)
              assert abs(sys_value[0] - psutil_value.free) < TOLERANCE_DISK_USAGE
              assert (
                  abs(sys_value[1] - psutil_value.total) < TOLERANCE_DISK_USAGE
              )
              assert psutil_value.used == psutil_value.total - psutil_value.free

      def test_disk_partitions(self):
          # Drive A: is left out on both sides of the comparison.
          sys_value = [
              x + '\\'
              for x in win32api.GetLogicalDriveStrings().split("\\\x00")
              if x and not x.startswith('A:')
          ]
          psutil_value = [
              x.mountpoint
              for x in psutil.disk_partitions(all=True)
              if not x.mountpoint.startswith('A:')
          ]
          assert sys_value == psutil_value

      def test_convert_dos_path_drive(self):
          winpath = 'C:\\Windows\\Temp'
          driveletter = 'C:'
          # Mocked NT device path for C:
          devicepath = '\\Device\\HarddiskVolume1'
          # Path returned by RtlDosPathNameToNtPathName
          ntpath1 = '\\??\\C:\\Windows\\Temp'
          # Mocked normalized NT path
          ntpath2 = '\\Device\\HarddiskVolume1\\Windows\\Temp'
          devices = {devicepath: driveletter}
          with mock.patch(
              'psutil._pswindows.cext.QueryDosDevice', side_effect=devices.get
          ) as m:
              assert psutil._pswindows.convert_dos_path(ntpath1) == winpath
              assert psutil._pswindows.convert_dos_path(ntpath2) == winpath
              assert m.called

      def test_convert_dos_path_unc(self):
          # UNC path
          winpath = '\\\\localhost\\C$\\Windows\\Temp'
          # Path returned by RtlDosPathNameToNtPathName
          ntpath1 = '\\??\\UNC\\localhost\\C$\\Windows\\Temp'
          # Normalized NT path
          ntpath2 = '\\Device\\Mup\\localhost\\C$\\Windows\\Temp'
          assert psutil._pswindows.convert_dos_path(winpath) == winpath
          assert psutil._pswindows.convert_dos_path(ntpath1) == winpath
          assert psutil._pswindows.convert_dos_path(ntpath2) == winpath

      def test_net_if_stats(self):
          # Passes as long as psutil and WMI share at least one name.
          ps_names = set(cext.net_if_stats())
          wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
          wmi_names = set()
          for wmi_adapter in wmi_adapters:
              wmi_names.add(wmi_adapter.Name)
              wmi_names.add(wmi_adapter.NetConnectionID)
          assert (
              ps_names & wmi_names
          ), f"no common entries in {ps_names}, {wmi_names}"

      def test_boot_time(self):
          wmi_os = wmi.WMI().Win32_OperatingSystem()
          # Drop the fractional part before parsing the timestamp.
          wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
          wmi_btime_dt = datetime.datetime.strptime(
              wmi_btime_str, "%Y%m%d%H%M%S"
          )
          psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
          diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
          assert diff <= 5, (psutil_dt, wmi_btime_dt)

      def test_uptime(self):
          # ...against GetTickCount64() (Windows < 7, does not include
          # time spent during suspend / hibernate).
          ms = ctypes.windll.kernel32.GetTickCount64()
          secs = ms / 1000.0
          assert abs(cext.uptime() - secs) < 0.5
  272. # ===================================================================
  273. # sensors_battery()
  274. # ===================================================================
  class TestSensorsBattery(WindowsTestCase):
      """Check psutil.sensors_battery() against pywin32, WMI and mocked data."""

      def test_has_battery(self):
          battery_present = win32api.GetPwrCapabilities()[
              'SystemBatteriesPresent'
          ]
          battery = psutil.sensors_battery()
          if battery_present:
              assert battery is not None
          else:
              assert battery is None

      @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
      def test_percent(self):
          wmi_battery = wmi.WMI().query('select * from Win32_Battery')[0]
          ps_battery = psutil.sensors_battery()
          delta = abs(ps_battery.percent - wmi_battery.EstimatedChargeRemaining)
          assert delta < 1

      @pytest.mark.skipif(not HAS_BATTERY, reason="no battery")
      def test_power_plugged(self):
          wmi_battery = wmi.WMI().query('select * from Win32_Battery')[0]
          ps_battery = psutil.sensors_battery()
          # Status codes:
          # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
          ps_plugged = ps_battery.power_plugged
          wmi_plugged = wmi_battery.BatteryStatus == 2
          assert ps_plugged == wmi_plugged

      def test_emulate_no_battery(self):
          target = "psutil._pswindows.cext.sensors_battery"
          with mock.patch(target, return_value=(0, 128, 0, 0)) as m:
              battery = psutil.sensors_battery()
              assert battery is None
              assert m.called

      def test_emulate_power_connected(self):
          target = "psutil._pswindows.cext.sensors_battery"
          with mock.patch(target, return_value=(1, 0, 0, 0)) as m:
              secsleft = psutil.sensors_battery().secsleft
              assert secsleft == psutil.POWER_TIME_UNLIMITED
              assert m.called

      def test_emulate_power_charging(self):
          target = "psutil._pswindows.cext.sensors_battery"
          with mock.patch(target, return_value=(0, 8, 0, 0)) as m:
              secsleft = psutil.sensors_battery().secsleft
              assert secsleft == psutil.POWER_TIME_UNLIMITED
              assert m.called

      def test_emulate_secs_left_unknown(self):
          target = "psutil._pswindows.cext.sensors_battery"
          with mock.patch(target, return_value=(0, 0, 0, -1)) as m:
              secsleft = psutil.sensors_battery().secsleft
              assert secsleft == psutil.POWER_TIME_UNKNOWN
              assert m.called
  332. # ===================================================================
  333. # Process APIs
  334. # ===================================================================
  class TestProcess(WindowsTestCase):
      """Compare psutil.Process results against pywin32 / ctypes.

      A single child process (`cls.pid`) is spawned once for the whole
      class and shared by the tests, so no test may kill it.
      """

      @classmethod
      def setUpClass(cls):
          cls.pid = spawn_subproc().pid

      @classmethod
      def tearDownClass(cls):
          terminate(cls.pid)

      def test_issue_24(self):
          p = psutil.Process(0)
          with pytest.raises(psutil.AccessDenied):
              p.kill()

      def test_special_pid(self):
          p = psutil.Process(4)
          assert p.name() == 'System'
          # use __str__ to access all common Process properties to check
          # that nothing strange happens
          str(p)
          p.username()
          assert p.create_time() >= 0.0
          try:
              rss, _vms = p.memory_info()[:2]
          except psutil.AccessDenied:
              # expected on Windows Vista and Windows 7
              if platform.uname()[1] not in {'vista', 'win-7', 'win7'}:
                  raise
          else:
              assert rss > 0

      def test_send_signal(self):
          p = psutil.Process(self.pid)
          with pytest.raises(ValueError):
              p.send_signal(signal.SIGINT)

      def test_num_handles_increment(self):
          p = psutil.Process(os.getpid())
          before = p.num_handles()
          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
          )
          try:
              after = p.num_handles()
              assert after == before + 1
          finally:
              # Close the handle even when the assertion above fails, so
              # a failure here does not leak it into later tests.
              win32api.CloseHandle(handle)
          assert p.num_handles() == before

      def test_ctrl_signals(self):
          p = psutil.Process(self.spawn_subproc().pid)
          p.send_signal(signal.CTRL_C_EVENT)
          p.send_signal(signal.CTRL_BREAK_EVENT)
          p.kill()
          p.wait()
          with pytest.raises(psutil.NoSuchProcess):
              p.send_signal(signal.CTRL_C_EVENT)
          with pytest.raises(psutil.NoSuchProcess):
              p.send_signal(signal.CTRL_BREAK_EVENT)

      def test_username(self):
          name = win32api.GetUserNameEx(win32con.NameSamCompatible)
          if name.endswith('$'):
              # When running as a service account (most likely to be
              # NetworkService), these user name calculations don't produce the
              # same result, causing the test to fail.
              return pytest.skip('running as service account')
          assert psutil.Process().username() == name

      def test_cmdline(self):
          sys_value = re.sub(r"[ ]+", " ", win32api.GetCommandLine()).strip()
          psutil_value = ' '.join(psutil.Process().cmdline())
          # The PyWin32 command line may retain quotes around argv[0] if they
          # were used unnecessarily, while psutil will omit them. So strip
          # every double quote from both values before comparing.
          # A path to an executable will not contain quotes, so this is safe.
          sys_value = sys_value.replace('"', "")
          psutil_value = psutil_value.replace('"', "")
          assert sys_value == psutil_value

      # XXX - occasional failures
      # def test_cpu_times(self):
      #     handle = win32api.OpenProcess(
      #         win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
      #     )
      #     self.addCleanup(win32api.CloseHandle, handle)
      #     a = psutil.Process().cpu_times()
      #     b = win32process.GetProcessTimes(handle)
      #     assert abs(a.user - b['UserTime'] / 10000000.0) < 0.2
      #     assert abs(a.user - b['KernelTime'] / 10000000.0) < 0.2

      def test_nice(self):
          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
          )
          self.addCleanup(win32api.CloseHandle, handle)
          sys_value = win32process.GetPriorityClass(handle)
          psutil_value = psutil.Process().nice()
          assert psutil_value == sys_value

      def test_memory_info(self):
          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid
          )
          self.addCleanup(win32api.CloseHandle, handle)
          sys_value = win32process.GetProcessMemoryInfo(handle)
          psutil_value = psutil.Process(self.pid).memory_info()
          assert sys_value['PeakWorkingSetSize'] == psutil_value.peak_wset
          assert sys_value['WorkingSetSize'] == psutil_value.wset
          assert (
              sys_value['QuotaPeakPagedPoolUsage']
              == psutil_value.peak_paged_pool
          )
          assert sys_value['QuotaPagedPoolUsage'] == psutil_value.paged_pool
          assert (
              sys_value['QuotaPeakNonPagedPoolUsage']
              == psutil_value.peak_nonpaged_pool
          )
          assert (
              sys_value['QuotaNonPagedPoolUsage'] == psutil_value.nonpaged_pool
          )
          assert sys_value['PagefileUsage'] == psutil_value.pagefile
          assert sys_value['PeakPagefileUsage'] == psutil_value.peak_pagefile
          assert psutil_value.rss == psutil_value.wset
          assert psutil_value.vms == psutil_value.pagefile

      def test_wait(self):
          # Terminate a dedicated child rather than the class-wide one:
          # killing cls.pid would break every later test that uses it.
          pid = self.spawn_subproc().pid
          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, pid
          )
          self.addCleanup(win32api.CloseHandle, handle)
          p = psutil.Process(pid)
          p.terminate()
          psutil_value = p.wait()
          sys_value = win32process.GetExitCodeProcess(handle)
          assert psutil_value == sys_value

      def test_cpu_affinity(self):
          def from_bitmask(x):
              # Indexes of the bits set in `x`, lowest first (64 bits max).
              return [i for i in range(64) if (1 << i) & x]

          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, self.pid
          )
          self.addCleanup(win32api.CloseHandle, handle)
          sys_value = from_bitmask(
              win32process.GetProcessAffinityMask(handle)[0]
          )
          psutil_value = psutil.Process(self.pid).cpu_affinity()
          assert psutil_value == sys_value

      def test_io_counters(self):
          handle = win32api.OpenProcess(
              win32con.PROCESS_QUERY_INFORMATION, win32con.FALSE, os.getpid()
          )
          self.addCleanup(win32api.CloseHandle, handle)
          sys_value = win32process.GetProcessIoCounters(handle)
          psutil_value = psutil.Process().io_counters()
          assert psutil_value.read_count == sys_value['ReadOperationCount']
          assert psutil_value.write_count == sys_value['WriteOperationCount']
          assert psutil_value.read_bytes == sys_value['ReadTransferCount']
          assert psutil_value.write_bytes == sys_value['WriteTransferCount']
          assert psutil_value.other_count == sys_value['OtherOperationCount']
          assert psutil_value.other_bytes == sys_value['OtherTransferCount']

      def test_num_handles(self):
          # Importing the submodule also binds the `ctypes` name locally,
          # so a separate `import ctypes` is not needed.
          import ctypes.wintypes

          PROCESS_QUERY_INFORMATION = 0x400
          handle = ctypes.windll.kernel32.OpenProcess(
              PROCESS_QUERY_INFORMATION, 0, self.pid
          )
          self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)
          hndcnt = ctypes.wintypes.DWORD()
          ctypes.windll.kernel32.GetProcessHandleCount(
              handle, ctypes.byref(hndcnt)
          )
          sys_value = hndcnt.value
          psutil_value = psutil.Process(self.pid).num_handles()
          assert psutil_value == sys_value

      def test_error_partial_copy(self):
          # https://github.com/giampaolo/psutil/issues/875
          exc = OSError()
          exc.winerror = 299
          with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
              with mock.patch("time.sleep") as m:
                  p = psutil.Process()
                  with pytest.raises(psutil.AccessDenied):
                      p.cwd()
          assert m.call_count >= 5

      def test_exe(self):
          # NtQuerySystemInformation succeeds if process is gone. Make sure
          # it raises NSP for a non existent pid.
          pid = psutil.pids()[-1] + 99999
          proc = psutil._psplatform.Process(pid)
          with pytest.raises(psutil.NoSuchProcess):
              proc.exe()
  class TestProcessWMI(WindowsTestCase):
      """Check Process API results against the matching Win32_Process entry."""

      @classmethod
      def setUpClass(cls):
          child = spawn_subproc()
          cls.pid = child.pid

      @classmethod
      def tearDownClass(cls):
          terminate(cls.pid)

      def test_name(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          assert ps_proc.name() == wmi_proc.Caption

      # This fail on github because using virtualenv for test environment
      @pytest.mark.skipif(
          GITHUB_ACTIONS, reason="unreliable path on GITHUB_ACTIONS"
      )
      def test_exe(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          # Note: wmi reports the exe as a lower case string.
          # Being Windows paths case-insensitive we ignore that.
          ps_exe = ps_proc.exe().lower()
          wmi_exe = wmi_proc.ExecutablePath.lower()
          assert ps_exe == wmi_exe

      def test_cmdline(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          # Double quotes are dropped from the WMI command line first.
          ps_cmdline = ' '.join(ps_proc.cmdline())
          wmi_cmdline = wmi_proc.CommandLine.replace('"', '')
          assert ps_cmdline == wmi_cmdline

      def test_username(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          domain, _, account = wmi_proc.GetOwner()
          expected = f"{domain}\\{account}"
          assert ps_proc.username() == expected

      @retry_on_failure()
      def test_memory_rss(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          ps_rss = ps_proc.memory_info().rss
          assert ps_rss == int(wmi_proc.WorkingSetSize)

      @retry_on_failure()
      def test_memory_vms(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          vms = ps_proc.memory_info().vms
          # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
          # ...claims that PageFileUsage is represented in Kilo
          # bytes but funnily enough on certain platforms bytes are
          # returned instead.
          wmi_usage = int(wmi_proc.PageFileUsage)
          if vms != wmi_usage and vms != wmi_usage * 1024:
              return pytest.fail(f"wmi={wmi_usage}, psutil={vms}")

      def test_create_time(self):
          wmi_proc = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
          ps_proc = psutil.Process(self.pid)
          # Keep only the part of the WMI timestamp before the first dot.
          wmic_create = str(wmi_proc.CreationDate.split('.')[0])
          local_ctime = time.localtime(ps_proc.create_time())
          psutil_create = time.strftime("%Y%m%d%H%M%S", local_ctime)
          assert wmic_create == psutil_create
  572. # ---
  @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  class TestDualProcessImplementation(PsutilTestCase):
      """Some Windows APIs are implemented twice internally: once on top
      of documented Windows APIs, and once on top of
      NtQuerySystemInformation(), used as a fallback when the first
      fails because of limited permissions.
      Each test forces the fallback by making the primary cext function
      raise PermissionError, then checks both paths agree, see:
      https://github.com/giampaolo/psutil/issues/304.
      """

      @classmethod
      def setUpClass(cls):
          child = spawn_subproc()
          cls.pid = child.pid

      @classmethod
      def tearDownClass(cls):
          terminate(cls.pid)

      def test_memory_info(self):
          target = "psutil._psplatform.cext.proc_memory_info"
          mem_1 = psutil.Process(self.pid).memory_info()
          with mock.patch(target, side_effect=PermissionError) as fun:
              mem_2 = psutil.Process(self.pid).memory_info()
              assert len(mem_1) == len(mem_2)
              # Lengths were just checked equal, so zip() drops nothing.
              for primary, fallback in zip(mem_1, mem_2):
                  assert primary >= 0
                  assert fallback >= 0
                  assert abs(primary - fallback) < 512
              assert fun.called

      def test_create_time(self):
          target = "psutil._psplatform.cext.proc_times"
          ctime = psutil.Process(self.pid).create_time()
          with mock.patch(target, side_effect=PermissionError) as fun:
              fallback = psutil.Process(self.pid).create_time()
              assert fallback == ctime
              assert fun.called

      def test_cpu_times(self):
          target = "psutil._psplatform.cext.proc_times"
          cpu_times_1 = psutil.Process(self.pid).cpu_times()
          with mock.patch(target, side_effect=PermissionError) as fun:
              cpu_times_2 = psutil.Process(self.pid).cpu_times()
              assert fun.called
              user_delta = abs(cpu_times_1.user - cpu_times_2.user)
              assert user_delta < 0.01
              system_delta = abs(cpu_times_1.system - cpu_times_2.system)
              assert system_delta < 0.01

      def test_io_counters(self):
          target = "psutil._psplatform.cext.proc_io_counters"
          io_counters_1 = psutil.Process(self.pid).io_counters()
          with mock.patch(target, side_effect=PermissionError) as fun:
              io_counters_2 = psutil.Process(self.pid).io_counters()
              for i, primary in enumerate(io_counters_1):
                  assert abs(primary - io_counters_2[i]) < 5
              assert fun.called

      def test_num_handles(self):
          target = "psutil._psplatform.cext.proc_num_handles"
          num_handles = psutil.Process(self.pid).num_handles()
          with mock.patch(target, side_effect=PermissionError) as fun:
              fallback = psutil.Process(self.pid).num_handles()
              assert fallback == num_handles
              assert fun.called

      def test_cmdline(self):
          # Errors that convert to AccessDenied / NoSuchProcess are
          # tolerated; any other OSError is re-raised unchanged.
          tolerated = (psutil.AccessDenied, psutil.NoSuchProcess)
          for pid in psutil.pids():
              try:
                  with_peb = cext.proc_cmdline(pid, use_peb=True)
                  without_peb = cext.proc_cmdline(pid, use_peb=False)
              except OSError as err:
                  if isinstance(convert_oserror(err), tolerated):
                      continue
                  raise
              assert with_peb == without_peb
  @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  class RemoteProcessTestCase(PsutilTestCase):
      """Certain functions require calling ReadProcessMemory.
      This trivially works when called on the current process.
      Check that this works on other processes, especially when they
      have a different bitness.
      """

      @staticmethod
      def find_other_interpreter():
          """Return the path of a python.exe whose bitness is the opposite
          of ours, or None if no such interpreter is found.

          Candidates are looked up under ``C:\\Python*\\python.exe``; each
          one is run and asked whether it is a 64-bit build.
          """
          # find a python interpreter that is of the opposite bitness from us
          code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
          wanted = str(not IS_64BIT)

          # XXX: a different and probably more stable approach might be to access
          # the registry but accessing 64 bit paths from a 32 bit process
          for filename in glob.glob(r"C:\Python*\python.exe"):
              proc = subprocess.Popen(
                  args=[filename, "-c", code],
                  stdout=subprocess.PIPE,
                  stderr=subprocess.STDOUT,
              )
              output, _ = proc.communicate()
              proc.wait()
              # The pipe is not opened in text mode, so `output` is bytes:
              # decode it first, since bytes never compare equal to str
              # (that would leave every test in this class skipped).
              if output.decode(errors="replace").strip() == wanted:
                  return filename
          return None

      # Arguments for the child interpreters, which block reading stdin.
      test_args = ["-c", "import sys; sys.stdin.read()"]

      def setUp(self):
          super().setUp()

          other_python = self.find_other_interpreter()
          if other_python is None:
              return pytest.skip(
                  "could not find interpreter with opposite bitness"
              )
          if IS_64BIT:
              self.python64 = sys.executable
              self.python32 = other_python
          else:
              self.python64 = other_python
              self.python32 = sys.executable

          # Marker variable the environ tests look for in the children.
          env = os.environ.copy()
          env["THINK_OF_A_NUMBER"] = str(os.getpid())
          self.proc32 = self.spawn_subproc(
              [self.python32] + self.test_args, env=env, stdin=subprocess.PIPE
          )
          self.proc64 = self.spawn_subproc(
              [self.python64] + self.test_args, env=env, stdin=subprocess.PIPE
          )

      def tearDown(self):
          super().tearDown()
          self.proc32.communicate()
          self.proc64.communicate()

      def test_cmdline_32(self):
          p = psutil.Process(self.proc32.pid)
          assert len(p.cmdline()) == 3
          assert p.cmdline()[1:] == self.test_args

      def test_cmdline_64(self):
          p = psutil.Process(self.proc64.pid)
          assert len(p.cmdline()) == 3
          assert p.cmdline()[1:] == self.test_args

      def test_cwd_32(self):
          p = psutil.Process(self.proc32.pid)
          assert p.cwd() == os.getcwd()

      def test_cwd_64(self):
          p = psutil.Process(self.proc64.pid)
          assert p.cwd() == os.getcwd()

      def test_environ_32(self):
          p = psutil.Process(self.proc32.pid)
          e = p.environ()
          assert "THINK_OF_A_NUMBER" in e
          assert e["THINK_OF_A_NUMBER"] == str(os.getpid())

      def test_environ_64(self):
          # AccessDenied is deliberately tolerated here; the call only
          # has to complete without any other error.
          p = psutil.Process(self.proc64.pid)
          try:
              p.environ()
          except psutil.AccessDenied:
              pass
  725. # ===================================================================
  726. # Windows services
  727. # ===================================================================
  @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
  class TestServices(PsutilTestCase):
      """Tests for psutil.win_service_iter() and psutil.win_service_get()."""

      def test_win_service_iter(self):
          # Check the type / value of every field of every service.
          valid_start_types = {"automatic", "manual", "disabled"}
          valid_statuses = {
              "running",
              "paused",
              "start_pending",
              "pause_pending",
              "continue_pending",
              "stop_pending",
              "stopped",
          }
          for serv in psutil.win_service_iter():
              if serv.name() == "WaaSMedicSvc":
                  # known issue in Windows 11 reading the description
                  # https://learn.microsoft.com/en-us/answers/questions/1320388/in-windows-11-version-22h2-there-it-shows-(failed
                  # https://github.com/giampaolo/psutil/issues/2383
                  continue
              data = serv.as_dict()
              assert isinstance(data['name'], str)
              assert data['name'].strip()
              assert isinstance(data['display_name'], str)
              assert isinstance(data['username'], str)
              assert data['status'] in valid_statuses
              if data['pid'] is not None:
                  psutil.Process(data['pid'])
              assert isinstance(data['binpath'], str)
              assert isinstance(data['start_type'], str)
              assert data['start_type'] in valid_start_types
              assert isinstance(data['description'], str)
              pid = serv.pid()
              if pid is not None:
                  p = psutil.Process(pid)
                  assert p.is_running()
              # win_service_get
              s = psutil.win_service_get(serv.name())
              # test __eq__
              assert serv == s

      def test_win_service_get(self):
          ERROR_SERVICE_DOES_NOT_EXIST = (
              psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
          )
          ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED

          # a non-existent service name must raise NoSuchProcess
          name = next(psutil.win_service_iter()).name()
          with pytest.raises(psutil.NoSuchProcess) as cm:
              psutil.win_service_get(name + '???')
          assert cm.value.name == name + '???'

          # test NoSuchProcess
          service = psutil.win_service_get(name)
          exc = OSError(0, "msg", 0)
          exc.winerror = ERROR_SERVICE_DOES_NOT_EXIST
          with mock.patch(
              "psutil._psplatform.cext.winservice_query_status", side_effect=exc
          ):
              with pytest.raises(psutil.NoSuchProcess):
                  service.status()
          with mock.patch(
              "psutil._psplatform.cext.winservice_query_config", side_effect=exc
          ):
              with pytest.raises(psutil.NoSuchProcess):
                  service.username()

          # test AccessDenied
          exc = OSError(0, "msg", 0)
          exc.winerror = ERROR_ACCESS_DENIED
          with mock.patch(
              "psutil._psplatform.cext.winservice_query_status", side_effect=exc
          ):
              with pytest.raises(psutil.AccessDenied):
                  service.status()
          with mock.patch(
              "psutil._psplatform.cext.winservice_query_config", side_effect=exc
          ):
              with pytest.raises(psutil.AccessDenied):
                  service.username()

          # test __str__ and __repr__
          assert service.name() in str(service)
          assert service.display_name() in str(service)
          assert service.name() in repr(service)
          assert service.display_name() in repr(service)