Source code for jdaviz.app

import operator
import os
import pathlib
import re
import uuid
import warnings
import ipyvue
from astropy import units as u
from astropy.nddata import NDData, NDDataArray
from astropy.io import fits
from astropy.time import Time
from echo import (CallbackProperty, DictCallbackProperty,
                  ListCallbackProperty, delay_callback)
from ipygoldenlayout import GoldenLayout
from ipysplitpanes import SplitPanes
import numpy as np
from glue.config import data_translator, settings as glue_settings
from glue.core import HubListener
from glue.core.link_helpers import LinkSame, LinkSameWithUnits
from glue.core.message import (DataCollectionAddMessage,
                               DataCollectionDeleteMessage,
                               SubsetCreateMessage,
                               SubsetUpdateMessage,
                               SubsetDeleteMessage)
from glue.core.roi import CircularROI, CircularAnnulusROI, EllipticalROI, RectangularROI
from glue.core.state_objects import State
from glue.core.subset import (RangeSubsetState, RoiSubsetState,
                              CompositeSubsetState, InvertState)
from glue.core.units import unit_converter
from glue_astronomy.spectral_coordinates import SpectralCoordinates
from glue_astronomy.translators.regions import roi_subset_state_to_region
from glue_jupyter.app import JupyterApplication
from glue_jupyter.common.toolbar_vuetify import read_icon
from glue_jupyter.state_traitlets_helpers import GlueState
from ipypopout import PopoutButton
from ipyvuetify import VuetifyTemplate
from ipywidgets import widget_serialization
from traitlets import Dict, Bool, List, Unicode, Any
from specutils import Spectrum, SpectralRegion
from specutils.utils.wcs_utils import SpectralGWCS

from jdaviz import __version__
from jdaviz import style_registry
from jdaviz.core.config import read_configuration, get_configuration
from jdaviz.core.events import (LoadDataMessage, NewViewerMessage, AddDataMessage,
                                DataRenamedMessage, SnackbarMessage, RemoveDataMessage,
                                SubsetRenameMessage, AddDataToViewerMessage,
                                RemoveDataFromViewerMessage, ViewerAddedMessage,
                                ViewerRemovedMessage, ViewerRenamedMessage, ChangeRefDataMessage,
                                IconsUpdatedMessage, LayersFinalizedMessage)
from jdaviz.core.registries import (tool_registry, tray_registry,
                                    viewer_registry, viewer_creator_registry,
                                    data_parser_registry, loader_resolver_registry)
from jdaviz.core.tools import ICON_DIR
from jdaviz.utils import (SnackbarQueue, alpha_index, data_has_valid_wcs,
                          layer_is_table_data, MultiMaskSubsetState,
                          _wcs_only_label, CONFIGS_WITH_LOADERS,
                          _get_celestial_wcs)
from jdaviz.core.custom_units_and_equivs import SPEC_PHOTON_FLUX_DENSITY_UNITS, enable_spaxel_unit
from jdaviz.core.unit_conversion_utils import (check_if_unit_is_per_solid_angle,
                                               combine_flux_and_angle_units,
                                               flux_conversion_general,
                                               spectral_axis_conversion,
                                               supported_sq_angle_units,
                                               viewer_flux_conversion_equivalencies)

__all__ = ['Application', 'ALL_JDAVIZ_CONFIGS', 'UnitConverterWithSpectral']

SplitPanes()
GoldenLayout()

enable_spaxel_unit()

# This shows up repeatedly when doing many operations, I think it's reasonable to disable it
warnings.filterwarnings('ignore', message="The unit 'Angstrom' has been deprecated"
                        "in the VOUnit standard")

CONTAINER_TYPES = dict(row='gl-row', col='gl-col', stack='gl-stack')
EXT_TYPES = dict(flux=['flux', 'sci'],
                 uncert=['ivar', 'err', 'var', 'uncert'],
                 mask=['mask', 'dq'])
ALL_JDAVIZ_CONFIGS = ['cubeviz', 'specviz', 'specviz2d', 'mosviz', 'imviz']


