"""Pair and loop tools for InSAR time series analysis."""
from __future__ import annotations

from collections.abc import Iterable, Iterator, Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Callable, Literal, overload

import numpy as np
import pandas as pd

from faninsar.logging import setup_logger

from .acquisition import DateManager
from .pairs import Pair, Pairs

if TYPE_CHECKING:
    from numpy.typing import DTypeLike, NDArray

    from faninsar.typing import TripletLoopLike
# Module-level logger obtained from faninsar's ``setup_logger`` under this module's name.
logger = setup_logger(log_name=__name__)
class TripletLoop:
    """Triplet loop made of three acquisition dates and the pairs between them.

    The three dates are sorted in ascending order and stored with day
    precision (``np.datetime64[D]``). Two loops are considered equal when
    their names (``YYYYMMDD_YYYYMMDD_YYYYMMDD``) are equal.
    """

    _values: np.ndarray
    _name: str
    _pairs: list[Pair]

    # NOTE: the ``_days*`` slots are declared but never assigned by this class;
    # they are kept so the instance layout stays unchanged.
    __slots__ = ["_days12", "_days13", "_days23", "_name", "_pairs", "_values"]

    def __init__(self, loop: Sequence[datetime, datetime, datetime]) -> None:
        """Initialize the TripletLoop class.

        Parameters
        ----------
        loop: Sequence
            Sequence object of three dates. Each date is a datetime object.
            For example, (date1, date2, date3).

        Raises
        ------
        ValueError
            If ``loop`` does not contain exactly three dates.
        """
        dates = np.asarray(loop)
        # Fail early: fewer dates would raise an IndexError below, more dates
        # would produce a name that disagrees with the three pairs.
        if dates.shape != (3,):
            msg = (
                "loop should contain exactly three dates, "
                f"but got an input of shape {dates.shape}."
            )
            raise ValueError(msg)

        self._values = np.sort(dates).astype("M8[D]")
        loop_dt = self._values.astype(datetime)
        self._name = "_".join([i.strftime("%Y%m%d") for i in loop_dt])
        # Order matters: pair12, pair23, then the closing pair13.
        self._pairs = [
            Pair([loop_dt[0], loop_dt[1]]),
            Pair([loop_dt[1], loop_dt[2]]),
            Pair([loop_dt[0], loop_dt[2]]),
        ]

    def __str__(self) -> str:
        """Return the name of the loop."""
        return self._name

    def __repr__(self) -> str:
        """Return the representation of the loop."""
        return f"TripletLoop({self._name})"

    def __eq__(self, other: object) -> bool:
        """Compare whether the loop is same to another loop.

        Returns ``NotImplemented`` for objects that are not ``TripletLoop`` so
        that Python falls back to its default comparison instead of raising.
        """
        if not isinstance(other, TripletLoop):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        """Return the hash of the loop (hash of its name)."""
        return hash(self.name)

    def __array__(self, dtype: DTypeLike = None) -> NDArray:
        """Return the loop as a numpy array."""
        return np.asarray(self._values, dtype=dtype)

    @property
    def values(self) -> NDArray[np.datetime64]:
        """Three dates of the loop with format of np.datetime64[D]."""
        return self._values

    @property
    def pairs(self) -> list[Pair]:
        """All three pairs of the loop, ordered as pair12, pair23, pair13."""
        return self._pairs

    @property
    def days12(self) -> int:
        """Return the time span of the first pair in days.

        Returns
        -------
        days12: int
            Time span of the first pair in days.
        """
        return (self._values[1] - self._values[0]).astype(int)

    @property
    def days23(self) -> int:
        """Return the time span of the second pair in days.

        Returns
        -------
        days23: int
            Time span of the second pair in days.
        """
        return (self._values[2] - self._values[1]).astype(int)

    @property
    def days13(self) -> int:
        """Return the time span of the third pair in days.

        Returns
        -------
        days13: int
            Time span of the third pair in days.
        """
        return (self._values[2] - self._values[0]).astype(int)

    @property
    def name(self) -> str:
        """Return the string format the loop.

        Returns
        -------
        name: str
            String of the loop.
        """
        return self._name

    @classmethod
    def from_name(
        cls,
        name: str,
        parse_function: Callable | None = None,
        date_args: dict | None = None,
    ) -> TripletLoop:
        """Initialize the loop class from a loop name.

        Parameters
        ----------
        name: str
            TripletLoop name.
        parse_function: Callable, optional
            Function to parse the date strings from the loop name.
            If None, the loop name will be split by '_' and the
            last 3 items will be used. Default is None.
        date_args: dict, optional
            Keyword arguments for pd.to_datetime() to convert the
            date strings to datetime objects. For example,
            {'format': '%Y%m%d'}. Default is {}.

        Returns
        -------
        loop: TripletLoop
            TripletLoop object.
        """
        if date_args is None:
            date_args = {}
        dates = DateManager.str_to_dates(name, 3, parse_function, date_args)
        return cls(dates)

    def to_numpy(self, dtype: DTypeLike = None) -> NDArray:
        """Return the loop as a numpy array."""
        return np.asarray(self._values, dtype=dtype)
