Source code for isofit.core.forward

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import logging
from copy import deepcopy

import numpy as np
from scipy.interpolate import interp1d
from scipy.io import loadmat
from scipy.linalg import block_diag

from isofit.configs import Config
from isofit.surface import (
    AdditiveGlintSurface,
    GlintModelSurface,
    LUTSurface,
    MultiComponentSurface,
    Surface,
    ThermalSurface,
)

from ..radiative_transfer.radiative_transfer import RadiativeTransfer
from .common import eps
from .instrument import Instrument

Logger = logging.getLogger(__file__)

### Classes ###


[docs] class ForwardModel: """ForwardModel contains all the information about how to calculate radiance measurements at a specific spectral calibration, given a state vector. It also manages the distributions of unretrieved, unknown parameters of the state vector (i.e. the S_b and K_b matrices of Rodgers et al. State vector elements always go in the following order: (1) Surface parameters (2) Radiative Transfer (RT) parameters (3) Instrument parameters The parameter bounds, scales, initial values, and names are all ordered in this way. The variable self.statevec contains the name of each state vector element, in the proper ordering. The "b" vector corresponds to the K_b calculations in Rogers (2000); the variables bvec and bval represent the model unknowns' names and their magnitudes, respectively. Larger magnitudes correspond to a larger variance in the unknown values. This acts as additional noise for the purpose of weighting the measurement information against the prior.""" def __init__(self, full_config: Config): # load in the full config (in case of inter-module dependencies) and # then designate the current config self.full_config = full_config self.config = full_config.forward_model # Build the instrument model self.instrument = Instrument(self.full_config) self.n_meas = self.instrument.n_chan # Build the radiative transfer model self.RT = RadiativeTransfer(self.full_config) # Build the surface model - this is a bit ugly with the conditionals, but the surface config should already be # forced to have appropriate properties, so at least safe # TODO: make surface a class with inheritance to make this cleaner self.surface = None if self.config.surface.surface_category == "surface": self.surface = Surface(self.full_config) elif self.config.surface.surface_category == "multicomponent_surface": self.surface = MultiComponentSurface(self.full_config) elif self.config.surface.surface_category == "additive_glint_surface": self.surface = AdditiveGlintSurface(self.full_config) elif self.config.surface.surface_category == "glint_model_surface": self.surface = GlintModelSurface(self.full_config) self.full_glint = ( self.config.surface.full_glint if "full_glint" in self.config.surface.keys() else False ) elif self.config.surface.surface_category == "thermal_surface": self.surface = ThermalSurface(self.full_config) elif self.config.surface.surface_category == "lut_surface": self.surface = LUTSurface(self.full_config) else: raise ValueError("Must specify a valid surface model") # No need to be more specific - should have been checked in config already # check if wavelength grid provided by surface model matches instrument wavelengths if self.surface.n_wl != len(self.RT.wl): raise ValueError( "Number of channels provided in surface model file does not match" " wavelengths in radiance cube. Please rebuild your surface model." ) if not np.all(np.isclose(self.surface.wl, self.RT.wl, atol=0.01)): Logger.warning( "Center wavelengths provided in surface model file do not match" " wavelengths in radiance cube. Please consider rebuilding your" " surface model for optimal performance." ) # Build combined vectors from surface, RT, and instrument bounds, scale, init, statevec, bvec, bval = ([] for i in range(6)) for obj_with_statevec in [self.surface, self.RT, self.instrument]: bounds.extend([deepcopy(x) for x in obj_with_statevec.bounds]) scale.extend([deepcopy(x) for x in obj_with_statevec.scale]) init.extend([deepcopy(x) for x in obj_with_statevec.init]) statevec.extend([deepcopy(x) for x in obj_with_statevec.statevec_names]) bvec.extend([deepcopy(x) for x in obj_with_statevec.bvec]) bval.extend([deepcopy(x) for x in obj_with_statevec.bval]) self.bounds = tuple(np.array(bounds).T) self.scale = np.array(scale) self.init = np.array(init) self.statevec = statevec self.nstate = len(self.statevec) self.bvec = np.array(bvec) self.nbvec = len(self.bvec) self.bval = np.array(bval) self.Sb = np.diagflat(np.power(self.bval, 2)) # Set up indices for references - MUST MATCH ORDER FROM ABOVE ASSIGNMENT self.idx_surface = np.arange(len(self.surface.statevec_names), dtype=int) # Sometimes, it's convenient to have the index of the entire surface # as one variable, and sometimes you want the sub-components # Split surface state vector indices to cover cases where we retrieve # additional non-reflectance surface parameters self.idx_surf_rfl = self.idx_surface[ : len(self.surface.idx_lamb) ] # reflectance portion self.idx_surf_nonrfl = self.idx_surface[ len(self.surface.idx_lamb) : ] # all non-reflectance surface parameters self.idx_RT = np.arange(len(self.RT.statevec_names), dtype=int) + len( self.idx_surface ) self.idx_instrument = ( np.arange(len(self.instrument.statevec_names), dtype=int) + len(self.idx_surface) + len(self.idx_RT) ) self.surface_b_inds = np.arange(len(self.surface.bvec), dtype=int) self.RT_b_inds = np.arange(len(self.RT.bvec), dtype=int) + len( self.surface_b_inds ) self.instrument_b_inds = ( np.arange(len(self.instrument.bvec), dtype=int) + len(self.surface_b_inds) + len(self.RT_b_inds) ) # Load model discrepancy correction if self.config.model_discrepancy_file is not None: D = loadmat(self.config.model_discrepancy_file) self.model_discrepancy = D["cov"] else: self.model_discrepancy = None
[docs] def out_of_bounds(self, x): """Check if state vector is within bounds.""" x_RT = x[self.idx_RT] bound_lwr = self.bounds[0] bound_upr = self.bounds[1] return any(x_RT >= (bound_upr[self.idx_RT] - eps * 2.0)) or any( x_RT <= (bound_lwr[self.idx_RT] + eps * 2.0) )
[docs] def xa(self, x, geom): """Calculate the prior mean of the state vector (the concatenation of state vectors for the surface, Radiative Transfer model, and instrument). NOTE: the surface prior mean depends on the current state; this is so we can calculate the local prior. """ x_surface = x[self.idx_surface] xa_surface = self.surface.xa(x_surface, geom) xa_RT = self.RT.xa() xa_instrument = self.instrument.xa() return np.concatenate((xa_surface, xa_RT, xa_instrument), axis=0)
[docs] def Sa(self, x, geom): """Calculate the prior covariance of the state vector (the concatenation of state vectors for the surface and radiative transfer model). NOTE: the surface prior depends on the current state; this is so we can calculate the local prior. """ x_surface = x[self.idx_surface] Sa_surface = self.surface.Sa(x_surface, geom)[:, :] Sa_RT = self.RT.Sa()[:, :] Sa_instrument = self.instrument.Sa()[:, :] return block_diag(Sa_surface, Sa_RT, Sa_instrument)
[docs] def calc_rdn(self, x, geom, rfl=None, Ls=None): """Calculate the high-resolution radiance, permitting overrides. Project to top-of-atmosphere and translate to radiance. The radiative transfer calculations may take place at higher resolution so we upsample surface terms. """ x_surface, x_RT, x_instrument = self.unpack(x) if rfl is None: rfl = self.surface.calc_rfl(x_surface, geom) if Ls is None: Ls = self.surface.calc_Ls(x_surface, geom) rfl_hi = self.upsample(self.surface.wl, rfl) Ls_hi = self.upsample(self.surface.wl, Ls) return self.RT.calc_rdn(x_RT, rfl_hi, Ls_hi, geom)
[docs] def calc_meas(self, x, geom, rfl=None, Ls=None): """Calculate the model observation at instrument wavelengths.""" x_surface, x_RT, x_instrument = self.unpack(x) rdn_hi = self.calc_rdn(x, geom, rfl, Ls) return self.instrument.sample(x_instrument, self.RT.wl, rdn_hi)
[docs] def calc_Ls(self, x, geom): """Calculate the surface emission.""" return self.surface.calc_Ls(x[self.idx_surface], geom)
[docs] def calc_rfl(self, x, geom): """Calculate the surface reflectance.""" return self.surface.calc_rfl(x[self.idx_surface], geom)
[docs] def calc_lamb(self, x, geom): """Calculate the Lambertian surface reflectance.""" return self.surface.calc_lamb(x[self.idx_surface], geom)
[docs] def Seps(self, x, meas, geom): """Calculate the total uncertainty of the observation, including up to three terms: (1) the instrument noise; (2) the uncertainty due to explicit unmodeled variables, i.e. the S_epsilon matrix of Rodgers et al.; and (3) an aggregate 'model discrepancy' term, Gamma.""" if self.model_discrepancy is not None: Gamma = self.model_discrepancy else: Gamma = 0 Kb = self.Kb(x, geom) Sy = self.instrument.Sy(meas, geom) return Sy + Kb.dot(self.Sb).dot(Kb.T) + Gamma
[docs] def K(self, x, geom): """Derivative of observation with respect to state vector. This is the concatenation of jacobians with respect to parameters of the surface and radiative transfer model.""" # Unpack state vector x_surface, x_RT, x_instrument = self.unpack(x) # Get partials of reflectance WRT surface state variables, upsample rfl = self.surface.calc_rfl(x_surface, geom) drfl_dsurface = self.surface.drfl_dsurface(x_surface, geom) rfl_hi = self.upsample(self.surface.wl, rfl) drfl_dsurface_hi = self.upsample(self.surface.wl, drfl_dsurface.T).T # Get partials of emission WRT surface state variables, upsample Ls = self.surface.calc_Ls(x_surface, geom) dLs_dsurface = self.surface.dLs_dsurface(x_surface, geom) Ls_hi = self.upsample(self.surface.wl, Ls) dLs_dsurface_hi = self.upsample(self.surface.wl, dLs_dsurface.T).T # Derivatives of RTM radiance drdn_dRT, drdn_dsurface = self.RT.drdn_dRT( x_RT, rfl_hi, drfl_dsurface_hi, Ls_hi, dLs_dsurface_hi, geom ) # Derivatives of measurement, avoiding recalculation of rfl, Ls dmeas_dsurface = self.instrument.sample( x_instrument, self.RT.wl, drdn_dsurface.T ).T dmeas_dRT = self.instrument.sample(x_instrument, self.RT.wl, drdn_dRT.T).T rdn_hi = self.calc_rdn(x, geom, rfl=rfl, Ls=Ls) dmeas_dinstrument = self.instrument.dmeas_dinstrument( x_instrument, self.RT.wl, rdn_hi ) # Put it all together K = np.zeros((self.n_meas, self.nstate), dtype=float) K[:, self.idx_surface] = dmeas_dsurface K[:, self.idx_RT] = dmeas_dRT K[:, self.idx_instrument] = dmeas_dinstrument return K
[docs] def Kb(self, x, geom): """Derivative of measurement with respect to unmodeled & unretrieved unknown variables, e.g. S_b. This is the concatenation of Jacobians with respect to parameters of the surface, radiative transfer model, and instrument. Currently we only treat uncertainties in the instrument and RT model.""" # Unpack state vector x_surface, x_RT, x_instrument = self.unpack(x) # Get partials of reflectance and upsample rfl = self.surface.calc_rfl(x_surface, geom) rfl_hi = self.upsample(self.surface.wl, rfl) Ls = self.surface.calc_Ls(x_surface, geom) Ls_hi = self.upsample(self.surface.wl, Ls) rdn_hi = self.calc_rdn(x, geom, rfl=rfl, Ls=Ls) drdn_dRTb = self.RT.drdn_dRTb(x_RT, x_surface, rfl_hi, Ls_hi, geom) dmeas_dRTb = self.instrument.sample(x_instrument, self.RT.wl, drdn_dRTb.T).T dmeas_dinstrumentb = self.instrument.dmeas_dinstrumentb( x_instrument, self.RT.wl, rdn_hi ) # Put it together Kb = np.zeros((self.n_meas, self.nbvec), dtype=float) Kb[:, self.RT_b_inds] = dmeas_dRTb Kb[:, self.instrument_b_inds] = dmeas_dinstrumentb return Kb
[docs] def summarize(self, x, geom): """State vector summary.""" x_surface, x_RT, x_instrument = self.unpack(x) return ( self.surface.summarize(x_surface, geom) + " " + self.RT.summarize(x_RT, geom) + " " + self.instrument.summarize(x_instrument, geom) )
[docs] def calibration(self, x): """Calculate measured wavelengths and fwhm.""" x_inst = x[self.idx_instrument] return self.instrument.calibration(x_inst)
[docs] def upsample(self, wl, q): """Linear interpolation to RT wavelengths.""" # In some cases, these differ only by a tiny amount, # so no need to waste time interpolating if (len(wl) == len(self.RT.wl)) and np.allclose(wl, self.RT.wl): return q if q.ndim > 1: q2 = [] for qi in q: p = interp1d(wl, qi, fill_value="extrapolate") q2.append(p(self.RT.wl)) return np.array(q2) else: p = interp1d(wl, q, fill_value="extrapolate") return p(self.RT.wl)
[docs] def unpack(self, x): """Unpack the state vector in appropriate index ordering.""" x_surface = x[self.idx_surface] x_RT = x[self.idx_RT] x_instrument = x[self.idx_instrument] return x_surface, x_RT, x_instrument