Source code for wntr.epanet.msx.io

# coding: utf-8
"""
The wntr.epanet.msx io module contains methods for reading/writing EPANET
MSX input and output files.
"""

import datetime
import logging
import sys
from typing import Union

import numpy as np
import pandas as pd
from wntr.msx.elements import Constant, Parameter, Species, Term

import wntr.network
from wntr.epanet.msx.exceptions import EpanetMsxException, MSXValueError
from wntr.epanet.util import ENcomment
from wntr.msx.base import VariableType, SpeciesType
from wntr.msx.model import MsxModel

sys_default_enc = sys.getdefaultencoding()

logger = logging.getLogger(__name__)

MAX_LINE = 1024

_INP_SECTIONS = [
    "[TITLE]",
    "[OPTIONS]",
    "[SPECIES]",
    "[COEFFICIENTS]",
    "[TERMS]",
    "[PIPES]",
    "[TANKS]",
    "[SOURCES]",
    "[QUALITY]",
    "[PARAMETERS]",
    "[DIFFUSIVITY]",
    "[PATTERNS]",
    "[REPORT]",
]


def _split_line(line):
    _vc = line.split(";", 1)
    _cmnt = None
    _vals = None
    if len(_vc) == 0:
        pass
    elif len(_vc) == 1:
        _vals = _vc[0].split()
    elif _vc[0] == "":
        _cmnt = _vc[1].strip()
    else:
        _vals = _vc[0].split()
        _cmnt = _vc[1].strip()
    return _vals, _cmnt


