Source code for jdaviz.configs.default.plugins.model_fitting.fitting_backend

from asteval import Interpreter
import multiprocessing as mp
import numpy as np

from astropy.modeling import fitting
from specutils import Spectrum
from specutils.fitting import fit_lines

from jdaviz.utils import parallelize_calculation

__all__ = ['fit_model_to_spectrum', 'generate_spaxel_list']


[docs] def fit_model_to_spectrum(spectrum, component_list, expression, run_fitter=False, fitter=fitting.TRFLSQFitter(calc_uncertainties=True), window=None, n_cpu=None, **kwargs): """Fits a `~astropy.modeling.CompoundModel` to a `~specutils.Spectrum` instance. If the input spectrum represents a spectral cube, then fits the model to every spaxel in the cube, using a multiprocessor pool running in parallel (if ``n_cpu`` is larger than 1). Parameters ---------- spectrum : `~specutils.Spectrum` The spectrum to be fitted. component_list : list Spectral model subcomponents stored in a list. Their ``'name'`` attribute must be unique. Each subcomponent should be an initialized object from `~astropy.modeling.Model`. expression : str The arithmetic expression that combines together the model subcomponents. The subcomponents are referred via their ``'name'`` attribute. run_fitter : bool **This is currently being ignored for 3D fits.** When `False` (the default), the function composes the compound model and returns it without fitting. window : `None` or `~specutils.SpectralRegion` See :func:`specutils.fitting.fit_lines`. n_cpu : `None` or int **This is only used for spectral cube fitting.** Number of cores to use for multiprocessing. Using all the cores at once is not recommended. If `None`, it will use max cores minus one. Set this to 1 for debugging. Returns ------- output_model : `~astropy.modeling.CompoundModel` or list The model resulting from the fit. In the case of a 1D input spectrum, a single model instance is returned. In case of a 3D spectral cube input, instead o model instances for every spaxel, a list with 2D arrays, each one storing fitted parameter values for all spaxels, is returned. output_spectrum : `~specutils.Spectrum` The realization of the fitted model as a spectrum. The spectrum will be 1D or 3D depending on the shape of input spectrum. """ # Initial guess for the fit. initial_model = _build_model(component_list, expression) if len(spectrum.shape) > 1: return _fit_3D(initial_model, spectrum, fitter=fitter, window=window, n_cpu=n_cpu, **kwargs) else: return _fit_1D(initial_model, spectrum, run_fitter, fitter=fitter, window=window, **kwargs)
def _fit_1D(initial_model, spectrum, run_fitter, fitter, window=None, **kwargs): """ Fits an astropy CompoundModel to a Spectrum instance. Parameters ---------- initial_model : :class: `astropy.modeling.CompoundModel` Initial guess for the model to be fitted. spectrum : :class:`specutils.Spectrum` The spectrum to be fitted. run_fitter : bool When False (the default), the function composes the compound model and returns it without fitting. fitter : :class: `astropy.modeling.fitting` Object Custom fitter for model. window : `None` or :class:`specutils.spectra.SpectralRegion` See :func:`specutils.fitting.fitmodels.fit_lines`. Returns ------- output_model : :class: `astropy.modeling.CompoundModel` The model resulting from the fit. output_spectrum : :class:`specutils.Spectrum` The realization of the fitted model as a spectrum. """ if run_fitter: if spectrum.uncertainty and not np.all(spectrum.uncertainty.array == 0): weights = 'unc' else: weights = None output_model = fit_lines(spectrum, initial_model, fitter=fitter, weights=weights, window=window, **kwargs) output_values = output_model(spectrum.spectral_axis) else: # Return without fitting. output_model = initial_model output_values = initial_model(spectrum.spectral_axis) # Build return spectrum output_spectrum = Spectrum(spectral_axis=spectrum.spectral_axis, flux=output_values, mask=spectrum.mask) return output_model, output_spectrum def _fit_3D(initial_model, spectrum, fitter, window=None, n_cpu=None, **kwargs): """ Fits an astropy CompoundModel to every spaxel in a cube using a multiprocessor pool running in parallel. Computes realizations of the models over each spaxel. Parameters ---------- initial_model : :class: `astropy.modeling.CompoundModel` Initial guess for the model to be fitted. spectrum : :class:`specutils.Spectrum` The spectrum that stores the cube in its 'flux' attribute. fitter : :class: `astropy.modeling.fitting` Object Custom fitter for model. window : `None` or :class:`specutils.spectra.SpectralRegion` See :func:`specutils.fitting.fitmodels.fit_lines`. n_cpu : `None` or int Number of cores to use for multiprocessing. Using all the cores at once is not recommended. If `None`, it will use max cores minus one. Set this to 1 for debugging. Returns ------- output_model : :list: a list that stores 2D arrays. Each array contains one parameter from `astropy.modeling.CompoundModel` instances fitted to every spaxel in the input cube. output_spectrum : :class:`specutils.Spectrum` The spectrum that stores the fitted model values in its 'flux' attribute. """ if n_cpu is None: n_cpu = mp.cpu_count() - 1 # Generate list of all spaxels to be fitted spaxels = generate_spaxel_list(spectrum) fitted_models = [] # Build cube with empty arrays, one per input spaxel. These # will store the flux values corresponding to the fitted # model realization over each spaxel. output_flux_cube = np.zeros(shape=spectrum.flux.shape) # Callback to collect results from workers into the cubes def collect_result(results): for i in range(len(results['x'])): x = results['x'][i] y = results['y'][i] model = results['fitted_model'][i] fitted_values = results['fitted_values'][i] # Store fitted model parameters fitted_models.append({"x": x, "y": y, "model": model}) # Store fitted values if spectrum.spectral_axis_index in [2, -1]: output_flux_cube[x, y, :] = fitted_values elif spectrum.spectral_axis_index == 0: output_flux_cube[:, y, x] = fitted_values # Allow selecting a futures-based streaming backend via # the `use_futures` kwarg. This preserves callback-like # behavior by processing results as each worker finishes. if n_cpu > 1: # Build workers (one per chunk) same as the Pool chunking workers = ( SpaxelWorker(spectrum.flux, spectrum.spectral_axis, initial_model, fitter=fitter, param_set=spx, window=window, mask=spectrum.mask, spectral_axis_index=spectrum.spectral_axis_index, **kwargs) for spx in np.array_split(spaxels, n_cpu)) parallelize_calculation(workers, collect_result, n_cpu=n_cpu) # This route is only for dev debugging because it is very slow # but exceptions will not get swallowed up by joblib. else: # pragma: no cover worker = SpaxelWorker(spectrum.flux, spectrum.spectral_axis, initial_model, fitter=fitter, param_set=spaxels, window=window, mask=spectrum.mask, spectral_axis_index=spectrum.spectral_axis_index, **kwargs) collect_result(worker()) # Build output 3D spectrum. Don't need spectral_axis_index because we use the WCS funit = spectrum.flux.unit output_spectrum = Spectrum(wcs=spectrum.wcs, flux=output_flux_cube * funit, mask=spectrum.mask) return fitted_models, output_spectrum class SpaxelWorker: """ A class with callable instances that perform fitting over a spaxel. It provides the callable for the `Pool.apply_async` function, and also holds everything necessary to perform the fit over one spaxel. Additionally, the callable computes the realization of the model just fitted, over that same spaxel. We cannot do these two steps (fit and compute) separately, since we cannot modify parameter values in an already built CompoundModel instance. We need to use the current model instance while it still exists. """ def __init__(self, flux_cube, wave_array, initial_model, fitter, param_set, window=None, mask=None, spectral_axis_index=2, **kwargs): self.cube = flux_cube self.wave = wave_array self.model = initial_model self.fitter = fitter self.param_set = param_set self.window = window self.mask = mask self.spectral_axis_index = spectral_axis_index self.kw = kwargs def __call__(self): results = {'x': [], 'y': [], 'fitted_model': [], 'fitted_values': []} for parameters in self.param_set: x = parameters[0] y = parameters[1] # Calling the Spectrum constructor for every spaxel # turned out to be less expensive than expected. Experiments # show that the cost amounts to a couple percent additional # running time in comparison with a version that uses a 3D # spectrum as input. Besides, letting an externally-created # spectrum reference into the callable somehow prevents it # to execute. This behavior was seen also with other functions # passed to the callable. if self.spectral_axis_index in [2, -1]: flux = self.cube[x, y, :] if self.mask is not None: mask = self.mask[x, y, :] elif self.spectral_axis_index == 0: flux = self.cube[:, y, x] if self.mask is not None: mask = self.mask[:, y, x] if self.mask is None: # If no mask is provided: mask = np.zeros_like(flux.value).astype(bool) sp = Spectrum(spectral_axis=self.wave, flux=flux, mask=mask) if sp.uncertainty and not np.all(sp.uncertainty.array == 0): weights = 'unc' else: weights = None fitted_model = fit_lines(sp, self.model, fitter=self.fitter, window=self.window, weights=weights, **self.kw) fitted_values = fitted_model(self.wave) results['x'].append(x) results['y'].append(y) results['fitted_model'].append(fitted_model) results['fitted_values'].append(fitted_values) return results def _build_model(component_list, expression): """ Builds an astropy CompoundModel from a list of components and an expression that links them to each other. Parameters ---------- component_list : list Spectral model subcomponents stored in a list. Their ``'name'`` attribute must be unique. Each subcomponent should be an initialized object from `astropy.modeling.models' expression : str The arithmetic expression that combines together the model subcomponents. The subcomponents are referred via their ``'name'`` attribute. Returns ------- model : :class:`astropy.modeling.CompoundModel` The model resulting from the fit. """ # The expression parser needs the subcomponents stored in a # dict, with keys taken from their names. This mechanism can # be augmented with status feedback to the UI. This requires # the code in here to be refactored to provide the necessary # hooks for the GUI (see # specviz/specviz/plugins/model_editor/models.py around lines # 200-230). model_dict = {} for component in component_list: model_dict[component.name] = component aeval = Interpreter(usersyms=model_dict) model = aeval(expression) return model
[docs] def generate_spaxel_list(spectrum, spectral_axis_index=None): """ Generates a list with tuples, each one addressing the (x,y) coordinates of a spaxel in a 3-D spectrum cube. If a mask is available, skip masked indices. Parameters ---------- spectrum : :class:`specutils.Spectrum` or numpy array The spectrum that stores the cube in its ``'flux'`` attribute, or a simple numpy array of values. Returns ------- spaxels : list List with spaxels """ # Handle this being either a Spectrum or an array passed from sonification if spectral_axis_index is None and hasattr(spectrum, 'spectral_axis_index'): spectral_axis_index = spectrum.spectral_axis_index flux = getattr(spectrum, 'flux', spectrum) if spectral_axis_index in [2, -1]: n_x, n_y, _ = flux.shape elif spectral_axis_index == 0: _, n_y, n_x = flux.shape if not hasattr(spectrum, 'mask') or spectrum.mask is None: spx = [[(x, y) for x in range(n_x)] for y in range(n_y)] else: # This is a specutils.Spectrum, return only non-masked spaxels if spectrum.spectral_axis_index in [2, -1]: spx = [[(x, y) for x in range(n_x) if np.any(~spectrum.mask[x, y])] for y in range(n_y) if np.any(~spectrum.mask[:, y])] elif spectrum.spectral_axis_index == 0: spx = [[(x, y) for y in range(n_y) if np.any(~spectrum.mask[:, y, x])] for x in range(n_x) if np.any(~spectrum.mask[:, :, x])] spaxels = [item for sublist in spx for item in sublist] return spaxels