class TripletLoops:
    """TripletLoops class to handle loops with three pairs/acquisitions.

    The loops are stored as an ``(n_loops, 3)`` array of dates with day
    precision (``np.datetime64[D]``), one loop per row.
    """

    _values: np.ndarray
    _dates: np.ndarray
    _names: np.ndarray
    _length: int

    # ``_names`` has to be declared here: ``_parse_loop_meta`` assigns it and
    # instances of a slotted class have no ``__dict__`` to fall back on.
    __slots__ = ["_dates", "_length", "_names", "_values"]

    def __init__(
        self,
        loops: Sequence[Sequence[datetime, datetime, datetime]] | Sequence[TripletLoop],
        sort: bool = True,
    ) -> None:
        """Initialize the triplet loops class.

        Parameters
        ----------
        loops: Sequence
            Sequence object of triplet loops. Each loop is an Sequence object
            of three dates with format of datetime or TripletLoop object.
            For example, [(date1, date2, date3), ...].
        sort: bool, optional
            Whether to sort the loops. Default is True.

        Raises
        ------
        ValueError
            If ``loops`` is None or empty, or cannot be arranged as an
            ``(n_loops, 3)`` array.
        """
        if loops is None or len(loops) == 0:
            msg = "loops cannot be None."
            raise ValueError(msg)

        _values = np.array(loops, dtype="M8[D]")
        if _values.ndim != 2 or _values.shape[1] != 3:  # noqa: PLR2004
            msg = (
                "loops should have the shape of (n_loops, 3), "
                f"but got {_values.shape}."
            )
            raise ValueError(msg)

        self._values = _values
        self._parse_loop_meta()
        if sort:
            self.sort(inplace=True)

    def _parse_loop_meta(self) -> None:
        """Refresh the cached dates, length and names from ``_values``."""
        self._dates = np.unique(self._values)
        self._length = self._values.shape[0]
        self._names = self.to_names()

    def __str__(self) -> str:
        """Return the string representation of the loops."""
        return f"TripletLoops({self._length})"

    def __repr__(self) -> str:
        """Return the representation of the loops."""
        return repr(self.to_frame("dates"))

    def __len__(self) -> int:
        """Return the number of loops."""
        return self._length

    def __eq__(self, other: object) -> bool:
        """Compare whether the loops are same to another loops."""
        if not isinstance(other, TripletLoops):
            return NotImplemented
        return bool(np.array_equal(self.values, other.values))

    def __add__(self, other: TripletLoops) -> TripletLoops:
        """Return the union of the loops."""
        if not isinstance(other, TripletLoops):
            return NotImplemented
        _loops = np.union1d(self.names, other.names)
        return TripletLoops.from_names(_loops)

    def __sub__(self, other: TripletLoops) -> TripletLoops | None:
        """Return the difference of the loops, or None if nothing is left."""
        if not isinstance(other, TripletLoops):
            return NotImplemented
        _loops = np.setdiff1d(self.names, other.names)
        if len(_loops) > 0:
            return TripletLoops.from_names(_loops)
        return None

    @overload
    def __getitem__(self, index: int) -> TripletLoop: ...

    @overload
    def __getitem__(self, index: slice) -> TripletLoops: ...

    def __getitem__(
        self,
        index: int | slice | datetime | str | Iterable,
    ) -> TripletLoop | TripletLoops | None:
        """Return the loop or loops selected by ``index``.

        Parameters
        ----------
        index: int, slice, datetime-like, str or Iterable
            - int: return the loop at that position as a TripletLoop.
            - slice of ints: return the loops at those positions.
            - slice of datetime-like/str: return the loops whose three dates
              all fall within [start, stop] (both ends included).
            - datetime-like or str: return the loops containing that date.
            - Iterable: used for numpy boolean/integer array indexing.

        Returns
        -------
        TripletLoop, TripletLoops or None
            None is returned when a date based selection matches no loop.

        Raises
        ------
        IndexError
            If an integer index is out of range.
        ValueError
            If a string cannot be converted to a date, or a date slice has
            its start later than its stop.
        TypeError
            If the index (or the bounds of a slice) has an unsupported type.
        """
        if isinstance(index, slice):
            return self._select_by_slice(index)

        if isinstance(index, (int, np.integer)):
            if not -self._length <= index < self._length:
                msg = (
                    f"Index {index} out of range. TripletLoops number "
                    f"is {self._length}."
                )
                raise IndexError(msg)
            return TripletLoop(self._values[index])

        if isinstance(index, (datetime, np.datetime64, pd.Timestamp, str)):
            if isinstance(index, str):
                try:
                    index = pd.to_datetime(index)
                except Exception as e:
                    msg = f"String {index} cannot be converted to datetime."
                    raise ValueError(msg) from e
            loops = [loop for loop in self._values if index in loop]
            if len(loops) > 0:
                return TripletLoops(loops)
            return None

        if isinstance(index, Iterable):
            index = np.array(index)
            return TripletLoops(self._values[index])

        msg = (
            f"Index should be int, slice, datetime, str, or bool or int array"
            f"indexing, but got {type(index)}."
        )
        raise TypeError(msg)

    def _select_by_slice(self, index: slice) -> TripletLoops | None:
        """Return the loops selected by a positional or a date slice."""
        start, stop = index.start, index.stop
        int_like = (int, np.integer, type(None))
        date_like = (datetime, np.datetime64, pd.Timestamp, str, type(None))

        if isinstance(start, int_like) and isinstance(stop, int_like):
            # Hand the slice to numpy untouched so that missing bounds are
            # resolved correctly for negative steps as well.
            return TripletLoops(self._values[index])

        if not (isinstance(start, date_like) and isinstance(stop, date_like)):
            msg = (
                "Slice bounds should both be int or both be datetime-like "
                f"(or str), but got {type(start)} and {type(stop)}."
            )
            raise TypeError(msg)

        if isinstance(start, str):
            start = DateManager.ensure_datetime(start)
        if isinstance(stop, str):
            stop = DateManager.ensure_datetime(stop)
        if start is None:
            start = self._dates[0]
        if stop is None:
            stop = self._dates[-1]

        start, stop = (np.datetime64(start, "s"), np.datetime64(stop, "s"))
        if start > stop:
            msg = f"Index start {start} should be earlier than index stop {stop}."
            raise ValueError(msg)

        # A loop is kept only when all three of its dates are inside the range.
        values_s = self._values.astype("M8[s]")
        inside = np.all((start <= values_s) & (values_s <= stop), axis=1)
        if not inside.any():
            return None
        return TripletLoops(self._values[inside])

    def __hash__(self) -> int:
        """Return the hash of the loops."""
        return hash("".join(self.names))

    def __iter__(self) -> Iterator[NDArray[np.datetime64]]:
        """Iterate the loops; each item is one row (three dates) of ``values``."""
        return iter(self.values)

    def __contains__(self, item: TripletLoopLike) -> bool:
        """Check if the item is in the loops.

        ``item`` can be a TripletLoop, a loop name, or a sequence/array of
        three dates (in any order).
        """
        if isinstance(item, TripletLoop):
            item = item.to_numpy()
        elif isinstance(item, str):
            item = TripletLoop.from_name(item).to_numpy()
        elif isinstance(item, (Sequence, np.ndarray)):
            # Convert before sorting so that the comparison below is done
            # between day-precision dates, like the stored values.
            item = np.sort(np.asarray(item, dtype="M8[D]"))
        else:
            msg = f"item should be TripletLoop, str, or Sequence, but got {type(item)}."
            raise TypeError(
                msg,
            )
        if item.shape != (3,):
            return False
        return bool(np.any(np.all(item == self._values, axis=1)))

    def __array__(self, dtype: DTypeLike = None) -> NDArray:
        """Return the loops as a numpy array."""
        return np.asarray(self._values, dtype=dtype)

    @property
    def values(self) -> NDArray[np.datetime64]:
        """Return the values of the loops.

        Returns
        -------
        values: np.ndarray
            Values of the loops with the shape of (n_loops, 3) and the dtype
            of np.datetime64[D].
        """
        return self._values

    @property
    def names(self) -> NDArray[np.str_]:
        """The names (string format) of the loops."""
        return self._names

    @property
    def dates(self) -> NDArray[np.datetime64]:
        """Sorted unique dates of the loops."""
        return self._dates

    @property
    def shape(self) -> tuple[int, int]:
        """The shape of the loop array."""
        return self._values.shape

    @property
    def pairs(self) -> Pairs:
        """All sorted (unique) pairs of the loops."""
        pairs = np.unique(
            np.vstack(
                [self._values[:, :2], self._values[:, 1:], self._values[:, [0, 2]]],
            ),
            axis=0,
        )
        return Pairs(pairs, sort=False)

    @property
    def pairs12(self) -> Pairs:
        """The first pairs of the loops."""
        return Pairs(self._values[:, :2], sort=False)

    @property
    def pairs23(self) -> Pairs:
        """The second pairs of the loops."""
        return Pairs(self._values[:, 1:], sort=False)

    @property
    def pairs13(self) -> Pairs:
        """The third pairs of the loops."""
        return Pairs(self._values[:, [0, 2]], sort=False)

    @property
    def days12(self) -> NDArray[np.int64]:
        """The time span of the first pair in days."""
        return (self._values[:, 1] - self._values[:, 0]).astype(int)

    @property
    def days23(self) -> NDArray[np.int64]:
        """The time span of the second pair in days."""
        return (self._values[:, 2] - self._values[:, 1]).astype(int)

    @property
    def days13(self) -> NDArray[np.int64]:
        """The time span of the third pair in days."""
        return (self._values[:, 2] - self._values[:, 0]).astype(int)

    @property
    def index(self) -> NDArray[np.int64]:
        """The index of the loops in dates coordinates."""
        return np.searchsorted(self._dates, self._values)

    @classmethod
    def from_names(
        cls,
        names: list[str],
        parse_function: Callable | None = None,
        date_args: dict | None = None,
    ) -> TripletLoops:
        """Initialize the loops class from a list of loop file names.

        Parameters
        ----------
        names: list
            list of loop file names.
        parse_function: Callable, optional
            Function to parse the date strings from the loop file name.
            If None, the loop file name will be split by '_' and
            the last 3 items will be used. Default is None.
        date_args: dict, optional
            Keyword arguments for pd.to_datetime() to convert the date strings
            to datetime objects. For example, {'format': '%Y%m%d'}.
            Default is {}.

        Returns
        -------
        loops: TripletLoops
            unsorted TripletLoops object.
        """
        if date_args is None:
            date_args = {}
        loops = [
            TripletLoop.from_name(name, parse_function, date_args).values
            for name in names
        ]
        return cls(loops, sort=False)

    def to_names(self, prefix: str | None = None) -> NDArray[np.str_]:
        """Return the string name of each loop.

        Parameters
        ----------
        prefix: str, optional
            Prefix of the output loop names. Default is None.

        Returns
        -------
        names: np.ndarray
            String names of the loops with the format of
            ``[prefix_]YYYYMMDD_YYYYMMDD_YYYYMMDD``.
        """
        date1, date2, date3 = (
            pd.DatetimeIndex(self._values[:, col]).strftime("%Y%m%d")
            for col in range(3)
        )
        names = date1 + "_" + date2 + "_" + date3
        if prefix:
            names = prefix + "_" + names
        # Unicode (not bytes) so the names can be joined, hashed and parsed
        # back by ``from_names``.
        return names.to_numpy(dtype=str)

    def to_frame(self, target: Literal["pairs", "dates"] = "pairs") -> pd.DataFrame:
        """Return the loops as a DataFrame.

        Parameters
        ----------
        target: str, one of ['pairs', 'dates']
            Target of the DataFrame. Default is 'pairs'.

        Raises
        ------
        ValueError
            If ``target`` is neither 'pairs' nor 'dates'.
        """
        if target == "pairs":
            return pd.DataFrame(
                list(
                    zip(self.pairs12.values, self.pairs23.values, self.pairs13.values),
                ),
                columns=["pair12", "pair23", "pair13"],
            )
        if target == "dates":
            return pd.DataFrame(self.values, columns=["date1", "date2", "date3"])
        msg = f"target should be 'pairs' or 'dates', but got {target}."
        raise ValueError(msg)

    def to_matrix(self, dtype: DTypeLike = None) -> NDArray[np.number]:
        """Return loop matrix (containing 1, -1, 0) from pairs.

        Parameters
        ----------
        dtype: DTypeLike, optional
            Data type of the matrix. Default is None (numpy default, float64).

        Returns
        -------
        matrix: np.ndarray
            TripletLoop matrix with the shape of (n_loop, n_pair). The values of each
            loop/row in matrix are:
            - 1: pair12 and pair23
            - -1: pair13
            - 0: otherwise
        """
        n_loop = len(self)
        # Same stacking and ``np.unique`` call as the ``pairs`` property, so
        # the columns are the unique pairs in ascending order.
        # NOTE(review): this assumes ``Pairs(..., sort=False)`` keeps the row
        # order it is given, so the columns line up with ``self.pairs``.
        stacked = np.vstack(
            [self._values[:, :2], self._values[:, 1:], self._values[:, [0, 2]]],
        )
        unique_pairs, inverse = np.unique(stacked, axis=0, return_inverse=True)
        inverse = inverse.reshape(-1)

        matrix = np.zeros((n_loop, len(unique_pairs)), dtype=dtype)
        rows = np.arange(n_loop)
        matrix[rows, inverse[:n_loop]] = 1
        matrix[rows, inverse[n_loop : 2 * n_loop]] = 1
        matrix[rows, inverse[2 * n_loop :]] = -1
        return matrix

    def where(
        self,
        loop: str | TripletLoop,
        return_type: Literal["index", "mask"] = "index",
    ) -> int | NDArray[np.bool_] | None:
        """Return the index (or the mask) of the loop.

        Parameters
        ----------
        loop: str or TripletLoop
            TripletLoop name or TripletLoop object.
        return_type: str, optional
            Whether to return the index or mask of the loop. Default is 'index'.

        Returns
        -------
        int, np.ndarray or None
            For 'index', the position of the first matching loop, or None if
            the loop is not found. For 'mask', a boolean array of length
            n_loops marking the matching loops.

        Raises
        ------
        TypeError
            If ``loop`` is neither a str nor a TripletLoop.
        ValueError
            If ``return_type`` is neither 'index' nor 'mask'.
        """
        if isinstance(loop, str):
            loop = TripletLoop.from_name(loop)
        elif not isinstance(loop, TripletLoop):
            msg = f"loop should be str or TripletLoop, but got {type(loop)}."
            raise TypeError(msg)
        if return_type not in ("index", "mask"):
            msg = f"return_type should be 'index' or 'mask', but got {return_type}."
            raise ValueError(msg)

        mask = np.all(self._values == loop.values, axis=1)
        if return_type == "mask":
            return mask
        hits = np.flatnonzero(mask)
        return int(hits[0]) if hits.size else None

    def sort(
        self,
        order: str | list = "pairs",
        ascending: bool = True,
        inplace: bool = True,
    ) -> tuple[TripletLoops, NDArray[np.int64]] | None:
        """Sort the loops.

        Exact duplicate loops are removed; loops that merely share the same
        sort key are all kept and ordered by their dates.

        Parameters
        ----------
        order: str or list of str, optional
            By which fields to sort the loops. this argument specifies
            which fields to compare first, second, etc. Default is 'pairs'.
            The available options are one or a list of:
            - **date:**: 'date1', 'date2', 'date3'
            - **pairs:** 'pairs12', 'pairs23', 'pairs13'
            - **days:** 'days12', 'days23', 'days13'
            - **short name:** 'date', 'pairs', 'days'. short name will be
            treated as a combination of the above options. For example,
            'date' is equivalent to ['date1', 'date2', 'date3'].
        ascending: bool, optional
            Whether to sort ascending. Default is True.
        inplace: bool, optional
            Whether to sort the loops inplace. Default is True.

        Returns
        -------
        sorted: (TripletLoops, np.ndarray) | None
            if inplace is True, return None. Otherwise, return the sorted
            loops and the index of the sorted loops in the original loops.

        Raises
        ------
        ValueError
            If ``order`` contains an unknown field.
        """
        # Every key is expressed as integer day counts with one row per loop,
        # so keys of different kinds can be placed side by side as columns.
        dates = self._values.astype("int64")
        date1, date2, date3 = dates[:, [0]], dates[:, [1]], dates[:, [2]]
        days12, days23, days13 = date2 - date1, date3 - date2, date3 - date1
        item_map = {
            "date1": date1,
            "date2": date2,
            "date3": date3,
            "pairs12": dates[:, [0, 1]],
            "pairs23": dates[:, [1, 2]],
            "pairs13": dates[:, [0, 2]],
            "days12": days12,
            "days23": days23,
            "days13": days13,
            "date": dates,
            "pairs": dates[:, [0, 1, 1, 2, 0, 2]],
            "days": np.hstack([days12, days23, days13]),
        }
        if isinstance(order, str):
            order = [order]
        keys = []
        for i in order:
            if i not in item_map:
                msg = (
                    f"order should be one of {list(item_map.keys())}, but got {order}."
                )
                raise ValueError(
                    msg,
                )
            keys.append(item_map[i])

        # The dates are appended as the last columns: they break ties and make
        # each row identify its loop, so ``np.unique`` only merges loops that
        # are exact duplicates.
        key_matrix = np.hstack([*keys, dates])
        _, _index = np.unique(key_matrix, axis=0, return_index=True)
        if not ascending:
            _index = _index[::-1]
        if inplace:
            self._values = self._values[_index]
            self._parse_loop_meta()
            return None
        # sort=False keeps the order that was just computed.
        return TripletLoops(self._values[_index], sort=False), _index

    def to_seasons(self) -> NDArray[np.int8]:
        """Return the season of each loop.

        A loop is assigned a season only when its three dates are in the same
        season and it spans less than 180 days.

        Returns
        -------
        seasons: np.ndarray
            Seasons of each loop.
            0: not the same season
            1: spring
            2: summer
            3: fall
            4: winter
        """
        seasons = []
        for date1, date2, date3 in self._values.astype("O"):
            season1 = DateManager.season_of_month(date1.month)
            season2 = DateManager.season_of_month(date2.month)
            season3 = DateManager.season_of_month(date3.month)
            same_season = season1 == season2 == season3
            if same_season and (date3 - date1).days < 180:  # noqa: PLR2004
                seasons.append(season1)
            else:
                seasons.append(0)
        return np.asarray(seasons, dtype=np.int8)
