"""Pair and loop tools for InSAR time series analysis."""
from __future__ import annotations
from collections.abc import Iterable, Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Callable, Literal, overload
import numpy as np
import pandas as pd
import xarray as xr
from xarray.core.indexes import Indexes
from faninsar._core.render import formatting_html
from faninsar.logging import setup_logger
from .acquisition import Acquisition, DateManager, DaySpan
if TYPE_CHECKING:
from numpy.typing import DTypeLike, NDArray
from faninsar import Loop, Loops, TripletLoops
from faninsar.typing import PairLike, PairsLike, PairsOrder
logger = setup_logger(log_name=__name__)
[docs]
class Pair:
"""Pair class for one pair."""
_values: np.ndarray
_name: str
_days: int
__slots__ = ["_days", "_name", "_values"]
[docs]
def __init__(
self,
pair: Sequence[datetime, datetime],
) -> None:
"""Initialize the Pair class.
Parameters
----------
pair: Sequence[datetime, datetime]
Sequence object of two dates. Each date is a datetime object.
For example, (date1, date2).
"""
self._values = np.sort(pair).astype("M8[D]")
self._name = "_".join(
[i.strftime("%Y%m%d") for i in self._values.astype("O")],
)
self._days = (self._values[1] - self._values[0]).astype(int)
def __str__(self) -> str:
"""Return the name of the pair."""
return self._name
def __repr__(self) -> str:
"""Return the representation of the pair."""
return f"Pair({self._name})"
def __eq__(self, other: Pair) -> bool:
"""Compare the pair with another pair."""
return self.name == other.name
def __hash__(self) -> int:
"""Return the hash of the pair."""
return hash(self._name)
def __array__(self, dtype: DTypeLike = None) -> np.ndarray:
"""Return the pair as a numpy array."""
return np.asarray(self._values, dtype=dtype)
@property
def values(self) -> NDArray[np.datetime64]:
"""The values of the pair with format of datetime."""
return self._values
@property
def name(self) -> str:
"""String of the pair with format of '%Y%m%d_%Y%m%d'."""
return self._name
@property
def days(self) -> int:
"""The time span of the pair in days."""
return self._days
@property
def primary(self) -> pd.Timestamp:
"""The primary dates of all pairs."""
return pd.Timestamp(self.values[0])
@property
def secondary(self) -> pd.Timestamp:
"""The secondary dates of all pairs."""
return pd.Timestamp(self.values[1])
[docs]
def primary_string(self, date_format: str = "%Y%m%d") -> str:
"""Return the primary dates of all pairs in string format.
Parameters
----------
date_format: str
Format of the date string. Default is '%Y%m%d'. See more at
`strftime Format Codes <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_.
"""
return pd.to_datetime(self.values[0]).strftime(date_format)
[docs]
def secondary_string(self, date_format: str = "%Y%m%d") -> str:
"""Return the secondary dates of all pairs in string format.
Parameters
----------
date_format: str
Format of the date string. Default is '%Y%m%d'. See more at
`strftime Format Codes <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_.
"""
return pd.to_datetime(self.values[1]).strftime(date_format)
[docs]
@classmethod
def from_name(
cls,
name: str,
parse_function: Callable | None = None,
date_args: dict | None = None,
) -> Pair:
"""Initialize the pair class from a pair name.
Parameters
----------
name: str
Pair name.
parse_function: Callable, optional
Function to parse the date strings from the pair name.
If None, the pair name will be split by '_' and
the last 2 items will be used. Default is None.
date_args: dict, optional
Keyword arguments for pd.to_datetime() to convert the date strings
to datetime objects. For example, {'format': '%Y%m%d'}.
Default is {}.
Returns
-------
pair: Pair
Pair class.
"""
if date_args is None:
date_args = {}
dates = DateManager.str_to_dates(name, 2, parse_function, date_args)
return cls(dates)
[docs]
def to_numpy(self, dtype: DTypeLike = None) -> NDArray[np.datetime64]:
"""Return the pair as a numpy array."""
return np.asarray(self._values, dtype=dtype)
[docs]
class Pairs:
"""Pairs class to handle pairs.
.. note::
- Each pair will be sorted in the acquisition order. The primary date
will be always earlier than the secondary date.
- The pairs will be sorted by the primary date.
Examples
--------
prepare dates and pairs for examples:
>>> dates = pd.date_range("20130101", "20231231").values
>>> n = len(dates)
>>> pair_ls = []
>>> loop_ls = []
>>> for i in range(5):
... np.random.seed(i)
... pair_ls.append(dates[np.random.randint(0, n, 2)])
initialize pairs from a list of pairs
>>> pairs = Pairs(pair_ls)
>>> print(pairs)
primary secondary
0 2013-06-24 2016-02-21
1 2013-08-24 2015-11-28
2 2017-08-16 2018-03-14
3 2020-01-20 2021-11-15
4 2020-02-21 2020-06-25
select pairs by date slice
>>> pairs1 = pairs["2018-03-09":]
>>> print(pairs1)
primary secondary
0 2020-01-20 2021-11-15
1 2020-02-21 2020-06-25
pairs can be added (union) and subtracted (difference)
>>> pairs2 = pairs - pairs1
>>> pairs3 = pairs1 + pairs2
>>> print(pairs2)
primary secondary
0 2013-06-24 2016-02-21
1 2013-08-24 2015-11-28
2 2017-08-16 2018-03-14
>>> print(pairs3)
primary secondary
0 2013-06-24 2016-02-21
1 2013-08-24 2015-11-28
2 2017-08-16 2018-03-14
3 2020-01-20 2021-11-15
4 2020-02-21 2020-06-25
pairs can be compared with `==`and `!=`
>>> print(pairs3 == pairs)
>>> print(pairs3 != pairs)
True
False
"""
_values: np.ndarray
_dates: np.ndarray
_length: int
_edge_index: np.ndarray
_names: np.ndarray
__slots__ = ["_dates", "_edge_index", "_length", "_names", "_values"]
[docs]
def __init__(
self,
pairs: Sequence[Sequence[datetime, datetime]] | Sequence[Pair],
sort: bool = True,
) -> None:
"""Initialize the pairs class.
Parameters
----------
pairs: Sequence
Sequence object of pairs. Each pair is an Sequence or Pair
object of two dates with format of datetime. For example,
[(date1, date2), ...].
sort: bool, optional
Whether to sort the pairs. Default is True.
"""
if pairs is None:
pairs = []
_values = np.array(pairs, dtype="M8[D]")
self._values = self._format_pair_values(_values)
self._parse_pair_meta()
if sort:
self.sort(inplace=True)
def _format_pair_values(
self,
values: NDArray[np.datetime64],
) -> NDArray[np.datetime64]:
"""Format the pair values.
This function is used to make sure the primary date is earlier than the
secondary date.
"""
if values.ndim == 1:
values = values.reshape(-1, 2)
idx = values[:, 0] > values[:, 1]
values[idx] = values[idx, ::-1]
return values
def _parse_pair_meta(self) -> None:
self._dates = np.unique(self._values.flatten())
self._length = self._values.shape[0]
self._edge_index = np.searchsorted(self._dates, self._values)
self._names = self.to_names()
def __len__(self) -> int:
"""Return the number of pairs."""
return self._length
def __str__(self) -> str:
"""Return the string representation of the pairs."""
return f"Pairs({self._length})"
def __repr__(self) -> str:
"""Return the representation of the pairs."""
return self._to_frame().__repr__()
def _repr_html_(self) -> str:
"""Return the HTML representation of the pairs."""
return formatting_html.pairs_repr(self)
def __eq__(self, other: Pairs) -> bool:
"""Compare the pairs with another pairs."""
return np.array_equal(self.values, other.values)
def __add__(self, other: Pairs) -> Pairs:
"""Return the unique, sorted union of the pairs."""
_pairs = np.union1d(self.names, other.names)
return Pairs.from_names(_pairs)
def __sub__(self, other: Pairs) -> Pairs:
"""Return the difference of the pairs."""
_pairs = np.setdiff1d(self.names, other.names)
return Pairs.from_names(_pairs)
@overload
def __getitem__(self, index: int | np.integer) -> Pair: ...
@overload
def __getitem__(self, index: slice) -> Pairs | None: ...
def __getitem__( # noqa: PLR0911, PLR0912
self,
index: int | slice | datetime | str | PairLike | Iterable,
) -> Pair | Pairs | None:
"""Return the pair or pairs by index or slice."""
if isinstance(index, slice):
start, stop, step = index.start, index.stop, index.step
if isinstance(start, (int, np.integer, type(None))) and isinstance(
stop,
(int, np.integer, type(None)),
):
if start is None:
start = 0
if stop is None:
stop = self._length
return Pairs(self._values[start:stop:step])
if isinstance(
start,
(datetime, np.datetime64, pd.Timestamp, str, type(None)),
) and isinstance(
stop,
(datetime, np.datetime64, pd.Timestamp, str, type(None)),
):
if isinstance(start, str):
start = DateManager.ensure_datetime(start)
if isinstance(stop, str):
stop = DateManager.ensure_datetime(stop)
if start is None:
start = self._dates[0]
if stop is None:
stop = self._dates[-1]
start, stop = (np.datetime64(start, "s"), np.datetime64(stop, "s"))
if start > stop:
msg = (
f"Index start {start} should be earlier than index stop {stop}."
)
raise ValueError(msg)
_pairs = []
for pair in self._values:
pair = pair.astype("M8[s]") # noqa: PLW2901
if start <= pair[0] <= stop and start <= pair[1] <= stop:
_pairs.append(pair)
if len(_pairs) > 0:
return Pairs(_pairs)
return None
msg = f"Unsupported slice index {index} for Pairs."
logger.warning(msg)
return None
if isinstance(index, (int, np.integer)):
if index >= self._length:
msg = f"Index {index} out of range. Pairs number is {self._length}."
raise IndexError(
msg,
)
return Pair(self._values[index])
if isinstance(index, (datetime, np.datetime64, pd.Timestamp, str)):
if isinstance(index, str):
try:
index = pd.to_datetime(index)
except Exception as e:
msg = f"String {index} cannot be converted to datetime."
raise ValueError(msg) from e
pairs = [pair for pair in self._values if index in pair]
if len(pairs) > 0:
return Pairs(pairs)
return None
if isinstance(index, Iterable):
index = np.array(index)
return Pairs(self._values[index])
msg = (
"Index should be int, slice, datetime, str, or bool or int array"
f" indexing, but got {type(index)}."
)
raise TypeError(
msg,
)
def __hash__(self) -> int:
"""Return the hash of the pairs."""
return hash("".join(self.names))
def __iter__(self) -> iter[Pair]:
"""Iterate the pairs."""
pairs_ls = [Pair(i) for i in self._values]
return iter(pairs_ls)
def __array__(self, dtype: DTypeLike = None) -> NDArray:
"""Return the pairs as a numpy array."""
return np.asarray(self._values, dtype=dtype)
def _to_frame(self) -> pd.DataFrame:
"""Return the pairs as a DataFrame."""
pairs_str = "Pairs"
columns = pd.MultiIndex.from_tuples(
[(pairs_str, "primary"), (pairs_str, "secondary")],
)
return pd.DataFrame(self._values, columns=columns)
def _ensure_pairs(
self,
pairs: str | Pair | Pairs | Sequence[str] | Sequence[Pair],
) -> Pairs:
"""Ensure the pairs are in the Pairs object."""
if isinstance(pairs, str):
pairs = Pairs.from_names([pairs])
elif isinstance(pairs, Pair):
pairs = Pairs([pairs])
elif isinstance(pairs, Pairs):
return pairs
elif isinstance(pairs, Sequence):
pairs = np.asarray(pairs)
if pairs.ndim == 1:
pairs = Pairs.from_names(pairs)
elif pairs.ndim == 2 and pairs.shape[1] == 2:
pairs = Pairs(pairs)
else:
msg = (
"pairs should be 1D array of str, 2D array of datetime, "
f"or Pairs, but got {pairs}."
)
raise ValueError(msg)
else:
msg = (
"pairs should be str, Pair, list of str, list of Pair, "
f"or Pairs, but got {type(pairs)}."
)
raise TypeError(msg)
return pairs
@property
def values(self) -> NDArray[np.datetime64]:
"""The numpy array of the pairs."""
return self._values
@property
def names(self) -> NDArray[np.str_]:
"""The names (string format) of the pairs."""
return self._names
@property
def dates(self) -> Acquisition:
"""The sorted dates array of all pairs in type of np.datetime64[D]."""
return Acquisition(pd.to_datetime(self._dates))
@property
def days(self) -> DaySpan:
"""The time span of all pairs in days."""
days = (self._values[:, 1] - self._values[:, 0]).astype(int)
return DaySpan(days, dtype=np.int32)
@property
def primary(self) -> Acquisition:
"""The primary dates of all pairs."""
if len(self._values) == 0:
return Acquisition([])
return Acquisition(pd.to_datetime(self._values[:, 0]))
@property
def secondary(self) -> Acquisition:
"""The secondary dates of all pairs."""
if len(self._values) == 0:
return Acquisition([])
return Acquisition(pd.to_datetime(self._values[:, 1]))
@property
def xindexes(self) -> Indexes:
"""The xarray indexes of the pairs."""
indexes = {
"primary": self.primary,
"secondary": self.secondary,
"days": self.days,
"dates": self.dates,
}
variables = {
"primary": self.primary.to_xarray(),
"secondary": self.secondary.to_xarray(),
"days": self.days.to_xarray(),
"dates": self.dates.to_xarray(),
}
return Indexes(indexes=indexes, variables=variables, index_type=pd.Index)
[docs]
def primary_string(self, date_format: str = "%Y%m%d") -> pd.Index:
"""Return the primary dates of all pairs in string format.
Parameters
----------
date_format: str
Format of the date string. Default is '%Y%m%d'. See more at
`strftime Format Codes <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_.
"""
return self.primary.strftime(date_format)
[docs]
def secondary_string(self, date_format: str = "%Y%m%d") -> pd.Index:
"""Return the secondary dates of all pairs in string format.
Parameters
----------
date_format: str
Format of the date string. Default is '%Y%m%d'. See more at
`strftime Format Codes <https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes>`_.
"""
return self.secondary.strftime(date_format)
@property
def edge_index(self) -> NDArray[np.int64]:
"""The index of the pairs in the dates coordinate.
This is useful to construct the edge index in graph theory.
"""
return self._edge_index
@property
def shape(self) -> tuple[int, int]:
"""The shape of the pairs array."""
return self._values.shape
[docs]
@classmethod
def from_names(
cls,
names: Sequence[str | None],
parse_function: Callable | None = None,
date_args: dict | None = None,
) -> Pairs:
"""Initialize the pair class from a pair name.
.. note::
The pairs will be in the order of the input names. If you want to
sort the pairs, you can call the :meth:`Pairs.sort()` method to
achieve it.
Parameters
----------
names: str
Pair names.
parse_function: Callable, optional
Function to parse the date strings from the pair name.
If None, the pair name will be split by '_' and
the last 2 items will be used. Default is None.
date_args: dict, optional
Keyword arguments for pd.to_datetime() to convert the date strings
to datetime objects. For example, {'format': '%Y%m%d'}.
Default is {}.
Returns
-------
pairs: Pairs
unsorted Pairs object.
"""
if date_args is None:
date_args = {}
pairs = []
# No names in list
if len(names) == 0:
return cls(pairs)
# Parse the names
for name in names:
pair = Pair.from_name(name, parse_function, date_args)
pairs.append(pair.values)
return cls(pairs, sort=False)
[docs]
def where(
self,
pairs: list[str] | list[Pair] | Pairs,
return_type: Literal["index", "mask"] = "index",
) -> NDArray[np.int64 | np.bool_]:
"""Return the index of the pairs.
Parameters
----------
pairs: list of str or Pair, or Pairs
Pair names or Pair objects, or Pairs object.
return_type: str, optional
Whether to return the index or mask of the pairs. Default is 'index'.
"""
if return_type not in ["index", "mask"]:
msg = (
"return_type should be one of ['index', 'mask'], "
f"but got {return_type}."
)
raise ValueError(msg)
pairs = self._ensure_pairs(pairs)
con = np.isin(self.names, pairs.names)
if return_type == "index":
con = np.where(con)[0]
return con
[docs]
def intersect(self, pairs: PairLike) -> Pairs | None:
"""Return the intersection of the pairs.
The pairs both in self and input pairs.
Parameters
----------
pairs: list of str or Pair, or Pairs
Pair names or Pair objects, or Pairs object.
"""
pairs = self._ensure_pairs(pairs)
return self[self.where(pairs)]
[docs]
def union(self, pairs: list[str] | list[Pair] | Pairs) -> Pairs:
"""Return the unique, sorted union of the pairs.
All pairs that in self and input pairs. Same as addition.
Parameters
----------
pairs: list of str or Pair, or Pairs
Pair names or Pair objects, or Pairs object.
"""
pairs = self._ensure_pairs(pairs)
return self + pairs
[docs]
def difference(self, pairs: PairsLike) -> Pairs | None:
"""Return the difference of the pairs.
The pairs in self but not in pairs. Same as subtraction.
Parameters
----------
pairs: list of str or Pair, or Pairs
Pair names or Pair objects, or Pairs object.
"""
pairs = self._ensure_pairs(pairs)
return self - pairs
[docs]
def copy(self) -> Pairs:
"""Return a copy of the pairs."""
return Pairs(self._values.copy())
[docs]
def sort(
self,
order: PairsOrder = "pairs",
ascending: bool = True,
inplace: bool = True,
) -> tuple[Pairs, NDArray[np.int64]] | None:
"""Sort the pairs.
Parameters
----------
order: str or list of str, optional
By which fields to sort the pairs. This argument specifies
which fields to compare first, second, etc. Default is 'pairs'.
The available options are one or a list of:
['pairs', 'primary', 'secondary', 'days'].
ascending: bool, optional
Whether to sort ascending. Default is True.
inplace: bool, optional
Whether to sort the pairs inplace. Default is True.
Returns
-------
None or (Pairs, np.ndarray). if inplace is True, return the sorted pairs
and the index of the sorted pairs in the original pairs. Otherwise,
return None.
"""
# for null pairs
if len(self) == 0:
if inplace:
return None
return self
# for valid pairs
item_map = {
"pairs": self._values,
"primary": self._values[:, 0],
"secondary": self._values[:, 1],
"days": self.days.data,
}
if isinstance(order, str):
order = [order]
_values_ = []
for i in order:
if i not in item_map:
msg = (
f"order should be one of {list(item_map.keys())}, but got {order}."
)
raise ValueError(
msg,
)
_values_.append(item_map[i].reshape(self._length, -1))
_values_ = np.hstack(_values_)
_values, _index = np.unique(_values_, axis=0, return_index=True)
if not ascending:
_index = _index[::-1]
if inplace:
self._values = _values
self._parse_pair_meta()
return None
return Pairs(_values), _index
[docs]
def to_names(self, prefix: str | None = None) -> NDArray[np.str_]:
"""Generate pairs names string with prefix.
Parameters
----------
prefix: str
Prefix of the pair file names. Default is None.
Returns
-------
names: np.ndarray
Pairs names string with format of '%Y%m%d_%Y%m%d'.
"""
names = (
pd.DatetimeIndex(self.primary).strftime("%Y%m%d")
+ "_"
+ pd.DatetimeIndex(self.secondary).strftime("%Y%m%d")
)
if prefix:
names = prefix + "_" + names
return names.to_numpy(dtype=np.str_)
[docs]
def to_numpy(self, dtype: DTypeLike = None) -> NDArray[np.datetime64]:
"""Return the pairs as a numpy array."""
return np.asarray(self._values, dtype=dtype)
[docs]
def to_frame(self) -> pd.DataFrame:
"""Return the pairs as a DataFrame."""
frame = pd.DataFrame()
frame["primary"] = self.primary
frame["secondary"] = self.secondary
frame["days"] = self.days
return frame
[docs]
def to_xarray(self) -> pd.DataFrame:
"""Return the pairs as a xarray DataArray."""
return xr.Variable(dims=["pairs", "primary-secondary"], data=self._values)
[docs]
def to_triplet_loops(self) -> TripletLoops:
"""Return all possible triplet loops from the pairs."""
from faninsar import TripletLoops
loops = []
for i, pair12 in enumerate(self._values):
for pair23 in self._values[i + 1 :]:
if pair12[1] == pair23[0] and Pair([pair12[0], pair23[1]]) in self:
loops.append([pair12[0], pair12[1], pair23[1]]) # noqa: PERF401
return TripletLoops(loops)
[docs]
def to_loops(
self,
max_acquisition: int = 5,
max_days: int | None = None,
edge_pairs: Pairs | None = None,
edge_days: int | None = None,
) -> Loops:
r"""Return all possible loops from the pairs.
.. important::
The pairs in the loops may be fewer than the input pairs. You can use
the :meth:`Pairs.where` method to get the index/mask of the pairs
in the loops from the input pairs.
**Example**:
>>> loops = pairs.to_loops()
>>> mask = pairs.where(loops.pairs, return_type="mask")
Parameters
----------
max_acquisition: int
The maximum number of acquisitions in the loops. It should be at least 3.
.. note::
The number of acquisitions is equal to the number of intervals + 1
:math:`n (acquisition) = n (edge\ pairs) + 1 (diagonal\ pair) =
n (intervals) + 1`.
max_days: int, optional
The maximum number of days for the pairs in the loops. If None, all
available pairs will be used. Default is None.
edge_pairs: Pairs, optional
The edge pairs to form loops. If None, ``edge_days`` must be provided.
edge_days: int, optional
The maximum number of days used to identify the edge pairs. If None,
``edge_pairs`` must be provided. Default is None.
.. note::
This parameter will be ignored if ``edge_pairs`` is provided.
"""
# a list containing all loops
loops = []
if edge_pairs is None:
if edge_days is None:
msg = "Either edge_days or edge_pairs should be provided."
raise ValueError(msg)
edge_pairs = self[self.days <= edge_days]
# find valid diagonal pairs
for i in self - edge_pairs:
if max_days is not None and i.days > max_days:
continue
if not valid_diagonal_pair(i, self, edge_pairs):
continue
start_date, end_date = i.values[0], i.values[1]
# find valid primary pairs
mask_primaries = (self.primary == start_date) & (self.secondary < end_date)
if not mask_primaries.any():
continue
pairs_primary = self[mask_primaries]
# initialize a loop with the primary acquisition
loop = [start_date]
# find all loops for all primary acquisitions
find_loops(
self,
loops,
loop,
pairs_primary,
end_date,
edge_pairs,
max_acquisition,
)
from faninsar import Loops
return Loops(loops)
[docs]
def to_matrix(self, dtype: DTypeLike = None) -> NDArray[np.number]:
"""Return the SBAS matrix.
Parameters
----------
matrix: np.ndarray
SBAS matrix in shape of (n_pairs, n_dates-1). The dates between
pairs are set to 1, otherwise 0.
dtype: np.dtype, optional
Data type of the matrix. Default is None.
"""
matrix = np.zeros((len(self), len(self.dates) - 1), dtype=dtype)
col_idxs = self.edge_index.copy()
for row_idx, col_idx in enumerate(col_idxs):
matrix[row_idx, col_idx[0] : col_idx[1]] = 1
return matrix
[docs]
def parse_gaps(self, pairs_removed: Pairs | None = None) -> pd.DatetimeIndex:
"""Parse network gaps where the acquisitions are not connected by pairs.
The gaps are detected by the dates that are not present in the secondary
acquisition of the pairs.
.. note::
Theoretically, the gaps should be the temporal spans (or intervals)
between the consecutive acquisitions. For simplicity, the end dates
of the gaps are returned here.
Parameters
----------
pairs_removed: Pairs, optional
Pairs that are removed from the original pairs. Default is None,
which means all pairs are used.
Returns
-------
gaps : pd.DatetimeIndex
Acquisition/date gaps that are not covered by any pairs.
"""
dates = self.dates[1:]
pairs_valid = self - pairs_removed if pairs_removed is not None else self
dates_secondary = np.unique(pairs_valid.secondary)
return np.setdiff1d(dates, dates_secondary)
[docs]
class PairsFactory:
"""A class used to generate interferometric pairs for InSAR processing."""
[docs]
def __init__(self, dates: Sequence, **kwargs) -> None:
"""Initialize the PairGenerator class.
Parameters
----------
dates: Sequence
Sequence object that contains the dates. Can be any object that
accepted by :func:`pd.to_datetime`
kwargs: dict, optional
Keyword arguments passed to :func:`pd.to_datetime`
"""
self.dates = pd.to_datetime(dates, **kwargs).unique().sort_values()
[docs]
def from_interval(self, max_interval: int = 2, max_day: int = 180) -> Pairs:
"""Generate interferometric pairs by SAR acquisition interval.
SAR acquisition interval is defined as the number of SAR acquisitions
between two SAR acquisitions.
.. admonition:: Example
If the SAR acquisition interval is 2, then the interferometric pairs
will be generated between SAR acquisitions with interval of 1 and 2.
This will be useful to generate interferometric pairs with different
temporal baselines.
Parameters
----------
max_interval: int
max interval between two SAR acquisitions for interferometric pair.
interval is defined as the number of SAR acquisitions between two SAR
acquisitions.
max_day:int
max day between two SAR acquisitions for interferometric pair
Returns
-------
pairs: Pairs object
"""
num = len(self.dates)
_pairs = []
for i, date in enumerate(self.dates):
n_interval = 1
while n_interval <= max_interval:
if i + n_interval < num:
if (self.dates[i + n_interval] - date).days < max_day:
pair = (date, self.dates[i + n_interval])
_pairs.append(pair)
n_interval += 1
else:
break
else:
break
return Pairs(_pairs)
[docs]
def linking_winter(
self,
winter_start: str = "0101",
winter_end: str = "0331",
n_per_winter: int = 5,
max_winter_interval: int = 1,
) -> Pairs:
"""Generate interferometric pairs linking winter in each year.
winter is defined by month and day for each year. For instance,
winter_start='0101', winter_end='0331' means the winter is from Jan 1 to
Mar 31 for each year in the time series. This will be useful to add pairs
for completely frozen period across years in permafrost region.
Parameters
----------
winter_start, winter_end: str
start and end date for the winter which expressed as month and day
with format '%m%d'
n_per_winter: int
how many dates will be used for each winter. Those dates will be
selected randomly in each winter. Default is 5
max_winter_interval: int
max interval between winters for interferometric pair. If
max_winter_interval=1, hen the interferometric pairs will be generated
between neighboring winters.
Returns
-------
pairs: Pairs object
"""
years = sorted(set(self.dates.year))
df_dates = pd.Series(self.dates, index=self.dates)
# check if period_start and period_end are in the same year. If not,
# the period_end should be in the next year
same_year = int(winter_start) < int(winter_end)
# randomly select n_per_period dates in each period/year
date_years = []
for year in years:
start = pd.to_datetime(f"{year}{winter_start}", format="%Y%m%d")
if same_year:
end = pd.to_datetime(f"{year}{winter_end}", format="%Y%m%d")
else:
end = pd.to_datetime(f"{year + 1}{winter_end}", format="%Y%m%d")
dt_year = df_dates[start:end]
if len(dt_year) > 0:
np.random.default_rng().shuffle(dt_year)
date_years.append(dt_year[:n_per_winter].to_list())
n_years = len(date_years)
_pairs = []
for i, date_year in enumerate(date_years):
# primary/reference dates
for date_primary in date_year:
# secondary dates
for j in range(1, max_winter_interval + 1):
if i + j < n_years:
for date_secondary in date_years[i + j]:
_pairs.append((date_primary, date_secondary)) # noqa: PERF401
return Pairs(_pairs)
[docs]
def from_period(
self,
period_start: str = "0101",
period_end: str = "0331",
n_per_period: int | None = None,
n_primary_period: str | None = None,
primary_years: list[int] | None = None,
) -> Pairs:
"""Generate interferometric pairs between periods for all years.
period is defined by month and day for each year. For example,
period_start='0101', period_end='0331' means the period is from Dec 1
to Mar 31 for each year in the time series. This function will randomly
select n_per_period dates in each period and generate interferometric
pairs between those dates. This will be useful to mitigate the temporal
cumulative bias.
Parameters
----------
period_start, period_end: str
start and end date for the period which expressed as month and day
with format '%m%d'
n_per_period: int | None
how many dates will be used for each period. Those dates will be
selected randomly in each period. If None, all dates in the period
will be used. Default is None.
n_primary_period: int, optional
how many periods/years used as primary date of ifg. For example, if
n_primary_period=2, then the interferometric pairs will be generated
between the first two periods and the rest periods. If None, all
periods will be used. Default is None.
primary_years: list, optional
years used as primary date of ifg. If None, all years in the time
series will be used. Default is None.
Returns
-------
pairs: Pairs object
"""
years = sorted(set(self.dates.year))
df_dates = pd.Series(self.dates, index=self.dates)
# check if period_start and period_end are in the same year. If not,
# the period_end should be in the next year
same_year = int(period_start) < int(period_end)
# randomly select n_per_period dates in each period/year
date_years = []
for year in years:
start = pd.to_datetime(f"{year}{period_start}", format="%Y%m%d")
if same_year:
end = pd.to_datetime(f"{year}{period_end}", format="%Y%m%d")
else:
end = pd.to_datetime(f"{year + 1}{period_end}", format="%Y%m%d")
dt_year = df_dates[start:end]
if (n_year := len(dt_year)) > 0:
n = n_year if n_per_period is None else n_per_period
np.random.default_rng().shuffle(dt_year)
date_years.append(dt_year[:n].to_list())
# generate interferometric pairs between primary period and the rest periods
_pairs = []
for i, date_year in enumerate(date_years):
# only generate pairs for n_primary_period
if n_primary_period is not None and i + 1 > n_primary_period:
break
if primary_years is not None and years[i] not in primary_years:
continue
for date_primary in date_year:
# all rest periods
for date_year1 in date_years[i + 1 :]:
for date_secondary in date_year1:
pair = (date_primary, date_secondary)
_pairs.append(pair)
return Pairs(_pairs)
[docs]
def from_summer_winter(
self,
summer_start: str = "0801",
summer_end: str = "1001",
winter_start: str = "1201",
winter_end: str = "0331",
) -> Pairs:
"""Generate interferometric pairs between summer and winter in each year.
summer and winter are defined by month and day for each year. For example,
summer_start='0801', summer_end='1001' means the summer is from Aug 1to
Oct 1 for each year in the time series. This will be useful to add pairs
for whole thawing and freezing process.
Parameters
----------
summer_start, summer_end: str
start and end date for the summer which expressed as month and day
with format '%m%d'
winter_start, winter_end: str
start and end date for the winter which expressed as month and day
with format '%m%d'
Returns
-------
Pairs object
"""
years = sorted(set(self.dates.year))
df_dates = pd.Series(self.dates.strftime("%Y%m%d"), index=self.dates)
_pairs = []
for year in years:
s_start = pd.to_datetime(f"{year}{summer_start}", format="%Y%m%d")
s_end = pd.to_datetime(f"{year}{summer_end}", format="%Y%m%d")
if int(winter_start) > int(summer_end):
w_start1 = pd.to_datetime(f"{year - 1}{winter_start}", format="%Y%m%d")
w_start2 = pd.to_datetime(f"{year}{winter_start}", format="%Y%m%d")
if int(winter_end) > int(summer_end):
w_end1 = pd.to_datetime(f"{year - 1}{winter_end}", format="%Y%m%d")
w_end2 = pd.to_datetime(f"{year}{winter_end}", format="%Y%m%d")
else:
w_end1 = pd.to_datetime(f"{year}{winter_end}", format="%Y%m%d")
w_end2 = pd.to_datetime(f"{year + 1}{winter_end}", format="%Y%m%d")
else:
w_start1 = pd.to_datetime(f"{year}{winter_start}", format="%Y%m%d")
w_start2 = pd.to_datetime(f"{year + 1}{winter_start}", format="%Y%m%d")
w_end1 = pd.to_datetime(f"{year}{winter_end}", format="%Y%m%d")
w_end2 = pd.to_datetime(f"{year + 1}{winter_end}", format="%Y%m%d")
dt_winter1 = df_dates[w_start1:w_end1].to_list()
dt_summer = df_dates[s_start:s_end].to_list()
dt_winter2 = df_dates[w_start2:w_end2].to_list()
# thawing process
if len(dt_winter1) > 0 and len(dt_summer) > 0:
for dt_w1 in dt_winter1:
for dt_s in dt_summer:
pair = (dt_w1, dt_s)
_pairs.append(pair)
# freezing process
if len(dt_winter2) > 0 and len(dt_summer) > 0:
for dt_w2 in dt_winter2:
for dt_s in dt_summer:
pair = (dt_s, dt_w2)
_pairs.append(pair)
return Pairs(_pairs)
# TODO: accelerate the process using numba or cython
def find_loops(
loops_pairs: Pairs,
loops: list[Loop],
loop: Loop,
pairs_left: Pairs,
end_date: datetime,
edge_pairs: Pairs,
max_acquisition: int = 5,
) -> None:
"""Recursively find all available loops within pairs_left and end_date."""
from faninsar import Loop
for pair_left in pairs_left:
if pair_left not in edge_pairs:
continue
m_candidate = (
(loops_pairs.primary == pair_left.secondary)
& (loops_pairs.secondary <= end_date)
& (loops_pairs.where(edge_pairs, return_type="mask"))
)
if not m_candidate.any():
continue
pairs_candidate = loops_pairs[m_candidate]
for pair_middle in pairs_candidate:
loop_i = loop.copy()
loop_i.append(pair_middle.primary)
if pair_middle.secondary == end_date:
loop_i.append(pair_middle.secondary)
loops.append(Loop(loop_i))
elif len(loop_i) + 1 < max_acquisition:
find_loops(
loops_pairs,
loops,
loop_i,
pairs_candidate,
end_date,
edge_pairs,
max_acquisition,
)
def valid_diagonal_pair(
pair: Pair,
pairs: Pairs,
edge_pairs: int | None = None,
) -> bool:
"""Check if the pair is a valid diagonal pair.
A valid diagonal pair can be used to form a loop using the edge pairs. This
function is just a simple check to see if days of diagonal pair is less than
the sum of edge pairs.
Parameters
----------
pair: Pair
Pair to be checked if it is a valid diagonal pair.
pairs: Pairs
All pairs of the loop.
edge_pairs: int, optional
The edge pairs to form loops.
Returns
-------
valid: bool
Whether the pair is a valid diagonal pair.
"""
valid = False
start_date, end_date = pair.values
mask_edge = (
(pairs.primary >= start_date)
& (pairs.secondary <= end_date)
& (pairs.where(edge_pairs, return_type="mask"))
)
if not mask_edge.any():
return False
_edge_pairs = pairs[mask_edge]
if _edge_pairs.days.values.sum() >= pair.days:
valid = True
return valid
if __name__ == "__main__":
names = [
"20170111_20170204",
"20170111_20170222",
"20170111_20170318",
"20170204_20170222",
"20170204_20170318",
"20170204_20170330",
"20170222_20170318",
"20170222_20170330",
"20170222_20170411",
"20170318_20170330",
]
pairs = Pairs.from_names(names)
loops = pairs.to_loops()
# pairs1 = loops.pairs