#!/usr/bin/env python3
"""decomposer.py
Subclass of DecomposerBase for writting results to disk.
This file is part of pypahdb - see the module docs for more
information.
"""
import copy
import sys
from datetime import datetime, timezone
from functools import cached_property
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt
import numpy as np
from astropy.io import fits
from astropy.wcs import WCS
from matplotlib import cm, colormaps, colors
from matplotlib.backends.backend_pdf import PdfPages
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
import pypahdb
from pypahdb.decomposer_base import MEDIUM_SIZE, SMALL_SIZE, DecomposerBase
[docs]
class Decomposer(DecomposerBase):
"""Extends DecomposerBase to write results to disk (PDF, FITS)."""
def __init__(self, spectrum):
"""Initialize Decomposer object.
Inherits from DecomposerBase defined in decomposer_base.py.
Args:
spectrum (specutils.Spectrum1D): The data to fit/decompose.
"""
DecomposerBase.__init__(self, spectrum)
[docs]
@cached_property
def cation_neutral_ratio(self):
"""
Compute the cation to neutral ratio and return it.
"""
cation_neutral_ratio = self.charge_fractions["cation"].copy()
nonzero = np.nonzero(self.charge_fractions["neutral"])
cation_neutral_ratio[nonzero] /= self.charge_fractions["neutral"][nonzero]
return cation_neutral_ratio
[docs]
def save_pdf(self, filename, header="", domaps=True, doplots=True):
"""Save a PDF summary of the fit results.
Notes:
None.
Args:
filename (str): Path to save to.
header (str): Optional, header data.
Keywords:
domaps (bool): Save maps to PDF (defaults to True).
doplots (bool): Save plots to PDF (defaults to True).
Returns:
None.
"""
with PdfPages(filename) as pdf:
d = pdf.infodict()
d["Title"] = "pyPAHdb Results Summary"
d["Author"] = "Dr. C. Boersma, Dr. M.J. Shannon, and Dr. A. Maragkoudakis"
d["Producer"] = "NASA Ames Research Center"
d["Creator"] = "pypahdb v{}(Python {}.{}.{})".format(
pypahdb.__version__,
sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro,
)
d["Subject"] = "Summary of pyPAHdb Decomposition"
d["Keywords"] = "pyPAHdb, PAH, database, ERS, JWST"
d["CreationDate"] = datetime.now(timezone.utc).strftime("D:%Y%m%d%H%M%S")
d["Description"] = (
"This file contains results from pyPAHdb. "
"pyPAHdb was created as part of the JWST ERS "
"Program titled 'Radiative Feedback from Massive Stars as "
"Traced by Multiband Imaging and Spectroscopic Mosaics' (ID "
"1288)). Visit https://github.com/pahdb/pypahdb/ for more"
"information."
)
if domaps is True:
if isinstance(header, fits.header.Header):
if "OBJECT" in header:
d["Title"] = d["Title"] + " - " + header["OBJECT"]
hdr = copy.deepcopy(header)
hdr["NAXIS"] = 2
cards = [
"NAXIS3",
"PC3_3",
"CRPIX3",
"CRVAL3",
"CTYPE3",
"CDELT3",
"CUNIT3",
"PS3_0",
"PS3_1",
"WCSAXES",
]
for c in cards:
if c in hdr:
del hdr[c]
wcs = WCS(hdr)
else:
wcs = None
fig = self.plot_map(
self.cation_neutral_ratio.value,
self.mask,
"n$_{{cation}}$/n$_{{neutral}}$",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.nc,
self.mask,
"average PAH size (N$_{{C}}$)",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.charge_fractions["neutral"],
self.mask,
"PAH neutral fraction",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.charge_fractions["cation"],
self.mask,
"PAH cation fraction",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.charge_fractions["anion"],
self.mask,
"PAH anion fraction",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.size_fractions["large"],
self.mask,
f"large PAH fraction (N$_{{C}}$ > {MEDIUM_SIZE})",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.size_fractions["medium"],
self.mask,
f"medium PAH fraction ({SMALL_SIZE} < N$_{{C}}$ ≤ {MEDIUM_SIZE})",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.size_fractions["small"],
self.mask,
f"small PAH fraction (N$_{{C}}$ ≤ {SMALL_SIZE})",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(self.error, self.mask, "error", wcs=wcs)
pdf.savefig(fig)
plt.close(fig)
fig = self.plot_map(
self.mask.astype(float),
np.ones(self.mask.shape, dtype=bool),
"mask",
wcs=wcs,
)
pdf.savefig(fig)
plt.close(fig)
if doplots:
ordinate = self.spectrum.flux.T
for i in range(ordinate.shape[1]):
for j in range(ordinate.shape[2]):
fig = self.plot_fit(i, j)
pdf.savefig(fig)
plt.close(fig)
return
[docs]
def save_fits(self, filename, header=""):
"""Save FITS file summary of the fit results.
Args:
filename (str): Path to save to.
header (str): Optional, header for the FITS file.
"""
def _fits_to_disk(hdr, filename):
"""Writes the FITS file to disk, with header.
Args:
hdr (fits.header.Header): FITS header.
filename (str): Path of FITS file to be saved.
"""
hdr["DATE"] = (datetime.today().isoformat(), "When this file was generated")
hdr["ORIGIN"] = (
"NASA Ames Research Center",
"Organization generating this file",
)
hdr["CREATOR"] = (
"pypahdb v{} (Python {}.{}.{})".format(
pypahdb.__version__,
sys.version_info.major,
sys.version_info.minor,
sys.version_info.micro,
),
"Software used to create this file",
)
hdr["AUTHOR"] = (
"Dr. C. Boersma, Dr. M.J. Shannon, and Dr. A. Maragkoudakis",
"Authors of the software",
)
cards = [
"PC3_3",
"CRPIX3",
"CRVAL3",
"CTYPE3",
"CDELT3",
"CUNIT3",
"PS3_0",
"PS3_1",
"WCSAXES",
]
for c in cards:
if c in hdr:
del hdr[c]
comments = (
"This file contains results from pypahdb.\n"
"Pypahdb was created as part of the JWST ERS Program "
"titled 'Radiative Feedback from Massive Stars as "
"Traced by Multiband Imaging and Spectroscopic "
"Mosaics' (ID 1288).\n"
"Visit https://github.com/pahdb/pypahdb/ for more "
"information on pypahdb."
)
for line in comments.split("\n"):
for chunk in [line[i: i + 72] for i in range(0, len(line), 72)]:
hdr["COMMENT"] = chunk
hdr["COMMENT"] = "1st extension has the PAH neutral fraction."
hdr["COMMENT"] = "2nd extension has the PAH cation fraction."
hdr["COMMENT"] = "3rd extension has the PAH anion fraction."
hdr["COMMENT"] = "4th extension has the PAH large fraction."
hdr["COMMENT"] = "5th extension has the PAH medium fraction."
hdr["COMMENT"] = "6th extension has the PAH small fraction."
hdr["COMMENT"] = "7th extension has the error."
hdr["COMMENT"] = "8th extension has the cation to neutral PAH ratio."
hdr["COMMENT"] = "9th extension has the average Nc."
hdr["COMMENT"] = "10th extension has the computed mask."
# Write results to FITS-file with multiple extension.
primary_hdu = fits.PrimaryHDU(header=hdr)
hdulist = fits.HDUList([primary_hdu])
for key, value in self.charge_fractions.items():
hdulist.append(fits.ImageHDU(data=value.value, name=key.upper()))
for key, value in self.size_fractions.items():
hdulist.append(fits.ImageHDU(data=value.value, name=key.upper()))
hdulist.append(fits.ImageHDU(data=self.error.value, name="ERROR"))
hdulist.append(fits.ImageHDU(data=self.nc.value, name="NC"))
hdulist.append(
fits.ImageHDU(
data=self.cation_neutral_ratio.value, name="CATION_NEUTRAL_RATIO"
)
)
hdulist.append(fits.ImageHDU(data=self.mask.astype(int), name="MASK"))
hdulist.writeto(filename, overwrite=True, output_verify="fix")
return
# Save results to FITS-file
if isinstance(header, fits.header.Header):
# TODO: Clean up header.
hdr = copy.deepcopy(header)
else:
hdr = fits.Header()
_fits_to_disk(hdr, filename)
return
[docs]
@staticmethod
def plot_map(data, mask, title, wcs=None):
"""Plots a map.
Notes:
None.
Args:
data (numpy): Image.
mask (numpy): Mask.
title (string): Image title.
Keywords:
wcs (wcs.wcs): WCS (defaults to None).
Returns:
fig (matplotlib.figure.Figure): Instance of figure.
"""
mmin = np.nanmin(data[mask])
im = data - mmin
mmax = np.nanmax(im[mask])
im /= mmax
cmap = colormaps["rainbow"]
x, y = np.meshgrid(np.arange(0, im.shape[1] + 1), np.arange(0, im.shape[0] + 1))
x = x.astype("float") - 0.5
y = y.astype("float") - 0.5
if wcs:
a, d = wcs.pixel_to_world_values(x, y)
wcs_proj = wcs.deepcopy()
wcs_proj.wcs.pc = [[-1, 0], [0, 1]]
x, y = wcs_proj.world_to_pixel_values(a, d)
ax = plt.subplot(projection=wcs_proj)
else:
ax = plt.subplot()
ax.set_aspect("equal", adjustable="box", anchor="SW")
ax.set_facecolor("#000000")
ax.set_xlim(x.min() - 1, x.max() + 1)
ax.set_ylim(y.min() - 1, y.max() + 1)
args = list()
for i in range(im.shape[0]):
ii = [i, i + 1, i + 1, i, i]
for j in range(im.shape[1]):
if mask[i, j] and np.isfinite(im[i, j]):
jj = [j, j, j + 1, j + 1, j]
args += [x[ii, jj], y[ii, jj], colors.to_hex(cmap(im[i, j]))]
plt.fill(*tuple(args))
if wcs:
reverse = x[0, 0] < x[-1, 0]
if reverse:
ax.invert_xaxis()
plt.arrow(
0.98,
0.02,
0.0,
0.1,
transform=ax.transAxes,
width=0.005,
color="white",
)
plt.text(
0.84,
0.02,
"E",
transform=ax.transAxes,
color="white",
horizontalalignment="center",
verticalalignment="center",
)
plt.arrow(
0.98,
0.02,
-0.1,
0.0,
transform=ax.transAxes,
width=0.005,
color="white",
)
plt.text(
0.98,
0.16,
"N",
transform=ax.transAxes,
color="white",
horizontalalignment="center",
verticalalignment="center",
)
x0, y0 = ax.transLimits.inverted().transform((0.075, 0.1))
if reverse:
x1 = x0 - 1
else:
x1 = x0 + 1
y1 = y0 + 1
x = [x0, x1, x1, x0, x0]
y = [y0, y0, y1, y1, y0]
plt.fill(
x,
y,
hatch=r"\\\\\\\\\/////////",
edgecolor="white",
facecolor=(1, 1, 1, 0.0),
)
plt.text(
x0 + (x1 - x0) / 2.0,
y0 - 0.5,
"pixel",
color="white",
horizontalalignment="center",
verticalalignment="top",
)
x0, y0 = ax.transLimits.inverted().transform((0.25, 0.1))
if reverse:
x1 = x0 - 1.0 / (3600 * wcs.wcs.cdelt[0])
else:
x1 = x0 + 1.0 / (3600 * wcs.wcs.cdelt[0])
plt.plot([x0, x1], [y0, y0], color="white", linewidth=1.5)
plt.text(
x0 + (x1 - x0) / 2.0,
y0 - 0.5,
'1"',
color="white",
horizontalalignment="center",
verticalalignment="top",
)
plt.xlabel("right ascension")
plt.ylabel("declination")
else:
plt.xlabel("pixel [#]")
plt.ylabel("pixel [#]")
fig = plt.gcf()
fig.set_layout_engine("constrained")
colorbar = cm.ScalarMappable(cmap=cmap)
colorbar.set_clim(mmin, mmax)
cax = inset_axes(
ax, width="2%", height="100%", loc="center right", borderpad=-1
)
plt.colorbar(colorbar, cax=cax)
cax.locator_params(axis="y", steps=[1, 10])
cax.set_ylabel(title)
return fig
[docs]
def plot_fit(self, i=0, j=0):
"""Plots a fit and saves it to a PDF.
Notes:
None.
Args:
i (int): Pixel coordinate (abscissa).
j (int): Pixel coordinate (ordinate).
Returns:
fig (matplotlib.figure.Figure): Instance of figure.
"""
# Create figure on shared axes.
fig = plt.figure()
gs = gridspec.GridSpec(4, 1, height_ratios=[2, 1, 2, 2], figure=fig)
# Add some spacing between axes.
gs.update(wspace=0.025, hspace=0.00)
ax0 = fig.add_subplot(gs[0])
ax1 = fig.add_subplot(gs[1], sharex=ax0)
ax2 = fig.add_subplot(gs[2], sharex=ax0)
ax3 = fig.add_subplot(gs[3], sharex=ax0)
for ax in [ax0, ax2, ax3]:
fmt = ax.yaxis.get_major_formatter()
fmt.set_scientific(True)
fmt.set_powerlimits((-3, 1))
for ax in [ax0, ax1, ax2]:
plt.setp(ax.get_xticklabels(), visible=False)
# Convenience definitions.
abscissa = self.spectrum.spectral_axis
charge = self.charge
# Check if size of datapoints are too large and change marker size.
ms = 5 if len(abscissa) < 1000 else 2
# ax0: Best fit.
data = self.spectrum.flux.T[:, i, j]
unc = None
if self.spectrum.uncertainty:
unc = self.spectrum.uncertainty.quantity.T[:, i, j]
model = self.fit[:, i, j]
ax0.errorbar(
abscissa,
data,
yerr=unc,
marker=".",
ms=ms,
mew=0.5,
lw=0,
color="black",
ecolor="grey",
capsize=2,
label="input",
zorder=0,
)
ax0.plot(abscissa, model, label="fit", color="tab:red", lw=1.5)
error_str = "$error$=%-4.2f" % (self.error[i][j])
ax0.text(
0.025, 0.88, error_str, ha="left", va="center", transform=ax0.transAxes
)
ax0.set_ylabel(
f'{self.spectrum.meta["colnames"][1]} [{self.spectrum.flux.unit}]'
)
# ax1: Residual.
ax1.plot(abscissa, data - model, lw=1, label="residual", color="gray")
ax1.axhline(y=0, color="0.5", ls="--", dashes=(12, 16), zorder=-10, lw=0.5)
# ax2: Size breakdown.
ax2.errorbar(
abscissa,
data,
yerr=unc,
marker=".",
ms=ms,
mew=0.5,
lw=0,
color="black",
ecolor="grey",
capsize=2,
zorder=0,
)
ax2.plot(abscissa, model, color="tab:red", lw=1.5)
ax2.plot(
abscissa,
self.size["large"][:, i, j],
label="large",
lw=1,
color="tab:orange",
)
ax2.plot(
abscissa,
self.size["medium"][:, i, j],
label="medium",
lw=1,
color="tab:green",
)
ax2.plot(
abscissa, self.size["small"][:, i, j], label="small", lw=1, color="tab:blue"
)
size_str = "$f_{large}$=%3.1f" % (self.size_fractions["large"][i][j])
ax2.text(0.025, 0.88, size_str, ha="left", va="center", transform=ax2.transAxes)
ax2.set_ylabel(
f'{self.spectrum.meta["colnames"][1]} [{self.spectrum.flux.unit}]'
)
# ax3: Charge breakdown.
ax3.errorbar(
abscissa,
data,
yerr=unc,
marker=".",
ms=ms,
mew=0.5,
lw=0,
color="black",
ecolor="grey",
capsize=2,
zorder=0,
)
ax3.plot(abscissa, model, color="red", lw=1.5)
ax3.plot(
abscissa, charge["anion"][:, i, j], label="anion", lw=1, color="tab:orange"
)
ax3.plot(
abscissa,
charge["neutral"][:, i, j],
label="neutral",
lw=1,
color="tab:cyan",
)
ax3.plot(
abscissa,
charge["cation"][:, i, j],
label="cation",
lw=1,
color="tab:purple",
)
cnr_str = "$n_{cation}/n_{neutral}$=%3.1f" % (
self.charge_fractions["cation"][i][j]
/ self.charge_fractions["neutral"][i][j]
)
ax3.text(0.025, 0.88, cnr_str, ha="left", va="center", transform=ax3.transAxes)
ax3.set_xlabel(
f'{self.spectrum.meta["colnames"][0]} [{self.spectrum.spectral_axis.unit}]'
)
ax3.set_ylabel(
f'{self.spectrum.meta["colnames"][1]} [{self.spectrum.flux.unit}]'
)
# Set tick parameters and add legends to axes.
for ax in (ax0, ax1, ax2, ax3):
ax.tick_params(
axis="both", which="both", direction="in", top=True, right=True
)
ax.minorticks_on()
ax.legend(loc=0, frameon=False)
fig.set_layout_engine("constrained")
return fig