#! /usr/bin/env python3
#
# Copyright 2019 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import os
from types import SimpleNamespace
import click
import numpy as np
import scipy
from scipy.linalg import inv
from sklearn.cluster import KMeans
from spectral.io import envi
from isofit.core.common import envi_header, expand_path, json_load_ascii, svd_inv
[docs]
def next_diag_val(C: np.ndarray, starting_index, direction):
if direction == 1:
for i in range(starting_index, C.shape[0]):
if np.isnan(C[i, i]) == False:
return C[i, i]
if direction == -1:
for i in range(starting_index, -1, -1):
if np.isnan(C[i, i]) == False:
return C[i, i]
return None
[docs]
def surface_model(
config_path: str, wavelength_path: str = None, output_path: str = None, seed=13
) -> None:
"""The surface model tool contains everything you need to build basic
multicomponent (i.e. colleciton of Gaussian) surface priors for the
multicomponent surface model.
Args:
config_path: path to a JSON formatted surface model configuration
wavelength_path: optional path to a three-column wavelength file,
overriding the configuration file settings
output_path: optional path to the destination .mat file, overriding
the configuration file settings
seed: seed used for clustering
Returns:
None
"""
# Set seed for random draws later
np.random.seed(seed)
# Load configuration JSON into a local dictionary
configdir, _ = os.path.split(os.path.abspath(config_path))
config = json_load_ascii(config_path, shell_replace=True)
if wavelength_path:
wavelength_file = wavelength_path
else:
if "wavelength_file" not in config:
raise ValueError(
"Missing config parameter 'wavelength_file', set via surface_model(wavelength_path=...)"
)
wavelength_file = expand_path(configdir, config["wavelength_file"])
if output_path:
outfile = output_path
else:
if "output_model_file" not in config:
raise ValueError(
"Missing config parameter 'output_model_file', set via surface_model(output_path=...)"
)
outfile = expand_path(configdir, config["output_model_file"])
# Determine top level parameters
for q in ["sources", "normalize"]:
if q not in config:
raise ValueError("Missing parameter: %s" % q)
normalize = config["normalize"]
reference_windows = config["reference_windows"]
# load wavelengths file, and change units to nm if needed
if os.path.splitext(wavelength_file)[-1] == ".hdr":
ds = envi.open(wavelength_file)
wl = np.array([float(x) for x in ds.metadata["wavelength"]])
else:
q = np.loadtxt(wavelength_file)
if q.shape[1] > 2:
q = q[:, 1:]
wl = q[:, 0]
if wl[0] < 100:
wl *= 1000.0
nchan = len(wl)
# build global reference windows
refwl = []
for wi, window in enumerate(reference_windows):
active_wl = np.logical_and(wl >= window[0], wl < window[1])
refwl.extend(wl[active_wl])
normind = np.array([np.argmin(abs(wl - w)) for w in refwl])
refwl = np.array(refwl, dtype=float)
# create basic model template
model = {
"normalize": normalize,
"wl": wl,
"means": [],
"covs": [],
"attribute_means": [],
"attribute_covs": [],
"attributes": [],
"refwl": refwl,
}
# each "source" (i.e. spectral library) is treated separately
for si, source_config in enumerate(config["sources"]):
# Determine source parameters
for q in ["input_spectrum_files", "windows", "n_components", "windows"]:
if q not in source_config:
raise ValueError("Source %i is missing a parameter: %s" % (si, q))
# Determine whether we should synthesize our own mixtures
if "mixtures" in source_config:
mixtures = source_config["mixtures"]
elif "mixtures" in config:
mixtures = config["mixtures"]
else:
mixtures = 0
# open input files associated with this source
infiles = [
expand_path(configdir, fi) for fi in source_config["input_spectrum_files"]
]
# associate attributes, if they exist. These will not be used
# in the retrieval, but can be used in post-analysis
if "input_attribute_files" in source_config:
infiles_attributes = [
expand_path(configdir, fi)
for fi in source_config["input_attribute_files"]
]
if len(infiles_attributes) != len(infiles):
raise IndexError("spectrum / attribute file mismatch")
else:
infiles_attributes = [None for fi in source_config["input_spectrum_files"]]
ncomp = int(source_config["n_components"])
windows = source_config["windows"]
# load spectra
spectra, attributes = [], []
for infile, attribute_file in zip(infiles, infiles_attributes):
rfl = envi.open(envi_header(infile), infile)
nl, nb, ns = [int(rfl.metadata[n]) for n in ("lines", "bands", "samples")]
swl = np.array([float(f) for f in rfl.metadata["wavelength"]])
# Maybe convert to nanometers
if swl[0] < 100:
swl = swl * 1000.0
# Load library and adjust interleave, if needed
rfl_mm = rfl.open_memmap(interleave="bip", writable=False)
x = np.array(rfl_mm[:, :, :])
x = x.reshape(nl * ns, nb)
# import spectra and resample
for x1 in x:
p = scipy.interpolate.interp1d(
swl, x1, kind="linear", bounds_error=False, fill_value="extrapolate"
)
spectra.append(p(wl))
# Load attributes
if attribute_file is not None:
attr = envi.open(envi_header(attribute_file), attribute_file)
nla, nba, nsa = [
int(attr.metadata[n]) for n in ("lines", "bands", "samples")
]
# Load library and adjust interleave, if needed
attr_mm = attr.open_memmap(interleave="bip", writable=False)
x = np.array(attr_mm[:, :, :])
x = x.reshape(nla * nsa, nba)
model["attributes"] = attr.metadata["band names"]
# import spectra and resample
for x1 in x:
attributes.append(x1)
if len(attributes) > 0 and len(attributes) != len(spectra):
raise IndexError("Mismatch in number of spectra vs. attributes")
# calculate mixtures, if needed
if len(attributes) > 0 and mixtures > 0:
raise ValueError("Synthetic mixtures w/ attributes is not advised")
n = float(len(spectra))
nmix = int(n * mixtures)
for mi in range(nmix):
s1, m1 = spectra[int(np.random.rand() * n)], np.random.rand()
s2, m2 = spectra[int(np.random.rand() * n)], 1.0 - m1
spectra.append(m1 * s1 + m2 * s2)
# Lists to arrays
spectra = np.array(spectra)
attributes = np.array(attributes)
# Flag bad data
use = np.all(np.isfinite(spectra), axis=1)
spectra = spectra[use, :]
if len(attributes) > 0:
attributes = attributes[use, :]
## Accumulate total list of window indices
# window_idx = -np.ones((nchan), dtype=int)
# for wi, win in enumerate(windows):
# active_wl = np.logical_and(wl >= win['interval'][0], wl < win['interval'][1])
# window_idx[active_wl] = wi
# Two step model generation. First step is k-means clustering.
# This is more "stable" than Expectation Maximization with an
# unconstrained covariance matrix
kmeans = KMeans(
init="k-means++", n_clusters=ncomp, n_init=10, random_state=seed
)
kmeans.fit(spectra)
Z = kmeans.predict(spectra)
# Build a combined dataset of attributes and spectra
if len(attributes) > 0:
spectra_attr = np.concatenate((spectra, attributes), axis=1)
# now fit the full covariance for each component
for ci in range(ncomp):
print(ci, source_config["input_spectrum_files"])
m = np.mean(spectra[Z == ci, :], axis=0)
C = np.cov(spectra[Z == ci, :], rowvar=False)
C_base = C.copy()
if len(attributes) > 0:
m_attr = np.mean(spectra_attr[Z == ci, :], axis=0)
C_attr = np.cov(spectra_attr[Z == ci, :], rowvar=False)
# for i in range(nchan):
for window in windows:
window_idx = np.where(
np.logical_and(
wl >= window["interval"][0], wl < window["interval"][1]
)
)[0]
if len(window_idx) == 0:
continue
window_range = slice(window_idx[0], window_idx[-1] + 1)
# To minimize bias, leave the channels independent
# and uncorrelated
if window["correlation"] == "decorrelated":
c_diag = (
C[window_range, window_range] + float(window["regularizer"])
) * np.eye(len(window_idx))
C[window_range, :] = 0
C[:, window_range] = 0
C[window_range, window_range] = c_diag
for window in windows:
window_idx = np.where(
np.logical_and(
wl >= window["interval"][0], wl < window["interval"][1]
)
)[0]
if len(window_idx) == 0:
continue
window_range = slice(window_idx[0], window_idx[-1] + 1)
# Each spectral interval, or window, is constructed
# using one of several rules. We can draw the covariance
# directly from the data...
if window["correlation"] in ["EM", "EM-gauss"]:
cdiag = (
C_base[window_range, window_range]
+ np.eye(len(window_idx)) * float(window["regularizer"])
).copy()
if "isolated" in list(window.keys()) and window["isolated"] == 1:
C[window_range, :] = 0
C[:, window_range] = 0
C[window_range, window_range] = cdiag
window_idx = np.where(
np.logical_and(
wl >= window["interval"][0], wl < window["interval"][1]
)
)[0]
if len(window_idx) == 0:
continue
window_range = slice(window_idx[0], window_idx[-1] + 1)
if window["correlation"] == "GP":
for i in window_idx:
# Alternatively, we can use a band diagonal form,
# a Gaussian process that promotes local smoothness.
width = float(window["gp_width"])
magnitude = float(window["gp_magnitude"])
kernel = scipy.stats.norm.pdf((wl - wl[i]) / width)
kernel = kernel / kernel.sum() * magnitude
C[i, :] = kernel
C[:, i] = kernel
C[i, i] = C[i, i] + float(window["regularizer"])
elif window["correlation"] in ["decorrelated", "EM"]:
# already handled
continue
else:
raise ValueError(
"I do not recognize the method " + window["correlation"]
)
# Now do any cross-block feathering by augmenting the precision matrix
P = inv(C)
for window in windows:
window_idx = np.where(
np.logical_and(
wl >= window["interval"][0], wl < window["interval"][1]
)
)[0]
if len(window_idx) == 0:
continue
# Look for the "feather_forward" or "feather_backward" options
if window["correlation"] in ["EM", "decorrelated"]:
if "feather_backward" in list(window.keys()):
P[window_idx[0] - 1, window_idx[0]] -= (
1.0 / window["feather_backward"]
)
P[window_idx[0], window_idx[0] - 1] -= (
1.0 / window["feather_backward"]
)
if "feather_forward" in list(window.keys()):
P[window_idx[-1] + 1, window_idx[-1]] -= (
1.0 / window["feather_forward"]
)
P[window_idx[-1], window_idx[-1] + 1] -= (
1.0 / window["feather_forward"]
)
C = inv(P)
# Normalize the component spectrum if desired
if normalize == "Euclidean":
z = np.sqrt(np.sum(pow(m[normind], 2)))
elif normalize == "RMS":
z = np.sqrt(np.mean(pow(m[normind], 2)))
elif normalize == "None":
z = 1.0
else:
raise ValueError("Unrecognized normalization: %s\n" % normalize)
m = m / z
C = C / (z**2)
try:
Cinv = svd_inv(C)
except:
errstr = f"C from group {si, ci} is not invertible."
raise AttributeError(errstr)
model["means"].append(m)
model["covs"].append(C)
if len(attributes) > 0:
model["attribute_means"].append(m_attr)
model["attribute_covs"].append(C_attr)
model["means"] = np.array(model["means"])
model["covs"] = np.array(model["covs"])
model["attribute_means"] = np.array(model["attribute_means"])
model["attribute_covs"] = np.array(model["attribute_covs"])
scipy.io.savemat(outfile, model)
# Input arguments
@click.command(name="surface_model")
@click.argument("config_path", type=str)
@click.argument("--wavelength_path", type=str)
@click.argument("--output_path", type=str)
@click.argument("--seed", default=13, type=int)
def cli_surface_model(**kwargs):
"""Build a new surface model to a block of data"""
# SimpleNamespace converts a dict into dot-notational
surface_model(SimpleNamespace(**kwargs))
click.echo("Done")
if __name__ == "__main__":
raise NotImplementedError(
"surface_model.py cannot be called this way. Run as:\n isofit surface_model [CONFIG_PATH]"
)