#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#
import logging
import os
from collections import OrderedDict
from typing import List
import numpy as np
import scipy.interpolate
import scipy.io
import xarray as xr
from spectral.io import envi
from isofit.configs import Config
from isofit.core.common import envi_header
from isofit.core.forward import ForwardModel
from isofit.inversion.inverse import Inversion
from isofit.inversion.inverse_simple import invert_algebraic, invert_simple
from .common import eps, load_spectrum, resample_spectrum
from .geometry import Geometry
### Variables ###
# Constants related to file I/O
typemap = {
np.uint8: 1,
np.int16: 2,
np.int32: 3,
np.float32: 4,
np.float64: 5,
np.complex64: 6,
np.complex128: 9,
np.uint16: 12,
np.uint32: 13,
np.int64: 14,
np.uint64: 15,
}
max_frames_size = 100
### Classes ###
[docs]
class SpectrumFile:
"""A buffered file object that contains configuration information about formatting, etc."""
def __init__(
self,
fname,
write=False,
n_rows=None,
n_cols=None,
n_bands=None,
interleave=None,
dtype=np.float32,
wavelengths=None,
fwhm=None,
band_names=None,
bad_bands="[]",
zrange="{0.0, 1.0}",
flag=-9999.0,
ztitles="{Wavelength (nm), Magnitude}",
map_info="{}",
):
"""."""
self.frames = OrderedDict()
self.write = write
self.fname = os.path.abspath(fname)
self.wl = wavelengths
self.band_names = band_names
self.fwhm = fwhm
self.flag = flag
self.n_rows = n_rows
self.n_cols = n_cols
self.n_bands = n_bands
if self.fname.endswith(".txt"):
# The .txt suffix implies a space-separated ASCII text file of
# one or more data columns. This is cheap to load and store, so
# we do not defer read/write operations.
logging.debug("Inferred ASCII file format for %s" % self.fname)
self.format = "ASCII"
if not self.write:
self.data, self.wl = load_spectrum(self.fname)
self.n_rows, self.n_cols, self.map_info = 1, 1, "{}"
if self.wl is not None:
self.n_bands = len(self.wl)
else:
self.n_bands = None
self.meta = {}
elif self.fname.endswith(".mat"):
# The .mat suffix implies a matlab-style file, i.e. a dictionary
# of 2D arrays and other matlab-like objects. This is typically
# only used for specific output products associated with single
# spectrum retrievals; there is no read option.
logging.debug("Inferred MATLAB file format for %s" % self.fname)
self.format = "MATLAB"
if not self.write:
logging.error("Unsupported MATLAB file in input block")
raise IOError("MATLAB format in input block not supported")
elif self.fname.endswith(".nc"):
logging.debug(f"Inferred MATLAB file format for {self.fname}")
self.format = "NETCDF"
if not self.write:
self.dataset = xr.open_dataset(self.fname)
self.data = self.dataset.radiance
self.meta = self.data.attrs
self.n_rows = self.data.downtrack.size
self.n_cols = self.data.crosstrack.size
self.n_bands = self.data.bands.size
else:
# EMIT-specific metadata
meta = {
"ncei_template_version": "NCEI_NetCDF_Swath_Template_v2.0",
"summary": "The Earth Surface Mineral Dust Source Investigation (EMIT) is an Earth Ventures-Instrument (EVI-4) Mission that maps the surface mineralogy of arid dust source regions via imaging spectroscopy in the visible and short-wave infrared (VSWIR). Installed on the International Space Station (ISS), the EMIT instrument is a Dyson imaging spectrometer that uses contiguous spectroscopic measurements from 410 to 2450 nm to resolve absoprtion features of iron oxides, clays, sulfates, carbonates, and other dust-forming minerals. During its one-year mission, EMIT will observe the sunlit Earth's dust source regions that occur within +/-52° latitude and produce maps of the source regions that can be used to improve forecasts of the role of mineral dust in the radiative forcing (warming or cooling) of the atmosphere.\\n\\nThis file contains L2A estimated surface reflectances and geolocation data. Reflectance estimates are created using an Optimal Estimation technique - see ATBD for details. Reflectance values are reported as fractions (relative to 1).",
"keywords": "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing",
"Conventions": "CF-1.63",
"sensor": "EMIT (Earth Surface Mineral Dust Source Investigation)",
"instrument": "EMIT",
"platform": "ISS",
"institution": "NASA Jet Propulsion Laboratory/California Institute of Technology",
"license": "",
"naming_authority": "",
"date_created": str(dtt.now()),
"keywords_vocabulary": "NASA Global Change Master Directory (GCMD) Science Keywords",
"stdname_vocabulary": "NetCDF Climate and Forecast (CF) Metadata Convention",
"creator_name": "ISOFIT",
"creator_url": "",
"project": "Earth Surface Mineral Dust Source Investigation",
"project_url": "https://emit.jpl.nasa.gov/",
"publisher_name": "",
"publisher_url": "",
"publisher_email": "",
"identifier_product_doi_authority": "https://doi.org",
"flight_line": "",
"time_coverage_start": "",
"time_coverage_end": "",
"software_build_version": "",
"software_delivery_version": "",
"product_version": "",
"history": "ISOFIT generated",
"crosstrack_orientation": "",
"easternmost_longitude": None,
"northernmost_latitude": None,
"westernmost_longitude": None,
"southernmost_latitude": None,
"spatialResolution": None,
"spatial_ref": "",
"geotransform": [],
"day_night_flag": "",
"title": "EMIT L2A Estimated Surface Reflectance",
}
self.n_rows = n_rows
self.n_cols = n_cols
self.n_bands = n_bands
self.dataset = xr.Dataset(
{
"reflectance": (
("downtrack", "crosstrack", "bands"),
np.zeros((n_rows, n_cols, n_bands)),
)
}
)
self.dataset.attrs = meta
self.data = self.dataset.reflectance
# Initialize the file
self.dataset.to_netcdf(self.fname)
else:
# Otherwise we assume it is an ENVI-format file, which is
# basically just a binary data cube with a detached human-
# readable ASCII header describing dimensions, interleave, and
# metadata. We buffer this data in self.frames, reading and
# writing individual rows of the cube on-demand.
logging.debug("Inferred ENVI file format for %s" % self.fname)
self.format = "ENVI"
if not self.write:
# If we are an input file, the header must preexist.
if not os.path.exists(envi_header(self.fname)):
logging.error("Could not find %s" % (envi_header(self.fname)))
raise IOError("Could not find %s" % (envi_header(self.fname)))
# open file and copy metadata
self.file = envi.open(envi_header(self.fname), fname)
self.meta = self.file.metadata.copy()
self.n_rows = int(self.meta["lines"])
self.n_cols = int(self.meta["samples"])
self.n_bands = int(self.meta["bands"])
if "data ignore value" in self.meta:
self.flag = float(self.meta["data ignore value"])
else:
self.flag = -9999.0
else:
# If we are an output file, we may have to build the header
# from scratch. Hopefully the caller has supplied the
# necessary metadata details.
meta = {
"lines": n_rows,
"samples": n_cols,
"bands": n_bands,
"byte order": 0,
"header offset": 0,
"map info": map_info,
"file_type": "ENVI Standard",
"sensor type": "unknown",
"interleave": interleave,
"data type": typemap[dtype],
"wavelength units": "nm",
"z plot range": zrange,
"z plot titles": ztitles,
"fwhm": fwhm,
"bbl": bad_bands,
"band names": band_names,
"wavelength": self.wl,
}
for k, v in meta.items():
if v is None:
logging.error("Must specify %s" % (k))
raise IOError("Must specify %s" % (k))
if os.path.isfile(envi_header(fname)) is False:
self.file = envi.create_image(
envi_header(fname), meta, ext="", force=True
)
else:
self.file = envi.open(envi_header(fname))
self.open_map_with_retries()
[docs]
def open_map_with_retries(self):
"""Try to open a memory map, handling Beowulf I/O issues."""
self.memmap = None
for attempt in range(10):
self.memmap = self.file.open_memmap(interleave="bip", writable=self.write)
if self.memmap is not None:
return
raise IOError("could not open memmap for " + self.fname)
[docs]
def get_frame(self, row):
"""The frame is a 2D array, essentially a list of spectra. The
self.frames list acts as a hash table to avoid storing the
entire cube in memory. So we read them or create them on an
as-needed basis. When the buffer flushes via a call to
flush_buffers, they will be deleted."""
if row not in self.frames:
if not self.write:
d = self.memmap[row, :, :]
self.frames[row] = d.copy()
else:
self.frames[row] = np.nan * np.zeros((self.n_cols, self.n_bands))
return self.frames[row]
[docs]
def write_spectrum(self, row, col, x):
"""We write a spectrum. If a binary format file, we simply change
the data cached in self.frames and defer file I/O until
flush_buffers is called."""
if self.format == "ASCII":
# Multicolumn output for ASCII products
np.savetxt(self.fname, x, fmt="%10.6f")
elif self.format == "MATLAB":
# Dictionary output for MATLAB products
scipy.io.savemat(self.fname, x)
elif self.format == "NETCDF":
self.data[row, col][:] = x
else:
# Omit wavelength column for spectral products
frame = self.get_frame(row)
if x.ndim == 2:
x = x[:, -1]
frame[col, :] = x
[docs]
def read_spectrum(self, row, col):
"""Get a spectrum from the frame list or ASCII file. Note that if
we are an ASCII file, we have already read the single spectrum and
return it as-is (ignoring the row/column indices)."""
if self.format == "ASCII":
return self.data
elif self.format == "NETCDF":
return self.data.sel(downtrack=row, crosstrack=col).data
else:
frame = self.get_frame(row)
return frame[col]
[docs]
def flush_buffers(self):
"""Write to file, and refresh the memory map object."""
if self.format == "ENVI":
if self.write:
for row, frame in self.frames.items():
valid = np.logical_not(np.isnan(frame[:, 0]))
self.memmap[row, valid, :] = frame[valid, :]
self.frames = OrderedDict()
del self.file
self.file = envi.open(envi_header(self.fname), self.fname)
self.open_map_with_retries()
elif self.format == "NETCDF":
self.dataset.to_netcdf(self.fname)
[docs]
class IO:
"""..."""
def __init__(self, config: Config, forward: ForwardModel):
"""Initialization specifies retrieval subwindows for calculating
measurement cost distributions."""
self.config = config
self.bbl = (
"{"
+ ",".join([str(1) for n in range(len(forward.instrument.wl_init))])
+ "}"
)
self.radiance_correction = None
self.meas_wl = forward.instrument.wl_init
self.meas_fwhm = forward.instrument.fwhm_init
self.writes = 0
self.reads = 0
self.n_rows = 1
self.n_cols = 1
self.n_sv = len(forward.statevec)
self.n_chan = len(forward.instrument.wl_init)
self.flush_rate = config.implementation.io_buffer_size
self.simulation_mode = config.implementation.mode == "simulation"
self.total_time = 0
self.current_input_data = InputData()
# Names of either the wavelength or statevector outputs
wl_names = [("Channel %i" % i) for i in range(self.n_chan)]
sv_names = forward.statevec.copy()
self.input_datasets, self.output_datasets, self.map_info = {}, {}, "{}"
# Load input files and record relevant metadata
for element, element_name in zip(*self.config.input.get_elements()):
self.input_datasets[element_name] = SpectrumFile(element)
if (self.input_datasets[element_name].n_rows > self.n_rows) or (
self.input_datasets[element_name].n_cols > self.n_cols
):
self.n_rows = self.input_datasets[element_name].n_rows
self.n_cols = self.input_datasets[element_name].n_cols
for inherit in ["map info", "bbl"]:
if inherit in self.input_datasets[element_name].meta:
setattr(
self,
inherit.replace(" ", "_"),
self.input_datasets[element_name].meta[inherit],
)
for element, element_header, element_name in zip(
*self.config.output.get_output_files()
):
band_names, ztitle, zrange = element_header
if band_names == "statevector":
band_names = sv_names
elif band_names == "wavelength":
band_names = wl_names
elif band_names == "atm_coeffs":
band_names = wl_names * 5
else:
band_names = "{}"
n_bands = len(band_names)
self.output_datasets[element_name] = SpectrumFile(
element,
write=True,
n_rows=self.n_rows,
n_cols=self.n_cols,
n_bands=n_bands,
interleave="bip",
dtype=np.float32,
wavelengths=self.meas_wl,
fwhm=self.meas_fwhm,
band_names=band_names,
bad_bands=self.bbl,
map_info=self.map_info,
zrange=zrange,
ztitles=ztitle,
)
# Do we apply a radiance correction?
if self.config.input.radiometry_correction_file is not None:
filename = self.config.input.radiometry_correction_file
self.radiance_correction, wl = load_spectrum(filename)
[docs]
def get_components_at_index(self, row: int, col: int) -> InputData:
"""
Load data from input files at the specified (row, col) index.
Args:
row: row to retrieve data from
col: column to retrieve data from
Returns:
InputData: object containing all current data reads
"""
# Prepare out input data object by blanking it out
self.current_input_data.clear()
# Determine the appropriate row, column index. and initialize the
# data dictionary with empty entries.
data = dict([(i, None) for i in self.config.input.get_all_element_names()])
logging.debug(f"Row {row} Column {col}")
# Read data from any of the input files that are defined.
for source in self.input_datasets:
data[source] = self.input_datasets[source].read_spectrum(row, col)
self.reads += 1
if self.reads >= self.flush_rate:
self.flush_buffers()
if self.simulation_mode:
# If solving the inverse problem, the measurment is the surface reflectance
meas = data["reflectance_file"].copy()
else:
# If solving the inverse problem, the measurment is the radiance
# We apply the calibration correciton here for simplicity.
meas = data["measured_radiance_file"]
if meas is not None:
meas = meas.copy()
if data["radiometry_correction_file"] is not None:
meas *= data["radiometry_correction_file"]
self.current_input_data.meas = meas
if self.current_input_data.meas is None or np.all(
self.current_input_data.meas < -49
):
return None
## Check for any bad data flags
for source in self.input_datasets:
if np.all(abs(data[source] - self.input_datasets[source].flag) < eps):
return None
# We build the geometry object for this spectrum. For files not
# specified in the input configuration block, the associated entries
# will be 'None'. The Geometry object will use reasonable defaults.
geom = Geometry(
obs=data["obs_file"],
loc=data["loc_file"],
bg_rfl=data["background_reflectance_file"],
)
self.current_input_data.geom = geom
self.current_input_data.reference_reflectance = data[
"reference_reflectance_file"
]
return self.current_input_data
[docs]
def flush_buffers(self):
"""Write all buffered output data to disk, and erase read buffers."""
for file_dictionary in [self.input_datasets, self.output_datasets]:
for name, fi in file_dictionary.items():
fi.flush_buffers()
self.reads = 0
self.writes = 0
[docs]
def write_datasets(
self, row: int, col: int, output: dict, states: List, flush_immediately=False
):
"""
Write all valid datasets to disk (possibly buffered).
Args:
row: row to write to
col: column to write to
output: dictionary with keys corresponding to config.input file references
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings
"""
for product in self.output_datasets:
logging.debug("IO: Writing " + product)
self.output_datasets[product].write_spectrum(row, col, output[product])
# Special case! samples file is matlab format.
if self.config.output.mcmc_samples_file is not None:
logging.debug("IO: Writing mcmc_samples_file")
mdict = {"samples": states}
scipy.io.savemat(self.config.output.mcmc_samples_file, mdict)
self.writes += 1
if self.writes >= self.flush_rate or flush_immediately:
self.flush_buffers()
[docs]
def build_output(
self, states: List, input_data: InputData, fm: ForwardModel, iv: Inversion
):
"""
Build the output to be written to disk as a dictionary
Args:
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
input_data: an InputData object
fm: the forward model used to solve the inversion
iv: the inversion object
"""
if len(states) == 0:
# Write a bad data flag
atm_bad = np.zeros(len(fm.instrument.n_chan) * 5) * -9999.0
state_bad = np.zeros(len(fm.statevec)) * -9999.0
data_bad = np.zeros(fm.instrument.n_chan) * -9999.0
to_write = {
"estimated_state_file": state_bad,
"estimated_reflectance_file": data_bad,
"estimated_emission_file": data_bad,
"modeled_radiance_file": data_bad,
"apparent_reflectance_file": data_bad,
"path_radiance_file": data_bad,
"simulated_measurement_file": data_bad,
"algebraic_inverse_file": data_bad,
"atmospheric_coefficients_file": atm_bad,
"radiometry_correction_file": data_bad,
"spectral_calibration_file": data_bad,
"posterior_uncertainty_file": state_bad,
}
else:
to_write = {}
meas = input_data.meas
geom = input_data.geom
reference_reflectance = input_data.reference_reflectance
if self.config.implementation.mode == "inversion_mcmc":
state_est = states.mean(axis=0)
else:
state_est = states[-1, :]
############ Start with all of the 'independent' calculations
if "estimated_state_file" in self.output_datasets:
to_write["estimated_state_file"] = state_est
if "path_radiance_file" in self.output_datasets:
path_est = fm.calc_meas(
state_est, geom, rfl=np.zeros(self.meas_wl.shape)
)
to_write["path_radiance_file"] = np.column_stack(
(fm.instrument.wl_init, path_est)
)
if "spectral_calibration_file" in self.output_datasets:
# Spectral calibration
wl, fwhm = fm.calibration(state_est)
cal = np.column_stack(
[np.arange(0, len(wl)), wl / 1000.0, fwhm / 1000.0]
)
to_write["spectral_calibration_file"] = cal
if "posterior_uncertainty_file" in self.output_datasets:
S_hat, K, G = iv.calc_posterior(state_est, geom, meas)
to_write["posterior_uncertainty_file"] = np.sqrt(np.diag(S_hat))
############ Now proceed to the calcs where they may be some overlap
if any(
item in ["estimated_emission_file", "apparent_reflectance_file"]
for item in self.output_datasets
):
Ls_est = fm.calc_Ls(state_est, geom)
if any(
item
in [
"estimated_reflectance_file",
"apparent_reflectance_file",
"modeled_radiance_file",
"simulated_measurement_file",
]
for item in self.output_datasets
):
lamb_est = fm.calc_lamb(state_est, geom)
if any(
item in ["modeled_radiance_file", "simulated_measurement_file"]
for item in self.output_datasets
):
meas_est = fm.calc_meas(state_est, geom, rfl=lamb_est)
if "modeled_radiance_file" in self.output_datasets:
to_write["modeled_radiance_file"] = np.column_stack(
(fm.instrument.wl_init, meas_est)
)
if "simulated_measurement_file" in self.output_datasets:
meas_sim = fm.instrument.simulate_measurement(meas_est, geom)
to_write["simulated_measurement_file"] = np.column_stack(
(self.meas_wl, meas_sim)
)
if "estimated_emission_file" in self.output_datasets:
to_write["estimated_emission_file"] = np.column_stack(
(self.meas_wl, Ls_est)
)
if "estimated_reflectance_file" in self.output_datasets:
to_write["estimated_reflectance_file"] = np.column_stack(
(self.meas_wl, lamb_est)
)
if "apparent_reflectance_file" in self.output_datasets:
# Upward emission & glint and apparent reflectance
apparent_rfl_est = lamb_est + Ls_est
to_write["apparent_reflectance_file"] = np.column_stack(
(self.meas_wl, apparent_rfl_est)
)
x_surface, x_RT, x_instrument = fm.unpack(state_est)
if any(
item in ["algebraic_inverse_file", "atmospheric_coefficients_file"]
for item in self.output_datasets
):
rfl_alg_opt, Ls, coeffs = invert_algebraic(
fm.surface,
fm.RT,
fm.instrument,
x_surface,
x_RT,
x_instrument,
meas,
geom,
)
if "algebraic_inverse_file" in self.output_datasets:
to_write["algebraic_inverse_file"] = np.column_stack(
(self.meas_wl, rfl_alg_opt)
)
if "atmospheric_coefficients_file" in self.output_datasets:
rhoatm, sphalb, transm, solar_irr, coszen, transup = coeffs
atm = np.column_stack(
list(coeffs[:4]) + [np.ones((len(self.meas_wl), 1)) * coszen]
)
atm = atm.T.reshape((len(self.meas_wl) * 5,))
to_write["atmospheric_coefficients_file"] = atm
if "radiometry_correction_file" in self.output_datasets:
factors = np.ones(len(self.meas_wl))
if "reference_reflectance_file" in self.input_datasets:
meas_est = fm.calc_meas(state_est, geom, rfl=reference_reflectance)
factors = meas_est / meas
else:
logging.warning("No reflectance reference")
to_write["radiometry_correction_file"] = factors
return to_write
[docs]
def write_spectrum(
self,
row: int,
col: int,
states: List,
fm: ForwardModel,
iv: Inversion,
flush_immediately=False,
input_data: InputData = None,
):
"""
Convenience function to build and write output in one step
Args:
row: data row to write
col: data column to write
states: results states from inversion. In the MCMC case, these are interpreted as samples from the
posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged
solution).
meas: measurement radiance
geom: geometry object of the observation
fm: the forward model used to solve the inversion
iv: the inversion object
flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings
input_data: optionally overwride self.current_input_data
"""
if input_data is None:
to_write = self.build_output(states, self.current_input_data, fm, iv)
else:
to_write = self.build_output(states, input_data, fm, iv)
self.write_datasets(
row, col, to_write, states, flush_immediately=flush_immediately
)
[docs]
def write_bil_chunk(
dat: np.array, outfile: str, line: int, shape: tuple, dtype: str = "float32"
) -> None:
"""
Write a chunk of data to a binary, BIL formatted data cube.
Args:
dat: data to write
outfile: output file to write to
line: line of the output file to write to
shape: shape of the output file
dtype: output data type
Returns:
None
"""
outfile = open(outfile, "rb+")
outfile.seek(line * shape[1] * shape[2] * np.dtype(dtype).itemsize)
outfile.write(dat.astype(dtype).tobytes())
outfile.close()