"""
The wntr.gis.network module contains methods to convert between water network
models and GIS formatted data
"""
import pandas as pd
import numpy as np
try:
from shapely.geometry import LineString, Point
has_shapely = True
except ModuleNotFoundError:
has_shapely = False
try:
import geopandas as gpd
has_geopandas = True
except ModuleNotFoundError:
has_geopandas = False
import wntr.network.elements
[docs]
class WaterNetworkGIS:
"""
Water network GIS class
Contains methods to create GeoDataFrames from WaterNetworkModel and
create WaterNetworkModel from GeoDataFrames.
Parameters
----------
gis_data : dict, optional
Dictionary of GeoDataFrames containing data to populate an instance
of WaterNetworkGIS. Valid dictionary keys are 'junction', 'tanks',
'reservoirs', 'pipes', 'pumps', and 'valves'
Raises
------
ModuleNotFoundError
if missing either shapely or geopandas
"""
[docs]
def __init__(self, gis_data=None) -> None:
if not has_shapely or not has_geopandas:
raise ModuleNotFoundError('shapley and geopandas are required')
self.junctions = gpd.GeoDataFrame()
self.tanks = gpd.GeoDataFrame()
self.reservoirs = gpd.GeoDataFrame()
self.pipes = gpd.GeoDataFrame()
self.pumps = gpd.GeoDataFrame()
self.valves = gpd.GeoDataFrame()
if isinstance(gis_data, dict):
if 'junctions' in gis_data.keys():
assert isinstance(gis_data['junctions'], gpd.GeoDataFrame)
self.junctions = gis_data['junctions']
if 'tanks' in gis_data.keys():
assert isinstance(gis_data['tanks'], gpd.GeoDataFrame)
self.tanks = gis_data['tanks']
if 'reservoirs' in gis_data.keys():
assert isinstance(gis_data['reservoirs'], gpd.GeoDataFrame)
self.reservoirs = gis_data['reservoirs']
if 'pipes' in gis_data.keys():
assert isinstance(gis_data['pipes'], gpd.GeoDataFrame)
self.pipes = gis_data['pipes']
if 'pumps' in gis_data.keys():
assert isinstance(gis_data['pumps'], gpd.GeoDataFrame)
self.pumps = gis_data['pumps']
if 'valves' in gis_data.keys():
assert isinstance(gis_data['valves'], gpd.GeoDataFrame)
self.valves = gis_data['valves']
def _create_gis(self, wn, crs: str = None, pumps_as_points: bool = False,
valves_as_points: bool = False,) -> None:
"""
Create GIS data from a water network model.
This method is used by wntr.network.io.to_gis
Note: patterns, curves, rules, controls, sources, and options are not
saved to the GIS data
Parameters
----------
wn : WaterNetworkModel
Water network model
crs : str, optional
Coordinate reference system, by default None
pumps_as_points : bool, optional
Represent pumps as points (True) or lines (False), by default False
valves_as_points : bool, optional
Represent valves as points (True) or lines (False), by default False
"""
def _extract_geodataframe(df, crs=None, valid_base_names=None,
links_as_points=False):
if valid_base_names is None:
valid_base_names = []
# Drop any column with all NaN, this removes excess attributes
# Valid base attributes that have all None values are added back
# at the end of this routine
df = df.loc[:, ~df.isna().all()]
# Define geom and drop node_type/link_type
if df.shape[0] > 0:
if 'node_type' in df.columns:
geom = [Point((x,y)) for x,y in df['coordinates']]
del df['node_type']
# do not carry over leak attributes to dataframe.
del df['leak']
del df['leak_area']
del df['leak_discharge_coeff']
elif 'link_type' in df.columns:
geom = []
for link_name in df['name']:
link = wn.get_link(link_name)
if links_as_points: #Point
geom.append(Point(link.start_node.coordinates))
else: # LineString
ls = list()
ls.append(link.start_node.coordinates)
for v in link.vertices:
ls.append(v)
ls.append(link.end_node.coordinates)
geom.append(LineString(ls))
del df['link_type']
# Drop column if not a str, float, int, or bool (or np.bool_)
# This drops columns like coordinates, vertices
# This could be extended to keep additional data type (list,
# tuple, network elements like Patterns, Curves)
drop_cols = []
for col in df.columns:
# Added np.bool_ to the following check
# Returned by df.to_dict('records') for some network models
if not isinstance(df.iloc[0][col], (str, float, int, bool, np.bool_)):
drop_cols.append(col)
df = df.drop(columns=drop_cols)
# Add back in valid base attributes that had all None values
cols = list(set(valid_base_names) - set(df.columns))
cols.sort()
if len(cols) > 0:
df[cols] = None
# Set index
if len(df) > 0:
df.set_index('name', inplace=True)
df = gpd.GeoDataFrame(df, crs=crs, geometry=geom)
else:
df = gpd.GeoDataFrame()
return df
# Convert the WaterNetworkModel to a dictionary
wn_dict = wn.to_dict()
# Create dataframes for node and link attributes
df_nodes = pd.DataFrame(wn_dict['nodes'])
df_links = pd.DataFrame(wn_dict['links'])
valid_base_names = self._valid_names(complete_list=False, truncate_names=None)
# Junctions
df = df_nodes[df_nodes['node_type'] == 'Junction']
self.junctions = _extract_geodataframe(df, crs, valid_base_names['junctions'])
# Tanks
df = df_nodes[df_nodes['node_type'] == 'Tank']
self.tanks = _extract_geodataframe(df, crs, valid_base_names['tanks'])
# Reservoirs
df = df_nodes[df_nodes['node_type'] == 'Reservoir']
self.reservoirs = _extract_geodataframe(df, crs, valid_base_names['reservoirs'])
# Pipes
df = df_links[df_links['link_type'] == 'Pipe']
self.pipes = _extract_geodataframe(df, crs, valid_base_names['pipes'], False)
# Pumps
df = df_links[df_links['link_type'] == 'Pump']
self.pumps = _extract_geodataframe(df, crs, valid_base_names['pumps'], pumps_as_points)
# Valves
df = df_links[df_links['link_type'] == 'Valve']
self.valves = _extract_geodataframe(df, crs, valid_base_names['valves'], valves_as_points)
def _create_wn(self, append=None):
"""
Create or append a WaterNetworkModel from GeoDataFrames
This method is used by wntr.network.io.from_gis
Parameters
----------
append : WaterNetworkModel or None, optional
Existing WaterNetworkModel to append. If None, a new WaterNetworkModel
is created.
"""
# Convert the WaterNetworkGIS to a dictionary
wn_dict = {}
wn_dict['nodes'] = []
wn_dict['links'] = []
# Modifications to create a WaterNetworkModel from a dict
# Reset index
# Create coordinates/vertices from geometry
# Add node_type/link_type
for node_type, element in [('Junction', self.junctions),
('Tank', self.tanks),
('Reservoir', self.reservoirs)]:
if element.shape[0] > 0:
assert (element['geometry'].geom_type).isin(['Point']).all()
df = element.reset_index(names="name")
df.rename(columns={'geometry':'coordinates'}, inplace=True)
df['coordinates'] = [[x,y] for x,y in zip(df['coordinates'].x,
df['coordinates'].y)]
df['node_type'] = node_type
wn_dict['nodes'].extend(df.to_dict('records'))
for link_type, element in [('Pipe', self.pipes),
('Pump', self.pumps),
('Valve', self.valves)]:
if element.shape[0] > 0:
assert 'start_node_name' in element.columns
assert 'end_node_name' in element.columns
df = element.reset_index(names="name")
df['vertices'] = df.apply(lambda row: list(row.geometry.coords)[1:-1], axis=1)
df.drop(columns=['geometry'], inplace=True)
df['link_type'] = link_type
wn_dict['links'].extend(df.to_dict('records'))
# Create WaterNetworkModel from dictionary
from wntr.network import from_dict
wn = from_dict(wn_dict, append)
return wn
[docs]
def to_crs(self, crs):
"""
Transform CRS of the junctions, tanks, reservoirs, pipes, pumps,
and valves GeoDataFrames.
Calls geopandas.GeoDataFrame.to_crs on each GeoDataFrame.
Parameters
----------
crs : str
Coordinate reference system
"""
for data in [self.junctions, self.tanks, self.reservoirs,
self.pipes, self.pumps, self.valves]:
if 'geometry' in data.columns:
data = data.to_crs(crs, inplace=True)
[docs]
def set_crs(self, crs, allow_override=False):
"""
Set CRS of the junctions, tanks, reservoirs, pipes, pumps,
and valves GeoDataFrames.
Calls geopandas.GeoDataFrame.set_crs on each GeoDataFrame.
Parameters
----------
crs : str
Coordinate reference system
allow_override : bool (optional)
Allow override of existing coordinate reference system
"""
for data in [self.junctions, self.tanks, self.reservoirs,
self.pipes, self.pumps, self.valves]:
if 'geometry' in data.columns:
data = data.set_crs(crs, inplace=True,
allow_override=allow_override)
[docs]
def add_node_attributes(self, values, name):
"""
Add attribute to junctions, tanks, or reservoirs GeoDataFrames
Parameters
----------
values : dict or Series or row of a DataFrame
Attribute values
name : str
Attribute name
"""
for node_name, value in values.items():
if node_name in self.junctions.index:
if name not in self.junctions.columns:
self.junctions[name] = np.nan
self.junctions.loc[node_name, name] = value
elif node_name in self.tanks.index:
if name not in self.tanks.columns:
self.tanks[name] = np.nan
self.tanks.loc[node_name, name] = value
elif node_name in self.reservoirs.index:
if name not in self.reservoirs.columns:
self.reservoirs[name] = np.nan
self.reservoirs.loc[node_name, name] = value
[docs]
def add_link_attributes(self, values, name):
"""
Add attribute to pipes, pumps, or valves GeoDataFrames
Parameters
----------
values : dict or Series or row of a DataFrame
Attribute values
name : str
Attribute name
"""
for link_name, value in values.items():
if link_name in self.pipes.index:
if name not in self.pipes.columns:
self.pipes[name] = np.nan
self.pipes.loc[link_name, name] = value
elif link_name in self.valves.index:
if name not in self.valves.columns:
self.valves[name] = np.nan
self.valves.loc[link_name, name] = value
elif link_name in self.pumps.index:
if name not in self.pumps.columns:
self.pumps[name] = np.nan
self.pumps.loc[link_name, name] = value
def _read(self, files, index_col='name'):
if 'junctions' in files.keys():
data = gpd.read_file(files['junctions']).set_index(index_col)
self.junctions = pd.concat([self.junctions, data])
if 'tanks' in files.keys():
data = gpd.read_file(files['tanks']).set_index(index_col)
self.tanks = pd.concat([self.tanks, data])
if 'reservoirs' in files.keys():
data = gpd.read_file(files['reservoirs']).set_index(index_col)
self.reservoirs = pd.concat([self.reservoirs, data])
if 'pipes' in files.keys():
data = gpd.read_file(files['pipes']).set_index(index_col)
self.pipes = pd.concat([self.pipes, data])
if 'pumps' in files.keys():
data = gpd.read_file(files['pumps']).set_index(index_col)
self.pumps = pd.concat([self.pumps, data])
if 'valves' in files.keys():
data = gpd.read_file(files['valves']).set_index(index_col)
self.valves = pd.concat([self.valves, data])
[docs]
def read_geojson(self, files, index_col='name'):
"""
Append information from GeoJSON files to a WaterNetworkGIS object
Parameters
----------
files : dictionary
Dictionary of GeoJSON filenames, where the keys are in the set
('junction', 'tanks', 'reservoirs', 'pipes', 'pumps', 'valves') and
values are the corresponding GeoJSON filename
index_col : str, optional
Column that contains the element name
"""
self._read(files, index_col)
[docs]
def read_shapefile(self, files, index_col='name'):
"""
Append information from Esri Shapefiles to a WaterNetworkGIS object
Parameters
----------
files : dictionary
Dictionary of Shapefile directory or filenames, where the keys are
in the set ('junction', 'tanks', 'reservoirs', 'pipes', 'pumps',
'valves') and values are the corresponding GeoJSON filename
index_col : str, optional
Column that contains the element name
"""
self._read(files, index_col)
field_name_map = self._shapefile_field_name_map()
self.junctions.rename(columns=field_name_map['junctions'], inplace=True)
self.tanks.rename(columns=field_name_map['tanks'], inplace=True)
self.reservoirs.rename(columns=field_name_map['reservoirs'], inplace=True)
self.pipes.rename(columns=field_name_map['pipes'], inplace=True)
self.pumps.rename(columns=field_name_map['pumps'], inplace=True)
self.valves.rename(columns=field_name_map['valves'], inplace=True)
def _write(self, prefix: str, driver="GeoJSON") -> None:
"""
Write the WaterNetworkGIS object to GIS files
One file will be created for each type of network element (junctions,
pipes, etc.) if those elements exists in the network
Parameters
----------
prefix : str
Filename prefix, will have the element type (junctions,
pipes, etc.) appended
driver : str, optional
GeoPandas driver. Use "GeoJSON" for GeoJSON files, use :code:`None`
for Esri Shapefile folders, by default "GeoJSON"
"""
if driver is None or driver == "":
extension = ""
else:
extension = "." + driver.lower()
if len(self.junctions) > 0:
filename = prefix + "_junctions" + extension
self.junctions.to_file(filename, driver=driver)
if len(self.tanks) > 0:
filename = prefix + "_tanks" + extension
self.tanks.to_file(filename, driver=driver)
if len(self.reservoirs) > 0:
filename = prefix + "_reservoirs" + extension
self.reservoirs.to_file(filename, driver=driver)
if len(self.pipes) > 0:
filename = prefix + "_pipes" + extension
self.pipes.to_file(filename, driver=driver)
if len(self.pumps) > 0:
filename = prefix + "_pumps" + extension
self.pumps.to_file(filename, driver=driver)
if len(self.valves) > 0:
filename = prefix + "_valves" + extension
self.valves.to_file(filename, driver=driver)
[docs]
def write_geojson(self, prefix: str):
"""
Write the WaterNetworkGIS object to a set of GeoJSON files, one file
for each network element.
Parameters
----------
prefix : str
File prefix
"""
self._write(prefix=prefix, driver="GeoJSON")
[docs]
def write_shapefile(self, prefix: str):
"""
Write the WaterNetworkGIS object to a set of Esri Shapefiles, one
directory for each network element.
Parameters
----------
prefix : str
File and directory prefix
"""
self._write(prefix=prefix, driver=None)
def _valid_names(self, complete_list=True, truncate_names=None):
"""
Valid column/field names for GeoJSON or Shapefiles
Note that Shapefile field names are truncated to 10 characters
(set truncate=10)
Parameters
----------
complete_list : bool
Include a complete list of column/field names (beyond basic attributes)
truncate_names : None or int
Truncate column/field names to specified number of characters,
set truncate=10 for Shapefiles. None indicates no truncation.
Returns
---------
dict : Dictionary of valid GeoJSON or Shapefile column/field names
"""
valid_names = {}
element_objects = {
'junctions': wntr.network.elements.Junction,
'tanks': wntr.network.elements.Tank,
'reservoirs': wntr.network.elements.Reservoir,
'pipes': wntr.network.elements.Pipe,
'pumps': wntr.network.elements.Pump,
'valves': wntr.network.elements.Valve}
valid_names = {}
for element, obj in element_objects.items():
if complete_list:
valid_names[element] = obj._base_attributes + obj._optional_attributes
else:
valid_names[element] = obj._base_attributes
if truncate_names is not None and truncate_names > 0:
for element, attributes in valid_names.items():
valid_names[element] = [attribute[:truncate_names] for attribute in attributes]
for key, vals in valid_names.items():
# Remove coordinates and vertices (not used to create GeoDataFrame geometry)
if 'coordinates' in valid_names[key]:
valid_names[key].remove('coordinates')
if 'vertices' in valid_names[key]:
valid_names[key].remove('vertices')
# Add geometry
if 'geometry' not in valid_names[key]:
valid_names[key].append('geometry')
return valid_names
def _shapefile_field_name_map(self):
"""
Return a map (dictionary) of tuncated shapefile field names to
valid base WaterNetworkModel attribute names
Esri Shapefiles truncate field names to 10 characters. The field name
map links truncated shapefile field names to complete (and ofen longer)
WaterNetworkModel attribute names. This assumes that the first 10
characters of each attribute name are unique.
Returns
-------
field_name_map : dict
Map (dictionary) of valid base shapefile field names to
WaterNetworkModel attribute names
"""
valid_names = self._valid_names()
field_name_map = {}
for element, attributes in valid_names.items():
truncated = [attribute[:10] for attribute in attributes]
field_name_map[element] = pd.Series(dict(zip(truncated, attributes)))
return field_name_map