import os
import pathlib
from collections import defaultdict
import numpy as np
from astropy.io.registry import IORegistryError
from astropy.nddata import StdDevUncertainty
from astropy.io import fits
from specutils import Spectrum, SpectrumList, SpectrumCollection
from jdaviz.core.events import SnackbarMessage
from jdaviz.core.registries import data_parser_registry
from jdaviz.utils import standardize_metadata, download_uri_to_path
__all__ = ["specviz_spectrum1d_parser"]
[docs]
@data_parser_registry("specviz-spectrum1d-parser")
def specviz_spectrum1d_parser(app, data, data_label=None, format=None, show_in_viewer=True,
concat_by_file=False, cache=None, local_path=os.curdir, timeout=None,
load_as_list=False):
"""
Loads a data file or `~specutils.Spectrum` object into Specviz.
Parameters
----------
data : str, `~specutils.Spectrum`, or `~specutils.SpectrumList`
Spectrum, SpectrumList, or path to compatible data file.
data_label : str, list, or tuple, optional
The Glue data label found in the ``DataCollection``.
format : str
Loader format specification used to indicate data format in
`~specutils.Spectrum.read` io method.
concat_by_file : bool
If True and there is more than one available extension, concatenate
the extensions within each spectrum file passed to the parser and
add a concatenated spectrum to the data collection.
cache : None, bool, or str
Cache the downloaded file if the data are retrieved by a query
to a URL or URI.
local_path : str, optional
Cache remote files to this path. This is only used if data is
requested from `astroquery.mast`.
timeout : float, optional
If downloading from a remote URI, set the timeout limit for
remote requests in seconds (passed to
`~astropy.utils.data.download_file` or
`~astroquery.mast.Conf.timeout`).
load_as_list : bool, optional
Force the parser to load the input file with the `~specutils.SpectrumList` read function
instead of `~specutils.Spectrum`.
"""
spectrum_viewer_reference_name = app._jdaviz_helper._default_spectrum_viewer_reference_name
# If no data label is assigned, give it a unique name
if not data_label:
data_label = app.return_data_label(data, alt_name="specviz_data")
# Still need to standardize the label
elif isinstance(data_label, (list, tuple)):
data_label = [app.return_data_label(label, alt_name="specviz_data") for label in data_label] # noqa
else:
data_label = app.return_data_label(data_label, alt_name="specviz_data")
if isinstance(data, SpectrumCollection):
raise TypeError("SpectrumCollection detected."
" Please provide a Spectrum or SpectrumList")
elif isinstance(data, Spectrum):
# Handle the possibility of 2D spectra by splitting into separate spectra
if data.flux.ndim == 1:
data_label = [data_label]
data = [data]
elif data.flux.ndim == 2:
data = split_spectrum_with_2D_flux_array(data)
data_label = [f"{data_label} [{i}]" for i in range(len(data))]
# No special processing is needed in this case, but we include it for completeness
elif isinstance(data, SpectrumList):
pass
elif isinstance(data, list):
# special processing for HDUList
if isinstance(data, fits.HDUList):
data = [Spectrum.read(data)]
data_label = [data_label]
else:
# list treated as SpectrumList if not an HDUList
data = SpectrumList.read(data, format=format)
else:
# try parsing file_obj as a URI/URL:
data = download_uri_to_path(data, cache=cache, local_path=local_path, timeout=timeout)
path = pathlib.Path(data)
if path.is_dir() or load_as_list:
data = SpectrumList.read(str(path), format=format)
if data == []:
raise ValueError(f"`specutils.SpectrumList.read('{str(path)}')` "
"returned an empty list")
elif path.is_file():
try:
data = Spectrum.read(str(path), format=format)
if data.flux.ndim == 2:
data = split_spectrum_with_2D_flux_array(data)
else:
data = [data]
data_label = [app.return_data_label(data_label, alt_name="specviz_data")]
except IORegistryError:
# Multi-extension files may throw a registry error
data = SpectrumList.read(str(path), format=format)
else:
raise FileNotFoundError("No such file: " + str(path))
# step through SpectrumList and convert any 2D spectra to 1D spectra
if isinstance(data, SpectrumList):
new_data = []
for spec in data:
if spec.flux.ndim == 2:
new_data.extend(split_spectrum_with_2D_flux_array(spec))
else:
new_data.append(spec)
data = SpectrumList(new_data)
if not isinstance(data_label, (list, tuple)):
data_label = [f"{data_label} [{i}]" for i in range(len(data))]
elif len(data_label) != len(data):
raise ValueError(f"Length of data labels list ({len(data_label)}) is different"
f" than length of list of data ({len(data)})")
# these are used to build a combined spectrum with all
# input spectra included (taken from https://github.com/spacetelescope/
# dat_pyinthesky/blob/main/jdat_notebooks/MRS_Mstar_analysis/
# JWST_Mstar_dataAnalysis_analysis.ipynb)
wlallorig = [] # to collect wavelengths
fnuallorig = [] # fluxes
dfnuallorig = [] # and uncertanties (if present)
for spec in data:
for wlind in range(len(spec.spectral_axis)):
wlallorig.append(spec.spectral_axis[wlind].value)
fnuallorig.append(spec.flux[wlind].value)
# because some spec in the list might have uncertainties and
# others may not, if uncert is None, set to list of NaN. later,
# if the concatenated list of uncertanties is all nan (meaning
# they were all nan to begin with, or all None), it will be set
# to None on the final Spectrum
if spec.uncertainty is not None and spec.uncertainty[wlind] is not None:
dfnuallorig.append(spec.uncertainty[wlind].array)
else:
dfnuallorig.append(np.nan)
# if the entire uncert. array is Nan and the data is not, model fitting won't
# work (more generally, if uncert[i] is nan/inf and flux[i] is not, fitting will
# fail, but just deal with the all nan case here since it is straightforward).
# set uncerts. to None if they are all nan/inf, and display a warning message.
set_nans_to_none = False
if isinstance(data, SpectrumList):
uncerts = dfnuallorig # alias these for clarity later on
if uncerts is not None and not np.any(uncerts):
uncerts = None
set_nans_to_none = True
else:
if data[0].uncertainty is not None:
uncerts_finite = np.isfinite(data[0].uncertainty.array)
if not np.any(uncerts_finite):
data[0].uncertainty = None
set_nans_to_none = True
if set_nans_to_none:
# alert user that we have changed their all-nan uncertainty array to None
msg = 'All uncertainties are nonfinite, replacing with uncertainty=None.'
app.hub.broadcast(SnackbarMessage(msg, color="warning", sender=app))
with app.data_collection.delay_link_manager_update():
for i, spec in enumerate(data):
# note: if SpectrumList, this is just going to be the last unit when
# combined in the next block. should put a check here to make sure
# units are all the same or collect them in a list?
wave_units = spec.spectral_axis.unit
flux_units = spec.flux.unit
# Make metadata layout conform with other viz.
spec.meta = standardize_metadata(spec.meta)
app.add_data(spec, data_label[i])
# handle display, with the SpectrumList special case in mind.
if i == 0 and show_in_viewer:
app.add_data_to_viewer(spectrum_viewer_reference_name, data_label[i])
if concat_by_file and isinstance(data, SpectrumList):
# If >1 spectra in the list were opened from the same FITS file,
# group them by their original FITS filenames and add their combined
# 1D spectrum to the DataCollection.
unique_files = group_spectra_by_filename(app.data_collection)
for filename, datasets in unique_files.items():
if len(datasets) > 1:
spec = combine_lists_to_1d_spectrum(wlallorig, fnuallorig,
dfnuallorig, wave_units,
flux_units)
# Make metadata layout conform with other viz.
spec.meta = standardize_metadata(spec.meta)
label = f"Combined [{filename}]"
app.add_data(spec, label)
if show_in_viewer:
app.add_data_to_viewer(spectrum_viewer_reference_name, label)
def group_spectra_by_filename(datasets):
"""
Organize the elements of a `~glue.core.data_collection.DataCollection` by the name of
the FITS file they came from, if available via the header card "FILENAME".
Parameters
----------
datasets : `~glue.core.data_collection.DataCollection`
Collection of datasets.
Returns
-------
spectra_to_combine : dict
Datasets of spectra organized by their filename.
"""
spectra_to_combine = defaultdict(list)
for data in datasets:
filename = data.meta.get("FILENAME")
spectra_to_combine[filename].append(data)
return spectra_to_combine
def combine_lists_to_1d_spectrum(wl, fnu, dfnu, wave_units, flux_units):
"""
Combine lists of 1D spectra into a composite `~specutils.Spectrum` object.
Parameters
----------
wl : list of `~astropy.units.Quantity`s
Wavelength in each spectral channel
fnu : list of `~astropy.units.Quantity`s
Flux in each spectral channel
dfnu : list of `~astropy.units.Quantity`s or None
Uncertainty on each flux
Returns
-------
spec : `~specutils.Spectrum`
Composite 1D spectrum.
"""
wlallarr = np.array(wl)
fnuallarr = np.array(fnu)
srtind = np.argsort(wlallarr)
if dfnu is not None:
dfnuallarr = np.array(dfnu)
fnuallerr = dfnuallarr[srtind]
wlall = wlallarr[srtind]
fnuall = fnuallarr[srtind]
# units are not being handled properly yet.
if dfnu is not None:
unc = StdDevUncertainty(fnuallerr * flux_units)
else:
unc = None
spec = Spectrum(flux=fnuall * flux_units, spectral_axis=wlall * wave_units,
uncertainty=unc)
return spec
def split_spectrum_with_2D_flux_array(data):
"""
Helper function to split Spectrum of 2D flux to a SpectrumList of nD objects.
Parameters
----------
data : `~specutils.Spectrum`
Spectrum with 2D flux array
Returns
-------
new_data : `~specutils.SpectrumList`
List of unpacked spectra.
"""
new_data = []
for i in range(data.flux.shape[0]):
unc = None
mask = None
if data.uncertainty is not None:
unc = data.uncertainty[i, :]
if data.mask is not None:
mask = data.mask[i, :]
new_data.append(Spectrum(flux=data.flux[i, :], spectral_axis=data.spectral_axis,
uncertainty=unc, mask=mask, meta=data.meta))
return new_data