Source code for wntr.msx.model

# coding: utf-8
"""
The wntr.msx.model module includes methods to build a multi-species water
quality model.
"""

from __future__ import annotations

import logging
import warnings
from typing import Dict, Generator, List, Union
from wntr.epanet.util import NoteType

from wntr.utils.disjoint_mapping import KeyExistsError

from .base import (
    EXPR_FUNCTIONS,
    HYDRAULIC_VARIABLES,
    QualityModelBase,
    ReactionBase,
    NetworkDataBase,
    ReactionSystemBase,
    ExpressionType,
    ReactionType,
    SpeciesType,
    VariableType,
)
from .elements import Constant, HydraulicVariable, InitialQuality, MathFunction, Parameter, ParameterValues, Reaction, Species, Term
from .options import MsxSolverOptions

logger = logging.getLogger(__name__)

MsxVariable = Union[Constant, HydraulicVariable, MathFunction, Parameter, Species, Term]
"""A class that is a valid MSX variable class"""


[docs] class MsxReactionSystem(ReactionSystemBase): """Registry for all the variables registered in the multi-species reactions model. This object can be used like a mapping. """
[docs] def __init__(self) -> None: super().__init__() self._vars.add_disjoint_group("reserved") self._species = self._vars.add_disjoint_group("species") self._const = self._vars.add_disjoint_group("constant") self._param = self._vars.add_disjoint_group("parameter") self._term = self._vars.add_disjoint_group("term") self._rxns = dict(pipe=dict(), tank=dict()) self._pipes = self._rxns["pipe"] self._tanks = self._rxns["tank"]
@property def species(self) -> Dict[str, Species]: """Dictionary view onto only species""" return self._species @property def constants(self) -> Dict[str, Constant]: """Dictionary view onto only constants""" return self._const @property def parameters(self) -> Dict[str, Parameter]: """Dictionary view onto only parameters""" return self._param @property def terms(self) -> Dict[str, Term]: """Dictionary view onto only named terms""" return self._term @property def pipe_reactions(self) -> Dict[str, Reaction]: """Dictionary view onto pipe reactions""" return self._pipes @property def tank_reactions(self) -> Dict[str, Reaction]: """Dictionary view onto tank reactions""" return self._tanks
[docs] def add_variable(self, variable: MsxVariable) -> None: """Add a variable object to the registry. The appropriate group is determined by querying the object's var_type attribute. Parameters ---------- variable Variable to add. Raises ------ TypeError If `variable` is not an MsxVariable KeyExistsError If `variable` has a name that is already used in the registry """ if not isinstance(variable, (Species, Constant, Parameter, Term, MathFunction, HydraulicVariable)): raise TypeError("Expected AVariable object") if variable.name in self: raise KeyExistsError("Variable name {} already exists in model".format(variable.name)) variable._vars = self self._vars.add_item_to_group(variable.var_type.name.lower(), variable.name, variable)
[docs] def add_reaction(self, reaction: Reaction) -> None: """Add a reaction to the model Parameters ---------- reaction : Reaction Water quality reaction definition Raises ------ TypeError If `reaction` is not a Reaction KeyError If the `species_name` in the `reaction` does not exist in the model """ if not isinstance(reaction, Reaction): raise TypeError("Expected a Reaction object") if reaction.species_name not in self: raise KeyError("Species {} does not exist in the model".format(reaction.species_name)) self._rxns[reaction.reaction_type.name.lower()][reaction.species_name] = reaction
[docs] def variables(self) -> Generator[tuple, None, None]: """Generator looping through all variables""" for k, v in self._vars.items(): if v.var_type.name.lower() not in ['reserved']: yield k, v
[docs] def reactions(self) -> Generator[tuple, None, None]: """Generator looping through all reactions""" for k2, v in self._rxns.items(): for k1, v1 in v.items(): yield k1, v1
[docs] def to_dict(self) -> dict: """Dictionary representation of the MsxModel.""" return dict( species=[v.to_dict() for v in self._species.values()], constants=[v.to_dict() for v in self._const.values()], parameters=[v.to_dict() for v in self._param.values()], terms=[v.to_dict() for v in self._term.values()], pipe_reactions=[v.to_dict() for v in self.pipe_reactions.values()], tank_reactions=[v.to_dict() for v in self.tank_reactions.values()], )
[docs] class MsxNetworkData(NetworkDataBase): """Network-specific values associated with a multi-species water quality model Data is copied from dictionaries passed in, so once created, the dictionaries passed are not connected to this object. Parameters ---------- patterns : dict, optional Patterns to use for sources sources : dict, optional Sources defined for the model initial_quality : dict, optional Initial values for different species at different nodes, links, and the global value parameter_values : dict, optional Parameter values for different pipes and tanks Notes ----- ``patterns`` Dictionary keyed by pattern name (str) with values being the multipliers (list of float) ``sources`` Dictionary keyed by species name (str) with values being dictionaries keyed by junction name (str) with values being the dictionary of settings for the source ``initial_quality`` Dictionary keyed by species name (str) with values being either an :class:`~wntr.msx.elements.InitialQuality` object or the appropriate dictionary representation thereof. ``parameter_values`` Dictionary keyed by parameter name (str) with values being either a :class:`~wntr.msx.elements.ParameterValues` object or the appropriate dictionary representation thereof. """
[docs] def __init__(self, patterns: Dict[str, List[float]] = None, sources: Dict[str, Dict[str, dict]] = None, initial_quality: Dict[str, Union[dict, InitialQuality]] = None, parameter_values: Dict[str, Union[dict, ParameterValues]] = None) -> None: if sources is None: sources = dict() if initial_quality is None: initial_quality = dict() if patterns is None: patterns = dict() if parameter_values is None: parameter_values = dict() self._source_dict = dict() self._pattern_dict = dict() self._initial_quality_dict: Dict[str, InitialQuality] = dict() self._parameter_value_dict: Dict[str, ParameterValues] = dict() self._source_dict = sources.copy() self._pattern_dict = patterns.copy() for k, v in initial_quality.items(): self._initial_quality_dict[k] = InitialQuality(**v) for k, v in parameter_values.items(): self._parameter_value_dict[k] = ParameterValues(**v)
@property def sources(self): """Dictionary of sources, keyed by species name""" return self._source_dict @property def initial_quality(self) -> Dict[str, InitialQuality]: """Dictionary of initial quality values, keyed by species name""" return self._initial_quality_dict @property def patterns(self): """Dictionary of patterns, specific for the water quality model, keyed by pattern name. .. note:: the WaterNetworkModel cannot see these patterns, so names can be reused, so be careful. Likewise, this model cannot see the WaterNetworkModel patterns, so this could be a source of some confusion. """ return self._pattern_dict @property def parameter_values(self) -> Dict[str, ParameterValues]: """Dictionary of parameter values, keyed by parameter name""" return self._parameter_value_dict
[docs] def add_pattern(self, name: str, multipliers: List[float]): """Add a water quality model specific pattern. Arguments --------- name : str Pattern name multipliers : list of float Pattern multipliers """ self._pattern_dict[name] = multipliers
[docs] def init_new_species(self, species: Species): """(Re)set the initial quality values for a species Arguments --------- species : Species Species to (re)initialized. Returns ------- InitialQuality New initial quality values """ self._initial_quality_dict[str(species)] = InitialQuality() if isinstance(species, Species): species._vals = self._initial_quality_dict[str(species)] return self._initial_quality_dict[str(species)]
[docs] def remove_species(self, species: Union[Species, str]): """Remove a species from the network specific model Arguments --------- species : Species or str Species to be removed from the network data """ if isinstance(species, Species): species._vals = None try: self._initial_quality_dict.__delitem__(str(species)) except KeyError: pass
[docs] def init_new_parameter(self, param: Parameter): """(Re)initialize parameter values for a parameter Arguments --------- param : Parameter Parameter to be (re)initialized with network data Returns ------- ParameterValues New network data for the specific parameter """ self._parameter_value_dict[str(param)] = ParameterValues() if isinstance(param, Parameter): param._vals = self._parameter_value_dict[str(param)] return self._parameter_value_dict[str(param)]
[docs] def remove_parameter(self, param: Union[Parameter, str]): """Remove values associated with a specific parameter Ignores non-parameters. Arguments --------- param : Parameter or str Parameter or parameter name to be removed from the network data """ if isinstance(param, Parameter): param._vals = None try: self._parameter_value_dict.__delitem__(str(param)) except KeyError: pass
[docs] def to_dict(self) -> dict: ret = dict(initial_quality=dict(), parameter_values=dict(), sources=dict(), patterns=dict()) for k, v in self._initial_quality_dict.items(): ret["initial_quality"][k] = v.to_dict() for k, v in self._parameter_value_dict.items(): ret["parameter_values"][k] = v.to_dict() ret["sources"] = self._source_dict.copy() ret["patterns"] = self._pattern_dict.copy() return ret
[docs] class MsxModel(QualityModelBase): """Multi-species water quality model Arguments --------- msx_file_name : str, optional MSX file to to load into the MsxModel object, by default None """
[docs] def __init__(self, msx_file_name=None) -> None: super().__init__(msx_file_name) self._references: List[Union[str, Dict[str, str]]] = list() self._options: MsxSolverOptions = MsxSolverOptions() self._rxn_system: MsxReactionSystem = MsxReactionSystem() self._net_data: MsxNetworkData = MsxNetworkData() self._wn = None for v in HYDRAULIC_VARIABLES: self._rxn_system.add_variable(HydraulicVariable(**v)) for k, v in EXPR_FUNCTIONS.items(): self._rxn_system.add_variable(MathFunction(name=k.lower(), func=v)) self._rxn_system.add_variable(MathFunction(name=k.capitalize(), func=v)) self._rxn_system.add_variable(MathFunction(name=k.upper(), func=v)) if msx_file_name is not None: from wntr.epanet.msx.io import MsxFile MsxFile.read(msx_file_name, self)
def __repr__(self) -> str: ret = "{}(".format(self.__class__.__name__) if self.name: ret = ret + "name={}".format(repr(self.name)) elif self.title: ret = ret + "title={}".format(repr(self.title)) elif self._orig_file: ret = ret + "{}".format(repr(self._orig_file)) ret = ret + ")" return ret @property def references(self) -> List[Union[str, Dict[str, str]]]: """List of strings or mappings that provide references for this model .. note:: This property is a list, and should be modified using append/insert/remove. Members of the list should be json serializable (i.e., strings or dicts of strings). """ return self._references @property def reaction_system(self) -> MsxReactionSystem: """Reaction variables defined for this model""" return self._rxn_system @property def network_data(self) -> MsxNetworkData: """Network-specific values added to this model""" return self._net_data @property def options(self) -> MsxSolverOptions: """MSX model options""" return self._options @property def species_name_list(self) -> List[str]: """Get a list of species names""" return list(self.reaction_system.species.keys()) @property def constant_name_list(self) -> List[str]: """Get a list of coefficient names""" return list(self.reaction_system.constants.keys()) @property def parameter_name_list(self) -> List[str]: """Get a list of coefficient names""" return list(self.reaction_system.parameters.keys()) @property def term_name_list(self) -> List[str]: """Get a list of function (MSX 'terms') names""" return list(self.reaction_system.terms.keys()) @options.setter def options(self, value: Union[dict, MsxSolverOptions]): if isinstance(value, dict): self._options = MsxSolverOptions.factory(value) elif not isinstance(value, MsxSolverOptions): raise TypeError("Expected a MsxSolverOptions object, got {}".format(type(value))) else: self._options = value
[docs] def add_species( self, name: str, species_type: SpeciesType, units: str, atol: float = None, rtol: float = None, note: NoteType = None, diffusivity: float = None, ) -> Species: """Add a species to the model Arguments --------- name : str Species name species_type : SpeciesType Type of species, either BULK or WALL units : str Mass units for this species atol : float, optional unless rtol is not None Absolute solver tolerance for this species, by default None rtol : float, optional unless atol is not None Relative solver tolerance for this species, by default None note : NoteType, optional keyword Supplementary information regarding this variable, by default None (see also :class:`~wntr.epanet.util.ENcomment`) diffusivity : float, optional Diffusivity of this species in water Raises ------ KeyExistsError If a variable with this name already exists ValueError If `atol` or `rtol` ≤ 0 Returns ------- Species New species """ if name in self._rxn_system: raise KeyExistsError("Variable named {} already exists in model as type {}".format(name, self._rxn_system._vars.get_groupname(name))) species_type = SpeciesType.get(species_type, allow_none=False) iq = self.network_data.init_new_species(name) new = Species( name=name, species_type=species_type, units=units, atol=atol, rtol=rtol, note=note, _vars=self._rxn_system, _vals=iq, diffusivity=diffusivity, ) self.reaction_system.add_variable(new) return new
[docs] def remove_species(self, variable_or_name): """Remove a species from the model Removes from both the reaction_system and the network_data. Parameters ---------- variable_or_name : Species or str Species (or name of the species) to be removed Raises ------ KeyError If `variable_or_name` is not a species in the model """ name = str(variable_or_name) if name not in self.reaction_system.species: raise KeyError('The specified variable is not a registered species in the reaction system') self.network_data.remove_species(name) self.reaction_system.__delitem__(name)
[docs] def add_constant(self, name: str, value: float, units: str = None, note: NoteType = None) -> Constant: """Add a constant coefficient to the model Arguments --------- name : str Name of the coefficient value : float Constant value of the coefficient units : str, optional Units for this coefficient, by default None note : NoteType, optional Supplementary information regarding this variable, by default None Raises ------ KeyExistsError Variable with this name already exists Returns ------- Constant New constant coefficient """ if name in self._rxn_system: raise KeyExistsError("Variable named {} already exists in model as type {}".format(name, self._rxn_system._vars.get_groupname(name))) new = Constant(name=name, value=value, units=units, note=note, _vars=self._rxn_system) self.reaction_system.add_variable(new) return new
[docs] def remove_constant(self, variable_or_name): """Remove a constant coefficient from the model Parameters ---------- variable_or_name : Constant or str Constant (or name of the constant) to be removed Raises ------ KeyError If `variable_or_name` is not a constant coefficient in the model """ name = str(variable_or_name) if name not in self.reaction_system.constants: raise KeyError('The specified variable is not a registered constant in the reaction system') self.reaction_system.__delitem__(name)
[docs] def add_parameter(self, name: str, global_value: float, units: str = None, note: NoteType = None) -> Parameter: """Add a parameterized coefficient to the model Arguments --------- name : str Name of the parameter global_value : float Global value of the coefficient (can be overridden for specific pipes/tanks) units : str, optional Units for the coefficient, by default None note : NoteType, optional keyword Supplementary information regarding this variable, by default None (see also :class:`~wntr.epanet.util.ENcomment`). Raises ------ KeyExistsError If a variable with this name already exists Returns ------- Parameter New parameterized coefficient """ if name in self._rxn_system: raise KeyExistsError("Variable named {} already exists in model as type {}".format(name, self._rxn_system._vars.get_groupname(name))) pv = self.network_data.init_new_parameter(name) new = Parameter(name=name, global_value=global_value, units=units, note=note, _vars=self._rxn_system, _vals=pv) self.reaction_system.add_variable(new) return new
[docs] def remove_parameter(self, variable_or_name): """Remove a parameterized coefficient from the model Parameters ---------- variable_or_name : Parameter or str Parameter (or name of the parameter) to be removed Raises ------ KeyError If `variable_or_name` is not a parameter in the model """ name = str(variable_or_name) if name not in self.reaction_system.parameters: raise KeyError('The specified variable is not a registered parameter in the reaction system') self.network_data.remove_parameter(name) self.reaction_system.__delitem__(name)
[docs] def add_term(self, name: str, expression: str, note: NoteType = None) -> Term: """Add a named expression (term) to the model Parameters ---------- name : str Name of the functional term to be added expression : str Expression that the term defines note : NoteType, optional keyword Supplementary information regarding this variable, by default None (see also :class:`~wntr.epanet.util.ENcomment`) Raises ------ KeyExistsError if a variable with this name already exists Returns ------- Term New term """ if name in self._rxn_system: raise KeyError("Variable named {} already exists in model as type {}".format(name, self._rxn_system._vars.get_groupname(name))) new = Term(name=name, expression=expression, note=note, _vars=self._rxn_system) self.reaction_system.add_variable(new) return new
[docs] def remove_term(self, variable_or_name): """Remove a named expression (term) from the model Parameters ---------- variable_or_name : Term or str Term (or name of the term) to be deleted Raises ------ KeyError If `variable_or_name` is not a term in the model """ name = str(variable_or_name) if name not in self.reaction_system.terms: raise KeyError('The specified variable is not a registered term in the reaction system') self.reaction_system.__delitem__(name)
[docs] def add_reaction(self, species_name: Union[Species, str], reaction_type: ReactionType, expression_type: ExpressionType, expression: str, note: NoteType = None) -> ReactionBase: """Add a reaction to a species in the model Note that all species need to have both a pipe and tank reaction defined unless all species are bulk species and the tank reactions are identical to the pipe reactions. However, it is not recommended that users take this approach. Once added, access the reactions from the species' object. Arguments --------- species_name : Species or str Species (or name of species) the reaction is being defined for reaction_type: ReactionType Reaction type (location), from {PIPE, TANK} expression_type : ExpressionType Expression type (left-hand-side) of the equation, from {RATE, EQUIL, FORMULA} expression : str Expression defining the reaction note : NoteType, optional keyword Supplementary information regarding this reaction, by default None (see also :class:`~wntr.epanet.util.ENcomment`) Raises ------ TypeError If a variable that is not species is passed Returns ------- MsxReactionSystem New reaction object """ species_name = str(species_name) species = self.reaction_system.species[species_name] if species.var_type is not VariableType.SPECIES: raise TypeError("Variable {} is not a Species, is a {}".format(species.name, species.var_type)) reaction_type = ReactionType.get(reaction_type, allow_none=False) expression_type = ExpressionType.get(expression_type, allow_none=False) new = Reaction( reaction_type=reaction_type, expression_type=expression_type, species_name=species_name, expression=expression, note=note, ) self.reaction_system.add_reaction(new) return new
[docs] def remove_reaction(self, species_name: str, reaction_type: ReactionType) -> None: """Remove a reaction at a specified location from a species Parameters ---------- species : Species or str Species (or name of the species) of the reaction to remove reaction_type : ReactionType Reaction type (location) of the reaction to remove """ reaction_type = ReactionType.get(reaction_type, allow_none=False) species_name = str(species_name) del self.reaction_system._rxns[reaction_type.name.lower()][species_name]
[docs] def to_dict(self) -> dict: """Dictionary representation of the MsxModel""" from wntr import __version__ return { "version": "wntr-{}".format(__version__), "name": self.name, "title": self.title, "description": self.description if self.description is None or "\n" not in self.description else self.description.splitlines(), "references": self.references.copy(), "reaction_system": self.reaction_system.to_dict(), "network_data": self.network_data.to_dict(), "options": self.options.to_dict(), }
[docs] @classmethod def from_dict(cls, data) -> "MsxModel": """Create a new multi-species water quality model from a dictionary Parameters ---------- data : dict Model data """ from wntr import __version__ ver = data.get("version", None) if ver != 'wntr-{}'.format(__version__): logger.warn("Importing from a file created by a different version of wntr, compatibility not guaranteed") # warnings.warn("Importing from a file created by a different version of wntr, compatibility not guaranteed") new = cls() new.name = data.get("name", None) new.title = data.get("title", None) new.description = data.get("description", None) if isinstance(new.description, (list, tuple)): desc = "\n".join(new.description) new.description = desc new.references.extend(data.get("references", list())) rxn_sys = data.get("reaction_system", dict()) for var in rxn_sys.get("species", list()): new.add_species(**var) for var in rxn_sys.get("constants", list()): new.add_constant(**var) for var in rxn_sys.get("parameters", list()): new.add_parameter(**var) for var in rxn_sys.get("terms", list()): new.add_term(**var) for rxn in rxn_sys.get("pipe_reactions", list()): rxn["reaction_type"] = "pipe" new.add_reaction(**rxn) for rxn in rxn_sys.get("tank_reactions", list()): rxn["reaction_type"] = "tank" new.add_reaction(**rxn) new._net_data = MsxNetworkData(**data.get("network_data", dict())) for species in new.reaction_system.species: if species not in new.network_data.initial_quality: new.network_data.init_new_species(species) for param in new.reaction_system.parameters: if param not in new.network_data.parameter_values: new.network_data.init_new_parameter(param) opts = data.get("options", None) if opts: new.options = opts return new