# Source code for isofit.core.fileio

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import logging
import os
from collections import OrderedDict
from datetime import datetime as dtt
from typing import List

import numpy as np
import scipy.interpolate
import scipy.io
import xarray as xr
from spectral.io import envi

from isofit.configs import Config
from isofit.core.common import envi_header
from isofit.core.forward import ForwardModel
from isofit.inversion.inverse import Inversion
from isofit.inversion.inverse_simple import invert_algebraic, invert_simple

from .common import eps, load_spectrum, resample_spectrum
from .geometry import Geometry

### Variables ###

# Constants related to file I/O
typemap = {
    np.uint8: 1,
    np.int16: 2,
    np.int32: 3,
    np.float32: 4,
    np.float64: 5,
    np.complex64: 6,
    np.complex128: 9,
    np.uint16: 12,
    np.uint32: 13,
    np.int64: 14,
    np.uint64: 15,
}

max_frames_size = 100


### Classes ###


class SpectrumFile:
    """A buffered file object that contains configuration information about
    formatting, etc."""

    def __init__(
        self,
        fname,
        write=False,
        n_rows=None,
        n_cols=None,
        n_bands=None,
        interleave=None,
        dtype=np.float32,
        wavelengths=None,
        fwhm=None,
        band_names=None,
        bad_bands="[]",
        zrange="{0.0, 1.0}",
        flag=-9999.0,
        ztitles="{Wavelength (nm), Magnitude}",
        map_info="{}",
    ):
        """Open (or prepare to create) a spectrum file.

        The on-disk format is inferred from the file suffix: ``.txt`` is
        space-separated ASCII, ``.mat`` is MATLAB (output only), ``.nc`` is
        NetCDF, and anything else is treated as an ENVI cube.

        Args:
            fname: path to the file to read or write
            write: when True, the object is an output file
            n_rows/n_cols/n_bands: cube dimensions (required for output cubes)
            interleave: ENVI interleave string for output files (e.g. "bip")
            dtype: numpy dtype used for the ENVI "data type" header field
            wavelengths/fwhm/band_names: spectral metadata for output headers
            bad_bands/zrange/flag/ztitles/map_info: additional ENVI metadata

        Raises:
            IOError: MATLAB input requested, missing ENVI header, or missing
                required output metadata.
        """

        self.frames = OrderedDict()  # row-index -> 2D frame buffer
        self.write = write
        self.fname = os.path.abspath(fname)
        self.wl = wavelengths
        self.band_names = band_names
        self.fwhm = fwhm
        self.flag = flag
        self.n_rows = n_rows
        self.n_cols = n_cols
        self.n_bands = n_bands

        if self.fname.endswith(".txt"):
            # The .txt suffix implies a space-separated ASCII text file of
            # one or more data columns. This is cheap to load and store, so
            # we do not defer read/write operations.
            logging.debug("Inferred ASCII file format for %s" % self.fname)
            self.format = "ASCII"
            if not self.write:
                self.data, self.wl = load_spectrum(self.fname)
            self.n_rows, self.n_cols, self.map_info = 1, 1, "{}"
            if self.wl is not None:
                self.n_bands = len(self.wl)
            else:
                self.n_bands = None
            self.meta = {}

        elif self.fname.endswith(".mat"):
            # The .mat suffix implies a matlab-style file, i.e. a dictionary
            # of 2D arrays and other matlab-like objects. This is typically
            # only used for specific output products associated with single
            # spectrum retrievals; there is no read option.
            logging.debug("Inferred MATLAB file format for %s" % self.fname)
            self.format = "MATLAB"
            if not self.write:
                logging.error("Unsupported MATLAB file in input block")
                raise IOError("MATLAB format in input block not supported")

        elif self.fname.endswith(".nc"):
            # BUG FIX: the original message said "MATLAB" here (copy-paste).
            logging.debug(f"Inferred NetCDF file format for {self.fname}")
            self.format = "NETCDF"
            if not self.write:
                self.dataset = xr.open_dataset(self.fname)
                self.data = self.dataset.radiance
                self.meta = self.data.attrs
                self.n_rows = self.data.downtrack.size
                self.n_cols = self.data.crosstrack.size
                self.n_bands = self.data.bands.size
            else:
                # EMIT-specific metadata
                meta = {
                    "ncei_template_version": "NCEI_NetCDF_Swath_Template_v2.0",
                    "summary": "The Earth Surface Mineral Dust Source Investigation (EMIT) is an Earth Ventures-Instrument (EVI-4) Mission that maps the surface mineralogy of arid dust source regions via imaging spectroscopy in the visible and short-wave infrared (VSWIR). Installed on the International Space Station (ISS), the EMIT instrument is a Dyson imaging spectrometer that uses contiguous spectroscopic measurements from 410 to 2450 nm to resolve absoprtion features of iron oxides, clays, sulfates, carbonates, and other dust-forming minerals. During its one-year mission, EMIT will observe the sunlit Earth's dust source regions that occur within +/-52° latitude and produce maps of the source regions that can be used to improve forecasts of the role of mineral dust in the radiative forcing (warming or cooling) of the atmosphere.\\n\\nThis file contains L2A estimated surface reflectances and geolocation data. Reflectance estimates are created using an Optimal Estimation technique - see ATBD for details. Reflectance values are reported as fractions (relative to 1).",
                    "keywords": "Imaging Spectroscopy, minerals, EMIT, dust, radiative forcing",
                    "Conventions": "CF-1.63",
                    "sensor": "EMIT (Earth Surface Mineral Dust Source Investigation)",
                    "instrument": "EMIT",
                    "platform": "ISS",
                    "institution": "NASA Jet Propulsion Laboratory/California Institute of Technology",
                    "license": "",
                    "naming_authority": "",
                    "date_created": str(dtt.now()),
                    "keywords_vocabulary": "NASA Global Change Master Directory (GCMD) Science Keywords",
                    "stdname_vocabulary": "NetCDF Climate and Forecast (CF) Metadata Convention",
                    "creator_name": "ISOFIT",
                    "creator_url": "",
                    "project": "Earth Surface Mineral Dust Source Investigation",
                    "project_url": "https://emit.jpl.nasa.gov/",
                    "publisher_name": "",
                    "publisher_url": "",
                    "publisher_email": "",
                    "identifier_product_doi_authority": "https://doi.org",
                    "flight_line": "",
                    "time_coverage_start": "",
                    "time_coverage_end": "",
                    "software_build_version": "",
                    "software_delivery_version": "",
                    "product_version": "",
                    "history": "ISOFIT generated",
                    "crosstrack_orientation": "",
                    "easternmost_longitude": None,
                    "northernmost_latitude": None,
                    "westernmost_longitude": None,
                    "southernmost_latitude": None,
                    "spatialResolution": None,
                    "spatial_ref": "",
                    "geotransform": [],
                    "day_night_flag": "",
                    "title": "EMIT L2A Estimated Surface Reflectance",
                }
                self.n_rows = n_rows
                self.n_cols = n_cols
                self.n_bands = n_bands
                self.dataset = xr.Dataset(
                    {
                        "reflectance": (
                            ("downtrack", "crosstrack", "bands"),
                            np.zeros((n_rows, n_cols, n_bands)),
                        )
                    }
                )
                self.dataset.attrs = meta
                self.data = self.dataset.reflectance
                # Initialize the file
                self.dataset.to_netcdf(self.fname)

        else:
            # Otherwise we assume it is an ENVI-format file, which is
            # basically just a binary data cube with a detached human-
            # readable ASCII header describing dimensions, interleave, and
            # metadata. We buffer this data in self.frames, reading and
            # writing individual rows of the cube on-demand.
            logging.debug("Inferred ENVI file format for %s" % self.fname)
            self.format = "ENVI"

            if not self.write:
                # If we are an input file, the header must preexist.
                if not os.path.exists(envi_header(self.fname)):
                    logging.error("Could not find %s" % (envi_header(self.fname)))
                    raise IOError("Could not find %s" % (envi_header(self.fname)))

                # open file and copy metadata
                self.file = envi.open(envi_header(self.fname), fname)
                self.meta = self.file.metadata.copy()

                self.n_rows = int(self.meta["lines"])
                self.n_cols = int(self.meta["samples"])
                self.n_bands = int(self.meta["bands"])
                if "data ignore value" in self.meta:
                    self.flag = float(self.meta["data ignore value"])
                else:
                    self.flag = -9999.0

            else:
                # If we are an output file, we may have to build the header
                # from scratch. Hopefully the caller has supplied the
                # necessary metadata details.
                meta = {
                    "lines": n_rows,
                    "samples": n_cols,
                    "bands": n_bands,
                    "byte order": 0,
                    "header offset": 0,
                    "map info": map_info,
                    # NOTE(review): ENVI headers conventionally spell this
                    # key "file type" (space) — confirm "file_type" is
                    # intentional.
                    "file_type": "ENVI Standard",
                    "sensor type": "unknown",
                    "interleave": interleave,
                    "data type": typemap[dtype],
                    "wavelength units": "nm",
                    "z plot range": zrange,
                    "z plot titles": ztitles,
                    "fwhm": fwhm,
                    "bbl": bad_bands,
                    "band names": band_names,
                    "wavelength": self.wl,
                }

                # All output metadata fields are mandatory.
                for k, v in meta.items():
                    if v is None:
                        logging.error("Must specify %s" % (k))
                        raise IOError("Must specify %s" % (k))

                if os.path.isfile(envi_header(fname)) is False:
                    self.file = envi.create_image(
                        envi_header(fname), meta, ext="", force=True
                    )
                else:
                    self.file = envi.open(envi_header(fname))

            self.open_map_with_retries()
