Source code for isofit.utils.segment

#! /usr/bin/env python3
#
#  Copyright 2019 California Institute of Technology
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#
# ISOFIT: Imaging Spectrometer Optimal FITting
# Author: David R Thompson, david.r.thompson@jpl.nasa.gov
#

import logging

import numpy as np
import scipy
import skimage
from skimage.segmentation import slic
from spectral.io import envi

from isofit import ray
from isofit.core.common import envi_header


@ray.remote(num_cpus=1)
def segment_chunk(
    lstart, lend, in_file, nodata_value, npca, segsize, logfile=None, loglevel="INFO"
):
    """
    Segment a small chunk of the image

    Args:
        lstart: starting position in image file
        lend:  stopping position in image file
        in_file: file path to segment
        nodata_value: value to ignore
        npca:  number of pca components to use
        segsize: mean segmentation size
        logfile: logging file name
        loglevel: logging level

    Returns:
        lstart: starting position in image file
        lend: stopping position in image file
        labels: labeled image chunk

    """
    logging.basicConfig(
        format="%(levelname)s:%(asctime)s ||| %(message)s",
        level=loglevel,
        filename=logfile,
        datefmt="%Y-%m-%d,%H:%M:%S",
    )

    logging.info(f"{lstart}: starting")

    in_img = envi.open(envi_header(in_file), in_file)
    meta = in_img.metadata
    nl, nb, ns = [int(meta[n]) for n in ("lines", "bands", "samples")]
    img_mm = in_img.open_memmap(interleave="bip", writable=False)

    # Do quick single-band screen before reading all bands
    use = np.logical_not(np.isclose(np.array(img_mm[lstart:lend, :, 0]), nodata_value))
    if np.sum(use) == 0:
        logging.info(f"{lstart}: no non null data present, returning early")
        return lstart, lend, np.zeros((use.shape[0], ns))

    x = np.array(img_mm[lstart:lend, :, :]).astype(np.float32)
    nc = x.shape[0]
    x = x.reshape((nc * ns, nb))
    logging.debug(f"{lstart}: read and reshaped data")

    # Excluding bad locations, calculate top PCA coefficients
    use = np.all(abs(x - nodata_value) > 1e-6, axis=1)

    # If this chunk is empty, return immediately
    if np.sum(use) == 0:
        logging.info(f"{lstart}: no non null data present, returning early")
        return lstart, lend, np.zeros((nc, ns))

    mu = x[use, :].mean(axis=0)
    C = np.cov(x[use, :], rowvar=False)
    [v, d] = scipy.linalg.eigh(C)

    # Determine segmentation compactness scaling based on eigenvalues
    # Override with a floor value to prevent zeros
    cmpct = scipy.linalg.norm(np.sqrt(v[-npca:]))
    if cmpct < 1e-6:
        cmpct = 10.0
        print("Compactness override: %f" % cmpct)

    # Project, redimension as an image with "npca" channels, and segment
    x_pca_subset = (x[use, :] - mu) @ d[:, -npca:]
    del x, mu, d
    x_pca = np.zeros((nc, ns, npca))
    x_pca[use.reshape(nc, ns), :] = x_pca_subset
    del x_pca_subset

    x_pca = x_pca.reshape([nc, ns, npca])
    seg_in_chunk = int(sum(use) / float(segsize))

    logging.debug(f"{lstart}: starting slic")
    # for now, check the version of skimage to support call with deprecated parameters
    if skimage.__version__ >= "0.19.0":
        labels = slic(
            x_pca,
            n_segments=seg_in_chunk,
            compactness=cmpct,
            max_num_iter=10,
            sigma=0,
            channel_axis=2,
            enforce_connectivity=True,
            min_size_factor=0.5,
            max_size_factor=3,
            mask=use.reshape(nc, ns),
        )
    else:
        labels = slic(
            x_pca,
            n_segments=seg_in_chunk,
            compactness=cmpct,
            max_iter=10,
            sigma=0,
            multichannel=True,
            enforce_connectivity=True,
            min_size_factor=0.5,
            max_size_factor=3,
            mask=use.reshape(nc, ns),
        )

    # Reindex the subscene labels and place them into the larger scene
    labels = labels.reshape([nc * ns])
    labels[np.logical_not(use)] = 0
    labels = labels.reshape([nc, ns])

    logging.info(f"{lstart}: completing")
    return lstart, lend, labels


