#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import json
import os
from argparse import ArgumentError
from collections import OrderedDict
from os.path import expandvars
from typing import List
import numpy as np
import scipy.linalg
import xxhash
from scipy.interpolate import RegularGridInterpolator
### Variables ###
# small value used in finite difference derivatives
eps = 1e-5
### Classes ###
[docs]
class VectorInterpolator:
    """Linear look up table interpolator. Support linear interpolation through radial space by expanding the look
    up tables with sin and cos dimensions.

    Args:
        grid_input: list of lists of floats, indicating the gridpoint elements in each grid dimension
        data_input: n dimensional array of radiative transfer engine outputs (each dimension size corresponds to the
            given grid_input list length, with the last dimensions equal to the number of sensor channels)
        lut_interp_types: a list indicating if each dimension is in radians (r), degrees (d), or normal (n) units.
            NOTE(review): the lookups below use np.where(self.lut_interp_types == "r"), which assumes this is
            a numpy array of one-character strings, not a plain Python list - confirm at call sites.
        version: version to use: 'rg' for scipy RegularGridInterpolator, 'mlg' for multilinear grid interpolator.
            NOTE(review): the default "nds-1" matches neither branch and raises ArgumentError, so callers
            apparently always pass an explicit version - confirm.
    """

    def __init__(
        self,
        grid_input: List[List[float]],
        data_input: np.array,
        lut_interp_types: List[str],
        version="nds-1",
    ):
        # Dispatch flag used by __call__: 1 -> RegularGridInterpolator, 2 -> multilinear grid
        self.method = 1
        self.lut_interp_types = lut_interp_types
        self.single_point_data = None

        # Lists and arrays are mutable, so copy first
        grid = grid_input.copy()
        data = data_input.copy()

        # Check if we are using a single grid point. If so, store the grid input.
        if np.prod(list(map(len, grid))) == 1:
            self.single_point_data = data

        # expand grid dimensionality as needed: every angular dimension is
        # replaced by a (cos, sin) pair so interpolation wraps correctly
        # around the circle instead of jumping across the 0/360 seam
        [radian_locations] = np.where(self.lut_interp_types == "r")
        [degree_locations] = np.where(self.lut_interp_types == "d")
        angle_locations = np.hstack([radian_locations, degree_locations])
        angle_types = np.hstack(
            [
                self.lut_interp_types[radian_locations],
                self.lut_interp_types[degree_locations],
            ]
        )
        for _angle_loc in range(len(angle_locations)):
            angle_loc = angle_locations[_angle_loc]

            # get original grid at given location
            original_grid_subset = np.array(grid[angle_loc])

            # convert for angular coordinates
            if angle_types[_angle_loc] == "r":
                grid_subset_cosin = np.cos(original_grid_subset)
                grid_subset_sin = np.sin(original_grid_subset)
            elif angle_types[_angle_loc] == "d":
                grid_subset_cosin = np.cos(np.deg2rad(original_grid_subset))
                grid_subset_sin = np.sin(np.deg2rad(original_grid_subset))

            # handle the fact that the grid may no longer be in order
            grid_subset_cosin_order = np.argsort(grid_subset_cosin)
            grid_subset_sin_order = np.argsort(grid_subset_sin)

            # convert current grid location, and add a second
            grid[angle_loc] = grid_subset_cosin[grid_subset_cosin_order].tolist()
            grid.insert(angle_loc + 1, grid_subset_sin[grid_subset_sin_order].tolist())

            # now copy the data to be interpolated through the extra dimension,
            # at the specific angle_loc axes. We'll use broadcast_to to do
            # this, but we need to do it on the last dimension. So start by
            # temporarily moving the target axes there, then broadcasting
            data = np.array(data)
            data = np.swapaxes(data, -1, angle_loc)
            data_dim = list(np.shape(data))
            data_dim.append(data_dim[-1])
            data = data[..., np.newaxis] * np.ones(data_dim)

            # Now we need to actually copy the data between the first two axes,
            # as broadcast_to doesn't do this
            for ind in range(data.shape[-1]):
                data[..., ind] = data[..., :, ind]

            # Now re-order the cosin dimension
            data = data[..., grid_subset_cosin_order, :]
            # Now re-order the sin dimension
            data = data[..., grid_subset_sin_order]

            # now re-arrange the axes so they're in the right order again,
            dst_axes = np.arange(len(data.shape) - 2).tolist()
            dst_axes.insert(angle_loc, len(data.shape) - 2)
            dst_axes.insert(angle_loc + 1, len(data.shape) - 1)
            dst_axes.remove(angle_loc)
            dst_axes.append(angle_loc)
            data = np.ascontiguousarray(np.transpose(data, axes=dst_axes))

            # update the rest of the angle locations
            angle_locations += 1

        # Number of sensor channels (trailing data dimension)
        self.n = data.shape[-1]

        # RegularGrid
        if version == "rg":
            # Append a channel-index axis so all channels interpolate in one call
            grid_aug = grid + [np.arange(data.shape[-1])]
            self.itp = RegularGridInterpolator(
                grid_aug, data, bounds_error=False, fill_value=None
            )
            self.method = 1
        # Multilinear Grid
        elif version == "mlg":
            self.method = 2
            self.gridtuples = [np.array(t) for t in grid]
            self.gridarrays = data
            self.binwidth = [
                t[1:] - t[:-1] for t in self.gridtuples
            ]  # binwidth arrays for each dimension
            self.maxbaseinds = np.array([len(t) - 1 for t in self.gridtuples])
        else:
            raise ArgumentError(None, f"Unknown interpolator version: {version!r}")

    def _interpolate(self, points):
        """Interpolate via RegularGridInterpolator (version 'rg').

        Builds one query row per channel: the point coordinates (with angular
        dimensions expanded to cos/sin) plus a trailing integer channel index.

        Args:
            points: coordinates of the point, one value per original grid dimension

        Returns:
            np.array: interpolated spectrum (length self.n)
        """
        # If we only have one point, we can't do any interpolation, so just
        # return the original data.
        if self.single_point_data is not None:
            return self.single_point_data

        x = np.zeros((self.n, len(points) + 1 + np.sum(self.lut_interp_types != "n")))
        offset_count = 0
        for i in range(len(points)):
            if self.lut_interp_types[i] == "n":
                x[:, i + offset_count] = points[i]
            elif self.lut_interp_types[i] == "r":
                # Angular coordinate: query on the (cos, sin) pair of axes
                x[:, i + offset_count] = np.cos(points[i])
                x[:, i + 1 + offset_count] = np.sin(points[i])
                offset_count += 1
            elif self.lut_interp_types[i] == "d":
                x[:, i + offset_count] = np.cos(np.deg2rad(points[i]))
                x[:, i + 1 + offset_count] = np.sin(np.deg2rad(points[i]))
                offset_count += 1

        # This last dimension is always an integer so no
        # interpolation is performed. This is done only
        # for performance reasons.
        x[:, -1] = np.arange(self.n)
        res = self.itp(x)

        return res

    def _multilinear_grid(self, points):
        """
        Jouni's implementation of direct multilinear interpolation (version 'mlg').

        Args:
            points: the point being interpolated, one value per original grid
                dimension (angular dimensions are expanded to cos/sin here).
                If at the limit, the extremal value in the grid is returned.

        Returns:
            cube: np.ndarray, interpolated values over the trailing channel axis
        """
        x = np.zeros((len(points) + np.sum(self.lut_interp_types != "n")))
        offset_count = 0
        for i in range(len(points)):
            if self.lut_interp_types[i] == "n":
                x[i + offset_count] = points[i]
            elif self.lut_interp_types[i] == "r":
                x[i + offset_count] = np.cos(points[i])
                x[i + 1 + offset_count] = np.sin(points[i])
                offset_count += 1
            elif self.lut_interp_types[i] == "d":
                x[i + offset_count] = np.cos(np.deg2rad(points[i]))
                x[i + 1 + offset_count] = np.sin(np.deg2rad(points[i]))
                offset_count += 1

        # Index of left side of bin, and don't go over (that's why it's t[:-1] instead of t)
        inds = [
            np.searchsorted(t[:-1], x[i]) - 1 for i, t in enumerate(self.gridtuples)
        ]
        # Fractional position of x within its bin, per dimension
        deltas = np.array(
            [
                (x[i] - self.gridtuples[i][j]) / self.binwidth[i][j]
                for i, j in enumerate(inds)
            ]
        )
        diff = 1 - deltas

        # Set the 'cube' data to be our interpolation data: a 2-wide slice
        # (left/right bin edge) along every grid dimension, clamped to bounds
        idx = tuple(
            [
                slice(
                    max(min(self.maxbaseinds[j], i), 0),
                    max(min(self.maxbaseinds[j] + 2, i + 2), 2),
                )
                for j, i in enumerate(inds)
            ]
        )
        cube = np.copy(self.gridarrays[idx], order="A")

        # Collapse one dimension of the cube at a time
        for i, di in enumerate(deltas):
            # Eliminate those indexes where we are outside grid range or exactly on the grid point
            if x[i] >= self.gridtuples[i][-1]:
                cube = cube[1]
            elif x[i] <= self.gridtuples[i][0]:
                cube = cube[0]
            # Otherwise eliminate index by linear interpolation
            else:
                cube[0] *= diff[i]
                cube[1] *= di
                cube[0] += cube[1]
                cube = cube[0]

        return cube

    def __call__(self, *args, **kwargs):
        """
        Passes args to the appropriate interpolation method defined by the version at
        object init.
        """
        if self.method == 1:
            return self._interpolate(*args, **kwargs)
        elif self.method == 2:
            return self._multilinear_grid(*args, **kwargs)