[docs] @unit_converter('custom-jdaviz') class UnitConverterWithSpectral:
[docs] def equivalent_units(self, data, cid, units): if (data.meta.get('_importer') == 'ImageImporter' and u.Unit(data.get_component(cid).units).physical_type == 'surface brightness'): all_flux_units = SPEC_PHOTON_FLUX_DENSITY_UNITS + ['ct'] angle_units = supported_sq_angle_units() all_sb_units = combine_flux_and_angle_units(all_flux_units, angle_units) list_of_units = set(list(map(str, u.Unit(units).find_equivalent_units( include_prefix_units=True))) + all_flux_units + all_sb_units ) elif cid.label in ("flux"): eqv = u.spectral_density(1 * u.m) # Value does not matter here. all_flux_units = SPEC_PHOTON_FLUX_DENSITY_UNITS + ['ct'] angle_units = supported_sq_angle_units() all_sb_units = combine_flux_and_angle_units(all_flux_units, angle_units) # list of all possible units for spectral y axis, independent of data loaded # list_of_units = set(list(map(str, u.Unit(units).find_equivalent_units( include_prefix_units=True, equivalencies=eqv))) + all_flux_units + all_sb_units ) else: # spectral axis # prefer Hz over Bq and um over micron exclude = {'Bq', 'micron'} list_of_units = set(list(map(str, u.Unit(units).find_equivalent_units( include_prefix_units=True, equivalencies=u.spectral())))) - exclude return list_of_units
[docs] def to_unit(self, data, cid, values, original_units, target_units): # Given a glue data object (data), a component ID (cid), the values # to convert, and the original and target units of the values, this method # should return the converted values. Note that original_units # gives the units of the values array, which might not be the same # as the original native units of the component in the data. if cid.label == 'Pixel Axis 0 [z]' and target_units == '': # handle ramps loaded into Rampviz by avoiding conversion # of the groups axis: return values elif (data.meta.get('_importer') == 'ImageImporter' and u.Unit(data.get_component(cid).units).physical_type == 'surface brightness'): # handle surface brightness units in image-like data return (values * u.Unit(original_units)).to_value(target_units) elif cid.label in ("flux"): try: spec = data.get_object(cls=Spectrum) except RuntimeError: data = data.get_object(cls=NDDataArray) spec = Spectrum(flux=data.data * u.Unit(original_units)) # equivalencies for flux/surface brightness conversions viewer_equivs = viewer_flux_conversion_equivalencies(values, spec) return flux_conversion_general(values, original_units, target_units, viewer_equivs, with_unit=False) else: # spectral axis return spectral_axis_conversion(values, original_units, target_units)
# Set default opacity for data layers to 1 instead of 0.8 in # some glue-core versions glue_settings.DATA_ALPHA = 1 # Enable spectrum unit conversion. glue_settings.UNIT_CONVERTER = 'custom-jdaviz' custom_components = {'j-tooltip': 'components/tooltip.vue', 'j-external-link': 'components/external_link.vue', 'j-docs-link': 'components/docs_link.vue', 'j-layer-viewer-icon': 'components/layer_viewer_icon.vue', 'j-layer-viewer-icon-stylized': 'components/layer_viewer_icon_stylized.vue', 'j-loader-panel': 'components/loader_panel.vue', 'j-new-viewer-panel': 'components/new_viewer_panel.vue', 'j-loader': 'components/loader.vue', 'j-viewer-creator': 'components/viewer_creator.vue', 'j-tray-plugin': 'components/tray_plugin.vue', 'j-play-pause-widget': 'components/play_pause_widget.vue', 'j-plugin-section-header': 'components/plugin_section_header.vue', 'j-number-uncertainty': 'components/number_uncertainty.vue', 'j-plugin-popout': 'components/plugin_popout.vue', 'j-multiselect-toggle': 'components/multiselect_toggle.vue', 'j-subset-icon': 'components/subset_icon.vue', 'j-plugin-live-results-icon': 'components/plugin_live_results_icon.vue', 'j-child-layer-icon': 'components/child_layer_icon.vue', 'j-about-menu': 'components/about_menu.vue', 'j-custom-toolbar-toggle': 'components/custom_toolbar_toggle.vue', 'plugin-previews-temp-disabled': 'components/plugin_previews_temp_disabled.vue', # noqa 'plugin-table': 'components/plugin_table.vue', 'plugin-select': 'components/plugin_select.vue', 'plugin-select-filter': 'components/plugin_select_filter.vue', 'plugin-dataset-select': 'components/plugin_dataset_select.vue', 'plugin-subset-select': 'components/plugin_subset_select.vue', 'plugin-viewer-select': 'components/plugin_viewer_select.vue', 'plugin-viewer-create-new': 'components/plugin_viewer_create_new.vue', 'plugin-layer-select': 'components/plugin_layer_select.vue', 'plugin-layer-select-tabs': 'components/plugin_layer_select_tabs.vue', 'plugin-editable-select': 'components/plugin_editable_select.vue', 'plugin-inline-select': 'components/plugin_inline_select.vue', 'plugin-inline-select-item': 'components/plugin_inline_select_item.vue', 'plugin-switch': 'components/plugin_switch.vue', 'plugin-action-button': 'components/plugin_action_button.vue', 'plugin-add-results': 'components/plugin_add_results.vue', 'plugin-auto-label': 'components/plugin_auto_label.vue', 'plugin-file-import-select': 'components/plugin_file_import_select.vue', 'plugin-slider': 'components/plugin_slider.vue', 'plugin-color-picker': 'components/plugin_color_picker.vue', 'plugin-input-header': 'components/plugin_input_header.vue', 'plugin-loaders-panel': 'components/plugin_loaders_panel.vue', 'glue-state-sync-wrapper': 'components/glue_state_sync_wrapper.vue', 'glue-state-select': 'components/glue_state_select.vue', 'data-menu-add': 'components/data_menu_add.vue', 'data-menu-remove': 'components/data_menu_remove.vue', 'data-menu-subset-edit': 'components/data_menu_subset_edit.vue', 'hover-api-hint': 'components/hover_api_hint.vue', 'j-rename-text': 'components/j_rename_text.vue'} # Register pure vue component. This allows us to do recursive component instantiation only in the # vue component file for name, path in custom_components.items(): ipyvue.register_component_from_file(None, name, os.path.join(os.path.dirname(__file__), path)) ipyvue.register_component_from_file('g-viewer-tab', "container.vue", __file__) style_registry.add((__file__, 'main_styles.vue')) class ApplicationState(State): """ The application state object contains all the current front-end state, including the loaded data name references, the active viewers, plugins, and layout. This state object allows for nested callbacks in mutable objects like dictionaries and makes it so incremental changes to nested values propagate to the traitlet in order to trigger a UI re-render. """ drawer_content = CallbackProperty( '', docstring="Content shown in the tray drawer.") add_subtab = CallbackProperty( 0, docstring="Index of the active subtab in the add sidebar.") settings_subtab = CallbackProperty( 0, docstring="Index of the active subtab in the settings sidebar.") info_subtab = CallbackProperty( 0, docstring="Index of the active subtab in the info sidebar.") jdaviz_version = CallbackProperty( __version__, docstring="Version of Jdaviz.") global_search = CallbackProperty( '', docstring="Global search string.") global_search_menu = CallbackProperty( False, docstring="Whether to show the global search menu.") show_toolbar_buttons = CallbackProperty( True, docstring="Whether to show app-level toolbar buttons (left of sidebar menu button).") show_api_hints = CallbackProperty( False, docstring="Whether to show API hints.") subset_mode_create = CallbackProperty( False, docstring="Whether to create a new subset.") snackbar = DictCallbackProperty({ 'show': False, 'test': "", 'color': None, 'timeout': 3000, 'loading': False }, docstring="State of the quick toast messages.") snackbar_queue = SnackbarQueue() settings = DictCallbackProperty({ 'data': { 'auto_populate': False, 'parser': None }, 'visible': { 'menu_bar': True, 'toolbar': True, 'tray': True, 'tab_headers': True, }, 'dense_toolbar': True, # In the context of a remote server, allow/disallow showing the loader # server_is_remote == False -> Usual behavior, show loader, etc. # server_is_remote + remote_enable_importers==False -> hide loader panel completely, # prepopulate the data # server_is_remote + remote_enable_importers==True -> hide the loader, # but allow selecting and loading items from the file. This is used for # Spectrum Lists or multi-extension images. 'server_is_remote': False, # sets some defaults, should be set before loading the config 'remote_enable_importers': True, # Depends on server_is_remote, see above 'context': { 'notebook': { 'max_height': '600px' } }, 'layout': { } }, docstring="Top-level application settings.") icons = DictCallbackProperty({ 'radialtocheck': read_icon(os.path.join(ICON_DIR, 'radialtocheck.svg'), 'svg+xml'), 'checktoradial': read_icon(os.path.join(ICON_DIR, 'checktoradial.svg'), 'svg+xml'), 'nuer': read_icon(os.path.join(ICON_DIR, 'right-east.svg'), 'svg+xml'), 'nuel': read_icon(os.path.join(ICON_DIR, 'left-east.svg'), 'svg+xml'), 'api': read_icon(os.path.join(ICON_DIR, 'api.svg'), 'svg+xml'), 'api-lock': read_icon(os.path.join(ICON_DIR, 'api_lock.svg'), 'svg+xml'), }, docstring="Custom application icons") viewer_icons = DictCallbackProperty({}, docstring="Indexed icons (numbers) for viewers across the app") # noqa layer_icons = DictCallbackProperty({}, docstring="Indexed icons (letters) for layers across the app") # noqa dev_loaders = CallbackProperty( False, docstring='Whether to enable developer mode for new loaders infrastructure') # catalogs_in_dc PRs (include in changelog when removing the dev-flag): # https://github.com/spacetelescope/jdaviz/pull/3761 # https://github.com/spacetelescope/jdaviz/pull/3777 # https://github.com/spacetelescope/jdaviz/pull/3778 # https://github.com/spacetelescope/jdaviz/pull/3799 # https://github.com/spacetelescope/jdaviz/pull/3814 # https://github.com/spacetelescope/jdaviz/pull/3835 # https://github.com/spacetelescope/jdaviz/pull/3854 # https://github.com/spacetelescope/jdaviz/pull/3856 # https://github.com/spacetelescope/jdaviz/pull/3863 # https://github.com/spacetelescope/jdaviz/pull/3867 - table viewer # https://github.com/spacetelescope/jdaviz/pull/3930 - table viewer subset select # https://github.com/spacetelescope/jdaviz/pull/3906 - SDSS/Gaia in astroquery resolver # https://github.com/spacetelescope/jdaviz/pull/3912 # https://github.com/spacetelescope/jdaviz/pull/3899 catalogs_in_dc = CallbackProperty( False, docstring="Whether to enable developer mode for adding catalogs to data collection.") loader_items = ListCallbackProperty( docstring="List of loaders available to the application.") loader_selected = CallbackProperty( '', docstring="Active loader shown in the loaders panel.") new_viewer_items = ListCallbackProperty( docstring="List of new viewer items available to the application.") new_viewer_selected = CallbackProperty( '', docstring="Active new viewer shown in the new viewers panel.") data_items = ListCallbackProperty( docstring="List of data items parsed from the Glue data collection.") tool_items = ListCallbackProperty( docstring="Collection of toolbar items displayed in the application.") tray_items = ListCallbackProperty( docstring="List of plugins displayed in the sidebar tray area.") tray_items_open = CallbackProperty( [], docstring="The plugin(s) opened in sidebar tray area.") tray_items_filter = CallbackProperty( '', docstring='User-filter on tray items') stack_items = ListCallbackProperty( docstring="Nested collection of viewers constructed to support the " "Golden Layout viewer area.") viewer_items = ListCallbackProperty( docstring="List (flat) of viewer objects") style_widget = CallbackProperty( '', docstring="Jupyter widget that won't be displayed but can apply css to the app" )
[docs] class Application(VuetifyTemplate, HubListener): """ The main application object containing implementing the ipyvue/vuetify template instructions for composing the interface. """ _metadata = Dict({"mount_id": "content"}).tag(sync=True) state = GlueState().tag(sync=True) template_file = __file__, "app.vue" existing_data_in_dc = List([]).tag(sync=True) loading = Bool(False).tag(sync=True) config = Unicode("").tag(sync=True) api_hints_obj = Unicode("").tag(sync=True) # will use config if not defined vdocs = Unicode("").tag(sync=True) docs_link = Unicode("").tag(sync=True) popout_button = Any().tag(sync=True, **widget_serialization) style_registry_instance = Any().tag(sync=True, **widget_serialization) invisible_children = List(Any()).tag(sync=True, **widget_serialization) golden_layout_state = Dict(default_value=None, allow_none=True).tag(sync=True) force_open_about = Bool(False).tag(sync=True) def __init__(self, configuration=None, *args, **kwargs): super().__init__(*args, **kwargs) self._jdaviz_helper = None self.popout_button = PopoutButton(self) self.style_registry_instance = style_registry.get_style_registry() # Generate a state object for this application to maintain the state of # the user interface. self.state = ApplicationState() # The application handler stores the state of the data and the # underlying glue infrastructure self._application_handler = JupyterApplication( settings={'new_subset_on_selection_tool_change': True, 'single_global_active_tool': False}) # Add a reference to this application to the Glue session object. This # allows the jdaviz Application object to then be accessed via e.g. # viewer.session.jdaviz_app self._application_handler.session.jdaviz_app = self # Create a dictionary for holding non-ipywidget viewer objects so we # can reference their state easily since glue does not store viewers self._viewer_store = {} # Flag to indicate a data rename operation is in progress # Handlers can check this to skip processing during renames self._renaming_data = False from jdaviz.core.events import PluginTableAddedMessage, PluginPlotAddedMessage self._plugin_tables = {} self.hub.subscribe(self, PluginTableAddedMessage, handler=self._on_plugin_table_added) self._plugin_plots = {} self.hub.subscribe(self, PluginPlotAddedMessage, handler=self._on_plugin_plot_added) # Convenient reference of all existing subset names self._reserved_labels = set([]) # Parse the yaml configuration file used to compose the front-end UI self.load_configuration(configuration) # If true, link data on load. If false, do not link data to speed up # data loading self.auto_link = kwargs.pop('auto_link', True) # Imviz linking self._align_by = 'pixels' if self.config == "imviz": self._wcs_fast_approximation = None # Subscribe to messages indicating that a new viewer needs to be # created. When received, information is passed to the application # handler to generate the appropriate viewer instance. self.hub.subscribe(self, NewViewerMessage, handler=self._on_new_viewer) # Subscribe to messages indicating new data should be loaded into the # application self.hub.subscribe(self, LoadDataMessage, handler=lambda msg: self.load_data(msg.path)) # Subscribe to the event fired when data is added to the application- # level data collection object self.hub.subscribe(self, DataCollectionAddMessage, handler=self._on_data_added) # Subscribe to the event fired when data is deleted from the # application-level data collection object self.hub.subscribe(self, DataCollectionDeleteMessage, handler=self._on_data_deleted) self.hub.subscribe(self, AddDataToViewerMessage, handler=lambda msg: self.add_data_to_viewer( msg.viewer_reference, msg.data_label)) self.hub.subscribe(self, RemoveDataFromViewerMessage, handler=lambda msg: self.remove_data_from_viewer( msg.viewer_reference, msg.data_label)) # Subscribe to snackbar messages and tie them to the display of the # message box self.hub.subscribe(self, SnackbarMessage, handler=self._on_snackbar_message) # Internal cache so we don't have to keep calling get_object for the same Data. # Key should be (data_label, statistic) and value the translated object. self._get_object_cache = {} self.hub.subscribe(self, SubsetUpdateMessage, handler=self._on_subset_update_message) # These both call _on_layers_changed self.hub.subscribe(self, SubsetDeleteMessage, handler=self._on_subset_delete_message) self.hub.subscribe(self, SubsetCreateMessage, handler=self._on_subset_create_message) self.hub.subscribe(self, SubsetRenameMessage, handler=self._on_subset_rename_message) # Store for associations between Data entries: self._data_associations = self._init_data_associations() # Subscribe to messages that result in changes to the layers self.hub.subscribe(self, AddDataMessage, handler=self._on_add_data_message) self.hub.subscribe(self, RemoveDataMessage, handler=self._on_layers_changed) # Emit messages when icons are updated self.state.add_callback('viewer_icons', lambda value: self.hub.broadcast(IconsUpdatedMessage('viewer', value, sender=self))) # noqa self.state.add_callback('layer_icons', lambda value: self.hub.broadcast(IconsUpdatedMessage('layer', value, sender=self))) # noqa def _on_plugin_table_added(self, msg): if msg.plugin._plugin_name is None: # plugin was instantiated after the app was created, ignore return key = f"{msg.plugin._plugin_name}: {msg.table._table_name}" self._plugin_tables.setdefault(key, msg.table.user_api) def _iter_live_plugin_results(self, trigger_data_lbl=None, trigger_subset=None): trigger_subset_lbl = trigger_subset.label if trigger_subset is not None else None for data in self.data_collection: plugin_inputs = data.meta.get('_update_live_plugin_results', None) if plugin_inputs is None: continue data_subs = plugin_inputs.get('_subscriptions', {}).get('data', []) subset_subs = plugin_inputs.get('_subscriptions', {}).get('subset', []) if (trigger_data_lbl is not None and not np.any([plugin_inputs.get(attr) == trigger_data_lbl for attr in data_subs])): # trigger data does not match subscribed data entries continue if trigger_subset_lbl is not None: if not np.any([plugin_inputs.get(attr) == trigger_subset_lbl for attr in subset_subs]): # trigger subset does not match subscribed subsets continue if not np.any([plugin_inputs.get(attr) == trigger_subset.data.label for attr in data_subs]): # trigger parent data of subset does not match subscribed data entries continue yield (data, plugin_inputs) def _update_live_plugin_results(self, trigger_data_lbl=None, trigger_subset=None): for data, plugin_inputs in self._iter_live_plugin_results(trigger_data_lbl, trigger_subset): # update and overwrite data # make a new instance of the plugin to avoid changing any UI settings plg = self._jdaviz_helper.plugins.get(data.meta.get('plugin'))._obj.new() if not plg.supports_auto_update: raise NotImplementedError(f"{data.meta.get('plugin')} does not support live-updates") # noqa plg.user_api.from_dict(plugin_inputs) # keep auto-updating, even if the option is hidden from the user API # (can remove this line if auto_update is exposed to the user API in the future) plg.add_results.auto_update_result = True try: plg() except Exception as e: self.hub.broadcast(SnackbarMessage( f"Auto-update for {plugin_inputs['add_results']['label']} failed: {e}", sender=self, color="error")) def _remove_live_plugin_results(self, trigger_data_lbl=None, trigger_subset=None): for data, plugin_inputs in self._iter_live_plugin_results(trigger_data_lbl, trigger_subset): self.hub.broadcast(SnackbarMessage( f"Removing {data.label} due to deletion of {trigger_subset.label if trigger_subset is not None else trigger_data_lbl}", # noqa sender=self, color="warning")) self.data_item_remove(data.label) def _on_add_data_message(self, msg): self._on_layers_changed(msg) self._update_live_plugin_results(trigger_data_lbl=msg.data.label) def _on_subset_update_message(self, msg): # NOTE: print statements in here will require the viewer output_widget self._clear_object_cache(msg.subset.label) if msg.attribute == 'subset_state': self._update_live_plugin_results(trigger_subset=msg.subset) def _on_subset_delete_message(self, msg): self._remove_live_plugin_results(trigger_subset=msg.subset) if msg.subset.label in self._reserved_labels: # This might already be gone in test teardowns self._reserved_labels.remove(msg.subset.label) self._on_layers_changed(msg) def _on_subset_create_message(self, msg): self._reserved_labels.add(msg.subset.label) self._on_layers_changed(msg) def _on_subset_rename_message(self, msg): # Update _reserved_labels when a subset is renamed # msg has old_label and new_label attributes if msg.old_label in self._reserved_labels: self._reserved_labels.remove(msg.old_label) self._reserved_labels.add(msg.new_label) # Update metadata in all data entries that reference the old subset label # This ensures auto-update tracking continues after subset renaming for data in self.data_collection: self._update_live_plugin_results_metadata(data, msg.old_label, msg.new_label, 'subset') def _on_plugin_plot_added(self, msg): if msg.plugin._plugin_name is None: # plugin was instantiated after the app was created, ignore return key = f"{msg.plugin._plugin_name}: {msg.plot._plot_name}" self._plugin_plots.setdefault(key, msg.plot.user_api) @property def hub(self): """ Reference to the stored application handler `~glue.core.hub.Hub` instance for the application. """ return self._application_handler.data_collection.hub @property def session(self): """ Reference to the stored `~glue.core.session.Session` instance maintained by Glue for this application. """ return self._application_handler.session @property def data_collection(self): """ Reference to the stored `~glue.core.data_collection.DataCollection` instance, used to maintain the the data objects that been loaded into the application this session. """ return self._application_handler.data_collection def _add_style(self, path): """ Appends an addition vue file containing a <style> tag that will be applied on top of the style defined in ``main_styles.vue``. This is useful for config-specific or downstream styling at the app-level. Parameters ---------- path : str or tuple Path to a ``.vue`` file containing style rules to inject into the app. """ style_registry.add(path) def _on_snackbar_message(self, msg): """ Displays a toast message with an editable message that be dismissed manually or will dismiss automatically after a timeout. Whether the message shows as a snackbar popup is controlled by ``logger_plg.verbosity``, whether the message is added to the history log is controlled by ``logger_plg.history_verbosity``. Parameters ---------- msg : `~glue.core.SnackbarMessage` The Glue snackbar message containing information about displaying the message box. """ # https://material-ui.com/customization/palette/ provides these options: # success, info, warning, error, secondary, primary # We have these options: # debug, info, warning, error # Therefore: # * debug is not used, it is more for the future if we also have a logger. # * info lets everything through # * success, secondary, and primary are treated as info (not sure what they are used for) # * None is also treated as info (when color is not set) logger_plg = self._jdaviz_helper.plugins.get('Logger', None) logger_plg._obj.queue_message(msg, msg.color) def _on_layers_changed(self, msg): if hasattr(msg, 'data'): layer_name = msg.data.label is_wcs_only = msg.data.meta.get(_wcs_only_label, False) is_not_child = self._get_assoc_data_parent(layer_name) is None children_layers = self._get_assoc_data_children(layer_name) elif hasattr(msg, 'subset'): # We don't need to reprocess the subset for every data collection entry if msg.subset.data != self.data_collection[0]: return layer_name = msg.subset.label is_wcs_only = False is_not_child = True children_layers = [] else: raise NotImplementedError(f"cannot recognize new layer from {msg}") wcs_only_refdata_icon = '' # blank - might be replaced with custom icon in the future # any changes here should also be manually reflected in orientation.vue orientation_icons = {'Default orientation': 'mdi-image-outline', 'North-up, East-left': 'nuel', 'North-up, East-right': 'nuer'} if layer_name not in self.state.layer_icons: if is_wcs_only: self.state.layer_icons = {**self.state.layer_icons, layer_name: orientation_icons.get(layer_name, wcs_only_refdata_icon)} elif not is_not_child: parent_label = self._get_assoc_data_parent(layer_name) parent_icon = self.state.layer_icons.get(parent_label) index = len([ln for ln, ic in self.state.layer_icons.items() if not ic[:4] == 'mdi-' and self._get_assoc_data_parent(ln) == parent_label]) + 1 self.state.layer_icons = { **self.state.layer_icons, layer_name: f"{parent_icon}{index}" } else: self.state.layer_icons = { **self.state.layer_icons, layer_name: alpha_index(len([ln for ln, ic in self.state.layer_icons.items() if not ic[:4] == 'mdi-' and self._get_assoc_data_parent(ln) is None])) } # all remaining layers at this point have a parent: child_layer_icons = {} for layer_name in self.state.layer_icons: children_layers = self._get_assoc_data_children(layer_name) if children_layers is not None: parent_icon = self.state.layer_icons[layer_name] for i, child_layer in enumerate(children_layers, start=1): if child_layer not in self.state.layer_icons: child_layer_icons[child_layer] = f'{parent_icon}{i}' if child_layer_icons: self.state.layer_icons = { **self.state.layer_icons, **child_layer_icons } def _change_reference_data(self, new_refdata_label, viewer_id=None): """ Change reference data to Data with ``data_label``. This does not work on data without WCS. """ if self.config not in ('imviz', 'deconfigged'): # this method is only meant for Imviz for now return if viewer_id is None: viewer = self._jdaviz_helper.default_viewer._obj.glue_viewer else: viewer = self.get_viewer(viewer_id) if not hasattr(viewer, '_get_center_skycoord'): return old_refdata = viewer.state.reference_data if old_refdata is not None and ((new_refdata_label == old_refdata.label) or (old_refdata.coords is None)): # if there's no refdata change nor WCS, don't do anything: return if old_refdata is None: return # locate the central coordinate of old refdata in this viewer: sky_cen = viewer._get_center_skycoord() # estimate FOV in the viewer with old reference data: fov_sky_init = viewer._get_fov() new_refdata = self.data_collection[new_refdata_label] # make sure new refdata can be selected: refdata_choices = [choice.label for choice in viewer.state.ref_data_helper.choices] if new_refdata_label not in refdata_choices: viewer.state.ref_data_helper.append_data(new_refdata) if new_refdata_label not in [lyr.layer.label for lyr in viewer.layers]: viewer.add_data(new_refdata) viewer.state.ref_data_helper.refresh() # set the new reference data in the viewer: viewer.state.reference_data = new_refdata # also update the viewer item's reference data label: viewer_ref = viewer.reference viewer_item = self._get_viewer_item(viewer_ref) viewer_item['reference_data_label'] = new_refdata.label self.hub.broadcast(ChangeRefDataMessage( new_refdata, viewer, viewer_id=viewer.reference, old=old_refdata, sender=self)) with delay_callback(viewer.state, 'x_min', 'x_max', 'y_min', 'y_max', 'zoom_center_x', 'zoom_center_y', 'zoom_radius'): if ( all('_WCS_ONLY' in refdata.meta for refdata in [old_refdata, new_refdata]) and viewer.shape is not None ): # adjust zoom to account for new refdata if both the # old and new refdata are WCS-only layers # (which also ensures zoom_level is already determined): fov_sky_final = viewer._get_fov() viewer.zoom( float(fov_sky_final / fov_sky_init) ) # only re-center the viewer if all data layers have WCS: has_wcs_per_data = [data_has_valid_wcs(d) for d in viewer.data()] if all(has_wcs_per_data): # re-center the viewer on previous location. viewer.center_on(sky_cen) def _link_new_data_by_component_type(self, new_data_label): new_data = self.data_collection[new_data_label] if (new_data.meta.get('_importer') in ('ImageImporter', 'CatalogImporter') and 'Orientation' in self._jdaviz_helper.plugins): # Orientation plugin alreadly listens for messages for added Data and handles linking # orientation_plugin._link_image_data() # NOTE: eventually we may only want to skip for pixel/sky coordinates and allow # flux, etc, to link to other data for custom histograms/scatter plots return new_links = [] for new_comp in new_data.components: if getattr(new_comp, '_component_type', None) in (None, 'unknown'): continue # Don't link flux to flux in cubes elif new_comp.label.lower() in ('flux', 'uncertainty', 'mask') and new_data.ndim == 3: continue found_match = False for existing_data in self.data_collection: if existing_data.label == new_data_label: continue for existing_comp in existing_data.components: if getattr(existing_comp, '_component_type', None) in (None, 'unknown'): continue # Create link if component-types match if new_comp._component_type == existing_comp._component_type: msg_text = f"Creating link {new_data.label}:{new_comp.label}({new_comp._component_type}) > {existing_data.label}:{existing_comp.label}({existing_comp._component_type})" # noqa msg = SnackbarMessage(text=msg_text, color='info', sender=self) self.hub.broadcast(msg) link = LinkSameWithUnits(new_comp, existing_comp) new_links.append(link) # only need one link for the new component, reparenting will handle # if that data entry is deleted found_match = True break # break out of existing_comp loop if found_match: break # break out of existing_data loop # Add all new links to the data collection if new_links: self.data_collection.add_link(new_links) def _link_new_data(self, reference_data=None, data_to_be_linked=None): """ When additional data is loaded, check to see if the spectral axis of any components are compatible with already loaded data. If so, link them so that they can be displayed on the same profile1D plot. """ if self.config in CONFIGS_WITH_LOADERS and self.config not in ['specviz2d', 'deconfigged']: # automatic linking based on component physical types handled by importers return elif not self.auto_link: return elif self.config == 'mosviz' and self.get_viewer('spectrum-viewer').state.reference_data: # Mosviz turns auto_link to False in order to batch # link the data after they have all been loaded. # It then reverts auto_link to True, which means that when # plugin data is added from mosviz, it can use the following line # to set reference data. reference_data = self.get_viewer('spectrum-viewer').state.reference_data.label dc = self.data_collection # This will need to be changed for cubeviz to support multiple cubes default_refdata_index = 0 if self.config == 'mosviz': # In Mosviz, first data is always MOS Table. Use the next data default_refdata_index = 1 elif self.config in ('cubeviz', 'deconfigged'): cube_data = None for data in dc: if 'spectral_axis_index' in data.meta and data.ndim == 3: cube_data = data spectral_axis_index = cube_data.meta['spectral_axis_index'] break ref_data = dc[reference_data] if reference_data else dc[default_refdata_index] linked_data = dc[data_to_be_linked] if data_to_be_linked else dc[-1] if self.config in ('cubeviz', 'deconfigged') and linked_data.ndim == 1 and cube_data is not None: # noqa # Don't want to use negative indices in case there are extra components like a mask ref_wavelength_component = cube_data.components[spectral_axis_index] # May need to update this for specutils 2 linked_wavelength_component = linked_data.components[1] dc.add_link(LinkSame(ref_wavelength_component, linked_wavelength_component)) return elif (linked_data.meta.get('plugin', None) == '3D Spectral Extraction' or (linked_data.meta.get('plugin', None) == 'Gaussian Smooth' and linked_data.ndim < 3 and # Cube linking requires special logic. See below ref_data.ndim < 3) ): links = [LinkSame(linked_data.components[0], ref_data.components[0]), LinkSame(linked_data.components[1], ref_data.components[1])] dc.add_link(links) return elif (ref_data.ndim == 2 and linked_data.ndim == 1): # Needed for subset linking between 1D and 2D viewers # Spectrum 1D: Pixel Axis 0 [x] <=> Spectrum 2D: Pixel Axis 1 [x] links = [LinkSameWithUnits(linked_data.components[0], ref_data.components[1])] dc.add_link(links) return # The glue-astronomy SpectralCoordinates currently seems incompatible with glue # WCSLink. This gets around it until there's an upstream fix. Also need to do this # for SpectralGWCS in 1D case (pixel linking below handles cubes) if (isinstance(linked_data.coords, SpectralCoordinates) or isinstance(linked_data.coords, SpectralGWCS) and linked_data.ndim == 1): wc_old = ref_data.world_component_ids[ref_data.meta['spectral_axis_index']] wc_new = linked_data.world_component_ids[linked_data.meta['spectral_axis_index']] self.data_collection.add_link(LinkSameWithUnits(wc_old, wc_new)) return # NOTE: if Cubeviz ever supports multiple cubes, we might want to reintroduce WCS-linking # but will likely need affine approximation for performance pc_ref = [str(id).split(" ")[-1][1] for id in ref_data.pixel_component_ids] pc_linked = [str(id).split(" ")[-1][1] for id in linked_data.pixel_component_ids] links = [] # This code loops through the returned locations of the x, y, and z # axes in the pixel_coordinate_ids of the reference data. It matches # the axes with the pixel_coordinate_ids of the linked data and links # those axes. There are special rules for linking 2D and 3D data, which # is as follows: 2D y to 3D z and 2D x to 3D y, and vice versa in the # case of moment map and collapse data because they need to be transposed. # See github comment: # https://github.com/spacetelescope/jdaviz/pull/1412#discussion_r907630682 len_linked_pixel = len(linked_data.pixel_component_ids) for ind, pixel_coord in enumerate(pc_ref): ref_index = ind if (len_linked_pixel == 2 and (linked_data.meta.get('plugin', None) in ['Moment Maps', 'Collapse', 'Sonify Data'])): if spectral_axis_index in (2, -1): link_to_x = 'z' elif spectral_axis_index == 0: link_to_x = 'x' if pixel_coord == link_to_x: linked_index = pc_linked.index('x') elif pixel_coord == 'y': linked_index = pc_linked.index('y') else: continue elif len_linked_pixel == 2: if pixel_coord == 'z': linked_index = pc_linked.index('y') elif pixel_coord == 'y': linked_index = pc_linked.index('x') else: continue elif pixel_coord in pc_linked: linked_index = pc_linked.index(pixel_coord) else: continue links.append(LinkSameWithUnits(ref_data.pixel_component_ids[ref_index], linked_data.pixel_component_ids[linked_index])) dc.add_link(links)
[docs] def load_data(self, file_obj, parser_reference=None, **kwargs): """ Provided a path to a data file, open and parse the data into the `~glue.core.data_collection.DataCollection` for this session. For some parsers, this also attempts to find WCS links that exist between data components. Parameters ---------- file_obj : str or file-like File object for the data to be loaded. parser_reference : str or `None` The actual data parser to use. It must already be registered to glue's data parser registry. This is mainly for internal use. **kwargs : dict Additional keywords to be passed into the parser defined by ``parser_reference``. """ self.loading = True try: if isinstance(file_obj, pathlib.Path): file_obj = str(file_obj) # attempt to get a data parser from the config settings parser = None data = self.state.settings.get('data', None) if parser_reference: parser = data_parser_registry.members.get(parser_reference) elif data and isinstance(data, dict): data_parser = data.get('parser', None) if data_parser: parser = data_parser_registry.members.get(data_parser) if parser is not None: parser(self, file_obj, **kwargs) else: self._application_handler.load_data(file_obj) except Exception: # Reset state on uncaught errors cfg_name = self.state.settings.get('configuration', 'unknown') if cfg_name in ('mosviz', ): # Add more as needed. self.data_collection.clear() raise finally: self.loading = False
[docs] def get_viewer(self, viewer_reference): """ Return a `~glue_jupyter.bqplot.common.BqplotBaseView` viewer instance. This is *not* an ``IPyWidget``. This is stored here because the state of the viewer and data methods that allow add/removing data to the viewer exist in a wrapper around the core ``IPyWidget``, which is needed to interact with the data rendered within a viewer. Notes ----- If viewer does not have a reference, it is going to try to look up the viewer using the given reference as ID. Parameters ---------- viewer_reference : str The reference to the viewer defined with the ``reference`` key in the YAML configuration file. Returns ------- viewer : `~glue_jupyter.bqplot.common.BqplotBaseView` The viewer class instance. """ return self._viewer_by_reference(viewer_reference, fallback_to_id=True)
[docs] def get_viewer_by_id(self, vid): """Like :meth:`get_viewer` but use ID directly instead of reference name. This is useful when reference name is `None`. """ return self._viewer_store.get(vid)
def _override_viewer_tools(self, callable, name): """ Override the default set of tools available in a viewer. Parameters ---------- callable : function A function that takes a viewer instance as its only argument and returns a list of tool instances to be used in the viewer. If `None` is returned, the default tools will be used (first three sections of the original tools, containing zoom/pan tools only). name : str The label to show in the viewer toolbar for the custom tool set. """ for viewer in self._viewer_store.values(): tools_nested, selected_tool = callable(viewer) if tools_nested is None: tools_nested = viewer.toolbar._original_tools_nested[:3] viewer.toolbar.override_tools(tools_nested, name) if selected_tool is not None: viewer.toolbar.active_tool_id = selected_tool def _get_wcs_from_subset(self, subset_state, data=None): """ Usually WCS is subset.parent.coords, except special cubeviz case.""" if data is None: parent_data = self.data_collection[subset_state.attributes[0].parent.label] else: parent_data = self.data_collection[data] # If 3D spectral coords, extract celestial WCS if (getattr(parent_data.coords, 'world_n_dim', None) == 3 and _get_celestial_wcs(parent_data.coords)): return _get_celestial_wcs(parent_data.coords) # If 2D coords, return as is elif getattr(parent_data.coords, 'world_n_dim', None) == 2: return parent_data.coords # If _orig_spatial_wcs is stored, use that (cubeviz case) elif parent_data.meta.get("_orig_spatial_wcs"): return parent_data.meta.get("_orig_spatial_wcs") else: return None
[docs] def get_subsets(self, subset_name=None, spectral_only=False, spatial_only=False, object_only=False, simplify_spectral=True, use_display_units=False, include_sky_region=False, wrt_data=None): """ Returns all branches of glue subset tree in the form that subset plugin can recognize. Parameters ---------- subset_name : str The subset name. spectral_only : bool Return only spectral subsets. spatial_only : bool Return only spatial subsets, except masked subsets (uncommon). object_only : bool Return only object relevant information and leave out the region class name and glue_state. simplify_spectral : bool Return a composite spectral subset collapsed to a simplified SpectralRegion. use_display_units : bool, optional Whether to convert to the display units defined in the :ref:`Unit Conversion <unit-conversion>` plugin. include_sky_region : bool If True, for spatial subsets that have a WCS associated with their parent data, return a sky region in addition to pixel region. If subset is composite, a sky region for each constituent subset will be returned. wrt_data : str or None Name of data to use for applying WCS to subset when returning as a sky region object. Returns ------- data : dict Returns a nested dictionary with, for each subset, 'name', 'glue_state', 'region', 'sky_region' (set to None if not applicable), and 'subset_state'. If subset is composite, each constituant subset will be included individually. """ dc = self.data_collection subsets = dc.subset_groups all_subsets = {} # If we only want one subset, no need to loop through them all if subset_name not in (None, ""): if isinstance(subset_name, str): subsets = [subset for subset in subsets if subset.label == subset_name] if subsets == []: all_labels = sorted(sg.label for sg in dc.subset_groups) raise ValueError(f"{subset_name} not in {all_labels}") else: raise ValueError("subset_name must be a string.") for subset in subsets: label = subset.label if isinstance(subset.subset_state, CompositeSubsetState): # Region composed of multiple ROI or Range subset # objects that must be traversed subset_region = self.get_sub_regions(subset.subset_state, simplify_spectral, use_display_units, get_sky_regions=include_sky_region, wrt_data=wrt_data) elif isinstance(subset.subset_state, RoiSubsetState): subset_region = self._get_roi_subset_definition(subset.subset_state, to_sky=include_sky_region, wrt_data=wrt_data) elif isinstance(subset.subset_state, RangeSubsetState): # 2D regions represented as SpectralRegion objects subset_region = self._get_range_subset_bounds(subset.subset_state, simplify_spectral, use_display_units) elif isinstance(subset.subset_state, MultiMaskSubsetState): subset_region = self._get_multi_mask_subset_definition(subset.subset_state) else: # subset.subset_state can be an instance of something else # we do not know how to handle yet all_subsets[label] = [{"name": subset.subset_state.__class__.__name__, "glue_state": subset.subset_state.__class__.__name__, "region": None, "sky_region": None, "subset_state": subset.subset_state}] continue # Is the subset spectral, spatial, temporal? is_spectral = self._is_subset_spectral(subset_region) is_temporal = self._is_subset_temporal(subset_region) # Remove duplicate spectral regions if is_spectral and isinstance(subset_region, SpectralRegion): subset_region = self._remove_duplicate_bounds(subset_region) if spectral_only and is_spectral: if object_only and not simplify_spectral: all_subsets[label] = [reg['region'] for reg in subset_region] else: all_subsets[label] = subset_region elif spatial_only and not is_spectral: if object_only: all_subsets[label] = [reg['region'] for reg in subset_region] else: all_subsets[label] = subset_region elif not spectral_only and not spatial_only: # General else statement if no type was specified if object_only and not isinstance(subset_region, SpectralRegion): all_subsets[label] = [reg['region'] for reg in subset_region] else: all_subsets[label] = subset_region if not (spectral_only or spatial_only) and is_temporal: if object_only: all_subsets[label] = [reg['region'] for reg in subset_region] else: all_subsets[label] = subset_region if subset_name: return all_subsets[subset_name] else: return all_subsets
def _is_subset_spectral(self, subset_region): if isinstance(subset_region, SpectralRegion): return True elif isinstance(subset_region, list) and len(subset_region) > 0: if isinstance(subset_region[0]['region'], SpectralRegion): return True return False def _is_subset_temporal(self, subset_region): if isinstance(subset_region, Time): return True elif isinstance(subset_region, list): if isinstance(subset_region[0]['region'], Time): return True return False def _remove_duplicate_bounds(self, spec_regions): regions_no_dups = None for region in spec_regions: if not regions_no_dups: regions_no_dups = region elif region.bounds not in regions_no_dups.subregions: regions_no_dups += region return regions_no_dups def _get_range_subset_bounds(self, subset_state, simplify_spectral=True, use_display_units=False): # TODO: Use global display units # units = dc[0].data.coords.spectral_axis.unit if not hasattr(subset_state.att, "parent"): # e.g., Cubeviz viewer = self.get_viewer(self._jdaviz_helper._default_spectrum_viewer_reference_name) data = viewer.data() if data and len(data) > 0 and isinstance(data[0], Spectrum): units = data[0].spectral_axis.unit else: raise ValueError("Unable to find spectral axis units") else: data = subset_state.att.parent ndim = data.get_component("flux").ndim if ndim == 2: units = u.pix else: handler, _ = data_translator.get_handler_for(Spectrum) spec = handler.to_object(data) units = spec.spectral_axis.unit if use_display_units and units != u.pix: # converting may result in flipping order (wavelength <-> frequency) ret_units = self._get_display_unit('spectral') subset_bounds = [(subset_state.lo * units).to(ret_units, u.spectral()), (subset_state.hi * units).to(ret_units, u.spectral())] spec_region = SpectralRegion(min(subset_bounds), max(subset_bounds)) else: spec_region = SpectralRegion(subset_state.lo * units, subset_state.hi * units) if not simplify_spectral: return [{"name": subset_state.__class__.__name__, "glue_state": subset_state.__class__.__name__, "region": spec_region, "sky_region": None, # not implemented for spectral, include for consistancy "subset_state": subset_state}] return spec_region def _get_multi_mask_subset_definition(self, subset_state): return [{"name": subset_state.__class__.__name__, "glue_state": subset_state.__class__.__name__, "region": subset_state.total_masked_first_data(), "sky_region": None, "subset_state": subset_state}] def _get_roi_subset_definition(self, subset_state, to_sky=False, wrt_data=None): wcs = None # If SkyRegion is explicitly requested or the user has specified something for # wrt_data, this will be true if to_sky or wrt_data is not None: wcs = self._get_wcs_from_subset(subset_state, data=wrt_data) # if no spatial wcs on subset, we have to skip computing sky region for this subset # but want to do so without raising an error (since many subsets could be requested) roi_as_sky_region = None if wcs is not None: if self.config == 'cubeviz': # the value for wcs returned above will be in 2D dimensions, which is needed # for this translation to work roi_as_sky_region = roi_subset_state_to_region(subset_state).to_sky(wcs) else: roi_as_sky_region = roi_subset_state_to_region(subset_state, to_sky=True) roi_as_region = roi_as_sky_region.to_pixel(wcs) else: # pixel region roi_as_region = roi_subset_state_to_region(subset_state) return [{"name": subset_state.roi.__class__.__name__, "glue_state": subset_state.__class__.__name__, "region": roi_as_region, "sky_region": roi_as_sky_region, "subset_state": subset_state}]
[docs] def get_sub_regions(self, subset_state, simplify_spectral=True, use_display_units=False, get_sky_regions=False, wrt_data=None): if isinstance(subset_state, CompositeSubsetState): if subset_state and hasattr(subset_state, "state2") and subset_state.state2: one = self.get_sub_regions(subset_state.state1, simplify_spectral, use_display_units, get_sky_regions=get_sky_regions, wrt_data=wrt_data) two = self.get_sub_regions(subset_state.state2, simplify_spectral, use_display_units, get_sky_regions=get_sky_regions, wrt_data=wrt_data) if simplify_spectral and isinstance(two, SpectralRegion): merge_func = self._merge_overlapping_spectral_regions_worker else: def merge_func(spectral_region): # noop return spectral_region if isinstance(one, list) and "glue_state" in one[0]: one[0]["glue_state"] = subset_state.__class__.__name__ if (isinstance(one, list) and isinstance(one[0]["subset_state"], MultiMaskSubsetState) and simplify_spectral): return two elif (isinstance(two, list) and isinstance(two[0]["subset_state"], MultiMaskSubsetState) and simplify_spectral): return one if isinstance(subset_state.state2, InvertState): # This covers the REMOVE subset mode # As an example for how this works: # a = SpectralRegion(4 * u.um, 7 * u.um) + SpectralRegion(9 * u.um, 11 * u.um) # b = SpectralRegion(5 * u.um, 6 * u.um) # After running the following code with a as one and b as two: # Spectral Region, 3 sub-regions: # (4.0 um, 5.0 um) (6.0 um, 7.0 um) (9.0 um, 11.0 um) if isinstance(two, SpectralRegion): new_spec = None for sub in one: if not new_spec: new_spec = two.invert(sub.lower, sub.upper) else: new_spec += two.invert(sub.lower, sub.upper) return new_spec else: if isinstance(two, list): two[0]['glue_state'] = "AndNotState" return merge_func(one + two) elif subset_state.op is operator.and_: # This covers the AND subset mode # Example of how this works with "one" being the AND region # and "two" being two Range subsets connected by an OR state: # one = SpectralRegion(4.5 * u.um, 7.5 * u.um) # two = SpectralRegion(4 * u.um, 5 * u.um) + SpectralRegion(7 * u.um, 8 * u.um) # # oppo = two.invert(one.lower, one.upper) # Spectral Region, 1 sub-regions: # (5.0 um, 7.0 um) # # oppo.invert(one.lower, one.upper) # Spectral Region, 2 sub-regions: # (4.5 um, 5.0 um) (7.0 um, 7.5 um) if isinstance(two, SpectralRegion): # Taking an AND state of an empty region is allowed # but there is no way for SpectralRegion to display that information. # Instead, we raise a ValueError if one.upper.value < two.lower.value or one.lower.value > two.upper.value: raise ValueError("AND mode should overlap with existing subset") oppo = two.invert(one.lower, one.upper) return oppo.invert(one.lower, one.upper) else: return merge_func(two + one) elif subset_state.op is operator.or_: # This covers the ADD subset mode # one + two works for both Range and ROI subsets if one and two: return merge_func(two + one) elif one: return one elif two: return two elif subset_state.op is operator.xor: # This covers the XOR subset mode # Example of how this works, with "one" acting # as the XOR region and "two" as two ranges joined # by an OR: # a = SpectralRegion(4 * u.um, 5 * u.um) # b = SpectralRegion(6 * u.um, 9 * u.um) # # one = SpectralRegion(4.5 * u.um, 12 * u.um) # two = a + b # two.invert(one.lower, one.upper) # Spectral Region, 2 sub-regions: # (5.0 um, 6.0 um) (9.0 um, 12.0 um) # one.invert(two.lower, two.upper) # Spectral Region, 1 sub-regions: # (4.0 um, 4.5 um) # two.invert(one.lower, one.upper) + one.invert(two.lower, two.upper) # Spectral Region, 3 sub-regions: # (4.0 um, 4.5 um) (5.0 um, 6.0 um) (9.0 um, 12.0 um) if isinstance(two, SpectralRegion): # This is the main application of XOR to other regions if one.lower > two.lower and one.upper < two.upper: if len(two) < 2: inverted_region = one.invert(two.lower, two.upper) else: two_2 = None for subregion in two: temp_region = None # No overlap if subregion.lower > one.upper or subregion.upper < one.lower: continue temp_lo = max(subregion.lower, one.lower) temp_hi = min(subregion.upper, one.upper) temp_region = SpectralRegion(temp_lo, temp_hi) if two_2: two_2 += temp_region else: two_2 = temp_region inverted_region = two_2.invert(one.lower, one.upper) else: inverted_region = two.invert(one.lower, one.upper) new_region = None for subregion in two: temp_region = None # Add all subregions that do not intersect with XOR region # to a new SpectralRegion object if subregion.lower > one.upper or subregion.upper < one.lower: temp_region = subregion # Partial overlap elif subregion.lower < one.lower and subregion.upper < one.upper: temp_region = SpectralRegion(subregion.lower, one.lower) elif subregion.upper > one.upper and subregion.lower > one.lower: temp_region = SpectralRegion(one.upper, subregion.upper) if not temp_region: continue if new_region: new_region += temp_region else: new_region = temp_region # This adds the edge regions that are otherwise not included if new_region: return merge_func(new_region + inverted_region) return inverted_region else: return merge_func(two + one) else: # This gets triggered in the InvertState case where state1 # is an object and state2 is None return self.get_sub_regions(subset_state.state1, simplify_spectral, use_display_units, get_sky_regions=get_sky_regions, wrt_data=wrt_data) elif subset_state is not None: # This is the leaf node of the glue subset state tree where # a subset_state is either ROI, Range, or MultiMask. if isinstance(subset_state, RoiSubsetState): return self._get_roi_subset_definition(subset_state, to_sky=get_sky_regions, wrt_data=wrt_data) elif isinstance(subset_state, RangeSubsetState): return self._get_range_subset_bounds(subset_state, simplify_spectral, use_display_units) elif isinstance(subset_state, MultiMaskSubsetState): return self._get_multi_mask_subset_definition(subset_state)
[docs] def delete_subsets(self, subset_labels=None): """ Delete all or specified subsets in app. This method removes subsets based on the provided ``subset_labels`` (a single subset label or multiple labels). If ``subset_labels`` is None (default), all subsets will be removed. Parameters ---------- subset_labels : str or list of str or None The label(s) of the subsets to delete. If None, all subsets will be removed. """ # delete all subsets if subset_labels is None: for subset_group in self.data_collection.subset_groups: self.data_collection.remove_subset_group(subset_group) else: if isinstance(subset_labels, str): subset_labels = [subset_labels] labels = np.asarray(subset_labels) sg = self.data_collection.subset_groups subset_grp_labels = {s.label: s for s in sg} # raise ValueError if any subset in 'subset_labels' isn't actually # in the data collection, before deleting subsets invalid_labels = ~np.isin(labels, list(subset_grp_labels.keys())) if np.any(invalid_labels): bad = ', '.join([x for x in labels[invalid_labels]]) raise ValueError(f'{bad} not in data collection, can not delete.') else: for label in labels: self.data_collection.remove_subset_group(subset_grp_labels[label])
def _get_display_unit(self, axis): if self._jdaviz_helper is None: # cannot access either the plugin or the spectrum viewer. # Plugins that access the unit at this point will need to # detect that they are set to unitless and attempt again later. return '' try: uc = self.get_tray_item_from_name('g-unit-conversion') # even if not relevant except KeyError: # UC plugin not available (should only be for: imviz, mosviz) uc = None if uc is None: # noqa # fallback on native units (unit conversion is not enabled) if hasattr(self._jdaviz_helper, '_default_spectrum_viewer_reference_name'): viewer_name = self._jdaviz_helper._default_spectrum_viewer_reference_name elif hasattr(self._jdaviz_helper, '_default_spectrum_2d_viewer_reference_name'): viewer_name = self._jdaviz_helper._default_spectrum_2d_viewer_reference_name elif axis == 'temporal': # No unit for ramp's time (group/resultant) axis: return None else: raise NotImplementedError("No default viewer reference found.") if axis == 'spectral': sv = self.get_viewer(viewer_name) return sv.data()[0].spectral_axis.unit elif axis in ('flux', 'sb', 'spectral_y'): sv = self.get_viewer(viewer_name) sv_y_unit = sv.data()[0].flux.unit if axis == 'spectral_y': return sv_y_unit # since this is where we're falling back on native units (UC plugin might not exist) # first check the spectrum viewer y axis for any solid angle unit (i think that it # will ALWAYS be in flux, but just to be sure). If no solid angle unit is found, # check the flux viewer for surface brightness units sv_y_angle_unit = check_if_unit_is_per_solid_angle(sv_y_unit, return_unit=True) # check flux viewer if none in spectral viewer fv_angle_unit = None if not sv_y_angle_unit: if hasattr(self._jdaviz_helper, '_default_flux_viewer_reference_name'): vname = self._jdaviz_helper._default_flux_viewer_reference_name fv = self.get_viewer(vname) fv_unit = fv.data()[0].get_object().flux.unit fv_angle_unit = check_if_unit_is_per_solid_angle(fv_unit, return_unit=True) else: # mosviz, not sure what to do here but can't access flux # viewer the same way. once we force the UC plugin to # exist this won't matter anyway because units can be # acessed from the plugin directly. assume u.sr for now fv_angle_unit = u.sr solid_angle_unit = sv_y_angle_unit or fv_angle_unit if axis == 'flux': if sv_y_angle_unit: return sv_y_unit * solid_angle_unit return sv_y_unit elif axis == 'sb': if sv_y_angle_unit: return sv_y_unit return sv_y_unit / solid_angle_unit else: raise ValueError(f"could not find units for axis='{axis}'") if axis == 'spectral_y': return uc.spectral_y_unit try: return getattr(uc, f'{axis}_unit_selected') except AttributeError: raise ValueError(f"could not find display unit for axis='{axis}'")
[docs] def simplify_spectral_subset(self, subset_name, att, overwrite=False): """ Convert a composite spectral subset consisting of And, AndNot, Or, Replace, and Xor into one consisting of just Range and Or state objects. Parameters ---------- subset_name : str Name of subset to simplify. att : str Attribute that the subset uses to apply to data. overwrite : bool Whether to update the current subset with the simplified state or apply it to a new subset. """ if self.is_there_overlap_spectral_subset(subset_name): new_state = self.merge_overlapping_spectral_regions(subset_name, att) else: new_state = None spectral_region = self.get_subsets(subset_name, spectral_only=True) # Reverse through sub regions so that they are added back # in the order of lowest values to highest for index in range(len(spectral_region) - 1, -1, -1): convert_to_range = RangeSubsetState(spectral_region[index].lower.value, spectral_region[index].upper.value, att) if new_state is None: new_state = convert_to_range else: new_state = new_state | convert_to_range dc = self.data_collection if not overwrite: dc.new_subset_group(subset_state=new_state) else: old_subset = [subsets for subsets in dc.subset_groups if subsets.label == subset_name][0] old_subset.subset_state = new_state
[docs] def is_there_overlap_spectral_subset(self, subset_name): """ Returns True if the spectral subset with subset_name has overlapping subregions. """ spectral_region = self.get_subsets(subset_name, simplify_spectral=False, spectral_only=True) n_reg = len(spectral_region) if not spectral_region or n_reg < 2: return False for index in range(n_reg - 1): if spectral_region[index]['region'].upper.value >= spectral_region[index + 1]['region'].lower.value: # noqa: E501 return True return False
@staticmethod def _merge_overlapping_spectral_regions_worker(spectral_region): if len(spectral_region) < 2: # noop return spectral_region merged_regions = spectral_region[0] # Instantiate merged regions for cur_reg in spectral_region[1:]: # If the lower value of the current subregion is less than or equal to the upper # value of the last subregion added to merged_regions, update last region in # merged_regions with the max upper value between the two regions. if cur_reg.lower <= merged_regions.upper: merged_regions._subregions[-1] = ( merged_regions._subregions[-1][0], max(cur_reg.upper, merged_regions._subregions[-1][1])) else: merged_regions += cur_reg return merged_regions
[docs] def merge_overlapping_spectral_regions(self, subset_name, att): """ Takes a spectral subset with subset_name and returns an ``OrState`` object that merges all overlapping subregions. Parameters ---------- subset_name : str Name of subset to simplify. att : str Attribute that the subset uses to apply to data. """ merged_regions = self.get_subsets(subset_name, spectral_only=True) new_state = None for region in reversed(merged_regions): convert_to_range = RangeSubsetState(region.lower.value, region.upper.value, att) if new_state: new_state = new_state | convert_to_range else: new_state = convert_to_range return new_state
[docs] def add_data(self, data, data_label=None, notify_done=True, parent=None): """ Add data to the Glue ``DataCollection``. Parameters ---------- data : any Data to be stored in the ``DataCollection``. This must either be a `~glue.core.data.Data` instance, or an arbitrary data instance for which there exists data translation functions in the glue astronomy repository. data_label : str, optional The name associated with this data. If none is given, label is pulled from the input data (if `~glue.core.data.Data`) or a generic name is generated. notify_done : bool Flag controlling whether a snackbar message is set when the data is added to the app. Set to False to avoid overwhelming the user if lots of data is getting loaded at once. parent : str, optional Associate the added Data entry as the child of layer ``parent``. """ if not data_label and hasattr(data, "label"): data_label = data.label data_label = self.return_unique_name(data_label) if data_label in self.data_collection.labels: warnings.warn(f"Overwriting existing data entry with label '{data_label}'") self.data_collection[data_label] = data # manage associated Data entries: self._add_assoc_data_as_parent(data_label) if parent is not None: data_collection_labels = [data.label for data in self.data_collection] if parent not in data_collection_labels: raise ValueError(f'parent "{parent}" is not a valid data label in ' f'the data collection: {data_collection_labels}.') # Does the parent Data have a parent? If so, raise error: parent_of_parent = self._get_assoc_data_parent(parent) if parent_of_parent is not None: raise NotImplementedError('Data associations are currently supported ' 'between root layers (without parents) and their ' f'children, but the proposed parent "{parent}" has ' f'parent "{parent_of_parent}".') self._set_assoc_data_as_child(data_label, new_parent_label=parent) # Send out a toast message if notify_done: snackbar_message = SnackbarMessage( f"Data '{data_label}' successfully added.", sender=self, color="success") self.hub.broadcast(snackbar_message)
def _update_layer_icons(self, old_label, new_label): """ Update layer icons dictionary when renaming an item. Parameters ---------- old_label : str The old label of the item. new_label : str The new label of the item. """ if old_label in self.state.layer_icons: self.state.layer_icons[new_label] = self.state.layer_icons[old_label] _ = self.state.layer_icons.pop(old_label) # Update _data_associations to maintain parent-child relationships if old_label in self._data_associations: # Move the association entry to the new label self._data_associations[new_label] = self._data_associations.pop(old_label) # Update parent references in children children = self._data_associations[new_label].get('children', []) for child_label in children: if child_label in self._data_associations: self._data_associations[child_label]['parent'] = new_label # Check if this is a child being renamed - update parent's children list for data_label, assoc_data in self._data_associations.items(): if old_label in assoc_data.get('children', []): # Replace old label with new label in children list children = assoc_data['children'] children[children.index(old_label)] = new_label break def _update_data_items_list(self, old_label, new_label): """ Update state.data_items list when renaming an item. Parameters ---------- old_label : str The old label of the item. new_label : str The new label of the item. """ for data_item in self.state.data_items: if data_item['name'] == old_label: data_item['name'] = new_label break def _update_live_plugin_results_metadata(self, data, old_label, new_label, subscription_type): """ Update live plugin results metadata when renaming an item. Parameters ---------- data : glue.core.Data The data object containing metadata to update. old_label : str The old label of the item. new_label : str The new label of the item. subscription_type : str The type of subscription to update ('data' or 'subset'). Returns ------- modified : bool True if metadata was modified, False otherwise. """ if not (hasattr(data, 'meta') and '_update_live_plugin_results' in data.meta): return False results_dict = data.meta['_update_live_plugin_results'] # Get list of attribute names that reference the item attrs = results_dict.get('_subscriptions', {}).get(subscription_type, []) # Update any reference attributes that match old_label modified = False for attr_name in attrs: if results_dict.get(attr_name) == old_label: results_dict[attr_name] = new_label modified = True # Update add_results label if it exists and matches # This ensures that auto-update tracking continues after data renaming if 'add_results' in results_dict and isinstance(results_dict['add_results'], dict): if results_dict['add_results'].get('label') == old_label: results_dict['add_results']['label'] = new_label modified = True # Note: We modify the dict in place, which is sufficient for the metadata # to be updated. No need to reassign to data.meta return modified
[docs] def check_rename_availability(self, old_label, new_label, is_subset=False): """ Check if new label already exists in reserved labels. Used by front-end for dynamic user warning. """ # Always check that the new label doesn't already exist in reserved labels # (unless we're just keeping the same name) if new_label is None: return if new_label != old_label and new_label.strip() in self._reserved_labels: msg = (f'Cannot rename to {new_label}: ' 'name already exists in data collection or subsets or is unavailable.') raise ValueError(msg) # When renaming a subset, also perform subset-specific validation if is_subset: self._check_valid_subset_label(new_label)
[docs] def rename_data(self, old_label, new_label): """ Rename data in the data collection and update all references. Parameters ---------- old_label : str The current label of the data to rename. new_label : str The new label for the data. """ self.check_rename_availability(old_label, new_label) data = self.data_collection[old_label] # Set flag to indicate rename is in progress # This allows handlers to skip processing during renames self._renaming_data = True # Rename the data object # Note: _rename_single_data will reset _renaming_data at the end # to ensure callbacks have access to the flag during processing self._rename_single_data(old_label, new_label, data)
def _rename_single_data(self, old_label, new_label, data): """ Rename a single data entry. This is a helper method used by rename_data. Parameters ---------- old_label : str The current label of the data to rename. new_label : str The new label for the data. data : glue.core.Data The data object to rename. """ try: # Update the data label in the data object itself data.label = new_label # Update state.data_items self._update_data_items_list(old_label, new_label) # Update live plugin results if metadata exists self._update_live_plugin_results_metadata(data, old_label, new_label, 'data') # Update metadata in OTHER data that subscribe to the renamed data for d in self.data_collection: if d is data: continue self._update_live_plugin_results_metadata(d, old_label, new_label, 'data') # Clear cached references to old label self._clear_object_cache(old_label) # Update reserved labels if old_label in self._reserved_labels: self._reserved_labels.remove(old_label) self._reserved_labels.add(new_label) # Broadcast the message BEFORE updating layer_icons # This is critical: layer_icons has callbacks that trigger # _update_items() in DatasetSelect/LayerSelect. If we # broadcast DataRenamedMessage first, the handlers can # update their 'selected' trait values before _update_items() # runs. This prevents _apply_default_selection from changing # 'selected' and triggering observers that would cause # plugins to re-process data (like auto-extraction). self.hub.broadcast(DataRenamedMessage(data, old_label, new_label, sender=self)) # Now update layer icons - callbacks will fire but selected # values are already updated self._update_layer_icons(old_label, new_label) # Update viewer layer states and reference data for viewer_id, viewer in self._viewer_store.items(): # Update reference data if it matches old label if (hasattr(viewer.state, 'reference_data') and viewer.state.reference_data is not None and viewer.state.reference_data.label == old_label): viewer.state.reference_data = data finally: # Reset the renaming flag after all callbacks have been processed # This ensures that _apply_default_selection in plugin components # will skip resetting the selected value during the rename process self._renaming_data = False
[docs] def return_data_label(self, loaded_object, ext=None, alt_name=None, check_unique=True): """ Returns a unique data label that can be safely used to load data into data collection. Parameters ---------- loaded_object : str or object The path to a data file or FITS HDUList or image object or Spectrum or NDData array or numpy.ndarray. ext : str, optional The extension (or other distinguishing feature) of data used to identify it. For example, "filename[FLUX]" where "FLUX" is the ext value. alt_name : str, optional Alternate names that can be used if none of the options provided are valid. check_unique : bool Included so that this method can be used with data label retrieval in addition to generation. Returns ------- data_label : str A unique data label that at its root is either given by the user at load time or created by Jdaviz using a description of the loaded filetype. """ data_label = None if loaded_object is None: pass elif isinstance(loaded_object, str): # This is either the full file path or the basename or the user input data label if loaded_object.lower().endswith(('.jpg', '.jpeg', '.png')): file_format = loaded_object.lower().split(".")[-1] loaded_object = os.path.splitext(os.path.basename(loaded_object))[0] data_label = f"{loaded_object}[{file_format}]" elif os.path.isfile(loaded_object) or os.path.isdir(loaded_object): # File that is not an image file data_label = os.path.splitext(os.path.basename(loaded_object))[0] else: # Not a file path so assumed to be a data label data_label = loaded_object elif isinstance(loaded_object, fits.hdu.hdulist.HDUList): if hasattr(loaded_object, 'file_name'): data_label = f"{loaded_object.file_name}[HDU object]" else: data_label = "Unknown HDU object" elif isinstance(loaded_object, Spectrum): data_label = "Spectrum" elif isinstance(loaded_object, NDData): data_label = "NDData" elif isinstance(loaded_object, np.ndarray): data_label = "Array" if data_label is None and alt_name is not None and isinstance(alt_name, str): data_label = alt_name elif data_label is None: data_label = "Unknown" if check_unique: data_label = self.return_unique_name(data_label, ext=ext) return data_label
[docs] def return_unique_name(self, label, typ='data', ext=None): if typ == 'data': exist_labels = self.data_collection.labels elif typ == 'viewer': exist_labels = list(self._viewer_store.keys()) elif isinstance(typ, list): exist_labels = typ else: raise ValueError("typ must be either 'data', 'viewer', or a list of strings") if label is None: label = "Unknown" # This regex checks for any length of characters that end # with a space followed by parenthesis with a number inside. # If there is a match, the space and parenthesis section will be # removed so that the remainder of the label can be checked # against the label. check_if_dup = re.compile(r"(.*)(\s\(\d*\))$") number_of_duplicates = 0 max_number = 0 for exist_label in exist_labels: # If label is a duplicate of another label if re.fullmatch(check_if_dup, exist_label): exist_label_split = exist_label.split(" ") exist_label_without_dup = " ".join(exist_label_split[:-1]) exist_label = exist_label_without_dup # Remove parentheses and cast to float number_dup = int(exist_label_split[-1][1:-1]) # Used to keep track the max number of duplicates, # even if not all duplicates are loaded (or some # are renamed) if number_dup > max_number: max_number = number_dup if ext and f"{label}[{ext}]" == exist_label: number_of_duplicates += 1 elif ext is None and label == exist_label: number_of_duplicates += 1 if ext: label = f"{label}[{ext}]" if number_of_duplicates > 0: label = f"{label} ({number_of_duplicates})" # It is possible to add data named "test (1)" and then # add another data named "test" and return_unique_name will see the # first test and assume the second is the duplicate, appending # "(1)" to the end. This overwrites the original data and # causes issues. This block alters the duplicate number to be something unique # (one more than the max number duplicate found) # if a duplicate is still found in data_collection. if label in exist_labels: label_split = label.split(" ") label_without_dup = " ".join(label_split[:-1]) label = f"{label_without_dup} ({max_number + 1})" return label
[docs] def add_data_to_viewer(self, viewer_reference, data_label, visible=True, clear_other_data=False): """ Plots a data set from the data collection in the specific viewer. Parameters ---------- viewer_reference : str The reference to the viewer defined with the ``reference`` key in the yaml configuration file. data_label : str The Glue data label found in the ``DataCollection``. visible : bool Whether the layer should be initialized as visibile. clear_other_data : bool Removes all other currently plotted data and only shows the newly defined data set. """ if ('mosviz_row' in self.state.settings and not (self.get_viewer( self._jdaviz_helper._default_table_viewer_reference_name ).row_selection_in_progress) and self.data_collection[data_label].meta['mosviz_row'] != self.state.settings['mosviz_row']): raise NotImplementedError("Intra-row plotting not supported. " "Please use the table viewer to change rows") viewer_item = self._get_viewer_item(viewer_reference) if viewer_item is None: raise ValueError(f"Could not identify viewer with reference {viewer_reference}") self.set_data_visibility(viewer_item, data_label, visible=visible, replace=clear_other_data)
[docs] def remove_data_from_viewer(self, viewer_reference, data_label): """ Removes a data set from the specified viewer. Parameters ---------- viewer_reference : str The reference to the viewer defined with the ``reference`` key in the yaml configuration file. data_label : str The Glue data label found in the ``DataCollection``. """ viewer_item = self._get_viewer_item(viewer_reference) viewer_id = viewer_item['id'] viewer = self.get_viewer_by_id(viewer_id) [data] = [x for x in self.data_collection if x.label == data_label] viewer.remove_data(data) viewer._layers_with_defaults_applied = [layer_info for layer_info in getattr(viewer, '_layers_with_defaults_applied', []) # noqa if layer_info['data_label'] != data.label] remove_data_message = RemoveDataMessage(data, viewer, viewer_id=viewer_id, sender=self) self.hub.broadcast(remove_data_message)
def _data_id_from_label(self, label): """ Retrieve the data item given the Glue ``DataCollection`` data label. Parameters ---------- label : str Label associated with the ``DataCollection`` item. This is also the name shown in the data list. Returns ------- dict The data item dictionary containing the id and name of the given data set. """ for data_item in self.state.data_items: if data_item['name'] == label: return data_item['id']
[docs] def get_viewer_ids(self, prefix=None): """Return a list of available viewer IDs. Parameters ---------- prefix : str or `None` If not `None`, only return viewer IDs with given prefix (case-sensitive). Otherwise, all viewer IDs are returned. Returns ------- vids : list of str Sorted list of viewer IDs. """ all_keys = sorted(self._viewer_store.keys()) if isinstance(prefix, str): vids = [k for k in all_keys if k.startswith(prefix)] else: vids = all_keys return vids
[docs] def get_viewer_reference_names(self): """Return a list of available viewer reference names.""" # Cannot sort because of None return [self._viewer_item_by_id(vid).get('reference') for vid in self._viewer_store]
[docs] def get_viewers_of_cls(self, cls): """Return a list of viewers of a specific class.""" if isinstance(cls, str): cls_name = cls else: cls_name = cls.__name__ return [viewer for viewer in self._viewer_store.values() if viewer.__class__.__name__ == cls_name]
def _update_viewer_reference_name( self, old_reference, new_reference, update_id=False ): """ Update viewer reference names. Viewer IDs will not be changed unless ``update_id`` is True. Parameters ---------- old_reference : str The viewer reference name to be changed new_reference : str The viewer reference name to use instead of ``old_reference`` update_id : bool, optional If True, update the viewer IDs as well as the viewer reference names. """ if old_reference == new_reference: # no-op return # ensure new reference is a string: new_reference = str(new_reference) viewer_reference_names = self.get_viewer_reference_names() if new_reference in viewer_reference_names: raise ValueError(f"Viewer with reference='{new_reference}' already exists.") if old_reference == 'imviz-0': raise ValueError(f"The default Imviz viewer reference " f"'{old_reference}' cannot be changed.") if old_reference not in viewer_reference_names: raise ValueError(f"Viewer with reference='{old_reference}' does not exist.") # update the viewer item's reference name viewer_item = self._get_viewer_item(old_reference) viewer_item['reference'] = new_reference if viewer_item['name'] == old_reference: viewer_item['name'] = new_reference # optionally update the viewer IDs: if update_id: old_id = viewer_item['id'] viewer_item['id'] = new_reference self._viewer_store[new_reference] = self._viewer_store.pop(old_id) self._viewer_store[new_reference]._reference_id = new_reference self.state.viewer_icons[new_reference] = self.state.viewer_icons.pop(old_id) # update the viewer name attributes on the helper: old_viewer_ref_attrs = [ attr for attr in dir(self._jdaviz_helper) if attr.startswith('_default_') and getattr(self._jdaviz_helper, attr) == old_reference ] if old_viewer_ref_attrs: # if there is an attr to update, update it: setattr(self._jdaviz_helper, old_viewer_ref_attrs[0], new_reference) self.hub.broadcast(ViewerRenamedMessage(old_reference, new_reference, sender=self)) def _get_first_viewer_reference_name( self, require_no_selected_data=False, require_spectrum_viewer=False, require_spectrum_2d_viewer=False, require_table_viewer=False, require_flux_viewer=False, require_image_viewer=False, require_profile_viewer=False, ): """ Return the viewer reference name of the first available viewer. Optionally use ``require_no_selected_data`` to require that the viewer has not yet loaded data, or e.g. ``require_spectrum_viewer`` to require that the viewer supports spectrum visualization. """ from jdaviz.configs.imviz.plugins.viewers import ImvizImageView from jdaviz.configs.specviz.plugins.viewers import Spectrum1DViewer, Spectrum2DViewer from jdaviz.configs.cubeviz.plugins.viewers import CubevizProfileView, CubevizImageView from jdaviz.configs.mosviz.plugins.viewers import ( MosvizTableViewer, MosvizProfile2DView ) from jdaviz.configs.rampviz.plugins.viewers import ( RampvizImageView, RampvizProfileView ) spectral_viewers = (Spectrum1DViewer, CubevizProfileView) spectral_2d_viewers = (Spectrum2DViewer, MosvizProfile2DView) table_viewers = (MosvizTableViewer, ) image_viewers = (ImvizImageView, CubevizImageView, RampvizImageView) flux_viewers = (CubevizImageView, RampvizImageView) ramp_viewers = (RampvizProfileView, ) for vid in self._viewer_store: viewer_item = self._viewer_item_by_id(vid) is_returnable = ( (require_no_selected_data and not len(viewer_item['selected_data_items'])) or (not require_no_selected_data) ) if require_spectrum_viewer: if isinstance(self._viewer_store[vid], spectral_viewers) and is_returnable: return viewer_item['reference'] elif require_spectrum_2d_viewer: if isinstance(self._viewer_store[vid], spectral_2d_viewers) and is_returnable: return viewer_item['reference'] elif require_table_viewer: if isinstance(self._viewer_store[vid], table_viewers) and is_returnable: return viewer_item['reference'] elif require_image_viewer: if isinstance(self._viewer_store[vid], image_viewers) and is_returnable: return viewer_item['reference'] elif require_flux_viewer: if isinstance(self._viewer_store[vid], flux_viewers) and is_returnable: return viewer_item['reference'] elif require_profile_viewer: if isinstance(self._viewer_store[vid], ramp_viewers) and is_returnable: return viewer_item['reference'] else: if is_returnable: return viewer_item['reference'] def _viewer_by_id(self, vid): """ Viewer instance by id. Parameters ---------- vid : str The UUID associated with the parent viewer item dictionary. Returns ------- `~glue_jupyter.bqplot.common.BqplotBaseView` The viewer class instance. """ return self._viewer_store.get(vid) def _viewer_item_by_id(self, vid): """ Retrieve a viewer item dictionary by id. Parameters ---------- vid : str The UUID associated with the desired viewer item. Returns ------- viewer_item : dict The dictionary containing the viewer instances and associated attributes. """ def find_viewer_item(stack_items): for stack_item in stack_items: for viewer_item in stack_item.get('viewers'): if viewer_item.get('id') == vid: return viewer_item if len(stack_item.get('children')) > 0: result = find_viewer_item(stack_item.get('children')) if result is not None: return result return None viewer_item = find_viewer_item(self.state.stack_items) return viewer_item def _viewer_by_reference(self, reference, fallback_to_id=False): """ Viewer instance by reference defined in the yaml configuration file. Parameters ---------- reference : str Reference for viewer defined in the yaml configuration file. Returns ------- viewer : `~glue_jupyter.bqplot.common.BqplotBaseView` The viewer class instance. """ viewer_item = self._viewer_item_by_reference(reference) if viewer_item is None and fallback_to_id: return self._viewer_by_id(reference) return self._viewer_store[viewer_item['id']] def _viewer_item_by_reference(self, reference): """ Retrieve a viewer item dictionary by reference. Parameters ---------- reference : str Reference for viewer defined in the yaml configuration file. Returns ------- viewer_item : dict The dictionary containing the viewer instances and associated attributes. """ def find_viewer_item(stack_items): out_viewer_item = None for stack_item in stack_items: for viewer_item in stack_item.get('viewers'): if viewer_item['reference'] == reference: out_viewer_item = viewer_item break if len(stack_item.get('children')) > 0: out_viewer_item = find_viewer_item(stack_item.get('children')) return out_viewer_item viewer_item = find_viewer_item(self.state.stack_items) return viewer_item def _get_viewer_item(self, ref_or_id): """ Retrieve a viewer item dictionary by reference or id (trying reference first). Parameters ---------- ref_or_id : str Reference or ID for viewer defined in the yaml configuration file. Returns ------- viewer_item : dict The dictionary containing the viewer instances and associated attributes. """ if isinstance(ref_or_id, dict): return ref_or_id viewer_item = self._viewer_item_by_reference(ref_or_id) if viewer_item is None: # Maybe they mean the ID viewer_item = self._viewer_item_by_id(ref_or_id) return viewer_item def _check_valid_subset_label(self, subset_name, raise_if_invalid=True): """Check that `subset_name` is a valid choice for a subset name. This check is run when renaming subsets. A valid subset name must not be the name of another subset in the data collection (case insensitive? there cant be a subset 1 and a Subset 1.) The name may match a subset in the data collection if it the current active selection (i.e the renaming is not really a renaming, it is just keeping the old name, which is valid). Unlike dataset names, the attempted renaming of a subset to an existing subset label will not append a number (e.g Subset 1 repeated because Subset 1(1)). If the name exists, a warning will be raised and the original subset name will be returned. """ # get active selection, if there is one if self.session.edit_subset_mode.edit_subset == []: subset_selected = None else: subset_selected = self.session.edit_subset_mode.edit_subset[0].label # Create a copy of reserved labels to check against, excluding the current # selection label, to allow subsets to keep their current name reserved_labels_to_check = set(self._reserved_labels) if subset_selected in reserved_labels_to_check: reserved_labels_to_check.remove(subset_selected) # now check `subset_name` against list of non-active current subset labels # and warn and return if it is if subset_name in reserved_labels_to_check: if raise_if_invalid: raise ValueError("Cannot rename subset to name of an existing subset" f" or data item: ({subset_name}).") return False elif not subset_name.replace(" ", "").isalnum(): if raise_if_invalid: raise ValueError("Subset labels must be purely alphanumeric") return False else: split_label = subset_name.split(" ") if len(split_label) > 1: if split_label[0] == "Subset" and split_label[1].isdigit(): if raise_if_invalid: raise ValueError("The pattern 'Subset N' is reserved for " "auto-generated labels") return False return True def _rename_subset(self, old_label, new_label, subset_group=None, check_valid=True): # Change the label of a subset, making sure it propagates to as many places as it can # I don't think there's an easier way to get subset_group by label, it's just a tuple if subset_group is None: for s in self.data_collection.subset_groups: if s.label == old_label: subset_group = s break # If we couldn't find a matching subset group, raise an error else: raise ValueError(f"No subset named {old_label} to rename") if check_valid: if self._check_valid_subset_label(new_label): subset_group.label = new_label else: subset_group.label = new_label # Update layer icon self._update_layer_icons(old_label, new_label) # Updated derived data if applicable for d in self.data_collection: data_renamed = False # Extracted spectra are named, e.g., 'Data (Subset 1, sum)' if d.label.split('(')[-1].split(',')[0] == old_label: old_data_label = d.label new_data_label = d.label.replace(old_label, new_label) d.label = new_data_label self._update_layer_icons(old_data_label, new_data_label) # Update the entries in the old data menu self._update_data_items_list(old_data_label, new_data_label) data_renamed = True # Update live plugin results subscriptions modified = self._update_live_plugin_results_metadata(d, old_label, new_label, 'subset') # If derived data was renamed, update the label in add_results if data_renamed and modified: if hasattr(d, 'meta') and '_update_live_plugin_results' in d.meta: results_dict = d.meta['_update_live_plugin_results'] if 'add_results' in results_dict: results_dict['add_results']['label'] = new_data_label d.meta['_update_live_plugin_results'] = results_dict self.hub.broadcast(SubsetRenameMessage(subset_group, old_label, new_label, sender=self)) def _reparent_subsets(self, old_parent, new_parent=None): ''' Re-parent subsets that belong to the specified data Parameters ---------- old_parent : glue.core.Data, str The item from the data collection off of which to move the subset definitions. new_parent : glue.core.Data, str The item from the data collection to make the new parent. If None, the first item in the data collection that doesn't match ``old_parent`` will be chosen. ''' from astropy.wcs.utils import pixel_to_pixel from jdaviz.configs.imviz.wcs_utils import get_compass_info if isinstance(old_parent, str): old_parent = self.data_collection[old_parent] if isinstance(new_parent, str): new_parent = self.data_collection[new_parent] elif new_parent is None: for data in self.data_collection: if data is not old_parent: new_parent = data break # Set subset attributes to match a remaining data collection member, using get_subsets to # get components of composite subsets. for key, subset_list in self.get_subsets(simplify_spectral=False).items(): # Get the subset group entry for later. Unfortunately can't just index on label. [subset_group] = [sg for sg in self.data_collection.subset_groups if sg.label == key] for subset in subset_list: subset_state = subset['subset_state'] # Only reparent if needed if subset_state.attributes[0].parent is old_parent: for att in ("att", "xatt", "yatt", "x_att", "y_att"): if hasattr(subset_state, att): subset_att = getattr(subset_state, att) data_components = new_parent.components if subset_att not in data_components: cid = [c for c in data_components if c.label == subset_att.label][0] setattr(subset_state, att, cid) # Translate bounds through WCS if needed if (self.config == "imviz" and self._jdaviz_helper.plugins["Orientation"].align_by == "WCS"): # Default shape for WCS-only layers is 10x10, but it doesn't really matter # since we only need the angles. old_angle, _, old_flip = get_compass_info(old_parent.coords, (10, 10))[-3:] new_angle, _, new_flip = get_compass_info(new_parent.coords, (10, 10))[-3:] if old_flip != new_flip: # Note that this won't work for an irregular/assymetric region if we # ever implement those. relative_angle = 180 - new_angle - old_angle else: relative_angle = new_angle - old_angle # Get the correct link to use for translation roi = subset_state.roi old_xc, old_yc = subset_state.center() if isinstance(roi, (CircularROI, CircularAnnulusROI, EllipticalROI)): # Convert center x, y = pixel_to_pixel(old_parent.coords, new_parent.coords, roi.xc, roi.yc) subset_state.move_to(x, y) for att in ("radius", "inner_radius", "outer_radius", "radius_x", "radius_y"): # Hacky way to get new radii with point on edge of circle # Do we need to worry about using x for the radius conversion for # radius_y if there is distortion? r = getattr(roi, att, None) if r is not None: dummy_x = old_xc + r x2, y2 = pixel_to_pixel(old_parent.coords, new_parent.coords, dummy_x, old_yc) # Need to use x and y in this radius calculation because the # new orientation is likely rotated compared to the original. new_radius = np.sqrt((x2 - x)**2 + (y2 - y)**2) setattr(roi, att, new_radius) elif isinstance(roi, RectangularROI): x1, y1 = pixel_to_pixel(old_parent.coords, new_parent.coords, roi.xmin, roi.ymin) x2, y2 = pixel_to_pixel(old_parent.coords, new_parent.coords, roi.xmin, roi.ymax) x3, y3 = pixel_to_pixel(old_parent.coords, new_parent.coords, roi.xmax, roi.ymin) # Calculate new width and height from possibly rotated result new_half_width = np.sqrt((x3-x1)**2 + (y3-y1)**2) * 0.5 new_half_height = np.sqrt((x2-x1)**2 + (y2-y1)**2) * 0.5 # Convert center new_center = pixel_to_pixel(old_parent.coords, new_parent.coords, old_xc, old_yc) # New min/max before applying theta roi.xmin = new_center[0] - new_half_width roi.xmax = new_center[0] + new_half_width roi.ymin = new_center[1] - new_half_height roi.ymax = new_center[1] + new_half_height # Account for rotation between orientations if hasattr(roi, "theta"): fac = 1.0 if (old_flip != new_flip) else -1.0 roi.theta = (fac * (np.deg2rad(relative_angle) - roi.theta)) % (2 * np.pi) # noqa: E501 elif isinstance(subset_group.subset_state, RangeSubsetState): range_state = subset_group.subset_state wcs_index = (new_parent.coords.world_n_dim - 1 - new_parent.meta['spectral_axis_index']) cur_unit = u.Unit(old_parent.coords.world_axis_units[wcs_index]) new_unit = u.Unit(new_parent.coords.world_axis_units[wcs_index]) if cur_unit is not new_unit: range_state.lo, range_state.hi = cur_unit.to(new_unit, [range_state.lo, range_state.hi]) # Force subset plugin to update bounds and such for subset in subset_group.subsets: subset_message = SubsetUpdateMessage(sender=subset) self.hub.broadcast(subset_message)
[docs] def vue_destroy_viewer_item(self, cid): """ Callback for when viewer area tabs are destroyed. Finds the viewer item associated with the provided id and removes it from the ``stack_items`` list. Parameters ---------- cid : str The viewer ID associated with the viewer item dictionary. """ def remove(stack_items): for stack in stack_items: for viewer in stack['viewers']: if viewer['id'] == cid: stack['viewers'].remove(viewer) if len(stack.get('children', [])) > 0: stack['children'] = remove(stack['children']) for empty_stack in [s for s in stack_items if not s['viewers'] and not s.get('children')]: stack_items.remove(empty_stack) return stack_items remove(self.state.stack_items) # Also remove the viewer from the stored viewer instance dictionary if cid in self._viewer_store: del self._viewer_store[cid] # clear from the viewer icons dictionary if cid in self.state.viewer_icons: del self.state.viewer_icons[cid] self.hub.broadcast(ViewerRemovedMessage(cid, sender=self))
[docs] def vue_change_reference_data(self, event): self._change_reference_data( self._get_data_item_by_id(event['item_id'])['name'], viewer_id=self._get_viewer_item(event['id'])['name'] )
[docs] def set_data_visibility(self, viewer_reference, data_label, visible=True, replace=False): """ Set the visibility of the layers corresponding to ``data_label`` in a given viewer. Parameters ---------- viewer_reference : str Reference (or ID) of the viewer data_label : str Label of the data to set the visiblity. If not already loaded in the viewer, the data will automatically be loaded before setting the visibility visible : bool Whether to set the layer(s) to visible. replace : bool Whether to disable the visibility of all other layers in the viewer """ viewer_item = self._get_viewer_item(viewer_reference) viewer_id = viewer_item['id'] viewer = self.get_viewer_by_id(viewer_id) # if the data_label is in the app, but not loaded in the viewer, automatically load it first viewer_data_labels = [layer.layer.label for layer in viewer.layers] layers_finalized_message = None if data_label not in viewer_data_labels: dc_labels = [data.label for data in self.data_collection] if data_label not in dc_labels: if os.path.exists(data_label): raise ValueError(f'The data label "{data_label}" is not available ' f'to add to the viewer, but it does specify a file path. ' f'If you intended to load the data from that file, use the ' f'`load_data` method or similar.') raise ValueError( f"No data item found with label '{data_label}'. Label must be one " "of:\n\t" + "\n\t".join(dc_labels)) data = self.data_collection[data_label] # set the original color based on metadata preferences, if provided, and otherwise # based on the colorcycler # NOTE: this is intentionally not a single line to avoid incrementing the color-cycler # unless it is used color = data.meta.get('_default_color') if color is None: color = viewer.color_cycler() viewer.add_data(data, percentile=95, color=color) # Specviz removes the data from collection in viewer.py if flux unit incompatible. if data_label not in self.data_collection: return viewer.set_plot_axes() add_data_message = AddDataMessage(data, viewer, viewer_id=viewer_id, sender=self) self.hub.broadcast(add_data_message) layers_finalized_message = LayersFinalizedMessage(data, viewer, viewer_id=viewer_id, sender=self) assoc_children = self._get_assoc_data_children(data_label) # set visibility state of all applicable layers for layer in viewer.layers: layer_is_wcs_only = getattr(layer.layer, 'meta', {}).get(_wcs_only_label, False) if layer.layer.data.label in [data_label] + assoc_children: if layer_is_wcs_only: layer.visible = False layer.update() elif visible and not layer.visible: layer.visible = True layer.update() else: layer.visible = visible # if replace, do another loop (we do a second loop to ensure the visible layer is added # first BEFORE other layers are removed) if replace: for layer in viewer.layers: if layer.layer.data.label != data_label: layer.visible = False # if Data has children, update their visibilities to match Data: available_plugins = [tray_item['name'] for tray_item in self.state.tray_items] for child in assoc_children: if child not in viewer.data_menu.data_labels_loaded: self.add_data_to_viewer(viewer.reference, child, visible=visible) if 'g-data-quality' in available_plugins and visible: # if we're adding a DQ layer to a viewer, make sure that # the layer is appropriately colormapped as DQ: data_quality_plugin = self.get_tray_item_from_name('g-data-quality') old_viewer = data_quality_plugin.viewer_selected data_quality_plugin.viewer_selected = viewer.reference data_quality_plugin.science_layer_selected = data_label data_quality_plugin.dq_layer_selected = child data_quality_plugin.init_decoding(viewers=[viewer]) data_quality_plugin.viewer_selected = old_viewer for layer in viewer.layers: if layer.layer.data.label in assoc_children: if visible and not layer.visible: layer.visible = True layer.update() else: layer.visible = visible # Sets the plot axes labels to be the units of the most recently # active data. viewer_data_labels = [layer.layer.label for layer in viewer.layers] if len(viewer_data_labels) > 0 and getattr(self._jdaviz_helper, '_in_batch_load', 0) == 0: # This "if" is nested on purpose to make parent "if" available # for other configs in the future, as needed. if self.config == 'imviz': viewer.on_limits_change() # Trigger compass redraw if layers_finalized_message: self.hub.broadcast(layers_finalized_message)
[docs] def data_item_remove(self, data_label): data = self.data_collection[data_label] orientation_plugin = self._jdaviz_helper.plugins.get("Orientation") if orientation_plugin is not None and orientation_plugin.align_by == "WCS": from jdaviz.configs.imviz.plugins.orientation.orientation import base_wcs_layer_label orient = orientation_plugin.orientation.selected if orient == data_label: orient = base_wcs_layer_label self._reparent_subsets(data, new_parent=orient) else: self._reparent_subsets(data) # Make sure the data isn't loaded in any viewers and isn't the selected orientation for viewer_id, viewer in self._viewer_store.items(): if orientation_plugin is not None and self._align_by == 'wcs': if viewer.state.reference_data.label == data_label: self._change_reference_data(base_wcs_layer_label, viewer_id) self.remove_data_from_viewer(viewer_id, data_label) data_to_remove = self.data_collection[data_label] if self.config in CONFIGS_WITH_LOADERS: data_needing_relinking = [] for link in self.data_collection.external_links: if hasattr(link, 'data1') and hasattr(link, 'data2'): if data_to_remove == link.data1: data_needing_relinking.append(link.data2) elif data_to_remove == link.data2: data_needing_relinking.append(link.data1) # ComponentsLink has comp_from and comp_to instead of data1 and data2, # and is currently used in linking for Source Catalog elif hasattr(link, 'comp_from') and hasattr(link, 'comp_to'): if data_to_remove == link.comp_from.data: data_needing_relinking.append(link.comp_to.data) elif data_to_remove == link.comp_to.data: data_needing_relinking.append(link.comp_from.data) self.data_collection.remove(data_to_remove) if self.config in CONFIGS_WITH_LOADERS: for data in data_needing_relinking: self._link_new_data_by_component_type(data.label) # If there are two or more datasets left we need to link them back together if anything # was linked only through the removed data. if orientation_plugin is not None: if (len(self.data_collection) > 1 and len(self.data_collection.external_links) < len(self.data_collection) - 1): orientation_plugin._obj._link_image_data() # Hack to restore responsiveness to imviz layers for viewer_ref in self.get_viewer_reference_names(): viewer = self.get_viewer(viewer_ref) loaded_layers = [layer.layer.label for layer in viewer.layers if "Subset" not in layer.layer.label and layer.layer.label not in orientation_plugin.orientation.labels] if len(loaded_layers): self.remove_data_from_viewer(viewer_ref, loaded_layers[-1]) self.add_data_to_viewer(viewer_ref, loaded_layers[-1])
[docs] def vue_close_snackbar_message(self, event): """ Callback to close a message in the snackbar when the "close" button is clicked. """ self.state.snackbar_queue.close_current_message(self.state)
[docs] def vue_call_viewer_method(self, event): viewer_id, method = event['id'], event['method'] args = event.get('args', []) kwargs = event.get('kwargs', {}) return getattr(self._viewer_store[viewer_id], method)(*args, **kwargs)
[docs] def vue_search_item_clicked(self, event): attr, label = event['attr'], event['label'] if attr == 'data_menus': item = self._jdaviz_helper.viewers[label].data_menu else: item = getattr(self._jdaviz_helper, attr)[label] if label == 'About': item.show_popup() elif attr == 'data_menus': item.open_menu() else: kw = {'scroll_to': item._obj._sidebar == 'plugins'} if attr == 'plugins' else {} # noqa item.open_in_tray(**kw)
def _get_data_item_by_id(self, data_id): return next((x for x in self.state.data_items if x['id'] == data_id), None) def _update_existing_data_in_dc(self, msg, data_added): """ Update the ``existing_data_in_dc`` state list to reflect whether data with a given label is in the internal ``DataCollection``. Parameters ---------- msg : `~glue.core.message.DataCollectionAddMessage` or `~glue.core.message.DataCollectionDeleteMessage` The Glue data collection add or delete message containing information about the new or removed data. data_added : bool Whether data was added or removed from the ``DataCollection``. """ data_hash = msg.data.meta.get('_data_hash', None) if data_hash is None: return # Need to make a copy to ensure traitlets notices the change new_existing_data_in_dc = self.existing_data_in_dc.copy() if data_added: if data_hash not in new_existing_data_in_dc: new_existing_data_in_dc.append(data_hash) else: if data_hash in new_existing_data_in_dc: new_existing_data_in_dc.remove(data_hash) self.existing_data_in_dc = new_existing_data_in_dc def _on_data_added(self, msg): """ Callback for when data is added to the internal ``DataCollection``. Adds a new data item dictionary to the ``data_items`` state list and links the new data to any compatible previously loaded data. Parameters ---------- msg : `~glue.core.message.DataCollectionAddMessage` The Glue data collection add message containing information about the new data. """ # We don't need to link the first data to itself if len(self.data_collection) > 1: self._link_new_data() data_item = self._create_data_item(msg.data) self.state.data_items.append(data_item) self._reserved_labels.add(msg.data.label) self._update_existing_data_in_dc(msg, data_added=True) def _clear_object_cache(self, data_label=None): if data_label is None: self._get_object_cache.clear() else: # keys are (data_label, statistic) tuples self._get_object_cache = {k: v for k, v in self._get_object_cache.items() if k[0] != data_label} def _on_data_deleted(self, msg): """ Callback for when data is removed from the internal ``DataCollection``. Removes the data item dictionary in the ``data_items`` state list. Parameters ---------- msg : `~glue.core.message.DataCollectionAddMessage` The Glue data collection add message containing information about the new data. """ for data_item in self.state.data_items: if data_item['name'] == msg.data.label: self.state.data_items.remove(data_item) self._clear_object_cache(msg.data.label) self._update_existing_data_in_dc(msg, data_added=False) def _create_data_item(self, data): ndims = len(data.shape) wcsaxes = data.meta.get('WCSAXES', None) wcs_only = data.meta.get(_wcs_only_label, False) if wcsaxes is None: # then we'll need to determine type another way, we want to avoid # this when we can though since its not as cheap component_ids = [str(c) for c in data.component_ids()] if wcs_only: typ = 'wcs-only' elif data.label == 'MOS Table': typ = 'table' elif 'Trace' in data.meta: typ = 'trace' elif ndims == 1: typ = '1d spectrum' elif ndims == 2 and wcsaxes is not None: if wcsaxes == 3: typ = '2d spectrum' elif wcsaxes == 2: typ = 'image' else: typ = 'unknown' elif ndims == 2 and wcsaxes is None: typ = '2d spectrum' if 'Wavelength' in component_ids else 'image' elif ndims == 3: typ = 'cube' else: typ = 'unknown' # we'll expose any information we need here. For "meta", not all entries are guaranteed # to be serializable, so we'll just send those that we need. def _expose_meta(key): """ Whether to expose this metadata entry from the glue data object to the vue-frontend via the data-item, based on the dictionary key. """ if key in ('plugin', 'mosviz_row'): # mosviz_row is used to hide entries from the spectrum1d/2d viewers if they # do not correspond to the currently selected row return True if key.lower().startswith(f'_{self.config}'): # other internal metadata (like lcviz's '_LCVIZ_EPHEMERIS') # _LCVIZ_EPHEMERIS is used in lcviz to only display data phased to a specific # ephemeris in the appropriate viewer return True return False return { 'id': str(uuid.uuid4()), 'name': data.label, 'locked': False, 'ndims': data.ndim, 'type': typ, 'has_wcs': data_has_valid_wcs(data), 'is_astrowidgets_markers_table': (self.config == "imviz") and layer_is_table_data(data), 'meta': {k: v for k, v in data.meta.items() if _expose_meta(k)}, 'children': [], 'parent': None, } @staticmethod def _create_stack_item(container='gl-stack', children=None, viewers=None): """ Convenience method for generating stack item dictionaries. Parameters ---------- container : str The GoldenLayout container type used to encapsulate the children items within this stack item. children : list List of children stack item dictionaries used for recursively including layout items within each GoldenLayout component cell. viewers : list List of viewer item dictionaries containing the information used to render the viewer and associated tool widgets. Returns ------- dict Dictionary containing information for this stack item. """ children = [] if children is None else children viewers = [] if viewers is None else viewers return { 'id': str(uuid.uuid4()), 'container': container, 'children': children, 'viewers': viewers} def _next_viewer_num(self, prefix): all_vids = self.get_viewer_ids(prefix=prefix) if len(all_vids) == 0: return 0 # Assume name-num format last_vid = all_vids[-1] last_num = int(last_vid.split('-')[-1]) return last_num + 1 def _create_viewer_item(self, viewer, vid=None, name=None, reference=None, open_data_menu_if_empty=True): """ Convenience method for generating viewer item dictionaries. Parameters ---------- viewer : `~glue_jupyter.bqplot.common.BqplotBaseView` The ``Bqplot`` viewer instance. vid : str or `None`, optional The ID of the viewer. name : str or `None`, optional The name shown in the GoldenLayout tab for this viewer. If `None`, it is the same as viewer ID. reference : str, optional The reference associated with this viewer as defined in the yaml configuration file. open_data_menu_if_empty : bool, optional Whether the data menu should be opened when creating the viewer if the viewer is empty. Pass this as False if immediately populating the viewer. Returns ------- dict Dictionary containing information for this viewer item. """ if vid is None: pfx = self.state.settings.get('configuration', str(name)) n = self._next_viewer_num(pfx) vid = f"{pfx}-{n}" # There is a viewer.LABEL inherited from glue-jupyter but there was # objection in using it here because it is not hidden, so we use our # own attribute instead. viewer._reference_id = vid # For reverse look-up if vid not in self.state.viewer_icons: self.state.viewer_icons = {**self.state.viewer_icons, vid: len(self.state.viewer_icons) + 1} # for some reason that state callback is not triggering this self.hub.broadcast(IconsUpdatedMessage('viewer', self.state.viewer_icons, sender=self) ) reference_data = getattr(viewer.state, 'reference_data', None) reference_data_label = getattr(reference_data, 'label', None) linked_by_wcs = getattr(viewer.state, 'linked_by_wcs', False) from jdaviz.configs.default.plugins.viewers import JdavizViewerWindow viewer_container = JdavizViewerWindow(viewer, app=self, reference=reference, name=name) return { 'id': vid, 'reference': reference or name or vid, 'name': name or vid, 'widget': "IPY_MODEL_" + viewer_container.model_id, 'api_methods': viewer._data_menu.api_methods if hasattr(viewer, '_data_menu') else [], 'reference_data_label': reference_data_label, 'canvas_angle': 0, # canvas rotation clockwise rotation angle in deg 'canvas_flip_horizontal': False, # canvas rotation horizontal flip 'collapse': True, 'linked_by_wcs': linked_by_wcs, } def _on_new_viewer(self, msg, vid=None, name=None, add_layers_to_viewer=False, open_data_menu_if_empty=True): """ Callback for when the `~jdaviz.core.events.NewViewerMessage` message is raised. This method asks the application handler to generate a new viewer and then created the associated stack and viewer items. Parameters ---------- msg : `~jdaviz.core.events.NewViewerMessage` The message received from the ``Hub`` broadcast. vid : str or `None` ID of the viewer. If `None`, it is auto-generated from configuration settings. name : str or `None` Name of the viewer. If `None`, it is auto-generated from class name. open_data_menu_if_empty : bool, optional Whether the data menu should be opened when creating the viewer if the viewer is empty. Pass this as False if immediately populating the viewer. Returns ------- viewer : `~glue_jupyter.bqplot.common.BqplotBaseView` The new viewer instance. """ viewer = self._application_handler.new_data_viewer( msg.cls, data=msg.data, show=False) viewer.figure_widget.layout.height = '100%' linked_by_wcs = self._align_by == 'wcs' if hasattr(viewer.state, 'linked_by_wcs'): orientation_plugin = self._jdaviz_helper.plugins.get('Orientation', None) if orientation_plugin is not None: linked_by_wcs = orientation_plugin.align_by.selected == 'WCS' elif len(self._viewer_store) and hasattr(self._jdaviz_helper, 'default_viewer'): # The plugin would only not exist for instances of Imviz where the user has # intentionally removed the Orientation plugin, but in that case we will # adopt "linked_by_wcs" from the first (assuming all are the same) # NOTE: deleting the default viewer is forbidden both by API and UI, but if # for some reason that was the case here, linked_by_wcs will default to False linked_by_wcs = self._jdaviz_helper.default_viewer._obj.glue_viewer.state.linked_by_wcs # noqa else: linked_by_wcs = False viewer.state.linked_by_wcs = linked_by_wcs if msg.x_attr is not None: x = msg.data.id[msg.x_attr] viewer.state.x_att = x # Create the viewer item dictionary if name is None: name = vid new_viewer_item = self._create_viewer_item( viewer=viewer, vid=vid, name=name, reference=name, open_data_menu_if_empty=open_data_menu_if_empty ) if self.config == 'imviz': # NOTE: if ever extending image rotation beyond imviz or adding non-image viewers # to imviz: this currently assumes that the helper has a default_viewer and that is an # image viewer ref_data = self._jdaviz_helper.default_viewer._obj.glue_viewer.state.reference_data new_viewer_item['reference_data_label'] = getattr(ref_data, 'label', None) if hasattr(viewer, 'reference'): viewer.state.reference_data = ref_data new_stack_item = self._create_stack_item( container='gl-stack', viewers=[new_viewer_item]) self.state.viewer_items.append(new_viewer_item) # Store the glupyter viewer object so we can access the add and remove # data methods in the future vid = new_viewer_item['id'] self._viewer_store[vid] = viewer # Add viewer locally if (self.config in CONFIGS_WITH_LOADERS and len(self.state.stack_items)): # add to bottom (eventually will want more control in placement) self.state.stack_items[0]['children'].append(new_stack_item) else: self.state.stack_items.append(new_stack_item) self.session.application.viewers.append(viewer) if add_layers_to_viewer: for layer_label in add_layers_to_viewer: if hasattr(viewer, 'reference'): self.add_data_to_viewer(viewer.reference, layer_label) # Send out a toast message self.hub.broadcast(ViewerAddedMessage(vid, sender=self)) return viewer
[docs] def load_configuration(self, path=None, config=None): """ Parses the provided input into a configuration dictionary and populates the appropriate state values with the results. Provided input can either be a configuration YAML file or a pre-made configuration dictionary. Parameters ---------- path : str, optional Path to the configuration file to be loaded. In the case where this is ``None``, it loads the default configuration. Optionally, this can be provided as name reference. **NOTE** This optional way to define the configuration will be removed in future versions. config : dict, optional A dictionary of configuration settings to be loaded. The dictionary contents should be the same as a YAML config file specification. """ # reset the application state self._reset_state() # load the configuration from the yaml file or configuration object assert not (path and config), 'Cannot specify both a path and a config object!' if config: assert isinstance(config, dict), 'configuration object must be a dictionary' else: # check if the input path is actually a dict object if isinstance(path, dict): config = path else: config = read_configuration(path=path) # store the loaded config object self._loaded_configuration = config # give the vue templates access to the current config/layout self.config = config['settings'].get('configuration', 'unknown').lower() self.vdocs = 'latest' if 'dev' in __version__ else 'v'+__version__ if self.config in ALL_JDAVIZ_CONFIGS: self.docs_link = f'https://jdaviz.readthedocs.io/en/{self.vdocs}/{self.config}/index.html' # noqa else: self.docs_link = f'https://jdaviz.readthedocs.io/en/{self.vdocs}' self.state.settings.update(config.get('settings')) def compose_viewer_area(viewer_area_items): stack_items = [] for item in viewer_area_items: stack_item = self._create_stack_item( container=CONTAINER_TYPES[item.get('container')]) stack_items.append(stack_item) for view in item.get('viewers', []): viewer = self._application_handler.new_data_viewer( viewer_registry.members.get(view['plot'])['cls'], data=None, show=False) viewer.figure_widget.layout.height = '100%' viewer_item = self._create_viewer_item( name=view.get('name'), viewer=viewer, reference=view.get('reference')) self._viewer_store[viewer_item['id']] = viewer stack_item.get('viewers').append(viewer_item) if len(item.get('children', [])) > 0: child_stack_items = compose_viewer_area( item.get('children')) stack_item['children'] = child_stack_items return stack_items if config.get('viewer_area') is not None: stack_items = compose_viewer_area(config.get('viewer_area')) self.state.stack_items.extend(stack_items) # Add the toolbar item filter to the toolbar component for name in config.get('toolbar', []): tool = tool_registry.members.get(name)(app=self) self.state.tool_items.append({ 'name': name, 'widget': "IPY_MODEL_" + tool.model_id }) self._application_handler._tools[name] = tool # Loaders def open(): self.state.drawer_content = 'loaders' # TODO: rename to "add"? self.state.add_subtab = 0 def close(): self.state.loader_selected = '' def set_active_loader(resolver): self.state.loader_selected = resolver # registry will be populated at import if self.config in CONFIGS_WITH_LOADERS: import jdaviz.core.loaders # noqa for name, Resolver in loader_resolver_registry.members.items(): loader = Resolver(app=self, open_callback=open, close_callback=close, set_active_loader_callback=set_active_loader) self.state.loader_items.append({ 'name': name, 'label': name, 'requires_api_support': loader.requires_api_support, 'widget': "IPY_MODEL_" + loader.model_id, 'api_methods': loader.api_methods, }) # initialize selection (tab) to first entry if len(self.state.loader_items): self.state.loader_selected = self.state.loader_items[0]['name'] # Tray plugins if self.config == 'deconfigged': self.update_tray_items_from_registry() import jdaviz.core.viewer_creators # noqa self.update_new_viewers_from_registry() else: for name in config.get('tray', []): tray_registry_member = tray_registry.members.get(name) self.state.tray_items.append(self._create_tray_item(tray_registry_member))
[docs] def update_loaders_from_registry(self): if self.config != 'deconfigged': raise NotImplementedError("update_loaders_from_registry is only " "implemented for the deconfigged app") for loader in self._jdaviz_helper.loaders.values(): loader.format._update_items()
[docs] def update_tray_items_from_registry(self): if self.config != 'deconfigged': raise NotImplementedError("update_tray_items_from_registry is only " "implemented for the deconfigged app") # need to rebuild in order, just pulling from existing dict if its already there tray_items = [] for category in ['app:options', 'data:reduction', 'data:manipulation', 'data:analysis', 'core']: for tray_registry_member in tray_registry.members_in_category(category): if not tray_registry_member.get('overwrite', False): try: tray_item = self.get_tray_item_from_name( tray_registry_member.get('name'), return_widget=False) except KeyError: create_new = True else: create_new = False else: create_new = True if create_new: try: tray_item = self._create_tray_item(tray_registry_member) except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to load plugin {tray_registry_member.get('name')}: {e}", sender=self, color='error', traceback=e)) tray_items.append(tray_item) self.state.tray_items = tray_items
def _create_tray_item(self, tray_registry_member): tray_item_instance = tray_registry_member.get('cls')(app=self, tray_instance=True) # store a copy of the tray name in the instance so it can be accessed by the # plugin itself tray_item_label = tray_registry_member.get('label') tray_item_description = tray_item_instance.plugin_description # NOTE: is_relevant is later updated by observing irrelvant_msg traitlet tray_item = { 'name': tray_registry_member.get('name'), 'label': tray_item_label, 'sidebar': tray_item_instance._sidebar, 'subtab': tray_item_instance._subtab, 'tray_item_description': tray_item_description, 'api_methods': tray_item_instance.api_methods, 'is_relevant': len(tray_item_instance.irrelevant_msg) == 0, 'widget': "IPY_MODEL_" + tray_item_instance.model_id } return tray_item
[docs] def update_new_viewers_from_registry(self): # TODO: implement jdaviz.new_viewers dictionary to instantiated items here if self.config != 'deconfigged': raise NotImplementedError("update_new_viewers_from_registry is only " "implemented for the deconfigged app") # need to rebuild in order, just pulling from existing dict if its already there new_viewer_items = [] for name, vc_registry_member in viewer_creator_registry.members.items(): try: item = self.get_new_viewer_item_from_name( name, return_widget=False) except KeyError: try: item = self._create_new_viewer_item(vc_registry_member) except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to load viewer {name}: {e}", sender=self, color='error', traceback=e)) continue new_viewer_items.append(item) self.state.new_viewer_items = new_viewer_items relevant_items = [nvi for nvi in new_viewer_items if nvi['is_relevant']] if not len(self.state.new_viewer_selected) and len(relevant_items): self.state.new_viewer_selected = relevant_items[0]['label']
def _create_new_viewer_item(self, vc_registry_member): def open(): self.state.drawer_content = 'loaders' # TODO: rename to "add"? self.state.add_subtab = 1 def close(): self.state.new_viewer_selected = '' def set_active_viewer_creator(new_viewer_label): self.state.new_viewer_selected = new_viewer_label vc_instance = vc_registry_member(app=self, open_callback=open, close_callback=close, set_active_callback=set_active_viewer_creator) new_viewer_item = { 'label': vc_registry_member._registry_label, 'widget': "IPY_MODEL_" + vc_instance.model_id, 'api_methods': vc_instance.api_methods, 'is_relevant': vc_instance.is_relevant, } return new_viewer_item
[docs] def get_new_viewer_item_from_name(self, name, return_widget=True): return self._get_state_item_from_name(self.state.new_viewer_items, name, return_widget)
def _reset_state(self): """ Resets the application state """ self.state = ApplicationState() self._application_handler._tools = {}
[docs] def get_configuration(self, path=None, section=None): """Returns a copy of the application configuration. Returns a copy of the configuration specification. If path is not specified, returns the currently loaded configuration. Parameters ---------- path : str, optional path to the configuration file to be retrieved. section : str, optional A section of the configuration to retrieve. Returns ------- cfg : dict A configuration specification dictionary. """ config = None if not path: config = self._loaded_configuration cfg = get_configuration(path=path, section=section, config=config) return cfg
def _get_state_item_from_name(self, state_list, name, return_widget=True): from ipywidgets.widgets import widget_serialization ret_item = None for item in state_list: if item.get('name') == name or item.get('label') == name: ipy_model_id = item['widget'] if return_widget: ret_item = widget_serialization['from_json'](ipy_model_id, None) else: ret_item = item break if ret_item is None: raise KeyError(f'{name} not found') return ret_item
[docs] def get_tray_item_from_name(self, name, return_widget=True): """Return the instance of a tray item for a given name. This is useful for direct programmatic access to Jdaviz plugins registered under tray items. Parameters ---------- name : str The name used when the plugin was registered to an internal `~jdaviz.core.registries.TrayRegistry`. Returns ------- tray_item : obj The instance of the plugin registered to tray items. Raises ------ KeyError Name not found. """ return self._get_state_item_from_name(self.state.tray_items, name, return_widget)
def _init_data_associations(self): # assume all Data are parents: data_associations = { data.label: {'parent': None, 'children': []} for data in self.data_collection } return data_associations def _add_assoc_data_as_parent(self, data_label): self._data_associations[data_label] = {'parent': None, 'children': []} def _set_assoc_data_as_child(self, data_label, new_parent_label): for data_item in self.state.data_items: if data_item['name'] == data_label: child_id = data_item['id'] elif data_item['name'] == new_parent_label: new_parent_id = data_item['id'] # Data has a new parent: self._data_associations[data_label]['parent'] = new_parent_label # parent has a new child: self._data_associations[new_parent_label]['children'].append(data_label) # update the data item so vue can see the change: for data_item in self.state.data_items: if data_item['name'] == data_label: data_item['parent'] = new_parent_id elif data_item['name'] == new_parent_label: data_item['children'].append(child_id) def _get_assoc_data_children(self, data_label): # intentionally not recursive for now, just one generation: return self._data_associations.get(data_label, {}).get('children', []) def _get_assoc_data_parent(self, data_label): return self._data_associations.get(data_label, {}).get('parent')