Source code for tespy.tools.helpers

# -*- coding: utf-8

"""Module for helper functions used by several other modules.

This file is part of project TESPy (github.com/oemof/tespy). It's copyrighted
by the contributors recorded in the version control history of the file,
available from its original location tespy/tools/helpers.py

SPDX-License-Identifier: MIT
"""

import json
import os
from collections.abc import Mapping
from copy import deepcopy

import CoolProp.CoolProp as CP

from tespy import __datapath__
from tespy.tools import logger
from tespy.tools.global_vars import ERR
from tespy.tools.global_vars import fluid_property_data


[docs] def get_all_subdictionaries(data): subdictionaries = [] for value in data.values(): if len(value["subbranches"]) == 0: subdictionaries.append( {k: v for k, v in value.items() if k != "subbranches"} ) else: subdictionaries.append( {k: v for k, v in value.items() if k != "subbranches"} ) subdictionaries.extend(get_all_subdictionaries(value["subbranches"])) return subdictionaries
[docs] def get_chem_ex_lib(name): """Return a new dictionary by merging two dictionaries recursively.""" path = os.path.join(__datapath__, "ChemEx", f"{name}.json") with open(path, "r") as f: return json.load(f)
[docs] def fluidalias_in_list(fluid, fluid_list): aliases = [alias.replace(' ', '') for alias in CP.get_aliases(fluid)] return any(alias in fluid_list for alias in aliases)
[docs] def merge_dicts(dict1, dict2): """Return a new dictionary by merging two dictionaries recursively.""" result = deepcopy(dict1) for key, value in dict2.items(): if isinstance(value, Mapping): result[key] = merge_dicts(result.get(key, {}), value) else: result[key] = deepcopy(dict2[key]) return result
[docs] class TESPyNetworkError(Exception): """Custom message for network related errors.""" pass
[docs] class TESPyConnectionError(Exception): """Custom message for connection related errors.""" pass
[docs] class TESPyComponentError(Exception): """Custom message for component related errors.""" pass
[docs] def convert_to_SI(property, value, unit): r""" Convert a value to its SI value. Parameters ---------- property : str Fluid property to convert. value : float Value to convert. unit : str Unit of the value. Returns ------- SI_value : float Specified fluid property in SI value. """ if property == 'T': converters = fluid_property_data['T']['units'][unit] return (value + converters[0]) * converters[1] else: return value * fluid_property_data[property]['units'][unit]
[docs] def convert_from_SI(property, SI_value, unit): r""" Get a value in the network's unit system from SI value. Parameters ---------- property : str Fluid property to convert. SI_value : float SI value to convert. unit : str Unit of the value. Returns ------- value : float Specified fluid property value in network's unit system. """ if property == 'T': converters = fluid_property_data['T']['units'][unit] return SI_value / converters[1] - converters[0] else: return SI_value / fluid_property_data[property]['units'][unit]
[docs] def latex_unit(unit): r""" Convert unit to LaTeX. Parameters ---------- unit : str Value of unit for input, e.g. :code:`m3 / kg`. Returns ------- unit : str Value of unit for output, e.g. :code:`$\unitfrac{m3}{kg}$`. """ if '/' in unit: numerator = unit.split('/')[0].replace(' ', '') denominator = unit.split('/')[1].replace(' ', '') return r'$\unitfrac[]{' + numerator + '}{' + denominator + '}$' else: if unit == 'C' or unit == 'F': unit = r'^\circ ' + unit return r'$\unit[]{' + unit + '}$'
[docs] class UserDefinedEquation: def __init__(self, label, func, deriv, conns, params={}, latex={}): r""" A UserDefinedEquation allows use of generic user specified equations. Parameters ---------- label : str Label of the user defined function. func : function Equation to evaluate. deriv : function Partial derivatives of the equation. conns : list List of connections used by the function. params : dict Dictionary containing keyword arguments required by the function and/or derivative. latex : dict Dictionary holding LaTeX string of the equation as well as CharLine and CharMap instances applied in the equation for the automatic model documentation module. Example ------- Consider a pipeline transporting hot water with measurement data on temperature reduction in the pipeline as function of volumetric flow. First, we set up the TESPy model. Additionally, we will import the :py:class:`tespy.tools.helpers.UserDefinedEquation` class as well as some fluid property functions. We specify fluid property information for the inflow and assume that no pressure losses occur in the pipeline. >>> from tespy.components import Source, Sink, Pipe >>> from tespy.networks import Network >>> from tespy.connections import Connection >>> from tespy.tools import UserDefinedEquation >>> from tespy.tools import CharLine >>> from tespy.tools.fluid_properties import T_mix_ph, v_mix_ph >>> nw = Network(p_unit='bar', T_unit='C') >>> nw.set_attr(iterinfo=False) >>> so = Source('source') >>> si = Sink('sink') >>> pipeline = Pipe('pipeline') >>> inflow = Connection(so, 'out1', pipeline, 'in1') >>> outflow = Connection(pipeline, 'out1', si, 'in1') >>> nw.add_conns(inflow, outflow) >>> inflow.set_attr(T=120, p=10, v=1, fluid={'water': 1}) >>> pipeline.set_attr(pr=1) Let's assume, the temperature reduction is measured from inflow and outflow temperature. The mathematical description of the relation we want the model to follow therefore is: .. math:: 0 = T_{in} - T_{out} + f \left( \dot{m}_{in} \cdot v_{in} \right) We can define a function, describing exactly that relation using a :py:class:`tespy.tools.characteristics.CharLine` object with volumetric flow as input values and temperature drop as output values. The function should look like this: >>> def myfunc(ude): ... char = ude.params['char'] ... return ( ... ude.conns[0].calc_T() - ude.conns[1].calc_T() ... - char.evaluate( ... ude.conns[0].m.val_SI * ... ude.conns[0].calc_vol() ... ) ... ) The function does only take one parameter, we name it :code:`ude` in this case. This parameter will hold all relevant information you pass to your UserDefinedEquation later, i.e. a list of the connections (:code:`.conns`) required by the UserDefinedEquation as well as a dictionary of arbitrary parameters required for your function (:code:`.params`). The index of the :code:`.conns` indicates the position of the connection in the list of connections required for the UserDefinedEquation (see below). On top of the equation the solver requires its derivatives with respect to all relevant primary variables of the network, which are mass flow pressure, enthalpy and fluid composition. In this case, the derivatives to the mass flow, pressure and enthalpy of the inflow as well as the derivatives to the pressure and enthalpy of the outflow will be required. You have to define a function placing the derivatives in the Jacobian matrix. The Jacobian is a dictionary containing tuples as keys with the derivative as their value. The tuples indicate the equation number (always 0 for user defined equations, since there is only a single equation) and the position of the variable in the system matrix. The position of the variables is stored in the :code:`J_col` attribute. Before calculating and placing a result in the Jacobian, you have to make sure, that the variable you want to calculate the partial derivative for is actually a variable. For example, in case you specified a value for the mass flow, it will not be part of the variables' space, since it has a constant value, and thus, no derivate needs to be calculated. You can use the :code:`is_var` keyword to check, whether a mass flow, pressure or enthalpy is actually variable. We can calculate the derivatives numerically, if an easy analytical solution is not available. Simply use the :code:`numeric_deriv` method passing the variable ('m', 'p', 'h', 'fluid') as well as the connection. >>> def myjacobian(ude): ... c0 = ude.conns[0] ... c1 = ude.conns[1] ... if c0.m.is_var: ... ude.jacobian[c0.m.J_col] = ude.numeric_deriv('m', c0) ... if c0.p.is_var: ... ude.jacobian[c0.p.J_col] = ude.numeric_deriv('p', c0) ... if c0.h.is_var: ... ude.jacobian[c0.h.J_col] = ude.numeric_deriv('h', c0) ... if c1.p.is_var: ... ude.jacobian[c1.p.J_col] = ude.numeric_deriv('p', c1) ... if c1.h.is_var: ... ude.jacobian[c1.h.J_col] = ude.numeric_deriv('h', c1) After that, we only need to th specify the characteristic line we want out temperature drop to follow as well as create the UserDefinedEquation instance and add it to the network. Its equation is automatically applied. We apply extrapolation for the characteristic line as it helps with convergence, in case a paramter >>> char = CharLine( ... x=[0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 2.0, 3.0], ... y=[17, 12, 9, 6.5, 4.5, 3, 2, 1.5, 1.25, 1.125, 1.1, 1.05], ... extrapolate=True) >>> my_ude = UserDefinedEquation( ... 'myudelabel', myfunc, myjacobian, [inflow, outflow], ... params={'char': char}) >>> nw.add_ude(my_ude) >>> nw.solve('design') Clearly the result is obvious here as the volumetric flow is exactly at one of the supporting points. >>> round(inflow.T.val - outflow.T.val, 3) 1.125 So now, let's say, we want to calculate the volumetric flow necessary to at least maintain a specific temperature at the outflow. >>> inflow.set_attr(v=None) >>> outflow.set_attr(T=110) >>> nw.solve('design') >>> round(inflow.v.val, 3) 0.267 Or calculate volumetric flow and/or temperature drop corresponding to a specified heat loss. >>> outflow.set_attr(T=None) >>> pipeline.set_attr(Q=-5e6) >>> nw.solve('design') >>> round(inflow.v.val, 3) 0.067 """ if isinstance(label, str): self.label = label else: msg = 'Label of UserDefinedEquation object must be of type String.' logger.error(msg) raise TypeError(msg) if isinstance(conns, list): self.conns = conns else: msg = ( 'Parameter conns must be a list of ' 'tespy.connections.connection.Connection objects.') logger.error(msg) raise TypeError(msg) self.func = func self.deriv = deriv if isinstance(params, dict): self.params = params else: msg = 'The parameter params must be passed as dictionary.' logger.error(msg) raise TypeError(msg) self.latex = { 'equation': r'\text{equation string not available}', 'lines': [], 'maps': [] } if isinstance(latex, dict): self.latex.update(latex) else: msg = 'The parameter latex must be passed as dictionary.' logger.error(msg) raise TypeError(msg)
[docs] def solve(self): self.residual = self.func(self) self.deriv(self)
[docs] def numeric_deriv(self, dx, conn): r""" Calculate partial derivative of the user defined function to dx. For details see :py:func:`tespy.tools.helpers._numeric_deriv` """ return _numeric_deriv(self, self.func, dx, conn, ude=self)
def _numeric_deriv(obj, func, dx, conn=None, **kwargs): r""" Calculate partial derivative of the function func to dx. Parameters ---------- obj : object Instance, which provides the equation to calculate the derivative for. func : function Function :math:`f` to calculate the partial derivative for. dx : str Partial derivative. conn : tespy.connections.connection.Connection Connection to calculate the numeric derivative for. Returns ------- deriv : float/list Partial derivative(s) of the function :math:`f` to variable(s) :math:`x`. .. math:: \frac{\partial f}{\partial x} = \frac{f(x + d) + f(x - d)}{2 d} """ if conn is None: d = obj.get_attr(dx).d exp = 0 obj.get_attr(dx).val += d exp += func(**kwargs) obj.get_attr(dx).val -= 2 * d exp -= func(**kwargs) deriv = exp / (2 * d) obj.get_attr(dx).val += d elif dx in conn.fluid.is_var: d = 1e-5 val = conn.fluid.val[dx] if conn.fluid.val[dx] + d <= 1: conn.fluid.val[dx] += d else: conn.fluid.val[dx] = 1 conn.build_fluid_data() exp = func(**kwargs) if conn.fluid.val[dx] - 2 * d >= 0: conn.fluid.val[dx] -= 2 * d else: conn.fluid.val[dx] = 0 conn.build_fluid_data() exp -= func(**kwargs) conn.fluid.val[dx] = val conn.build_fluid_data() deriv = exp / (2 * d) elif dx in ['m', 'p', 'h']: if dx == 'm': d = 1e-4 else: d = 1e-1 conn.get_attr(dx).val_SI += d exp = func(**kwargs) conn.get_attr(dx).val_SI -= 2 * d exp -= func(**kwargs) deriv = exp / (2 * d) conn.get_attr(dx).val_SI += d else: msg = ( "Your variable specification for the numerical derivative " "calculation seems to be wrong. It has to be a fluid name, m, " "p, h or the name of a component variable." ) logger.exception(msg) raise ValueError(msg) return deriv
[docs] def bus_char_evaluation(component_value, char_func, reference_value, bus_value, **kwargs): r""" Calculate the value of a bus. Parameters ---------- comp_value : float Value of the energy transfer at the component. reference_value : float Value of the bus in reference state. char_func : tespy.tools.characteristics.char_line Characteristic function of the bus. Returns ------- residual : float Residual of the equation. .. math:: residual = \dot{E}_\mathrm{bus} - \frac{\dot{E}_\mathrm{component}} {f\left(\frac{\dot{E}_\mathrm{bus}} {\dot{E}_\mathrm{bus,ref}}\right)} """ return bus_value - component_value / char_func.evaluate( bus_value / reference_value )
[docs] def bus_char_derivative(component_value, char_func, reference_value, bus_value, **kwargs): """Calculate derivative for bus char evaluation.""" d = 1e-3 return (1 - ( 1 / char_func.evaluate((bus_value + d) / reference_value) - 1 / char_func.evaluate((bus_value - d) / reference_value) ) / (2 * d))
[docs] def newton_with_kwargs( derivative, target_value, val0=300, valmin=70, valmax=3000, max_iter=10, tol_rel=ERR, tol_abs=ERR ** 2, tol_mode="rel", **function_kwargs ): # start newton loop iteration = 0 expr = True x = val0 parameter = function_kwargs["parameter"] function = function_kwargs["function"] relax = 1 if tol_mode == "rel" and abs(target_value) <= 2 * tol_rel: tol_mode = "abs" while expr: # calculate function residual and new value function_kwargs[parameter] = x residual = target_value - function(**function_kwargs) x += residual / derivative(**function_kwargs) * relax # check for value ranges if x < valmin: x = valmin if x > valmax: x = valmax iteration += 1 # relaxation to help convergence in case of jumping if iteration == 5: relax = 0.75 max_iter = 12 if iteration > max_iter: msg = ( 'The Newton algorithm was not able to find a feasible value ' f'for function {function}. Current value with x={x} is ' f'{function(**function_kwargs)}, target value is ' f'{target_value}, residual is {residual} after {iteration} ' 'iterations.' ) logger.debug(msg) break if tol_mode == 'abs' or target_value == 0: expr = abs(residual) >= tol_abs elif tol_mode == 'rel': expr = abs(residual / target_value) >= tol_rel else: expr = abs(residual / target_value) >= tol_rel or abs(residual) >= tol_abs return x
[docs] def central_difference(function=None, parameter=None, delta=None, **kwargs): upper = kwargs.copy() upper[parameter] += delta lower = kwargs lower[parameter] -= delta return (function(**upper) - function(**lower)) / (2 * delta)
[docs] def get_basic_path(): """ Return the basic tespy path and creates it if necessary. The basic path is the '.tespy' folder in the $HOME directory. """ basicpath = os.path.join(os.path.expanduser('~'), '.tespy') if not os.path.isdir(basicpath): os.mkdir(basicpath) return basicpath
[docs] def extend_basic_path(subfolder): """ Return a path based on the basic tespy path and creates it if necessary. The subfolder is the name of the path extension. """ extended_path = os.path.join(get_basic_path(), subfolder) if not os.path.isdir(extended_path): os.mkdir(extended_path) return extended_path