Source code for wntr.epanet.io

"""
The wntr.epanet.io module contains methods for reading/writing EPANET input and output files.
"""
from __future__ import absolute_import

import datetime
import difflib
import io
import logging
import os
import re
import sys
import warnings
from collections import OrderedDict

import numpy as np
import pandas as pd
import six
import wntr
from wntr.epanet.exceptions import ENKeyError, ENSyntaxError, ENValueError, EpanetException
import wntr.network
from wntr.network.base import Link
from wntr.network.controls import (AndCondition, Comparison, Control,
                                   ControlAction, OrCondition, Rule,
                                   SimTimeCondition, TimeOfDayCondition,
                                   ValueCondition, _ControlType)
from wntr.network.elements import Junction, Pipe, Pump, Reservoir, Tank, Valve
from wntr.network.model import (Curve, Demands, LinkStatus, Pattern, Source,
                                WaterNetworkModel)
from wntr.network.options import Options

from .util import (EN, FlowUnits, HydParam, MassUnits, MixType, PressureUnits,
                   QualParam, QualType, ResultType, StatisticsType, from_si,
                   to_si)

#from .time_utils import run_lineprofile
sys_default_enc = sys.getdefaultencoding()


logger = logging.getLogger(__name__)

_INP_SECTIONS = ['[OPTIONS]', '[TITLE]', '[JUNCTIONS]', '[RESERVOIRS]',
                 '[TANKS]', '[PIPES]', '[PUMPS]', '[VALVES]', '[EMITTERS]',
                 '[CURVES]', '[PATTERNS]', '[ENERGY]', '[STATUS]',
                 '[CONTROLS]', '[RULES]', '[DEMANDS]', '[QUALITY]',
                 '[REACTIONS]', '[SOURCES]', '[MIXING]',
                 '[TIMES]', '[REPORT]', '[COORDINATES]', '[VERTICES]',
                 '[LABELS]', '[BACKDROP]', '[TAGS]']

_JUNC_ENTRY = ' {name:20} {elev:15.11g} {dem:15.11g} {pat:24} {com:>3s}\n'
_JUNC_LABEL = '{:21} {:>12s} {:>12s} {:24}\n'

_RES_ENTRY = ' {name:20s} {head:15.11g} {pat:>24s} {com:>3s}\n'
_RES_LABEL = '{:21s} {:>20s} {:>24s}\n'

_TANK_ENTRY = ' {name:20s} {elev:15.11g} {initlev:15.11g} {minlev:15.11g} {maxlev:15.11g} {diam:15.11g} {minvol:15.11g} {curve:20s} {overflow:20s} {com:>3s}\n'
_TANK_LABEL = '{:21s} {:>20s} {:>20s} {:>20s} {:>20s} {:>20s} {:>20s} {:20s} {:20s}\n'

_PIPE_ENTRY = ' {name:20s} {node1:20s} {node2:20s} {len:15.11g} {diam:15.11g} {rough:15.11g} {mloss:15.11g} {status:>20s} {com:>3s}\n'
_PIPE_LABEL = '{:21s} {:20s} {:20s} {:>20s} {:>20s} {:>20s} {:>20s} {:>20s}\n'

_PUMP_ENTRY = ' {name:20s} {node1:20s} {node2:20s} {ptype:8s} {params:20s} {com:>3s}\n'
_PUMP_LABEL = '{:21s} {:20s} {:20s} {:20s}\n'

_VALVE_ENTRY = ' {name:20s} {node1:20s} {node2:20s} {diam:15.11g} {vtype:4s} {set:15.11g} {mloss:15.11g} {com:>3s}\n'
_GPV_ENTRY = ' {name:20s} {node1:20s} {node2:20s} {diam:15.11g} {vtype:4s} {set:20s} {mloss:15.11g} {com:>3s}\n'
_VALVE_LABEL = '{:21s} {:20s} {:20s} {:>20s} {:4s} {:>20s} {:>20s}\n'

_CURVE_ENTRY = ' {name:10s} {x:12f} {y:12f} {com:>3s}\n'
_CURVE_LABEL = '{:11s} {:12s} {:12s}\n'

def _split_line(line):
    _vc = line.split(';', 1)
    _cmnt = None
    _vals = None
    if len(_vc) == 0:
        pass
    elif len(_vc) == 1:
        _vals = _vc[0].split()
    elif _vc[0] == '':
        _cmnt = _vc[1]
    else:
        _vals = _vc[0].split()
        _cmnt = _vc[1]
    return _vals, _cmnt

def _is_number(s):
    """
    Checks if input is a number


    Parameters
    ----------
    s : anything

    Returns
    -------
    bool
        Input is a number
    """

    try:
        float(s)
        return True
    except ValueError:
        return False


def _str_time_to_sec(s):
    """
    Converts EPANET time format to seconds.


    Parameters
    ----------
    s : string
        EPANET time string. Options are 'HH:MM:SS', 'HH:MM', 'HH'


    Returns
    -------
    int
        Integer value of time in seconds.
    """
    pattern1 = re.compile(r'^(\d+):(\d+):(\d+)$')
    time_tuple = pattern1.search(s)
    if bool(time_tuple):
        return (int(time_tuple.groups()[0])*60*60 +
                int(time_tuple.groups()[1])*60 +
                int(round(float(time_tuple.groups()[2]))))
    else:
        pattern2 = re.compile(r'^(\d+):(\d+)$')
        time_tuple = pattern2.search(s)
        if bool(time_tuple):
            return (int(time_tuple.groups()[0])*60*60 +
                    int(time_tuple.groups()[1])*60)
        else:
            pattern3 = re.compile(r'^(\d+)$')
            time_tuple = pattern3.search(s)
            if bool(time_tuple):
                return int(time_tuple.groups()[0])*60*60
            else:
                raise ENValueError(213, s)


def _clock_time_to_sec(s, am_pm):
    """
    Converts EPANET clocktime format to seconds.


    Parameters
    ----------
    s : string
        EPANET time string. Options are 'HH:MM:SS', 'HH:MM', HH'

    am : string
        options are AM or PM


    Returns
    -------
    int
        Integer value of time in seconds

    """
    if am_pm.upper() == 'AM':
        am = True
    elif am_pm.upper() == 'PM':
        am = False
    else:
        raise ENValueError(213, s, 'Ambiguous time of day')

    pattern1 = re.compile(r'^(\d+):(\d+):(\d+)$')
    time_tuple = pattern1.search(s)
    if bool(time_tuple):
        time_sec = (int(time_tuple.groups()[0])*60*60 +
                    int(time_tuple.groups()[1])*60 +
                    int(round(float(time_tuple.groups()[2]))))
        if s.startswith('12'):
            time_sec -= 3600*12
        if not am:
            if time_sec >= 3600*12:
                raise ENValueError(213, s, 'Cannot specify am/pm for times greater than 12:00:00')
            time_sec += 3600*12
        return time_sec
    else:
        pattern2 = re.compile(r'^(\d+):(\d+)$')
        time_tuple = pattern2.search(s)
        if bool(time_tuple):
            time_sec = (int(time_tuple.groups()[0])*60*60 +
                        int(time_tuple.groups()[1])*60)
            if s.startswith('12'):
                time_sec -= 3600*12
            if not am:
                if time_sec >= 3600 * 12:
                    raise ENValueError(213, s, 'Cannot specify am/pm for times greater than 12:00:00')
                time_sec += 3600*12
            return time_sec
        else:
            pattern3 = re.compile(r'^(\d+)$')
            time_tuple = pattern3.search(s)
            if bool(time_tuple):
                time_sec = int(time_tuple.groups()[0])*60*60
                if s.startswith('12'):
                    time_sec -= 3600*12
                if not am:
                    if time_sec >= 3600 * 12:
                        raise ENValueError(213, s, 'Cannot specify am/pm for times greater than 12:00:00')
                    time_sec += 3600*12
                return time_sec
            else:
                raise ENValueError(213, s, 'Cannot parse time')


def _sec_to_string(sec):
    hours = int(sec/3600.)
    sec -= hours*3600
    mm = int(sec/60.)
    sec -= mm*60
    return (hours, mm, int(sec))


