#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Authors: Philip G. Brodrick, philip.brodrick@jpl.nasa.gov
# Niklas Bohn, urs.n.bohn@jpl.nasa.gov
# Jay E. Fahlen, jay.e.fahlen@jpl.nasa.gov
#
from types import SimpleNamespace
import numpy as np
from isofit.configs import Config
from isofit.configs.sections.radiative_transfer_config import (
RadiativeTransferEngineConfig,
)
from isofit.core.geometry import Geometry
from ..core.common import eps
from ..radiative_transfer.modtran import ModtranRT
from ..radiative_transfer.six_s import SixSRT
from ..radiative_transfer.sRTMnet import SimulatedModtranRT
# Match config string options to modules
RTE = {
"modtran": ModtranRT,
"6s": SixSRT,
"sRTMnet": SimulatedModtranRT,
}
[docs]
def confPriority(key, configs, none=False):
"""
Selects a key from a config if the value for that key is not None
Prioritizes returning the first value found in the configs list
TODO: ISOFIT configs are annoying and will create keys to NoneTypes
Should use mlky to handle key discovery at runtime instead of like this
"""
value = None
for config in configs:
if hasattr(config, key):
value = getattr(config, key)
if value is not None:
break
return value
[docs]
class RadiativeTransfer:
"""This class controls the radiative transfer component of the forward
model. An ordered dictionary is maintained of individual RTMs (MODTRAN,
for example). We loop over the dictionary concatenating the radiation
and derivatives from each RTM and interval to form the complete result.
In general, some of the state vector components will be shared between
RTMs and bands. For example, H20STR is shared between both VISNIR and
TIR. This class maintains the master list of statevectors.
"""
# Keys to retrieve from 3 sections to use the preferred
# Prioritizes retrieving from radiative_transfer_engines first, then instrument, then radiative_transfer
_keys = [
"interpolator_style",
"overwrite_interpolator",
"lut_grid",
"lut_path",
"wavelength_file",
]
def __init__(self, full_config: Config):
config = full_config.forward_model.radiative_transfer
confIT = full_config.forward_model.instrument
self.lut_grid = config.lut_grid
self.statevec_names = config.statevector.get_element_names()
self.rt_engines = []
for idx in range(len(config.radiative_transfer_engines)):
confRT = config.radiative_transfer_engines[idx]
if confRT.engine_name not in RTE:
raise AttributeError(
f"Invalid radiative transfer engine choice. Got: {confRT.engine_name}; Must be one of: {RTE}"
)
# Generate the params for this RTE
params = {
key: confPriority(key, [confRT, confIT, config]) for key in self._keys
}
params["engine_config"] = confRT
# Select the right RTE and initialize it
rte = RTE[confRT.engine_name](**params)
self.rt_engines.append(rte)
# If any engine is true, self is true
self.topography_model = any([rte.topography_model for rte in self.rt_engines])
# The rest of the code relies on sorted order of the individual RT engines which cannot
# be guaranteed by the dict JSON or YAML input
self.rt_engines.sort(key=lambda x: x.wl[0])
# Retrieved variables. We establish scaling, bounds, and
# initial guesses for each state vector element. The state
# vector elements are all free parameters in the RT lookup table,
# and they all have associated dimensions in the LUT grid.
self.bounds, self.scale, self.init = [], [], []
self.prior_mean, self.prior_sigma = [], []
for sv, sv_name in zip(*config.statevector.get_elements()):
self.bounds.append(sv.bounds)
self.scale.append(sv.scale)
self.init.append(sv.init)
self.prior_sigma.append(sv.prior_sigma)
self.prior_mean.append(sv.prior_mean)
self.bounds = np.array(self.bounds)
self.scale = np.array(self.scale)
self.init = np.array(self.init)
self.prior_mean = np.array(self.prior_mean)
self.prior_sigma = np.array(self.prior_sigma)
self.wl = np.concatenate([RT.wl for RT in self.rt_engines])
self.bvec = config.unknowns.get_element_names()
self.bval = np.array([x for x in config.unknowns.get_elements()[0]])
self.solar_irr = np.concatenate([RT.solar_irr for RT in self.rt_engines])
# TODO: Is code for this missing? We have if statements that rely on this
self.glint_model = False
[docs]
def xa(self):
"""Pull the priors from each of the individual RTs."""
return self.prior_mean
[docs]
def Sa(self):
"""Pull the priors from each of the individual RTs."""
return np.diagflat(np.power(np.array(self.prior_sigma), 2))
[docs]
def get_shared_rtm_quantities(self, x_RT, geom):
"""Return only the set of RTM quantities (transup, sphalb, etc.) that are contained
in all RT engines.
"""
ret = []
for RT in self.rt_engines:
ret.append(RT.get(x_RT, geom))
return self.pack_arrays(ret)
@property
def coszen(self):
"""
Backwards compatibility until Geometry takes over this param
Return some child RTE coszen
"""
for child in self.rt_engines:
if "coszen" in child.lut:
return child.lut.coszen.data
[docs]
def calc_rdn(self, x_RT, rfl, Ls, geom):
r = self.get_shared_rtm_quantities(x_RT, geom)
L_atm = self.get_L_atm(x_RT, geom)
L_up = Ls * (r["transm_up_dir"] + r["transm_up_dif"])
if geom.bg_rfl is not None:
# adjacency effects are counted
I = (self.solar_irr * self.coszen) / np.pi
bg = geom.bg_rfl
t_down = r["transm_down_dif"] + r["transm_down_dir"]
ret = (
L_atm
+ I / (1.0 - r["sphalb"] * bg) * bg * t_down * r["transm_up_dif"]
+ I / (1.0 - r["sphalb"] * bg) * rfl * t_down * r["transm_up_dir"]
+ L_up
)
elif self.topography_model:
I = self.solar_irr / np.pi
t_dir_down = r["transm_down_dir"]
t_dif_down = r["transm_down_dif"]
if geom.cos_i is None:
cos_i = self.coszen
else:
cos_i = geom.cos_i
t_total_up = r["transm_up_dif"] + r["transm_up_dir"]
t_total_down = t_dir_down + t_dif_down
s_alb = r["sphalb"]
# topographic flux (topoflux) effect corrected
ret = (
L_atm
+ (
I * cos_i / (1.0 - s_alb * rfl) * t_dir_down
+ I * self.coszen / (1.0 - s_alb * rfl) * t_dif_down
)
* rfl
* t_total_up
)
elif self.glint_model:
L_down_transmitted = self.get_L_down_transmitted(x_RT, geom)
E_dd = (self.solar_irr * self.coszen) / np.pi * r["t_down_dir"]
E_ds = (self.solar_irr * self.coszen) / np.pi * r["t_down_dif"]
E_d = E_dd + E_ds
L_sky = x_surface[-2] * E_dd + x_surface[-1] * E_ds
rho_ls = 0.02 # fresnel reflectance factor (approx. 0.02 for nadir view)
glint = rho_ls * (L_sky / E_d)
ret = (
L_atm
+ L_down_transmitted
* (rfl + glint)
/ (1.0 - r["sphalb"] * (rfl + glint))
+ L_up
)
else:
L_down_transmitted = self.get_L_down_transmitted(x_RT, geom)
ret = L_atm + L_down_transmitted * rfl / (1.0 - r["sphalb"] * rfl) + L_up
return ret
[docs]
def get_L_atm(self, x_RT: np.array, geom: Geometry) -> np.array:
"""Get the interpolated modeled atmospheric reflectance (aka path radiance).
Args:
x_RT: radiative-transfer portion of the statevector
geom: local geometry conditions for lookup
Returns:
interpolated modeled atmospheric reflectance
"""
L_atms = []
for RT in self.rt_engines:
if RT.treat_as_emissive:
r = RT.get(x_RT, geom)
rdn = r["thermal_upwelling"]
L_atms.append(rdn)
else:
r = RT.get(x_RT, geom)
rho = r["rhoatm"]
rdn = rho / np.pi * (self.solar_irr * self.coszen)
L_atms.append(rdn)
return np.hstack(L_atms)
[docs]
def get_L_down_transmitted(self, x_RT: np.array, geom: Geometry) -> np.array:
"""Get the interpolated total downward atmospheric transmittance.
Thermal_downwelling already includes the transmission factor. Also
assume there is no multiple scattering for TIR.
Args:
x_RT: radiative-transfer portion of the statevector
geom: local geometry conditions for lookup
Returns:
interpolated total downward atmospheric transmittance
"""
L_downs = []
for RT in self.rt_engines:
if RT.treat_as_emissive:
r = RT.get(x_RT, geom)
rdn = r["thermal_downwelling"]
L_downs.append(rdn)
else:
r = RT.get(x_RT, geom)
rdn = (
(self.solar_irr * self.coszen)
/ np.pi
* (r["transm_down_dir"] + r["transm_down_dif"])
)
L_downs.append(rdn)
return np.hstack(L_downs)
[docs]
def drdn_dRT(self, x_RT, rfl, drfl_dsurface, Ls, dLs_dsurface, geom: Geometry):
# first the rdn at the current state vector
rdn = self.calc_rdn(x_RT, rfl, Ls, geom)
# perturb each element of the RT state vector (finite difference)
K_RT = []
x_RTs_perturb = x_RT + np.eye(len(x_RT)) * eps
for x_RT_perturb in list(x_RTs_perturb):
rdne = self.calc_rdn(x_RT_perturb, rfl, Ls, geom)
K_RT.append((rdne - rdn) / eps)
K_RT = np.array(K_RT).T
# Get K_surface
r = self.get_shared_rtm_quantities(x_RT, geom)
if geom.bg_rfl is not None:
# adjacency effects are counted
I = (self.solar_irr * self.coszen) / np.pi
bg = geom.bg_rfl
t_down = r["transm_down_dif"] + r["transm_down_dir"]
drdn_drfl = I / (1.0 - r["sphalb"] * bg) * t_down * r["transm_up_dir"]
elif self.topography_model:
# jac w.r.t. topoflux correct radiance
I = self.solar_irr / np.pi
t_dir_down = r["transm_down_dir"]
t_dif_down = r["transm_down_dif"]
if geom.cos_i is None:
cos_i = self.coszen
else:
cos_i = geom.cos_i
t_total_up = r["transm_up_dif"] + r["transm_up_dir"]
t_total_down = t_dir_down + t_dif_down
s_alb = r["sphalb"]
a = t_total_up * (I * cos_i * t_dir_down + I * self.coszen * t_dif_down)
drdn_drfl = a / (1 - s_alb * rfl) ** 2
elif self.glint_model:
L_down_transmitted = self.get_L_down_transmitted(x_RT, geom)
E_dd = (self.solar_irr * self.coszen) / np.pi * r["t_down_dir"]
E_ds = (self.solar_irr * self.coszen) / np.pi * r["t_down_dif"]
E_d = E_dd + E_ds
L_sky = x_surface[-2] * E_dd + x_surface[-1] * E_ds
rho_ls = 0.02 # fresnel reflectance factor (approx. 0.02 for nadir view)
glint = rho_ls * (L_sky / E_d)
drho_scaled_for_multiscattering_drfl = (
1.0 / (1 - r["sphalb"] * (rfl + glint)) ** 2
)
drdn_drfl = L_down_transmitted * drho_scaled_for_multiscattering_drfl
else:
L_down_transmitted = self.get_L_down_transmitted(x_RT, geom)
# The reflected downwelling light is:
# L_down_transmitted * rfl / (1.0 - r['sphalb'] * rfl), or
# L_down_transmitted * rho_scaled_for_multiscattering
# This term is the derivative of rho_scaled_for_multiscattering
drho_scaled_for_multiscattering_drfl = 1.0 / (1 - r["sphalb"] * rfl) ** 2
drdn_drfl = L_down_transmitted * drho_scaled_for_multiscattering_drfl
drdn_dLs = r["transm_up_dir"] + r["transm_up_dif"]
K_surface = (
drdn_drfl[:, np.newaxis] * drfl_dsurface
+ drdn_dLs[:, np.newaxis] * dLs_dsurface
)
return K_RT, K_surface
[docs]
def drdn_dRTb(self, x_RT, x_surface, rfl, Ls, geom):
if len(self.bvec) == 0:
Kb_RT = np.zeros((0, len(self.wl.shape)))
# currently, the K_b matrix only covers forward model derivatives due to H2O_ABSCO unknowns,
# so that subsequent errors might occur when water vapor is not part of the state
# vector (which is very unlikely though). the following statement captures this case,
# but might need to be modified as soon as we add more unknowns
# ToDo: might require modification in case more unknowns are added
elif len(self.bvec) > 0 and "H2OSTR" not in self.statevec_names:
Kb_RT = np.zeros((1, len(self.wl)))
else:
# first the radiance at the current state vector
rdn = self.calc_rdn(x_RT, rfl, Ls, geom)
# unknown parameters modeled as random variables per
# Rodgers et al (2000) K_b matrix. We calculate these derivatives
# by finite differences
Kb_RT = []
perturb = 1.0 + eps
for unknown in self.bvec:
if unknown == "H2O_ABSCO" and "H2OSTR" in self.statevec_names:
i = self.statevec_names.index("H2OSTR")
x_RT_perturb = x_RT.copy()
x_RT_perturb[i] = x_RT[i] * perturb
rdne = self.calc_rdn(x_RT_perturb, rfl, Ls, geom)
Kb_RT.append((rdne - rdn) / eps)
Kb_RT = np.array(Kb_RT).T
return Kb_RT
[docs]
def summarize(self, x_RT, geom):
ret = []
for RT in self.rt_engines:
ret.append(RT.summarize(x_RT, geom))
ret = "\n".join(ret)
return ret
[docs]
def pack_arrays(self, rtm_quantities_from_RT_engines):
"""Take the list of dict outputs from each RT engine and
stack their internal arrays in the same order. Keep only
those quantities that are common to all RT engines.
"""
# Get the intersection of the sets of keys from each of the rtm_quantities_from_RT_engines
shared_rtm_keys = set(rtm_quantities_from_RT_engines[0].keys())
if len(rtm_quantities_from_RT_engines) > 1:
for rtm_quantities_from_one_RT_engine in rtm_quantities_from_RT_engines[1:]:
shared_rtm_keys.intersection_update(
rtm_quantities_from_one_RT_engine.keys()
)
# Concatenate the different band ranges
rtm_quantities_concatenated_over_RT_bands = {}
for key in shared_rtm_keys:
temp = [x[key] for x in rtm_quantities_from_RT_engines]
rtm_quantities_concatenated_over_RT_bands[key] = np.hstack(temp)
return rtm_quantities_concatenated_over_RT_bands
[docs]
def ext550_to_vis(ext550):
"""VIS is defined as a function of the surface aerosol extinction coefficient
at 550 nm in km-1, EXT550, by the formula VIS[km] = ln(50) / (EXT550 + 0.01159),
where 0.01159 is the surface Rayleigh scattering coefficient at 550 nm in km-1
(see MODTRAN6 manual, p. 50).
"""
return np.log(50.0) / (ext550 + 0.01159)