[docs] class MsxFile(object): """An EPANET-MSX input file reader. .. rubric:: Class Methods .. autosummary:: :nosignatures: read write """
[docs] def __init__(self): self.rxn: MsxModel = None self.sections = dict() for sec in _INP_SECTIONS: self.sections[sec] = [] self.top_comments = [] self.patterns = dict()
[docs] @classmethod def read(cls, msx_filename: str, rxn_model: MsxModel = None): """ Read an EPANET-MSX input file (.msx) and load data into a MsxModel. Only MSX 2.0 files are recognized. Parameters ---------- msx_file : str Filename of the .msx file to read in rxn_model : MsxModel, optional Multi-species water quality model to put data into, by default None (new model) Returns ------- MsxModel Multi-species water quality model with the new species, reactions and other options added """ if rxn_model is None: rxn_model = MsxModel() obj = cls() obj.rxn = rxn_model # if not isinstance(msx_file, list): # msx_file = [msx_file] rxn_model._orig_file = msx_filename obj.patterns = dict() obj.top_comments = [] obj.sections = dict() for sec in _INP_SECTIONS: obj.sections[sec] = [] def _read(): section = None lnum = 0 edata = {"fname": msx_filename} with open(msx_filename, "r", encoding=sys_default_enc) as f: for line in f: lnum += 1 edata["lnum"] = lnum line = line.strip() nwords = len(line.split()) if len(line) == 0 or nwords == 0: # Blank line continue elif line.startswith("["): vals = line.split(None, 1) sec = vals[0].upper() # Add handlers to deal with extra 'S'es (or missing 'S'es) in INP file if sec not in _INP_SECTIONS: trsec = sec.replace("]", "S]") if trsec in _INP_SECTIONS: sec = trsec if sec not in _INP_SECTIONS: trsec = sec.replace("S]", "]") if trsec in _INP_SECTIONS: sec = trsec edata["sec"] = sec if sec in _INP_SECTIONS: section = sec # logger.info('%(fname)s:%(lnum)-6d %(sec)13s section found' % edata) continue # elif sec == "[END]": # # logger.info('%(fname)s:%(lnum)-6d %(sec)13s end of file found' % edata) # section = None # break else: logger.warning('%(fname)s:%(lnum)d: Invalid section "%(sec)s"' % edata) raise EpanetMsxException(201, note="at line {}:\n{}".format(lnum, line)) elif section is None and line.startswith(";"): obj.top_comments.append(line[1:]) continue elif section is None: logger.debug("Found confusing line: %s", repr(line)) raise EpanetMsxException(201, note="at line {}:\n{}".format(lnum, line)) # We have text, and we are in a section obj.sections[section].append((lnum, line)) try: _read() obj._read_title() obj._read_options() obj._read_species() obj._read_coefficients() obj._read_terms() obj._read_pipes() obj._read_tanks() obj._read_source_dict() obj._read_quality() obj._read_parameters() obj._read_diffusivity() obj._read_patterns() obj._read_report() return obj.rxn except Exception as e: raise EpanetMsxException(200) from e
[docs] @classmethod def write(cls, filename: str, msx: MsxModel): """Write an MSX input file. Parameters ---------- filename : str Filename to write rxn : MsxModel Multi-species water quality model """ obj = cls() obj.rxn = msx with open(filename, "w") as fout: fout.write("; WNTR-reactions MSX file generated {}\n".format(datetime.datetime.now())) fout.write("\n") obj._write_title(fout) obj._write_options(fout) obj._write_species(fout) obj._write_coefficients(fout) obj._write_terms(fout) obj._write_pipes(fout) obj._write_tanks(fout) obj._write_diffusivity(fout) obj._write_parameters(fout) obj._write_patterns(fout) obj._write_report(fout) obj._write_quality(fout) obj._write_source_dict(fout)
def _read_title(self): lines = [] title = None comments = "" for lnum, line in self.sections["[TITLE]"]: vals, comment = _split_line(line) if title is None and vals is not None: title = " ".join(vals) if comment: lines.append(comment.strip()) if self.top_comments: comments = "\n".join(self.top_comments) if len(lines) > 0: comments = comments + ("\n" if comments else "") + "\n".join(lines) self.rxn.title = title self.rxn.description = comments def _read_options(self): for lnum, line in self.sections["[OPTIONS]"]: vals, comment = _split_line(line) try: if len(vals) < 2: raise EpanetMsxException(402, note="at line {} of [OPTIONS] section:\n{}".format(lnum, line)) name, val = vals[0].upper(), vals[1].upper() if name == "AREA_UNITS": self.rxn._options.area_units = val elif name == "RATE_UNITS": self.rxn._options.rate_units = val elif name == "SOLVER": self.rxn._options.solver = val elif name == "COUPLING": self.rxn._options.coupling = val elif name == "TIMESTEP": self.rxn._options.timestep = int(val) elif name == "ATOL": self.rxn._options.atol = float(val) elif name == "RTOL": self.rxn._options.rtol = float(val) elif name == "COMPILER": self.rxn._options.compiler = val elif name == "SEGMENTS": self.rxn._options.segments = int(val) elif name == "PECLET": self.rxn._options.peclet = int(val) else: raise EpanetMsxException(403, note="at line {} of [OPTIONS] section:\n{}".format(lnum, line)) except ValueError: raise EpanetMsxException(404, note="at line {} of [OPTIONS] section:\n{}".format(lnum, line)) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [OPTIONS] section:\n{}".format(lnum, line)) from e def _read_species(self): note = ENcomment() for lnum, line in self.sections["[SPECIES]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) < 3: raise EpanetMsxException(402, note="at line {} of [SPECIES] section:\n{}".format(lnum, line)) try: typ = SpeciesType.get(vals[0], allow_none=False) except ValueError as e: raise MSXValueError(403, vals[0], note="at line {} of [SPECIES] section:\n{}".format(lnum, line)) from e if len(vals) == 3: self.rxn.add_species(vals[1], typ, vals[2], note=note) elif len(vals) == 5: self.rxn.add_species(vals[1], typ, vals[2], float(vals[3]), float(vals[4]), note=note) else: raise EpanetMsxException(201, note="at line {} of [SPECIES] section:\n{}".format(lnum, line)) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [SPECIES] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_coefficients(self): note = ENcomment() for lnum, line in self.sections["[COEFFICIENTS]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) < 3: raise EpanetMsxException(402, note="at line {} of [COEFFICIENTS] section:\n{}".format(lnum, line)) typ = VariableType.get(vals[0], allow_none=False) if typ is VariableType.CONSTANT: self.rxn.add_constant(vals[1], float(vals[2]), note=note) elif typ is VariableType.PARAMETER: self.rxn.add_parameter(vals[1], float(vals[2]), note=note) else: raise MSXValueError(403, vals[0], note="at line {} of [COEFFICIENTS] section:\n{}".format(lnum, line)) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [COEFFICIENTS] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_terms(self): note = ENcomment() for lnum, line in self.sections["[TERMS]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) < 2: raise SyntaxError("Invalid [TERMS] entry") self.rxn.add_term(vals[0], " ".join(vals[1:]), note=note) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [TERMS] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_pipes(self): note = ENcomment() for lnum, line in self.sections["[PIPES]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) < 3: raise SyntaxError("Invalid [PIPES] entry") reaction = self.rxn.add_reaction(vals[1], "pipe", vals[0], " ".join(vals[2:]), note=note) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [PIPES] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_tanks(self): note = ENcomment() for lnum, line in self.sections["[TANKS]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) < 3: raise SyntaxError("Invalid [TANKS] entry") self.rxn.add_reaction(vals[1], "tank", vals[0], " ".join(vals[2:]), note=note) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [TANKS] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_source_dict(self): note = ENcomment() for lnum, line in self.sections["[SOURCES]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) == 4: typ, node, spec, strength = vals pat = None else: typ, node, spec, strength, pat = vals if spec not in self.rxn.reaction_system: raise ValueError("Undefined species in [SOURCES] section: {}".format(spec)) if spec not in self.rxn.network_data.sources.keys(): self.rxn.network_data.sources[spec] = dict() source = dict(source_type=typ, strength=strength, pattern=pat, note=note) self.rxn.network_data.sources[spec][node] = source except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [SOURCES] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_quality(self): note = ENcomment() for lnum, line in self.sections["[QUALITY]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) == 4: cmd, netid, spec, concen = vals else: cmd, spec, concen = vals if cmd[0].lower() not in ["g", "n", "l"]: raise SyntaxError("Unknown first word in [QUALITY] section") if spec not in self.rxn.reaction_system.species: raise ValueError("Undefined species in [QUALITY] section: {}".format(spec)) # FIXME: check for existence # if spec not in self.rxn.net.initial_quality: # self.rxn.net.new_quality_values(spec) # self.rxn._inital_qual_dict[spec]["global"] = None # self.rxn._inital_qual_dict[spec]["nodes"] = dict() # self.rxn._inital_qual_dict[spec]["links"] = dict() if cmd[0].lower() == "g": self.rxn.network_data.initial_quality[spec].global_value = float(concen) elif cmd[0].lower() == "n": self.rxn.network_data.initial_quality[spec].node_values[netid] = float(concen) elif cmd[1].lower() == "l": self.rxn.network_data.initial_quality[spec].link_values[netid] = float(concen) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [QUALITY] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_parameters(self): note = ENcomment() for lnum, line in self.sections["[PARAMETERS]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment typ, netid, paramid, value = vals if paramid not in self.rxn.reaction_system.parameters: raise ValueError("Invalid parameter {}".format(paramid)) value = float(value) if typ.lower()[0] == "p": self.rxn.network_data.parameter_values[paramid].pipe_values[netid] = value elif typ.lower()[0] == "t": self.rxn.network_data.parameter_values[paramid].tank_values[netid] = value else: raise ValueError("Invalid parameter type {}".format(typ)) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [PARAMETERS] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_diffusivity(self): note = ENcomment() for lnum, line in self.sections["[DIFFUSIVITY]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) != 2: raise SyntaxError("Invalid [DIFFUSIVITIES] entry") species = self.rxn.reaction_system.species[vals[0]] species.diffusivity = float(vals[1]) except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [DIFFUSIVITIES] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _read_patterns(self): _patterns = dict() for lnum, line in self.sections["[PATTERNS]"]: # read the lines for each pattern -- patterns can be multiple lines of arbitrary length line = line.split(";")[0] current = line.split() if current == []: continue pattern_name = current[0] if pattern_name not in _patterns: _patterns[pattern_name] = [] for i in current[1:]: _patterns[pattern_name].append(float(i)) else: for i in current[1:]: _patterns[pattern_name].append(float(i)) for pattern_name, pattern in _patterns.items(): # add the patterns to the water newtork model self.rxn.network_data.add_pattern(pattern_name, pattern) def _read_report(self): note = ENcomment() for lnum, line in self.sections["[REPORT]"]: vals, comment = _split_line(line) if vals is None: if comment is not None: note.pre.append(comment) continue try: if comment is not None: note.post = comment if len(vals) == 0: continue if len(vals) < 2: raise SyntaxError("Invalid number of arguments in [REPORT] section") cmd = vals[0][0].lower() if cmd == "n": # NODES if self.rxn._options.report.nodes is None: if vals[1].upper() == "ALL": self.rxn._options.report.nodes = "ALL" else: self.rxn._options.report.nodes = list() self.rxn._options.report.nodes.extend(vals[1:]) elif isinstance(self.rxn._options.report.nodes, list): if vals[1].upper() == "ALL": self.rxn._options.report.nodes = "ALL" else: self.rxn._options.report.nodes.extend(vals[1:]) elif cmd == "l": # LINKS if self.rxn._options.report.links is None: if vals[1].upper() == "ALL": self.rxn._options.report.links = "ALL" else: self.rxn._options.report.links = list() self.rxn._options.report.links.extend(vals[1:]) elif isinstance(self.rxn._options.report.links, list): if vals[1].upper() == "ALL": self.rxn._options.report.links = "ALL" else: self.rxn._options.report.links.extend(vals[1:]) elif cmd == "f": self.rxn._options.report.report_filename = vals[1] elif cmd == "p": self.rxn._options.report.pagesize = vals[1] elif cmd == "s": if vals[1] not in self.rxn.reaction_system.species: raise ValueError("Undefined species in [REPORT] section: {}".format(vals[1])) self.rxn._options.report.species[vals[1]] = True if vals[2].lower().startswith("y") else False if len(vals) == 4: self.rxn._options.report.species_precision[vals[1]] = int(vals[3]) else: raise SyntaxError("Invalid syntax in [REPORT] section: unknown first word") except EpanetMsxException: raise except Exception as e: raise EpanetMsxException(201, note="at line {} of [REPORT] section:\n{}".format(lnum, line)) from e else: note = ENcomment() def _write_title(self, fout): fout.write("[TITLE]\n") fout.write(" {}\n".format(self.rxn.title)) fout.write("\n") def _write_options(self, fout): opts = self.rxn._options fout.write("[OPTIONS]\n") fout.write(" AREA_UNITS {}\n".format(opts.area_units.upper())) fout.write(" RATE_UNITS {}\n".format(opts.rate_units.upper())) fout.write(" SOLVER {}\n".format(opts.solver.upper())) fout.write(" COUPLING {}\n".format(opts.coupling.upper())) fout.write(" TIMESTEP {}\n".format(opts.timestep)) fout.write(" ATOL {}\n".format(opts.atol)) fout.write(" RTOL {}\n".format(opts.rtol)) fout.write(" COMPILER {}\n".format(opts.compiler.upper())) fout.write(" SEGMENTS {}\n".format(opts.segments)) fout.write(" PECLET {}\n".format(opts.peclet)) fout.write("\n") def _write_species(self, fout): fout.write("[SPECIES]\n") def to_msx_string(spec: Species) -> str: tols = spec.get_tolerances() if tols is None: tolstr = "" else: tolstr = " {:12.6g} {:12.6g}".format(*tols) return "{:<12s} {:<8s} {:<8s}{:s}".format( spec.species_type.name.upper(), spec.name, spec.units, tolstr, ) for var in self.rxn.reaction_system.species.values(): if isinstance(var.note, ENcomment): fout.write("{}\n".format(var.note.wrap_msx_string(to_msx_string(var)))) elif isinstance(var.note, str): fout.write(" {} ; {}\n".format(to_msx_string(var), var.note)) else: fout.write(" {}\n".format(to_msx_string(var))) fout.write("\n") def _write_coefficients(self, fout): fout.write("[COEFFICIENTS]\n") def to_msx_string(coeff: Union[Constant, Parameter]) -> str: # if self.units is not None: # post = r' ; {"units"="' + str(self.units) + r'"}' # else: post = "" return "{:<12s} {:<8s} {:<16s}{}".format( coeff.var_type.name.upper(), coeff.name, str(coeff.global_value if isinstance(coeff, Parameter) else coeff.value), post, ) for var in self.rxn.reaction_system.constants.values(): if isinstance(var.note, ENcomment): fout.write("{}\n".format(var.note.wrap_msx_string(to_msx_string(var)))) elif isinstance(var.note, str): fout.write(" {} ; {}\n".format(to_msx_string(var), var.note)) else: fout.write(" {}\n".format(to_msx_string(var))) for var in self.rxn.reaction_system.parameters.values(): if isinstance(var.note, ENcomment): fout.write("{}\n".format(var.note.wrap_msx_string(to_msx_string(var)))) elif isinstance(var.note, str): fout.write(" {} ; {}\n".format(to_msx_string(var), var.note)) else: fout.write(" {}\n".format(to_msx_string(var))) fout.write("\n") def _write_terms(self, fout): fout.write("[TERMS]\n") def to_msx_string(term: Term) -> str: return "{:<8s} {:<64s}".format(term.name, term.expression) for var in self.rxn.reaction_system.terms.values(): if isinstance(var.note, ENcomment): fout.write("{}\n".format(var.note.wrap_msx_string(to_msx_string(var)))) elif isinstance(var.note, str): fout.write(" {} ; {}\n".format(to_msx_string(var), var.note)) else: fout.write(" {}\n".format(to_msx_string(var))) fout.write("\n") def _write_pipes(self, fout): fout.write("[PIPES]\n") for spec in self.rxn.reaction_system.species.values(): var = spec.pipe_reaction if var is None: raise MSXValueError(507, note=" species {}".format(str(spec))) if isinstance(var.note, ENcomment): fout.write( "{}\n".format( var.note.wrap_msx_string( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression) ) ) ) elif isinstance(var.note, str): fout.write( " {} ; {}\n".format( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression), var.note, ) ) else: fout.write( " {}\n".format( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression) ) ) fout.write("\n") def _write_tanks(self, fout): fout.write("[TANKS]\n") for spec in self.rxn.reaction_system.species.values(): if spec.species_type.name == 'WALL': continue try: var = spec.tank_reaction except KeyError: logger.warn('Species {} does not have a tank reaction - this may be a problem'.format(str(spec))) continue if var is None: raise MSXValueError(508, note=" species {}".format(str(spec))) if isinstance(var.note, ENcomment): fout.write( "{}\n".format( var.note.wrap_msx_string( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression) ) ) ) elif isinstance(var.note, str): fout.write( " {} ; {}\n".format( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression), var.note, ) ) else: fout.write( " {}\n".format( "{:<12s} {:<8s} {:<32s}".format(var.expression_type.name.upper(), str(var.species_name), var.expression) ) ) fout.write("\n") def _write_source_dict(self, fout): fout.write("[SOURCES]\n") for species in self.rxn.network_data.sources.keys(): for node, src in self.rxn.network_data.sources[species].items(): if isinstance(src["note"], ENcomment): fout.write( src["note"].wrap_msx_string( "{:<10s} {:<8s} {:<8s} {:12s} {:<12s}".format( src["source_type"], node, species, src["strength"], src["pattern"] if src["pattern"] is not None else "", ) ) ) elif isinstance(src["note"], str): fout.write( " {:<10s} {:<8s} {:<8s} {} {:<12s} ; {}\n".format( src["source_type"], node, species, src["strength"], src["pattern"] if src["pattern"] is not None else "", src["note"], ) ) else: fout.write( " {:<10s} {:<8s} {:<8s} {} {:<12s}\n".format( src["source_type"], node, species, src["strength"], src["pattern"] if src["pattern"] is not None else "", ) ) if src["note"] is not None: fout.write("\n") fout.write("\n") def _write_quality(self, fout): fout.write("[QUALITY]\n") for species, val in self.rxn.network_data.initial_quality.items(): for typ in ["node_values", "link_values"]: for node, conc in getattr(val, typ).items(): fout.write(" {:<8s} {:<8s} {:<8s} {}\n".format(typ.upper()[0:4], node, species, conc)) if val.global_value: fout.write(" {:<8s} {:<8s} {}\n".format("GLOBAL", species, val.global_value)) fout.write("\n") def _write_parameters(self, fout): fout.write("[PARAMETERS]\n") for name, var in self.rxn.network_data.parameter_values.items(): had_entries = False for pipeID, value in var.pipe_values.items(): fout.write(" PIPE {:<8s} {:<8s} {}\n".format(pipeID, name, value)) had_entries = True for tankID, value in var.tank_values.items(): fout.write(" PIPE {:<8s} {:<8s} {}\n".format(tankID, name, value)) had_entries = True if had_entries: fout.write("\n") fout.write("\n") def _write_patterns(self, fout): fout.write("[PATTERNS]\n") for pattern_name, pattern in self.rxn.network_data.patterns.items(): num_columns = 10 count = 0 for i in pattern: # .multipliers: if count % num_columns == 0: fout.write("\n {:<8s} {:g}".format(pattern_name, i)) else: fout.write(" {:g}".format(i)) count += 1 fout.write("\n") fout.write("\n") def _write_diffusivity(self, fout): fout.write("[DIFFUSIVITY]\n") for name in self.rxn.species_name_list: spec: Species = self.rxn.reaction_system.species[name] if spec.diffusivity is not None: fout.write(" {:<8s} {}\n".format(name, spec.diffusivity)) fout.write("\n") def _write_report(self, fout): fout.write("[REPORT]\n") if self.rxn._options.report.nodes is not None: if isinstance(self.rxn._options.report.nodes, str): fout.write(" NODES {}\n".format(self.rxn.options.report.nodes)) else: fout.write(" NODES {}\n".format(" ".join(self.rxn.options.report.nodes))) if self.rxn._options.report.links is not None: if isinstance(self.rxn._options.report.links, str): fout.write(" LINKS {}\n".format(self.rxn.options.report.links)) else: fout.write(" LINKS {}\n".format(" ".join(self.rxn.options.report.links))) for spec, val in self.rxn._options.report.species.items(): fout.write( " SPECIES {:<8s} {:<3s} {}\n".format( spec, "YES" if val else "NO", self.rxn._options.report.species_precision[spec] if spec in self.rxn._options.report.species_precision.keys() else "", ) ) if self.rxn._options.report.report_filename: fout.write(" FILE {}\n".format(self.rxn._options.report.report_filename)) if self.rxn._options.report.pagesize: fout.write(" PAGESIZE {}\n".format(self.rxn._options.report.pagesize)) fout.write("\n")
[docs] def MsxBinFile(filename, wn, res = None): duration = int(wn.options.time.duration) if res is None: from wntr.sim.results import SimulationResults res = SimulationResults() with open(filename, "rb") as fin: ftype = "=f4" idlen = 32 prolog = np.fromfile(fin, dtype=np.int32, count=6) magic1 = prolog[0] version = prolog[1] nnodes = prolog[2] nlinks = prolog[3] nspecies = prolog[4] reportstep = prolog[5] species_list = [] node_list = wn.node_name_list link_list = wn.link_name_list # print(magic1, version, nnodes, nlinks, nspecies, reportstep) species_mass = [] for i in range(nspecies): species_len = np.fromfile(fin, dtype=np.int32, count=1)[0] # print(species_len) species_name = "".join(chr(f) for f in np.fromfile(fin, dtype=np.uint8, count=species_len) if f != 0) # print(species_name) species_list.append(species_name) species_mass.append("".join(chr(f) for f in np.fromfile(fin, dtype=np.uint8, count=16) if f != 0)) timerange = range(0, duration + 1, reportstep) tr = len(timerange) row1 = ["node"] * nnodes * len(species_list) + ["link"] * nlinks * len(species_list) row2 = [] for i in [nnodes, nlinks]: for j in species_list: row2.append([j] * i) row2 = [y for x in row2 for y in x] row3 = [node_list for i in species_list] + [link_list for i in species_list] row3 = [y for x in row3 for y in x] tuples = list(zip(row1, row2, row3)) index = pd.MultiIndex.from_tuples(tuples, names=["type", "species", "name"]) try: data = np.fromfile(fin, dtype=np.dtype(ftype), count=tr * (len(species_list * (nnodes + nlinks)))) data = np.reshape(data, (tr, len(species_list * (nnodes + nlinks)))) except Exception as e: print(e) print("oops") postlog = np.fromfile(fin, dtype=np.int32, count=4) offset = postlog[0] numreport = postlog[1] errorcode = postlog[2] magicnew = postlog[3] if errorcode != 0: print(f"ERROR CODE: {errorcode}") print(offset, numreport) df_fin = pd.DataFrame(index=index, columns=timerange).transpose() if magic1 == magicnew: # print("Magic# Match") df_fin = pd.DataFrame(data.transpose(), index=index, columns=timerange) df_fin = df_fin.transpose() else: print("Magic#s do not match!") for species in species_list: res.node[species] = df_fin['node'][species] res.link[species] = df_fin['link'][species] return res