[docs]
def load_wavelen(wavelength_file: str):
    """Load a wavelength file, and convert to nanometers if needed.

    Args:
        wavelength_file: file to read wavelengths from

    Returns:
        (np.array, np.array): wavelengths, full-width-half-max
    """
    q = np.loadtxt(wavelength_file)
    # np.loadtxt returns a 1-D array for a single-row file, which previously
    # crashed on q.shape[1]; normalize to 2-D so any row count works.
    q = np.atleast_2d(q)
    if q.shape[1] > 2:
        # Files with a leading index column: take wavelength and fwhm columns
        q = q[:, 1:3]
    if q[0, 0] < 100:
        # Values below 100 are assumed to be microns; convert to nanometers
        q = q * 1000.0
    wl, fwhm = q.T
    return wl, fwhm
[docs]
def emissive_radiance(
emissivity: np.array, T: np.array, wl: np.array
) -> (np.array, np.array):
"""Calcluate the radiance of a surface due to emission.
Args:
emissivity: surface emissivity.
T: surface temperature [K]
wl: emmissivity wavelengths [nm]
Returns:
np.array: surface upwelling radiance in uW $cm^{-2} sr^{-1} nm^{-nm}$
np.array: partial derivative of radiance with respect to temperature uW $cm^{-2} sr^{-1} nm^{-1} k^{-1}$
"""
c_1 = 1.88365e32 / np.pi
c_2 = 14387690
J_per_eV = 1.60218e-19
wl_um = wl / 1000.0
ph_per_sec_cm2_sr_nm = c_1 / (wl**4) / (np.exp(c_2 / wl / T) - 1.0) * emissivity
# photon energy in eV
eV_per_sec_cm2_sr_nm = 1.2398 * ph_per_sec_cm2_sr_nm / wl_um
W_per_cm2_sr_nm = J_per_eV * eV_per_sec_cm2_sr_nm
uW_per_cm2_sr_nm = W_per_cm2_sr_nm * 1e6
dRdn_dT = (
c_1
/ (wl**4)
* (-pow(np.exp(c_2 / wl / T) - 1.0, -2.0))
* np.exp(c_2 / wl / T)
* (-pow(T, -2) * c_2 / wl)
* emissivity
/ wl_um
* 1.2398
* J_per_eV
* 1e6
)
return uW_per_cm2_sr_nm, dRdn_dT
[docs]
def svd_inv(C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None):
    """Stable, positive matrix inversion based on eigendecomposition.

    Thin wrapper around svd_inv_sqrt that discards the square-root factor.

    Args:
        C: matrix to invert
        hashtable: if used, the hashtable to store/retrieve results in/from
        max_hash_size: maximum size of hashtable

    Return:
        np.array: inverse of C
    """
    inverse, _ = svd_inv_sqrt(C, hashtable, max_hash_size)
    return inverse
