Source code for amespahdbpythonsuite.spectrum

#!/usr/bin/env python3

from __future__ import annotations

from typing import TYPE_CHECKING, Optional, Union

import astropy.units as u  # type: ignore
import numpy as np
from astropy.nddata import StdDevUncertainty  # type: ignore
from scipy import optimize  # type: ignore
from specutils import Spectrum1D, manipulation  # type: ignore

from amespahdbpythonsuite.amespahdb import AmesPAHdb
from amespahdbpythonsuite.transitions import Transitions

if TYPE_CHECKING:
    from amespahdbpythonsuite.coadded import Coadded
    from amespahdbpythonsuite.fitted import Fitted
    from amespahdbpythonsuite.observation import Observation
    from amespahdbpythonsuite.mcfitted import MCFitted

import copy
import multiprocessing as mp
from functools import partial

message = AmesPAHdb.message


class Spectrum(Transitions):
    """
    AmesPAHdbPythonSuite spectrum class.

    Contains methods to fit and plot the input spectrum.

    """

    def __init__(self, d: Optional[dict] = None, **keywords) -> None:
        super().__init__(d, **keywords)
        self.__set(d, **keywords)
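    # Usage sketch (not part of the module): a Spectrum is typically obtained by
    # convolving emission-model transitions from a PAHdb XML file. The filename,
    # UIDs, grid, and keyword values below are illustrative placeholders; check
    # the Transitions API for the exact signatures.
    #
    #   from amespahdbpythonsuite.amespahdb import AmesPAHdb
    #   pahdb = AmesPAHdb(filename="pahdb-theoretical.xml")
    #   transitions = pahdb.gettransitionsbyuid([18, 73, 726])
    #   transitions.cascade(6.0 * 1.603e-12)  # 6 eV absorbed photon, in erg
    #   spectrum = transitions.convolve(
    #       grid=1e4 / np.arange(5.0, 20.0, 0.05), fwhm=15.0, gaussian=True
    #   )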
    def set(self, d: Optional[dict] = None, **keywords) -> None:
        """
        Calls class: :class:`amespahdbpythonsuite.transitions.Transitions.set`
        to parse keywords.

        """
        Transitions.set(self, d, **keywords)
        self.__set(d, **keywords)
    def __set(self, d: Optional[dict] = None, **keywords) -> None:
        """
        Populate data dictionary helper.

        """
        self.grid = keywords.get("grid", list())
        self.profile = keywords.get("profile", "")
        self.fwhm = keywords.get("fwhm", 0.0)

        if isinstance(d, dict):
            if d.get("type", "") == self.__class__.__name__:
                if "grid" not in keywords:
                    self.grid = d["grid"]
                if "profile" not in keywords:
                    self.profile = d["profile"]
                if "fwhm" not in keywords:
                    self.fwhm = d["fwhm"]
    def get(self) -> dict:
        """
        Calls class: :class:`amespahdbpythonsuite.transitions.Transitions.get`.
        Assigns class variables from inherited dictionary.

        """
        d = Transitions.get(self)
        d["type"] = self.__class__.__name__
        d["grid"] = self.grid
        d["profile"] = self.profile
        d["fwhm"] = self.fwhm

        return d
    def __repr__(self) -> str:
        """
        Class representation.

        """
        return (
            f"{self.__class__.__name__}("
            f"{self.uids=},{self.grid=},{self.profile=},{self.fwhm=})"
        )

    def __str__(self) -> str:
        """
        A description of the instance.

        """
        return f"AmesPAHdbPythonSuite Spectrum instance.\n" f"{self.uids=}"
    def write(self, filename: str = "") -> None:
        """
        Write the spectra to file as an IPAC-table.

        """
        import datetime
        import sys

        from astropy.io import ascii  # type: ignore
        from astropy.table import Table  # type: ignore

        if filename == "":
            filename = self.__class__.__name__ + ".tbl"

        hdr = list()

        kv = {
            "DATE": datetime.datetime.now()
            .astimezone()
            .replace(microsecond=0)
            .isoformat(),
            "ORIGIN": "NASA Ames Research Center",
            "CREATOR": f"Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
            "SOFTWARE": "AmesPAHdbPythonSuite",
            "AUTHOR": "Dr. C. Boersma",
            "TYPE": self.__class__.__name__.upper(),
            "SPECIES": str(len(self.data)),
        }

        for key, value in kv.items():
            if not value.isnumeric():
                hdr.append(f"{key:8} = '{value}'")
            else:
                hdr.append(f"{key:8} = {value}")

        tbl = Table(
            [
                [uid for uid, v in self.data.items() for _ in v],
                np.array([f for _ in self.data.values() for f in self.grid])
                * self.units["abscissa"]["unit"],
                np.array([t for v in self.data.values() for t in v])
                * self.units["ordinate"]["unit"],
            ],
            names=["UID", "FREQUENCY", "INTENSITY"],
            meta={"comments": hdr},
        )

        ascii.write(tbl, filename, format="ipac", overwrite=True)

        message(f"WRITTEN: {filename}")
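    # Usage sketch (not part of the module): the filename is a placeholder;
    # omitting it writes "Spectrum.tbl" in the working directory.
    #
    #   spectrum.write("my_spectra.tbl")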
    def fit(
        self,
        y: Union[Observation, Spectrum1D, list],
        yerr: list = list(),
        notice: bool = True,
        **keywords,
    ) -> Optional[Fitted]:
        """
        Fits the input spectrum.

        """
        from amespahdbpythonsuite import observation

        if isinstance(y, Spectrum1D):
            obs = copy.deepcopy(y)
        elif isinstance(y, observation.Observation):
            obs = copy.deepcopy(y.spectrum)
        else:
            unc = None
            if np.any(yerr):
                unc = StdDevUncertainty(yerr)
            obs = Spectrum1D(
                flux=y * u.Unit(),
                spectral_axis=self.grid * self.units["abscissa"]["unit"],
                uncertainty=unc,
            )

        if obs.spectral_axis.unit != u.Unit() and obs.spectral_axis.unit != u.Unit(
            "1/cm"
        ):
            message("EXPECTING SPECTRAL UNITS OF 1 / CM")
            return None

        matrix = np.array(list(self.data.values()))

        if obs.uncertainty is None:
            method = "NNLS"
            b = obs.flux.value
            m = matrix.copy()
        else:
            method = "NNLC"
            b = np.divide(obs.flux.value, obs.uncertainty.array)
            m = np.divide(matrix, obs.uncertainty.array)

        if notice:
            message(f"DOING {method}")

        scl = m.max()
        m /= scl
        solution, _ = optimize.nnls(m.T, b, maxiter=1024, atol=1e-16)
        solution /= scl

        # Initialize lists and dictionaries.
        uids = list()
        data = dict()
        weights = dict()

        # Retrieve uids, data, and fit weights dictionaries.
        for (
            uid,
            s,
            m,
        ) in zip(self.uids, solution, matrix):
            if s > 0:
                intensities = []
                uids.append(uid)
                for d in m:
                    intensities.append(s * d)
                data[uid] = np.array(intensities)
                weights[uid] = s

        if notice:
            message(
                [
                    " NOTICE: PLEASE TAKE CONSIDERABLE CARE WHEN INTERPRETING ",
                    " THESE RESULTS AND PUTTING THEM IN AN ASTRONOMICAL ",
                    " CONTEXT. THERE ARE MANY SUBTLETIES THAT NEED TO BE TAKEN",
                    " INTO ACCOUNT, RANGING FROM PAH SIZE, INCLUSION OF ",
                    " HETEROATOMS, ETC. TO DETAILS OF THE APPLIED EMISSION ",
                    " MODEL, BEFORE ANY THOROUGH ASSESSMENT CAN BE MADE. ",
                ]
            )

        from amespahdbpythonsuite.fitted import Fitted

        return Fitted(
            database=self.database,
            version=self.version,
            data=data,
            pahdb=self.pahdb,
            uids=uids,
            model=self.model,
            units=self.units,
            shift=self._shift,
            grid=self.grid,
            profile=self.profile,
            fwhm=self.fwhm,
            observation=obs,
            weights=weights,
            method=method,
        )
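    # Usage sketch (not part of the module): fitting an observed spectrum
    # sampled on the same frequency grid. `flux` and `sigma` are placeholder
    # arrays; supplying uncertainties selects NNLC, otherwise plain NNLS is
    # used. The call to plot() assumes Fitted exposes such a method.
    #
    #   fit = spectrum.fit(flux, yerr=sigma)
    #   if fit:
    #       fit.plot(show=True)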
    def plot(self, **keywords) -> None:
        """
        Plot the spectrum.

        """
        import matplotlib as mpl  # type: ignore
        import matplotlib.pyplot as plt  # type: ignore

        _, ax = plt.subplots()
        ax.minorticks_on()
        ax.tick_params(which="major", right="on", top="on", direction="in")
        colors = mpl.colormaps["rainbow"](np.linspace(0, 1, len(self.uids)))
        for y, col in zip(self.data.values(), colors):
            ax.plot(self.grid, y, color=col)

        ax.set_xlim((max(self.grid), min(self.grid)))

        ax.set_xlabel(
            self.units["abscissa"]["label"]
            + " ["
            + self.units["abscissa"]["unit"].to_string("latex_inline")
            + "]",
        )

        unit = self.units["ordinate"]["unit"]
        scale = unit.scale
        unit /= scale
        unit = unit.decompose().cgs.unit
        pre = ""

        if scale != 1.0:
            s = np.log10(scale)
            pre = r"$\times10^{" + f"{s:.0f}" + r"}$ "

        ax.set_ylabel(
            self.units["ordinate"]["label"]
            + " ["
            + pre
            + unit.to_string("latex_inline")
            + "]",
        )

        basename = keywords.get("save")
        if basename:
            if not isinstance(basename, str):
                basename = "spectrum"
            plt.savefig(f"{basename}.pdf")
        elif keywords.get("show", False):
            plt.show()
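    # Usage sketch (not part of the module): the save/show keywords are the
    # ones handled above; the basename is a placeholder.
    #
    #   spectrum.plot(show=True)
    #   spectrum.plot(save="my_spectrum")  # writes my_spectrum.pdf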
    def getgrid(self) -> list:
        """
        Return the grid.

        """
        return self.grid
    def coadd(self, weights: dict = dict(), average: bool = False) -> Coadded:
        """
        Co-add PAHdb spectra.

        Parameters:
            weights : dict
                Dictionary of fit weights to use when co-adding.
            average : bool
                If True calculates the average coadded spectrum.

        """
        data: Union[np.ndarray, float]
        if weights:
            data = np.zeros(len(self.grid))
            for uid, weight in weights.items():
                data += self.data[uid] * weight
        else:
            data = sum(self.data.values())

        if average:
            data /= len(self.data.keys())

        from amespahdbpythonsuite.coadded import Coadded

        return Coadded(
            database=self.database,
            version=self.version,
            data={0: data},
            pahdb=self.pahdb,
            uids=[0],
            model=self.model,
            units=self.units,
            shift=self._shift,
            grid=self.grid,
            profile=self.profile,
            fwhm=self.fwhm,
            weights=weights,
            averaged=average,
        )
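    # Usage sketch (not part of the module): the weights dictionary, keyed on
    # UID, is a placeholder; in practice it would typically come from a prior
    # fit.
    #
    #   coadded = spectrum.coadd(weights={18: 0.5, 73: 0.3, 726: 0.2})
    #   averaged = spectrum.coadd(average=True)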
    def normalize(self, all: bool = False) -> Union[float, dict]:
        """
        Normalize the spectral data to the maximum intensity, per UID unless
        ``all`` is True.

        """
        max: Union[float, dict] = 0.0
        if all:
            for intensities in self.data.values():
                m = intensities.max()
                if m > max:
                    max = m
            for intensities in self.data.values():
                intensities /= max
        else:
            max = dict()
            for uid, intensities in self.data.items():
                m = intensities.max()
                intensities /= m
                max[uid] = m

        return max
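    # Usage sketch (not part of the module): normalization happens in place;
    # the return value holds the applied maxima (a dict keyed on UID by
    # default, a single float with all=True).
    #
    #   maxima = spectrum.normalize()
    #   global_max = spectrum.normalize(all=True)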
    def resample(self, grid: np.ndarray) -> None:
        """
        Resample the spectral data.

        """
        resampler = manipulation.FluxConservingResampler(
            extrapolation_treatment="nan_fill"
        )

        for uid, intensities in self.data.items():
            s = resampler(
                Spectrum1D(
                    spectral_axis=self.grid * self.units["abscissa"]["unit"],
                    flux=intensities * self.units["ordinate"]["unit"],
                ),
                grid * self.units["abscissa"]["unit"],
            )
            self.data[uid] = s.flux.value

        self.grid = grid
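    # Usage sketch (not part of the module): flux-conserving resampling onto a
    # new frequency grid; the bounds and step below are placeholders and should
    # be in 1/cm.
    #
    #   new_grid = np.arange(500.0, 2000.0, 5.0)
    #   spectrum.resample(new_grid)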
    def mcfit(
        self,
        y: Union[Observation, Spectrum1D, list],
        yerr: list = list(),
        samples: int = 1024,
        uniform: bool = False,
        multiprocessing: bool = False,
        notice: bool = True,
        **keywords,
    ) -> Optional[MCFitted]:
        """
        Monte Carlo sampling and fitting to the input spectrum.

        Parameters
        ----------
        samples : int
            Number of samples.

        """
        from tqdm import tqdm  # type: ignore

        from amespahdbpythonsuite import observation
        from amespahdbpythonsuite.mcfitted import MCFitted

        if isinstance(y, Spectrum1D):
            obs = copy.deepcopy(y)
        elif isinstance(y, observation.Observation):
            obs = copy.deepcopy(y.spectrum)
        else:
            unc = None
            if np.any(yerr):
                unc = StdDevUncertainty(yerr)
            obs = Spectrum1D(
                flux=y * u.Unit(),
                spectral_axis=self.grid * self.units["abscissa"]["unit"],
                uncertainty=unc,
            )

        if obs.spectral_axis.unit != u.Unit() and obs.spectral_axis.unit != u.Unit(
            "1/cm"
        ):
            message("EXPECTING SPECTRAL UNITS OF 1 / CM")
            return None

        if obs.uncertainty is None:
            message("UNCERTAINTIES REQUIRED FOR MCFIT")
            return None

        if notice:
            message(
                [
                    " NOTICE: PLEASE TAKE CONSIDERABLE CARE WHEN INTERPRETING ",
                    " THESE RESULTS AND PUTTING THEM IN AN ASTRONOMICAL ",
                    " CONTEXT. THERE ARE MANY SUBTLETIES THAT NEED TO BE TAKEN",
                    " INTO ACCOUNT, RANGING FROM PAH SIZE, INCLUSION OF ",
                    " HETEROATOMS, ETC. TO DETAILS OF THE APPLIED EMISSION ",
                    " MODEL, BEFORE ANY THOROUGH ASSESSMENT CAN BE MADE. ",
                ]
            )

        mcfits = list()

        # Start the MC sampling and fitting.
        if multiprocessing:
            from amespahdbpythonsuite.fitted import Fitted

            matrix = np.array(list(self.data.values()))
            m = np.divide(matrix, obs.uncertainty.array)
            scl = m.max()
            m /= scl

            pool = mp.Pool(mp.cpu_count() - 1)
            for solution, b in tqdm(
                pool.imap_unordered(
                    partial(
                        _mcfit,
                        m=m,
                        x=obs.flux.value,
                        u=obs.uncertainty.array,
                        uniform=uniform,
                    ),
                    range(samples),
                ),
                desc="samples",
                leave=True,
                unit="samples",
                colour="blue",
                total=samples,
            ):
                # Initialize lists and dictionaries.
                uids = list()
                data = dict()
                weights = dict()

                # Retrieve uids, data, and fit weights dictionaries.
                for (
                    uid,
                    s,
                    m,
                ) in zip(self.uids, solution / scl, matrix):
                    if s > 0:
                        uids.append(uid)
                        data[uid] = s * m * obs.flux.unit
                        weights[uid] = s

                obs_fit = Spectrum1D(
                    flux=b * obs.flux.unit,
                    spectral_axis=copy.deepcopy(obs.spectral_axis),
                    uncertainty=copy.deepcopy(obs.uncertainty),
                )

                mcfits.append(
                    Fitted(
                        database=self.database,
                        version=self.version,
                        data=data,
                        pahdb=self.pahdb,
                        uids=uids,
                        model=self.model,
                        units=self.units,
                        shift=self._shift,
                        grid=self.grid,
                        profile=self.profile,
                        fwhm=self.fwhm,
                        observation=obs_fit,
                        weights=weights,
                        method="NNLC",
                    )
                )
        else:
            for _ in tqdm(
                range(samples),
                desc="samples",
                leave=True,
                unit="samples",
                colour="blue",
            ):
                if uniform:
                    # Calculate new flux based on random uniform distribution sampling.
                    flux = (
                        obs.uncertainty.array
                        * np.random.uniform(-1, 1, obs.flux.shape)
                        + obs.flux.value
                    )
                else:
                    # Calculate new flux based on random normal distribution sampling.
                    flux = np.random.normal(
                        obs.flux.value, obs.uncertainty.array, obs.flux.shape
                    )

                # Fit the spectrum.
                fit = self.fit(flux * obs.flux.unit, obs.uncertainty, notice=False)

                # Obtain the fit and weights.
                if fit:
                    mcfits.append(fit)

        return MCFitted(
            mcfits=mcfits,
            distribution="uniform" if uniform else "normal",
            observation=obs,
        )
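    # Usage sketch (not part of the module): Monte Carlo fitting with 256
    # resamples of the observed flux; `flux` and `sigma` are placeholder arrays
    # and uncertainties are required. The call to plot() assumes MCFitted
    # exposes such a method.
    #
    #   mcfit = spectrum.mcfit(flux, yerr=sigma, samples=256)
    #   if mcfit:
    #       mcfit.plot(show=True)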
def _mcfit(_, m, x, u, uniform) -> tuple:
    if uniform:
        # Calculate new flux based on random uniform distribution sampling.
        b = u * np.random.uniform(-1, 1, x.shape) + x
    else:
        # Calculate new flux based on random normal distribution sampling.
        b = np.random.normal(x, u, x.shape)

    # Fit the spectrum.
    solution, _ = optimize.nnls(m.T, np.divide(b, u), maxiter=1024, atol=1e-16)

    return solution, b