Source code for isofit.inversion.inverse

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import logging
import time
from collections import OrderedDict

import numpy as np
import scipy.linalg
from scipy.optimize import least_squares

from isofit.configs import Config
from isofit.configs.sections.implementation_config import InversionConfig
from isofit.core.common import combos, conditional_gaussian, eps, svd_inv, svd_inv_sqrt
from isofit.core.forward import ForwardModel

from .inverse_simple import invert_simple

### Variables ###

error_code = -1


### Classes ###


[docs] class Inversion: def __init__(self, full_config: Config, forward: ForwardModel): """Initialization specifies retrieval subwindows for calculating measurement cost distributions.""" config: InversionConfig = full_config.implementation.inversion self.config = config self.lasttime = time.time() self.fm = forward self.hashtable = OrderedDict() # Hash table for caching inverse matrices self.max_table_size = full_config.implementation.max_hash_table_size self.state_indep_S_hat = False self.windows = config.windows # Retrieval windows self.mode = full_config.implementation.mode self.state_indep_S_hat = config.cressie_map_confidence # We calculate the instrument channel indices associated with the # retrieval windows using the initial instrument calibration. These # window indices never change throughout the life of the object. self.winidx = np.array((), dtype=int) # indices of retrieval windows for lo, hi in self.windows: idx = np.where( np.logical_and( self.fm.instrument.wl_init > lo, self.fm.instrument.wl_init < hi ) )[0] self.winidx = np.concatenate((self.winidx, idx), axis=0) self.outside_ret_windows = np.ones(self.fm.n_meas, dtype=bool) self.outside_ret_windows[self.winidx] = False self.counts = 0 self.inversions = 0 self.integration_grid = OrderedDict(config.integration_grid) self.grid_as_starting_points = config.inversion_grid_as_preseed if self.grid_as_starting_points: # We're using the integration grid to preseed, not fix values. So # Track the grid, but don't fix the integration grid points self.inds_fixed = [] self.inds_preseed = np.array( [self.fm.statevec.index(k) for k in self.integration_grid.keys()] ) self.inds_free = np.array( [ i for i in np.arange(self.fm.nstate, dtype=int) if not (i in self.inds_fixed) ] ) else: # We're using the integration grid to fix values. So # Get set up to fix the integration grid points self.inds_fixed = np.array( [self.fm.statevec.index(k) for k in self.integration_grid.keys()] ) self.inds_free = np.array( [ i for i in np.arange(self.fm.nstate, dtype=int) if not (i in self.inds_fixed) ] ) self.inds_preseed = [] self.x_fixed = None # Set least squares params that come from the forward model self.least_squares_params = { "method": "trf", "max_nfev": 20, "bounds": ( self.fm.bounds[0][self.inds_free], self.fm.bounds[1][self.inds_free], ), "x_scale": self.fm.scale[self.inds_free], } # Update the rest from the config for ( key, item, ) in config.least_squares_params.get_config_options_as_dict().items(): self.least_squares_params[key] = item
[docs] def full_statevector(self, x_free): x = np.zeros(self.fm.nstate) if self.x_fixed is not None: x[self.inds_fixed] = self.x_fixed x[self.inds_free] = x_free return x
[docs] def calc_conditional_prior(self, x_free, geom): """Calculate prior distribution of radiance. This depends on the location in the state space. Return the inverse covariance and its square root (for non-quadratic error residual calculation).""" x = self.full_statevector(x_free) xa = self.fm.xa(x, geom) Sa = self.fm.Sa(x, geom) # If there aren't any fixed parameters, we just directly if self.x_fixed is None or self.grid_as_starting_points: Sa_inv, Sa_inv_sqrt = svd_inv_sqrt( Sa, hashtable=self.hashtable, max_hash_size=self.max_table_size ) return xa, Sa, Sa_inv, Sa_inv_sqrt else: # otherwise condition on fixed variables # TODO: could make the below calculation without the svd_inv (using full initial inversion), # which would be way cheaper xa_free, Sa_free = conditional_gaussian( xa, Sa, self.inds_free, self.inds_fixed, self.x_fixed ) Sa_free_inv, Sa_free_inv_sqrt = svd_inv_sqrt( Sa_free, hashtable=self.hashtable, max_hash_size=self.max_table_size ) return xa_free, Sa_free, Sa_free_inv, Sa_free_inv_sqrt
[docs] def calc_prior(self, x, geom): """Calculate prior distribution of radiance. This depends on the location in the state space. Return the inverse covariance and its square root (for non-quadratic error residual calculation).""" xa = self.fm.xa(x, geom) Sa = self.fm.Sa(x, geom) Sa_inv, Sa_inv_sqrt = svd_inv_sqrt( Sa, hashtable=self.hashtable, max_hash_size=self.max_table_size ) return xa, Sa, Sa_inv, Sa_inv_sqrt
[docs] def calc_posterior(self, x, geom, meas): """Calculate posterior distribution of state vector. This depends both on the location in the state space and the radiance (via noise).""" xa = self.fm.xa(x, geom) Sa = self.fm.Sa(x, geom) Sa_inv = svd_inv( Sa, hashtable=self.hashtable, max_hash_size=self.max_table_size ) K = self.fm.K(x, geom) Seps = self.fm.Seps(x, meas, geom) Seps_inv = svd_inv( Seps, hashtable=self.hashtable, max_hash_size=self.max_table_size ) # Gain matrix G reflects current state, so we use the state-dependent # Jacobian matrix K S_hat = svd_inv( K.T.dot(Seps_inv).dot(K) + Sa_inv, hashtable=self.hashtable, max_hash_size=self.max_table_size, ) G = S_hat.dot(K.T).dot(Seps_inv) # N. Cressie [ASA 2018] suggests an alternate definition of S_hat for # more statistically-consistent posterior confidence estimation if self.state_indep_S_hat: Ka = self.fm.K(xa, geom) S_hat = svd_inv( Ka.T.dot(Seps_inv).dot(Ka) + Sa_inv, hashtable=self.hashtable, max_hash_size=self.max_table_size, ) return S_hat, K, G
[docs] def calc_Seps(self, x, meas, geom): """Calculate (zero-mean) measurement distribution in radiance terms. This depends on the location in the state space. This distribution is calculated over one or more subwindows of the spectrum. Return the inverse covariance and its square root.""" Seps = self.fm.Seps(x, meas, geom) wn = len(self.winidx) Seps_win = np.zeros((wn, wn)) for i in range(wn): Seps_win[i, :] = Seps[self.winidx[i], self.winidx] return svd_inv_sqrt( Seps_win, hashtable=self.hashtable, max_hash_size=self.max_table_size )
[docs] def jacobian(self, x_free, geom, Seps_inv_sqrt) -> np.ndarray: """Calculate measurement Jacobian and prior Jacobians with respect to cost function. This is the derivative of cost with respect to the state, commonly known as the gradient or loss surface. The cost is expressed as a vector of 'residuals' with respect to the prior and measurement, expressed in absolute (not quadratic) terms for the solver; It is the square root of the Rodgers (2000) Chi-square version. All measurement distributions are calculated over subwindows of the full spectrum. Args: x_free: decision variables - portion of the statevector not fixed by a static integration grid geom: Geometry to use for inversion Seps_inv_sqrt: Inverse square root of the covariance of "observation noise", including both measurement noise from the instrument as well as variability due to unknown variables. Returns: total_jac: The complete (measurement and prior) jacobian """ x = self.full_statevector(x_free) # jacobian of measurment cost term WRT full state vector. K = self.fm.K(x, geom)[self.winidx, :] K = K[:, self.inds_free] meas_jac = Seps_inv_sqrt.dot(K) # jacobian of prior cost term with respect to state vector. xa_free, Sa_free, Sa_free_inv, Sa_free_inv_sqrt = self.calc_conditional_prior( x_free, geom ) prior_jac = Sa_free_inv_sqrt # The total cost vector (as presented to the solver) is the # concatenation of the "residuals" due to the measurement # and prior distributions. They will be squared internally by # the solver. total_jac = np.real(np.concatenate((meas_jac, prior_jac), axis=0)) return total_jac
[docs] def loss_function(self, x_free, geom, Seps_inv_sqrt, meas) -> (np.array, np.array): """Calculate cost function expressed here in absolute (not quadratic) terms for the solver, i.e., the square root of the Rodgers (2000) Chi-square version. We concatenate 'residuals' due to measurment and prior terms, suitably scaled. All measurement distributions are calculated over subwindows of the full spectrum. Args: x_free: decision variables - portion of the statevector not fixed by a static integration grid geom: Geometry to use for inversion Seps_inv_sqrt: Inverse square root of the covariance of "observation noise", including both measurement noise from the instrument as well as variability due to unknown variables. meas: a one-D scipy vector of radiance in uW/nm/sr/cm2 Returns: total_residual: the complete, calculated residual x: the complete (x_free + any x_fixed augmented in) """ # set up full-sized state vector x = self.full_statevector(x_free) # Measurement cost term. Will calculate reflectance and Ls from # the state vector. est_meas = self.fm.calc_meas(x, geom, rfl=None, Ls=None) est_meas_window = est_meas[self.winidx] meas_window = meas[self.winidx] meas_resid = (est_meas_window - meas_window).dot(Seps_inv_sqrt) # Prior cost term xa_free, Sa_free, Sa_free_inv, Sa_free_inv_sqrt = self.calc_conditional_prior( x_free, geom ) prior_resid = (x_free - xa_free).dot(Sa_free_inv_sqrt) # Total cost total_resid = np.concatenate((meas_resid, prior_resid)) return np.real(total_resid), x
[docs] def invert(self, meas, geom): """Inverts a meaurement and returns a state vector. Args: meas: a one-D scipy vector of radiance in uW/nm/sr/cm2 geom: a geometry object Returns: final_solution: a converged state vector solution """ self.counts = 0 costs, solutions = [], [] # Simulations are easy - return the initial state vector if self.mode == "simulation": self.fm.surface.rfl = meas return np.array([self.fm.init.copy()]) if len(self.integration_grid.values()) == 0: combo_values = [None] else: combo_values = combos(self.integration_grid.values()).copy() for combo in combo_values: if self.grid_as_starting_points is False: self.x_fixed = combo trajectory = [] # Calculate the initial solution, if needed. x0 = invert_simple(self.fm, meas, geom) # Update regions outside retrieval windows to match priors if self.config.priors_in_initial_guess: prior_subset_idx = np.arange(len(x0))[self.fm.idx_surf_rfl][ self.outside_ret_windows ] x0[prior_subset_idx] = self.fm.surface.xa(x0, geom)[prior_subset_idx] trajectory.append(x0) x0 = x0[self.inds_free] # Catch any state vector elements outside of bounds lower_bound_violation = x0 < self.fm.bounds[0][self.inds_free] x0[lower_bound_violation] = ( self.fm.bounds[0][self.inds_free][lower_bound_violation] + eps ) upper_bound_violation = x0 > self.fm.bounds[1][self.inds_free] x0[upper_bound_violation] = ( self.fm.bounds[1][self.inds_free][upper_bound_violation] - eps ) del lower_bound_violation, upper_bound_violation # Find the full state vector with bounds checked x = self.full_statevector(x0) # Regardless of anything we did for the heuristic guess, bring the # static preseed back into play (only does anything if inds_preseed # is not blank) if len(self.inds_preseed) > 0: x0[self.inds_preseed] = combo # Record initializaation state geom.x_surf_init = x[self.fm.idx_surface] geom.x_RT_init = x[self.fm.idx_RT] # Seps is the covariance of "observation noise" including both # measurement noise from the instrument as well as variability due to # unknown variables. For speed, we will calculate it just once based # on the initial solution (a potential minor source of inaccuracy). Seps_inv, Seps_inv_sqrt = self.calc_Seps(x, meas, geom) def jac(x_free): """Short wrapper function for use with scipy opt""" return self.jacobian(x_free, geom, Seps_inv_sqrt) def err(x_free): """Short wrapper function for use with scipy opt and logging""" residual, x = self.loss_function(x_free, geom, Seps_inv_sqrt, meas) trajectory.append(x) it = len(trajectory) rs = float(np.sum(np.power(residual, 2))) sm = self.fm.summarize(x, geom) logging.debug("Iteration: %02i Residual: %12.2f %s" % (it, rs, sm)) return np.real(residual) # Initialize and invert try: xopt = least_squares(err, x0, jac=jac, **self.least_squares_params) x_full_solution = self.full_statevector(xopt.x) trajectory.append(x_full_solution) solutions.append(trajectory) costs.append(np.sqrt(np.power(xopt.fun, 2).sum())) except scipy.linalg.LinAlgError: logging.warning("Optimization failed to converge") solutions.append(trajectory) costs.append(9e99) final_solution = np.array(solutions[np.argmin(costs)]) return final_solution
[docs] def forward_uncertainty(self, x, meas, geom): """ Args: x: statevector meas: a one-D scipy vector of radiance in uW/nm/sr/cm2 geom: a geometry object Returns: lamb: the converged lambertian surface reflectance path: the converged path radiance estimate mdl: the modeled radiance estimate S_hat: the posterior covariance of the state vector K: the derivative matrix d meas_x / d state_x G: the G matrix from the CD Rodgers 2000 formalism """ dark_surface = np.zeros(self.fm.surface.wl.shape) path = self.fm.calc_meas(x, geom, rfl=dark_surface) mdl = self.fm.calc_meas(x, geom, rfl=None, Ls=None) lamb = self.fm.calc_lamb(x, geom) S_hat, K, G = self.calc_posterior(x, geom, meas) return lamb, mdl, path, S_hat, K, G