"""
Steady-state system model of *hynet*.
"""
import logging
import abc
import numpy as np
from scipy.sparse import kron, coo_matrix
from hynet.types_ import (hynet_int_,
hynet_float_,
hynet_complex_,
hynet_sparse_,
ConstraintType)
from hynet.utilities.base import (create_sparse_matrix,
create_sparse_diag_matrix,
create_dense_vector,
Timer)
from hynet.utilities.worker import workers
from hynet.utilities.graph import eliminate_parallel_edges
from hynet.qcqp.problem import QCQPPoint, ObjectiveFunction, Constraint, QCQP
from hynet.qcqp.result import QCQPResult
from hynet.scenario.representation import Scenario
from hynet.scenario.capability import ConverterCapRegion
_log = logging.getLogger(__name__)
def _call_and_concatenate(constraint_generator):
"""Calls the constraint generator and concatenates its result."""
constraint_type, constraints = constraint_generator()
if len(constraints) == 1:
constraints = constraints[0]
else:
constraints = np.concatenate(constraints)
return constraint_type, constraints
[docs]class SystemModel(abc.ABC):
"""
System model for a steady-state scenario of a grid.
Based on the specification of a scenario via a ``Scenario`` object, this
class provides the methods to generate the corresponding system model
equations and constraints. The state variables in this system model
are the bus voltage vector ``v``, the converter state vector ``f``, the
injector state vector ``s``, and the auxiliary variable vector ``z``. The
latter is actually not part of the system model serves for auxiliary
purposes, e.g., for the reformulation of the piecewise linear active and
reactive power cost functions of injectors. All state variables are
considered in p.u., i.e., normalized.
This class is designed as an abstract base class for specific problem
formulations, like the optimal power flow problem, and serves as a builder
for the optimization problem and as a factory for the associated result
objects. Central to this process are the following member functions:
- ``get_problem``:
Returns the associated optimization problem as a quadratically
constrained quadratic problem (QCQP). The construction of this
QCQP object is defined by the following member functions:
(a) ``_get_objective``
Returns the objective function object.
(b) ``_get_constraint_generators``
Returns the constraint generation functions.
(c) ``get_v_bounds``:
Returns the bounds on the magnitudes of the elements of the
bus voltage vector ``v``.
(d) ``get_f_bounds``:
Returns the bounds on the elements of the converter state
vector ``f``.
(e) ``get_s_bounds``:
Returns the bounds on the elements of the injector state
vector ``s``.
(f) ``get_z_bounds``:
Returns the bounds on the elements of the auxiliary variable
vector ``z``.
The methods (a) and (b) are abstract and must be implemented in a
derived class to specify the optimization problem. While (c) - (e) are
part of the system model and should not need any customization, (f)
may be overridden. By default, the auxiliary variables are used for
the reformulation of the piecewise linear active and reactive power
cost functions of injectors into epigraph form as implemented in the
method ``get_cost_epigraph_constraints``. To customize the use of the
auxiliary variables, ``dim_z`` and ``get_z_bounds`` (as well as
``get_normalization_factors``) may be overridden in a derived class.
Please note that the coefficient matrices of the constraints for the
quadratic expressions in ``v`` must exhibit a sparsity pattern that
corresponds to the network graph of the grid. If this is not fulfilled
by a derived class, ``get_problem`` must be overridden accordingly.
- ``create_result``:
This method serves as a factory for an object that appropriately
represents an optimization result for the model. It is abstract and
must be implemented in a derived class.
See Also
--------
hynet.scenario.representation.Scenario:
Specification of a steady-state grid scenario.
hynet.system.result.SystemResult:
Result of a system-model-related optimization.
hynet.system.calc.calc:
Calculate the solution of the optimization problem of a model.
"""
def __init__(self, scenario, verify_scenario=True):
"""
Initialize the model with the scenario data.
Parameters
----------
scenario : Scenario
Steady-state scenario data for the model.
verify_scenario : bool, optional
If ``True`` (default), an integrity and validity check is performed
on the provided scenario data (see ``Scenario.verify``). In case it
is ensured beforehand that the provided data is consistent and
valid, this check may be skipped to improve performance.
"""
if verify_scenario:
scenario.verify(log=None)
timer = Timer()
self._scr = scenario
(self._Y, self._Y_src, self._Y_dst) = \
SystemModel._get_admittance_matrices(self._scr)
self._flow_constraint_normalization = \
self._select_flow_constraint_normalization(scenario)
self._cost_scaling = 1.0
self._converter_loss_error_tolerance = 5e-4
self._nodal_balance_error_tolerance = 2e-3
self._total_balance_error_tolerance = 1e-3
_log.debug("System model creation ({:.3f} sec.)"
.format(timer.total()))
@property
def cost_function_scaling(self):
"""
Return the cost function scaling used by ``get_cost_epigraph_constraints``.
To improve the numerical conditioning of the an optimization problem
with the injector cost functions in the objective and, therewith,
mitigate numerical issues with the solver, the cost functions may be
scaled. This scaling must be considered when including other terms in
the objective, e.g., a loss penalty.
"""
return self._cost_scaling
@staticmethod
def _get_admittance_matrices(scenario): # pylint: disable=too-many-locals
"""Return the bus, source, and destination admittance matrix."""
N_E = scenario.num_branches
N_V = scenario.num_buses
e_src = scenario.e_src.to_numpy()
e_dst = scenario.e_dst.to_numpy()
rho_src = scenario.branch['rho_src'].to_numpy()
rho_dst = scenario.branch['rho_dst'].to_numpy()
rho = np.multiply(rho_src.conj(), rho_dst)
y_tld = scenario.bus['y_tld'].to_numpy()
y_src = scenario.branch['y_src'].to_numpy()
y_dst = scenario.branch['y_dst'].to_numpy()
y_bar = 1 / scenario.branch['z_bar'].to_numpy()
alpha_src = np.multiply(np.square(np.abs(rho_src)), y_bar + y_src)
alpha_dst = np.multiply(np.square(np.abs(rho_dst)), y_bar + y_dst)
beta_src = -np.multiply(rho, y_bar)
beta_dst = -np.multiply(rho.conj(), y_bar)
alpha = y_tld + create_dense_vector(e_src, alpha_src, N_V) \
+ create_dense_vector(e_dst, alpha_dst, N_V)
Y_src = create_sparse_matrix(range(N_E), e_src, alpha_src, N_E, N_V) \
+ create_sparse_matrix(range(N_E), e_dst, beta_src, N_E, N_V)
Y_dst = create_sparse_matrix(range(N_E), e_dst, alpha_dst, N_E, N_V) \
+ create_sparse_matrix(range(N_E), e_src, beta_dst, N_E, N_V)
Y = create_sparse_diag_matrix(alpha) \
+ create_sparse_matrix(e_src, e_dst, beta_src, N_V, N_V) \
+ create_sparse_matrix(e_dst, e_src, beta_dst, N_V, N_V)
# REMARK: The admittance matrices are primarily employed in row slicing
# operations. For performance reasons, we thus use the CSR format here.
return Y.tocsr(), Y_src.tocsr(), Y_dst.tocsr()
@property
def scenario(self):
"""Return the scenario data of the system model."""
return self._scr
@property
def dim_v(self):
"""Return the dimension of the state variable ``v``."""
return self._scr.num_buses
@property
def dim_f(self):
"""Return the dimension of the state variable ``f``."""
return 4*self._scr.num_converters
@property
def dim_s(self):
"""Return the dimension of the state variable ``s``."""
return 2*self._scr.num_injectors
@property
def dim_z(self):
"""Return the dimension of the state variable ``z``."""
return 2*self._scr.num_injectors
@property
def Y(self):
"""Return the bus admittance matrix."""
return self._Y
@property
def Y_src(self):
"""Return the source admittance matrix."""
return self._Y_src
@property
def Y_dst(self):
"""Return the destination admittance matrix."""
return self._Y_dst
@property
def converter_loss_error_tolerance(self):
"""Return the converter loss error tolerance in MW."""
return self._converter_loss_error_tolerance
@converter_loss_error_tolerance.setter
def converter_loss_error_tolerance(self, value):
"""Set the converter loss error tolerance in MW."""
if not (np.isnan(value) or value > 0):
raise ValueError("The tolerance must be positive or numpy.nan.")
self._converter_loss_error_tolerance = value
@property
def nodal_balance_error_tolerance(self):
"""
Return the relative nodal power balance error tolerance.
Threshold on the ratio of the maximum nodal apparent power balance
error and the maximum individual apparent power load to consider a
solution as physically valid.
"""
return self._nodal_balance_error_tolerance
@nodal_balance_error_tolerance.setter
def nodal_balance_error_tolerance(self, value):
"""
Set the relative nodal power balance error tolerance.
Threshold on the ratio of the maximum nodal apparent power balance
error and the maximum individual apparent power load to consider a
solution as physically valid.
"""
if not (np.isnan(value) or value > 0):
raise ValueError("The tolerance must be positive or numpy.nan.")
self._nodal_balance_error_tolerance = value
@property
def total_balance_error_tolerance(self):
"""
Return the relative total power balance error tolerance.
Threshold on the ratio of the total active power balance error and the
total active power load to consider a solution as physically valid.
"""
return self._total_balance_error_tolerance
@total_balance_error_tolerance.setter
def total_balance_error_tolerance(self, value):
"""
Set the relative total power balance error tolerance.
Threshold on the ratio of the total active power balance error and the
total active power load to consider a solution as physically valid.
"""
if not (np.isnan(value) or value > 0):
raise ValueError("The tolerance must be positive or numpy.nan.")
self._total_balance_error_tolerance = value
[docs] def get_normalization_factors(self):
"""
Return the normalization factors of the state variables.
Returns
-------
factors : QCQPPoint
Contains the normalization factors of the state variables in the
attributes ``v``, ``f``, ``s``, and ``z``.
"""
return QCQPPoint(v=1.0,
f=1 / self._scr.base_mva,
s=1 / self._scr.base_mva,
z=self._cost_scaling)
@abc.abstractmethod
def _get_constraint_generators(self):
"""
Return a list with all constraint generation functions.
These constraint generation functions are used to generate the
constraints of the QCQP. Implement this method in a derived class to
the return a list with all constraint generation functions.
**Caution:** Please note that the *power balance constraints* and the
*real-part constraints* must be part of *all derived models*. (The
latter is explained by the state space relaxation to obtain the unified
formulation, see [1]_.)
Returns
-------
list[numpy.ndarray[Constraint]]
References
----------
.. [1] M. Hotz and W. Utschick, "hynet: An Optimal Power Flow Framework
for Hybrid AC/DC Power Systems," in IEEE Trans. Power Systems,
vol. 35, no. 2, pp. 1036-1047, Mar. 2020.
"""
@abc.abstractmethod
def _get_objective(self):
"""
Return the objective function of the QCQP.
Implement this method in a derived class to generate the objective.
Returns
-------
ObjectiveFunction
"""
@staticmethod
def _filter_constraints(constraints, constraint_type):
"""
Filter the constraints by type and concatenate the constraint arrays.
"""
filtered_constraints = \
[array for type_, array in constraints if type_ == constraint_type]
return np.concatenate(filtered_constraints)
[docs] def get_problem(self):
"""
Return the optimization problem that is associated with this model.
Returns
-------
qcqp : QCQP
QCQP specification for the optimization problem associated with
this model.
"""
timer = Timer()
# Constraint generation (using parallel processing)
constraints = workers.map(_call_and_concatenate,
self._get_constraint_generators())
# Variable bounds
v_lb, v_ub = self.get_v_bounds()
f_lb, f_ub = self.get_f_bounds()
s_lb, s_ub = self.get_s_bounds()
z_lb, z_ub = self.get_z_bounds()
qcqp = QCQP(obj_func=self._get_objective(),
eq_crt=self._filter_constraints(constraints,
ConstraintType.EQUALITY),
ineq_crt=self._filter_constraints(constraints,
ConstraintType.INEQUALITY),
lb=QCQPPoint(v_lb, f_lb, s_lb, z_lb),
ub=QCQPPoint(v_ub, f_ub, s_ub, z_ub),
edges=eliminate_parallel_edges((self._scr.e_src.to_numpy(),
self._scr.e_dst.to_numpy())),
roots=self._scr.get_ref_buses(),
normalization=self.get_normalization_factors())
_log.debug("QCQP creation ({:.3f} sec.)".format(timer.total()))
return qcqp
[docs] @abc.abstractmethod
def create_result(self, qcqp_result, total_time=np.nan, qcqp_result_pre=None):
"""
Create and return a result object associated with this model.
This method serves as a factory for a result object. Implement this
method in a derived class to return an object of an appropriately
customized result class.
Parameters
----------
qcqp_result : QCQPResult
Solution of the QCQP associated with this model.
total_time : .hynet_float_, optional
Total time for solving the optimization problem,
cf. ``hynet.system.calc.calc``.
qcqp_result_pre : QCQPResult, optional
Pre-solution of the model's QCQP for converter mode detection.
Returns
-------
result : hynet.system.result.SystemResult
"""
@staticmethod
def _select_flow_constraint_normalization(scenario):
"""
Return the system flow constraint normalization factor.
When applying a solver to the (OPF) problem, the achievable accuracy
and/or the number of iterations usually depends on the conditioning of
the optimization problem. Unfortunately, the OPF problem is a tough
guy in that regard. In the description of the hybrid AC/DC power
system model (cf. [1]_ and Appendix B in [2]_), it can be observed that
the elements of the constraint matrices are either roughly close to 1,
in the range of the series admittance of the branches (for the power
balance constraints), or in the range of the series admittance squared
(for the ampacity constraints). As the modulus of the smallest series
admittance in a system is often rather large, this typically leads to
several orders of magnitude of difference of the elements of the
constraint matrices and, potentially, of the sensitivity of the
constraint function values to changes in the bus voltage variables.
This frequently leads to numerical issues in the employed solvers,
especially for large-scale systems. To reduce the dynamic range in the
constraint matrices, the power balance constraints are scaled by a
normalization factor and the ampacity constraints by the square of the
same normalization factor. This method returns an appropriately
selected normalization factor.
References
----------
.. [1] M. Hotz and W. Utschick, "hynet: An Optimal Power Flow Framework
for Hybrid AC/DC Power Systems," in IEEE Trans. Power Systems,
vol. 35, no. 2, pp. 1036-1047, Mar. 2020.
.. [2] M. Hotz and W. Utschick, "A Hybrid Transmission Grid
Architecture Enabling Efficient Optimal Power Flow," in IEEE
Trans. Power Systems, vol. 31, no. 6, pp. 4504-4516, Nov. 2016.
See Also
--------
hynet.system.model.SystemModel.get_balance_constraints
hynet.system.model.SystemModel.get_source_ampacity_constraints
hynet.system.model.SystemModel.get_destination_ampacity_constraints
"""
warning_threshold = 1e-4
if scenario.num_branches == 0:
return 1.0
scaling = np.mean(np.abs(scenario.branch['z_bar'].to_numpy()))
if scaling < warning_threshold:
_log.warning("The mean modulus of the series impedance is very "
"small ({:e} p.u.). This may cause numerical issues."
.format(scaling))
scaling = warning_threshold
return scaling
def _get_balance_coefficients(self):
"""Return the coefficients for the power balance constraints."""
N_V = self._scr.num_buses
N_C = self._scr.num_converters
N_I = self._scr.num_injectors
c_src = self._scr.c_src.to_numpy()
c_dst = self._scr.c_dst.to_numpy()
n_src = self._scr.n_src.to_numpy()
scaling = self._flow_constraint_normalization
load = self._scr.bus['load'].to_numpy() / self._scr.base_mva
loss_fix = self._scr.converter['loss_fix'].to_numpy() / self._scr.base_mva
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
# Accumulate static losses of converters at their respective source bus
load += create_dense_vector(c_src, loss_fix, N_V)
# Standard basis vectors in R^2 and R^4
e1_R2 = create_sparse_matrix([0], [0], [1], 2, 1, dtype=hynet_float_)
e2_R2 = create_sparse_matrix([1], [0], [1], 2, 1, dtype=hynet_float_)
e1_R4 = create_sparse_matrix([0], [0], [1], 4, 1, dtype=hynet_float_)
e2_R4 = create_sparse_matrix([1], [0], [1], 4, 1, dtype=hynet_float_)
e3_R4 = create_sparse_matrix([2], [0], [1], 4, 1, dtype=hynet_float_)
e4_R4 = create_sparse_matrix([3], [0], [1], 4, 1, dtype=hynet_float_)
# Prepare calculation of vectors p_n and q_n
A_src = create_sparse_matrix(range(N_C), c_src, np.ones(N_C), N_C, N_V,
dtype=hynet_float_)
A_dst = create_sparse_matrix(range(N_C), c_dst, np.ones(N_C), N_C, N_V,
dtype=hynet_float_)
B_src = create_sparse_matrix(range(N_C), c_src, 1 - loss_bwd, N_C, N_V,
dtype=hynet_float_)
B_dst = create_sparse_matrix(range(N_C), c_dst, 1 - loss_fwd, N_C, N_V,
dtype=hynet_float_)
# CSC format for efficient column slicing
c_p = (kron(A_src, e1_R4) - kron(B_src, e2_R4) +
kron(A_dst, e2_R4) - kron(B_dst, e1_R4)).tocsc()
c_q = (-kron(A_src, e3_R4) - kron(A_dst, e4_R4)).tocsc()
# Prepare calculation of vectors for injectors
H = create_sparse_matrix(range(N_I), n_src, -np.ones(N_I), N_I, N_V,
dtype=hynet_float_)
# CSC format for efficient column slicing
a_p = kron(H, e1_R2).tocsc()
a_q = kron(H, e2_R2).tocsc()
# Constraint scaling to improve conditioning
# (See ``_select_flow_constraint_normalization`` for more details)
Y = self._Y * scaling
c_p *= scaling
c_q *= scaling
a_p *= scaling
a_q *= scaling
load *= scaling
# Coefficient matrices P_n = C_p[n] and Q_n = C_q[n]
C_p = np.ndarray(N_V, dtype=hynet_sparse_)
C_q = np.ndarray(N_V, dtype=hynet_sparse_)
for n in range(N_V):
# REMARK to the construction of the matrices P_n and Q_n: To keep
# the number of temporary matrices minimal, the construction of
# P_n = (S_n + S_n^H) / 2 and Q_n = (S_n - S_n^H) / 2j utilizes the
# fact that S_n consists of the conjugate transpose of the n-th row
# of Y in its n-th column.
y_n = Y[n, :].tocoo()
idx_row = np.concatenate([y_n.col, y_n.row + n])
idx_col = np.concatenate([y_n.row + n, y_n.col])
C_p[n] = create_sparse_matrix(idx_row, idx_col,
np.concatenate([y_n.data.conj(),
y_n.data]) / 2,
N_V, N_V, dtype=hynet_complex_)
C_q[n] = create_sparse_matrix(idx_row, idx_col,
np.concatenate([y_n.data.conj(),
-y_n.data]) / 2j,
N_V, N_V, dtype=hynet_complex_)
return C_p, C_q, c_p, c_q, a_p, a_q, load, scaling
[docs] def get_balance_constraints(self):
"""Return the active and reactive power balance constraints."""
timer = Timer()
N_V = self._scr.num_buses
index = self._scr.bus.index
crt_p = np.ndarray(N_V, dtype=Constraint)
crt_q = np.ndarray(N_V, dtype=Constraint)
C_p, C_q, c_p, c_q, a_p, a_q, load, scaling = \
self._get_balance_coefficients()
# CAVEAT: For better performance, the type conversion in the loop below
# is made via .tocoo(). Update the code if the format has changed.
assert hynet_sparse_ is coo_matrix
for n in range(N_V):
# Active power balance
crt_p[n] = Constraint(name='bal_p',
table='bus',
id=index[n],
C=C_p[n],
c=c_p[:, n].tocoo(),
a=a_p[:, n].tocoo(),
r=None,
b=-load[n].real,
scaling=scaling / self._scr.base_mva)
# Reactive power balance
crt_q[n] = Constraint(name='bal_q',
table='bus',
id=index[n],
C=C_q[n],
c=c_q[:, n].tocoo(),
a=a_q[:, n].tocoo(),
r=None,
b=-load[n].imag,
scaling=scaling / self._scr.base_mva)
_log.debug("Power balance constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.EQUALITY, [crt_p, crt_q]
[docs] def get_voltage_constraints(self):
"""Return the voltage magnitude lower and upper bound constraints."""
timer = Timer()
N_V = self._scr.num_buses
index = self._scr.bus.index
v_min_squared = np.square(self._scr.bus['v_min'].to_numpy())
v_max_squared = np.square(self._scr.bus['v_max'].to_numpy())
crt_v_min = np.ndarray(N_V, dtype=Constraint)
crt_v_max = np.ndarray(N_V, dtype=Constraint)
for n in range(N_V):
M_n = create_sparse_matrix([n], [n], [1], N_V, N_V,
dtype=hynet_float_)
# Lower bound
crt_v_min[n] = Constraint(name='v_min',
table='bus',
id=index[n],
C=-M_n,
c=None,
a=None,
r=None,
b=-v_min_squared[n],
scaling=1.0)
# Upper bound
crt_v_max[n] = Constraint(name='v_max',
table='bus',
id=index[n],
C=M_n,
c=None,
a=None,
r=None,
b=v_max_squared[n],
scaling=1.0)
_log.debug("Voltage constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_v_min, crt_v_max]
[docs] def get_source_ampacity_constraints(self):
"""Return the source ampacity constraints."""
# REMARK: If all branches are rated, the generation of the ampacity
# constraints typically dominates the computational cost of constraint
# generation. To mitigate this in case of parallel processing (default),
# the generation of the ampacity constraints at the source and
# destination bus are divided into two separate constraint generators.
# The computational effort for the common preprocessing (essentially the
# computation of I_max_squared and M) that is duplicated is negligible.
# The duplication of some lines of code is hoped to be forgiven ;)
timer = Timer()
N_E = self._scr.num_branches
index = self._scr.branch.index
scaling = self._flow_constraint_normalization ** 2
# CAVEAT: For better performance, the type conversion in the loop below
# is made via .tocoo(). Update the code if the format has changed.
assert hynet_sparse_ is coo_matrix
# Maximum current is defined via apparent power flow rating at 1 p.u.
rating = self._scr.branch['rating'].to_numpy()
I_max_squared = np.square(rating / self._scr.base_mva)
# Allocate arrays for (non-omitted) constraint objects
M = np.count_nonzero(~np.isnan(I_max_squared))
crt_i_max_src = np.ndarray(M, dtype=Constraint)
# Constraint scaling to improve conditioning
# (See ``_select_flow_constraint_normalization`` for more details)
Y_src = self._Y_src * np.sqrt(scaling)
I_max_squared *= scaling
m = 0
for k in range(N_E):
if np.isnan(I_max_squared[k]):
continue
# Flow limit at the source bus
Y_src_k = Y_src[k, :]
I_src_k = Y_src_k.conj().transpose().dot(Y_src_k)
crt_i_max_src[m] = Constraint(name='i_max_src',
table='branch',
id=index[k],
C=I_src_k.tocoo(),
c=None,
a=None,
r=None,
b=I_max_squared[k],
scaling=scaling)
m += 1
_log.debug("Source ampacity constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_i_max_src]
[docs] def get_destination_ampacity_constraints(self):
"""Return the destination ampacity constraints."""
timer = Timer()
N_E = self._scr.num_branches
index = self._scr.branch.index
scaling = self._flow_constraint_normalization ** 2
# CAVEAT: For better performance, the type conversion in the loop below
# is made via .tocoo(). Update the code if the format has changed.
assert hynet_sparse_ is coo_matrix
# Maximum current is defined via apparent power flow rating at 1 p.u.
rating = self._scr.branch['rating'].to_numpy()
I_max_squared = np.square(rating / self._scr.base_mva)
# Allocate arrays for (non-omitted) constraint objects
M = np.count_nonzero(~np.isnan(I_max_squared))
crt_i_max_dst = np.ndarray(M, dtype=Constraint)
# Constraint scaling to improve conditioning
# (See ``_select_flow_constraint_normalization`` for more details)
Y_dst = self._Y_dst * np.sqrt(scaling)
I_max_squared *= scaling
m = 0
for k in range(N_E):
if np.isnan(I_max_squared[k]):
continue
# Flow limit at the destination bus
Y_dst_k = Y_dst[k, :]
I_dst_k = Y_dst_k.conj().transpose().dot(Y_dst_k)
crt_i_max_dst[m] = Constraint(name='i_max_dst',
table='branch',
id=index[k],
C=I_dst_k.tocoo(),
c=None,
a=None,
r=None,
b=I_max_squared[k],
scaling=scaling)
m += 1
_log.debug("Destination ampacity constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_i_max_dst]
[docs] def get_drop_constraints(self):
"""Return the voltage drop lower and upper bound constraints."""
timer = Timer()
N_V = self._scr.num_buses
N_E = self._scr.num_branches
index = self._scr.branch.index
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
nu_min = self._scr.branch['drop_min'].to_numpy() / 100
nu_max = self._scr.branch['drop_max'].to_numpy() / 100
crt_drop_min = np.ndarray(np.count_nonzero(~np.isnan(nu_min)),
dtype=Constraint)
crt_drop_max = np.ndarray(np.count_nonzero(~np.isnan(nu_max)),
dtype=Constraint)
m_min = m_max = 0
for k in range(N_E):
src_dst = [e_src[k], e_dst[k]]
# Lower bound on voltage drop
if not np.isnan(nu_min[k]):
M_min_k = create_sparse_matrix(src_dst, src_dst,
[(1+nu_min[k]) ** 2, -1],
N_V, N_V, dtype=hynet_float_)
crt_drop_min[m_min] = Constraint(name='drop_min',
table='branch',
id=index[k],
C=M_min_k,
c=None,
a=None,
r=None,
b=0.0,
scaling=1.0)
m_min += 1
# Upper bound on voltage drop
if not np.isnan(nu_max[k]):
M_max_k = create_sparse_matrix(src_dst, src_dst,
[-(1+nu_max[k]) ** 2, 1],
N_V, N_V, dtype=hynet_float_)
crt_drop_max[m_max] = Constraint(name='drop_max',
table='branch',
id=index[k],
C=M_max_k,
c=None,
a=None,
r=None,
b=0.0,
scaling=1.0)
m_max += 1
_log.debug("Voltage drop constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_drop_min, crt_drop_max]
[docs] def get_real_part_constraints(self):
"""
Return the "real-part" constraints.
These constraints ensure a precondition assumed by the voltage angle
difference constraints, i.e., that the voltage angle difference is
limited to +/- 90 degrees.
"""
timer = Timer()
N_V = self._scr.num_buses
N_E = self._scr.num_branches
index = self._scr.branch.index
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
crt_real_part = np.ndarray(N_E, dtype=Constraint)
for k in range(N_E):
A_k = create_sparse_matrix([e_src[k], e_dst[k]],
[e_dst[k], e_src[k]],
[-1, -1], N_V, N_V, dtype=hynet_float_)
crt_real_part[k] = Constraint(name='real_part',
table='branch',
id=index[k],
C=A_k,
c=None,
a=None,
r=None,
b=0.0,
scaling=1.0)
_log.debug("Angle 'real-part' constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_real_part]
[docs] def get_angle_constraints(self):
"""
Return the angle diff. lower and upper bound and "real-part" constraints.
"""
timer = Timer()
N_V = self._scr.num_buses
N_E = self._scr.num_branches
index = self._scr.branch.index
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
tan_delta_min = np.tan(self._scr.branch['angle_min'].to_numpy() * np.pi/180)
tan_delta_max = np.tan(self._scr.branch['angle_max'].to_numpy() * np.pi/180)
crt_angle_min = np.ndarray(np.count_nonzero(~np.isnan(tan_delta_min)),
dtype=Constraint)
crt_angle_max = np.ndarray(np.count_nonzero(~np.isnan(tan_delta_max)),
dtype=Constraint)
m_min = m_max = 0
for k in range(N_E):
src_dst = [e_src[k], e_dst[k]]
dst_src = src_dst[::-1]
# Lower bound on angle difference
if not np.isnan(tan_delta_min[k]):
A_min_k = create_sparse_matrix(src_dst, dst_src,
[tan_delta_min[k]+1j,
tan_delta_min[k]-1j],
N_V, N_V, dtype=hynet_complex_)
crt_angle_min[m_min] = Constraint(name='angle_min',
table='branch',
id=index[k],
C=A_min_k,
c=None,
a=None,
r=None,
b=0.0,
scaling=1.0)
m_min += 1
# Upper bound on angle difference
if not np.isnan(tan_delta_max[k]):
A_max_k = create_sparse_matrix(src_dst, dst_src,
[-tan_delta_max[k]-1j,
-tan_delta_max[k]+1j],
N_V, N_V, dtype=hynet_complex_)
crt_angle_max[m_max] = Constraint(name='angle_max',
table='branch',
id=index[k],
C=A_max_k,
c=None,
a=None,
r=None,
b=0.0,
scaling=1.0)
m_max += 1
_log.debug("Angle difference constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_angle_min, crt_angle_max]
[docs] def get_converter_polyhedron_constraints(self):
"""Return the converter capability region polyhedron constraints."""
timer = Timer()
N_C = self._scr.num_converters
index = self._scr.converter.index
cap_src = self._scr.converter['cap_src'].to_numpy()
cap_dst = self._scr.converter['cap_dst'].to_numpy()
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
crt_conv_poly = np.ndarray((0,), dtype=Constraint)
idx_col = np.zeros(4, dtype=hynet_int_)
for l in range(N_C):
A_src, b_src, name_src = cap_src[l].get_polyhedron('src', loss_bwd[l])
A_dst, b_dst, name_dst = cap_dst[l].get_polyhedron('dst', loss_fwd[l])
A = np.vstack((A_src, A_dst))
b = np.concatenate((b_src, b_dst))
name = ['src_' + x for x in name_src] + ['dst_' + x for x in name_dst]
crt = np.ndarray(len(b), dtype=Constraint)
idx_row = np.arange(4*l, 4*(l+1))
# Create constraint objects for A*f_l <= b
# (As f_l is normalized by base_mva, b is adjusted accordingly)
for i in range(len(b)):
crt[i] = Constraint(name='cap_' + name[i],
table='converter',
id=index[l],
C=None,
c=create_sparse_matrix(idx_row, idx_col,
A[i, :], 4*N_C, 1,
dtype=hynet_float_),
a=None,
r=None,
b=b[i] / self._scr.base_mva,
scaling=1 / self._scr.base_mva)
crt_conv_poly = np.concatenate((crt_conv_poly, crt))
_log.debug("Converter polyhedron constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_conv_poly]
[docs] def get_injector_polyhedron_constraints(self):
"""Return the injector capability region polyhedron constraints."""
timer = Timer()
N_I = self._scr.num_injectors
index = self._scr.injector.index
cap = self._scr.injector['cap'].to_numpy()
crt_inj_poly = np.ndarray((0,), dtype=Constraint)
for j in range(N_I):
A, b, name = cap[j].get_polyhedron()
crt = np.ndarray(len(b), dtype=Constraint)
# Create constraint objects for A*s_j <= b
# (As s_j is normalized by base_mva, b is adjusted accordingly)
for i in range(len(b)):
crt[i] = Constraint(name='cap_' + name[i],
table='injector',
id=index[j],
C=None,
c=None,
a=create_sparse_matrix([2*j, 2*j + 1],
[0, 0], A[i, :],
2*N_I, 1,
dtype=hynet_float_),
r=None,
b=b[i] / self._scr.base_mva,
scaling=1 / self._scr.base_mva)
crt_inj_poly = np.concatenate((crt_inj_poly, crt))
_log.debug("Injector polyhedron constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_inj_poly]
[docs] def get_cost_epigraph_constraints(self):
"""Return the PWL cost function epigraph constraints."""
timer = Timer()
N_I = self._scr.num_injectors
index = self._scr.injector.index
cost_p = self._scr.injector['cost_p'].to_numpy()
cost_q = self._scr.injector['cost_q'].to_numpy()
crt_inj_epi = np.ndarray((0,), dtype=Constraint)
a_scaling = self._scr.base_mva * self._cost_scaling
for j in range(N_I):
if cost_p[j] is not None:
(A, b) = cost_p[j].get_epigraph_polyhedron()
crt = np.ndarray(len(b), dtype=Constraint)
# Create constraint objects for z_p_l*1 >= A*s_p_l + b
for i in range(len(b)):
crt[i] = Constraint(name='inj_p_epi_' + str(i),
table=None,
id=index[j],
C=None,
c=None,
a=create_sparse_matrix([2*j], [0],
[A[i] * a_scaling],
self.dim_s, 1,
dtype=hynet_float_),
r=create_sparse_matrix([2*j], [0], [-1],
self.dim_z, 1,
dtype=hynet_float_),
b=-b[i] * self._cost_scaling,
scaling=self._cost_scaling)
crt_inj_epi = np.concatenate((crt_inj_epi, crt))
if cost_q[j] is not None:
(A, b) = cost_q[j].get_epigraph_polyhedron()
crt = np.ndarray(len(b), dtype=Constraint)
# Create constraint objects for z_q_l*1 >= A*s_q_l + b
for i in range(len(b)):
crt[i] = Constraint(name='inj_q_epi_' + str(i),
table=None,
id=index[j],
C=None,
c=None,
a=create_sparse_matrix([2*j + 1], [0],
[A[i] * a_scaling],
self.dim_s, 1,
dtype=hynet_float_),
r=create_sparse_matrix([2*j + 1], [0], [-1],
self.dim_z, 1,
dtype=hynet_float_),
b=-b[i] * self._cost_scaling,
scaling=self._cost_scaling)
crt_inj_epi = np.concatenate((crt_inj_epi, crt))
_log.debug("Cost function epigraph constraints ({:.3f} sec.)"
.format(timer.total()))
return ConstraintType.INEQUALITY, [crt_inj_epi]
[docs] def get_v_bounds(self):
"""
Return the voltage magnitude bounds ``v_lb <= |v| <= v_ub``.
**Remark:** The voltage magnitude bounds are captured by
``get_voltage_constraints`` and, thus, this box constraint is actually
redundant and only included to provide optimization variable bounds
for the solver (which, for some solvers, can improve convergence).
To avoid any impact on the dual variables of the voltage magnitude
constraints (which e.g. was observed with MOSEK), these box constraints
are loosened w.r.t. the limits employed in ``get_voltage_constraints``.
"""
gap = 0.1 * self._scr.bus['v_max'].to_numpy() # Loosened by 10% of the UB
v_lb = self._scr.bus['v_min'].to_numpy() - gap
v_ub = self._scr.bus['v_max'].to_numpy() + gap
return v_lb, v_ub
[docs] def get_f_bounds(self):
"""Return the converter state bounds ``f_lb <= f <= f_ub``."""
N_C = self._scr.num_converters
cap_src = self._scr.converter['cap_src'].to_numpy()
cap_dst = self._scr.converter['cap_dst'].to_numpy()
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
f_lb = np.zeros(4*N_C, dtype=hynet_float_)
f_ub = np.zeros(4*N_C, dtype=hynet_float_)
for l in range(N_C):
(f_lb[4*l:4*(l+1)], f_ub[4*l:4*(l+1)]) = \
ConverterCapRegion.get_box_constraint(cap_src[l],
cap_dst[l],
loss_fwd[l],
loss_bwd[l])
return f_lb / self._scr.base_mva, f_ub / self._scr.base_mva
[docs] def get_s_bounds(self):
"""Return the injector state bounds ``s_lb <= s <= s_ub``."""
N_I = self._scr.num_injectors
cap = self._scr.injector['cap'].to_numpy()
s_lb = np.zeros(2*N_I, dtype=hynet_float_)
s_ub = np.zeros(2*N_I, dtype=hynet_float_)
for j in range(N_I):
s_lb[2*j:2*(j+1)] = (cap[j].p_min, cap[j].q_min)
s_ub[2*j:2*(j+1)] = (cap[j].p_max, cap[j].q_max)
return s_lb / self._scr.base_mva, s_ub / self._scr.base_mva
[docs] def get_z_bounds(self):
"""
Return the auxiliary variable bounds ``z_lb <= z <= z_ub``.
The lower and upper bound is set to ``numpy.nan`` if the corresponding
bound should be omitted.
"""
N_I = self._scr.num_injectors
cost_p = self._scr.injector['cost_p'].to_numpy()
cost_q = self._scr.injector['cost_q'].to_numpy()
z_lb = np.nan * np.ones(self.dim_z, dtype=hynet_float_)
z_ub = np.nan * np.ones(self.dim_z, dtype=hynet_float_)
for j in range(N_I):
if cost_p[j] is None:
z_lb[2*j] = z_ub[2*j] = 0
if cost_q[j] is None:
z_lb[2*j + 1] = z_ub[2*j + 1] = 0
return z_lb, z_ub
[docs] def get_dyn_loss_function(self):
"""
Return ``(C, c)`` for the total dyn. loss ``L(v,f) = v^H C v + c^T f``.
"""
e1_R4 = create_sparse_matrix([0], [0], [1], 4, 1, dtype=hynet_float_)
e2_R4 = create_sparse_matrix([1], [0], [1], 4, 1, dtype=hynet_float_)
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
C = (self._Y + self._Y.conj().transpose()) / 2
c = kron(loss_fwd.reshape((-1, 1)), e1_R4) + \
kron(loss_bwd.reshape((-1, 1)), e2_R4)
return (hynet_sparse_(C * self._scr.base_mva),
hynet_sparse_(c * self._scr.base_mva))
[docs] def calc_dynamic_losses(self, operating_point):
"""
Return the total dynamic losses in MW for the given operating point.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
v = operating_point.v
f = operating_point.f / self._scr.base_mva
(C, c) = self.get_dyn_loss_function()
return (v.conj().dot(C.dot(v)) + c.transpose().dot(f)).item().real
[docs] def calc_shunt_apparent_power(self, operating_point):
"""
Return the shunt apparent power in MVA for the given operating point.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
v = operating_point.v
y_tld = self._scr.bus['y_tld'].to_numpy()
s_shunt = np.multiply(y_tld.conj(), np.square(np.abs(v)))
return s_shunt * self._scr.base_mva
[docs] def calc_branch_flow(self, operating_point):
"""
Return the flow on the branches for the given operating point.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
Returns
-------
i_src : numpy.ndarray[.hynet_complex_]
Branch current flow in p.u. at the source bus.
i_dst : numpy.ndarray[.hynet_complex_]
Branch current flow in p.u. at the destination bus.
s_src : numpy.ndarray[.hynet_complex_]
Branch apparent power flow in MVA at the source bus.
s_dst : numpy.ndarray[.hynet_complex_]
Branch apparent power flow in MVA at the destination bus.
"""
v = operating_point.v
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
i_src = self._Y_src.dot(v)
i_dst = self._Y_dst.dot(v)
s_src = np.multiply(v[e_src], i_src.conj()) * self._scr.base_mva
s_dst = np.multiply(v[e_dst], i_dst.conj()) * self._scr.base_mva
return i_src, i_dst, s_src, s_dst
[docs] def calc_branch_voltage_drop(self, operating_point):
"""
Return the relative voltage magnitude drop along the branches.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
v = operating_point.v
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
v_drop = np.true_divide(np.abs(v[e_dst]), np.abs(v[e_src])) - 1
return v_drop
[docs] def calc_branch_angle_difference(self, operating_point):
"""
Return the voltage angle difference along the branches in degrees.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
v = operating_point.v
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
angle_diff = np.angle(np.multiply(v[e_src].conj(), v[e_dst])) * 180/np.pi
return angle_diff
[docs] def calc_branch_effective_rating(self, operating_point):
"""
Return the ampacity rating in MVA rating at the current bus voltages.
In the scenario data, the branch flow is limited by an ampacity rating
that is specified in terms of a long-term MVA rating at a bus voltage
of 1 p.u.. Correspondingly, the ampacity rating translates to a
different MVA rating if the bus voltages differ from 1 p.u.. This
function returns the ampacity rating in MVA at the current bus
voltages, i.e., the rating at 1 p.u. times the actual voltage.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
v_abs = np.abs(operating_point.v)
e_src = self._scr.e_src.to_numpy()
e_dst = self._scr.e_dst.to_numpy()
effective_rating = np.multiply(self._scr.branch['rating'].to_numpy(),
np.maximum(v_abs[e_src], v_abs[e_dst]))
return effective_rating
[docs] def calc_converter_flow(self, operating_point):
"""
Return the apparent power flow into the converters.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
Returns
-------
s_src : numpy.ndarray[.hynet_complex_]
Apparent power flow in MVA into the converter at the source bus.
s_dst : numpy.ndarray[.hynet_complex_]
Apparent power flow in MVA into the converter at the destination bus.
"""
p_fwd = operating_point.f[0::4]
p_bwd = operating_point.f[1::4]
q_src = operating_point.f[2::4]
q_dst = operating_point.f[3::4]
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
s_src = p_fwd - np.multiply(1 - loss_bwd, p_bwd) - 1j*q_src
s_dst = p_bwd - np.multiply(1 - loss_fwd, p_fwd) - 1j*q_dst
return s_src, s_dst
[docs] def calc_converter_loss_error(self, operating_point):
"""
Return the converter loss error.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
Returns
-------
loss_err : numpy.ndarray[.hynet_float_]
Loss error in MW due to noncomplementary modes of the converter.
"""
p_fwd = operating_point.f[0::4]
p_bwd = operating_point.f[1::4]
loss_fwd = self._scr.converter['loss_fwd'].to_numpy() / 100
loss_bwd = self._scr.converter['loss_bwd'].to_numpy() / 100
s_src_real = p_fwd - np.multiply(1 - loss_bwd, p_bwd)
s_dst_real = p_bwd - np.multiply(1 - loss_fwd, p_fwd)
p_fwd_net = np.maximum(s_src_real, 0.0)
p_bwd_net = np.maximum(s_dst_real, 0.0)
loss_err = np.multiply(loss_fwd, p_fwd - p_fwd_net) \
+ np.multiply(loss_bwd, p_bwd - p_bwd_net)
return loss_err
[docs] def verify_converter_loss_accuracy(self, operating_point):
"""
Return ``True`` if the converter loss error is within the tolerance.
The converter loss error tolerance of the model is specified by the
property ``converter_loss_error_tolerance``.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
"""
loss_err = self.calc_converter_loss_error(operating_point)
if not loss_err.size:
return True
# ``numpy.nan``-compatible check against the tolerance threshold
return not (loss_err.max() > self.converter_loss_error_tolerance)
[docs] def fix_converter_modes(self, qcqp, operating_point, set_initial_point=True):
"""
Fix the converter modes in the QCQP according to the converter net flow.
This function determines the active converter mode based on the
converter net active power flow in the provided operating point and
updates the upper bounds on the converter state variable in the QCQP
to fix the converter mode accordingly. Therewith, the emergence of any
converter loss errors is suppressed.
Parameters
----------
qcqp : QCQP
QCQP specification for the optimization problem associated with
this model, see ``get_problem``.
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result. Generally,
this is the initial solution of the model's QCQP in which a
nonnegligible converter loss error appeared.
set_initial_point : bool, optional
If ``True`` (default), the provided operating point is set as the
initial point for the QCQP with the converter state variables
adjusted according to the net active power flow.
"""
s_src, s_dst = self.calc_converter_flow(operating_point)
p_bwd_net = np.maximum(s_dst.real, 0.0)
p_fwd_ub = qcqp.ub.f[0::4]
p_bwd_ub = qcqp.ub.f[1::4]
p_bwd_ub[p_bwd_net == 0] = 0 # Disable inactive backward modes
p_fwd_ub[p_bwd_net != 0] = 0 # Complement to inactive backward modes
# # (For zero flow, forward remains active)
qcqp.ub.f[0::4] = p_fwd_ub
qcqp.ub.f[1::4] = p_bwd_ub
# Guard against unforeseen numerical issues in mode detection...
p_fwd_lb = qcqp.lb.f[0::4]
assert np.all(p_fwd_lb[p_bwd_net != 0] == 0)
p_bwd_lb = qcqp.lb.f[1::4]
assert np.all(p_bwd_lb[p_bwd_net == 0] == 0)
if set_initial_point:
p_fwd_net = np.maximum(s_src.real, 0.0)
qcqp.initial_point = operating_point.copy()
qcqp.initial_point.f[0::4] = p_fwd_net
qcqp.initial_point.f[1::4] = p_bwd_net
qcqp.initial_point.scale(qcqp.normalization)
[docs] def calc_injection_costs(self, operating_point):
"""
Return the injector's active and reactive power injection costs in dollars.
Parameters
----------
operating_point : QCQPPoint
Operating point of the system *without* the normalization of the
state variables, i.e., as provided by a QCQP result.
Returns
-------
cost_p : numpy.ndarray[.hynet_float_]
Cost of the active power injection in dollars.
cost_q : numpy.ndarray[.hynet_float_]
Cost of the reactive power injection in dollars.
"""
s = operating_point.s[0::2] + 1j*operating_point.s[1::2]
cost_p = np.nan * np.ones(self._scr.num_injectors, dtype=hynet_float_)
cost_q = np.nan * np.ones(self._scr.num_injectors, dtype=hynet_float_)
cost_iter = zip(s,
self._scr.injector['cost_p'].to_numpy(),
self._scr.injector['cost_q'].to_numpy())
for j, (s_j, f_p, f_q) in enumerate(cost_iter):
if f_p is not None:
cost_p[j] = f_p.evaluate(s_j.real)
if f_q is not None:
cost_q[j] = f_q.evaluate(s_j.imag)
return cost_p, cost_q
[docs] def verify_power_balance_accuracy(self, bal_err):
"""
Return ``True`` if the power balance error is within the tolerance.
The power balance error tolerance of the model is specified by the
properties ``nodal_balance_error_tolerance`` and
``total_balance_error_tolerance``.
Parameters
----------
bal_err : numpy.ndarray[.hynet_complex_]
Power balance error in MVA at the individual buses.
"""
max_bal_err = np.abs(bal_err).max()
max_load = self.scenario.bus['load'].abs().max()
if max_bal_err / max_load > self.nodal_balance_error_tolerance:
return False
total_bal_err = np.sum(np.abs(bal_err.real))
total_load = np.sum(np.abs(self.scenario.bus['load'].to_numpy().real))
if total_bal_err / total_load > self.total_balance_error_tolerance:
return False
return True