Source code for isofit.core.isofit

#! /usr/bin/env python
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Authors: David R Thompson, david.r.thompson@jpl.nasa.gov
#          Philip G Brodrick, philip.brodrick@jpl.nasa.gov
#          Adam Erickson, adam.m.erickson@nasa.gov
#
import logging
import multiprocessing
import os
import time

import click
import numpy as np

from isofit import ray
from isofit.configs import configs
from isofit.core.fileio import IO
from isofit.core.forward import ForwardModel
from isofit.inversion.inverse import Inversion
from isofit.inversion.inverse_mcmc import MCMCInversion


class Isofit:
    """Configure ISOFIT from a config file and run inversions in parallel.

    Construction configures logging, loads and validates the configuration,
    and initializes ray.

    Args:
        config_file: isofit configuration file in JSON or YAML format
        level: logging level (ERROR, WARNING, INFO, DEBUG)
        logfile: file to write output logs to
    """

    def __init__(self, config_file, level="INFO", logfile=None):
        # Explicitly set the number of threads to be 1, so we more effectively
        # run in parallel
        os.environ["MKL_NUM_THREADS"] = "1"
        os.environ["OMP_NUM_THREADS"] = "1"

        # Set logging level
        self.loglevel = level
        self.logfile = logfile
        logging.basicConfig(
            format="%(levelname)s:%(asctime)s ||| %(message)s",
            level=self.loglevel,
            filename=self.logfile,
            datefmt="%Y-%m-%d,%H:%M:%S",
        )

        self.rows = None
        self.cols = None
        self.config = None

        # Load configuration file
        self.config = configs.create_new_config(config_file)
        self.config.get_config_errors()

        # Initialize ray for parallel execution
        rayargs = {
            "address": self.config.implementation.ip_head,
            "_redis_password": self.config.implementation.redis_password,
            "_temp_dir": self.config.implementation.ray_temp_dir,
            "ignore_reinit_error": self.config.implementation.ray_ignore_reinit_error,
            "include_dashboard": self.config.implementation.ray_include_dashboard,
            "local_mode": self.config.implementation.n_cores == 1,
        }

        # We can only set the num_cpus if running on a single-node
        if (
            self.config.implementation.ip_head is None
            and self.config.implementation.redis_password is None
        ):
            rayargs["num_cpus"] = self.config.implementation.n_cores

        ray.init(**rayargs)

        self.workers = None

    @staticmethod
    def _parse_row_column(row_column):
        """Translate a ``row_column`` string into ``(rows, cols)`` iterables.

        ``None`` in either position means "every index along that axis"; the
        caller resolves it against the actual image dimensions.
        """
        if row_column is None:
            return None, None

        ranges = row_column.split(",")
        if len(ranges) == 1:
            return [int(ranges[0])], None
        if len(ranges) == 2:
            row_start, row_end = ranges
            return range(int(row_start), int(row_end)), None
        if len(ranges) == 4:
            row_start, row_end, col_start, col_end = ranges
            return (
                range(int(row_start), int(row_end) + 1),
                range(int(col_start), int(col_end) + 1),
            )

        logging.warning(
            f"Unrecognized row_column specification {row_column!r};"
            " analyzing the whole cube"
        )
        return None, None

    @staticmethod
    def _chunk_index_pairs(index_pairs, n_tasks):
        """Split ``index_pairs`` row-wise into ``n_tasks`` contiguous chunks.

        ``n_tasks`` chunks need ``n_tasks + 1`` boundaries, so every row lands
        in exactly one chunk (including the ``n_tasks == 1`` case).
        """
        edges = np.linspace(0, index_pairs.shape[0], num=n_tasks + 1, dtype=int)
        return [index_pairs[lo:hi, :] for lo, hi in zip(edges[:-1], edges[1:])]

    def run(self, row_column=None):
        """
        Iterate over spectra, reading and writing through the IO
        object to handle formatting, buffering, and deferred write-to-file.
        Attempts to avoid reading the entire file into memory, or hitting
        the physical disk too often.

        row_column: The user can specify
            * a single number, in which case it is interpreted as a row
              (all columns of that row are analyzed)
            * a comma-separated pair, in which case it is interpreted as a
              row range (row_start, row_end) with row_end exclusive; all
              columns of those rows are analyzed
            * a comma-separated quartet, in which case it is interpreted as
              a row, column range in the order (line_start, line_end,
              sample_start, sample_end) all values are inclusive.

            If none of the above, the whole cube will be analyzed.

        NOTE(review): this docstring previously described the pair form as a
        single row/column spectrum, which never matched the code. The code's
        row-range behavior is kept here - confirm which one is intended.
        """

        logging.info("Building first forward model, will generate any necessary LUTs")
        self.fm = fm = ForwardModel(self.config)

        rows, cols = self._parse_row_column(row_column)
        if rows is None or cols is None:
            # An unspecified axis spans the full extent reported by the IO
            # object; a None must never reach np.meshgrid below.
            io = IO(self.config, fm)
            if rows is None:
                rows = range(io.n_rows)
            if cols is None:
                cols = range(io.n_cols)
            del io
        self.rows, self.cols = rows, cols

        index_pairs = np.vstack(
            [x.flatten(order="f") for x in np.meshgrid(self.rows, self.cols)]
        ).T
        n_iter = index_pairs.shape[0]

        if n_iter == 0:
            # Nothing to invert; also avoids dividing by zero workers below.
            logging.warning("No spectra selected, nothing to run")
            return

        if self.config.implementation.n_cores is None:
            n_workers = multiprocessing.cpu_count()
        else:
            n_workers = self.config.implementation.n_cores

        # Max out the number of workers based on the number of tasks
        n_workers = min(n_workers, n_iter)

        params = [
            ray.put(obj)
            for obj in [self.config, fm, self.loglevel, self.logfile, n_workers]
        ]
        self.workers = ray.util.ActorPool(
            [Worker.remote(*params, n) for n in range(n_workers)]
        )

        start_time = time.time()
        # Always keep at least one task so every spectrum is assigned to a chunk
        n_tasks = max(
            1,
            min(n_workers * self.config.implementation.task_inflation_factor, n_iter),
        )

        logging.info(
            f"Beginning {n_iter} inversions in {n_tasks} chunks using {n_workers} cores"
        )

        # Divide up spectra to run into chunks
        indices_to_run = self._chunk_index_pairs(index_pairs, n_tasks)

        # map_unordered is consumed through list() so that this call waits
        # for every chunk; the per-chunk return values are not used.
        list(
            self.workers.map_unordered(
                lambda a, b: a.run_set_of_spectra.remote(b), indices_to_run
            )
        )

        total_time = time.time() - start_time
        logging.info(
            f"Inversions complete.  {round(total_time,2)}s total,"
            f" {round(n_iter/total_time,4)} spectra/s,"
            f" {round(n_iter/total_time/n_workers,4)} spectra/s/core"
        )