[docs] def open_map_with_retries(self): """Try to open a memory map, handling Beowulf I/O issues.""" self.memmap = None for attempt in range(10): self.memmap = self.file.open_memmap(interleave="bip", writable=self.write) if self.memmap is not None: return raise IOError("could not open memmap for " + self.fname)
[docs] def get_frame(self, row): """The frame is a 2D array, essentially a list of spectra. The self.frames list acts as a hash table to avoid storing the entire cube in memory. So we read them or create them on an as-needed basis. When the buffer flushes via a call to flush_buffers, they will be deleted.""" if row not in self.frames: if not self.write: d = self.memmap[row, :, :] self.frames[row] = d.copy() else: self.frames[row] = np.nan * np.zeros((self.n_cols, self.n_bands)) return self.frames[row]
[docs] def write_spectrum(self, row, col, x): """We write a spectrum. If a binary format file, we simply change the data cached in self.frames and defer file I/O until flush_buffers is called.""" if self.format == "ASCII": # Multicolumn output for ASCII products np.savetxt(self.fname, x, fmt="%10.6f") elif self.format == "MATLAB": # Dictionary output for MATLAB products scipy.io.savemat(self.fname, x) elif self.format == "NETCDF": self.data[row, col][:] = x else: # Omit wavelength column for spectral products frame = self.get_frame(row) if x.ndim == 2: x = x[:, -1] frame[col, :] = x
[docs] def read_spectrum(self, row, col): """Get a spectrum from the frame list or ASCII file. Note that if we are an ASCII file, we have already read the single spectrum and return it as-is (ignoring the row/column indices).""" if self.format == "ASCII": return self.data elif self.format == "NETCDF": return self.data.sel(downtrack=row, crosstrack=col).data else: frame = self.get_frame(row) return frame[col]
    def flush_buffers(self):
        """Write to file, and refresh the memory map object."""
        if self.format == "ENVI":
            if self.write:
                # Push each buffered frame into the memmap.  Only spectra
                # whose first band is non-NaN are considered written;
                # untouched (all-NaN placeholder) rows are skipped.
                for row, frame in self.frames.items():
                    valid = np.logical_not(np.isnan(frame[:, 0]))
                    self.memmap[row, valid, :] = frame[valid, :]
            # Drop the frame cache (read buffers included) and re-open the
            # file so the memmap reflects what is now on disk.
            self.frames = OrderedDict()
            del self.file
            self.file = envi.open(envi_header(self.fname), self.fname)
            self.open_map_with_retries()
        elif self.format == "NETCDF":
            # NetCDF output is serialized in full from the in-memory dataset.
            self.dataset.to_netcdf(self.fname)