[docs]
def svd_inv_sqrt(
    C: np.array, hashtable: OrderedDict = None, max_hash_size: int = None
) -> (np.array, np.array):
    """Matrix inversion, based on eigendecomposition. Built to be stable, and positive.

    Args:
        C: matrix to invert (symmetric; ideally positive definite)
        hashtable: if used, the hashtable to store/retrieve results in/from
        max_hash_size: maximum size of hashtable

    Return:
        (np.array, np.array): inverse of C and square root of the inverse of C

    Raises:
        ValueError: if eigenvalues remain negative/NaN after regularization
    """
    # If we have a hash table, look for the precalculated solution
    h = None
    if hashtable is not None:
        # If arrays are in Fortran ordering, they are not hashable.
        if not C.flags["C_CONTIGUOUS"]:
            C = C.copy(order="C")
        h = xxhash.xxh64_digest(C)
        if h in hashtable:
            return hashtable[h]

    D, P = scipy.linalg.eigh(C)
    # If C is not numerically positive, retry with progressively larger
    # positive jitter on the diagonal. (The previous schedule,
    # 1e-6 * (count - 1) * 10, was *negative* on the first retry and zero on
    # the second, so it could never repair a negative eigenvalue.)
    inv_eps = 1e-6
    for _ in range(3):
        if np.any(D < 0) or np.any(np.isnan(D)):
            D, P = scipy.linalg.eigh(C + np.diag(np.ones(C.shape[0]) * inv_eps))
            inv_eps *= 10
        else:
            break
    # Re-check the result rather than keying off the loop index, so a
    # successful final regularization is not misreported as a failure.
    if np.any(D < 0) or np.any(np.isnan(D)):
        raise ValueError(
            "Matrix inversion contains negative values, "
            "even after adding {} to the diagonal.".format(inv_eps / 10)
        )

    Ds = np.diag(1 / np.sqrt(D))
    L = P @ Ds
    Cinv_sqrt = L @ P.T
    Cinv = L @ L.T

    # If there is a hash table, cache our solution. Bound the total cache
    # size by removing any extra items in FIFO order.
    if (hashtable is not None) and (max_hash_size is not None):
        hashtable[h] = (Cinv, Cinv_sqrt)
        while len(hashtable) > max_hash_size:
            hashtable.popitem(last=False)

    return Cinv, Cinv_sqrt
