Source code for jdaviz.configs.cubeviz.plugins.spectral_extraction.spectral_extraction
from functools import cached_property
import numpy as np
import astropy
from astropy import units as u
from astropy.coordinates import SpectralCoord
from astropy.nddata import NDDataArray, StdDevUncertainty
from traitlets import Any, Bool, Dict, Float, List, Unicode, observe
from jdaviz.core.custom_traitlets import FloatHandleEmpty
from jdaviz.core.events import SnackbarMessage, SliceValueUpdatedMessage, GlobalDisplayUnitChanged
from jdaviz.core.marks import PluginLine
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin,
DatasetSelectMixin,
SelectPluginComponent,
ApertureSubsetSelectMixin,
ApertureSubsetSelect,
AddResults, AddResultsMixin,
skip_if_not_tray_instance,
skip_if_no_updates_since_last_active,
with_spinner, with_temp_disable)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.core.unit_conversion_utils import (all_flux_unit_conversion_equivs,
flux_conversion_general,
check_if_unit_is_per_solid_angle)
from jdaviz.configs.cubeviz.plugins.parsers import _return_spectrum_with_correct_units
from jdaviz.configs.cubeviz.plugins.viewers import WithSliceIndicator
# Names exported by ``from <this module> import *``: only the plugin class.
__all__ = ['SpectralExtraction3D']
[docs]
@tray_registry(
'spectral-extraction-3d', label="3D Spectral Extraction",
category='data:reduction'
)
class SpectralExtraction3D(PluginTemplateMixin, ApertureSubsetSelectMixin,
DatasetSelectMixin, AddResultsMixin):
"""
See the :ref:`3D Spectral Extraction Plugin Documentation <spectral-extraction>`
for more details.
Only the following attributes and methods are available through the
:ref:`public plugin API <plugin-apis>`:
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray`
* ``show_live_preview``
Whether to show a live preview of the extracted spectrum.
* ``dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`)
Dataset to use for spectral extraction.
* ``function`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
Function used to collapse over the spatial axes (``Mean``, ``Min``, ``Max`` or ``Sum``).
* ``aperture`` (:class:`~jdaviz.core.template_mixin.ApertureSubsetSelect`):
Subset to use for the spectral extraction, or ``Entire Cube``.
* ``wavelength_dependent``:
Whether the ``aperture`` should be considered wavelength-dependent. The cone is defined
to intersect ``aperture`` at ``reference_spectral_value``.
* ``reference_spectral_value``:
The wavelength that will be used to calculate the radius of the cone through the cube.
* ``background`` (:class:`~jdaviz.core.template_mixin.ApertureSubsetSelect`):
Subset to use for background subtraction, or ``None``.
* ``bg_wavelength_dependent``:
Whether the ``background`` aperture should be considered wavelength-dependent (requires
``wavelength_dependent`` to also be set to ``True``). The cone is defined
to intersect ``background`` at ``reference_spectral_value``.
* ``bg_spec_per_spaxel``:
Whether to normalize the background per spaxel when calling ``extract_bg_spectrum``.
Otherwise, the spectrum will be scaled by the ratio between the
areas of the aperture and the background aperture. Only applicable if ``function`` is 'Sum'.
* ``bg_spec_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* :meth:`extract_bg_spectrum`
* ``aperture_method`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
Method to use for extracting spectrum (and background, if applicable).
* ``add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* :meth:`extract`
"""
template_file = __file__, "spectral_extraction.vue"
# --- general plugin state ---
uses_active_status = Bool(True).tag(sync=True)
show_live_preview = Bool(True).tag(sync=True)
active_step = Unicode().tag(sync=True)
# name of the product; used in result labels and snackbar messages
resulting_product_name = Unicode("spectrum").tag(sync=True)
# format passed to ``add_results.add_results_from_plugin`` in ``extract``
extracted_format = '1D Spectrum'
# when False, ``_aperture_items_changed`` skips automatic extraction for new subsets
do_auto_extraction = True
# whether wavelength dependent options should be exposed to the user (in the UI)
wavelength_dependent_available = Bool(True).tag(sync=True)
bg_export_available = Bool(True).tag(sync=True)
# --- wavelength-dependent aperture ---
wavelength_dependent = Bool(False).tag(sync=True)
reference_spectral_value = FloatHandleEmpty().tag(sync=True)
# updated from SliceValueUpdatedMessage in ``_on_slice_changed``
slice_spectral_value = Float().tag(sync=True)
# --- background aperture selection (wired to ``self.background`` in ``__init__``) ---
bg_items = List([]).tag(sync=True)
bg_selected = Any('').tag(sync=True)
bg_selected_validity = Dict().tag(sync=True)
bg_scale_factor = Float(1).tag(sync=True)
bg_wavelength_dependent = Bool(False).tag(sync=True)
# --- background spectrum export (wired to ``self.bg_spec_add_results`` in ``__init__``) ---
bg_spec_per_spaxel = Bool(False).tag(sync=True)
bg_spec_results_label = Unicode().tag(sync=True)
bg_spec_results_label_default = Unicode().tag(sync=True)
bg_spec_results_label_auto = Bool(True).tag(sync=True)
bg_spec_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_spec_results_label_overwrite = Bool().tag(sync=True)
bg_spec_add_to_viewer_items = List().tag(sync=True)
bg_spec_add_to_viewer_selected = Unicode().tag(sync=True)
bg_spec_spinner = Bool(False).tag(sync=True)
# --- collapse function selection (wired to ``self.function`` in ``__init__``) ---
function_items = List().tag(sync=True)
function_selected = Unicode('Sum').tag(sync=True)
# set to True by ``extract`` once a spectrum has been extracted
extraction_available = Bool(False).tag(sync=True)
# --- display units (kept in sync by ``_on_global_display_unit_changed``) ---
results_units = Unicode().tag(sync=True)
spectrum_y_units = Unicode().tag(sync=True)
flux_units = Unicode().tag(sync=True)
sb_units = Unicode().tag(sync=True)
# --- aperture method selection (wired to ``self.aperture_method`` in ``__init__``) ---
aperture_method_items = List().tag(sync=True)
aperture_method_selected = Unicode('Center').tag(sync=True)
# True when function is Min/Max and the aperture method is not Center;
# ``extract`` raises ValueError with the message below in that case
conflicting_aperture_and_function = Bool(False).tag(sync=True)
conflicting_aperture_error_message = Unicode('Aperture method Exact cannot be selected along'
' with Min or Max.').tag(sync=True)
def __init__(self, *args, **kwargs):
"""
Set up the aperture, background, function and aperture-method selection
components, the two result-adding helpers, and the hub subscriptions.
All positional and keyword arguments are forwarded to the parent initializer.
"""
super().__init__(*args, **kwargs)
# description displayed under plugin title in tray
self._plugin_description = 'Extract a spectrum from a spectral cube.'
# most recent result of ``extract`` (None until an extraction has run)
self.extracted_spec = None
self.dataset.filters = ['is_flux_cube']
# TODO: in the future this could be generalized with support in SelectPluginComponent
self.aperture._default_text = 'Entire Cube'
self.aperture._manual_options = ['Entire Cube']
self.aperture.items = [{"label": "Entire Cube"}]
self.aperture._subset_selected_changed_callback = self._update_extract
# need to reinitialize choices since we overwrote items and some subsets may already
# exist.
self.aperture._initialize_choices()
self.aperture.select_default()
# background aperture: same component type as the aperture, backed by the bg_* traitlets,
# with 'None' as the "no background" choice
self.background = ApertureSubsetSelect(self,
'bg_items',
'bg_selected',
'bg_selected_validity',
'bg_scale_factor',
dataset='dataset',
multiselect=None,
default_text='None',
subset_selected_changed_callback=self._update_extract) # noqa
# separate AddResults helper for exporting the background spectrum
self.bg_spec_add_results = AddResults(self, 'bg_spec_results_label',
'bg_spec_results_label_default',
'bg_spec_results_label_auto',
'bg_spec_results_label_invalid_msg',
'bg_spec_results_label_overwrite',
'bg_spec_add_to_viewer_items',
'bg_spec_add_to_viewer_selected')
self.bg_spec_add_results.viewer.filters = ['is_slice_indicator_viewer']
self.bg_spec_results_label_default = f'background-{self.resulting_product_name}'
self.function = SelectPluginComponent(
self,
items='function_items',
selected='function_selected',
manual_options=['Mean', 'Min', 'Max', 'Sum']
)
self.aperture_method_manual_options = ['Exact', 'Center']
self.aperture_method = SelectPluginComponent(
self,
items='aperture_method_items',
selected='aperture_method_selected',
manual_options=self.aperture_method_manual_options
)
self._set_default_results_label()
self.add_results.viewer.filters = ['is_slice_indicator_viewer']
# NOTE(review): the first subscription goes through ``self.session.hub`` and the second
# through ``self.hub``; presumably the same hub object -- confirm.
self.session.hub.subscribe(self, SliceValueUpdatedMessage,
handler=self._on_slice_changed)
self.hub.subscribe(self, GlobalDisplayUnitChanged,
handler=self._on_global_display_unit_changed)
self._update_disabled_msg()
if self.config == "deconfigged":
self.observe_traitlets_for_relevancy(traitlets_to_observe=['dataset_items'])
@property
def user_api(self):
    """Public API wrapper exposing only the supported attributes and methods."""
    exposed_names = [
        'show_live_preview', 'dataset', 'function', 'aperture',
        'background', 'bg_wavelength_dependent',
        'bg_spec_per_spaxel', 'bg_spec_add_results', 'extract_bg_spectrum',
        'add_results', 'extract',
        'wavelength_dependent', 'reference_spectral_value',
        'aperture_method',
    ]
    return PluginUserApi(self, expose=exposed_names)
