Source code for isofit.utils.interpolate_atmosphere

#! /usr/bin/env python3
#
#  Copyright 2020 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import logging
import multiprocessing
import time

import numpy as np
from scipy.linalg import inv
from scipy.spatial import KDTree
from sklearn.decomposition import PCA
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, WhiteKernel
from spectral.io import envi

from isofit.configs import configs
from isofit.core.common import envi_header
from isofit.core.instrument import Instrument


def _write_bil_chunk(
    dat: np.array, outfile: str, line: int, shape: tuple, dtype: str = "float32"
) -> None:
    """
    Write a chunk of data to a binary, BIL formatted data cube.
    Args:
        dat: data to write
        outfile: output file to write to
        line: line of the output file to write to
        shape: shape of the output file
        dtype: output data type

    Returns:
        None
    """
    outfile = open(outfile, "rb+")
    outfile.seek(line * shape[1] * shape[2] * np.dtype(dtype).itemsize)
    outfile.write(dat.astype(dtype).tobytes())
    outfile.close()


def _run_chunk(
    start_line: int,
    stop_line: int,
    reference_radiance_file: str,
    reference_atm_file: str,
    reference_locations_file: str,
    input_radiance_file: str,
    input_locations_file: str,
    segmentation_file: str,
    isofit_config: dict,
    output_reflectance_file: str,
    output_uncertainty_file: str,
    radiance_factors: np.array,
    nneighbors: int,
    nodata_value: float,
) -> None:
    """
    Args:
        start_line: line to start empirical line run at
        stop_line:  line to stop empirical line run at
        reference_radiance_file: source file for radiance (interpolation built from this)
        reference_atm_file:  source file for atmosphere coefficients (interpolation built from this)
        reference_locations_file:  source file for file locations (lon, lat, elev), (interpolation built from this)
        input_radiance_file: input radiance file (interpolate over this)
        input_locations_file: input location file (interpolate over this)
        segmentation_file: input file noting the per-pixel segmentation used
        isofit_config: dictionary-stype isofit configuration
        output_reflectance_file: location to write output reflectance to
        output_uncertainty_file: location to write output uncertainty to
        radiance_factors: radiance adjustment factors
        nneighbors: number of neighbors to use for interpolation
        nodata_value: nodata value of input and output

    Returns:
        None

    """

    # Load reference images
    reference_radiance_img = envi.open(
        envi_header(reference_radiance_file), reference_radiance_file
    )
    reference_atm_img = envi.open(envi_header(reference_atm_file), reference_atm_file)
    reference_locations_img = envi.open(
        envi_header(reference_locations_file), reference_locations_file
    )

    n_reference_lines, n_radiance_bands, n_reference_columns = [
        int(reference_radiance_img.metadata[n]) for n in ("lines", "bands", "samples")
    ]

    # Load input images
    input_radiance_img = envi.open(
        envi_header(input_radiance_file), input_radiance_file
    )
    n_input_lines, n_input_bands, n_input_samples = [
        int(input_radiance_img.metadata[n]) for n in ("lines", "bands", "samples")
    ]
    wl = np.array([float(w) for w in input_radiance_img.metadata["wavelength"]])

    input_locations_img = envi.open(
        envi_header(input_locations_file), input_locations_file
    )
    n_location_bands = int(input_locations_img.metadata["bands"])

    # Load output images
    output_reflectance_img = envi.open(
        envi_header(output_reflectance_file), output_reflectance_file
    )
    output_uncertainty_img = envi.open(
        envi_header(output_uncertainty_file), output_uncertainty_file
    )
    n_output_reflectance_bands = int(output_reflectance_img.metadata["bands"])
    n_output_uncertainty_bands = int(output_uncertainty_img.metadata["bands"])

    # Load reference data
    reference_locations_mm = reference_locations_img.open_memmap(
        interleave="source", writable=False
    )
    reference_locations = np.array(reference_locations_mm[:, :, :]).reshape(
        (n_reference_lines, n_location_bands)
    )

    reference_radiance_mm = reference_radiance_img.open_memmap(
        interleave="source", writable=False
    )
    reference_radiance = np.array(reference_radiance_mm[:, :, :]).reshape(
        (n_reference_lines, n_radiance_bands)
    )

    reference_atm_mm = reference_atm_img.open_memmap(
        interleave="source", writable=False
    )
    reference_atm = np.array(reference_atm_mm[:, :, :]).reshape(
        (n_reference_lines, n_radiance_bands * 5)
    )
    rhoatm = reference_atm[:, :n_radiance_bands]
    sphalb = reference_atm[:, n_radiance_bands : (n_radiance_bands * 2)]
    transm = reference_atm[:, (n_radiance_bands * 2) : (n_radiance_bands * 3)]
    solirr = reference_atm[:, (n_radiance_bands * 3) : (n_radiance_bands * 4)]
    coszen = reference_atm[:, (n_radiance_bands * 4) : (n_radiance_bands * 5)]

    # Load segmentation data
    if segmentation_file:
        segmentation_img = envi.open(envi_header(segmentation_file), segmentation_file)
        segmentation_img = segmentation_img.read_band(0)
    else:
        segmentation_img = None

    # Prepare instrument model, if available
    if isofit_config is not None:
        config = configs.create_new_config(isofit_config)
        instrument = Instrument(config)
        logging.info("Loading instrument")
    else:
        instrument = None

    # Load radiance factors
    if radiance_factors is None:
        radiance_adjustment = np.ones(
            n_radiance_bands,
        )
    else:
        radiance_adjustment = np.loadtxt(radiance_factors)

    # PCA coefficients
    rdn_pca = PCA(n_components=2)
    reference_pca = rdn_pca.fit_transform(reference_radiance * radiance_adjustment)

    # Create the tree to find nearest neighbor segments.
    # Assume (heuristically) that, for distance purposes, 1 m vertically is
    # comparable to 10 m horizontally, and that there are 100 km per latitude
    # degree.  This is all approximate of course.  Elevation appears in the
    # Third element, and the first two are latitude/longitude coordinates
    # The fourth and fifth elements are "spectral distance" determined by the
    # top principal component coefficients
    loc_scaling = np.array([1e5, 1e5, 10, 100, 100])
    scaled_ref_loc = (
        np.concatenate((reference_locations, reference_pca), axis=1) * loc_scaling
    )
    tree = KDTree(scaled_ref_loc)

    # Fit GP parameters on transmissivity of an H2O feature, in the
    # first 400 datapoints
    use = np.arange(min(len(rhoatm), 400))
    h2oband = np.argmin(abs(wl - 940))
    scale = (500, 500, 500, 500, 500)
    bounds = ((100, 2000), (100, 2000), (100, 2000), (100, 2000), (100, 2000))
    kernel = RBF(length_scale=scale, length_scale_bounds=bounds) + WhiteKernel(
        noise_level=0.01, noise_level_bounds=(1e-10, 0.1)
    )
    gp = GaussianProcessRegressor(kernel=kernel, alpha=0.0, normalize_y=True)
    gp = gp.fit(scaled_ref_loc[use, :], transm[use, h2oband])
    kernel = gp.kernel_

    # Iterate through image.  Each segment has its own GP, stored in a
    # hash table indexed by location in the segmentation map
    hash_table = {}

    for row in np.arange(start_line, stop_line):
        # Load inline input data
        input_radiance_mm = input_radiance_img.open_memmap(
            interleave="source", writable=False
        )
        input_radiance = np.array(input_radiance_mm[row, :, :])
        if input_radiance_img.metadata["interleave"] == "bil":
            input_radiance = input_radiance.transpose((1, 0))
        input_radiance = input_radiance * radiance_adjustment

        input_locations_mm = input_locations_img.open_memmap(
            interleave="source", writable=False
        )
        input_locations = np.array(input_locations_mm[row, :, :])
        if input_locations_img.metadata["interleave"] == "bil":
            input_locations = input_locations.transpose((1, 0))

        output_reflectance_row = np.zeros(input_radiance.shape) + nodata_value
        output_uncertainty_row = np.zeros(input_radiance.shape) + nodata_value

        nspectra, start = 0, time.time()
        for col in np.arange(n_input_samples):
            # Get radiance, pca coordinates, physical location for this datum
            my_rdn = input_radiance[col, :]
            my_pca = rdn_pca.transform(my_rdn[np.newaxis, :])
            my_loc = np.r_[input_locations[col, :], my_pca[0, :]] * loc_scaling

            if np.all(np.isclose(my_rdn, nodata_value)):
                output_reflectance_row[col, :] = nodata_value
                output_uncertainty_row[col, :] = nodata_value
                continue

            # Retrieve or build the GP
            gp_rhoatm, gp_sphalb, gp_transm, irr = None, None, None, None
            hash_idx = segmentation_img[row, col]
            if hash_idx in hash_table:
                gp_rhoatm, gp_sphalb, gp_transm, irr = hash_table[hash_idx]
            else:
                # There is no GP for this segment, so we build one from
                # the atmospheric coefficients from closest neighbors
                dists, nn = tree.query(my_loc, nneighbors)
                neighbor_rhoatm = rhoatm[nn, :]
                neighbor_transm = transm[nn, :]
                neighbor_sphalb = sphalb[nn, :]
                neighbor_coszen = coszen[nn, :]
                neighbor_solirr = solirr[nn, :]
                neighbor_locs = scaled_ref_loc[nn, :]

                # Create a new GP using the optimized parameters as a fixed kernel
                gp_rhoatm = GaussianProcessRegressor(
                    kernel=kernel, alpha=0.0, normalize_y=True, optimizer=None
                )
                gp_rhoatm.fit(neighbor_locs, neighbor_rhoatm)
                gp_sphalb = GaussianProcessRegressor(
                    kernel=kernel, alpha=0.0, normalize_y=True, optimizer=None
                )
                gp_sphalb.fit(neighbor_locs, neighbor_sphalb)
                gp_transm = GaussianProcessRegressor(
                    kernel=kernel, alpha=0.0, normalize_y=True, optimizer=None
                )
                gp_transm.fit(neighbor_locs, neighbor_transm)
                irr = solirr[1, :] * coszen[1, :]
                irr[irr < 1e-8] = 1e-8

                hash_table[hash_idx] = (gp_rhoatm, gp_sphalb, gp_transm, irr)

            my_rhoatm = gp_rhoatm.predict(my_loc[np.newaxis, :])
            my_sphalb = gp_sphalb.predict(my_loc[np.newaxis, :])
            my_transm = gp_transm.predict(my_loc[np.newaxis, :])
            my_rho = (my_rdn * np.pi) / irr
            my_rfl = 1.0 / (my_transm / (my_rho - my_rhoatm) + my_sphalb)
            output_reflectance_row[col, :] = my_rfl

            # Calculate uncertainties.  Sy approximation rather than Seps for
            # speed, for now... but we do take into account instrument
            # radiometric uncertainties
            # output_uncertainty_row[col, :] = np.zeros()
            # if instrument is None:
            # else:
            #    Sy = instrument.Sy(x, geom=None)
            #    calunc = instrument.bval[:instrument.n_chan]
            #    output_uncertainty_row[col, :] = np.sqrt(
            #        np.diag(Sy) + pow(calunc * x, 2)) * bhat[:, 1]
            # if loglevel == 'DEBUG':
            #    plot_example(xv, yv, bhat)

            nspectra = nspectra + 1

        elapsed = float(time.time() - start)
        logging.info(
            "row {}/{}, ({}/{} local), {} spectra per second".format(
                row,
                n_input_lines,
                int(row - start_line),
                int(stop_line - start_line),
                round(float(nspectra) / elapsed, 2),
            )
        )

        del input_locations_mm
        del input_radiance_mm

        output_reflectance_row = output_reflectance_row.transpose((1, 0))
        output_uncertainty_row = output_uncertainty_row.transpose((1, 0))
        shp = output_reflectance_row.shape
        output_reflectance_row = output_reflectance_row.reshape((1, shp[0], shp[1]))
        shp = output_uncertainty_row.shape
        output_uncertainty_row = output_uncertainty_row.reshape((1, shp[0], shp[1]))

        _write_bil_chunk(
            output_reflectance_row,
            output_reflectance_file,
            row,
            (n_input_lines, n_output_reflectance_bands, n_input_samples),
        )
        _write_bil_chunk(
            output_uncertainty_row,
            output_uncertainty_file,
            row,
            (n_input_lines, n_output_uncertainty_bands, n_input_samples),
        )


