Source code for dgfit.dustmodel

import math
import numpy as np
from scipy.special import erf

from astropy.io import fits

from dgfit.dustgrains import DustGrains

__all__ = ["DustModel", "MRNDustModel", "WDDustModel"]


[docs] class DustModel(object): """ Full dust model including arbitrary size and composition distributions. Includes the physical properties of the individual dust grains. Dust model that has each bin as an independent variable in the grain size distribution providing a truly arbitrary specification. Parameters ---------- componentnames : str list, optional if set, then read in the grain information from files path : str, optional path to grain files dustmodel : DustModel object, optional if set, create the grain info on the obsdata wavelengths using the input dustmodel grain information obsdata : ObsData object, optional observed data information Attributes ---------- origin : string origin of the dust grain physical properties allowed values are 'files' and 'onobsdata' n_components : int number of dust grain components components : array of DustGrain objects one DustGrain object per component sizedisttype : string functional form of component size distributions n_params : ints number of size distribution parameters per grain component parameters : dict Dictonary of parameters with an entry for each composition each entry is then a dictonary giving the value by parameter name. For the bins case, the dictonary is empty as the parameters is the size distribution. """ def __init__( self, componentnames=None, path="./", dustmodel=None, obsdata=None, every_nth=5 ): self.origin = None self.n_components = 0 self.components = [] self.sizedisttype = "bins" self.n_params = None self.parameters = {} # populate the grain info if componentnames is not None: self.read_grain_files(componentnames, path=path, every_nth=every_nth) elif dustmodel is not None: if obsdata is not None: self.grains_on_obs(dustmodel, obsdata) else: self.grains_on_model(dustmodel) # set the number of size distribution parametres if self.n_components > 0: self.n_params = [] for component in self.components: self.n_params.append(component.n_sizes)
[docs] def read_grain_files(self, componentnames, path="./", every_nth=5): """ Read in the precomputed dust grain physical properties from files for each grain component. Parameters ---------- componentnames : list of strings names of dust grain materials path : type path to files every_nth : int Only use every nth size, faster fitting Returns ------- updated class variables """ self.origin = "files" self.n_components = len(componentnames) # get the basic grain data for componentname in componentnames: cur_DG = DustGrains() cur_DG.from_files(componentname, path=path, every_nth=every_nth) self.components.append(cur_DG)
[docs] def grains_on_obs(self, full_dustmodel, observeddata): """ Calculate the dust grain properties on the observed wavelength grid. Uses an existing DustModel based on the full precomputed files and an ObsData object to get the wavelength grid. Makes the fitting faster to only do this transformation once. Parameters ---------- full_dustmodel : DustModel object full dust model based on input files observeddata: ObsData object observed data to use for transformation Returns ------- updated class variables """ self.origin = "onobsdata" self.n_components = full_dustmodel.n_components for component in full_dustmodel.components: cur_DG = DustGrains() cur_DG.from_object(component, observeddata) self.components.append(cur_DG)
[docs] def grains_on_model(self, full_dustmodel): """ Calculate the dust grain properties on the model grid (simple copy). Uses an existing DustModel based on the full precomputed files and an ObsData object to get the wavelength grid. Makes the fitting faster to only do this transformation once. Parameters ---------- full_dustmodel : DustModel object full dust model based on input files Returns ------- updated class variables """ self.origin = "onmodel" self.n_components = full_dustmodel.n_components for component in full_dustmodel.components: self.components.append(component)
[docs] def compute_size_dist(self, x, params): """ Compute the size distribution for the input sizes. For the bins case, just passes the parameters back. Allows for other functional forms of the size distribution with minimal new code. Parameters ---------- x : floats grains sizes params : floats Size distribution parameters For the arbitrary bins case, the parameters are the number of grains per size distribution Returns ------- floats Size distribution as a function of x """ return params
[docs] def set_size_dist_parameters(self, params): """ Set the size distribution parameters in the object dictonary. For the bins case, this does nothing. Allows for other functional forms of the size distribution with minimal new code. Parameters ---------- params : floats Size distribution parameters For the arbitrary bins case, the parameters are the number of grains per size distribution """ pass
[docs] def set_size_dist(self, params): """ Set the size distributions for each component based on the parameters of the functional form of the distributions. Parameters ---------- new_size_dists : type Description of parameter `new_size_dists`. Returns ------- type Description of returned object. """ k1 = 0 for k, component in enumerate(self.components): delta_val = self.n_params[k] k2 = k1 + delta_val component.size_dist[:] = self.compute_size_dist( component.sizes[:], params[k1:k2] ) k1 += delta_val
[docs] def eff_grain_props(self, OD, predict_all=False): """ Compute the effective grain properties of the ensemble of grain sizes and compositions. Parameters ---------- OD : ObsData object Observed data object specifically used to determine which observations to compute (only those needed for speed) predict_all : type Regardless of the ObsData, compute all possible observations Returns ------- dict Dictonary of predicted observations E.g., keys of cext, natoms, emission, albedo, g """ # storage for results _cabs = np.zeros(self.components[0].n_wavelengths) _csca = np.zeros(self.components[0].n_wavelengths) _natoms = {} if OD.fit_ir_emission or predict_all: _emission = np.zeros(self.components[0].n_wavelengths_emission) if OD.fit_scat_a or predict_all: _scat_a_cext = np.zeros(self.components[0].n_wavelengths_scat_a) _scat_a_csca = np.zeros(self.components[0].n_wavelengths_scat_a) if OD.fit_scat_g or predict_all: _g = np.zeros(self.components[0].n_wavelengths_scat_g) _scat_g_csca = np.zeros(self.components[0].n_wavelengths_scat_g) for component in self.components: results = component.eff_grain_props(OD, predict_all=predict_all) _tcabs = results["cabs"] _tcsca = results["csca"] _cabs += _tcabs _csca += _tcsca # for the depletions (# of atoms), a bit more careful work needed _tnatoms = results["natoms"] for aname in _tnatoms.keys(): if aname in _natoms.keys(): _natoms[aname] += _tnatoms[aname] else: _natoms[aname] = _tnatoms[aname] if OD.fit_ir_emission or predict_all: _temission = results["emission"] _emission += _temission if OD.fit_scat_a or predict_all: _tscat_a_cext = results["scat_a_cext"] _tscat_a_csca = results["scat_a_csca"] _scat_a_cext += _tscat_a_cext _scat_a_csca += _tscat_a_csca if OD.fit_scat_g or predict_all: _tg = results["g"] _tscat_g_csca = results["scat_g_csca"] _g += _tscat_g_csca * _tg _scat_g_csca += _tscat_g_csca results = {} results["cabs"] = _cabs results["csca"] = _csca results["natoms"] = _natoms if OD.fit_ir_emission or predict_all: results["emission"] = _emission if OD.fit_scat_a or predict_all: results["albedo"] = _scat_a_csca / _scat_a_cext if OD.fit_scat_g or predict_all: results["g"] = _g / _scat_g_csca return results
[docs] def read_sizedist_from_file(self, filename): """ Read in the size distribution from a file interpolating across sizes if needed Parameters ---------- filename : str name of FITS file with size distributions one component per extension """ for k, component in enumerate(self.components): fitsdata = fits.getdata(filename, k + 1) # interpolate, otherwise assume exact match in sizes # might want to add some checking here for robustness if len(component.size_dist) != len(fitsdata[:][1]): component.size_dist = 10 ** np.interp( np.log10(component.sizes), np.log10(fitsdata["SIZE"]), np.log10(fitsdata["DIST"]), ) else: component.size_dist = fitsdata["DIST"]
[docs] def lnprob_generic(self, obsdata): """ Compute the ln(prob) for the dust grain size and composition distribution as defined by the dustmodel. Parameters ---------- obsdata : ObsData object All the observed data Returns ------- float natural log of the probability """ # get the integrated dust properties results = self.eff_grain_props(obsdata) # compute the ln(prob) for A(l)/N(HI) lnp_alnhi = 0.0 if obsdata.fit_extinction: cabs = results["cabs"] csca = results["csca"] cext = cabs + csca dust_alnhi = 1.086 * cext lnp_alnhi = -0.5 * np.sum( ((obsdata.ext_alnhi - dust_alnhi) / obsdata.ext_alnhi_unc) ** 2 ) # lnp_alnhi /= obsdata.n_wavelengths # compute the ln(prob) for the depletions lnp_dep = 0.0 if obsdata.fit_abundance: natoms = results["natoms"] for atomname in natoms.keys(): lnp_dep = ( (natoms[atomname] - obsdata.abundance[atomname][0]) / obsdata.abundance[atomname][1] ) ** 2 lnp_dep *= -0.5 # compute the ln(prob) for IR emission lnp_emission = 0.0 if obsdata.fit_ir_emission: emission = results["emission"] lnp_emission = -0.5 * np.sum( (((obsdata.ir_emission - emission) / (obsdata.ir_emission_unc)) ** 2) ) # compute the ln(prob) for the dust albedo lnp_albedo = 0.0 if obsdata.fit_scat_a: albedo = results["albedo"] lnp_albedo = -0.5 * np.sum( (((obsdata.scat_albedo - albedo) / (obsdata.scat_albedo_unc)) ** 2) ) # compute the ln(prob) for the dust g lnp_g = 0.0 if obsdata.fit_scat_g: g = results["g"] lnp_albedo = -0.5 * np.sum( (((obsdata.scat_g - g) / (obsdata.scat_g_unc)) ** 2) ) # combine the lnps lnp = lnp_alnhi + lnp_dep + lnp_emission + lnp_albedo + lnp_g # print(params) # print(lnp_alnhi, lnp_dep, lnp_emission, lnp_albedo, lnp_g) if math.isinf(lnp) | math.isnan(lnp): print(lnp_alnhi, lnp_dep, lnp_emission, lnp_albedo, lnp_g) print(lnp) # print(params) exit() else: return lnp
[docs] @staticmethod def lnprob(params, obsdata, dustmodel): """ Compute the full probability function including priors Static function as it will be called form the fitter Parameters ---------- params : floats Parameters of the size distribution function obsdata : ObsData object Observed data to be fit dustmodel : DustModel object Dust model information Returns ------- float natural log of the probability """ # prior # make sure the size distributions are all positve lnp_bound = 0.0 for param in params: if param < 0.0: lnp_bound = -1e20 # return -np.inf # update the size distributions dustmodel.set_size_dist(params) return dustmodel.lnprob_generic(obsdata) + lnp_bound
[docs] def initial_walkers(self, p0, nwalkers): """ Setup the walkers based on the initial parameters p0 Specific to MCMC fitters (e.g., emcee). Parameters ---------- p0 : floats Initial values of the parameters nwalkers : int Number of walkers to initialize Returns ------- array of floats concatenated set of initial walker positions """ self.ndim = len(p0) self.nwalkers = nwalkers # Initial ball should be in log space p = [ 10 ** (np.log10(p0) + 1.0 * np.random.uniform(-1, 1.0, self.ndim)) for k in range(self.nwalkers) ] return p
[docs] def save_results(self, filename, OD, size_dist_uncs=[0]): """ Save fitting results to a file. Results include the size distribution and all predicted observations. Creates a FITS file with the results Parameters ---------- filename : str Name of the file to save the results OD : ObsData object All the observed data (may not be needed) size_dist_uncs : floats Uncertainties on the size distributions """ # write a small primary header pheader = fits.Header() pheader.set("NCOMPS", len(self.components), "number of dust grain components") for k, component in enumerate(self.components): pheader.set( "CNAME" + str(k), component.name, "name of dust grain component" ) pheader.set("SDMODEL", self.sizedisttype, "type of size distribution") pheader.add_comment("Dust Model results written by DustModel.py") pheader.add_comment("written by Karl D. Gordon") pheader.add_comment("kgordon@stsci.edu") phdu = fits.PrimaryHDU(header=pheader) hdulist = fits.HDUList([phdu]) # output the dust grain size distribution k1 = 0 for component in self.components: col1 = fits.Column(name="SIZE", format="E", array=component.sizes) col2 = fits.Column(name="DIST", format="E", array=component.size_dist) all_cols = [col1, col2] k2 = k1 + component.n_sizes if len(size_dist_uncs) > 1: col3 = fits.Column( name="DISTPUNC", format="E", array=size_dist_uncs[0][k1:k2] ) all_cols.append(col3) col4 = fits.Column( name="DISTMUNC", format="E", array=size_dist_uncs[1][k1:k2] ) all_cols.append(col4) k1 += component.n_sizes tbhdu = fits.BinTableHDU.from_columns(all_cols) tbhdu.header.set("EXTNAME", component.name, "dust grain component name") # save the parameter values if self.parameters: for cparam in self.parameters[component.name].items(): tbhdu.header.set( cparam[0], cparam[1], "parameters of size distribution model" ) hdulist.append(tbhdu) # output the resulting observable parameters results = self.eff_grain_props(OD, predict_all=True) cabs = results["cabs"] csca = results["csca"] natoms = results["natoms"] # natoms col1 = fits.Column( name="NAME", format="A2", array=np.array(list(natoms.keys())) ) col2 = fits.Column( name="ABUND", format="E", array=np.array(list(natoms.values())) ) cols = fits.ColDefs([col1, col2]) tbhdu = fits.BinTableHDU.from_columns(cols) tbhdu.header.set( "EXTNAME", "Abundances", "abundances in units of # atoms/1e6 H atoms" ) hdulist.append(tbhdu) # extinction col1 = fits.Column( name="WAVE", format="E", array=self.components[0].wavelengths ) col2 = fits.Column(name="EXT", format="E", array=1.086 * (cabs + csca)) all_cols_ext = [col1, col2] # emission # if ObsData.fit_ir_emission: emission = results["emission"] col1 = fits.Column( name="WAVE", format="E", array=self.components[0].wavelengths_emission ) col2 = fits.Column(name="EMIS", format="E", array=emission) all_cols_emis = [col1, col2] # albedo albedo = results["albedo"] tvals = self.components[0].wavelengths_scat_a col1 = fits.Column(name="WAVE", format="E", array=tvals) col2 = fits.Column(name="ALBEDO", format="E", array=albedo) all_cols_albedo = [col1, col2] # g g = results["g"] tvals = self.components[0].wavelengths_scat_g col1 = fits.Column(name="WAVE", format="E", array=tvals) col2 = fits.Column(name="G", format="E", array=g) all_cols_g = [col1, col2] for k, component in enumerate(self.components): results = component.eff_grain_props(OD, predict_all=True) tcabs = results["cabs"] tcsca = results["csca"] # tnatoms = results['natoms'] tcol = fits.Column( name="EXT" + str(k + 1), format="E", array=1.086 * (tcabs + tcsca) ) all_cols_ext.append(tcol) temission = results["emission"] tcol = fits.Column(name="EMIS" + str(k + 1), format="E", array=temission) all_cols_emis.append(tcol) talbedo = results["albedo"] tcol = fits.Column(name="ALBEDO" + str(k + 1), format="E", array=talbedo) all_cols_albedo.append(tcol) tg = results["g"] tcol = fits.Column(name="G" + str(k + 1), format="E", array=tg) all_cols_g.append(tcol) # now output the results # extinction cols = fits.ColDefs(all_cols_ext) tbhdu = fits.BinTableHDU.from_columns(cols) tbhdu.header.set("EXTNAME", "Extinction", "extinction in A(lambda)/N(HI) units") hdulist.append(tbhdu) # emission cols = fits.ColDefs(all_cols_emis) tbhdu = fits.BinTableHDU.from_columns(cols) tbhdu.header.set("EXTNAME", "Emission", "emission MJy/sr/H atom units") hdulist.append(tbhdu) # albedo cols = fits.ColDefs(all_cols_albedo) tbhdu = fits.BinTableHDU.from_columns(cols) tbhdu.header.set("EXTNAME", "Albedo", "dust scattering albedo") hdulist.append(tbhdu) # g cols = fits.ColDefs(all_cols_g) tbhdu = fits.BinTableHDU.from_columns(cols) tbhdu.header.set("EXTNAME", "G", "dust scattering phase function asymmetry") hdulist.append(tbhdu) hdulist.writeto(filename, overwrite=True)
[docs] @staticmethod def get_percentile_vals(chain, ndim): """ Compute the 50% +/- 33% values from the samples Parameters ---------- chain : sampler.chain Chain from the EMCEE sampler ndim : int number of paramaters Returns ------- tuple of floats (p50, p84-p50, p50-p16) """ samples = chain.reshape((-1, ndim)) values = map( lambda v: (v[1], v[2] - v[1], v[1] - v[0]), zip(*np.percentile(samples, [16, 50, 84], axis=0)), ) val_50p, punc, munc = zip(*values) return (val_50p, punc, munc)
[docs] def save_50percentile_results(self, oname, sampler, obsdata, nburn=0, cur_step=None): """ Compute the 50th percentile paramaters, set the size distribution, and save the results Creates a FITS file with the results Parameters ---------- oname : str Name of the file to save the results sampler : emcee.sampler Sampler object from EMCEE run obsdata : ObsData object All the observed data (may not be needed) cur_step : int Current step number """ if cur_step is None: cur_step = sampler.chain.shape[1] ( fin_size_dist_50p, fin_size_dist_punc, fin_size_dist_munc, ) = self.get_percentile_vals(sampler.chain[:, nburn : cur_step + 1, :], self.ndim) self.set_size_dist(fin_size_dist_50p) # save the model parameters for the size distribution # set here so that the saved results have the right info self.set_size_dist_parameters(fin_size_dist_50p) # save the final size distributions self.save_results( oname, obsdata, size_dist_uncs=[fin_size_dist_punc, fin_size_dist_munc] )
[docs] def save_best_results(self, oname, sampler, obsdata, cur_step=None): """ Compute the best fit paramaters using a sampler chain, set the size distribution, and save the results Creates a FITS file with the results Parameters ---------- oname : str Name of the file to save the results sampler : emcee.sampler Sampler object from EMCEE run obsdata : ObsData object All the observed data (may not be needed) cur_step : int Current step number """ # get the best fit values max_lnp = -1e20 if cur_step is None: cur_step = len(sampler.lnprobability[0]) for k in range(self.nwalkers): tmax_lnp = np.max(sampler.lnprobability[k, 0:cur_step]) if tmax_lnp > max_lnp: max_lnp = tmax_lnp (indxs,) = np.where(sampler.lnprobability[k] == tmax_lnp) fit_params_best = sampler.chain[k, indxs[0], :] self.set_size_dist(fit_params_best) # save the best fit size distributions self.save_results(oname, obsdata)
# ================================================================
[docs] class MRNDustModel(DustModel): """ Dust model that uses powerlaw size distributions with min/max sizes (MRN). Same keywords and attributes as the parent DustModel class. """ def __init__(self, **kwargs): super().__init__(**kwargs) self.sizedisttype = "MRN" self.n_params = [4] * self.n_components for component in self.components: self.parameters[component.name] = { "C": 1e-25, "alpha": 3.5, "a_min": 1e-7, "a_max": 1e-3, }
[docs] def compute_size_dist(self, x, params): """ Compute the size distribution for the input sizes. Powerlaw size distribution (aka MRN size distribution) sizedist = A*a^-alpha where a = grain size, A = amplitude, alpha = exponent of power law, amin = min grain size, amax = max grain size, Parameters ---------- x : floats grains sizes params : floats Size distribution parameters Returns ------- floats Size distribution as a function of x """ sizedist = params[0] * np.power(x, -1.0 * params[1]) (indxs,) = np.where(np.logical_or(x < params[2], x > params[3])) if len(indxs) > 0: sizedist[indxs] = 0.0 return sizedist
[docs] def set_size_dist_parameters(self, params): """ Set the size distribution parameters in the object dictonary. Parameters ---------- params : floats Size distribution parameters """ k1 = 0 for k, component in enumerate(self.components): k2 = k1 + self.n_params[k] cparams = params[k1:k2] k1 += self.n_params[k] self.parameters[component.name] = { "C": cparams[0], "alpha": cparams[1], "a_min": cparams[2], "a_max": cparams[3], }
[docs] @staticmethod def lnprob(params, obsdata, dustmodel): """ Compute the ln(prob) given the model parameters MRN model paramters for each component are A = amplitude alpha = negative of the power law exponent amin = min grain size amax = max grain size Parameters ---------- params : array of floats 4 x n_components parameters of the MRN model obsdata : ObsData object observed data for fitting dustmodel : DustModel object must be passed explicitly as the fitters require a static method (is this true?) Returns ------- lnprob : float natural log of the probability the input parameters describe the data """ # priors k1 = 0 lnp_bound = 0.0 for k, component in enumerate(dustmodel.components): # get the parameters for the current component k2 = k1 + dustmodel.n_params[k] cparams = params[k1:k2] k1 += dustmodel.n_params[k] # check that amin < amax (params 3 & 4) if cparams[2] > cparams[3]: lnp_bound = -1e20 # check that the amin and amax are within the bounds # of the dustmodel if cparams[2] < component.sizes[0]: lnp_bound = -1e20 if cparams[3] > component.sizes[-1]: lnp_bound = -1e20 # keep the normalization always positive if cparams[0] < 0.0: lnp_bound = -1e20 if cparams[1] < 0.0: lnp_bound = -1e20 if lnp_bound < 0.0: return lnp_bound else: dustmodel.set_size_dist(params) return dustmodel.lnprob_generic(obsdata) + lnp_bound
[docs] def initial_walkers(self, p0, nwalkers): """ Setup the walkers based on the initial parameters p0 Specific to MCMC fitters (e.g., emcee). Parameters ---------- p0 : floats Initial values of the parameters nwalkers : int Number of walkers to initialize Returns ------- array of floats concatenated set of initial walker positions """ self.ndim = len(p0) self.nwalkers = nwalkers # Initial ball # delts = np.array([1.0, 0.01, 1e-8, 1e-5, 1.0, 0.01, 1e-8, 1e-5]) # p = [p0 + delts*np.random.normal(0., 1., self.ndim) p = [ 10 ** (np.log10(p0) + 0.1 * np.random.uniform(-1, 1.0, self.ndim)) for k in range(self.nwalkers) ] return p
# ================================================================
[docs] class WDDustModel(DustModel): """ Dust model that uses the Weingartner & Draine (2001) size distributions. Same kewyords and attributes as the parent DustModel class. """ def __init__(self, **kwargs): super().__init__(**kwargs) self.sizedisttype = "WD" # set the number of size distribution parametres if self.n_components > 0: self.n_params = [] for component in self.components: if component.name == "astro-silicates": self.n_params.append(4) self.parameters["astro-silicates"] = { "C_s": 1.33e-12, "a_ts": 0.171e4, "alpha_s": -1.41, "beta_s": -11.5, } elif component.name == "astro-carbonaceous": self.n_params.append(6) self.parameters["astro-carbonaceous"] = { "C_g": 4.15e-11, "a_tg": 0.00837e4, "alpha_g": -1.91, "beta_g": -0.125, "a_cg": 0.499e4, "b_C": 3.0e-5, } else: raise ValueError( "%s grain material note supported" % component.name )
[docs] def compute_size_dist(self, x, params): """ Compute the size distribution for the input sizes. Parameters ---------- x : floats grain sizes params : floats Size distribution parameters Returns ------- floats Size distribution as a function of x """ # input grain sizes are in cm, needed in Angstroms a = x * 1e8 if len(params) == 6: # carbonaceous C, a_t, alpha, beta, a_c, input_bC = params else: # silicates C, a_t, alpha, beta = params a_c = 0.1e4 input_bC = None # larger grain size distribution # same for silicates and carbonaceous grains if beta >= 0.0: Fa = 1.0 + beta * a / a_t else: Fa = 1.0 / (1.0 - beta * a / a_t) Ga = np.full((len(a)), 1.0) (indxs,) = np.where(a > a_t) Ga[indxs] = np.exp(-1.0 * np.power((a[indxs] - a_t) / a_c, 3.0)) sizedist = (C / (1e-8 * a)) * np.power(a / a_t, alpha) * Fa * Ga # very small gain size distribution # only for carbonaceous grains if input_bC is not None: a0 = np.array([3.5, 30.0]) # in A bC = np.array([0.75, 0.25]) * input_bC sigma = 0.4 rho = 2.24 # in g/cm^3 for graphite mC = 12.0107 * 1.660e-24 Da = 0.0 for i in range(2): Bi = ( (3.0 / (np.power(2.0 * np.pi, 1.5))) * ( np.exp(-4.5 * np.power(sigma, 2.0)) / (rho * np.power(1e-8 * a0[i], 3.0) * sigma) ) * ( bC[i] * mC / ( 1.0 + erf( (3.0 * sigma / np.sqrt(2.0)) + np.log(a0[i] / 3.5) / (sigma * np.sqrt(2.0)) ) ) ) ) Da += (Bi / (1e-8 * a)) * np.exp( -0.5 * np.power(np.log(a / a0[i]) / sigma, 2.0) ) sizedist += Da return sizedist
[docs] def set_size_dist_parameters(self, params): """ Set the size distribution parameters in the object dictonary. Parameters ---------- params : floats Size distribution parameters """ k1 = 0 for k, component in enumerate(self.components): k2 = k1 + self.n_params[k] cparams = params[k1:k2] k1 += self.n_params[k] if component.name == "astro-silicates": self.parameters["astro-silicates"] = { "C_s": cparams[0], "a_ts": cparams[1], "alpha_s": cparams[2], "beta_s": cparams[3], } else: self.parameters["astro-carbonaceous"] = { "C_g": cparams[0], "a_tg": cparams[1], "alpha_g": cparams[2], "beta_g": cparams[3], "a_cg": cparams[4], "b_C": cparams[5], }
[docs] @staticmethod def lnprob(params, obsdata, dustmodel): """ Compute the ln(prob) given the model parameters Parameters ---------- params : array of floats 4 parameters of the WD model obsdata : ObsData object observed data for fitting dustmodel : DustModel object must be passed explicitly as the fitters require a static method (is this true?) Returns ------- lnprob : float natural log of the probability the input parameters describe the data """ # priors k1 = 0 lnp_bound = 0.0 for k, component in enumerate(dustmodel.components): # get the parameters for the current component k2 = k1 + dustmodel.n_params[k] cparams = params[k1:k2] k1 += dustmodel.n_params[k] # keep the normalization always positive if cparams[0] < 0.0: lnp_bound = -1e20 if cparams[1] < 0.0: lnp_bound = -1e20 if len(cparams) == 6: if cparams[4] < 0.0: lnp_bound = -1e20 if cparams[5] < 0.0: lnp_bound = -1e20 if lnp_bound < 0.0: return lnp_bound else: dustmodel.set_size_dist(params) return dustmodel.lnprob_generic(obsdata) + lnp_bound
[docs] def initial_walkers(self, p0, nwalkers): """ Setup the walkers based on the initial parameters p0 Specific to MCMC fitters (e.g., emcee). Parameters ---------- p0 : floats Initial values of the parameters nwalkers : int Number of walkers to initialize Returns ------- array of floats concatenated set of initial walker positions """ self.ndim = len(p0) self.nwalkers = nwalkers # some parameters are negative, so need to be handled psigns = np.sign(p0) p = [ psigns * ( 10 ** ( np.log10(np.absolute(p0)) + 0.1 * np.random.uniform(-1, 1.0, self.ndim) ) ) for k in range(self.nwalkers) ] return p