[docs]
def expand_path(directory: str, subpath: str) -> str:
    """Expand a path variable to an absolute path, if it is not one already.

    Args:
        directory: absolute location
        subpath: path to expand

    Returns:
        str: expanded path
    """
    # Already-absolute paths are returned untouched
    if not subpath.startswith("/"):
        subpath = os.path.join(directory, subpath)
    return subpath
[docs]
def recursive_replace(obj, key, val) -> None:
    """Find and replace a vector in a nested (mutable) structure.

    Args:
        obj: object to replace within
        key: key to replace
        val: value to replace with
    """
    if isinstance(obj, dict):
        if key in obj:
            obj[key] = val
        children = obj.values()
    elif isinstance(obj, (list, tuple)):
        children = obj
    else:
        # Leaf value: nothing to recurse into
        return
    for child in children:
        recursive_replace(child, key, val)
[docs]
def get_absorption(wl: np.array, absfile: str) -> (np.array, np.array):
    """Calculate water and ice absorption coefficients using indices of
    refraction, and interpolate them to new wavelengths (user specifies nm).

    Args:
        wl: wavelengths to interpolate to
        absfile: file containing indices of refraction

    Returns:
        np.array: interpolated, wavelength-specific water absorption coefficients
        np.array: interpolated, wavelength-specific ice absorption coefficients
    """
    # read the indices of refraction (CSV columns: wl, n_water, k_water, n_ice, k_ice)
    table = np.loadtxt(absfile, delimiter=",")
    wl_orig_nm = table[:, 0]
    wl_orig_cm = wl_orig_nm / 1e9 * 1e2
    water_k = table[:, 2]
    ice_k = table[:, 4]

    # absorption coefficient in cm^-1: 4 * pi * k / lambda
    water_abscf = water_k * np.pi * 4.0 / wl_orig_cm
    ice_abscf = ice_k * np.pi * 4.0 / wl_orig_cm

    # interpolate both coefficient sets to the requested wavelengths (nm)
    return tuple(
        np.interp(wl, wl_orig_nm, coeff) for coeff in (water_abscf, ice_abscf)
    )
