Source code for jdaviz.configs.imviz.plugins.aper_phot_simple.aper_phot_simple

import os
import warnings
from datetime import datetime, timezone

import numpy as np
from astropy import units as u
from astropy.modeling.fitting import TRFLSQFitter
from astropy.modeling import Parameter
from astropy.modeling.models import Gaussian1D
from astropy.time import Time
from glue.core.message import SubsetUpdateMessage
from ipywidgets import widget_serialization
from photutils.aperture import (ApertureStats, CircularAperture, EllipticalAperture,
                                RectangularAperture)
from photutils.profiles import CurveOfGrowth, RadialProfile
from traitlets import Any, Bool, Integer, List, Unicode, observe

from jdaviz.core.custom_traitlets import FloatHandleEmpty
from jdaviz.core.custom_units_and_equivs import PIX2
from jdaviz.core.events import (GlobalDisplayUnitChanged, SnackbarMessage,
                                LinkUpdatedMessage, SliceValueUpdatedMessage)
from jdaviz.core.region_translators import regions2aperture
from jdaviz.core.registries import tray_registry
from jdaviz.core.template_mixin import (PluginTemplateMixin, DatasetMultiSelectMixin,
                                        SubsetSelect, ApertureSubsetSelectMixin,
                                        TableMixin, PlotMixin, MultiselectMixin, with_spinner)
from jdaviz.core.unit_conversion_utils import (all_flux_unit_conversion_equivs,
                                               check_if_unit_is_per_solid_angle,
                                               flux_conversion_general,
                                               handle_squared_flux_unit_conversions)
from jdaviz.core.user_api import PluginUserApi
from jdaviz.utils import PRIHDR_KEY

__all__ = ['SimpleAperturePhotometry']