@observe('dataset_items')
def _update_disabled_msg(self, msg={}):
    """Enable the plugin when any loaded dataset is 3D; otherwise set a disabled message."""
    has_cube = any(entry.data.ndim == 3 for entry in self.app.data_collection)
    if has_cube:
        self.disabled_msg = ''
        return
    # no cube-like data loaded. Once loaded, the parser will unset this
    self.disabled_msg = (
        f"{self.__class__.__name__} requires a 3d cube dataset to be loaded, "
        "please load data to enable this plugin."
    )
@property
def live_update_subscriptions(self):
    """Map each live-update category to the plugin inputs that belong to it."""
    subscriptions = {}
    subscriptions['data'] = ('dataset',)
    subscriptions['subset'] = ('aperture', 'background')
    return subscriptions
@property
def slice_display_unit_name(self):
    """Name of the display-unit axis that the slice value is expressed in."""
    unit_axis_name = 'spectral'
    return unit_axis_name
@property
def spatial_axes(self):
    """Tuple of the two axis indices of a 3D cube that are not the spectral axis."""
    # Collapsing over these axes turns e.g. a 3D spectral cube into a 1D spectrum.
    remaining = list(range(3))
    remaining.remove(self.spectral_axis_index)
    return tuple(remaining)
@property
def spectral_axis_index(self):
    """Index of the spectral axis, read from the selected cube's metadata."""
    cube_meta = self.cube.meta
    return cube_meta["spectral_axis_index"]
