#! /usr/bin/env python3
#
# Copyright 2018 California Institute of Technology
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: Philip G. Brodrick, philip.brodrick@jpl.nasa.gov
import logging
import multiprocessing
import os
import time
from collections import OrderedDict
from glob import glob
import click
import numpy as np
from spectral.io import envi
from isofit import ray
from isofit.configs import configs
from isofit.core.common import envi_header, load_spectrum
from isofit.core.fileio import write_bil_chunk
from isofit.core.forward import ForwardModel
from isofit.core.geometry import Geometry
from isofit.inversion.inverse import Inversion
from isofit.inversion.inverse_simple import invert_analytical
from isofit.utils.atm_interpolation import atm_interpolation
[docs]
def analytical_line(
rdn_file: str,
loc_file: str,
obs_file: str,
isofit_dir: str,
isofit_config: str = None,
segmentation_file: str = None,
n_atm_neighbors: list = None,
n_cores: int = -1,
smoothing_sigma: list = None,
output_rfl_file: str = None,
output_unc_file: str = None,
atm_file: str = None,
loglevel: str = "INFO",
logfile: str = None,
) -> None:
"""
TODO: Description
"""
if n_atm_neighbors is None:
n_atm_neighbors = [20]
if smoothing_sigma is None:
smoothing_sigma = [2]
logging.basicConfig(
format="%(levelname)s:%(asctime)s ||| %(message)s",
level=loglevel,
filename=logfile,
datefmt="%Y-%m-%d,%H:%M:%S",
)
if isofit_config is None:
file = glob(os.path.join(isofit_dir, "config", "") + "*_isofit.json")[0]
else:
file = isofit_config
config = configs.create_new_config(file)
config.forward_model.instrument.integrations = 1
subs_state_file = config.output.estimated_state_file
subs_loc_file = config.input.loc_file
if (
segmentation_file is None
or config.forward_model.surface.surface_category == "glint_model_surface"
):
lbl_file = subs_state_file.replace("_subs_state", "_lbl")
else:
lbl_file = segmentation_file
if (
output_rfl_file is None
or config.forward_model.surface.surface_category == "glint_model_surface"
):
analytical_state_file = subs_state_file.replace(
"_subs_state", "_state_analytical"
)
else:
analytical_state_file = output_rfl_file
if (
output_unc_file is None
or config.forward_model.surface.surface_category == "glint_model_surface"
):
analytical_state_unc_file = subs_state_file.replace(
"_subs_state", "_state_analytical_uncert"
)
else:
analytical_state_unc_file = output_unc_file
if (
atm_file is None
or config.forward_model.surface.surface_category == "glint_model_surface"
):
atm_file = subs_state_file.replace("_subs_state", "_atm_interp")
else:
atm_file = atm_file
fm = ForwardModel(config)
iv = Inversion(config, fm)
if os.path.isfile(atm_file) is False:
atm_interpolation(
reference_state_file=subs_state_file,
reference_locations_file=subs_loc_file,
input_locations_file=loc_file,
segmentation_file=lbl_file,
output_atm_file=atm_file,
atm_band_names=fm.RT.statevec_names,
nneighbors=n_atm_neighbors,
gaussian_smoothing_sigma=smoothing_sigma,
)
rdn_ds = envi.open(envi_header(rdn_file))
rdn = rdn_ds.open_memmap(interleave="bip")
rdns = rdn.shape
output_metadata = rdn_ds.metadata
output_metadata["interleave"] = "bil"
output_metadata["description"] = "L2A Analytyical per-pixel surface retrieval"
output_metadata["bands"] = str(len(fm.idx_surface))
outside_ret_windows = np.zeros(len(fm.surface.idx_lamb), dtype=int)
outside_ret_windows[iv.winidx] = 1
output_metadata["bbl"] = "{" + ",".join([str(x) for x in outside_ret_windows]) + "}"
if "emit pge input files" in list(output_metadata.keys()):
del output_metadata["emit pge input files"]
img = envi.create_image(
envi_header(analytical_state_file), ext="", metadata=output_metadata, force=True
)
del img
img = envi.create_image(
envi_header(analytical_state_unc_file),
ext="",
metadata=output_metadata,
force=True,
)
del rdn, img
ray_dict = {
"ignore_reinit_error": config.implementation.ray_ignore_reinit_error,
"address": config.implementation.ip_head,
"_temp_dir": config.implementation.ray_temp_dir,
"include_dashboard": config.implementation.ray_include_dashboard,
"_redis_password": config.implementation.redis_password,
"num_cpus": n_cores,
}
ray.init(**ray_dict)
n_workers = n_cores
if n_workers == -1:
n_workers = multiprocessing.cpu_count()
wargs = [
ray.put(obj)
for obj in (
config,
fm,
atm_file,
analytical_state_file,
analytical_state_unc_file,
rdn_file,
loc_file,
obs_file,
loglevel,
logfile,
)
]
workers = ray.util.ActorPool([Worker.remote(*wargs) for _ in range(n_workers)])
line_breaks = np.linspace(
0, rdns[0], n_workers * config.implementation.task_inflation_factor, dtype=int
)
line_breaks = [
(line_breaks[n], line_breaks[n + 1]) for n in range(len(line_breaks) - 1)
]
start_time = time.time()
res = list(workers.map_unordered(lambda a, b: a.run_lines.remote(b), line_breaks))
total_time = time.time() - start_time
logging.info(
f"Analytical line inversions complete. {round(total_time,2)}s total, "
f"{round(rdns[0]*rdns[1]/total_time,4)} spectra/s, "
f"{round(rdns[0]*rdns[1]/total_time/n_workers,4)} spectra/s/core"
)
@ray.remote(num_cpus=1)
class Worker(object):
def __init__(
self,
config: configs.Config,
fm: ForwardModel,
RT_state_file: str,
analytical_state_file: str,
analytical_state_unc_file: str,
rdn_file: str,
loc_file: str,
obs_file: str,
loglevel: str,
logfile: str,
subs_state_file: str = None,
lbl_file: str = None,
):
"""
Worker class to help run a subset of spectra.
Args:
fm: isofit forward_model
loglevel: output logging level
logfile: output logging file
"""
logging.basicConfig(
format="%(levelname)s:%(asctime)s ||| %(message)s",
level=loglevel,
filename=logfile,
datefmt="%Y-%m-%d,%H:%M:%S",
)
self.config = config
self.fm = fm
self.iv = Inversion(self.config, self.fm)
self.completed_spectra = 0
self.hash_table = OrderedDict()
self.hash_size = 500
self.RT_state_file = RT_state_file
self.rdn_file = rdn_file
self.loc_file = loc_file
self.obs_file = obs_file
self.analytical_state_file = analytical_state_file
self.analytical_state_unc_file = analytical_state_unc_file
if subs_state_file is not None and lbl_file is not None:
self.subs_state_file = subs_state_file
self.lbl_file = lbl_file
else:
self.subs_state_file = None
self.lbl_file = None
if config.input.radiometry_correction_file is not None:
self.radiance_correction, wl = load_spectrum(
config.input.radiometry_correction_file
)
else:
self.radiance_correction = None
def run_lines(self, startstop: tuple) -> None:
"""
TODO: Description
"""
rdn = envi.open(envi_header(self.rdn_file)).open_memmap(interleave="bip")
loc = envi.open(envi_header(self.loc_file)).open_memmap(interleave="bip")
obs = envi.open(envi_header(self.obs_file)).open_memmap(interleave="bip")
rt_state = envi.open(envi_header(self.RT_state_file)).open_memmap(
interleave="bip"
)
start_line, stop_line = startstop
output_state = (
np.zeros(
(stop_line - start_line, rt_state.shape[1], len(self.fm.idx_surface))
)
- 9999
)
output_state_unc = (
np.zeros(
(stop_line - start_line, rt_state.shape[1], len(self.fm.idx_surface))
)
- 9999
)
for r in range(start_line, stop_line):
for c in range(output_state.shape[1]):
meas = rdn[r, c, :]
if self.radiance_correction is not None:
meas *= self.radiance_correction
if np.all(meas < 0):
continue
x_RT = rt_state[r, c, self.fm.idx_RT - len(self.fm.idx_surface)]
geom = Geometry(obs=obs[r, c, :], loc=loc[r, c, :])
states, unc = invert_analytical(
self.iv.fm,
self.iv.winidx,
meas,
geom,
x_RT,
1,
self.hash_table,
self.hash_size,
)
output_state[r - start_line, c, :] = states[-1][self.fm.idx_surface]
output_state_unc[r - start_line, c, :] = unc[self.fm.idx_surface]
logging.info(f"Analytical line writing line {r}")
write_bil_chunk(
output_state[r - start_line, ...].T,
self.analytical_state_file,
r,
(rdn.shape[0], rdn.shape[1], len(self.fm.idx_surface)),
)
write_bil_chunk(
output_state_unc[r - start_line, ...].T,
self.analytical_state_unc_file,
r,
(rdn.shape[0], rdn.shape[1], len(self.fm.idx_surface)),
)
@click.command(name="analytical_line")
@click.argument("rdn_file")
@click.argument("loc_file")
@click.argument("obs_file")
@click.argument("isofit_dir")
@click.option("--isofit_config", type=str, default=None)
@click.option("--segmentation_file", help="TODO", type=str, default=None)
@click.option("--n_atm_neighbors", help="TODO", type=int, default=20)
@click.option("--n_cores", help="TODO", type=int, default=-1)
@click.option("--smoothing_sigma", help="TODO", type=int, default=2)
@click.option("--output_rfl_file", help="TODO", type=str, default=None)
@click.option("--output_unc_file", help="TODO", type=str, default=None)
@click.option("--atm_file", help="TODO", type=str, default=None)
@click.option("--loglevel", help="TODO", type=str, default="INFO")
@click.option("--logfile", help="TODO", type=str, default=None)
def cli_analytical_line(**kwargs):
"""Execute the analytical line algorithm"""
click.echo("Running analytical line")
analytical_line(**kwargs)
click.echo("Done")
if __name__ == "__main__":
raise NotImplementedError(
"analytical_line.py can no longer be called this way. Run as:\n isofit analytical_line [ARGS]"
)