Source code for jdaviz.configs.specviz2d.plugins.spectral_extraction.spectral_extraction
import numpy as np
from functools import cached_property
from traitlets import Bool, List, Unicode, observe
from jdaviz.configs.mosviz.plugins.viewers import Spectrum1DViewer
from jdaviz.core.events import (SnackbarMessage, NewViewerMessage,
ViewerVisibleLayersChangedMessage)
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin,
SelectPluginComponent,
DatasetSelect,
AddResults,
skip_if_no_updates_since_last_active,
skip_if_not_tray_instance,
skip_if_not_relevant,
with_spinner)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.core.custom_traitlets import IntHandleEmpty, FloatHandleEmpty
from jdaviz.core.marks import PluginMarkCollection, PluginLine
from astropy.modeling import models
from astropy.nddata import StdDevUncertainty, VarianceUncertainty, UnknownUncertainty
from specutils import Spectrum
from specreduce import tracing
from specreduce import background
from specreduce import extract
__all__ = ['SpectralExtraction2D']
_model_cls = {'Spline': models.Spline1D,
'Polynomial': models.Polynomial1D,
'Legendre': models.Legendre1D,
'Chebyshev': models.Chebyshev1D}
[docs]
@tray_registry('spectral-extraction-2d', label="2D Spectral Extraction",
category="data:reduction")
class SpectralExtraction2D(PluginTemplateMixin):
"""
The Spectral Extraction 2D plugin exposes specreduce methods for tracing,
background subtraction, and spectral extraction from 2D spectra.
See the :ref:`2D Spectral Extraction Plugin Documentation <specviz2d-spectral-extraction>`
for more details.
Only the following attributes and methods are available through the
:ref:`public plugin API <plugin-apis>`:
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray`
* :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray`
* ``interactive_extract``
Whether to automatically extract when parameters change.
* ``trace_dataset`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
controls the input dataset for generating the trace.
* ``trace_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
controls the type of trace to be generated.
* ``trace_peak_method`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
only applicable if ``trace_type`` is not ``Flat``.
* ``trace_pixel``
pixel of the trace. If ``trace_type`` is not ``Flat``, then this
is the "guess" for the automated trace.
* ``trace_do_binning``
only applicable if ``trace_type`` is not ``Flat``. Bin the input data when fitting the
trace.
* ``trace_bins``
only applicable if ``trace_type`` is not ``Flat`` and ``trace_do_binning``.
* ``trace_order``
order of the polynomial fit for the trace.
* ``trace_offset``
offset to apply to the trace.
* ``trace_trace`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
existing trace to use as a reference.
* ``trace_window``
full width of the trace.
* :meth:`import_trace`
* :meth:`export_trace`
* ``bg_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the background.
* ``bg_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`):
controls the type of background to be generated.
* ``bg_trace`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
existing trace to use as a reference for the background.
* ``bg_trace_pixel``
only applicable if ``bg_type`` is set to ``Manual``
* ``bg_separation``
only applicable if ``bg_type`` set set to ``OneSided`` or ``TwoSided``.
Separation from the referenced trace for the center of each of the background window(s).
* ``bg_width``
full width of each background window(s).
* ``bg_statistic`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
statistic to use when computing the background. 'Average' will account for partial pixel
weights, 'Median' will include all partial pixels.
* ``bg_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* ``bg_sub_add_results``
* :meth:`import_bg`
* :meth:`export_bg`
* :meth:`export_bg_spectrum`
* :meth:`export_bg_img`
* :meth:`export_bg_sub`
* ``ext_dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`):
controls the input dataset for generating the extracted spectrum. "From Plugin" will
use the background-subtracted image as defined by the background options above. To skip
background extraction, select the original input 2D spectrum.
* ``ext_trace`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`)
* ``ext_type`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
* ``ext_width``
full width of the extraction window.
* ``horne_ext_profile`` (:class:`~jdaviz.core.template_mixin.SelectPluginComponent`)
For Horne extract, choice of 'Gaussian' or 'Self (interpolated)' to use
empirical profile from data.
* ``self_prof_n_bins``
Number of bins to use when computing the self-derived profile for Horne Extract.
* ``self_prof_interp_degree_x``
Interpolation degree (in X) to use when computing the self-derived profile
for Horne Extract.
* ``self_prof_interp_degree_y``
Interpolation degree (in Y) to use when computing the self-derived profile
for Horne Extract.
* ``ext_add_results`` (:class:`~jdaviz.core.template_mixin.AddResults`)
* :meth:`import_extract`
* :meth:`export_extract`
* :meth:`export_extract_spectrum`
"""
dialog = Bool(False).tag(sync=True)
template_file = __file__, "spectral_extraction.vue"
uses_active_status = Bool(True).tag(sync=True)
active_step = Unicode().tag(sync=True)
# SETTINGS
interactive_extract = Bool(True).tag(sync=True)
# TRACE
trace_trace_items = List().tag(sync=True)
trace_trace_selected = Unicode().tag(sync=True)
trace_offset = IntHandleEmpty(0).tag(sync=True)
trace_dataset_items = List().tag(sync=True)
trace_dataset_selected = Unicode().tag(sync=True)
trace_type_items = List().tag(sync=True)
trace_type_selected = Unicode().tag(sync=True)
trace_pixel = FloatHandleEmpty(0).tag(sync=True)
trace_order = IntHandleEmpty(3).tag(sync=True)
trace_peak_method_items = List().tag(sync=True)
trace_peak_method_selected = Unicode().tag(sync=True)
trace_do_binning = Bool(True).tag(sync=True)
trace_bins = IntHandleEmpty(20).tag(sync=True)
trace_window = IntHandleEmpty(0).tag(sync=True)
trace_results_label = Unicode().tag(sync=True)
trace_results_label_default = Unicode().tag(sync=True)
trace_results_label_auto = Bool(True).tag(sync=True)
trace_results_label_invalid_msg = Unicode('').tag(sync=True)
trace_results_label_overwrite = Bool().tag(sync=True)
trace_add_to_viewer_items = List().tag(sync=True)
trace_add_to_viewer_selected = Unicode().tag(sync=True)
trace_spinner = Bool(False).tag(sync=True)
# BACKGROUND
bg_dataset_items = List().tag(sync=True)
bg_dataset_selected = Unicode().tag(sync=True)
bg_type_items = List().tag(sync=True)
bg_type_selected = Unicode().tag(sync=True)
bg_trace_items = List().tag(sync=True)
bg_trace_selected = Unicode().tag(sync=True)
bg_trace_pixel = FloatHandleEmpty(0).tag(sync=True)
bg_statistic_items = List().tag(sync=True)
bg_statistic_selected = Unicode().tag(sync=True)
bg_separation = FloatHandleEmpty(0).tag(sync=True)
bg_width = FloatHandleEmpty(0).tag(sync=True)
bg_results_label = Unicode().tag(sync=True)
bg_results_label_default = Unicode().tag(sync=True)
bg_results_label_auto = Bool(True).tag(sync=True)
bg_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_results_label_overwrite = Bool().tag(sync=True)
bg_add_to_viewer_items = List().tag(sync=True)
bg_add_to_viewer_selected = Unicode().tag(sync=True)
bg_img_spinner = Bool(False).tag(sync=True)
bg_spec_results_label = Unicode().tag(sync=True)
bg_spec_results_label_default = Unicode().tag(sync=True)
bg_spec_results_label_auto = Bool(True).tag(sync=True)
bg_spec_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_spec_results_label_overwrite = Bool().tag(sync=True)
bg_spec_add_to_viewer_items = List().tag(sync=True)
bg_spec_add_to_viewer_selected = Unicode().tag(sync=True)
bg_spec_spinner = Bool(False).tag(sync=True)
bg_sub_results_label = Unicode().tag(sync=True)
bg_sub_results_label_default = Unicode().tag(sync=True)
bg_sub_results_label_auto = Bool(True).tag(sync=True)
bg_sub_results_label_invalid_msg = Unicode('').tag(sync=True)
bg_sub_results_label_overwrite = Bool().tag(sync=True)
bg_sub_add_to_viewer_items = List().tag(sync=True)
bg_sub_add_to_viewer_selected = Unicode().tag(sync=True)
bg_sub_spinner = Bool(False).tag(sync=True)
# EXTRACT
ext_dataset_items = List().tag(sync=True)
ext_dataset_selected = Unicode().tag(sync=True)
ext_trace_items = List().tag(sync=True)
ext_trace_selected = Unicode().tag(sync=True)
ext_type_items = List().tag(sync=True)
ext_type_selected = Unicode().tag(sync=True)
horne_ext_profile_items = List().tag(sync=True)
horne_ext_profile_selected = Unicode().tag(sync=True)
self_prof_n_bins = IntHandleEmpty(10).tag(sync=True)
self_prof_interp_degree_x = IntHandleEmpty(1).tag(sync=True)
self_prof_interp_degree_y = IntHandleEmpty(1).tag(sync=True)
ext_width = FloatHandleEmpty(0).tag(sync=True)
ext_uncert_warn = Bool(False).tag(sync=True)
ext_specreduce_err = Unicode().tag(sync=True)
ext_results_label = Unicode().tag(sync=True)
ext_results_label_default = Unicode().tag(sync=True)
ext_results_label_auto = Bool(True).tag(sync=True)
ext_results_label_invalid_msg = Unicode('').tag(sync=True)
ext_results_label_overwrite = Bool().tag(sync=True)
ext_add_to_viewer_items = List().tag(sync=True)
ext_add_to_viewer_selected = Unicode().tag(sync=True)
# uses default "spinner"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# description displayed under plugin title in tray
self._plugin_description = 'Extract 1D spectrum from 2D image.'
# TRACE
self.trace_trace = DatasetSelect(self,
'trace_trace_items',
'trace_trace_selected',
default_text='New Trace',
filters=['is_trace'])
self.trace_dataset = DatasetSelect(self,
'trace_dataset_items',
'trace_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.trace_dataset.get_data_cls = Spectrum
self.trace_type = SelectPluginComponent(self,
items='trace_type_items',
selected='trace_type_selected',
manual_options=['Flat', 'Polynomial',
'Legendre', 'Chebyshev',
'Spline'])
self.trace_peak_method = SelectPluginComponent(self,
items='trace_peak_method_items',
selected='trace_peak_method_selected',
manual_options=['Gaussian', 'Centroid', 'Max']) # noqa
self.trace_add_results = AddResults(self, 'trace_results_label',
'trace_results_label_default',
'trace_results_label_auto',
'trace_results_label_invalid_msg',
'trace_results_label_overwrite',
'trace_add_to_viewer_items',
'trace_add_to_viewer_selected')
self.trace_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.trace_results_label_default = 'trace'
# BACKGROUND
self.bg_dataset = DatasetSelect(self,
'bg_dataset_items',
'bg_dataset_selected',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.bg_type = SelectPluginComponent(self,
items='bg_type_items',
selected='bg_type_selected',
manual_options=['TwoSided', 'OneSided', 'Manual'])
self.bg_trace = DatasetSelect(self,
items='bg_trace_items',
selected='bg_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.bg_statistic = SelectPluginComponent(self,
items='bg_statistic_items',
selected='bg_statistic_selected',
manual_options=['Average', 'Median'])
self.bg_add_results = AddResults(self, 'bg_results_label',
'bg_results_label_default',
'bg_results_label_auto',
'bg_results_label_invalid_msg',
'bg_results_label_overwrite',
'bg_add_to_viewer_items',
'bg_add_to_viewer_selected')
self.bg_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.bg_results_label_default = 'background'
self.bg_spec_add_results = AddResults(self, 'bg_spec_results_label',
'bg_spec_results_label_default',
'bg_spec_results_label_auto',
'bg_spec_results_label_invalid_msg',
'bg_spec_results_label_overwrite',
'bg_spec_add_to_viewer_items',
'bg_spec_add_to_viewer_selected')
self.bg_spec_add_results.viewer.filters = ['is_spectrum_viewer']
self.bg_spec_results_label_default = 'background-spectrum'
self.bg_sub_add_results = AddResults(self, 'bg_sub_results_label',
'bg_sub_results_label_default',
'bg_sub_results_label_auto',
'bg_sub_results_label_invalid_msg',
'bg_sub_results_label_overwrite',
'bg_sub_add_to_viewer_items',
'bg_sub_add_to_viewer_selected')
self.bg_sub_add_results.viewer.filters = ['is_spectrum_2d_viewer']
self.bg_sub_results_label_default = 'background-subtracted'
# EXTRACT
self.ext_dataset = DatasetSelect(self,
'ext_dataset_items',
'ext_dataset_selected',
default_text='From Plugin',
filters=['layer_in_spectrum_2d_viewer', 'not_trace'])
self.ext_trace = DatasetSelect(self,
items='ext_trace_items',
selected='ext_trace_selected',
default_text='From Plugin',
filters=['is_trace'])
self.ext_type = SelectPluginComponent(self,
items='ext_type_items',
selected='ext_type_selected',
manual_options=['Boxcar', 'Horne'])
self.horne_ext_profile = SelectPluginComponent(self,
items='horne_ext_profile_items',
selected='horne_ext_profile_selected',
manual_options=['Gaussian', 'Self (interpolated)']) # noqa
self.ext_add_results = AddResults(self, 'ext_results_label',
'ext_results_label_default',
'ext_results_label_auto',
'ext_results_label_invalid_msg',
'ext_results_label_overwrite',
'ext_add_to_viewer_items',
'ext_add_to_viewer_selected')
self.ext_add_results.viewer.filters = ['is_spectrum_viewer']
# NOTE: defaults to overwriting original spectrum
self.ext_add_results.label_whitelist_overwrite = ['1D Spectrum', '2D Spectrum (auto-ext)']
self.ext_results_label_default = '2D Spectrum (auto-ext)'
self.app.hub.subscribe(self, ViewerVisibleLayersChangedMessage,
lambda _: self._update_plugin_marks())
if self.config == "deconfigged":
self.observe_traitlets_for_relevancy(traitlets_to_observe=['trace_dataset_items'])
@property
def user_api(self):
return PluginUserApi(self, expose=('interactive_extract',
'trace_bins', 'trace_dataset',
'trace_do_binning', 'trace_offset',
'trace_order', 'trace_peak_method',
'trace_pixel', 'trace_trace',
'trace_type', 'trace_window',
'import_trace', 'export_trace',
'bg_add_results', 'bg_dataset',
'bg_separation', 'bg_statistic',
'bg_sub_add_results', 'bg_trace',
'bg_trace_pixel', 'bg_type', 'bg_width',
'export_bg', 'export_bg_img',
'export_bg_spectrum', 'export_bg_sub',
'import_bg',
'ext_dataset', 'ext_trace', 'ext_type',
'ext_width', 'ext_add_results',
'horne_ext_profile',
'self_prof_n_bins',
'self_prof_interp_degree_x',
'self_prof_interp_degree_y',
'import_extract',
'export_extract', 'export_extract_spectrum'))
def _clear_default_inputs(self):
self.trace_pixel = 0
self.trace_window = 0
self.bg_trace_pixel = 0
self.bg_separation = 0
self.bg_width = 0
self.ext_width = 0
@observe('irrelevant_msg')
def _updates_when_becoming_relevant(self, msg):
if msg.get('new') != '':
return
# reset all defaults for the selected trace dataset, _trace_dataset_selected
# should be triggered shortly after
self._clear_default_inputs()
def _extract_in_new_instance(self, dataset=None, add_data=False):
# create a new instance of the 2D Spectral Extraction plugin (to not
# affect the instance in the tray) and extract the entire cube with defaults.
plg = self.new()
# all other settings remain at their plugin defaults
plg._clear_default_inputs()
plg.selected = self.selected if dataset is None else dataset
plg.trace_dataset.selected = self.trace_dataset.selected if dataset is None else dataset
plg.bg_dataset.selected = self.bg_dataset.selected if dataset is None else dataset
plg._trace_dataset_selected() # should only be necessary if default dataset
return plg.export_extract_spectrum(add_data=add_data)
@observe('trace_dataset_selected')
@skip_if_not_relevant()
def _trace_dataset_selected(self, msg=None):
if not hasattr(self, 'trace_dataset'):
# happens when first initializing plugin outside of tray
return
if not len(self.trace_dataset.selected):
return
width = self.trace_dataset.get_selected_spectrum(use_display_units=True).shape[0]
# estimate the pixel number by taking the median of the brightest pixel index
# in each column, ignoring columns where the sum in that column is not
# positive (ie. columns of all zeros or nans)
trace_flux = self.trace_dataset.get_selected_spectrum(use_display_units=True).flux
trace_flux_ignore_zeros = trace_flux[:, np.nansum(trace_flux, axis=0) != 0]
if trace_flux_ignore_zeros.shape[1] == 0:
# default to trace in middle of image
brightest_pixel = int(trace_flux.shape[0]/2)
else:
brightest_pixel = int(np.nanmedian(np.nanargmax(trace_flux_ignore_zeros, axis=0)))
# do not allow to be an edge pixel
if brightest_pixel < 1:
brightest_pixel = 1
if brightest_pixel > width - 1:
brightest_pixel = width - 1
distance_from_edge = min(brightest_pixel, width-brightest_pixel)
# default width will be 10% of cross-dispersion "height",
# but no larger than distance from the edge
default_bg_width = int(np.ceil(width / 10))
default_width = min(default_bg_width, distance_from_edge * 2)
# sign for one-sided and single trace-pixel depending on whether the brightest pixel is
# above or below the middle of the image
if default_bg_width * 2 >= distance_from_edge:
sign = 1 if (brightest_pixel < width / 2) else -1
default_bg_separation = sign * default_bg_width * 2
else:
default_bg_separation = default_bg_width * 2
if self.trace_pixel == 0:
self.trace_pixel = brightest_pixel
if self.trace_window == 0:
self.trace_window = default_width
if self.bg_trace_pixel == 0:
self.bg_trace_pixel = brightest_pixel + default_bg_separation
if self.bg_separation == 0:
if default_bg_width * 2 >= distance_from_edge:
self.bg_type_selected = 'OneSided'
self.bg_separation = default_bg_separation
if self.bg_width == 0:
self.bg_width = default_bg_width
if self.ext_width == 0:
self.ext_width = default_width
[docs]
def update_marks(self, step=None):
"""
Manually update the live-preview marks for a given step in spectral extraction. This API
mimics opening the plugin and interacting with one of the steps.
Parameters
----------
step : str
Step in the extraction process to visualize. Must be one of: 'trace', 'bg', 'ext'.
"""
if step is not None:
if step == 'trace':
self._interaction_in_trace_step()
elif step == 'bg':
self._interaction_in_bg_step()
elif step == 'ext':
self._interaction_in_ext_step()
elif step == '':
return
else:
raise ValueError("step must be one of: trace, bg, ext")
@observe('ext_add_to_viewer_selected', 'ext_results_label_overwrite')
def _spectrum1d_viewer_changed(self, *args):
self.update_marks(self.active_step)
@observe('is_active', 'active_step')
@skip_if_not_tray_instance()
def _update_plugin_marks(self, msg={}):
if self.app._jdaviz_helper is None:
return
if not len(self.marks):
# plugin has never been opened, no need to create marks just to hide them,
# we'll create marks when the plugin is first opened
return
if not (self.is_active):
for step, mark in self.marks.items():
mark.clear()
return
if self.active_step == '':
# on load, default to 'extract' (this will then trigger the observe to update the marks)
self.active_step = 'ext'
return
marks_info = self.marks_info(include_mark_obj=False) # viewer + step info for each mark
viewers = {'1d': self.marks_viewers1d, '2d': self.marks_viewers2d}
for step, mark in self.marks.items():
visible = self.active_step in marks_info[step]['steps']
mark.set_for_viewers('visible', visible, viewers[marks_info[step]['viewer']])
mark.clear_if_not_in_viewers(viewers[marks_info[step]['viewer']])
@property
def marks_viewers2d(self):
if self.active_step == 'bg':
return self.bg_dataset.viewers_with_selected_visible
return self.trace_dataset.viewers_with_selected_visible
@property
def marks_viewers1d(self):
return self.ext_add_results.results_viewers
[docs]
def marks_info(self, include_mark_obj=True):
"""
A dictionary containing each marker name, the viewer(s) (2d/1d spectral)
it belongs in, which step (bg, trace, extract) and plotting style kwargs.
"""
markers = {'trace': {'viewer': '2d', 'steps': ['trace', 'bg', 'ext']},
'extract': {'viewer': '1d', 'steps': ['trace', 'bg', 'ext']},
'bg1_lower': {'viewer': '2d', 'steps': ['bg']},
'bg1_upper': {'viewer': '2d', 'steps': ['bg']},
'bg1_center': {'viewer': '2d', 'steps': ['bg'], 'kw': {'line_style': 'dotted'}},
'bg2_lower': {'viewer': '2d', 'steps': ['bg']},
'bg2_upper': {'viewer': '2d', 'steps': ['bg']},
'bg2_center': {'viewer': '2d', 'steps': ['bg'], 'kw': {'line_style': 'dotted'}},
'bg_spec': {'viewer': '1d', 'steps': ['bg'], 'kw': {'stroke_width': 1}},
'ext_lower': {'viewer': '2d', 'steps': ['ext']},
'ext_upper': {'viewer': '2d', 'steps': ['ext']}}
if include_mark_obj: # add PluginMarkCollection
for key, d in markers.items():
kwargs = {'visible': self.is_active} | d.get('kw', {})
d['mark'] = PluginMarkCollection(PluginLine, **kwargs)
return markers
@cached_property
def marks(self):
"""
Access the marks created by this plugin in both the spectrum-viewer
and spectrum-2d-viewer.
"""
if not self._tray_instance:
return {}
return {k: v['mark'] for k, v in self.marks_info().items()}
@observe('interactive_extract')
@skip_if_no_updates_since_last_active()
@skip_if_not_tray_instance()
@skip_if_not_relevant()
def _update_interactive_extract(self, event={}):
# also called by any of the _interaction_in_*_step
if self.interactive_extract:
try:
sp1d = self.export_extract_spectrum(add_data=False)
except Exception as e:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
# NOTE: FitTrace or manual background are often giving a
# "background regions overlapped" error from specreduce
self.ext_specreduce_err = repr(e)
self.marks['extract'].clear()
else:
self.ext_specreduce_err = ''
self.marks['extract'].update_xy(sp1d.spectral_axis.value,
sp1d.flux.value,
viewers=self.marks_viewers1d)
else:
self.marks['extract'].clear()
if self.interactive_extract and self.active_step == 'bg':
try:
spec = self.export_bg_spectrum()
except Exception:
self.marks['bg_spec'].clear()
else:
self.marks['bg_spec'].update_xy(spec.spectral_axis,
spec.flux,
viewers=self.marks_viewers1d)
else:
self.marks['bg_spec'].clear()
@observe('is_active', 'trace_dataset_selected', 'trace_type_selected',
'trace_trace_selected', 'trace_offset', 'trace_order',
'trace_pixel', 'trace_peak_method_selected',
'trace_do_binning', 'trace_bins', 'trace_window', 'active_step')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_trace_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'trace')
or not self.is_active):
return
try:
trace = self.export_trace(add_data=False)
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
self.marks['trace'].clear()
else:
self.marks['trace'].update_xy(range(len(trace.trace)),
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'solid'
self._update_interactive_extract(event)
self.active_step = 'trace'
@observe('is_active', 'bg_dataset_selected', 'bg_type_selected',
'bg_trace_selected', 'bg_trace_pixel',
'bg_separation', 'bg_width', 'bg_statistic_selected', 'active_step')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_bg_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step != 'bg')
or not self.is_active):
return
try:
trace = self._get_bg_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'bg1_center', 'bg1_lower', 'bg1_upper',
'bg2_center', 'bg2_lower', 'bg2_upper', 'bg_spec']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'dashed'
if self.bg_type_selected in ['OneSided', 'TwoSided']:
self.marks['bg1_center'].update_xy(xs,
trace.trace+self.bg_separation,
viewers=self.marks_viewers2d)
self.marks['bg1_lower'].update_xy(xs,
trace.trace+self.bg_separation-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_separation+self.bg_width/2,
viewers=self.marks_viewers2d)
else:
self.marks['bg1_center'].clear()
self.marks['bg1_lower'].update_xy(xs,
trace.trace-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg1_upper'].update_xy(xs,
trace.trace+self.bg_width/2,
viewers=self.marks_viewers2d)
if self.bg_type_selected == 'TwoSided':
self.marks['bg2_center'].update_xy(xs,
trace.trace-self.bg_separation,
viewers=self.marks_viewers2d)
self.marks['bg2_lower'].update_xy(xs,
trace.trace-self.bg_separation-self.bg_width/2,
viewers=self.marks_viewers2d)
self.marks['bg2_upper'].update_xy(xs,
trace.trace-self.bg_separation+self.bg_width/2,
viewers=self.marks_viewers2d)
else:
for mark in ['bg2_center', 'bg2_lower', 'bg2_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'bg'
@observe('is_active', 'ext_dataset_selected', 'ext_trace_selected',
'ext_type_selected', 'ext_width', 'active_step',
'horne_ext_profile_selected', 'self_prof_n_bins',
'self_prof_interp_degree_x', 'self_prof_interp_degree_y')
@skip_if_not_tray_instance()
@skip_if_no_updates_since_last_active()
@skip_if_not_relevant()
def _interaction_in_ext_step(self, event={}):
if ((event.get('name', '') in ('active_step', 'is_active') and self.active_step not in ('ext', '')) # noqa
or not self.is_active):
return
try:
trace = self._get_ext_trace()
except Exception:
# NOTE: ignore error, but will be raised when clicking ANY of the export buttons
for mark in ['trace', 'ext_lower', 'ext_upper']:
self.marks[mark].clear()
else:
xs = range(len(trace.trace))
self.marks['trace'].update_xy(xs,
trace.trace,
viewers=self.marks_viewers2d)
self.marks['trace'].line_style = 'dashed'
if self.ext_type_selected == 'Boxcar':
self.marks['ext_lower'].update_xy(xs,
trace.trace-self.ext_width/2,
viewers=self.marks_viewers2d)
self.marks['ext_upper'].update_xy(xs,
trace.trace+self.ext_width/2,
viewers=self.marks_viewers2d)
else:
for mark in ['ext_lower', 'ext_upper']:
self.marks[mark].clear()
self._update_interactive_extract(event)
self.active_step = 'ext'
# TODO: remove this, the traitlet, and the row in spectral_extraction.vue
# when specutils handles the warning/exception
if self.ext_type_selected == 'Horne':
inp_sp2d = self._get_ext_input_spectrum()
self.ext_uncert_warn = isinstance(inp_sp2d.uncertainty, UnknownUncertainty)
else:
self.ext_uncert_warn = False
def _set_create_kwargs(self, **kwargs):
invalid_kwargs = [k for k in kwargs.keys() if not hasattr(self, k)]
if len(invalid_kwargs):
raise ValueError(f"{invalid_kwargs} are not valid attributes to pass as kwargs")
for k, v in kwargs.items():
setattr(self, k, v)
[docs]
def import_trace(self, trace):
"""
Import the input parameters from an existing specreduce Trace object into the plugin.
Parameters
----------
trace : specreduce.tracing.Trace
Trace object to import
"""
if not isinstance(trace, tracing.Trace): # pragma: no cover
raise TypeError("trace must be a specreduce.tracing.Trace object")
if isinstance(trace, tracing.FlatTrace):
self.trace_type_selected = 'Flat'
self.trace_pixel = trace.trace_pos
elif isinstance(trace, tracing.FitTrace):
self.trace_type_selected = trace.trace_model.__class__.__name__.strip('1D')
self.trace_pixel = trace.guess
self.trace_window = trace.window
self.trace_bins = trace.bins
self.trace_do_binning = True
if hasattr(trace.trace_model, 'degree'):
self.trace_order = trace.trace_model.degree
elif isinstance(trace, tracing.ArrayTrace): # pragma: no cover
raise NotImplementedError(f"cannot import ArrayTrace into plugin. Use viz.load instead") # noqa
else: # pragma: no cover
raise NotImplementedError(f"trace of type {trace.__class__.__name__} not supported")
[docs]
@with_spinner('trace_spinner')
def export_trace(self, add_data=False, **kwargs):
"""
Create a specreduce Trace object from the input parameters
defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting trace to the application, according to the options
defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'trace':
self.update_marks(step='trace')
if self.trace_trace_selected != 'New Trace':
# then we're offsetting an existing trace
# for FlatTrace, we can keep and expose a new FlatTrace (which has the advantage of
# being able to load back into the plugin)
orig_trace = self.trace_trace.selected_obj
if isinstance(orig_trace, tracing.FlatTrace):
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
orig_trace.trace_pos+self.trace_offset)
else:
trace = tracing.ArrayTrace(self.trace_dataset.selected_obj,
self.trace_trace.selected_obj.trace+self.trace_offset)
elif self.trace_type_selected == 'Flat':
trace = tracing.FlatTrace(self.trace_dataset.selected_obj,
self.trace_pixel)
elif self.trace_type_selected in _model_cls:
trace_model = _model_cls[self.trace_type_selected](degree=self.trace_order)
trace = tracing.FitTrace(self.trace_dataset.selected_obj,
guess=self.trace_pixel,
bins=int(self.trace_bins) if self.trace_do_binning else None,
window=self.trace_window,
peak_method=self.trace_peak_method_selected.lower(),
trace_model=trace_model)
else:
raise NotImplementedError(f"trace_type={self.trace_type_selected} not implemented")
if add_data:
self.trace_add_results.add_results_from_plugin(trace,
format='Trace',
replace=False)
return trace
def _get_bg_trace(self):
if self.bg_type_selected == 'Manual':
trace = tracing.FlatTrace(self.trace_dataset.get_selected_spectrum(
use_display_units=True),
self.bg_trace_pixel)
elif self.bg_trace_selected == 'From Plugin':
trace = self.export_trace(add_data=False)
else:
trace = self.bg_trace.get_selected_spectrum(use_disaply_units=True)
return trace
[docs]
def import_bg(self, bg):
"""
Import the input parameters from an existing specreduce Background object into the plugin.
Parameters
----------
bg : specreduce.background.Background
Background object to import
"""
if not isinstance(bg, background.Background): # pragma: no cover
raise TypeError("bg must be a specreduce.background.Background object")
# TODO: should we detect/set the referenced dataset?
trace = self._get_bg_trace()
if len(bg.traces) == 2:
# try to detect constant separation
seps1 = bg.traces[0].trace - trace.trace
seps2 = trace.trace - bg.traces[1].trace
if np.all(seps1 == seps1[0]) and np.all(seps2 == seps1[0]):
self.bg_type_selected = 'TwoSided'
self.bg_separation = abs(int(seps1[0]))
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
elif len(bg.traces) == 1:
# either one_sided or trace, let's see if its constant offset from the trace
seps = bg.traces[0].trace - trace.trace
if np.all(seps == seps[0]):
self.bg_type_selected = 'OneSided'
self.bg_separation = int(seps[0])
else: # pragma: no cover
raise NotImplementedError("backgrounds with custom traces not supported (could not detect common separation)") # noqa
else: # pragma: no cover
raise NotImplementedError("backgrounds with more than 2 traces not supported")
self.bg_width = bg.width
[docs]
@with_spinner('bg_spinner')
def export_bg(self, **kwargs):
"""
Create a specreduce Background object from the input parameters defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'bg':
self.update_marks(step='bg')
trace = self._get_bg_trace()
if self.bg_type_selected == 'Manual':
bg = background.Background(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
[trace], width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'OneSided':
bg = background.Background.one_sided(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
elif self.bg_type_selected == 'TwoSided':
bg = background.Background.two_sided(self.bg_dataset.get_selected_spectrum(
use_display_units=True),
trace,
self.bg_separation,
width=self.bg_width,
statistic=self.bg_statistic.selected.lower())
else: # pragma: no cover
raise NotImplementedError(f"bg_type={self.bg_type_selected} not implemented")
return bg
[docs]
@with_spinner('bg_img_spinner')
def export_bg_img(self, add_data=False, **kwargs):
"""
Create a background 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_spec = self.export_bg(**kwargs).bkg_image()
if add_data:
self.bg_add_results.add_results_from_plugin(bg_spec,
format='2D Spectrum',
replace=True)
return bg_spec
[docs]
def vue_create_bg_img(self, *args):
try:
self.export_bg_img(add_data=True)
except Exception as e:
self.app.hub.broadcast(
SnackbarMessage(f"Specreduce background failed with the following error: {repr(e)}",
color='error', sender=self, traceback=e)
)
[docs]
@with_spinner('bg_spec_spinner')
def export_bg_spectrum(self, add_data=False, **kwargs):
"""
Create a background 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
spec = self.export_bg(**kwargs).bkg_spectrum()
if add_data:
self.bg_spec_add_results.add_results_from_plugin(spec,
format='1D Spectrum',
replace=False)
return spec
[docs]
@with_spinner('bg_sub_spinner')
def export_bg_sub(self, add_data=False, **kwargs):
"""
Create a background-subtracted 2D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting image to the application, according to the options
defined in the plugin.
"""
bg_sub_spec = self.export_bg(**kwargs).sub_image()
if add_data:
self.bg_sub_add_results.add_results_from_plugin(bg_sub_spec,
format='2D Spectrum',
replace=True)
return bg_sub_spec
def _get_ext_trace(self):
if self.ext_trace_selected == 'From Plugin':
return self.export_trace(add_data=False)
else:
return self.ext_trace.get_selected_spectrum(use_display_units=True)
def _get_ext_input_spectrum(self):
if self.ext_dataset_selected == 'From Plugin':
return self.export_bg_sub(add_data=False)
else:
return self.ext_dataset.get_selected_spectrum(use_display_units=True)
[docs]
def import_extract(self, ext):
"""
Import the input parameters from an existing specreduce extract object into the plugin.
Parameters
----------
ext : specreduce.extract.BoxcarExtract
Extract object to import
"""
if isinstance(ext, extract.BoxcarExtract):
self.ext_type_selected = 'Boxcar'
self.ext_width = ext.width
elif isinstance(ext, extract.HorneExtract):
self.ext_type_selected = 'Horne'
else: # pragma: no cover
raise TypeError("ext must be a specreduce.extract.BoxcarExtract or specreduce.extract.HorneExtract object") # noqa
[docs]
def export_extract(self, **kwargs):
"""
Create a specreduce extraction object from the input parameters defined in the plugin.
"""
self._set_create_kwargs(**kwargs)
if len(kwargs) and self.active_step != 'ext':
self.update_marks(step='ext')
trace = self._get_ext_trace()
inp_sp2d = self._get_ext_input_spectrum()
if self.ext_type_selected == 'Boxcar':
ext = extract.BoxcarExtract(inp_sp2d, trace, width=self.ext_width)
elif self.ext_type_selected == 'Horne':
spatial_profile = None
if inp_sp2d.uncertainty is None:
inp_sp2d.uncertainty = VarianceUncertainty(np.ones_like(inp_sp2d.data))
if not hasattr(inp_sp2d.uncertainty, 'uncertainty_type'):
inp_sp2d.uncertainty = StdDevUncertainty(inp_sp2d.uncert)
if self.horne_ext_profile_selected == 'Self (interpolated)':
# check inputs
if self.self_prof_n_bins <= 0:
raise ValueError('`self_prof_n_bins` must be greater than 0.')
if self.self_prof_interp_degree_x <= 0:
raise ValueError('`self_prof_interp_degree_x` must be greater than 0.')
if self.self_prof_interp_degree_y <= 0:
raise ValueError('`self_prof_interp_degree_y` must be greater than 0.')
# setup dict of interpolation options
n_bins_interpolated_profile = self.self_prof_n_bins
interp_degree = (self.self_prof_interp_degree_x, self.self_prof_interp_degree_y)
spatial_profile = {'name': 'interpolated_profile',
'n_bins_interpolated_profile': n_bins_interpolated_profile,
'interp_degree': interp_degree}
elif self.horne_ext_profile_selected == 'Gaussian':
spatial_profile = 'gaussian'
else:
raise ValueError("Horne extraction profile must either be 'Gaussian' or 'Self (interpolated)'") # noqa
ext = extract.HorneExtract(inp_sp2d, trace, spatial_profile=spatial_profile)
else:
raise NotImplementedError(f"extraction type '{self.ext_type_selected}' not supported") # noqa
return ext
[docs]
@with_spinner('spinner')
def export_extract_spectrum(self, add_data=False, **kwargs):
"""
Create an extracted 1D spectrum from the input parameters defined in the plugin.
Parameters
----------
add_data : bool
Whether to add the resulting spectrum to the application, according to the options
defined in the plugin.
"""
extract = self.export_extract(**kwargs)
spectrum = extract.spectrum
if add_data:
# TODO: eventually generalize this logic into add_results_from_plugin
if not len(self.marks_viewers1d):
# no spectrum1d viewer, create one now and set the default viewer
viewer_ref = self.app.return_unique_name('1D Spectrum',
typ='viewer')
self.app._on_new_viewer(NewViewerMessage(Spectrum1DViewer,
data=None,
sender=self.app),
vid=viewer_ref, name=viewer_ref,
open_data_menu_if_empty=False)
self.ext_add_results.viewer = viewer_ref
self.ext_add_results.add_results_from_plugin(spectrum,
format='1D Spectrum',
replace=False)
return spectrum