@property
def slice_indicator_viewers(self):
    """List of the app's viewers that are ``WithSliceIndicator`` instances."""
    matching = []
    for viewer in self.app._viewer_store.values():
        if isinstance(viewer, WithSliceIndicator):
            matching.append(viewer)
    return matching
@observe('active_step', 'is_active')
def _active_step_changed(self, *args):
    """Show the aperture or background marks that match the currently active step."""
    show_aperture = self.active_step in ('', 'ap', 'extract')
    self.aperture._set_mark_visiblities(show_aperture)
    show_background = self.active_step == 'bg'
    self.background._set_mark_visiblities(show_background)
    # the background preview line only exists once the marks have been created
    if 'bg_extract' in self.marks:
        self.marks['bg_extract'].visible = self.active_step == 'bg'
@property
def slice_plugin(self):
    """The app helper's 'Spectral Slice' plugin."""
    available_plugins = self.app._jdaviz_helper.plugins
    return available_plugins['Spectral Slice']
@observe('aperture_items')
@skip_if_not_tray_instance()
def _aperture_items_changed(self, msg):
    """
    Automatically extract a spectrum for every aperture item that is new in ``msg``.

    Nothing happens when auto extraction is disabled, when the aperture component
    does not exist yet, or when no datasets are available.  An item is skipped if
    the data collection already holds an entry whose live-update inputs reference
    the same aperture and the currently selected dataset.  A snackbar message is
    broadcast with the outcome of each attempted extraction.
    """
    if not self.do_auto_extraction or not hasattr(self, 'aperture'):
        return
    if not len(self.dataset_items):
        return

    previous_labels = [old_item['label'] for old_item in msg['old']]
    for new_item in msg['new']:
        if new_item['label'] in previous_labels:
            continue
        subset_lbl = new_item.get('label')

        # Check if an auto-extracted spectrum already exists for this subset
        # by checking metadata (not data labels, since user may have renamed)
        recorded_inputs = (entry.meta.get('_update_live_plugin_results', None)
                           for entry in self.app.data_collection)
        # Simply check if aperture and dataset match - covers all rename scenarios
        already_has_extraction = any(
            inputs is not None
            and inputs.get('aperture') == subset_lbl
            and inputs.get('dataset') == self.dataset.selected
            for inputs in recorded_inputs
        )
        if already_has_extraction:
            continue

        try:
            self._extract_in_new_instance(subset_lbl=subset_lbl,
                                          auto_update=True, add_data=True)
        except Exception as e:
            outcome = SnackbarMessage(
                f"Automatic {self.resulting_product_name} extraction for {subset_lbl} failed",  # noqa
                color='error', sender=self, timeout=10000,
                traceback=e)
        else:
            outcome = SnackbarMessage(
                f"Automatic {self.resulting_product_name} extraction for {subset_lbl} successful",  # noqa
                color='success', sender=self)
        self.app.hub.broadcast(outcome)
def _extract_in_new_instance(self, dataset=None, function='Sum', subset_lbl=None,
auto_update=False, add_data=False):
# create a new instance of the 3D Spectral Extraction plugin (to not
# affect the instance in the tray) and extract the entire cube with defaults.
plg = self.new()
plg.dataset.selected = self.dataset.selected if dataset is None else dataset
if subset_lbl is not None:
plg.aperture.selected = subset_lbl
plg.aperture_method.selected = 'Center'
plg.function.selected = function
plg.add_results.auto_update_result = auto_update
# TODO: once add_results.viewer supports multiselect, this should call select_all()
# for now it will select the first valid viewer to send results.
plg.add_results.viewer.select_default()
# all other settings remain at their plugin defaults
return plg(add_data=add_data)
@observe('wavelength_dependent', 'bg_wavelength_dependent')
def _wavelength_dependent_changed(self, *args):
    """Keep the reference value, background flag and mark scale consistent."""
    if not self.wavelength_dependent:
        # the background cannot be wavelength-dependent unless the aperture is
        self.bg_wavelength_dependent = False
    else:
        self.reference_spectral_value = self.slice_plugin.value
    # NOTE: this can be redundant in the case where reference_spectral_value changed and
    # triggers the observe, but we need to ensure it is updated if reference_spectral_value
    # is unchanged
    self._update_mark_scale()
def _on_slice_changed(self, msg):
self.slice_spectral_value = msg.value
[docs]
def vue_goto_reference_spectral_value(self, *args):
    """Move the slice plugin to the current reference spectral value."""
    target_value = self.reference_spectral_value
    self.slice_plugin.value = target_value
[docs]
def vue_adopt_slice_as_reference(self, *args):
    """Use the slice plugin's current value as the reference spectral value."""
    current_slice_value = self.slice_plugin.value
    self.reference_spectral_value = current_slice_value