class Loop:
    """Loop class containing multiple pairs/acquisitions.

    The dates are kept in the order they are given. Consecutive dates form
    the edge pairs, and the first and last dates form the closing (diagonal)
    pair, which is stored last.
    """

    def __init__(self, loop: Sequence[datetime]) -> None:
        """Initialize the Loop class.

        Parameters
        ----------
        loop: Sequence
            Sequence object of dates. Each date is a datetime object.
            For example, (date1, ..., date_n).
        """
        self._values = np.asarray(loop).astype("M8[D]")
        loop_dt = self._values.astype(datetime)
        self._name = "_".join([i.strftime("%Y%m%d") for i in loop_dt])
        self._length = len(self._values)

        # Edge pairs between consecutive dates, then the closing pair.
        _pairs = [
            Pair([first, second]) for first, second in zip(loop_dt[:-1], loop_dt[1:])
        ]
        _pairs.append(Pair([loop_dt[0], loop_dt[-1]]))
        self._pairs = Pairs(_pairs, sort=False)

    def __len__(self) -> int:
        """Return the number of dates in the loop.

        For a closed loop this equals the number of pairs (n - 1 edge pairs
        plus one diagonal pair).
        """
        return self._length

    def __str__(self) -> str:
        """Return the name of the loop."""
        return self._name

    def __repr__(self) -> str:
        """Return the representation of the loop."""
        return f"Loop({self._name})"

    def __eq__(self, other: object) -> bool:
        """Compare whether the loop is same to another loop.

        Returns ``NotImplemented`` for objects that are not ``Loop`` so that
        Python falls back to its default comparison instead of raising.
        """
        if not isinstance(other, Loop):
            return NotImplemented
        return self.name == other.name

    def __hash__(self) -> int:
        """Return the hash of the loop (hash of its name)."""
        return hash(self.name)

    @property
    def values(self) -> NDArray[np.datetime64]:
        """Return the values array of the loop.

        Returns
        -------
        values: np.ndarray
            dates of the loop with format of np.datetime64[D].
        """
        return self._values

    @property
    def pairs(self) -> Pairs:
        """All pairs of the loop (edge pairs first, diagonal pair last)."""
        return self._pairs

    @property
    def name(self) -> str:
        """The string format the loop.

        Returns
        -------
        name: str
            String of the loop.
        """
        return self._name

    @classmethod
    def from_name(
        cls,
        name: str,
        parse_function: Callable | None = None,
        date_args: dict | None = None,
    ) -> Loop:
        """Initialize the loop class from a loop name.

        This is an alternate constructor and can be called on the class
        (``Loop.from_name(name)``) as well as on an instance.

        Parameters
        ----------
        name: str
            Loop name.
        parse_function: Callable, optional
            Function to parse the date strings from the loop name.
            Default is None, which leaves the parsing to
            ``DateManager.str_to_dates``.
        date_args: dict, optional
            Keyword arguments for pd.to_datetime() to convert the date strings
            to datetime objects. For example, {'format': '%Y%m%d'}.
            Default is None.

        Returns
        -------
        loop: Loop
            Loop object.
        """
        if date_args is None:
            date_args = {}
        # NOTE(review): 0 is passed where TripletLoop passes 3; presumably it
        # means "no fixed number of dates" - confirm against DateManager.
        dates = DateManager.str_to_dates(name, 0, parse_function, date_args)
        return cls(dates)
