Source code for wntr.gis.geospatial

"""
The wntr.gis.geospatial module contains functions to snap data and find 
intersects with polygons.
"""

import pandas as pd
import numpy as np
import matplotlib.pylab as plt
from scipy.spatial.distance import cdist
from scipy.spatial.distance import pdist, squareform
from scipy.cluster.hierarchy import linkage, fcluster, dendrogram

try:
    from shapely.geometry import MultiPoint, LineString, Point, shape
    has_shapely = True
except ModuleNotFoundError:
    has_shapely = False

try:
    import geopandas as gpd
    has_geopandas = True
except ModuleNotFoundError:
    gpd = None
    has_geopandas = False
    
try:
    import rasterio
    has_rasterio = True
except ModuleNotFoundError:
    rasterio = None
    has_rasterio = False


[docs] def snap(A, B, tolerance): """ Snap Points in A to Points or Lines in B For each Point geometry in A, the function returns snapped Point geometry and associated element in B. Note the CRS of A must equal the CRS of B. Parameters ---------- A : geopandas GeoDataFrame GeoDataFrame containing Point geometries. B : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or MultiLineString geometries. tolerance : float Maximum allowable distance (in the coordinate reference system units) between Points in A and Points or Lines in B. Returns ------- GeoPandas GeoDataFrame Snapped points (index = A.index, columns = defined below) If B contains Points, columns include: - node: closest Point in B to Point in A - snap_distance: distance between Point in A and snapped point - geometry: GeoPandas Point object of the snapped point If B contains Lines or MultiLineString, columns include: - link: closest Line in B to Point in A - node: start or end node of Line in B that is closest to the snapped point (if B contains columns "start_node_name" and "end_node_name") - snap_distance: distance between Point A and snapped point - line_position: normalized distance of snapped point along Line in B from the start node (0.0) and end node (1.0) - geometry: GeoPandas Point object of the snapped point """ if not has_shapely or not has_geopandas: raise ModuleNotFoundError('shapley and geopandas are required') assert isinstance(A, gpd.GeoDataFrame) assert(A['geometry'].geom_type).isin(['Point']).all() assert isinstance(B, gpd.GeoDataFrame) assert (B['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString']).all() assert A.crs == B.crs # Modify B to include "indexB" as a separate column B = B.reset_index(names='indexB') # Define the coordinate reference system, based on B crs = B.crs # Determine which Bs are closest to each A bbox = A.bounds + [-tolerance, -tolerance, tolerance, tolerance] hits = bbox.apply(lambda row: list(B.loc[list(B.sindex.intersection(row))]['indexB']), axis=1) closest = pd.DataFrame({ # index of points table "point": np.repeat(hits.index, hits.apply(len)), # name of link "indexB": np.concatenate(hits.values) }) # Merge the closest dataframe with the lines dataframe on the line names closest = pd.merge(closest, B, on="indexB") # Join back to the original points to get their geometry # rename the point geometry as "points" closest = closest.join(A.geometry.rename("points"), on="point") # Convert back to a GeoDataFrame, so we can do spatial ops closest = gpd.GeoDataFrame(closest, geometry="geometry", crs=crs) # Calculate distance between the point and nearby links closest["snap_distance"] = closest.geometry.distance(gpd.GeoSeries(closest.points, crs=crs)) # Collect only point/link pairs within snap distance radius # This is needed because B.sindex.intersection(row) above can return false positives closest = closest[closest['snap_distance'] <= tolerance] # Sort on ascending snap distance, so that closest goes to top closest = closest.sort_values(by=["snap_distance", "indexB"]) # group by the index of the points and take the first, which is the closest line closest = closest.groupby("point").first() # construct a GeoDataFrame of the closest elements of B closest = gpd.GeoDataFrame(closest, geometry="geometry", crs=crs) # Reset B index B.set_index('indexB', inplace=True) B.index.name = None # snap to points if B['geometry'].geom_type.isin(['Point']).all(): snapped_points = closest.rename(columns={"indexB":"node"}) snapped_points = snapped_points[["node", "snap_distance", "geometry"]] snapped_points.index.name = None # snap to lines if B['geometry'].geom_type.isin(['LineString', 'MultiLineString']).all(): closest = closest.rename(columns={"indexB":"link"}) # position of nearest point from start of the line pos = closest.geometry.project(gpd.GeoSeries(closest.points)) # get new point location geometry snapped_points = closest.geometry.interpolate(pos) snapped_points = gpd.GeoDataFrame(data=closest ,geometry=snapped_points, crs=crs) # determine whether the snapped point is closer to the start or end node snapped_points["line_position"] = closest.geometry.project(snapped_points, normalized=True) if ("start_node_name" in closest.columns) and ("end_node_name" in closest.columns): snapped_points.loc[snapped_points["line_position"]<0.5, "node"] = closest["start_node_name"] snapped_points.loc[snapped_points["line_position"]>=0.5, "node"] = closest["end_node_name"] snapped_points = snapped_points[["link", "node", "snap_distance", "line_position", "geometry"]] else: snapped_points = snapped_points[["link", "snap_distance", "line_position", "geometry"]] snapped_points.index.name = None return snapped_points
def _backgound(A, B): hull_geom = A.unary_union.convex_hull hull_data = gpd.GeoDataFrame(pd.DataFrame([{'geometry': hull_geom}]), crs=A.crs) background_geom = hull_data.overlay(B, how='difference').unary_union background = gpd.GeoDataFrame(pd.DataFrame([{'geometry': background_geom}]), crs=A.crs) background.index = ['BACKGROUND'] return background
[docs] def intersect(A, B, B_value=None, include_background=False, background_value=0): """ Intersect Points, Lines or Polygons in A with Points, Lines, or Polygons in B. Return statistics on the intersection. The function returns information about the intersection for each geometry in A. Each geometry in B is assigned a value based on a column of data in that GeoDataFrame. Note the CRS of A must equal the CRS of B. Parameters ---------- A : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or Polygon geometries B : geopandas GeoDataFrame GeoDataFrame containing Point, LineString, or Polygon geometries B_value : str or None (optional) Column name in B used to assign a value to each geometry. Default is None. include_background : bool (optional) Include background, defined as space covered by A that is not covered by B (overlay difference between A and B). The background geometry is added to B and is given the name 'BACKGROUND'. Default is False. background_value : int or float (optional) The value given to background space. This value is used in the intersection statistics if a B_value column name is provided. Default is 0. Returns ------- intersects : DataFrame Intersection statistics (index = A.index, columns = defined below) Columns include: - n: number of intersecting B geometries - intersections: list of intersecting B indices If B_value is given: - values: list of intersecting B values - sum: sum of the intersecting B values - min: minimum of the intersecting B values - max: maximum of the intersecting B values - mean: mean of the intersecting B values If A contains Lines and B contains Polygons: - weighted_mean: weighted mean of intersecting B values """ if not has_shapely or not has_geopandas: raise ModuleNotFoundError('shapley and geopandas are required') assert isinstance(A, gpd.GeoDataFrame) assert (A['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString', 'Polygon', 'MultiPolygon']).all() assert isinstance(B, gpd.GeoDataFrame) assert (B['geometry'].geom_type).isin(['Point', 'LineString', 'MultiLineString', 'Polygon', 'MultiPolygon']).all() if isinstance(B_value, str): assert B_value in B.columns assert isinstance(include_background, bool) assert isinstance(background_value, (int, float)) assert A.crs == B.crs, "A and B must have the same crs." if include_background: background = _backgound(A, B) if B_value is not None: background[B_value] = background_value B = pd.concat([B, background]) intersects = gpd.sjoin(A, B, predicate='intersects') intersects.index.name = '_tmp_index_name' # set a temp index name for grouping # Sort values by index and intersecting object intersects['sort_order'] = 1 # make sure 'BACKGROUND' is listed first intersects.loc[intersects['index_right'] == 'BACKGROUND', 'sort_order'] = 0 intersects.sort_values(['_tmp_index_name', 'sort_order', 'index_right'], inplace=True) n = intersects.groupby('_tmp_index_name')['geometry'].count() B_indices = intersects.groupby('_tmp_index_name')['index_right'].apply(list) stats = pd.DataFrame(index=A.index.copy(), data={'intersections': B_indices, 'n': n,}) stats['n'] = stats['n'].fillna(0) stats['n'] = stats['n'].apply(int) stats.loc[stats['intersections'].isnull(), 'intersections'] = stats.loc[stats['intersections'].isnull(), 'intersections'] .apply(lambda x: []) if B_value is not None: stats['values'] = intersects.groupby('_tmp_index_name')[B_value].apply(list) stats['sum'] = intersects.groupby('_tmp_index_name')[B_value].sum() stats['min'] = intersects.groupby('_tmp_index_name')[B_value].min() stats['max'] = intersects.groupby('_tmp_index_name')[B_value].max() stats['mean'] = intersects.groupby('_tmp_index_name')[B_value].mean() stats = stats.reindex(['intersections', 'values', 'n', 'sum', 'min', 'max', 'mean'], axis=1) stats.loc[stats['values'].isnull(), 'values'] = stats.loc[stats['values'].isnull(), 'values'] .apply(lambda x: []) weighted_mean = False if (A['geometry'].geom_type).isin(['LineString', 'MultiLineString']).all(): if (B['geometry'].geom_type).isin(['Polygon', 'MultiPolygon']).all(): weighted_mean = True if weighted_mean and B_value is not None: stats['weighted_mean'] = 0.0 A_length = A.length covered_length = pd.Series(0.0, index = A.index) for i in B.index: B_geom = gpd.GeoDataFrame(B.loc[[i],:], crs=B.crs) val = float(B_geom.iloc[0][B_value]) A_subset = A.loc[stats['intersections'].apply(lambda x: i in x),:] #print(i, lines_subset) A_clip = gpd.clip(A_subset, B_geom) A_clip_length = A_clip.length A_clip_index = A_clip.index if A_clip_length.shape[0] > 0: fraction_length = A_clip_length/A_length[A_clip_index] covered_length[A_clip_index] = covered_length[A_clip_index] + fraction_length weighed_val = fraction_length*val stats.loc[A_clip_index, 'weighted_mean'] = stats.loc[A_clip_index, 'weighted_mean'] + weighed_val # Normalize weighted mean by covered length (can be over 1 if polygons overlap) # Can be less than 1 if there are gaps (when background is not used) stats['weighted_mean'] = stats['weighted_mean']/covered_length # Covered_length is NaN if length A is 0, set weighted mean to mean stats.loc[covered_length.isna(), 'weighted_mean'] = stats.loc[covered_length.isna(), 'mean'] # No intersection, set weighted mean to NaN stats.loc[stats['n']==0, 'weighted_mean'] = np.NaN stats.index.name = None return stats
[docs] def sample_raster(A, filepath, bands=1): """Sample a raster (e.g., GeoTIFF file) using Points in GeoDataFrame A. This function can take either a filepath to a raster or a virtual raster (VRT), which combines multiple raster tiles into a single object. The function opens the raster(s) and samples it at the coordinates of the point geometries in A. This function assigns nan to values that match the raster's `nodata` attribute. These sampled values are returned as a Series which has an index matching A. Parameters ---------- A : GeoDataFrame GeoDataFrame containing Point geometries filepath : str Path to raster or alternatively a virtual raster (VRT) bands : int or list[int] (optional, default = 1) Index or indices of the bands to sample (using 1-based indexing) Returns ------- Series Pandas Series containing the sampled values for each geometry in gdf """ # further functionality could include other geometries (Line, Polygon), # and use of multiprocessing to speed up querying. if not has_rasterio: raise ModuleNotFoundError('rasterio is required') assert (A['geometry'].geom_type == "Point").all() assert isinstance(filepath, str) assert isinstance(bands, (int, list)) with rasterio.open(filepath) as raster: xys = zip(A.geometry.x, A.geometry.y) values = np.array( tuple(raster.sample(xys, bands)), dtype=float # force to float to allow for conversion of nodata to nan ).squeeze() values[values == raster.nodata] = np.nan values = pd.Series(values, index=A.index) return values
[docs] def connect_lines(lines, threshold, plot=False): """ Connect lines by identifying start and end nodes that are within a threshold distance Parameters ---------- lines : gpd.GeoDataFrame GeoDataFrame with LineString geometry tolerance : float Maximum distance between line endpoints, used to define connecting Point geometry plot : bool Boolean indicating if a plot is created for the dendogram, and original and connected lines Returns ------- Tuple[line GeoDataFrame, node GeoDataFrame] * line GeoDataFrame contains LineString geometry, start_node_name, and end_node_name, along with original columns in lines * node GeoDataFrame contains connecting Point geometry and node names """ if not has_shapely or not has_geopandas: raise ModuleNotFoundError('shapley and geopandas are required') original_lines = lines lines = lines.copy() # Create start and end node name and Point geometry for each line nodes = [] geometry = [] lines['start_node_name'] = None #create new columns lines['end_node_name'] = None j = 0 for i, line in lines.iterrows(): # loop through every line try: geom = line.geometry.geoms[0] except: geom = line.geometry geometry.append(Point([geom.coords[0][0], geom.coords[0][1]])) nodes.append({'Line': i, 'Node': j}) #node list holds the line ID #unique number for each start node and end node for each line lines.loc[i,'start_node_name'] = j j = j+1 geometry.append(Point([geom.coords[-1][0], geom.coords[-1][1]])) nodes.append({'Line': i, 'Node': j}) lines.loc[i,'end_node_name'] = j j = j+1 nodes = gpd.GeoDataFrame(nodes, geometry=geometry) nodes.set_crs(lines.crs, inplace=True) # Create a distance matrix points = nodes['geometry'] coords = points.apply(lambda geom: (geom.x, geom.y)).tolist() condensed = pdist(coords) D = squareform(condensed) D = pd.DataFrame(D, index=nodes.index, columns=nodes.index) # Several options exist for linkage and fcluster below # currently using ward/euclidean/distance # Compute a linkage model Z = linkage(condensed, method='ward', metric='euclidean', optimal_ordering=True) # Form clusters #clusters = fcluster(Z, threshold, criterion='inconsistent', depth=2) clusters = fcluster(Z, threshold, criterion='distance') #clusters = fcluster(Z, threshold, criterion='monocrit') #clusters = fcluster(Z, threshold, criterion='maxclust') #clusters = fcluster(Z, threshold, criterion='maxclust_monocrit') clusters = pd.Series(clusters, index=nodes.index) nodes['supernode'] = clusters.copy() # Update lines GeoDataFrame with start and end supernode names map_node_to_supernode = nodes['supernode'] lines['start_node_name'] = map_node_to_supernode[lines['start_node_name']].values lines['end_node_name'] = map_node_to_supernode[lines['end_node_name']].values # Remove lines with the same start and end node name lines = lines.loc[~(lines['start_node_name'] == lines['end_node_name']),:] # Create a nodes GeoDataFrame with centroid of each supernode supernode_name = [] supernode_geom = [] for name, group in nodes.groupby('supernode'): supernode_name.append(name) supernode_geom.append(group.dissolve().centroid[0]) nodes = gpd.GeoDataFrame({'Node': supernode_name}, geometry=supernode_geom) nodes.set_index('Node', inplace=True) nodes.index.name = None nodes.crs = lines.crs # Convert names to string nodes.index = nodes.index.astype(str) lines['start_node_name'] = lines['start_node_name'].astype(str) lines['end_node_name'] = lines['end_node_name'].astype(str) # Add start and end node Points to LineStrings start_coords = nodes.loc[lines['start_node_name']] end_coords = nodes.loc[lines['end_node_name']] for i, (line_name, row) in enumerate(lines.iterrows()): line = row['geometry'] l_coords = list(line.coords) start_point = nodes.loc[row['start_node_name'], 'geometry'] end_point = nodes.loc[row['end_node_name'], 'geometry'] if start_point.coords[0] != l_coords[0]: l_coords.insert(0, start_point.coords[0]) if end_point.coords[0] != l_coords[-1]: l_coords.append(end_point.coords[0]) lines.loc[line_name, 'geometry'] = LineString(l_coords) if plot: plt.figure(figsize=(10, 5)) dendro = dendrogram(Z) #unique_clusters = np.unique(clusters) #colors = plt.cm.get_cmap('tab10', len(unique_clusters)) # Use a colormap with enough colors #color_dict = {cluster: colors(i) for i, cluster in enumerate(unique_clusters)} #dendro = dendrogram(Z, color_threshold=threshold, link_color_func=lambda k: color_dict[k]) plt.title('Hierarchical Cluster Dendrogram') plt.xlabel('Data Point Indexes') plt.ylabel('Distance') plt.figure() ax = plt.gca() ax = original_lines.plot(ax=ax, color='r', label='Disconnected lines') ax = lines.plot(ax=ax, color='k', linewidth=6, alpha=0.35, label='Connected lines') ax = nodes.plot(ax=ax, color='k', label='Connected nodes') ax.legend() bounds = ax.axis('equal') plt.tight_layout() plt.axis('off') return lines, nodes