[docs]
def get_refractive_index(k_wi, a, b, col_wvl, col_k):
    """Convert refractive index table entries to numpy array.

    Args:
        k_wi: pandas table of refractive index entries
        a: start line
        b: end line
        col_wvl: wavelength column in pandas table
        col_k: k column in pandas table

    Returns:
        wvl_arr: array of wavelengths
        k_arr: array of imaginary parts of refractive index
    """
    rows = range(a, b)
    wvl_arr = np.asarray([k_wi.at[ii, col_wvl] for ii in rows])
    k_arr = np.asarray([k_wi.at[ii, col_k] for ii in rows])
    return wvl_arr, k_arr
[docs]
def recursive_reencode(j, shell_replace: bool = True):
    """Recursively re-encode a mutable object (ascii->str).

    Args:
        j: object to reencode
        shell_replace: whether to expand shell environment variables in strings

    Returns:
        Object: expanded, reencoded object
    """
    if isinstance(j, dict):
        for key, value in j.items():
            # Propagate shell_replace so a False setting is honored at depth
            # (previously the recursive calls silently reset it to True).
            j[key] = recursive_reencode(value, shell_replace)
        return j
    elif isinstance(j, list):
        for i, k in enumerate(j):
            j[i] = recursive_reencode(k, shell_replace)
        return j
    elif isinstance(j, tuple):
        return tuple(recursive_reencode(k, shell_replace) for k in j)
    else:
        if shell_replace and isinstance(j, str):
            try:
                j = expandvars(j)
            except IndexError:
                pass
        return j
[docs]
def json_load_ascii(filename: str, shell_replace: bool = True) -> dict:
    """Load a hierarchical structure, convert all unicode to ASCII and
    expand environment variables.

    Args:
        filename: json file to load from
        shell_replace: boolean

    Returns:
        dict: encoded dictionary
    """
    with open(filename, "r") as fin:
        loaded = json.load(fin)
    return recursive_reencode(loaded, shell_replace)
[docs]
def expand_all_paths(to_expand: dict, absdir: str):
    """Expand any dictionary entry containing the string 'file' into
    an absolute path, if needed.

    Args:
        to_expand: dictionary to expand
        absdir: path to expand with (absolute directory)

    Returns:
        dict: dictionary with expanded paths
    """

    def recursive_expand(j):
        # Walk the structure in place, expanding string values stored under
        # file/directory/path-like keys.
        if isinstance(j, dict):
            for key, value in j.items():
                if (
                    isinstance(key, str)
                    and ("file" in key or "directory" in key or "path" in key)
                    and isinstance(value, str)
                ):
                    j[key] = expand_path(absdir, value)
                else:
                    j[key] = recursive_expand(value)
            return j
        elif isinstance(j, list):
            for i, k in enumerate(j):
                j[i] = recursive_expand(k)
            return j
        elif isinstance(j, tuple):
            # Fixed: previously recursed via recursive_reencode (a copy-paste
            # slip), which re-encodes strings instead of expanding paths.
            return tuple(recursive_expand(k) for k in j)
        return j

    return recursive_expand(to_expand)
