Source code for PCGP

"""PCGP (Higdon et al., 2008)"""
import numpy as np
import scipy.optimize as spo


def fit(fitinfo, x, theta, f, epsilon=0.1, **kwargs):
    '''
    The purpose of fit is to take information and plug all of our fit
    information into fitinfo, which is a python dictionary.

    .. note::
        This is an application of the method proposed by Higdon et al.,
        2008. The idea is to use PCA to project the original simulator
        outputs onto a lower-dimensional space spanned by an orthogonal
        basis. The main steps are:

        1. Standardize f.
        2. Compute the SVD of f, and get the PCs.
        3. Project the original centred data into the orthonormal space
           to obtain the matrix of coefficients (say we use r PCs).
        4. Build r separate and independent GPs from the input space.

    Parameters
    ----------
    fitinfo : dict
        A dictionary including the emulation fitting information once
        complete. The dictionary is passed by reference, so the function
        returns None.
    x : numpy.ndarray
        An array of inputs. Each row should correspond to a row in f.
    theta : numpy.ndarray
        An array of parameters. Each row should correspond to a column
        in f.
    f : numpy.ndarray
        An array of responses. Each column in f should correspond to a
        row in theta. Each row in f should correspond to a row in x.
    epsilon : float, optional
        A threshold subtracted from the squared singular values when
        selecting the number of principal components to retain. The
        default is 0.1.

    Returns
    -------
    None.
    '''
    f = f.T
    fitinfo['theta'] = theta
    fitinfo['x'] = x
    fitinfo['f'] = f
    fitinfo['epsilon'] = epsilon

    # standardize function evaluations f
    __standardizef(fitinfo)

    # apply PCA to reduce the dimension of f
    __PCs(fitinfo)
    numpcs = fitinfo['pc'].shape[1]

    # create a dictionary to save the emu info for each PC
    emulist = [dict() for _ in range(0, numpcs)]

    # fit a GP for each PC
    for pcanum in range(0, numpcs):
        emulist[pcanum] = emulation_fit(theta, fitinfo['pc'][:, pcanum])
    fitinfo['emulist'] = emulist
    return
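

# --- Illustrative sketch, not part of the original module ---
# A minimal, hypothetical example of the calling convention for fit(): the
# toy simulator below is invented purely to show the expected shapes
# (f is len(x)-by-len(theta)) and that fitinfo is filled in place.
def _example_fit():
    "Hypothetical usage sketch for fit(); returns the populated fitinfo."
    x = np.linspace(0, 1, 15).reshape(-1, 1)       # 15 output locations
    theta = np.random.uniform(0, 1, (50, 2))       # 50 parameter settings
    # toy simulator outputs: one row per x, one column per theta
    f = np.array([[np.sin(5 * xx[0] * t[0]) + t[1] for t in theta]
                  for xx in x])
    fitinfo = {}
    fit(fitinfo, x, theta, f)                      # fills fitinfo in place
    # fitinfo now holds, e.g., 'pc', 'pct', and 'emulist' (one GP per PC)
    return fitinfo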


def predict(predinfo, fitinfo, x, theta, computecov=True, **kwargs):
    '''
    Parameters
    ----------
    predinfo : dict
        An arbitrary dictionary where you should place all of your
        prediction information once complete.

        - predinfo['mean'] : mean prediction.
        - predinfo['var'] : variance of the prediction.
        - predinfo['covxhalf'] : half covariance of the prediction
          across x.

    fitinfo : dict
        A dictionary including the emulation fitting information produced
        by fit.
    x : numpy.ndarray
        An array of inputs. Each row should correspond to a row in f.
    theta : numpy.ndarray
        An array of parameters. Each row should correspond to a column
        in f.
    computecov : bool, optional
        If True, the prediction variance is computed as well. The default
        is True.

    Returns
    -------
    Prediction mean and variance at theta and x given the dictionary
    fitinfo.
    '''
    infos = fitinfo['emulist']
    predvecs = np.zeros((theta.shape[0], len(infos)))
    predvars = np.zeros((theta.shape[0], len(infos)))

    # match the rows of x to the rows of the training inputs fitinfo['x']
    try:
        if x is None or np.all(np.equal(x, fitinfo['x'])) or \
                np.allclose(x, fitinfo['x']):
            xind = np.arange(0, x.shape[0])
            xnewind = np.arange(0, x.shape[0])
        else:
            raise ValueError('x does not match the training inputs.')
    except Exception:
        matchingmatrix = np.ones((x.shape[0], fitinfo['x'].shape[0]))
        for k in range(0, x.shape[1]):
            try:
                matchingmatrix *= np.isclose(x[:, k][:, None],
                                             fitinfo['x'][:, k])
            except Exception:
                matchingmatrix *= np.equal(x[:, k][:, None],
                                           fitinfo['x'][:, k])
        xind = np.argwhere(matchingmatrix > 0.5)[:, 1]
        xnewind = np.argwhere(matchingmatrix > 0.5)[:, 0]

    # for each PC, obtain the GP mean and variance
    for k in range(0, len(infos)):
        r = emulation_covmat(theta, fitinfo['theta'], infos[k]['hypcov'])
        predvecs[:, k] = infos[k]['muhat'] + r @ infos[k]['pw']
        Rinv = infos[k]['Rinv']
        if computecov:
            predvars[:, k] = infos[k]['sigma2hat'] * \
                (1 + np.exp(infos[k]['hypnug'])
                 - np.sum(r.T * (Rinv @ r.T), 0))

    pctscale = (fitinfo['pct'].T * fitinfo['scale']).T

    # transfer the PCs back into the original space
    predinfo['mean'] = np.full((x.shape[0], theta.shape[0]), np.nan)
    predinfo['mean'][xnewind, :] = (predvecs @ pctscale[xind, :].T
                                    + fitinfo['offset'][xind]).T
    if computecov:
        predinfo['var'] = np.full((x.shape[0], theta.shape[0]), np.nan)
        predinfo['var'][xnewind, :] = (fitinfo['extravar'][xind]
                                       + predvars
                                       @ (pctscale[xind, :] ** 2).T).T
        CH = (np.sqrt(predvars)[:, :, None]
              * (pctscale[xind, :].T)[None, :, :])
        predinfo['covxhalf'] = np.full((theta.shape[0], CH.shape[1],
                                        x.shape[0]), np.nan)
        predinfo['covxhalf'][:, :, xnewind] = CH
        predinfo['covxhalf'] = predinfo['covxhalf'].transpose((2, 0, 1))
    return
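

# --- Illustrative sketch, not part of the original module ---
# Continues the hypothetical _example_fit() sketch above: predicting at new
# parameter values on the training locations.
def _example_predict():
    "Hypothetical usage sketch for predict()."
    fitinfo = _example_fit()
    thetanew = np.random.uniform(0, 1, (10, 2))
    predinfo = {}
    predict(predinfo, fitinfo, fitinfo['x'], thetanew)
    # mean and var are len(x)-by-len(thetanew)
    assert predinfo['mean'].shape == (15, 10)
    assert predinfo['var'].shape == (15, 10)
    return predinfo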


def predictmean(predinfo, **kwargs):
    return predinfo['mean']


def predictvar(predinfo, **kwargs):
    return predinfo['var']


def emulation_covmat(theta1, theta2, gammav, returndir=False):
    '''
    Returns a Matern covariance matrix R such that

    .. math::
        R = (1 + S)\\exp(-S),

    where

    .. math::
        S = \\frac{|\\theta_1 - \\theta_2|}{\\gamma}.

    Elsewhere in this module, the inverse of R is obtained via the
    eigendecomposition

    .. math::
        R^{-1} = V W^{-1} V^T,

    where V holds the eigenvectors and W the eigenvalues of R.

    Parameters
    ----------
    theta1 : numpy.ndarray
        An n1-by-d array of parameters.
    theta2 : numpy.ndarray
        An n2-by-d array of parameters.
    gammav : numpy.ndarray
        An array of length d of covariance hyperparameters (log scale).
    returndir : bool, optional
        If True, dR is returned as well. The default is False.

    Returns
    -------
    numpy.ndarray
        An n1-by-n2 array of covariances between theta1 and theta2 given
        the hyperparameters gammav.
    '''
    d = gammav.shape[0]
    theta1 = theta1.reshape(1, d) if theta1.ndim < 1.5 else theta1
    theta2 = theta2.reshape(1, d) if theta2.ndim < 1.5 else theta2
    n1 = theta1.shape[0]
    n2 = theta2.shape[0]
    V = np.zeros([n1, n2])
    R = np.ones([n1, n2])

    # Matern covariance structure, that is, R = (1 + S)*exp(-S)
    if returndir:
        dR = np.zeros([n1, n2, d])
    for k in range(0, d):
        S = np.abs(np.subtract.outer(theta1[:, k], theta2[:, k])
                   / np.exp(gammav[k]))
        R *= (1 + S)
        V -= S
        if returndir:
            dR[:, :, k] = (S * S) / (1 + S)
    R *= np.exp(V)
    if returndir:
        for k in range(0, d):
            dR[:, :, k] = R * dR[:, :, k]
        return R, dR
    else:
        return R


def emulation_negloglik(hyperparameters, fitinfo):
    '''
    Returns the negative log-likelihood of a univariate GP model for given
    hyperparameters. The hyperparameters are chosen to minimize

    .. math::
        \\frac{n}{2}\\log{\\hat{\\sigma}^2} + \\frac{1}{2} \\log |R|.

    Parameters
    ----------
    hyperparameters : numpy.ndarray
        An array of hyperparameters.
    fitinfo : dict
        A dictionary including the emulation fitting information.

    Returns
    -------
    negloglik : float
        Negative log-likelihood of a univariate GP model.
    '''
    # obtain the hyperparameter values
    covhyp = hyperparameters[0:fitinfo['p']]
    nughyp = hyperparameters[fitinfo['p']]

    # get the fitinfo values
    theta = fitinfo['theta']
    n = fitinfo['n']
    f = fitinfo['f']

    # obtain the covariance matrix R with a nugget on the diagonal
    R = emulation_covmat(theta, theta, covhyp)
    R = R + np.exp(nughyp) * np.diag(np.ones(n))

    # eigendecomposition of R
    W, V = np.linalg.eigh(R)

    # MLEs for mu and sigma^2
    fspin = V.T @ f
    onespin = V.T @ np.ones(f.shape)
    muhat = np.sum(V @ (1/W * fspin)) / np.sum(V @ (1/W * onespin))
    fcenter = fspin - muhat * onespin
    sigma2hat = np.mean(fcenter ** 2 / W)

    # negative log-likelihood of a univariate GP model, plus a Gaussian
    # regularization term on the hyperparameters
    negloglik = 1/2 * np.sum(np.log(W)) + n/2 * np.log(sigma2hat)
    negloglik += 1/2 * np.sum((hyperparameters - fitinfo['hypregmean'])**2
                              / (fitinfo['hypregstd'] ** 2))
    return negloglik
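

# --- Illustrative sketch, not part of the original module ---
# A small numeric check of the Matern form used by emulation_covmat(): in
# one dimension with gammav = [0] (so gamma = exp(0) = 1), the correlation
# at distance s should equal (1 + s) * exp(-s).
def _example_covmat_check():
    "Hypothetical sanity check for emulation_covmat()."
    theta1 = np.array([[0.0], [1.0]])
    theta2 = np.array([[0.0]])
    R = emulation_covmat(theta1, theta2, np.array([0.0]))
    assert np.isclose(R[0, 0], 1.0)                   # zero distance
    assert np.isclose(R[1, 0], 2.0 * np.exp(-1.0))    # s = 1
    return R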


def emulation_negloglikgrad(hyperparameters, fitinfo):
    '''
    Returns the gradient of the negative log-likelihood of a univariate GP
    model for given hyperparameters.

    Parameters
    ----------
    hyperparameters : numpy.ndarray
        An array of hyperparameters.
    fitinfo : dict
        A dictionary including the emulation fitting information.

    Returns
    -------
    dnegloglik : numpy.ndarray
        Gradient of the negative log-likelihood of a univariate GP model.
    '''
    # obtain the hyperparameter values
    covhyp = hyperparameters[0:fitinfo['p']]
    nughyp = hyperparameters[fitinfo['p']]

    # get the fitinfo values
    theta = fitinfo['theta']
    n = fitinfo['n']
    p = fitinfo['p']
    f = fitinfo['f']

    # obtain the covariance matrix and its derivatives; append the
    # derivative with respect to the nugget
    R, dR = emulation_covmat(theta, theta, covhyp, True)
    R = R + np.exp(nughyp) * np.diag(np.ones(n))
    dRappend = np.exp(nughyp) * np.diag(np.ones(n)).reshape(R.shape[0],
                                                            R.shape[1], 1)
    dR = np.append(dR, dRappend, axis=2)

    # MLEs for mu and sigma^2
    W, V = np.linalg.eigh(R)
    fspin = V.T @ f
    onespin = V.T @ np.ones(f.shape)
    mudenom = np.sum(V @ (1/W * onespin))
    munum = np.sum(V @ (1/W * fspin))
    muhat = munum / mudenom
    fcenter = fspin - muhat * onespin
    sigma2hat = np.mean(fcenter ** 2 / W)

    # gradients of muhat, sigma2hat and log|R|
    dmuhat = np.zeros(p + 1)
    dsigma2hat = np.zeros(p + 1)
    dfcentercalc = (fcenter / W) @ V.T
    dfspincalc = (fspin / W) @ V.T
    donespincalc = (onespin / W) @ V.T
    Rinv = V @ np.diag(1/W) @ V.T
    dlogdet = np.zeros(p + 1)
    for k in range(0, dR.shape[2]):
        dRnorm = np.squeeze(dR[:, :, k])
        dmuhat[k] = -np.sum(donespincalc @ dRnorm @ dfspincalc) / mudenom \
            + muhat * (np.sum(donespincalc @ dRnorm @ donespincalc)
                       / mudenom)
        dsigma2hat[k] = -(1/n) * (dfcentercalc.T @ dRnorm @ dfcentercalc) \
            + 2 * dmuhat[k] * np.mean((fcenter * onespin) / W)
        dlogdet[k] = np.sum(Rinv * dRnorm)

    # gradient of the negative log-likelihood of a univariate GP model,
    # including the regularization term
    dnegloglik = 1/2 * dlogdet + n/2 * 1/sigma2hat * dsigma2hat
    dnegloglik += ((hyperparameters - fitinfo['hypregmean'])
                   / (fitinfo['hypregstd'] ** 2))
    return dnegloglik
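

# --- Illustrative sketch, not part of the original module ---
# A finite-difference check of emulation_negloglikgrad() against
# emulation_negloglik(), using a hypothetical fitinfo with only the keys
# the two functions read ('theta', 'f', 'n', 'p', 'hypregmean',
# 'hypregstd'); the tolerances are a rough choice.
def _example_gradient_check(eps=1e-6):
    "Hypothetical finite-difference check of the analytic gradient."
    rng = np.random.default_rng(0)
    theta = rng.uniform(0, 1, (20, 2))
    info = {'theta': theta,
            'f': np.sin(theta[:, 0]) + theta[:, 1],
            'n': 20,
            'p': 2,
            'hypregmean': np.zeros(3),
            'hypregstd': np.ones(3)}
    hyp = np.array([0.1, -0.2, -4.0])   # two lengthscales plus a nugget
    grad = emulation_negloglikgrad(hyp, info)
    for k in range(hyp.shape[0]):
        hplus, hminus = hyp.copy(), hyp.copy()
        hplus[k] += eps
        hminus[k] -= eps
        numgrad = (emulation_negloglik(hplus, info)
                   - emulation_negloglik(hminus, info)) / (2 * eps)
        assert np.isclose(grad[k], numgrad, rtol=1e-3, atol=1e-4)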


def emulation_fit(theta, pcaval):
    '''
    Fits a univariate GP. First, the hyperparameter values are obtained
    via 'L-BFGS-B'. Then, the MLEs of mu and sigma^2 are computed for the
    best hyperparameters such that

    .. math::
        \\hat{\\mu} = \\frac{1^T R^{-1} f}{1^T R^{-1} 1},

    .. math::
        \\hat{\\sigma}^2 = \\frac{(f-\\hat{\\mu})^T R^{-1}
        (f-\\hat{\\mu})}{n}.

    Parameters
    ----------
    theta : numpy.ndarray
        An n-by-d array of parameters.
    pcaval : numpy.ndarray
        An array of length n.

    Returns
    -------
    subinfo : dict
        Dictionary of the fitted emulator model.
    '''
    subinfo = {}
    covhyp0 = np.log(np.std(theta, 0) * 3) + 1
    covhypLB = covhyp0 - 2
    covhypUB = covhyp0 + 3
    nughyp0 = -6
    nughypLB = -15
    nughypUB = 1

    # get a random subsample of thetas to find the optimized
    # hyperparameters
    n_train = np.min((20 * theta.shape[1], theta.shape[0]))
    idx = np.random.choice(theta.shape[0], n_train, replace=False)

    # start constructing the dictionary used during optimization
    if theta.ndim == 1:
        subinfo['theta'] = theta[idx]
    else:
        subinfo['theta'] = theta[idx, :]
    subinfo['f'] = pcaval[idx]
    subinfo['n'] = subinfo['f'].shape[0]
    subinfo['p'] = covhyp0.shape[0]
    subinfo['hypregmean'] = np.append(covhyp0, nughyp0)
    subinfo['hypregstd'] = np.append((covhypUB - covhypLB)/3, 4)

    # find the hyperparameters minimizing the negative log-likelihood
    bounds = spo.Bounds(np.append(covhypLB, nughypLB),
                        np.append(covhypUB, nughypUB))
    opval = spo.minimize(emulation_negloglik,
                         np.append(covhyp0, nughyp0),
                         bounds=bounds,
                         args=(subinfo,),
                         method='L-BFGS-B',
                         options={'disp': False},
                         jac=emulation_negloglikgrad)

    # obtain the optimized hyperparameter values
    hypcov = opval.x[:subinfo['p']]
    hypnug = opval.x[subinfo['p']]

    # obtain the covariance matrix on the full set of parameters
    R = emulation_covmat(theta, theta, hypcov)
    R = R + np.exp(hypnug) * np.diag(np.ones(R.shape[0]))

    # obtain the eigenvalue decomposition of the covariance matrix
    W, V = np.linalg.eigh(R)

    # MLEs for mu and sigma^2
    fspin = V.T @ pcaval
    onespin = V.T @ np.ones(pcaval.shape)
    muhat = np.sum(V @ (1/W * fspin)) / np.sum(V @ (1/W * onespin))
    fcenter = fspin - muhat * onespin
    sigma2hat = np.mean(fcenter ** 2 / W)

    # obtain the inverse of the covariance matrix
    Rinv = V @ np.diag(1/W) @ V.T

    # construct the dictionary with the fitted emulator; note that
    # subinfo['theta'] is overwritten with the full parameter set
    subinfo['hypcov'] = hypcov
    subinfo['hypnug'] = hypnug
    subinfo['R'] = R
    subinfo['Rinv'] = Rinv
    subinfo['pw'] = Rinv @ (pcaval - muhat)
    subinfo['muhat'] = muhat
    subinfo['sigma2hat'] = sigma2hat
    subinfo['theta'] = theta
    return subinfo


def __standardizef(fitinfo, offset=None, scale=None):
    "Standardizes f by creating offset, scale and fs."
    # extract from the input dictionary
    f = fitinfo['f']

    # reuse the supplied offset and scale only if they match f and do not
    # leave any column badly off-center
    if (offset is not None) and (scale is not None):
        if offset.shape[0] == f.shape[1] and scale.shape[0] == f.shape[1]:
            if np.any(np.nanmean(np.abs(f - offset) / scale, 1) > 4):
                offset = None
                scale = None
        else:
            offset = None
            scale = None
    if offset is None or scale is None:
        offset = np.zeros(f.shape[1])
        scale = np.zeros(f.shape[1])
        for k in range(0, f.shape[1]):
            offset[k] = np.nanmean(f[:, k])
            scale[k] = np.nanstd(f[:, k])
            if scale[k] == 0:
                scale[k] = 0.0001

    # standardize the function evaluations
    fs = (f - offset) / scale

    # assign the new values to the dictionary
    fitinfo['offset'] = offset
    fitinfo['scale'] = scale
    fitinfo['fs'] = fs
    return


def __PCs(fitinfo):
    "Applies PCA to reduce the dimension of f."
    # extract from the input dictionary
    f = fitinfo['f']
    fs = fitinfo['fs']
    epsilon = fitinfo['epsilon']

    # compute the SVD of the standardized data and keep the components
    # whose squared singular values exceed epsilon
    U, S, _ = np.linalg.svd(fs.T, full_matrices=False)
    Sp = S ** 2 - epsilon
    pct = U[:, Sp > 0]
    pcw = np.sqrt(Sp[Sp > 0])
    pcstdvar = np.zeros((f.shape[0], pct.shape[1]))
    fitinfo['pcw'] = pcw
    fitinfo['pcto'] = 1 * pct
    fitinfo['pct'] = pct * pcw / np.sqrt(pct.shape[0])
    fitinfo['pcti'] = pct * (np.sqrt(pct.shape[0]) / pcw)
    fitinfo['pc'] = fs @ fitinfo['pcti']
    fitinfo['extravar'] = np.mean((fs - fitinfo['pc']
                                   @ fitinfo['pct'].T) ** 2, 0) \
        * (fitinfo['scale'] ** 2)
    fitinfo['pcstdvar'] = 10 * pcstdvar
    return
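

# --- Illustrative sketch, not part of the original module ---
# A hypothetical sanity check of the standardize/PCA pipeline: the
# projection and back-projection matrices should be mutually consistent,
# and the retained PCs should approximately reconstruct the standardized
# data; the toy outputs and the 0.1 error bound are invented for
# illustration.
def _example_pca_check():
    "Hypothetical sanity check of __standardizef() and __PCs()."
    rng = np.random.default_rng(1)
    theta = rng.uniform(0, 1, (40, 2))
    x = np.linspace(0, 1, 10)
    # toy outputs arranged as fit() stores them: one row per theta
    f = np.array([[np.cos(3 * xx + t[0]) * t[1] for xx in x]
                  for t in theta])
    fitinfo = {'f': f, 'epsilon': 0.1}
    __standardizef(fitinfo)
    __PCs(fitinfo)
    r = fitinfo['pc'].shape[1]
    # projecting and back-projecting are consistent ...
    assert np.allclose(fitinfo['pct'].T @ fitinfo['pcti'], np.eye(r))
    # ... and the retained PCs approximately reconstruct the data
    recon = fitinfo['pc'] @ fitinfo['pct'].T
    assert np.mean((fitinfo['fs'] - recon) ** 2) < 0.1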