@observe('reference_spectral_value', 'slice_spectral_value')
def _update_mark_scale(self, *args):
    """
    Update the scale factors of the aperture and background marks.

    A wavelength-dependent aperture is scaled by the ratio of the current slice
    value to the reference spectral value; otherwise the scale factor is 1.
    """
    def _scale_for(is_wavelength_dependent):
        # the ratio is only evaluated when it is actually needed
        if is_wavelength_dependent:
            return self.slice_spectral_value/self.reference_spectral_value
        return 1.0

    self.aperture.scale_factor = _scale_for(self.wavelength_dependent)
    self.background.scale_factor = _scale_for(self.bg_wavelength_dependent)
def _on_global_display_unit_changed(self, msg=None):
if msg is None:
self.flux_units = str(self.app._get_display_unit('flux'))
self.sb_units = str(self.app._get_display_unit('sb'))
self.spectrum_y_units = str(self.app._get_display_unit('spectral_y'))
elif msg.axis == 'flux':
self.flux_units = str(msg.unit)
elif msg.axis == 'sb':
self.sb_units = str(msg.unit)
elif msg.axis == 'spectral_y':
self.spectrum_y_units = str(msg.unit)
# no need to update results_units as separate messages will have been
# sent by unit conversion for flux and/or sb.
# updates to spectrum_y_units will trigger updating the extraction preview
return
else:
# ignore
return
# update results_units based on flux_units, sb_units, and currently selected function
self._update_results_units()
@observe('function_selected')
def _update_results_units(self, *args):
    """Set ``results_units`` to the flux units for 'Sum', else to the surface-brightness units."""
    # NOTE this is also called by _on_global_display_unit_changed
    # after flux_units and/or sb_units is set.
    # results_units is ONLY used for the warning in the UI, so does not
    # need to trigger an update to the preview
    is_sum = self.function_selected.lower() == 'sum'
    self.results_units = self.flux_units if is_sum else self.sb_units
@observe('function_selected', 'aperture_method_selected')
def _update_aperture_method_on_function_change(self, *args):
    """Flag the combination of Min/Max with any aperture method other than Center."""
    is_min_or_max = self.function_selected.lower() in ('min', 'max')
    self.conflicting_aperture_and_function = (
        is_min_or_max and self.aperture_method_selected.lower() != 'center'
    )
@property
def cube(self):
    """The data-collection item of the currently selected dataset."""
    selected_entry = self.dataset.selected_dc_item
    return selected_entry
@property
def uncert_cube(self):
    """
    Uncertainty cube loaded alongside the flux cube, or ``None``.

    Only returned when the helper has both a loaded flux cube and a loaded
    uncertainty cube, and the selected dataset is that flux cube.
    """
    if not hasattr(self._app._jdaviz_helper, '_loaded_flux_cube'):
        return None
    if not hasattr(self.app._jdaviz_helper, '_loaded_uncert_cube'):
        return None
    is_loaded_flux_cube = (
        self.dataset.selected == self._app._jdaviz_helper._loaded_flux_cube.label
    )
    if is_loaded_flux_cube:
        return self._app._jdaviz_helper._loaded_uncert_cube
    # TODO: allow selecting or associating an uncertainty cube?
    return None
@property
def mask_cube(self):
    """
    Mask cube loaded alongside the flux cube, or ``None``.

    Only returned when the helper has both a loaded flux cube and a loaded
    mask cube, and the selected dataset is that flux cube.
    """
    if not hasattr(self._app._jdaviz_helper, '_loaded_flux_cube'):
        return None
    if not hasattr(self.app._jdaviz_helper, '_loaded_mask_cube'):
        return None
    is_loaded_flux_cube = (
        self.dataset.selected == self._app._jdaviz_helper._loaded_flux_cube.label
    )
    if is_loaded_flux_cube:
        return self._app._jdaviz_helper._loaded_mask_cube
    # TODO: allow selecting or associating a mask/DQ cube?
    return None
@property
def slice_display_unit(self):
    """The app's display unit for the slice axis, as an astropy ``Unit``."""
    display_unit = self.app._get_display_unit(self.slice_display_unit_name)
    return astropy.units.Unit(display_unit)
@property
def inverted_mask_non_science(self):
    """
    Boolean cube that is ``True`` where data are included, ``False`` where excluded.

    A pixel is excluded when the selected dataset's flux is NaN or when the
    loaded mask cube (if any) flags it.  NaN entries in the mask cube are
    treated as *not masked*, matching how ``_extract_from_aperture`` interprets
    the same mask cube.

    Returns
    -------
    numpy.ndarray of bool, with the shape of the selected dataset's flux.
    """
    # Aperture masks begin by removing from consideration any pixel
    # set to NaN, which corresponds to a pixel on the "non-science" portions
    # of the detector. For JWST spectral cubes, these pixels are also marked in
    # the DQ array with flag `513`. Also respect the loaded mask, if it exists.
    mask_non_science = np.isnan(self.dataset.selected_obj.flux.value)
    loaded_mask_cube = self.mask_cube
    if loaded_mask_cube is not None:
        loaded_mask = np.asarray(loaded_mask_cube.get_component('flux').data)
        # Some mask cubes have NaNs where they are not masked instead of 0.
        # NaN is truthy, so it must be zeroed (on a copy) before the logical OR,
        # otherwise every such pixel would be excluded.
        loaded_mask = np.where(np.isnan(loaded_mask), 0, loaded_mask).astype(bool)
        mask_non_science = np.logical_or(loaded_mask, mask_non_science)
    return np.logical_not(mask_non_science)