[docs]
def resample_spectrum(
    x: np.array, wl: np.array, wl2: np.array, fwhm2: np.array, fill: bool = False
) -> np.array:
    """Resample a spectrum to a new wavelength / FWHM.

    Assumes Gaussian SRFs.

    Args:
        x: radiance vector
        wl: sample starting wavelengths
        wl2: wavelengths to resample to
        fwhm2: full-width-half-max at resample resolution
        fill: boolean indicating whether to fill in extrapolated regions

    Returns:
        np.array: interpolated radiance vector
    """
    # One Gaussian spectral response function per output channel
    # (sigma = FWHM / 2.355)
    H = np.array(
        [
            spectral_response_function(wl, wi, fwhmi / 2.355)
            for wi, fwhmi in zip(wl2, fwhm2)
        ]
    )
    xnew = np.dot(H, x[:, np.newaxis]).ravel()
    if fill is False:
        return xnew

    # Fill non-finite channels with the value of the nearest finite channel.
    good = np.isfinite(xnew)
    good_inds = np.where(good)[0]
    for i in np.where(~good)[0]:
        # Fixed: argmin over wl2[good] yields an index into the *subset*,
        # which was previously used (incorrectly) to index the full array;
        # map it back to a full-array index before assignment.
        nearest = good_inds[np.argmin(abs(wl2[good_inds] - wl2[i]))]
        xnew[i] = xnew[nearest]
    return xnew
[docs]
def load_spectrum(spectrum_file: str) -> (np.array, np.array):
    """Load a single spectrum from a text file with initial columns giving
    wavelength and magnitude, respectively.

    Args:
        spectrum_file: file to load spectrum from

    Returns:
        np.array: spectrum values
        np.array: wavelengths, if available in the file
    """
    data = np.loadtxt(spectrum_file)
    if data.ndim == 1:
        # Single-column file: magnitudes only, no wavelength column available
        return data, None
    wavelengths, spectrum = data[:, :2].T
    if wavelengths[0] < 100:
        # Values under 100 are assumed microns; convert to nm
        wavelengths = wavelengths * 1000.0
    return spectrum, wavelengths
[docs]
def spectral_response_function(response_range: np.array, mu: float, sigma: float):
    """Calculate a normalized Gaussian spectral response function.

    Args:
        response_range: signal range to calculate over
        mu: mean signal value
        sigma: signal variation

    Returns:
        np.array: spectral response function (sums to one)
    """
    width = abs(sigma)
    z = (response_range - mu) / width
    density = np.exp(-0.5 * z * z) / (np.sqrt(2.0 * np.pi) * width)
    return density / density.sum()
[docs]
def combos(inds: List[List[float]]) -> np.array:
    """Return all combinations of indices in a list of index sublists.

    For example, the call::

        combos([[1, 2], [3, 4, 5]])

    returns [[1, 3], [2, 3], [1, 4], [2, 4], [1, 5], [2, 5]].
    This is used for interpolation in the high-dimensional LUT.

    Args:
        inds: list of lists of values to expand

    Returns:
        np.array: meshgrid array of combinations
    """
    n_dims = len(inds)
    n_cases = np.prod([len(dim) for dim in inds])
    mesh = np.meshgrid(*inds)
    return np.array(mesh).reshape((n_dims, n_cases)).T
[docs]
def conditional_gaussian(
    mu: np.array, C: np.array, window: np.array, remain: np.array, x: np.array
) -> (np.array, np.array):
    """Define the conditional Gaussian distribution for convenience.

    len(window)+len(remain)=len(x)

    Args:
        mu: mean values
        C: matrix for conditioning
        window: contains all indices not in remain
        remain: contains indices of the observed part x1
        x: values to condition with

    Returns:
        (np.array, np.array): conditional mean, conditional covariance
    """
    w = np.array(window)[:, np.newaxis]
    r = np.array(remain)[:, np.newaxis]
    # Partition the covariance into observed (1) / unobserved (2) blocks
    C11 = C[r, r.T]
    C12 = C[r, w.T]
    C21 = C[w, r.T]
    C22 = C[w, w.T]

    # Regression coefficients of the unobserved part onto the observed part
    gain = C21 @ svd_inv(C11)
    conditional_mean = mu[window] + gain @ (x - mu[remain])
    conditional_cov = C22 - gain @ C12
    return conditional_mean, conditional_cov
