Source code for wntr.gis.network

"""
The wntr.gis.network module contains methods to convert between water network 
models and GIS formatted data
"""

import pandas as pd
import numpy as np

try:
    from shapely.geometry import LineString, Point
    has_shapely = True
except ModuleNotFoundError:
    has_shapely = False

try:
    import geopandas as gpd
    has_geopandas = True
except ModuleNotFoundError:
    has_geopandas = False

import wntr.network.elements

class WaterNetworkGIS:
    """
    Water network GIS class

    Contains methods to create GeoDataFrames from WaterNetworkModel and
    create WaterNetworkModel from GeoDataFrames.

    Parameters
    ----------
    gis_data : dict, optional
        Dictionary of GeoDataFrames containing data to populate an instance
        of WaterNetworkGIS. Valid dictionary keys are 'junctions', 'tanks',
        'reservoirs', 'pipes', 'pumps', and 'valves'. Other keys are ignored.

    Raises
    ------
    ModuleNotFoundError
        if missing either shapely or geopandas
    """

    # Attribute names of the per-element GeoDataFrames. The order (nodes
    # first, then links) is the order used when reading and writing files.
    _NODE_ELEMENTS = ('junctions', 'tanks', 'reservoirs')
    _LINK_ELEMENTS = ('pipes', 'pumps', 'valves')
    _ELEMENT_NAMES = _NODE_ELEMENTS + _LINK_ELEMENTS

    def __init__(self, gis_data=None) -> None:
        """
        Create empty GeoDataFrames for every element type and optionally
        replace them with the GeoDataFrames supplied in ``gis_data``.

        Parameters
        ----------
        gis_data : dict, optional
            Dictionary of GeoDataFrames keyed by element type, by default
            None. Anything that is not a dict is ignored.

        Raises
        ------
        ModuleNotFoundError
            if missing either shapely or geopandas
        AssertionError
            if a recognised entry of ``gis_data`` is not a GeoDataFrame
        """
        if not has_shapely or not has_geopandas:
            raise ModuleNotFoundError('shapely and geopandas are required')

        self.junctions = gpd.GeoDataFrame()
        self.tanks = gpd.GeoDataFrame()
        self.reservoirs = gpd.GeoDataFrame()
        self.pipes = gpd.GeoDataFrame()
        self.pumps = gpd.GeoDataFrame()
        self.valves = gpd.GeoDataFrame()

        if isinstance(gis_data, dict):
            for element in self._ELEMENT_NAMES:
                if element in gis_data:
                    assert isinstance(gis_data[element], gpd.GeoDataFrame)
                    setattr(self, element, gis_data[element])

    def _create_gis(self, wn, crs: str = None, pumps_as_points: bool = False,
                    valves_as_points: bool = False,) -> None:
        """
        Create GIS data from a water network model.

        This method is used by wntr.network.io.to_gis

        Note: patterns, curves, rules, controls, sources, and options are not
        saved to the GIS data

        Parameters
        ----------
        wn : WaterNetworkModel
            Water network model
        crs : str, optional
            Coordinate reference system, by default None
        pumps_as_points : bool, optional
            Represent pumps as points (True) or lines (False), by default False
        valves_as_points : bool, optional
            Represent valves as points (True) or lines (False), by default False
        """

        def _link_geometry(link_name, as_point):
            # A link drawn as a point is placed at its start node; otherwise
            # it is a line through start node, vertices, and end node.
            link = wn.get_link(link_name)
            if as_point:
                return Point(link.start_node.coordinates)
            coords = [link.start_node.coordinates]
            coords.extend(link.vertices)
            coords.append(link.end_node.coordinates)
            return LineString(coords)

        def _extract_geodataframe(df, crs=None, links_as_points=False):
            # No rows: nothing to convert (also covers a DataFrame that has
            # no columns at all)
            if df.shape[0] == 0:
                return gpd.GeoDataFrame()

            # Drop any column with all NaN
            df = df.loc[:, ~df.isna().all()]

            # Define geom
            if 'node_type' in df.columns:
                geom = [Point((x, y)) for x, y in df['coordinates']]
            else:
                geom = [_link_geometry(link_name, links_as_points)
                        for link_name in df['name']]

            # Drop column if not a str, float, int, or bool. Only the first
            # row is inspected.
            # This could be extended to keep additional data type (list,
            # tuple, network elements like Patterns, Curves)
            first_row = df.iloc[0]
            drop_cols = [col for col in df.columns
                         if not isinstance(first_row[col],
                                           (str, float, int, bool))]
            df = df.drop(columns=drop_cols)

            # Set index
            df = df.set_index('name')
            return gpd.GeoDataFrame(df, crs=crs, geometry=geom)

        def _select(df, type_column, type_name):
            # A model without any nodes (or links) yields a DataFrame with no
            # columns; indexing it by type_column would raise KeyError.
            if type_column not in df.columns:
                return pd.DataFrame()
            return df[df[type_column] == type_name]

        # Convert the WaterNetworkModel to a dictionary
        wn_dict = wn.to_dict()

        # Create dataframes for node and link attributes
        df_nodes = pd.DataFrame(wn_dict['nodes'])
        df_links = pd.DataFrame(wn_dict['links'])

        # Nodes
        self.junctions = _extract_geodataframe(
            _select(df_nodes, 'node_type', 'Junction'), crs)
        self.tanks = _extract_geodataframe(
            _select(df_nodes, 'node_type', 'Tank'), crs)
        self.reservoirs = _extract_geodataframe(
            _select(df_nodes, 'node_type', 'Reservoir'), crs)

        # Links (pipes are always lines)
        self.pipes = _extract_geodataframe(
            _select(df_links, 'link_type', 'Pipe'), crs, False)
        self.pumps = _extract_geodataframe(
            _select(df_links, 'link_type', 'Pump'), crs, pumps_as_points)
        self.valves = _extract_geodataframe(
            _select(df_links, 'link_type', 'Valve'), crs, valves_as_points)

    def _create_wn(self, append=None):
        """
        Create or append a WaterNetworkModel from GeoDataFrames

        This method is used by wntr.network.io.from_gis

        Parameters
        ----------
        append : WaterNetworkModel or None, optional
            Existing WaterNetworkModel to append. If None, a new
            WaterNetworkModel is created.

        Returns
        -------
        WaterNetworkModel
            Result of wntr.network.from_dict

        Raises
        ------
        AssertionError
            if a node geometry is not a Point, or a link GeoDataFrame is
            missing the 'start_node_name' or 'end_node_name' column
        """
        # Convert the WaterNetworkGIS to a dictionary
        wn_dict = {'nodes': [], 'links': []}

        for element in [self.junctions, self.tanks, self.reservoirs]:
            if element.shape[0] > 0:
                assert (element['geometry'].geom_type).isin(['Point']).all()
                df = element.reset_index(names="name")
                df.rename(columns={'geometry': 'coordinates'}, inplace=True)
                # Replace each Point by an [x, y] list
                df['coordinates'] = [[x, y] for x, y in
                                     zip(df['coordinates'].x,
                                         df['coordinates'].y)]
                wn_dict['nodes'].extend(df.to_dict('records'))

        for element in [self.pipes, self.pumps, self.valves]:
            if element.shape[0] > 0:
                assert 'start_node_name' in element.columns
                assert 'end_node_name' in element.columns
                df = element.reset_index(names="name")
                # Interior coordinates only; the first and last coordinates
                # of the geometry are dropped
                df['vertices'] = df.apply(
                    lambda row: list(row.geometry.coords)[1:-1], axis=1)
                df.drop(columns=['geometry'], inplace=True)
                wn_dict['links'].extend(df.to_dict('records'))

        # Create WaterNetworkModel from dictionary
        from wntr.network import from_dict
        wn = from_dict(wn_dict, append)

        return wn

    def to_crs(self, crs):
        """
        Transform CRS of the junctions, tanks, reservoirs, pipes, pumps,
        and valves GeoDataFrames.

        Calls geopandas.GeoDataFrame.to_crs on each GeoDataFrame that has a
        'geometry' column. The GeoDataFrames are modified in place.

        Parameters
        ----------
        crs : str
            Coordinate reference system
        """
        for element in self._ELEMENT_NAMES:
            data = getattr(self, element)
            if 'geometry' in data.columns:
                data.to_crs(crs, inplace=True)

    def set_crs(self, crs, allow_override=False):
        """
        Set CRS of the junctions, tanks, reservoirs, pipes, pumps,
        and valves GeoDataFrames.

        Calls geopandas.GeoDataFrame.set_crs on each GeoDataFrame that has a
        'geometry' column. The GeoDataFrames are modified in place.

        Parameters
        ----------
        crs : str
            Coordinate reference system
        allow_override : bool (optional)
            Allow override of existing coordinate reference system
        """
        for element in self._ELEMENT_NAMES:
            data = getattr(self, element)
            if 'geometry' in data.columns:
                data.set_crs(crs, inplace=True, allow_override=allow_override)

    def add_node_attributes(self, values, name):
        """
        Add attribute to junctions, tanks, or reservoirs GeoDataFrames

        Each node name is looked up in the junctions, tanks, and reservoirs
        indexes (in that order) and the value is stored in the first one that
        contains it. Node names found in none of them are ignored.

        Parameters
        ----------
        values : dict or Series or row of a DataFrame
            Attribute values
        name : str
            Attribute name
        """
        node_frames = [getattr(self, element)
                       for element in self._NODE_ELEMENTS]
        for node_name, value in values.items():
            for frame in node_frames:
                if node_name in frame.index:
                    # Create the column on first use
                    if name not in frame.columns:
                        frame[name] = np.nan
                    frame.loc[node_name, name] = value
                    break

    def _read(self, files, index_col='name'):
        """
        Append data read with geopandas.read_file to the GeoDataFrames

        Parameters
        ----------
        files : dictionary
            Dictionary of filenames, where the keys are in the set
            ('junctions', 'tanks', 'reservoirs', 'pipes', 'pumps', 'valves').
            Other keys are ignored.
        index_col : str, optional
            Column that contains the element name
        """
        for element in self._ELEMENT_NAMES:
            if element in files:
                data = gpd.read_file(files[element]).set_index(index_col)
                combined = pd.concat([getattr(self, element), data])
                setattr(self, element, combined)

    def read_geojson(self, files, index_col='name'):
        """
        Append information from GeoJSON files to a WaterNetworkGIS object

        Parameters
        ----------
        files : dictionary
            Dictionary of GeoJSON filenames, where the keys are in the set
            ('junctions', 'tanks', 'reservoirs', 'pipes', 'pumps', 'valves')
            and values are the corresponding GeoJSON filename
        index_col : str, optional
            Column that contains the element name
        """
        self._read(files, index_col)

    def read_shapefile(self, files, index_col='name'):
        """
        Append information from Esri Shapefiles to a WaterNetworkGIS object

        After reading, truncated Shapefile field names are renamed to the
        full WaterNetworkModel attribute names.

        Parameters
        ----------
        files : dictionary
            Dictionary of Shapefile directory or filenames, where the keys
            are in the set ('junctions', 'tanks', 'reservoirs', 'pipes',
            'pumps', 'valves') and values are the corresponding Shapefile
            directory or filename
        index_col : str, optional
            Column that contains the element name
        """
        self._read(files, index_col)

        field_name_map = self._shapefile_field_name_map()
        for element in self._ELEMENT_NAMES:
            getattr(self, element).rename(columns=field_name_map[element],
                                          inplace=True)

    def _write(self, prefix: str, driver="GeoJSON") -> None:
        """
        Write the WaterNetworkGIS object to GIS files

        One file will be created for each type of network element (junctions,
        pipes, etc.) if those elements exists in the network

        Parameters
        ----------
        prefix : str
            Filename prefix, will have the element type (junctions,
            pipes, etc.) appended
        driver : str, optional
            GeoPandas driver. Use "GeoJSON" for GeoJSON files, use :code:`None`
            for Esri Shapefile folders, by default "GeoJSON"
        """
        # The file extension is the lower-cased driver name; no extension is
        # used when no driver is given
        if driver is None or driver == "":
            extension = ""
        else:
            extension = "." + driver.lower()

        for element in self._ELEMENT_NAMES:
            data = getattr(self, element)
            if len(data) > 0:
                filename = prefix + "_" + element + extension
                data.to_file(filename, driver=driver)

    def write_geojson(self, prefix: str):
        """
        Write the WaterNetworkGIS object to a set of GeoJSON files, one file
        for each network element.

        Parameters
        ----------
        prefix : str
            File prefix
        """
        self._write(prefix=prefix, driver="GeoJSON")

    def write_shapefile(self, prefix: str):
        """
        Write the WaterNetworkGIS object to a set of Esri Shapefiles, one
        directory for each network element.

        Parameters
        ----------
        prefix : str
            File and directory prefix
        """
        self._write(prefix=prefix, driver=None)

    def _valid_names(self, complete_list=True, truncate_names=None):
        """
        Valid column/field names for GeoJSON or Shapefiles

        Note that Shapefile field names are truncated to 10 characters
        (set truncate_names=10)

        Parameters
        ----------
        complete_list : bool
            Include a complete list of column/field names (beyond basic
            attributes)
        truncate_names : None or int
            Truncate column/field names to specified number of characters,
            set truncate_names=10 for Shapefiles. None indicates no
            truncation.

        Returns
        ---------
        dict : Dictionary of valid GeoJSON or Shapefile column/field names
        """
        element_objects = {
            'junctions': wntr.network.elements.Junction,
            'tanks': wntr.network.elements.Tank,
            'reservoirs': wntr.network.elements.Reservoir,
            'pipes': wntr.network.elements.Pipe,
            'pumps': wntr.network.elements.Pump,
            'valves': wntr.network.elements.Valve}

        truncate = truncate_names is not None and truncate_names > 0

        valid_names = {}
        for element, obj in element_objects.items():
            attributes = obj._base_attributes
            if complete_list:
                attributes = attributes + obj._optional_attributes
            if truncate:
                attributes = [attribute[:truncate_names]
                              for attribute in attributes]
            valid_names[element] = attributes

        return valid_names

    def _shapefile_field_name_map(self):
        """
        Return a map (dictionary) of truncated shapefile field names to
        valid base WaterNetworkModel attribute names

        Esri Shapefiles truncate field names to 10 characters. The field name
        map links truncated shapefile field names to complete (and often
        longer) WaterNetworkModel attribute names. This assumes that the first
        10 characters of each attribute name are unique.

        Returns
        -------
        field_name_map : dict
            Map (dictionary) of valid base shapefile field names to
            WaterNetworkModel attribute names. Each value is a pandas Series
            indexed by the truncated field name.
        """
        valid_names = self._valid_names()

        field_name_map = {}
        for element, attributes in valid_names.items():
            truncated = [attribute[:10] for attribute in attributes]
            field_name_map[element] = pd.Series(dict(zip(truncated,
                                                         attributes)))

        return field_name_map