@property
def aperture_weight_mask(self):
    """
    Weight cube for the selected aperture, restricted to included pixels.

    For the default ('Entire Cube') selection this is just
    ``inverted_mask_non_science``; otherwise it is that mask multiplied by the
    aperture's own mask.
    """
    # Exact slice mask of cone or cylindrical aperture through the cube. `weight_mask` is
    # a 3D array with fractions of each pixel within an aperture at each
    # wavelength, on the range [0, 1].
    uses_entire_cube = self.aperture.selected == self.aperture.default_text
    if uses_entire_cube:
        # Entire Cube
        return self.inverted_mask_non_science

    included_pixels = self.inverted_mask_non_science
    aperture_mask = self.aperture.get_mask(
        self.dataset.selected_obj,
        self.aperture_method_selected,
        self.slice_display_unit,
        self.spatial_axes,
        # the reference value is only passed for a wavelength-dependent aperture
        self.reference_spectral_value if self.wavelength_dependent else None,
    )
    return included_pixels * aperture_mask
@property
def bg_weight_mask(self):
    """
    Weight cube for the selected background aperture.

    All zeros (shaped like the selected dataset's flux) when no background is
    selected; otherwise ``inverted_mask_non_science`` multiplied by the
    background aperture's mask.
    """
    has_no_background = self.background.selected == self.background.default_text
    if has_no_background:
        # NO background
        return np.zeros_like(self.dataset.selected_obj.flux.value)

    included_pixels = self.inverted_mask_non_science
    background_mask = self.background.get_mask(
        self.dataset.selected_obj,
        self.aperture_method_selected,
        self.slice_display_unit,
        self.spatial_axes,
        # the reference value is only passed for a wavelength-dependent background
        self.reference_spectral_value if self.bg_wavelength_dependent else None,
    )
    return included_pixels * background_mask
@property
def aperture_area_along_spectral(self):
    """Aperture weight mask summed over the spatial axes."""
    # Weight mask summed along the spatial axes so that we get area of the aperture, in pixels,
    # as a function of wavelength.
    # To convert to steradians, multiply by self.cube.meta.get('PIXAR_SR', 1.0)
    weights = self.aperture_weight_mask
    collapse_axes = self.spatial_axes
    return np.sum(weights, axis=collapse_axes)
@property
def bg_area_along_spectral(self):
    """Background weight mask summed over the spatial axes."""
    weights = self.bg_weight_mask
    collapse_axes = self.spatial_axes
    return np.sum(weights, axis=collapse_axes)
