"""
The wntr.metrics.hydraulic module contains hydraulic metrics.
"""
import wntr.network
import numpy as np
import pandas as pd
import networkx as nx
import math
from collections import Counter
import sys
from functools import reduce
import logging
logger = logging.getLogger(__name__)
def expected_demand(wn, start_time=None, end_time=None, timestep=None, category=None):
"""
    Compute expected demand at each junction and time using base demands
    and demand patterns, along with the demand multiplier.
Parameters
-----------
wn : wntr WaterNetworkModel
Water network model. The water network model is needed to
get demand timeseries at junctions and options related to
duration, timestep, and demand multiplier.
start_time : int (optional)
Start time in seconds, if None then value is set to 0
end_time : int (optional)
End time in seconds, if None then value is set to wn.options.time.duration
timestep : int (optional)
Timestep, if None then value is set to wn.options.time.report_timestep
category : str (optional)
Demand category name. If None, all demand categories are used.
Returns
-------
A pandas DataFrame that contains expected demand in m3/s (index = times, columns = junction names).
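
    Examples
    --------
    A minimal usage sketch; the file name ``Net3.inp`` is illustrative
    and assumed to be in the working directory.

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> ed = wntr.metrics.expected_demand(wn)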
"""
if start_time is None:
start_time = 0
if end_time is None:
end_time = wn.options.time.duration
if timestep is None:
timestep = wn.options.time.report_timestep
exp_demand = {}
tsteps = np.arange(start_time, end_time+timestep, timestep)
for name, junc in wn.junctions():
dem = []
for ts in tsteps:
dem.append(junc.demand_timeseries_list.at(ts,
multiplier=wn.options.hydraulic.demand_multiplier, category=category))
exp_demand[name] = dem
exp_demand = pd.DataFrame(index=tsteps, data=exp_demand)
return exp_demand
def average_expected_demand(wn, category=None):
"""
    Compute average expected demand per day at each junction using base
    demands and demand patterns, along with the demand multiplier.
Parameters
-----------
wn : wntr WaterNetworkModel
Water network model. The water network model is needed to
get demand timeseries at junctions and options related to
duration, timestep, and demand multiplier.
category : str (optional)
Demand category name. If None, all demand categories are used.
Returns
-------
A pandas Series that contains average expected demand in m3/s (index = junction names).
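
    Examples
    --------
    A minimal usage sketch; the file name ``Net3.inp`` is illustrative.

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> aed = wntr.metrics.average_expected_demand(wn)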
"""
    L = [24*3600] # start with a 24-hour pattern
for name, pattern in wn.patterns():
L.append(len(pattern.multipliers)*wn.options.time.pattern_timestep)
lcm = int(_lcml(L))
start_time = wn.options.time.pattern_start
end_time = start_time+lcm
timestep = wn.options.time.pattern_timestep
exp_demand = expected_demand(wn, start_time, end_time-timestep, timestep, category=category)
ave_exp_demand = exp_demand.mean(axis=0)
return ave_exp_demand
def _gcd(x, y):
    while y:
        if y < 0:
            x, y = -x, -y
        x, y = y, x % y
    return x

def _gcdl(*args):
    return reduce(_gcd, *args)

def _lcm(x, y):
    # use floor division to keep exact integer arithmetic;
    # float division can lose precision for long patterns
    return x*y // _gcd(x, y)

def _lcml(*args):
    return reduce(_lcm, *args)
def water_service_availability(expected_demand, demand):
r"""
Compute water service availability (WSA) at junctions, defined as follows:
.. math:: WSA = \dfrac{demand}{expected\_demand}
where
:math:`demand` is the actual demand computed from a hydraulic simulation, and
:math:`expected\_demand` is the expected demand computed from base demands and demand
patterns. Expected demand can be computed using the
:class:`~wntr.metrics.hydraulic.expected_demand` method.
WSA can be averaged over times and/or nodes (see below). If
expected demand is 0 for a particular junction, water service availability
will be set to NaN for that junction.
* To compute water service availability for each junction and timestep,
expected_demand and demand should be pandas DataFrames (index = times, columns = junction names).
* To compute an average water service availability for each junction (averaged over time),
expected_demand and demand should be a pandas Series, indexed by junction.
      To convert a DataFrame (index = times, columns = junction names) to a
      Series indexed by junction, use ``expected_demand.sum(axis=0)`` and
      ``demand.sum(axis=0)``.
* To compute an average water service availability for each timestep (averaged over junctions),
expected_demand and demand should be a pandas Series, indexed by time.
      To convert a DataFrame (index = times, columns = junction names) to a
      Series indexed by time, use ``expected_demand.sum(axis=1)`` and
      ``demand.sum(axis=1)``.
Parameters
----------
expected_demand : pandas DataFrame or pandas Series (see note above)
Expected demand at junctions
demand : pandas DataFrame or pandas Series (see note above)
Actual demand (generally from a PDD hydraulic simulation) at junctions
Returns
-------
A pandas DataFrame or pandas Series that contains water service
availability.
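
    Examples
    --------
    A minimal sketch that computes WSA per junction and timestep,
    assuming ``Net3.inp`` is available and a pressure dependent demand
    (PDD) simulation is used for actual demand:

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> wn.options.hydraulic.demand_model = 'PDD'
    >>> sim = wntr.sim.WNTRSimulator(wn)
    >>> results = sim.run_sim()
    >>> exp_demand = wntr.metrics.expected_demand(wn)
    >>> demand = results.node['demand'].loc[:, wn.junction_name_list]
    >>> wsa = wntr.metrics.water_service_availability(exp_demand, demand)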
"""
wsa = demand.div(expected_demand)
return wsa
def todini_index(head, pressure, demand, flowrate, wn, Pstar):
"""
Compute Todini index, equations from :cite:p:`todi00`.
The Todini index is related to the capability of a system to overcome
failures while still meeting demands and pressures at the nodes. The
Todini index defines resilience at a specific time as a measure of surplus
power at each node and measures relative energy redundancy.
Parameters
----------
head : pandas DataFrame
A pandas DataFrame containing node head
(index = times, columns = node names).
pressure : pandas DataFrame
A pandas DataFrame containing node pressure
(index = times, columns = node names).
demand : pandas DataFrame
A pandas DataFrame containing node demand
(index = times, columns = node names).
flowrate : pandas DataFrame
A pandas DataFrame containing pump flowrates
(index = times, columns = pump names).
wn : wntr WaterNetworkModel
Water network model. The water network model is needed to
find the start and end node to each pump.
Pstar : float
Pressure threshold.
Returns
-------
    A pandas Series that contains a time-series of the Todini index
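
    Examples
    --------
    A minimal sketch, assuming ``Net3.inp`` is available; the 15 m
    pressure threshold is illustrative.

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> sim = wntr.sim.EpanetSimulator(wn)
    >>> results = sim.run_sim()
    >>> todini = wntr.metrics.todini_index(results.node['head'],
    ...     results.node['pressure'], results.node['demand'],
    ...     results.link['flowrate'], wn, 15)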
"""
Pout = demand.loc[:,wn.junction_name_list]*head.loc[:,wn.junction_name_list]
elevation = head.loc[:,wn.junction_name_list]-pressure.loc[:,wn.junction_name_list]
Pexp = demand.loc[:,wn.junction_name_list]*(Pstar+elevation)
Pin_res = -demand.loc[:,wn.reservoir_name_list]*head.loc[:,wn.reservoir_name_list]
headloss = pd.DataFrame()
for name, link in wn.pumps():
start_node = link.start_node_name
end_node = link.end_node_name
start_head = head.loc[:,start_node] # (m)
end_head = head.loc[:,end_node] # (m)
headloss[name] = end_head - start_head # (m)
Pin_pump = flowrate.loc[:,wn.pump_name_list]*headloss.abs()
todini = (Pout.sum(axis=1) - Pexp.sum(axis=1))/ \
(Pin_res.sum(axis=1) + Pin_pump.sum(axis=1) - Pexp.sum(axis=1))
return todini
def modified_resilience_index(pressure, elevation, Pstar, demand=None, per_junction=True):
"""
Compute the modified resilience index, equations from :cite:p:`jasr08`.
The modified resilience index is the total surplus power available at
demand junctions as a percentage of the total minimum required power at
demand junctions. The metric can be computed as a timeseries for each
junction or as a system average timeseries.
Parameters
----------
pressure : pandas DataFrame
A pandas DataFrame containing junction pressure
(index = times, columns = junction names).
elevation : pandas Series
Junction elevation (which can be obtained using `wn.query_node_attribute('elevation')`)
(index = junction names)
Pstar : float
Pressure threshold.
demand : pandas DataFrame
A pandas DataFrame containing junction demand (only needed if per_junction=False)
(index = times, columns = junction names).
per_junction : bool (optional)
If True, compute the modified resilience index per junction.
If False, compute the modified resilience index over all junctions.
Returns
-------
pandas Series or DataFrame
Modified resilience index time-series. If per_junction=True, columns=junction names.
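
    Examples
    --------
    A minimal sketch, assuming ``Net3.inp`` is available; the 15 m
    pressure threshold is illustrative.

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> sim = wntr.sim.EpanetSimulator(wn)
    >>> results = sim.run_sim()
    >>> pressure = results.node['pressure'].loc[:, wn.junction_name_list]
    >>> elevation = wn.query_node_attribute('elevation')[wn.junction_name_list]
    >>> mri = wntr.metrics.modified_resilience_index(pressure, elevation, 15)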
"""
assert isinstance(pressure, pd.DataFrame), "pressure must be a pandas DataFrame"
assert isinstance(elevation, pd.Series), "elevation must be a pandas Series"
assert sorted(pressure.columns) == sorted(elevation.index), "The columns in pressure must be the same as the index in elevation"
    assert isinstance(Pstar, (float, int)), "Pstar must be a float or int"
assert isinstance(per_junction, bool), "per_junction must be a Boolean"
    if not per_junction:
assert isinstance(demand, pd.DataFrame), "demand must be a pandas DataFrame when per_junction=False"
assert sorted(pressure.columns) == sorted(demand.columns), "The columns in pressure must be the same as the columns in demand"
if per_junction:
        Pout = pressure + elevation
        Pexp = Pstar + elevation
        mri = (Pout - Pexp)/Pexp
else:
Pout = demand*(pressure + elevation)
Pexp = demand*(Pstar + elevation)
mri = (Pout.sum(axis=1) - Pexp.sum(axis=1))/Pexp.sum(axis=1)
return mri
def tank_capacity(pressure, wn):
"""
Compute tank capacity, the ratio of water volume stored in tanks to the
maximum volume of water that can be stored.
Parameters
----------
pressure : pandas DataFrame
A pandas DataFrame containing tank water level (pressure)
(index = times, columns = tank names).
wn : wntr WaterNetworkModel
Water network model. The water network model is needed to
get the tank object to compute current and max volume.
Returns
-------
pandas DataFrame
Tank capacity (index = times, columns = tank names)
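
    Examples
    --------
    A minimal sketch, assuming ``Net3.inp`` is available:

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> sim = wntr.sim.EpanetSimulator(wn)
    >>> results = sim.run_sim()
    >>> tank_pressure = results.node['pressure'].loc[:, wn.tank_name_list]
    >>> tc = wntr.metrics.tank_capacity(tank_pressure, wn)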
"""
assert isinstance(pressure, pd.DataFrame), "pressure must be a pandas DataFrame"
assert isinstance(wn, wntr.network.WaterNetworkModel), "wn must be a wntr WaterNetworkModel"
tank_capacity = pd.DataFrame(index=pressure.index, columns=pressure.columns)
for name in wn.tank_name_list:
tank = wn.get_node(name)
max_volume = tank.get_volume(tank.max_level)
tank_volume = tank.get_volume(pressure[name])
tank_capacity[name] = tank_volume/max_volume
return tank_capacity
def entropy(G, sources=None, sinks=None):
"""
Compute entropy, equations from :cite:p:`awgb90`.
Entropy is a measure of uncertainty in a random variable.
In a water distribution network model, the random variable is
flow in the pipes and entropy can be used to measure alternate flow paths
when a network component fails. A network that carries maximum entropy
flow is considered reliable with multiple alternate paths.
Parameters
----------
G : NetworkX or WNTR graph
Entropy is computed using a directed graph based on pipe flow direction.
The 'weight' of each link is equal to the flow rate.
sources : list of strings, optional (default = all reservoirs)
List of node names to use as sources.
sinks : list of strings, optional (default = all nodes)
List of node names to use as sinks.
Returns
-------
A tuple which includes:
- A pandas Series that contains entropy for each node
- System entropy (float)
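
    Examples
    --------
    A minimal sketch, assuming ``Net3.inp`` is available and a WNTR
    release that provides ``WaterNetworkModel.to_graph``; the graph is
    directed along the simulated flow at a single time (here 12 hours):

    >>> import wntr
    >>> wn = wntr.network.WaterNetworkModel('Net3.inp')
    >>> sim = wntr.sim.EpanetSimulator(wn)
    >>> results = sim.run_sim()
    >>> flowrate = results.link['flowrate'].loc[12*3600, :]
    >>> G = wn.to_graph(link_weight=flowrate, modify_direction=True)
    >>> node_entropy, system_entropy = wntr.metrics.entropy(G)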
"""
    if not G.is_directed():
        return
if sources is None:
sources = [key for key,value in nx.get_node_attributes(G,'type').items() if value == 'Reservoir' ]
if sinks is None:
sinks = G.nodes()
S = {}
Q = {}
for nodej in sinks:
if nodej in sources:
S[nodej] = 0 # nodej is the source
continue
sp = [] # simple path
if G.nodes[nodej]['type'] == 'Junction':
for source in sources:
if nx.has_path(G, source, nodej):
simple_paths = nx.all_simple_paths(G,source,target=nodej)
                    sp = sp + list(simple_paths)
# all_simple_paths was modified to check 'has_path' in the
# loop, but this is still slow for large networks
# what if the network was skeletonized based on series pipes
# that have the same flow direction?
# what about duplicating paths that have pipes in series?
if len(sp) == 0:
S[nodej] = np.nan # nodej is not connected to any sources
continue
# "dtype=object" is needed to create an array from a list of lists with differnet lengths
sp = np.array(sp, dtype=object)
# Uj = set of nodes on the upstream ends of links incident on node j
Uj = G.predecessors(nodej)
# qij = flow in link from node i to node j
qij = []
        # aij = number of equivalent independent paths through the link from node i to node j
aij = []
for nodei in Uj:
mask = np.array([nodei in path for path in sp])
# NDij = number of paths through the link from node i to node j
NDij = sum(mask)
if NDij == 0:
continue
temp = sp[mask]
# MDij = links in the NDij path
MDij = [(t[idx],t[idx+1]) for t in temp for idx in range(len(t)-1)]
flow = 0
for link in G[nodei][nodej].keys():
flow = flow + G[nodei][nodej][link]['weight']
qij.append(flow)
# dk = degree of link k in MDij
dk = Counter()
for elem in MDij:
                # divide by the number of links between two nodes
dk[elem] += 1/len(G[elem[0]][elem[1]].keys())
V = np.array(list(dk.values()))
aij.append(NDij*(1-float(sum(V - 1))/sum(V)))
Q[nodej] = sum(qij) # Total flow into node j
# Equation 7
S[nodej] = 0
for idx in range(len(qij)):
if Q[nodej] != 0 and qij[idx]/Q[nodej] > 0:
S[nodej] = S[nodej] - \
qij[idx]/Q[nodej]*math.log(qij[idx]/Q[nodej]) + \
qij[idx]/Q[nodej]*math.log(aij[idx])
Q0 = sum(nx.get_edge_attributes(G, 'weight').values())
# Equation 3
S_ave = 0
for nodej in sinks:
if not np.isnan(S[nodej]):
if nodej not in sources:
if Q[nodej]/Q0 > 0:
S_ave = S_ave + \
(Q[nodej]*S[nodej])/Q0 - \
Q[nodej]/Q0*math.log(Q[nodej]/Q0)
S = pd.Series(S) # convert S to a series
    return S, S_ave