Source code for faninsar.query.points

"""A module for handling points query."""

from __future__ import annotations

import warnings
from collections.abc import Iterator
from collections.abc import Sequence as SequenceABC
from typing import TYPE_CHECKING, Sequence

import geopandas as gpd
import numpy as np
import pandas as pd
from rasterio.crs import CRS
from rasterio.warp import transform as warp_transform

if TYPE_CHECKING:
    from pathlib import Path

    from faninsar.typing import CrsLike


[docs] class Points: """A class to represent a collection of points. Examples -------- >>> pts = Points([[1, 2], [2, 3], [3, 4]]) >>> pt = pts[1] set difference of two Points: >>> pts - pt Points: x y 0 1.0 2.0 1 3.0 4.0 [count=2, crs='None'] set union of two Points: >>> pts + Points([1, 5]) Points: x y 0 1.0 2.0 1 2.0 3.0 2 3.0 4.0 3 1.0 5.0 [count=4, crs='None'] in operator: >>> pts[1] in pts True >>> Points([1, 5]) in pts False convert to numpy array using ``np.array``: >>> np.array(pts, dtype=np.int16) array([[1, 2], [2, 3], [3, 4]], dtype=int16) extract x and y coordinates: >>> pts.x array([1., 2., 3.], dtype=float32) >>> pts.y array([2., 3., 4.], dtype=float32) extract values: >>> pts.values array([[1., 2.], [2., 3.], [3., 4.]], dtype=float32) convert to GeoDataFrame: >>> pts.to_GeoDataFrame() x y geometry 0 1.0 2.0 POINT (1.00000 2.00000) 1 2.0 3.0 POINT (2.00000 3.00000) 2 3.0 4.0 POINT (3.00000 4.00000) """ _values: np.ndarray _crs: CRS | str | None __slots__ = ["_crs", "_values"]
[docs] def __init__( self, points: Sequence[float | Sequence[float]], crs: CrsLike = None, dtype: np.dtype = np.float32, ) -> None: """Initialize a Points object. Parameters ---------- points : Sequence[float | Sequence[float]] The points to be sampled. The shape of the points can be (2) or (n, 2) where n is the number of points. The first column is the x coordinate and the second column is the y coordinate. if the shape is (2), the points will be reshaped to (1, 2). crs: Any, optional, default: None The coordinate reference system of the points. Can be any object that can be passed to :meth:`pyproj.crs.CRS.from_user_input`. dtype : np.dtype, optional The data type of the points. Default is np.float32. Raises ------ ValueError If the shape of the points is not (n, 2). """ self._values = np.asarray(points, dtype=dtype) self._crs = crs if self._values.ndim == 1: self._values = self._values.reshape(1, -1) if self._values.ndim != 2 or self._values.shape[1] != 2: msg = f"points must be a 2D array with 2 columns. Got {self._values}" raise ValueError(msg)
def __len__(self) -> int: """Return the number of points.""" return self._values.shape[0] def __iter__(self) -> Iterator: """Return an iterator of the points.""" yield from self._values def __getitem__(self, key: int) -> Points: """Get the point by index.""" return Points(self._values[key, :], crs=self.crs) def __contains__(self, item: Points | Sequence[float]) -> bool: """Check if the item is in the points.""" if isinstance(item, Points): item = item.values elif isinstance(item, SequenceABC): item = np.array(item, dtype=np.float64) else: msg = f"item must be an Points or Sequence. Got {type(item)}" raise TypeError(msg) if item.ndim > 2 or item.shape[1] != 2: msg = f"item must be a 2D array with shape (n, 2). Got {item.shape()}" raise ValueError(msg) return np.any(np.all(self._values == item, axis=1)) def __str__(self) -> str: """Return the string representation of the Points.""" return f"Points(count={len(self)}, crs='{self.crs}')" def __repr__(self) -> str: """Return the string representation of the Points.""" prefix = "Points:\n" middle = self.to_DataFrame().to_string(max_rows=10) suffix = f"\n[count={len(self)}, crs='{self.crs}']" return f"{prefix}{middle}{suffix}" def __array__(self, dtype: np.dtype | None = None) -> np.ndarray: """Return the values of the points as a numpy array.""" if dtype is not None: return self._values.astype(dtype) return self._values def __add__(self, other: Points) -> Points: """Return the union of two Points.""" if not isinstance(other, Points): msg = f"other must be an instance of Points. Got {type(other)}" raise TypeError(msg) other, crs_new = self._ensure_points_crs(other) return Points(np.vstack([self.values, other.values]), crs=crs_new) def __sub__(self, other: Points) -> Points | None: """Return the set difference of two Points.""" if not isinstance(other, Points): msg = f"other must be an instance of Points. Got {type(other)}" raise TypeError(msg) other, crs_new = self._ensure_points_crs(other) mask = ~np.all(self._values == other._values, axis=1) values = self._values[mask] if len(values) == 0: return None return Points(values, crs=crs_new) def _ensure_points_crs(self, other: Points) -> tuple[Points, CRS]: """Ensure the coordinate reference system of the points are the same.""" if self.crs != other.crs: if self.crs is None or other.crs is None: crs_new = self.crs or other.crs warnings.warn( "Cannot find the coordinate reference system of the points. " "The crs of two points will assume to be the same. ", stacklevel=2, ) else: other = other.to_crs(self.crs) crs_new = self.crs else: crs_new = self.crs return other, crs_new @staticmethod def _find_field(gdf: gpd.GeoDataFrame, field_names: list[str]) -> str | None: """Find the field name in the GeoDataFrame. Parameters ---------- gdf : gpd.GeoDataFrame The GeoDataFrame to be searched. field_names : list[str] The field names to be searched. Returns ------- str | None The field name found. If not found, return None. """ for name in gdf.columns: if name.lower() in field_names: return name return None @classmethod def _ensure_fields( cls, gdf: gpd.GeoDataFrame, x_field: str, y_field: str, ) -> np.ndarray | None: """Parse the field from the GeoDataFrame. Parameters ---------- gdf : gpd.GeoDataFrame The GeoDataFrame to be parsed. x_field : str The field name of the x coordinate. y_field : str The field name of the y coordinate. Returns ------- np.ndarray | None The values of the field. If the field does not exist, return None. """ if x_field == "auto": x_field = cls._find_field( gdf, ["x", "xs", "lon", "longitude", "long", "longs", "longitudes"], ) if x_field is None: msg = ( "Cannot find the field name of the x coordinate. " "Please provide the field name manually." ) raise ValueError(msg) if y_field == "auto": y_field = cls._find_field( gdf, ["y", "ys", "lat", "latitude", "lats", "latitudes"], ) if y_field is None: msg = ( "Cannot find the field name of the y coordinate. " "Please provide the field name manually." ) raise ValueError(msg) return x_field, y_field @property def x(self) -> np.ndarray: """Return the x coordinates of the points.""" return self._values[:, 0] @property def y(self) -> np.ndarray: """Return the y coordinates of the points.""" return self._values[:, 1] @property def values(self) -> np.ndarray: """Return the values of the points with shape (n, 2). n is the number of points. """ return self._values @property def dtype(self) -> np.dtype: """Return the data type of the points.""" return self._values.dtype @property def crs(self) -> CRS | None: """Return the coordinate reference system of the points.""" return self._crs
[docs] def set_crs(self, crs: CrsLike) -> None: """Set the coordinate reference system of the points. .. warning:: This method will only set the crs attribute without converting the points to a new coordinate reference system. If you want to convert the points values to a new coordinate, please use :meth:`to_crs` """ self._crs = CRS.from_user_input(crs)
[docs] def to_crs(self, crs: CrsLike) -> Points: """Convert the points values to a new coordinate reference system. Parameters ---------- crs : Any The new coordinate reference system. Can be any object that can be passed to :meth:`pyproj.crs.CRS.from_user_input`. Returns ------- Points The points in the new coordinate reference system. """ if self.crs is None: msg = ( "The current coordinate reference system is None. " "Please set the crs using set_crs() first." ) raise ValueError(msg) if isinstance(crs, str): crs = CRS.from_user_input(crs) if self.crs == crs: return self values = np.array(list(zip(*warp_transform(self.crs, crs, self.x, self.y)))) return Points(values, crs=crs)
[docs] @classmethod def from_geo_dataframe( cls, gdf: gpd.GeoDataFrame, x_field: str = "auto", y_field: str = "auto", ) -> Points: """Initialize a Points object from a GeoDataFrame. Parameters ---------- gdf : gpd.GeoDataFrame The GeoDataFrame to be parsed. x_field, y_field : str, optional, default: "auto" The field name of the x/y coordinates if ``geometry`` not exists. If ``auto``, will try to find the field name automatically from following fields (case insensitive): * ``x``: x, xs, lon, longitude, long, longs, longitudes * ``y``: y, ys, lat, latitude, lats, latitudes Returns ------- Points The Points object. """ if "geometry" not in gdf.columns: x_field, y_field = cls._ensure_fields(gdf, x_field, y_field) return cls(gdf[[x_field, y_field]].values, crs=gdf.crs) points = list(zip(gdf.geometry.values.x, gdf.geometry.values.y)) return cls(points, crs=gdf.crs)
[docs] @classmethod def from_shapefile( cls, filename: str | Path, **kwargs, ) -> Points: """Initialize a Points object from a file. Parameters ---------- filename : str | Path The path to the shapefile. file type can be any type that can be passed to :func:`geopandas.read_file`. **kwargs : dict Other parameters passed to :func:`geopandas.read_file`. Returns ------- Points The Points object. """ gdf = gpd.read_file(filename, **kwargs) geometry = gdf.geometry.explode().values points = list(zip(geometry.x, geometry.y)) return cls(points, crs=gdf.crs)
[docs] @classmethod def from_csv( cls, filename: str | Path, x_field: str = "auto", y_field: str = "auto", crs: CrsLike = None, **kwargs, ) -> Points: """Initialize a Points object from a csv/txt file. Parameters ---------- filename : str | Path The path to the csv/txt file. x_field, y_field : str, optional, default: "auto" The field name of the x/y coordinates. If "auto", will try to find the field name automatically from following fields (case insensitive): * ``x`` : x, xs, lon, longitude * ``y`` : y, ys, lat, latitude crs : Any, optional, default: None The coordinate reference system of the points. Can be any object that can be passed to :meth:`pyproj.crs.CRS.from_user_input`. **kwargs : dict Other parameters passed to :func:`pandas.read_csv`. Returns ------- Points The Points object. """ _df = pd.read_csv(filename, **kwargs) gdf = gpd.GeoDataFrame( _df, geometry=gpd.points_from_xy(_df["x"], _df["y"]), crs=crs, ) x_field, y_field = cls._ensure_fields(_df, x_field, y_field) return cls.from_geo_dataframe(gdf, x_field, y_field)
[docs] def to_DataFrame(self) -> pd.DataFrame: """Convert the Points to a DataFrame. Returns ------- pd.DataFrame The DataFrame with columns ``x`` and ``y``. """ return pd.DataFrame(self._values, columns=["x", "y"])
[docs] def to_GeoDataFrame(self) -> gpd.GeoDataFrame: """Convert the Points to a GeoDataFrame. Returns ------- gpd.GeoDataFrame The GeoDataFrame. """ _df = self.to_DataFrame() return gpd.GeoDataFrame( _df, geometry=gpd.points_from_xy(_df["x"], _df["y"]), crs=self.crs, )
[docs] def to_shapefile(self, filename: str | Path, **kwargs) -> None: """Save the Points to a shapefile. Parameters ---------- filename : str | Path The path to the shapefile. **kwargs : dict Other parameters passed to :meth:`geopandas.GeoDataFrame.to_file`. """ gdf = self.to_GeoDataFrame() gdf.to_file(filename, **kwargs)