def _extract_from_aperture(self, cube, uncert_cube, mask_cube, aperture,
weight_mask, wavelength_dependent,
selected_func, **kwargs):
"""
Collapse ``cube`` over the spatial axes within ``aperture`` and return the result
of ``_return_extracted``.

Parameters
----------
cube
Flux cube; must provide ``get_subset_object``/``get_object``, ``meta`` and ``coords``.
uncert_cube
Uncertainty cube, or a falsy value to extract without uncertainties.
mask_cube
Mask cube, or a falsy value. When given, its 'flux' component is OR-ed into
the mask and a warning snackbar is broadcast.
aperture : ApertureSubsetSelect
Aperture selection; its default text means "no subset".
weight_mask
Weight array for the aperture (see ``aperture_weight_mask``/``bg_weight_mask``).
wavelength_dependent
Not referenced in this body.
selected_func : str
Lower-case collapse function name ('mean', 'sum', or another NDDataArray method).
kwargs : dict
Passed to the NDDataArray collapse; ``operation_ignores_mask`` and
``propagate_uncertainties`` default to ``True``.

Raises
------
ValueError
If ``aperture`` is not an ``ApertureSubsetSelect``.
"""
# This plugin collapses over the *spatial axes* (optionally over a spatial subset,
# defaults to ``No Subset``). Since the Cubeviz parser puts the fluxes
# and uncertainties in different glue Data objects, we translate the spectral
# cube and its uncertainties into separate NDDataArrays, then combine them:
if not isinstance(aperture, ApertureSubsetSelect):
raise ValueError("aperture must be an ApertureSubsetSelect object")
if aperture.selected != aperture.default_text:
nddata = cube.get_subset_object(
subset_id=aperture.selected, cls=NDDataArray
)
if uncert_cube:
uncertainties = uncert_cube.get_subset_object(
subset_id=aperture.selected, cls=StdDevUncertainty
)
else:
uncertainties = None
# NOTE: the aperture method is read from the plugin state, not from an argument
if self.aperture_method_selected.lower() == 'center':
flux = nddata.data << nddata.unit
else: # exact (min/max not allowed here)
# Apply the fractional pixel array to the flux cube
flux = (weight_mask * nddata.data) << nddata.unit
# Boolean cube which is True outside of the aperture
# (i.e., the numpy boolean mask convention)
mask = np.isclose(weight_mask, 0)
# composite subset masks are in `nddata.mask`:
# NOTE(review): this only combines the masks when *every* weight is zero -- confirm
# that ``np.all(weight_mask == 0)`` is the intended condition.
if nddata.mask is not None and np.all(weight_mask == 0):
mask &= nddata.mask
else:
nddata = cube.get_object(cls=NDDataArray)
if uncert_cube:
uncertainties = uncert_cube.get_object(cls=StdDevUncertainty)
else:
uncertainties = None
flux = nddata.data << nddata.unit
mask = nddata.mask
# Use the spectral coordinate from the WCS:
# NOTE(review): as written, the chain below assigns ``wcs`` in every branch, so the
# value taken from ``_orig_spec`` appears to always be overwritten -- confirm whether
# the following ``if`` was meant to be ``elif``.
if '_orig_spec' in cube.meta:
wcs = cube.meta['_orig_spec'].wcs.spectral
if hasattr(cube.coords, 'spectral'):
wcs = cube.coords.spectral
elif hasattr(cube.coords, 'spectral_wcs'):
# This is the attribute for a PaddedSpectrumWCS in the 3D case
wcs = cube.coords.spectral_wcs
else:
wcs = cube.coords
# Filter out NaNs (False = good)
if mask is None:
mask = np.isnan(flux)
else:
mask = np.logical_or(mask, np.isnan(flux))
# Also apply the cube's original mask array
if mask_cube:
snackbar_message = SnackbarMessage(
"Note: Applied loaded mask cube during extraction",
color="warning",
sender=self)
self.hub.broadcast(snackbar_message)
# work on a copy so the mask cube's own data are left untouched
mask_from_cube = mask_cube.get_component('flux').data.copy()
# Some mask cubes have NaNs where they are not masked instead of 0
mask_from_cube[np.where(np.isnan(mask_from_cube))] = 0
mask = np.logical_or(mask, mask_from_cube.astype('bool'))
nddata_reshaped = NDDataArray(
flux, mask=mask, uncertainty=uncertainties, wcs=wcs, meta=nddata.meta
)
# by default we want to use operation_ignores_mask=True in nddata:
kwargs.setdefault("operation_ignores_mask", True)
# by default we want to propagate uncertainties:
kwargs.setdefault("propagate_uncertainties", True)
if selected_func == 'mean':
# Use built-in sum function to collapse NDDataArray
collapsed_sum_for_mean = nddata_reshaped.sum(axis=self.spatial_axes, **kwargs)
# But we still need the mean function for everything except flux
collapsed_as_mean = nddata_reshaped.mean(axis=self.spatial_axes, **kwargs)
# Then normalize the flux based on the fractional pixel array
flux_for_mean = (collapsed_sum_for_mean.data /
np.sum(weight_mask, axis=self.spatial_axes)) << nddata_reshaped.unit
# Combine that information into a new NDDataArray
collapsed_nddata = NDDataArray(flux_for_mean, mask=collapsed_as_mean.mask,
uncertainty=collapsed_as_mean.uncertainty,
wcs=collapsed_as_mean.wcs,
meta=collapsed_as_mean.meta)
elif selected_func == 'sum':
collapsed_nddata = getattr(nddata_reshaped, selected_func)(
axis=self.spatial_axes, **kwargs
) # returns an NDDataArray
# Remove per solid angle denominator to turn sb into flux
sq_angle_unit = check_if_unit_is_per_solid_angle(collapsed_nddata.unit,
return_unit=True)
if sq_angle_unit is not None:
# convert aperture area in steradians to the selected square angle unit
# NOTE: just forcing these units for now!! this is in steradians and
# needs to be converted to the selected square angle unit but for now just
# force to correct units
# NOTE: the pixel area is read from ``self.cube.meta``, not from the ``cube`` argument
if sq_angle_unit == u.sr:
aperture_area = self.cube.meta.get('PIXAR_SR', 1.0) * sq_angle_unit
else:
aperture_area = 1 * sq_angle_unit
collapsed_nddata = collapsed_nddata.multiply(aperture_area,
propagate_uncertainties=True)
else:
# any other function name is looked up as a method on the NDDataArray
collapsed_nddata = getattr(nddata_reshaped, selected_func)(
axis=self.spatial_axes, **kwargs
) # returns an NDDataArray
return self._return_extracted(cube, wcs, collapsed_nddata)
def _return_extracted(self, cube, wcs, collapsed_nddata, pass_spectral_axis=False):
"""
Build the output spectrum from a collapsed NDDataArray via
``_return_spectrum_with_correct_units``.

Parameters
----------
cube
Cube the data were extracted from; its ``coords`` decide which WCS is used.
wcs
Reassigned in every branch below before it is read, so the value passed in
is not used by this body.
collapsed_nddata
Collapsed data providing ``data``, ``unit``, ``mask``, ``uncertainty`` and ``meta``.
pass_spectral_axis : bool, optional
Whether to compute the spectral axis explicitly from the WCS and pass it on.
Forced to ``True`` when ``cube.coords`` has neither ``spectral_wcs`` nor ``spectral``.
"""
# Convert to Spectrum, with the spectral axis in correct units:
if hasattr(cube.coords, 'spectral_wcs'):
target_wave_unit = cube.coords.spectral_wcs.world_axis_units[0]
wcs = cube.coords.spectral_wcs
elif hasattr(cube.coords, 'spectral'):
target_wave_unit = cube.coords.spectral.world_axis_units[0]
wcs = cube.coords.spectral
else:
target_wave_unit = None
# Can't split out spectral from GWCS
wcs = cube.coords
pass_spectral_axis = True
# an empty unit string is replaced by 'pix'
if target_wave_unit == '':
target_wave_unit = 'pix'
flux = collapsed_nddata.data << collapsed_nddata.unit
mask = collapsed_nddata.mask
uncertainty = collapsed_nddata.uncertainty
if pass_spectral_axis:
# evaluate the WCS at pixel 0 on the other axes and at every index along the spectral axis
wcs_args = [0, 0, 0]
spec_indices = np.arange(cube.shape[self.spectral_axis_index])
wcs_args[self.spectral_axis_index] = spec_indices
# NOTE(review): presumably reversed because the WCS takes pixel axes in the opposite
# order to the array axes -- confirm.
wcs_args.reverse()
spectral_and_spatial = wcs.pixel_to_world(*wcs_args)
spectral_axis = [x for x in spectral_and_spatial if isinstance(x, SpectralCoord)]
if len(spectral_axis):
spectral_axis = spectral_axis[0]
else:
# In this case we have a pixel spectral axis
world_index = cube.ndim - 1 - cube.meta['spectral_axis_index']
spectral_axis = spectral_and_spatial[world_index]
if spectral_axis.unit == "":
spectral_axis = spectral_axis * u.pixel
collapsed_spec = _return_spectrum_with_correct_units(
flux, wcs, collapsed_nddata.meta, data_type='flux',
target_wave_unit=target_wave_unit,
uncertainty=uncertainty,
mask=mask,
spectral_axis=spectral_axis
)
else:
collapsed_spec = _return_spectrum_with_correct_units(
flux, wcs, collapsed_nddata.meta, data_type='flux',
target_wave_unit=target_wave_unit,
uncertainty=uncertainty,
mask=mask
)
return collapsed_spec
def _preview_x_from_extracted(self, extracted):
return extracted.spectral_axis
def _preview_y_from_extracted(self, extracted):
"""
Convert y-axis units of extraction preview to display units,
if necessary.
"""
if extracted.flux.unit != self.spectrum_y_units:
eqv = all_flux_unit_conversion_equivs(self.dataset.selected_obj.meta.get('PIXAR_SR', 1.0), # noqa
self.dataset.selected_obj.spectral_axis)
return flux_conversion_general(extracted.flux.value, extracted.flux.unit,
self.spectrum_y_units, eqv)
return extracted.flux
[docs]
@with_spinner()
def extract(self, return_bg=False, add_data=True, **kwargs):
    """
    Extract the spectrum from the data cube according to the plugin inputs.

    The background spectrum (if a background is selected) is subtracted from
    the aperture spectrum.  The result is stored on ``extracted_spec`` and
    ``extraction_available`` is set.

    Parameters
    ----------
    return_bg : bool, optional
        Whether to also return the spectrum of the background, if applicable.
    add_data : bool, optional
        Whether to load the resulting data back into the application according to
        ``add_results``.
    kwargs : dict
        Additional keyword arguments passed to the NDDataArray collapse operation.
        Examples include ``propagate_uncertainties`` and ``operation_ignores_mask``.

    Returns
    -------
    The extracted spectrum, or ``(spectrum, background_spectrum)`` when
    ``return_bg`` is set (the background is ``None`` if none is selected).

    Raises
    ------
    ValueError
        If the aperture method conflicts with the selected function, or the
        aperture and background are set to the same subset.
    """
    if self.conflicting_aperture_and_function:
        raise ValueError(self.conflicting_aperture_error_message)
    if self.aperture.selected == self.background.selected:
        raise ValueError("aperture and background cannot be set to the same subset")

    spec = self._extract_from_aperture(self.cube, self.uncert_cube, self.mask_cube,
                                       self.aperture, self.aperture_weight_mask,
                                       self.wavelength_dependent,
                                       self.function_selected.lower(), **kwargs)

    # the background is always scaled by aperture area here, regardless of the
    # bg_spec_per_spaxel traitlet
    bg_spec = self.extract_bg_spectrum(add_data=False, bg_spec_per_spaxel=False)
    if bg_spec is not None:
        spec = spec - bg_spec

    # per https://jwst-docs.stsci.edu/jwst-near-infrared-camera/nircam-performance/nircam-absolute-flux-calibration-and-zeropoints  # noqa
    spec.meta['_pixel_scale_factor'] = self.cube.meta.get('PIXAR_SR', 1.0)

    # stuff for exporting to file
    self.extracted_spec = spec
    self.extraction_available = True

    if add_data:
        if default_color := self.aperture.selected_item.get('color', None):
            spec.meta['_default_color'] = default_color
        selected_viewer = self.add_results.viewer.selected
        # the 'None' choice means "do not add to any viewer"
        target_viewer = [] if selected_viewer == 'None' else selected_viewer
        self.add_results.add_results_from_plugin(spec, format=self.extracted_format,
                                                 load_kwargs={'viewer': target_viewer})
        self.hub.broadcast(SnackbarMessage(
            f"{self.resulting_product_name.title()} extracted successfully.",
            color="success",
            sender=self))

    return (spec, bg_spec) if return_bg else spec
