Source code for wntr.morph.skel

"""
The wntr.morph.skel module contains functions to skeletonize water
network models.
"""
import logging
import copy
import itertools
import networkx as nx
    
from wntr.network.elements import Pipe, Junction
from wntr.sim.core import WNTRSimulator
from wntr.sim import EpanetSimulator

logger = logging.getLogger(__name__)

[docs] def skeletonize(wn, pipe_diameter_threshold, branch_trim=True, series_pipe_merge=True, parallel_pipe_merge=True, max_cycles=None, use_epanet=True, pipes_to_exclude:list=[], junctions_to_exclude:list=[], return_map=False, return_copy=True): """ Perform network skeletonization using branch trimming, series pipe merge, and parallel pipe merge operations. Candidate pipes for removal is based on a pipe diameter threshold. Parameters ------------- wn: wntr WaterNetworkModel Water network model pipe_diameter_threshold: float Pipe diameter threshold. Pipes with diameter <= threshold are candidates for removal branch_trim: bool, optional If True, include branch trimming in skeletonization series_pipe_merge: bool, optional If True, include series pipe merge in skeletonization parallel_pipe_merge: bool, optional If True, include parallel pipe merge in skeletonization max_cycles: int or None, optional Maximum number of cycles in the skeletonization process. One cycle performs branch trimming for all candidate pipes, followed by series pipe merging for all candidate pipes, followed by parallel pipe merging for all candidate pipes. If max_cycles is set to None, skeletonization will run until the network can no longer be reduced. use_epanet: bool, optional If True, use the EpanetSimulator to compute headloss in pipes. If False, use the WNTRSimulator to compute headloss in pipes. pipes_to_exclude: list, optional List of pipe names to exclude from skeletonization junctions_to_exclude: list, optional List of junction names to exclude from skeletonization return_map: bool, optional If True, return a skeletonization map. The map is a dictionary that includes original nodes as keys and a list of skeletonized nodes that were merged into each original node as values. return_copy: bool, optional If True, modify and return a copy of the WaterNetworkModel object. If False, modify and return the original WaterNetworkModel object. Returns -------- wntr WaterNetworkModel Skeletonized water network model dictionary Skeletonization map (if return_map = True) which includes original nodes as keys and a list of skeletonized nodes that were merged into each original node as values. """ if len(pipes_to_exclude) > 0: assert len(set(pipes_to_exclude) - set(wn.pipe_name_list)) == 0 if len(junctions_to_exclude) > 0: assert len(set(junctions_to_exclude) - set(wn.junction_name_list)) == 0 skel = _Skeletonize(wn, use_epanet, return_copy, pipes_to_exclude, junctions_to_exclude) skel.run(pipe_diameter_threshold, branch_trim, series_pipe_merge, parallel_pipe_merge, max_cycles) if return_map: return skel.wn, skel.skeleton_map else: return skel.wn
class _Skeletonize(object): def __init__(self, wn, use_epanet, return_copy, pipes_to_exclude, junctions_to_exclude): if return_copy: # Get a copy of the WaterNetworkModel self.wn = copy.deepcopy(wn) else: self.wn = wn # Get the WaterNetworkModel graph G = self.wn.to_graph() G = G.to_undirected() self.G = G # Create a map of original nodes to skeletonized nodes skel_map = {} for node_name in self.wn.node_name_list: skel_map[node_name] = [node_name] self.skeleton_map = skel_map # Get a list of junction and pipe names that are associated with controls # Add them to junctions and pipes to exclude junc_with_controls = [] pipe_with_controls = [] for name, control in self.wn.controls(): for req in control.requires(): if isinstance(req, Junction): junc_with_controls.append(req.name) elif isinstance(req, Pipe): pipe_with_controls.append(req.name) self.junc_to_exclude = list(set(junc_with_controls)) self.junc_to_exclude.extend(junctions_to_exclude) self.pipe_to_exclude = list(set(pipe_with_controls)) self.pipe_to_exclude.extend(pipes_to_exclude) # Calculate pipe headloss using a single period EPANET simulation duration = self.wn.options.time.duration if use_epanet: sim = EpanetSimulator(self.wn) else: sim = WNTRSimulator(self.wn) self.wn.options.time.duration = 0 results = sim.run_sim() head = results.node['head'] headloss = {} for link_name, link in self.wn.links(): headloss[link_name] = float(abs(head.loc[0,link.start_node_name] - head.loc[0,link.end_node_name])) self.headloss = headloss self.wn.options.time.duration = duration self.num_branch_trim = 0 self.num_series_merge = 0 self.num_parallel_merge = 0 def run(self, pipe_threshold, branch_trim=True, series_pipe_merge=True, parallel_pipe_merge=True, max_cycles=None): """ Run iterative branch trim, series pipe merge, and parallel pipe merge operations based on a pipe diameter threshold. """ num_junctions = self.wn.num_junctions iteration = 0 flag = True while flag: if branch_trim: self.branch_trim(pipe_threshold) if series_pipe_merge: self.series_pipe_merge(pipe_threshold) if parallel_pipe_merge: self.parallel_pipe_merge(pipe_threshold) iteration = iteration + 1 if (max_cycles is not None) and (iteration > max_cycles): flag = False if num_junctions == self.wn.num_junctions: flag = False else: num_junctions = self.wn.num_junctions return self.wn, self.skeleton_map def branch_trim(self, pipe_threshold): """ Run a single branch trim operation based on a pipe diameter threshold. Branch trimming removes dead-end pipes smaller than the pipe diameter threshold and redistributes demands (and associated demand patterns) to the neighboring junction. """ for junc_name in self.wn.junction_name_list: if junc_name in self.junc_to_exclude: continue neighbors = list(nx.neighbors(self.G,junc_name)) if len(neighbors) > 1: continue if len(neighbors) == 0: continue neigh_junc_name = neighbors[0] # only one neighbor nPipes = len(self.G.adj[junc_name][neigh_junc_name]) if nPipes > 1: continue neigh_junc = self.wn.get_node(neigh_junc_name) if not (isinstance(neigh_junc, Junction)): continue pipe_name = list(self.G.adj[junc_name][neigh_junc_name].keys())[0] # only one pipe pipe = self.wn.get_link(pipe_name) if not ((isinstance(pipe, Pipe)) and \ (pipe.diameter <= pipe_threshold) and \ pipe_name not in self.pipe_to_exclude): continue logger.info('Branch trim: '+ str(junc_name) + str(neighbors)) # Update skeleton map self.skeleton_map[neigh_junc_name].extend(self.skeleton_map[junc_name]) self.skeleton_map[junc_name] = [] # Move demand junc = self.wn.get_node(junc_name) for demand in junc.demand_timeseries_list: neigh_junc.demand_timeseries_list.append(demand) junc.demand_timeseries_list.clear() # Remove node and links from wn and G self.wn.remove_link(pipe_name, force=True) self.wn.remove_node(junc_name, force=True) self.G.remove_node(junc_name) self.num_branch_trim +=1 return self.wn, self.skeleton_map def series_pipe_merge(self, pipe_threshold): """ Run a single series pipe merge operation based on a pipe diameter threshold. This operation combines pipes in series if both pipes are smaller than the pipe diameter threshold. The larger diameter pipe is retained, demands (and associated demand patterns) are redistributed to the nearest junction. """ for junc_name in self.wn.junction_name_list: if junc_name in self.junc_to_exclude: continue neighbors = list(nx.neighbors(self.G,junc_name)) if not (len(neighbors) == 2): continue neigh_junc_name0 = neighbors[0] neigh_junc_name1 = neighbors[1] neigh_junc0 = self.wn.get_node(neigh_junc_name0) neigh_junc1 = self.wn.get_node(neigh_junc_name1) if not ((isinstance(neigh_junc0, Junction)) or \ (isinstance(neigh_junc1, Junction))): continue pipe_name0 = list(self.G.adj[junc_name][neigh_junc_name0].keys()) pipe_name1 = list(self.G.adj[junc_name][neigh_junc_name1].keys()) if (len(pipe_name0) > 1) or (len(pipe_name1) > 1): continue pipe_name0 = pipe_name0[0] # only one pipe pipe_name1 = pipe_name1[0] # only one pipe pipe0 = self.wn.get_link(pipe_name0) pipe1 = self.wn.get_link(pipe_name1) if not ((isinstance(pipe0, Pipe)) and \ (isinstance(pipe1, Pipe)) and \ ((pipe0.diameter <= pipe_threshold) and \ (pipe1.diameter <= pipe_threshold)) and \ pipe_name0 not in self.pipe_to_exclude and \ pipe_name1 not in self.pipe_to_exclude): continue # Find closest neighbor junction if (isinstance(neigh_junc0, Junction)) and \ (isinstance(neigh_junc1, Junction)): if pipe0.length < pipe1.length: closest_junc = neigh_junc0 else: closest_junc = neigh_junc1 elif (isinstance(neigh_junc0, Junction)): closest_junc = neigh_junc0 elif (isinstance(neigh_junc1, Junction)): closest_junc = neigh_junc1 else: continue logger.info('Series pipe merge: ' + str(junc_name) + str(neighbors)) # Update skeleton map self.skeleton_map[closest_junc.name].extend(self.skeleton_map[junc_name]) self.skeleton_map[junc_name] = [] # Move demand junc = self.wn.get_node(junc_name) for demand in junc.demand_timeseries_list: closest_junc.demand_timeseries_list.append(demand) junc.demand_timeseries_list.clear() # Remove node and links from wn and G self.wn.remove_link(pipe_name0, force=True) self.wn.remove_link(pipe_name1, force=True) self.wn.remove_node(junc_name, force=True) self.G.remove_node(junc_name) # Compute new pipe properties props = self._series_merge_properties(pipe0, pipe1) # Add new pipe to wn and G dominant_pipe = self._select_dominant_pipe(pipe0, pipe1) self.wn.add_pipe(dominant_pipe.name, start_node_name=neigh_junc_name0, end_node_name=neigh_junc_name1, length=props['length'], diameter=props['diameter'], roughness=props['roughness'], minor_loss=props['minorloss'], initial_status=props['status']) self.G.add_edge(neigh_junc_name0, neigh_junc_name1, dominant_pipe.name) self.num_series_merge +=1 return self.wn, self.skeleton_map def parallel_pipe_merge(self, pipe_threshold): """ Run a single parallel pipe merge operation based on a pipe diameter threshold. This operation combines pipes in parallel if both pipes are smaller than the pipe diameter threshold. The larger diameter pipe is retained. """ for junc_name in self.wn.junction_name_list: if junc_name in self.junc_to_exclude: continue neighbors = nx.neighbors(self.G,junc_name) for neighbor in [n for n in neighbors]: parallel_pipe_names = list(self.G.adj[junc_name][neighbor].keys()) if len(parallel_pipe_names) == 1: continue for (pipe_name0, pipe_name1) in itertools.combinations(parallel_pipe_names, 2): try: pipe0 = self.wn.get_link(pipe_name0) pipe1 = self.wn.get_link(pipe_name1) except: continue # one of the pipes removed in previous loop if not ((isinstance(pipe0, Pipe)) and \ (isinstance(pipe1, Pipe)) and \ ((pipe0.diameter <= pipe_threshold) and \ (pipe1.diameter <= pipe_threshold)) and \ pipe_name0 not in self.pipe_to_exclude and \ pipe_name1 not in self.pipe_to_exclude): continue logger.info('Parallel pipe merge: '+ str(junc_name) + str((pipe_name0, pipe_name1))) # Remove links from wn and G self.wn.remove_link(pipe_name0, force=True) self.wn.remove_link(pipe_name1, force=True) self.G.remove_edge(neighbor, junc_name, pipe_name0) self.G.remove_edge(junc_name, neighbor, pipe_name1) # Compute new pipe properties props = self._parallel_merge_properties(pipe0, pipe1) # Add a new pipe to wn and G dominant_pipe = self._select_dominant_pipe(pipe0, pipe1) self.wn.add_pipe(dominant_pipe.name, start_node_name=dominant_pipe.start_node_name, end_node_name=dominant_pipe.end_node_name, length=props['length'], diameter=props['diameter'], roughness=props['roughness'], minor_loss=props['minorloss'], initial_status=props['status']) self.G.add_edge(dominant_pipe.start_node_name, dominant_pipe.end_node_name, dominant_pipe.name) self.num_parallel_merge +=1 return self.wn, self.skeleton_map def _select_dominant_pipe(self, pipe0, pipe1): # Dominant pipe = larger diameter if pipe0.diameter >= pipe1.diameter: dominant_pipe = pipe0 else: dominant_pipe = pipe1 return dominant_pipe def _series_merge_properties(self, pipe0, pipe1): props = {} dominant_pipe = self._select_dominant_pipe(pipe0, pipe1) props['length'] = pipe0.length + pipe1.length props['diameter'] = dominant_pipe.diameter props['minorloss'] = dominant_pipe.minor_loss props['status'] = dominant_pipe.status props['roughness'] = (props['length']/(props['diameter']**4.87))**0.54 * \ ((pipe0.length/((pipe0.diameter**4.87)*(pipe0.roughness**1.85))) + \ (pipe1.length/((pipe1.diameter**4.87)*(pipe1.roughness**1.85))))**-0.54 return props def _parallel_merge_properties(self, pipe0, pipe1): props = {} dominant_pipe = self._select_dominant_pipe(pipe0, pipe1) props['length'] = dominant_pipe.length props['diameter'] = dominant_pipe.diameter props['minorloss'] = dominant_pipe.minor_loss props['status'] = dominant_pipe.status props['roughness'] = ((props['length']**0.54)/(props['diameter']**2.63)) * \ ((pipe0.roughness*(pipe0.diameter**2.63))/(pipe0.length**0.54) + \ (pipe1.roughness*(pipe1.diameter**2.63))/(pipe1.length**0.54)) return props