# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""General purpose timer related functions."""
# STDLIB
import time
import warnings
from collections import OrderedDict
from collections.abc import Iterable
from functools import partial, wraps
# THIRD-PARTY
import numpy as np
# LOCAL
from astropy import units as u
from astroquery import log
from astropy import modeling
from astropy.utils.exceptions import AstropyUserWarning
__all__ = ['timefunc', 'RunTimePredictor']
__doctest_skip__ = ['timefunc']
[docs]
def timefunc(*, num_tries=1, verbose=True):
"""Decorator to time a function or method.
Parameters
----------
num_tries : int, optional
Number of calls to make. Timer will take the
average run time.
verbose : bool, optional
Extra log information.
Returns
-------
tt : float
Average run time in seconds.
result
Output(s) from the function.
Examples
--------
To add timer to time `numpy.log` for 100 times with
verbose output::
import numpy as np
from astroquery.utils.timer import timefunc
@timefunc(100)
def timed_log(x):
return np.log(x)
To run the decorated function above:
>>> t, y = timed_log(100)
INFO: timed_log took 9.29832458496e-06 s on AVERAGE for 100 call(s). [...]
>>> t
9.298324584960938e-06
>>> y
4.6051701859880918
"""
def real_decorator(function):
@wraps(function)
def wrapper(*args, **kwargs):
ts = time.time()
for i in range(num_tries):
result = function(*args, **kwargs)
te = time.time()
tt = (te - ts) / num_tries
if verbose: # pragma: no cover
log.info('{} took {} s on AVERAGE for {} call(s).'.format(
function.__name__, tt, num_tries))
return tt, result
return wrapper
return real_decorator
[docs]
class RunTimePredictor:
"""Class to predict run time.
.. note:: Only predict for single varying numeric input parameter.
Parameters
----------
func : function
Function to time.
args : tuple
Fixed positional argument(s) for the function.
kwargs : dict
Fixed keyword argument(s) for the function.
Examples
--------
>>> from astroquery.utils.timer import RunTimePredictor
Set up a predictor for :math:`10^{x}`:
>>> p = RunTimePredictor(pow, 10)
Give it baseline data to use for prediction and
get the function output values:
>>> p.time_func(range(10, 1000, 200))
>>> for input, result in sorted(p.results.items()):
... print("pow(10, {0})\\n{1}".format(input, result))
pow(10, 10)
10000000000
pow(10, 210)
10000000000...
pow(10, 410)
10000000000...
pow(10, 610)
10000000000...
pow(10, 810)
10000000000...
Fit a straight line assuming :math:`\\text{arg}^{1}` relationship
(coefficients are returned):
>>> p.do_fit() # doctest: +SKIP
array([1.16777420e-05, 1.00135803e-08])
Predict run time for :math:`10^{5000}`:
>>> p.predict_time(5000) # doctest: +SKIP
6.174564361572262e-05
Plot the prediction:
>>> p.plot(xlabeltext='Power of 10') # doctest: +SKIP
.. image:: /_static/timer_prediction_pow10.png
:width: 450px
:alt: Example plot from `astroquery.utils.timer.RunTimePredictor`
When the changing argument is not the last, e.g.,
:math:`x^{2}`, something like this might work:
>>> p = RunTimePredictor(lambda x: pow(x, 2))
>>> p.time_func([2, 3, 5])
>>> sorted(p.results.items())
[(2, 4), (3, 9), (5, 25)]
"""
def __init__(self, func, *args, **kwargs):
self._funcname = func.__name__
self._pfunc = partial(func, *args, **kwargs)
self._cache_good = OrderedDict()
self._cache_bad = []
self._cache_est = OrderedDict()
self._cache_out = OrderedDict()
self._fit_func = None
self._power = None
@property
def results(self):
"""Function outputs from `time_func`.
A dictionary mapping input arguments (fixed arguments
are not included) to their respective output values.
"""
return self._cache_out
@timefunc(num_tries=1, verbose=False)
def _timed_pfunc(self, arg):
"""Run partial func once for single arg and time it."""
return self._pfunc(arg)
def _cache_time(self, arg):
"""Cache timing results without repetition."""
if arg not in self._cache_good and arg not in self._cache_bad:
try:
result = self._timed_pfunc(arg)
except Exception as e:
warnings.warn(str(e), AstropyUserWarning)
self._cache_bad.append(arg)
else:
self._cache_good[arg] = result[0] # Run time
self._cache_out[arg] = result[1] # Function output
[docs]
def time_func(self, arglist):
"""Time the partial function for a list of single args
and store run time in a cache. This forms a baseline for
the prediction.
This also stores function outputs in `results`.
Parameters
----------
arglist : list of numbers
List of input arguments to time.
"""
if not isinstance(arglist, Iterable):
arglist = [arglist]
# Preserve arglist order
for arg in arglist:
self._cache_time(arg)
# FUTURE: Implement N^x * O(log(N)) fancy fitting.
[docs]
def do_fit(self, *, model=None, fitter=None, power=1, min_datapoints=3):
"""Fit a function to the lists of arguments and
their respective run time in the cache.
By default, this does a linear least-square fitting
to a straight line on run time w.r.t. argument values
raised to the given power, and returns the optimal
intercept and slope.
Parameters
----------
model : `astropy.modeling.Model`
Model for the expected trend of run time (Y-axis)
w.r.t. :math:`\\text{arg}^{\\text{power}}` (X-axis).
If `None`, will use `~astropy.modeling.polynomial.Polynomial1D`
with ``degree=1``.
fitter : `astropy.modeling.fitting.Fitter`
Fitter for the given model to extract optimal coefficient values.
If `None`, will use `~astropy.modeling.fitting.LinearLSQFitter`.
power : int, optional
Power of values to fit.
min_datapoints : int, optional
Minimum number of data points required for fitting.
They can be built up with `time_func`.
Returns
-------
a : array-like
Fitted `~astropy.modeling.FittableModel` parameters.
Raises
------
ValueError
Insufficient data points for fitting.
ModelsError
Invalid model or fitter.
"""
# Reset related attributes
self._power = power
self._cache_est = OrderedDict()
x_arr = np.array(list(self._cache_good.keys()))
if x_arr.size < min_datapoints:
raise ValueError('requires {} points but has {}'.format(
min_datapoints, x_arr.size))
if model is None:
model = modeling.models.Polynomial1D(1)
elif not isinstance(model, modeling.core.Model):
raise modeling.fitting.ModelsError(
'{} is not a model.'.format(model))
if fitter is None:
fitter = modeling.fitting.LinearLSQFitter()
elif not isinstance(fitter, modeling.fitting.Fitter):
raise modeling.fitting.ModelsError(
'{} is not a fitter.'.format(fitter))
self._fit_func = fitter(
model, x_arr**power, list(self._cache_good.values()))
return self._fit_func.parameters
[docs]
def predict_time(self, arg):
"""Predict run time for given argument.
If prediction is already cached, cached value is returned.
Parameters
----------
arg : number
Input argument to predict run time for.
Returns
-------
t_est : float
Estimated run time for given argument.
Raises
------
RuntimeError
No fitted data for prediction.
"""
if arg in self._cache_est:
t_est = self._cache_est[arg]
else:
if self._fit_func is None:
raise RuntimeError('no fitted data for prediction')
t_est = self._fit_func(arg**self._power)
self._cache_est[arg] = t_est
return t_est
[docs]
def plot(self, *, xscale='linear', yscale='linear', xlabeltext='args',
save_as=''): # pragma: no cover
"""Plot prediction.
.. note:: Uses `matplotlib <https://matplotlib.org/>`_.
Parameters
----------
xscale, yscale : {'linear', 'log', 'symlog'}
Scaling for `matplotlib.axes.Axes`.
xlabeltext : str, optional
Text for X-label.
save_as : str, optional
Save plot as given filename.
Raises
------
RuntimeError
Insufficient data for plotting.
"""
import matplotlib.pyplot as plt
# Actual data
x_arr = sorted(self._cache_good)
y_arr = np.array([self._cache_good[x] for x in x_arr])
if len(x_arr) <= 1:
raise RuntimeError('insufficient data for plotting')
# Auto-ranging
qmean = y_arr.mean() * u.second
for cur_u in (u.minute, u.second, u.millisecond, u.microsecond,
u.nanosecond):
val = qmean.to_value(cur_u)
if 1000 > val >= 1:
break
y_arr = (y_arr * u.second).to_value(cur_u)
fig, ax = plt.subplots()
ax.plot(x_arr, y_arr, 'kx-', label='Actual')
# Fitted data
if self._fit_func is not None:
x_est = list(self._cache_est.keys())
y_est = (np.array(list(self._cache_est.values())) * u.second).to_value(cur_u)
ax.scatter(x_est, y_est, marker='o', c='r', label='Predicted')
x_fit = np.array(sorted(x_arr + x_est))
y_fit = (self._fit_func(x_fit**self._power) * u.second).to_value(cur_u)
ax.plot(x_fit, y_fit, 'b--', label='Fit')
ax.set_xscale(xscale)
ax.set_yscale(yscale)
ax.set_xlabel(xlabeltext)
ax.set_ylabel('Run time ({})'.format(cur_u.to_string()))
ax.set_title(self._funcname)
ax.legend(loc='best', numpoints=1)
plt.draw()
if save_as:
plt.savefig(save_as)