[docs]
@with_spinner('bg_spec_spinner')
def extract_bg_spectrum(self, add_data=False, **kwargs):
    """
    Create a background 1D spectrum from the input parameters defined in the plugin.

    If ``function`` is 'sum', then the value is scaled by the relative ratios of the area
    (along the spectral axis) of ``aperture`` to ``background``, unless the
    per-spaxel option is enabled.

    Parameters
    ----------
    add_data : bool
        Whether to add the resulting spectrum to the application, according to the options
        defined in the plugin.
    kwargs : dict
        Additional keyword arguments passed to the NDDataArray collapse operation.
        Examples include ``propagate_uncertainties`` and ``operation_ignores_mask``.

    Returns
    -------
    The background spectrum, or ``None`` when no background is selected.

    Raises
    ------
    ValueError
        If ``add_data`` is set but no background is selected.
    """
    # allow internal calls to override the behavior of the bg_spec_per_spaxel traitlet
    per_spaxel = kwargs.pop('bg_spec_per_spaxel', self.bg_spec_per_spaxel)

    has_background = self.background.selected != self.background.default_text
    if not has_background:
        if add_data:
            raise ValueError(f"Background is set to {self.background.selected}")
        return None

    collapse_func = self.function_selected.lower()
    bg_spec = self._extract_from_aperture(self.cube, self.uncert_cube, self.mask_cube,
                                          self.background, self.bg_weight_mask,
                                          self.bg_wavelength_dependent,
                                          collapse_func, **kwargs)
    if collapse_func == 'sum' and not per_spaxel:
        # then scale according to aperture areas across the spectral axis (allowing for
        # independent wavelength-dependence btwn the aperture and background)
        bg_spec *= self.aperture_area_along_spectral / self.bg_area_along_spectral

    if add_data:
        self.bg_spec_add_results.add_results_from_plugin(bg_spec, replace=False)
    return bg_spec