class InputData:
    """Per-spectrum bundle of inputs for a single retrieval.

    Holds the measurement vector, the observation Geometry, and (when
    configured) a reference reflectance spectrum; all fields default to
    None until populated.
    """

    def __init__(self):
        self.meas = None
        self.geom = None
        self.reference_reflectance = None

    def clear(self):
        """Blank out all fields by re-running initialization."""
        self.__init__()
class IO:
    """Manages all file input/output for a retrieval run: opens the
    configured input SpectrumFiles, creates the output SpectrumFiles, and
    buffers reads/writes."""

    def __init__(self, config: Config, forward: ForwardModel):
        """Initialization specifies retrieval subwindows for calculating
        measurement cost distributions."""

        self.config = config

        # ENVI bad-band list: every instrument channel flagged good (1).
        self.bbl = (
            "{"
            + ",".join([str(1) for n in range(len(forward.instrument.wl_init))])
            + "}"
        )
        self.radiance_correction = None
        self.meas_wl = forward.instrument.wl_init
        self.meas_fwhm = forward.instrument.fwhm_init
        # Counters used to trigger periodic buffer flushes.
        self.writes = 0
        self.reads = 0
        # Cube dimensions; grown below to the largest input dataset.
        self.n_rows = 1
        self.n_cols = 1
        self.n_sv = len(forward.statevec)
        self.n_chan = len(forward.instrument.wl_init)

        self.flush_rate = config.implementation.io_buffer_size
        self.simulation_mode = config.implementation.mode == "simulation"

        self.total_time = 0
        self.current_input_data = InputData()

        # Names of either the wavelength or statevector outputs
        wl_names = [("Channel %i" % i) for i in range(self.n_chan)]
        sv_names = forward.statevec.copy()

        self.input_datasets, self.output_datasets, self.map_info = {}, {}, "{}"

        # Load input files and record relevant metadata
        for element, element_name in zip(*self.config.input.get_elements()):
            self.input_datasets[element_name] = SpectrumFile(element)

            # Track the largest row/column extent across all inputs.
            if (self.input_datasets[element_name].n_rows > self.n_rows) or (
                self.input_datasets[element_name].n_cols > self.n_cols
            ):
                self.n_rows = self.input_datasets[element_name].n_rows
                self.n_cols = self.input_datasets[element_name].n_cols

            # Inherit "map info" / "bbl" metadata from any input that has it
            # (stored on self as map_info / bbl).
            for inherit in ["map info", "bbl"]:
                if inherit in self.input_datasets[element_name].meta:
                    setattr(
                        self,
                        inherit.replace(" ", "_"),
                        self.input_datasets[element_name].meta[inherit],
                    )

        # Create one output SpectrumFile per configured output product.
        for element, element_header, element_name in zip(
            *self.config.output.get_output_files()
        ):
            band_names, ztitle, zrange = element_header

            # Output band labels depend on the product type.
            if band_names == "statevector":
                band_names = sv_names
            elif band_names == "wavelength":
                band_names = wl_names
            elif band_names == "atm_coeffs":
                # Five stacked coefficient blocks, one set of labels each.
                band_names = wl_names * 5
            else:
                band_names = "{}"

            n_bands = len(band_names)
            self.output_datasets[element_name] = SpectrumFile(
                element,
                write=True,
                n_rows=self.n_rows,
                n_cols=self.n_cols,
                n_bands=n_bands,
                interleave="bip",
                dtype=np.float32,
                wavelengths=self.meas_wl,
                fwhm=self.meas_fwhm,
                band_names=band_names,
                bad_bands=self.bbl,
                map_info=self.map_info,
                zrange=zrange,
                ztitles=ztitle,
            )

        # Do we apply a radiance correction?
        if self.config.input.radiometry_correction_file is not None:
            filename = self.config.input.radiometry_correction_file
            # (the wavelength grid returned alongside is unused here)
            self.radiance_correction, wl = load_spectrum(filename)
    def get_components_at_index(self, row: int, col: int) -> InputData:
        """
        Load data from input files at the specified (row, col) index.

        Args:
            row: row to retrieve data from
            col: column to retrieve data from

        Returns:
            InputData: object containing all current data reads, or None if
            the pixel has no measurement or matches a bad-data flag
        """
        # Prepare our input data object by blanking it out
        self.current_input_data.clear()

        # Initialize the data dictionary with an empty entry for every
        # configured input element.
        data = dict([(i, None) for i in self.config.input.get_all_element_names()])
        logging.debug(f"Row {row} Column {col}")

        # Read data from any of the input files that are defined, flushing
        # read buffers once the configured budget is exhausted.
        for source in self.input_datasets:
            data[source] = self.input_datasets[source].read_spectrum(row, col)
            self.reads += 1
            if self.reads >= self.flush_rate:
                self.flush_buffers()

        if self.simulation_mode:
            # In simulation mode, the "measurement" is the surface reflectance
            meas = data["reflectance_file"].copy()
        else:
            # When solving the inverse problem, the measurement is the radiance.
            # We apply the calibration correction here for simplicity.
            meas = data["measured_radiance_file"]
            if meas is not None:
                meas = meas.copy()
            if data["radiometry_correction_file"] is not None:
                meas *= data["radiometry_correction_file"]

        self.current_input_data.meas = meas
        # Missing measurement, or one that is entirely below -49
        # (presumably a no-data sentinel — confirm against flag values),
        # means there is nothing to retrieve here.
        if self.current_input_data.meas is None or np.all(
            self.current_input_data.meas < -49
        ):
            return None

        ## Check for any bad data flags: a spectrum equal (within eps) to a
        ## dataset's flag value everywhere marks the pixel as invalid.
        for source in self.input_datasets:
            if np.all(abs(data[source] - self.input_datasets[source].flag) < eps):
                return None

        # We build the geometry object for this spectrum. For files not
        # specified in the input configuration block, the associated entries
        # will be 'None'. The Geometry object will use reasonable defaults.
        geom = Geometry(
            obs=data["obs_file"],
            loc=data["loc_file"],
            bg_rfl=data["background_reflectance_file"],
        )

        self.current_input_data.geom = geom
        self.current_input_data.reference_reflectance = data[
            "reference_reflectance_file"
        ]

        return self.current_input_data
