Source code for isofit.radiative_transfer.modtran

#! /usr/bin/env python3
#  Copyright 2018 California Institute of Technology
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  See the License for the specific language governing permissions and
#  limitations under the License.
# ISOFIT: Imaging Spectrometer Optimal FITting
# Authors: David R Thompson,
#          Nimrod Carmon,

import json
import logging
import os
import re
import subprocess
from copy import deepcopy
from sys import platform

import numpy as np
import scipy.interpolate
import scipy.stats

from isofit.radiative_transfer.radiative_transfer_engine import RadiativeTransferEngine

from ..core.common import json_load_ascii, recursive_replace

Logger = logging.getLogger(__file__)

### Variables ###

eps = 1e-5  # used for finite difference derivative calculations
tropopause_altitude_km = 17.0

### Classes ###

[docs] class FileExistsError(Exception): """FileExistsError with a message.""" def __init__(self, message): super(FileExistsError, self).__init__(message)
[docs] class ModtranRT(RadiativeTransferEngine): """A model of photon transport including the atmosphere."""
[docs] @staticmethod def parseTokens(tokens: list, coszen: float) -> dict: """ Processes tokens returned by parseLine() Parameters ---------- tokens: list List of floats returned by parseLine() coszen: float cos(zenith(filename)) Returns ------- dict Dictionary of calculated values using the tokens list """ irr = tokens[18] * 1e6 * np.pi / tokens[8] / coszen # uW/nm/sr/cm2 # fmt: off # If classic singlepart transmittance is used, # we store total transmittance ((down direct + down diffuse) * (up direct + up diffuse)) # under the diffuse down transmittance key (transm_down_dif) to ensure consistency # Tokens[24] contains only the direct upward transmittance, # so we store it under the direct upward transmittance key (transm_up_dir) # ToDo: remove in future versions and enforce the use of multipart transmittance return { 'solar_irr' : irr, # Solar irradiance 'wl' : tokens[0], # Wavelength 'rhoatm' : tokens[4] * 1e6 * np.pi / (irr * coszen), # uW/nm/sr/cm2 'width' : tokens[8], 'thermal_upwelling' : (tokens[11] + tokens[12]) / tokens[8] * 1e6, # uW/nm/sr/cm2 'thermal_downwelling': tokens[16] * 1e6 / tokens[8], 'path_rdn' : tokens[14] * 1e6 + tokens[15] * 1e6, # The sum of the (1) single scattering and (2) multiple scattering 'grnd_rflt' : tokens[16] * 1e6, # ground reflected radiance (direct+diffuse+multiple scattering) 'drct_rflt' : tokens[17] * 1e6, # same as 16 but only on the sun->surface->sensor path (only direct) 'transm_down_dif' : tokens[21] + tokens[22], # total transmittance (down * up, direct + diffuse) 'sphalb' : tokens[23], # atmospheric spherical albedo 'transm_up_dir' : tokens[24], # upward direct transmittance }
# fmt: on
[docs] @staticmethod def parseLine(line: str) -> list: """ Parses a single line of a .chn file into a list of token values Parameters ---------- line: str Singular data line of a MODTRAN .chn file Returns ------- list List of floats parsed from the line """ # Fixes issues in large datasets where irrelevant columns touch which breaks parseTokens() line = line[:17] + " " + line[18:] return [float(match) for match in re.findall(r"(\d\S*)", line)]
[docs] def load_chn(self, file: str, coszen: float, header: int = 5) -> dict: """ Parses a MODTRAN channel file and extracts relevant data Parameters ---------- file: str Path to a .chn file coszen: float ... header: int, defaults=5 Number of lines to skip for the header Returns ------- chn: dict Channel data """ with open(file, "r") as f: lines = f.readlines() data = [lines[header:]] # Determine if this is a multipart transmittance file, break if so L = len(lines) for N in range(2, 4): # Currently support 1, 2, and 3 part transmittance files # Length of each part n = int(L / N) # Check if the first line of the next part is the same if lines[1] == lines[n + 1]: Logger.debug(f"Channel file discovered to be {N} parts: {file}") # Parse the lines into N many parts data = [] for i in range(N): j = i + 1 data.append(lines[n * i : n * j][header:]) # No need to check other N sizes break else: Logger.warning( "Channel file detected to be a single transmittance, support for this will be dropped in a future version." + " Please start using 2 or 3 multipart transmittance files." ) parts = [] for part, lines in enumerate(data): parsed = [self.parseTokens(self.parseLine(line), coszen) for line in lines] # Convert from: [{k1: v11, k2: v21}, {k1: v12, k2: v22}] # to: {k1: [v11, v22], k2: [v21, v22]} - as numpy arrays combined = {} for i, parse in enumerate(parsed): for key, value in parse.items(): values = combined.setdefault(key, np.full(len(parsed), np.nan)) values[i] = value parts.append(combined) # Single transmittance files will be the first dict in the list, otherwise multiparts use two_albedo_method chn = parts[0] if len(parts) > 1: Logger.debug("Using two albedo method") chn = self.two_albedo_method(*parts, coszen, *self.test_rfls) return chn
[docs] @staticmethod def load_tp6(file): """ Parses relevant information from a tp6 file. Specifically, seeking a table in the unstructured text and extracting a column from it. Parameters ---------- tp6: str tp6 file path """ with open(file, "r") as tp6: lines = tp6.readlines() if not lines: raise ValueError(f"tp6 file is empty: {file}") for i, line in enumerate(lines): # Table found if "SINGLE SCATTER SOLAR" in line: i += 5 # Skip header break # Start at the table solzen = [] for line in lines[i:]: split = line.split() # End of table if not split: break # Retrieve solar zenith solzen.append(float(split[3])) if not solzen: raise ValueError(f"No solar zenith found in tp6 file: {file}") return np.mean(solzen)
[docs] def preSim(self): """ Post-initialized, pre-simulation setup """ self.filtpath = os.path.join( self.sim_path, f"wavelengths_{self.engine_config.engine_name}_{self.wl[0]}_{self.wl[-1]}.flt", ) self.template = json_load_ascii(self.engine_config.template_file)["MODTRAN"] # Regenerate MODTRAN input wavelength file if not os.path.exists(self.filtpath): self.wl2flt(self.wl, self.fwhm, self.filtpath) # Insert aerosol templates, if specified if self.engine_config.aerosol_model_file is not None: self.template[0]["MODTRANINPUT"]["AEROSOLS"] = json_load_ascii( self.engine_config.aerosol_template_file ) # Insert aerosol data, if specified if self.engine_config.aerosol_model_file is not None: aer_data = np.loadtxt(self.engine_config.aerosol_model_file) self.aer_wl = aer_data[:, 0] aer_data = np.transpose(aer_data[:, 1:]) self.naer = int(len(aer_data) / 3) aer_absc, aer_extc, aer_asym = [], [], [] for i in range(self.naer): aer_extc.append(aer_data[i * 3]) aer_absc.append(aer_data[i * 3 + 1]) aer_asym.append(aer_data[i * 3 + 2]) self.aer_absc = np.array(aer_absc) self.aer_extc = np.array(aer_extc) self.aer_asym = np.array(aer_asym)
[docs] def readSim(self, point): """ For a given point, parses the tp6 and chn file and returns the data """ file = os.path.join(self.sim_path, self.point_to_filename(point)) solzen = self.load_tp6(f"{file}.tp6") coszen = np.cos(solzen * np.pi / 180.0) params = self.load_chn(f"{file}.chn", coszen) # Remove thermal terms in VSWIR runs to avoid incorrect usage if self.treat_as_emissive is False: for key in ["thermal_upwelling", "thermal_downwelling"]: if key in params: Logger.debug( f"Deleting key because treat_as_emissive is False: {key}" ) del params[key] params["solzen"] = solzen params["coszen"] = coszen return params
[docs] def makeSim(self, point, file=None, timeout=None): """ Prepares the command to execute MODTRAN """ filename_base = file or self.point_to_filename(point) # Translate ISOFIT generic lut names to MODTRAN-specific names translation = { "surface_elevation_km": "GNDALT", "observer_altitude_km": "H1ALT", "observer_azimuth": "TRUEAZ", "observer_zenith": "OBSZEN", } names = [translation.get(key, key) for key in self.lut_names] vals = dict([(n, v) for n, v in zip(names, point)]) vals["DISALB"] = True vals["NAME"] = filename_base vals["FILTNM"] = os.path.normpath(self.filtpath) modtran_config_str, modtran_config = self.modtran_driver(dict(vals)) # Check rebuild conditions: LUT is missing or from a different config infilename = "LUT_" + filename_base + ".json" infilepath = os.path.join(self.sim_path, "LUT_" + filename_base + ".json") if not self.required_results_exist(filename_base): rebuild = True else: # We compare the two configuration files, ignoring names and # wavelength paths which tend to be non-portable with open(infilepath, "r") as fin: current_config = json.load(fin)["MODTRAN"] current_config[0]["MODTRANINPUT"]["NAME"] = "" modtran_config[0]["MODTRANINPUT"]["NAME"] = "" current_config[0]["MODTRANINPUT"]["SPECTRAL"]["FILTNM"] = "" modtran_config[0]["MODTRANINPUT"]["SPECTRAL"]["FILTNM"] = "" current_str = json.dumps(current_config) modtran_str = json.dumps(modtran_config) rebuild = modtran_str.strip() != current_str.strip() if not rebuild: raise FileExistsError(f"File exists: {filename_base=}") # write_config_file with open(infilepath, "w") as f: f.write(modtran_config_str) # Specify location of the proper MODTRAN 6.0 binary for this OS xdir = {"linux": "linux", "darwin": "macos", "windows": "windows"} # Generate the CLI path cmd = os.path.join( self.engine_base_dir, "bin", xdir[platform], "mod6c_cons " + infilename ) call = cmd, shell=True, timeout=timeout, cwd=self.sim_path, capture_output=True ) if call.stdout: Logger.error(call.stdout.decode())
[docs] def modtran_driver(self, overrides): """Write a MODTRAN 6.0 input file.""" param = deepcopy(self.template) if hasattr(self, "aer_absc"): fracs = np.zeros((self.naer)) if "IPARM" not in param[0]["MODTRANINPUT"]["GEOMETRY"]: raise AttributeError("MODTRAN template requires an IPARM specification") if param[0]["MODTRANINPUT"]["GEOMETRY"]["ITYPE"] != 3: raise AttributeError("Currently unsupported modtran ITYPE specification") # Geometry values that depend on IPARM if ( param[0]["MODTRANINPUT"]["GEOMETRY"]["IPARM"] == 12 and "GMTIME" in overrides.keys() ): raise AttributeError( "GMTIME in MODTRAN driver overrides, but IPARM set to 12. Check" " modtran template." ) elif param[0]["MODTRANINPUT"]["GEOMETRY"]["IPARM"] == 11 and { "solar_azimuth", "solaz", "solar_zenith", "solzen", }.intersection(set(overrides.keys())): raise AttributeError( "Solar geometry (solar az/azimuth zen/zenith) is specified, but IPARM" " is set to 12. Check MODTRAN template" ) if {"PARM1", "PARM2"}.intersection(set(overrides.keys())): raise AttributeError( "PARM1 and PARM2 keys not supported as LUT dimensions. Please use" " either solar_azimuth/solaz or solar_zenith/solzen" ) # Perform overrides for key, val in overrides.items(): recursive_replace(param, key, val) if key.startswith("AER"): i = int(key.split("_")[-1]) fracs[i] = val elif key in ["EXT550", "AOT550", "AOD550"]: # MODTRAN 6.0 convention treats negative visibility as AOT550 recursive_replace(param, "VIS", -val) elif key == "FILTNM": param[0]["MODTRANINPUT"]["SPECTRAL"]["FILTNM"] = val elif key == "FILTNM": param[0]["MODTRANINPUT"]["SPECTRAL"]["FILTNM"] = val # Geometry parameters we want to populate even if unassigned elif key in ["H1ALT", "IDAY", "TRUEAZ", "OBSZEN", "GMTIME"]: param[0]["MODTRANINPUT"]["GEOMETRY"][key] = val elif key == "AIRT_DELTA_K": # If there is no profile already provided ... if ( param[0]["MODTRANINPUT"]["ATMOSPHERE"]["MODEL"] != "ATM_USER_ALT_PROFILE" ): # MODTRAN cannot accept a ground altitude above 6 km, so keep all layers after that gndalt = param[0]["MODTRANINPUT"]["SURFACE"]["GNDALT"] # E.g.: [1.5, 2, 3, 4, 5] low_altitudes = [gndalt] + list( np.arange(6 - np.ceil(gndalt)) + np.ceil(gndalt) ) # MODTRAN cannot accept a ground altitude above 6 km, so keep all layers after that hi_altitudes = [ 6.0, 7.0, 8.0, 9.0, 10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 70.0, 80.0, 100.0, ] altitudes = ( low_altitudes + hi_altitudes ) # Append lists, don't add altitudes! prof_unt_tdelta_kelvin = np.where( np.array(altitudes) <= tropopause_altitude_km, val, 0 ) altitude_dict = { "TYPE": "PROF_ALTITUDE", "UNITS": "UNT_KILOMETERS", "PROFILE": altitudes, } delta_kelvin_dict = { "TYPE": "PROF_TEMPERATURE", "UNITS": "UNT_TDELTA_KELVIN", "PROFILE": prof_unt_tdelta_kelvin.tolist(), } param[0]["MODTRANINPUT"]["ATMOSPHERE"][ "MODEL" ] = "ATM_USER_ALT_PROFILE" param[0]["MODTRANINPUT"]["ATMOSPHERE"]["NPROF"] = 2 param[0]["MODTRANINPUT"]["ATMOSPHERE"]["NLAYERS"] = len(altitudes) param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"] = [ altitude_dict, delta_kelvin_dict, ] else: # A profile is already provided, assume that it includes PROF_ALTITUDE nprof = param[0]["MODTRANINPUT"]["ATMOSPHERE"]["NPROF"] profile_types = [] for i in range(nprof): profile_types.append( param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"][i][ "TYPE" ] ) ind_prof_altitude = profile_types.index("PROF_ALTITUDE") prof_altitude = np.array( param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"][ ind_prof_altitude ]["PROFILE"] ) if "PROF_TEMPERATURE" in profile_types: # If a temperature profile already exists, then we must add the temperature delta to that # as MODTRAN apparently does not allow have both an offset and a specified temperature ind_prof_temperature = profile_types.index("PROF_TEMPERATURE") prof_temperature = np.array( param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"][ ind_prof_temperature ]["PROFILE"] ) prof_temperature = np.where( prof_altitude <= tropopause_altitude_km, prof_temperature + val, prof_temperature, ) param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"][ ind_prof_temperature ]["PROFILE"] = prof_temperature.tolist() else: # If a temperature profile does not exist, then use UNT_TDELTA_KELVIN prof_unt_tdelta_kelvin = np.where( prof_altitude <= tropopause_altitude_km, val, 0.0 ) prof_unt_tdelta_kelvin_dict = { "TYPE": "PROF_TEMPERATURE", "UNITS": "UNT_TDELTA_KELVIN", "PROFILE": prof_unt_tdelta_kelvin.tolist(), } param[0]["MODTRANINPUT"]["ATMOSPHERE"]["PROFILES"].append( prof_unt_tdelta_kelvin_dict ) param[0]["MODTRANINPUT"]["ATMOSPHERE"]["NPROF"] = nprof + 1 # Surface parameters we want to populate even if unassigned elif key in ["GNDALT"]: param[0]["MODTRANINPUT"]["SURFACE"][key] = val elif key in ["solar_azimuth", "solaz"]: if "TRUEAZ" not in param[0]["MODTRANINPUT"]["GEOMETRY"]: raise AttributeError( "Cannot have solar azimuth in LUT without specifying TRUEAZ. " " Use RELAZ instead." ) param[0]["MODTRANINPUT"]["GEOMETRY"]["PARM1"] = ( param[0]["MODTRANINPUT"]["GEOMETRY"]["TRUEAZ"] - val + 180 ) elif key in ["solar_zenith", "solzen"]: param[0]["MODTRANINPUT"]["GEOMETRY"]["PARM2"] = abs(val) # elif key in ['altitude_km'] # elif key in ['altitude_km'] elif key in ["DISALB", "NAME"]: recursive_replace(param, key, val) elif key in param[0]["MODTRANINPUT"]["ATMOSPHERE"].keys(): recursive_replace(param, key, val) else: raise AttributeError( "Unsupported MODTRAN parameter {} specified".format(key) ) # For custom aerosols, specify final extinction and absorption # MODTRAN 6.0 convention treats negative visibility as AOT550 if hasattr(self, "aer_absc"): total_aot = fracs.sum() recursive_replace(param, "VIS", -total_aot) total_extc = total_absc = norm_fracs = fracs / (fracs.sum()) total_asym = # Normalize to 550 nm total_extc550 = scipy.interpolate.interp1d(self.aer_wl, total_extc)(0.55) lvl0 = param[0]["MODTRANINPUT"]["AEROSOLS"]["IREGSPC"][0] lvl0["NARSPC"] = len(self.aer_wl) lvl0["VARSPC"] = [float(v) for v in self.aer_wl] lvl0["ASYM"] = [float(v) for v in total_asym] lvl0["EXTC"] = [float(v) / total_extc550 for v in total_extc] lvl0["ABSC"] = [float(v) / total_extc550 for v in total_absc] if self.multipart_transmittance: const_rfl = np.array(np.array(self.test_rfls) * 100, dtype=int) # Here we copy the original config and just change the surface reflectance param[0]["MODTRANINPUT"]["CASE"] = 0 param[0]["MODTRANINPUT"]["SURFACE"]["SURFP"][ "CSALB" ] = f"LAMB_CONST_{const_rfl[0]}_PCT" param1 = deepcopy(param[0]) param1["MODTRANINPUT"]["CASE"] = 1 param1["MODTRANINPUT"]["SURFACE"]["SURFP"][ "CSALB" ] = f"LAMB_CONST_{const_rfl[1]}_PCT" param.append(param1) param2 = deepcopy(param[0]) param2["MODTRANINPUT"]["CASE"] = 2 param2["MODTRANINPUT"]["SURFACE"]["SURFP"][ "CSALB" ] = f"LAMB_CONST_{const_rfl[2]}_PCT" param.append(param2) return json.dumps({"MODTRAN": param}), param
[docs] def check_modtran_water_upperbound(self) -> float: """Check to see what the max water vapor values is at the first point in the LUT Returns: float: max water vapor value, or None if test fails """ point = np.array([x[-1] for x in self.lut_grids]) # Set the H2OSTR value as arbitrarily high - 50 g/cm2 in this case point[self.lut_names.index("H2OSTR")] = 50 filebase = os.path.join(self.sim_path, "H2O_bound_test") self.makeSim(point, filebase) max_water = None with open( os.path.join(self.sim_path, filebase + ".tp6"), errors="ignore" ) as tp6file: for count, line in enumerate(tp6file): if "The water column is being set to the maximum" in line: max_water = line.split(",")[1].strip() max_water = float(max_water.split(" ")[0]) break return max_water
[docs] def required_results_exist(self, filename_base): infilename = os.path.join(self.sim_path, "LUT_" + filename_base + ".json") outchnname = os.path.join(self.sim_path, filename_base + ".chn") outtp6name = os.path.join(self.sim_path, filename_base + ".tp6") if ( os.path.isfile(infilename) and os.path.isfile(outchnname) and os.path.isfile(outtp6name) ): return True else: return False
[docs] def wl2flt(self, wavelengths: np.array, fwhms: np.array, outfile: str) -> None: """Helper function to generate Gaussian distributions around the center wavelengths. Args: wavelengths: wavelength centers fwhms: full width at half max outfile: file to write to """ sigmas = fwhms / 2.355 span = 2.0 * np.abs(wavelengths[1] - wavelengths[0]) # nm steps = 101 with open(outfile, "w") as fout: fout.write("Nanometer data for sensor\n") for wl, fwhm, sigma in zip(wavelengths, fwhms, sigmas): ws = wl + np.linspace(-span, span, steps) vs = scipy.stats.norm.pdf(ws, wl, sigma) vs = vs / vs[int(steps / 2)] wns = 10000.0 / (ws / 1000.0) fout.write("CENTER: %6.2f NM FWHM: %4.2f NM\n" % (wl, fwhm)) for w, v, wn in zip(ws, vs, wns): fout.write(" %9.4f %9.7f %9.2f\n" % (w, v, wn))