Source code for wntr.epanet.msx.toolkit

# coding: utf-8
"""
The wntr.epanet.msx.toolkit module is a Python extension for the EPANET-MSX
Programmers Toolkit DLLs.

.. note::
    
    Code in this section is based on code from "EPANET-MSX-Python-wrapper",
    licensed under the BSD license. See LICENSE.md for details.
"""
import ctypes
import logging
import os
import os.path
import platform
import sys
from typing import Union

from pkg_resources import resource_filename

from wntr.epanet.msx.enums import TkObjectType, TkSourceType

from ..toolkit import ENepanet
from .exceptions import (MSX_ERROR_CODES, EpanetMsxException, MSXKeyError,
                         MSXValueError)

logger = logging.getLogger(__name__)

epanet_toolkit = "wntr.epanet.toolkit"

if os.name in ["nt", "dos"]:
    libepanet = resource_filename(__name__, "../libepanet/windows-x64/epanet2.dll")
    libmsx = resource_filename(__name__, "../libepanet/windows-x64/epanetmsx.dll")
elif sys.platform in ["darwin"]:
    if 'arm' in platform.platform().lower():
        libepanet = resource_filename(__name__, "../libepanet/darwin-arm/libepanet2.dylib")
        libmsx = resource_filename(__name__, "../libepanet/darwin-arm/libepanetmsx.dylib")
    else:
        libepanet = resource_filename(__name__, "../libepanet/darwin-x64/libepanet2.dylib")
        libmsx = resource_filename(__name__, "../libepanet/darwin-x64/libepanetmsx.dylib")
else:
    libepanet = resource_filename(__name__, "../libepanet/linux-x64/libepanet2.so")
    libmsx = resource_filename(__name__, "../libepanet/linux-x64/libepanetmsx.so")

dylib_dir = os.environ.get('DYLD_FALLBACK_LIBRARY_PATH','')
if dylib_dir != '':
    if 'arm' in platform.platform().lower():
        dylib_dir = dylib_dir + ':' + resource_filename(__name__, "../libepanet/darwin-arm")
    else:
        dylib_dir = dylib_dir + ':' + resource_filename(__name__, "../libepanet/darwin-x64")
    os.environ['DYLD_FALLBACK_LIBRARY_PATH'] = dylib_dir


