Source code for barril.units.unit_database

from typing import TYPE_CHECKING
from typing import Any
from typing import Callable
from typing import Dict
from typing import Hashable
from typing import Iterator
from typing import List
from typing import Optional
from typing import Sequence
from typing import Set
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import Union
from typing import cast

import attr
import copy
import math
import traceback
from oop_ext.foundation.singleton import Singleton

from barril._util.types_ import CheckType
from barril.units.interfaces import UnitExponentTuple

if TYPE_CHECKING:
    from barril.units import Quantity

# Contains the registry for all the available unit types.
__all__ = [
    "CategoryInfo",
    "InvalidOperationError",
    "InvalidQuantityTypeError",
    "InvalidUnitError",
    "UnitDatabase",
    "UnitInfo",
    "UnitsError",
]


_LEGACY_TO_CURRENT: List[Tuple[str, str]] = [
    ("1000ft3", "Mcf"),
    ("1000m3", "Mm3"),
    ("M(ft3)", "MMcf"),
    ("M(m3)", "MMm3"),
    ("k(ft3)", "Mcf"),
    ("Ns/m", "N.s/m"),
    ("lbmole", "lbmol"),
    ("gmole", "gmol"),
]


def FixUnitIfIsLegacy(unit: str) -> Tuple[bool, str]:
    fixed_unit = unit
    try:
        for legacy, current in _LEGACY_TO_CURRENT:
            fixed_unit = fixed_unit.replace(legacy, current)
        return (unit != fixed_unit), fixed_unit
    except:
        return False, unit


class UnitsError(RuntimeError):
    """
    Base class for errors related to units.
    """


class InvalidQuantityTypeError(UnitsError):
    """
    Error raised when an invalid quantity type is found
    """

    def __init__(self, quantity_type: str, available: Optional[List[str]] = None):
        msg = f"Invalid quantity_type: {quantity_type}"
        if available is not None:
            msg += "\nAvailable:\n" + "\n".join(available)
        UnitsError.__init__(self, msg)


class InvalidUnitError(UnitsError):
    """
    Error raised when an invalid unit is found
    """

    def __init__(
        self,
        unit: str,
        quantity_type: Optional[str] = None,
        category: Optional[str] = None,
        valid_units: Optional[List[str]] = None,
    ):
        if quantity_type is not None:
            msg = f"Invalid unit for quantity_type {quantity_type}: {unit}"

        elif category is not None:
            msg = f"Invalid unit for category {category}: {unit}"

        else:
            msg = f"Invalid unit:{unit}"

        if valid_units is not None:
            msg += " [Valid Units: %s]" % valid_units

        UnitsError.__init__(self, msg)


class InvalidOperationError(UnitsError):
    """
    Error raised when some operation but couldn't actually be performed with the given units.
    E.g.: Summing meters + seconds is not valid (while meters + centimeters would be)
    """


class ComposedUnitError(UnitsError):
    """
    Error raised when a composed unit conversion is performed
    E.g.: Velocity_Array * Concentration_Array = m/s.g/cm3
    """


UnaryConversionFunc = Callable[[float], float]


class UnitInfo:
    """
    Holds information about a unit type
    """

    ADD_STR_INFO_TO_UNIT_INFO = False

    def __init__(
        self,
        quantity_type: str,
        name: str,
        unit: str,
        frombase: Union[str, UnaryConversionFunc],
        tobase: Union[str, UnaryConversionFunc],
        default_category: Optional[str] = None,
    ):
        """
        :param name:
            Name of the unit (e.g.: meter, millimiter).

        :param unit:
            String that represents the unit (symbol - e.g.: m, mm).

        :param frombase:
            the formula to convert from the base to this unit.

        :param tobase:
            The formula to convert from the this unit to the base.

        .. note:: frombase or tobase must be defined having the part that should be transformed as
            a %f, %s or just x
        """

        def MakeLambda(s: str) -> UnaryConversionFunc:
            s = s.replace("%s", "x").replace("%f", "x")
            assert "x" in s
            s.replace("x", "float(x)")
            ret = eval("lambda x:%s" % s)
            ret.__has_conversion__ = True
            return ret

        if isinstance(frombase, str):
            frombase_func = MakeLambda(frombase)
        else:
            frombase_func = frombase

        if isinstance(tobase, str):
            tobase_func = MakeLambda(tobase)
        else:
            tobase_func = tobase

        # The functions added to the UnitInfo must say whether they actually have some
        # conversion. If they do, and it's value is True, they may also have
        # __a__, __b__, __c__, __d__ identifying the posc convertions
        # (used for doing conversions for sci20 grid functions in C++)
        # If it's False, that means that the function will have no actual conversion.

        # If not identified, that means that they do have conversions associated
        if not hasattr(tobase_func, "__has_conversion__"):
            tobase_func.__has_conversion__ = True  # type:ignore[attr-defined]

        if not hasattr(frombase_func, "__has_conversion__"):
            frombase_func.__has_conversion__ = True  # type:ignore[attr-defined]

        self.name = name
        self.unit = unit
        self.frombase = frombase_func
        self.tobase = tobase_func
        self.quantity_type = quantity_type
        self.default_category = default_category

        if UnitInfo.ADD_STR_INFO_TO_UNIT_INFO:
            # must be added for the generation of the c++ version of the conversion
            self.frombase_str = frombase
            self.tobase_str = tobase

    def __hash__(self) -> int:
        return hash(self.unit)

    def __eq__(self, other: Any) -> bool:
        if not isinstance(self, UnitInfo):
            return False
        return self.unit == other.unit


