| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660 |
- #!/usr/bin/env python3
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Tests for psutil.Process class."""
- import collections
- import contextlib
- import errno
- import getpass
- import io
- import itertools
- import os
- import signal
- import socket
- import stat
- import string
- import subprocess
- import sys
- import textwrap
- import time
- from unittest import mock
- import psutil
- from psutil import AIX
- from psutil import BSD
- from psutil import LINUX
- from psutil import MACOS
- from psutil import NETBSD
- from psutil import OPENBSD
- from psutil import OSX
- from psutil import POSIX
- from psutil import WINDOWS
- from psutil._common import open_text
- from psutil.tests import CI_TESTING
- from psutil.tests import GITHUB_ACTIONS
- from psutil.tests import GLOBAL_TIMEOUT
- from psutil.tests import HAS_CPU_AFFINITY
- from psutil.tests import HAS_ENVIRON
- from psutil.tests import HAS_IONICE
- from psutil.tests import HAS_MEMORY_MAPS
- from psutil.tests import HAS_PROC_CPU_NUM
- from psutil.tests import HAS_PROC_IO_COUNTERS
- from psutil.tests import HAS_RLIMIT
- from psutil.tests import HAS_THREADS
- from psutil.tests import MACOS_11PLUS
- from psutil.tests import PYPY
- from psutil.tests import PYTHON_EXE
- from psutil.tests import PYTHON_EXE_ENV
- from psutil.tests import PsutilTestCase
- from psutil.tests import ThreadTask
- from psutil.tests import call_until
- from psutil.tests import copyload_shared_lib
- from psutil.tests import create_c_exe
- from psutil.tests import create_py_exe
- from psutil.tests import process_namespace
- from psutil.tests import pytest
- from psutil.tests import reap_children
- from psutil.tests import retry_on_failure
- from psutil.tests import sh
- from psutil.tests import skip_on_access_denied
- from psutil.tests import skip_on_not_implemented
- from psutil.tests import wait_for_pid
- # ===================================================================
- # --- psutil.Process class tests
- # ===================================================================
- class TestProcess(PsutilTestCase):
- """Tests for psutil.Process class."""
- def test_pid(self):
- p = psutil.Process()
- assert p.pid == os.getpid()
- with pytest.raises(AttributeError):
- p.pid = 33
- def test_kill(self):
- p = self.spawn_psproc()
- p.kill()
- code = p.wait()
- if WINDOWS:
- assert code == signal.SIGTERM
- else:
- assert code == -signal.SIGKILL
- self.assert_proc_gone(p)
- def test_terminate(self):
- p = self.spawn_psproc()
- p.terminate()
- code = p.wait()
- if WINDOWS:
- assert code == signal.SIGTERM
- else:
- assert code == -signal.SIGTERM
- self.assert_proc_gone(p)
- def test_send_signal(self):
- sig = signal.SIGKILL if POSIX else signal.SIGTERM
- p = self.spawn_psproc()
- p.send_signal(sig)
- code = p.wait()
- if WINDOWS:
- assert code == sig
- else:
- assert code == -sig
- self.assert_proc_gone(p)
- @pytest.mark.skipif(not POSIX, reason="not POSIX")
- def test_send_signal_mocked(self):
- sig = signal.SIGTERM
- p = self.spawn_psproc()
- with mock.patch('psutil.os.kill', side_effect=ProcessLookupError):
- with pytest.raises(psutil.NoSuchProcess):
- p.send_signal(sig)
- p = self.spawn_psproc()
- with mock.patch('psutil.os.kill', side_effect=PermissionError):
- with pytest.raises(psutil.AccessDenied):
- p.send_signal(sig)
- def test_wait_exited(self):
- # Test waitpid() + WIFEXITED -> WEXITSTATUS.
- # normal return, same as exit(0)
- cmd = [PYTHON_EXE, "-c", "pass"]
- p = self.spawn_psproc(cmd)
- code = p.wait()
- assert code == 0
- self.assert_proc_gone(p)
- # exit(1), implicit in case of error
- cmd = [PYTHON_EXE, "-c", "1 / 0"]
- p = self.spawn_psproc(cmd, stderr=subprocess.PIPE)
- code = p.wait()
- assert code == 1
- self.assert_proc_gone(p)
- # via sys.exit()
- cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"]
- p = self.spawn_psproc(cmd)
- code = p.wait()
- assert code == 5
- self.assert_proc_gone(p)
- # via os._exit()
- cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"]
- p = self.spawn_psproc(cmd)
- code = p.wait()
- assert code == 5
- self.assert_proc_gone(p)
- @pytest.mark.skipif(NETBSD, reason="fails on NETBSD")
- def test_wait_stopped(self):
- p = self.spawn_psproc()
- if POSIX:
- # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
- # Note: if a process is stopped it ignores SIGTERM.
- p.send_signal(signal.SIGSTOP)
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(timeout=0.001)
- p.send_signal(signal.SIGCONT)
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(timeout=0.001)
- p.send_signal(signal.SIGTERM)
- assert p.wait() == -signal.SIGTERM
- assert p.wait() == -signal.SIGTERM
- else:
- p.suspend()
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(timeout=0.001)
- p.resume()
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(timeout=0.001)
- p.terminate()
- assert p.wait() == signal.SIGTERM
- assert p.wait() == signal.SIGTERM
- def test_wait_non_children(self):
- # Test wait() against a process which is not our direct
- # child.
- child, grandchild = self.spawn_children_pair()
- with pytest.raises(psutil.TimeoutExpired):
- child.wait(0.01)
- with pytest.raises(psutil.TimeoutExpired):
- grandchild.wait(0.01)
- # We also terminate the direct child otherwise the
- # grandchild will hang until the parent is gone.
- child.terminate()
- grandchild.terminate()
- child_ret = child.wait()
- grandchild_ret = grandchild.wait()
- if POSIX:
- assert child_ret == -signal.SIGTERM
- # For processes which are not our children we're supposed
- # to get None.
- assert grandchild_ret is None
- else:
- assert child_ret == signal.SIGTERM
- assert child_ret == signal.SIGTERM
- def test_wait_timeout(self):
- p = self.spawn_psproc()
- p.name()
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(0.01)
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(0)
- with pytest.raises(ValueError):
- p.wait(-1)
- def test_wait_timeout_nonblocking(self):
- p = self.spawn_psproc()
- with pytest.raises(psutil.TimeoutExpired):
- p.wait(0)
- p.kill()
- stop_at = time.time() + GLOBAL_TIMEOUT
- while time.time() < stop_at:
- try:
- code = p.wait(0)
- break
- except psutil.TimeoutExpired:
- pass
- else:
- return pytest.fail('timeout')
- if POSIX:
- assert code == -signal.SIGKILL
- else:
- assert code == signal.SIGTERM
- self.assert_proc_gone(p)
- def test_cpu_percent(self):
- p = psutil.Process()
- p.cpu_percent(interval=0.001)
- p.cpu_percent(interval=0.001)
- for _ in range(100):
- percent = p.cpu_percent(interval=None)
- assert isinstance(percent, float)
- assert percent >= 0.0
- with pytest.raises(ValueError):
- p.cpu_percent(interval=-1)
- def test_cpu_percent_numcpus_none(self):
- # See: https://github.com/giampaolo/psutil/issues/1087
- with mock.patch('psutil.cpu_count', return_value=None) as m:
- psutil.Process().cpu_percent()
- assert m.called
- def test_cpu_times(self):
- times = psutil.Process().cpu_times()
- assert times.user >= 0.0, times
- assert times.system >= 0.0, times
- assert times.children_user >= 0.0, times
- assert times.children_system >= 0.0, times
- if LINUX:
- assert times.iowait >= 0.0, times
- # make sure returned values can be pretty printed with strftime
- for name in times._fields:
- time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))
- @pytest.mark.skipif(not HAS_PROC_CPU_NUM, reason="not supported")
- def test_cpu_num(self):
- p = psutil.Process()
- num = p.cpu_num()
- assert num >= 0
- if psutil.cpu_count() == 1:
- assert num == 0
- assert p.cpu_num() in range(psutil.cpu_count())
- def test_create_time(self):
- p = self.spawn_psproc()
- now = time.time()
- # Fail if the difference with current time is > 2s.
- assert abs(p.create_time() - now) < 2
- # make sure returned value can be pretty printed with strftime
- time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_terminal(self):
- terminal = psutil.Process().terminal()
- if terminal is not None:
- try:
- tty = os.path.realpath(sh('tty'))
- except RuntimeError:
- # Note: happens if pytest is run without the `-s` opt.
- return pytest.skip("can't rely on `tty` CLI")
- else:
- assert terminal == tty
- @pytest.mark.skipif(not HAS_PROC_IO_COUNTERS, reason="not supported")
- @skip_on_not_implemented(only_if=LINUX)
- def test_io_counters(self):
- p = psutil.Process()
- # test reads
- io1 = p.io_counters()
- with open(PYTHON_EXE, 'rb') as f:
- f.read()
- io2 = p.io_counters()
- if not BSD and not AIX:
- assert io2.read_count > io1.read_count
- assert io2.write_count == io1.write_count
- if LINUX:
- assert io2.read_chars > io1.read_chars
- assert io2.write_chars == io1.write_chars
- else:
- assert io2.read_bytes >= io1.read_bytes
- assert io2.write_bytes >= io1.write_bytes
- # test writes
- io1 = p.io_counters()
- with open(self.get_testfn(), 'wb') as f:
- f.write(bytes("x" * 1000000, 'ascii'))
- io2 = p.io_counters()
- assert io2.write_count >= io1.write_count
- assert io2.write_bytes >= io1.write_bytes
- assert io2.read_count >= io1.read_count
- assert io2.read_bytes >= io1.read_bytes
- if LINUX:
- assert io2.write_chars > io1.write_chars
- assert io2.read_chars >= io1.read_chars
- # sanity check
- for i in range(len(io2)):
- if BSD and i >= 2:
- # On BSD read_bytes and write_bytes are always set to -1.
- continue
- assert io2[i] >= 0
- assert io2[i] >= 0
- @pytest.mark.skipif(not HAS_IONICE, reason="not supported")
- @pytest.mark.skipif(not LINUX, reason="linux only")
- def test_ionice_linux(self):
- def cleanup(init):
- ioclass, value = init
- if ioclass == psutil.IOPRIO_CLASS_NONE:
- value = 0
- p.ionice(ioclass, value)
- p = psutil.Process()
- if not CI_TESTING:
- assert p.ionice()[0] == psutil.IOPRIO_CLASS_NONE
- assert psutil.IOPRIO_CLASS_NONE == 0
- assert psutil.IOPRIO_CLASS_RT == 1 # high
- assert psutil.IOPRIO_CLASS_BE == 2 # normal
- assert psutil.IOPRIO_CLASS_IDLE == 3 # low
- init = p.ionice()
- self.addCleanup(cleanup, init)
- # low
- p.ionice(psutil.IOPRIO_CLASS_IDLE)
- assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_IDLE, 0)
- with pytest.raises(ValueError): # accepts no value
- p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
- # normal
- p.ionice(psutil.IOPRIO_CLASS_BE)
- assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_BE, 0)
- p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
- assert tuple(p.ionice()) == (psutil.IOPRIO_CLASS_BE, 7)
- with pytest.raises(ValueError):
- p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
- try:
- p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
- except psutil.AccessDenied:
- pass
- # errs
- with pytest.raises(ValueError, match="ioclass accepts no value"):
- p.ionice(psutil.IOPRIO_CLASS_NONE, 1)
- with pytest.raises(ValueError, match="ioclass accepts no value"):
- p.ionice(psutil.IOPRIO_CLASS_IDLE, 1)
- with pytest.raises(
- ValueError, match="'ioclass' argument must be specified"
- ):
- p.ionice(value=1)
- @pytest.mark.skipif(not HAS_IONICE, reason="not supported")
- @pytest.mark.skipif(
- not WINDOWS, reason="not supported on this win version"
- )
- def test_ionice_win(self):
- p = psutil.Process()
- if not CI_TESTING:
- assert p.ionice() == psutil.IOPRIO_NORMAL
- init = p.ionice()
- self.addCleanup(p.ionice, init)
- # base
- p.ionice(psutil.IOPRIO_VERYLOW)
- assert p.ionice() == psutil.IOPRIO_VERYLOW
- p.ionice(psutil.IOPRIO_LOW)
- assert p.ionice() == psutil.IOPRIO_LOW
- try:
- p.ionice(psutil.IOPRIO_HIGH)
- except psutil.AccessDenied:
- pass
- else:
- assert p.ionice() == psutil.IOPRIO_HIGH
- # errs
- with pytest.raises(
- TypeError, match="value argument not accepted on Windows"
- ):
- p.ionice(psutil.IOPRIO_NORMAL, value=1)
- with pytest.raises(ValueError, match="is not a valid priority"):
- p.ionice(psutil.IOPRIO_HIGH + 1)
- @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
- def test_rlimit_get(self):
- import resource
- p = psutil.Process(os.getpid())
- names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
- assert names, names
- for name in names:
- value = getattr(psutil, name)
- assert value >= 0
- if name in dir(resource):
- assert value == getattr(resource, name)
- # XXX - On PyPy RLIMIT_INFINITY returned by
- # resource.getrlimit() is reported as a very big long
- # number instead of -1. It looks like a bug with PyPy.
- if PYPY:
- continue
- assert p.rlimit(value) == resource.getrlimit(value)
- else:
- ret = p.rlimit(value)
- assert len(ret) == 2
- assert ret[0] >= -1
- assert ret[1] >= -1
- @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
- def test_rlimit_set(self):
- p = self.spawn_psproc()
- p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
- assert p.rlimit(psutil.RLIMIT_NOFILE) == (5, 5)
- # If pid is 0 prlimit() applies to the calling process and
- # we don't want that.
- if LINUX:
- with pytest.raises(ValueError, match="can't use prlimit"):
- psutil._psplatform.Process(0).rlimit(0)
- with pytest.raises(ValueError):
- p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
- @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
- def test_rlimit(self):
- p = psutil.Process()
- testfn = self.get_testfn()
- soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
- try:
- p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
- with open(testfn, "wb") as f:
- f.write(b"X" * 1024)
- # write() or flush() doesn't always cause the exception
- # but close() will.
- with pytest.raises(OSError) as exc:
- with open(testfn, "wb") as f:
- f.write(b"X" * 1025)
- assert exc.value.errno == errno.EFBIG
- finally:
- p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
- assert p.rlimit(psutil.RLIMIT_FSIZE) == (soft, hard)
- @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
- def test_rlimit_infinity(self):
- # First set a limit, then re-set it by specifying INFINITY
- # and assume we overridden the previous limit.
- p = psutil.Process()
- soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
- try:
- p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
- p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
- with open(self.get_testfn(), "wb") as f:
- f.write(b"X" * 2048)
- finally:
- p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
- assert p.rlimit(psutil.RLIMIT_FSIZE) == (soft, hard)
- @pytest.mark.skipif(not HAS_RLIMIT, reason="not supported")
- def test_rlimit_infinity_value(self):
- # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
- # big number on a platform with large file support. On these
- # platforms we need to test that the get/setrlimit functions
- # properly convert the number to a C long long and that the
- # conversion doesn't raise an error.
- p = psutil.Process()
- soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
- assert hard == psutil.RLIM_INFINITY
- p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
- @pytest.mark.xdist_group(name="serial")
- def test_num_threads(self):
- # on certain platforms such as Linux we might test for exact
- # thread number, since we always have with 1 thread per process,
- # but this does not apply across all platforms (MACOS, Windows)
- p = psutil.Process()
- if OPENBSD:
- try:
- step1 = p.num_threads()
- except psutil.AccessDenied:
- return pytest.skip("on OpenBSD this requires root access")
- else:
- step1 = p.num_threads()
- with ThreadTask():
- step2 = p.num_threads()
- assert step2 == step1 + 1
- @pytest.mark.skipif(not WINDOWS, reason="WINDOWS only")
- def test_num_handles(self):
- # a better test is done later into test/_windows.py
- p = psutil.Process()
- assert p.num_handles() > 0
- @pytest.mark.skipif(not HAS_THREADS, reason="not supported")
- def test_threads(self):
- p = psutil.Process()
- if OPENBSD:
- try:
- step1 = p.threads()
- except psutil.AccessDenied:
- return pytest.skip("on OpenBSD this requires root access")
- else:
- step1 = p.threads()
- with ThreadTask():
- step2 = p.threads()
- assert len(step2) == len(step1) + 1
- athread = step2[0]
- # test named tuple
- assert athread.id == athread[0]
- assert athread.user_time == athread[1]
- assert athread.system_time == athread[2]
- @retry_on_failure()
- @skip_on_access_denied(only_if=MACOS)
- @pytest.mark.skipif(not HAS_THREADS, reason="not supported")
- def test_threads_2(self):
- p = self.spawn_psproc()
- if OPENBSD:
- try:
- p.threads()
- except psutil.AccessDenied:
- return pytest.skip("on OpenBSD this requires root access")
- assert (
- abs(p.cpu_times().user - sum(x.user_time for x in p.threads()))
- < 0.1
- )
- assert (
- abs(p.cpu_times().system - sum(x.system_time for x in p.threads()))
- < 0.1
- )
- @retry_on_failure()
- def test_memory_info(self):
- p = psutil.Process()
- # step 1 - get a base value to compare our results
- rss1, vms1 = p.memory_info()[:2]
- percent1 = p.memory_percent()
- assert rss1 > 0
- assert vms1 > 0
- # step 2 - allocate some memory
- memarr = [None] * 1500000
- rss2, vms2 = p.memory_info()[:2]
- percent2 = p.memory_percent()
- # step 3 - make sure that the memory usage bumped up
- assert rss2 > rss1
- assert vms2 >= vms1 # vms might be equal
- assert percent2 > percent1
- del memarr
- if WINDOWS:
- mem = p.memory_info()
- assert mem.rss == mem.wset
- assert mem.vms == mem.pagefile
- mem = p.memory_info()
- for name in mem._fields:
- assert getattr(mem, name) >= 0
- def test_memory_full_info(self):
- p = psutil.Process()
- total = psutil.virtual_memory().total
- mem = p.memory_full_info()
- for name in mem._fields:
- value = getattr(mem, name)
- assert value >= 0
- if (name == "vms" and OSX) or LINUX:
- continue
- assert value <= total
- if LINUX or WINDOWS or MACOS:
- assert mem.uss >= 0
- if LINUX:
- assert mem.pss >= 0
- assert mem.swap >= 0
- @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported")
- def test_memory_maps(self):
- p = psutil.Process()
- maps = p.memory_maps()
- assert len(maps) == len(set(maps))
- ext_maps = p.memory_maps(grouped=False)
- for nt in maps:
- if nt.path.startswith('['):
- continue
- if BSD and nt.path == "pvclock":
- continue
- assert os.path.isabs(nt.path), nt.path
- if POSIX:
- try:
- assert os.path.exists(nt.path) or os.path.islink(
- nt.path
- ), nt.path
- except AssertionError:
- if not LINUX:
- raise
- # https://github.com/giampaolo/psutil/issues/759
- with open_text('/proc/self/smaps') as f:
- data = f.read()
- if f"{nt.path} (deleted)" not in data:
- raise
- elif '64' not in os.path.basename(nt.path):
- # XXX - On Windows we have this strange behavior with
- # 64 bit dlls: they are visible via explorer but cannot
- # be accessed via os.stat() (wtf?).
- try:
- st = os.stat(nt.path)
- except FileNotFoundError:
- pass
- else:
- assert stat.S_ISREG(st.st_mode), nt.path
- for nt in ext_maps:
- for fname in nt._fields:
- value = getattr(nt, fname)
- if fname == 'path':
- continue
- if fname in {'addr', 'perms'}:
- assert value, value
- else:
- assert isinstance(value, int)
- assert value >= 0, value
- @pytest.mark.skipif(not HAS_MEMORY_MAPS, reason="not supported")
- def test_memory_maps_lists_lib(self):
- # Make sure a newly loaded shared lib is listed.
- p = psutil.Process()
- with copyload_shared_lib() as path:
- def normpath(p):
- return os.path.realpath(os.path.normcase(p))
- libpaths = [normpath(x.path) for x in p.memory_maps()]
- assert normpath(path) in libpaths
- def test_memory_percent(self):
- p = psutil.Process()
- p.memory_percent()
- with pytest.raises(ValueError):
- p.memory_percent(memtype="?!?")
- if LINUX or MACOS or WINDOWS:
- p.memory_percent(memtype='uss')
- def test_is_running(self):
- p = self.spawn_psproc()
- assert p.is_running()
- assert p.is_running()
- p.kill()
- p.wait()
- assert not p.is_running()
- assert not p.is_running()
- def test_exe(self):
- p = self.spawn_psproc()
- exe = p.exe()
- try:
- assert exe == PYTHON_EXE
- except AssertionError:
- if WINDOWS and len(exe) == len(PYTHON_EXE):
- # on Windows we don't care about case sensitivity
- normcase = os.path.normcase
- assert normcase(exe) == normcase(PYTHON_EXE)
- else:
- # certain platforms such as BSD are more accurate returning:
- # "/usr/local/bin/python3.7"
- # ...instead of:
- # "/usr/local/bin/python"
- # We do not want to consider this difference in accuracy
- # an error.
- ver = f"{sys.version_info[0]}.{sys.version_info[1]}"
- try:
- assert exe.replace(ver, '') == PYTHON_EXE.replace(ver, '')
- except AssertionError:
- # Typically MACOS. Really not sure what to do here.
- pass
- out = sh([exe, "-c", "import os; print('hey')"])
- assert out == 'hey'
- def test_cmdline(self):
- cmdline = [
- PYTHON_EXE,
- "-c",
- "import time; [time.sleep(0.1) for x in range(100)]",
- ]
- p = self.spawn_psproc(cmdline)
- if NETBSD and p.cmdline() == []:
- # https://github.com/giampaolo/psutil/issues/2250
- return pytest.skip("OPENBSD: returned EBUSY")
- # XXX - most of the times the underlying sysctl() call on Net
- # and Open BSD returns a truncated string.
- # Also /proc/pid/cmdline behaves the same so it looks
- # like this is a kernel bug.
- # XXX - AIX truncates long arguments in /proc/pid/cmdline
- if NETBSD or OPENBSD or AIX:
- assert p.cmdline()[0] == PYTHON_EXE
- else:
- if MACOS and CI_TESTING:
- pyexe = p.cmdline()[0]
- if pyexe != PYTHON_EXE:
- assert ' '.join(p.cmdline()[1:]) == ' '.join(cmdline[1:])
- return None
- assert ' '.join(p.cmdline()) == ' '.join(cmdline)
- def test_long_cmdline(self):
- cmdline = [PYTHON_EXE]
- cmdline.extend(["-v"] * 50)
- cmdline.extend(
- ["-c", "import time; [time.sleep(0.1) for x in range(100)]"]
- )
- p = self.spawn_psproc(cmdline)
- # XXX - flaky test: exclude the python exe which, for some
- # reason, and only sometimes, on OSX appears different.
- cmdline = cmdline[1:]
- if OPENBSD:
- # XXX: for some reason the test process may turn into a
- # zombie (don't know why).
- try:
- assert p.cmdline()[1:] == cmdline
- except psutil.ZombieProcess:
- return pytest.skip("OPENBSD: process turned into zombie")
- else:
- ret = p.cmdline()[1:]
- if NETBSD and ret == []:
- # https://github.com/giampaolo/psutil/issues/2250
- return pytest.skip("OPENBSD: returned EBUSY")
- assert ret == cmdline
- def test_name(self):
- p = self.spawn_psproc()
- name = p.name().lower()
- if name.endswith("t"): # in the free-threaded build
- name = name[:-1]
- pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
- assert pyexe.startswith(name), (pyexe, name)
- @retry_on_failure()
- def test_long_name(self):
- pyexe = create_py_exe(self.get_testfn(suffix=string.digits * 2))
- cmdline = [
- pyexe,
- "-c",
- "import time; [time.sleep(0.1) for x in range(100)]",
- ]
- p = self.spawn_psproc(cmdline)
- if OPENBSD:
- # XXX: for some reason the test process may turn into a
- # zombie (don't know why). Because the name() is long, all
- # UNIX kernels truncate it to 15 chars, so internally psutil
- # tries to guess the full name() from the cmdline(). But the
- # cmdline() of a zombie on OpenBSD fails (internally), so we
- # just compare the first 15 chars. Full explanation:
- # https://github.com/giampaolo/psutil/issues/2239
- try:
- assert p.name() == os.path.basename(pyexe)
- except AssertionError:
- if p.status() == psutil.STATUS_ZOMBIE:
- assert os.path.basename(pyexe).startswith(p.name())
- else:
- raise
- else:
- assert p.name() == os.path.basename(pyexe)
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_uids(self):
- p = psutil.Process()
- real, effective, _saved = p.uids()
- # os.getuid() refers to "real" uid
- assert real == os.getuid()
- # os.geteuid() refers to "effective" uid
- assert effective == os.geteuid()
- # No such thing as os.getsuid() ("saved" uid), but we have
- # os.getresuid() which returns all of them.
- if hasattr(os, "getresuid"):
- assert os.getresuid() == p.uids()
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_gids(self):
- p = psutil.Process()
- real, effective, _saved = p.gids()
- # os.getuid() refers to "real" uid
- assert real == os.getgid()
- # os.geteuid() refers to "effective" uid
- assert effective == os.getegid()
- # No such thing as os.getsgid() ("saved" gid), but we have
- # os.getresgid() which returns all of them.
- if hasattr(os, "getresuid"):
- assert os.getresgid() == p.gids()
- def test_nice(self):
- def cleanup(init):
- try:
- p.nice(init)
- except psutil.AccessDenied:
- pass
- p = psutil.Process()
- with pytest.raises(TypeError):
- p.nice("str")
- init = p.nice()
- self.addCleanup(cleanup, init)
- if WINDOWS:
- highest_prio = None
- for prio in [
- psutil.IDLE_PRIORITY_CLASS,
- psutil.BELOW_NORMAL_PRIORITY_CLASS,
- psutil.NORMAL_PRIORITY_CLASS,
- psutil.ABOVE_NORMAL_PRIORITY_CLASS,
- psutil.HIGH_PRIORITY_CLASS,
- psutil.REALTIME_PRIORITY_CLASS,
- ]:
- with self.subTest(prio=prio):
- try:
- p.nice(prio)
- except psutil.AccessDenied:
- pass
- else:
- new_prio = p.nice()
- # The OS may limit our maximum priority,
- # even if the function succeeds. For higher
- # priorities, we match either the expected
- # value or the highest so far.
- if prio in {
- psutil.ABOVE_NORMAL_PRIORITY_CLASS,
- psutil.HIGH_PRIORITY_CLASS,
- psutil.REALTIME_PRIORITY_CLASS,
- }:
- if new_prio == prio or highest_prio is None:
- highest_prio = prio
- assert new_prio == highest_prio
- else:
- assert new_prio == prio
- else:
- try:
- if hasattr(os, "getpriority"):
- assert (
- os.getpriority(os.PRIO_PROCESS, os.getpid())
- == p.nice()
- )
- p.nice(1)
- assert p.nice() == 1
- if hasattr(os, "getpriority"):
- assert (
- os.getpriority(os.PRIO_PROCESS, os.getpid())
- == p.nice()
- )
- # XXX - going back to previous nice value raises
- # AccessDenied on MACOS
- if not MACOS:
- p.nice(0)
- assert p.nice() == 0
- except psutil.AccessDenied:
- pass
- def test_status(self):
- p = psutil.Process()
- assert p.status() == psutil.STATUS_RUNNING
- def test_username(self):
- p = self.spawn_psproc()
- username = p.username()
- if WINDOWS:
- domain, username = username.split('\\')
- getpass_user = getpass.getuser()
- if getpass_user.endswith('$'):
- # When running as a service account (most likely to be
- # NetworkService), these user name calculations don't produce
- # the same result, causing the test to fail.
- return pytest.skip('running as service account')
- assert username == getpass_user
- if 'USERDOMAIN' in os.environ:
- assert domain == os.environ['USERDOMAIN']
- else:
- assert username == getpass.getuser()
- def test_cwd(self):
- p = self.spawn_psproc()
- assert p.cwd() == os.getcwd()
- def test_cwd_2(self):
- cmd = [
- PYTHON_EXE,
- "-c",
- (
- "import os, time; os.chdir('..'); [time.sleep(0.1) for x in"
- " range(100)]"
- ),
- ]
- p = self.spawn_psproc(cmd)
- call_until(lambda: p.cwd() == os.path.dirname(os.getcwd()))
- @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
- def test_cpu_affinity(self):
- p = psutil.Process()
- initial = p.cpu_affinity()
- assert initial, initial
- self.addCleanup(p.cpu_affinity, initial)
- if hasattr(os, "sched_getaffinity"):
- assert initial == list(os.sched_getaffinity(p.pid))
- assert len(initial) == len(set(initial))
- all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
- for n in all_cpus:
- p.cpu_affinity([n])
- assert p.cpu_affinity() == [n]
- if hasattr(os, "sched_getaffinity"):
- assert p.cpu_affinity() == list(os.sched_getaffinity(p.pid))
- # also test num_cpu()
- if hasattr(p, "num_cpu"):
- assert p.cpu_affinity()[0] == p.num_cpu()
- # [] is an alias for "all eligible CPUs"; on Linux this may
- # not be equal to all available CPUs, see:
- # https://github.com/giampaolo/psutil/issues/956
- p.cpu_affinity([])
- if LINUX:
- assert p.cpu_affinity() == p._proc._get_eligible_cpus()
- else:
- assert p.cpu_affinity() == all_cpus
- if hasattr(os, "sched_getaffinity"):
- assert p.cpu_affinity() == list(os.sched_getaffinity(p.pid))
- with pytest.raises(TypeError):
- p.cpu_affinity(1)
- p.cpu_affinity(initial)
- # it should work with all iterables, not only lists
- p.cpu_affinity(set(all_cpus))
- p.cpu_affinity(tuple(all_cpus))
- @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
- def test_cpu_affinity_errs(self):
- p = self.spawn_psproc()
- invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
- with pytest.raises(ValueError):
- p.cpu_affinity(invalid_cpu)
- with pytest.raises(ValueError):
- p.cpu_affinity(range(10000, 11000))
- with pytest.raises((TypeError, ValueError)):
- p.cpu_affinity([0, "1"])
- with pytest.raises(ValueError):
- p.cpu_affinity([0, -1])
- @pytest.mark.skipif(not HAS_CPU_AFFINITY, reason="not supported")
- def test_cpu_affinity_all_combinations(self):
- p = psutil.Process()
- initial = p.cpu_affinity()
- assert initial, initial
- self.addCleanup(p.cpu_affinity, initial)
- # All possible CPU set combinations.
- if len(initial) > 12:
- initial = initial[:12] # ...otherwise it will take forever
- combos = []
- for i in range(len(initial) + 1):
- combos.extend(
- list(subset)
- for subset in itertools.combinations(initial, i)
- if subset
- )
- for combo in combos:
- p.cpu_affinity(combo)
- assert sorted(p.cpu_affinity()) == sorted(combo)
- # TODO: #595
- @pytest.mark.skipif(BSD, reason="broken on BSD")
- def test_open_files(self):
- p = psutil.Process()
- testfn = self.get_testfn()
- files = p.open_files()
- assert testfn not in files
- with open(testfn, 'wb') as f:
- f.write(b'x' * 1024)
- f.flush()
- # give the kernel some time to see the new file
- call_until(lambda: len(p.open_files()) != len(files))
- files = p.open_files()
- filenames = [os.path.normcase(x.path) for x in files]
- assert os.path.normcase(testfn) in filenames
- if LINUX:
- for file in files:
- if file.path == testfn:
- assert file.position == 1024
- for file in files:
- assert os.path.isfile(file.path), file
- # another process
- cmdline = (
- f"import time; f = open(r'{testfn}', 'r'); [time.sleep(0.1) for x"
- " in range(100)];"
- )
- p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])
- for x in range(100):
- filenames = [os.path.normcase(x.path) for x in p.open_files()]
- if testfn in filenames:
- break
- time.sleep(0.01)
- else:
- assert os.path.normcase(testfn) in filenames
- for file in filenames:
- assert os.path.isfile(file), file
- # TODO: #595
- @pytest.mark.skipif(BSD, reason="broken on BSD")
- def test_open_files_2(self):
- # test fd and path fields
- p = psutil.Process()
- normcase = os.path.normcase
- testfn = self.get_testfn()
- with open(testfn, 'w') as fileobj:
- for file in p.open_files():
- if (
- normcase(file.path) == normcase(fileobj.name)
- or file.fd == fileobj.fileno()
- ):
- break
- else:
- return pytest.fail(f"no file found; files={p.open_files()!r}")
- assert normcase(file.path) == normcase(fileobj.name)
- if WINDOWS:
- assert file.fd == -1
- else:
- assert file.fd == fileobj.fileno()
- # test positions
- ntuple = p.open_files()[0]
- assert ntuple[0] == ntuple.path
- assert ntuple[1] == ntuple.fd
- # test file is gone
- assert fileobj.name not in p.open_files()
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- @pytest.mark.xdist_group(name="serial")
- def test_num_fds(self):
- p = psutil.Process()
- testfn = self.get_testfn()
- start = p.num_fds()
- with open(testfn, 'w'):
- assert p.num_fds() == start + 1
- with socket.socket():
- assert p.num_fds() == start + 2
- assert p.num_fds() == start
- @skip_on_not_implemented(only_if=LINUX)
- @pytest.mark.skipif(
- OPENBSD or NETBSD, reason="not reliable on OPENBSD & NETBSD"
- )
- def test_num_ctx_switches(self):
- p = psutil.Process()
- before = sum(p.num_ctx_switches())
- for _ in range(2):
- time.sleep(0.05) # this shall ensure a context switch happens
- after = sum(p.num_ctx_switches())
- if after > before:
- return None
- return pytest.fail(
- "num ctx switches still the same after 2 iterations"
- )
- def test_ppid(self):
- p = psutil.Process()
- if hasattr(os, 'getppid'):
- assert p.ppid() == os.getppid()
- p = self.spawn_psproc()
- assert p.ppid() == os.getpid()
- def test_parent(self):
- p = self.spawn_psproc()
- assert p.parent().pid == os.getpid()
- lowest_pid = psutil.pids()[0]
- assert psutil.Process(lowest_pid).parent() is None
- def test_parent_mocked_ctime(self):
- # Make sure we get a fresh copy of the ctime before processing
- # parent().We make the assumption that the parent pid MUST have
- # a creation time < than the child. If system clock is updated
- # this assumption was broken.
- # https://github.com/giampaolo/psutil/issues/2542
- p = self.spawn_psproc()
- p.create_time() # trigger cache
- assert p._create_time
- p._create_time = 1
- assert p.parent().pid == os.getpid()
- def test_parent_multi(self):
- parent = psutil.Process()
- child, grandchild = self.spawn_children_pair()
- assert grandchild.parent() == child
- assert child.parent() == parent
- @retry_on_failure()
- def test_parents(self):
- parent = psutil.Process()
- assert parent.parents()
- child, grandchild = self.spawn_children_pair()
- assert child.parents()[0] == parent
- assert grandchild.parents()[0] == child
- assert grandchild.parents()[1] == parent
- def test_children(self):
- parent = psutil.Process()
- assert not parent.children()
- assert not parent.children(recursive=True)
- # On Windows we set the flag to 0 in order to cancel out the
- # CREATE_NO_WINDOW flag (enabled by default) which creates
- # an extra "conhost.exe" child.
- child = self.spawn_psproc(creationflags=0)
- children1 = parent.children()
- children2 = parent.children(recursive=True)
- for children in (children1, children2):
- assert len(children) == 1
- assert children[0].pid == child.pid
- assert children[0].ppid() == parent.pid
- def test_children_mocked_ctime(self):
- # Make sure we get a fresh copy of the ctime before processing
- # children(). We make the assumption that process children MUST
- # have a creation time > than the parent. If system clock is
- # updated this assumption was broken.
- # https://github.com/giampaolo/psutil/issues/2542
- parent = psutil.Process()
- parent.create_time() # trigger cache
- assert parent._create_time
- parent._create_time += 100000
- assert not parent.children()
- assert not parent.children(recursive=True)
- # On Windows we set the flag to 0 in order to cancel out the
- # CREATE_NO_WINDOW flag (enabled by default) which creates
- # an extra "conhost.exe" child.
- child = self.spawn_psproc(creationflags=0)
- children1 = parent.children()
- children2 = parent.children(recursive=True)
- for children in (children1, children2):
- assert len(children) == 1
- assert children[0].pid == child.pid
- assert children[0].ppid() == parent.pid
- def test_children_recursive(self):
- # Test children() against two sub processes, p1 and p2, where
- # p1 (our child) spawned p2 (our grandchild).
- parent = psutil.Process()
- child, grandchild = self.spawn_children_pair()
- assert parent.children() == [child]
- assert parent.children(recursive=True) == [child, grandchild]
- # If the intermediate process is gone there's no way for
- # children() to recursively find it.
- child.terminate()
- child.wait()
- assert not parent.children(recursive=True)
- def test_children_duplicates(self):
- # find the process which has the highest number of children
- table = collections.defaultdict(int)
- for p in psutil.process_iter():
- try:
- table[p.ppid()] += 1
- except psutil.Error:
- pass
- # this is the one, now let's make sure there are no duplicates
- pid = max(table.items(), key=lambda x: x[1])[0]
- if LINUX and pid == 0:
- return pytest.skip("PID 0")
- p = psutil.Process(pid)
- try:
- c = p.children(recursive=True)
- except psutil.AccessDenied: # windows
- pass
- else:
- assert len(c) == len(set(c))
- def test_parents_and_children(self):
- parent = psutil.Process()
- child, grandchild = self.spawn_children_pair()
- # forward
- children = parent.children(recursive=True)
- assert len(children) == 2
- assert children[0] == child
- assert children[1] == grandchild
- # backward
- parents = grandchild.parents()
- assert parents[0] == child
- assert parents[1] == parent
- def test_suspend_resume(self):
- p = self.spawn_psproc()
- p.suspend()
- for _ in range(100):
- if p.status() == psutil.STATUS_STOPPED:
- break
- time.sleep(0.01)
- p.resume()
- assert p.status() != psutil.STATUS_STOPPED
- def test_invalid_pid(self):
- with pytest.raises(TypeError):
- psutil.Process("1")
- with pytest.raises(ValueError):
- psutil.Process(-1)
- def test_as_dict(self):
- p = psutil.Process()
- d = p.as_dict(attrs=['exe', 'name'])
- assert sorted(d.keys()) == ['exe', 'name']
- p = psutil.Process(min(psutil.pids()))
- d = p.as_dict(attrs=['net_connections'], ad_value='foo')
- if not isinstance(d['net_connections'], list):
- assert d['net_connections'] == 'foo'
- # Test ad_value is set on AccessDenied.
- with mock.patch(
- 'psutil.Process.nice', create=True, side_effect=psutil.AccessDenied
- ):
- assert p.as_dict(attrs=["nice"], ad_value=1) == {"nice": 1}
- # Test that NoSuchProcess bubbles up.
- with mock.patch(
- 'psutil.Process.nice',
- create=True,
- side_effect=psutil.NoSuchProcess(p.pid, "name"),
- ):
- with pytest.raises(psutil.NoSuchProcess):
- p.as_dict(attrs=["nice"])
- # Test that ZombieProcess is swallowed.
- with mock.patch(
- 'psutil.Process.nice',
- create=True,
- side_effect=psutil.ZombieProcess(p.pid, "name"),
- ):
- assert p.as_dict(attrs=["nice"], ad_value="foo") == {"nice": "foo"}
- # By default APIs raising NotImplementedError are
- # supposed to be skipped.
- with mock.patch(
- 'psutil.Process.nice', create=True, side_effect=NotImplementedError
- ):
- d = p.as_dict()
- assert 'nice' not in list(d.keys())
- # ...unless the user explicitly asked for some attr.
- with pytest.raises(NotImplementedError):
- p.as_dict(attrs=["nice"])
- # errors
- with pytest.raises(TypeError):
- p.as_dict('name')
- with pytest.raises(ValueError):
- p.as_dict(['foo'])
- with pytest.raises(ValueError):
- p.as_dict(['foo', 'bar'])
- def test_oneshot(self):
- p = psutil.Process()
- with mock.patch("psutil._psplatform.Process.cpu_times") as m:
- with p.oneshot():
- p.cpu_times()
- p.cpu_times()
- assert m.call_count == 1
- with mock.patch("psutil._psplatform.Process.cpu_times") as m:
- p.cpu_times()
- p.cpu_times()
- assert m.call_count == 2
- def test_oneshot_twice(self):
- # Test the case where the ctx manager is __enter__ed twice.
- # The second __enter__ is supposed to resut in a NOOP.
- p = psutil.Process()
- with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
- with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
- with p.oneshot():
- p.cpu_times()
- p.cpu_times()
- with p.oneshot():
- p.cpu_times()
- p.cpu_times()
- assert m1.call_count == 1
- assert m2.call_count == 1
- with mock.patch("psutil._psplatform.Process.cpu_times") as m:
- p.cpu_times()
- p.cpu_times()
- assert m.call_count == 2
- def test_oneshot_cache(self):
- # Make sure oneshot() cache is nonglobal. Instead it's
- # supposed to be bound to the Process instance, see:
- # https://github.com/giampaolo/psutil/issues/1373
- p1, p2 = self.spawn_children_pair()
- p1_ppid = p1.ppid()
- p2_ppid = p2.ppid()
- assert p1_ppid != p2_ppid
- with p1.oneshot():
- assert p1.ppid() == p1_ppid
- assert p2.ppid() == p2_ppid
- with p2.oneshot():
- assert p1.ppid() == p1_ppid
- assert p2.ppid() == p2_ppid
- def test_halfway_terminated_process(self):
- # Test that NoSuchProcess exception gets raised in case the
- # process dies after we create the Process object.
- # Example:
- # >>> proc = Process(1234)
- # >>> time.sleep(2) # time-consuming task, process dies in meantime
- # >>> proc.name()
- # Refers to Issue #15
- def assert_raises_nsp(fun, fun_name):
- try:
- ret = fun()
- except psutil.ZombieProcess: # differentiate from NSP
- raise
- except psutil.NoSuchProcess:
- pass
- except psutil.AccessDenied:
- if OPENBSD and fun_name in {'threads', 'num_threads'}:
- return None
- raise
- else:
- # NtQuerySystemInformation succeeds even if process is gone.
- if WINDOWS and fun_name in {'exe', 'name'}:
- return None
- return pytest.fail(
- f"{fun!r} didn't raise NSP and returned {ret!r} instead"
- )
- p = self.spawn_psproc()
- p.terminate()
- p.wait()
- if WINDOWS: # XXX
- call_until(lambda: p.pid not in psutil.pids())
- self.assert_proc_gone(p)
- ns = process_namespace(p)
- for fun, name in ns.iter(ns.all):
- assert_raises_nsp(fun, name)
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_zombie_process(self):
- _parent, zombie = self.spawn_zombie()
- self.assert_proc_zombie(zombie)
- if hasattr(psutil._psplatform.cext, "proc_is_zombie"):
- assert not psutil._psplatform.cext.proc_is_zombie(os.getpid())
- assert psutil._psplatform.cext.proc_is_zombie(zombie.pid)
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_zombie_process_is_running_w_exc(self):
- # Emulate a case where internally is_running() raises
- # ZombieProcess.
- p = psutil.Process()
- with mock.patch(
- "psutil.Process", side_effect=psutil.ZombieProcess(0)
- ) as m:
- assert p.is_running()
- assert m.called
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- def test_zombie_process_status_w_exc(self):
- # Emulate a case where internally status() raises
- # ZombieProcess.
- p = psutil.Process()
- with mock.patch(
- "psutil._psplatform.Process.status",
- side_effect=psutil.ZombieProcess(0),
- ) as m:
- assert p.status() == psutil.STATUS_ZOMBIE
- assert m.called
- def test_reused_pid(self):
- # Emulate a case where PID has been reused by another process.
- subp = self.spawn_subproc()
- p = psutil.Process(subp.pid)
- p._ident = (p.pid, p.create_time() + 100)
- list(psutil.process_iter())
- assert p.pid in psutil._pmap
- assert not p.is_running()
- # make sure is_running() removed PID from process_iter()
- # internal cache
- with mock.patch.object(psutil._common, "PSUTIL_DEBUG", True):
- with contextlib.redirect_stderr(io.StringIO()) as f:
- list(psutil.process_iter())
- assert (
- f"refreshing Process instance for reused PID {p.pid}"
- in f.getvalue()
- )
- assert p.pid not in psutil._pmap
- assert p != psutil.Process(subp.pid)
- msg = "process no longer exists and its PID has been reused"
- ns = process_namespace(p)
- for fun, name in ns.iter(ns.setters + ns.killers, clear_cache=False):
- with self.subTest(name=name):
- with pytest.raises(psutil.NoSuchProcess, match=msg):
- fun()
- assert "terminated + PID reused" in str(p)
- assert "terminated + PID reused" in repr(p)
- with pytest.raises(psutil.NoSuchProcess, match=msg):
- p.ppid()
- with pytest.raises(psutil.NoSuchProcess, match=msg):
- p.parent()
- with pytest.raises(psutil.NoSuchProcess, match=msg):
- p.parents()
- with pytest.raises(psutil.NoSuchProcess, match=msg):
- p.children()
- def test_pid_0(self):
- # Process(0) is supposed to work on all platforms except Linux
- if 0 not in psutil.pids():
- with pytest.raises(psutil.NoSuchProcess):
- psutil.Process(0)
- # These 2 are a contradiction, but "ps" says PID 1's parent
- # is PID 0.
- assert not psutil.pid_exists(0)
- assert psutil.Process(1).ppid() == 0
- return
- p = psutil.Process(0)
- exc = psutil.AccessDenied if WINDOWS else ValueError
- with pytest.raises(exc):
- p.wait()
- with pytest.raises(exc):
- p.terminate()
- with pytest.raises(exc):
- p.suspend()
- with pytest.raises(exc):
- p.resume()
- with pytest.raises(exc):
- p.kill()
- with pytest.raises(exc):
- p.send_signal(signal.SIGTERM)
- # test all methods
- ns = process_namespace(p)
- for fun, name in ns.iter(ns.getters + ns.setters):
- try:
- ret = fun()
- except psutil.AccessDenied:
- pass
- else:
- if name in {"uids", "gids"}:
- assert ret.real == 0
- elif name == "username":
- user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
- assert p.username() == user
- elif name == "name":
- assert name, name
- if not OPENBSD:
- assert 0 in psutil.pids()
- assert psutil.pid_exists(0)
- @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported")
- def test_environ(self):
- def clean_dict(d):
- exclude = {"PLAT", "HOME"}
- if MACOS:
- exclude.update([
- "__CF_USER_TEXT_ENCODING",
- "VERSIONER_PYTHON_PREFER_32_BIT",
- "VERSIONER_PYTHON_VERSION",
- ])
- for name in list(d.keys()):
- if name in exclude or name.startswith("PYTEST_"):
- d.pop(name)
- return {
- k.replace("\r", "").replace("\n", ""): (
- v.replace("\r", "").replace("\n", "")
- )
- for k, v in d.items()
- }
- self.maxDiff = None
- p = psutil.Process()
- d1 = clean_dict(p.environ())
- d2 = clean_dict(os.environ.copy())
- if not OSX and GITHUB_ACTIONS:
- assert d1 == d2
- @pytest.mark.skipif(not HAS_ENVIRON, reason="not supported")
- @pytest.mark.skipif(not POSIX, reason="POSIX only")
- @pytest.mark.skipif(
- MACOS_11PLUS,
- reason="macOS 11+ can't get another process environment, issue #2084",
- )
- @pytest.mark.skipif(
- NETBSD, reason="sometimes fails on `assert is_running()`"
- )
- def test_weird_environ(self):
- # environment variables can contain values without an equals sign
- code = textwrap.dedent("""
- #include <unistd.h>
- #include <fcntl.h>
- char * const argv[] = {"cat", 0};
- char * const envp[] = {"A=1", "X", "C=3", 0};
- int main(void) {
- // Close stderr on exec so parent can wait for the
- // execve to finish.
- if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
- return 0;
- return execve("/bin/cat", argv, envp);
- }
- """)
- cexe = create_c_exe(self.get_testfn(), c_code=code)
- sproc = self.spawn_subproc(
- [cexe], stdin=subprocess.PIPE, stderr=subprocess.PIPE
- )
- p = psutil.Process(sproc.pid)
- wait_for_pid(p.pid)
- assert p.is_running()
- # Wait for process to exec or exit.
- assert sproc.stderr.read() == b""
- if MACOS and CI_TESTING:
- try:
- env = p.environ()
- except psutil.AccessDenied:
- # XXX: fails sometimes with:
- # PermissionError from 'sysctl(KERN_PROCARGS2) -> EIO'
- return
- else:
- env = p.environ()
- assert env == {"A": "1", "C": "3"}
- sproc.communicate()
- assert sproc.returncode == 0
- # ===================================================================
- # --- psutil.Popen tests
- # ===================================================================
- class TestPopen(PsutilTestCase):
- """Tests for psutil.Popen class."""
- @classmethod
- def tearDownClass(cls):
- reap_children()
- @pytest.mark.skipif(MACOS and GITHUB_ACTIONS, reason="hangs on OSX + CI")
- def test_misc(self):
- # XXX this test causes a ResourceWarning because
- # psutil.__subproc instance doesn't get properly freed.
- # Not sure what to do though.
- cmd = [
- PYTHON_EXE,
- "-c",
- "import time; [time.sleep(0.1) for x in range(100)];",
- ]
- with psutil.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=PYTHON_EXE_ENV,
- ) as proc:
- proc.name()
- proc.cpu_times()
- proc.stdin # noqa: B018
- assert dir(proc)
- with pytest.raises(AttributeError):
- proc.foo # noqa: B018
- proc.terminate()
- if POSIX:
- assert proc.wait(5) == -signal.SIGTERM
- else:
- assert proc.wait(5) == signal.SIGTERM
- def test_ctx_manager(self):
- with psutil.Popen(
- [PYTHON_EXE, "-V"],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- stdin=subprocess.PIPE,
- env=PYTHON_EXE_ENV,
- ) as proc:
- proc.communicate()
- assert proc.stdout.closed
- assert proc.stderr.closed
- assert proc.stdin.closed
- assert proc.returncode == 0
- def test_kill_terminate(self):
- # subprocess.Popen()'s terminate(), kill() and send_signal() do
- # not raise exception after the process is gone. psutil.Popen
- # diverges from that.
- cmd = [
- PYTHON_EXE,
- "-c",
- "import time; [time.sleep(0.1) for x in range(100)];",
- ]
- with psutil.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=PYTHON_EXE_ENV,
- ) as proc:
- proc.terminate()
- proc.wait()
- with pytest.raises(psutil.NoSuchProcess):
- proc.terminate()
- with pytest.raises(psutil.NoSuchProcess):
- proc.kill()
- with pytest.raises(psutil.NoSuchProcess):
- proc.send_signal(signal.SIGTERM)
- if WINDOWS:
- with pytest.raises(psutil.NoSuchProcess):
- proc.send_signal(signal.CTRL_C_EVENT)
- with pytest.raises(psutil.NoSuchProcess):
- proc.send_signal(signal.CTRL_BREAK_EVENT)
- def test__getattribute__(self):
- cmd = [
- PYTHON_EXE,
- "-c",
- "import time; [time.sleep(0.1) for x in range(100)];",
- ]
- with psutil.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- env=PYTHON_EXE_ENV,
- ) as proc:
- proc.terminate()
- proc.wait()
- with pytest.raises(AttributeError):
- proc.foo # noqa: B018
|