Source code for wntr.gis.geospatial

"""
The wntr.gis.geospatial module contains functions to snap data and find 
intersects with polygons.
"""

import pandas as pd
import numpy as np


try:
    from shapely.geometry import MultiPoint, LineString, Point, shape
    has_shapely = True
except ModuleNotFoundError:
    has_shapely = False

try:
    import geopandas as gpd
    has_geopandas = True
except ModuleNotFoundError:
    gpd = None
    has_geopandas = False
    
try:
    import rasterio
    has_rasterio = True
except ModuleNotFoundError:
    rasterio = None
    has_rasterio = False


[docs] def snap(A, B, tolerance): """ Snap Points in A to Points or Lines in B For each Point geometry in A, the function returns snapped Point geometry and associated element in B. Note the CRS of A must equal the CRS of B. Parameters ---------- A : geopandas GeoDataFrame GeoDataFrame containing Point geometries. B : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or MultiLineString geometries. tolerance : float Maximum allowable distance (in the coordinate reference system units) between Points in A and Points or Lines in B. Returns ------- GeoPandas GeoDataFrame Snapped points (index = A.index, columns = defined below) If B contains Points, columns include: - node: closest Point in B to Point in A - snap_distance: distance between Point in A and snapped point - geometry: GeoPandas Point object of the snapped point If B contains Lines or MultiLineString, columns include: - link: closest Line in B to Point in A - node: start or end node of Line in B that is closest to the snapped point (if B contains columns "start_node_name" and "end_node_name") - snap_distance: distance between Point A and snapped point - line_position: normalized distance of snapped point along Line in B from the start node (0.0) and end node (1.0) - geometry: GeoPandas Point object of the snapped point """ if not has_shapely or not has_geopandas: raise ModuleNotFoundError('shapley and geopandas are required') assert isinstance(A, gpd.GeoDataFrame) assert(A['geometry'].geom_type).isin(['Point']).all() assert isinstance(B, gpd.GeoDataFrame) assert (B['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString']).all() assert A.crs == B.crs # Modify B to include "indexB" as a separate column B = B.reset_index(names='indexB') # Define the coordinate reference system, based on B crs = B.crs # Determine which Bs are closest to each A bbox = A.bounds + [-tolerance, -tolerance, tolerance, tolerance] hits = bbox.apply(lambda row: list(B.loc[list(B.sindex.intersection(row))]['indexB']), axis=1) closest = pd.DataFrame({ # index of points table "point": np.repeat(hits.index, hits.apply(len)), # name of link "indexB": np.concatenate(hits.values) }) # Merge the closest dataframe with the lines dataframe on the line names closest = pd.merge(closest, B, on="indexB") # Join back to the original points to get their geometry # rename the point geometry as "points" closest = closest.join(A.geometry.rename("points"), on="point") # Convert back to a GeoDataFrame, so we can do spatial ops closest = gpd.GeoDataFrame(closest, geometry="geometry", crs=crs) # Calculate distance between the point and nearby links closest["snap_distance"] = closest.geometry.distance(gpd.GeoSeries(closest.points, crs=crs)) # Collect only point/link pairs within snap distance radius # This is needed because B.sindex.intersection(row) above can return false positives closest = closest[closest['snap_distance'] <= tolerance] # Sort on ascending snap distance, so that closest goes to top closest = closest.sort_values(by=["snap_distance", "indexB"]) # group by the index of the points and take the first, which is the closest line closest = closest.groupby("point").first() # construct a GeoDataFrame of the closest elements of B closest = gpd.GeoDataFrame(closest, geometry="geometry", crs=crs) # Reset B index B.set_index('indexB', inplace=True) B.index.name = None # snap to points if B['geometry'].geom_type.isin(['Point']).all(): snapped_points = closest.rename(columns={"indexB":"node"}) snapped_points = snapped_points[["node", "snap_distance", "geometry"]] snapped_points.index.name = None # snap to lines if B['geometry'].geom_type.isin(['LineString', 'MultiLineString']).all(): closest = closest.rename(columns={"indexB":"link"}) # position of nearest point from start of the line pos = closest.geometry.project(gpd.GeoSeries(closest.points)) # get new point location geometry snapped_points = closest.geometry.interpolate(pos) snapped_points = gpd.GeoDataFrame(data=closest ,geometry=snapped_points, crs=crs) # determine whether the snapped point is closer to the start or end node snapped_points["line_position"] = closest.geometry.project(snapped_points, normalized=True) if ("start_node_name" in closest.columns) and ("end_node_name" in closest.columns): snapped_points.loc[snapped_points["line_position"]<0.5, "node"] = closest["start_node_name"] snapped_points.loc[snapped_points["line_position"]>=0.5, "node"] = closest["end_node_name"] snapped_points = snapped_points[["link", "node", "snap_distance", "line_position", "geometry"]] else: snapped_points = snapped_points[["link", "snap_distance", "line_position", "geometry"]] snapped_points.index.name = None return snapped_points
def _backgound(A, B): hull_geom = A.unary_union.convex_hull hull_data = gpd.GeoDataFrame(pd.DataFrame([{'geometry': hull_geom}]), crs=A.crs) background_geom = hull_data.overlay(B, how='difference').unary_union background = gpd.GeoDataFrame(pd.DataFrame([{'geometry': background_geom}]), crs=A.crs) background.index = ['BACKGROUND'] return background
[docs] def intersect(A, B, B_value=None, include_background=False, background_value=0): """ Intersect Points, Lines or Polygons in A with Points, Lines, or Polygons in B. Return statistics on the intersection. The function returns information about the intersection for each geometry in A. Each geometry in B is assigned a value based on a column of data in that GeoDataFrame. Note the CRS of A must equal the CRS of B. Parameters ---------- A : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or Polygon geometries B : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or Polygon geometries B_value : str or None (optional) Column name in B used to assign a value to each geometry. Default is None. include_background : bool (optional) Include background, defined as space covered by A that is not covered by B (overlay difference between A and B). The background geometry is added to B and is given the name 'BACKGROUND'. Default is False. background_value : int or float (optional) The value given to background space. This value is used in the intersection statistics if a B_value column name is provided. Default is 0. Returns ------- intersects : DataFrame Intersection statistics (index = A.index, columns = defined below) Columns include: - n: number of intersecting B geometries - intersections: list of intersecting B indices If B_value is given: - values: list of intersecting B values - sum: sum of the intersecting B values - min: minimum of the intersecting B values - max: maximum of the intersecting B values - mean: mean of the intersecting B values If A contains Lines and B contains Polygons: - weighted_mean: weighted mean of intersecting B values """ if not has_shapely or not has_geopandas: raise ModuleNotFoundError('shapley and geopandas are required') assert isinstance(A, gpd.GeoDataFrame) assert (A['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString', 'Polygon', 'MultiPolygon']).all() assert isinstance(B, gpd.GeoDataFrame) assert (B['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString', 'Polygon', 'MultiPolygon']).all() if isinstance(B_value, str): assert B_value in B.columns assert isinstance(include_background, bool) assert isinstance(background_value, (int, float)) assert A.crs == B.crs, "A and B must have the same crs." if include_background: background = _backgound(A, B) if B_value is not None: background[B_value] = background_value B = pd.concat([B, background]) intersects = gpd.sjoin(A, B, predicate='intersects') intersects.index.name = '_tmp_index_name' # set a temp index name for grouping # Sort values by index and intersecting object intersects['sort_order'] = 1 # make sure 'BACKGROUND' is listed first intersects.loc[intersects['index_right'] == 'BACKGROUND', 'sort_order'] = 0 intersects.sort_values(['_tmp_index_name', 'sort_order', 'index_right'], inplace=True) n = intersects.groupby('_tmp_index_name')['geometry'].count() B_indices = intersects.groupby('_tmp_index_name')['index_right'].apply(list) stats = pd.DataFrame(index=A.index.copy(), data={'intersections': B_indices, 'n': n,}) stats['n'] = stats['n'].fillna(0) stats['n'] = stats['n'].apply(int) stats.loc[stats['intersections'].isnull(), 'intersections'] = stats.loc[stats['intersections'].isnull(), 'intersections'] .apply(lambda x: []) if B_value is not None: stats['values'] = intersects.groupby('_tmp_index_name')[B_value].apply(list) stats['sum'] = intersects.groupby('_tmp_index_name')[B_value].sum() stats['min'] = intersects.groupby('_tmp_index_name')[B_value].min() stats['max'] = intersects.groupby('_tmp_index_name')[B_value].max() stats['mean'] = intersects.groupby('_tmp_index_name')[B_value].mean() stats = stats.reindex(['intersections', 'values', 'n', 'sum', 'min', 'max', 'mean'], axis=1) stats.loc[stats['values'].isnull(), 'values'] = stats.loc[stats['values'].isnull(), 'values'] .apply(lambda x: []) weighted_mean = False if (A['geometry'].geom_type).isin(['LineString', 'MultiLineString']).all(): if (B['geometry'].geom_type).isin(['Polygon', 'MultiPolygon']).all(): weighted_mean = True if weighted_mean and B_value is not None: stats['weighted_mean'] = 0.0 A_length = A.length covered_length = pd.Series(0.0, index = A.index) for i in B.index: B_geom = gpd.GeoDataFrame(B.loc[[i],:], crs=B.crs) val = float(B_geom.iloc[0][B_value]) A_subset = A.loc[stats['intersections'].apply(lambda x: i in x),:] #print(i, lines_subset) A_clip = gpd.clip(A_subset, B_geom) A_clip_length = A_clip.length A_clip_index = A_clip.index if A_clip_length.shape[0] > 0: fraction_length = A_clip_length/A_length[A_clip_index] covered_length[A_clip_index] = covered_length[A_clip_index] + fraction_length weighed_val = fraction_length*val stats.loc[A_clip_index, 'weighted_mean'] = stats.loc[A_clip_index, 'weighted_mean'] + weighed_val # Normalize weighted mean by covered length (can be over 1 if polygons overlap) # Can be less than 1 if there are gaps (when background is not used) stats['weighted_mean'] = stats['weighted_mean']/covered_length # Covered_length is NaN if length A is 0, set weighted mean to mean stats.loc[covered_length.isna(), 'weighted_mean'] = stats.loc[covered_length.isna(), 'mean'] # No intersection, set weighted mean to NaN stats.loc[stats['n']==0, 'weighted_mean'] = np.NaN stats.index.name = None return stats
[docs] def sample_raster(A, filepath, bands=1): """Sample a raster (e.g., GeoTIFF file) using Points in GeoDataFrame A. This function can take either a filepath to a raster or a virtual raster (VRT), which combines multiple raster tiles into a single object. The function opens the raster(s) and samples it at the coordinates of the point geometries in A. This function assigns nan to values that match the raster's `nodata` attribute. These sampled values are returned as a Series which has an index matching A. Parameters ---------- A : GeoDataFrame GeoDataFrame containing Point geometries filepath : str Path to raster or alternatively a virtual raster (VRT) bands : int or list[int] (optional, default = 1) Index or indices of the bands to sample (using 1-based indexing) Returns ------- Series Pandas Series containing the sampled values for each geometry in gdf """ # further functionality could include other geometries (Line, Polygon), # and use of multiprocessing to speed up querying. if not has_rasterio: raise ModuleNotFoundError('rasterio is required') assert (A['geometry'].geom_type == "Point").all() assert isinstance(filepath, str) assert isinstance(bands, (int, list)) with rasterio.open(filepath) as raster: xys = zip(A.geometry.x, A.geometry.y) values = np.array( tuple(raster.sample(xys, bands)), dtype=float # force to float to allow for conversion of nodata to nan ).squeeze() values[values == raster.nodata] = np.nan values = pd.Series(values, index=A.index) return values