Source code for xdem.ddem

# Copyright (c) 2024 xDEM developers
#
# This file is part of the xDEM project:
# https://github.com/glaciohack/xdem
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Difference of DEMs classes and functions."""

from __future__ import annotations

import warnings
from typing import Any, Literal

import geoutils as gu
import numpy as np
import pyogrio
import rasterio as rio
import shapely
from geoutils.raster import Raster, RasterType
from geoutils.raster.array import get_array_and_mask
from rasterio.crs import CRS
from rasterio.warp import Affine

import xdem
from xdem._typing import MArrayf, NDArrayf


def _mask_as_array(reference_raster: gu.Raster, mask: str | gu.Vector | gu.Raster) -> NDArrayf:
    """
    Convert a given mask into an array.

    :param reference_raster: The raster to use for rasterizing the mask if the mask is a vector.
    :param mask: A valid Vector, Raster or a respective filepath to a mask.

    :raises: ValueError: If the mask path is invalid.
    :raises: TypeError: If the wrong mask type was given.

    :returns: The mask as a squeezed array.
    """
    # Try to load the mask file if it's a filepath
    if isinstance(mask, str):
        # First try to load it as a Vector
        try:
            mask = gu.Vector(mask)
        # If the format is unsupported, try loading as a Raster
        except pyogrio.errors.DataSourceError:
            try:
                mask = gu.Raster(mask)
            # If that fails, raise an error
            except rio.errors.RasterioIOError:
                raise ValueError(f"Raster path not in a supported Raster or Vector format: {mask}")

    # At this point, the mask variable is either a Raster or a Vector
    # Now, convert the mask into an array by either rasterizing a Vector or by fetching a Raster's data
    if isinstance(mask, gu.Vector):
        mask_array = mask.create_mask(reference_raster, as_array=True)
    elif isinstance(mask, gu.Raster):
        # The true value is the maximum value in the raster, unless the maximum value is 0 or False
        true_value = np.nanmax(mask.data) if not np.nanmax(mask.data) in [0, False] else True
        mask_array = (mask.data == true_value).squeeze()
    else:
        raise TypeError(
            f"Raster has invalid type: {type(mask)}. Expected one of: " f"{[gu.Raster, gu.Vector, str, type(None)]}"
        )

    return mask_array


[docs] class dDEM(Raster): # type: ignore """A difference-DEM object."""
[docs] def __init__(self, raster: gu.Raster, start_time: np.datetime64, end_time: np.datetime64, error: Any | None = None): """ Create a dDEM object from a Raster. :param raster: A georeferenced Raster object. :param start_time: The starting time of the dDEM. :param end_time: The end time of the dDEM. :param error: An error measure for the dDEM (UNUSED). :returns: A new dDEM instance. """ # super().__init__(raster) self.__dict__ = raster.__dict__ self.start_time = start_time self.end_time = end_time self.error = error self._filled_data: NDArrayf | None = None self._fill_method = ""
def __str__(self) -> str: """Return a summary of the dDEM.""" return f"dDEM from {self.start_time} to {self.end_time}.\n\n{super().__str__()}" def copy(self, new_array: NDArrayf = None) -> dDEM: """Return a copy of the DEM.""" if new_array is None: new_array = self.data.copy() new_ddem = dDEM.from_array(new_array, self.transform, self.crs, self.start_time, self.end_time) return new_ddem @property def filled_data(self) -> NDArrayf | None: """ Get the filled data array if it exists, or else the original data if it has no nans. Returns None if the filled_data array does not exist, and the original data has nans. :returns: An array or None """ if self._filled_data is not None: return self._filled_data if (isinstance(self.data, np.ma.masked_array) and np.any(self.data.mask)) or np.any(np.isnan(self.data)): return None return np.asarray(self.data) @filled_data.setter def filled_data(self, array: NDArrayf) -> None: """Set the filled_data attribute and make sure that it is valid.""" assert ( self.data.size == array.size ), f"Array shape '{array.shape}' differs from the data shape '{self.data.shape}'" self._filled_data = np.asarray(array).reshape(self.data.shape) @property def fill_method(self) -> str: """Return the fill method used for the filled_data.""" return self._fill_method @property def time(self) -> np.timedelta64: """Get the time duration.""" return self.end_time - self.start_time @classmethod def from_array( cls: type[RasterType], data: NDArrayf | MArrayf, transform: tuple[float, ...] | Affine, crs: CRS | int | None, start_time: np.datetime64, end_time: np.datetime64, nodata: int | float | None = None, error: float = None, ) -> dDEM: # type: ignore """ Create a new dDEM object from an array. :param data: The dDEM data array. :param transform: A geometric transform. :param crs: The coordinate reference system of the dDEM. :param start_time: The starting time of the dDEM. :param end_time: The end time of the dDEM. :param error: An error measure for the dDEM. :param nodata: The nodata value. :returns: A new dDEM instance. """ return cls( gu.Raster.from_array(data=data, transform=transform, crs=crs, nodata=nodata), start_time=start_time, end_time=end_time, error=error, ) def interpolate( self, method: Literal["idw", "local_hypsometric", "regional_hypsometric"] = "idw", reference_elevation: NDArrayf | np.ma.masked_array[Any, np.dtype[np.floating[Any]]] | xdem.DEM = None, mask: NDArrayf | xdem.DEM | gu.Vector = None, ) -> NDArrayf | None: """ Interpolate the dDEM using the given method. :param method: The method to use for interpolation. :param reference_elevation: Reference DEM. Only required for hypsometric approaches. """ if reference_elevation is not None: try: reference_elevation = reference_elevation.reproject(self, silent=True) # type: ignore except AttributeError as exception: if "object has no attribute 'reproject'" not in str(exception): raise exception if isinstance(reference_elevation, np.ndarray): reference_elevation = np.ma.masked_array(reference_elevation, mask=np.isnan(reference_elevation)) assert reference_elevation.data.shape == self.data.shape, ( f"'reference_elevation' shape ({reference_elevation.data.shape})" f" different from 'self' ({self.data.shape})" ) if method == "idw": self.filled_data = xdem.volume.idw_interpolation(self.data) elif method == "local_hypsometric": assert reference_elevation is not None assert mask is not None if not isinstance(mask, gu.Vector): mask = gu.Vector(mask) interpolated_ddem, nans = get_array_and_mask(self.data.copy()) entries = mask.ds[mask.ds.intersects(shapely.geometry.box(*self.bounds))] ddem_mask = nans.copy().squeeze() for i in entries.index: feature_mask = (gu.Vector(entries.loc[entries.index == i]).create_mask(self, as_array=True)).reshape( interpolated_ddem.shape ) if np.count_nonzero(feature_mask) == 0: continue try: with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Not enough valid bins") warnings.filterwarnings("ignore", "invalid value encountered in divide") interpolated_ddem = np.asarray( xdem.volume.hypsometric_interpolation( interpolated_ddem, reference_elevation.data, mask=feature_mask ) ) except ValueError as exception: # Skip the feature if too few glacier values exist. if "x and y arrays must have at least 2 entries" in str(exception): continue raise exception # Set the validity flag of all values within the feature to be valid ddem_mask[feature_mask] = False # All values that were nan in the start and are without the updated validity mask should now be nan # The above interpolates values outside of the dDEM, so this is necessary. interpolated_ddem[ddem_mask] = np.nan diff = abs(np.nanmean(interpolated_ddem - self.data)) assert diff < 0.01, (diff, self.data.mean()) self.filled_data = xdem.volume.idw_interpolation(interpolated_ddem) elif method == "regional_hypsometric": assert reference_elevation is not None assert mask is not None mask_array = _mask_as_array(self, mask).reshape(self.data.shape) self.filled_data = xdem.volume.hypsometric_interpolation( self.data, reference_elevation.data, mask=mask_array ).data else: raise NotImplementedError(f"Interpolation method '{method}' not supported") return self.filled_data