[docs] def flush_buffers(self): """Write all buffered output data to disk, and erase read buffers.""" for file_dictionary in [self.input_datasets, self.output_datasets]: for name, fi in file_dictionary.items(): fi.flush_buffers() self.reads = 0 self.writes = 0
[docs] def write_datasets( self, row: int, col: int, output: dict, states: List, flush_immediately=False ): """ Write all valid datasets to disk (possibly buffered). Args: row: row to write to col: column to write to output: dictionary with keys corresponding to config.input file references states: results states from inversion. In the MCMC case, these are interpreted as samples from the posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged solution). flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings """ for product in self.output_datasets: logging.debug("IO: Writing " + product) self.output_datasets[product].write_spectrum(row, col, output[product]) # Special case! samples file is matlab format. if self.config.output.mcmc_samples_file is not None: logging.debug("IO: Writing mcmc_samples_file") mdict = {"samples": states} scipy.io.savemat(self.config.output.mcmc_samples_file, mdict) self.writes += 1 if self.writes >= self.flush_rate or flush_immediately: self.flush_buffers()
    def build_output(
        self, states: List, input_data: InputData, fm: ForwardModel, iv: Inversion
    ):
        """
        Build the output to be written to disk as a dictionary

        Args:
            states: results states from inversion. In the MCMC case, these
                are interpreted as samples from the posterior, otherwise they
                are a gradient descent trajectory (with the last spectrum
                being the converged solution).
            input_data: an InputData object
            fm: the forward model used to solve the inversion
            iv: the inversion object

        Returns:
            dict mapping output-product names to arrays ready for writing
        """
        if len(states) == 0:
            # Write a bad data flag
            # NOTE(review): len(fm.instrument.n_chan) looks wrong —
            # elsewhere in this block n_chan is passed directly to np.zeros
            # as an int, so len() would raise TypeError; confirm.
            # NOTE(review): np.zeros(...) * -9999.0 yields all zeros, not
            # -9999 flags — confirm whether subtraction/fill was intended.
            atm_bad = np.zeros(len(fm.instrument.n_chan) * 5) * -9999.0
            state_bad = np.zeros(len(fm.statevec)) * -9999.0
            data_bad = np.zeros(fm.instrument.n_chan) * -9999.0
            to_write = {
                "estimated_state_file": state_bad,
                "estimated_reflectance_file": data_bad,
                "estimated_emission_file": data_bad,
                "modeled_radiance_file": data_bad,
                "apparent_reflectance_file": data_bad,
                "path_radiance_file": data_bad,
                "simulated_measurement_file": data_bad,
                "algebraic_inverse_file": data_bad,
                "atmospheric_coefficients_file": atm_bad,
                "radiometry_correction_file": data_bad,
                "spectral_calibration_file": data_bad,
                "posterior_uncertainty_file": state_bad,
            }

        else:
            to_write = {}
            meas = input_data.meas
            geom = input_data.geom
            reference_reflectance = input_data.reference_reflectance

            # MCMC: posterior mean; otherwise the last (converged) state.
            if self.config.implementation.mode == "inversion_mcmc":
                state_est = states.mean(axis=0)
            else:
                state_est = states[-1, :]

            ############ Start with all of the 'independent' calculations
            if "estimated_state_file" in self.output_datasets:
                to_write["estimated_state_file"] = state_est

            if "path_radiance_file" in self.output_datasets:
                # Path radiance = modeled measurement with zero reflectance.
                path_est = fm.calc_meas(
                    state_est, geom, rfl=np.zeros(self.meas_wl.shape)
                )
                to_write["path_radiance_file"] = np.column_stack(
                    (fm.instrument.wl_init, path_est)
                )

            if "spectral_calibration_file" in self.output_datasets:
                # Spectral calibration (channel index, wl and fwhm in microns)
                wl, fwhm = fm.calibration(state_est)
                cal = np.column_stack(
                    [np.arange(0, len(wl)), wl / 1000.0, fwhm / 1000.0]
                )
                to_write["spectral_calibration_file"] = cal

            if "posterior_uncertainty_file" in self.output_datasets:
                # Marginal posterior standard deviations from S_hat.
                S_hat, K, G = iv.calc_posterior(state_est, geom, meas)
                to_write["posterior_uncertainty_file"] = np.sqrt(np.diag(S_hat))

            ############ Now proceed to the calcs where there may be some overlap
            if any(
                item in ["estimated_emission_file", "apparent_reflectance_file"]
                for item in self.output_datasets
            ):
                Ls_est = fm.calc_Ls(state_est, geom)

            if any(
                item
                in [
                    "estimated_reflectance_file",
                    "apparent_reflectance_file",
                    "modeled_radiance_file",
                    "simulated_measurement_file",
                ]
                for item in self.output_datasets
            ):
                lamb_est = fm.calc_lamb(state_est, geom)

            if any(
                item in ["modeled_radiance_file", "simulated_measurement_file"]
                for item in self.output_datasets
            ):
                meas_est = fm.calc_meas(state_est, geom, rfl=lamb_est)

                if "modeled_radiance_file" in self.output_datasets:
                    to_write["modeled_radiance_file"] = np.column_stack(
                        (fm.instrument.wl_init, meas_est)
                    )

                if "simulated_measurement_file" in self.output_datasets:
                    # Add instrument noise to the modeled measurement.
                    meas_sim = fm.instrument.simulate_measurement(meas_est, geom)
                    to_write["simulated_measurement_file"] = np.column_stack(
                        (self.meas_wl, meas_sim)
                    )

            if "estimated_emission_file" in self.output_datasets:
                to_write["estimated_emission_file"] = np.column_stack(
                    (self.meas_wl, Ls_est)
                )

            if "estimated_reflectance_file" in self.output_datasets:
                to_write["estimated_reflectance_file"] = np.column_stack(
                    (self.meas_wl, lamb_est)
                )

            if "apparent_reflectance_file" in self.output_datasets:
                # Upward emission & glint and apparent reflectance
                apparent_rfl_est = lamb_est + Ls_est
                to_write["apparent_reflectance_file"] = np.column_stack(
                    (self.meas_wl, apparent_rfl_est)
                )

            x_surface, x_RT, x_instrument = fm.unpack(state_est)

            if any(
                item in ["algebraic_inverse_file", "atmospheric_coefficients_file"]
                for item in self.output_datasets
            ):
                rfl_alg_opt, Ls, coeffs = invert_algebraic(
                    fm.surface,
                    fm.RT,
                    fm.instrument,
                    x_surface,
                    x_RT,
                    x_instrument,
                    meas,
                    geom,
                )

            if "algebraic_inverse_file" in self.output_datasets:
                to_write["algebraic_inverse_file"] = np.column_stack(
                    (self.meas_wl, rfl_alg_opt)
                )

            if "atmospheric_coefficients_file" in self.output_datasets:
                rhoatm, sphalb, transm, solar_irr, coszen, transup = coeffs
                # Stack the first four coefficient spectra plus a constant
                # coszen column, then flatten band-major to 5 * n_chan.
                atm = np.column_stack(
                    list(coeffs[:4]) + [np.ones((len(self.meas_wl), 1)) * coszen]
                )
                atm = atm.T.reshape((len(self.meas_wl) * 5,))
                to_write["atmospheric_coefficients_file"] = atm

            if "radiometry_correction_file" in self.output_datasets:
                factors = np.ones(len(self.meas_wl))

                if "reference_reflectance_file" in self.input_datasets:
                    # Correction = modeled radiance over the reference
                    # reflectance, divided by the actual measurement.
                    meas_est = fm.calc_meas(state_est, geom, rfl=reference_reflectance)
                    factors = meas_est / meas
                else:
                    logging.warning("No reflectance reference")

                to_write["radiometry_correction_file"] = factors

        return to_write
