"""
The wntr.epanet.toolkit module is a Python extension for the EPANET
Programmers Toolkit DLLs.
"""
import ctypes
import logging
import os
import os.path
import platform
import sys
from ctypes import byref
from pkg_resources import resource_filename
from .exceptions import EN_ERROR_CODES, EpanetException
from .util import SizeLimits
logger = logging.getLogger(__name__)
epanet_toolkit = "wntr.epanet.toolkit"
if os.name in ["nt", "dos"]:
libepanet = "libepanet/windows-x64/epanet22.dll"
elif sys.platform in ["darwin"]:
if 'arm' in platform.platform().lower():
libepanet = "libepanet/darwin-arm/libepanet2.dylib"
else:
libepanet = "libepanet/darwin-x64/libepanet22.dylib"
else:
libepanet = "libepanet/linux-x64/libepanet22.so"
[docs]
def ENgetwarning(code, sec=-1):
if sec >= 0:
hours = int(sec / 3600.0)
sec -= hours * 3600
mm = int(sec / 60.0)
sec -= mm * 60
header = "%3d:%.2d:%.2d" % (hours, mm, sec)
else:
header = "{}".format(code)
if code < 100:
msg = EN_ERROR_CODES.get(code, "Unknown warning %s")
else:
raise EpanetException(code)
return msg % header
[docs]
def runepanet(inpfile, rptfile=None, binfile=None):
"""Run an EPANET command-line simulation
Parameters
----------
inpfile : str
The input file name
"""
file_prefix, file_ext = os.path.splitext(inpfile)
if rptfile is None:
rptfile = file_prefix + ".rpt"
if binfile is None:
binfile = file_prefix + ".bin"
enData = ENepanet()
enData.ENopen(inpfile, rptfile, binfile)
enData.ENsolveH()
enData.ENsolveQ()
try:
enData.ENreport()
except:
pass
enData.ENclose()
[docs]
class ENepanet:
"""Wrapper class to load the EPANET DLL object, then perform operations on
the EPANET object that is created when a file is loaded.
This simulator is thread safe **only** for EPANET `version=2.2`.
Parameters
----------
inpfile : str
Input file to use
rptfile : str
Output file to report to
binfile : str
Results file to generate
version : float
EPANET version to use (either 2.0 or 2.2)
"""
[docs]
def __init__(self, inpfile="", rptfile="", binfile="", version=2.2):
self.ENlib = None
self.errcode = 0
self.errcodelist = []
self.cur_time = 0
self.Warnflag = False
self.Errflag = False
self.fileLoaded = False
self.inpfile = inpfile
self.rptfile = rptfile
self.binfile = binfile
try:
if float(version) == 2.0:
libname = libepanet.replace('epanet22.','epanet20.')
if 'arm' in platform.platform():
raise NotImplementedError('ARM-based processors not supported for version 2.0 of EPANET. Please use version=2.2')
else:
libname = libepanet
libname = resource_filename(__name__, libname)
if os.name in ["nt", "dos"]:
self.ENlib = ctypes.windll.LoadLibrary(libname)
else:
self.ENlib = ctypes.cdll.LoadLibrary(libname)
except:
raise
finally:
if version >= 2.2:
self._project = ctypes.c_uint64()
else:
self._project = None
return
[docs]
def isOpen(self):
"""Checks to see if the file is open"""
return self.fileLoaded
def _error(self, *args):
"""Print the error text the corresponds to the error code returned"""
if not self.errcode:
return
# errtxt = self.ENlib.ENgeterror(self.errcode)
errtext = EN_ERROR_CODES.get(self.errcode, "unknown error")
if "%" in errtext and len(args) == 1:
errtext % args
if self.errcode >= 100:
self.Errflag = True
logger.error("EPANET error {} - {}".format(self.errcode, errtext))
raise EpanetException(self.errcode)
else:
self.Warnflag = True
# warnings.warn(ENgetwarning(self.errcode))
logger.warning("EPANET warning {} - {}".format(self.errcode, ENgetwarning(self.errcode, self.cur_time)))
self.errcodelist.append(ENgetwarning(self.errcode, self.cur_time))
return
[docs]
def ENopen(self, inpfile=None, rptfile=None, binfile=None):
"""
Opens an EPANET input file and reads in network data
Parameters
----------
inpfile : str
EPANET INP file (default to constructor value)
rptfile : str
Output file to create (default to constructor value)
binfile : str
Binary output file to create (default to constructor value)
"""
if self._project is not None:
if self.fileLoaded:
self.EN_close(self._project)
if self.fileLoaded:
raise RuntimeError("File is loaded and cannot be closed")
if inpfile is None:
inpfile = self.inpfile
if rptfile is None:
rptfile = self.rptfile
if binfile is None:
binfile = self.binfile
inpfile = inpfile.encode("latin-1")
rptfile = rptfile.encode("latin-1")
binfile = binfile.encode("latin-1")
self.ENlib.EN_createproject(ctypes.byref(self._project))
self.errcode = self.ENlib.EN_open(self._project, inpfile, rptfile, binfile)
self._error()
if self.errcode < 100:
self.fileLoaded = True
return
else:
if self.fileLoaded:
self.ENclose()
if self.fileLoaded:
raise RuntimeError("File is loaded and cannot be closed")
if inpfile is None:
inpfile = self.inpfile
if rptfile is None:
rptfile = self.rptfile
if binfile is None:
binfile = self.binfile
inpfile = inpfile.encode("latin-1")
rptfile = rptfile.encode("latin-1")
binfile = binfile.encode("latin-1")
self.errcode = self.ENlib.ENopen(inpfile, rptfile, binfile)
self._error()
if self.errcode < 100:
self.fileLoaded = True
return
[docs]
def ENclose(self):
"""Frees all memory and files used by EPANET"""
if self._project is not None:
self.errcode = self.ENlib.EN_close(self._project)
self.ENlib.EN_deleteproject(self._project)
self._project = None
self._project = ctypes.c_uint64()
else:
self.errcode = self.ENlib.ENclose()
self._error()
if self.errcode < 100:
self.fileLoaded = False
return
[docs]
def ENsolveH(self):
"""Solves for network hydraulics in all time periods"""
if self._project is not None:
self.errcode = self.ENlib.EN_solveH(self._project)
else:
self.errcode = self.ENlib.ENsolveH()
self._error()
return
[docs]
def ENsaveH(self):
"""Solves for network hydraulics in all time periods
Must be called before ENreport() if no water quality simulation made.
Should not be called if ENsolveQ() will be used.
"""
if self._project is not None:
self.errcode = self.ENlib.EN_saveH(self._project)
else:
self.errcode = self.ENlib.ENsaveH()
self._error()
return
[docs]
def ENopenH(self):
"""Sets up data structures for hydraulic analysis"""
if self._project is not None:
self.errcode = self.ENlib.EN_openH(self._project)
else:
self.errcode = self.ENlib.ENopenH()
self._error()
return
[docs]
def ENinitH(self, iFlag):
"""Initializes hydraulic analysis
Parameters
-----------
iFlag : 2-digit flag
2-digit flag where 1st (left) digit indicates
if link flows should be re-initialized (1) or
not (0) and 2nd digit indicates if hydraulic
results should be saved to file (1) or not (0)
"""
if self._project is not None:
self.errcode = self.ENlib.EN_initH(self._project, iFlag)
else:
self.errcode = self.ENlib.ENinitH(iFlag)
self._error()
return
[docs]
def ENrunH(self):
"""Solves hydraulics for conditions at time t
This function is used in a loop with ENnextH() to run
an extended period hydraulic simulation.
See ENsolveH() for an example.
Returns
--------
int
Current simulation time (seconds)
"""
lT = ctypes.c_long()
if self._project is not None:
self.errcode = self.ENlib.EN_runH(self._project, byref(lT))
else:
self.errcode = self.ENlib.ENrunH(byref(lT))
self._error()
self.cur_time = lT.value
return lT.value
[docs]
def ENnextH(self):
"""Determines time until next hydraulic event
This function is used in a loop with ENrunH() to run
an extended period hydraulic simulation.
See ENsolveH() for an example.
Returns
---------
int
Time (seconds) until next hydraulic event (0 marks end of simulation period)
"""
lTstep = ctypes.c_long()
if self._project is not None:
self.errcode = self.ENlib.EN_nextH(self._project, byref(lTstep))
else:
self.errcode = self.ENlib.ENnextH(byref(lTstep))
self._error()
return lTstep.value
[docs]
def ENcloseH(self):
"""Frees data allocated by hydraulics solver"""
if self._project is not None:
self.errcode = self.ENlib.EN_closeH(self._project)
else:
self.errcode = self.ENlib.ENcloseH()
self._error()
return
[docs]
def ENsavehydfile(self, filename):
"""Copies binary hydraulics file to disk
Parameters
-------------
filename : str
Name of hydraulics file to output
"""
if self._project is not None:
self.errcode = self.ENlib.EN_savehydfile(self._project, filename.encode("latin-1"))
else:
self.errcode = self.ENlib.ENsavehydfile(filename.encode("latin-1"))
self._error()
return
[docs]
def ENusehydfile(self, filename):
"""Opens previously saved binary hydraulics file
Parameters
-------------
filename : str
Name of hydraulics file to use
"""
if self._project is not None:
self.errcode = self.ENlib.EN_usehydfile(self._project, filename.encode("latin-1"))
else:
self.errcode = self.ENlib.ENusehydfile(filename.encode("latin-1"))
self._error()
return
[docs]
def ENsolveQ(self):
"""Solves for network water quality in all time periods"""
if self._project is not None:
self.errcode = self.ENlib.EN_solveQ(self._project)
else:
self.errcode = self.ENlib.ENsolveQ()
self._error()
return
[docs]
def ENopenQ(self):
"""Sets up data structures for water quality analysis"""
if self._project is not None:
self.errcode = self.ENlib.EN_openQ(self._project)
else:
self.errcode = self.ENlib.ENopenQ()
self._error()
return
[docs]
def ENinitQ(self, iSaveflag):
"""Initializes water quality analysis
Parameters
-------------
iSaveflag : int
EN_SAVE (1) if results saved to file, EN_NOSAVE (0) if not
"""
if self._project is not None:
self.errcode = self.ENlib.EN_initQ(self._project, iSaveflag)
else:
self.errcode = self.ENlib.ENinitQ(iSaveflag)
self._error()
return
[docs]
def ENrunQ(self):
"""Retrieves hydraulic and water quality results at time t
This function is used in a loop with ENnextQ() to run
an extended period water quality simulation. See ENsolveQ() for
an example.
Returns
-------
int
Current simulation time (seconds)
"""
lT = ctypes.c_long()
if self._project is not None:
self.errcode = self.ENlib.EN_runQ(self._project, byref(lT))
else:
self.errcode = self.ENlib.ENrunQ(byref(lT))
self._error()
return lT.value
[docs]
def ENnextQ(self):
"""Advances water quality simulation to next hydraulic event
This function is used in a loop with ENrunQ() to run
an extended period water quality simulation. See ENsolveQ() for
an example.
Returns
--------
int
Time (seconds) until next hydraulic event (0 marks end of simulation period)
"""
lTstep = ctypes.c_long()
if self._project is not None:
self.errcode = self.ENlib.EN_nextQ(self._project, byref(lTstep))
else:
self.errcode = self.ENlib.ENnextQ(byref(lTstep))
self._error()
return lTstep.value
[docs]
def ENcloseQ(self):
"""Frees data allocated by water quality solver"""
if self._project is not None:
self.errcode = self.ENlib.EN_closeQ(self._project)
else:
self.errcode = self.ENlib.ENcloseQ()
self._error()
return
[docs]
def ENreport(self):
"""Writes report to report file"""
if self._project is not None:
self.errcode = self.ENlib.EN_report(self._project)
else:
self.errcode = self.ENlib.ENreport()
self._error()
return
[docs]
def ENgetcount(self, iCode):
"""Retrieves the number of components of a given type in the network
Parameters
-------------
iCode : int
Component code (see toolkit.optComponentCounts)
Returns
---------
int
Number of components in network
"""
iCount = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getcount(self._project, iCode, byref(iCount))
else:
self.errcode = self.ENlib.ENgetcount(iCode, byref(iCount))
self._error()
return iCount.value
[docs]
def ENgetflowunits(self):
"""Retrieves flow units code
Returns
-----------
Code of flow units in use (see toolkit.optFlowUnits)
"""
iCode = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getflowunits(self._project, byref(iCode))
else:
self.errcode = self.ENlib.ENgetflowunits(byref(iCode))
self._error()
return iCode.value
[docs]
def ENgetnodeid(self, iIndex):
"""Gets the ID name of a node given its index.
Parameters
----------
iIndex : int
a node's index (starting from 1).
Returns
-------
str
the node name
"""
fValue = ctypes.create_string_buffer(SizeLimits.EN_MAX_ID.value)
if self._project is not None:
self.errcode = self.ENlib.EN_getnodeid(self._project, iIndex, byref(fValue))
else:
self.errcode = self.ENlib.ENgetnodeid(iIndex, byref(fValue))
self._error()
return str(fValue.value, "UTF-8")
[docs]
def ENgetnodeindex(self, sId):
"""Retrieves index of a node with specific ID
Parameters
-------------
sId : int
Node ID
Returns
---------
Index of node in list of nodes
"""
iIndex = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getnodeindex(self._project, sId.encode("latin-1"), byref(iIndex))
else:
self.errcode = self.ENlib.ENgetnodeindex(sId.encode("latin-1"), byref(iIndex))
self._error()
return iIndex.value
[docs]
def ENgetnodetype(self, iIndex):
"""Retrieves a node's type given its index.
Parameters
----------
iIndex : int
The index of the node
Returns
-------
int
the node type as an integer
"""
fValue = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getnodetype(self._project, iIndex, byref(fValue))
else:
self.errcode = self.ENlib.ENgetnodetype(iIndex, byref(fValue))
self._error()
return fValue.value
[docs]
def ENgetnodevalue(self, iIndex, iCode):
"""Retrieves parameter value for a node
Parameters
-------------
iIndex: int
Node index
iCode : int
Node parameter code (see toolkit.optNodeParams)
Returns
---------
Value of node's parameter
"""
fValue = ctypes.c_float()
if self._project is not None:
fValue = ctypes.c_double()
self.errcode = self.ENlib.EN_getnodevalue(self._project, iIndex, iCode, byref(fValue))
else:
self.errcode = self.ENlib.ENgetnodevalue(iIndex, iCode, byref(fValue))
self._error()
return fValue.value
[docs]
def ENgetlinktype(self, iIndex):
"""Retrieves a link's type given its index.
Parameters
----------
iIndex : int
The index of the link
Returns
-------
int
the link type as an integer
"""
fValue = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getlinktype(self._project, iIndex, byref(fValue))
else:
self.errcode = self.ENlib.EN_getlinktype(iIndex, byref(fValue))
self._error()
return fValue.value
[docs]
def ENgetlinkindex(self, sId):
"""Retrieves index of a link with specific ID
Parameters
-------------
sId : int
Link ID
Returns
---------
Index of link in list of links
"""
iIndex = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_getlinkindex(self._project, sId.encode("latin-1"), byref(iIndex))
else:
self.errcode = self.ENlib.ENgetlinkindex(sId.encode("latin-1"), byref(iIndex))
self._error()
return iIndex.value
[docs]
def ENgetlinkvalue(self, iIndex, iCode):
"""Retrieves parameter value for a link
Parameters
-------------
iIndex : int
Link index
iCode : int
Link parameter code (see toolkit.optLinkParams)
Returns
---------
Value of link's parameter
"""
fValue = ctypes.c_float()
if self._project is not None:
fValue = ctypes.c_double()
self.errcode = self.ENlib.EN_getlinkvalue(self._project, iIndex, iCode, byref(fValue))
else:
self.errcode = self.ENlib.ENgetlinkvalue(iIndex, iCode, byref(fValue))
self._error()
return fValue.value
[docs]
def ENsetlinkvalue(self, iIndex, iCode, fValue):
"""Set the value on a link
Parameters
----------
iIndex : int
the link index
iCode : int
the parameter enum integer
fValue : float
the value to set on the link
"""
if self._project is not None:
self.errcode = self.ENlib.EN_setlinkvalue(
self._project, ctypes.c_int(iIndex), ctypes.c_int(iCode), ctypes.c_double(fValue)
)
else:
self.errcode = self.ENlib.ENsetlinkvalue(ctypes.c_int(iIndex), ctypes.c_int(iCode), ctypes.c_float(fValue))
self._error()
[docs]
def ENsetnodevalue(self, iIndex, iCode, fValue):
"""
Set the value on a node
Parameters
----------
iIndex : int
the node index
iCode : int
the parameter enum integer
fValue : float
the value to set on the node
"""
if self._project is not None:
self.errcode = self.ENlib.EN_setnodevalue(
self._project, ctypes.c_int(iIndex), ctypes.c_int(iCode), ctypes.c_double(fValue)
)
else:
self.errcode = self.ENlib.ENsetnodevalue(ctypes.c_int(iIndex), ctypes.c_int(iCode), ctypes.c_float(fValue))
self._error()
[docs]
def ENsettimeparam(self, eParam, lValue):
"""Set a time parameter value
Parameters
----------
eParam : int
the time parameter to set
lValue : long
the value to set, in seconds
"""
if self._project is not None:
self.errcode = self.ENlib.EN_settimeparam(self._project, ctypes.c_int(eParam), ctypes.c_long(lValue))
else:
self.errcode = self.ENlib.ENsettimeparam(ctypes.c_int(eParam), ctypes.c_long(lValue))
self._error()
[docs]
def ENgettimeparam(self, eParam):
"""Get a time parameter value
Parameters
----------
eParam : int
the time parameter to get
Returns
-------
long
the value of the time parameter, in seconds
"""
lValue = ctypes.c_long()
if self._project is not None:
self.errcode = self.ENlib.EN_gettimeparam(self._project, ctypes.c_int(eParam), byref(lValue))
else:
self.errcode = self.ENlib.ENgettimeparam(ctypes.c_int(eParam), byref(lValue))
self._error()
return lValue.value
[docs]
def ENaddcontrol(self, iType: int, iLinkIndex: int, dSetting: float, iNodeIndex: int, dLevel: float) -> int:
"""Add a new simple control
Parameters
----------
iType : int
the type of control
iLinkIndex : int
the index of the link
dSetting : float
the new link setting value
iNodeIndex : int
Set to 0 for time of day or timer
dLevel : float
the level to compare against
Returns
-------
int
the new control number
"""
lValue = ctypes.c_int()
if self._project is not None:
self.errcode = self.ENlib.EN_addcontrol(
self._project,
ctypes.c_int(iType),
ctypes.c_int(iLinkIndex),
ctypes.c_double(dSetting),
ctypes.c_int(iNodeIndex),
ctypes.c_double(dLevel),
byref(lValue),
)
else:
self.errcode = self.ENlib.ENaddcontrol(
ctypes.c_int(iType),
ctypes.c_int(iLinkIndex),
ctypes.c_double(dSetting),
ctypes.c_int(iNodeIndex),
ctypes.c_double(dLevel),
byref(lValue),
)
self._error()
return lValue.value
[docs]
def ENgetcontrol(self, iIndex: int):
"""Get values defined by a control.
Parameters
----------
iIndex : int
the control number
"""
iType = ctypes.c_int()
iLinkIndex = ctypes.c_int()
dSetting = ctypes.c_double()
iNodeIndex = ctypes.c_int()
dLevel = ctypes.c_double()
if self._project is not None:
self.errcode = self.ENlib.EN_getcontrol(
self._project,
ctypes.c_int(iIndex),
byref(iType),
byref(iLinkIndex),
byref(dSetting),
byref(iNodeIndex),
byref(dLevel),
)
else:
self.errcode = self.ENlib.ENgetcontrol(
ctypes.c_int(iIndex), byref(iType), byref(iLinkIndex), byref(dSetting), byref(iNodeIndex), byref(dLevel)
)
self._error()
return dict(
index=iIndex,
type=iType.value,
linkindex=iLinkIndex.value,
setting=dSetting.value,
nodeindex=iNodeIndex.value,
level=dLevel.value,
)
[docs]
def ENsetcontrol(self, iIndex: int, iType: int, iLinkIndex: int, dSetting: float, iNodeIndex: int, dLevel: float):
"""Change values on a simple control
Parameters
----------
iIndex : int
the control index
iType : int
the type of control comparison
iLinkIndex : int
the link being changed
dSetting : float
the setting to change to
iNodeIndex : int
the node being compared against, Set to 0 for time of day or timer
dLevel : float
the level being checked
Warning
-------
There is an error in EPANET 2.2 that sets the `dLevel` parameter to 0.0 on Macs
regardless of the value the user passes in. This means that to use this toolkit
functionality on a Mac, the user must delete and create a new control to change
the level.
"""
if self._project is not None:
try:
self.errcode = self.ENlib.EN_setcontrol(
self._project,
ctypes.c_int(iIndex),
ctypes.c_int(iType),
ctypes.c_int(iLinkIndex),
ctypes.c_double(dSetting),
ctypes.c_int(iNodeIndex),
ctypes.c_double(dLevel),
)
except:
self.errcode = self.ENlib.EN_setcontrol(
self._project,
ctypes.c_int(iIndex),
ctypes.c_int(iType),
ctypes.c_int(iLinkIndex),
ctypes.c_double(dSetting),
ctypes.c_int(iNodeIndex),
ctypes.c_float(dLevel),
)
else:
self.errcode = self.ENlib.ENsetcontrol(
ctypes.c_int(iIndex),
ctypes.c_int(iType),
ctypes.c_int(iLinkIndex),
ctypes.c_double(dSetting),
ctypes.c_int(iNodeIndex),
ctypes.c_double(dLevel),
)
self._error()
[docs]
def ENdeletecontrol(self, iControlIndex):
"""Delete a control.
Parameters
----------
iControlIndex : int
the simple control to delete
"""
if self._project is not None:
self.errcode = self.ENlib.EN_deletecontrol(self._project, ctypes.c_int(iControlIndex))
else:
self.errcode = self.ENlib.ENdeletecontrol(ctypes.c_int(iControlIndex))
self._error()
[docs]
def ENsaveinpfile(self, inpfile):
"""Saves EPANET input file
Parameters
-------------
inpfile : str
EPANET INP output file
"""
inpfile = inpfile.encode("latin-1")
if self._project is not None:
self.errcode = self.ENlib.EN_saveinpfile(self._project, inpfile)
else:
self.errcode = self.ENlib.ENsaveinpfile(inpfile)
self._error()
return