Source code for isofit.inversion.inverse_simple

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov

import os
from typing import OrderedDict

import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from scipy.optimize import least_squares, minimize
from scipy.optimize import minimize_scalar as min1d

from isofit.core.common import (
    emissive_radiance,
    eps,
    get_refractive_index,
    svd_inv_sqrt,
)
from isofit.core.forward import ForwardModel
from isofit.core.geometry import Geometry
from isofit.core.instrument import Instrument
from isofit.radiative_transfer.radiative_transfer import RadiativeTransfer
from isofit.surface.surface import Surface


[docs] def heuristic_atmosphere( RT: RadiativeTransfer, instrument: Instrument, x_RT: np.array, x_instrument: np.array, meas: np.array, geom: Geometry, ): """From a given radiance, estimate atmospheric state with band ratios. Used to initialize gradient descent inversions. Args: RT: radiative transfer model to use instrument: instrument for noise characterization x_RT: radiative transfer portion of the state vector x_instrument: instrument portion of the state vector meas: a one-D numpy vector of radiance in uW/nm/sr/cm2 geom: geometry object corresponding to given measurement Returns: x_new: updated estimate of x_RT """ # Identify the latest instrument wavelength calibration (possibly # state-dependent) and identify channel numbers for the band ratio. wl, fwhm = instrument.calibration(x_instrument) b865 = np.argmin(abs(wl - 865)) b945 = np.argmin(abs(wl - 945)) b1040 = np.argmin(abs(wl - 1040)) if not (any(RT.wl > 850) and any(RT.wl < 1050)): return x_RT x_new = x_RT.copy() # Figure out which RT object we are using # TODO: this is currently very specific to vswir-tir 2-mode, eventually generalize my_RT = None for rte in RT.rt_engines: if rte.treat_as_emissive is False: my_RT = rte break if not my_RT: raise ValueError("No suiutable RT object for initialization") # Band ratio retrieval of H2O. Depending on the radiative transfer # model we are using, this state parameter could go by several names. for h2oname in ["H2OSTR", "h2o"]: if h2oname not in RT.statevec_names: continue # ignore unused names if h2oname not in my_RT.lut_names: continue # find the index in the lookup table associated with water vapor ind_sv = RT.statevec_names.index(h2oname) h2os, ratios = [], [] # We iterate through every possible grid point in the lookup table, # calculating the band ratio that we would see if this were the # atmospheric H2O content. It assumes that defaults for all other # atmospheric parameters (such as aerosol, if it is there). for h2o in my_RT.lut_grid[h2oname]: # Get Atmospheric terms at high spectral resolution x_RT_2 = x_RT.copy() x_RT_2[ind_sv] = h2o rhi = RT.get_shared_rtm_quantities(x_RT_2, geom) rhoatm = instrument.sample(x_instrument, RT.wl, rhi["rhoatm"]) transm = instrument.sample( x_instrument, RT.wl, rhi["transm_down_dif"] ) # REVIEW: This was changed from transm as we're deprecating the key sphalb = instrument.sample(x_instrument, RT.wl, rhi["sphalb"]) solar_irr = instrument.sample(x_instrument, RT.wl, RT.solar_irr) # Assume no surface emission. "Correct" the at-sensor radiance # using this presumed amount of water vapor, and measure the # resulting residual (as measured from linear interpolation across # the absorption feature) rho = meas * np.pi / (solar_irr * RT.coszen) r = 1.0 / (transm / (rho - rhoatm) + sphalb) ratios.append((r[b945] * 2.0) / (r[b1040] + r[b865])) h2os.append(h2o) # Finally, interpolate to determine the actual water vapor level that # would optimize the continuum-relative correction p = interp1d(h2os, ratios) bounds = (h2os[0] + 0.001, h2os[-1] - 0.001) best = min1d(lambda h: abs(1 - p(h)), bounds=bounds, method="bounded") x_new[ind_sv] = best.x return x_new
[docs] def invert_algebraic( surface: Surface, RT: RadiativeTransfer, instrument: Instrument, x_surface: np.array, x_RT: np.array, x_instrument: np.array, meas: np.array, geom: Geometry, ): """Inverts radiance algebraically using Lambertian assumptions to get a reflectance. Args: surface: surface model RT: radiative transfer model to use instrument: instrument model x_surface: surface portion of the state vector x_RT: radiative transfer portion of the state vector x_instrument: instrument portion of the state vector meas: a one-D numpy vector of radiance in uW/nm/sr/cm2 geom: geometry object corresponding to given measurement Return: rfl_est: estimate of the surface reflectance based on the given surface model and specified atmospheric state Ls: estimate of the emitted surface leaving radiance coeffs: atmospheric parameters for the forward model """ # Get atmospheric optical parameters (possibly at high # spectral resolution) and resample them if needed. rhi = RT.get_shared_rtm_quantities(x_RT, geom) wl, fwhm = instrument.calibration(x_instrument) rhoatm = instrument.sample(x_instrument, RT.wl, rhi["rhoatm"]) transm = instrument.sample( x_instrument, RT.wl, rhi["transm_down_dif"] ) # REVIEW: Changed from transm solar_irr = instrument.sample(x_instrument, RT.wl, RT.solar_irr) sphalb = instrument.sample(x_instrument, RT.wl, rhi["sphalb"]) transup = instrument.sample( x_instrument, RT.wl, rhi["transm_up_dir"] ) # REVIEW: Changed from transup coszen = RT.coszen # Prevent NaNs transm[transm == 0] = 1e-5 # Calculate the initial emission and subtract from the measurement. # Surface and measured wavelengths may differ. Ls = surface.calc_Ls(x_surface, geom) Ls_meas = interp1d(surface.wl, Ls, fill_value="extrapolate")(wl) rdn_solrfl = meas - (transup * Ls_meas) # Now solve for the reflectance at measured wavelengths, # and back-translate to surface wavelengths rho = rdn_solrfl * np.pi / (solar_irr * coszen) rfl = 1.0 / (transm / (rho - rhoatm) + sphalb) rfl[rfl > 1.0] = 1.0 rfl_est = interp1d(wl, rfl, fill_value="extrapolate")(surface.wl) # Some downstream code will benefit from our precalculated # atmospheric optical parameters coeffs = rhoatm, sphalb, transm, solar_irr, coszen, transup return rfl_est, Ls, coeffs
[docs] def invert_analytical( fm: ForwardModel, winidx: np.array, meas: np.array, geom: Geometry, x_RT: np.array, num_iter: int = 1, hash_table: OrderedDict = None, hash_size: int = None, diag_uncert: bool = True, outside_ret_const: float = -0.01, ): """Perform an analytical estimate of the conditional MAP estimate for a fixed atmosphere. Based on the "Inner loop" from Susiluoto, 2022. Args: fm: isofit forward model winidx: indices of surface components of state vector (to be solved) meas: a one-D numpy vector of radiance in uW/nm/sr/cm2 geom: geometry object corresponding to given measurement x_RT: the radiative transfer state variables num_iter: number of interations to run through hash_table: a hash table to use locally hash_size: max size of given hash table diag_uncert: flag indicating whether to diagonalize the uncertainty Returns: x: MAP estimate of the mean S: diagonal conditional posterior covariance estimate """ from scipy.linalg.blas import dsymv from scipy.linalg.lapack import dpotrf, dpotri # x = x0.copy() # x_surface, x_RT, x_instrument = fm.unpack(x) # Note, this will fail if x_instrument is populated if len(fm.idx_instrument) > 0: raise AttributeError( "Invert analytical not currently set to handle instrument state variable" " indexing" ) x = np.zeros(fm.nstate) x[fm.idx_RT] = x_RT x_alg = invert_algebraic( fm.surface, fm.RT, fm.instrument, x[fm.idx_surface], x_RT, x[fm.idx_instrument], meas, geom, ) if fm.RT.glint_model: x_surf = fm.surface.fit_params(x_alg[0], geom) x[fm.idx_surface] = x_surf else: x[fm.idx_surface] = x_alg[0] trajectory = [] outside_ret_windows = np.ones(len(fm.idx_surface), dtype=bool) outside_ret_windows[winidx] = False outside_ret_windows = np.where(outside_ret_windows)[0] x_surface, x_RT, x_instrument = fm.unpack(x) Seps = fm.Seps(x, meas, geom)[winidx, :][:, winidx] Sa = fm.Sa(x, geom) Sa_surface = Sa[fm.idx_surface, :][:, fm.idx_surface] Sa_inv = svd_inv_sqrt(Sa_surface, hash_table, hash_size)[0] xa_full = fm.xa(x, geom) xa_surface = xa_full[fm.idx_surface] if fm.RT.glint_model: prprod = Sa_inv @ xa_surface # obtain needed RT vectors r = fm.RT.get_L_atm(x_RT, geom)[winidx] # path radiance t1 = fm.RT.get_L_down_transmitted(x_RT, geom)[ winidx ] # total transmittance (down * up, direct + diffuse) rtm_quant = fm.RT.get_shared_rtm_quantities(x_RT, geom) t_down_dir = rtm_quant["t_down_dir"][winidx] # downward direct transmittance t_down_dif = rtm_quant["t_down_dif"][winidx] # downward diffuse transmittance t_down_total = t_down_dir + t_down_dif # downward total transmittance s = rtm_quant["sphalb"][winidx] # spherical albedo rho_ls = 0.02 # fresnel reflectance factor (approx. 0.02 for nadir view) g_dir = rho_ls * (t_down_dir / t_down_total) # direct sky transmittance g_dif = rho_ls * (t_down_dif / t_down_total) # diffuse sky transmittance nl = len(r) H = np.zeros((nl, nl + 2)) np.fill_diagonal(H, 1) H[:, -2] = g_dir H[:, -1] = g_dif GIv = 1 / np.diag(Seps) xk = x[fm.idx_surface].copy() trajectory.append(xk) for n in range(num_iter): Dv = t1 / (1 - s * (xk[:nl] + xk[-2] * g_dir + xk[-1] * g_dif)) L = Dv.reshape((-1, 1)) * H M = GIv.reshape((-1, 1)) * L S = L.T @ M C_rcond = np.linalg.inv(Sa_inv + S) z = meas - r xk = C_rcond @ (M.T @ z + prprod) trajectory.append(xk) if fm.full_glint: trajectory.append(trajectory[-1][-2] * g_dir) trajectory.append(trajectory[-1][-1] * g_dif) else: for n in range(num_iter): L_atm = fm.RT.get_L_atm(x_RT, geom)[winidx] L_tot = fm.calc_rdn(x, geom)[winidx] L_surf = (L_tot - L_atm) / x_surface[winidx] # = Ldiag L_surf[L_surf < 0] = 1e-9 # Match Jouni's convention C = Seps # C = 'meas_Cov' = Seps L = dpotrf(C, 1)[0] P = dpotri(L, 1)[0] P_rpr = Sa_inv[winidx, :][:, winidx] mu_rpr = xa_surface[winidx] priorprod = P_rpr @ mu_rpr P_tilde = ((L_surf * P).T * L_surf).T P_rcond = P_rpr + P_tilde LI_rcond = dpotrf(P_rcond)[0] C_rcond = dpotri(LI_rcond)[0] # Calculate AOE mean mu = dsymv( 1, C_rcond, L_surf * dsymv(1, P, meas[winidx] - L_atm) + priorprod ) full_mu = np.zeros(len(x_surface)) full_mu[winidx] = mu # Calculate conditional prior mean to fill in # xa_cond, Sa_cond = conditional_gaussian(xa_full, Sa, outside_ret_windows, winidx, mu) # full_mu[outside_ret_windows] = xa_cond if outside_ret_const is None: full_mu[outside_ret_windows] = xa_surface[outside_ret_windows] else: full_mu[outside_ret_windows] = outside_ret_const x[fm.idx_surface] = full_mu trajectory.append(x) if diag_uncert: full_unc = np.ones(len(x)) if fm.RT.glint_model: C_rcond_idx = np.concatenate((winidx, fm.idx_surface[-2:]), axis=0) full_unc[C_rcond_idx] = np.sqrt(np.diag(C_rcond)) else: full_unc[winidx] = np.sqrt(np.diag(C_rcond)) return trajectory, full_unc else: return trajectory, C_rcond
[docs] def invert_simple(forward: ForwardModel, meas: np.array, geom: Geometry): """Find an initial guess at the state vector. This currently uses traditional (non-iterative, heuristic) atmospheric correction. Args: forward: isofit forward model meas: a one-D numpy vector of radiance in uW/nm/sr/cm2 geom: geometry object corresponding to given measurement Returns: x: estimate of the full statevector based on initial conditions, geometry, and a heuristic guess """ surface = forward.surface RT = forward.RT instrument = forward.instrument vswir_present = False if any(forward.surface.wl < 2600): vswir_present = True tir_present = False if any(forward.surface.wl > 2600): tir_present = True # First step is to get the atmosphere. We start from the initial state # and estimate atmospheric terms using traditional heuristics. x = forward.init.copy() x_surface, x_RT, x_instrument = forward.unpack(x) if vswir_present: x[forward.idx_RT] = heuristic_atmosphere( RT, instrument, x_RT, x_instrument, meas, geom ) # Now, with atmosphere fixed, we can invert the radiance algebraically # via Lambertian approximations to get reflectance x_surface, x_RT, x_instrument = forward.unpack(x) rfl_est, Ls_est, coeffs = invert_algebraic( surface, RT, instrument, x_surface, x_RT, x_instrument, meas, geom ) # Condition thermal part on the VSWIR portion. Only works for # Multicomponent surfaces. Finds the cluster nearest the VSWIR heuristic # inversion and uses it for the TIR suface initialization. if tir_present: tir_idx = np.where(forward.surface.wl > 3000)[0] if vswir_present: x_surface_temp = x_surface.copy() x_surface_temp[: len(rfl_est)] = rfl_est mu = forward.surface.xa(x_surface_temp, geom) rfl_est[tir_idx] = mu[tir_idx] else: rfl_est = 0.03 * np.ones(len(forward.surface.wl)) # Now we have an estimated reflectance. Fit the surface parameters. x_surface[forward.idx_surface] = forward.surface.fit_params(rfl_est, geom) # Find temperature of emissive surfaces if tir_present: # Estimate the total radiance at sensor, leaving out surface emission # Radiate transfer calculations could take place at high spectral resolution # so we upsample the surface reflectance rfl_hi = forward.upsample(forward.surface.wl, rfl_est) rhoatm, sphalb, transm, solar_irr, coszen, transup = coeffs L_atm = RT.get_L_atm(x_RT, geom) L_down_transmitted = RT.get_L_down_transmitted(x_RT, geom) L_total_without_surface_emission = L_atm + L_down_transmitted * rfl_hi / ( 1.0 - sphalb * rfl_hi ) # These tend to have high transmission factors; the emissivity of most # materials is nearly 1 for these bands, so they are good for # initializing the surface temperature. clearest_wavelengths = [10125.0, 10390.00, 10690.00] # This is fragile if other instruments have different wavelength # spacing or range clearest_indices = [ np.argmin(np.absolute(RT.wl - w)) for w in clearest_wavelengths ] # Error function for nonlinear temperature fit def err(z): T = z emissivity = forward.surface.emissivity_for_surface_T_init Ls_est, d = emissive_radiance( emissivity, T, forward.surface.wl[clearest_indices] ) resid = ( transup[clearest_indices] * Ls_est + L_total_without_surface_emission[clearest_indices] - meas[clearest_indices] ) return sum(resid**2) # Fit temperature, set bounds, and set the initial values idx_T = forward.surface.surf_temp_ind Tinit = np.array([forward.surface.init[idx_T]]) Tbest = minimize(err, Tinit).x T = max( forward.surface.bounds[idx_T][0] + eps, min(Tbest, forward.surface.bounds[idx_T][1] - eps), ) x_surface[idx_T] = Tbest forward.surface.init[idx_T] = T # Update the full state vector x[forward.idx_surface] = x_surface # If available, get initial guess of surface elevation from location file. if geom.surface_elevation_km and "surface_elevation_km" in RT.statevec_names: ind_sv = forward.idx_RT[RT.statevec_names.index("surface_elevation_km")] if geom.surface_elevation_km < 0.0: x[ind_sv] = 0.0 else: x[ind_sv] = geom.surface_elevation_km # We record these initial values in the geometry object - the only # "stateful" part of the retrieval geom.x_surf_init = x[forward.idx_surface] geom.x_RT_init = x[forward.idx_RT] return x
[docs] def invert_liquid_water( rfl_meas: np.array, wl: np.array, l_shoulder: float = 850, r_shoulder: float = 1100, lw_init: tuple = (0.02, 0.3, 0.0002), lw_bounds: tuple = ([0, 0.5], [0, 1.0], [-0.0004, 0.0004]), ewt_detection_limit: float = 0.5, return_abs_co: bool = False, ): """Given a reflectance estimate, fit a state vector including liquid water path length based on a simple Beer-Lambert surface model. Args: rfl_meas: surface reflectance spectrum wl: instrument wavelengths, must be same size as rfl_meas l_shoulder: wavelength of left absorption feature shoulder r_shoulder: wavelength of right absorption feature shoulder lw_init: initial guess for liquid water path length, intercept, and slope lw_bounds: lower and upper bounds for liquid water path length, intercept, and slope ewt_detection_limit: upper detection limit for ewt return_abs_co: if True, returns absorption coefficients of liquid water Returns: solution: estimated liquid water path length, intercept, and slope based on a given surface reflectance """ # params needed for liquid water fitting lw_feature_left = np.argmin(abs(l_shoulder - wl)) lw_feature_right = np.argmin(abs(r_shoulder - wl)) wl_sel = wl[lw_feature_left : lw_feature_right + 1] # adjust upper detection limit for ewt if specified if ewt_detection_limit != 0.5: lw_bounds[0][1] = ewt_detection_limit # load imaginary part of liquid water refractive index and calculate wavelength dependent absorption coefficient # __file__ should live at isofit/isofit/inversion/ isofit_path = os.path.dirname(os.path.dirname(os.path.dirname(__file__))) path_k = os.path.join(isofit_path, "data", "iop", "k_liquid_water_ice.xlsx") k_wi = pd.read_excel(io=path_k, sheet_name="Sheet1", engine="openpyxl") wl_water, k_water = get_refractive_index( k_wi=k_wi, a=0, b=982, col_wvl="wvl_6", col_k="T = 20°C" ) kw = np.interp(x=wl_sel, xp=wl_water, fp=k_water) abs_co_w = 4 * np.pi * kw / wl_sel rfl_meas_sel = rfl_meas[lw_feature_left : lw_feature_right + 1] x_opt = least_squares( fun=beer_lambert_model, x0=lw_init, jac="2-point", method="trf", bounds=( np.array([lw_bounds[ii][0] for ii in range(3)]), np.array([lw_bounds[ii][1] for ii in range(3)]), ), max_nfev=15, args=(rfl_meas_sel, wl_sel, abs_co_w), ) solution = x_opt.x if return_abs_co: return solution, abs_co_w else: return solution
[docs] def beer_lambert_model(x, y, wl, alpha_lw): """Function, which computes the vector of residuals between measured and modeled surface reflectance optimizing for path length of surface liquid water based on the Beer-Lambert attenuation law. Args: x: state vector (liquid water path length, intercept, slope) y: measurement (surface reflectance spectrum) wl: instrument wavelengths alpha_lw: wavelength dependent absorption coefficients of liquid water Returns: resid: residual between modeled and measured surface reflectance """ attenuation = np.exp(-x[0] * 1e7 * alpha_lw) rho = (x[1] + x[2] * wl) * attenuation resid = rho - y return resid