[docs] def segment( spectra: tuple, nodata_value: float, npca: int, segsize: int, nchunk: int, n_cores: int = 1, ray_address: str = None, ray_redis_password: str = None, ray_temp_dir=None, ray_ip_head=None, logfile=None, loglevel="INFO", ): """ Segment an image using SLIC on a PCA. Args: spectra: tuple of filepaths of image to segment and (optionally) output label file nodata_value: data to ignore in radiance image npca: number of pca components to use segsize: mean segmentation size nchunk: size of each image chunk n_cores: number of cores to use ray_address: ray address to connect to (for multinode implementation) ray_redis_password: ray password to use (for multinode implementation) ray_temp_dir: ray temp directory to reference ray_ip_head: ray ip head to reference (for multinode use) logfile: logging file to output to loglevel: logging level to use """ logging.basicConfig( format="%(levelname)s:%(message)s", level=loglevel, filename=logfile ) in_file = spectra[0] if len(spectra) > 1 and type(spectra) is tuple: lbl_file = spectra[1] else: lbl_file = spectra + "_lbl" # Open input data, get dimensions in_img = envi.open(envi_header(in_file), in_file) meta = in_img.metadata nl, nb, ns = [int(meta[n]) for n in ("lines", "bands", "samples")] # Start up a ray instance for parallel work rayargs = { "ignore_reinit_error": True, "local_mode": n_cores == 1, "address": ray_address, "include_dashboard": False, "_temp_dir": ray_temp_dir, "_redis_password": ray_redis_password, } # We can only set the num_cpus if running on a single-node if ray_ip_head is None and ray_redis_password is None: rayargs["num_cpus"] = n_cores ray.init(**rayargs) # Iterate through image "chunks," segmenting as we go all_labels = np.zeros((nl, ns), dtype=np.int64) jobs = [] # Enforce a minimum chunk size to prevent singularities downstream # This could eventually be made a user-tunable parameter but this # value should work in all cases min_lines_per_chunk = 10 for lstart in np.arange(0, nl - min_lines_per_chunk, nchunk): # Extend any chunk that falls within a small margin of the # end of the flightline lend = min(lstart + nchunk, nl) if lend > (nl - min_lines_per_chunk): lend = nl # Extract data jobs.append( segment_chunk.remote( lstart, lend, in_file, nodata_value, npca, segsize, logfile=logfile, loglevel=loglevel, ) ) # Collect results, making sure each chunk is distinct, and enforce an order next_label = 1 rreturn = [ray.get(jid) for jid in jobs] for lstart, lend, ret in rreturn: if ret is not None: logging.debug(f"Collecting chunk: {lstart}") chunk_label = ret.copy() unique_chunk_labels = np.unique(chunk_label[chunk_label != 0]) ordered_chunk_labels = np.zeros(chunk_label.shape) for lbl in unique_chunk_labels: ordered_chunk_labels[chunk_label == lbl] = next_label next_label += 1 all_labels[lstart:lend, ...] = ordered_chunk_labels del rreturn # Final file I/O logging.debug("Writing output") lbl_meta = { "samples": str(ns), "lines": str(nl), "bands": "1", "header offset": "0", "file type": "ENVI Standard", "data type": "4", "interleave": "bil", } lbl_img = envi.create_image(envi_header(lbl_file), lbl_meta, ext="", force=True) lbl_mm = lbl_img.open_memmap(interleave="source", writable=True) lbl_mm[:, :] = np.array(all_labels, dtype=np.float32).reshape((nl, 1, ns)) del lbl_mm