Source code for metropolis_hastings

import numpy as np
import scipy.stats as sps

"""Metropolis Hastings"""


[docs] def sampler(logpost_func, draw_func, numsamp=2000, theta0=None, stepType='normal', stepParam=None, burnSamples=1000, verbose=False, **mh_options): ''' Parameters ---------- logpost_func : function a function returns the log of the posterior for a given theta. draw_func : function a function returns random draws of initial design theta numsamp : int, optional number of samples to draw. The default is 2000. theta0 : array, optional initial theta value. The default is None. stepType : str, optional either 'uniform' or 'normal'. The default is 'normal'. stepParam : array, optional scaling parameter. The default is None. **mh_options : dict additional options. Returns ------- sampler_info : dict returns numsamp random draws from posterior. ''' # scaling parameter if stepParam is None: stepParam = np.std(draw_func(burnSamples), axis=0) # intial theta to start the chain if theta0 is None: theta0 = draw_func(1) p = theta0.shape[1] lposterior = np.zeros(burnSamples + numsamp) theta = np.zeros((burnSamples + numsamp, theta0.shape[1])) # print(theta0) lposterior[0] = logpost_func(theta0, return_grad=False).item() theta[0] = theta0 n_acc = 0 lposterior_list = [] for i in range(1, burnSamples + numsamp): if verbose: if i % 30000 == 0: print("At sample {}, acceptance rate is {}.".format(i, n_acc/i)) # Candidate theta theta_cand = None if stepType == 'normal': theta_cand = [theta[i-1, :][k] + stepParam[k] * sps.norm.rvs(0, 1, size=1) for k in range(p)] elif stepType == 'uniform': theta_cand = [theta[i-1, :][k] + stepParam[k] * sps.uniform.rvs(-0.5, 0.5, size=1) for k in range(p)] theta_cand = np.reshape(np.array(theta_cand), (1, p)) # Compute loglikelihood logpost = logpost_func(theta_cand, return_grad=False).item() if np.isfinite(logpost): p_accept = min(1, np.exp(logpost - lposterior[i-1])) accept = np.random.uniform() < p_accept else: accept = False # Accept candidate? if accept: # Update position theta[i, :] = theta_cand lposterior[i] = logpost lposterior_list.append(logpost) if i >= burnSamples: n_acc += 1 else: theta[i, :] = theta[i-1, :] lposterior[i] = lposterior[i-1] lposterior_list.append(logpost) theta = theta[(burnSamples):(burnSamples + numsamp), :] sampler_info = {'theta': theta, 'acc_rate': n_acc/numsamp, 'lpostlist': np.array(lposterior_list)} if verbose: print("Final Acceptance Rate: ", n_acc/numsamp) return sampler_info