[docs] class MSXepanet(ENepanet):
[docs] def __init__(self, inpfile="", rptfile="", binfile="", msxfile=""): self.ENlib = None self.errcode = 0 self.errcodelist = [] self.cur_time = 0 self.Warnflag = False self.Errflag = False self.fileLoaded = False self.inpfile = inpfile self.rptfile = rptfile self.binfile = binfile self.msxfile = msxfile try: if os.name in ["nt", "dos"]: self.ENlib = ctypes.windll.LoadLibrary(libmsx) else: self.ENlib = ctypes.cdll.LoadLibrary(libmsx) except: raise finally: self._project = None return
def _error(self, *args): """Print the error text the corresponds to the error code returned""" if not self.errcode: return # errtxt = self.ENlib.ENgeterror(self.errcode) errtext = MSX_ERROR_CODES.get(self.errcode, 'unknown error') if '%' in errtext and len(args) == 1: errtext % args if self.errcode >= 100: self.Errflag = True logger.error("EPANET error {} - {}".format(self.errcode, errtext)) raise EpanetMsxException(self.errcode) return
[docs] def ENopen(self, inpfile=None, rptfile=None, binfile=None): """ Opens an EPANET input file and reads in network data Parameters ---------- inpfile : str EPANET INP file (default to constructor value) rptfile : str Output file to create (default to constructor value) binfile : str Binary output file to create (default to constructor value) """ if self.fileLoaded: self.ENclose() if self.fileLoaded: raise RuntimeError("File is loaded and cannot be closed") if inpfile is None: inpfile = self.inpfile if rptfile is None: rptfile = self.rptfile if binfile is None: binfile = self.binfile inpfile = inpfile.encode("latin-1") rptfile = rptfile.encode("latin-1") binfile = binfile.encode("latin-1") self.errcode = self.ENlib.MSXENopen(inpfile, rptfile, binfile) self._error() if self.errcode < 100: self.fileLoaded = True return
[docs] def ENclose(self): """Frees all memory and files used by EPANET""" self.errcode = self.ENlib.MSXENclose() self._error() if self.errcode < 100: self.fileLoaded = False return
# ----------running the simulation-----------------------------------------
[docs] def MSXopen(self, msxfile): """Opens the MSX Toolkit to analyze a particular distribution system. Parameters ---------- msxfile : str Name of the MSX input file """ if msxfile is not None: msxfile = ctypes.c_char_p(msxfile.encode()) ierr = self.ENlib.MSXopen(msxfile) if ierr != 0: raise EpanetMsxException(ierr, msxfile)
[docs] def MSXclose(self): """Closes down the Toolkit system (including all files being processed)""" ierr = self.ENlib.MSXclose() if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXusehydfile(self, filename): """Uses the contents of the specified file as the current binary hydraulics file Parameters ---------- filename : str Name of the hydraulics file to use """ ierr = self.ENlib.MSXusehydfile(ctypes.c_char_p(filename.encode())) if ierr != 0: raise EpanetMsxException(ierr, filename)
[docs] def MSXsolveH(self): """Runs a complete hydraulic simulation with results for all time periods written to the binary Hydraulics file.""" ierr = self.ENlib.MSXsolveH() if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXinit(self, saveFlag=0): """Initializes the MSX system before solving for water quality results in step-wise fashion set saveFlag to 1 if water quality results should be saved to a scratch binary file, or to 0 is not saved to file""" saveFlag = int(saveFlag) ierr = self.ENlib.MSXinit(saveFlag) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsolveQ(self): """Solves for water quality over the entire simulation period and saves the results to an internal scratch file""" ierr = self.ENlib.MSXsolveQ() if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXstep(self): """Advances the water quality simulation one water quality time step. The time remaining in the overall simulation is returned as tleft, the current time as t.""" t = ctypes.c_long() tleft = ctypes.c_long() ierr = self.ENlib.MSXstep(ctypes.byref(t), ctypes.byref(tleft)) if ierr != 0: raise EpanetMsxException(ierr) out = [t.value, tleft.value] return out
[docs] def MSXsaveoutfile(self, filename): """Saves water quality results computed for each node, link and reporting time period to a named binary file Parameters ---------- filename : str Save a binary results file """ ierr = self.ENlib.MSXsaveoutfile(ctypes.c_char_p(filename.encode())) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsavemsxfile(self, filename): """Saves the data associated with the current MSX project into a new MSX input file Parameters ---------- filename : str Name of the MSX input file to create """ ierr = self.ENlib.MSXsavemsxfile(ctypes.c_char_p(filename.encode())) if ierr != 0: raise EpanetMsxException(ierr, filename)
[docs] def MSXreport(self): """Writes water quality simulations results as instructed by the MSX input file to a text file""" ierr = self.ENlib.MSXreport() if ierr != 0: raise EpanetMsxException(ierr)
# ---------get parameters--------------------------------------------------
[docs] def MSXgetindex(self, _type: Union[int, TkObjectType], name): """Gets the internal index of an MSX object given its name. Parameters ---------- _type : int, str or ObjectType Type of object to get an index for name : str Name of the object to get an index for Returns ------- int Internal index Raises ------ MSXKeyError If an invalid str is passed for _type MSXValueError If _type is not a valid MSX object type """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) type_ind = int(_type) ind = ctypes.c_int() ierr = self.ENlib.MSXgetindex(type_ind, ctypes.c_char_p(name.encode()), ctypes.byref(ind)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, name=name))) return ind.value
[docs] def MSXgetIDlen(self, _type, index): """Get the number of characters in the ID name of an MSX object given its internal index number. Parameters ---------- _type : int, str or ObjectType Type of object to get an index for index : int Index of the object to get the ID length for Returns ------- int Length of the object ID """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) type_ind = int(_type) len = ctypes.c_int() ierr = self.ENlib.MSXgetIDlen(type_ind, ctypes.c_int(index), ctypes.byref(len)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, index=index))) return len.value
[docs] def MSXgetID(self, _type, index): """Get the ID name of an object given its internal index number Parameters ---------- _type : int, str or ObjectType Type of object to get an index for index : int Index of the object to get the ID for Returns ------- str Object ID """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) type_ind = int(_type) maxlen = 32 id = ctypes.create_string_buffer(maxlen) ierr = self.ENlib.MSXgetID(type_ind, ctypes.c_int(index), ctypes.byref(id), ctypes.c_int(maxlen - 1)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, index=index))) # the .decode() added my MF 6/3/21 return id.value.decode()
[docs] def MSXgetinitqual(self, _type, node_link_index, species_index): """Get the initial concentration of a particular chemical species assigned to a specific node or link of the pipe network Parameters ---------- _type : str, int or ObjectType Type of object node_link_index : int Object index species_index : int Species index Returns ------- float Initial quality value for that node or link Raises ------ MSXKeyError Type passed in for ``_type`` is not valid MSXValueError Value for ``_type`` is not valid EpanetMsxException Any other error from the C-API """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) if _type not in [TkObjectType.NODE, TkObjectType.LINK]: raise MSXValueError(515, repr(_type)) type_ind = int(_type) iniqual = ctypes.c_double() ierr = self.ENlib.MSXgetinitqual(ctypes.c_int(type_ind), ctypes.c_int(node_link_index), ctypes.c_int(species_index), ctypes.byref(iniqual)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, node_link_index=node_link_index, species_index=species_index))) return iniqual.value
[docs] def MSXgetqual(self, _type, node_link_index, species_index): """Get a chemical species concentration at a given node or the average concentration along a link at the current simulation time step Parameters ---------- _type : str, int or ObjectType Type of object node_link_index : int Object index species_index : int Species index Returns ------- float Current quality value for that node or link Raises ------ MSXKeyError Type passed in for ``_type`` is not valid MSXValueError Value for ``_type`` is not valid EpanetMsxException Any other error from the C-API """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) if _type not in [TkObjectType.NODE, TkObjectType.LINK]: raise MSXValueError(515, repr(_type)) type_ind = int(_type) qual = ctypes.c_double() ierr = self.ENlib.MSXgetqual(ctypes.c_int(type_ind), ctypes.c_int(node_link_index), ctypes.c_int(species_index), ctypes.byref(qual)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, node_link_index=node_link_index, species_index=species_index))) return qual.value
[docs] def MSXgetconstant(self, constant_index): """Get the value of a particular reaction constant Parameters ---------- constant_index : int Index to the constant Returns ------- float Value of the constant Raises ------ EpanetMsxException Toolkit error occurred """ const = ctypes.c_double() ierr = self.ENlib.MSXgetconstant(constant_index, ctypes.byref(const)) if ierr != 0: raise EpanetMsxException(ierr, constant_index) return const.value
[docs] def MSXgetparameter(self, _type, node_link_index, param_index): """Get the value of a particular reaction parameter for a given TANK or PIPE. Parameters ---------- _type : int or str or Enum Get the type of the parameter node_link_index : int Link index param_index : int Parameter variable index Returns ------- float Parameter value Raises ------ MSXKeyError If there is no such _type MSXValueError If the _type is improper EpanetMsxException Any other error """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) if _type not in [TkObjectType.NODE, TkObjectType.LINK]: raise MSXValueError(515, repr(_type)) type_ind = int(_type) param = ctypes.c_double() ierr = self.ENlib.MSXgetparameter(ctypes.c_int(type_ind), ctypes.c_int(node_link_index), ctypes.c_int(param_index), ctypes.byref(param)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(_type=_type, node_link_index=node_link_index, param_index=param_index))) return param.value
[docs] def MSXgetsource(self, node_index, species_index): """Get information on any external source of a particular chemical species assigned to a specific node of the pipe network Parameters ---------- node_index : int Node index species_index : int Species index Returns ------- list [source type, level, and pattern] where level is the baseline concentration (or mass flow rate) of the source and pattern the index of the time pattern used to add variability to the source's baseline level (0 if no pattern defined for the source) """ level = ctypes.c_double() _type = ctypes.c_int() pat = ctypes.c_int() ierr = self.ENlib.MSXgetsource(ctypes.c_int(node_index), ctypes.c_int(species_index), ctypes.byref(_type), ctypes.byref(level), ctypes.byref(pat)) if ierr != 0: raise EpanetMsxException(ierr, repr(dict(node_index=node_index, species_index=species_index))) src_out = [TkSourceType.get(_type.value), level.value, pat.value] return src_out
[docs] def MSXgetpatternlen(self, pat): """Get the number of time periods within a SOURCE time pattern. Parameters ---------- pat : int Pattern index Returns ------- int Number of time periods in the pattern """ len = ctypes.c_int() ierr = self.ENlib.MSXgetpatternlen(pat, ctypes.byref(len)) if ierr != 0: raise EpanetMsxException(ierr) return len.value
[docs] def MSXgetpatternvalue(self, pat, period): """Get the multiplier at a specific time period for a given SOURCE time pattern Parameters ---------- pat : int Pattern index period : int 1-indexed period of the pattern to retrieve Returns ------- Multiplier """ val = ctypes.c_double() ierr = self.ENlib.MSXgetpatternvalue(pat, period, ctypes.byref(val)) if ierr != 0: raise EpanetMsxException(ierr) return val.value
[docs] def MSXgetcount(self, _type): """Get the number of objects of a specified type. Parameters ---------- _type : int or str or Enum Type of object to count Returns ------- int Number of objects of specified type Raises ------ MSXKeyError If the _type is invalid """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) type_ind = int(_type) count = ctypes.c_int() ierr = self.ENlib.MSXgetcount(type_ind, ctypes.byref(count)) if ierr != 0: raise EpanetMsxException(ierr) return count.value
[docs] def MSXgetspecies(self, species_index): """Get the attributes of a chemical species given its internal index number. Parameters ---------- species_index : int Species index to query (starting from 1 as listed in the MSX input file) Returns ------- int, str, float, float Type, units, aTol, and rTol for the species """ type_ind = ctypes.c_int() units = ctypes.create_string_buffer(15) aTol = ctypes.c_double() rTol = ctypes.c_double() ierr = self.ENlib.MSXgetspecies(species_index, ctypes.byref(type_ind), ctypes.byref(units), ctypes.byref(aTol), ctypes.byref(rTol)) if ierr != 0: raise EpanetMsxException(ierr) spe_out = [type_ind.value, units.value, aTol.value, rTol.value] return spe_out
[docs] def MSXgeterror(self, errcode, len=100): """Get the text for an error message given its error code Parameters ---------- errcode : int Error code len : int, optional Length of the error message, by default 100 and minimum 80 Returns ------- str String decoded from the DLL Warning ------- Getting string parameters in this way is not recommended, because it requires setting up string arrays that may or may not be the correct size. Use the wntr.epanet.msx.enums package to get error information. """ errmsg = ctypes.create_string_buffer(len) self.ENlib.MSXgeterror(errcode, ctypes.byref(errmsg), len) return errmsg.value.decode()
# --------------set parameters-----------------------------------
[docs] def MSXsetconstant(self, ind, value): """Set a new value to a specific reaction constant Parameters ---------- ind : int Index to the variable value : float Value to give the constant """ ierr = self.ENlib.MSXsetconstant(ctypes.c_int(ind), ctypes.c_double(value)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsetparameter(self, _type, ind, param, value): """Set a value to a particular reaction parameter for a given TANK or PIPE Parameters ---------- _type : int or str or enum Type of value to set ind : int Tank or pipe index param : int Parameter variable index value : float Value to be set Raises ------ MSXKeyError If there is no such _type MSXValueError If the _type is invalid """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) if _type not in [TkObjectType.NODE, TkObjectType.LINK]: raise MSXValueError(515, repr(_type)) type_ind = int(_type) ierr = self.ENlib.MSXsetparameter(ctypes.c_int(type_ind), ctypes.c_int(ind), ctypes.c_int(param), ctypes.c_double(value)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsetinitqual(self, _type, ind, spe, value): """Set the initial concentration of a particular chemical species assigned to a specific node or link of the pipe network. Parameters ---------- _type : int or str or enum Type of network element to set ind : int Index of the network element spe : int Index of the species value : float Initial quality value """ try: _type = TkObjectType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) if _type not in [TkObjectType.NODE, TkObjectType.LINK]: raise MSXValueError(515, repr(_type)) type_ind = int(_type) ierr = self.ENlib.MSXsetinitqual(ctypes.c_int(type_ind), ctypes.c_int(ind), ctypes.c_int(spe), ctypes.c_double(value)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsetsource(self, node, spe, _type, level, pat): """Set the attributes of an external source of a particular chemical species in a specific node of the pipe network Parameters ---------- node : int Node index spe : int Species index _type : int or str or enum Type of source level : float Source quality value pat : int Pattern index """ try: _type = TkSourceType.get(_type) except KeyError: raise MSXKeyError(515, repr(_type)) type_ind = int(_type) ierr = self.ENlib.MSXsetsource(ctypes.c_int(node), ctypes.c_int(spe), ctypes.c_int(type_ind), ctypes.c_double(level), ctypes.c_int(pat)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsetpattern(self, pat, mult): """Set multipliers to a given MSX SOURCE time pattern Parameters ---------- pat : int Pattern index mult : list-like Pattern multipliers """ length = len(mult) cfactors_type = ctypes.c_double * length cfactors = cfactors_type() for i in range(length): cfactors[i] = float(mult[i]) ierr = self.ENlib.MSXsetpattern(ctypes.c_int(pat), cfactors, ctypes.c_int(length)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXsetpatternvalue(self, pat, period, value): """Set the multiplier factor for a specific period within a SOURCE time pattern. Parameters ---------- pat : int Pattern index period : int 1-indexed pattern time period index value : float Value to set at that time period """ ierr = self.ENlib.MSXsetpatternvalue(ctypes.c_int(pat), ctypes.c_int(period), ctypes.c_double(value)) if ierr != 0: raise EpanetMsxException(ierr)
[docs] def MSXaddpattern(self, patternid): """Add a new, empty MSX source time pattern to an MSX project. Parameters ---------- patternid : str Name of the new pattern """ ierr = self.ENlib.MSXaddpattern(ctypes.c_char_p(patternid.encode())) if ierr != 0: raise EpanetMsxException(ierr)