[docs] class InpFile(object): """ EPANET INP file reader and writer class. This class provides read and write functionality for EPANET INP files. The EPANET Users Manual provides full documentation for the INP file format. """
[docs] def __init__(self): self.sections = OrderedDict() for sec in _INP_SECTIONS: self.sections[sec] = [] self.mass_units = None self.flow_units = None self.top_comments = [] self.curves = OrderedDict()
[docs] def read(self, inp_files, wn=None): """ Read an EPANET INP file and load data into a water network model object. Both EPANET 2.0 and EPANET 2.2 INP file options are recognized and handled. Parameters ---------- inp_files : str or list An EPANET INP input file or list of INP files to be combined wn : WaterNetworkModel, optional An optional network model to append onto; by default a new model is created. Returns ------- WaterNetworkModel A water network model object """ if wn is None: wn = WaterNetworkModel() self.wn = wn if not isinstance(inp_files, list): inp_files = [inp_files] wn.name = inp_files[0] self.curves = OrderedDict() self.top_comments = [] self.sections = OrderedDict() for sec in _INP_SECTIONS: self.sections[sec] = [] self.mass_units = None self.flow_units = None for filename in inp_files: section = None lnum = 0 edata = {'fname': filename} with io.open(filename, 'r', encoding='utf-8') as f: for line in f: lnum += 1 edata['lnum'] = lnum line = line.strip() nwords = len(line.split()) if len(line) == 0 or nwords == 0: # Blank line continue elif line.startswith('['): vals = line.split(None, 1) sec = vals[0].upper() # Add handlers to deal with extra 'S'es (or missing 'S'es) in INP file if sec not in _INP_SECTIONS: trsec = sec.replace(']','S]') if trsec in _INP_SECTIONS: sec = trsec if sec not in _INP_SECTIONS: trsec = sec.replace('S]',']') if trsec in _INP_SECTIONS: sec = trsec edata['sec'] = sec if sec in _INP_SECTIONS: section = sec #logger.info('%(fname)s:%(lnum)-6d %(sec)13s section found' % edata) continue elif sec == '[END]': #logger.info('%(fname)s:%(lnum)-6d %(sec)13s end of file found' % edata) section = None break else: raise ENSyntaxError(201, line_num=lnum, line = line) elif section is None and line.startswith(';'): self.top_comments.append(line[1:]) continue elif section is None: logger.debug('Found confusing line: %s', repr(line)) raise ENSyntaxError(201, line_num=lnum, line=line) # We have text, and we are in a section self.sections[section].append((lnum, line)) # Parse each of the sections # The order of operations is important as certain things require prior knowledge try: ### OPTIONS self._read_options() ### TIMES self._read_times() ### CURVES self._read_curves() ### PATTERNS self._read_patterns() ### JUNCTIONS self._read_junctions() ### RESERVOIRS self._read_reservoirs() ### TANKS self._read_tanks() ### PIPES self._read_pipes() ### PUMPS self._read_pumps() ### VALVES self._read_valves() ### COORDINATES self._read_coordinates() ### SOURCES self._read_sources() ### STATUS self._read_status() ### CONTROLS self._read_controls() ### RULES self._read_rules() ### REACTIONS self._read_reactions() ### TITLE self._read_title() ### ENERGY self._read_energy() ### DEMANDS self._read_demands() ### EMITTERS self._read_emitters() ### QUALITY self._read_quality() self._read_mixing() self._read_report() self._read_vertices() self._read_labels() ### Parse Backdrop self._read_backdrop() ### TAGS self._read_tags() # Set the _inpfile io data inside the water network, so it is saved somewhere wn._inpfile = self ### Finish tags self._read_end() except EpanetException as e: raise EpanetException(200, filename) from e return self.wn
[docs] def write(self, filename, wn, units=None, version=2.2, force_coordinates=False): """ Write a water network model into an EPANET INP file. .. note:: Please note that by default, an EPANET 2.2 formatted file is written by WNTR. An INP file with version 2.2 options *will not* work with EPANET 2.0 (neither command line nor GUI). By default, WNTR will use the EPANET 2.2 toolkit. Parameters ---------- filename : str Name of the EPANET INP file. units : str, int or FlowUnits Name of the units for the EPANET INP file to be written in. version : float, {2.0, **2.2**} Defaults to 2.2; use 2.0 to guarantee backward compatability, but this will turn off PDD mode and supress the writing of other EPANET 2.2-specific options. If PDD mode is specified, a warning will be issued. force_coordinates : bool This only applies if `self.options.graphics.map_filename` is not `None`, and will force the COORDINATES section to be written even if a MAP file is provided. False by default, but coordinates **are** written by default since the MAP file is `None` by default. """ if not isinstance(wn, WaterNetworkModel): raise ValueError('Must pass a WaterNetworkModel object') if units is not None and isinstance(units, str): units=units.upper() if units=="SI": raise Exception("unit cannot be 'SI'") self.flow_units = FlowUnits[units] elif units is not None and isinstance(units, FlowUnits): self.flow_units = units elif units is not None and isinstance(units, int): self.flow_units = FlowUnits(units) elif self.flow_units is not None: self.flow_units = self.flow_units elif isinstance(wn.options.hydraulic.inpfile_units, str): units = wn.options.hydraulic.inpfile_units.upper() self.flow_units = FlowUnits[units] else: self.flow_units = FlowUnits.GPM if self.mass_units is None: self.mass_units = MassUnits.mg with io.open(filename, 'wb') as f: self._write_title(f, wn) self._write_junctions(f, wn) self._write_reservoirs(f, wn) self._write_tanks(f, wn, version=version) self._write_pipes(f, wn) self._write_pumps(f, wn) self._write_valves(f, wn) self._write_tags(f, wn) self._write_demands(f, wn) self._write_status(f, wn) self._write_patterns(f, wn) self._write_curves(f, wn) self._write_controls(f, wn) self._write_rules(f, wn) self._write_energy(f, wn) self._write_emitters(f, wn) self._write_quality(f, wn) self._write_sources(f, wn) self._write_reactions(f, wn) self._write_mixing(f, wn) self._write_times(f, wn) self._write_report(f, wn) self._write_options(f, wn, version=version) if wn.options.graphics.map_filename is None or force_coordinates is True: self._write_coordinates(f, wn) self._write_vertices(f, wn) self._write_labels(f, wn) self._write_backdrop(f, wn) self._write_end(f, wn)
### Network Components def _read_title(self): lines = [] for lnum, line in self.sections['[TITLE]']: line = line.split(';')[0] current = line.split() if current == []: continue lines.append(line) self.wn.title = lines def _write_title(self, f, wn): if wn.name is not None: f.write('; Filename: {0}\n'.format(wn.name).encode(sys_default_enc)) f.write('; WNTR: {}\n; Created: {:%Y-%m-%d %H:%M:%S}\n'.format(wntr.__version__, datetime.datetime.now()).encode(sys_default_enc)) f.write('[TITLE]\n'.encode(sys_default_enc)) if hasattr(wn, 'title'): for line in wn.title: f.write('{}\n'.format(line).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_junctions(self): # try: for lnum, line in self.sections['[JUNCTIONS]']: line = line.split(';')[0] current = line.split() if current == []: continue if len(current) > 3: pat = current[3] elif self.wn.options.hydraulic.pattern: pat = self.wn.options.hydraulic.pattern else: pat = self.wn.patterns.default_pattern base_demand = 0.0 if len(current) > 2: base_demand = to_si(self.flow_units, float(current[2]), HydParam.Demand) self.wn.add_junction(current[0], base_demand, pat, to_si(self.flow_units, float(current[1]), HydParam.Elevation), demand_category=None) # except Exception as e: # print(line) # raise e def _write_junctions(self, f, wn): f.write('[JUNCTIONS]\n'.encode(sys_default_enc)) f.write(_JUNC_LABEL.format(';ID', 'Elevation', 'Demand', 'Pattern').encode(sys_default_enc)) nnames = list(wn.junction_name_list) # nnames.sort() for junction_name in nnames: junction = wn.nodes[junction_name] if junction.demand_timeseries_list: base_demands = junction.demand_timeseries_list.base_demand_list() demand_patterns = junction.demand_timeseries_list.pattern_list() if base_demands: base_demand = base_demands[0] else: base_demand = 0.0 if demand_patterns: if demand_patterns[0] == wn.options.hydraulic.pattern: demand_pattern = None else: demand_pattern = demand_patterns[0] else: demand_pattern = None else: base_demand = 0.0 demand_pattern = None E = {'name': junction_name, 'elev': from_si(self.flow_units, junction.elevation, HydParam.Elevation), 'dem': from_si(self.flow_units, base_demand, HydParam.Demand), 'pat': '', 'com': ';'} if demand_pattern is not None: E['pat'] = str(demand_pattern) f.write(_JUNC_ENTRY.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_reservoirs(self): for lnum, line in self.sections['[RESERVOIRS]']: line = line.split(';')[0] current = line.split() if current == []: continue if len(current) == 2: self.wn.add_reservoir(current[0], to_si(self.flow_units, float(current[1]), HydParam.HydraulicHead)) else: self.wn.add_reservoir(current[0], to_si(self.flow_units, float(current[1]), HydParam.HydraulicHead), current[2]) def _write_reservoirs(self, f, wn): f.write('[RESERVOIRS]\n'.encode(sys_default_enc)) f.write(_RES_LABEL.format(';ID', 'Head', 'Pattern').encode(sys_default_enc)) nnames = list(wn.reservoir_name_list) # nnames.sort() for reservoir_name in nnames: reservoir = wn.nodes[reservoir_name] E = {'name': reservoir_name, 'head': from_si(self.flow_units, reservoir.head_timeseries.base_value, HydParam.HydraulicHead), 'com': ';'} if reservoir.head_timeseries.pattern is None: E['pat'] = '' else: E['pat'] = reservoir.head_timeseries.pattern.name f.write(_RES_ENTRY.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_tanks(self): for lnum, line in self.sections['[TANKS]']: line = line.split(';')[0] current = line.split() if current == []: continue volume = None if len(current) >= 8: # Volume curve provided volume = float(current[6]) curve_name = current[7] if curve_name == '*': curve_name = None else: curve_points = [] for point in self.curves[curve_name]: x = to_si(self.flow_units, point[0], HydParam.Length) y = to_si(self.flow_units, point[1], HydParam.Volume) curve_points.append((x, y)) self.wn.add_curve(curve_name, 'VOLUME', curve_points) # curve = self.wn.get_curve(curve_name) if len(current) == 9: overflow = current[8] else: overflow = False elif len(current) == 7: curve_name = None overflow = False volume = float(current[6]) elif len(current) == 6: curve_name = None overflow = False volume = 0.0 else: raise ENSyntaxError(201, 'Tank entry format not recognized.', line_num=lnum, line=line) self.wn.add_tank(current[0], to_si(self.flow_units, float(current[1]), HydParam.Elevation), to_si(self.flow_units, float(current[2]), HydParam.Length), to_si(self.flow_units, float(current[3]), HydParam.Length), to_si(self.flow_units, float(current[4]), HydParam.Length), to_si(self.flow_units, float(current[5]), HydParam.TankDiameter), to_si(self.flow_units, float(volume), HydParam.Volume), curve_name, overflow) def _write_tanks(self, f, wn, version=2.2): f.write('[TANKS]\n'.encode(sys_default_enc)) if version != 2.2: f.write(_TANK_LABEL.format(';ID', 'Elevation', 'Init Level', 'Min Level', 'Max Level', 'Diameter', 'Min Volume', 'Volume Curve','').encode(sys_default_enc)) else: f.write(_TANK_LABEL.format(';ID', 'Elevation', 'Init Level', 'Min Level', 'Max Level', 'Diameter', 'Min Volume', 'Volume Curve','Overflow').encode(sys_default_enc)) nnames = list(wn.tank_name_list) # nnames.sort() for tank_name in nnames: tank = wn.nodes[tank_name] E = {'name': tank_name, 'elev': from_si(self.flow_units, tank.elevation, HydParam.Elevation), 'initlev': from_si(self.flow_units, tank.init_level, HydParam.HydraulicHead), 'minlev': from_si(self.flow_units, tank.min_level, HydParam.HydraulicHead), 'maxlev': from_si(self.flow_units, tank.max_level, HydParam.HydraulicHead), 'diam': from_si(self.flow_units, tank.diameter, HydParam.TankDiameter), 'minvol': from_si(self.flow_units, tank.min_vol, HydParam.Volume), 'curve': '', 'overflow': '', 'com': ';'} if tank.vol_curve is not None: E['curve'] = tank.vol_curve.name if version ==2.2: if tank.overflow: E['overflow'] = 'YES' if tank.vol_curve is None: E['curve'] = '*' f.write(_TANK_ENTRY.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_pipes(self): darcy_weisbach = self.wn.options.hydraulic.headloss == "D-W" for lnum, line in self.sections['[PIPES]']: line = line.split(';')[0] current = line.split() if current == []: continue if len(current) == 8: minor_loss = float(current[6]) if current[7].upper() == 'CV': link_status = LinkStatus.Open check_valve = True else: link_status = LinkStatus[current[7].upper()] check_valve = False elif len(current) == 7: minor_loss = float(current[6]) link_status = LinkStatus.Open check_valve = False elif len(current) == 6: minor_loss = 0. link_status = LinkStatus.Open check_valve = False try: self.wn.add_pipe(current[0], current[1], current[2], to_si(self.flow_units, float(current[3]), HydParam.Length), to_si(self.flow_units, float(current[4]), HydParam.PipeDiameter), to_si(self.flow_units, float(current[5]), HydParam.RoughnessCoeff, darcy_weisbach=darcy_weisbach), minor_loss, link_status, check_valve) except KeyError as e: raise ENKeyError(203, str(e.args[0]), line_num=lnum) from e except ValueError as e: raise ENValueError(211, str(e.args[0]), line_num=lnum) from e def _write_pipes(self, f, wn): darcy_weisbach = wn.options.hydraulic.headloss == "D-W" f.write('[PIPES]\n'.encode(sys_default_enc)) f.write(_PIPE_LABEL.format(';ID', 'Node1', 'Node2', 'Length', 'Diameter', 'Roughness', 'Minor Loss', 'Status').encode(sys_default_enc)) lnames = list(wn.pipe_name_list) # lnames.sort() for pipe_name in lnames: pipe = wn.links[pipe_name] E = {'name': pipe_name, 'node1': pipe.start_node_name, 'node2': pipe.end_node_name, 'len': from_si(self.flow_units, pipe.length, HydParam.Length), 'diam': from_si(self.flow_units, pipe.diameter, HydParam.PipeDiameter), 'rough': from_si(self.flow_units, pipe.roughness, HydParam.RoughnessCoeff, darcy_weisbach=darcy_weisbach), 'mloss': pipe.minor_loss, 'status': str(pipe.initial_status), 'com': ';'} if pipe.check_valve: E['status'] = 'CV' f.write(_PIPE_ENTRY.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_pumps(self): def create_curve(curve_name): curve_points = [] if curve_name not in self.wn.curve_name_list or \ self.wn.get_curve(curve_name) is None: for point in self.curves[curve_name]: x = to_si(self.flow_units, point[0], HydParam.Flow) y = to_si(self.flow_units, point[1], HydParam.HydraulicHead) curve_points.append((x,y)) self.wn.add_curve(curve_name, 'HEAD', curve_points) curve = self.wn.get_curve(curve_name) return curve for lnum, line in self.sections['[PUMPS]']: line = line.split(';')[0] current = line.split() if current == []: continue pump_type = None value = None speed = None pattern = None for i in range(3, len(current), 2): if current[i].upper() == 'HEAD': # assert pump_type is None, 'In [PUMPS] entry, specify either HEAD or POWER once.' pump_type = 'HEAD' value = create_curve(current[i+1]).name elif current[i].upper() == 'POWER': # assert pump_type is None, 'In [PUMPS] entry, specify either HEAD or POWER once.' pump_type = 'POWER' value = to_si(self.flow_units, float(current[i+1]), HydParam.Power) elif current[i].upper() == 'SPEED': # assert speed is None, 'In [PUMPS] entry, SPEED may only be specified once.' speed = float(current[i+1]) elif current[i].upper() == 'PATTERN': # assert pattern is None, 'In [PUMPS] entry, PATTERN may only be specified once.' pattern = self.wn.get_pattern(current[i+1]).name else: raise ENSyntaxError(201, 'Pump keyword not recognized: {}'.format(current[i].upper()), line_num=lnum, line=line) if speed is None: speed = 1.0 if pump_type is None: raise ENSyntaxError(217, line_num=lnum, line=line) self.wn.add_pump(current[0], current[1], current[2], pump_type, value, speed, pattern) def _write_pumps(self, f, wn): f.write('[PUMPS]\n'.encode(sys_default_enc)) f.write(_PUMP_LABEL.format(';ID', 'Node1', 'Node2', 'Properties').encode(sys_default_enc)) lnames = list(wn.pump_name_list) # lnames.sort() for pump_name in lnames: pump = wn.links[pump_name] E = {'name': pump_name, 'node1': pump.start_node_name, 'node2': pump.end_node_name, 'ptype': pump.pump_type, 'params': '', # 'speed_keyword': 'SPEED', # 'speed': pump.speed_timeseries.base_value, 'com': ';'} if pump.pump_type == 'HEAD': E['params'] = pump.pump_curve_name elif pump.pump_type == 'POWER': E['params'] = str(from_si(self.flow_units, pump.power, HydParam.Power)) else: raise RuntimeError('Only head or power info is supported of pumps.') tmp_entry = _PUMP_ENTRY if pump.speed_timeseries.base_value != 1: E['speed_keyword'] = 'SPEED' E['speed'] = pump.speed_timeseries.base_value tmp_entry = (tmp_entry.rstrip('\n').rstrip('}').rstrip('com:>3s').rstrip(' {') + ' {speed_keyword:8s} {speed:15.11g} {com:>3s}\n') if pump.speed_timeseries.pattern is not None: tmp_entry = (tmp_entry.rstrip('\n').rstrip('}').rstrip('com:>3s').rstrip(' {') + ' {pattern_keyword:10s} {pattern:20s} {com:>3s}\n') E['pattern_keyword'] = 'PATTERN' E['pattern'] = pump.speed_timeseries.pattern.name f.write(tmp_entry.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_valves(self): for lnum, line in self.sections['[VALVES]']: line = line.split(';')[0] current = line.split() if current == []: continue if len(current) == 6: current.append(0.0) else: if len(current) != 7: raise ENSyntaxError(201, 'valve definitions must have 6 or 7 values', line_num=lnum, line=line) valve_type = current[4].upper() if valve_type in ['PRV', 'PSV', 'PBV']: valve_set = to_si(self.flow_units, float(current[5]), HydParam.Pressure) elif valve_type == 'FCV': valve_set = to_si(self.flow_units, float(current[5]), HydParam.Flow) elif valve_type == 'TCV': valve_set = float(current[5]) elif valve_type == 'GPV': curve_name = current[5] curve_points = [] for point in self.curves[curve_name]: x = to_si(self.flow_units, point[0], HydParam.Flow) y = to_si(self.flow_units, point[1], HydParam.HeadLoss) curve_points.append((x, y)) self.wn.add_curve(curve_name, 'HEADLOSS', curve_points) valve_set = curve_name else: raise ENSyntaxError(213, 'valve type unrecognized', line_num=lnum, line=line) self.wn.add_valve(current[0], current[1], current[2], to_si(self.flow_units, float(current[3]), HydParam.PipeDiameter), current[4].upper(), float(current[6]), valve_set) def _write_valves(self, f, wn): f.write('[VALVES]\n'.encode(sys_default_enc)) f.write(_VALVE_LABEL.format(';ID', 'Node1', 'Node2', 'Diameter', 'Type', 'Setting', 'Minor Loss').encode(sys_default_enc)) lnames = list(wn.valve_name_list) # lnames.sort() for valve_name in lnames: valve = wn.links[valve_name] E = {'name': valve_name, 'node1': valve.start_node_name, 'node2': valve.end_node_name, 'diam': from_si(self.flow_units, valve.diameter, HydParam.PipeDiameter), 'vtype': valve.valve_type, 'set': valve.initial_setting, 'mloss': valve.minor_loss, 'com': ';'} valve_type = valve.valve_type formatter = _VALVE_ENTRY if valve_type in ['PRV', 'PSV', 'PBV']: valve_set = from_si(self.flow_units, valve.initial_setting, HydParam.Pressure) elif valve_type == 'FCV': valve_set = from_si(self.flow_units, valve.initial_setting, HydParam.Flow) elif valve_type == 'TCV': valve_set = valve.initial_setting elif valve_type == 'GPV': valve_set = valve.headloss_curve_name formatter = _GPV_ENTRY E['set'] = valve_set f.write(formatter.format(**E).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_emitters(self): for lnum, line in self.sections['[EMITTERS]']: # Private attribute on junctions line = line.split(';')[0] current = line.split() if current == []: continue junction = self.wn.get_node(current[0]) junction.emitter_coefficient = to_si(self.flow_units, float(current[1]), HydParam.EmitterCoeff) def _write_emitters(self, f, wn): f.write('[EMITTERS]\n'.encode(sys_default_enc)) entry = '{:10s} {:10s}\n' label = '{:10s} {:10s}\n' f.write(label.format(';ID', 'Flow coefficient').encode(sys_default_enc)) njunctions = list(wn.junction_name_list) # njunctions.sort() for junction_name in njunctions: junction = wn.nodes[junction_name] if junction.emitter_coefficient: val = from_si(self.flow_units, junction.emitter_coefficient, HydParam.EmitterCoeff) f.write(entry.format(junction_name, str(val)).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) ### System Operation def _read_curves(self): for lnum, line in self.sections['[CURVES]']: # It should be noted carefully that these lines are never directly # applied to the WaterNetworkModel object. Because different curve # types are treated differently, each of the curves are converted # the first time they are used, and this is used to build up a # dictionary for those conversions to take place. line = line.split(';')[0] current = line.split() if current == []: continue curve_name = current[0] if curve_name not in self.curves: self.curves[curve_name] = [] self.curves[curve_name].append((float(current[1]), float(current[2]))) self.wn.curves[curve_name] = None def _write_curves(self, f, wn): f.write('[CURVES]\n'.encode(sys_default_enc)) f.write(_CURVE_LABEL.format(';ID', 'X-Value', 'Y-Value').encode(sys_default_enc)) curves = list(wn.curve_name_list) # curves.sort() for curve_name in curves: curve = wn.get_curve(curve_name) if curve.curve_type == 'VOLUME': f.write(';VOLUME: {}\n'.format(curve_name).encode(sys_default_enc)) for point in curve.points: x = from_si(self.flow_units, point[0], HydParam.Length) y = from_si(self.flow_units, point[1], HydParam.Volume) f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode(sys_default_enc)) elif curve.curve_type == 'HEAD': f.write(';PUMP: {}\n'.format(curve_name).encode(sys_default_enc)) for point in curve.points: x = from_si(self.flow_units, point[0], HydParam.Flow) y = from_si(self.flow_units, point[1], HydParam.HydraulicHead) f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode(sys_default_enc)) elif curve.curve_type == 'EFFICIENCY': f.write(';EFFICIENCY: {}\n'.format(curve_name).encode(sys_default_enc)) for point in curve.points: x = from_si(self.flow_units, point[0], HydParam.Flow) y = point[1] f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode(sys_default_enc)) elif curve.curve_type == 'HEADLOSS': f.write(';HEADLOSS: {}\n'.format(curve_name).encode(sys_default_enc)) for point in curve.points: x = from_si(self.flow_units, point[0], HydParam.Flow) y = from_si(self.flow_units, point[1], HydParam.HeadLoss) f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode(sys_default_enc)) else: f.write(';UNKNOWN: {}\n'.format(curve_name).encode(sys_default_enc)) for point in curve.points: x = point[0] y = point[1] f.write(_CURVE_ENTRY.format(name=curve_name, x=x, y=y, com=';').encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_patterns(self): _patterns = OrderedDict() for lnum, line in self.sections['[PATTERNS]']: # read the lines for each pattern -- patterns can be multiple lines of arbitrary length line = line.split(';')[0] current = line.split() if current == []: continue pattern_name = current[0] if pattern_name not in _patterns: _patterns[pattern_name] = [] for i in current[1:]: _patterns[pattern_name].append(float(i)) else: for i in current[1:]: _patterns[pattern_name].append(float(i)) for pattern_name, pattern in _patterns.items(): # add the patterns to the water newtork model self.wn.add_pattern(pattern_name, pattern) if not self.wn.options.hydraulic.pattern and '1' in _patterns.keys(): # If there is a pattern called "1", then it is the default pattern if no other is supplied self.wn.options.hydraulic.pattern = '1' elif self.wn.options.hydraulic.pattern not in _patterns.keys(): # Sanity check - if the default pattern does not exist and it is not '1' then balk # If default is '1' but it does not exist, then it is constant # Any other default that does not exist is an error if self.wn.options.hydraulic.pattern is not None and self.wn.options.hydraulic.pattern != '1': raise ENKeyError(205, self.wn.options.hydraulic.pattern) self.wn.options.hydraulic.pattern = None def _write_patterns(self, f, wn): num_columns = 6 f.write('[PATTERNS]\n'.encode(sys_default_enc)) f.write('{:10s} {:10s}\n'.format(';ID', 'Multipliers').encode(sys_default_enc)) patterns = list(wn.pattern_name_list) # patterns.sort() for pattern_name in patterns: pattern = wn.get_pattern(pattern_name) count = 0 for i in pattern.multipliers: if count % num_columns == 0: f.write('\n{:s} {:f}'.format(pattern_name, i).encode(sys_default_enc)) else: f.write(' {:f}'.format(i).encode(sys_default_enc)) count += 1 f.write('\n'.encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_energy(self): for lnum, line in self.sections['[ENERGY]']: line = line.split(';')[0] current = line.split() if current == []: continue # Only add head curves for pumps if current[0].upper() == 'GLOBAL': if current[1].upper() == 'PRICE': self.wn.options.energy.global_price = from_si(self.flow_units, float(current[2]), HydParam.Energy) elif current[1].upper() == 'PATTERN': self.wn.options.energy.global_pattern = current[2] elif current[1].upper() in ['EFFIC', 'EFFICIENCY']: self.wn.options.energy.global_efficiency = float(current[2]) else: logger.warning('Unknown entry in ENERGY section: %s', line) elif current[0].upper() == 'DEMAND': self.wn.options.energy.demand_charge = float(current[2]) elif current[0].upper() == 'PUMP': pump_name = current[1] pump = self.wn.links[pump_name] if current[2].upper() == 'PRICE': pump.energy_price = from_si(self.flow_units, float(current[3]), HydParam.Energy) elif current[2].upper() == 'PATTERN': pump.energy_pattern = current[3] elif current[2].upper() in ['EFFIC', 'EFFICIENCY']: curve_name = current[3] curve_points = [] for point in self.curves[curve_name]: x = to_si(self.flow_units, point[0], HydParam.Flow) y = point[1] curve_points.append((x, y)) self.wn.add_curve(curve_name, 'EFFICIENCY', curve_points) curve = self.wn.get_curve(curve_name) pump.efficiency = curve else: logger.warning('Unknown entry in ENERGY section: %s', line) else: logger.warning('Unknown entry in ENERGY section: %s', line) def _write_energy(self, f, wn): f.write('[ENERGY]\n'.encode(sys_default_enc)) if True: #wn.energy is not None: if wn.options.energy.global_efficiency is not None: f.write('GLOBAL EFFICIENCY {:.4f}\n'.format(wn.options.energy.global_efficiency).encode(sys_default_enc)) if wn.options.energy.global_price is not None: f.write('GLOBAL PRICE {:.4f}\n'.format(to_si(self.flow_units, wn.options.energy.global_price, HydParam.Energy)).encode(sys_default_enc)) if wn.options.energy.demand_charge is not None: f.write('DEMAND CHARGE {:.4f}\n'.format(wn.options.energy.demand_charge).encode(sys_default_enc)) if wn.options.energy.global_pattern is not None: f.write('GLOBAL PATTERN {:s}\n'.format(wn.options.energy.global_pattern).encode(sys_default_enc)) lnames = list(wn.pump_name_list) lnames.sort() for pump_name in lnames: pump = wn.links[pump_name] if pump.efficiency is not None: f.write('PUMP {:10s} EFFIC {:s}\n'.format(pump_name, pump.efficiency.name).encode(sys_default_enc)) if pump.energy_price is not None: f.write('PUMP {:10s} PRICE {:.4f}\n'.format(pump_name, to_si(self.flow_units, pump.energy_price, HydParam.Energy)).encode(sys_default_enc)) if pump.energy_pattern is not None: f.write('PUMP {:10s} PATTERN {:s}\n'.format(pump_name, pump.energy_pattern).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_status(self): for lnum, line in self.sections['[STATUS]']: line = line.split(';')[0] current = line.split() if current == []: continue # assert(len(current) == 2), ("Error reading [STATUS] block, Check format.") link = self.wn.get_link(current[0]) if (current[1].upper() == 'OPEN' or current[1].upper() == 'CLOSED' or current[1].upper() == 'ACTIVE'): new_status = LinkStatus[current[1].upper()] link.initial_status = new_status link._user_status = new_status else: if isinstance(link, wntr.network.Valve): new_status = LinkStatus.Active valve_type = link.valve_type if valve_type in ['PRV', 'PSV', 'PBV']: setting = to_si(self.flow_units, float(current[1]), HydParam.Pressure) elif valve_type == 'FCV': setting = to_si(self.flow_units, float(current[1]), HydParam.Flow) elif valve_type == 'TCV': setting = float(current[1]) else: continue else: new_status = LinkStatus.Open setting = float(current[1]) # link.setting = setting link.initial_setting = setting link._user_status = new_status link.initial_status = new_status def _write_status(self, f, wn): f.write('[STATUS]\n'.encode(sys_default_enc)) f.write( '{:10s} {:10s}\n'.format(';ID', 'Setting').encode(sys_default_enc)) pnames = list(wn.pump_name_list) for pump_name in pnames: pump = wn.links[pump_name] if pump.initial_status in (LinkStatus.Closed,): f.write('{:10s} {:10s}\n'.format(pump_name, LinkStatus(pump.initial_status).name).encode(sys_default_enc)) else: setting = pump.initial_setting if isinstance(setting, float) and setting != 1.0: f.write('{:10s} {:10.7g}\n'.format(pump_name, setting).encode(sys_default_enc)) vnames = list(wn.valve_name_list) # lnames.sort() for valve_name in vnames: valve = wn.links[valve_name] #valve_type = valve.valve_type if valve.initial_status not in (LinkStatus.Active,): #LinkStatus.Opened, LinkStatus.Open, f.write('{:10s} {:10s}\n'.format(valve_name, LinkStatus(valve.initial_status).name).encode(sys_default_enc)) # if valve_type in ['PRV', 'PSV', 'PBV']: # valve_set = from_si(self.flow_units, valve.initial_setting, HydParam.Pressure) # elif valve_type == 'FCV': # valve_set = from_si(self.flow_units, valve.initial_setting, HydParam.Flow) # elif valve_type == 'TCV': # valve_set = valve.initial_setting # elif valve_type == 'GPV': # valve_set = None # if valve_set is not None: # f.write('{:10s} {:10.7g}\n'.format(valve_name, float(valve_set)).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_controls(self): control_count = 0 for lnum, line in self.sections['[CONTROLS]']: control_count += 1 control_name = 'control '+str(control_count) control_obj = _read_control_line(line, self.wn, self.flow_units, control_name) if control_obj is None: control_count -= 1 # control was not found continue if control_name in self.wn.control_name_list: warnings.warn('One or more [CONTROLS] were duplicated in "{}"; duplicates are ignored.'.format(self.wn.name), stacklevel=0) logger.warning('Control already exists: "{}"'.format(control_name)) else: self.wn.add_control(control_name, control_obj) def _write_controls(self, f, wn): def get_setting(control_action, control_name): value = control_action._value attribute = control_action._attribute.lower() if attribute == 'status': setting = LinkStatus(value).name elif attribute == 'base_speed': setting = str(value) elif attribute == 'setting' and isinstance(control_action._target_obj, Valve): valve = control_action._target_obj valve_type = valve.valve_type if valve_type == 'PRV' or valve_type == 'PSV' or valve_type == 'PBV': setting = str(from_si(self.flow_units, value, HydParam.Pressure)) elif valve_type == 'FCV': setting = str(from_si(self.flow_units, value, HydParam.Flow)) elif valve_type == 'TCV': setting = str(value) elif valve_type == 'GPV': setting = value else: raise ValueError('Valve type not recognized' + str(valve_type)) elif attribute == 'setting': setting = value else: setting = None logger.warning('Could not write control '+str(control_name)+' - skipping') return setting f.write('[CONTROLS]\n'.encode(sys_default_enc)) # Time controls and conditional controls only for text, all_control in wn.controls(): control_action = all_control._then_actions[0] if all_control.epanet_control_type is not _ControlType.rule: if len(all_control._then_actions) != 1 or len(all_control._else_actions) != 0: logger.error('Too many actions on CONTROL "%s"'%text) raise RuntimeError('Too many actions on CONTROL "%s"'%text) if not isinstance(control_action.target()[0], Link): continue if isinstance(all_control._condition, (SimTimeCondition, TimeOfDayCondition)): entry = '{ltype} {link} {setting} AT {compare} {time:g}\n' vals = {'ltype': control_action._target_obj.link_type, 'link': control_action._target_obj.name, 'setting': get_setting(control_action, text), 'compare': 'TIME', 'time': all_control._condition._threshold / 3600.0} if vals['setting'] is None: continue if isinstance(all_control._condition, TimeOfDayCondition): vals['compare'] = 'CLOCKTIME' f.write(entry.format(**vals).encode(sys_default_enc)) elif isinstance(all_control._condition, (ValueCondition)): entry = '{ltype} {link} {setting} IF {ntype} {node} {compare} {thresh}\n' vals = {'ltype': control_action._target_obj.link_type, 'link': control_action._target_obj.name, 'setting': get_setting(control_action, text), 'ntype': all_control._condition._source_obj.node_type, 'node': all_control._condition._source_obj.name, 'compare': 'above', 'thresh': 0.0} if vals['setting'] is None: continue if all_control._condition._relation in [np.less, np.less_equal, Comparison.le, Comparison.lt]: vals['compare'] = 'below' threshold = all_control._condition._threshold if isinstance(all_control._condition._source_obj, Tank): vals['thresh'] = from_si(self.flow_units, threshold, HydParam.HydraulicHead) elif isinstance(all_control._condition._source_obj, Junction): vals['thresh'] = from_si(self.flow_units, threshold, HydParam.Pressure) else: raise RuntimeError('Unknown control for EPANET INP files: %s' %type(all_control)) f.write(entry.format(**vals).encode(sys_default_enc)) elif not isinstance(all_control, Control): raise RuntimeError('Unknown control for EPANET INP files: %s' % type(all_control)) f.write('\n'.encode(sys_default_enc)) def _read_rules(self): rules = _EpanetRule.parse_rules_lines(self.sections['[RULES]'], self.flow_units, self.mass_units) for rule in rules: ctrl = rule.generate_control(self.wn) self.wn.add_control(ctrl.name, ctrl) logger.debug('Added %s', str(ctrl)) # wn._en_rules = '\n'.join(self.sections['[RULES]']) #logger.warning('RULES are reapplied directly to an Epanet INP file on write; otherwise unsupported.') def _write_rules(self, f, wn): f.write('[RULES]\n'.encode(sys_default_enc)) for text, all_control in wn.controls(): entry = '{}\n' if all_control.epanet_control_type == _ControlType.rule: if all_control.name == '': all_control._name = text rule = _EpanetRule('blah', self.flow_units, self.mass_units) rule.from_if_then_else(all_control) f.write(entry.format(str(rule)).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_demands(self): demand_num = 0 has_been_read = set() for lnum, line in self.sections['[DEMANDS]']: ldata = line.split(';') if len(ldata) > 1 and (ldata[1] != ""): category = ldata[1] else: category = None current = ldata[0].split() if current == []: continue demand_num = demand_num + 1 node = self.wn.get_node(current[0]) if len(current) == 2: pattern = None else: pattern = self.wn.get_pattern(current[2]) if node.name not in has_been_read: has_been_read.add(node.name) while len(node.demand_timeseries_list) > 0: del node.demand_timeseries_list[-1] # In EPANET, the [DEMANDS] section overrides demands specified in [JUNCTIONS] # node.demand_timeseries_list.remove_category('EN2 base') node.demand_timeseries_list.append((to_si(self.flow_units, float(current[1]), HydParam.Demand), pattern, category)) def _write_demands(self, f, wn): f.write('[DEMANDS]\n'.encode(sys_default_enc)) entry = '{:10s} {:10s} {:10s}{:s}\n' label = '{:10s} {:10s} {:10s}\n' f.write(label.format(';ID', 'Demand', 'Pattern').encode(sys_default_enc)) nodes = list(wn.junction_name_list) # nodes.sort() for node in nodes: demands = wn.get_node(node).demand_timeseries_list if len(demands) > 1: for ct, demand in enumerate(demands): cat = str(demand.category) #if cat == 'EN2 base': # cat = '' if cat.lower() == 'none': cat = '' else: cat = ' ;' + demand.category E = {'node': node, 'base': from_si(self.flow_units, demand.base_value, HydParam.Demand), 'pat': '', 'cat': cat } if demand.pattern_name in wn.pattern_name_list: E['pat'] = demand.pattern_name f.write(entry.format(E['node'], str(E['base']), E['pat'], E['cat']).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) ### Water Quality def _read_quality(self): for lnum, line in self.sections['[QUALITY]']: line = line.split(';')[0] current = line.split() if current == []: continue node = self.wn.get_node(current[0]) if self.wn.options.quality.parameter == 'CHEMICAL': quality = to_si(self.flow_units, float(current[1]), QualParam.Concentration, mass_units=self.mass_units) elif self.wn.options.quality.parameter == 'AGE': quality = to_si(self.flow_units, float(current[1]), QualParam.WaterAge) else : quality = float(current[1]) node.initial_quality = quality def _write_quality(self, f, wn): f.write('[QUALITY]\n'.encode(sys_default_enc)) entry = '{:10s} {:10s}\n' label = '{:10s} {:10s}\n' nnodes = list(wn.nodes.keys()) # nnodes.sort() for node_name in nnodes: node = wn.nodes[node_name] if node.initial_quality: if wn.options.quality.parameter == 'CHEMICAL': quality = from_si(self.flow_units, node.initial_quality, QualParam.Concentration, mass_units=self.mass_units) elif wn.options.quality.parameter == 'AGE': quality = from_si(self.flow_units, node.initial_quality, QualParam.WaterAge) else: quality = node.initial_quality f.write(entry.format(node_name, str(quality)).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_reactions(self): BulkReactionCoeff = QualParam.BulkReactionCoeff WallReactionCoeff = QualParam.WallReactionCoeff if self.mass_units is None: self.mass_units = MassUnits.mg for lnum, line in self.sections['[REACTIONS]']: line = line.split(';')[0] current = line.split() if current == []: continue # assert len(current) == 3, ('INP file option in [REACTIONS] block ' # 'not recognized: ' + line) key1 = current[0].upper() key2 = current[1].upper() val3 = float(current[2]) if key1 == 'ORDER': if key2 == 'BULK': self.wn.options.reaction.bulk_order = int(float(current[2])) elif key2 == 'WALL': self.wn.options.reaction.wall_order = int(float(current[2])) elif key2 == 'TANK': self.wn.options.reaction.tank_order = int(float(current[2])) elif key1 == 'GLOBAL': if key2 == 'BULK': self.wn.options.reaction.bulk_coeff = to_si(self.flow_units, val3, BulkReactionCoeff, mass_units=self.mass_units, reaction_order=self.wn.options.reaction.bulk_order) elif key2 == 'WALL': self.wn.options.reaction.wall_coeff = to_si(self.flow_units, val3, WallReactionCoeff, mass_units=self.mass_units, reaction_order=self.wn.options.reaction.wall_order) elif key1 == 'BULK': pipe = self.wn.get_link(current[1]) pipe.bulk_coeff = to_si(self.flow_units, val3, BulkReactionCoeff, mass_units=self.mass_units, reaction_order=self.wn.options.reaction.bulk_order) elif key1 == 'WALL': pipe = self.wn.get_link(current[1]) pipe.wall_coeff = to_si(self.flow_units, val3, WallReactionCoeff, mass_units=self.mass_units, reaction_order=self.wn.options.reaction.wall_order) elif key1 == 'TANK': tank = self.wn.get_node(current[1]) tank.bulk_coeff = to_si(self.flow_units, val3, BulkReactionCoeff, mass_units=self.mass_units, reaction_order=self.wn.options.reaction.bulk_order) elif key1 == 'LIMITING': self.wn.options.reaction.limiting_potential = float(current[2]) elif key1 == 'ROUGHNESS': self.wn.options.reaction.roughness_correl = float(current[2]) else: raise ENValueError(213, key1, line_num=lnum, line=line) def _write_reactions(self, f, wn): f.write( '[REACTIONS]\n'.encode(sys_default_enc)) f.write(';Type Pipe/Tank Coefficient\n'.encode(sys_default_enc)) entry_int = ' {:s} {:s} {:d}\n' entry_float = ' {:s} {:s} {:<10.4f}\n' for tank_name, tank in wn.nodes(Tank): if tank.bulk_coeff is not None: f.write(entry_float.format('TANK',tank_name, from_si(self.flow_units, tank.bulk_coeff, QualParam.BulkReactionCoeff, mass_units=self.mass_units, reaction_order=wn.options.reaction.bulk_order)).encode(sys_default_enc)) for pipe_name, pipe in wn.links(Pipe): if pipe.bulk_coeff is not None: f.write(entry_float.format('BULK',pipe_name, from_si(self.flow_units, pipe.bulk_coeff, QualParam.BulkReactionCoeff, mass_units=self.mass_units, reaction_order=wn.options.reaction.bulk_order)).encode(sys_default_enc)) if pipe.wall_coeff is not None: f.write(entry_float.format('WALL',pipe_name, from_si(self.flow_units, pipe.wall_coeff, QualParam.WallReactionCoeff, mass_units=self.mass_units, reaction_order=wn.options.reaction.wall_order)).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) # f.write('[REACTIONS]\n'.encode(sys_default_enc)) # EPANET GUI puts this line in here f.write(entry_int.format('ORDER', 'BULK', int(wn.options.reaction.bulk_order)).encode(sys_default_enc)) f.write(entry_int.format('ORDER', 'TANK', int(wn.options.reaction.tank_order)).encode(sys_default_enc)) f.write(entry_int.format('ORDER', 'WALL', int(wn.options.reaction.wall_order)).encode(sys_default_enc)) f.write(entry_float.format('GLOBAL','BULK', from_si(self.flow_units, wn.options.reaction.bulk_coeff, QualParam.BulkReactionCoeff, mass_units=self.mass_units, reaction_order=wn.options.reaction.bulk_order)).encode(sys_default_enc)) f.write(entry_float.format('GLOBAL','WALL', from_si(self.flow_units, wn.options.reaction.wall_coeff, QualParam.WallReactionCoeff, mass_units=self.mass_units, reaction_order=wn.options.reaction.wall_order)).encode(sys_default_enc)) if wn.options.reaction.limiting_potential is not None: f.write(entry_float.format('LIMITING','POTENTIAL',wn.options.reaction.limiting_potential).encode(sys_default_enc)) if wn.options.reaction.roughness_correl is not None: f.write(entry_float.format('ROUGHNESS','CORRELATION',wn.options.reaction.roughness_correl).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_sources(self): source_num = 0 for lnum, line in self.sections['[SOURCES]']: line = line.split(';')[0] current = line.split() if current == []: continue # assert(len(current) >= 3), ("Error reading sources. Check format.") source_num = source_num + 1 if current[0].upper() == 'MASS': strength = to_si(self.flow_units, float(current[2]), QualParam.SourceMassInject, self.mass_units) else: strength = to_si(self.flow_units, float(current[2]), QualParam.Concentration, self.mass_units) if len(current) == 3: self.wn.add_source('INP'+str(source_num), current[0], current[1], strength, None) else: self.wn.add_source('INP'+str(source_num), current[0], current[1], strength, current[3]) def _write_sources(self, f, wn): f.write('[SOURCES]\n'.encode(sys_default_enc)) entry = '{:10s} {:10s} {:10s} {:10s}\n' label = '{:10s} {:10s} {:10s} {:10s}\n' f.write(label.format(';Node', 'Type', 'Quality', 'Pattern').encode(sys_default_enc)) nsources = list(wn._sources.keys()) # nsources.sort() for source_name in nsources: source = wn._sources[source_name] if source.source_type.upper() == 'MASS': strength = from_si(self.flow_units, source.strength_timeseries.base_value, QualParam.SourceMassInject, self.mass_units) else: # CONC, SETPOINT, FLOWPACED strength = from_si(self.flow_units, source.strength_timeseries.base_value, QualParam.Concentration, self.mass_units) E = {'node': source.node_name, 'type': source.source_type, 'quality': str(strength), 'pat': ''} if source.strength_timeseries.pattern_name is not None: E['pat'] = source.strength_timeseries.pattern_name f.write(entry.format(E['node'], E['type'], str(E['quality']), E['pat']).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_mixing(self): for lnum, line in self.sections['[MIXING]']: line = line.split(';')[0] current = line.split() if current == []: continue key = current[1].upper() tank_name = current[0] tank = self.wn.get_node(tank_name) if key == 'MIXED': tank.mixing_model = MixType.Mix1 elif key == '2COMP' and len(current) > 2: tank.mixing_model = MixType.Mix2 tank.mixing_fraction = float(current[2]) elif key == '2COMP' and len(current) < 3: raise ENSyntaxError(201, 'missing fraction for mixing', line_num=lnum, line=line) elif key == 'FIFO': tank.mixing_model = MixType.FIFO elif key == 'LIFO': tank.mixing_model = MixType.LIFO def _write_mixing(self, f, wn): f.write('[MIXING]\n'.encode(sys_default_enc)) f.write('{:20s} {:5s} {}\n'.format(';Tank ID', 'Model', 'Fraction').encode(sys_default_enc)) lnames = list(wn.tank_name_list) # lnames.sort() for tank_name in lnames: tank = wn.nodes[tank_name] if tank._mixing_model is not None: if tank._mixing_model in [MixType.Mixed, MixType.Mix1, 0]: f.write(' {:19s} MIXED\n'.format(tank_name).encode(sys_default_enc)) elif tank._mixing_model in [MixType.TwoComp, MixType.Mix2, '2comp', '2COMP', 1]: f.write(' {:19s} 2COMP {}\n'.format(tank_name, tank.mixing_fraction).encode(sys_default_enc)) elif tank._mixing_model in [MixType.FIFO, 2]: f.write(' {:19s} FIFO\n'.format(tank_name).encode(sys_default_enc)) elif tank._mixing_model in [MixType.LIFO, 3]: f.write(' {:19s} LIFO\n'.format(tank_name).encode(sys_default_enc)) elif isinstance(tank._mixing_model, str) and tank.mixing_fraction is not None: f.write(' {:19s} {} {}\n'.format(tank_name, tank._mixing_model, tank.mixing_fraction).encode(sys_default_enc)) elif isinstance(tank._mixing_model, str): f.write(' {:19s} {}\n'.format(tank_name, tank._mixing_model).encode(sys_default_enc)) else: logger.warning('Unknown mixing model: %s', tank._mixing_model) f.write('\n'.encode(sys_default_enc)) ### Options and Reporting def _read_options(self): edata = OrderedDict() wn = self.wn opts = wn.options for lnum, line in self.sections['[OPTIONS]']: edata['lnum'] = lnum edata['sec'] = '[OPTIONS]' words, comments = _split_line(line) if words is not None and len(words) > 0: if len(words) < 2: edata['key'] = words[0] raise ENValueError(213, 'NULL', line_num=lnum, line=line) key = words[0].upper() if key == 'UNITS': self.flow_units = FlowUnits[words[1].upper()] opts.hydraulic.inpfile_units = words[1].upper() elif key == 'HEADLOSS': opts.hydraulic.headloss = words[1].upper() elif key == 'HYDRAULICS': opts.hydraulic.hydraulics = words[1].upper() opts.hydraulic.hydraulics_filename = words[2] elif key == 'QUALITY': mode = words[1].upper() if mode in ['NONE', 'AGE']: opts.quality.parameter = words[1].upper() elif mode in ['TRACE']: opts.quality.parameter = 'TRACE' opts.quality.trace_node = words[2] else: opts.quality.parameter = 'CHEMICAL' opts.quality.chemical_name = words[1] if len(words) > 2: if 'mg' in words[2].lower(): self.mass_units = MassUnits.mg opts.quality.inpfile_units = words[2] elif 'ug' in words[2].lower(): self.mass_units = MassUnits.ug opts.quality.inpfile_units = words[2] else: raise ENValueError(213, 'for chemical units', line_num=lnum, line=line) else: self.mass_units = MassUnits.mg opts.quality.inpfile_units = 'mg/L' elif key == 'VISCOSITY': opts.hydraulic.viscosity = float(words[1]) elif key == 'DIFFUSIVITY': opts.quality.diffusivity = float(words[1]) elif key == 'SPECIFIC': opts.hydraulic.specific_gravity = float(words[2]) elif key == 'TRIALS': opts.hydraulic.trials = int(float(words[1])) elif key == 'ACCURACY': opts.hydraulic.accuracy = float(words[1]) elif key == 'HEADERROR': opts.hydraulic.headerror = float(words[1]) elif key == 'FLOWCHANGE': opts.hydraulic.flowchange = float(words[1]) elif key == 'UNBALANCED': opts.hydraulic.unbalanced = words[1].upper() if len(words) > 2: opts.hydraulic.unbalanced_value = int(words[2]) elif key == 'MINIMUM': minimum_pressure = to_si(self.flow_units, float(words[2]), HydParam.Pressure) opts.hydraulic.minimum_pressure = minimum_pressure elif key == 'REQUIRED': required_pressure = to_si(self.flow_units, float(words[2]), HydParam.Pressure) opts.hydraulic.required_pressure = required_pressure elif key == 'PRESSURE': if len(words) > 2: if words[1].upper() == 'EXPONENT': opts.hydraulic.pressure_exponent = float(words[2]) else: edata['key'] = ' '.join(words) raise ENSyntaxError(201, 'unknown option', line_num=lnum, line=line) else: opts.hydraulic.inpfile_pressure_units = words[1] elif key == 'PATTERN': opts.hydraulic.pattern = words[1] elif key == 'DEMAND': if len(words) > 2: if words[1].upper() == 'MULTIPLIER': opts.hydraulic.demand_multiplier = float(words[2]) elif words[1].upper() == 'MODEL': opts.hydraulic.demand_model = words[2] else: edata['key'] = ' '.join(words) raise ENSyntaxError(201, 'unknown option', line_num=lnum, line=line) else: edata['key'] = ' '.join(words) raise ENSyntaxError(201, 'unknown option', line_num=lnum, line=line) elif key == 'EMITTER': if len(words) > 2: opts.hydraulic.emitter_exponent = float(words[2]) else: edata['key'] = 'EMITTER EXPONENT' raise ENSyntaxError(201, 'unknown option', line_num=lnum, line=line) elif key == 'TOLERANCE': opts.quality.tolerance = float(words[1]) elif key == 'CHECKFREQ': opts.hydraulic.checkfreq = float(words[1]) elif key == 'MAXCHECK': opts.hydraulic.maxcheck = float(words[1]) elif key == 'DAMPLIMIT': opts.hydraulic.damplimit = float(words[1]) elif key == 'MAP': opts.graphics.map_filename = words[1] else: if len(words) == 2: edata['key'] = words[0] setattr(opts, words[0].lower(), float(words[1])) logger.warn('%(lnum)-6d %(sec)13s option "%(key)s" is undocumented; adding, but please verify syntax', edata) elif len(words) == 3: edata['key'] = words[0] + ' ' + words[1] setattr(opts, words[0].lower() + '_' + words[1].lower(), float(words[2])) logger.warn('%(lnum)-6d %(sec)13s option "%(key)s" is undocumented; adding, but please verify syntax', edata) if isinstance(opts.time.report_timestep, (float, int)): if opts.time.report_timestep < opts.time.hydraulic_timestep: raise ENValueError(202, 'report timestep less than hydraulic timestep') if opts.time.report_timestep % opts.time.hydraulic_timestep != 0: raise ENValueError(202, 'report timestep must be integer multiple of hydraulic timestep') def _write_options(self, f, wn, version=2.2): f.write('[OPTIONS]\n'.encode(sys_default_enc)) entry_string = '{:20s} {:20s}\n' entry_float = '{:20s} {:.11g}\n' f.write(entry_string.format('UNITS', self.flow_units.name).encode(sys_default_enc)) f.write(entry_string.format('HEADLOSS', wn.options.hydraulic.headloss).encode(sys_default_enc)) f.write(entry_float.format('SPECIFIC GRAVITY', wn.options.hydraulic.specific_gravity).encode(sys_default_enc)) f.write(entry_float.format('VISCOSITY', wn.options.hydraulic.viscosity).encode(sys_default_enc)) f.write(entry_float.format('TRIALS', wn.options.hydraulic.trials).encode(sys_default_enc)) f.write(entry_float.format('ACCURACY', wn.options.hydraulic.accuracy).encode(sys_default_enc)) f.write(entry_float.format('CHECKFREQ', wn.options.hydraulic.checkfreq).encode(sys_default_enc)) f.write(entry_float.format('MAXCHECK', wn.options.hydraulic.maxcheck).encode(sys_default_enc)) # EPANET 2.2 OPTIONS if version == 2.0: pass else: if wn.options.hydraulic.headerror != 0: f.write(entry_float.format('HEADERROR', wn.options.hydraulic.headerror).encode(sys_default_enc)) if wn.options.hydraulic.flowchange != 0: f.write(entry_float.format('FLOWCHANGE', wn.options.hydraulic.flowchange).encode(sys_default_enc)) # EPANET 2.x OPTIONS if wn.options.hydraulic.damplimit != 0: f.write(entry_float.format('DAMPLIMIT', wn.options.hydraulic.damplimit).encode(sys_default_enc)) if wn.options.hydraulic.unbalanced_value is None: f.write(entry_string.format('UNBALANCED', wn.options.hydraulic.unbalanced).encode(sys_default_enc)) else: f.write('{:20s} {:s} {:d}\n'.format('UNBALANCED', wn.options.hydraulic.unbalanced, wn.options.hydraulic.unbalanced_value).encode(sys_default_enc)) if wn.options.hydraulic.pattern is not None: f.write(entry_string.format('PATTERN', wn.options.hydraulic.pattern).encode(sys_default_enc)) f.write(entry_float.format('DEMAND MULTIPLIER', wn.options.hydraulic.demand_multiplier).encode(sys_default_enc)) # EPANET 2.2 OPTIONS if version == 2.0: if wn.options.hydraulic.demand_model in ['PDA', 'PDD']: logger.critical('You have specified a PDD analysis using EPANET 2.0. This is not supported in EPANET 2.0. The analysis will default to DD mode.') else: if wn.options.hydraulic.demand_model in ['PDA', 'PDD']: f.write('{:20s} {}\n'.format('DEMAND MODEL', wn.options.hydraulic.demand_model).encode(sys_default_enc)) minimum_pressure = from_si(self.flow_units, wn.options.hydraulic.minimum_pressure, HydParam.Pressure) f.write('{:20s} {:.2f}\n'.format('MINIMUM PRESSURE', minimum_pressure).encode(sys_default_enc)) required_pressure = from_si(self.flow_units, wn.options.hydraulic.required_pressure, HydParam.Pressure) if required_pressure >= 0.1: # EPANET lower limit on required pressure = 0.1 (in psi or m) f.write('{:20s} {:.2f}\n'.format('REQUIRED PRESSURE', required_pressure).encode(sys_default_enc)) else: warnings.warn('REQUIRED PRESSURE is below the lower limit for EPANET (0.1 in psi or m). The value has been set to 0.1 in the INP file.') logger.warning('REQUIRED PRESSURE is below the lower limit for EPANET (0.1 in psi or m). The value has been set to 0.1 in the INP file.') f.write('{:20s} {:.2f}\n'.format('REQUIRED PRESSURE', 0.1).encode(sys_default_enc)) f.write('{:20s} {}\n'.format('PRESSURE EXPONENT', wn.options.hydraulic.pressure_exponent).encode(sys_default_enc)) if wn.options.hydraulic.inpfile_pressure_units is not None: f.write(entry_string.format('PRESSURE', wn.options.hydraulic.inpfile_pressure_units).encode(sys_default_enc)) # EPANET 2.0+ OPTIONS f.write(entry_float.format('EMITTER EXPONENT', wn.options.hydraulic.emitter_exponent).encode(sys_default_enc)) if wn.options.quality.parameter.upper() in ['NONE', 'AGE']: f.write(entry_string.format('QUALITY', wn.options.quality.parameter).encode(sys_default_enc)) elif wn.options.quality.parameter.upper() in ['TRACE']: f.write('{:20s} {} {}\n'.format('QUALITY', wn.options.quality.parameter, wn.options.quality.trace_node).encode(sys_default_enc)) else: f.write('{:20s} {} {}\n'.format('QUALITY', wn.options.quality.chemical_name, wn.options.quality.inpfile_units).encode(sys_default_enc)) f.write(entry_float.format('DIFFUSIVITY', wn.options.quality.diffusivity).encode(sys_default_enc)) f.write(entry_float.format('TOLERANCE', wn.options.quality.tolerance).encode(sys_default_enc)) if wn.options.hydraulic.hydraulics is not None: f.write('{:20s} {:s} {:<30s}\n'.format('HYDRAULICS', wn.options.hydraulic.hydraulics, wn.options.hydraulic.hydraulics_filename).encode(sys_default_enc)) if wn.options.graphics.map_filename is not None: f.write(entry_string.format('MAP', wn.options.graphics.map_filename).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_times(self): opts = self.wn.options time_format = ['am', 'AM', 'pm', 'PM'] for lnum, line in self.sections['[TIMES]']: line = line.split(';')[0] current = line.split() if current == []: continue if (current[0].upper() == 'DURATION'): opts.time.duration = int(float(current[1]) * 3600) if _is_number(current[1]) else int(_str_time_to_sec(current[1])) elif (current[0].upper() == 'HYDRAULIC'): opts.time.hydraulic_timestep = int(float(current[2]) * 3600) if _is_number(current[2]) else int(_str_time_to_sec(current[2])) elif (current[0].upper() == 'QUALITY'): opts.time.quality_timestep = int(float(current[2]) * 3600) if _is_number(current[2]) else int(_str_time_to_sec(current[2])) elif (current[1].upper() == 'CLOCKTIME'): if len(current) > 3: time_format = current[3].upper() else: # Kludge for 24hr time that needs an AM/PM time_format = 'AM' time = current[2] opts.time.start_clocktime = _clock_time_to_sec(time, time_format) elif (current[0].upper() == 'STATISTIC'): opts.time.statistic = current[1].upper() else: # Other time options: RULE TIMESTEP, PATTERN TIMESTEP, REPORT TIMESTEP, REPORT START key_string = current[0] + '_' + current[1] setattr(opts.time, key_string.lower(), int(float(current[2]) * 3600) if _is_number(current[2]) else int(_str_time_to_sec(current[2]))) def _write_times(self, f, wn): f.write('[TIMES]\n'.encode(sys_default_enc)) entry = '{:20s} {:10s}\n' time_entry = '{:20s} {:02d}:{:02d}:{:02d}\n' time = wn.options.time hrs, mm, sec = _sec_to_string(time.duration) f.write(time_entry.format('DURATION', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.hydraulic_timestep) f.write(time_entry.format('HYDRAULIC TIMESTEP', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.quality_timestep) f.write(time_entry.format('QUALITY TIMESTEP', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.pattern_timestep) f.write(time_entry.format('PATTERN TIMESTEP', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.pattern_start) f.write(time_entry.format('PATTERN START', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.report_timestep) f.write(time_entry.format('REPORT TIMESTEP', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.report_start) f.write(time_entry.format('REPORT START', hrs, mm, sec).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.start_clocktime) if hrs < 12: time_format = ' AM' else: hrs -= 12 time_format = ' PM' f.write('{:20s} {:02d}:{:02d}:{:02d}{:s}\n'.format('START CLOCKTIME', hrs, mm, sec, time_format).encode(sys_default_enc)) hrs, mm, sec = _sec_to_string(time.rule_timestep) f.write(time_entry.format('RULE TIMESTEP', hrs, mm, int(sec)).encode(sys_default_enc)) f.write(entry.format('STATISTIC', wn.options.time.statistic).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_report(self): for lnum, line in self.sections['[REPORT]']: line = line.split(';')[0] current = line.split() if current == []: continue if current[0].upper() in ['PAGE', 'PAGESIZE']: self.wn.options.report.pagesize = int(current[1]) elif current[0].upper() in ['FILE']: self.wn.options.report.file = current[1] elif current[0].upper() in ['STATUS']: self.wn.options.report.status = current[1].upper() elif current[0].upper() in ['SUMMARY']: self.wn.options.report.summary = current[1].upper() elif current[0].upper() in ['ENERGY']: self.wn.options.report.energy = current[1].upper() elif current[0].upper() in ['NODES']: if current[1].upper() in ['NONE']: self.wn.options.report.nodes = False elif current[1].upper() in ['ALL']: self.wn.options.report.nodes = True elif not isinstance(self.wn.options.report.nodes, list): self.wn.options.report.nodes = [] for ct in range(len(current)-2): i = ct + 2 self.wn.options.report.nodes.append(current[i]) else: for ct in range(len(current)-2): i = ct + 2 self.wn.options.report.nodes.append(current[i]) elif current[0].upper() in ['LINKS']: if current[1].upper() in ['NONE']: self.wn.options.report.links = False elif current[1].upper() in ['ALL']: self.wn.options.report.links = True elif not isinstance(self.wn.options.report.links, list): self.wn.options.report.links = [] for ct in range(len(current)-2): i = ct + 2 self.wn.options.report.links.append(current[i]) else: for ct in range(len(current)-2): i = ct + 2 self.wn.options.report.links.append(current[i]) else: if current[0].lower() not in self.wn.options.report.report_params.keys(): logger.warning('Unknown report parameter: %s', current[0]) continue elif current[1].upper() in ['YES']: self.wn.options.report.report_params[current[0].lower()] = True elif current[1].upper() in ['NO']: self.wn.options.report.report_params[current[0].lower()] = False else: self.wn.options.report.param_opts[current[0].lower()][current[1].upper()] = float(current[2]) def _write_report(self, f, wn): f.write('[REPORT]\n'.encode(sys_default_enc)) report = wn.options.report if report.status.upper() != 'NO': f.write('STATUS {}\n'.format(report.status).encode(sys_default_enc)) if report.summary.upper() != 'YES': f.write('SUMMARY {}\n'.format(report.summary).encode(sys_default_enc)) if report.pagesize is not None: f.write('PAGE {}\n'.format(report.pagesize).encode(sys_default_enc)) if report.report_filename is not None: f.write('FILE {}\n'.format(report.report_filename).encode(sys_default_enc)) if report.energy.upper() != 'NO': f.write('ENERGY {}\n'.format(report.status).encode(sys_default_enc)) if report.nodes is True: f.write('NODES ALL\n'.encode(sys_default_enc)) elif isinstance(report.nodes, str): f.write('NODES {}\n'.format(report.nodes).encode(sys_default_enc)) elif isinstance(report.nodes, list): for ct, node in enumerate(report.nodes): if ct == 0: f.write('NODES {}'.format(node).encode(sys_default_enc)) elif ct % 10 == 0: f.write('\nNODES {}'.format(node).encode(sys_default_enc)) else: f.write(' {}'.format(node).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) if report.links is True: f.write('LINKS ALL\n'.encode(sys_default_enc)) elif isinstance(report.links, str): f.write('LINKS {}\n'.format(report.links).encode(sys_default_enc)) elif isinstance(report.links, list): for ct, link in enumerate(report.links): if ct == 0: f.write('LINKS {}'.format(link).encode(sys_default_enc)) elif ct % 10 == 0: f.write('\nLINKS {}'.format(link).encode(sys_default_enc)) else: f.write(' {}'.format(link).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) # FIXME: defaults no longer located here # for key, item in report.report_params.items(): # if item[1] != item[0]: # f.write('{:10s} {}\n'.format(key.upper(), item[1]).encode(sys_default_enc)) for key, item in report.param_opts.items(): for opt, val in item.items(): f.write('{:10s} {:10s} {}\n'.format(key.upper(), opt.upper(), val).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) ### Network Map/Tags def _read_coordinates(self): for lnum, line in self.sections['[COORDINATES]']: line = line.split(';')[0] current = line.split() if current == []: continue # assert(len(current) == 3), ("Error reading node coordinates. Check format.") node = self.wn.get_node(current[0]) node.coordinates = (float(current[1]), float(current[2])) def _write_coordinates(self, f, wn): f.write('[COORDINATES]\n'.encode(sys_default_enc)) entry = '{:10s} {:20.9f} {:20.9f}\n' label = '{:10s} {:10s} {:10s}\n' f.write(label.format(';Node', 'X-Coord', 'Y-Coord').encode(sys_default_enc)) for name, node in wn.nodes(): val = node.coordinates f.write(entry.format(name, val[0], val[1]).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_vertices(self): for lnum, line in self.sections['[VERTICES]']: line = line.split(';')[0].strip() current = line.split() if current == []: continue if len(current) != 3: logger.warning('Invalid VERTICES line: %s', line) continue link_name = current[0] link = self.wn.get_link(link_name) link._vertices.append((float(current[1]), float(current[2]))) def _write_vertices(self, f, wn): f.write('[VERTICES]\n'.encode(sys_default_enc)) entry = '{:10s} {:20.9f} {:20.9f}\n' label = '{:10s} {:10s} {:10s}\n' f.write(label.format(';Link', 'X-Coord', 'Y-Coord').encode(sys_default_enc)) for name, link in wn.links(): for vert in link._vertices: f.write(entry.format(name, vert[0], vert[1]).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_labels(self): labels = [] for lnum, line in self.sections['[LABELS]']: line = line.split(';')[0].strip() current = line.split() if current == []: continue labels.append(line) self.wn._labels = labels def _write_labels(self, f, wn): f.write('[LABELS]\n'.encode(sys_default_enc)) if wn._labels is not None: for label in wn._labels: f.write(' {}\n'.format(label).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_backdrop(self): for lnum, line in self.sections['[BACKDROP]']: line = line.split(';')[0] current = line.split() if current == []: continue key = current[0].upper() if key == 'DIMENSIONS' and len(current) > 4: self.wn.options.graphics.dimensions = [current[1], current[2], current[3], current[4]] elif key == 'UNITS' and len(current) > 1: self.wn.options.graphics.units = current[1] elif key == 'FILE' and len(current) > 1: self.wn.options.graphics.image_filename = current[1] elif key == 'OFFSET' and len(current) > 2: self.wn.options.graphics.offset = [current[1], current[2]] def _write_backdrop(self, f, wn): if wn.options.graphics is not None: f.write('[BACKDROP]\n'.encode(sys_default_enc)) if wn.options.graphics.dimensions is not None: f.write('DIMENSIONS {0} {1} {2} {3}\n'.format(wn.options.graphics.dimensions[0], wn.options.graphics.dimensions[1], wn.options.graphics.dimensions[2], wn.options.graphics.dimensions[3]).encode(sys_default_enc)) if wn.options.graphics.units is not None: f.write('UNITS {0}\n'.format(wn.options.graphics.units).encode(sys_default_enc)) if wn.options.graphics.image_filename is not None: f.write('FILE {0}\n'.format(wn.options.graphics.image_filename).encode(sys_default_enc)) if wn.options.graphics.offset is not None: f.write('OFFSET {0} {1}\n'.format(wn.options.graphics.offset[0], wn.options.graphics.offset[1]).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) def _read_tags(self): for lnum, line in self.sections['[TAGS]']: line = line.split(';')[0] current = line.split() if current == []: continue if current[0] == 'NODE': node = self.wn.get_node(current[1]) node.tag = current[2] elif current[0] == 'LINK': link = self.wn.get_link(current[1]) link.tag = current[2] else: continue def _write_tags(self, f, wn): f.write('[TAGS]\n'.encode(sys_default_enc)) entry = '{:10s} {:10s} {:10s}\n' label = '{:10s} {:10s} {:10s}\n' f.write(label.format(';type', 'name', 'tag').encode(sys_default_enc)) nnodes = list(wn.node_name_list) # nnodes.sort() for node_name in nnodes: node = wn.nodes[node_name] if node.tag: f.write(entry.format('NODE', node_name, node.tag).encode(sys_default_enc)) nlinks = list(wn.link_name_list) nlinks.sort() for link_name in nlinks: link = wn.links[link_name] if link.tag: f.write(entry.format('LINK', link_name, link.tag).encode(sys_default_enc)) f.write('\n'.encode(sys_default_enc)) ### End of File def _read_end(self): """Finalize read by verifying that all curves have been dealt with""" def create_curve(curve_name): curve_points = [] if curve_name not in self.wn.curve_name_list or self.wn.get_curve(curve_name) is None: for point in self.curves[curve_name]: x = point[0] y = point[1] curve_points.append((x,y)) self.wn.add_curve(curve_name, None, curve_points) curve = self.wn.get_curve(curve_name) return curve curve_name_list = self.wn.curve_name_list for name, curvedata in self.curves.items(): if name not in curve_name_list or self.wn.get_curve(name) is None: warnings.warn('Not all curves were used in "{}"; added with type None, units conversion left to user'.format(self.wn.name)) logger.warning('Curve was not used: "{}"; saved as curve type None and unit conversion not performed'.format(name)) create_curve(name) def _write_end(self, f, wn): f.write('[END]\n'.encode(sys_default_enc))
class _EpanetRule(object): """contains the text for an EPANET rule""" def __init__(self, ruleID, inp_units=None, mass_units=None): self.inp_units = inp_units self.mass_units = mass_units self.ruleID = ruleID self._if_clauses = [] self._then_clauses = [] self._else_clauses = [] self.priority = 0 @classmethod def parse_rules_lines(cls, lines, flow_units=FlowUnits.SI, mass_units=MassUnits.mg) -> list: rules = list() rule = None in_if = False in_then = False in_else = False new_lines = list() new_line = list() for line in lines: if isinstance(line, (tuple, list)): line = line[1] line = line.split(';')[0] words = line.strip().split() for word in words: if word.upper() in ['RULE', 'IF', 'THEN', 'ELSE', 'AND', 'OR', 'PRIORITY']: if len(new_line) > 0: text = ' '.join(new_line) new_lines.append(text) new_line = list() new_line.append(word) if len(new_line) > 0: text = ' '.join(new_line) new_lines.append(text) for line in new_lines: words = line.split() if words == []: continue if len(words) == 0: continue if words[0].upper() == 'RULE': if rule is not None: rules.append(rule) rule = _EpanetRule(words[1], flow_units, mass_units) in_if = False in_then = False in_else = False elif words[0].upper() == 'IF': in_if = True in_then = False in_else = False rule.add_if(line) elif words[0].upper() == 'THEN': in_if = False in_then = True in_else = False rule.add_then(line) elif words[0].upper() == 'ELSE': in_if = False in_then = False in_else = True rule.add_else(line) elif words[0].upper() == 'PRIORITY': in_if = False in_then = False in_else = False rule.set_priority(words[1]) elif in_if: rule.add_if(line) elif in_then: rule.add_then(line) elif in_else: rule.add_else(line) else: continue if rule is not None: rules.append(rule) return rules def from_if_then_else(self, control): """Create a rule from a Rule object""" if isinstance(control, Rule): self.ruleID = control.name self.add_control_condition(control._condition) for ct, action in enumerate(control._then_actions): if ct == 0: self.add_action_on_true(action) else: self.add_action_on_true(action, ' AND') for ct, action in enumerate(control._else_actions): if ct == 0: self.add_action_on_false(action) else: self.add_action_on_false(action, ' AND') self.set_priority(control._priority) else: raise ValueError('Invalid control type for rules: %s'%control.__class__.__name__) def add_if(self, clause): """Add an "if/and/or" clause from an INP file""" self._if_clauses.append(clause) def add_control_condition(self, condition, prefix=' IF'): """Add a ControlCondition from an IfThenElseControl""" if isinstance(condition, OrCondition): self.add_control_condition(condition._condition_1, prefix) self.add_control_condition(condition._condition_2, ' OR') elif isinstance(condition, AndCondition): self.add_control_condition(condition._condition_1, prefix) self.add_control_condition(condition._condition_2, ' AND') elif isinstance(condition, TimeOfDayCondition): fmt = '{} SYSTEM CLOCKTIME {} {}' clause = fmt.format(prefix, condition._relation.text, condition._sec_to_clock(condition._threshold)) self.add_if(clause) elif isinstance(condition, SimTimeCondition): fmt = '{} SYSTEM TIME {} {}' clause = fmt.format(prefix, condition._relation.text, condition._sec_to_hours_min_sec(condition._threshold)) self.add_if(clause) elif isinstance(condition, ValueCondition): fmt = '{} {} {} {} {} {}' # CONJ, TYPE, ID, ATTRIBUTE, RELATION, THRESHOLD attr = condition._source_attr val_si = condition._repr_value(attr, condition._threshold) if attr.lower() in ['demand']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Demand)) elif attr.lower() in ['head', 'level']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.HydraulicHead)) elif attr.lower() in ['flow']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Flow)) elif attr.lower() in ['pressure']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Pressure)) elif attr.lower() in ['setting']: if isinstance(condition._source_obj, Valve): if condition._source_obj.valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = from_si(self.inp_units, val_si, HydParam.Pressure) elif condition._source_obj.valve_type.upper() in ['FCV']: value = from_si(self.inp_units, val_si, HydParam.Flow) else: value = val_si else: value = val_si value = '{:.6g}'.format(value) else: # status value = val_si if isinstance(condition._source_obj, Valve): cls = 'Valve' elif isinstance(condition._source_obj, Pump): cls = 'Pump' else: cls = condition._source_obj.__class__.__name__ clause = fmt.format(prefix, cls, condition._source_obj.name, condition._source_attr, condition._relation.symbol, value) self.add_if(clause) else: raise ValueError('Unknown ControlCondition for EPANET Rules') def add_then(self, clause): """Add a "then/and" clause from an INP file""" self._then_clauses.append(clause) def add_action_on_true(self, action, prefix=' THEN'): """Add a "then" action from an IfThenElseControl""" if isinstance(action, ControlAction): fmt = '{} {} {} {} = {}' attr = action._attribute val_si = action._repr_value() if attr.lower() in ['demand']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Demand)) elif attr.lower() in ['head', 'level']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.HydraulicHead)) elif attr.lower() in ['flow']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Flow)) elif attr.lower() in ['pressure']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Pressure)) elif attr.lower() in ['setting']: if isinstance(action.target()[0], Valve): if action.target()[0].valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = from_si(self.inp_units, val_si, HydParam.Pressure) elif action.target()[0].valve_type.upper() in ['FCV']: value = from_si(self.inp_units, val_si, HydParam.Flow) else: value = val_si else: value = val_si value = '{:.6g}'.format(value) else: # status value = val_si if isinstance(action.target()[0], Valve): cls = 'Valve' elif isinstance(action.target()[0], Pump): cls = 'Pump' else: cls = action.target()[0].__class__.__name__ clause = fmt.format(prefix, cls, action.target()[0].name, action.target()[1], value) self.add_then(clause) def add_else(self, clause): """Add an "else/and" clause from an INP file""" self._else_clauses.append(clause) def add_action_on_false(self, action, prefix=' ELSE'): """Add an "else" action from an IfThenElseControl""" if isinstance(action, ControlAction): fmt = '{} {} {} {} = {}' attr = action._attribute val_si = action._repr_value() if attr.lower() in ['demand']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Demand)) elif attr.lower() in ['head', 'level']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.HydraulicHead)) elif attr.lower() in ['flow']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Flow)) elif attr.lower() in ['pressure']: value = '{:.6g}'.format(from_si(self.inp_units, val_si, HydParam.Pressure)) elif attr.lower() in ['setting']: if isinstance(action.target()[0], Valve): if action.target()[0].valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = from_si(self.inp_units, val_si, HydParam.Pressure) elif action.target()[0].valve_type.upper() in ['FCV']: value = from_si(self.inp_units, val_si, HydParam.Flow) else: value = val_si else: value = val_si value = '{:.6g}'.format(value) else: # status value = val_si if isinstance(action.target()[0], Valve): cls = 'Valve' elif isinstance(action.target()[0], Pump): cls = 'Pump' else: cls = action.target()[0].__class__.__name__ clause = fmt.format(prefix, cls, action.target()[0].name, action.target()[1], value) self.add_else(clause) def set_priority(self, priority): self.priority = int(float(priority)) def __str__(self): if self.priority >= 0: if len(self._else_clauses) > 0: return 'RULE {}\n{}\n{}\n{}\n PRIORITY {}\n ; end of rule\n'.format(self.ruleID, '\n'.join(self._if_clauses), '\n'.join(self._then_clauses), '\n'.join(self._else_clauses), self.priority) else: return 'RULE {}\n{}\n{}\n PRIORITY {}\n ; end of rule\n'.format(self.ruleID, '\n'.join(self._if_clauses), '\n'.join(self._then_clauses), self.priority) else: if len(self._else_clauses) > 0: return 'RULE {}\n{}\n{}\n{}\n ; end of rule\n'.format(self.ruleID, '\n'.join(self._if_clauses), '\n'.join(self._then_clauses), '\n'.join(self._else_clauses)) else: return 'RULE {}\n{}\n{}\n ; end of rule\n'.format(self.ruleID, '\n'.join(self._if_clauses), '\n'.join(self._then_clauses)) def generate_control(self, model): condition_list = [] for line in self._if_clauses: condition = None words = line.split() if words[1].upper() == 'SYSTEM': if words[2].upper() == 'DEMAND': ### TODO: system demand pass elif words[2].upper() == 'TIME': condition = SimTimeCondition(model, words[3], ' '.join(words[4:])) else: condition = TimeOfDayCondition(model, words[3], ' '.join(words[4:])) else: attr = words[3].lower() value = ValueCondition._parse_value(words[5]) if attr.lower() in ['demand']: value = to_si(self.inp_units, value, HydParam.Demand) elif attr.lower() in ['head']: value = to_si(self.inp_units, value, HydParam.HydraulicHead) elif attr.lower() in ['level']: value = to_si(self.inp_units, value, HydParam.HydraulicHead) elif attr.lower() in ['flow']: value = to_si(self.inp_units, value, HydParam.Flow) elif attr.lower() in ['pressure']: value = to_si(self.inp_units, value, HydParam.Pressure) elif attr.lower() in ['setting']: link = model.get_link(words[2]) if isinstance(link, wntr.network.Pump): value = value elif isinstance(link, wntr.network.Valve): if link.valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = to_si(self.inp_units, value, HydParam.Pressure) elif link.valve_type.upper() in ['FCV']: value = to_si(self.inp_units, value, HydParam.Flow) if words[1].upper() in ['NODE', 'JUNCTION', 'RESERVOIR', 'TANK']: condition = ValueCondition(model.get_node(words[2]), words[3].lower(), words[4].lower(), value) elif words[1].upper() in ['LINK', 'PIPE', 'PUMP', 'VALVE']: condition = ValueCondition(model.get_link(words[2]), words[3].lower(), words[4].lower(), value) else: ### FIXME: raise error pass if words[0].upper() == 'IF': condition_list.append(condition) elif words[0].upper() == 'AND': condition_list.append(condition) elif words[0].upper() == 'OR': if len(condition_list) > 0: other = condition_list[-1] condition_list.remove(other) else: ### FIXME: raise error pass conj = OrCondition(other, condition) condition_list.append(conj) final_condition = None for condition in condition_list: if final_condition is None: final_condition = condition else: final_condition = AndCondition(final_condition, condition) then_acts = [] for act in self._then_clauses: words = act.strip().split() if len(words) < 6: # TODO: raise error pass link = model.get_link(words[2]) attr = words[3].lower() value = ValueCondition._parse_value(words[5]) if attr.lower() in ['demand']: value = to_si(self.inp_units, value, HydParam.Demand) elif attr.lower() in ['head', 'level']: value = to_si(self.inp_units, value, HydParam.HydraulicHead) elif attr.lower() in ['flow']: value = to_si(self.inp_units, value, HydParam.Flow) elif attr.lower() in ['pressure']: value = to_si(self.inp_units, value, HydParam.Pressure) elif attr.lower() in ['setting']: if isinstance(link, Valve): if link.valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = to_si(self.inp_units, value, HydParam.Pressure) elif link.valve_type.upper() in ['FCV']: value = to_si(self.inp_units, value, HydParam.Flow) then_acts.append(ControlAction(link, attr, value)) else_acts = [] for act in self._else_clauses: words = act.strip().split() if len(words) < 6: # TODO: raise error pass link = model.get_link(words[2]) attr = words[3].lower() value = ValueCondition._parse_value(words[5]) if attr.lower() in ['demand']: value = to_si(self.inp_units, value, HydParam.Demand) elif attr.lower() in ['head', 'level']: value = to_si(self.inp_units, value, HydParam.HydraulicHead) elif attr.lower() in ['flow']: value = to_si(self.inp_units, value, HydParam.Flow) elif attr.lower() in ['pressure']: value = to_si(self.inp_units, value, HydParam.Pressure) elif attr.lower() in ['setting']: if isinstance(link, Valve): if link.valve_type.upper() in ['PRV', 'PBV', 'PSV']: value = to_si(self.inp_units, value, HydParam.Pressure) elif link.valve_type.upper() in ['FCV']: value = to_si(self.inp_units, value, HydParam.Flow) else_acts.append(ControlAction(link, attr, value)) return Rule(final_condition, then_acts, else_acts, priority=self.priority, name=self.ruleID)
[docs] class BinFile(object): """EPANET binary output file reader. This class provides read functionality for EPANET binary output files. Parameters ---------- results_type : list of ResultType, optional This parameter is *only* active when using a subclass of the BinFile that implements a custom reader or writer, by default None. If None, then all results will be saved (node quality, demand, link flow, etc.). Otherwise, a list of result types can be passed to limit the memory used. network : bool, optional Save a new WaterNetworkModel from the description in the output binary file, by default None. Certain elements may be missing, such as patterns and curves, if this is done. energy : bool, optional Save the pump energy results, by default False. statistics : bool, optional Save the statistics lines (different from the stats flag in the inp file) that are automatically calculated regarding hydraulic conditions, by default False. convert_status : bool, optional Convert the EPANET link status (8 values) to simpler WNTR status (3 values), by default True. When this is done, the encoded-cause status values are converted simple stat values, instead. Returns ------- SimulationResults A WNTR results object will be created and added to the instance after read. """
[docs] def __init__(self, result_types=None, network=False, energy=False, statistics=False, convert_status=True): if os.name in ['nt', 'dos'] or sys.platform in ['darwin']: self.ftype = '=f4' else: self.ftype = '=f4' self.idlen = 32 self.convert_status = convert_status self.hydraulic_id = None self.quality_id = None self.node_names = None self.link_names = None self.report_times = None self.flow_units = None self.pres_units = None self.mass_units = None self.quality_type = None self.num_nodes = None self.num_tanks = None self.num_links = None self.num_pumps = None self.num_valves = None self.report_start = None self.report_step = None self.duration = None self.chemical = None self.chem_units = None self.inp_file = None self.report_file = None self.results = wntr.sim.SimulationResults() if result_types is None: self.items = [ member for name, member in ResultType.__members__.items() ] else: self.items = result_types self.create_network = network self.keep_energy = energy self.keep_statistics = statistics
def _get_time(self, t): s = int(t) h = int(s/3600) s -= h*3600 m = int(s/60) s -= m*60 s = int(s) return '{:02}:{:02}:{:02}'.format(h, m, s)
[docs] def save_network_desc_line(self, element, values): """Save network description meta-data and element characteristics. This method, by default, does nothing. It is available to be overloaded, but the core implementation assumes that an INP file exists that will have a better, human readable network description. Parameters ---------- element : str The information being saved values : numpy.array The values that go with the information """ pass
[docs] def save_energy_line(self, pump_idx, pump_name, values): """Save pump energy from the output file. This method, by default, does nothing. It is available to be overloaded in order to save information for pump energy calculations. Parameters ---------- pump_idx : int the pump index pump_name : str the pump name values : numpy.array the values to save """ pass
[docs] def finalize_save(self, good_read, sim_warnings): """Post-process data before writing results. This method, by default, does nothing. It is available to be overloaded in order to post process data. Parameters ---------- good_read : bool was the full file read correctly sim_warnings : int were there warnings issued during the simulation """ pass
# @run_lineprofile()
[docs] def read(self, filename, convergence_error=False, darcy_weisbach=False, convert=True): """Read a binary file and create a results object. Parameters ---------- filename : str An EPANET BIN output file convergence_error: bool (optional) If convergence_error is True, an error will be raised if the simulation does not converge. If convergence_error is False, partial results are returned, a warning will be issued, and results.error_code will be set to 0 if the simulation does not converge. Default = False. Returns ------- object returns a WaterNetworkResults object """ self.results = wntr.sim.SimulationResults() logger.debug('Read binary EPANET data from %s',filename) dt_str = 'u1' #.format(self.idlen) with open(filename, 'rb') as fin: ftype = self.ftype idlen = self.idlen logger.debug('... read prolog information ...') prolog = np.fromfile(fin, dtype=np.int32, count=15) magic1 = prolog[0] version = prolog[1] nnodes = prolog[2] ntanks = prolog[3] nlinks = prolog[4] npumps = prolog[5] nvalve = prolog[6] wqopt = QualType(prolog[7]) srctrace = prolog[8] flowunits = FlowUnits(prolog[9]) presunits = PressureUnits(prolog[10]) statsflag = StatisticsType(prolog[11]) reportstart = prolog[12] reportstep = prolog[13] duration = prolog[14] logger.debug('EPANET/Toolkit version %d',version) logger.debug('Nodes: %d; Tanks/Resrv: %d Links: %d; Pumps: %d; Valves: %d', nnodes, ntanks, nlinks, npumps, nvalve) logger.debug('WQ opt: %s; Trace Node: %s; Flow Units %s; Pressure Units %s', wqopt, srctrace, flowunits, presunits) logger.debug('Statistics: %s; Report Start %d, step %d; Duration=%d sec', statsflag, reportstart, reportstep, duration) # Ignore the title lines np.fromfile(fin, dtype=np.uint8, count=240) inpfile = np.fromfile(fin, dtype=np.uint8, count=260) rptfile = np.fromfile(fin, dtype=np.uint8, count=260) chemical = bytes(np.fromfile(fin, dtype=dt_str, count=self.idlen)[:]).decode(sys_default_enc) # wqunits = ''.join([chr(f) for f in np.fromfile(fin, dtype=np.uint8, count=idlen) if f!=0 ]) wqunits = bytes(np.fromfile(fin, dtype=dt_str, count=self.idlen)[:]).decode(sys_default_enc) mass = wqunits.split('/',1)[0] if mass in ['mg', 'ug', u'mg', u'ug']: massunits = MassUnits[mass] else: massunits = MassUnits.mg self.flow_units = flowunits self.pres_units = presunits self.quality_type = wqopt self.mass_units = massunits self.num_nodes = nnodes self.num_tanks = ntanks self.num_links = nlinks self.num_pumps = npumps self.num_valves = nvalve self.report_start = reportstart self.report_step = reportstep self.duration = duration self.chemical = chemical self.chem_units = wqunits self.inp_file = inpfile self.report_file = rptfile nodenames = [] linknames = [] nodenames = [bytes(np.fromfile(fin, dtype=dt_str, count=self.idlen)).decode(sys_default_enc).replace('\x00','') for _ in range(nnodes)] linknames = [bytes(np.fromfile(fin, dtype=dt_str, count=self.idlen)).decode(sys_default_enc).replace('\x00','') for _ in range(nlinks)] self.node_names = np.array(nodenames) self.link_names = np.array(linknames) linkstart = np.array(np.fromfile(fin, dtype=np.int32, count=nlinks), dtype=int) linkend = np.array(np.fromfile(fin, dtype=np.int32, count=nlinks), dtype=int) linktype = np.fromfile(fin, dtype=np.int32, count=nlinks) tankidxs = np.fromfile(fin, dtype=np.int32, count=ntanks) tankarea = np.fromfile(fin, dtype=np.dtype(ftype), count=ntanks) elevation = np.fromfile(fin, dtype=np.dtype(ftype), count=nnodes) linklen = np.fromfile(fin, dtype=np.dtype(ftype), count=nlinks) diameter = np.fromfile(fin, dtype=np.dtype(ftype), count=nlinks) """ self.save_network_desc_line('link_start', linkstart) self.save_network_desc_line('link_end', linkend) self.save_network_desc_line('link_type', linktype) self.save_network_desc_line('tank_node_index', tankidxs) self.save_network_desc_line('tank_area', tankarea) self.save_network_desc_line('node_elevation', elevation) self.save_network_desc_line('link_length', linklen) self.save_network_desc_line('link_diameter', diameter) """ logger.debug('... read energy data ...') for i in range(npumps): pidx = int(np.fromfile(fin,dtype=np.int32, count=1)[0]) energy = np.fromfile(fin, dtype=np.dtype(ftype), count=6) self.save_energy_line(pidx, linknames[pidx-1], energy) peakenergy = np.fromfile(fin, dtype=np.dtype(ftype), count=1) self.peak_energy = peakenergy logger.debug('... read EP simulation data ...') reporttimes = np.arange(reportstart, duration+reportstep-(duration%reportstep), reportstep) nrptsteps = len(reporttimes) statsN = nrptsteps if statsflag in [StatisticsType.Maximum, StatisticsType.Minimum, StatisticsType.Range]: nrptsteps = 1 reporttimes = [reportstart + reportstep] self.num_periods = nrptsteps self.report_times = reporttimes # set up results metadata dictionary """ if wqopt == QualType.Age: self.results.meta['quality_mode'] = 'AGE' self.results.meta['quality_units'] = 's' elif wqopt == QualType.Trace: self.results.meta['quality_mode'] = 'TRACE' self.results.meta['quality_units'] = '%' self.results.meta['quality_trace'] = srctrace elif wqopt == QualType.Chem: self.results.meta['quality_mode'] = 'CHEMICAL' self.results.meta['quality_units'] = wqunits self.results.meta['quality_chem'] = chemical self.results.time = reporttimes self.save_network_desc_line('report_times', reporttimes) self.save_network_desc_line('node_elevation', pd.Series(data=elevation, index=nodenames)) self.save_network_desc_line('link_length', pd.Series(data=linklen, index=linknames)) self.save_network_desc_line('link_diameter', pd.Series(data=diameter, index=linknames)) self.save_network_desc_line('stats_mode', statsflag) self.save_network_desc_line('stats_N', statsN) nodetypes = np.array(['Junction']*self.num_nodes, dtype='|S10') nodetypes[tankidxs-1] = 'Tank' nodetypes[tankidxs[tankarea==0]-1] = 'Reservoir' linktypes = np.array(['Pipe']*self.num_links) linktypes[ linktype == EN.PUMP ] = 'Pump' linktypes[ linktype > EN.PUMP ] = 'Valve' self.save_network_desc_line('link_type', pd.Series(data=linktypes, index=linknames, copy=True)) linktypes[ linktype == EN.CVPIPE ] = 'CV' linktypes[ linktype == EN.FCV ] = 'FCV' linktypes[ linktype == EN.PRV ] = 'PRV' linktypes[ linktype == EN.PSV ] = 'PSV' linktypes[ linktype == EN.PBV ] = 'PBV' linktypes[ linktype == EN.TCV ] = 'TCV' linktypes[ linktype == EN.GPV ] = 'GPV' self.save_network_desc_line('link_subtype', pd.Series(data=linktypes, index=linknames, copy=True)) self.save_network_desc_line('node_type', pd.Series(data=nodetypes, index=nodenames, copy=True)) self.save_network_desc_line('node_names', np.array(nodenames, dtype=str)) self.save_network_desc_line('link_names', np.array(linknames, dtype=str)) names = np.array(nodenames, dtype=str) self.save_network_desc_line('link_start', pd.Series(data=names[linkstart-1], index=linknames, copy=True)) self.save_network_desc_line('link_end', pd.Series(data=names[linkend-1], index=linknames, copy=True)) """ # type_list = 4*nnodes*['node'] + 8*nlinks*['link'] name_list = nodenames*4 + linknames*8 valuetype = nnodes*['demand']+nnodes*['head']+nnodes*['pressure']+nnodes*['quality'] + nlinks*['flow']+nlinks*['velocity']+nlinks*['headloss']+nlinks*['linkquality']+nlinks*['linkstatus']+nlinks*['linksetting']+nlinks*['reactionrate']+nlinks*['frictionfactor'] # tuples = zip(type_list, valuetype, name_list) tuples = list(zip(valuetype, name_list)) # tuples = [(valuetype[i], v) for i, v in enumerate(name_list)] index = pd.MultiIndex.from_tuples(tuples, names=['value','name']) try: data = np.fromfile(fin, dtype = np.dtype(ftype), count = (4*nnodes+8*nlinks)*nrptsteps) except Exception as e: logger.exception('Failed to process file: %s', e) N = int(np.floor(len(data)/(4*nnodes+8*nlinks))) if N < nrptsteps: t = reporttimes[N] if convergence_error: logger.error('Simulation did not converge at time ' + self._get_time(t) + '.') raise RuntimeError('Simulation did not converge at time ' + self._get_time(t) + '.') else: data = data[0:N*(4*nnodes+8*nlinks)] data = np.reshape(data, (N, (4*nnodes+8*nlinks))) reporttimes = reporttimes[0:N] warnings.warn('Simulation did not converge at time ' + self._get_time(t) + '.') self.results.error_code = wntr.sim.results.ResultsStatus.error else: data = np.reshape(data, (nrptsteps, (4*nnodes+8*nlinks))) self.results.error_code = None df = pd.DataFrame(data.transpose(), index =index, columns = reporttimes) df = df.transpose() self.results.node = {} self.results.link = {} self.results.network_name = self.inp_file if convert: # Node Results self.results.node['demand'] = HydParam.Demand._to_si(self.flow_units, df['demand']) self.results.node['head'] = HydParam.HydraulicHead._to_si(self.flow_units, df['head']) self.results.node['pressure'] = HydParam.Pressure._to_si(self.flow_units, df['pressure']) # Water Quality Results (node and link) if self.quality_type is QualType.Chem: self.results.node['quality'] = QualParam.Concentration._to_si(self.flow_units, df['quality'], mass_units=self.mass_units) self.results.link['quality'] = QualParam.Concentration._to_si(self.flow_units, df['linkquality'], mass_units=self.mass_units) elif self.quality_type is QualType.Age: self.results.node['quality'] = QualParam.WaterAge._to_si(self.flow_units, df['quality'], mass_units=self.mass_units) self.results.link['quality'] = QualParam.WaterAge._to_si(self.flow_units, df['linkquality'], mass_units=self.mass_units) else: self.results.node['quality'] = df['quality'] self.results.link['quality'] = df['linkquality'] # Link Results self.results.link['flowrate'] = HydParam.Flow._to_si(self.flow_units, df['flow']) self.results.link['velocity'] = HydParam.Velocity._to_si(self.flow_units, df['velocity']) headloss = np.array(df['headloss']) headloss[:, linktype < 2] = to_si(self.flow_units, headloss[:, linktype < 2], HydParam.HeadLoss) # Pipe or CV headloss[:, linktype >= 2] = to_si(self.flow_units, headloss[:, linktype >= 2], HydParam.Length) # Pump or Valve self.results.link["headloss"] = pd.DataFrame(data=headloss, columns=linknames, index=reporttimes) status = np.array(df['linkstatus']) if self.convert_status: status[status <= 2] = 0 status[status == 3] = 1 status[status >= 5] = 1 status[status == 4] = 2 self.results.link['status'] = pd.DataFrame(data=status, columns=linknames, index=reporttimes) setting = np.array(df['linksetting']) # pump setting is relative speed (unitless) setting[:, linktype == EN.PIPE] = to_si(self.flow_units, setting[:, linktype == EN.PIPE], HydParam.RoughnessCoeff, darcy_weisbach=darcy_weisbach) setting[:, linktype == EN.PRV] = to_si(self.flow_units, setting[:, linktype == EN.PRV], HydParam.Pressure) setting[:, linktype == EN.PSV] = to_si(self.flow_units, setting[:, linktype == EN.PSV], HydParam.Pressure) setting[:, linktype == EN.PBV] = to_si(self.flow_units, setting[:, linktype == EN.PBV], HydParam.Pressure) setting[:, linktype == EN.FCV] = to_si(self.flow_units, setting[:, linktype == EN.FCV], HydParam.Flow) self.results.link['setting'] = pd.DataFrame(data=setting, columns=linknames, index=reporttimes) self.results.link['friction_factor'] = df['frictionfactor'] self.results.link['reaction_rate'] = QualParam.ReactionRate._to_si(self.flow_units, df['reactionrate'],self.mass_units) else: self.results.node['demand'] = df['demand'] self.results.node['head'] = df['head'] self.results.node['pressure'] = df['pressure'] self.results.node['quality'] = df['quality'] self.results.link['flowrate'] = df['flow'] self.results.link['headloss'] = df['headloss'] self.results.link['velocity'] = df['velocity'] self.results.link['quality'] = df['linkquality'] self.results.link['status'] = df['linkstatus'] self.results.link['setting'] = df['linksetting'] self.results.link['friction_factor'] = df['frictionfactor'] self.results.link['reaction_rate'] = df['reactionrate'] logger.debug('... read epilog ...') # Read the averages and then the number of periods for checks averages = np.fromfile(fin, dtype=np.dtype(ftype), count=4) self.averages = averages np.fromfile(fin, dtype=np.int32, count=1) warnflag = np.fromfile(fin, dtype=np.int32, count=1) magic2 = np.fromfile(fin, dtype=np.int32, count=1) if magic1 != magic2: logger.critical('The magic number did not match -- binary incomplete or incorrectly read. If you believe this file IS complete, please try a different float type. Current type is "%s"',ftype) #print numperiods, warnflag, magic if warnflag != 0: logger.warning('Warnings were issued during simulation') self.finalize_save(magic1==magic2, warnflag) return self.results
[docs] class NoSectionError(Exception): pass
class _InpFileDifferHelper(object): # pragma: no cover def __init__(self, f): """ Parameters ---------- f: str """ self._f = open(f, 'r') self._num_lines = len(self._f.readlines()) self._end = self._f.tell() self._f.seek(0) @property def f(self): return self._f def iter(self, start=0, stop=None, skip_section_headings=True): if stop is None: stop = self._end f = self.f f.seek(start) while f.tell() != stop: loc = f.tell() line = f.readline() if line.startswith(';'): continue if skip_section_headings: if line.startswith('['): continue if len(line.split()) == 0: continue line = line.split(';')[0] yield loc, line def get_section(self, sec): """ Parameters ---------- sec: str The section Returns ------- start: int The starting point in the file for sec end: int The ending point in the file for sec """ start = None end = None in_sec = False for loc, line in self.iter(0, None, skip_section_headings=False): line = line.split(';')[0] if sec in line: start = loc in_sec = True elif '[' in line: if in_sec: end = loc in_sec = False break if start is None: raise NoSectionError('Could not find section ' + sec) if end is None: end = self._end return start, end def contains_section(self, sec): """ Parameters ---------- sec: str """ try: self.get_section(sec) return True except NoSectionError: return False def _convert_line(line): # pragma: no cover """ Parameters ---------- line: str Returns ------- list """ line = line.upper().split() tmp = [] for i in line: if '.' in i: try: tmp.append(float(i)) except: tmp.append(i) else: try: tmp.append(int(i)) except: tmp.append(i) return tmp def _compare_lines(line1, line2, tol=1e-14): # pragma: no cover """ Parameters ---------- line1: list of str line2: list of str Returns ------- bool """ if len(line1) != len(line2): return False for i, a in enumerate(line1): b = line2[i] if isinstance(a, (int, float)): if a != b: return False elif isinstance(a, int) and isinstance(b, int): if a != b: return False elif isinstance(a, (int, float)) and isinstance(b, (int, float)): if abs(a - b) > tol: return False else: if a != b: return False return True def _clean_line(wn, sec, line): # pragma: no cover """ Parameters ---------- wn: wntr.network.WaterNetworkModel sec: str line: list of str Returns ------- new_list: list of str """ if sec == '[JUNCTIONS]': if len(line) == 4: other = wn.options.hydraulic.pattern if other is None: other = 1 if isinstance(line[3], int) and isinstance(other, int): other = int(other) if line[3] == other: return line[:3] return line def _read_control_line(line, wn, flow_units, control_name): """ Parameters ---------- line: str wn: wntr.network.WaterNetworkModel flow_units: str control_name: str Returns ------- control_obj: Control """ line = line.split(';')[0] current = line.split() if current == []: return element_name = current[1] # For the case of leak demands if current[0] == 'JUNCTION': element = wn.get_node(element_name) else: element = wn.get_link(element_name) if current[5].upper() != 'TIME' and current[5].upper() != 'CLOCKTIME': node_name = current[5] current = [i.upper() for i in current] current[1] = element_name # don't capitalize the link name # Create the control action object status = current[2].upper() if status == 'OPEN' or status == 'OPENED' or status == 'CLOSED' or status == 'ACTIVE': setting = LinkStatus[status].value action_obj = wntr.network.ControlAction(element, 'status', setting) else: if isinstance(element, wntr.network.Pump): action_obj = wntr.network.ControlAction(element, 'base_speed', float(current[2])) elif isinstance(element, wntr.network.Valve): if element.valve_type == 'PRV' or element.valve_type == 'PSV' or element.valve_type == 'PBV': setting = to_si(flow_units, float(current[2]), HydParam.Pressure) elif element.valve_type == 'FCV': setting = to_si(flow_units, float(current[2]), HydParam.Flow) elif element.valve_type == 'TCV': setting = float(current[2]) elif element.valve_type == 'GPV': setting = current[2] else: raise ValueError('Unrecognized valve type {0} while parsing control {1}'.format(element.valve_type, line)) action_obj = wntr.network.ControlAction(element, 'setting', setting) elif isinstance(element, wntr.network.Node): if status=="TRUE": setting = True elif status=="FALSE": setting = False action_obj = wntr.network.ControlAction(element, 'leak_status', setting) else: raise RuntimeError(('Links of type {0} can only have controls that change\n'.format(type(element))+ 'the link status. Control: {0}'.format(line))) # Create the control object #control_count += 1 #control_name = 'control '+str(control_count) if 'TIME' not in current and 'CLOCKTIME' not in current: threshold = None if 'IF' in current: node = wn.get_node(node_name) if current[6] == 'ABOVE': oper = np.greater elif current[6] == 'BELOW': oper = np.less else: raise RuntimeError("The following control is not recognized: " + line) # OKAY - we are adding in the elevation. This is A PROBLEM # IN THE INP WRITER. Now that we know, we can fix it, but # if this changes, it will affect multiple pieces, just an # FYI. if node.node_type == 'Junction': threshold = to_si(flow_units, float(current[7]), HydParam.Pressure)# + node.elevation control_obj = Control._conditional_control(node, 'pressure', oper, threshold, action_obj, control_name) elif node.node_type == 'Tank': threshold = to_si(flow_units, float(current[7]), HydParam.HydraulicHead)# + node.elevation control_obj = Control._conditional_control(node, 'level', oper, threshold, action_obj, control_name) else: raise RuntimeError("The following control is not recognized: " + line) # control_name = '' # for i in range(len(current)-1): # control_name = control_name + '/' + current[i] # control_name = control_name + '/' + str(round(threshold, 2)) else: if 'CLOCKTIME' not in current: # at time if 'TIME' not in current: raise ValueError('Unrecognized line in inp file: {0}'.format(line)) if ':' in current[5]: run_at_time = int(_str_time_to_sec(current[5])) else: run_at_time = int(float(current[5])*3600) control_obj = Control._time_control(wn, run_at_time, 'SIM_TIME', False, action_obj, control_name) # control_name = '' # for i in range(len(current)-1): # control_name = control_name + '/' + current[i] # control_name = control_name + '/' + str(run_at_time) else: # at clocktime if len(current) < 7: if ':' in current[5]: run_at_time = int(_str_time_to_sec(current[5])) else: run_at_time = int(float(current[5])*3600) else: run_at_time = int(_clock_time_to_sec(current[5], current[6])) control_obj = Control._time_control(wn, run_at_time, 'CLOCK_TIME', True, action_obj, control_name) # control_name = '' # for i in range(len(current)-1): # control_name = control_name + '/' + current[i] # control_name = control_name + '/' + str(run_at_time) return control_obj def _diff_inp_files(file1, file2=None, float_tol=1e-8, max_diff_lines_per_section=5, htmldiff_file='diff.html'): # pragma: no cover """ Parameters ---------- file1: str file2: str float_tol: float max_diff_lines_per_section: int htmldiff_file: str """ wn = InpFile().read(file1) f1 = _InpFileDifferHelper(file1) if file2 is None: file2 = 'temp.inp' wn.write_inpfile(file2) f2 = _InpFileDifferHelper(file2) different_lines_1 = [] different_lines_2 = [] n = 0 for section in _INP_SECTIONS: if not f1.contains_section(section): if f2.contains_section(section): print('\tfile1 does not contain section {0} but file2 does.'.format(section)) continue start1, stop1 = f1.get_section(section) start2, stop2 = f2.get_section(section) if section == '[PATTERNS]': new_lines_1 = [] new_lines_2 = [] label = None tmp_line = None tmp_loc = None for loc1, line1 in f1.iter(start1, stop1): tmp_label = line1.split()[0] if tmp_label != label: if label is not None: new_lines_1.append((tmp_loc, tmp_line)) tmp_loc = loc1 tmp_line = line1 label = tmp_label else: tmp_line += " " + " ".join(line1.split()[1:]) if tmp_line is not None: new_lines_1.append((tmp_loc, tmp_line)) label = None tmp_line = None tmp_loc = None for loc2, line2 in f2.iter(start2, stop2): tmp_label = line2.split()[0] if tmp_label != label: if label is not None: new_lines_2.append((tmp_loc, tmp_line)) tmp_loc = loc2 tmp_line = line2 label = tmp_label else: tmp_line += " " + " ".join(line2.split()[1:]) if tmp_line is not None: new_lines_2.append((tmp_loc, tmp_line)) else: new_lines_1 = list(f1.iter(start1, stop1)) new_lines_2 = list(f2.iter(start2, stop2)) different_lines_1.append(section) different_lines_2.append(section) if len(new_lines_1) != len(new_lines_2): assert len(different_lines_1) == len(different_lines_2) n1 = 0 n2 = 0 for loc1, line1 in new_lines_1: different_lines_1.append(line1) n1 += 1 for loc2, line2 in new_lines_2: different_lines_2.append(line2) n2 += 1 if n1 > n2: n = n1 - n2 for i in range(n): different_lines_2.append("") elif n2 > n1: n = n2 - n1 for i in range(n): different_lines_1.append("") else: raise RuntimeError('Unexpected') continue section_line_counter = 0 f2_iter = iter(new_lines_2) for loc1, line1 in new_lines_1: orig_line_1 = line1 loc2, line2 = next(f2_iter) orig_line_2 = line2 line1 = _convert_line(line1) line2 = _convert_line(line2) line1 = _clean_line(wn, section, line1) line2 = _clean_line(wn, section, line2) if not _compare_lines(line1, line2, tol=float_tol): if section_line_counter < max_diff_lines_per_section: section_line_counter = section_line_counter+1 else: break different_lines_1.append(orig_line_1) different_lines_2.append(orig_line_2) if len(different_lines_1) < 200: # If lines < 200 use difflib differ = difflib.HtmlDiff() html_diff = differ.make_file(different_lines_1, different_lines_2) else: # otherwise, create a simple html file differ_df = pd.DataFrame([different_lines_1, different_lines_2], index=[file1, file2]).transpose() html_diff = differ_df.to_html() g = open(htmldiff_file, 'w') g.write(html_diff) g.close() return n