# Copyright (c) 2024 xDEM developers
#
# This file is part of the xDEM project:
# https://github.com/glaciohack/xdem
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base coregistration classes to define generic methods and pre/post-processing of input data."""
from __future__ import annotations
import copy
import inspect
import logging
import warnings
from typing import (
Any,
Callable,
Generator,
Iterable,
Literal,
Mapping,
TypedDict,
TypeVar,
overload,
)
import affine
import geopandas as gpd
import geoutils as gu
import numpy as np
import pandas as pd
import rasterio as rio
import rasterio.warp # pylint: disable=unused-import
import scipy
import scipy.interpolate
import scipy.ndimage
import scipy.optimize
from geoutils import profiler
from geoutils.interface.gridding import _grid_pointcloud
from geoutils.interface.interpolate import _interp_points
from geoutils.pointcloud.pointcloud import PointCloud, PointCloudType
from geoutils.raster import Raster, RasterType, raster
from geoutils.raster._geotransformations import _resampling_method_from_str
from geoutils.raster.array import get_array_and_mask
from geoutils.raster.georeferencing import _cast_pixel_interpretation, _coords
from geoutils.raster.geotransformations import _translate
import xdem
from xdem._typing import MArrayf, NDArrayb, NDArrayf
from xdem.fit import (
polynomial_1d,
robust_nfreq_sumsin_fit,
robust_norder_polynomial_fit,
sumsin_1d,
)
from xdem.spatialstats import nd_binning
# Map each workflow name to a function and optimizer
fit_workflows = {
"norder_polynomial": {"func": polynomial_1d, "optimizer": robust_norder_polynomial_fit},
"nfreq_sumsin": {"func": sumsin_1d, "optimizer": robust_nfreq_sumsin_fit},
}
# Map each key name to a descriptor string
dict_key_to_str = {
"subsample": "Subsample size requested",
"random_state": "Random generator",
"subsample_final": "Subsample size drawn from valid values",
"fit_or_bin": "Fit, bin or bin+fit",
"fit_func": "Function to fit",
"fit_optimizer": "Optimizer for fitting",
"fit_minimizer": "Minimizer of method",
"fit_loss_func": "Loss function of method",
"bin_statistic": "Binning statistic",
"bin_sizes": "Bin sizes or edges",
"bin_apply_method": "Bin apply method",
"bias_var_names": "Names of bias variables",
"nd": "Number of dimensions of binning and fitting",
"fit_params": "Optimized function parameters",
"fit_perr": "Error on optimized function parameters",
"bin_dataframe": "Binning output dataframe",
"max_iterations": "Maximum number of iterations",
"tolerance": "Tolerance to reach (pixel size)",
"last_iteration": "Iteration at which algorithm stopped",
"all_tolerances": "Tolerances at each iteration",
"terrain_attribute": "Terrain attribute used for correction",
"angle": "Angle of directional correction",
"poly_order": "Polynomial order",
"best_poly_order": "Best polynomial order",
"best_nb_sin_freq": "Best number of sinusoid frequencies",
"vshift_reduc_func": "Reduction function used to remove vertical shift",
"apply_vshift": "Vertical shift activated",
"centroid": "Centroid found for affine rotation",
"shift_x": "Eastward shift estimated (georeferenced unit)",
"shift_y": "Northward shift estimated (georeferenced unit)",
"shift_z": "Vertical shift estimated (elevation unit)",
"initial_shift": "Estimated initial shift (georeferenced unit)",
"matrix": "Affine transformation matrix estimated",
"only_translation": "Only translations are considered",
"standardize": "Input data was standardized",
"icp_method": "Type of ICP method",
"icp_picky": "Picky closest pair selection",
"cpd_weight": "Weight of CPD outlier removal",
}
#####################################
# Generic functions for preprocessing
###########################################
def _preprocess_coreg_fit_raster_raster(
reference_dem: NDArrayf | MArrayf | RasterType,
dem_to_be_aligned: NDArrayf | MArrayf | RasterType,
inlier_mask: NDArrayb | Raster | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
) -> tuple[NDArrayf, NDArrayf, NDArrayb, affine.Affine, rio.crs.CRS, Literal["Area", "Point"] | None]:
"""Pre-processing and checks of fit() for two raster input."""
# Validate that both inputs are valid array-like (or Raster) types.
if not all(isinstance(dem, (np.ndarray, gu.Raster)) for dem in (reference_dem, dem_to_be_aligned)):
raise ValueError(
"Both DEMs need to be array-like (implement a numpy array interface)."
f"'reference_dem': {reference_dem}, 'dem_to_be_aligned': {dem_to_be_aligned}"
)
if inlier_mask is not None:
# If inlier_mask has not the same shape of the input dem, reproject it
if reference_dem.shape != inlier_mask.shape:
if isinstance(inlier_mask, gu.Raster):
if isinstance(reference_dem, gu.Raster):
inlier_mask = inlier_mask.reproject(reference_dem, resampling=rio.warp.Resampling.nearest)
else:
ref = Raster.from_array(data=reference_dem, transform=transform, crs=crs)
inlier_mask = inlier_mask.reproject(ref, resampling=rio.warp.Resampling.nearest)
# in case of mask is a array
else:
raise ValueError("Input mask array can't be a different size array as input elevation.")
# If both DEMs are Rasters, validate that 'dem_to_be_aligned' is in the right grid. Then extract its data.
if isinstance(dem_to_be_aligned, gu.Raster) and isinstance(reference_dem, gu.Raster):
dem_to_be_aligned = dem_to_be_aligned.reproject(reference_dem, silent=True)
# If both inputs are raster, cast their pixel interpretation and override any individual interpretation
indiv_check = True
new_aop = None
if isinstance(reference_dem, gu.Raster) and isinstance(dem_to_be_aligned, gu.Raster):
# Casts pixel interpretation, raises a warning if they differ (can be silenced with global config)
new_aop = _cast_pixel_interpretation(reference_dem.area_or_point, dem_to_be_aligned.area_or_point)
if area_or_point is not None:
warnings.warn("Pixel interpretation cast from the two input rasters overrides the given 'area_or_point'.")
indiv_check = False
# If any input is a Raster, use its transform if 'transform is None'.
# If 'transform' was given and any input is a Raster, trigger a warning.
# Finally, extract only the data of the raster.
new_transform = None
new_crs = None
for name, dem in [("reference_dem", reference_dem), ("dem_to_be_aligned", dem_to_be_aligned)]:
if isinstance(dem, gu.Raster):
# If a raster was passed, override the transform, reference raster has priority to set new_transform.
if transform is None:
new_transform = dem.transform
elif transform is not None and new_transform is None:
new_transform = dem.transform
warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'transform'")
# Same for crs
if crs is None:
new_crs = dem.crs
elif crs is not None and new_crs is None:
new_crs = dem.crs
warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'crs'")
# Same for pixel interpretation, only if both inputs aren't rasters (which requires casting, see above)
if indiv_check:
if area_or_point is None:
new_aop = dem.area_or_point
elif crs is not None and new_aop is None:
new_aop = dem.area_or_point
warnings.warn(f"'{name}' of type {type(dem)} overrides the given 'area_or_point'")
# Override transform, CRS and pixel interpretation
if new_transform is not None:
transform = new_transform
if new_crs is not None:
crs = new_crs
if new_aop is not None:
area_or_point = new_aop
if transform is None:
raise ValueError("'transform' must be given if both DEMs are array-like.")
if crs is None:
raise ValueError("'crs' must be given if both DEMs are array-like.")
# Get a NaN array covering nodatas from the raster, masked array or integer-type array
with warnings.catch_warnings():
warnings.filterwarnings(action="ignore", category=UserWarning)
ref_dem, ref_mask = get_array_and_mask(reference_dem, copy=True)
tba_dem, tba_mask = get_array_and_mask(dem_to_be_aligned, copy=True)
# Make sure that the mask has an expected format.
if inlier_mask is not None:
if isinstance(inlier_mask, Raster):
inlier_mask = inlier_mask.data.filled(False).squeeze()
else:
inlier_mask = np.asarray(inlier_mask).squeeze()
assert inlier_mask.dtype == bool, f"Invalid mask dtype: '{inlier_mask.dtype}'. Expected 'bool'"
if np.all(~inlier_mask):
raise ValueError("'inlier_mask' had no inliers.")
else:
inlier_mask = np.ones(np.shape(ref_dem), dtype=bool)
if np.all(ref_mask):
raise ValueError("'reference_dem' had only NaNs")
if np.all(tba_mask):
raise ValueError("'dem_to_be_aligned' had only NaNs")
# Isolate all invalid values
invalid_mask = np.logical_or.reduce((~inlier_mask, ref_mask, tba_mask))
if np.all(invalid_mask):
raise ValueError("All values of the inlier mask are NaNs in either 'reference_dem' or 'dem_to_be_aligned'.")
return ref_dem, tba_dem, inlier_mask, transform, crs, area_or_point
def _preprocess_coreg_fit_raster_point(
raster_elev: NDArrayf | MArrayf | RasterType,
point_elev: gpd.GeoDataFrame,
inlier_mask: NDArrayb | Raster | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
) -> tuple[NDArrayf, gpd.GeoDataFrame, NDArrayb, affine.Affine, rio.crs.CRS, Literal["Area", "Point"] | None]:
"""Pre-processing and checks of fit for raster-point input."""
if inlier_mask is not None:
# If inlier_mask has not the same shape of the input dem, reproject it
if raster_elev.shape != inlier_mask.shape:
if isinstance(inlier_mask, gu.Raster):
if isinstance(raster_elev, gu.Raster):
inlier_mask = inlier_mask.reproject(raster_elev, resampling=rio.warp.Resampling.nearest)
else:
raster_rst = Raster.from_array(data=raster_elev, transform=transform, crs=crs)
inlier_mask = inlier_mask.reproject(raster_rst, resampling=rio.warp.Resampling.nearest)
# If inlier_mask is an array, it is not possible to reproject it
else:
raise ValueError("Input mask array can't be a different size array as input elevation.")
# TODO: Convert to point cloud once class is done
# TODO: Raise warnings consistently with raster-raster function, see Amelie's Dask PR? #525
if isinstance(raster_elev, gu.Raster):
rst_elev = raster_elev.data
crs = raster_elev.crs
transform = raster_elev.transform
area_or_point = raster_elev.area_or_point
else:
rst_elev = raster_elev
crs = crs
transform = transform
area_or_point = area_or_point
if transform is None:
raise ValueError("'transform' must be given if both DEMs are array-like.")
if crs is None:
raise ValueError("'crs' must be given if both DEMs are array-like.")
# Make sure that the mask has an expected format.
if inlier_mask is not None:
if isinstance(inlier_mask, Raster):
inlier_mask = inlier_mask.data.filled(False).squeeze()
else:
inlier_mask = np.asarray(inlier_mask).squeeze()
assert inlier_mask.dtype == bool, f"Invalid mask dtype: '{inlier_mask.dtype}'. Expected 'bool'"
if np.all(~inlier_mask):
raise ValueError("'inlier_mask' had no inliers.")
else:
inlier_mask = np.ones(np.shape(rst_elev), dtype=bool)
# TODO: Convert to point cloud?
# Convert geodataframe to vector
point_elev = point_elev.to_crs(crs=crs)
return rst_elev, point_elev, inlier_mask, transform, crs, area_or_point
def _preprocess_coreg_fit_point_point(
reference_elev: gpd.GeoDataFrame, to_be_aligned_elev: gpd.GeoDataFrame
) -> tuple[gpd.GeoDataFrame, gpd.GeoDataFrame]:
"""Pre-processing and checks of fit for point-point input."""
ref_elev = reference_elev
tba_elev = to_be_aligned_elev.to_crs(crs=reference_elev.crs)
return ref_elev, tba_elev
def _preprocess_coreg_fit(
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
inlier_mask: NDArrayb | Raster | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str | None = None,
) -> tuple[
NDArrayf | gpd.GeoDataFrame,
NDArrayf | gpd.GeoDataFrame,
NDArrayb | None,
affine.Affine | None,
rio.crs.CRS | None,
Literal["Area", "Point"] | None,
str | None,
]:
"""Pre-processing and checks of fit for any input."""
for elev in (reference_elev, to_be_aligned_elev):
if not isinstance(elev, (np.ndarray, gu.Raster, gpd.GeoDataFrame, gu.PointCloud)):
raise ValueError(
f"Input elevation data should be a raster, array, geodataframe or point cloud, " f"got {type(elev)}."
)
# If both inputs are raster or arrays, reprojection on the same grid is needed for raster-raster methods
if all(isinstance(elev, (np.ndarray, gu.Raster)) for elev in (reference_elev, to_be_aligned_elev)):
ref_elev, tba_elev, inlier_mask, transform, crs, area_or_point = _preprocess_coreg_fit_raster_raster(
reference_dem=reference_elev,
dem_to_be_aligned=to_be_aligned_elev,
inlier_mask=inlier_mask,
transform=transform,
crs=crs,
area_or_point=area_or_point,
)
# If one input is raster, and the other is point, we reproject the point data to the same CRS and extract arrays
elif any(isinstance(dem, (np.ndarray, gu.Raster)) for dem in (reference_elev, to_be_aligned_elev)):
if isinstance(reference_elev, (np.ndarray, gu.Raster)):
raster_elev = reference_elev
point_elev = (
to_be_aligned_elev
if isinstance(to_be_aligned_elev, gpd.GeoDataFrame)
else to_be_aligned_elev.ds # type: ignore
)
z_name = (
z_name
if isinstance(to_be_aligned_elev, gpd.GeoDataFrame)
else to_be_aligned_elev.data_column # type: ignore
)
ref = "raster"
else:
raster_elev = to_be_aligned_elev
point_elev = reference_elev if isinstance(reference_elev, gpd.GeoDataFrame) else reference_elev.ds
z_name = z_name if isinstance(reference_elev, gpd.GeoDataFrame) else reference_elev.data_column
ref = "point"
raster_elev, point_elev, inlier_mask, transform, crs, area_or_point = _preprocess_coreg_fit_raster_point(
raster_elev=raster_elev,
point_elev=point_elev,
inlier_mask=inlier_mask,
transform=transform,
crs=crs,
area_or_point=area_or_point,
)
if ref == "raster":
ref_elev = raster_elev
tba_elev = point_elev
else:
ref_elev = point_elev
tba_elev = raster_elev
# If both inputs are points, simply reproject to the same CRS
else:
ref_elev = reference_elev if isinstance(reference_elev, gpd.GeoDataFrame) else reference_elev.ds # type: ignore
tba_elev = (
to_be_aligned_elev
if isinstance(to_be_aligned_elev, gpd.GeoDataFrame)
else to_be_aligned_elev.ds # type: ignore
)
z_name = (
z_name
if isinstance(to_be_aligned_elev, gpd.GeoDataFrame)
else to_be_aligned_elev.data_column # type: ignore
)
ref_elev, tba_elev = _preprocess_coreg_fit_point_point(reference_elev=ref_elev, to_be_aligned_elev=tba_elev)
return ref_elev, tba_elev, inlier_mask, transform, crs, area_or_point, z_name
def _preprocess_coreg_apply(
elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
) -> tuple[NDArrayf | gpd.GeoDataFrame, affine.Affine, rio.crs.CRS, str | None]:
"""Pre-processing and checks of apply for any input."""
if not isinstance(elev, (np.ndarray, Raster, gpd.GeoDataFrame, PointCloud)):
raise ValueError(
f"Input elevation data should be a raster, array, geodataframe or point cloud, " f"got {type(elev)}."
)
# If input is geodataframe or point cloud
if isinstance(elev, (gpd.GeoDataFrame, PointCloud)):
if isinstance(elev, PointCloud):
elev_out = elev.ds
z_name = elev.data_column
else:
elev_out = elev
z_name = z_name
new_transform = None
new_crs = None
# If input is a raster or array
else:
# If input is raster
if isinstance(elev, gu.Raster):
if transform is not None:
warnings.warn(f"DEM of type {type(elev)} overrides the given 'transform'")
if crs is not None:
warnings.warn(f"DEM of type {type(elev)} overrides the given 'crs'")
new_transform = elev.transform
new_crs = elev.crs
# If input is an array
else:
if transform is None:
raise ValueError("'transform' must be given if DEM is array-like.")
if crs is None:
raise ValueError("'crs' must be given if DEM is array-like.")
new_transform = transform
new_crs = crs
# The array to provide the functions will be an ndarray with NaNs for masked out areas.
elev_out, elev_mask = get_array_and_mask(elev)
if np.all(elev_mask):
raise ValueError("'dem' had only NaNs")
return elev_out, new_transform, new_crs, z_name
def _postprocess_coreg_apply_pts(
elev: gpd.GeoDataFrame | PointCloudType,
applied_elev: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame | PointCloudType:
"""Post-processing and checks of apply for point input."""
if isinstance(elev, PointCloud):
applied_elev = xdem.EPC(applied_elev, data_column=elev.data_column)
else:
applied_elev = applied_elev
# TODO: Convert CRS back if the CRS did not match the one of the fit?
return applied_elev
def _postprocess_coreg_apply_rst(
elev: NDArrayf | gu.Raster,
applied_elev: NDArrayf,
transform: affine.Affine,
out_transform: affine.Affine,
crs: rio.crs.CRS,
resample: bool,
resampling: rio.warp.Resampling | None = None,
) -> tuple[NDArrayf | gu.Raster, affine.Affine]:
"""
Post-processing and checks of apply for raster input.
Here, "elev" and "transform" corresponds to user input, and are required to transform back the output that is
composed of "applied_elev" and "out_transform".
"""
# Ensure the dtype is OK
applied_elev = applied_elev.astype("float32")
# Set default dst_nodata
if isinstance(elev, gu.Raster):
nodata = elev.nodata
else:
nodata = raster._default_nodata(elev.dtype)
# Resample the array on the original grid
if resample:
# TODO: Use this function for a translation only, for consistency with the rest of Coreg?
# (would require checking transform difference is only a translation)
# applied_elev = _reproject_horizontal_shift_samecrs(raster_arr=applied_elev, src_transform=out_transform,
# dst_transform=transform)
# Reproject the DEM from its out_transform onto the transform
applied_rst = gu.Raster.from_array(applied_elev, out_transform, crs=crs, nodata=nodata)
if not isinstance(elev, gu.Raster):
match_rst = gu.Raster.from_array(elev, transform, crs=crs, nodata=nodata)
else:
match_rst = elev
applied_rst = applied_rst.reproject(match_rst, resampling=resampling, silent=True)
applied_elev = applied_rst.data
# Now that the raster data is reprojected, the new out_transform is set as the original transform
out_transform = transform
# Calculate final mask
final_mask = np.logical_or(~np.isfinite(applied_elev), applied_elev == nodata)
# If the DEM was a masked_array, copy the mask to the new DEM
if isinstance(elev, (np.ma.masked_array, gu.Raster)):
applied_elev = np.ma.masked_array(applied_elev, mask=final_mask) # type: ignore
else:
applied_elev[final_mask] = np.nan
# If the input was a Raster, returns a Raster, else returns array and transform
if isinstance(elev, gu.Raster):
out_dem = elev.from_array(applied_elev, out_transform, crs, nodata=elev.nodata)
return out_dem, out_transform
else:
return applied_elev, out_transform
def _postprocess_coreg_apply(
elev: NDArrayf | gu.Raster | gpd.GeoDataFrame | PointCloudType,
applied_elev: NDArrayf | gpd.GeoDataFrame,
transform: affine.Affine,
out_transform: affine.Affine,
crs: rio.crs.CRS,
resample: bool,
resampling: rio.warp.Resampling | None = None,
) -> tuple[NDArrayf | gu.Raster | gpd.GeoDataFrame | gu.PointCloud, affine.Affine]:
"""
Post-processing and checks of apply for any input.
Here, "elev" and "transform" corresponds to user input, and are required to transform back the output that is
composed of "applied_elev" and "out_transform".
"""
# Define resampling
resampling = resampling if isinstance(resampling, rio.warp.Resampling) else _resampling_method_from_str(resampling)
# Distribute between raster and point apply methods
if isinstance(applied_elev, np.ndarray):
applied_elev, out_transform = _postprocess_coreg_apply_rst(
elev=elev,
applied_elev=applied_elev,
transform=transform,
crs=crs,
out_transform=out_transform,
resample=resample,
resampling=resampling,
)
else:
applied_elev = _postprocess_coreg_apply_pts(elev=elev, applied_elev=applied_elev)
return applied_elev, out_transform
###############################################
# Statistical functions (to be moved in future)
###############################################
def _get_subsample_on_valid_mask(params_random: InRandomDict, valid_mask: NDArrayb) -> NDArrayb:
"""
Get mask of values to subsample on valid mask (works for both 1D or 2D arrays).
:param valid_mask: Raster of valid values (inlier and not nodata).
"""
# This should never happen
if params_random["subsample"] is None:
raise ValueError("Subsample should have been defined in metadata before reaching this class method.")
# If valid mask is empty
if np.count_nonzero(valid_mask) == 0:
raise ValueError(
"There is no valid points common to the input and auxiliary data (bias variables, or "
"derivatives required for this method, for example slope, aspect, etc)."
)
# If subsample is not equal to one, subsampling should be performed.
elif params_random["subsample"] != 1.0:
# Build a low memory masked array with invalid values masked to pass to subsampling
ma_valid = np.ma.masked_array(data=np.ones(np.shape(valid_mask), dtype=bool), mask=~valid_mask)
# Take a subsample within the valid values
indices = gu.stats.sampling.subsample_array(
ma_valid,
subsample=params_random["subsample"],
return_indices=True,
random_state=params_random["random_state"],
)
# We return a boolean mask of the subsample within valid values
subsample_mask = np.zeros(np.shape(valid_mask), dtype=bool)
if len(indices) == 2:
subsample_mask[indices[0], indices[1]] = True
else:
subsample_mask[indices[0]] = True
else:
# If no subsample is taken, use all valid values
subsample_mask = valid_mask
logging.debug(
"Using a subsample of %d among %d valid values.", np.count_nonzero(subsample_mask), np.count_nonzero(valid_mask)
)
return subsample_mask
def _get_subsample_mask_pts_rst(
params_random: InRandomDict,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine, # Never None thanks to Coreg.fit() pre-process
z_name: str,
area_or_point: Literal["Area", "Point"] | None,
aux_vars: None | dict[str, NDArrayf] = None,
) -> NDArrayb:
"""
Get subsample mask for raster-raster or point-raster datasets on valid points of all inputs (including
potential auxiliary variables).
Returns a boolean array to use for subsampling (2D for raster-raster, 1D for point-raster to be used on point).
"""
# TODO: Return more detailed error message for no valid points (which variable was full of NaNs?)
if isinstance(ref_elev, gpd.GeoDataFrame) and isinstance(tba_elev, gpd.GeoDataFrame):
raise TypeError(
"This pre-processing function is only intended for raster-point or raster-raster methods, "
"not point-point methods."
)
# For two rasters
if isinstance(ref_elev, np.ndarray) and isinstance(tba_elev, np.ndarray):
# Compute mask of valid data
if aux_vars is not None:
valid_mask = np.logical_and.reduce(
(
inlier_mask,
np.isfinite(ref_elev),
np.isfinite(tba_elev),
*(np.isfinite(var) for var in aux_vars.values()),
)
)
else:
valid_mask = np.logical_and.reduce((inlier_mask, np.isfinite(ref_elev), np.isfinite(tba_elev)))
# Raise errors if all values are NaN after introducing masks from the variables
# (Others are already checked in pre-processing of Coreg.fit())
# Perform subsampling
sub_mask = _get_subsample_on_valid_mask(params_random=params_random, valid_mask=valid_mask)
# For one raster and one point cloud
else:
# Interpolate inlier mask and bias vars at point coordinates
pts_elev: gpd.GeoDataFrame = ref_elev if isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
rst_elev: NDArrayf = ref_elev if not isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
# Remove non-finite values from point dataset
pts_elev = pts_elev[np.isfinite(pts_elev[z_name].values)]
# Get coordinates
pts = (pts_elev.geometry.x.values, pts_elev.geometry.y.values)
# Get valid mask ahead of subsampling to have the exact number of requested subsamples
if aux_vars is not None:
valid_mask = np.logical_and.reduce(
(inlier_mask, np.isfinite(rst_elev), *(np.isfinite(var) for var in aux_vars.values()))
)
else:
valid_mask = np.logical_and.reduce((inlier_mask, np.isfinite(rst_elev)))
# Convert inlier mask to points to be able to determine subsample later
# The location needs to be surrounded by inliers, use floor to get 0 for at least one outlier
# Interpolates boolean mask as integers
# TODO: Create a function in GeoUtils that can compute the valid boolean mask of an interpolation without
# having to convert data to float32
valid_mask = valid_mask.astype(np.float32)
valid_mask[valid_mask == 0] = np.nan
valid_mask = np.isfinite(
_interp_points(array=valid_mask, transform=transform, points=pts, area_or_point=area_or_point)
)
# If there is a subsample, it needs to be done now on the point dataset to reduce later calculations
sub_mask = _get_subsample_on_valid_mask(params_random=params_random, valid_mask=valid_mask)
return sub_mask
def _subsample_on_mask(
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
aux_vars: None | dict[str, NDArrayf],
sub_mask: NDArrayb,
transform: rio.transform.Affine,
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
return_coords: bool = False,
) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]:
"""
Perform subsampling on mask for raster-raster or point-raster datasets on valid points of all inputs (including
potential auxiliary variables).
Returns 1D arrays of subsampled inputs: reference elevation, to-be-aligned elevation and auxiliary variables
(in dictionary), and (optionally) tuple of X/Y coordinates.
"""
# For two rasters
if isinstance(ref_elev, np.ndarray) and isinstance(tba_elev, np.ndarray):
# Subsample all datasets with the mask
sub_ref = ref_elev[sub_mask]
sub_tba = tba_elev[sub_mask]
if aux_vars is not None:
sub_bias_vars = {}
for var in aux_vars.keys():
sub_bias_vars[var] = aux_vars[var][sub_mask]
else:
sub_bias_vars = None
# Return coordinates if required
if return_coords:
coords = _coords(transform=transform, shape=ref_elev.shape, area_or_point=area_or_point)
sub_coords = (coords[0][sub_mask], coords[1][sub_mask])
else:
sub_coords = None
# For one raster and one point cloud
else:
# Identify which dataset is point or raster
pts_elev: gpd.GeoDataFrame = ref_elev if isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
rst_elev: NDArrayf = ref_elev if not isinstance(ref_elev, gpd.GeoDataFrame) else tba_elev
# Remove invalid points
pts_elev = pts_elev[np.isfinite(pts_elev[z_name].values)]
# Subsample point coordinates
pts = (pts_elev.geometry.x.values, pts_elev.geometry.y.values)
pts = (pts[0][sub_mask], pts[1][sub_mask])
# Interpolate raster array to the subsample point coordinates
# Convert ref or tba depending on which is the point dataset
sub_rst = _interp_points(array=rst_elev, transform=transform, points=pts, area_or_point=area_or_point)
sub_pts = pts_elev[z_name].values[sub_mask]
# Assign arrays depending on which one is the reference
if isinstance(ref_elev, gpd.GeoDataFrame):
sub_ref = sub_pts
sub_tba = sub_rst
else:
sub_ref = sub_rst
sub_tba = sub_pts
# Interpolate arrays of bias variables to the subsample point coordinates
if aux_vars is not None:
sub_bias_vars = {}
for var in aux_vars.keys():
sub_bias_vars[var] = _interp_points(
array=aux_vars[var], transform=transform, points=pts, area_or_point=area_or_point
)
else:
sub_bias_vars = None
# Return coordinates if required
if return_coords:
sub_coords = pts
else:
sub_coords = None
return sub_ref, sub_tba, sub_bias_vars, sub_coords
@overload
def _preprocess_pts_rst_subsample(
params_random: InRandomDict,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
aux_vars: None | dict[str, NDArrayf] = None,
*,
return_coords: Literal[False] = False,
) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None]: ...
@overload
def _preprocess_pts_rst_subsample(
params_random: InRandomDict,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
aux_vars: None | dict[str, NDArrayf] = None,
*,
return_coords: Literal[True],
) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], tuple[NDArrayf, NDArrayf]]: ...
def _preprocess_pts_rst_subsample(
params_random: InRandomDict,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine, # Never None thanks to Coreg.fit() pre-process
crs: rio.crs.CRS, # Never None thanks to Coreg.fit() pre-process
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
aux_vars: None | dict[str, NDArrayf] = None,
return_coords: bool = False,
) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf], None | tuple[NDArrayf, NDArrayf]]:
"""
Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points
(and interpolated in the case of point-raster input).
Return 1D arrays of reference elevation, to-be-aligned elevation and dictionary of 1D arrays of auxiliary variables
at subsampled points.
"""
# Get subsample mask (a 2D array for raster-raster, a 1D array of length the point data for point-raster)
sub_mask = _get_subsample_mask_pts_rst(
params_random=params_random,
ref_elev=ref_elev,
tba_elev=tba_elev,
inlier_mask=inlier_mask,
transform=transform,
area_or_point=area_or_point,
z_name=z_name,
aux_vars=aux_vars,
)
# Perform subsampling on mask for all inputs
sub_ref, sub_tba, sub_bias_vars, sub_coords = _subsample_on_mask(
ref_elev=ref_elev,
tba_elev=tba_elev,
aux_vars=aux_vars,
sub_mask=sub_mask,
transform=transform,
area_or_point=area_or_point,
z_name=z_name,
return_coords=return_coords,
)
# Return 1D arrays of subsampled points at the same location
return sub_ref, sub_tba, sub_bias_vars, sub_coords
@overload
def _bin_or_and_fit_nd(
fit_or_bin: Literal["fit"],
params_fit_or_bin: InFitOrBinDict,
values: NDArrayf,
bias_vars: None | dict[str, NDArrayf] = None,
weights: None | NDArrayf = None,
**kwargs: Any,
) -> tuple[None, tuple[NDArrayf, Any]]: ...
@overload
def _bin_or_and_fit_nd(
fit_or_bin: Literal["bin"],
params_fit_or_bin: InFitOrBinDict,
values: NDArrayf,
bias_vars: None | dict[str, NDArrayf] = None,
weights: None | NDArrayf = None,
**kwargs: Any,
) -> tuple[pd.DataFrame, None]: ...
@overload
def _bin_or_and_fit_nd(
fit_or_bin: Literal["bin_and_fit"],
params_fit_or_bin: InFitOrBinDict,
values: NDArrayf,
bias_vars: None | dict[str, NDArrayf] = None,
weights: None | NDArrayf = None,
**kwargs: Any,
) -> tuple[pd.DataFrame, tuple[NDArrayf, Any]]: ...
def _bin_or_and_fit_nd(
fit_or_bin: Literal["fit", "bin", "bin_and_fit"],
params_fit_or_bin: InFitOrBinDict,
values: NDArrayf,
bias_vars: None | dict[str, NDArrayf] = None,
weights: None | NDArrayf = None,
**kwargs: Any,
) -> tuple[pd.DataFrame | None, tuple[NDArrayf, Any] | None]:
"""
Generic binning and/or fitting method to model values along N variables for a coregistration/correction,
used for all affine and bias-correction subclasses. Expects either 2D arrays for rasters, or 1D arrays for
points.
:param params_fit_or_bin: Dictionary of parameters for fitting and/or binning (bias variable names,
optimizer, bin sizes, etc), see FitOrBinDict for details.
:param values: Valid values to bin or fit.
:param bias_vars: Auxiliary variables for certain bias correction classes, as raster or arrays.
:param weights: Array of weights for the coregistration.
"""
if fit_or_bin is None:
raise ValueError("This function should not be called for methods not supporting fit_or_bin logic.")
# This is called by subclasses, so the bias_var should always be defined
if bias_vars is None:
raise ValueError("At least one `bias_var` should be passed to the fitting function, got None.")
# Check number of variables
nd = params_fit_or_bin["nd"]
if nd is not None and len(bias_vars) != nd:
raise ValueError(
"A number of {} variable(s) has to be provided through the argument 'bias_vars', "
"got {}.".format(nd, len(bias_vars))
)
# If bias var names were explicitly passed at instantiation, check that they match the one from the dict
if params_fit_or_bin["bias_var_names"] is not None:
if not sorted(bias_vars.keys()) == sorted(params_fit_or_bin["bias_var_names"]):
raise ValueError(
"The keys of `bias_vars` do not match the `bias_var_names` defined during "
"instantiation: {}.".format(params_fit_or_bin["bias_var_names"])
)
# Get number of variables
nd = len(bias_vars)
# Remove random state for keyword argument if its value is not in the optimizer function
if fit_or_bin in ["fit", "bin_and_fit"]:
fit_func_args = inspect.getfullargspec(params_fit_or_bin["fit_optimizer"]).args
if "random_state" not in fit_func_args and "random_state" in kwargs:
kwargs.pop("random_state")
# We need to sort the bin sizes in the same order as the bias variables if a dict is passed for bin_sizes
if fit_or_bin in ["bin", "bin_and_fit"]:
if isinstance(params_fit_or_bin["bin_sizes"], dict):
var_order = list(bias_vars.keys())
# Declare type to write integer or tuple to the variable
bin_sizes: int | tuple[int, ...] | tuple[NDArrayf, ...] = tuple(
np.array(params_fit_or_bin["bin_sizes"][var]) for var in var_order
)
# Otherwise, write integer directly
else:
bin_sizes = params_fit_or_bin["bin_sizes"]
# Option 1: Run fit and save optimized function parameters
if fit_or_bin == "fit":
logging.debug(
"Estimating alignment along variables %s by fitting with function %s.",
", ".join(list(bias_vars.keys())),
params_fit_or_bin["fit_func"].__name__,
)
results = params_fit_or_bin["fit_optimizer"](
f=params_fit_or_bin["fit_func"],
xdata=np.array([var.flatten() for var in bias_vars.values()]).squeeze(),
ydata=values.flatten(),
sigma=weights.flatten() if weights is not None else None,
absolute_sigma=True,
**kwargs,
)
df = None
# Option 2: Run binning and save dataframe of result
elif fit_or_bin == "bin":
logging.debug(
"Estimating alignment along variables %s by binning with statistic %s.",
", ".join(list(bias_vars.keys())),
params_fit_or_bin["bin_statistic"].__name__,
)
df = nd_binning(
values=values,
list_var=list(bias_vars.values()),
list_var_names=list(bias_vars.keys()),
list_var_bins=bin_sizes,
statistics=(params_fit_or_bin["bin_statistic"], "count"),
)
results = None
# Option 3: Run binning, then fitting, and save both results
else:
logging.debug(
"Estimating alignment along variables %s by binning with statistic %s and then fitting with function %s.",
", ".join(list(bias_vars.keys())),
params_fit_or_bin["bin_statistic"].__name__,
params_fit_or_bin["fit_func"].__name__,
)
df = nd_binning(
values=values,
list_var=list(bias_vars.values()),
list_var_names=list(bias_vars.keys()),
list_var_bins=bin_sizes,
statistics=(params_fit_or_bin["bin_statistic"], "count"),
)
# Now, we need to pass this new data to the fitting function and optimizer
# We use only the N-D binning estimates (maximum dimension, equal to length of variable list)
df_nd = df[df.nd == len(bias_vars)]
# We get the middle of bin values for variable, and statistic for the diff
new_vars = [pd.IntervalIndex(df_nd[var_name]).mid.values for var_name in bias_vars.keys()]
new_diff = df_nd[params_fit_or_bin["bin_statistic"].__name__].values
# TODO: pass a new sigma based on "count" and original sigma (and correlation?)?
# sigma values would have to be binned above also
# Valid values for the binning output
ind_valid = np.logical_and.reduce((np.isfinite(new_diff), *(np.isfinite(var) for var in new_vars)))
if np.all(~ind_valid):
raise ValueError("Only NaN values after binning, did you pass the right bin edges?")
results = params_fit_or_bin["fit_optimizer"](
f=params_fit_or_bin["fit_func"],
xdata=np.array([var[ind_valid].flatten() for var in new_vars]).squeeze(),
ydata=new_diff[ind_valid].flatten(),
sigma=weights[ind_valid].flatten() if weights is not None else None,
absolute_sigma=True,
**kwargs,
)
logging.debug("%dD bias estimated.", nd)
return df, results
###############################################
# Affine matrix manipulation and transformation
###############################################
def _check_matrix(matrix: NDArrayf, atol: float = 10e-8, warn_failure_reason: bool = False) -> bool:
"""
Check affine matrix is valid.
:param matrix: Input affine matrix.
:param atol: Absolute tolerance for check.
:param warn_failure_reason: Whether to warn of reason of check failure.
:return: True or False.
"""
# Is affine, shape should be 4x4 and last row must be [0, 0, 0, 1]
is_affine = matrix.shape == (4, 4) and np.allclose(matrix[3], [0, 0, 0, 1], atol=atol)
# The rotation part should be invertible
R = matrix[:3, :3]
is_invertible = abs(np.linalg.det(R)) > atol
# The matrix should be finite
is_finite = all(np.isfinite(matrix))
# Complete validity
checks = np.array([is_affine, is_invertible, is_finite])
check_names = ["Not affine shape (4x4 with last row of [0, 0, 0, 1])", "Not invertible", "Not finite"]
complete_validitiy = all(checks)
if not complete_validitiy and warn_failure_reason:
where_fail = np.nonzero(~np.array(checks))[0]
warnings.warn(
category=UserWarning, message=f"Validity check failed: {', '.join([check_names[w] for w in where_fail])}."
)
return complete_validitiy
def _make_matrix_valid(matrix: NDArrayf) -> NDArrayf:
"""
Make affine matrix valid given numerical imprecisions.
:param matrix: Input affine matrix.
:return: Valid matrix.
"""
# Copy matrix
T = np.asarray(matrix).copy()
# Enforce last row
T[3, :] = [0, 0, 0, 1]
# Orthogonalize rotation
U, _, Vt = np.linalg.svd(T[:3, :3])
R_ortho = U @ Vt
# Enforce right-handed system
if np.linalg.det(R_ortho) < 0:
U[:, -1] *= -1
R_ortho = U @ Vt
T[:3, :3] = R_ortho
return T
def _euler_to_matrix(alpha1: float, alpha2: float, alpha3: float) -> NDArrayf:
"""
Extrinsic Euler angles to affine matrix.
:param alpha1: Angle around X in radians.
:param alpha2: Angle around Y in radians.
:param alpha3: Angle around Z in radians.
:return: Rotation matrix.
"""
def Rx(a: float) -> NDArrayf:
ca, sa = np.cos(a), np.sin(a)
return np.array(
[
[1, 0, 0],
[0, ca, -sa],
[0, sa, ca],
]
)
def Ry(a: float) -> NDArrayf:
ca, sa = np.cos(a), np.sin(a)
return np.array(
[
[ca, 0, sa],
[0, 1, 0],
[-sa, 0, ca],
]
)
def Rz(a: float) -> NDArrayf:
ca, sa = np.cos(a), np.sin(a)
return np.array(
[
[ca, -sa, 0],
[sa, ca, 0],
[0, 0, 1],
]
)
return Rz(alpha3) @ Ry(alpha2) @ Rx(alpha1)
def _matrix_to_euler(rotation_matrix: NDArrayf, atol: float = 10e-8) -> tuple[float, float, float]:
"""
Affine matrix to extrinsic Euler angles.
:param rotation_matrix: Rotation matrix.
:return: Euler extrinsic angles in radians (rotations about X, Y and Z).
"""
if not np.allclose(rotation_matrix.T @ rotation_matrix, np.eye(3), atol=atol):
raise ValueError("Matrix is not orthogonal")
if abs(rotation_matrix[2, 0]) < 1 - atol:
beta = -np.arcsin(rotation_matrix[2, 0])
cb = np.cos(beta)
alpha = np.arctan2(rotation_matrix[2, 1] / cb, rotation_matrix[2, 2] / cb)
gamma = np.arctan2(rotation_matrix[1, 0] / cb, rotation_matrix[0, 0] / cb)
# Gimbal lock
else:
beta = np.pi / 2 if rotation_matrix[2, 0] <= -1 else -np.pi / 2
alpha = 0.0
gamma = np.arctan2(-rotation_matrix[0, 1], rotation_matrix[1, 1])
return float(alpha), float(beta), float(gamma)
[docs]
def matrix_from_translations_rotations(
t1: float = 0.0,
t2: float = 0.0,
t3: float = 0.0,
alpha1: float = 0.0,
alpha2: float = 0.0,
alpha3: float = 0.0,
use_degrees: bool = True,
) -> NDArrayf:
"""
Build rigid affine matrix based on 3 translations (unit of coordinates) and 3 rotations (degrees or radians).
The euler rotations use the extrinsic convention.
:param t1: Translation in the X (west-east) direction (unit of coordinates).
:param t2: Translation in the Y (south-north) direction (unit of coordinates).
:param t3: Translation in the Z (vertical) direction (unit of DEM).
:param alpha1: Rotation around the X (west-east) direction.
:param alpha2: Rotation around the Y (south-north) direction.
:param alpha3: Rotation around the Z (vertical) direction.
:param use_degrees: Whether to use degrees for input rotations, otherwise radians.
:raises ValueError: If the given translation or rotations contained invalid values.
:return: Rigid affine matrix of transformation.
"""
# Initialize diagonal matrix
matrix = np.eye(4)
# Convert euler angles to rotation matrix
e = np.array([alpha1, alpha2, alpha3])
# If angles were given in degrees
if use_degrees:
e = np.deg2rad(e)
rot_matrix = _euler_to_matrix(alpha1=e[0], alpha2=e[1], alpha3=e[2])
# Add rotation matrix, and translations
matrix[0:3, 0:3] = rot_matrix
matrix[:3, 3] = [t1, t2, t3]
return matrix
[docs]
def translations_rotations_from_matrix(
matrix: NDArrayf, return_degrees: bool = True
) -> tuple[float, float, float, float, float, float]:
"""
Extract 3 translations (unit of coordinates) and 3 rotations (degrees or radians) from rigid affine matrix.
The extracted euler rotations use the extrinsic convention.
:param matrix: Rigid affine matrix of transformation.
:param return_degrees: Whether to return rotations in degrees, otherwise radians.
:return: Translations in the X, Y and Z direction and rotations around the X, Y and Z directions.
"""
# Extract translations
t1, t2, t3 = matrix[:3, 3]
# Get rotations from affine matrix
rots = _matrix_to_euler(matrix[:3, :3])
if return_degrees:
rots = np.rad2deg(np.array(rots))
# Extract rotations
alpha1, alpha2, alpha3 = rots
return t1, t2, t3, alpha1, alpha2, alpha3
[docs]
def invert_matrix(matrix: NDArrayf, atol: float = 10e-8) -> NDArrayf:
"""
Invert a transformation matrix.
:param matrix: Affine transformation matrix.
:return: Inverted transformation matrix.
"""
if not np.allclose(matrix[3], [0, 0, 0, 1], atol=atol):
raise ValueError("Not affine")
R = matrix[:3, :3]
t = matrix[:3, 3]
if not np.allclose(R.T @ R, np.eye(3), atol=atol):
raise ValueError("Not a rigid transform")
# Make valid before inversion
valid_matrix = _make_matrix_valid(matrix)
R = valid_matrix[:3, :3]
t = valid_matrix[:3, 3]
Tinv = np.eye(4)
Tinv[:3, :3] = R.T
Tinv[:3, 3] = -R.T @ t
return Tinv
def _apply_matrix_pts_mat(
mat: NDArrayf,
matrix: NDArrayf,
centroid: tuple[float, float, float] | None = None,
invert: bool = False,
) -> NDArrayf:
"""Apply matrix to points as a 3D array with 3D array output (to improve speed in some functions)."""
# Invert matrix if required
if invert:
matrix = invert_matrix(matrix)
# First, get 4xN array, adding a column of ones for translations during matrix multiplication
points = np.concatenate((mat, np.ones((1, mat.shape[1]))))
# Temporarily subtract centroid coordinates
if centroid is not None:
points[:3, :] -= np.array(centroid)[:, None]
# Transform using matrix multiplication, and get only the first three columns
transformed_points = (matrix @ points)[:3, :]
# Add back centroid coordinates
if centroid is not None:
transformed_points += np.array(centroid)[:, None]
return transformed_points
def _apply_matrix_pts_arr(
x: NDArrayf,
y: NDArrayf,
z: NDArrayf,
matrix: NDArrayf,
centroid: tuple[float, float, float] | None = None,
invert: bool = False,
) -> tuple[NDArrayf, NDArrayf, NDArrayf]:
"""Apply matrix to points as arrays with array outputs (to improve speed in some functions)."""
# Invert matrix if required
if invert:
matrix = invert_matrix(matrix)
# First, get 4xN array, adding a column of ones for translations during matrix multiplication
points = np.vstack([x, y, z, np.ones(len(x))])
# Temporarily subtract centroid coordinates
if centroid is not None:
points[:3, :] -= np.array(centroid)[:, None]
# Transform using matrix multiplication, and get only the first three columns
transformed_points = (matrix @ points)[:3, :]
# Add back centroid coordinates
if centroid is not None:
transformed_points += np.array(centroid)[:, None]
return transformed_points[0, :], transformed_points[1, :], transformed_points[2, :]
def _apply_matrix_pts(
epc: gpd.GeoDataFrame,
matrix: NDArrayf,
invert: bool = False,
centroid: tuple[float, float, float] | None = None,
z_name: str = "z",
) -> gpd.GeoDataFrame:
"""
Apply a 3D affine transformation matrix to a 3D elevation point cloud.
:param epc: Elevation point cloud.
:param matrix: Affine (4x4) transformation matrix to apply to the DEM.
:param invert: Whether to invert the transformation matrix.
:param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations.
Defaults to the midpoint (Z=0).
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
:return: Transformed elevation point cloud.
"""
# Apply transformation to X/Y/Z arrays
tx, ty, tz = _apply_matrix_pts_arr(
x=epc.geometry.x.values,
y=epc.geometry.y.values,
z=epc[z_name].values,
matrix=matrix,
centroid=centroid,
invert=invert,
)
# Finally, transform back to a new GeoDataFrame
transformed_epc = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(x=tx, y=ty, crs=epc.crs),
data={z_name: tz},
)
return transformed_epc
def _iterate_affine_regrid_small_rotations(
dem: NDArrayf,
transform: rio.transform.Affine,
matrix: NDArrayf,
centroid: tuple[float, float, float] | None = None,
resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear",
) -> tuple[NDArrayf, rio.transform.Affine]:
"""
Iterative process to find the best reprojection of affine transformation for small rotations.
Faster than regridding point cloud by triangulation of points (for instance with scipy.interpolate.griddata).
"""
# Convert DEM to elevation point cloud, keeping all exact grid coordinates X/Y even for NaNs
dem_rst = gu.Raster.from_array(dem, transform=transform, crs=None, nodata=99999)
epc = dem_rst.to_pointcloud(data_column_name="z", skip_nodata=False).ds
# Exact affine transform of elevation point cloud (which yields irregular coordinates in 2D)
tz0 = _apply_matrix_pts_arr(
x=epc.geometry.x.values, y=epc.geometry.y.values, z=epc.z.values, matrix=matrix, centroid=centroid
)[2]
# We need to find the elevation Z of a transformed DEM at the exact grid coordinates X,Y
# Which means we need to find coordinates X',Y',Z' of the original DEM that, after the exact affine transform,
# fall exactly on regular X,Y coordinates
# 1/ The elevation of the original DEM, Z', is simply a 2D interpolator function of X',Y' (bilinear, typically)
# (We create the interpolator only once here for computational speed, instead of using Raster.interp_points)
xycoords = dem_rst.coords(grid=False)
z_interp = scipy.interpolate.RegularGridInterpolator(
points=(np.flip(xycoords[1], axis=0), xycoords[0]), values=dem, method=resampling, bounds_error=False
)
# 2/ As a first guess of a transformed DEM elevation Z near the grid coordinates, we initialize with the elevations
# of the nearest point from the transformed elevation point cloud
# OLD METHOD
# (Longest step computationally)
# with warnings.catch_warnings():
# warnings.filterwarnings("ignore", category=UserWarning, message="Geometry is in a geographic CRS.*")
# nearest = gpd.sjoin_nearest(epc, trans_epc)
#
# # In case several points are found at exactly the same distance, take the mean of their elevations
# new_z = nearest.groupby(by=nearest.index)["z_left"].mean().values
# NEW METHOD: Use the transformed elevation instead of searching for a nearest neighbour,
# is close enough for small rotations! (and only creates a couple more iterations instead of a full search)
new_z = tz0
# 3/ We then iterate between two steps until convergence:
# a/ Use the Z guess to derive invert affine transform X',Y' coordinates for the original DEM,
# b/ Interpolate Z' at new coordinates X',Y' on the original DEM, and apply affine transform to get updated Z guess
# Start with full array of X/Y regular coordinates (subset during iterations to improve computational efficiency)
x = epc.geometry.x.values
y = epc.geometry.y.values
# Initialize output z array, and array to store points that have converged
zfinal = np.ones(len(x), dtype=dem.dtype)
ind_converged = np.zeros(len(x), dtype=bool)
# For small rotations, and large DEMs (elevation range smaller than the DEM extent), this converges fast
max_niter = 20 # Maximum iteration number
niter_check = 5 # Number of iterations between residual checks
tolerance = 10 ** (-4) # Tolerance in X/Y relative to resolution of X/Y
res_x = dem_rst.res[0] # Resolution in X
res_y = dem_rst.res[1] # Resolution in Y
niter = 1 # Starting iteration
while niter < max_niter:
# Invert X,Y (exact grid coordinates) with Z guess to find X',Y' coordinates on original DEM
tx, ty = _apply_matrix_pts_arr(x=x, y=y, z=new_z, matrix=matrix, invert=True, centroid=centroid)[:2]
# Interpolate original DEM at X', Y' to get Z', and convert to point cloud
tz = z_interp((ty, tx))
# Transform to see if we fall back on our feet (on the regular grid), or if we need to iterate more
x0, y0, z0 = _apply_matrix_pts_arr(x=tx, y=ty, z=tz, matrix=matrix, centroid=centroid)
# Only check residuals after first iteration (to remove NaNs) then every 5 iterations to reduce computing time
if niter == 1 or niter == niter_check:
# Compute difference between exact grid coordinates and current coordinates, and stop if tolerance reached
diff_x = x0 - x
diff_y = y0 - y
logging.debug(
"Residual check at iteration number %d:" "\n Mean diff x: %f" "\n Mean diff y: %f",
niter,
np.nanmean(np.abs(diff_x)),
np.nanmean(np.abs(diff_y)),
)
# Get index of points below tolerance in both X/Y for this subsample (all points before convergence update)
# Nodata values are considered having converged
subind_diff_x = np.logical_or(np.abs(diff_x) < (tolerance * res_x), ~np.isfinite(diff_x))
subind_diff_y = np.logical_or(np.abs(diff_y) < (tolerance * res_y), ~np.isfinite(diff_y))
subind_converged = np.logical_and(subind_diff_x, subind_diff_y)
logging.debug(
" Points not within tolerance: %d for X; %d for Y",
np.count_nonzero(~subind_diff_x),
np.count_nonzero(~subind_diff_y),
)
# If all points left are below convergence, update Z one final time and stop here
if all(subind_converged):
zfinal[~ind_converged] = z0
break
# Otherwise, save Z for new converged points and keep only not converged in next iterations (for speed)
else:
zfinal[~ind_converged] = z0
x = x[~subind_converged]
y = y[~subind_converged]
z0 = z0[~subind_converged]
# Otherwise, for this check, update convergence status for points not having converged yet
ind_converged[~ind_converged] = subind_converged
# If another iteration is required, update Z guess and increment
new_z = z0
niter += 1
# 4/ Write the regular-grid point cloud back into a raster
epc.z = zfinal # We just replace the Z of the original grid to ensure exact coordinates
transformed_dem = dem_rst.from_pointcloud_regular(
epc, transform=transform, shape=dem.shape, data_column_name="z", nodata=-99999
)
return transformed_dem.data.filled(np.nan), transform
def _apply_matrix_rst(
dem: NDArrayf,
transform: rio.transform.Affine,
matrix: NDArrayf,
invert: bool = False,
centroid: tuple[float, float, float] | None = None,
resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear",
force_regrid_method: Literal["iterative", "griddata"] | None = None,
) -> tuple[NDArrayf, rio.transform.Affine]:
"""
Apply a 3D affine transformation matrix to a 2.5D DEM.
See details in description of apply_matrix().
:param dem: DEM to transform.
:param transform: Geotransform of the DEM.
:param matrix: Affine (4x4) transformation matrix to apply to the DEM.
:param invert: Whether to invert the transformation matrix.
:param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations.
Defaults to the midpoint (Z=0).
:param resampling: Point interpolation method, one of 'nearest', 'linear', 'cubic', or 'quintic'. For more
information, see scipy.ndimage.map_coordinates and scipy.interpolate.interpn. Default is linear.
:param force_regrid_method: Force re-gridding method to convert 3D point cloud to 2.5 DEM, only for testing.
:returns: Transformed DEM, Transform.
"""
# Invert matrix if required
if invert:
matrix = invert_matrix(matrix)
# Check DEM has valid values
if np.count_nonzero(np.isfinite(dem)) == 0:
raise ValueError("Input DEM has all nans.")
shift_z_only_matrix = np.diag(np.ones(4, float))
shift_z_only_matrix[2, 3] = matrix[2, 3]
shift_only_matrix = np.diag(np.ones(4, float))
shift_only_matrix[:3, 3] = matrix[:3, 3]
# 1/ Check if the matrix only contains a Z correction, in that case only shift the DEM values by the vertical shift
if np.array_equal(shift_z_only_matrix, matrix) and force_regrid_method is None:
return dem + matrix[2, 3], transform
# 2/ Check if the matrix contains only translations, in that case only shift the DEM only by translation
if np.array_equal(shift_only_matrix, matrix) and force_regrid_method is None:
new_transform = _translate(transform, xoff=matrix[0, 3], yoff=matrix[1, 3])
return dem + matrix[2, 3], new_transform
# 3/ If matrix contains only small rotations (less than 20 degrees), use the fast iterative reprojection
rotations = translations_rotations_from_matrix(matrix)[3:]
if all(np.abs(rot) < 20 for rot in rotations) and force_regrid_method is None or force_regrid_method == "iterative":
new_dem, transform = _iterate_affine_regrid_small_rotations(
dem=dem, transform=transform, matrix=matrix, centroid=centroid, resampling=resampling
)
return new_dem, transform
# 4/ Otherwise, use a delauney triangulation interpolation of the transformed point cloud
# Convert DEM to elevation point cloud, keeping all exact grid coordinates X/Y even for NaNs
dem_rst = gu.Raster.from_array(dem, transform=transform, crs=None, nodata=99999)
epc = dem_rst.to_pointcloud(data_column_name="z").ds
trans_epc = _apply_matrix_pts(epc, matrix=matrix, centroid=centroid)
new_dem = _grid_pointcloud(
trans_epc, grid_coords=dem_rst.coords(grid=False), data_column_name="z", resampling=resampling
)[0]
return new_dem, transform
@overload
def _reproject_horizontal_shift_samecrs(
raster_arr: NDArrayf,
src_transform: rio.transform.Affine,
dst_transform: rio.transform.Affine = None,
*,
return_interpolator: Literal[False] = False,
resampling: Literal["nearest", "linear", "cubic", "quintic", "slinear", "pchip", "splinef2d"] = "linear",
) -> NDArrayf: ...
@overload
def _reproject_horizontal_shift_samecrs(
raster_arr: NDArrayf,
src_transform: rio.transform.Affine,
dst_transform: rio.transform.Affine = None,
*,
return_interpolator: Literal[True],
resampling: Literal["nearest", "linear", "cubic", "quintic", "slinear", "pchip", "splinef2d"] = "linear",
) -> Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf]: ...
def _reproject_horizontal_shift_samecrs(
raster_arr: NDArrayf,
src_transform: rio.transform.Affine,
dst_transform: rio.transform.Affine = None,
return_interpolator: bool = False,
resampling: Literal["nearest", "linear", "cubic", "quintic", "slinear", "pchip", "splinef2d"] = "linear",
) -> NDArrayf | Callable[[tuple[NDArrayf, NDArrayf]], NDArrayf]:
"""
Reproject a raster only for a horizontal shift (transform update) in the same CRS.
This function exists independently of Raster.reproject() because Rasterio has unexplained reprojection issues
that can create non-negligible sub-pixel shifts that should be crucially avoided for coregistration.
See https://github.com/rasterio/rasterio/issues/2052#issuecomment-2078732477.
Here we use SciPy interpolation instead, modified for nodata propagation in geoutils.interp_points().
"""
# We are reprojecting the raster array relative to itself without changing its pixel interpretation, so we can
# force any pixel interpretation (area_or_point) without it having any influence on the result, here "Area"
if not return_interpolator:
coords_dst = _coords(transform=dst_transform, area_or_point="Area", shape=raster_arr.shape)
# Flatten the arrays (only 1D supported in rowcol/xy after Rasterio 1.4)
coords_dst = (coords_dst[0].ravel(), coords_dst[1].ravel())
# If we just want the interpolator, we don't need to coordinates of destination points
else:
coords_dst = None
output = _interp_points(
array=raster_arr,
area_or_point="Area",
transform=src_transform,
points=coords_dst,
method=resampling,
return_interpolator=return_interpolator,
)
# Reshape output
if coords_dst is not None:
output = output.reshape(np.shape(raster_arr))
return output
@overload
def apply_matrix(
elev: NDArrayf,
matrix: NDArrayf,
invert: bool = False,
centroid: tuple[float, float, float] | None = None,
resample: bool = True,
resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear",
transform: rio.transform.Affine = None,
z_name: str = "z",
**kwargs: Any,
) -> tuple[NDArrayf, affine.Affine]: ...
@overload
def apply_matrix(
elev: gu.Raster | gpd.GeoDataFrame,
matrix: NDArrayf,
invert: bool = False,
centroid: tuple[float, float, float] | None = None,
resample: bool = True,
resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear",
transform: rio.transform.Affine = None,
z_name: str = "z",
**kwargs: Any,
) -> gu.Raster | gpd.GeoDataFrame: ...
[docs]
def apply_matrix(
elev: gu.Raster | NDArrayf | gpd.GeoDataFrame,
matrix: NDArrayf,
invert: bool = False,
centroid: tuple[float, float, float] | None = None,
resample: bool = True,
resampling: Literal["nearest", "linear", "cubic", "quintic"] = "linear",
transform: rio.transform.Affine = None,
z_name: str = "z",
**kwargs: Any,
) -> tuple[NDArrayf, affine.Affine] | gu.Raster | gpd.GeoDataFrame:
"""
Apply a 3D affine transformation matrix to a 3D elevation point cloud or 2.5D DEM.
For an elevation point cloud, the transformation is exact.
For a DEM, it requires re-gridding because the affine-transformed point cloud of the DEM does not fall onto a
regular grid anymore (except if the affine transformation only has translations). For this, this function uses the
three following methods:
1. For transformations with only translations, the transform is updated and vertical shift added to the array,
2. For transformations with a small rotation (20 degrees or less for all axes), this function maps which 2D
point coordinates will fall back exactly onto the original DEM grid coordinates after affine transformation by
searching iteratively using the invert affine transformation and 2D point regular-grid interpolation on the
original DEM (see geoutils.Raster.interp_points, or scipy.interpolate.interpn),
3. For transformations with large rotations (20 degrees or more), scipy.interpolate.griddata is used to
re-grid the irregular affine-transformed 3D point cloud using Delauney triangulation interpolation (slower).
:param elev: Elevation point cloud or DEM to transform, either a 2D array (requires transform) or
geodataframe (requires z_name).
:param matrix: Affine (4x4) transformation matrix to apply to the DEM.
:param invert: Whether to invert the transformation matrix.
:param centroid: The X/Y/Z transformation centroid. Irrelevant for pure translations.
Defaults to the midpoint (Z=0).
:param resample: (For translations) If set to True, will resample output on the translated grid to match the input
transform. Otherwise, only the transform will be updated and no resampling is done.
:param resampling: Point interpolation method, one of 'nearest', 'linear', 'cubic', or 'quintic'. For more
information, see scipy.ndimage.map_coordinates and scipy.interpolate.interpn. Default is linear.
:param transform: Geotransform of the DEM, only for DEM passed as 2D array.
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
:param kwargs: Keywords passed to _apply_matrix_rst for testing.
:return: Affine transformed elevation point cloud or DEM.
"""
# Apply matrix to elevation point cloud
if isinstance(elev, gpd.GeoDataFrame):
return _apply_matrix_pts(epc=elev, matrix=matrix, invert=invert, centroid=centroid, z_name=z_name)
# Or apply matrix to raster (often requires re-gridding)
else:
# First, we apply the affine matrix for the array/transform
if isinstance(elev, gu.Raster):
transform = elev.transform
dem = elev.data.filled(np.nan)
else:
dem = elev
applied_dem, out_transform = _apply_matrix_rst(
dem=dem,
transform=transform,
matrix=matrix,
invert=invert,
centroid=centroid,
resampling=resampling,
**kwargs,
)
# Then, if resample is True, we reproject the DEM from its out_transform onto the transform
if resample:
applied_dem = _reproject_horizontal_shift_samecrs(
applied_dem, src_transform=out_transform, dst_transform=transform, resampling=resampling
)
out_transform = transform
# We return a raster if input was a raster
if isinstance(elev, gu.Raster):
applied_dem = gu.Raster.from_array(applied_dem, out_transform, elev.crs, elev.nodata)
return applied_dem
return applied_dem, out_transform
###########################################
# Generic coregistration processing classes
###########################################
class NotImplementedCoregFit(NotImplementedError):
"""
Error subclass for not implemented coregistration fit methods; mainly to differentiate with NotImplementedError
"""
class NotImplementedCoregApply(NotImplementedError):
"""
Error subclass for not implemented coregistration fit methods; mainly to differentiate with NotImplementedError
"""
class InRandomDict(TypedDict, total=False):
"""Keys and types of inputs associated with randomization and subsampling."""
# Subsample size input by user
subsample: int | float
# Random state (for subsampling, but also possibly for some fitting methods)
random_state: int | np.random.Generator | None
class OutRandomDict(TypedDict, total=False):
"""Keys and types of outputs associated with randomization and subsampling."""
# Final subsample size available from valid data
subsample_final: int
class InFitOrBinDict(TypedDict, total=False):
"""Keys and types of inputs associated with binning and/or fitting."""
# Whether to fit, bin or bin then fit
fit_or_bin: Literal["fit", "bin", "bin_and_fit"]
# Fit parameters: function to fit and optimizer
fit_func: Callable[..., NDArrayf]
fit_optimizer: Callable[..., tuple[NDArrayf, Any]]
# TODO: Solve redundancy between optimizer and minimizer (curve_fit or minimize as default?)
# For a minimization problem
fit_minimizer: Callable[..., tuple[NDArrayf, Any]]
fit_loss_func: Callable[[NDArrayf], np.floating[Any]]
# Bin parameters: bin sizes, statistic and apply method
bin_sizes: int | dict[str, int | Iterable[float]]
bin_statistic: Callable[[NDArrayf], np.floating[Any]]
bin_apply_method: Literal["linear", "per_bin"]
# Name of variables, and number of dimensions
bias_var_names: list[str]
nd: int | None
class OutFitOrBinDict(TypedDict, total=False):
"""Keys and types of outputs associated with binning and/or fitting."""
# Optimized parameters for fitted function, and its error
fit_params: NDArrayf
fit_perr: NDArrayf
# Binning dataframe
bin_dataframe: pd.DataFrame
class InIterativeDict(TypedDict, total=False):
"""Keys and types of inputs associated with iterative methods."""
# Maximum number of iterations
max_iterations: int
# Tolerance at which to stop algorithm (unit specified in method)
tolerance: float
class OutIterativeDict(TypedDict, total=False):
"""Keys and types of outputs associated with iterative methods."""
# Iteration at which the algorithm stopped
last_iteration: int
# Tolerances of each iteration until threshold
all_tolerances: list[float]
class InSpecificDict(TypedDict, total=False):
"""Keys and types of inputs associated with specific methods."""
# (Using TerrainBias) Selected terrain attribute
terrain_attribute: str
# (Using DirectionalBias) Angle for directional correction
angle: float
# (Using Deramp) Polynomial order selected for deramping
poly_order: int
# (Using ICP) Method type to compute 3D distances
icp_method: Literal["point-to-point", "point-to-plane"]
# (Using ICP) Picky selection of closest pairs
icp_picky: bool
# (Using CPD) Weight for outlier removal
cpd_weight: float
class OutSpecificDict(TypedDict, total=False):
"""Keys and types of outputs associated with specific methods."""
# (Using multi-order polynomial fit) Best performing polynomial order
best_poly_order: int
# (Using multi-frequency sum of sinusoids fit) Best performing number of frequencies
best_nb_sin_freq: int
class InAffineDict(TypedDict, total=False):
"""Keys and types of inputs associated with affine methods."""
# Estimated initial shift (z currently always equal to zero)
initial_shift: tuple[float, float, float] | None
# Vertical shift reduction function for methods focusing on translation coregistration
vshift_reduc_func: Callable[[NDArrayf], np.floating[Any]]
# Vertical shift activated
apply_vshift: bool
# Apply coregistration method only for translations
only_translation: bool
# Standardize input data for numerics
standardize: bool
class OutAffineDict(TypedDict, total=False):
"""Keys and types of outputs associated with affine methods."""
# Output common to all affine transforms
centroid: tuple[float, float, float]
matrix: NDArrayf
# For translation methods
shift_x: float
shift_y: float
shift_z: float
class InputCoregDict(TypedDict, total=False):
random: InRandomDict
fitorbin: InFitOrBinDict
iterative: InIterativeDict
specific: InSpecificDict
affine: InAffineDict
class OutputCoregDict(TypedDict, total=False):
random: OutRandomDict
fitorbin: OutFitOrBinDict
iterative: OutIterativeDict
specific: OutSpecificDict
affine: OutAffineDict
class CoregDict(TypedDict, total=False):
"""
Defining the type of each possible key in the metadata dictionary of Coreg classes.
The parameter total=False means that the key are not required. In the recent PEP 655 (
https://peps.python.org/pep-0655/) there is an easy way to specific Required or NotRequired for each key, if we
want to change this in the future.
"""
# For a classic coregistration
inputs: InputCoregDict
outputs: OutputCoregDict
# For pipelines and blocks
# TODO: Move out to separate TypedDict?
step_meta: list[Any]
pipeline: list[Any]
CoregType = TypeVar("CoregType", bound="Coreg")
[docs]
class Coreg:
"""
Generic co-registration processing class.
Used to implement methods common to all processing steps (rigid alignment, bias corrections, filtering).
Those are: instantiation, copying and addition (which casts to a Pipeline object).
Made to be subclassed.
"""
_fit_called: bool = False # Flag to check if the .fit() method has been called.
_is_affine: bool | None = None
_is_translation: bool | None = None
_needs_vars: bool = False
_meta: CoregDict
[docs]
def __init__(self, meta: dict[str, Any] | None = None) -> None:
"""Instantiate a generic processing step method."""
# Automatically sort input keys into their appropriate nested level using only the TypedDicts defined
# above which make up the CoregDict altogether
dict_meta = CoregDict(inputs={}, outputs={})
if meta is not None:
# First, we get the typed dictionary keys ("random", "fitorbin", etc),
# this is a typing class so requires to get its keys in __annotations__
list_input_levels = list(InputCoregDict.__annotations__.keys())
# Then the list of keys per level, getting the nested class value for each key (via __forward_arg__)
keys_per_level = [
list(globals()[InputCoregDict.__annotations__[lv].__forward_arg__].__annotations__.keys())
for lv in list_input_levels
]
# Join all keys for input check
all_keys = [k for lv in keys_per_level for k in lv]
for k in meta.keys():
if k not in all_keys:
raise ValueError(
f"Coregistration metadata key {k} is not supported. " f"Should be one of {', '.join(all_keys)}"
)
# Add keys to inputs
for k, v in meta.items():
for i, lv in enumerate(list_input_levels):
# If level does not exist, create it
if lv not in dict_meta["inputs"]:
dict_meta["inputs"].update({lv: {}}) # type: ignore
# If key exist, write and continue
if k in keys_per_level[i]:
dict_meta["inputs"][lv][k] = v # type: ignore
continue
self._meta: CoregDict = dict_meta
def copy(self: CoregType) -> CoregType:
"""Return an identical copy of the class."""
new_coreg = self.__new__(type(self))
# Need a deepcopy for dictionaries, or it would just point towards the copied coreg
new_coreg.__dict__ = {key: copy.deepcopy(value) for key, value in self.__dict__.items()}
return new_coreg
def __add__(self, other: CoregType) -> CoregPipeline:
"""Return a pipeline consisting of self and the other processing function."""
if not isinstance(other, Coreg):
raise ValueError(f"Incompatible add type: {type(other)}. Expected 'Coreg' subclass")
# Cancel possible initial shift(s) in CoregPipeline case
if "affine" in self.meta["inputs"] and "initial_shift" in self.meta["inputs"]["affine"]:
del self.meta["inputs"]["affine"]["initial_shift"]
if "affine" in other.meta["inputs"] and "initial_shift" in other.meta["inputs"]["affine"]:
del other.meta["inputs"]["affine"]["initial_shift"]
return CoregPipeline([self, other])
@property
def is_affine(self) -> bool:
"""Check if the transform be explained by a 3D affine transform."""
# _is_affine is found by seeing if to_matrix() raises an error.
# If this hasn't been done yet, it will be None
if self._is_affine is None:
try: # See if to_matrix() raises an error.
self.to_matrix()
self._is_affine = True
except (AttributeError, ValueError, NotImplementedError):
self._is_affine = False
return self._is_affine
@property
def is_translation(self) -> bool | None:
# If matrix exists in keys, or can be derived from to_matrix(), we conclude
if "matrix" in self._meta["outputs"]["affine"].keys():
matrix = self._meta["outputs"]["affine"]["matrix"]
else:
try:
matrix = self.to_matrix()
# Otherwise we can't yet and return None
except (AttributeError, ValueError, NotImplementedError):
self._is_translation = None
return None
# If the 3x3 rotation sub-matrix is the identity matrix, we have a translation
return np.allclose(matrix[:3, :3], np.diag(np.ones(3)), rtol=10e-3)
@property
def meta(self) -> CoregDict:
"""Metadata dictionary of the coregistration."""
return self._meta
@overload
def info(self, as_str: Literal[False] = ...) -> None: ...
@overload
def info(self, as_str: Literal[True]) -> str: ...
[docs]
def info(self, as_str: bool = False) -> None | str:
"""Summarize information about this coregistration."""
# Define max tabulation: longest name + 2 spaces
tab = np.max([len(v) for v in dict_key_to_str.values()]) + 2
# Get list of existing deepest level keys in this coreg metadata
def recursive_items(dictionary: Mapping[str, Any]) -> Iterable[tuple[str, Any]]:
for key, value in dictionary.items():
if type(value) is dict:
yield from recursive_items(value)
else:
yield (key, value)
existing_deep_keys = [k for k, v in recursive_items(self._meta)]
# Formatting function for key values, rounding up digits for numbers and returning function names
def format_coregdict_values(val: Any, tab: int) -> str:
"""
Format coregdict values for printing.
:param val: Input value.
:param tab: Tabulation (if value is printed on multiple lines).
:return: String representing input value.
"""
# Function to get decimal to round to a certain number of digits relative to magnitude, for floating numbers
def dec_round_to_n(x: float | np.floating[Any], n: int) -> int:
return -int(np.floor(np.log10(np.abs(x)))) + (n - 1)
# Different formatting to string depending on key value type
if isinstance(val, (float, np.floating)):
if np.isfinite(val):
str_val = str(round(val, dec_round_to_n(val, 3)))
else:
str_val = str(val)
elif isinstance(val, np.ndarray):
min_val = np.min(val)
str_val = str(np.round(val, decimals=dec_round_to_n(min_val, 3)))
elif callable(val):
str_val = val.__name__
else:
str_val = str(val)
# Add tabulation if string has a return to line
if "\n" in str_val:
str_val = "\n".ljust(tab).join(str_val.split("\n"))
return str_val
# Sublevels of metadata to show
sublevels = {
"random": "Randomization",
"fitorbin": "Fitting and binning",
"affine": "Affine",
"iterative": "Iterative",
"specific": "Specific",
}
header_str = [
"Generic coregistration information \n",
f" Method: {self.__class__.__name__} \n",
f" Is affine? {self.is_affine} \n",
f" Fit called? {self._fit_called} \n",
]
# Add lines for inputs
inputs_str = [
"Inputs\n",
]
for lk, lv in sublevels.items():
if lk in self._meta["inputs"].keys():
existing_level_keys = [
(k, v) for k, v in self._meta["inputs"][lk].items() if k in existing_deep_keys # type: ignore
]
if len(existing_level_keys) > 0:
inputs_str += [f" {lv}\n"]
inputs_str += [
f" {dict_key_to_str[k]}:".ljust(tab) + f"{format_coregdict_values(v, tab)}\n"
for k, v in existing_level_keys
]
# And for outputs
outputs_str = ["Outputs\n"]
# If dict not empty
if self._meta["outputs"]:
for lk, lv in sublevels.items():
if lk in self._meta["outputs"].keys():
existing_level_keys = [
(k, v) for k, v in self._meta["outputs"][lk].items() if k in existing_deep_keys # type: ignore
]
if len(existing_level_keys) > 0:
outputs_str += [f" {lv}\n"]
outputs_str += [
f" {dict_key_to_str[k]}:".ljust(tab) + f"{format_coregdict_values(v, tab)}\n"
for k, v in existing_level_keys
]
elif not self._fit_called:
outputs_str += [" None yet (fit not called)"]
# Not sure this case can happen, but just in case
else:
outputs_str += [" None"]
# Combine into final string
final_str = header_str + inputs_str + outputs_str
# Return as string or print (default)
if as_str:
return "".join(final_str)
else:
print("".join(final_str))
return None
def _get_subsample_on_valid_mask(self, valid_mask: NDArrayb) -> NDArrayb:
"""
Get mask of values to subsample on valid mask.
:param valid_mask: Raster of valid values (inlier and not nodata).
"""
# Get random parameters
params_random = self._meta["inputs"]["random"]
# Derive subsampling mask
sub_mask = _get_subsample_on_valid_mask(
params_random=params_random,
valid_mask=valid_mask,
)
# Write final subsample to class
self._meta["outputs"]["random"] = {"subsample_final": int(np.count_nonzero(sub_mask))}
return sub_mask
def _preprocess_rst_pts_subsample(
self,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
aux_vars: dict[str, NDArrayf] | None = None,
weights: NDArrayf | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str = "z",
) -> tuple[NDArrayf, NDArrayf, None | dict[str, NDArrayf]]:
"""
Pre-process raster-raster or point-raster datasets into 1D arrays subsampled at the same points
(and interpolated in the case of point-raster input).
Return 1D arrays of reference elevation, to-be-aligned elevation and dictionary of 1D arrays of auxiliary
variables at subsampled points.
"""
# Get random parameters
params_random: InRandomDict = self._meta["inputs"]["random"]
# Get subsample mask (a 2D array for raster-raster, a 1D array of length the point data for point-raster)
sub_mask = _get_subsample_mask_pts_rst(
params_random=params_random,
ref_elev=ref_elev,
tba_elev=tba_elev,
inlier_mask=inlier_mask,
transform=transform,
area_or_point=area_or_point,
z_name=z_name,
aux_vars=aux_vars,
)
# Perform subsampling on mask for all inputs
sub_ref, sub_tba, sub_bias_vars, _ = _subsample_on_mask(
ref_elev=ref_elev,
tba_elev=tba_elev,
aux_vars=aux_vars,
sub_mask=sub_mask,
transform=transform,
area_or_point=area_or_point,
z_name=z_name,
)
# Write final subsample to class
self._meta["outputs"]["random"] = {"subsample_final": int(np.count_nonzero(sub_mask))}
return sub_ref, sub_tba, sub_bias_vars
[docs]
def fit(
self: CoregType,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str | None = None,
random_state: int | np.random.Generator | None = None,
**kwargs: Any,
) -> CoregType: # type: ignore
"""
Estimate the coregistration transform on the given DEMs.
:param reference_elev: Reference elevation, either a DEM or an elevation point cloud.
:param to_be_aligned_elev: To-be-aligned elevation, either a DEM or an elevation point cloud.
:param inlier_mask: Raster or boolean array of areas to include (inliers=True).
:param bias_vars: Auxiliary variables for certain bias correction classes, as raster or arrays.
:param weights: Array of weights for the coregistration.
:param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
:param transform: Transform of the reference elevation, only if provided as 2D array.
:param crs: CRS of the reference elevation, only if provided as 2D array.
:param area_or_point: Pixel interpretation of the DEMs, only if provided as 2D arrays.
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
:param random_state: Random state or seed number to use for calculations (to fix random sampling).
"""
if weights is not None:
raise NotImplementedError("Weights have not yet been implemented")
# Override subsample argument of instantiation if passed to fit
if subsample is not None:
# Check if subsample argument was also defined at instantiation (not default value), and raise warning
argspec = inspect.getfullargspec(self.__class__)
sub_meta = self._meta["inputs"]["random"]["subsample"]
if argspec.defaults is None or "subsample" not in argspec.args:
raise ValueError("The subsample argument and default need to be defined in this Coreg class.")
sub_is_default = argspec.defaults[argspec.args.index("subsample") - 1] == sub_meta # type: ignore
if not sub_is_default:
warnings.warn(
"Subsample argument passed to fit() will override non-default subsample value defined at "
"instantiation. To silence this warning: only define 'subsample' in either fit(subsample=...) or "
"instantiation e.g. VerticalShift(subsample=...)."
)
# In any case, override!
self._meta["inputs"]["random"]["subsample"] = subsample
# Save random_state if a subsample is used
if self._meta["inputs"]["random"]["subsample"] != 1:
self._meta["inputs"]["random"]["random_state"] = random_state
# Apply the shift to the source dem if given
initial_shift_apply = False
if self._meta["inputs"]["affine"].get("initial_shift") is not None:
shift_x = self._meta["inputs"]["affine"]["initial_shift"][0] # type: ignore
shift_y = self._meta["inputs"]["affine"]["initial_shift"][1] # type: ignore
# shift_z is currently always equal to zero
reference_elev = reference_elev.translate(-shift_x, -shift_y) # type: ignore
initial_shift_apply = True
# Pre-process the inputs, by reprojecting and converting to arrays
ref_elev, tba_elev, inlier_mask, transform, crs, area_or_point, z_name = _preprocess_coreg_fit(
reference_elev=reference_elev,
to_be_aligned_elev=to_be_aligned_elev,
inlier_mask=inlier_mask,
transform=transform,
crs=crs,
area_or_point=area_or_point,
z_name=z_name,
)
main_args = {
"ref_elev": ref_elev,
"tba_elev": tba_elev,
"inlier_mask": inlier_mask,
"transform": transform,
"crs": crs,
"area_or_point": area_or_point,
"z_name": z_name,
"weights": weights,
}
# If bias_vars are defined, update dictionary content to array
if bias_vars is not None:
# Check if the current class actually requires bias_vars
if self._is_affine:
warnings.warn("This coregistration method is affine, ignoring `bias_vars` passed to fit().")
for var in bias_vars.keys():
bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0]
main_args.update({"bias_vars": bias_vars})
# Run the associated fitting function, which has fallback logic for "raster-raster", "raster-point" or
# "point-point" depending on what is available for a certain Coreg function
self._fit_func(
**main_args,
**kwargs,
)
# Unapply the shift to the source dem if apply before
if initial_shift_apply and "outputs" in self.meta and "affine" in self.meta["outputs"]:
if "shift_x" in self.meta["outputs"]["affine"]:
self.meta["outputs"]["affine"]["shift_x"] += shift_x
logging.debug(f"Updated shift_x by {shift_x} in {self}")
if "shift_y" in self.meta["outputs"]["affine"]:
self.meta["outputs"]["affine"]["shift_y"] += shift_y
logging.debug(f"Updated shift_y by {shift_y} in {self}")
# Flag that the fitting function has been called.
self._fit_called = True
return self
@overload
def apply(
self,
elev: MArrayf,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> tuple[MArrayf, rio.transform.Affine]: ...
@overload
def apply(
self,
elev: NDArrayf,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> tuple[NDArrayf, rio.transform.Affine]: ...
@overload
def apply(
self,
elev: RasterType | gpd.GeoDataFrame | PointCloudType,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> RasterType | gpd.GeoDataFrame | PointCloudType: ...
[docs]
def apply(
self,
elev: MArrayf | NDArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> (
RasterType
| gpd.GeoDataFrame
| PointCloudType
| tuple[NDArrayf, rio.transform.Affine]
| tuple[MArrayf, rio.transform.Affine]
):
"""
Apply the estimated transform to a DEM.
:param elev: Elevation to apply the transform to, either a DEM or an elevation point cloud.
:param bias_vars: Only for some bias correction classes. 2D array of bias variables used.
:param resample: If set to True, will reproject output Raster on the same grid as input. Otherwise, \
only the transform might be updated and no resampling is done.
:param resampling: Resampling method if resample is used. Defaults to "bilinear".
:param transform: Geotransform of the elevation, only if provided as 2D array.
:param crs: CRS of elevation, only if provided as 2D array.
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
:param kwargs: Any optional arguments to be passed to either self._apply_rst or apply_matrix.
:returns: The transformed DEM.
"""
if not self._fit_called and self._meta["outputs"]["affine"].get("matrix") is None:
raise AssertionError(".fit() does not seem to have been called yet")
elev_array, transform, crs, z_name = _preprocess_coreg_apply(
elev=elev, transform=transform, crs=crs, z_name=z_name
)
main_args = {"elev": elev_array, "transform": transform, "crs": crs, "resample": resample, "z_name": z_name}
# If bias_vars are defined, update dictionary content to array
if bias_vars is not None:
# Check if the current class actually requires bias_vars
if self._is_affine:
warnings.warn("This coregistration method is affine, ignoring `bias_vars` passed to apply().")
for var in bias_vars.keys():
bias_vars[var] = gu.raster.get_array_and_mask(bias_vars[var])[0]
main_args.update({"bias_vars": bias_vars})
# Call _apply_func to choose method depending on point/raster input and if specific apply method exists
applied_elev, out_transform = self._apply_func(**main_args, **kwargs)
# Post-process output depending on input type
applied_elev, out_transform = _postprocess_coreg_apply(
elev=elev,
applied_elev=applied_elev,
transform=transform,
out_transform=out_transform,
crs=crs,
resample=resample,
resampling=resampling,
)
# Only return object if raster or geodataframe, also return transform if object was an array
if isinstance(applied_elev, (Raster, gpd.GeoDataFrame, PointCloud)):
return applied_elev
else:
return applied_elev, out_transform
@overload
def fit_and_apply(
self,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: MArrayf,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str = "z",
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
random_state: int | np.random.Generator | None = None,
fit_kwargs: dict[str, Any] | None = None,
apply_kwargs: dict[str, Any] | None = None,
) -> tuple[MArrayf, rio.transform.Affine]: ...
@overload
def fit_and_apply(
self,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: NDArrayf,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str = "z",
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
random_state: int | np.random.Generator | None = None,
fit_kwargs: dict[str, Any] | None = None,
apply_kwargs: dict[str, Any] | None = None,
) -> tuple[NDArrayf, rio.transform.Affine]: ...
@overload
def fit_and_apply(
self,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: RasterType | gpd.GeoDataFrame | PointCloudType,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str = "z",
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
random_state: int | np.random.Generator | None = None,
fit_kwargs: dict[str, Any] | None = None,
apply_kwargs: dict[str, Any] | None = None,
) -> RasterType | gpd.GeoDataFrame: ...
[docs]
@profiler.profile("xdem.coreg.base.fit_and_apply", memprof=True)
def fit_and_apply(
self,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str = "z",
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
random_state: int | np.random.Generator | None = None,
fit_kwargs: dict[str, Any] | None = None,
apply_kwargs: dict[str, Any] | None = None,
) -> RasterType | gpd.GeoDataFrame | tuple[NDArrayf, rio.transform.Affine] | tuple[MArrayf, rio.transform.Affine]:
"""Estimate and apply the coregistration to a pair of elevation data.
:param reference_elev: Reference elevation, either a DEM or an elevation point cloud.
:param to_be_aligned_elev: To-be-aligned elevation, either a DEM or an elevation point cloud.
:param inlier_mask: Raster or boolean array of areas to include (inliers=True).
:param bias_vars: Auxiliary variables for certain bias correction classes, as raster or arrays.
:param weights: Array of weights for the coregistration.
:param subsample: Subsample the input to increase performance. <1 is parsed as a fraction. >1 is a pixel count.
:param transform: Transform of the reference elevation, only if provided as 2D array.
:param crs: CRS of the reference elevation, only if provided as 2D array.
:param area_or_point: Pixel interpretation of the DEMs, only if provided as 2D arrays.
:param z_name: Column name to use as elevation, only for point elevation data passed as geodataframe.
:param resample: If set to True, will reproject output Raster on the same grid as input. Otherwise, \
only the transform might be updated and no resampling is done.
:param resampling: Resampling method if resample is used. Defaults to "bilinear".
:param random_state: Random state or seed number to use for calculations (to fix random sampling).
:param fit_kwargs: Keyword arguments to be passed to fit.
:param apply_kwargs: Keyword argument to be passed to apply.
"""
if fit_kwargs is None:
fit_kwargs = {}
if apply_kwargs is None:
apply_kwargs = {}
self.fit(
reference_elev=reference_elev,
to_be_aligned_elev=to_be_aligned_elev,
inlier_mask=inlier_mask,
bias_vars=bias_vars,
weights=weights,
subsample=subsample,
transform=transform,
crs=crs,
area_or_point=area_or_point,
z_name=z_name,
random_state=random_state,
**fit_kwargs,
)
aligned_dem = self.apply(
elev=to_be_aligned_elev,
bias_vars=bias_vars,
resample=resample,
resampling=resampling,
transform=transform,
crs=crs,
z_name=z_name,
**apply_kwargs,
)
return aligned_dem
def _fit_func(
self,
**kwargs: Any,
) -> None:
"""
Distribute to _fit_rst_rst, fit_rst_pts or fit_pts_pts depending on input and method availability.
Needs to be _fit_func of the main class to simplify calls from CoregPipeline and BlockwiseCoreg.
"""
# Determine if input is raster-raster, raster-point or point-point
if all(isinstance(dem, np.ndarray) for dem in (kwargs["ref_elev"], kwargs["tba_elev"])):
rop = "r-r"
elif all(isinstance(dem, gpd.GeoDataFrame) for dem in (kwargs["ref_elev"], kwargs["tba_elev"])):
rop = "p-p"
else:
rop = "r-p"
# Fallback logic is always the same: 1/ raster-raster, 2/ raster-point, 3/ point-point
try_rp = False
try_pp = False
# For raster-raster
if rop == "r-r":
# Check if raster-raster function exists, if yes run it and stop
try:
self._fit_rst_rst(**kwargs)
# Otherwise, convert the tba raster to points and try raster-points
except NotImplementedCoregFit:
warnings.warn(
f"No raster-raster method found for coregistration {self.__class__.__name__}, "
f"trying raster-point method by converting to-be-aligned DEM to points.",
UserWarning,
)
tba_elev_pts = (
gu.Raster.from_array(data=kwargs["tba_elev"], transform=kwargs["transform"], crs=kwargs["crs"])
.to_pointcloud()
.ds
)
kwargs.update({"tba_elev": tba_elev_pts})
try_rp = True
# For raster-point
if rop == "r-p" or try_rp:
try:
self._fit_rst_pts(**kwargs)
except NotImplementedCoregFit:
warnings.warn(
f"No raster-point method found for coregistration {self.__class__.__name__}, "
f"trying point-point method by converting all elevation data to points.",
UserWarning,
)
ref_elev_pts = (
gu.Raster.from_array(data=kwargs["ref_elev"], transform=kwargs["transform"], crs=kwargs["crs"])
.to_pointcloud()
.ds
)
kwargs.update({"ref_elev": ref_elev_pts})
try_pp = True
# For point-point
if rop == "p-p" or try_pp:
try:
self._fit_pts_pts(**kwargs)
except NotImplementedCoregFit:
if try_pp and try_rp:
raise NotImplementedCoregFit(
f"No raster-raster, raster-point or point-point method found for "
f"coregistration {self.__class__.__name__}."
)
elif try_pp:
raise NotImplementedCoregFit(
f"No raster-point or point-point method found for coregistration {self.__class__.__name__}."
)
else:
raise NotImplementedCoregFit(
f"No point-point method found for coregistration {self.__class__.__name__}."
)
def _apply_func(self, **kwargs: Any) -> tuple[NDArrayf | gpd.GeoDataFrame, affine.Affine]:
"""Distribute to _apply_rst and _apply_pts based on input and method availability."""
# If input is a raster
if isinstance(kwargs["elev"], np.ndarray):
# See if a _apply_rst exists
try:
# Run the associated apply function
applied_elev, out_transform = self._apply_rst(**kwargs) # pylint: disable=assignment-from-no-return
# If it doesn't exist, use apply_matrix()
except NotImplementedCoregApply:
if self.is_affine: # This only works for affine, however.
# Not resampling is only possible for translation methods, fail with warning if passed by user
if not self.is_translation:
if not kwargs["resample"]:
raise NotImplementedError(
f"Option `resample=False` not supported by {self.__class__},"
f" only available for translation coregistrations such as NuthKaab."
)
# Apply the matrix around the centroid (if defined, otherwise just from the center).
transform = kwargs.pop("transform")
applied_elev, out_transform = _apply_matrix_rst(
dem=kwargs.pop("elev"),
transform=transform,
matrix=self.to_matrix(),
centroid=self._meta["outputs"]["affine"].get("centroid"),
)
else:
raise ValueError("Cannot transform, Coreg method is non-affine and has no implemented _apply_rst.")
# If input is a point
else:
out_transform = None
# See if an _apply_pts_func exists
try:
applied_elev = self._apply_pts(**kwargs)
# If it doesn't exist, use apply_matrix()
except NotImplementedCoregApply:
if self.is_affine:
applied_elev = _apply_matrix_pts(
epc=kwargs["elev"],
matrix=self.to_matrix(),
centroid=self._meta["outputs"]["affine"].get("centroid"),
z_name=kwargs.pop("z_name"),
)
else:
raise ValueError("Cannot transform, Coreg method is non-affine and has no implemented _apply_pts.")
return applied_elev, out_transform
def _bin_or_and_fit_nd( # type: ignore
self,
values: NDArrayf,
bias_vars: None | dict[str, NDArrayf] = None,
weights: None | NDArrayf = None,
**kwargs,
) -> None:
"""
Generic binning and/or fitting method to model values along N variables for a coregistration/correction,
used for all affine and bias-correction subclasses. Expects either 2D arrays for rasters, or 1D arrays for
points.
Should only be called through subclassing.
"""
# Store bias variable names from the dictionary if undefined
if self._meta["inputs"]["fitorbin"]["bias_var_names"] is None:
self._meta["inputs"]["fitorbin"]["bias_var_names"] = list(bias_vars.keys())
# Run the fit or bin, passing the dictionary of parameters
params_fit_or_bin = self._meta["inputs"]["fitorbin"]
df, results = _bin_or_and_fit_nd(
fit_or_bin=self._meta["inputs"]["fitorbin"]["fit_or_bin"],
params_fit_or_bin=params_fit_or_bin,
values=values,
bias_vars=bias_vars,
weights=weights,
**kwargs,
)
# Initialize output dictionary
self.meta["outputs"]["fitorbin"] = {}
# Save results if fitting was performed
if self._meta["inputs"]["fitorbin"]["fit_or_bin"] in ["fit", "bin_and_fit"] and results is not None:
# Write the results to metadata in different ways depending on optimizer returns
if self._meta["inputs"]["fitorbin"]["fit_optimizer"] in (w["optimizer"] for w in fit_workflows.values()):
params = results[0]
order_or_freq = results[1]
if self._meta["inputs"]["fitorbin"]["fit_optimizer"] == robust_norder_polynomial_fit:
self._meta["outputs"]["specific"] = {"best_poly_order": order_or_freq}
else:
self._meta["outputs"]["specific"] = {"best_nb_sin_freq": order_or_freq}
elif self._meta["inputs"]["fitorbin"]["fit_optimizer"] == scipy.optimize.curve_fit:
params = results[0]
# Calculation to get the error on parameters (see description of scipy.optimize.curve_fit)
perr = np.sqrt(np.diag(results[1]))
self._meta["outputs"]["fitorbin"].update({"fit_perr": perr})
else:
params = results[0]
self._meta["outputs"]["fitorbin"].update({"fit_params": params})
# Save results of binning if it was performed
elif self._meta["inputs"]["fitorbin"]["fit_or_bin"] in ["bin", "bin_and_fit"] and df is not None:
self._meta["outputs"]["fitorbin"].update({"bin_dataframe": df})
def _fit_rst_rst(
self,
ref_elev: NDArrayf,
tba_elev: NDArrayf,
inlier_mask: NDArrayb,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
weights: NDArrayf | None = None,
bias_vars: dict[str, NDArrayf] | None = None,
**kwargs: Any,
) -> None:
# FOR DEVELOPERS: This function needs to be implemented by subclassing.
raise NotImplementedCoregFit("This step has to be implemented by subclassing.")
def _fit_rst_pts(
self,
ref_elev: NDArrayf | gpd.GeoDataFrame,
tba_elev: NDArrayf | gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
area_or_point: Literal["Area", "Point"] | None,
z_name: str,
weights: NDArrayf | None = None,
bias_vars: dict[str, NDArrayf] | None = None,
**kwargs: Any,
) -> None:
# FOR DEVELOPERS: This function needs to be implemented by subclassing.
raise NotImplementedCoregFit("This step has to be implemented by subclassing.")
def _fit_pts_pts(
self,
ref_elev: gpd.GeoDataFrame,
tba_elev: gpd.GeoDataFrame,
inlier_mask: NDArrayb,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
z_name: str,
weights: NDArrayf | None = None,
bias_vars: dict[str, NDArrayf] | None = None,
**kwargs: Any,
) -> None:
# FOR DEVELOPERS: This function needs to be implemented by subclassing.
raise NotImplementedCoregFit("This step has to be implemented by subclassing.")
def _apply_rst(
self,
elev: NDArrayf,
transform: rio.transform.Affine,
crs: rio.crs.CRS,
bias_vars: dict[str, NDArrayf] | None = None,
**kwargs: Any,
) -> tuple[NDArrayf, rio.transform.Affine]:
# FOR DEVELOPERS: This function needs to be implemented by subclassing.
raise NotImplementedCoregApply("This should have been implemented by subclassing.")
def _apply_pts(
self,
elev: gpd.GeoDataFrame,
z_name: str = "z",
bias_vars: dict[str, NDArrayf] | None = None,
**kwargs: Any,
) -> gpd.GeoDataFrame:
# FOR DEVELOPERS: This function needs to be implemented by subclassing.
raise NotImplementedCoregApply("This should have been implemented by subclassing.")
[docs]
class CoregPipeline(Coreg):
"""
A sequential set of co-registration processing steps.
"""
[docs]
def __init__(self, pipeline: list[Coreg]) -> None:
"""
Instantiate a new processing pipeline.
:param: Processing steps to run in the sequence they are given.
"""
self.pipeline = pipeline
super().__init__()
def __repr__(self) -> str:
return f"Pipeline: {self.pipeline}"
@overload
def info(self, as_str: Literal[False] = ...) -> None: ...
@overload
def info(self, as_str: Literal[True]) -> str: ...
def info(self, as_str: bool = False) -> None | str:
"""Summarize information about this coregistration."""
# Get the pipeline information for each step as a string
final_str = []
for i, step in enumerate(self.pipeline):
final_str.append(f"Pipeline step {i}:\n" f"################\n")
step_str = step.info(as_str=True)
final_str.append(step_str)
# Return as string or print (default)
if as_str:
return "".join(final_str)
else:
print("".join(final_str))
return None
def copy(self: CoregType) -> CoregType:
"""Return an identical copy of the class."""
new_coreg = self.__new__(type(self))
new_coreg.__dict__ = {key: copy.deepcopy(value) for key, value in self.__dict__.items() if key != "pipeline"}
new_coreg.pipeline = [step.copy() for step in self.pipeline]
return new_coreg
def _parse_bias_vars(self, step: int, bias_vars: dict[str, NDArrayf] | None) -> dict[str, NDArrayf]:
"""Parse bias variables for a pipeline step requiring them."""
# Get number of non-affine coregistration requiring bias variables to be passed
nb_needs_vars = sum(c._needs_vars for c in self.pipeline)
# Get step object
coreg = self.pipeline[step]
# Check that all variable names of this were passed
var_names = coreg._meta["inputs"]["fitorbin"]["bias_var_names"]
# Raise error if bias_vars is None
if bias_vars is None:
msg = f"No `bias_vars` passed to .fit() for bias correction step {coreg.__class__} of the pipeline."
if nb_needs_vars > 1:
msg += (
" As you are using several bias correction steps requiring `bias_vars`, don't forget to "
"explicitly define their `bias_var_names` during "
"instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__)
)
raise ValueError(msg)
# Raise error if no variable were explicitly assigned and there is more than 1 step with bias_vars
if var_names is None and nb_needs_vars > 1:
raise ValueError(
"When using several bias correction steps requiring `bias_vars` in a pipeline,"
"the `bias_var_names` need to be explicitly defined at each step's "
"instantiation, e.g. {}(bias_var_names=['slope']).".format(coreg.__class__.__name__)
)
# Raise error if the variables explicitly assigned don't match the ones passed in bias_vars
if not all(n in bias_vars.keys() for n in var_names):
raise ValueError(
"Not all keys of `bias_vars` in .fit() match the `bias_var_names` defined during "
"instantiation of the bias correction step {}: {}.".format(coreg.__class__, var_names)
)
# Add subset dict for this pipeline step to args of fit and apply
return {n: bias_vars[n] for n in var_names}
# Need to override base Coreg method to work on pipeline steps
def fit(
self: CoregType,
reference_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
to_be_aligned_elev: NDArrayf | MArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
inlier_mask: NDArrayb | Raster | None = None,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
weights: NDArrayf | None = None,
subsample: float | int | None = None,
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
area_or_point: Literal["Area", "Point"] | None = None,
z_name: str | None = None,
random_state: int | np.random.Generator | None = None,
**kwargs: Any,
) -> CoregType:
# Check if subsample arguments are different from their default value for any of the coreg steps:
# get default value in argument spec and "subsample" stored in meta, and compare both are consistent
argspec = [inspect.getfullargspec(c.__class__) for c in self.pipeline]
sub_meta = [c.meta["inputs"]["random"]["subsample"] for c in self.pipeline]
sub_is_default = [
argspec[i].defaults[argspec[i].args.index("subsample") - 1] == sub_meta[i] # type: ignore
for i in range(len(argspec))
]
if subsample is not None and not all(sub_is_default):
warnings.warn(
"Subsample argument passed to fit() will override non-default subsample values defined for"
" individual steps of the pipeline. To silence this warning: only define 'subsample' in "
"either fit(subsample=...) or instantiation e.g., VerticalShift(subsample=...)."
)
# Filter warnings of individual pipelines now that the one above was raised
warnings.filterwarnings("ignore", message="Subsample argument passed to*", category=UserWarning)
# Pre-process the inputs, by reprojecting and subsampling, without any subsampling (done in each step)
ref_dem, tba_dem, inlier_mask, transform, crs, area_or_point, z_name = _preprocess_coreg_fit(
reference_elev=reference_elev,
to_be_aligned_elev=to_be_aligned_elev,
inlier_mask=inlier_mask,
transform=transform,
crs=crs,
area_or_point=area_or_point,
z_name=z_name,
)
tba_dem_mod = tba_dem.copy()
out_transform = transform
for i, coreg in enumerate(self.pipeline):
logging.debug("Running pipeline step: %d / %d", i + 1, len(self.pipeline))
main_args_fit = {
"reference_elev": ref_dem,
"to_be_aligned_elev": tba_dem_mod,
"inlier_mask": inlier_mask,
"transform": out_transform,
"crs": crs,
"z_name": z_name,
"weights": weights,
"subsample": subsample,
"random_state": random_state,
}
main_args_apply = {"elev": tba_dem_mod, "transform": out_transform, "crs": crs, "z_name": z_name}
# If non-affine method that expects a bias_vars argument
if coreg._needs_vars:
step_bias_vars = self._parse_bias_vars(step=i, bias_vars=bias_vars)
main_args_fit.update({"bias_vars": step_bias_vars})
main_args_apply.update({"bias_vars": step_bias_vars})
# Perform the step fit
coreg.fit(**main_args_fit)
# Step apply: one output for a geodataframe, two outputs for array/transform
# We only run this step if it's not the last, otherwise it is unused!
if i != (len(self.pipeline) - 1):
if isinstance(tba_dem_mod, gpd.GeoDataFrame):
tba_dem_mod = coreg.apply(**main_args_apply)
else:
tba_dem_mod, out_transform = coreg.apply(**main_args_apply)
# Flag that the fitting function has been called.
self._fit_called = True
return self
@overload
def apply(
self,
elev: MArrayf,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> tuple[MArrayf, rio.transform.Affine]: ...
@overload
def apply(
self,
elev: NDArrayf,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> tuple[NDArrayf, rio.transform.Affine]: ...
@overload
def apply(
self,
elev: RasterType | gpd.GeoDataFrame | PointCloudType,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> RasterType | gpd.GeoDataFrame | gu.PointCloud: ...
# Need to override base Coreg method to work on pipeline steps
def apply(
self,
elev: MArrayf | NDArrayf | RasterType | gpd.GeoDataFrame | PointCloudType,
bias_vars: dict[str, NDArrayf | MArrayf | RasterType] | None = None,
resample: bool = True,
resampling: str | rio.warp.Resampling = "bilinear",
transform: rio.transform.Affine | None = None,
crs: rio.crs.CRS | None = None,
z_name: str | None = None,
**kwargs: Any,
) -> (
RasterType
| gpd.GeoDataFrame
| gu.PointCloud
| tuple[NDArrayf, rio.transform.Affine]
| tuple[MArrayf, rio.transform.Affine]
):
# First step and preprocessing
if not self._fit_called and self._meta["outputs"]["affine"].get("matrix") is None:
raise AssertionError(".fit() does not seem to have been called yet")
elev_array, transform, crs, z_name = _preprocess_coreg_apply(
elev=elev, transform=transform, crs=crs, z_name=z_name
)
elev_mod = elev_array.copy()
out_transform = copy.copy(transform)
# Apply each step of the coregistration
for i, coreg in enumerate(self.pipeline):
main_args_apply = {
"elev": elev_mod,
"transform": out_transform,
"crs": crs,
"z_name": z_name,
"resample": resample,
"resampling": resampling,
}
# If non-affine method that expects a bias_vars argument
if coreg._needs_vars:
step_bias_vars = self._parse_bias_vars(step=i, bias_vars=bias_vars)
main_args_apply.update({"bias_vars": step_bias_vars})
# Step apply: one return for a geodataframe, two returns for array/transform
if isinstance(elev_mod, gpd.GeoDataFrame):
elev_mod = coreg.apply(**main_args_apply, **kwargs)
else:
elev_mod, out_transform = coreg.apply(**main_args_apply, **kwargs)
# Post-process output depending on input type
applied_elev, out_transform = _postprocess_coreg_apply(
elev=elev,
applied_elev=elev_mod,
transform=transform,
out_transform=out_transform,
crs=crs,
resample=resample,
resampling=resampling,
)
# Only return object if raster or geodataframe, also return transform if object was an array
if isinstance(applied_elev, (gu.Raster, gpd.GeoDataFrame, gu.PointCloud)):
return applied_elev
else:
return applied_elev, out_transform
def __iter__(self) -> Generator[Coreg]:
"""Iterate over the pipeline steps."""
yield from self.pipeline
def __add__(self, other: list[Coreg] | Coreg | CoregPipeline) -> CoregPipeline:
"""Append a processing step or a pipeline to the pipeline."""
if not isinstance(other, Coreg):
other = list(other)
else:
other = [other]
pipelines = self.pipeline + other
# Cancel possible initial shift(s) in CoregPipeline case
for method in pipelines:
if "affine" in method.meta["inputs"] and "initial_shift" in method.meta["inputs"]["affine"]:
del method.meta["inputs"]["affine"]["initial_shift"]
return CoregPipeline(pipelines)
def to_matrix(self) -> NDArrayf:
"""Convert the transform to a 4x4 transformation matrix."""
return self._to_matrix_func()
def _to_matrix_func(self) -> NDArrayf:
"""Try to join the coregistration steps to a single transformation matrix."""
total_transform = np.eye(4)
for coreg in self.pipeline:
new_matrix = coreg.to_matrix()
total_transform = new_matrix @ total_transform
return total_transform