Source code for wntr.sim.solvers
"""Generic mathematical solver classes.
"""
import numpy as np
import scipy.sparse as sp
import warnings
import logging
import enum
import time
warnings.filterwarnings(
"error", "Matrix is exactly singular", sp.linalg.MatrixRankWarning
)
np.set_printoptions(precision=3, threshold=10000, linewidth=300)
logger = logging.getLogger(__name__)
[docs]
class SolverStatus(enum.IntEnum):
converged = 1
error = 0
[docs]
class NewtonSolver(object):
"""
Newton Solver class.
Attributes
----------
log_progress: bool
If True, the infinity norm of the constraint violation will be logged each iteration
log_level: int
The level for logging the infinity norm of the constraint violation
time_limit: float
If the wallclock time exceeds time_limit, the newton solver will exit with an error status
maxiter: int
If the number of iterations exceeds maxiter, the newton solver will exit with an error status
tol: float
The convergence tolerance. If the infinity norm of the constraint violation drops below tol,
the newton solver will exit with a converged status.
rho: float
During the line search, rho is used to reduce the stepsize. It should be strictly between 0 and 1.
bt_maxiter: int
The maximum number of line search iterations for each outer iteration
bt: bool
If False, a line search will not be used.
bt_start_iter: int
A line search will not be used for any iteration prior to bt_start_iter
"""
[docs]
def __init__(self, options=None):
"""
Parameters
----------
options: dict
A dictionary specifying options for the newton solver. Keys
should be strings in all caps. See the documentation of the
NewtonSolver attributes for details on each option. Possible
keys are:
| "LOG_PROGRESS" (NewtonSolver.log_progress)
| "LOG_LEVEL" (NewtonSolver.log_level)
| "TIME_LIMIT" (NewtonSolver.time_limit)
| "MAXITER" (NewtonSolver.maxiter)
| "TOL" (NewtonSolver.tol)
| "BT_RHO" (NewtonSolver.rho)
| "BT_MAXITER" (NewtonSolver.bt_maxiter)
| "BACKTRACKING" (NewtonSolver.bt)
| "BT_START_ITER" (NewtonSolver.bt_start_iter)
"""
if options is None:
options = {}
self._options = options
if "LOG_PROGRESS" not in self._options:
self.log_progress = False
else:
self.log_progress = self._options["LOG_PROGRESS"]
if "LOG_LEVEL" not in self._options:
self.log_level = logging.DEBUG
else:
self.log_level = self._options["LOG_LEVEL"]
if "TIME_LIMIT" not in self._options:
self.time_limit = 3600
else:
self.time_limit = self._options["TIME_LIMIT"]
if "MAXITER" not in self._options:
self.maxiter = 3000
else:
self.maxiter = self._options["MAXITER"]
if "TOL" not in self._options:
self.tol = 1e-6
else:
self.tol = self._options["TOL"]
if "BT_RHO" not in self._options:
self.rho = 0.5
else:
self.rho = self._options["BT_RHO"]
if "BT_MAXITER" not in self._options:
self.bt_maxiter = 100
else:
self.bt_maxiter = self._options["BT_MAXITER"]
if "BACKTRACKING" not in self._options:
self.bt = True
else:
self.bt = self._options["BACKTRACKING"]
if "BT_START_ITER" not in self._options:
self.bt_start_iter = 0
else:
self.bt_start_iter = self._options["BT_START_ITER"]
[docs]
def solve(self, model, ostream=None):
"""
Parameters
----------
model: wntr.aml.Model
Returns
-------
status: SolverStatus
message: str
iter_count: int
"""
t0 = time.time()
x = model.get_x()
if len(x) == 0:
return (
SolverStatus.converged,
"No variables or constraints",
0,
)
use_r_ = False
# MAIN NEWTON LOOP
for outer_iter in range(self.maxiter):
if time.time() - t0 >= self.time_limit:
return (
SolverStatus.error,
"Time limit exceeded",
outer_iter,
)
if use_r_:
r = r_
r_norm = new_norm
else:
r = model.evaluate_residuals()
r_norm = np.max(abs(r))
if self.log_progress or ostream is not None:
if outer_iter < self.bt_start_iter:
msg = f"iter: {outer_iter:<4d} norm: {r_norm:<10.2e} time: {time.time() - t0:<8.4f}"
if self.log_progress:
logger.log(self.log_level, msg)
if ostream is not None:
ostream.write(msg + "\n")
if r_norm < self.tol:
return (
SolverStatus.converged,
"Solved Successfully",
outer_iter,
)
J = model.evaluate_jacobian(x=None)
# Call Linear solver
try:
d = -sp.linalg.spsolve(J, r, permc_spec="COLAMD", use_umfpack=False)
except sp.linalg.MatrixRankWarning:
return (
SolverStatus.error,
"Jacobian is singular at iteration " + str(outer_iter),
outer_iter,
)
# Backtracking
alpha = 1.0
if self.bt and outer_iter >= self.bt_start_iter:
use_r_ = True
for iter_bt in range(self.bt_maxiter):
x_ = x + alpha * d
model.load_var_values_from_x(x_)
r_ = model.evaluate_residuals()
new_norm = np.max(abs(r_))
if new_norm < (1.0 - 0.0001 * alpha) * r_norm:
x = x_
break
else:
alpha = alpha * self.rho
if iter_bt + 1 >= self.bt_maxiter:
return (
SolverStatus.error,
"Line search failed at iteration " + str(outer_iter),
outer_iter,
)
if self.log_progress or ostream is not None:
msg = f"iter: {outer_iter:<4d} norm: {new_norm:<10.2e} alpha: {alpha:<10.2e} time: {time.time() - t0:<8.4f}"
if self.log_progress:
logger.log(self.log_level, msg)
if ostream is not None:
ostream.write(msg + "\n")
else:
x += d
model.load_var_values_from_x(x)
return (
SolverStatus.error,
"Reached maximum number of iterations: " + str(outer_iter),
outer_iter,
)