Source code for wntr.network.controls

"""
The wntr.network.controls module includes methods to define network controls
and control actions.  These controls modify parameters in the network during
simulation.
"""
import math
import enum
import numpy as np
import logging
import six
from .elements import LinkStatus
import abc
from wntr.utils.ordered_set import OrderedSet
from collections import OrderedDict
from .elements import Tank, Junction, Valve, Pump, Reservoir, Pipe, Link
from wntr.utils.doc_inheritor import DocInheritor
import warnings
from typing import Hashable, Dict, Any, Tuple, MutableSet, Iterable

logger = logging.getLogger(__name__)

# Control Priorities:
# 0 is the lowest
# 3 is the highest
#
# 0:
#    Open check valves/pumps if flow would be forward
#    Open links for time controls
#    Open links for conditional controls
#    Open links connected to tanks if the tank head is larger than the minimum head plus a tolerance
#    Open links connected to tanks if the tank head is smaller than the maximum head minus a tolerance
#    Open pumps if power comes back up
#    Start/stop leaks
# 1:
#    Close links connected to tanks if the tank head is less than the minimum head (except check valves and pumps than
#    only allow flow in).
#    Close links connected to tanks if the tank head is larger than the maximum head (exept check valves and pumps that
#    only allow flow out).
# 2:
#    Open links connected to tanks if the level is low but flow would be in
#    Open links connected to tanks if the level is high but flow would be out
#    Close links connected to tanks if the level is low and flow would be out
#    Close links connected to tanks if the level is high and flow would be in
# 3:
#    Close links for time controls
#    Close links for conditional controls
#    Close check valves/pumps for negative flow
#    Close pumps without power

def _ensure_iterable(to_check: Any)->Iterable[Any]:
    """Make sure the input is interable

    Parameters
    ----------
    to_check : Any
        The input to check which can be of any type including None

    Returns
    -------
    Iterable[Any]
        to_check as an iterable object, if None an empty list is returned
    """
    if isinstance(to_check, Iterable):
        to_return = list(to_check)
    elif to_check is not None:
        to_return = [to_check]
    else:
        to_return = []
    return to_return

