Source code for faninsar.NSBAS.inversion

"""NSBAS inversion module."""

from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

import numpy as np
import psutil
import torch
from tqdm import tqdm

from faninsar import Loops, Pairs, parse_device
from faninsar.NSBAS.tsmodels import TimeSeriesModels

if TYPE_CHECKING:
    from numpy.typing import NDArray


[docs] class NSBASMatrixFactory: """Factory class to generate/format NSBAS matrix. The NSBAS matrix is usually expressed as: ``d = Gm``, where ``d`` is the unwrapped interferograms matrix, ``G`` is the NSBAS matrix, and ``m`` is the model parameters, which is the combination of the deformation increment and the model parameters. see paper: TODO for more details. .. note:: After initialization, the ``d`` can still be updated by assigning a new unwrapped interferograms matrix to ``d``. This is useful when the unwrapped interferograms is divided into multiple patches and the NSBAS matrix is calculated for each patch separately. Examples -------- >>> import faninsar as fis >>> import numpy as np >>> names = ['20170111_20170204', '20170111_20170222', '20170111_20170318', '20170204_20170222', '20170204_20170318', '20170204_20170330', '20170222_20170318', '20170222_20170330', '20170222_20170411', '20170318_20170330'] >>> pairs = fis.Pairs.from_names(names) >>> unw = np.random.randint(0, 255, (len(pairs), 5)) >>> model = fis.AnnualSinusoidalModel(pairs.dates) >>> nsbas_matrix = fis.NSBASMatrixFactory(unw, pairs, model) >>> nsbas_matrix NSBASMatrixFactory( pairs: Pairs(10) model: AnnualSinusoidalModel(dates: 6, unit: day) gamma: 0.0001 G shape: (16, 9) d shape: (16, 5) ) reset ``d`` by assigning a new unwrapped interferograms matrix with same pairs >>> nsbas_matrix.d = np.random.randint(0, 255, (len(pairs), 10)) NSBASMatrixFactory( pairs: Pairs(10) model: AnnualSinusoidalModel(dates: 6, unit: day) gamma: 0.0001 G shape: (16, 9) d shape: (16, 10) ) """ _pairs: Pairs _model: TimeSeriesModels | None _gamma: float _G: NDArray[np.float32] _d: NDArray[np.float32 | np.float64] __slots__ = ["_G", "_d", "_gamma", "_model", "_pairs"]
[docs] def __init__( self, unw: NDArray[np.floating], pairs: Pairs | Sequence[str], model: TimeSeriesModels | None = None, gamma: float = 0.0001, ) -> None: """Initialize NSBASMatrixFactory. Parameters ---------- unw : NDArray (n_pairs, n_pixels) Unwrapped interferograms matrix pairs : Pairs | Sequence[str] Pairs or Sequence of pair names model : Optional[TimeSeriesModels], optional Time series model. If None, generate SBAS matrix rather than NSBAS matrix, by default None. gamma : float, optional weight for the model component, by default 0.0001. This parameter will be ignored if model is None. """ if isinstance(pairs, Pairs): self._pairs = pairs elif isinstance(pairs, Sequence): self._pairs = Pairs.from_names(pairs) else: msg = "pairs must be either Pairs or Sequence" raise TypeError(msg) if isinstance(unw, np.ma.MaskedArray): unw = unw.filled(np.nan) self._model = None self._gamma = None if model is not None: self._check_model(model) self._check_gamma(gamma) self._model = model self._gamma = gamma self.G = self._make_nsbas_matrix(model.G_br, gamma) else: self.G = self._make_sbas_matrix() self.d = unw
def __str__(self) -> str: """Return string representation.""" return ( f"{self.__class__.__name__}(pairs: {self.pairs}, model:" f"{self.model}, gamma: {self.gamma})" ) def __repr__(self) -> str: """Return string representation.""" return ( f"{self.__class__.__name__}(\n" f" pairs: {self.pairs}\n" f" model: {self.model!s}\n" f" gamma: {self.gamma}\n" f" G shape: {self.G.shape}\n" f" d shape: {self.d.shape}\n" ")" ) @property def pairs(self) -> Pairs: """Return pairs.""" return self._pairs @property def model(self) -> TimeSeriesModels | None: """Return model.""" return self._model def _check_model(self, model: TimeSeriesModels) -> None: """Check model.""" if not isinstance(model, TimeSeriesModels): msg = "model must be a TimeSeriesModels instance" raise TypeError(msg) @property def gamma(self) -> float: """Return gamma.""" return self._gamma def _check_gamma(self, gamma: float) -> None: """Update gamma and G by input gamma.""" if not isinstance(gamma, (float, int)): msg = "gamma must be either float or int" raise TypeError(msg) if gamma <= 0: msg = "gamma must be positive" raise ValueError(msg) @property def d(self) -> NDArray[np.float32 | np.float64]: """Return ``d`` matrix for NSBAS ``d = Gm``.""" return self._d @d.setter def d(self, unw: np.ndarray) -> None: """Update d: restructure unw by appending model matrix part.""" if not isinstance(unw, np.ndarray): msg = "d must be a numpy array" raise TypeError(msg) if len(unw.shape) != 2: msg = "d must be a 2D array" raise ValueError(msg) if unw.shape[0] != len(self.pairs): msg = "input unw must have the same rows number as pairs number" raise ValueError(msg) if self.model is None: self._d = unw else: self._d = self._restructure_unw(unw) @property def G(self) -> NDArray[np.float32]: # noqa: N802 """Return ``G`` matrix for NSBAS ``d = Gm``.""" return self._G @G.setter def G(self, G: np.ndarray) -> None: # noqa: N802, N803 """Update G by input G.""" if not isinstance(G, np.ndarray): msg = "G must be a numpy array" raise TypeError(msg) if (self.model is not None) & ( G.shape[0] != (len(self.pairs) + len(self.pairs.dates)) ): msg = ( "G must have the same number of rows as (n_pairs + n_dates)" " if model is not None." ) raise ValueError( msg, ) self._G = G def _make_nsbas_matrix( self, G_br: np.ndarray, # noqa: N803 gamma: np.ndarray, ) -> NDArray[np.float32]: G_br = np.asarray(G_br, dtype=np.float32) # noqa: N806 G_tl = self.pairs.to_matrix() # noqa: N806 if len(G_br.shape) == 1: G_br = G_br.reshape(-1, 1) # noqa: N806 n_param = G_br.shape[1] n_date = len(self.pairs.dates) G_bl = np.tril(np.ones((n_date, n_date - 1), dtype=np.float32), k=-1) # noqa: N806 G_b = np.hstack((G_bl, G_br)) * gamma # noqa: N806 G_t = np.hstack((G_tl, np.zeros((len(self._pairs), n_param)))) # noqa: N806 return np.vstack((G_t, G_b)) def _make_sbas_matrix(self) -> NDArray[np.float32]: return self.pairs.to_matrix() def _restructure_unw( self, unw: NDArray[np.number], ) -> NDArray[np.float32 | np.float64]: if self.model is not None: unw = np.vstack((unw, np.zeros((len(self.pairs.dates), unw.shape[1])))) return unw
[docs] class NSBASInversion: """a class used to operate NSBAS inversion. The NSBAS inversion is usually expressed as: ``d = Gm``, where ``d`` is the unwrapped interferograms matrix, ``G`` is the NSBAS matrix, and ``m`` is the model parameters, which is the combination of the deformation increment and the model parameters. see paper: TODO for more details. """
[docs] def __init__( self, matrix_factory: NSBASMatrixFactory, device: str | torch.device | None = None, dtype: torch.dtype = torch.float64, verbose: bool = True, ) -> None: """Initialize NSBASInversion. Parameters ---------- matrix_factory : NSBASMatrixFactory NSBASMatrixFactory instance device : Optional[str | torch.device], optional device of torch.tensor used for computation. If None, use GPU if available, otherwise use CPU. dtype : torch.dtype dtype of torch.tensor used for computation. verbose : bool, optional If True, show progress bar, by default True """ self.matrix_factory = matrix_factory self.device = parse_device(device) self.dtype = dtype self.verbose = verbose self.G = matrix_factory.G self.d = matrix_factory.d self.n_param = len(matrix_factory.model.param_names) self.n_pair = len(matrix_factory.pairs)
def __str__(self) -> str: """Return string representation.""" return f"{self.__class__.__name__}()"
[docs] def inverse( self, ) -> tuple[ NDArray[np.floating], NDArray[np.floating], NDArray[np.floating], NDArray[np.floating], ]: """Calculate increment displacement difference by NSBAS inversion. Returns ------- incs: np.ndarray (n_date - 1, n_pt) Incremental displacement params: np.ndarray (n_param, n_pt) parameters of model in NSBAS inversion residual_pair: np.ndarray (n_pair, n_pt) residual between interferograms and model result residual_tsm: np.ndarray (n_date, n_pt) residual between time-series model and model result """ result = batch_lstsq( self.G, self.d, dtype=self.dtype, device=self.device, verbose=self.verbose, tqdm_args={"desc": " NSBAS inversion"}, ) residual = self.d - np.dot(self.G, result) incs = result[: -self.n_param, :] params = result[-self.n_param :, :] residual_pair = residual[: self.n_pair] residual_tsm = residual[self.n_pair :] return incs, params, residual_pair, residual_tsm
def device_mem_size(device: str | torch.device | None) -> int: """Get memory size (in MB) for GPU or CPU. Parameters ---------- device : Optional[str | torch.device] device of torch.tensor used for computation. Returns ------- mem_size : int memory size (in MB) for GPU or CPU. """ device_type = parse_device(device).type if device_type == "cuda": free_memory, _ = torch.cuda.mem_get_info() mem_size = int(free_memory / 1024**2) else: # macos share memory with CPU mem_free = psutil.virtual_memory().available mem_size = int(mem_free / 1024**2) return mem_size def _get_patch_col( G: np.ndarray | torch.Tensor, # noqa: N803 d: np.ndarray | torch.Tensor, mem_size: int, dtype: np.dtype, safe_factor: float = 2, ) -> list[list[int]]: """Get patch number of cols for memory size (in MB) for SBAS inversion. Parameters ---------- G : np.ndarray | torch.Tensor model field matrix with shape of (n_im, n_param) or (n_pt, n_im, n_param). d : np.ndarray | torch.Tensor data field matrix with shape of (n_im, n_pt). mem_size : int memory size (in MB) for GPU or CPU. dtype: numpy.dtype or torch.dtype dtype of ndarray safe_factor : float, optional safe factor for memory size, by default 2 Returns ------- patch_col : list[list[int]] List of the number of rows for each patch. eg: [[0, 1234], [1235, 2469],... ] """ m, n = d.shape r = G.shape[-1] # rough value of n_patch n_patch = int( np.ceil( m * n * r**2 * torch.tensor([], dtype=dtype).element_size() * safe_factor / 2**20 / mem_size, ), ) # accurate value of n_patch row_spacing = int(np.ceil(n / n_patch)) n_patch = int(np.ceil(n / row_spacing)) patch_col = [] for i in range(n_patch): patch_col.append([i * row_spacing, (i + 1) * row_spacing]) if i == n_patch - 1: patch_col[-1][-1] = n return patch_col
[docs] def batch_lstsq( G: np.ndarray | torch.Tensor, # noqa: N803 d: np.ndarray | torch.Tensor, dtype: torch.dtype = torch.float64, device: str | torch.device | None = None, verbose: bool = True, tqdm_args: dict | None = None, return_numpy: bool = True, ) -> NDArray[np.floating] | torch.Tensor: """Batch least-squares solver for solving the least squares problem. Parameters ---------- G : np.ndarray | torch.Tensor model field matrix with shape of (n_im, n_param) or (n_pt, n_im, n_param). If G is 3D, the first dimension is the G matrix for every pixel. d : np.ndarray | torch.Tensor data field matrix with shape of (n_im, n_pt). dtype : torch.dtype, optional dtype of torch.tensor used for computation device : Optional[str | torch.device], optional device of torch.tensor used for computation. If None, use GPU if available, otherwise use CPU. verbose : bool, optional If True, show progress bar, by default True tqdm_args : dict, optional Arguments to be passed to `tqdm.tqdm <https://tqdm.github.io/docs/tqdm#tqdm-objects>`_ Object for progress bar. return_numpy : bool, optional If True, return a numpy array, otherwise return a torch tensor. Returns ------- X : np.ndarray | torch.Tensor (n_im x n_pt) matrix that minimizes norm(M*(GX - d)). If return_numpy is True, return a numpy array, otherwise return a torch tensor. """ if tqdm_args is None: tqdm_args = {} tqdm_args.setdefault("desc", "Batch least-squares") tqdm_args.setdefault("unit", "Batch") n_pt = d.shape[1] n_param = G.shape[1] if G.ndim == 2 else G.shape[2] device = parse_device(device) result = torch.full((n_param, n_pt), torch.nan, dtype=dtype) mem_size = device_mem_size(device) patch_col = _get_patch_col(G, d, mem_size, dtype) if verbose: patch_col = tqdm(patch_col, **tqdm_args) for col in patch_col: if G.ndim == 2: result[:, col[0] : col[1]] = censored_lstsq( G, d[:, col[0] : col[1]], dtype, device, return_numpy=False, ) elif G.ndim == 3: result[:, col[0] : col[1]] = censored_lstsq( G[col[0] : col[1], :, :], d[:, col[0] : col[1]], dtype, device, return_numpy=False, ) else: msg = "Dimension of G must be 2 or 3" raise ValueError(msg) if return_numpy: result = result.cpu().numpy() return result
[docs] def censored_lstsq( G: np.ndarray | torch.Tensor, # noqa: N803 d: np.ndarray | torch.Tensor, dtype: torch.dtype = torch.float64, device: str | torch.device | None = None, return_numpy: bool = True, ) -> NDArray[np.floating] | torch.Tensor: """Solves least squares problem subject to missing data. Reference: http://alexhwilliams.info/itsneuronalblog/2018/02/26/censored-lstsq/. .. note:: This function is used for solving the least squares problem with **missing data**. The missing data is represented by nan values in the data matrix ``d``. If there are no nan values in d, you are recommended to use :func:`torch.linalg.lstsq` instead. Parameters ---------- G : np.ndarray | torch.Tensor, model field matrix in shape of (n_im, n_param) or (n_pt, n_im, n_param). If G is 3D, the first dimension is the G matrix for each pixel. d : np.ndarray | torch.Tensor, (n_im, n_pt) matrix data field matrix. dtype : torch.dtype dtype of torch.tensor used for computation. device : Optional[str | torch.device] device of torch.tensor used for computation. If None, use GPU if available, otherwise use CPU. return_numpy : bool, optional If True, return a numpy array, otherwise return a torch tensor. Returns ------- X : np.ndarray | torch.Tensor (n_im x n_pt) matrix that minimizes norm(M*(GX - d)). If return_numpy is True, return a numpy array, otherwise return a torch tensor. """ device = parse_device(device) G = torch.tensor(G, dtype=dtype, device=device) # noqa: N806 d = torch.tensor(d, dtype=dtype, device=device) # set nan values to zero d_nan = torch.isnan(d) d[d_nan] = 0 M = ~d_nan # noqa: N806 # get the filter for pixels that could be solved m = torch.sum(M, axis=0) > G.shape[-1] X = torch.full((G.shape[-1], d.shape[-1]), torch.nan, dtype=dtype, device=device) # noqa: N806 if G.ndim == 2: rhs = torch.matmul(G.T, M[:, m] * d[:, m]).T[:, :, None] # n x r x 1 tensor T = torch.matmul( # noqa: N806 G.T[None, :, :], M[:, m].T[:, :, None] * G[None, :, :], ) # n x r x r tensor else: rhs = torch.matmul( G[m].transpose(0, 2, 1), (M[:, m] * d[:, m]).T[:, :, None], ) # n x r x 1 tensor # n x r x r tensor T = torch.matmul(G[m].transpose(0, 2, 1), M[:, m].T[:, :, None] * G[m]) # noqa: N806 X[:, m] = torch.squeeze( torch.linalg.solve(T, rhs), dim=2, ).T # transpose to get r x n device_type = device.type if device_type != "cpu": X_np = X.detach().cpu() # noqa: N806 G, d, M, d_nan = None, None, None, None # noqa: N806 rhs, T, X = None, None, None # noqa: N806 if device_type == "cuda": torch.cuda.empty_cache() elif device_type == "mps": torch.mps.empty_cache() if return_numpy: X_np = X_np.numpy() # noqa: N806 return X_np if return_numpy: return X.numpy() return X
[docs] def calculate_u( loops: Loops, unw_phases: np.ndarray, device: str | torch.device | None = None, dtype: torch.dtype = torch.float64, ) -> NDArray[np.floating]: """Calculate correction matrix u by loop closure phase using least square. More details see paper: .. tip:: The pairs in the loops may be fewer than the input pairs. To make sure the pairs in the loops are the same as the pairs in the unw_phases, you can use the :meth:`.Pairs.where` method to get the index/mask of the pairs in the loops from the input pairs. Parameters ---------- loops : Loops Loops object. Loops are used to calculate the design matrix, which describes the relationship between the loops and pairs. unw_phases : ndarray unwrapped interferograms phases with shape of (n_pair, n_pixel). device : Optional[str | torch.device], optional device of torch.tensor used for computation. If None, use GPU if available, otherwise use CPU. dtype : torch.dtype, optional dtype of torch.tensor used for computation. Examples -------- get the loops from the pairs: >>> loops = pairs.to_loops() >>> idx = pairs.where( ... loops.pairs ... ) # get the index of the pairs in the loops from the input pairs >>> unw_used = unw[idx] calculate u by loops and unwrapped interferometric phases: >>> u = np.zeros_like(unw, dtype=np.float32) >>> u[idx] = np.round(NSBAS.calculate_u(loops, unw_used)) calculate the corrected interferometric phases >>> unw_c = unw - 2 * np.pi * u """ contain_nan = False if np.any(np.isnan(unw_phases)): contain_nan = True C = loops.to_matrix() # noqa: N806 # edge pairs are not contributing to the loop closure phase, remove them # from the matrix C to avoid being involved in the calculation of u mask = loops.pairs.where(loops.diagonal_pairs) Cc = C[:, mask] # noqa: N806 u = np.zeros_like(unw_phases) C = torch.tensor(C, dtype=dtype, device=device) # noqa: N806 unw_phases = torch.tensor(unw_phases, dtype=dtype, device=device) Cc = torch.tensor(Cc, dtype=dtype, device=device) # noqa: N806 closure_phase = torch.mm(C, unw_phases) if contain_nan: _Uc = batch_lstsq( # noqa: N806 Cc, closure_phase, dtype=dtype, device=device, tqdm_args={"desc": " Calculate u"}, ).numpy() else: _Uc = torch.linalg.lstsq(Cc, closure_phase).solution.numpy() # noqa: N806 u[mask] = _Uc / (2 * np.pi) return u