[docs]
def ray_start(num_cores, num_cpus=2, memory_b=-1):
    """Start a local ray cluster: one head node plus worker node(s).

    Args:
        num_cores: total number of cores to spread across the nodes
        num_cpus: number of nodes to split the cores/memory across
            (NOTE(review): despite the name this appears to be a node
            count, not a per-node CPU count - confirm with callers)
        memory_b: total memory budget in bytes, split across nodes
            (-1 leaves ray's default memory limit in place)
    """
    import subprocess

    # Arguments shared by the head and worker invocations. "--head" belongs
    # only on the head node: previously it was baked into this list, so the
    # head received it twice and worker nodes were also (invalidly) started
    # with both "--head" and "--address".
    base_args = [
        "ray",
        "start",
        "--num-cpus",
        str(int(num_cores / num_cpus)),
        "--include-dashboard",
        "0",
    ]
    if memory_b != -1:
        base_args.append("--memory")
        base_args.append(str(int(memory_b / num_cpus)))

    # Launch the head node and capture its stdout to learn the cluster address
    head_args = base_args.copy()
    head_args.append("--head")
    result = subprocess.run(head_args, capture_output=True)
    stdout = str(result.stdout, encoding="utf-8")

    if num_cpus > 1:
        # Parse the address out of "--address='IP:PORT'" in the head's
        # stdout (the +1 skips the opening quote character)
        key = "--address="
        start_loc = stdout.find(key) + len(key) + 1
        end_loc = stdout.find("'", start_loc)
        address = stdout[start_loc:end_loc]
        base_args.append("--address")
        base_args.append(address)
        result = subprocess.run(base_args, capture_output=True)
from datetime import datetime as dtt
[docs]
class Track:
    """
    Tracks and reports the percentage complete for some arbitrary sized iterable.
    Borrowed from mlky
    """

    def __init__(self, total, step=5, print=print, reverse=False, message="complete"):
        """
        Parameters
        ----------
        total: int, iterable
            Total items in iterable. If iterable, will call len() on it
        step: float, default=5
            Percentage step size to use for reporting, eg. 5 is every 5%
            (docstring previously claimed a 0.05 default, contradicting the
            signature; the comparison below is in 0-100 percent units)
        print: func, default=print
            Print function to use, eg. logging.info
        reverse: bool, default=False
            Reverse the count such that 0 is 100%
        message: str, default="complete"
            Message to be included in the output
        """
        if hasattr(total, "__iter__"):
            total = len(total)

        self.step = step
        self.total = total
        self.print = print
        self.start = dtt.now()
        self.percent = step  # next reporting threshold, in percent
        self.reverse = reverse
        self.message = message

    def __call__(self, count):
        """
        Parameters
        ----------
        count: int, iterable
            The current count of items finished. If iterable, will call len() on it

        Returns
        -------
        bool
            True if a percentage step was just crossed, False otherwise
        """
        if hasattr(count, "__iter__"):
            count = len(count)

        current = count / self.total
        if self.reverse:
            current = 1 - current
        current *= 100

        if current >= self.percent:
            elap = dtt.now() - self.start
            # NOTE(review): 'rate' is elapsed-so-far divided by *total* items,
            # so it only equals time-per-item at 100% - kept for output parity
            rate = elap / self.total
            # ETA from the actual progress (the old formula used the next
            # reporting threshold instead, overestimating remaining time)
            esti = elap * (100 - current) / current if current > 0 else elap
            self.print(
                f"{current:6.2f}% {self.message} (elapsed: {elap}, rate: {rate}, eta: {esti})"
            )
            # Advance the threshold past the current progress, so a large
            # jump doesn't re-trigger a report on every subsequent call
            self.percent += self.step
            while self.step > 0 and self.percent <= current:
                self.percent += self.step
            return True
        return False