Source code for dgfit.obsdata

import numpy as np
from astropy.table import Table

__all__ = ["ObsData"]


# Object for the observed dust data
[docs] class ObsData(object): """ ObsData Class observed data that will be used to constrain the dust model Parameters ---------- obs_filename: 'string' filename with the observed data types and filenames Attributes ---------- alnhi : float A(lambda)/N(HI) value for extinction curve alnhi_unc : float uncertainty in A(lambda)/N(HI) value for extinction curve ext_waves : 'numpy.ndarray' wavelengths for the extinction curve ext_alav : 'numpy.ndarray' extinction curve in A(lambda)/A(V) units ext_alav_unc : 'numpy.ndarray' extinction curve uncertainties in A(lambda)/A(V) units ext_alnhi : 'numpy.ndarray' extinction curve in A(lambda)/N(HI) units ext_alnhi_unc : 'numpy.ndarray' extinction curve uncertainties in A(lambda)/N(HI) units """ # read in the data from files def __init__(self, obs_filename, path="./"): obs_filename = path + obs_filename # get the observed data filenames self.parse_obsfile(obs_filename) # extinction curve self.fit_extinction = False if self.obs_filenames["ext"] is not None: self.fit_extinction = True t = Table.read( path + self.obs_filenames["ext"], format="ascii.commented_header" ) self.ext_waves = np.array(t["wave"]) self.ext_alav = np.array(t["A(l)/A(V)"]) self.ext_alav_unc = np.array(t["unc"]) if "type" in t.colnames: self.ext_type = np.array(t["type"]) else: self.ext_type = np.full(len(t), "spec") # sort sindxs = np.argsort(self.ext_waves) self.ext_waves = self.ext_waves[sindxs] self.ext_alav = self.ext_alav[sindxs] self.ext_alav_unc = self.ext_alav_unc[sindxs] self.ext_type = self.ext_type[sindxs] self.ext_npts = len(self.ext_alav) else: self.ext_waves = np.logspace(np.log10(0.0912), np.log10(32.0), 200) self.ext_npts = 0 # dust abundances self.fit_abundance = False if self.obs_filenames["abund"] is not None: self.fit_abundance = True t = Table.read( path + self.obs_filenames["abund"], format="ascii.commented_header" ) self.abundance = {} self.total_abundance = {} for i in range(len(t)): self.abundance[t["atom"][i]] = (t["abund"][i], t["abund_unc"][i]) self.total_abundance[t["atom"][i]] = ( t["total_abund"][i], t["total_abund_unc"][i], ) self.abundance_npts = len(self.abundance) else: self.abundance_npts = 0 # diffuse IR emission spectrum self.fit_ir_emission = False if self.obs_filenames["ir_emis"] is not None: self.fit_ir_emission = True t = Table.read( path + self.obs_filenames["ir_emis"], format="ascii.commented_header" ) self.ir_emission_waves = np.array(t["WAVE"]) self.ir_emission = np.array(t["SPEC"]) / 1e20 self.ir_emission_unc = np.array(t["ERROR"]) / 1e20 # check if any uncs are zero (gindxs,) = np.where(self.ir_emission_unc == 0.0) if len(gindxs) > 0: self.ir_emission_unc[gindxs] = 0.1 * self.ir_emission[gindxs] # sort sindxs = np.argsort(self.ir_emission_waves) self.ir_emission_waves = self.ir_emission_waves[sindxs] self.ir_emission = self.ir_emission[sindxs] self.ir_emission_unc = self.ir_emission_unc[sindxs] self.ir_emission_npts = len(self.ir_emission) else: self.ir_emission_waves = np.logspace(np.log10(1.0), np.log10(500.0), 100) self.ir_emission_npts = 0 # dust albedo (Gordon et al. 2004 AoD proceedings) self.fit_scat_a = False self.fit_scat_g = False if (self.obs_filenames["scat_a"] is not None) & ( self.obs_filenames["scat_g"] is not None ): self.fit_scat_a = True self.fit_scat_g = True t = Table.read( path + self.obs_filenames["scat_a"], format="ascii.commented_header" ) self.scat_a_waves = np.array(t["wave"]) self.scat_albedo = np.array(t["albedo"]) self.scat_albedo_unc = np.array(t["unc"]) t = Table.read( path + self.obs_filenames["scat_g"], format="ascii.commented_header" ) self.scat_g_waves = np.array(t["wave"]) self.scat_g = np.array(t["g"]) self.scat_g_unc = np.array(t["unc"]) self.scat_a_npts = len(self.scat_albedo) self.scat_g_npts = len(self.scat_g) else: self.scat_a_waves = np.logspace(np.log10(0.1), np.log10(5.0), 100) self.scat_g_waves = np.logspace(np.log10(0.1), np.log10(5.0), 100) self.scat_a_npts = 0 self.scat_g_npts = 0 # normalization from N(HI) to A(V) for emission and abundance data if self.obs_filenames["avnhi"] is not None: t = Table.read( path + self.obs_filenames["avnhi"], format="ascii.commented_header", header_start=-1, ) self.avnhi = t["Av_to_NHI"][0] self.avnhi_unc = t["unc"][0] avnhi_rel_unc = self.avnhi_unc / self.avnhi # emission conversion rel_ir_emission_unc = self.ir_emission_unc / self.ir_emission self.ir_emission_av = self.ir_emission / self.avnhi self.ir_emission_av_unc = self.ir_emission_av * np.sqrt( np.square(rel_ir_emission_unc) + np.square(avnhi_rel_unc) ) # abundance conversion if self.obs_filenames["abund"] is not None: self.abundance_av = {} self.total_abundance_av = {} for key in self.abundance.keys(): old_abund, old_abund_unc = self.abundance[key] old_tot_abund, old_tot_abund_unc = self.total_abundance[key] rel_abund_unc = old_abund_unc / old_abund rel_tot_abund_unc = old_tot_abund_unc / old_tot_abund new_abund = old_abund / (10**6) new_abund /= self.avnhi new_tot_abund = old_tot_abund / (10**6) new_tot_abund /= self.avnhi new_abund_unc = new_abund * np.sqrt( np.square(rel_abund_unc) + np.square(avnhi_rel_unc) ) new_tot_abund_unc = new_tot_abund * np.sqrt( np.square(rel_tot_abund_unc) + np.square(avnhi_rel_unc) ) self.abundance_av[key] = (new_abund, new_abund_unc) self.total_abundance_av[key] = (new_tot_abund, new_tot_abund_unc) # extinction normalization from A(V) to N(HI) self.ext_alnhi = self.ext_alav * self.avnhi self.ext_alnhi_unc = np.square( self.ext_alav_unc / self.ext_alav ) + np.square(avnhi_rel_unc) self.ext_alnhi_unc = self.ext_alnhi * np.sqrt(self.ext_alnhi_unc)
[docs] def parse_obsfile(self, obs_filename): """ Pares the observations file and get the filenames for the different kinds of observations """ self.obs_filenames = {} # parse the observations file t = Table.read(obs_filename, format="ascii.commented_header") poss_types = ["ext", "ir_emis", "abund", "avnhi", "scat_a", "scat_g"] for ctype in poss_types: if ctype in t["type"]: gvals = t["type"] == ctype self.obs_filenames[ctype] = t["filename"][gvals].data[0] else: self.obs_filenames[ctype] = None