| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669 |
- # Human friendly input/output in Python.
- #
- # Author: Peter Odding <peter@peterodding.com>
- # Last Change: March 6, 2020
- # URL: https://humanfriendly.readthedocs.io
- """
- Utility classes and functions that make it easy to write :mod:`unittest` compatible test suites.
- Over the years I've developed the habit of writing test suites for Python
- projects using the :mod:`unittest` module. During those years I've come to know
- :pypi:`pytest` and in fact I use :pypi:`pytest` to run my test suites (due to
- its much better error reporting) but I've yet to publish a test suite that
- *requires* :pypi:`pytest`. I have several reasons for doing so:
- - It's nice to keep my test suites as simple and accessible as possible and
- not requiring a specific test runner is part of that attitude.
- - Whereas :mod:`unittest` is quite explicit, :pypi:`pytest` contains a lot of
- magic, which kind of contradicts the Python mantra "explicit is better than
- implicit" (IMHO).
- """
- # Standard library module
- import functools
- import logging
- import os
- import pipes
- import shutil
- import sys
- import tempfile
- import time
- import unittest
- # Modules included in our package.
- from humanfriendly.compat import StringIO
- from humanfriendly.text import random_string
- # Initialize a logger for this module.
- logger = logging.getLogger(__name__)
- # A unique object reference used to detect missing attributes.
- NOTHING = object()
- # Public identifiers that require documentation.
- __all__ = (
- 'CallableTimedOut',
- 'CaptureBuffer',
- 'CaptureOutput',
- 'ContextManager',
- 'CustomSearchPath',
- 'MockedProgram',
- 'PatchedAttribute',
- 'PatchedItem',
- 'TemporaryDirectory',
- 'TestCase',
- 'configure_logging',
- 'make_dirs',
- 'retry',
- 'run_cli',
- 'skip_on_raise',
- 'touch',
- )
- def configure_logging(log_level=logging.DEBUG):
- """configure_logging(log_level=logging.DEBUG)
- Automatically configure logging to the terminal.
- :param log_level: The log verbosity (a number, defaults
- to :mod:`logging.DEBUG <logging>`).
- When :mod:`coloredlogs` is installed :func:`coloredlogs.install()` will be
- used to configure logging to the terminal. When this fails with an
- :exc:`~exceptions.ImportError` then :func:`logging.basicConfig()` is used
- as a fall back.
- """
- try:
- import coloredlogs
- coloredlogs.install(level=log_level)
- except ImportError:
- logging.basicConfig(
- level=log_level,
- format='%(asctime)s %(name)s[%(process)d] %(levelname)s %(message)s',
- datefmt='%Y-%m-%d %H:%M:%S')
- def make_dirs(pathname):
- """
- Create missing directories.
- :param pathname: The pathname of a directory (a string).
- """
- if not os.path.isdir(pathname):
- os.makedirs(pathname)
- def retry(func, timeout=60, exc_type=AssertionError):
- """retry(func, timeout=60, exc_type=AssertionError)
- Retry a function until assertions no longer fail.
- :param func: A callable. When the callable returns
- :data:`False` it will also be retried.
- :param timeout: The number of seconds after which to abort (a number,
- defaults to 60).
- :param exc_type: The type of exceptions to retry (defaults
- to :exc:`~exceptions.AssertionError`).
- :returns: The value returned by `func`.
- :raises: Once the timeout has expired :func:`retry()` will raise the
- previously retried assertion error. When `func` keeps returning
- :data:`False` until `timeout` expires :exc:`CallableTimedOut`
- will be raised.
- This function sleeps between retries to avoid claiming CPU cycles we don't
- need. It starts by sleeping for 0.1 second but adjusts this to one second
- as the number of retries grows.
- """
- pause = 0.1
- timeout += time.time()
- while True:
- try:
- result = func()
- if result is not False:
- return result
- except exc_type:
- if time.time() > timeout:
- raise
- else:
- if time.time() > timeout:
- raise CallableTimedOut()
- time.sleep(pause)
- if pause < 1:
- pause *= 2
- def run_cli(entry_point, *arguments, **options):
- """
- Test a command line entry point.
- :param entry_point: The function that implements the command line interface
- (a callable).
- :param arguments: Any positional arguments (strings) become the command
- line arguments (:data:`sys.argv` items 1-N).
- :param options: The following keyword arguments are supported:
- **capture**
- Whether to use :class:`CaptureOutput`. Defaults
- to :data:`True` but can be disabled by passing
- :data:`False` instead.
- **input**
- Refer to :class:`CaptureOutput`.
- **merged**
- Refer to :class:`CaptureOutput`.
- **program_name**
- Used to set :data:`sys.argv` item 0.
- :returns: A tuple with two values:
- 1. The return code (an integer).
- 2. The captured output (a string).
- """
- # Add the `program_name' option to the arguments.
- arguments = list(arguments)
- arguments.insert(0, options.pop('program_name', sys.executable))
- # Log the command line arguments (and the fact that we're about to call the
- # command line entry point function).
- logger.debug("Calling command line entry point with arguments: %s", arguments)
- # Prepare to capture the return code and output even if the command line
- # interface raises an exception (whether the exception type is SystemExit
- # or something else).
- returncode = 0
- stdout = None
- stderr = None
- try:
- # Temporarily override sys.argv.
- with PatchedAttribute(sys, 'argv', arguments):
- # Manipulate the standard input/output/error streams?
- options['enabled'] = options.pop('capture', True)
- with CaptureOutput(**options) as capturer:
- try:
- # Call the command line interface.
- entry_point()
- finally:
- # Get the output even if an exception is raised.
- stdout = capturer.stdout.getvalue()
- stderr = capturer.stderr.getvalue()
- # Reconfigure logging to the terminal because it is very
- # likely that the entry point function has changed the
- # configured log level.
- configure_logging()
- except BaseException as e:
- if isinstance(e, SystemExit):
- logger.debug("Intercepting return code %s from SystemExit exception.", e.code)
- returncode = e.code
- else:
- logger.warning("Defaulting return code to 1 due to raised exception.", exc_info=True)
- returncode = 1
- else:
- logger.debug("Command line entry point returned successfully!")
- # Always log the output captured on stdout/stderr, to make it easier to
- # diagnose test failures (but avoid duplicate logging when merged=True).
- is_merged = options.get('merged', False)
- merged_streams = [('merged streams', stdout)]
- separate_streams = [('stdout', stdout), ('stderr', stderr)]
- streams = merged_streams if is_merged else separate_streams
- for name, value in streams:
- if value:
- logger.debug("Output on %s:\n%s", name, value)
- else:
- logger.debug("No output on %s.", name)
- return returncode, stdout
- def skip_on_raise(*exc_types):
- """
- Decorate a test function to translation specific exception types to :exc:`unittest.SkipTest`.
- :param exc_types: One or more positional arguments give the exception
- types to be translated to :exc:`unittest.SkipTest`.
- :returns: A decorator function specialized to `exc_types`.
- """
- def decorator(function):
- @functools.wraps(function)
- def wrapper(*args, **kw):
- try:
- return function(*args, **kw)
- except exc_types as e:
- logger.debug("Translating exception to unittest.SkipTest ..", exc_info=True)
- raise unittest.SkipTest("skipping test because %s was raised" % type(e))
- return wrapper
- return decorator
- def touch(filename):
- """
- The equivalent of the UNIX :man:`touch` program in Python.
- :param filename: The pathname of the file to touch (a string).
- Note that missing directories are automatically created using
- :func:`make_dirs()`.
- """
- make_dirs(os.path.dirname(filename))
- with open(filename, 'a'):
- os.utime(filename, None)
- class CallableTimedOut(Exception):
- """Raised by :func:`retry()` when the timeout expires."""
- class ContextManager(object):
- """Base class to enable composition of context managers."""
- def __enter__(self):
- """Enable use as context managers."""
- return self
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Enable use as context managers."""
- class PatchedAttribute(ContextManager):
- """Context manager that temporary replaces an object attribute using :func:`setattr()`."""
- def __init__(self, obj, name, value):
- """
- Initialize a :class:`PatchedAttribute` object.
- :param obj: The object to patch.
- :param name: An attribute name.
- :param value: The value to set.
- """
- self.object_to_patch = obj
- self.attribute_to_patch = name
- self.patched_value = value
- self.original_value = NOTHING
- def __enter__(self):
- """
- Replace (patch) the attribute.
- :returns: The object whose attribute was patched.
- """
- # Enable composition of context managers.
- super(PatchedAttribute, self).__enter__()
- # Patch the object's attribute.
- self.original_value = getattr(self.object_to_patch, self.attribute_to_patch, NOTHING)
- setattr(self.object_to_patch, self.attribute_to_patch, self.patched_value)
- return self.object_to_patch
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Restore the attribute to its original value."""
- # Enable composition of context managers.
- super(PatchedAttribute, self).__exit__(exc_type, exc_value, traceback)
- # Restore the object's attribute.
- if self.original_value is NOTHING:
- delattr(self.object_to_patch, self.attribute_to_patch)
- else:
- setattr(self.object_to_patch, self.attribute_to_patch, self.original_value)
- class PatchedItem(ContextManager):
- """Context manager that temporary replaces an object item using :meth:`~object.__setitem__()`."""
- def __init__(self, obj, item, value):
- """
- Initialize a :class:`PatchedItem` object.
- :param obj: The object to patch.
- :param item: The item to patch.
- :param value: The value to set.
- """
- self.object_to_patch = obj
- self.item_to_patch = item
- self.patched_value = value
- self.original_value = NOTHING
- def __enter__(self):
- """
- Replace (patch) the item.
- :returns: The object whose item was patched.
- """
- # Enable composition of context managers.
- super(PatchedItem, self).__enter__()
- # Patch the object's item.
- try:
- self.original_value = self.object_to_patch[self.item_to_patch]
- except KeyError:
- self.original_value = NOTHING
- self.object_to_patch[self.item_to_patch] = self.patched_value
- return self.object_to_patch
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Restore the item to its original value."""
- # Enable composition of context managers.
- super(PatchedItem, self).__exit__(exc_type, exc_value, traceback)
- # Restore the object's item.
- if self.original_value is NOTHING:
- del self.object_to_patch[self.item_to_patch]
- else:
- self.object_to_patch[self.item_to_patch] = self.original_value
- class TemporaryDirectory(ContextManager):
- """
- Easy temporary directory creation & cleanup using the :keyword:`with` statement.
- Here's an example of how to use this:
- .. code-block:: python
- with TemporaryDirectory() as directory:
- # Do something useful here.
- assert os.path.isdir(directory)
- """
- def __init__(self, **options):
- """
- Initialize a :class:`TemporaryDirectory` object.
- :param options: Any keyword arguments are passed on to
- :func:`tempfile.mkdtemp()`.
- """
- self.mkdtemp_options = options
- self.temporary_directory = None
- def __enter__(self):
- """
- Create the temporary directory using :func:`tempfile.mkdtemp()`.
- :returns: The pathname of the directory (a string).
- """
- # Enable composition of context managers.
- super(TemporaryDirectory, self).__enter__()
- # Create the temporary directory.
- self.temporary_directory = tempfile.mkdtemp(**self.mkdtemp_options)
- return self.temporary_directory
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Cleanup the temporary directory using :func:`shutil.rmtree()`."""
- # Enable composition of context managers.
- super(TemporaryDirectory, self).__exit__(exc_type, exc_value, traceback)
- # Cleanup the temporary directory.
- if self.temporary_directory is not None:
- shutil.rmtree(self.temporary_directory)
- self.temporary_directory = None
- class MockedHomeDirectory(PatchedItem, TemporaryDirectory):
- """
- Context manager to temporarily change ``$HOME`` (the current user's profile directory).
- This class is a composition of the :class:`PatchedItem` and
- :class:`TemporaryDirectory` context managers.
- """
- def __init__(self):
- """Initialize a :class:`MockedHomeDirectory` object."""
- PatchedItem.__init__(self, os.environ, 'HOME', os.environ.get('HOME'))
- TemporaryDirectory.__init__(self)
- def __enter__(self):
- """
- Activate the custom ``$PATH``.
- :returns: The pathname of the directory that has
- been added to ``$PATH`` (a string).
- """
- # Get the temporary directory.
- directory = TemporaryDirectory.__enter__(self)
- # Override the value to patch now that we have
- # the pathname of the temporary directory.
- self.patched_value = directory
- # Temporary patch $HOME.
- PatchedItem.__enter__(self)
- # Pass the pathname of the temporary directory to the caller.
- return directory
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Deactivate the custom ``$HOME``."""
- super(MockedHomeDirectory, self).__exit__(exc_type, exc_value, traceback)
- class CustomSearchPath(PatchedItem, TemporaryDirectory):
- """
- Context manager to temporarily customize ``$PATH`` (the executable search path).
- This class is a composition of the :class:`PatchedItem` and
- :class:`TemporaryDirectory` context managers.
- """
- def __init__(self, isolated=False):
- """
- Initialize a :class:`CustomSearchPath` object.
- :param isolated: :data:`True` to clear the original search path,
- :data:`False` to add the temporary directory to the
- start of the search path.
- """
- # Initialize our own instance variables.
- self.isolated_search_path = isolated
- # Selectively initialize our superclasses.
- PatchedItem.__init__(self, os.environ, 'PATH', self.current_search_path)
- TemporaryDirectory.__init__(self)
- def __enter__(self):
- """
- Activate the custom ``$PATH``.
- :returns: The pathname of the directory that has
- been added to ``$PATH`` (a string).
- """
- # Get the temporary directory.
- directory = TemporaryDirectory.__enter__(self)
- # Override the value to patch now that we have
- # the pathname of the temporary directory.
- self.patched_value = (
- directory if self.isolated_search_path
- else os.pathsep.join([directory] + self.current_search_path.split(os.pathsep))
- )
- # Temporary patch the $PATH.
- PatchedItem.__enter__(self)
- # Pass the pathname of the temporary directory to the caller
- # because they may want to `install' custom executables.
- return directory
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Deactivate the custom ``$PATH``."""
- super(CustomSearchPath, self).__exit__(exc_type, exc_value, traceback)
- @property
- def current_search_path(self):
- """The value of ``$PATH`` or :data:`os.defpath` (a string)."""
- return os.environ.get('PATH', os.defpath)
- class MockedProgram(CustomSearchPath):
- """
- Context manager to mock the existence of a program (executable).
- This class extends the functionality of :class:`CustomSearchPath`.
- """
- def __init__(self, name, returncode=0, script=None):
- """
- Initialize a :class:`MockedProgram` object.
- :param name: The name of the program (a string).
- :param returncode: The return code that the program should emit (a
- number, defaults to zero).
- :param script: Shell script code to include in the mocked program (a
- string or :data:`None`). This can be used to mock a
- program that is expected to generate specific output.
- """
- # Initialize our own instance variables.
- self.program_name = name
- self.program_returncode = returncode
- self.program_script = script
- self.program_signal_file = None
- # Initialize our superclasses.
- super(MockedProgram, self).__init__()
- def __enter__(self):
- """
- Create the mock program.
- :returns: The pathname of the directory that has
- been added to ``$PATH`` (a string).
- """
- directory = super(MockedProgram, self).__enter__()
- self.program_signal_file = os.path.join(directory, 'program-was-run-%s' % random_string(10))
- pathname = os.path.join(directory, self.program_name)
- with open(pathname, 'w') as handle:
- handle.write('#!/bin/sh\n')
- handle.write('echo > %s\n' % pipes.quote(self.program_signal_file))
- if self.program_script:
- handle.write('%s\n' % self.program_script.strip())
- handle.write('exit %i\n' % self.program_returncode)
- os.chmod(pathname, 0o755)
- return directory
- def __exit__(self, *args, **kw):
- """
- Ensure that the mock program was run.
- :raises: :exc:`~exceptions.AssertionError` when
- the mock program hasn't been run.
- """
- try:
- assert self.program_signal_file and os.path.isfile(self.program_signal_file), \
- ("It looks like %r was never run!" % self.program_name)
- finally:
- return super(MockedProgram, self).__exit__(*args, **kw)
- class CaptureOutput(ContextManager):
- """
- Context manager that captures what's written to :data:`sys.stdout` and :data:`sys.stderr`.
- .. attribute:: stdin
- The :class:`~humanfriendly.compat.StringIO` object used to feed the standard input stream.
- .. attribute:: stdout
- The :class:`CaptureBuffer` object used to capture the standard output stream.
- .. attribute:: stderr
- The :class:`CaptureBuffer` object used to capture the standard error stream.
- """
- def __init__(self, merged=False, input='', enabled=True):
- """
- Initialize a :class:`CaptureOutput` object.
- :param merged: :data:`True` to merge the streams,
- :data:`False` to capture them separately.
- :param input: The data that reads from :data:`sys.stdin`
- should return (a string).
- :param enabled: :data:`True` to enable capturing (the default),
- :data:`False` otherwise. This makes it easy to
- unconditionally use :class:`CaptureOutput` in
- a :keyword:`with` block while preserving the
- choice to opt out of capturing output.
- """
- self.stdin = StringIO(input)
- self.stdout = CaptureBuffer()
- self.stderr = self.stdout if merged else CaptureBuffer()
- self.patched_attributes = []
- if enabled:
- self.patched_attributes.extend(
- PatchedAttribute(sys, name, getattr(self, name))
- for name in ('stdin', 'stdout', 'stderr')
- )
- def __enter__(self):
- """Start capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
- super(CaptureOutput, self).__enter__()
- for context in self.patched_attributes:
- context.__enter__()
- return self
- def __exit__(self, exc_type=None, exc_value=None, traceback=None):
- """Stop capturing what's written to :data:`sys.stdout` and :data:`sys.stderr`."""
- super(CaptureOutput, self).__exit__(exc_type, exc_value, traceback)
- for context in self.patched_attributes:
- context.__exit__(exc_type, exc_value, traceback)
- def get_lines(self):
- """Get the contents of :attr:`stdout` split into separate lines."""
- return self.get_text().splitlines()
- def get_text(self):
- """Get the contents of :attr:`stdout` as a Unicode string."""
- return self.stdout.get_text()
- def getvalue(self):
- """Get the text written to :data:`sys.stdout`."""
- return self.stdout.getvalue()
- class CaptureBuffer(StringIO):
- """
- Helper for :class:`CaptureOutput` to provide an easy to use API.
- The two methods defined by this subclass were specifically chosen to match
- the names of the methods provided by my :pypi:`capturer` package which
- serves a similar role as :class:`CaptureOutput` but knows how to simulate
- an interactive terminal (tty).
- """
- def get_lines(self):
- """Get the contents of the buffer split into separate lines."""
- return self.get_text().splitlines()
- def get_text(self):
- """Get the contents of the buffer as a Unicode string."""
- return self.getvalue()
- class TestCase(unittest.TestCase):
- """Subclass of :class:`unittest.TestCase` with automatic logging and other miscellaneous features."""
- def __init__(self, *args, **kw):
- """
- Initialize a :class:`TestCase` object.
- Any positional and/or keyword arguments are passed on to the
- initializer of the superclass.
- """
- super(TestCase, self).__init__(*args, **kw)
- def setUp(self, log_level=logging.DEBUG):
- """setUp(log_level=logging.DEBUG)
- Automatically configure logging to the terminal.
- :param log_level: Refer to :func:`configure_logging()`.
- The :func:`setUp()` method is automatically called by
- :class:`unittest.TestCase` before each test method starts.
- It does two things:
- - Logging to the terminal is configured using
- :func:`configure_logging()`.
- - Before the test method starts a newline is emitted, to separate the
- name of the test method (which will be printed to the terminal by
- :mod:`unittest` or :pypi:`pytest`) from the first line of logging
- output that the test method is likely going to generate.
- """
- # Configure logging to the terminal.
- configure_logging(log_level)
- # Separate the name of the test method (printed by the superclass
- # and/or py.test without a newline at the end) from the first line of
- # logging output that the test method is likely going to generate.
- sys.stderr.write("\n")
|