Source code for hynet.system.result

#pylint: disable=line-too-long,too-many-lines,too-many-statements
"""
Representation of a system-model-related optimization result.
"""

import logging
import os.path
from collections import OrderedDict
import functools

import numpy as np
import pandas as pd

from hynet import __version__ as hynet_version
from hynet.types_ import (hynet_float_,
                          hynet_eps,
                          BusType,
                          BranchType,
                          SolverStatus)
from hynet.utilities.base import Timer, truncate_with_ellipsis

_log = logging.getLogger(__name__)


[docs]def ensure_result_availability(func): """ Decorates a result evaluation function with a result data availability check. Functions that evaluate the result data typically require a check if the result data is available. This decorator offers a convenient and unified way to augment a result evaluation function with such a check, where the function must take a ``SystemResult``-based object as the first argument. """ @functools.wraps(func) def wrapper(result, *args, **kwargs): if result.empty: raise ValueError("The result is empty.") return func(result, *args, **kwargs) return wrapper
[docs]class SystemResult: """ Result of a system-model-related optimization. Parameters ---------- model : SystemModel Model for the processed optimization problem. empty : bool ``True`` if the object does not contain any result data and ``False`` otherwise. solver : SolverInterface Solver object by which the result was obtained. solver_status : SolverStatus Status reported by the solver. solver_time : float Duration of the call to the solver in seconds. optimal_value : float Optimal objective value or ``numpy.nan`` if the solver failed. total_time : float or numpy.nan Total time for the calculation, including the modeling, solving, and result assembly. If not provided, this time is set to ``numpy.nan``. reconstruction_mse : float Unavailable if the result is empty and, otherwise, the mean squared error of the reconstructed bus voltages in case of a relaxation and ``numpy.nan`` otherwise. bus : pandas.DataFrame, optional Unavailable if the result is empty and, otherwise, a data frame with the bus result data, indexed by the *bus ID*, which comprises at least the following columns: ``v``: (``hynet_complex_``) Bus voltage rms phasor (AC) or bus voltage magnitude (DC). ``s_shunt``: (``hynet_complex_``) Shunt apparent power in MVA. The real part constitutes the shunt losses in MW and the *negated* imaginary part constitutes the reactive power *injection*. ``bal_err``: (``hynet_complex_``) Power balance residual in MVA, i.e., the evaluation of the complex-valued power balance equation at the system state. Theoretically, this should be identical to zero, but due to a limited solver accuracy and/or inexactness of the relaxation it is only approximately zero. This residual supports the assessment of solution accuracy and validity. ``dv_bal_p``: (``hynet_float_``) Dual variable or KKT multiplier of the active power balance constraint. ``dv_bal_q``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power balance constraint. branch : pandas.DataFrame, optional Unavailable if the result is empty and, otherwise, a data frame with the branch result data, indexed by the *branch ID*, which comprises at least the following columns: ``s_src``: (``hynet_complex_``) Apparent power flow in MVA at the source bus (measured as a flow *into* the branch). ``s_dst``: (``hynet_complex_``) Apparent power flow in MVA at the destination bus (measured as a flow *into* the branch). ``i_src``: (``hynet_complex_``) Current flow in p.u. at the source bus (measured as a flow *into* the branch). ``i_dst``: (``hynet_complex_``) Current flow in p.u. at the destination bus (measured as a flow *into* the branch). ``v_drop``: (``hynet_float_``) Relative voltage magnitude drop from the source bus to the destination bus. ``angle_diff``: (``hynet_float_``) Bus voltage angle difference in degrees between the source and destination bus. ``effective_rating``: (``hynet_float_``) Ampacity in terms of a long-term MVA rating at the *actual* bus voltage. If no rating is available, it is set to ``numpy.nan``. ``rel_err``: (``hynet_float_``) Branch-related relative reconstruction error :math:`\\kappa_k(V^\\star)` as defined in equation (24) in [1]_ in case of a relaxed QCQP or ``numpy.nan`` otherwise. ``dv_real_part``: (``hynet_float_``) Dual variable or KKT multiplier of the +/-90 degrees constraint on the angle difference (cf. equation (27) in [2]_) or ``numpy.nan`` if unavailable. converter : pandas.DataFrame Unavailable if the result is empty and, otherwise, a data frame with the converter result data, indexed by the *converter ID*, which comprises at least the following columns: ``p_src``: (``hynet_float_``) Active power flow in MW at the source bus *into the converter*. ``p_dst``: (``hynet_float_``) Active power flow in MW at the destination bus *into the converter*. ``q_src``: (``hynet_float_``) Reactive power injection in Mvar at the source bus *into the grid*. ``q_dst``: (``hynet_float_``) Reactive power injection in Mvar at the destination bus *into the grid*. ``loss_err``: (``hynet_float_``) Loss error in MW due to noncomplementary modes of the converter. ``loss_err_pre``: (``hynet_float_``) Only available if the QCQP was pre-solved to detect and fix the converter modes. Loss error in MW in the pre-solution due to noncomplementary modes of the converter. ``dv_p_fwd_min``: (``hynet_float_``) Dual variable or KKT multiplier of the lower bound on the converter's forward mode active power flow or ``numpy.nan`` if unavailable. ``dv_p_fwd_max``: (``hynet_float_``) Dual variable or KKT multiplier of the upper bound on the converter's forward mode active power flow or ``numpy.nan`` if unavailable. ``dv_p_bwd_min``: (``hynet_float_``) Dual variable or KKT multiplier of the lower bound on the converter's backward mode active power flow or ``numpy.nan`` if unavailable. ``dv_p_bwd_max``: (``hynet_float_``) Dual variable or KKT multiplier of the upper bound on the converter's backward mode active power flow or ``numpy.nan`` if unavailable. ``dv_cap_src_q_min``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power lower bound of the capability region at the source bus or ``numpy.nan`` if unavailable. ``dv_cap_src_q_max``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power upper bound of the capability region at the source bus or ``numpy.nan`` if unavailable. ``dv_cap_dst_q_min``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power lower bound of the capability region at the destination bus or ``numpy.nan`` if unavailable. ``dv_cap_dst_q_max``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power upper bound of the capability region at the destination bus or ``numpy.nan`` if unavailable. injector : pandas.DataFrame Unavailable if the result is empty and, otherwise, a data frame with the injector result data, indexed by the *injector ID*, which comprises at least the following columns: ``s``: (``hynet_complex_``) Apparent power injection in MVA. ``cost_p``: (``hynet_float_``) Cost of the active power injection in dollars or ``numpy.nan`` if no cost function was provided. ``cost_q``: (``hynet_float_``) Cost of the reactive power injection in dollars or ``numpy.nan`` if no cost function was provided. ``dv_cap_p_min``: (``hynet_float_``) Dual variable or KKT multiplier of the active power lower bound of the capability region or ``numpy.nan`` if unavailable. ``dv_cap_q_min``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power lower bound of the capability region or ``numpy.nan`` if unavailable. ``dv_cap_p_max``: (``hynet_float_``) Dual variable or KKT multiplier of the active power upper bound of the capability region or ``numpy.nan`` if unavailable. ``dv_cap_q_max``: (``hynet_float_``) Dual variable or KKT multiplier of the reactive power upper bound of the capability region or ``numpy.nan`` if unavailable. References ---------- .. [1] M. Hotz and W. Utschick, "The Hybrid Transmission Grid Architecture: Benefits in Nodal Pricing," in IEEE Trans. Power Systems, vol. 33, no. 2, pp. 1431-1442, Mar. 2018. .. [2] M. Hotz and W. Utschick, "A Hybrid Transmission Grid Architecture Enabling Efficient Optimal Power Flow," in IEEE Trans. Power Systems, vol. 31, no. 6, pp. 4504-4516, Nov. 2016. """ def __init__(self, model, qcqp_result, total_time=np.nan, qcqp_result_pre=None): """ Create a result object. Parameters ---------- model : hynet.system.model.SystemModel Model for the processed optimization problem. qcqp_result : hynet.qcqp.result.QCQPResult Solution of the QCQP. total_time : .hynet_float_, optional Total time for solving the QCQP, cf. ``hynet.system.calc.calc``. qcqp_result_pre : QCQPResult, optional Pre-solution of the model's QCQP for converter mode detection. """ timer = Timer() self._define_parameterization_constants() self.model = model self.total_time = total_time # REMARK: Only relevant data of the QCQP result object is extracted # here as storing it in its entirety would bloat the result object # due to the constraint objects in the contained QCQP object, which # exhibit a significant memory consumption. self.empty = qcqp_result.empty self.solver = qcqp_result.solver self.solver_status = qcqp_result.solver_status self.solver_time = qcqp_result.solver_time self.optimal_value = qcqp_result.optimal_value self._optimizer = qcqp_result.optimizer if qcqp_result.empty: return # Bus-related state data s_shunt = self.model.calc_shunt_apparent_power(self._optimizer) self.bus = pd.DataFrame(OrderedDict( [('v', self._optimizer.v), ('s_shunt', s_shunt) ])) self.bus.index = self.scenario.bus.index # Branch-related state data i_src, i_dst, s_src, s_dst = self.model.calc_branch_flow(self._optimizer) v_drop = self.model.calc_branch_voltage_drop(self._optimizer) angle_diff = self.model.calc_branch_angle_difference(self._optimizer) effective_rating = self.model.calc_branch_effective_rating(self._optimizer) self.branch = pd.DataFrame(OrderedDict( [('s_src', s_src), ('s_dst', s_dst), ('i_src', i_src), ('i_dst', i_dst), ('v_drop', v_drop), ('angle_diff', angle_diff), ('effective_rating', effective_rating) ])) self.branch.index = self.scenario.branch.index # Converter-related state data s_src, s_dst = self.model.calc_converter_flow(self._optimizer) loss_err = self.model.calc_converter_loss_error(self._optimizer) self.converter = pd.DataFrame(OrderedDict( [('p_src', s_src.real), ('p_dst', s_dst.real), ('q_src', -s_src.imag), ('q_dst', -s_dst.imag), ('loss_err', loss_err) ])) self.converter.index = self.scenario.converter.index if qcqp_result_pre is not None and not qcqp_result_pre.empty: loss_err_pre = \ self.model.calc_converter_loss_error(qcqp_result_pre.optimizer) self.converter['loss_err_pre'] = loss_err_pre # Injector-related state data cost_p, cost_q = self.model.calc_injection_costs(self._optimizer) self.injector = pd.DataFrame(OrderedDict( [('s', self._optimizer.s[0::2] + 1j*self._optimizer.s[1::2]), ('cost_p', cost_p), ('cost_q', cost_q) ])) self.injector.index = self.scenario.injector.index # Mean squared and branch-related reconstruction error self.reconstruction_mse = qcqp_result.reconstruction_mse if qcqp_result.V is not None: edges = (self.scenario.e_src.to_numpy(), self.scenario.e_dst.to_numpy()) rel_err = self.solver.rank1approx.calc_rel_err(qcqp_result.V, self._optimizer.v, edges) else: rel_err = np.nan * np.ones(self.scenario.num_branches, dtype=hynet_float_) self.branch.loc[:, 'rel_err'] = pd.Series(rel_err, index=self.branch.index) # Dual variables of the box constraints self.injector['dv_cap_p_min'] = qcqp_result.dv_lb.s[0::2] self.injector['dv_cap_q_min'] = qcqp_result.dv_lb.s[1::2] self.injector['dv_cap_p_max'] = qcqp_result.dv_ub.s[0::2] self.injector['dv_cap_q_max'] = qcqp_result.dv_ub.s[1::2] self.converter['dv_p_fwd_min'] = qcqp_result.dv_lb.f[0::4] self.converter['dv_p_bwd_min'] = qcqp_result.dv_lb.f[1::4] self.converter['dv_p_fwd_max'] = qcqp_result.dv_ub.f[0::4] self.converter['dv_p_bwd_max'] = qcqp_result.dv_ub.f[1::4] self.converter['dv_cap_src_q_min'] = qcqp_result.dv_lb.f[2::4] self.converter['dv_cap_dst_q_min'] = qcqp_result.dv_lb.f[3::4] self.converter['dv_cap_src_q_max'] = qcqp_result.dv_ub.f[2::4] self.converter['dv_cap_dst_q_max'] = qcqp_result.dv_ub.f[3::4] # Dual variables and constraint function values timer.interval() qcqp_result.get_result_tables(tables={'bus': self.bus, 'branch': self.branch, 'converter': self.converter, 'injector': self.injector}, dual_prefix='dv_', value_prefix='cv_') _log.debug("Result table creation ({:.3f} sec.)" .format(timer.interval())) # By definition, the power balance equations must be part of all models assert all(map(lambda x: x in self.bus.columns, ['dv_bal_p', 'dv_bal_q', 'cv_bal_p', 'cv_bal_q'])) # By definition, the real-part constraints must be part of all models assert self.branch.empty or 'dv_real_part' in self.branch.columns # Power balance error self.bus.loc[:, 'bal_err'] = self.bus['cv_bal_p'] + 1j*self.bus['cv_bal_q'] self.bus.drop(['cv_bal_p', 'cv_bal_q'], axis='columns', inplace=True) # Update the total time with the duration of the result creation self.total_time += timer.total() _log.debug("Result creation (total) ({:.3f} sec.)" .format(timer.total())) @property def num_buses(self): """Return the number of buses.""" return 0 if self.empty else len(self.bus.index) @property def num_branches(self): """Return the number of branches.""" return 0 if self.empty else len(self.branch.index) @property def num_converters(self): """Return the number of converters.""" return 0 if self.empty else len(self.converter.index) @property def num_injectors(self): """Return the number of injectors.""" return 0 if self.empty else len(self.injector.index) @property def scenario(self): """Return the scenario data of the system model.""" return self.model.scenario def __repr__(self): """Return a summary of the result.""" t = "" t += self._get_header() t += "|> Data Source " + "-"*63 + "<|\n" t += "|" + " "*78 + "|\n" t += self._get_data_source_summary() t += "|" + " "*78 + "|\n" t += "|> Grid Information " + "-"*58 + "<|\n" t += "|" + " "*78 + "|\n" t += self._get_grid_structure_summary() t += "|" + " "*78 + "|\n" t += self._get_grid_injection_summary() t += "|" + " "*78 + "|\n" if not self.empty: t += "|> Results " + "-"*67 + "<|\n" t += "|" + " "*78 + "|\n" t += self._get_injection_and_loss_summary() t += "|" + " "*40 + "-+----+-" + " "*30 + "|\n" t += self._get_nodal_and_utilization_summary() t += "|" + " "*78 + "|\n" t += "|> Solution Process " + "-"*58 + "<|\n" t += "|" + " "*78 + "|\n" t += self._get_solver_info() t += self._get_solution_accuracy_info() t += "+" + "-"*78 + "+\n" return t @property def details(self): """ Return a formatted string with details of the system's state. The returned string contains a formatted table for all major entity types, which is hopefully mostly self-explaining. In the very left or right of a column, there may be an indicator: +-----------+-------------------------------------------------------+ | Indicator | Meaning | +===========+=======================================================+ | ``R`` | Reference bus in the respective subgrid. | +-----------+-------------------------------------------------------+ | ``=`` | The bus is a DC bus. If there is no indicator, the | | | bus is an AC bus. | +-----------+-------------------------------------------------------+ | ``*`` | A limit on the respective quantity is active. | +-----------+-------------------------------------------------------+ | ``>`` | The branch is highly loaded, i.e., the flow is 90% or | | | more of the effective rating. | +-----------+-------------------------------------------------------+ | ``T`` | The branch is a transformer. If there is no | | | indicator, the branch is a line/cable. | +-----------+-------------------------------------------------------+ """ t = "" if self.empty: t += "+" + "-"*78 + "+\n" t += "| Solver terminated with status {:<46s} |\n"\ .format("'" + self.solver_status.name + "'") t += "+" + "-"*78 + "+\n" else: if self.bus.index.max() > 99999 or self.bus.index.min() < -9999: raise RuntimeError("This formatted output only supports bus " "IDs comprising 5 characters or less.") t += self._get_bus_details() t += self._get_branch_details() t += self._get_converter_details() t += self._get_injector_details() return t @property def is_valid(self): """ Return ``True`` if the result is considered as valid. For a result to be valid, the solved must have terminated with the solver status ``SOLVED`` and the converter loss error and the power balance error must be within the model's tolerance. """ return (not self.empty and self.solver_status == SolverStatus.SOLVED and self.is_physical) @property @ensure_result_availability def is_physical(self): """ Return ``True`` if the flow errors are within the model's tolerance. See Also -------- SystemResult.has_valid_converter_flows SystemResult.has_valid_power_balance """ return self.has_valid_converter_flows and self.has_valid_power_balance @property @ensure_result_availability def has_valid_converter_flows(self): """ Return ``True`` if the converter loss error is within the model's tolerance. """ return self.model.verify_converter_loss_accuracy(self._optimizer) @property @ensure_result_availability def has_valid_power_balance(self): """ Return ``True`` if the power balance error is within the model's tolerance. """ return self.model.verify_power_balance_accuracy(self.bus['bal_err'].to_numpy())
[docs] @ensure_result_availability def get_total_injection_cost(self): """ Return the total injection cost in $/h. """ total_injection_cost = 0 if not self.injector['cost_p'].isnull().all(): total_injection_cost += self.injector['cost_p'].sum() if not self.injector['cost_q'].isnull().all(): total_injection_cost += self.injector['cost_q'].sum() return total_injection_cost
[docs] @ensure_result_availability def get_dynamic_losses(self): """ Return the dynamic losses in MW. """ return self.model.calc_dynamic_losses(self._optimizer)
[docs] @ensure_result_availability def get_total_losses(self): """ Return the total losses in MW. The total losses comprise the dynamic losses and the static losses of the converters. """ total_losses = self.get_dynamic_losses() if not self.scenario.converter.empty: total_losses += self.scenario.converter['loss_fix'].sum() return total_losses
[docs] @ensure_result_availability def get_branch_utilization(self): """ Return a pandas Series with the branch utilization. Returns ------- branch_utilization : pandas.Series Utilization of the branches as the ratio of the MVA branch flow over the effective rating or ``numpy.nan`` for unrated branches. """ branch_flow = self.branch[['s_src', 's_dst']].abs().max(axis=1) branch_utilization = branch_flow / self.branch['effective_rating'] branch_utilization.name = 'branch_utilization' return branch_utilization
#========================================================================== # # CAUTION: Below is only some ugly string formatting code. # # You have been warned! ;P # #========================================================================== def _define_parameterization_constants(self): # Title for this result (Set this in a derived class) self._result_title = '' # Unit of the power balance dual variables (Set this in a derived class) # # CAUTION: Use at maximum 9 characters for proper formatting with the # result formatting methods provided by this class. self._unit_dv_bal_p = '' self._unit_dv_bal_q = '' # Result printing: Threshold on zero values to be replaced by a dash self._zero_thres = 1e-8 # Result printing: Tolerance factor for activity indicator w.r.t. # constraint interval self._tol_active = 0.005 # Result printing: Ratio w.r.t. to rating to indicate high loading self._high_loading = 0.9 def _get_header(self): t = "" t += "\n+" + "-"*78 + "+\n" t += "| {:<50s} {:>25s} |\n"\ .format(self._result_title.upper(), "hynet ~ version " + hynet_version) t += "|" + " "*78 + "|\n" return t def _get_data_source_summary(self): scenario = self.scenario if scenario.grid_name: grid_name = scenario.grid_name else: grid_name = "-" if scenario.database_uri: database_uri = " (" + os.path.basename(scenario.database_uri) + ")" else: database_uri = "" if scenario.name: scenario_name = scenario.name + " @ " else: scenario_name = "" t = "" t += "| Grid: {:<64s} |\n"\ .format(truncate_with_ellipsis(grid_name + database_uri, 64)) t += "| Scenario: {:<64s} |\n"\ .format(truncate_with_ellipsis("{0:s}{1:s} (id={2:s})" .format(scenario_name, scenario.get_time_string(), str(scenario.id)), 64)) return t def _get_grid_structure_summary(self): bus = self.scenario.bus branch = self.scenario.branch converter = self.scenario.converter injector = self.scenario.injector line_bus_type = bus.loc[ branch.loc[branch['type'] == BranchType.LINE, 'src'], 'type' ].to_numpy() conv_src_bus_type = bus.loc[converter['src'], 'type'].to_numpy() conv_dst_bus_type = bus.loc[converter['dst'], 'type'].to_numpy() conv_b2b_bus_type = conv_src_bus_type[conv_src_bus_type == conv_dst_bus_type] num_conventional = sum(x.is_conventional() for x in injector['type']) num_renewable = sum(x.is_renewable() for x in injector['type']) num_prosumer = sum(x.is_prosumer() for x in injector['type']) num_load = sum(x.is_load() for x in injector['type']) num_compensation = sum(x.is_compensation() for x in injector['type']) t = "" t += "| Topology: {:>10s}|{:7d} AC subgrids|{:6d} DC subgrids|{:5d} islands |\n"\ .format("hyb. arch." if self.scenario.has_acyclic_subgrids() else "meshed sg.", len(self.scenario.get_ac_subgrids()), len(self.scenario.get_dc_subgrids()), len(self.scenario.get_islands()) - 1) t += "| Buses: {:8d} |{:7d} AC buses |{:6d} DC buses |{:5d} with shunt |\n"\ .format(len(bus.index), (bus['type'] == BusType.AC).sum(), (bus['type'] == BusType.DC).sum(), (bus['y_tld'].abs() > 0).sum()) t += "| Branches: {:8d} |{:7d} AC lines |{:6d} DC lines |{:5d} transform. |\n"\ .format(len(branch.index), (line_bus_type == BusType.AC).sum(), (line_bus_type == BusType.DC).sum(), (branch['type'].to_numpy() == BranchType.TRANSFORMER).sum()) t += "| Converters:{:8d} |{:7d} AC/DC |{:6d} AC/AC |{:5d} DC/DC |\n"\ .format(len(converter.index), (conv_src_bus_type != conv_dst_bus_type).sum(), (conv_b2b_bus_type == BusType.AC).sum(), (conv_b2b_bus_type == BusType.DC).sum()) t += "| Injectors: {:8d} |{:7d} convent. |{:6d} disp. load |{:5d} compensat. |\n"\ .format(len(injector.index), num_conventional, num_load, num_compensation) t += "|" + " "*21 + "|{:7d} renewable |{:6d} prosumer | |\n"\ .format(num_renewable, num_prosumer) return t def _get_grid_injection_summary(self): injector = self.scenario.injector total_load = self.scenario.bus['load'].sum() t = "" t += "| Injection:{:11.1f} MW{:>10s} Mvar | Min.:{:9.1f} MW /{:10.1f} Mvar |\n"\ .format(sum(x.p_max for x in injector['cap']), "/{:8.1f}".format(sum(x.q_max for x in injector['cap'])), sum(x.p_min for x in injector['cap']), sum(x.q_min for x in injector['cap'])) t += "| Total load:{:10.1f} MW /{:8.1f} Mvar | Loading:{:8.2%} of P-capacity |\n"\ .format(total_load.real, total_load.imag, self.scenario.get_relative_loading()) return t def _get_injection_and_loss_summary(self): converter_p_sum = 0 converter_q_sum = 0 if not self.scenario.converter.empty: converter_p_sum += self.scenario.converter['loss_fix'].sum() converter_p_sum += self.converter['p_src'].sum() converter_p_sum += self.converter['p_dst'].sum() converter_q_sum += self.converter['q_src'].sum() converter_q_sum += self.converter['q_dst'].sum() total_losses = self.get_total_losses() total_real_load = self.scenario.bus['load'].to_numpy().real.sum() total_injection = self.injector['s'].sum() total_s_shunt = self.bus['s_shunt'].sum() total_branch_loss = self.branch['s_src'].to_numpy().real.sum() \ + self.branch['s_dst'].to_numpy().real.sum() t = "" t += "| Injection:{:11.1f} MW /{:8.1f} Mvar | Inj. cost:{:17.3f} k$/h |\n"\ .format(total_injection.real, total_injection.imag, self.get_total_injection_cost()/1e3) t += "| Total loss:{:10.1f} MW{:>8s} P-load | Shunt loss:{:14.1f} MW |\n"\ .format(total_losses, "/{:6.2%}".format(total_losses/total_real_load), total_s_shunt.real) t += "| Branch loss:{:9.1f} MW | Shunt Q-sum:{:13.1f} Mvar |\n"\ .format(total_branch_loss, -total_s_shunt.imag) t += "| Conv. loss:{:10.1f} MW | Conv. Q-sum:{:13.1f} Mvar |\n"\ .format(converter_p_sum, converter_q_sum) return t def _get_nodal_and_utilization_summary(self): v_abs = np.abs(self.bus['v']) v_angle = np.angle(self.bus['v']) * 180/np.pi dv_bal_p_min = self.bus['dv_bal_p'].min() dv_bal_p_max = self.bus['dv_bal_p'].max() dv_bal_q_min = self.bus['dv_bal_q'].min() dv_bal_q_max = self.bus['dv_bal_q'].max() branch_utilization = self.get_branch_utilization() idx_dc_line = self.scenario.get_dc_branches() idx_ac = self.scenario.get_ac_branches() idx_ac_line = idx_ac[ self.scenario.branch.loc[idx_ac, 'type'] == BranchType.LINE] idx_tf = idx_ac[ self.scenario.branch.loc[idx_ac, 'type'] == BranchType.TRANSFORMER] util_dc_line = branch_utilization[idx_dc_line].mean() util_ac_line = branch_utilization[idx_ac_line].mean() util_tf = branch_utilization[idx_tf].mean() t = "" t += "| Voltage mag.:{:10.3f} to{:10.3f} p.u. | Average branch utilization: |\n"\ .format(v_abs.min(), v_abs.max()) t += "| Voltage angle:{:8.2f} to{:9.2f} deg. | AC lines: {:<16s}|\n"\ .format(v_angle.min(), v_angle.max(), "{:8.2%}".format(util_ac_line) if not np.isnan(util_ac_line) else " -") t += "| P-bal. dual:{:>10s} to{:>9s} {:<9s}| DC lines: {:<16s}|\n"\ .format("{:10.2f}".format(dv_bal_p_min) if not np.isnan(dv_bal_p_min) else "- ", "{:9.2f}".format(dv_bal_p_max) if not np.isnan(dv_bal_p_max) else "- ", self._unit_dv_bal_p, "{:8.2%}".format(util_dc_line) if not np.isnan(util_dc_line) else " -") t += "| Q-bal. dual:{:>10s} to{:>9s} {:<9s}| Transformers:{:<16s}|\n"\ .format("{:10.2f}".format(dv_bal_q_min) if not np.isnan(dv_bal_q_min) else "- ", "{:9.2f}".format(dv_bal_q_max) if not np.isnan(dv_bal_q_max) else "- ", self._unit_dv_bal_q, "{:8.2%}".format(util_tf) if not np.isnan(util_tf) else " -") return t def _get_solver_info(self): solver_string = str(self.solver) if self.solver.param: solver_string += ' / ' solver_string += ', '.join([key + '=' + str(value) for key, value in self.solver.param.items()]) t = "" t += "| Solver: {:<56s} |\n"\ .format(truncate_with_ellipsis(solver_string, 56)) t += "| Status: {:<56s} |\n"\ .format(self.solver_status.name.lower()) t += "| Time: {:<56s} |\n"\ .format('{:.2f} sec. in the solver'.format(self.solver_time) + ('' if np.isnan(self.total_time) else ' / {:.2f} sec. in total'.format(self.total_time))) return t def _get_solution_accuracy_info(self, inaccuracy_warning=True): t = "" if self.empty: return t def format_mean_max(series): return "{:10.3e} ({:9.3e}) mean (max.)".format(series.mean(), series.max()) t += "| Power balance: {:s} absolute error in MVA |\n"\ .format(format_mean_max(self.bus['bal_err'].abs())) if not self.scenario.converter.empty: if 'loss_err_pre' in self.converter: # The converter modes were fixed... if not self.has_valid_converter_flows: _log.error("The converter loss error violates the " "tolerance despite fixed modes.") t += "| Converters: Fixed mode. [ was{:s} MW ] |\n"\ .format(format_mean_max(self.converter['loss_err_pre'])) else: t += "| Converters: {:s} loss error in MW |\n"\ .format(format_mean_max(self.converter['loss_err'])) if not np.isnan(self.reconstruction_mse): t += "| Voltage recovery: {:<56s} |\n"\ .format("{:9.3e} mean squared error (reconstruction error)" .format(self.reconstruction_mse)) if inaccuracy_warning and not self.is_physical: t += "|" + " "*78 + "|\n" t += "|{:^78s}|\n"\ .format("WARNING: THE ERROR TOLERANCES FOR A " "PHYSICAL SOLUTION ARE NOT SATISFIED!") return t def _check_activity(self, value, min_, max_): if min_ is None and max_ is not None: min_ = np.nan * max_ elif max_ is None and min_ is not None: max_ = np.nan * min_ tolerance = pd.Series(False, index=max_.index) for id_ in tolerance.index: val = value.at[id_] lb = min_.at[id_] ub = max_.at[id_] lb_present = ~np.isnan(lb) ub_present = ~np.isnan(ub) if lb_present and ub_present: tol = self._tol_active * (ub - lb) + self._zero_thres tolerance.at[id_] = (val <= lb + tol) | (val >= ub - tol) elif lb_present: tol = abs(self._tol_active * lb) + self._zero_thres tolerance.at[id_] = (val <= lb + tol) elif ub_present: tol = abs(self._tol_active * ub) + self._zero_thres tolerance.at[id_] = (val >= ub - tol) return tolerance def _check_cap_region_box_active(self, value, cap): p = pd.Series(value.to_numpy().real, index=value.index) p_min = pd.Series([x.p_min for x in cap], index=cap.index) p_max = pd.Series([x.p_max for x in cap], index=cap.index) p_active = self._check_activity(p, p_min, p_max) q = pd.Series(value.to_numpy().imag, index=value.index) q_min = pd.Series([x.q_min for x in cap], index=cap.index) q_max = pd.Series([x.q_max for x in cap], index=cap.index) q_active = self._check_activity(q, q_min, q_max) return p_active, q_active def _get_bus_details(self): unit_dv_bal_p = self._unit_dv_bal_p if unit_dv_bal_p and len(unit_dv_bal_p) <= 7: unit_dv_bal_p = "(" + unit_dv_bal_p + ")" unit_dv_bal_q = self._unit_dv_bal_q if unit_dv_bal_q and len(unit_dv_bal_q) <= 7: unit_dv_bal_q = "(" + unit_dv_bal_q + ")" t = "" t += "\n+" + "-"*78 + "+\n" t += "| BUS RESULT" + " "*67 + "|\n" t += "+-------+---------------+-------------------+--------------+-------------------+\n" t += "| | Voltage | Load | Shunt | Dual Variables |\n" t += "| +-------+-------+---------+---------+------+-------+---------+---------+\n" t += "| | Mag. | Phase | P | Q | Loss | Q-Inj.| P-bal. | Q-bal. |\n" t += "| ID | (pu) | (deg) | (MW) | (Mvar) | (MW) | (Mvar)|{:^9s}|{:^9s}|\n"\ .format(unit_dv_bal_p, unit_dv_bal_q) t += "+-------+-------+-------+---------+---------+------+-------+---------+---------+\n" t = [t] scr_bus = self.scenario.bus res_bus = self.bus dc_bus = scr_bus['type'] == BusType.DC v_abs = res_bus['v'].abs() v_abs_active = self._check_activity(v_abs, scr_bus['v_min'], scr_bus['v_max']) v_angle = pd.Series(np.angle(res_bus['v']) * 180/np.pi, index=res_bus.index) load_p = pd.Series(scr_bus['load'].to_numpy().real, index=scr_bus.index) load_q = pd.Series(scr_bus['load'].to_numpy().imag, index=scr_bus.index) shunt_p = pd.Series(res_bus['s_shunt'].to_numpy().real, index=res_bus.index) shunt_q = pd.Series(res_bus['s_shunt'].to_numpy().imag, index=res_bus.index) dv_bal_p = res_bus['dv_bal_p'] dv_bal_q = res_bus['dv_bal_q'] for id_ in res_bus.index: r = "|" r += "R" if scr_bus.at[id_, 'ref'] else " " r += "{:5d}".format(id_) r += "=" if dc_bus.at[id_] else " " r += "|" r += "*" if v_abs_active.at[id_] else " " r += "{:6.3f}".format(v_abs.at[id_]) r += "|" if dc_bus.at[id_] and np.abs(v_angle.at[id_]) < self._zero_thres: r += " - " else: r += "{:7.3f}".format(v_angle.at[id_]) r += "|" if load_p.at[id_] == 0: r += " - " else: r += "{:9.2f}".format(load_p.at[id_]) r += "|" if load_q.at[id_] == 0: r += " - " else: r += "{:9.2f}".format(load_q.at[id_]) r += "|" if shunt_p.at[id_] == 0: r += " - " else: r += "{:6.3f}".format(shunt_p.at[id_]) r += "|" if shunt_q.at[id_] == 0: r += " - " else: r += "{:7.3f}".format(-shunt_q.at[id_]) r += "|" if np.isnan(dv_bal_p.at[id_]): r += " - " else: r += "{:9.3f}".format(dv_bal_p.at[id_]) r += "|" if np.isnan(dv_bal_q.at[id_]): r += " - " else: r += "{:9.3f}".format(dv_bal_q.at[id_]) r += "|" r += "\n" t.append(r) t = "".join(t) t += "+-------+-------+-------+---------+---------+------+-------+---------+---------+\n" return t def _get_branch_details(self): t = "" t += "\n+" + "-"*78 + "+\n" t += "| BRANCH RESULT" + " "*64 + "|\n" t += "+-------+-------------+-------------------+-----------------+--------+---------+\n" t += "| | Terminals | Power Flow | Voltage | Losses | Reconst.|\n" t += "| +------+------+----------+--------+--------+--------+--------+---------+\n" t += "| | Src. | Dst. | |S_src| | PF_src | Drop |Angle D.| Dyn. | Kappa |\n" t += "| ID | Bus | Bus | (MVA) | | (%) | (deg) | (MW) | |\n" t += "+-------+------+------+----------+--------+--------+--------+--------+---------+\n" scr_branch = self.scenario.branch res_branch = self.branch if res_branch.empty: t += "| No branches present" + " "*58 + "|\n" t += "+" + "-"*78 + "+\n" return t t = [t] transformer = scr_branch['type'] == BranchType.TRANSFORMER src = scr_branch['src'] dst = scr_branch['dst'] dc_bus = self.scenario.bus['type'] == BusType.DC S_src = res_branch['s_src'] S_dst = res_branch['s_dst'] I_src_abs = res_branch['i_src'].abs() I_dst_abs = res_branch['i_dst'].abs() I_max = scr_branch['rating'] / self.scenario.base_mva congested = self._check_activity(I_src_abs, None, I_max) \ | self._check_activity(I_dst_abs, None, I_max) highly_loaded = (I_src_abs >= self._high_loading * I_max)\ | (I_dst_abs >= self._high_loading * I_max) drop = res_branch['v_drop'] * 100 drop_active = self._check_activity(drop, scr_branch['drop_min'], scr_branch['drop_max']) angle_diff = res_branch['angle_diff'] angle_diff_active = self._check_activity(angle_diff, scr_branch['angle_min'], scr_branch['angle_max']) loss_dyn = pd.Series((S_src + S_dst).to_numpy().real, index=res_branch.index) kappa = res_branch['rel_err'] for id_ in res_branch.index: r = "|" r += "{:6d}".format(id_) r += "T" if transformer.at[id_] else " " r += "|" r += "{:5d}".format(src.at[id_]) r += "=" if dc_bus.loc[src.at[id_]] else " " r += "|" r += "{:5d}".format(dst.at[id_]) r += "=" if dc_bus.loc[dst.at[id_]] else " " r += "|" r += "*" if congested.at[id_] else ">" if highly_loaded.at[id_] else " " r += "{:9.2f}".format(np.abs(S_src.at[id_])) r += "|" if np.abs(S_src.at[id_]) < self._zero_thres: r += " - " else: r += "{:8.3f}".format(np.real(S_src.at[id_]) / np.abs(S_src.at[id_])) r += "|" r += "*" if drop_active.at[id_] else " " r += "{:7.3f}".format(drop.at[id_]) r += "|" r += "*" if angle_diff_active.at[id_] else " " if dc_bus.loc[src.at[id_]] and np.abs(angle_diff.at[id_]) < self._zero_thres: r += " - " else: r += "{:7.3f}".format(angle_diff.at[id_]) r += "|" r += "{:8.3f}".format(loss_dyn.at[id_]) r += "|" if np.isnan(kappa.at[id_]): r += " - " else: r += "{:9.2e}".format(kappa.at[id_]) r += "|" r += "\n" t.append(r) t = "".join(t) t += "+-------+------+------+----------+--------+--------+--------+--------+---------+\n" return t def _get_converter_details(self): t = "" t += "\n+" + "-"*78 + "+\n" t += "| CONVERTER RESULT" + " "*61 + "|\n" t += "+-------+-------------+---------------------+-----------------+----------------+\n" t += "| | Terminals | Power Flow |Reactive Pwr Inj.| Losses |\n" t += "| +------+------+----------+----------+--------+--------+--------+-------+\n" t += "| | Src. | Dst. | P_src | P_dst | Q_src | Q_dst | Dyn. | Fix |\n" t += "| ID | Bus | Bus | (MW) | (MW) | (Mvar) | (Mvar) | (MW) | (MW) |\n" t += "+-------+------+------+----------+----------+--------+--------+--------+-------+\n" scr_converter = self.scenario.converter res_converter = self.converter if res_converter.empty: t += "| No converters present" + " "*56 + "|\n" t += "+" + "-"*78 + "+\n" return t t = [t] src = scr_converter['src'] dst = scr_converter['dst'] dc_bus = self.scenario.bus['type'] == BusType.DC P_src = res_converter['p_src'] P_dst = res_converter['p_dst'] Q_src = res_converter['q_src'] Q_dst = res_converter['q_dst'] (P_src_active, _) = \ self._check_cap_region_box_active(P_src + 1j*Q_src, scr_converter['cap_src']) (P_dst_active, _) = \ self._check_cap_region_box_active(P_dst + 1j*Q_dst, scr_converter['cap_dst']) loss_dyn = P_src + P_dst loss_fix = scr_converter['loss_fix'] for id_ in res_converter.index: r = "|" r += "{:6d}".format(id_) r += " |" r += "{:5d}".format(src.at[id_]) r += "=" if dc_bus.loc[src.at[id_]] else " " r += "|" r += "{:5d}".format(dst.at[id_]) r += "=" if dc_bus.loc[dst.at[id_]] else " " r += "|" r += "*" if P_src_active.at[id_] else " " r += "{:9.2f}".format(P_src.at[id_]) r += "|" r += "*" if P_dst_active.at[id_] else " " r += "{:9.2f}".format(P_dst.at[id_]) r += "|" if dc_bus.loc[src.at[id_]] and np.abs(Q_src.at[id_]) < self._zero_thres: r += " - " else: r += "{:8.2f}".format(Q_src.at[id_]) r += "|" if dc_bus.loc[dst.at[id_]] and np.abs(Q_dst.at[id_]) < self._zero_thres: r += " - " else: r += "{:8.2f}".format(Q_dst.at[id_]) r += "|" r += "{:8.3f}".format(loss_dyn.at[id_]) r += "|" if loss_fix.at[id_] == 0: r += " - " else: r += "{:7.3f}".format(loss_fix.at[id_]) r += "|" r += "\n" t.append(r) t = "".join(t) t += "+-------+------+------+----------+----------+--------+--------+--------+-------+\n" return t def _get_injector_details(self): t = "" t += "\n+" + "-"*78 + "+\n" t += "| INJECTOR RESULT" + " "*62 + "|\n" t += "+-------+------+--------------------------+-----------------+------------------+\n" t += "| | Term.| Injection | Cost | Type |\n" t += "| +------+----------+--------+------+--------+--------+ |\n" t += "| | | P | Q | PF | for P | for Q | |\n" t += "| ID | Bus | (MW) | (Mvar) | | (k$/h) | (k$/h) | |\n" t += "+-------+------+----------+--------+------+--------+--------+------------------+\n" t = [t] scr_injector = self.scenario.injector res_injector = self.injector terminal = scr_injector['bus'] dc_bus = self.scenario.bus['type'] == BusType.DC S = res_injector['s'] P = pd.Series(S.to_numpy().real, index=res_injector.index) Q = pd.Series(S.to_numpy().imag, index=res_injector.index) (P_active, _) = self._check_cap_region_box_active(S, scr_injector['cap']) cost_p = res_injector['cost_p'] cost_q = res_injector['cost_q'] for id_ in res_injector.index: r = "|" r += "{:6d}".format(id_) r += " |" r += "{:5d}".format(terminal.at[id_]) r += "=" if dc_bus.loc[terminal.at[id_]] else " " r += "|" r += "*" if P_active.at[id_] else " " r += "{:9.2f}".format(P.at[id_]) r += "|" if dc_bus.loc[terminal.at[id_]] and np.abs(Q.at[id_]) < self._zero_thres: r += " - " else: r += "{:8.2f}".format(Q.at[id_]) r += "|" if ((dc_bus.loc[terminal.at[id_]] and np.abs(Q.at[id_]) < self._zero_thres) or np.abs(S.at[id_]) < self._zero_thres): r += " - " else: r += "{:6.3f}".format(P.at[id_] / np.abs(S.at[id_])) r += "|" if np.isnan(cost_p.at[id_]): r += " - " else: r += "{:8.3f}".format(cost_p.at[id_]/1e3) r += "|" if np.isnan(cost_q.at[id_]): r += " - " else: r += "{:8.3f}".format(cost_q.at[id_]/1e3) r += "|" r += " {:<16s} ".format(truncate_with_ellipsis( str(scr_injector.at[id_, 'type']), 16)) r += "|" r += "\n" t.append(r) t = "".join(t) t += "+-------+------+----------+--------+------+--------+--------+------------------+\n" return t