@attr.s(auto_attribs=True)
class CategoryInfo:
    """
    Holds information about a category
    """

    category: str = ""
    quantity_type: str = ""
    valid_units: Optional[List[str]] = attr.Factory(list)  # type:ignore[assignment]
    valid_units_set: Set[str] = attr.Factory(set)
    default_unit: Optional[str] = ""
    default_value: float = 0.0
    min_value: Optional[float] = None
    max_value: Optional[float] = None
    is_min_exclusive: bool = False
    is_max_exclusive: bool = False
    caption: str = ""


T = TypeVar("T")
ConversionFunc = Callable[["UnitDatabase", str, str, str, T], T]


[docs] class UnitDatabase(Singleton): """ Registry with all the available quantity types and units that represent the physical units. Quantity Types represent the type of the unit, for instance length or temperature, as strings. Every quantity type has one or more units associated with it. Units represent one specific unit inside a quantity_type, for instance meters and centimeters for length. """ PREPOSITIONS_IN_CATEGORY_NAME = ("per", "of")
[docs] @classmethod def CreateDefaultSingleton(cls) -> "UnitDatabase": result = cls(default_singleton=True) cls.FillUnitDatabaseWithPosc(result) return result
# Some aliases to some quantity types _ADDITIONAL_CATEGORY_ALIASES = { "liquid volume flow rate": "volume flow rate", "gas volume flow rate": "volume flow rate", "liquid volume": "volume", "gas volume": "volume", "liquid volume per standard volume": "volume per standard volume", "gas volume per standard volume": "volume per standard volume", "date": "time", }
[docs] @classmethod def FillUnitDatabaseWithPosc( cls, unit_database: "UnitDatabase", fill_categories: bool = True ) -> "UnitDatabase": """ Fills a unit database with the posc values. :param UnitDatabase unit_database: The unit database to be filled. :param bool fill_categories: Indicates whether for each quantity type a category with the same name should be created. :rtype: UnitDatabase :returns: The unit database passed as a parameter. """ from .posc import FillUnitDatabaseWithPosc unit_database.Clear() FillUnitDatabaseWithPosc( unit_database, fill_categories=fill_categories, override_categories=True ) if fill_categories: for quantity_alias, quantity_type in cls._ADDITIONAL_CATEGORY_ALIASES.items(): unit_database.AddCategory(quantity_alias, quantity_type) return unit_database
[docs] def CheckValueForCategory( self, category: str, value: float, unit: Optional[str] = None ) -> None: """ :param category: The category to be checked. :param value: The value to be checked for the given category. :param unit: The unit of the value passed (if not available, the default value is considered). """ from ._quantity import ObtainQuantity quantity = ObtainQuantity(unit, category) quantity.CheckValue(value)
[docs] def CheckDefaultUnitDatabase(self) -> None: """ Checks if this is the default unit-database. If it's not a 'default' unit-database, an error is raised together with the stack trace from where it was originally created. :raises AssertionError: raises error if this was not created as the default unit database. """ if hasattr(self, "_database_created_from"): raise AssertionError( "Not default unit-database. Creation: \n-------\n%s\n-------\n" % (self._database_created_from,) )
# Additional conversions stored at the class (no point in storing them only in an instance, # even if it's a singleton). This is done because if a new database is created (say to support just # some specific units) it would be painfull to have to register all conversion functions manually. # This way we have a singleton that have the application units, other databases can be created # but all databases share the same conversion functions. # # dict of class supported -> conversion function # see RegisterAdditionalConversionType _additional_conversions: Dict[Type, ConversionFunc] = {} def __init__(self, default_singleton: bool = False): """ Initializes the unit manager without any quantity types. :param bool default_singleton: If True, this is the unit database that's created through the CreateDefaultSingleton method. This is needed because any constants created a module or class level must have a reference to the default unit-database and not for some other database created in tests (so, this flag is used together with CheckDefaultUnitDatabase later on). """ if not default_singleton: # If this is not the default singleton, mark from where was it created if we need # to check later on. from io import StringIO s = StringIO() traceback.print_stack(file=s) self._database_created_from = s.getvalue() # Quantities must be cached accordingly to the current unit-database. self.quantities_cache: Dict[Hashable, "Quantity"] = {} # Dictionary to cache whether a unit is valid in a category. self._category_unit_valid: Dict[Tuple[str, str], bool] = {} # dict of quantity_type => list of UnitInfo (the first unit in this list is the base unit for # the given quantity type) self.quantity_types: Dict[str, List[UnitInfo]] = {} # dict of unit name => UnitInfo self.unit_to_unit_info: Dict[str, UnitInfo] = {} self.categories_to_quantity_types: Dict[str, CategoryInfo] = {} # ----------------------------------------------- The interfaces below all work with the category @classmethod def FillSimple(cls, unit_database: "UnitDatabase") -> None: unit_database.AddUnitBase("length", "meters", "m") unit_database.AddUnit("length", "milimeters", "mm", "%f * 1000.0", "%f / 1000.0") unit_database.AddUnit("length", "centimeters", "cm", "%f * 100.0", "%f / 100.0") unit_database.AddUnit("length", "kilometers", "km", "%f / 1000.0", "%f * 1000.0") unit_database.AddUnitBase("time", "seconds", "s") unit_database.AddUnit("time", "minutes", "min", "%f * 60.0", " %f * 60.0") unit_database.AddUnit("time", "hours", "h", "%f * 3600.0", " %f * 3600.0") unit_database.AddUnit("time", "days", "d", "%f * 86400.0", " %f * 86400.0") unit_database.AddCategory("length", "length") unit_database.AddCategory("time", "time")
[docs] def AddCategory( self, category: str, quantity_type: Optional[str] = None, valid_units: Optional[List[str]] = None, override: bool = False, default_unit: Optional[str] = None, default_value: Optional[float] = None, min_value: Optional[float] = None, max_value: Optional[float] = None, is_min_exclusive: bool = False, is_max_exclusive: bool = False, caption: str = "", from_category: Optional[str] = None, ) -> CategoryInfo: """Adds a category to the unit-management. If it already exists, throws an error if override is not set to True :param category: The category to be added to the unit-management. :param quantity_type: The quantity type that this category maps to. :param valid_units: A set of the valid units for the given category. :param override: Whether to replace the quantity type for the category. :param default_unit: The default unit for the category :param default_value: The default value for the category :param min_value: Minimum value acceptable. If None, any value can be set. :param max_value: Maximum value acceptable. If None, any value can be set. :param is_min_exclusive: If the min_value given is exclusive. :param is_max_exclusive: If the max_value given is exclusive. :param caption: User friendly caption for this category. :param from_category: Category to copy other parameters from. :raises UnitsError: If the category was already added and override is not set to True """ CheckType(category, str) if from_category and quantity_type: raise ValueError("cannot pass both quantity_type and from_category") if not override and category in self.categories_to_quantity_types: raise UnitsError("category %r already registered" % category) if min_value is not None and max_value is not None: if max_value < min_value: raise ValueError( "min_value ({}) must be >= than min_value ({})".format(min_value, max_value) ) if from_category: category_info = self.GetCategoryInfo(from_category) quantity_type = category_info.quantity_type if valid_units is None: valid_units = category_info.valid_units if default_unit is None: default_unit = category_info.default_unit if default_value is None: default_value = category_info.default_value if min_value is None: min_value = category_info.min_value if max_value is None: max_value = category_info.max_value if is_min_exclusive is None: is_min_exclusive = category_info.is_min_exclusive if is_max_exclusive is None: is_max_exclusive = category_info.is_max_exclusive if caption is None: caption = category_info.caption assert quantity_type is not None # check if valid_units should inherit from the quantity_type if valid_units is not None: # valid units given: check if all the given units are valid quantity_units = set(self.GetUnits(quantity_type)) for i, unit in enumerate(valid_units): was_unit_fixed, fixed_unit = FixUnitIfIsLegacy(unit) if was_unit_fixed: valid_units[i] = fixed_unit if fixed_unit not in quantity_units: msg = "unit %r is not valid for quantity type %r.\nQuantity units: %r" raise ValueError(msg % (unit, quantity_type, sorted(quantity_units))) # if (min_value is not None or max_value is not None) and default_unit is None: if default_unit is None: default_unit = self.GetBaseUnit(quantity_type) if valid_units and default_unit not in valid_units: default_unit = valid_units[0] else: quantity_units = set(self.GetUnits(quantity_type)) was_unit_fixed, fixed_default_unit = FixUnitIfIsLegacy(default_unit) if was_unit_fixed: default_unit = fixed_default_unit if default_unit not in quantity_units: raise ValueError( "unit %r is not valid for default quantity type %r" % (default_unit, quantity_type) ) # caption if not caption: caption = category.title() # Do not Title prepositions for word in self.PREPOSITIONS_IN_CATEGORY_NAME: caption = caption.replace(word.title() + " ", word + " ") # if the default_value has not been passed # 1) min_value defined? ---> min_value # else # 2) max_value defined? ---> max_value # else # 3) zero if default_value is None: if is_min_exclusive or is_max_exclusive: raise RuntimeError("default_value must be supplied") elif min_value is not None: default_value = min_value elif max_value is not None: default_value = max_value else: default_value = 0.0 else: # the default_value is defined msg = "Error while adding category %s: default_value %f %s %f" if min_value is not None: if is_min_exclusive: assert default_value > min_value, msg % ( category, default_value, "must be >", min_value, ) else: assert default_value >= min_value, msg % ( category, default_value, "must be >=", min_value, ) if max_value is not None: if is_max_exclusive: assert default_value < max_value, msg % ( category, default_value, "must be <", max_value, ) else: assert default_value <= max_value, msg % ( category, default_value, "must be <=", max_value, ) info = CategoryInfo( category=category, quantity_type=quantity_type, valid_units=valid_units, valid_units_set=set(valid_units) if valid_units is not None else set(), default_unit=default_unit, default_value=default_value, min_value=min_value, max_value=max_value, is_min_exclusive=is_min_exclusive, is_max_exclusive=is_max_exclusive, caption=caption, ) self.categories_to_quantity_types[category] = info return info
[docs] def IsValidCategory(self, category: str) -> bool: """ Check if the given category is valid into the unit database. :param category: The category to check the validity. :returns: True is it is a valid category; otherwise False. """ return category in self.categories_to_quantity_types
[docs] def IterCategories(self) -> Iterator[str]: """ Iterator for all the categories. :returns: An iterator that'll provide all the categories. """ return iter(self.categories_to_quantity_types.keys())
[docs] def GetCategoryInfo(self, category: str) -> CategoryInfo: """ :param category: The category we're interested in. :returns: The category info for the category passed. """ try: return self.categories_to_quantity_types[category] except KeyError: categories_str = "" for cat in sorted(self.categories_to_quantity_types.keys()): if cat is None: cat = "None" categories_str += cat + "\n" raise InvalidQuantityTypeError( 'The category: "%s" is not added to the unit manager.\n' "--- Available ---:\n%s" % (category, categories_str) )
[docs] def GetCategoryQuantityType(self, category: str) -> str: """ :returns: The quantity type of some category. """ return self.GetCategoryInfo(category).quantity_type
[docs] def FindUnitCase(self, category: str, unit: str) -> str: """ Given a unit in any case, returns a unit that is a match with the correct case within the unit-database. :param category: The category for the given unit :param unit: The unit that should be used to match the case. :returns: the unit considering the actual case used within the unit database. :raises AssertionError: if no unit could be found or more than one was found (there should be only 1 match considering it in a case-insensitive way). """ category_info = self.GetCategoryInfo(category) infos = self.GetInfos(category_info.quantity_type) matched = [] unit_lower = unit.lower() for info in infos: if info.unit.lower() == unit_lower: matched.append(info.unit) if len(matched) == 1: return matched[0] else: raise AssertionError( "Expected 1 match not considering case for: '%s'. Found: %s" % (unit, [u for u in matched]) )
[docs] def CheckCategoryUnit(self, category: str, unit: str) -> None: """ Check if the given category accepts the passed unit. :raises InvalidUnitError: if the unit provided is not accepted for this category """ assert category.__class__ == str, f"Expected unit of type str, found {category}" assert unit.__class__ == str, f"Expected unit of type str, found {unit}" key = (category, unit) try: # i.e.: if not valid (valid = self._category_unit_valid[key]) if not self._category_unit_valid[key]: raise InvalidUnitError(unit, None, category) except KeyError: if category.__class__ != str: raise TypeError("Only str is accepted. %s is not." % category.__class__) try: category_info = self.GetCategoryInfo(category) # When setting a unit, leave the user set any unit from the quantity type, even if there's # a subset for the category (the idea is that the units for the category are only used to # filter them in the UI, not really to do internal validations). self.CheckQuantityTypeUnit(category_info.quantity_type, unit) valid = True except UnitsError: valid = False self._category_unit_valid[key] = valid if not valid: raise InvalidUnitError(unit, None, category)
[docs] def GetValidUnits(self, category: str) -> List[str]: """ :rtype: list(str) :returns: The valid units for a given category. If the valid categories weren't given uses the valid units from quantity type. """ # Special case: the empty category, as generated by Quantity.CreateEmpty(), has no valid units. if category == "": return [] category_info = self.GetCategoryInfo(category) if category_info.valid_units is not None: return category_info.valid_units else: if category_info.quantity_type != category: return self.GetValidUnits(category_info.quantity_type) # the valid units have not been specified for the given category (so, let's return # the units for the quantity type) return self.GetUnits(category_info.quantity_type)
[docs] def GetDefaultValue(self, category: str) -> float: """ :returns: The default value for the given category. """ category_info = self.GetCategoryInfo(category) return category_info.default_value
[docs] def GetDefaultUnit(self, category: str) -> str: """ :returns: The default unit for the given category. .. note:: This method shouldn't return None, when the default_unit isn't defined for the category the quantity type base unit is used. """ category_info = self.GetCategoryInfo(category) return category_info.default_unit or ""
# --------------------- The interfaces below all work with the quantity type and not the category
[docs] def AddUnit( self, quantity_type: str, name: str, unit: str, frombase: Union[str, UnaryConversionFunc], tobase: Union[str, UnaryConversionFunc], default_category: Optional[str] = None, ) -> None: """ Registers a new unit type. :param quantity_type: The quantity type for the added unit. :param name: A user-friendly name for this unit. :param unit: The unit to be added. :param frombase: If string, an expression to convert from the base unit of this quantity_type to this unit. If callable, must accept a float value that applies the conversion. :param tobase: If a string, an expression to convert from this unit to the base unit. If callable, must accept a float value that applies the conversion. .. note:: Each expression must refer to %f or x as the current value of the unit. :param default_category: The default category for the added unit (if any). """ assert quantity_type is not None if unit.__class__ != str: raise TypeError("Only str is accepted. %s is not." % unit.__class__) if unit is None: unit = name info = UnitInfo( quantity_type, name, unit, frombase, tobase, default_category=default_category ) if unit in self.unit_to_unit_info: raise RuntimeError( "Unit: %s already added to the unit database for the quantity type: %s (trying to add to: %s)" % (unit, self.unit_to_unit_info[unit].quantity_type, quantity_type) ) else: self.unit_to_unit_info[unit] = info quantity_type_list = self.quantity_types.setdefault(quantity_type, []) if unit in [q.unit for q in quantity_type_list]: raise RuntimeError("Unit already registered: {} ({})".format(name, unit)) quantity_type_list.append(info)
[docs] def AddUnitBase(self, quantity_type: str, name: str, unit: str) -> None: """ Add a base unit type. Inside each quantity_type, there must be at least one Base unit. Parameters have the same meaning as in AddUnit(). """ def identity(x: Any) -> Any: return x identity.__has_conversion__ = False # type:ignore[attr-defined] self.AddUnit(quantity_type, name, unit, identity, identity) # move the base info to the first position # (that's a convention: the base is always in the first position) infos = self.quantity_types[quantity_type] base = infos[-1] # was appended to the end in Units.Add del infos[-1] infos.insert(0, base)
[docs] def GetBaseUnit(self, quantity_type: str) -> Optional[str]: """ :param quantity_type: The quantity type for which we want a base unit. :returns: The base unit of the given quantity_type. :raises InvalidQuantityTypeError: if the quantity type is not valid """ try: infos = self.quantity_types[quantity_type] return infos[0].unit except KeyError: self.CheckQuantityType(quantity_type) return None
[docs] def GetDefaultCategory(self, unit: str) -> Optional[str]: """ :param unit: The unit for which we want the category. :rtype: str or None :returns: The default category for the passed unit. """ try: unit_info = self.unit_to_unit_info[unit] except KeyError: is_legacy, fixed_unit = FixUnitIfIsLegacy(unit) if not is_legacy: return None unit_info = self.unit_to_unit_info[fixed_unit] category = unit_info.default_category if category: return category category = unit_info.quantity_type if category in self.categories_to_quantity_types: return category return None
[docs] def GetQuantityType(self, unit: str) -> Optional[str]: """ :returns: A quantity_type that contains the respective unit. """ try: return self.unit_to_unit_info[unit].quantity_type except KeyError: return None
[docs] def FindSimilarUnitMatches(self, unit: str) -> List[str]: """ This function will use heuristics to find similar units in the unit database to the passed unit. :param unit: The unit which doesn't have a direct match in the unit dabatase. :returns: Returns a list with possible matches for the passed unit, sorted. """ import re compiled = re.compile(r"[\./]") unit_split = compiled.split(unit.lower()) close_match = [] for existing_unit in self.unit_to_unit_info.keys(): existing_unit_split = compiled.split(existing_unit.lower()) if len(existing_unit_split) == len(unit_split): for a, b in zip(existing_unit_split, unit_split): if not a.startswith(b) and not b.startswith(a): break else: close_match.append(existing_unit) return sorted(close_match)
[docs] def GetQuantityTypes(self) -> List[str]: """ :returns: A list of the available categories, sorted. """ return sorted(self.quantity_types.keys())
[docs] def GetUnits(self, quantity_type: Optional[str] = None) -> List[str]: """ :return: The units of that quantity_type (if quantity_type is given) otherwise, returns all available units. """ return [x.unit for x in self.GetInfos(quantity_type)]
[docs] def GetUnitName(self, quantity_type: str, unit: str) -> str: """ :returns: The user-friendly name for the given unit. """ info = self.GetInfo(quantity_type, unit) return info.name
[docs] def GetUnitNames(self, quantity_type: str) -> List[str]: """ :returns: The user-friendly names for all the units in the given quantity_type. :raises InvalidQuantityTypeError: """ return [x.name for x in self.GetInfos(quantity_type)]
[docs] def GetInfo( self, quantity_type: str, unit: str, fix_unknown: bool = False, fix_legacy: bool = True ) -> UnitInfo: """ :param bool fix_unknown: If True won't raise error if quantity_type is unkwnown (and unit may be anything). Returns the unknown unit info in this situation. :param bool fix_legacy: If True and `unit` is in _LEGACY_TO_CURRENT, then it will return the current equivalent unit info. :rtype: UnitInfo :returns: The unit object registered with the given unit @raise InvalidQuantityTypeError @raise InvalidUnitError """ def TryToGetUnitInfoFromUnit(unit: str) -> Optional[UnitInfo]: """ This is the common case, where the unit matches the quantity type registered. """ try: # Common case: unit matches the quantity type registered. unit_info = self.unit_to_unit_info[unit] if quantity_type == unit_info.quantity_type: return unit_info except KeyError: pass # Just ignore and go through the 'uncommon' case. return None unit_info = TryToGetUnitInfoFromUnit(unit) if unit_info is not None: return unit_info # First check if the quantity_type is a registered category try: category_info = self.categories_to_quantity_types[quantity_type] except KeyError: pass else: quantity_type = category_info.quantity_type try: quantity_types = self.quantity_types[quantity_type] except KeyError: raise InvalidQuantityTypeError(quantity_type, sorted(self.quantity_types.keys())) else: for info in quantity_types: if info.unit == unit: return info else: if fix_unknown: # Before actually triggering the error, handle the unknown case: # We can have an unknown quantity type with a 'known' unit (i.e.: the reader # says it's unknown, but we get a proper label for it in the UI, thus, it's a # known 'unknown' quantity). So, in this case, proceed returning the unknown # quantity unit information. from ._unit_constants import UNKNOWN_QUANTITY_TYPE from ._unit_constants import UNKNOWN_UNIT if quantity_type == UNKNOWN_QUANTITY_TYPE: quantity_types = self.quantity_types[quantity_type] for info in quantity_types: if info.unit == UNKNOWN_UNIT: return info if fix_legacy: is_legacy, fixed_unit = FixUnitIfIsLegacy(unit) if is_legacy: unit_info = TryToGetUnitInfoFromUnit(fixed_unit) if unit_info is not None: return unit_info raise InvalidUnitError( unit, quantity_type, valid_units=sorted([info.unit for info in quantity_types]) )
[docs] def GetInfos(self, quantity_type: Optional[str] = None) -> List[UnitInfo]: """ :returns: All UnitInfos from that quantity_type (if quantity_type is given), otherwise return all UnitInfos. @raise InvalidQuantityTypeError """ if quantity_type is None: all_infos = [] for infos in self.quantity_types.values(): all_infos.extend(infos) return all_infos else: try: return self.quantity_types[quantity_type] except KeyError: raise InvalidQuantityTypeError(quantity_type, list(self.quantity_types.keys()))
[docs] def CheckQuantityType(self, quantity_type: str) -> None: """ Checks if the quantity type is valid. If it is not, raise an InvalidQuantityTypeError. @raise InvalidQuantityTypeError @raise InvalidUnitError """ if quantity_type not in self.quantity_types: raise InvalidQuantityTypeError(quantity_type, sorted(self.quantity_types))
[docs] def CheckQuantityTypeUnit(self, quantity_type: str, unit: str) -> None: """ Check if the given quantity type has the given unit. @raise InvalidUnitError """ # NOTE: Using `fix_legacy=False` because when don't want to fix the legacy unit # at this point, we actually want `InvalidUnitError` is the unit is # legacy self.GetInfo(quantity_type, unit, fix_legacy=False)
def _ConvertWithExp( self, quantity_type: str, from_unit_exps: Sequence[UnitExponentTuple], to_unit_exps: Sequence[UnitExponentTuple], value: float, ) -> float: """ Converts a value from one unit to another unit considering that the units from and to have exponents. :param quantity_type: The quantity type that has the from and to units. :param from_unit_exps: The unit and exponent of the "from" unit. :param to_unit_exps: The unit and exponent of the "to" unit. :param float value: The value to be converted. :rtype: float :returns: Returns the converted value. """ len_from_unit = len(from_unit_exps) len_to_unit = len(to_unit_exps) if len_to_unit == 0 or len_from_unit == 0: return value if len_from_unit != 1: raise ComposedUnitError( "Can only convert one unit to another (not a composed unit at this point)" ) if len_to_unit != 1: raise ComposedUnitError( "Can only convert one unit to another (not a composed unit at this point)" ) from_unit, from_exp = from_unit_exps[0] to_unit, to_exp = to_unit_exps[0] if from_exp != to_exp: raise ValueError( "Cannot convert among different exponents (%s) to (%s)" % ((from_unit, from_exp), (to_unit, to_exp)) ) if from_exp == to_exp == 1: # Special case handling return self.Convert(quantity_type, from_unit, to_unit, value) negative = False if value < 0.0: negative = True value = abs(value) value = math.pow(value, 1.0 / from_exp) # Convert from the exponent value = self.Convert(quantity_type, from_unit, to_unit, value) ret = math.pow(value, to_exp) if negative: return -ret return ret
[docs] @classmethod def RegisterAdditionalConversionType(cls, class_: Type, func: ConversionFunc) -> None: """ This function may be used to register conversions for additional classes, not originally treated (e.g.: IGridFunction) :param type class_: This is the class that should be treated. So, when a conversion is requested, if a class is an instance of the class passed, it'll be used to do the conversions. :param callable func: The function that should do the conversion. It'll be called as func(unit_database, quantity_type, from_unit_info, to_unit_info, value) """ if class_ not in cls._additional_conversions: cls._additional_conversions[class_] = func else: assert ( cls._additional_conversions[class_] == func ), "The class %s already has a convertion function registered" % (str(class_))
[docs] def Convert( self, category_or_quantity_type: str, from_unit: Union[str, Sequence[UnitExponentTuple]], to_unit: Union[str, Sequence[UnitExponentTuple]], value: Any, ) -> Any: """ Converts a value from one unit to another unit (given the quantity type that contains both units), so, note that the quantity type at this point is always the same (can't convert units with quantity types that don't match). :param category_or_quantity_type: The category or quantity type for doing the conversion (if it's a category it's converted into the quantity type inside this method). :param from_unit: A string determining the unit from the value we want to convert or a list of exponents. :param to_unit: A string determining to convert the value to or a list of exponents. :param value: The value that should be converted. .. note:: that from_unit and to_unit must only have 1 item at this point. :returns: The converted value """ supported_types = tuple(self._additional_conversions) convert_function: Optional[ConversionFunc] = None if isinstance(value, supported_types): for key, convert_function in self._additional_conversions.items(): if isinstance(value, key): break # keep convert_function for later use else: assert False # operations with exponents... from_is_list = from_unit.__class__ in (list, tuple) to_is_list = to_unit.__class__ in (list, tuple) if from_is_list or to_is_list: from_unit_exps = cast(list, from_unit) if from_is_list else [(cast(str, from_unit), 1)] to_unit_exps = cast(list, to_unit) if to_is_list else [(cast(str, to_unit), 1)] # same unit: no conversion needed if from_unit_exps == to_unit_exps: return value if ( category_or_quantity_type.__class__ in (list, tuple) and len(category_or_quantity_type) == 1 ): category_or_quantity_type = category_or_quantity_type[0] return self._ConvertWithExp( category_or_quantity_type, from_unit_exps, to_unit_exps, value ) from_unit = cast(str, from_unit) to_unit = cast(str, to_unit) # same unit: no conversion needed if from_unit == to_unit: return value # simple operations (same exponent) try: quantity_type = self.categories_to_quantity_types[ category_or_quantity_type ].quantity_type except KeyError: self.CheckQuantityType(category_or_quantity_type) quantity_type = category_or_quantity_type if convert_function is not None: return convert_function(self, quantity_type, from_unit, to_unit, value) this = self.GetInfo(quantity_type, from_unit, fix_unknown=True) other = self.GetInfo(quantity_type, to_unit, fix_unknown=True) if isinstance(value, (float, int)): return other.frombase(this.tobase(value)) else: # list / tuple frombase = other.frombase tobase = this.tobase values_gen = (frombase(tobase(v)) for v in value) if isinstance(value, tuple): return tuple(values_gen) else: return list(values_gen)
[docs] def Clear(self) -> None: """ Removes all the quantity types registered. """ self.quantity_types.clear() self.categories_to_quantity_types.clear() self.unit_to_unit_info.clear() self.quantities_cache.clear() self._category_unit_valid.clear()
# Operations with different quantities --------------------------------------------------------- def _DoOperationWithSameQuantity( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T, operation: Callable[[T, T], T], ) -> Tuple["Quantity", T]: """ Given 2 quantities, do an operation that DOES NOT accept the creation of a new composed quantity (e.g.: sum, subtraction) """ if quantity1 == quantity2: return quantity1, operation(value1, value2) else: # otherwise, we must transform the units in the quantity1 to their counterparts # in the quantity2 (without changing anything in the categories at this time) category_to_unit_and_exp1 = copy.deepcopy(quantity1.GetCategoryToUnitAndExps()) category_to_unit_and_exp2 = copy.deepcopy(quantity2.GetCategoryToUnitAndExps()) ( category_to_unit_and_exp1, category_to_unit_and_exp2, value1, value2, ) = self._MatchQuantities( category_to_unit_and_exp1, category_to_unit_and_exp2, value1, value2 ) # at this point the quantity types and units must be the same (the categories may actually # be different) quantity1 = quantity1.CreateCopyInstance(category_to_unit_and_exp1) quantity2 = quantity2.CreateCopyInstance(category_to_unit_and_exp2) composing_units1 = set(quantity1.GetComposingUnitsJoiningExponents()) composing_units2 = set(quantity2.GetComposingUnitsJoiningExponents()) if composing_units1 != composing_units2: if len(composing_units1) == 0: # no unit in the 1st part (just take the 2nd as the correct one) quantity1 = quantity2 elif len(composing_units2) == 0: pass # ok, no units in the second part... else: raise InvalidOperationError( "Error. Can't do operation because units don't match: (%s != %s)" % (composing_units1, composing_units2) ) return quantity1, operation(value1, value2) def Sum( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T ) -> Tuple["Quantity", T]: func = lambda a, b: a + b return self._DoOperationWithSameQuantity(quantity1, quantity2, value1, value2, func) def Subtract( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T ) -> Tuple["Quantity", T]: func = lambda a, b: a - b return self._DoOperationWithSameQuantity(quantity1, quantity2, value1, value2, func) def Divide( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T ) -> Tuple["Quantity", T]: return self._DoOperationResultingInNewQuantity( quantity1, quantity2, value1, value2, lambda a, b: a - b, lambda a, b: a / b ) def FloorDivide( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T ) -> Tuple["Quantity", T]: return self._DoOperationResultingInNewQuantity( quantity1, quantity2, value1, value2, lambda a, b: a - b, lambda a, b: a // b )
[docs] def Multiply( self, quantity1: "Quantity", quantity2: "Quantity", value1: T, value2: T ) -> Tuple["Quantity", T]: """ Multiplication with different quantities. Rationale: - the multiplication of different quantities can result in a derived quantity type. - the quantities / values should be transformed if there are quantity types that are equal in bot quantities (e.g.: if there's cm in the quantity1 and m in the quantity2, the value1 and the quantity1 must be converted to m before proceeding with the actual multiplication) - note that if a given quantity type has different units in the quantity1 or in the quantity2, they must 1st be converted to have the same unit in a given quantity type. - after the quantities are compatible, it's just a matter of creating a join of the available units and summing the expoents to create the resulting quantity (and multiplying the actual values). :type quantity1: IQuantity (related to value1) :param quantity1: :type quantity2: IQuantity (related to value2) :param quantity2: :type value1: iterable or number :param value1: :type value2: iterable or number :param value2: :rtype: tuple(IQuantity, value) """ return self._DoOperationResultingInNewQuantity( quantity1, quantity2, value1, value2, lambda a, b: a + b, lambda a, b: a * b )
def _MatchQuantities( self, category_to_unit_and_exp1: Any, category_to_unit_and_exp2: Any, value1: Any, value2: Any, ) -> Tuple[Any, Any, Any, Any]: """ Matches all the units for a given quantity type (so, if a unit 'm' is found, if a 'cm' is later found, convert it to 'm' -- as well as it's composing value). :rtype: the dicts passed with the quantity types matched to the same units and the corresponding values converted to match those changes. """ quantity_types_found_to_used_unit: Dict[Any, Any] = {} # 1st thing is putting the same unit for a given quantity type (both sides) for c in (category_to_unit_and_exp1, category_to_unit_and_exp2): for category, unit_exp in list(c.items()): unit, _exp = unit_exp quantity_type = self.GetCategoryQuantityType(category) used_unit_for_quantity_type = quantity_types_found_to_used_unit.get(quantity_type) if used_unit_for_quantity_type is None: quantity_types_found_to_used_unit[quantity_type] = unit else: # don't worry about the exponent at this time, just update the unit and the related value. if c is category_to_unit_and_exp1: value1 = self.Convert( quantity_type, unit, used_unit_for_quantity_type, value1 ) else: value2 = self.Convert( quantity_type, unit, used_unit_for_quantity_type, value2 ) unit_exp[0] = used_unit_for_quantity_type return category_to_unit_and_exp1, category_to_unit_and_exp2, value1, value2 def _DoOperationResultingInNewQuantity( self, quantity1: "Quantity", quantity2: "Quantity", value1: Any, value2: Any, operation_exp: Any, operation: Any, ) -> Any: """ Given 2 quantities, do an operation that accepts the creation of a new quantity that is composed of the quantities passed (e.g.: division, multiplication). """ from ._quantity import Quantity # otherwise, we must transform the units in the quantity1 to their counterparts # in the quantity2 (without changing anything in the categories at this time) category_to_unit_and_exp1 = quantity1.GetCategoryToUnitAndExpsCopy() category_to_unit_and_exp2 = quantity2.GetCategoryToUnitAndExpsCopy() ( category_to_unit_and_exp1, category_to_unit_and_exp2, value1, value2, ) = self._MatchQuantities( category_to_unit_and_exp1, category_to_unit_and_exp2, value1, value2 ) # add the categories to the resulting one for category2, (unit2, exp2) in list(category_to_unit_and_exp2.items()): if category2 not in category_to_unit_and_exp1: exp1 = 0 category_to_unit_and_exp1[category2] = [unit2, operation_exp(exp1, exp2)] else: unit_exp1 = category_to_unit_and_exp1[category2] unit1, exp1 = unit_exp1 # type:ignore[assignment] if unit1 == unit2: unit_exp1[1] = operation_exp(exp1, exp2) # type:ignore[index] else: raise RuntimeError( "This should've been covered already ({} != {}).".format(unit1, unit2) ) # unit -> expoent only_units_expoents: Dict[Any, Any] = {} for c, (unit, exp) in list(category_to_unit_and_exp1.items()): existing = only_units_expoents.get(unit, 0) only_units_expoents[unit] = existing + exp # remove the ones that have exponent = 0 for c, (unit, exp) in list(category_to_unit_and_exp1.items()): if exp == 0 or only_units_expoents[unit] == 0: # that's ok, it's been removed from the expression... del category_to_unit_and_exp1[c] result = Quantity.CreateDerived(category_to_unit_and_exp1) return result, operation(value1, value2)
class RegisterConversion: _registered = False @classmethod def RegisterNumpyConversion(cls) -> None: """ Register a special unit conversion for numpy arrays. :param UnitDatabase db: The unit-database instance to register the conversion into. """ import numpy def ConvertNumpyArray( db: UnitDatabase, quantity_type: str, from_unit: str, to_unit: str, array: Any ) -> Any: """ Converts the given numpy array by applying the conversion as if it was a scalar, since arrays support the numeric operators, which are applied element-wise. """ from_unit_info = db.GetInfo(quantity_type, from_unit, fix_unknown=True) to_unit_info = db.GetInfo(quantity_type, to_unit, fix_unknown=True) to_base = from_unit_info.tobase from_base = to_unit_info.frombase return from_base(to_base(array)) UnitDatabase.RegisterAdditionalConversionType(numpy.ndarray, ConvertNumpyArray) RegisterConversion.RegisterNumpyConversion()