[docs] def write_spectrum( self, row: int, col: int, states: List, fm: ForwardModel, iv: Inversion, flush_immediately=False, input_data: InputData = None, ): """ Convenience function to build and write output in one step Args: row: data row to write col: data column to write states: results states from inversion. In the MCMC case, these are interpreted as samples from the posterior, otherwise they are a gradient descent trajectory (with the last spectrum being the converged solution). meas: measurement radiance geom: geometry object of the observation fm: the forward model used to solve the inversion iv: the inversion object flush_immediately: IO argument telling us to immediately write to disk, ignoring config settings input_data: optionally overwride self.current_input_data """ if input_data is None: to_write = self.build_output(states, self.current_input_data, fm, iv) else: to_write = self.build_output(states, input_data, fm, iv) self.write_datasets( row, col, to_write, states, flush_immediately=flush_immediately )
def write_bil_chunk(
    dat: np.array, outfile: str, line: int, shape: tuple, dtype: str = "float32"
) -> None:
    """
    Write a chunk of data to a binary, BIL formatted data cube.

    Args:
        dat: data to write
        outfile: output file to write to; must already exist, since it is
            opened in-place ("rb+")
        line: line of the output file to write to
        shape: shape of the output file — presumably (lines, bands, samples),
            since the per-line offset is shape[1] * shape[2] elements
        dtype: output data type

    Returns:
        None
    """
    # A context manager guarantees the handle is closed even if seek/write
    # raises (the original open/close pair leaked the handle on error).
    with open(outfile, "rb+") as f:
        f.seek(line * shape[1] * shape[2] * np.dtype(dtype).itemsize)
        f.write(dat.astype(dtype).tobytes())