@ray.remote(num_cpus=1)
class Worker(object):
    def __init__(
        self,
        config: configs.Config,
        forward_model: ForwardModel,
        loglevel: str,
        logfile: str,
        total_workers: int = None,
        worker_id: int = None,
    ):
        """
        Worker class to help run a subset of spectra.

        Args:
            config: isofit configuration
            forward_model: forward model handed to the inversion and IO objects
            loglevel: output logging level
            logfile: output logging file
            total_workers: the total number of workers running, for logging reference
            worker_id: worker ID for logging reference

        Raises:
            AttributeError: if the configured implementation mode is not one
                of "mcmc_inversion", "inversion" or "simulation"
        """

        logging.basicConfig(
            format="%(levelname)s:%(asctime)s ||| %(message)s",
            level=loglevel,
            filename=logfile,
            datefmt="%Y-%m-%d,%H:%M:%S",
        )
        self.config = config
        self.fm = forward_model

        if self.config.implementation.mode == "mcmc_inversion":
            self.iv = MCMCInversion(self.config, self.fm)
        elif self.config.implementation.mode in ["inversion", "simulation"]:
            self.iv = Inversion(self.config, self.fm)
        else:
            # This should never be reached due to configuration checking
            raise AttributeError("Config implementation mode not valid")

        self.io = IO(self.config, self.fm)

        # Rough per-worker share of the full image, used only for progress logs
        self.approximate_total_spectra = None
        if total_workers is not None:
            self.approximate_total_spectra = (
                self.io.n_cols * self.io.n_rows / total_workers
            )
        self.worker_id = worker_id
        self.completed_spectra = 0

    def run_set_of_spectra(self, indices: np.ndarray):
        """Invert and write every (row, col) pair in ``indices``, then flush.

        Args:
            indices: 2-D array whose first two columns are read as the row
                and column of each spectrum to process
        """
        n_spectra = indices.shape[0]
        for index in range(n_spectra):
            logging.debug("Read chunk of spectra")
            row, col = indices[index, 0], indices[index, 1]

            input_data = self.io.get_components_at_index(row, col)

            # Counted whether or not the IO object returned data for the pixel
            self.completed_spectra += 1
            if input_data is not None:
                logging.debug("Run model")
                # The inversion returns a list of states, which are
                # interpreted either as samples from the posterior (MCMC case)
                # or as a gradient descent trajectory (standard case). For
                # a trajectory, the last spectrum is the converged solution.
                states = self.iv.invert(input_data.meas, input_data.geom)

                logging.debug("Write chunk of spectra")
                # Write the spectra to disk
                try:
                    self.io.write_spectrum(row, col, states, self.fm, self.iv)
                except ValueError:
                    # logging.exception records the active traceback itself
                    logging.exception(
                        f"""
                        Encountered the following ValueError in (row,col) ({row},{col}).
                        Results for this pixel will be all zeros.
                        """
                    )

                # NOTE(review): the nesting of this progress block could not be
                # recovered from the flattened source; the layout below (both
                # messages every 100th index, only for pixels with data) is the
                # most plausible reading - confirm against the repository.
                if index % 100 == 0:
                    if (
                        self.worker_id is not None
                        and self.approximate_total_spectra is not None
                    ):
                        percent = np.round(
                            self.completed_spectra
                            / self.approximate_total_spectra
                            * 100,
                            2,
                        )
                        logging.info(
                            f"Worker {self.worker_id} completed"
                            f" {self.completed_spectra}/~{self.approximate_total_spectra}::"
                            f" {percent}% complete"
                        )
                    logging.info(
                        f"Worker at start location ({row},{col}) completed"
                        f" {index}/{n_spectra}"
                    )

        self.io.flush_buffers()


# NOTE(review): "EXCEPTION" is not a level name known to the standard logging
# module, so selecting it is expected to make logging.basicConfig raise
# ValueError. It is left in place to keep the CLI unchanged - consider
# replacing it with "CRITICAL".
@click.command(name="run")
@click.argument("config_file")
@click.option(
    "-l",
    "--level",
    help="Log level",
    type=click.Choice(
        ["DEBUG", "INFO", "WARNING", "ERROR", "EXCEPTION"], case_sensitive=True
    ),
    default="INFO",
)
def cli_run(config_file, level):
    """Execute ISOFIT core"""

    click.echo(f"Running ISOFIT(config_file={config_file!r}, level={level})")

    logging.basicConfig(format="%(message)s", level=level)

    # Build the driver from the config file and process the whole cube
    Isofit(config_file=config_file, level=level).run()

    click.echo("Done")