class Loops:
    """Loops class to handle loops with multiple acquisitions.

    The loops are held in a 1-D object array; names and the pair collections
    (all, edge and diagonal pairs) are cached and refreshed whenever the
    loops are reordered.
    """

    def __init__(self, loops: list, sort: bool = True) -> None:
        """Initialize the loops class.

        Parameters
        ----------
        loops: list
            a list containing Loop objects.
        sort: bool, optional
            Whether to sort the loops. Default is True.
        """
        self._loops = np.array(loops, dtype=object)
        self._parse_loop_meta()
        if sort:
            self.sort()

    def _parse_loop_meta(self) -> None:
        """Refresh the cached length, names and pair collections."""
        self._length = len(self._loops)
        self._names = np.array([i.name for i in self.loops])
        self._pairs, self._edge_pairs, self._diagonal_pairs = self._parse_pairs()

    def __str__(self) -> str:
        """Return the string representation of the loops."""
        return (
            f"Loops(loops={len(self)}, pairs={len(self.pairs)}, "
            f"edge_pairs={len(self.edge_pairs)}, "
            f"diagonal_pairs={len(self.diagonal_pairs)})"
        )

    def __repr__(self) -> str:
        """Return the representation of the loops."""
        return (
            "Loops("
            f"\n    loops={len(self)},"
            f"\n    pairs={len(self.pairs)},"
            f"\n    edge_pairs={len(self.edge_pairs)},"
            f"\n    diagonal_pairs={len(self.diagonal_pairs)}"
            f"\n)"
        )

    def __len__(self) -> int:
        """Return the number of loops."""
        return self._length

    def __getitem__(self, index: int) -> Loop:
        """Return the loop by index."""
        return self.loops[index]

    def __iter__(self) -> Iterator[Loop]:
        """Iterate the loops."""
        return iter(self.loops)

    def __contains__(self, item: Loop) -> bool:
        """Check if the item is in the loops.

        Loops are compared by name; an item without a ``name`` attribute is
        never contained.
        """
        name = getattr(item, "name", None)
        if name is None:
            return False
        return bool(np.any(self._names == name))

    def _parse_pairs(self) -> tuple[Pairs, Pairs, Pairs]:
        """Parse the pairs in the loops.

        Returns
        -------
        (Pairs, Pairs, Pairs) : all pairs, edge pairs, and diagonal pairs
        """
        pairs = []
        edge_pairs = []
        diagonal_pairs = []
        for loop in self.loops:
            loop_pairs = loop.pairs
            pairs.extend(loop_pairs)
            # The last pair of a loop is its closing (diagonal) pair.
            edge_pairs.extend(loop_pairs[:-1])
            diagonal_pairs.extend(loop_pairs[-1:])
        return (
            Pairs(pairs, sort=True),
            Pairs(edge_pairs, sort=True),
            Pairs(diagonal_pairs, sort=True),
        )

    @property
    def loops(self) -> NDArray[np.object_]:
        """The loops in the numpy array format."""
        return self._loops

    @property
    def shape(self) -> tuple[int, int]:
        """The shape of the loops array with format of (n_loops, n_pairs)."""
        return (len(self), len(self.pairs))

    @property
    def names(self) -> NDArray[np.str_]:
        """The names (str format) of the loops."""
        return self._names

    @property
    def pairs(self) -> Pairs:
        """All pairs in the loops."""
        return self._pairs

    @property
    def edge_pairs(self) -> Pairs:
        """All edge pairs in the loops."""
        return self._edge_pairs

    @property
    def diagonal_pairs(self) -> Pairs:
        """All diagonal pairs in the loops."""
        return self._diagonal_pairs

    def sort(self, ascending: bool = True, inplace: bool = True) -> Loops | None:
        """Sort the loops by name.

        Loops sharing the same name are reduced to a single loop.

        Parameters
        ----------
        ascending: bool, optional
            Whether to sort the loops ascending. Default is True.
        inplace: bool, optional
            Whether to sort the loops in place. if False, return the sorted loops.
            Default is True.

        Returns
        -------
        Loops or None
            None if inplace is True, otherwise the sorted loops.
        """
        _, _index = np.unique(self.names, return_index=True)
        if not ascending:
            _index = _index[::-1]
        if inplace:
            self._loops = self._loops[_index]
            self._parse_loop_meta()
            return None
        # sort=False keeps the requested order; the default would re-sort the
        # new object ascending and discard ``ascending=False``.
        return Loops(self._loops[_index], sort=False)

    def to_matrix(self, dtype: DTypeLike = None) -> NDArray[np.number]:
        """Return a design matrix describes the relationship between loops and pairs.

        The rows and columns of this matrix are loops and pairs respectively.
        The values of the matrix are 1 for the edge pairs, -1 for the diagonal
        pairs, and 0 otherwise.

        Parameters
        ----------
        dtype: DTypeLike, optional
            Data type of the matrix. Default is None (numpy default).
        """
        all_pairs = self.pairs.names
        matrix = np.zeros((len(self), len(all_pairs)), dtype=dtype)
        for i, loop in enumerate(self.loops):
            loop_pairs = loop.pairs.names
            matrix[i, np.isin(all_pairs, loop_pairs[:-1])] = 1
            matrix[i, np.isin(all_pairs, loop_pairs[-1])] = -1
        return matrix