[docs] def interpolate_atmosphere( reference_radiance_file: str, reference_atm_file: str, reference_locations_file: str, segmentation_file: str, input_radiance_file: str, input_locations_file: str, output_reflectance_file: str, output_uncertainty_file: str, nneighbors: int = 15, nodata_value: float = -9999.0, level: str = "INFO", radiance_factors: np.array = None, isofit_config: dict = None, n_cores: int = -1, ) -> None: """ Perform a Gaussian process interpolation of atmospheric parameters. It relies on precalculated atmospheric coefficients at a subset of spatial locations stored in a file. The file has each coefficient defined for every radiance channel, appearing in the order: (1) atmospheric path reflectance; (2) spherical sky albedo; (3) total diffuse and direct transmittance of the two-part downwelling and upwelling path; (4) extraterrestrial solar irradiance; (5) cosine of solar zenith angle. Args: reference_radiance_file: source file for radiance (interpolation built from this) reference_atm_file: source file for atmospheric coefficients (interpolation from this) reference_locations_file: source file for file locations (lon, lat, elev), (interpolation from this) segmentation_file: input file noting the per-pixel segmentation used input_radiance_file: input radiance file (interpolate over this) input_locations_file: input location file (interpolate over this) output_reflectance_file: location to write output reflectance output_uncertainty_file: location to write output uncertainty nneighbors: number of neighbors to use for interpolation nodata_value: nodata value of input and output level: logging level radiance_factors: radiance adjustment factors isofit_config: dictionary-stype isofit configuration n_cores: number of cores to run on Returns: None """ loglevel = level logging.basicConfig(format="%(message)s", level=loglevel) # Open input data to check that band formatting is correct # Load reference set radiance reference_radiance_img = envi.open( envi_header(reference_radiance_file), reference_radiance_file ) n_reference_lines, n_radiance_bands, n_reference_columns = [ int(reference_radiance_img.metadata[n]) for n in ("lines", "bands", "samples") ] if n_reference_columns != 1: raise IndexError("Reference data should be a single-column list") # Load reference set atmospheric coefficients reference_atm_img = envi.open(envi_header(reference_atm_file), reference_atm_file) nrefa, nba, srefa = [ int(reference_atm_img.metadata[n]) for n in ("lines", "bands", "samples") ] if nrefa != n_reference_lines or srefa != n_reference_columns: raise IndexError("Reference file dimension mismatch (atmosphere)") if nba != (n_radiance_bands * 5): raise IndexError("Reference atmosphere file has incorrect dimensioning") # Load reference set locations reference_locations_img = envi.open( envi_header(reference_locations_file), reference_locations_file ) nrefl, lb, ls = [ int(reference_locations_img.metadata[n]) for n in ("lines", "bands", "samples") ] if nrefl != n_reference_lines or lb != 3: raise IndexError("Reference file dimension mismatch (locations)") input_radiance_img = envi.open( envi_header(input_radiance_file), input_radiance_file ) n_input_lines, n_input_bands, n_input_samples = [ int(input_radiance_img.metadata[n]) for n in ("lines", "bands", "samples") ] if n_radiance_bands != n_input_bands: msg = "Number of channels mismatch: input (%i) vs. reference (%i)" raise IndexError(msg % (n_input_bands, n_radiance_bands)) input_locations_img = envi.open( envi_header(input_locations_file), input_locations_file ) nll, nlb, nls = [ int(input_locations_img.metadata[n]) for n in ("lines", "bands", "samples") ] if nll != n_input_lines or nlb != 3 or nls != n_input_samples: raise IndexError("Input location dimension mismatch") # Create output files output_metadata = input_radiance_img.metadata output_metadata["interleave"] = "bil" output_reflectance_img = envi.create_image( envi_header(output_reflectance_file), ext="", metadata=output_metadata, force=True, ) output_uncertainty_img = envi.create_image( envi_header(output_uncertainty_file), ext="", metadata=output_metadata, force=True, ) # Now cleanup inputs and outputs, we'll write dynamically above del output_reflectance_img, output_uncertainty_img del ( reference_atm_img, reference_locations_img, input_radiance_img, input_locations_img, ) # Determine the number of cores to use if n_cores == -1: n_cores = multiprocessing.cpu_count() n_cores = min(n_cores, n_input_lines) # Break data into sections line_sections = np.linspace(0, n_input_lines, num=n_cores + 1, dtype=int) # Set up our pool pool = multiprocessing.Pool(processes=n_cores) start_time = time.time() logging.info( "Beginning atmospheric interpolation inversions using {} cores".format(n_cores) ) # Run the pool (or run serially) results = [] for l in range(len(line_sections) - 1): args = ( line_sections[l], line_sections[l + 1], reference_radiance_file, reference_atm_file, reference_locations_file, input_radiance_file, input_locations_file, segmentation_file, isofit_config, output_reflectance_file, output_uncertainty_file, radiance_factors, nneighbors, nodata_value, ) if n_cores != 1: results.append(pool.apply_async(_run_chunk, args)) else: _run_chunk(*args) pool.close() pool.join() total_time = time.time() - start_time logging.info( "Parallel empirical line inversions complete. {} s total, {} spectra/s, {}" " spectra/s/core".format( total_time, line_sections[-1] * n_input_samples / total_time, line_sections[-1] * n_input_samples / total_time / n_cores, ) )