#!/usr/bin/env python3
import builtins
import operator
import os
from typing import Literal, Optional, Union
import numpy as np
from scipy import integrate # type: ignore
from specutils import Spectrum1D # type: ignore
from amespahdbpythonsuite.amespahdb import AmesPAHdb
from amespahdbpythonsuite.spectrum import Spectrum
message = AmesPAHdb.message
[docs]
class Fitted(Spectrum):
"""
AmesPAHdbPythonSuite fitted class
"""
def __init__(self, d: Optional[dict] = None, **keywords) -> None:
"""
Initialize fitted class.
"""
super().__init__(d, **keywords)
self.__set(d, **keywords)
[docs]
def plot(self, **keywords) -> None:
"""
Plotting method for the fitted spectrum and breakdown components.
"""
import matplotlib as mpl # type: ignore
import matplotlib.gridspec as gs # type: ignore
import matplotlib.pyplot as plt # type: ignore
if keywords.get("datalabel", False):
datalabel = keywords["datalabel"]
else:
datalabel = "obs"
axis = []
if keywords.get("sizedistribution", False):
fig, ax = plt.subplots()
axis.append(ax)
h, edges = self.getsizedistribution()
h = 100.0 * h / np.sum(h)
axis[0].bar(
edges[:-1],
h,
align="edge",
edgecolor="black",
width=(np.roll(edges, -1) - edges)[:-1],
)
axis[0].set_xlabel(r"n$_{\mathregular{carbon}}$")
axis[0].set_ylabel("frequency [%]")
else:
if keywords.get("residual", False):
fig = plt.figure()
spec = gs.GridSpec(2, 1, height_ratios=[3, 1])
axis.append(plt.subplot(spec[0]))
axis.append(plt.subplot(spec[1], sharex=axis[0]))
fig.subplots_adjust(hspace=0)
axis[0].tick_params(axis="x", which="both", labelbottom="off")
elif (
keywords.get("charge", False)
or keywords.get("size", False)
or keywords.get("composition", False)
):
fig, ax = plt.subplots()
axis.append(ax)
elif not keywords.get("sizedistribution", False):
fig = plt.figure()
spec = gs.GridSpec(1, 2, width_ratios=[2, 3])
axis.append(plt.subplot(spec[0]))
axis.append(plt.subplot(spec[1]))
fig.subplots_adjust(wspace=0.25)
axis[0].tick_params(
axis="x", which="both", bottom="off", top="off", labelbottom="off"
)
axis[0].tick_params(
axis="y", which="both", left="off", right="off", labelleft="off"
)
axis[0].set_ylim((0, 1))
axis = list(reversed(axis))
if keywords.get("wavelength", False):
x = 1e4 / self.grid
axis[0].set_xlim((min(x), max(x)))
xtitle = "Wavelength [$\\mu$m]"
else:
x = self.grid
axis[0].set_xlim((max(x), min(x)))
xtitle = (
self.units["abscissa"]["label"]
+ " ["
+ self.units["abscissa"]["unit"].to_string("latex_inline")
+ "]"
)
if "sigma" in keywords:
axis[0].errorbar(
x,
self.observation.flux.value,
yerr=keywords["sigma"],
fmt="o",
mfc="white",
color="k",
ecolor="k",
markersize=3,
elinewidth=0.2,
capsize=0.8,
label=datalabel,
)
else:
axis[0].scatter(
x, self.observation.flux.value, color="k", s=5, label=datalabel
)
if "title" in keywords:
axis[0].set_title(keywords["title"])
axis[0].minorticks_on()
axis[0].tick_params(
which="major", right="on", top="on", direction="in", length=5
)
axis[0].tick_params(
which="minor", right="on", top="on", direction="in", length=3
)
if (
not keywords.get("residual", False)
and not keywords.get("charge", False)
and not keywords.get("size", False)
and not keywords.get("composition", False)
):
colors = mpl.colormaps["rainbow"](np.linspace(0, 1, len(self.uids)))
for uid, col in zip(self.uids, colors):
axis[0].plot(x, self.data[uid], color=col)
axis[0].plot(x, self.getfit(), color="tab:purple", label="fit", lw=1)
if (
keywords.get("charge", False)
or keywords.get("size", False)
or keywords.get("composition", False)
):
classes = self.getclasses()
if isinstance(classes, dict):
if keywords.get("charge", False):
if not isinstance(classes["anion"], int):
axis[0].plot(x, classes["anion"], "r-", label="anion")
if not isinstance(classes["neutral"], int):
axis[0].plot(x, classes["neutral"], "g-", label="neutral")
if not isinstance(classes["cation"], int):
axis[0].plot(x, classes["cation"], "b-", label="cation")
axis[0].axhline(0, linestyle="--", color="gray")
axis[0].legend()
elif keywords.get("size", False):
if not isinstance(classes["small"], int):
axis[0].plot(x, classes["small"], "r-", label="small")
if not isinstance(classes["large"], int):
axis[0].plot(x, classes["large"], "g-", label="large")
axis[0].axhline(0, linestyle="--", color="gray")
axis[0].legend()
elif keywords.get("composition", False):
if not isinstance(classes["pure"], int):
axis[0].plot(x, classes["pure"], "r-", label="pure")
if not isinstance(classes["nitrogen"], int):
axis[0].plot(x, classes["nitrogen"], "g-", label="nitrogen")
axis[0].axhline(0, linestyle="--", color="gray")
axis[0].legend()
elif keywords.get("residual", False):
y = self.getresidual()
axis[1].plot(x, y)
axis[1].axhline(0, linestyle="--", color="gray")
axis[0].legend()
else:
axis[1].text(
0.05,
0.95,
("%s" + 5 * " " + "%s") % ("UID", "WEIGHT"),
family="monospace",
)
axis[1].xaxis.set_visible(False)
axis[1].yaxis.set_visible(False)
ypos = 0.95 - 0.05
for uid, w, col in zip(self.uids, self.weights.values(), colors):
txt = ("%d" + (5 * " ") + "%.2e") % (uid, w)
axis[1].text(0.05, ypos, txt, color=col, family="monospace")
ypos -= 0.05
if ypos <= 0.05:
axis[1].text(0.05, ypos, "more...", family="monospace")
break
axis[0].ticklabel_format(axis="y", style="sci", scilimits=(4, 4))
axis[0].set_ylabel(
self.units["ordinate"]["label"]
+ " ["
+ self.units["ordinate"]["unit"].to_string("latex_inline")
+ "]",
)
if keywords.get("residual", False):
axis[1].set_xlabel(f"{xtitle}")
axis[1].set_ylabel("residual")
axis[1].minorticks_on()
axis[1].tick_params(
which="major", right="on", top="on", direction="in", length=5
)
axis[1].tick_params(
which="minor", right="on", top="on", direction="in", length=3
)
else:
axis[0].set_xlabel(f"{xtitle}")
if keywords.get("save", False):
if keywords.get("charge"):
ptype = "charge"
elif keywords.get("size"):
ptype = "size"
elif keywords.get("composition"):
ptype = "composition"
elif keywords.get("residual"):
ptype = "residual"
else:
ptype = "fitted"
if keywords["output"]:
if os.path.isdir(keywords["output"]):
fig.savefig(f"{keywords['output']}/{ptype}.{keywords['ftype']}")
else:
fig.savefig(f"{keywords['output']}_{ptype}.{keywords['ftype']}")
else:
fig.savefig(f"{ptype}.{keywords['ftype']}")
else:
plt.show()
plt.close(fig)
[docs]
def set(self, d: Optional[dict] = None, **keywords) -> None:
"""
Calls class: :class:`amespahdbpythonsuite.spectrum.Spectrum.set` to parse keywords.
"""
Spectrum.set(self, d, **keywords)
self.__set(d, **keywords)
def __set(self, d: Optional[dict] = None, **keywords) -> None:
"""
Populate data dictionary helper.
"""
self.observation = keywords.get("observation", None)
self.weights = keywords.get("weights", dict())
self.method = keywords.get("method", "")
if isinstance(d, dict):
if d.get("type", "") == self.__class__.__name__:
if "observation" not in keywords:
self.observation = d["observation"]
if "weights" not in keywords:
self.weights = d["weights"]
if "method" not in keywords:
self.method = d["method"]
self._chisquared: Optional[float] = None
self._norm: Optional[float] = None
self._fit: Optional[np.ndarray] = None
self._classes: Optional[dict] = None
self._error: Optional[dict] = None
self._residual: Optional[np.ndarray] = None
[docs]
def get(self) -> dict:
"""
Calls class: :class:`amespahdbpythonsuite.spectrum.Spectrum.get`.
Assigns class variables from inherited dictionary.
"""
d = Spectrum.get(self)
d["type"] = self.__class__.__name__
d["observation"] = self.observation
d["weights"] = self.weights
d["method"] = self.method
return d
def __repr__(self) -> str:
"""
Class representation.
"""
return f"{self.__class__.__name__}(" f"{self.uids=},{self.method=})"
def __str__(self) -> str:
"""
A description of the instance.
"""
return f"AmesPAHdbPythonSuite Fitted instance.\n" f"{self.uids=}"
[docs]
def write(self, filename: str = "") -> None:
"""
Write the fitted spectra to file as an IPAC-table.
"""
import datetime
import sys
from astropy.io import ascii # type: ignore
from astropy.table import Table # type: ignore
if not filename:
filename = self.__class__.__name__ + ".tbl"
hdr = list()
kv = {
"DATE": datetime.datetime.now()
.astimezone()
.replace(microsecond=0)
.isoformat(),
"ORIGIN": "NASA Ames Research Center",
"CREATOR": f"Python {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}",
"SOFTWARE": "AmesPAHdbPythonSuite",
"AUTHOR": "Dr. C. Boersma",
"TYPE": self.__class__.__name__.upper(),
"SPECIES": str(len(self.data)),
}
for key, value in kv.items():
if not value.isnumeric():
hdr.append(f"{key:8} = '{value}'")
else:
hdr.append(f"{key:8} = {value}")
tbl = Table(
[
[uid for uid, v in self.data.items() for _ in v],
np.array([f for _ in self.data.values() for f in self.grid])
* self.units["abscissa"]["unit"],
np.array([t for v in self.data.values() for t in v])
* self.units["ordinate"]["unit"],
np.array(
[self.weights[uid] for uid, v in self.data.items() for _ in v]
),
],
names=["UID", "FREQUENCY", "INTENSITY", "WEIGHT"],
meta={"comments": hdr},
)
ascii.write(tbl, filename, format="ipac", overwrite=True)
message(f"WRITTEN: {filename}")
[docs]
def getmethod(self) -> str:
"""
Retrieves the method used for the fit.
"""
return self.method
[docs]
def getchisquared(self) -> Optional[float]:
"""
Calculates the chi-squared of the fit.
"""
if self._chisquared is None:
if self.observation:
self._chisqured = np.sum(
(self.observation.flux.value - self.getfit()) ** 2
/ self.observation.uncertainty.array
)
return self._chisquared
[docs]
def getnorm(self) -> float:
"""
Retrieves the Norm of the fit.
"""
if self._norm is None:
self._norm = np.sum((self.observation.flux.value - self.getfit()) ** 2)
return self._norm
[docs]
def getobservation(self) -> Spectrum1D:
"""
Retrieves the ordinate values of the observation.
"""
return self.observation
[docs]
def getfit(self) -> Union[np.ndarray, Literal[0]]:
"""
Retrieves the sum of fit values.
"""
if self._fit is None:
self._fit = sum(self.data.values())
return self._fit
[docs]
def getresidual(self) -> np.ndarray:
"""
Retrieves the residual of the fit.
"""
if self._residual is None:
self._residual = self.observation.flux.value - self.getfit()
return self._residual
[docs]
def getweights(self) -> dict:
"""
Retrieves the weights of the fitted PAHs.
"""
return self.weights
[docs]
def getsizedistribution(
self,
nbins: int = 0,
min: Optional[float] = None,
max: Optional[float] = None,
) -> tuple:
"""
Retrieves the size distribution of the fitted PAHs.
"""
nc = [self.pahdb["species"][uid]["n_c"] for uid in self.weights]
if not min:
min = builtins.min(nc)
if not max:
max = builtins.max(nc)
if nbins == 0:
nbins = int(np.ceil(2.0 * len(self.uids) ** (1.0 / 3.0)))
return np.histogram(nc, bins=nbins, weights=list(self.weights.values()))
[docs]
def getclasses(self, **keywords) -> Optional[dict]:
"""
Retrieves the spectra of the different classes of the fitted PAHs.
"""
if self._classes is None:
if self.pahdb is None:
message("VALID DATABASE NEEDED TO GET CLASSES")
return None
self._classes = {
key: self.__classes(val)
for key, val in self._subclasses(**keywords).items()
}
uids = [
uid
for uid in self.uids
if self.pahdb["species"][uid]["n_n"] == 0
and self.pahdb["species"][uid]["n_o"] == 0
and self.pahdb["species"][uid]["n_mg"] == 0
and self.pahdb["species"][uid]["n_si"] == 0
and self.pahdb["species"][uid]["n_fe"] == 0
]
self._classes["pure"] = sum(
{key: val for key, val in self.data.items() if key in uids}.values()
)
return self._classes
def __classes(self, s: dict) -> np.ndarray:
"""
Retrieves the intensities of a given subclass.
"""
if s["subclass"] == "charge":
uids = [
uid
for uid in self.uids
if s["operator"](
self.pahdb["species"][uid][s["subclass"]], s["operand"]
)
]
else:
uids = [
uid
for uid in self.uids
if s["operator"](
self.pahdb["species"][uid][s["subclass"]], s["operand"]
)
]
if not uids:
return np.zeros(self.grid.size)
return sum({key: val for key, val in self.data.items() if key in uids}.values())
[docs]
def getbreakdown(self, **keywords) -> Optional[dict]:
"""
Retrieves the breakdown of the fitted PAHs.
"""
if not self.pahdb:
message("VALID DATABASE NEEDED TO GET CLASSES")
return None
# Getting fit weights
fweight = np.array(list(self.weights.values()))
breakdown = {
"solo": np.sum([self.pahdb["species"][uid]["n_solo"] for uid in self.uids]),
"duo": np.sum([self.pahdb["species"][uid]["n_duo"] for uid in self.uids]),
"trio": np.sum([self.pahdb["species"][uid]["n_trio"] for uid in self.uids]),
"quartet": np.sum(
[self.pahdb["species"][uid]["n_quartet"] for uid in self.uids]
),
"quintet": np.sum(
[self.pahdb["species"][uid]["n_quintet"] for uid in self.uids]
),
}
total = 1.0
if keywords.get("flux", False):
if not self.grid:
message("GRID IS NOT SET")
return None
classes = self.getclasses(**keywords)
if keywords.get("absolute", False):
total, err = -integrate.trapezoid(self.getfit(), x=self.grid)
if classes:
for key in classes:
breakdown[key] = (
-integrate.trapezoid(classes[key], x=self.grid) / total
)
return breakdown
if not self.weights:
message("WEIGHTS ARE NOT SET")
return None
if "absolute" not in keywords:
total = np.sum(fweight)
# Set subclasses dictionary.
subclasses = self._subclasses(**keywords)
for key in subclasses:
breakdown[key] = self.__breakdown(subclasses[key]) / total
# Obtaining pure PAH breakdown.
uids = [
uid
for uid in self.uids
if self.pahdb["species"][uid]["n_n"] == 0
and self.pahdb["species"][uid]["n_o"] == 0
and self.pahdb["species"][uid]["n_mg"] == 0
and self.pahdb["species"][uid]["n_si"] == 0
and self.pahdb["species"][uid]["n_fe"] == 0
]
if len(uids) > 0:
breakdown["pure"] = np.sum([self.weights[uid] for uid in uids]) / total
# Getting Nc.
nc = np.array([self.pahdb["species"][uid]["n_c"] for uid in self.uids])
breakdown["n_c"] = np.sum(nc * fweight) / np.sum(fweight)
return breakdown
def __breakdown(self, s: dict) -> float:
"""
Retrieve the sum of the fitting weights for the fitted PAHs.
"""
if s["subclass"] == "charge":
uids = [
uid
for uid in self.uids
if s["operator"](
self.pahdb["species"][uid][s["subclass"]], s["operand"]
)
]
else:
uids = [
uid
for uid in self.uids
if s["operator"](
self.pahdb["species"][uid][s["subclass"]], s["operand"]
)
]
if len(uids) > 0:
return np.sum([self.weights[uid] for uid in uids])
else:
return 0.0
def _subclasses(self, **keywords) -> dict:
"""
Create subclasses dictionary.
"""
subclasses = {
"anion": {"subclass": "charge", "operator": operator.lt, "operand": 0},
"neutral": {"subclass": "charge", "operator": operator.eq, "operand": 0},
"cation": {"subclass": "charge", "operator": operator.gt, "operand": 0},
"small": {
"subclass": "n_c",
"operator": operator.le,
"operand": keywords.get("small", 50),
},
"large": {
"subclass": "n_c",
"operator": operator.gt,
"operand": keywords.get("small", 50),
},
"nitrogen": {"subclass": "n_n", "operator": operator.gt, "operand": 0},
}
return subclasses
[docs]
def geterror(self) -> Optional[dict]:
"""
Calculates the PAHdb fitting uncertainty
as the ratio of the residual over the total spectrum area.
"""
if self._error is None:
range = {
"err": [min(self.grid), max(self.grid)],
"e127": [754.0, 855.0],
"e112": [855.0, 1000.0],
"e77": [1000.0, 1495.0],
"e62": [1495.0, 1712.0],
"e33": [2900.0, 3125.0],
}
self._error = {k: 0.0 for k in range.keys()}
if self.observation:
srt = np.argsort(self.grid)
x = self.grid[srt]
y = self.observation.flux.value[srt]
r = np.abs(self.getresidual())[srt]
for key, rng in range.items():
sel = np.where((x >= rng[0]) & (x <= rng[1]))[0]
if len(sel) == 0:
continue
self._error[key] = np.trapz(r[sel], x=x[sel]) / np.trapz(
y[sel], x=x[sel]
)
return self._error
[docs]
def sort(self, flux: bool = False) -> dict:
"""
Sort UIDs and weights by their contribution to the fit
"""
w = (
self.weights
if not flux
else {
uid: -integrate.trapezoid(flux, x=self.grid)
for uid, flux in self.data.items()
}
)
self.weights = dict(
sorted(w.items(), key=lambda item: (item[1], item[0]), reverse=True)
)
self.uids = list(self.weights.keys())
return self.weights