[docs] class Subject(object): """ A subject base class for the observer design pattern """
[docs] def __init__(self): self._observers = OrderedSet()
[docs] def subscribe(self, observer): """ Subscribe observer to this subject. The update method of any observers of this subject will be called when notify is called on this subject. Parameters ---------- observer: Observer """ self._observers.add(observer)
[docs] def unsubscribe(self, observer): """ Unsubscribe observer from this subject. Parameters ---------- observer: Observer """ self._observers.remove(observer)
[docs] def notify(self): """ Call the update method for all observers of this subject. """ for o in self._observers: o.update(self)
[docs] class Observer(six.with_metaclass(abc.ABCMeta, object)): """ A base class for observers in the observer design pattern. """
[docs] @abc.abstractmethod def update(self, subject): """ This method is called when the subject being observed calls notify. Parameters ---------- subject: Subject The subject that called notify. """ pass
[docs] class Comparison(enum.Enum): """ An enum class for comparison operators. .. rubric:: Enum Members =========== ============================================== :attr:`~gt` greater than :attr:`~ge` greater than or equal to :attr:`~lt` less than :attr:`~le` less than or equal to :attr:`~eq` equal to :attr:`~ne` not equal to =========== ============================================== """ gt = (1, np.greater) ge = (2, np.greater_equal) lt = (3, np.less) le = (4, np.less_equal) eq = (5, np.equal) ne = (6, np.not_equal) def __str__(self): return '-' + self.name @property def func(self): """The function call to use for this comparison""" value = getattr(self, '_value_') return value[1] __call__ = func @property def symbol(self): if self is Comparison.eq: return '=' elif self is Comparison.ne: return '<>' elif self is Comparison.gt: return '>' elif self is Comparison.ge: return '>=' elif self is Comparison.lt: return '<' elif self is Comparison.le: return '<=' raise ValueError('Unknown Enum: Comparison.%s'%self) @property def text(self): if self is Comparison.eq: return 'Is' elif self is Comparison.ne: return 'Not' elif self is Comparison.gt: return 'Above' elif self is Comparison.ge: return '>=' elif self is Comparison.lt: return 'Below' elif self is Comparison.le: return '<=' raise ValueError('Unknown Enum: Comparison.%s'%self)
[docs] @classmethod def parse(cls, func): if isinstance(func, six.string_types): func = func.lower().strip() elif isinstance(func, cls): func = func.func if func in [np.equal, '=', 'eq', '-eq', '==', 'is', 'equal', 'equal to']: return cls.eq elif func in [np.not_equal, '<>', 'ne', '-ne', '!=', 'not', 'not_equal', 'not equal to']: return cls.ne elif func in [np.greater, '>', 'gt', '-gt', 'above', 'after', 'greater', 'greater than']: return cls.gt elif func in [np.less, '<', 'lt', '-lt', 'below', 'before', 'less', 'less than']: return cls.lt elif func in [np.greater_equal, '>=', 'ge', '-ge', 'greater_equal', 'greater than or equal to']: return cls.ge elif func in [np.less_equal, '<=', 'le', '-le', 'less_equal', 'less than or equal to']: return cls.le raise ValueError('Invalid Comparison name: %s'%func)
# # Control Condition classes #
[docs] class ControlPriority(enum.IntEnum): """ An enum class for control priorities. .. rubric:: Enum Members ==================== ===================================================== :attr:`~very_low` very low priority :attr:`~low` low priority :attr:`~medium_low` medium low priority :attr:`~medium` medium priority :attr:`~medium_high` medium high priority :attr:`~high` high priority :attr:`~very_high` very high priority ==================== ===================================================== """ very_low = 0 low = 1 medium_low = 2 medium = 3 medium_high = 4 high = 5 very_high = 6
class _ControlType(enum.Enum): presolve = 0 postsolve = 1 rule = 2 pre_and_postsolve = 3 feasibility = 4 # controls necessary to ensure the problem being solved is feasible
[docs] class ControlCondition(six.with_metaclass(abc.ABCMeta, object)): """A base class for control conditions"""
[docs] def __init__(self): self._backtrack = 0
def _reset(self): pass
[docs] @abc.abstractmethod def requires(self): """ Returns a set of objects required to evaluate this condition Returns ------- required_objects: OrderedSet of object """ return OrderedSet()
@property def name(self): """ Returns the string representation of the condition. Returns ------- name: str """ return str(self) @property def backtrack(self): """ The amount of time by which the simulation should be backed up. Should be updated by the :class:`~wntr.network.controls.ControlCondition.evaluate` method if appropriate. Returns ------- backtrack: int """ return self._backtrack
[docs] @abc.abstractmethod def evaluate(self): """ Check if the condition is satisfied. Returns ------- check: bool """ pass
def __bool__(self): """ Check if the condition is satisfied. Returns ------- check: bool """ return self.evaluate() __nonzero__ = __bool__ @classmethod def _parse_value(cls, value): try: v = float(value) return v except ValueError: value = value.upper() if value == 'CLOSED': return 0 if value == 'OPEN': return 1 if value == 'ACTIVE': return 2 PM = 0 words = value.split() if len(words) > 1: if words[1] == 'PM': PM = 86400 / 2 hms = words[0].split(':') v = 0 if len(hms) > 2: v += int(hms[2]) if len(hms) > 1: v += int(hms[1])*60 if len(hms) > 0: v += int(hms[0])*3600 if int(hms[0]) <= 12: v += PM return v def _repr_value(self, attr, value): if attr.lower() in ['status'] and isinstance(value, str): return value.upper() if attr.lower() in ['status'] and int(value) == value: return LinkStatus(int(value)).name.upper() return value @classmethod def _sec_to_hours_min_sec(cls, value): sec = float(value) hours = int(sec/3600.) sec -= hours*3600 mm = int(sec/60.) sec -= mm*60 return '{:02d}:{:02d}:{:02d}'.format(hours, mm, int(sec)) @classmethod def _sec_to_days_hours_min_sec(cls, value): sec = float(value) days = int(sec/86400.) sec -= days*86400 hours = int(sec/3600.) sec -= hours*3600 mm = int(sec/60.) sec -= mm*60 if days > 0: return '{}-{:02d}:{:02d}:{:02d}'.format(days, hours, mm, int(sec)) else: return '{:02d}:{:02d}:{:02d}'.format(hours, mm, int(sec)) @classmethod def _sec_to_clock(cls, value): sec = float(value) hours = int(sec/3600.) sec -= hours*3600 mm = int(sec/60.) sec -= mm*60 if hours >= 12: pm = 'PM' if hours > 12: hours -= 12 elif hours == 0: pm = 'AM' hours = 12 else: pm = 'AM' return '{}:{:02d}:{:02d} {}'.format(hours, mm, int(sec), pm)
[docs] @DocInheritor({'requires', 'evaluate', 'name'}) class TimeOfDayCondition(ControlCondition): """Time-of-day or "clocktime" based condition statement. Resets automatically at 12 AM in clock time (shifted time) every day simulated. Evaluated from 12 AM the first day of the simulation, even if this is prior to simulation start. Unlike the :class:`~wntr.network.controls.SimTimeCondition`, greater-than and less-than relationships make sense, and reset at midnight. Parameters ---------- model : WaterNetworkModel The model that the time is being compared against relation : str or None String options are 'at', 'after' or 'before'. The 'at' and None are equivalent, and only evaluate as True during the simulation step the time occurs. `after` evaluates as True from the time specified until midnight, `before` evaluates as True from midnight until the specified time. threshold : float or str The time (a ``float`` in decimal hours since 12 AM) used in the condition; if provided as a string in 'hh:mm[:ss] [am|pm]' format, the time will be parsed from the string repeat : bool, optional True by default; if False, allows for a single, timed trigger, and probably needs an entry for `first_day`; in this case a relation of `after` becomes True from the time until the end of the simulation, and `before` is True from the beginning of the simulation until the time specified. first_day : float, default=0 Start rule on day `first_day`, with the first day of simulation as day 0 TODO: WE ARE NOT TESTING THIS!!!! """
[docs] def __init__(self, model, relation, threshold, repeat=True, first_day=0): self._model = model if isinstance(threshold, str) and not ':' in threshold: self._threshold = float(threshold) * 3600. else: self._threshold = self._parse_value(threshold) if relation is None: self._relation = Comparison.eq else: self._relation = Comparison.parse(relation) self._first_day = first_day self._repeat = repeat self._backtrack = 0 if model is not None and not self._repeat and self._threshold < model.options.time.start_clocktime and first_day < 1: self._first_day = 1
def _compare(self, other): """ Parameters ---------- other: TimeOfDayCondition Returns ------- bool """ if type(self) != type(other): return False if abs(self._threshold - other._threshold) > 1e-10: return False if self._relation != other._relation: return False if self._first_day != other._first_day: return False if self._repeat != other._repeat: return False return True @property def name(self): if not self._repeat: rep = '/Once' else: rep = '/Daily' if self._first_day > 0: start = '/FirstDay/{}'.format(self._first_day) else: start = '/' return 'ClockTime/{}/{}{}{}'.format(self._relation.text, self._sec_to_hours_min_sec(self._threshold), rep, start)
[docs] def requires(self): return OrderedSet()
def __repr__(self): fmt = '<TimeOfDayCondition: model, {}, {}, {}, {}>' return fmt.format(repr(self._relation.text), repr(self._sec_to_clock(self._threshold)), repr(self._repeat), repr(self._first_day)) def __str__(self): fmt = 'SYSTEM CLOCKTIME {:s} {}'.format(self._relation.text.upper(), self._sec_to_clock(self._threshold)) if not self._repeat: fmt = '( ' + ' && clock_day == {} )'.format(self._first_day) elif self._first_day > 0: fmt = '( ' + ' && clock_day >= {} )'.format(self._first_day) return fmt
[docs] def evaluate(self): cur_time = self._model._shifted_time prev_time = self._model._prev_shifted_time day = np.floor(cur_time/86400) if day < self._first_day: self._backtrack = None return False if self._repeat: cur_time = int(cur_time - self._threshold) % 86400 prev_time = int(prev_time - self._threshold) % 86400 else: cur_time = cur_time - self._first_day * 86400. prev_time = prev_time - self._first_day * 86400. if self._relation is Comparison.eq and (prev_time < self._threshold and self._threshold <= cur_time): self._backtrack = int(cur_time - self._threshold) return True elif self._relation is Comparison.gt and cur_time >= self._threshold and prev_time < self._threshold: self._backtrack = int(cur_time - self._threshold) return True elif self._relation is Comparison.gt and cur_time >= self._threshold and prev_time >= self._threshold: self._backtrack = 0 return True elif self._relation is Comparison.lt and cur_time >= self._threshold and prev_time < self._threshold: self._backtrack = int(cur_time - self._threshold) return False elif self._relation is Comparison.lt and cur_time >= self._threshold and prev_time >= self._threshold: self._backtrack = 0 return False else: self._backtrack = 0 return False
[docs] @DocInheritor({'requires', 'evaluate', 'name'}) class SimTimeCondition(ControlCondition): """Condition based on time since start of the simulation. Generally, the relation should be ``None`` (converted to "at") -- then it is *only* evaluated "at" specific times. Using greater-than or less-than type relationships should be reserved for complex, multi-condition statements and should not be used for simple controls. If ``repeat`` is used, the relationship will automatically be changed to an "at time" evaluation, and a warning will be raised. Parameters ---------- model : WaterNetworkModel The model that the time threshold is being compared against relation : str or None String options are 'at', 'after' or 'before'. The 'at' and None are equivalent, and only evaluate as True during the simulation step the time occurs. After evaluates as True from the time specified until the end of simulation, before evaluates as True from start of simulation until the specified time. threshold : float or str The time (a ``float`` in decimal hours) used in the condition; if provided as a string in '[dd-]hh:mm[:ss]' format, then the time will be parsed from the string; repeat : bool or float, default=False If True, then repeat every 24-hours; if non-zero float, reset the condition every `repeat` seconds after the first_time. first_time : float, default=0 Start rule at `first_time`, using that time as 0 for the condition evaluation """
[docs] def __init__(self, model, relation, threshold, repeat=False, first_time=0): self._model = model if isinstance(threshold, str) and not ':' in threshold: self._threshold = float(threshold) * 3600. else: self._threshold = self._parse_value(threshold) if relation is None: self._relation = Comparison.eq else: self._relation = Comparison.parse(relation) self._repeat = repeat if repeat is True: self._repeat = 86400 self._backtrack = 0 self._first_time = first_time
def _compare(self, other): """ Parameters ---------- other: SimTimeCondition Returns ------- bool """ if type(self) != type(other): return False if abs(self._threshold - other._threshold) > 1e-10: return False if self._repeat != other._repeat: return False if self._first_time != other._first_time: return False if self._relation != other._relation: return False return True @property def name(self): if not self._repeat: rep = '' else: rep = '%Every{}sec'.format(self._repeat) if self._first_time > 0: start = '#Start@{}sec'.format((self._first_time)) else: start = '' return 'SimTime{}{}{}{}'.format(self._relation.symbol, (self._threshold), rep, start) def __repr__(self): fmt = '<SimTimeCondition: model, {}, {}, {}, {}>' return fmt.format(repr(self._relation.text), repr(self._sec_to_days_hours_min_sec(self._threshold)), repr(self._repeat), repr(self._first_time)) def __str__(self): fmt = 'SYSTEM TIME {} {}'.format(self._relation.text.upper(), self._sec_to_hours_min_sec(self._threshold)) if self._repeat is True: fmt = '% 86400.0 ' + fmt elif self._repeat > 0: fmt = '% {:.1f} '.format(int(self._repeat)) + fmt if self._first_time > 0: fmt = '(sim_time - {:d}) '.format(int(self._first_time)) + fmt else: fmt = '' + fmt return fmt
[docs] def requires(self): return OrderedSet()
[docs] def evaluate(self): cur_time = self._model.sim_time prev_time = self._model._prev_sim_time if self._repeat and cur_time > self._threshold: cur_time = (cur_time - self._threshold) % self._repeat prev_time = (prev_time - self._threshold) % self._repeat if self._relation is Comparison.eq and (prev_time < self._threshold and self._threshold <= cur_time): self._backtrack = int(cur_time - self._threshold) return True elif self._relation is Comparison.gt and cur_time > self._threshold: self._backtrack = 0 return True elif self._relation is Comparison.ge and cur_time >= self._threshold and prev_time < self._threshold: self._backtrack = int(cur_time - self._threshold) return True elif self._relation is Comparison.ge and cur_time >= self._threshold and prev_time >= self._threshold: self._backtrack = 0 return True elif self._relation is Comparison.lt and cur_time < self._threshold: self._backtrack = 0 return True elif self._relation is Comparison.le and cur_time <= self._threshold: self._backtrack = 0 return True elif self._relation is Comparison.le and prev_time < self._threshold: self._backtrack = int(cur_time - self._threshold) return True else: self._backtrack = 0 return False
[docs] @DocInheritor({'requires', 'evaluate', 'name'}) class ValueCondition(ControlCondition): """Compare a network element attribute to a set value. Parameters ---------- source_obj : object The object (such as a Junction, Tank, Pipe, etc.) to use in the comparison source_attr : str The attribute of the object (such as level, pressure, setting, etc.) to compare against the threshold operation : function or str A two-parameter comparison function (e.g., numpy.greater, numpy.less_equal), or a string describing the comparison (e.g., '=', 'below', 'is', '>=', etc.) Words, such as 'below', are only accepted from the EPANET rules conditions list (see ...) threshold : float A value to compare the source object attribute against """
[docs] def __new__(cls, source_obj, source_attr, relation, threshold): if isinstance(source_obj, Tank) and source_attr in {'level', 'pressure', 'head'}: return object.__new__(TankLevelCondition) else: return object.__new__(ValueCondition)
def __getnewargs__(self): return self._source_obj, self._source_attr, self._relation, self._threshold
[docs] def __init__(self, source_obj, source_attr, relation, threshold): self._source_obj = source_obj self._source_attr = source_attr self._relation = Comparison.parse(relation) self._threshold = ControlCondition._parse_value(threshold) self._backtrack = 0
def _compare(self, other): """ Parameters ---------- other: ValueCondition Returns ------- bool """ if type(self) != type(other): return False if not self._source_obj._compare(other._source_obj): return False if self._source_attr != other._source_attr: return False if abs(self._threshold - other._threshold) > 1e-10: return False if self._relation != other._relation: return False return True
[docs] def requires(self): return OrderedSet([self._source_obj])
@property def name(self): if hasattr(self._source_obj, 'name'): obj = self._source_obj.name else: obj = str(self._source_obj) return '{}:{}{}{}'.format(obj, self._source_attr, self._relation.symbol, self._threshold) def __repr__(self): return "<ValueCondition: {}, {}, {}, {}>".format(str(self._source_obj), str(self._source_attr), str(self._relation.symbol), str(self._threshold)) def __str__(self): typ = self._source_obj.__class__.__name__ if 'Pump' in typ: typ = 'Pump' elif 'Valve' in typ: typ = 'Valve' obj = str(self._source_obj) if hasattr(self._source_obj, 'name'): obj = self._source_obj.name att = self._source_attr rel = self._relation.text val = self._repr_value(att, self._threshold) return "{} {} {} {} {}".format(typ.upper(), obj, att.upper(), rel.upper(), val)
[docs] def evaluate(self): cur_value = getattr(self._source_obj, self._source_attr) thresh_value = self._threshold relation = self._relation.func if np.isnan(self._threshold): relation = np.greater thresh_value = 0.0 state = relation(np.round(cur_value,10), np.round(thresh_value,10)) return bool(state)
[docs] @DocInheritor({'requires', 'evaluate', 'name'}) class FunctionCondition(ControlCondition): """ A ControlCondition which calls a function to determine if the control needs activated or not. If the function returns True, then the control is activated. """
[docs] def __init__(self, func, func_kwargs=None, requires=None): super(FunctionCondition, self).__init__() self._func = func if func_kwargs is None: self._func_kwargs = dict() else: self._func_kwargs = func_kwargs if requires is None: self._requires = OrderedSet() else: self._requires = OrderedSet(requires)
[docs] def evaluate(self): return bool(self._func(**self._func_kwargs))
[docs] def requires(self): return self._requires
[docs] @DocInheritor({'requires', 'evaluate'}) class TankLevelCondition(ValueCondition): """ A special type of ValueCondition for tank levels/heads/pressures. """
[docs] def __init__(self, source_obj, source_attr, relation, threshold): relation = Comparison.parse(relation) if relation not in {Comparison.ge, Comparison.le, Comparison.gt, Comparison.lt}: raise ValueError('TankLevelConditions only support <= and >= relations.') super(TankLevelCondition, self).__init__(source_obj, source_attr, relation, threshold) assert source_attr in {'level', 'pressure', 'head'} # this is used to see if backtracking is needed self._last_value = getattr(self._source_obj, self._source_attr)
def _reset(self): self._last_value = getattr(self._source_obj, self._source_attr) # this is used to see if backtracking is needed def _compare(self, other): """ Parameters ---------- other: TankLevelCondition Returns ------- bool """ if type(self) != type(other): return False if not self._source_obj._compare(other._source_obj): return False if self._source_attr != other._source_attr: return False if abs(self._threshold - other._threshold) > 1e-10: return False if self._relation != other._relation: return False return True
[docs] def evaluate(self): self._backtrack = 0 # no backtracking is needed unless specified in the if statement below cur_value = getattr(self._source_obj, self._source_attr) # get the current tank level, head, or pressure thresh_value = self._threshold relation = self._relation if relation is Comparison.gt: relation = Comparison.ge if relation is Comparison.lt: relation = Comparison.le if np.isnan(self._threshold): # what is this doing? relation = np.greater thresh_value = 0.0 state = relation(np.round(cur_value,10), np.round(thresh_value,10)) # determine if the condition is satisfied if state and not relation(np.round(self._last_value,10), np.round(thresh_value,10)): # if the condition is satisfied and the last value did not satisfy the condition, then backtracking # is needed. # The math.floor is not actually needed, but I leave it here for clarity. We want the backtrack value to be # slightly lower than what the floating point computation would give. This ensures the next time step will # be slightly later than when the tank level hits the threshold. This ensures the tank level will go # slightly beyond the threshold. This ensures that relation(self._last_value, thresh_value) will be True # next time. This prevents us from computing very small backtrack values over and over. if self._source_obj.demand != 0 and not self._source_obj.demand is None: if self._source_obj.vol_curve is None: self._backtrack = int(math.floor((cur_value - thresh_value) *math.pi/4.0*self._source_obj.diameter**2 /self._source_obj.demand)) else: # a volume curve must be used instead if self._source_attr == 'head': thresh_level = thresh_value - self._source_obj.elevation level = cur_value - self._source_obj.elevation elif self._source_attr == 'level': thresh_level = thresh_value level = cur_value else: raise NotImplementedError("Pressure tank value conditions with a " + "volume curve have not been implemented.") cur_value_volume = self._source_obj.get_volume(level) thresh_volume = self._source_obj.get_volume(thresh_level) self._backtrack = int(math.floor((cur_value_volume - thresh_volume) / self._source_obj.demand)) self._last_value = cur_value # update the last value return bool(state)
[docs] @DocInheritor({'requires', 'evaluate', 'name'}) class RelativeCondition(ControlCondition): """Compare attributes of two different objects (e.g., levels from tanks 1 and 2) This type of condition does not work with the EpanetSimulator, only the WNTRSimulator. Parameters ---------- source_obj : object The object (such as a Junction, Tank, Pipe, etc.) to use in the comparison source_attr : str The attribute of the object (such as level, pressure, setting, etc.) to compare against the threshold relation : function A numpy or other comparison method that takes two values and returns a bool (e.g., numpy.greater, numpy.less_equal) threshold_obj : object The object (such as a Junction, Tank, Pipe, etc.) to use in the comparison of attributes threshold_attr : str The attribute to used in the comparison evaluation """
[docs] def __init__(self, source_obj, source_attr, relation, threshold_obj, threshold_attr): self._source_obj = source_obj self._source_attr = source_attr self._relation = Comparison.parse(relation) self._threshold_obj = threshold_obj self._threshold_attr = threshold_attr self._backtrack = 0
def _compare(self, other): """ Parameters ---------- other: RelativeCondition Returns ------- bool """ if type(self) != type(other): return False if not self._source_obj._compare(other._source_obj): return False if self._source_attr != other._source_attr: return False if self._relation != other._relation: return False if not self._threshold_obj._compare(other._threshold_obj): return False if self._threshold_attr != other._threshold_attr: return False return True @property def name(self): if hasattr(self._source_obj, 'name'): obj = self._source_obj.name else: obj = str(self._source_obj) if hasattr(self._threshold_obj, 'name'): tobj = self._threshold_obj.name else: tobj = str(self._threshold_obj) return '{}:{}_{}_{}:{}'.format(obj, self._source_attr, self._relation.symbol, tobj, self._threshold_attr)
[docs] def requires(self): return OrderedSet([self._source_obj, self._threshold_obj])
def __repr__(self): return "RelativeCondition({}, {}, {}, {}, {})".format(str(self._source_obj), str(self._source_attr), str(self._relation), str(self._threshold_obj), str(self._threshold_attr)) def __str__(self): typ = self._source_obj.__class__.__name__ obj = str(self._source_obj) if hasattr(self._source_obj, 'name'): obj = self._source_obj.name att = self._source_attr rel = self._relation.symbol ttyp = self._threshold_obj.__class__.__name__ if hasattr(self._threshold_obj, 'name'): tobj = self._threshold_obj.name else: tobj = str(self._threshold_obj) tatt = self._threshold_attr fmt = "{}('{}').{} {} {}('{}').{}" return fmt.format(typ, obj, att, rel, ttyp, tobj, tatt)
[docs] def evaluate(self): cur_value = getattr(self._source_obj, self._source_attr) thresh_value = getattr(self._threshold_obj, self._threshold_attr) relation = self._relation.func state = relation(cur_value, thresh_value) return bool(state)
[docs] @DocInheritor({'requires', 'evaluate', 'backtrack'}) class OrCondition(ControlCondition): """Combine two WNTR Conditions with an OR. Parameters ---------- cond1 : ControlCondition The first condition cond2 : ControlCondition The second condition """
[docs] def __init__(self, cond1, cond2): self._condition_1 = cond1 self._condition_2 = cond2 if isinstance(cond1, TankLevelCondition): if cond1._relation is Comparison.eq: logger.warning('Using Comparison.eq with {0} will probably not work!'.format(type(cond1))) warnings.warn('Using Comparison.eq with {0} will probably not work!'.format(type(cond1))) if isinstance(cond2, TankLevelCondition): if cond2._relation is Comparison.eq: logger.warning('Using Comparison.eq with {0} will probably not work!'.format(type(cond2))) warnings.warn('Using Comparison.eq with {0} will probably not work!'.format(type(cond2)))
def _reset(self): self._condition_1._reset() self._condition_2._reset() def _compare(self, other): """ Parameters ---------- other: OrCondition Returns ------- bool """ if type(self) != type(other): return False if not self._condition_1._compare(other._condition_1): return False if not self._condition_2._compare(other._condition_2): return False return True def __str__(self): return " " + str(self._condition_1) + " OR " + str(self._condition_2) + " " def __repr__(self): return 'Or({}, {})'.format(repr(self._condition_1), repr(self._condition_2))
[docs] def evaluate(self): return bool(self._condition_1) or bool(self._condition_2)
@property def backtrack(self): return np.max([self._condition_1.backtrack, self._condition_2.backtrack])
[docs] def requires(self): req = self._condition_1.requires() req.update(self._condition_2.requires()) return req
[docs] @DocInheritor({'requires', 'evaluate', 'backtrack'}) class AndCondition(ControlCondition): """Combine two WNTR Conditions with an AND Parameters ---------- cond1 : ControlCondition The first condition cond2 : ControlCondition The second condition """
[docs] def __init__(self, cond1, cond2): self._condition_1 = cond1 self._condition_2 = cond2 if isinstance(cond1, TankLevelCondition): if cond1._relation is Comparison.eq: logger.warning('Using Comparison.eq with {0} will probably not work!'.format(type(cond1))) warnings.warn('Using Comparison.eq with {0} will probably not work!'.format(type(cond1))) if isinstance(cond2, TankLevelCondition): if cond2._relation is Comparison.eq: logger.warning('Using Comparison.eq with {0} will probably not work!'.format(type(cond2))) warnings.warn('Using Comparison.eq with {0} will probably not work!'.format(type(cond2)))
def _reset(self): self._condition_1._reset() self._condition_2._reset() def _compare(self, other): """ Parameters ---------- other: OrCondition Returns ------- bool """ if type(self) != type(other): return False if not self._condition_1._compare(other._condition_1): return False if not self._condition_2._compare(other._condition_2): return False return True def __str__(self): return " "+ str(self._condition_1) + " AND " + str(self._condition_2) + " " def __repr__(self): return 'And({}, {})'.format(repr(self._condition_1), repr(self._condition_2))
[docs] def evaluate(self): return bool(self._condition_1) and bool(self._condition_2)
@property def backtrack(self): return np.min([self._condition_1.backtrack, self._condition_2.backtrack])
[docs] def requires(self): req = self._condition_1.requires() req.update(self._condition_2.requires()) return req
class _CloseCVCondition(ControlCondition): Htol = 0.0001524 Qtol = 2.83168e-6 def __init__(self, wn, cv): self._cv = cv self._start_node = wn.get_node(cv.start_node) self._end_node = wn.get_node(cv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._cv, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the cv needs to be closed """ dh = self._start_node.head - self._end_node.head if abs(dh) > self.Htol: if dh < -self.Htol: return True elif self._cv.flow < -self.Qtol: return True else: return False else: if self._cv.flow < -self.Qtol: return True else: return False def __str__(self): s = '{0} head - {1} head < -{2} or {3} flow < {4}'.format(self._start_node.name, self._end_node.name, self.Htol, self._cv.name, -self.Qtol) return s class _OpenCVCondition(ControlCondition): Htol = 0.0001524 Qtol = 2.83168e-6 def __init__(self, wn, cv): self._cv = cv self._start_node = wn.get_node(cv.start_node) self._end_node = wn.get_node(cv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._cv, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the cv needs to be closed """ dh = self._start_node.head - self._end_node.head if abs(dh) > self.Htol: if dh < -self.Htol: return False elif self._cv.flow < -self.Qtol: return False else: return True else: return False def __str__(self): s = '{0} head - {1} head > {2} and {3} flow >= {4}'.format(self._start_node.name, self._end_node.name, self.Htol, self._cv.name, -self.Qtol) return s class _ClosePowerPumpCondition(ControlCondition): """ Prevents reverse flow in pumps. """ Htol = 0.0001524 Qtol = 2.83168e-6 Hmax = 1e10 def __init__(self, wn, pump): """ Parameters ---------- wn: wntr.network.WaterNetworkModel pump: wntr.network.Pump """ self._pump = pump self._start_node = wn.get_node(pump.start_node) self._end_node = wn.get_node(pump.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._pump, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the pump needs to be closed """ dh = self._end_node.head - self._start_node.head if dh > self.Hmax + self.Htol: return True return False def __str__(self): s = '{0} head - {1} head > {2:.4f}'.format(self._end_node.name, self._start_node.name, self.Hmax + self.Htol) return s class _OpenPowerPumpCondition(ControlCondition): Htol = 0.0001524 Qtol = 2.83168e-6 Hmax = 1e10 def __init__(self, wn, pump): """ Parameters ---------- wn: wntr.network.WaterNetworkModel pump: wntr.network.Pump """ self._pump = pump self._start_node = wn.get_node(pump.start_node) self._end_node = wn.get_node(pump.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._pump, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the pump needs to be opened """ dh = self._end_node.head - self._start_node.head if dh <= self.Hmax + self.Htol: return True return False def __str__(self): s = '{0} head - {1} head <= {2:.4f}'.format(self._end_node.name, self._start_node.name, self.Hmax + self.Htol) return s class _CloseHeadPumpCondition(ControlCondition): """ Prevents reverse flow in pumps. """ _Htol = 0.0001524 def __init__(self, wn, pump): """ Parameters ---------- wn: wntr.network.WaterNetworkModel pump: wntr.network.Pump """ self._pump = pump self._start_node = wn.get_node(pump.start_node) self._end_node = wn.get_node(pump.end_node) self._backtrack = 0 self._wn = wn def requires(self): return OrderedSet([self._pump, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the pump needs to be closed """ a, b, c = self._pump.get_head_curve_coefficients() if self._pump.speed_timeseries.at(self._wn.sim_time) != 1.0: raise NotImplementedError('Pump speeds other than 1.0 are not yet supported.') Hmax = a dh = self._end_node.head - self._start_node.head if dh > Hmax + self._Htol: return True return False def __str__(self): a, b, c = self._pump.get_head_curve_coefficients() if self._pump.speed_timeseries.at(self._wn.sim_time) != 1.0: raise NotImplementedError('Pump speeds other than 1.0 are not yet supported.') Hmax = a s = '{0} head - {1} head > {2:.4f}'.format(self._end_node.name, self._start_node.name, Hmax + self._Htol) return s class _OpenHeadPumpCondition(ControlCondition): """ Prevents reverse flow in pumps. """ _Htol = 0.0001524 def __init__(self, wn, pump): """ Parameters ---------- wn: wntr.network.WaterNetworkModel pump: wntr.network.Pump """ self._pump = pump self._start_node = wn.get_node(pump.start_node) self._end_node = wn.get_node(pump.end_node) self._backtrack = 0 self._wn = wn def requires(self): return OrderedSet([self._pump, self._start_node, self._end_node]) def evaluate(self): """ If True is returned, the pump needs to be closed """ a, b, c = self._pump.get_head_curve_coefficients() if self._pump.speed_timeseries.at(self._wn.sim_time) != 1.0: raise NotImplementedError('Pump speeds other than 1.0 are not yet supported.') Hmax = a dh = self._end_node.head - self._start_node.head if dh <= Hmax + self._Htol: return True return False def __str__(self): a, b, c = self._pump.get_head_curve_coefficients() if self._pump.speed_timeseries.at(self._wn.sim_time) != 1.0: raise NotImplementedError('Pump speeds other than 1.0 are not yet supported.') Hmax = a s = '{0} head - {1} head <= {2:.4f}'.format(self._end_node.name, self._start_node.name, Hmax + self._Htol) return s class _ClosePRVCondition(ControlCondition): _Qtol = 2.83168e-6 def __init__(self, wn, prv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel prv: wntr.network.Valve """ super(_ClosePRVCondition, self).__init__() self._prv = prv self._start_node = wn.get_node(self._prv.start_node) self._end_node = wn.get_node(self._prv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._prv]) def evaluate(self): if self._prv._internal_status == LinkStatus.Active: if self._prv.flow < -self._Qtol: return True return False elif self._prv._internal_status == LinkStatus.Open: if self._prv.flow < -self._Qtol: return True return False elif self._prv._internal_status == LinkStatus.Closed: return False else: raise RuntimeError('Unexpected PRV _internal_status for valve {0}: {1}.'.format(self._prv, self._prv._internal_status)) def __str__(self): s = 'prv {0} needs to be closed'.format(self._prv.name) return s class _OpenPRVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, prv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel prv: wntr.network.Valve """ super(_OpenPRVCondition, self).__init__() self._prv = prv self._start_node = wn.get_node(self._prv.start_node) self._end_node = wn.get_node(self._prv.end_node) self._backtrack = 0 self._r = 8.0 * self._prv.minor_loss / (9.81 * math.pi**2 * self._prv.diameter**4) def requires(self): return OrderedSet([self._prv, self._start_node, self._end_node]) def evaluate(self): if self._prv._internal_status == LinkStatus.Active: if self._prv.flow < -self._Qtol: return False elif self._start_node.head < self._prv.setting + self._end_node.elevation + self._r * abs(self._prv.flow)**2 - self._Htol: return True return False elif self._prv._internal_status == LinkStatus.Open: return False elif self._prv._internal_status == LinkStatus.Closed: if self._start_node.head >= self._prv.setting + self._end_node.elevation + self._Htol and self._end_node.head < self._prv.setting + self._end_node.elevation - self._Htol: return False elif self._start_node.head < self._prv.setting + self._end_node.elevation - self._Htol and self._start_node.head > self._end_node.head + self._Htol: return True return False else: raise RuntimeError('Unexpected PRV _internal_status for valve {0}: {1}.'.format(self._prv, self._prv._internal_status)) def __str__(self): s = 'prv {0} needs to be open'.format(self._prv.name) return s class _ActivePRVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, prv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel prv: wntr.network.Valve """ self._prv = prv self._start_node = wn.get_node(self._prv.start_node) self._end_node = wn.get_node(self._prv.end_node) self._backtrack = 0 self._r = 8.0 * self._prv.minor_loss / (9.81 * math.pi**2 * self._prv.diameter**4) def requires(self): return OrderedSet([self._prv, self._start_node, self._end_node]) def evaluate(self): if self._prv._internal_status == LinkStatus.Active: return False elif self._prv._internal_status == LinkStatus.Open: if self._prv.flow < -self._Qtol: return False elif (self._end_node.head >= self._prv.setting + self._end_node.elevation + self._Htol): return True return False elif self._prv._internal_status == LinkStatus.Closed: if ((self._start_node.head >= self._prv.setting + self._end_node.elevation + self._Htol) and (self._end_node.head < self._prv.setting + self._end_node.elevation - self._Htol)): return True return False else: raise RuntimeError('Unexpected PRV _internal_status for valve {0}: {1}.'.format(self._prv, self._prv._internal_status)) def __str__(self): s = 'prv {0} needs to be active'.format(self._prv.name) return s class _ClosePSVCondition(ControlCondition): _Qtol = 2.83168e-6 def __init__(self, wn, psv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel psv: wntr.network.Valve """ super(_ClosePSVCondition, self).__init__() self._psv = psv self._start_node = wn.get_node(self._psv.start_node) self._end_node = wn.get_node(self._psv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._psv]) def evaluate(self): if self._psv._internal_status == LinkStatus.Active: if self._psv.flow < -self._Qtol: return True return False elif self._psv._internal_status == LinkStatus.Open: if self._psv.flow < -self._Qtol: return True return False elif self._psv._internal_status == LinkStatus.Closed: return False else: raise RuntimeError('Unexpected PSV _internal_status for valve {0}: {1}.'.format(self._psv, self._psv._internal_status)) def __str__(self): s = 'psv {0} needs to be closed'.format(self._psv.name) return s class _OpenPSVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, psv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel psv: wntr.network.Valve """ super(_OpenPSVCondition, self).__init__() self._psv = psv self._start_node = wn.get_node(self._psv.start_node) self._end_node = wn.get_node(self._psv.end_node) self._backtrack = 0 self._r = 8.0 * self._psv.minor_loss / (9.81 * math.pi**2 * self._psv.diameter**4) def requires(self): return OrderedSet([self._psv, self._start_node, self._end_node]) def evaluate(self): setting = self._psv.setting + self._start_node.elevation if self._psv._internal_status == LinkStatus.Active: if self._psv.flow < -self._Qtol: return False elif self._end_node.head + self._r * abs(self._psv.flow)**2 > setting + self._Htol: return True return False elif self._psv._internal_status == LinkStatus.Open: return False elif self._psv._internal_status == LinkStatus.Closed: if ((self._end_node.head > setting + self._Htol) and (self._start_node.head > self._end_node.head + self._Htol)): return True return False else: raise RuntimeError('Unexpected PSV _internal_status for valve {0}: {1}.'.format(self._psv, self._psv._internal_status)) def __str__(self): s = 'psv {0} needs to be open'.format(self._psv.name) return s class _ActivePSVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, psv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel psv: wntr.network.Valve """ self._psv = psv self._start_node = wn.get_node(self._psv.start_node) self._end_node = wn.get_node(self._psv.end_node) self._backtrack = 0 self._r = 8.0 * self._psv.minor_loss / (9.81 * math.pi**2 * self._psv.diameter**4) def requires(self): return OrderedSet([self._psv, self._start_node, self._end_node]) def evaluate(self): setting = self._psv.setting + self._start_node.elevation if self._psv._internal_status == LinkStatus.Active: return False elif self._psv._internal_status == LinkStatus.Open: if self._psv.flow < -self._Qtol: return False elif (self._start_node.head < setting - self._Htol): return True return False elif self._psv._internal_status == LinkStatus.Closed: if ((self._end_node.head > setting + self._Htol) and (self._start_node.head > self._end_node.head + self._Htol)): return False elif ((self._start_node.head >= setting + self._Htol) and (self._start_node.head > self._end_node.head + self._Htol)): return True return False else: raise RuntimeError('Unexpected PSV _internal_status for valve {0}: {1}.'.format(self._psv, self._psv._internal_status)) def __str__(self): s = 'psv {0} needs to be active'.format(self._psv.name) return s class _OpenFCVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, fcv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel fcv: wntr.network.Valve """ self._fcv = fcv self._start_node = wn.get_node(self._fcv.start_node) self._end_node = wn.get_node(self._fcv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._fcv, self._start_node, self._end_node]) def evaluate(self): if self._start_node.head - self._end_node.head < -self._Htol: return True elif self._fcv.flow < -self._Qtol: return True else: return False class _ActiveFCVCondition(ControlCondition): _Qtol = 2.83168e-6 _Htol = 0.0001524 def __init__(self, wn, fcv): """ Parameters ---------- wn: wntr.network.WaterNetworkModel fcv: wntr.network.Valve """ self._fcv = fcv self._start_node = wn.get_node(self._fcv.start_node) self._end_node = wn.get_node(self._fcv.end_node) self._backtrack = 0 def requires(self): return OrderedSet([self._fcv, self._start_node, self._end_node]) def evaluate(self): if self._start_node.head - self._end_node.head < -self._Htol: return False elif self._fcv.flow < -self._Qtol: return False elif self._fcv._internal_status == LinkStatus.Open and self._fcv.flow >= self._fcv.setting + self._Qtol: return True else: return False
[docs] class BaseControlAction(six.with_metaclass(abc.ABCMeta, Subject)): """ A base class for deriving new control actions. The control action is run by calling run_control_action. This class is not meant to be used directly. Derived classes must implement the run_control_action, requires, and target methods. """
[docs] def __init__(self): super(BaseControlAction, self).__init__() self._value = None
[docs] @abc.abstractmethod def run_control_action(self): """ This method is called to run the corresponding control action. """ pass
[docs] @abc.abstractmethod def requires(self): """ Returns a set of objects used to evaluate the control Returns ------- req: OrderedSet The objects required to run the control action. """ pass
[docs] @abc.abstractmethod def target(self): """ Returns a tuple (object, attribute) containing the object and attribute that the control action may change Returns ------- target: tuple A tuple containing the target object and the attribute to be changed (target, attr). """ pass
def _compare(self, other): """ Parameters ---------- other: BaseControlAction Returns ------- bool """ if type(self) != type(other): return False target1, attr1 = self.target() target2, attr2 = other.target() val1 = self._value val2 = other._value if not target1._compare(target2): return False if attr1 != attr2: return False if isinstance(val1, float): if abs(val1 - val2) > 1e-10: return False else: if val1 != val2: return False return True
[docs] @DocInheritor({'requires', 'target', 'run_control_action'}) class ControlAction(BaseControlAction): """ A general class for specifying a control action that simply modifies the attribute of an object (target). Parameters ---------- target_obj : object The object whose attribute will be changed when the control runs. attribute : string The attribute that will be changed on the target_obj when the control runs. value : any The new value for target_obj.attribute when the control runs. """
[docs] def __init__(self, target_obj, attribute, value): super(ControlAction, self).__init__() if target_obj is None: raise ValueError('target_obj is None in ControlAction::__init__. A valid target_obj is needed.') if not hasattr(target_obj, attribute): raise ValueError('attribute given in ControlAction::__init__ is not valid for target_obj') self._target_obj = target_obj self._attribute = attribute self._value = value self._private_attribute = attribute if attribute == 'status': self._private_attribute = '_user_status' elif attribute == 'leak_status': self._private_attribute = '_leak_status' elif attribute == 'setting': self._private_attribute = '_setting'
[docs] def requires(self): return OrderedSet([self._target_obj])
def __repr__(self): return '<ControlAction: {}, {}, {}>'.format(str(self._target_obj), str(self._attribute), str(self._repr_value())) def __str__(self): target_obj_type = (self._target_obj.link_type if isinstance(self._target_obj, Link) else self._target_obj.node_type) return "{} {} {} IS {}".format(target_obj_type.upper(), self._target_obj.name, self._attribute.upper(), self._repr_value()) def _repr_value(self): if self._attribute.lower() in ['status']: return LinkStatus(int(self._value)).name.upper() return self._value
[docs] def run_control_action(self): setattr(self._target_obj, self._private_attribute, self._value) self.notify()
[docs] def target(self): return self._target_obj, self._attribute
class _InternalControlAction(BaseControlAction): """ A control action class that modifies a private attribute in order to change a property on an object. For example, a valve has a status property, but the control action must act on the _internal_status. Parameters ---------- target_obj: object The object for which an attribute is being changed. internal_attribute: str The attribute being modified (e.g., _internal_stats) value: any The new value for the internal_attribute property_attribute: str The attribute to be checked for an actual change (e.g., status) """ def __init__(self, target_obj, internal_attribute, value, property_attribute): super(_InternalControlAction, self).__init__() if not hasattr(target_obj, internal_attribute): raise AttributeError('{0} does not have attribute {1}'.format(target_obj, internal_attribute)) if not hasattr(target_obj, property_attribute): raise AttributeError('{0} does not have attribute {1}'.format(target_obj, property_attribute)) self._target_obj = target_obj self._internal_attr = internal_attribute self._value = value self._property_attr = property_attribute def requires(self): """ Return a list of objects required by the control action. Returns ------- required_objects: list of object """ return OrderedSet([self._target_obj]) def run_control_action(self): """ Activate the control action. """ if self._target_obj is None: raise ValueError('target is None inside _InternalControlAction::RunControlAction.' + 'This may be because a target_obj was added, but later the object itself was deleted.') setattr(self._target_obj, self._internal_attr, self._value) self.notify() def target(self): """ Returns a tuple containing the target object and the attribute to check for modification. Returns ------- target: tuple """ return self._target_obj, self._property_attr def __repr__(self): return '<_InternalControlAction: {}, {}, {}>'.format(str(self._target_obj), self._internal_attr, str(self._value)) def __str__(self): return "set {}('{}').{} to {}".format(self._target_obj.__class__.__name__, self._target_obj.name, self._internal_attr, self._value) # # Control classes #
[docs] class ControlBase(six.with_metaclass(abc.ABCMeta, object)): """ This is the base class for all control objects. Control objects are used to check the conditions under which a ControlAction should be run. For example, if a pump is supposed to be turned on when the simulation time reaches 6 AM, the ControlAction would be "turn the pump on", and the ControlCondition would be "when the simulation reaches 6 AM". """
[docs] def __init__(self): super().__init__() self._control_type = None self._condition = None self._priority = None
[docs] @abc.abstractmethod def is_control_action_required(self): """ This method is called to see if any action is required by this control object. This method returns a tuple that indicates if action is required (a bool) and a recommended time for the simulation to backup (in seconds as a positive int). Returns ------- req: tuple A tuple (bool, int) indicating if an action should be run and how far to back up the simulation. """ pass
[docs] @abc.abstractmethod def run_control_action(self): """ This method is called to run the control action after a call to IsControlActionRequired indicates that an action is required. """ pass
[docs] @abc.abstractmethod def requires(self): """ Returns a set of objects required for this control. Returns ------- required_objects: OrderedSet of object """ return OrderedSet()
[docs] @abc.abstractmethod def actions(self): """ Returns a list of all actions used by this control. Returns ------- act: list of BaseControlAction """ pass
def _control_type_str(self): if self._control_type is _ControlType.rule: return 'Rule' else: return 'Control' def _reset(self): self._condition._reset() @property def condition(self): return self._condition @property def priority(self): return self._priority def _compare(self, other): """ Parameters ---------- other: ControlBase Returns ------- bool """ ret = True msg = '_compare failed in ControlBase because ' if self.priority != other.priority: ret = False msg += 'priorities were not equal' if self._control_type_str() != other._control_type_str(): ret = False msg += '_control_type_strs were not equal' if not self.condition._compare(other.condition): ret = False msg += 'conditions were not equal' for action1, action2 in zip(self.actions(), other.actions()): if not action1._compare(action2): ret = False msg += 'actions were not equal' break if ret is False: print(msg) return ret
[docs] @DocInheritor({'is_control_action_required', 'run_control_action', 'requires', 'actions'}) class Rule(ControlBase): """ A very general and flexible class for defining both controls rules. """
[docs] def __init__(self, condition, then_actions, else_actions=None, priority=ControlPriority.medium, name=None): """ Parameters ---------- condition: ControlCondition The condition that should be used to determine when the actions need to be activated. When the condition evaluates to True, the then_actions are activated. When the condition evaluates to False, the else_actions are activated. then_actions: Iterable of ControlAction The actions that should be activated when the condition evaluates to True. else_actions: Iterable of ControlAction The actions that should be activated when the condition evaluates to False. priority: ControlPriority The priority of the control. Default is ControlPriority.medium name: str The name of the control """ self.update_condition(condition) self.update_then_actions(then_actions) self.update_else_actions(else_actions) self._which = None self.update_priority(priority) self._name = name if self._name is None: self._name = '' self._control_type = _ControlType.rule if isinstance(condition, TankLevelCondition): if condition._relation is Comparison.eq: logger.warning('Using Comparison.eq with {0} will probably not work!'.format(type(condition))) warnings.warn('Using Comparison.eq with {0} will probably not work!'.format(type(condition)))
[docs] def to_dict(self): ret = dict() if self._control_type == _ControlType.rule: ret['type'] = 'rule' ret['name'] = str(self._name) ret['condition'] = str(self._condition) ret['then_actions'] = [str(a) for a in self._then_actions] ret['else_actions'] = [str(a) for a in self._else_actions] ret['priority'] = int(self._priority) else: ret['type'] = 'simple' ret['condition'] = str(self._condition) ret['then_actions'] = [str(a) for a in self._then_actions] return ret
@property def epanet_control_type(self): """ The control type. Note that presolve and postsolve controls are both simple controls in Epanet. Returns ------- control_type: _ControlType """ return self._control_type
[docs] def requires(self): req = self._condition.requires() for action in self._then_actions: req.update(action.requires()) for action in self._else_actions: req.update(action.requires()) return req
[docs] def actions(self): return self._then_actions + self._else_actions
@property def name(self): """ A string representation of the Control. """ if self._name is not None: return self._name else: return '/'.join(str(self).split()) def __repr__(self): fmt = "<Control: '{}', {}, {}, {}, priority={}>" return fmt.format(self._name, repr(self._condition), repr(self._then_actions), repr(self._else_actions), self._priority) def __str__(self): text = 'IF {}'.format(str(self._condition)) if self._then_actions is not None and len(self._then_actions) > 0: then_text = ' THEN ' for ct, act in enumerate(self._then_actions): if ct == 0: then_text += str(act) else: then_text += ' AND {}'.format(str(act)) text += then_text if self._else_actions is not None and len(self._else_actions) > 0: else_text = ' ELSE ' for ct, act in enumerate(self._else_actions): if ct == 0: else_text += str(act) else: else_text += ' AND {}'.format(str(act)) text += else_text if self._priority is not None and self._priority >= 0: text += ' PRIORITY {}'.format(self._priority) return text
[docs] def is_control_action_required(self): do = self._condition.evaluate() back = self._condition.backtrack if do: self._which = 'then' return True, back elif not do and self._else_actions is not None and len(self._else_actions) > 0: self._which = 'else' return True, back else: return False, None
[docs] def run_control_action(self): if self._which == 'then': for control_action in self._then_actions: control_action.run_control_action() elif self._which == 'else': for control_action in self._else_actions: control_action.run_control_action() else: raise RuntimeError('control actions called even though if-then statement was False')
[docs] def update_condition(self, condition:ControlCondition): """Update the controls condition in place Parameters ---------- condition : ControlCondition The new condition for this control to use Raises ------ ValueError If the provided condition isn't a valid ControlCondition """ try: logger.info(f"Replacing {self._condition} with {condition}") except AttributeError: # Occurs during intialisation pass if not isinstance(condition, ControlCondition): raise ValueError('The conditions argument must be a ControlCondition instance') self._condition = condition
[docs] def update_then_actions(self, then_actions:Iterable[ControlAction]): """Update the controls then_actions in place Parameters ---------- then_actions : Iterable[ControlAction] The new then_actions for this control to use """ try: logger.info(f"Replacing {self._then_actions} with {then_actions}") except AttributeError: # Occurs during intialisation pass self._then_actions = _ensure_iterable(then_actions)
[docs] def update_else_actions(self, else_actions:Iterable[ControlAction]): """Update the controls else_actions in place Parameters ---------- else_actions : Iterable[ControlAction] The new else_actions for this control to use """ try: logger.info(f"Replacing {self._else_actions} with {else_actions}") except AttributeError: # Occurs during intialisation pass self._else_actions = _ensure_iterable(else_actions)
[docs] def update_priority(self, priority:ControlPriority): """Update the controls priority in place Parameters ---------- priority : ControlPriority The new priority for this control to use """ try: logger.info(f"Replacing {self._priority} with {priority}") except AttributeError: # Occurs during intialisation pass self._priority = priority
[docs] class Control(Rule): """ A class for controls. """
[docs] def __init__(self, condition, then_action: BaseControlAction, priority=ControlPriority.medium, name=None): """ Parameters ---------- condition: ControlCondition The condition that should be used to determine when the actions need to be activated. When the condition evaluates to True, the then_actions are activated. When the condition evaluates to False, the else_actions are activated. then_action: BaseControlAction The action that should be activated when the condition evaluates to True. priority: ControlPriority The priority of the control. Default is ControlPriority.medium name: str The name of the control """ super().__init__(condition=condition, then_actions=then_action, priority=priority, name=name) if isinstance(condition, TankLevelCondition): self._control_type = _ControlType.pre_and_postsolve elif isinstance(condition, (TimeOfDayCondition, SimTimeCondition)): self._control_type = _ControlType.presolve else: self._control_type = _ControlType.postsolve
@classmethod def _time_control(cls, wnm, run_at_time, time_flag, daily_flag, control_action, name=None): """ This is a class method for creating simple time controls. Parameters ---------- wnm: wntr.network.WaterNetworkModel The WaterNetworkModel instance this control will be added to. run_at_time: int The time to activate the control action. time_flag: str Options are 'SIM_TIME' and 'CLOCK_TIME'. SIM_TIME indicates that run_at_time is the time since the start of the simulation. CLOCK_TIME indicates that run_at_time is the time of day. daily_flag: bool If True, then the control will repeat every day. control_action: BaseControlAction The control action that should occur at run_at_time. name: str An optional name for the control. Returns ------- ctrl: Control """ if time_flag.upper() == 'SIM_TIME': condition = SimTimeCondition(model=wnm, relation=Comparison.eq, threshold=run_at_time, repeat=daily_flag, first_time=0) elif time_flag.upper() == 'CLOCK_TIME': condition = TimeOfDayCondition(model=wnm, relation=Comparison.eq, threshold=run_at_time, repeat=daily_flag, first_day=0) else: raise ValueError("time_flag not recognized; expected either 'sim_time' or 'clock_time'") control = Control(condition=condition, then_action=control_action) return control @classmethod def _conditional_control(cls, source_obj, source_attr, operation, threshold, control_action, name=None): """ This is a class method for creating simple conditional controls. Parameters ---------- source_obj: object The object whose source_attr attribute will be compared to threshold to determine if control_action needs activated. source_attr: str The attribute of source_obj to compare to threshold. operation: Comparison The comparison function used to compare the source_attr attribute of source_obj to threshold. threshold: any The threshold used in the comparison. control_action: ControlAction The control action that should occur when operation(getattr(source_obj, source_attr), threshold) is True. name: str An optional name for the control Returns ------- ctrl: Control """ condition = ValueCondition(source_obj=source_obj, source_attr=source_attr, relation=operation, threshold=threshold) control = Control(condition=condition, then_action=control_action) return control
[docs] class ControlChangeTracker(Observer):
[docs] def __init__(self): self._actions = dict() self._previous_values: Dict[Any, Dict[Tuple[Any, str], Any]] = dict() # {key: {(obj, attr): value}} self._changed: Dict[Any, MutableSet[Tuple[Any, str]]] = dict() # {key: set of (obj, attr) that has been changed from _previous_values}
[docs] def clear_all_reference_points(self): self._previous_values = dict() self._changed = dict()
def _set_reference_point(self, key): self._previous_values[key] = dict() self._changed[key] = OrderedSet() for action in self._actions.keys(): obj, attr = action.target() self._previous_values[key][(obj, attr)] = getattr(obj, attr)
[docs] def set_reference_point(self, key): if key in self._previous_values: raise ValueError(f'The ControlChangeTracker already has reference point {key}') self._set_reference_point(key)
[docs] def reset_reference_point(self, key): self._set_reference_point(key)
[docs] def remove_reference_point(self, key): del self._previous_values[key] del self._changed[key]
[docs] def update(self, subject): """ The update method gets called when a subject (control action) is activated. Parameters ----------- subject: BaseControlAction """ obj_attr = subject.target() val = getattr(*obj_attr) for ref_point in self._previous_values.keys(): if val == self._previous_values[ref_point][obj_attr]: self._changed[ref_point].discard(obj_attr) else: self._changed[ref_point].add(obj_attr)
[docs] def register_control(self, control): """ Register a control Parameters ---------- control: ControlBase """ if len(self._previous_values) != 0: raise RuntimeError('Please call clear_reference_points() before registering more controls') for action in control.actions(): if action not in self._actions: self._actions[action] = OrderedSet() self._actions[action].add(control) action.subscribe(self)
[docs] def changes_made(self, ref_point): """ Specifies if changes were made. Returns ------- changes: bool """ return len(self._changed[ref_point]) > 0
[docs] def get_changes(self, ref_point): """ A generator for iterating over the objects, attributes that were changed. Returns ------- changes: tuple (object, attr) """ for obj, attr in self._changed[ref_point]: yield obj, attr
[docs] def deregister(self, control): """ Deregister a control Parameters ---------- control: ControlBase """ for action in control.actions(): self._actions[action].discard(control) if len(self._actions[action]) == 0: action.unsubscribe(self) del self._actions[action] obj_attr = action.target() for ref_point in self._previous_values.keys(): self._previous_values[ref_point].pop(obj_attr) self._changed[ref_point].discard(obj_attr)
[docs] class ControlChecker(object):
[docs] def __init__(self): self._controls = OrderedSet() """OrderedSet of ControlBase"""
def __iter__(self): return iter(self._controls)
[docs] def register_control(self, control): """ Register a control Parameters ---------- control: ControlBase """ self._controls.add(control)
[docs] def deregister(self, control): """ Deregister a control Parameters ---------- control: ControlBase """ self._controls.remove(control)
[docs] def check(self): """ Check which controls have actions that need activated. Returns ------- controls_to_run: list of tuple The tuple is (ControlBase, backtrack) """ controls_to_run = [] for c in self._controls: do, back = c.is_control_action_required() if do: controls_to_run.append((c, back)) return controls_to_run