import numpy as np
import importlib
import copy
import warnings
[docs]
class calibrator(object):
def __init__(self,
emu=None,
y=None,
x=None,
thetaprior=None,
yvar=None,
method='directbayes',
args={}):
'''
A class to represent a calibrator. Fits a calibrator model provided
in ``calibrationmethods/[method].py`` where [method] is the user
option with default listed above.
.. tip::
To use a new calibrator, just drop a new file to the
``calibrationmethods/`` directory with the required formatting.
:Example:
.. code-block:: python
calibrator(emu=emu, y=y, x=x, thetaprior=thetaprior,
method='directbayes', args=args)
Parameters
----------
emu : surmise.emulation.emulator, optional
An emulator class instance as defined in surmise.emulation.
The default is None.
y : numpy.ndarray, optional
Array of observed values at x. The default is None.
x : numpy.ndarray, optional
An array of x values that match the definition of "emu.x".
Currently, existing methods supports only the case when x is a
subset of "emu.x". The default is None.
thetaprior : class, optional
class instance with two built-in functions. The default is None.
.. important::
If a calibration method requires sampling, then
the prior distribution of the parameters should be included
into the calibrator. In this case, thetaprior class
should include two methods:
- ``lpdf(theta)``
Returns the log of the pdf of a given theta with size
``(len(theta), 1)``
- ``rnd(n)``
Generates n random variable from a prior distribution.
:Example:
.. code-block:: python
class prior_example:
def lpdf(theta):
return sps.uniform.logpdf(
theta[:, 0], 0, 1).reshape((len(theta), 1))
def rnd(n):
return np.vstack((sps.uniform.rvs(0, 1, size=n)))
yvar : numpy.ndarray, optional
The vector of observation variances at y. The default is None.
method : str, optional
A string that points to the file located in ``calibrationmethods/``
you would like to use. The default is 'directbayes'.
args : dict, optional
Optional dictionary containing options you would like to pass to
[method].fit(x, theta, f, args)
or
[method].predict(x, theta args) The default is {}.
Raises
------
ValueError
If the dimension of the data do not match with the fitted emulator.
Returns
-------
None.
'''
# default to showing all warnings
if ('warnings' in args.keys()) and ~args['warnings']:
warnings.simplefilter('ignore')
else:
warnings.resetwarnings()
self.args = args
if y is None:
raise ValueError('You have not provided any y.')
if y.ndim > 1.5:
y = np.squeeze(y)
if y.shape[0] < 5:
raise ValueError('5 is the minimum number of observations at this '
'time.')
self.y = y
if emu is None:
raise ValueError('You have not provided any emulator.')
self.emu = emu
try:
thetatestsamp = thetaprior.rnd(100)
except Exception:
raise ValueError('thetaprior.rnd(100) failed.')
if thetatestsamp.shape[0] != 100:
raise ValueError('thetaprior.rnd(100) failed to give 100 values.')
try:
thetatestlpdf = thetaprior.lpdf(thetatestsamp)
except Exception:
raise ValueError('thetaprior.lpdf(thetatestsamp) failed.')
if thetatestlpdf.shape[0] != 100:
raise ValueError('thetaprior.lpdf(thetaprior.rnd(100)) failed to '
'give 100 values.')
# if thetatestlpdf.ndim != 1:
# raise ValueError('thetaprior.lpdf(thetaprior.rnd(100)) has '
# 'dimension higher than 1.')
self.info = {}
self.info['thetaprior'] = copy.deepcopy(thetaprior)
if x is not None:
if x.shape[0] != y.shape[0]:
raise ValueError('If x is provided, shape[0] must align with '
'the length of y.')
self.x = copy.deepcopy(x)
predtry = emu.predict(copy.copy(self.x), thetatestsamp)
if y.shape[0] != predtry().shape[0]:
if x is None:
raise ValueError('y and emu.predict(theta) must have the same '
'shape')
else:
raise ValueError('y and emu.predict(x,theta) must have the '
'same shape')
else:
prednotfinite = np.logical_not(np.isfinite(predtry()))
if np.any(prednotfinite):
warnings.warn('Some non-finite values from emulation '
'received.')
fracfail = np.mean(prednotfinite, 1)
if np.sum(fracfail <= 10**(-3)) < 5:
raise ValueError('Your emulator failed enough places to '
'give up.')
else:
warnings.warn('Current protocol is to remove observations'
' that have nonfinite values.')
whichrm = np.where(fracfail > 10**(-3))[0]
warnings.warn('Removing values at %s.'
% np.array2string(whichrm))
whichkeep = np.where(fracfail <= 10**(-3))[0]
if x is not None:
self.x = self.x[whichkeep, :]
self.y = self.y[whichkeep]
else:
whichkeep = None
if yvar is not None:
if yvar.shape[0] != y.shape[0] and yvar.shape[0] > 1.5:
raise ValueError('yvar must be the same size as y or '
'of size 1.')
if np.min(yvar) < 0:
raise ValueError('yvar has at least one negative value.')
if np.min(yvar) < 10 ** (-6) or np.max(yvar) > 10 ** (6):
raise ValueError('Rescale your problem so that the yvar'
' is between 10 ^ -6 and 10 ^ 6.')
self.info['yvar'] = copy.deepcopy(yvar)
if whichkeep is not None:
self.info['yvar'] = self.info['yvar'][whichkeep]
try:
self.method = importlib.import_module('surmise.calibrationmethods.'
+ method)
except Exception:
raise ValueError('Module not found!')
self.fit()
def __repr__(self):
object_method = [method_name for method_name in dir(self)
if callable(getattr(self, method_name))]
object_method = [x for x in object_method if not x.startswith('__')]
object_method = [x for x in object_method if not x.startswith('emu')]
strrepr = ('A calibration object where the code in located in the file'
' calibration. The main method are cal.' +
', cal.'. join(object_method) + '. Default of cal(x) is '
'cal.predict(x). Run help(cal) for the document string.')
return strrepr
def __call__(self, x=None):
return self.predict(x)
[docs]
def fit(self, args=None):
"""
Calls "calibrationmethods.[method].fit" where "[method]" is the user
option.
Parameters
----------
args : dict
A dictionary containing options you would like to pass
"""
if args is not None:
argstemp = {**self.args, **copy.deepcopy(args)}
else:
argstemp = copy.copy(self.args)
self.method.fit(self.info, self.emu, self.x, self.y, **argstemp)
if hasattr(self, 'theta'):
del self.theta
# : theta attribute of calibrator
self.theta = thetadist(self)
return
[docs]
def predict(self, x=None, args=None):
'''
Returns predictions at x.
:Example:
.. code-block:: python
calibrator.predict(x=x, args=args)
Parameters
----------
x : numpy.ndarray, optional
An array of inputs to the model where to predict. The default is
None.
args : dict, optional
A dictionary containing options. The default is None.
Returns
-------
surmise.calibration.prediction
An instance of calibration class prediction.
'''
if args is None:
args = self.args
if x is None:
x = self.x
info = {}
if 'predict' in dir(self.method):
self.method.predict(info, self.info, self.emu, x, args)
else:
emupred = self.emu.predict(x, self.theta.rnd(1000))
info['mean'] = np.mean(emupred.mean(), 1)
info['var'] = np.var(emupred.mean(), 1)
info['rnd'] = (emupred.mean()).T
return prediction(info, self)
[docs]
class prediction(object):
'''
A class to represent a calibration prediction.
predict.info will give the dictionary from the method.
:Example:
.. code-block:: python
prediction.lpdf()
prediction.mean()
prediction.var()
prediction.rnd()
'''
def __init__(self, info, cal):
self.info = info
self.cal = cal
def __repr__(self):
object_method = [method_name for method_name in dir(self)
if callable(getattr(self, method_name))]
object_method = [x for x in object_method if not x.startswith('_')]
object_method = [x for x in object_method if not x.startswith('cal')]
strrepr = ('A calibration prediction object predict where the code in'
' located in the file calibration. The main method are'
' predict.' +
', predict.'.join(object_method) + '. Default of predict() '
'is predict.mean() and ' +
'predict(s) will run predict.rnd(s). '
'Run help(predict) for the document' +
' string.')
return strrepr
def __call__(self, s=None, args=None):
if s is None:
return self.mean(args)
else:
return self.rnd(s, args)
def __methodnotfoundstr(self, pfstr, opstr):
warnings.warn(pfstr + opstr + ' functionality not in method... \n' +
' Key labeled ' + opstr + ' not ' +
'provided in ' + pfstr + '.info... \n' +
' Key labeled rnd not ' +
'provided in ' + pfstr + '.info...')
return 'Could not reconsile a good way to compute this value'
' in current method.'
[docs]
def mean(self, args=None):
"""
Returns the mean at all x in when building the prediction.
"""
pfstr = 'predict' # prefix string
opstr = 'mean' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.deepcopy(self.cal.method.predictmean(self.info,
args))
elif opstr in self.info.keys():
return copy.deepcopy(self.info[opstr])
elif 'rnd' in self.info.keys():
self.info[opstr] = np.mean(self.info['rnd'], 0)
return copy.deepcopy(self.info[opstr])
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def var(self, args=None):
"""
Returns the variance at all x in when building the prediction.
"""
pfstr = 'predict' # prefix string
opstr = 'var' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.deepcopy(self.cal.method.predictvar(self.info,
args))
elif opstr in self.info.keys():
return copy.deepcopy(self.info[opstr])
elif 'rnd' in self.info.keys():
self.info[opstr] = np.var(self.info['rnd'], 0)
return copy.deepcopy(self.info[opstr])
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def rnd(self, s=100, args=None):
"""
Returns s random draws at all x in when building the prediction.
"""
pfstr = 'predict' # prefix string
opstr = 'rnd' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.deepcopy(self.cal.method.predictrnd(self.info,
args))
elif 'rnd' in self.info.keys():
return self.info['rnd'][np.random.choice(self.info['rnd'].shape[0],
size=s), :]
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def lpdf(self, y=None, args=None):
"""
Returns a log pdf given theta.
"""
raise ValueError('lpdf functionality not in method')
[docs]
class thetadist(object):
"""
A class to represent a theta predictive distribution.
"""
def __init__(self, cal):
self.cal = cal
def __repr__(self):
object_method = [method_name for method_name in dir(self)
if callable(getattr(self, method_name))]
object_method = [x for x in object_method if not x.startswith('_')]
object_method = [x for x in object_method if not x.startswith('cal')]
strrepr = ('A theta distribution object where the code in located in'
' the file calibration. The main method are cal.theta' +
', cal.theta.'.join(object_method) + '. Default of '
'predict() is' +
' cal.theta.mean() and ' +
'cal.theta(s) will cal.theta.rnd(s).'
' Run help(cal.theta) for the document' +
' string.')
return strrepr
def __call__(self, s=None, args=None):
if s is None:
return self.mean(args)
else:
return self.rnd(s, args)
def __methodnotfoundstr(self, pfstr, opstr):
warnings.warn(pfstr + opstr + 'functionality not in method... \n' +
' Key labeled ' + (pfstr+opstr) + ' not ' +
'provided in cal.info... \n' +
' Key labeled ' + pfstr + 'rnd not ' +
'provided in cal.info...')
return 'Could not reconsile a good way to compute this value in'
' current method.'
[docs]
def mean(self, args=None):
"""
Returns mean of each element of theta found during calibration.
"""
pfstr = 'theta' # prefix string
opstr = 'mean' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.deepcopy(self.cal.method.thetamean(self.cal.info,
args))
elif (pfstr+opstr) in self.cal.info.keys():
return copy.deepcopy(self.cal.info[(pfstr+opstr)])
elif (pfstr+'rnd') in self.cal.info.keys():
return np.mean(self.cal.info[(pfstr+'rnd')], 0)
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def var(self, args=None):
"""
Returns predictive variance of each element of theta found
during calibration.
"""
pfstr = 'theta' # prefix string
opstr = 'var' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.deepcopy(self.cal.method.thetavar(self.cal.info,
args))
elif (pfstr+opstr) in self.cal.info.keys():
return copy.deepcopy(self.cal.info[(pfstr+opstr)])
elif (pfstr+'rnd') in self.cal.info.keys():
return np.var(self.cal.info[(pfstr+'rnd')], 0)
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def rnd(self, s=1000, args=None):
"""
Returns s predictive draws for theta found during calibration.
"""
pfstr = 'theta' # prefix string
opstr = 'rnd' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.copy(self.cal.method.thetarnd(self.cal.info, s,
args))
elif (pfstr+opstr) in self.cal.info.keys():
return self.cal.info['thetarnd'][
np.random.choice(self.cal.info['thetarnd'].shape[0],
size=s), :]
else:
raise ValueError(self.__methodnotfoundstr(pfstr, opstr))
[docs]
def lpdf(self, theta=None, args=None):
"""
Returns a log pdf given theta.
"""
pfstr = 'theta' # prefix string
opstr = 'lpdf' # operation string
if (pfstr + opstr) in dir(self.cal.method):
if args is None:
args = self.cal.args
return copy.copy(self.cal.method.thetalpdf(self.cal.info, theta,
args))
else:
raise ValueError('lpdf functionality not in method')