[docs]
def vue_spectral_extraction(self, *args, **kwargs):
    """UI entry point: run ``extract`` and report any failure through a snackbar message."""
    try:
        self.extract(add_data=True)
    except Exception as e:
        failure_notice = SnackbarMessage(
            f"Extraction failed: {repr(e)}",
            sender=self, color="error",
            traceback=e)
        self.hub.broadcast(failure_notice)
@observe('aperture_selected', 'function_selected')
def _set_default_results_label(self, event={}):
    """Derive the default results label from the product name, aperture and function."""
    if not hasattr(self, 'aperture'):
        # may be triggered before the aperture component exists
        return
    uses_entire_cube = self.aperture.selected == self.aperture.default_text
    if uses_entire_cube:
        default_label = f"{self.resulting_product_name.title()} ({self.function_selected.lower()})"  # noqa
    else:
        default_label = f"{self.resulting_product_name.title()} ({self.aperture_selected}, {self.function_selected.lower()})"  # noqa
    self.results_label_default = default_label
@cached_property
def marks(self):
    """
    Preview marks keyed by 'extract' and 'bg_extract' (cached after first access).

    An empty dict is returned when this is not the tray instance or when there
    is no slice-indicator viewer.  Otherwise both lines are created on the first
    such viewer and appended to its figure.
    """
    if not self._tray_instance or not len(self.slice_indicator_viewers):
        return {}
    # TODO: iterate over self.slice_indicator_viewers and handle adding/removing viewers
    viewer = self.slice_indicator_viewers[0]
    extract_line = PluginLine(viewer, visible=self.is_active)
    bg_line = PluginLine(viewer,
                         line_style='dotted',
                         visible=self.is_active and self.active_step == 'bg')
    preview_marks = {'extract': extract_line, 'bg_extract': bg_line}
    viewer.figure.marks = viewer.figure.marks + [preview_marks['extract'],
                                                 preview_marks['bg_extract']]
    return preview_marks
def _clear_marks(self):
for mark in self.marks.values():
if mark.visible:
mark.visible = False
@observe('is_active', 'show_live_preview',
         'dataset_selected', 'aperture_selected', 'bg_selected',
         'wavelength_dependent', 'bg_wavelength_dependent', 'reference_spectral_value',
         'function_selected',
         'aperture_method_selected',
         'spectrum_y_units',
         'previews_temp_disabled')
def _live_update_marks(self, event={}):
    """Refresh the preview marks whenever any input that affects them changes."""
    if self.spectrum_y_units != '':
        self._update_marks(event)
        return
    # ensure that units are populated
    # which in turn will make a call back here
    # from the observe on spectrum_y_units
    self._on_global_display_unit_changed(None)
@skip_if_not_tray_instance()
def _update_marks(self, event={}):
    """Set the visibility of the preview marks and trigger a live extraction."""
    preview_enabled = self.show_live_preview and self.is_active
    if not preview_enabled:
        self._clear_marks()
        return
    # ensure the correct visibility, always (whether or not there have been updates)
    show_bg_line = (self.active_step == 'bg'
                    and self.bg_selected != self.background.default_text)
    self.marks['bg_extract'].visible = show_bg_line
    self.marks['extract'].visible = True
    # _live_update_extract will skip if no updates since last active
    self._live_update_extract(event)
@skip_if_no_updates_since_last_active()
@with_temp_disable(timeout=0.4)
def _live_update_extract(self, event={}):
    """Re-run the preview extraction; the decorators decide whether it actually runs."""
    self._update_extract()
@skip_if_not_tray_instance()
def _update_extract(self):
    """
    Re-extract without adding data and push the results to the preview marks.

    Returns ``False`` (after hiding the marks) when the extraction raises;
    otherwise returns ``None``.
    """
    try:
        spectrum, bg_spectrum = self.extract(return_bg=True, add_data=False)
    except Exception:
        # any failure simply hides the preview
        self._clear_marks()
        return False

    self.marks['extract'].update_xy(self._preview_x_from_extracted(spectrum),
                                    self._preview_y_from_extracted(spectrum))

    if bg_spectrum is not None:
        self.marks['bg_extract'].update_xy(self._preview_x_from_extracted(bg_spectrum),
                                           self._preview_y_from_extracted(bg_spectrum))
    else:
        self.marks['bg_extract'].clear()
        self.marks['bg_extract'].visible = False