[docs] @tray_registry('imviz-aper-phot-simple', label="Aperture Photometry", category="data:analysis") class SimpleAperturePhotometry(PluginTemplateMixin, ApertureSubsetSelectMixin, DatasetMultiSelectMixin, TableMixin, PlotMixin, MultiselectMixin): """ The Aperture Photometry plugin performs aperture photometry for drawn regions. See the :ref:`Aperture Photometry Plugin Documentation <aper-phot-simple>` for more details. Only the following attributes and methods are available through the :ref:`public plugin API <plugin-apis>`: * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.close_in_tray` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.open_in_tray` * :meth:`~jdaviz.core.template_mixin.PluginTemplateMixin.show` * :meth:`~jdaviz.core.template_mixin.TableMixin.clear_table` * :meth:`~jdaviz.core.template_mixin.TableMixin.export_table` * :meth:`calculate_batch_photometry` * :meth:`calculate_photometry` * ``fitted_models`` Dictionary of fitted models. * ``fit_radial_profile`` Whether to fit a radial profile to the data. * :meth:`unpack_batch_options` * ``aperture`` (:class:`~jdaviz.core.template_mixin.SubsetSelect`): * ``background`` (:class:`~jdaviz.core.template_mixin.SubsetSelect`): * ``background_value`` Fixed value to use as background level. * ``counts_factor`` Factor to convert data unit to counts, in unit of flux/counts. * ``current_plot_type`` Choice of Curve of Growth, Radial Profile, or Radial Profile (Raw). Only applicable when ``multiselect=False``. * ``dataset`` (:class:`~jdaviz.core.template_mixin.DatasetSelect`): * ``flux_scaling`` Flux scaling factor for calculation of magnitudes in output table. * ``multiselect`` Enable multiselect mode to select multiple datasets for aperture photometry. * ``pixel_area`` In arcsec squared, only used if data is in units of surface brightness. * ``plot`` (:class:`~jdaviz.core.template_mixin.Plot`): Plot, based on selection of ``current_plot_type``. Only applicable when ``multiselect=False`` * ``table`` (:class:`~jdaviz.core.template_mixin.Table`): Table with photometry results. * ``cube_slice`` Current slice wavelength being used for aperture photometry (cubes only, read-only). """ template_file = __file__, "aper_phot_simple.vue" uses_active_status = Bool(True).tag(sync=True) aperture_area = Integer().tag(sync=True) background_items = List().tag(sync=True) background_selected = Unicode("").tag(sync=True) background_value = FloatHandleEmpty(0).tag(sync=True) pixel_area_multi_auto = Bool(True).tag(sync=True) pixel_area = FloatHandleEmpty(0).tag(sync=True) counts_factor = FloatHandleEmpty(0).tag(sync=True) flux_scaling_multi_auto = Bool(True).tag(sync=True) flux_scaling_warning = Unicode("").tag(sync=True) flux_scaling = FloatHandleEmpty(0).tag(sync=True) result_available = Bool(False).tag(sync=True) result_failed_msg = Unicode("").tag(sync=True) results = List().tag(sync=True) plot_types = List([]).tag(sync=True) current_plot_type = Unicode().tag(sync=True) plot_available = Bool(False).tag(sync=True) radial_plot = Any('').tag(sync=True, **widget_serialization) fit_radial_profile = Bool(False).tag(sync=True) fit_results = List().tag(sync=True) # Cubes only cube_slice = Unicode("").tag(sync=True) is_cube = Bool(False).tag(sync=True) # surface brightness display unit display_unit = Unicode("").tag(sync=True) # angle component of `display_unit`, to avoid repetition of seperating # it out from `display_unit` display_solid_angle_unit = Unicode("").tag(sync=True) # flux scaling display unit will always be flux, not sb. again its own # traitlet to avoid avoid repetition of seperating it out from `display_unit` flux_scaling_display_unit = Unicode("").tag(sync=True) def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # description displayed under plugin title in tray self._plugin_description = 'Perform aperture photometry for drawn regions.' self.dataset.add_filter('is_image_or_flux_cube') self.background = SubsetSelect(self, 'background_items', 'background_selected', dataset='dataset', default_text='Manual', manual_options=['Manual'], filters=['is_spatial', 'is_not_composite']) headers = ['xcenter', 'ycenter', 'sky_center', 'sum', 'sum_aper_area', 'aperture_sum_counts', 'aperture_sum_counts_err', 'aperture_sum_mag', 'min', 'max', 'mean', 'median', 'mode', 'std', 'mad_std', 'var', 'biweight_location', 'biweight_midvariance', 'fwhm', 'semimajor_sigma', 'semiminor_sigma', 'orientation', 'eccentricity', 'data_label', 'subset_label'] self.table.headers_avail = headers self.table.headers_visible = headers self.plot_types = ["Curve of Growth", "Radial Profile", "Radial Profile (Raw)"] self.current_plot_type = self.plot_types[0] self._fitted_models = {} self._fitted_model_name = 'phot_radial_profile' # override default plot styling self.plot.figure.fig_margin = {'top': 60, 'bottom': 60, 'left': 65, 'right': 15} self.plot.viewer.axis_y.tick_format = '0.2e' self.plot.viewer.axis_y.label_offset = '55px' self.session.hub.subscribe(self, SubsetUpdateMessage, handler=self._on_subset_update) self.session.hub.subscribe(self, LinkUpdatedMessage, handler=self._on_link_update) # Custom dataset filters for Cubes def valid_cube_datasets(data): if self.config == 'deconfigged' and data.ndim < 3: return True comp = data.get_component(data.main_components[0]) img_unit = u.Unit(comp.units) if comp.units else u.dimensionless_unscaled solid_angle_unit = check_if_unit_is_per_solid_angle(img_unit, return_unit=True) if solid_angle_unit is None: # this is encountered sometimes ?? return # multiply out solid angle so we can check physical type of numerator img_unit *= solid_angle_unit acceptable_types = ['spectral flux density wav', 'photon flux density wav', 'spectral flux density', 'photon flux density'] return ((data.ndim in (2, 3)) and (img_unit.physical_type in acceptable_types)) if self.config != 'imviz': self.dataset.add_filter(valid_cube_datasets) self.session.hub.subscribe(self, SliceValueUpdatedMessage, handler=self._on_slice_changed) self.hub.subscribe(self, GlobalDisplayUnitChanged, handler=self._on_display_units_changed) if self.config == "deconfigged": self.observe_traitlets_for_relevancy(traitlets_to_observe=['dataset_items']) @property def user_api(self): expose = ('multiselect', 'dataset', 'aperture', 'background', 'background_value', 'pixel_area', 'counts_factor', 'flux_scaling', 'calculate_photometry', 'unpack_batch_options', 'calculate_batch_photometry', 'table', 'clear_table', 'export_table', 'fitted_models', 'current_plot_type', 'fit_radial_profile', 'plot') if self.config == 'Imviz': return PluginUserApi(self, expose=expose) else: return PluginUserApi(self, expose=expose, readonly=('cube_slice',)) @property def fitted_models(self): return self._fitted_models @property def _has_display_unit_support(self): """ Currently, unit conversion in this plugin is only supported for cubes. When this is expanded to images in deconfigged, this logic will need to change. """ return self.config != 'imviz' and self.display_unit != '' and self.is_cube def _on_slice_changed(self, msg): self.cube_slice = f"{msg.value:.3e} {msg.value_unit}" self._cube_wave = u.Quantity(msg.value, msg.value_unit) @observe("dataset_selected") def _on_dataset_selected_changed(self, event={}): # self.dataset might not exist when app is setting itself up. if not hasattr(self, "dataset"): return if self.dataset.selected_dc_item is None: return if isinstance(self.dataset.selected_dc_item, list): datasets = self.dataset.selected_dc_item else: datasets = [self.dataset.selected_dc_item] self.is_cube = False for dataset in datasets: # This assumes all cubes, or no cubes. # 'Is cube' here means is it a cube, or a collapsed cube. if dataset.ndim > 2 or dataset.meta.get('plugin', None) == 'Collapse': self.is_cube = True break if self.multiselect: # defaults are applied within the loop if the auto-switches are enabled, # but we still need to update the flux-scaling warning self._multiselect_flux_scaling_warning() return try: defaults = self._get_defaults_from_metadata() self.counts_factor = 0 self.pixel_area = defaults.get('pixel_area', 0) self.flux_scaling = defaults.get('flux_scaling', 0) if 'flux_scaling' in defaults: self.flux_scaling_warning = '' else: self.flux_scaling_warning = ('Could not determine flux scaling for ' f'{self.dataset.selected}, defaulting to zero.') except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to extract {self.dataset_selected}: {repr(e)}", color='error', sender=self, traceback=e)) # get correct display unit for newly selected dataset if self._has_display_unit_support: # sets display_unit and flux_scaling_display_unit traitlets self._set_display_unit_of_selected_dataset() # auto-populate background, if applicable. self._aperture_selected_changed() def _on_display_units_changed(self, event={}): """ Handle change of display units from Unit Conversion plugin. If new display units differ from input data units, input parameters for ap. phot. (i.e background, flux scaling) are converted to the new units. Photometry will remain in previous unit until 'calculate' is pressed again. """ # get previously selected display units prev_display_unit = self.display_unit prev_flux_scale_unit = self.flux_scaling_display_unit # update display unit traitlets to new selection self._set_display_unit_of_selected_dataset() # convert the previous background and flux scaling values to new unit so # re-calculating photometry with the current selections will produce # the previous output with the new unit. if prev_display_unit != '': # convert background to new unit if self.background_value is not None: # FIXME: Kinda hacky, better solution welcomed. # Basically we want to forget about PIX2 and do normal conversion. # Since traitlet does not keep unit, we do not need to reapply PIX2. if "pix2" in prev_display_unit and "pix2" in self.display_unit: prev_unit = u.Unit(prev_display_unit) * PIX2 new_unit = u.Unit(self.display_unit) * PIX2 else: prev_unit = u.Unit(prev_display_unit) new_unit = u.Unit(self.display_unit) if self.multiselect: if len(self.dataset.selected) == 1: data = self.dataset.selected_dc_item[0] else: raise ValueError("cannot calculate background median in multiselect") else: data = self.dataset.selected_dc_item if prev_unit != new_unit: # NOTE: I don't think all of these equivalencies are necessary, # but I'm keeping them since they were already here. Background # should only be converted flux<>flux or sb<>sb so only a possible # u.spectral_density would be needed. explore removing these as a follow up pixar_sr = data.meta.get('_pixel_scale_factor', 1.0) if data else 1.0 equivs = all_flux_unit_conversion_equivs(pixar_sr, self._cube_wave) + u.spectral() self.background_value = flux_conversion_general(self.background_value, prev_unit, new_unit, equivs, with_unit=False) # convert flux scaling to new unit if self.flux_scaling is not None: prev_unit = u.Unit(prev_flux_scale_unit) new_unit = u.Unit(self.flux_scaling_display_unit) if prev_unit != new_unit: equivs = u.spectral_density(self._cube_wave) self.flux_scaling = flux_conversion_general(self.flux_scaling, prev_unit, new_unit, equivs, with_unit=False) def _set_display_unit_of_selected_dataset(self): """ Set the display_unit and flux_scaling_display_unit traitlets, which depend on if the selected data set is flux or surface brightness, and the corresponding global display unit for either flux or surface brightness. """ # all cubes are in sb so we can get display unit for plugin from SB display unit # this can be changed to listen specifically to changes in surface brightness # from UC plugin GlobalDisplayUnitChange message, but will require some refactoring disp_unit = self.app._get_display_unit('sb') # this check needs to be here because 'get_display_unit' will sometimes # return non surface brightness units or even None when the app is starting # up. this can be removed once that is fixed (see PR #3144) if disp_unit is None or not check_if_unit_is_per_solid_angle(disp_unit): self.display_unit = '' self.flux_scaling_display_unit = '' return self.display_unit = disp_unit # get angle component of surface brightness # note: could add 'axis=angle' when cleaning this code up to avoid repeating this display_solid_angle_unit = check_if_unit_is_per_solid_angle(disp_unit, return_unit=True) if display_solid_angle_unit is not None: self.display_solid_angle_unit = display_solid_angle_unit.to_string() else: # there should always be a solid angle, but I think this is # encountered sometimes when initializing something... self.display_solid_angle_unit = '' # flux scaling will be applied when the solid angle component is # multiplied out, so use 'flux' display unit fs_unit = self.app._get_display_unit('flux') self.flux_scaling_display_unit = fs_unit # if cube loaded is per-pixel-squared sb (i.e flux cube loaded) # pixel_area should be fixed to 1 if self.display_solid_angle_unit == 'pix2': self.pixel_area = 1.0 def _get_defaults_from_metadata(self, dataset=None): defaults = {} if dataset is None: meta = self.dataset.selected_dc_item.meta.copy() else: meta = self.dataset._get_dc_item(dataset).meta.copy() # Extract telescope specific unit conversion factors, if applicable. if PRIHDR_KEY in meta: meta.update(meta[PRIHDR_KEY]) del meta[PRIHDR_KEY] if 'telescope' in meta: telescope = meta['telescope'] else: telescope = meta.get('TELESCOP', '') if telescope == 'JWST': # Hardcode the flux conversion factor from MJy to ABmag mjy2abmag = 0.003631 # if display unit is different, translate if self._has_display_unit_support: disp_unit = u.Unit(self.display_unit) mjy2abmag = flux_conversion_general(mjy2abmag, u.MJy / u.sr, disp_unit, u.spectral_density(self._cube_wave), with_unit=False) if 'photometry' in meta and 'pixelarea_arcsecsq' in meta['photometry']: defaults['pixel_area'] = meta['photometry']['pixelarea_arcsecsq'] if 'bunit_data' in meta and meta['bunit_data'] == u.Unit("MJy/sr"): defaults['flux_scaling'] = mjy2abmag elif 'PIXAR_A2' in meta: defaults['pixel_area'] = meta['PIXAR_A2'] if 'BUNIT' in meta and meta['BUNIT'] == u.Unit("MJy/sr"): defaults['flux_scaling'] = mjy2abmag elif telescope == 'HST': # TODO: Add more HST support, as needed. # HST pixel scales are from instrument handbooks. # This is really not used because HST data does not have sr in unit. # This is only for completeness. # For counts conversion, PHOTFLAM is used to convert "counts" to flux manually, # which is the opposite of JWST, so we just do not do it here. instrument = meta.get('INSTRUME', '').lower() detector = meta.get('DETECTOR', '').lower() if instrument == 'acs': if detector == 'wfc': defaults['pixel_area'] = 0.05 * 0.05 elif detector == 'hrc': # pragma: no cover defaults['pixel_area'] = 0.028 * 0.025 elif detector == 'sbc': # pragma: no cover defaults['pixel_area'] = 0.034 * 0.03 elif instrument == 'wfc3' and detector == 'uvis': # pragma: no cover defaults['pixel_area'] = 0.04 * 0.04 return defaults @observe('flux_scaling_multi_auto') def _multiselect_flux_scaling_warning(self, event={}): if not self.flux_scaling_multi_auto: self.flux_scaling_warning = '' return no_flux_scaling = [dataset for dataset in self.dataset.selected if 'flux_scaling' not in self._get_defaults_from_metadata(dataset)] if len(no_flux_scaling): self.flux_scaling_warning = ('Could not determine flux scaling for ' f'{", ".join(no_flux_scaling)}. Those entries will ' 'default to zero. Turn off auto-mode to provide ' 'flux-scaling manually.') else: self.flux_scaling_warning = '' @observe('flux_scaling') def _singleselect_flux_scaling_warning(self, event={}): if not self.multiselect: # disable warning once user changes value self.flux_scaling_warning = '' def _on_subset_update(self, msg): if not self.dataset_selected or not self.aperture_selected: return if self.multiselect: self._background_selected_changed() return sbst = msg.subset if sbst.label == self.aperture_selected and sbst.data.label == self.dataset_selected: self._aperture_selected_changed() elif sbst.label == self.background_selected and sbst.data.label == self.dataset_selected: self._background_selected_changed() def _on_link_update(self, msg): if not self.dataset_selected or not self.aperture_selected: return # Force background auto-calculation to update when linking has changed. self._aperture_selected_changed() @observe('aperture_selected', 'multiselect') def _aperture_selected_changed(self, event={}): if not self.dataset_selected or not self.aperture_selected: return if self.multiselect is not isinstance(self.aperture_selected, list): # then multiselect is in the process of changing but the traitlet for aperture_selected # has not been updated internally yet return if self.multiselect: self._background_selected_changed() return if self._has_display_unit_support: self._set_display_unit_of_selected_dataset() # NOTE: aperture_selected can be triggered here before aperture_selected_validity is updated # so we'll still allow the snackbar to be raised as a second warning to the user and to # avoid acting on outdated information # NOTE: aperture area is only used to determine if a warning should be shown in the UI # and so does not need to be calculated within user API calls that don't act on traitlets try: # Sky subset does not have area. Not worth it to calculate just for a warning. if hasattr(self.aperture.selected_spatial_region, 'area'): self.aperture_area = int(np.ceil(self.aperture.selected_spatial_region.area)) else: self.aperture_area = 0 except Exception as e: self.hub.broadcast(SnackbarMessage( f"Failed to extract {self.aperture_selected}: {repr(e)}", color='error', sender=self, traceback=e)) else: self._background_selected_changed() @property def _cube_slice_ind(self): # TODO: performance improvements, change to listen to slice change event slice_plugin = self.app._jdaviz_helper.plugins.get('Spectral Slice', None) if slice_plugin is None: return None selected_obj = self.dataset.selected_obj if isinstance(selected_obj, list): # This should always be length 1 here thanks to checks elsewhere # but just in case... if len(selected_obj) != 1: raise IndexError("Cannot determine cube slice index with multiple datasets.") spectral_axis = selected_obj[0].spectral_axis else: spectral_axis = selected_obj.spectral_axis sp_disp_unit = self.app._get_display_unit('spectral') # Use the spectral axis directly, convert to spectral axis display unit, and compare # to the value reported by the slice plugin. return np.argmin(abs(spectral_axis.to_value( sp_disp_unit, equivalencies=u.spectral()) - slice_plugin.value)) def _calc_background_median(self, reg, data=None): # Basically same way image stats are calculated in vue_do_aper_phot() # except here we only care about one stat for the background. if data is None: if self.multiselect: if len(self.dataset.selected) == 1: data = self.dataset.selected_dc_item[0] else: raise ValueError("cannot calculate background median in multiselect") else: data = self.dataset.selected_dc_item comp = data.get_component(data.main_components[0]) if data.ndim > 2: spectral_axis_index = getattr(data, "meta", {}).get("spectral_axis_index", 0) if self._cube_slice_ind is not None: if spectral_axis_index == 0: comp_data = comp.data[self._cube_slice_ind, :, :] else: comp_data = comp.data[:, :, self._cube_slice_ind].T else: # fallback if slice index cannot be determined if spectral_axis_index == 0: comp_data = comp.data[0, :, :] else: comp_data = comp.data[:, :, 0].T # Similar to coords_info logic. if '_orig_spec' in getattr(data, 'meta', {}): w = data.meta['_orig_spec'].wcs.celestial else: w = data.coords.celestial else: # "imviz" comp_data = comp.data # ny, nx w = data.coords if hasattr(reg, 'to_pixel'): reg = reg.to_pixel(w) aper_mask_stat = reg.to_mask(mode='center') img_stat = aper_mask_stat.get_values(comp_data, mask=None) # photutils/background/_utils.py --> nanmedian() bg_md = np.nanmedian(img_stat) # Naturally in data unit # convert background median to display unit, if necessary (cubes only) if self._has_display_unit_support and comp.units: bg_md = flux_conversion_general(bg_md, u.Unit(comp.units), u.Unit(self.display_unit), u.spectral_density(self._cube_wave), with_unit=False) return bg_md @observe('background_selected') def _background_selected_changed(self, event={}): background_selected = event.get('new', self.background_selected) if background_selected == 'Manual': # we'll later access the user's self.background_value directly return if self.multiselect: # background_value will be recomputed within batch mode anyways and will # be replaced in the UI with a message self.background_value = -1 return try: type = 'sky_region' if self.app._align_by == 'wcs' else 'region' reg = self.app.get_subsets(subset_name=self.background_selected, include_sky_region=type == 'sky_region', spatial_only=True)[0][type] self.background_value = self._calc_background_median(reg) except Exception as e: self.background_value = 0 self.hub.broadcast(SnackbarMessage( f"Failed to extract {background_selected}: {repr(e)}", color='error', sender=self, traceback=e))
[docs] @with_spinner() def calculate_photometry(self, dataset=None, aperture=None, background=None, background_value=None, pixel_area=None, counts_factor=None, flux_scaling=None, add_to_table=True, update_plots=True): """ Calculate aperture photometry given the values set in the plugin or any overrides provided as arguments here (which will temporarily override plugin values for this calculation only). Note: Values set in the plugin for cubes are in the selected display unit from the Unit conversion plugin. Overrides are, as the docstrings note, assumed to be in the units of the selected dataset. Parameters ---------- dataset : str, optional Dataset to use for photometry. aperture : str, optional Subset to use as the aperture. background : str, optional Subset to use to calculate the background. background_value : float, optional Background to subtract, same unit as data. Automatically computed if ``background`` is set to a subset. pixel_area : float, optional Pixel area in arcsec squared, only used if data unit is a surface brightness unit. counts_factor : float, optional Factor to convert data unit to counts, in unit of flux/counts. flux_scaling : float, optional Same unit as data, used in -2.5 * log(flux / flux_scaling). add_to_table : bool, optional update_plots : bool, optional Returns ------- table row, fit results """ if self.multiselect and (dataset is None or aperture is None): # pragma: no cover raise ValueError("for batch mode, use calculate_batch_photometry") if dataset is not None: if dataset not in self.dataset.choices: # pragma: no cover raise ValueError(f"dataset must be one of {self.dataset.choices}") data = self.dataset._get_dc_item(dataset) else: # we can use the pre-cached value data = self.dataset.selected_dc_item if data.ndim > 2: if "spectral_axis_index" in getattr(data, "meta", {}): spectral_axis_index = data.meta["spectral_axis_index"] else: spectral_axis_index = 0 if aperture is not None: if aperture not in self.aperture.choices: raise ValueError(f"aperture must be one of {self.aperture.choices}") if aperture is not None or dataset is not None: reg = self.aperture._get_spatial_region(subset=aperture if aperture is not None else self.aperture.selected, # noqa dataset=dataset if dataset is not None else self.dataset.selected) # noqa # determine if a valid aperture (since selected_validity only applies to selected entry) _, _, validity = self.aperture._get_mark_coords_and_validate(selected=aperture) if not validity.get('is_aperture'): raise ValueError(f"Selected aperture {aperture} is not valid: {validity.get('aperture_message')}") # noqa else: # use the pre-cached value if not self.aperture.selected_validity.get('is_aperture'): raise ValueError(f"Selected aperture is not valid: {self.aperture.selected_validity.get('aperture_message')}") # noqa reg = self.aperture.selected_spatial_region # Reset last fitted model fit_model = None # TODO: remove _fitted_model_name cache? if self._fitted_model_name in self._fitted_models: del self._fitted_models[self._fitted_model_name] comp = data.get_component(data.main_components[0]) if comp.units: img_unit = u.Unit(comp.units) else: img_unit = None if self._has_display_unit_support: display_unit = u.Unit(self.display_unit) if background is not None and background not in self.background.choices: # pragma: no cover raise ValueError(f"background must be one of {self.background.choices}") if background_value is not None: if ((background not in (None, 'Manual')) or (background is None and self.background_selected != 'Manual')): raise ValueError("cannot provide background_value with background!='Manual'") elif (background == 'Manual' or (background is None and self.background.selected == 'Manual')): background_value = self.background_value # cube loaders: background_value set in plugin is in display units # convert temporarily to image units for calculations if self._has_display_unit_support and img_unit is not None: background_value = flux_conversion_general(background_value, display_unit, img_unit, u.spectral_density(self._cube_wave), with_unit=False) elif background is None and dataset is None: # use the previously-computed value in the plugin background_value = self.background_value # cubes: background_value set in plugin is in display units # convert temporarily to image units for calculations if self._has_display_unit_support and img_unit is not None: background_value = flux_conversion_general(background_value, display_unit, img_unit, u.spectral_density(self._cube_wave), with_unit=False) else: bg_reg = self.aperture._get_spatial_region(subset=background if background is not None else self.background.selected, # noqa dataset=dataset if dataset is not None else self.dataset.selected) # noqa background_value = self._calc_background_median(bg_reg, data=data) # cubes: computed background median will be in display units, # convert temporarily back to image units for calculations if self._has_display_unit_support and img_unit is not None: background_value = flux_conversion_general(background_value, display_unit, img_unit, u.spectral_density(self._cube_wave), with_unit=False) try: bg = float(background_value) except ValueError: # Clearer error message raise ValueError('Missing or invalid background value') if data.ndim > 2: if spectral_axis_index == 0: comp_data = comp.data[self._cube_slice_ind, :, :] else: comp_data = comp.data[:, :, self._cube_slice_ind].T # Similar to coords_info logic. if '_orig_spec' in getattr(data, 'meta', {}): w = data.meta['_orig_spec'].wcs else: w = data.coords else: # "imviz" comp_data = comp.data # ny, nx w = data.coords if hasattr(reg, 'to_pixel'): sky_center = reg.center if data.ndim > 2: ycenter, xcenter = w.world_to_pixel(self._cube_wave, sky_center)[1] else: # "imviz" xcenter, ycenter = w.world_to_pixel(sky_center) else: xcenter = reg.center.x ycenter = reg.center.y if data.coords is not None: if data.ndim > 2: if spectral_axis_index == 0: sky = w.pixel_to_world(xcenter, ycenter, self._cube_slice_ind) else: sky = w.pixel_to_world(self._cube_slice_ind, ycenter, xcenter) sky_center = [coord for coord in sky if hasattr(coord, "icrs")][0] else: # "imviz" sky_center = w.pixel_to_world(xcenter, ycenter) else: sky_center = None aperture = regions2aperture(reg) include_pixarea_fac = False include_counts_fac = False include_flux_scale = False if comp.units: # work for now in units of currently selected dataset (which may or # may not be the desired output units, depending on the display # units selected in the Unit Conversion plugin. background value # has already been converted to image units above, and flux scaling # will be converted from display unit > img_unit comp_data = comp_data << img_unit bg = bg * img_unit if check_if_unit_is_per_solid_angle(img_unit): # if units are surface brightness try: pixarea = float(pixel_area if pixel_area is not None else self.pixel_area) except ValueError: # Clearer error message raise ValueError('Missing or invalid pixel area') if not np.allclose(pixarea, 0): include_pixarea_fac = True if img_unit != u.count: try: ctfac = float(counts_factor if counts_factor is not None else self.counts_factor) # noqa: E501 except ValueError: # Clearer error message raise ValueError('Missing or invalid counts conversion factor') else: if ctfac < 0: raise ValueError('Counts conversion factor cannot be negative ' f'but got {ctfac}.') if not np.allclose(ctfac, 0): include_counts_fac = True # if cube and flux_scaling is provided as override, it is in the data units # if set in the app, it is in the display units and needs to be converted # if provided as an override keyword arg, it is assumed to be in the # data units and does not need to be converted if (self._has_display_unit_support and (flux_scaling is None) and (self.flux_scaling is not None)): # convert flux_scaling from flux display unit to native flux unit flux_scaling = flux_conversion_general( self.flux_scaling, u.Unit(self.flux_scaling_display_unit), img_unit * u.Unit(self.display_solid_angle_unit), u.spectral_density(self._cube_wave), with_unit=False) try: flux_scale = float(flux_scaling if flux_scaling is not None else self.flux_scaling) except ValueError: # Clearer error message raise ValueError('Missing or invalid flux scaling') if not np.allclose(flux_scale, 0): include_flux_scale = True # from now, we will just need the image unit as a string for display img_unit = img_unit.to_string() else: img_unit = None phot_aperstats = ApertureStats(comp_data, aperture, wcs=data.coords, local_bkg=bg) phot_table = phot_aperstats.to_table(columns=( 'id', 'sum', 'sum_aper_area', 'min', 'max', 'mean', 'median', 'mode', 'std', 'mad_std', 'var', 'biweight_location', 'biweight_midvariance', 'fwhm', 'semimajor_sigma', 'semiminor_sigma', 'orientation', 'eccentricity')) # Some cols excluded, add back as needed. # noqa rawsum = phot_table['sum'][0] if include_pixarea_fac: # convert pixarea, which is in arcsec2/pix2 to the display solid angle unit / pix2 if data.ndim == 2: # 2D images # can remove once unit conversion implemented in imviz and # display_solid_angle_unit traitlet is set, for now it will always be the data units display_solid_angle_unit = check_if_unit_is_per_solid_angle(comp.units, return_unit=True) elif self._has_display_unit_support: display_solid_angle_unit = u.Unit(self.display_solid_angle_unit) else: raise NotImplementedError( f"Unsupported config {self.config} for aperture photometry.") # if angle unit is pix2, pixarea should be 1 pixel2 per pixel2 if display_solid_angle_unit == PIX2: pixarea_fac = 1 * PIX2 else: pixarea = pixarea * (u.arcsec * u.arcsec / PIX2) # NOTE: Sum already has npix value encoded, so we simply apply the npix unit here. # don't need to go though flux_conversion_general since these units # arent per-pixel and won't need a workaround. pixarea_fac = PIX2 * pixarea.to(display_solid_angle_unit / PIX2) phot_table['sum'] = [rawsum * pixarea_fac] else: pixarea_fac = None if include_counts_fac: ctfac = ctfac * (rawsum.unit / u.count) sum_ct = rawsum / ctfac sum_ct_err = np.sqrt(sum_ct.value) * sum_ct.unit else: ctfac = None sum_ct = None sum_ct_err = None if include_flux_scale: flux_scale = flux_scale * phot_table['sum'][0].unit sum_mag = -2.5 * np.log10(phot_table['sum'][0] / flux_scale) * u.mag else: flux_scale = None sum_mag = None # Extra info beyond photutils. phot_table.add_columns( [xcenter * u.pix, ycenter * u.pix, sky_center, bg, pixarea_fac, sum_ct, sum_ct_err, ctfac, sum_mag, flux_scale, data.label, reg.meta.get('label', ''), Time(datetime.now(tz=timezone.utc))], names=['xcenter', 'ycenter', 'sky_center', 'background', 'pixarea_tot', 'aperture_sum_counts', 'aperture_sum_counts_err', 'counts_fac', 'aperture_sum_mag', 'flux_scaling', 'data_label', 'subset_label', 'timestamp'], indexes=[1, 1, 1, 1, 3, 3, 3, 3, 3, 3, 18, 18, 18]) if self._has_display_unit_support: if data.ndim > 2: slice_val = self._cube_wave else: slice_val = u.Quantity(np.nan, self._cube_wave.unit) phot_table.add_column(slice_val, name="slice_wave", index=29) if comp.units: # convert units of output table to reflect display units # selected in Unit Conversion plugin display_unit = u.Unit(self.display_unit) # equivalencies for unit conversion, will never be flux<>sb # so only need spectral_density equivs = u.spectral_density(self._cube_wave) if display_unit != '': if phot_table['background'].unit != display_unit: bg_conv = flux_conversion_general(phot_table['background'].value, phot_table['background'].unit, display_unit, equivs) phot_table['background'] = bg_conv phot_sum = phot_table['sum'] if include_pixarea_fac: if phot_sum.unit != (display_unit * pixarea_fac).unit: phot_table['sum'] = flux_conversion_general(phot_sum.value, phot_sum.unit, (display_unit * pixarea_fac).unit, # noqa: E501 equivs) elif phot_sum.unit != display_unit: phot_table['sum'] = flux_conversion_general(phot_sum.value, phot_sum.unit, display_unit, equivs) for key in ['min', 'max', 'mean', 'median', 'mode', 'std', 'mad_std', 'biweight_location']: if phot_table[key].unit != display_unit: phot_table[key] = flux_conversion_general(phot_table[key].value, phot_table[key].unit, display_unit, equivs) for key in ['var', 'biweight_midvariance']: # these values will be in units of flux or surface brightness # squared, so unit conversion is another special case if additional # equivalencies are required if phot_table[key].unit != display_unit**2: conv = handle_squared_flux_unit_conversions(phot_table[key].value, phot_table[key].unit, display_unit**2, equivs) phot_table[key] = conv if add_to_table: try: phot_table['id'][0] = self.table._qtable['id'].max() + 1 self.table.add_item(phot_table) except Exception: # Discard incompatible QTable self.table.clear_table() phot_table['id'][0] = 1 self.table.add_item(phot_table) # User wants 'sum' as scientific notation. self.table._qtable['sum'].info.format = '.6e' # Plots. if update_plots: # for cube unit conversion display units if self.display_unit != '': plot_display_unit = self.display_unit else: plot_display_unit = None if self.current_plot_type == "Curve of Growth": if data.ndim > 2: self.plot.figure.title = f'Curve of growth from aperture center at {slice_val:.4e}' # noqa: E501 eqv = u.spectral_density(self._cube_wave) else: self.plot.figure.title = 'Curve of growth from aperture center' eqv = [] x_arr, sum_arr, x_label, y_label = _curve_of_growth( comp_data, (xcenter, ycenter), aperture, wcs=data.coords, background=bg, pixarea_fac=pixarea_fac, image_unit=img_unit, display_unit=plot_display_unit, equivalencies=eqv) self.plot._update_data('profile', x=x_arr, y=sum_arr, reset_lims=True) self.plot.update_style('profile', line_visible=True, color='gray', size=32) self.plot.update_style('fit', visible=False) self.plot.figure.axes[0].label = x_label self.plot.figure.axes[1].label = y_label else: # Radial profile self.plot.figure.axes[0].label = 'pix' if plot_display_unit: self.plot.figure.axes[1].label = plot_display_unit else: self.plot.figure.axes[1].label = img_unit or 'Value' if self.current_plot_type == "Radial Profile": if data.ndim > 2: self.plot.figure.title = f'Radial profile from aperture center at {slice_val:.4e}' # noqa: E501 eqv = u.spectral_density(self._cube_wave) else: self.plot.figure.title = 'Radial profile from aperture center' eqv = [] x_data, y_data = _radial_profile( comp_data, phot_aperstats.bbox, (xcenter, ycenter), raw=False, image_unit=img_unit, display_unit=plot_display_unit, equivalencies=eqv, background=bg) self.plot._update_data('profile', x=x_data, y=y_data, reset_lims=True) self.plot.update_style('profile', line_visible=True, color='gray', size=32) else: # Radial Profile (Raw) if data.ndim > 2: self.plot.figure.title = f'Raw radial profile from aperture center at {slice_val:.4e}' # noqa: E501 else: self.plot.figure.title = 'Raw radial profile from aperture center' x_data, y_data = _radial_profile( comp_data, phot_aperstats.bbox, (xcenter, ycenter), raw=True, image_unit=img_unit, display_unit=plot_display_unit, background=bg) self.plot._update_data('profile', x=x_data, y=y_data, reset_lims=True) self.plot.update_style('profile', line_visible=False, color='gray', size=10) # Fit Gaussian1D to radial profile data. # Even though photutils radial profile has gaussian fit option, the fit is done # before Jdaviz unit conversion, so we do our own fit here after unit conversion. if self.fit_radial_profile: fitter = TRFLSQFitter() y_max = np.nanmax(y_data) x_mean = np.nanmean(x_data[np.where(y_data == y_max)]) std = 0.5 * (phot_table['semimajor_sigma'][0] + phot_table['semiminor_sigma'][0]) if isinstance(std, u.Quantity): std = std.value gs = Gaussian1D(amplitude=y_max, mean=x_mean, stddev=std, fixed={'amplitude': True}, bounds={'amplitude': (y_max * 0.5, y_max)}) with warnings.catch_warnings(record=True) as warns: fit_model = fitter(gs, x_data, y_data, filter_non_finite=True) if len(warns) > 0: msg = os.linesep.join([str(w.message) for w in warns]) self.hub.broadcast(SnackbarMessage( f"Radial profile fitting: {msg}", color='warning', sender=self)) y_fit = fit_model(x_data) self._fitted_models[self._fitted_model_name] = fit_model self.plot._update_data('fit', x=x_data, y=y_fit, reset_lims=True) self.plot.update_style('fit', color='magenta', markers_visible=False, line_visible=True) else: self.plot.update_style('fit', visible=False) # Parse results for GUI. tmp = [] for key in phot_table.colnames: if key in ('id', 'data_label', 'subset_label', 'background', 'pixarea_tot', 'counts_fac', 'aperture_sum_counts_err', 'flux_scaling', 'timestamp'): continue x = phot_table[key][0] if isinstance(x, u.Quantity): # split up unit and value to put in different cols unit = x.unit.to_string() if unit == '': # for eccentricity which is a quantity with an empty unit unit = '-' x = x.value else: unit = '-' if (isinstance(x, (int, float)) and key not in ('xcenter', 'ycenter', 'sky_center', 'sum_aper_area', 'aperture_sum_counts', 'aperture_sum_mag', 'slice_wave')): if x == 0: tmp.append({'function': key, 'result': f'{x:.1f}', 'unit': unit}) else: tmp.append({'function': key, 'result': f'{x:.3e}', 'unit': unit}) elif key == 'sky_center' and x is not None: tmp.append({'function': 'RA center', 'result': f'{x.ra.deg:.6f}', 'unit': 'deg'}) tmp.append({'function': 'Dec center', 'result': f'{x.dec.deg:.6f}', 'unit': 'deg'}) elif key in ('xcenter', 'ycenter', 'sum_aper_area'): tmp.append({'function': key, 'result': f'{x:.1f}', 'unit': unit}) elif key == 'aperture_sum_counts' and x is not None: tmp.append({'function': key, 'result': f'{x:.4e} ({phot_table["aperture_sum_counts_err"][0]:.4e})', 'unit': unit}) elif key == 'aperture_sum_mag' and x is not None: tmp.append({'function': key, 'result': f'{x:.3f}', 'unit': unit}) elif key == 'slice_wave': if data.ndim > 2: tmp.append({'function': key, 'result': f'{slice_val.value:.4e}', 'unit': slice_val.unit.to_string()}) # noqa: E501 else: tmp.append({'function': key, 'result': str(x), 'unit': unit}) if update_plots: # Also display fit results fit_tmp = [] if fit_model is not None and isinstance(fit_model, Gaussian1D): for param in ('mean', 'fwhm', 'amplitude'): p_val = getattr(fit_model, param) if isinstance(p_val, Parameter): p_val = p_val.value fit_tmp.append({'function': param, 'result': f'{p_val:.4e}'}) self.results = tmp self.result_available = True if update_plots: self.fit_results = fit_tmp self.plot_available = True return phot_table, fit_model
[docs] def vue_do_aper_phot(self, *args, **kwargs): if self.dataset_selected == '' or self.aperture_selected == '': self.hub.broadcast(SnackbarMessage( "No data for aperture photometry", color='error', sender=self)) return try: if self.multiselect: # even though plots aren't show in the UI when in multiselect mode, # we'll create the last entry so if multiselect is disabled, the last # iteration will show and not result in confusing behavior self.calculate_batch_photometry(add_to_table=True, update_plots=True) else: self.calculate_photometry(add_to_table=True, update_plots=True) except Exception as e: # pragma: no cover self.plot.clear_all_marks() msg = f"Aperture photometry failed: {repr(e)}" self.hub.broadcast(SnackbarMessage(msg, color='error', sender=self, traceback=e)) self.result_failed_msg = msg else: self.result_failed_msg = ''
[docs] def unpack_batch_options(self, **options): """ Unpacks a dictionary of options for batch mode, including all combinations of any values passed as tuples or lists. For example:: unpack_batch_options(dataset=['image1', 'image2'], aperture=['Subset 1', 'Subset 2'], background=['Subset 3'], flux_scaling=3 ) would result in:: [{'aperture': 'Subset 1', 'dataset': 'image1', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 2', 'dataset': 'image1', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 1', 'dataset': 'image2', 'background': 'Subset 3', 'flux_scaling': 3}, {'aperture': 'Subset 2', 'dataset': 'image2', 'background': 'Subset 3', 'flux_scaling': 3}] Parameters ---------- options : dict, optional Dictionary of values to override from the values set in the plugin/traitlets. Each entry can either be a single value, or a list. All combinations of those that contain a list will be exposed. If not provided and the plugin is in multiselect mode (``multiselect = True``), then the current values set in the plugin will be used. Returns ------- options : list List of all combinations of input parameters, which can then be used as input to `calculate_batch_photometry`. """ if not isinstance(options, dict): raise TypeError("options must be a dictionary") if not options: if not self.multiselect: # pragma: no cover raise ValueError("must either provide a dictionary or set plugin to multiselect mode") # noqa options = {'dataset': self.dataset.selected, 'aperture': self.aperture.selected} # TODO: use self.user_api once API is made public user_api = self # .user_api invalid_keys = [k for k in options.keys() if not hasattr(user_api, k)] if len(invalid_keys): raise ValueError(f"{invalid_keys} are not valid inputs for batch photometry") def _is_single(v): if isinstance(v, (list, tuple)): if len(v) == 1: return True, v[0] return False, v return True, v single_values, mult_values = {}, {} for k, v in options.items(): is_single, this_value = _is_single(v) if is_single: single_values[k] = this_value else: mult_values[k] = this_value def _unpack_dict_list(mult_values, single_values): if not len(mult_values): return [single_values] options_list = [] # loop over the first item in mult_values # any remaining mult values will require recursion this_attr, this_values = list(mult_values.items())[0] remaining_mult_values = {k: v for j, (k, v) in enumerate(mult_values.items()) if j > 0} for this_value in this_values: if not len(remaining_mult_values): options_list += [{this_attr: this_value, **single_values}] continue options_list += _unpack_dict_list(remaining_mult_values, {this_attr: this_value, **single_values}) return options_list return _unpack_dict_list(mult_values, single_values)
[docs] @with_spinner() def calculate_batch_photometry(self, options=[], add_to_table=True, update_plots=True, full_exceptions=False): """ Run aperture photometry over a list of options. Unprovided options will remain at their values defined in the plugin. To provide a list of values per-input, use `unpack_batch_options` to and pass that as input here. Parameters ---------- options : list Each entry will result in one computation of aperture photometry and should be a dictionary of values to override from the values set in the plugin/traitlets. add_to_table : bool Whether to add results to the plugin table. update_plots : bool Whether to update the plugin plots for the last iteration. full_exceptions : bool, optional Whether to expose the full exception message for all failed iterations. """ # input validation if not isinstance(options, list): raise TypeError("options must be a list of dictionaries") if not np.all([isinstance(option, dict) for option in options]): raise TypeError("options must be a list of dictionaries") if not len(options): if not self.multiselect: # pragma: no cover raise ValueError("must either provide manual options or put the plugin in multiselect mode") # noqa # unpack the batch options as provided in the app options = self.unpack_batch_options() failed_iters, exceptions = [], [] for i, option in enumerate(options): # only update plots on the last iteration this_update_plots = i == len(options) and update_plots defaults = self._get_defaults_from_metadata(option.get('dataset', self.dataset.selected)) if self.pixel_area_multi_auto: option.setdefault('pixel_area', defaults.get('pixel_area', 0)) if self.flux_scaling_multi_auto: option.setdefault('flux_scaling', defaults.get('flux_scaling', 0)) try: self.calculate_photometry(add_to_table=add_to_table, update_plots=this_update_plots, **option) except Exception as e: failed_iters.append(i) if full_exceptions: exceptions.append(e) if len(failed_iters): err_msg = f"inputs {failed_iters} failed and were skipped." if full_exceptions: err_msg += f" Exception messages: {exceptions}" else: err_msg += " To see full exceptions, run individually or pass full_exceptions=True" # noqa raise RuntimeError(err_msg)
# NOTE: These are hidden because the APIs are for internal use only # but we need them as a separate functions for unit testing. def _radial_profile(data, reg_bb, centroid, raw=False, image_unit=None, display_unit=None, equivalencies=[], background=0): """Calculate radial profile. Parameters ---------- data : ndarray or `~astropy.units.Quantity` Data for radial profile. reg_bb : obj Bounding box from ``ApertureStats``. centroid : tuple of int ``ApertureStats`` centroid or desired center in ``(x, y)``. raw : bool If `True`, returns raw data points for scatter plot. Otherwise, use ``imexam`` algorithm for a clean plot. image_unit : str or None (For cubes only to deal with display unit conversion). Unit of input data, used with `display_unit` to convert output to desired display unit. display_unit : str or None (For cubes only to deal with display unit conversion). Desired unit for output. equivalencies : list or None Optional, equivalencies for unit conversion to convert radial profile to display unit selected in the unit conversion plugin, if it differs from the native data unit. background : float or `~astropy.units.Quantity` Background to subtract, if any. Unit must match ``data``. Returns ------- x_arr, y_arr : ndarray X and Y data to plot. Y should already be background subtracted. """ if isinstance(data, u.Quantity): data = data.value if isinstance(background, u.Quantity): background = background.value # data unit edge_radii = np.arange(max(reg_bb.shape) // 2 + 2) # Edges need one extra rp = RadialProfile(data, centroid, edge_radii, error=None, mask=None) if raw: x_arr = rp.data_radius # pix y_arr = rp.data_profile - background # data unit else: x_arr = rp.radius # pix y_arr = rp.profile - background # data unit if display_unit is not None: if image_unit is None: raise ValueError('Must provide image_unit with display_unit.') # convert array from native data unit to display unit, if they differ if image_unit != display_unit: y_arr = flux_conversion_general(y_arr, u.Unit(image_unit), u.Unit(display_unit), equivalencies=equivalencies, with_unit=False) return x_arr, y_arr def _curve_of_growth(data, centroid, aperture, wcs=None, background=0, n_datapoints=10, pixarea_fac=None, image_unit=None, display_unit=None, equivalencies=[]): """Calculate curve of growth for aperture photometry. Parameters ---------- data : ndarray or `~astropy.units.Quantity` Data for the calculation. centroid : tuple of int ``ApertureStats`` centroid or desired center in ``(x, y)``. aperture : obj ``photutils`` aperture used to set radii for the curve. Actual aperture used is always circular. wcs : obj or `None` Supported WCS objects or `None`. background : float or `~astropy.units.Quantity` Background to subtract, if any. Unit must match ``data``. n_datapoints : int Number of data points in the curve. pixarea_fac : `~astropy.units.Quantity` or `None` For ``flux_unit/sr`` to ``flux_unit`` conversion. image_unit : str or None (For cubes only to deal with display unit conversion). Unit of input data, used with `display_unit` to convert output to desired display unit. display_unit : str or None (For cubes only to deal with display unit conversion). Desired unit for output. If unit is a surface brightness, a Flux unit will be returned if pixarea_fac is provided. equivalencies : list or None Optional, equivalencies for unit conversion to convert radial profile to display unit selected in the unit conversion plugin, if it differs from the native data unit. Returns ------- x_arr : ndarray Data for X-axis of the curve. sum_arr : ndarray or `~astropy.units.Quantity` Data for Y-axis of the curve. x_label, y_label : str X- and Y-axis labels, respectively. Raises ------ TypeError Unsupported aperture. """ n_datapoints += 1 # n + 1 if isinstance(data, u.Quantity): data = data.value if isinstance(background, u.Quantity): background = background.value # data unit if hasattr(aperture, 'to_pixel'): aperture = aperture.to_pixel(wcs) if isinstance(aperture, CircularAperture): r = aperture.r elif isinstance(aperture, EllipticalAperture): r = max(aperture.a, aperture.b) elif isinstance(aperture, RectangularAperture): r = max(aperture.w, aperture.h) * 0.5 else: raise TypeError(f'Unsupported aperture: {aperture}') radii = np.linspace(0, r, num=n_datapoints)[1:] cog = CurveOfGrowth(data, centroid, radii, error=None, mask=None) x_arr = cog.radius sum_arr = cog.profile - (cog.radii ** 2 * np.pi * background) # Determine desired unit for output sum array and y label. # Cubes only: To handle unit conversion when display unit changes. if image_unit is not None: sum_unit = u.Unit(image_unit) if (display_unit is not None) and (image_unit != display_unit): disp_unit = u.Unit(display_unit) do_flux_conv = True else: do_flux_conv = False # multiply data unit by its solid angle to convert sum in sb to sum in flux if pixarea_fac is not None: sa = check_if_unit_is_per_solid_angle(sum_unit, return_unit=True) sum_unit *= sa sum_arr = sum_arr * pixarea_fac.value if do_flux_conv: disp_unit *= sa # convert array from native data unit to display unit, if they differ if do_flux_conv: sum_arr = flux_conversion_general(sum_arr, sum_unit, disp_unit, equivalencies=equivalencies, with_unit=False) y_label = disp_unit.to_string() else: y_label = sum_unit.to_string() else: y_label = 'Value' return x_arr, sum_arr, 'Radius (pix)', y_label