| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- """
- Support for random optimizers, including the random-greedy path.
- """
- import functools
- import heapq
- import math
- import numbers
- import time
- from collections import deque
- from . import helpers, paths
- # random.choices was introduced in python 3.6
- try:
- from random import choices as random_choices
- from random import seed as random_seed
- except ImportError:
- import numpy as np
- def random_choices(population, weights):
- norm = sum(weights)
- return np.random.choice(population, p=[w / norm for w in weights], size=1)
- random_seed = np.random.seed
- __all__ = ["RandomGreedy", "random_greedy", "random_greedy_128"]
- class RandomOptimizer(paths.PathOptimizer):
- """Base class for running any random path finder that benefits
- from repeated calling, possibly in a parallel fashion. Custom random
- optimizers should subclass this, and the ``setup`` method should be
- implemented with the following signature::
- def setup(self, inputs, output, size_dict):
- # custom preparation here ...
- return trial_fn, trial_args
- Where ``trial_fn`` itself should have the signature::
- def trial_fn(r, *trial_args):
- # custom computation of path here
- return ssa_path, cost, size
- Where ``r`` is the run number and could for example be used to seed a
- random number generator. See ``RandomGreedy`` for an example.
- Parameters
- ----------
- max_repeats : int, optional
- The maximum number of repeat trials to have.
- max_time : float, optional
- The maximum amount of time to run the algorithm for.
- minimize : {'flops', 'size'}, optional
- Whether to favour paths that minimize the total estimated flop-count or
- the size of the largest intermediate created.
- parallel : {bool, int, or executor-pool like}, optional
- Whether to parallelize the random trials, by default ``False``. If
- ``True``, use a ``concurrent.futures.ProcessPoolExecutor`` with the same
- number of processes as cores. If an integer is specified, use that many
- processes instead. Finally, you can supply a custom executor-pool which
- should have an API matching that of the python 3 standard library
- module ``concurrent.futures``. Namely, a ``submit`` method that returns
- ``Future`` objects, themselves with ``result`` and ``cancel`` methods.
- pre_dispatch : int, optional
- If running in parallel, how many jobs to pre-dispatch so as to avoid
- submitting all jobs at once. Should also be more than twice the number
- of workers to avoid under-subscription. Default: 128.
- Attributes
- ----------
- path : list[tuple[int]]
- The best path found so far.
- costs : list[int]
- The list of each trial's costs found so far.
- sizes : list[int]
- The list of each trial's largest intermediate size so far.
- See Also
- --------
- RandomGreedy
- """
- def __init__(self, max_repeats=32, max_time=None, minimize='flops', parallel=False, pre_dispatch=128):
- if minimize not in ('flops', 'size'):
- raise ValueError("`minimize` should be one of {'flops', 'size'}.")
- self.max_repeats = max_repeats
- self.max_time = max_time
- self.minimize = minimize
- self.better = paths.get_better_fn(minimize)
- self.parallel = parallel
- self.pre_dispatch = pre_dispatch
- self.costs = []
- self.sizes = []
- self.best = {'flops': float('inf'), 'size': float('inf')}
- self._repeats_start = 0
- @property
- def path(self):
- """The best path found so far.
- """
- return paths.ssa_to_linear(self.best['ssa_path'])
- @property
- def parallel(self):
- return self._parallel
- @parallel.setter
- def parallel(self, parallel):
- # shutdown any previous executor if we are managing it
- if getattr(self, '_managing_executor', False):
- self._executor.shutdown()
- self._parallel = parallel
- self._managing_executor = False
- if parallel is False:
- self._executor = None
- return
- if parallel is True:
- from concurrent.futures import ProcessPoolExecutor
- self._executor = ProcessPoolExecutor()
- self._managing_executor = True
- return
- if isinstance(parallel, numbers.Number):
- from concurrent.futures import ProcessPoolExecutor
- self._executor = ProcessPoolExecutor(parallel)
- self._managing_executor = True
- return
- # assume a pool-executor has been supplied
- self._executor = parallel
- def _gen_results_parallel(self, repeats, trial_fn, args):
- """Lazily generate results from an executor without submitting all jobs at once.
- """
- self._futures = deque()
- # the idea here is to submit at least ``pre_dispatch`` jobs *before* we
- # yield any results, then do both in tandem, before draining the queue
- for r in repeats:
- if len(self._futures) < self.pre_dispatch:
- self._futures.append(self._executor.submit(trial_fn, r, *args))
- continue
- yield self._futures.popleft().result()
- while self._futures:
- yield self._futures.popleft().result()
- def _cancel_futures(self):
- if self._executor is not None:
- for f in self._futures:
- f.cancel()
- def setup(self, inputs, output, size_dict):
- raise NotImplementedError
- def __call__(self, inputs, output, size_dict, memory_limit):
- self._check_args_against_first_call(inputs, output, size_dict)
- # start a timer?
- if self.max_time is not None:
- t0 = time.time()
- trial_fn, trial_args = self.setup(inputs, output, size_dict)
- r_start = self._repeats_start + len(self.costs)
- r_stop = r_start + self.max_repeats
- repeats = range(r_start, r_stop)
- # create the trials lazily
- if self._executor is not None:
- trials = self._gen_results_parallel(repeats, trial_fn, trial_args)
- else:
- trials = (trial_fn(r, *trial_args) for r in repeats)
- # assess the trials
- for ssa_path, cost, size in trials:
- # keep track of all costs and sizes
- self.costs.append(cost)
- self.sizes.append(size)
- # check if we have found a new best
- found_new_best = self.better(cost, size, self.best['flops'], self.best['size'])
- if found_new_best:
- self.best['flops'] = cost
- self.best['size'] = size
- self.best['ssa_path'] = ssa_path
- # check if we have run out of time
- if (self.max_time is not None) and (time.time() > t0 + self.max_time):
- break
- self._cancel_futures()
- return self.path
- def __del__(self):
- # if we created the parallel pool-executor, shut it down
- if getattr(self, '_managing_executor', False):
- self._executor.shutdown()
- def thermal_chooser(queue, remaining, nbranch=8, temperature=1, rel_temperature=True):
- """A contraction 'chooser' that weights possible contractions using a
- Boltzmann distribution. Explicitly, given costs ``c_i`` (with ``c_0`` the
- smallest), the relative weights, ``w_i``, are computed as:
- w_i = exp( -(c_i - c_0) / temperature)
- Additionally, if ``rel_temperature`` is set, scale ``temperature`` by
- ``abs(c_0)`` to account for likely fluctuating cost magnitudes during the
- course of a contraction.
- Parameters
- ----------
- queue : list
- The heapified list of candidate contractions.
- remaining : dict[str, int]
- Mapping of remaining inputs' indices to the ssa id.
- temperature : float, optional
- When choosing a possible contraction, its relative probability will be
- proportional to ``exp(-cost / temperature)``. Thus the larger
- ``temperature`` is, the further random paths will stray from the normal
- 'greedy' path. Conversely, if set to zero, only paths with exactly the
- same cost as the best at each step will be explored.
- rel_temperature : bool, optional
- Whether to normalize the ``temperature`` at each step to the scale of
- the best cost. This is generally beneficial as the magnitude of costs
- can vary significantly throughout a contraction.
- nbranch : int, optional
- How many potential paths to calculate probability for and choose from
- at each step.
- Returns
- -------
- cost, k1, k2, k12
- """
- n = 0
- choices = []
- while queue and n < nbranch:
- cost, k1, k2, k12 = heapq.heappop(queue)
- if k1 not in remaining or k2 not in remaining:
- continue # candidate is obsolete
- choices.append((cost, k1, k2, k12))
- n += 1
- if n == 0:
- return None
- if n == 1:
- return choices[0]
- costs = [choice[0][0] for choice in choices]
- cmin = costs[0]
- # adjust by the overall scale to account for fluctuating absolute costs
- if rel_temperature:
- temperature *= max(1, abs(cmin))
- # compute relative probability for each potential contraction
- if temperature == 0.0:
- energies = [1 if c == cmin else 0 for c in costs]
- else:
- # shift by cmin for numerical reasons
- energies = [math.exp(-(c - cmin) / temperature) for c in costs]
- # randomly choose a contraction based on energies
- chosen, = random_choices(range(n), weights=energies)
- cost, k1, k2, k12 = choices.pop(chosen)
- # put the other choise back in the heap
- for other in choices:
- heapq.heappush(queue, other)
- return cost, k1, k2, k12
- def ssa_path_compute_cost(ssa_path, inputs, output, size_dict):
- """Compute the flops and max size of an ssa path.
- """
- inputs = list(map(frozenset, inputs))
- output = frozenset(output)
- remaining = set(range(len(inputs)))
- total_cost = 0
- max_size = 0
- for i, j in ssa_path:
- k12, flops12 = paths.calc_k12_flops(inputs, output, remaining, i, j, size_dict)
- remaining.discard(i)
- remaining.discard(j)
- remaining.add(len(inputs))
- inputs.append(k12)
- total_cost += flops12
- max_size = max(max_size, helpers.compute_size_by_dict(k12, size_dict))
- return total_cost, max_size
- def _trial_greedy_ssa_path_and_cost(r, inputs, output, size_dict, choose_fn, cost_fn):
- """A single, repeatable, greedy trial run. Returns ``ssa_path`` and cost.
- """
- if r == 0:
- # always start with the standard greedy approach
- choose_fn = None
- random_seed(r)
- ssa_path = paths.ssa_greedy_optimize(inputs, output, size_dict, choose_fn, cost_fn)
- cost, size = ssa_path_compute_cost(ssa_path, inputs, output, size_dict)
- return ssa_path, cost, size
- class RandomGreedy(RandomOptimizer):
- """
- Parameters
- ----------
- cost_fn : callable, optional
- A function that returns a heuristic 'cost' of a potential contraction
- with which to sort candidates. Should have signature
- ``cost_fn(size12, size1, size2, k12, k1, k2)``.
- temperature : float, optional
- When choosing a possible contraction, its relative probability will be
- proportional to ``exp(-cost / temperature)``. Thus the larger
- ``temperature`` is, the further random paths will stray from the normal
- 'greedy' path. Conversely, if set to zero, only paths with exactly the
- same cost as the best at each step will be explored.
- rel_temperature : bool, optional
- Whether to normalize the ``temperature`` at each step to the scale of
- the best cost. This is generally beneficial as the magnitude of costs
- can vary significantly throughout a contraction. If False, the
- algorithm will end up branching when the absolute cost is low, but
- stick to the 'greedy' path when the cost is high - this can also be
- beneficial.
- nbranch : int, optional
- How many potential paths to calculate probability for and choose from
- at each step.
- kwargs
- Supplied to RandomOptimizer.
- See Also
- --------
- RandomOptimizer
- """
- def __init__(self, cost_fn='memory-removed-jitter', temperature=1.0, rel_temperature=True, nbranch=8, **kwargs):
- self.cost_fn = cost_fn
- self.temperature = temperature
- self.rel_temperature = rel_temperature
- self.nbranch = nbranch
- super().__init__(**kwargs)
- @property
- def choose_fn(self):
- """The function that chooses which contraction to take - make this a
- property so that ``temperature`` and ``nbranch`` etc. can be updated
- between runs.
- """
- if self.nbranch == 1:
- return None
- return functools.partial(thermal_chooser,
- temperature=self.temperature,
- nbranch=self.nbranch,
- rel_temperature=self.rel_temperature)
- def setup(self, inputs, output, size_dict):
- fn = _trial_greedy_ssa_path_and_cost
- args = (inputs, output, size_dict, self.choose_fn, self.cost_fn)
- return fn, args
- def random_greedy(inputs, output, idx_dict, memory_limit=None, **optimizer_kwargs):
- """
- """
- optimizer = RandomGreedy(**optimizer_kwargs)
- return optimizer(inputs, output, idx_dict, memory_limit)
- random_greedy_128 = functools.partial(random_greedy, max_repeats=128)
|