Source code for isofit.inversion.inverse_mcmc

#! /usr/bin/env python3
#
#  Copyright 2018 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import numpy as np
import scipy.linalg
import scipy.stats

from isofit.configs import Config
from isofit.core.common import eps
from isofit.core.forward import ForwardModel

from .inverse import Inversion


[docs] class MCMCInversion(Inversion): def __init__(self, full_config: Config, forward: ForwardModel): """Initialize and apply defaults.""" Inversion.__init__(self, full_config, forward) mcmc_config = full_config.implementation.inversion.mcmc self.iterations = mcmc_config.iterations self.burnin = mcmc_config.burnin self.regularizer = mcmc_config.regularizer self.proposal_scaling = mcmc_config.proposal_scaling self.verbose = mcmc_config.verbose self.restart_every = mcmc_config.restart_every
[docs] def stable_mvnpdf(self, mean, cov, x): """Stable inverse via Singular Value Decomposition, using only the significant eigenvectors.""" U, V, D = scipy.linalg.svd(cov) use = np.where(V > 0)[0] Cinv = (D[use, :].T).dot(np.diag(1.0 / V[use])).dot(U[:, use].T) logCdet = np.sum(np.log(2.0 * np.pi * V[use])) # Multivariate Gaussian PDF lead = -0.5 * logCdet dist = (x - mean)[:, np.newaxis] diverg = -0.5 * (dist.T).dot(Cinv).dot(dist) return lead + diverg
[docs] def log_density(self, x, rdn_meas, geom, bounds): """Log probability density combines prior and likelihood terms.""" # First check bounds if bounds is not None and any(np.logical_or(x < bounds[0], x > bounds[1])): return -np.Inf # Prior term Sa = self.fm.Sa(x, geom) xa = self.fm.xa(x, geom) pa = self.stable_mvnpdf(xa, Sa, x) # Data likelihood term Seps = self.fm.Seps(x, rdn_meas, geom) Seps_win = np.array([Seps[i, self.winidx] for i in self.winidx]) rdn_est = self.fm.calc_rdn(x, geom, rfl=None, Ls=None) pm = self.stable_mvnpdf(rdn_est[self.winidx], Seps_win, rdn_meas[self.winidx]) return pa + pm
[docs] def invert(self, rdn_meas, geom): """Inverts a meaurement. Returns an array of state vector samples. Similar to Inversion.invert() but returns a list of samples.""" # We will truncate non-surface parameters to their bounds, but leave # Surface reflectance unconstrained so it can dip slightly below zero # in a channel without invalidating the whole vector bounds = np.array([self.fm.bounds[0].copy(), self.fm.bounds[1].copy()]) bounds[:, self.fm.idx_surface] = np.array([[-np.inf], [np.inf]]) # Initialize to conjugate gradient solution x_MAP = Inversion.invert(self, rdn_meas, geom)[-1] # Proposal is based on the posterior uncertainty S_hat, K, G = self.calc_posterior(x_MAP, geom, rdn_meas) proposal_Cov = S_hat * self.proposal_scaling proposal = scipy.stats.multivariate_normal(cov=proposal_Cov) # We will use this routine for initializing def initialize(): x = scipy.stats.multivariate_normal(mean=x_MAP, cov=S_hat).rvs() too_low = x < bounds[0] x[too_low] = bounds[0][too_low] + eps too_high = x > bounds[1] x[too_high] = bounds[1][too_high] - eps dens = self.log_density(x, rdn_meas, geom, bounds) return x, dens # Sample from the posterior using Metropolis/Hastings MCMC samples, acpts, rejs, x = [], 0, 0, None for i in range(self.iterations): if i % self.restart_every == 0: x, dens = initialize() xp = x + proposal.rvs() dens_new = self.log_density(xp, rdn_meas, geom, bounds=bounds) # Test vs. the Metropolis / Hastings criterion if np.isfinite(dens_new) and np.log(np.random.rand()) <= min( (dens_new - dens, 0.0) ): x = xp dens = dens_new acpts = acpts + 1 if self.verbose: print( "%8.5e %8.5e ACCEPT! rate %4.2f" % (dens, dens_new, np.mean(acpts / (acpts + rejs))) ) else: rejs = rejs + 1 if self.verbose: print( "%8.5e %8.5e REJECT rate %4.2f" % (dens, dens_new, np.mean(acpts / (acpts + rejs))) ) # Make sure we have not wandered off the map if not np.isfinite(dens_new): x, dens = initialize() if i % self.restart_every < self.burnin: samples.append(x) return np.array(samples)