from unyt import UnitRegistry, Unit, array
import unyt
from unyt import *
from unyt import unit_object
from numbers import Number as numeric_type
from types import MethodType
import numpy as np
from unyt.array import _coerce_iterable_units,_preserve_units,_comparison_unit
from unyt.array import _get_binary_op_return_class,_arctan2_unit,_multiply_units, _divide_units
from unyt.exceptions import (
InvalidUnitOperation,
MissingMKSCurrent,
MKSCGSConversionError,
UnitConversionError,
UnitsNotReducible,
UnitOperationError,
InvalidUnitEquivalence,
UnitParseError,
IterableUnitCoercionError
)
from unyt.dimensions import (
angle,
base_dimensions,
dimensionless,
temperature,
current_mks,
#logarithmic
)
from numpy import (
add,
subtract,
multiply,
divide,
logaddexp,
logaddexp2,
true_divide,
floor_divide,
negative,
power,
remainder,
mod,
absolute,
rint,
sign,
conj,
exp,
exp2,
log,
log2,
log10,
expm1,
log1p,
sqrt,
square,
reciprocal,
sin,
cos,
tan,
arcsin,
arccos,
arctan,
arctan2,
hypot,
sinh,
cosh,
tanh,
arcsinh,
arccosh,
arctanh,
deg2rad,
rad2deg,
bitwise_and,
bitwise_or,
bitwise_xor,
invert,
left_shift,
right_shift,
greater,
greater_equal,
less,
less_equal,
not_equal,
equal,
logical_and,
logical_or,
logical_xor,
logical_not,
maximum,
minimum,
fmax,
fmin,
isreal,
iscomplex,
isfinite,
isinf,
isnan,
signbit,
copysign,
nextafter,
modf,
ldexp,
frexp,
fmod,
floor,
ceil,
trunc,
fabs,
spacing,
divmod as divmod_,
isnat,
ones_like,
matmul
)
trigonometric_operators = (sin, cos, tan)
POWER_SIGN_MAPPING = {multiply: 1, divide: -1}
class _ImportCache(object):
__slots__ = ["_ua", "_uq"]
def __init__(self):
self._ua = None
self._uq = None
@property
def ua(self):
if self._ua is None:
from unyt.array import unyt_array
self._ua = unyt_array
return self._ua
@property
def uq(self):
if self._uq is None:
from unyt.array import unyt_quantity
self._uq = unyt_quantity
return self._uq
_import_cache_singleton = _ImportCache()
[docs]
def mult(self, u):
""" Multiply Unit with u (Unit object). """
if not getattr(u, "is_Unit", False):
data = np.array(u, subok=True)
unit = getattr(u, "units", None)
if unit is not None:
if self.dimensions is logarithmic:
raise InvalidUnitOperation(
"Tried to multiply '%s' and '%s'." % (self, unit)
)
units = unit * self
else:
units = self
if data.dtype.kind not in ("f", "u", "i", "c"):
raise InvalidUnitOperation(
"Tried to multiply a Unit object with '%s' (type %s). "
"This behavior is undefined." % (u, type(u))
)
if data.shape == ():
return _import_cache_singleton.uq(data, units, bypass_validation=True)
return _import_cache_singleton.ua(data, units, bypass_validation=True)
elif self.dimensions is logarithmic and not u.is_dimensionless:
raise InvalidUnitOperation("Tried to multiply '%s' and '%s'." % (self, u))
elif u.dimensions is logarithmic and not self.is_dimensionless:
raise InvalidUnitOperation("Tried to multiply '%s' and '%s'." % (self, u))
base_offset = 0.0
if self.base_offset or u.base_offset:
if u.dimensions in (temperature, angle) and self.is_dimensionless:
base_offset = u.base_offset
elif self.dimensions in (temperature, angle) and u.is_dimensionless:
base_offset = self.base_offset
else:
raise InvalidUnitOperation(
"Quantities with dimensions of angle or units of "
"Fahrenheit or Celsius cannot be multiplied."
)
return Unit(
self.expr * u.expr,
base_value=(self.base_value * u.base_value),
base_offset=base_offset,
dimensions=(self.dimensions * u.dimensions),
registry=self.registry,
)
#unit.__class__=type('Unit',(Unit,),{'__mul__':mult})
#unyt.unit_object.Unit = unit
[docs]
def truediv(self, u):
""" Divide Unit by u (Unit object). """
if not isinstance(u, Unit):
if isinstance(u, (numeric_type, list, tuple, np.ndarray)):
from unyt.array import unyt_quantity
return unyt_quantity(1.0, self) / u
else:
raise InvalidUnitOperation(
"Tried to divide a Unit object by '%s' (type %s). This "
"behavior is undefined." % (u, type(u))
)
base_offset = 0.0
if self.base_offset or u.base_offset:
if self.dimensions in (temperature, angle) and u.is_dimensionless:
base_offset = self.base_offset
'''else:
raise InvalidUnitOperation(
"Quantities with units of Farhenheit "
"and Celsius cannot be divided."
)'''
return Unit(
self.expr / u.expr,
base_value=(self.base_value / u.base_value),
base_offset=base_offset,
dimensions=(self.dimensions / u.dimensions),
registry=self.registry,
)
[docs]
def array_ufunc(self, ufunc, method, *inputs, **kwargs):
func = getattr(ufunc, method)
if "out" not in kwargs:
out = None
out_func = None
else:
# we need to get both the actual "out" object and a view onto it
# in case we need to do in-place operations
out = kwargs.pop("out")[0]
if out.dtype.kind in ("u", "i"):
new_dtype = "f" + str(out.dtype.itemsize)
float_values = out.astype(new_dtype)
out.dtype = new_dtype
np.copyto(out, float_values)
out_func = out.view(np.ndarray)
if len(inputs) == 1:
# Unary ufuncs
inp = inputs[0]
u = getattr(inp, "units", None)
if u.dimensions is angle and ufunc in trigonometric_operators:
# ensure np.sin(90*degrees) works as expected
inp = inp.in_units("radian").v
# evaluate the ufunc
out_arr = func(np.asarray(inp), out=out_func, **kwargs)
if ufunc in (multiply, divide) and method == "reduce":
# a reduction of a multiply or divide corresponds to
# a repeated product which we implement as an exponent
mul = 1
power_sign = POWER_SIGN_MAPPING[ufunc]
if "axis" in kwargs and kwargs["axis"] is not None:
unit = u ** (power_sign * inp.shape[kwargs["axis"]])
else:
unit = u ** (power_sign * inp.size)
else:
# get unit of result
mul, unit = self._ufunc_registry[ufunc](u)
# use type(self) here so we can support user-defined
# subclasses of unyt_array
ret_class = type(self)
elif len(inputs) == 2:
# binary ufuncs
i0 = inputs[0]
i1 = inputs[1]
# coerce inputs to be ndarrays if they aren't already
inp0 = _coerce_iterable_units(i0)
inp1 = _coerce_iterable_units(i1)
u0 = getattr(i0, "units", None) or getattr(inp0, "units", None)
u1 = getattr(i1, "units", None) or getattr(inp1, "units", None)
ret_class = _get_binary_op_return_class(type(i0), type(i1))
if u0 is None:
u0 = Unit(registry=getattr(u1, "registry", None))
if u1 is None and ufunc is not power:
u1 = Unit(registry=getattr(u0, "registry", None))
elif ufunc is power:
u1 = inp1
if inp0.shape != () and inp1.shape != ():
raise UnitOperationError(ufunc, u0, u1)
if isinstance(u1, unyt_array):
if u1.units.is_dimensionless:
pass
else:
raise UnitOperationError(ufunc, u0, u1)
if u1.shape == ():
u1 = float(u1)
else:
u1 = 1.0
unit_operator = self._ufunc_registry[ufunc]
if unit_operator in (_preserve_units, _comparison_unit, _arctan2_unit):
# check "is" equality first for speed
if u0 is not u1 and u0 != u1:
# we allow adding, multiplying, comparisons with
# zero-filled arrays, lists, etc or scalar zero. We
# do not allow zero-filled unyt_array instances for
# performance reasons. If we did allow it, every
# binary operation would need to scan over all the
# elements of both arrays to check for arrays filled
# with zeros
if not isinstance(i0, unyt_array) or not isinstance(i1, unyt_array):
any_nonzero = [np.count_nonzero(
i0), np.count_nonzero(i1)]
if any_nonzero[0] == 0:
u0 = u1
elif any_nonzero[1] == 0:
u1 = u0
if not u0.same_dimensions_as(u1):
if unit_operator is _comparison_unit:
# we allow comparisons between data with
# units and dimensionless data
if u0.is_dimensionless:
u0 = u1
elif u1.is_dimensionless:
u1 = u0
else:
raise UnitOperationError(ufunc, u0, u1)
else:
raise UnitOperationError(ufunc, u0, u1)
conv, offset = u1.get_conversion_factor(u0, inp1.dtype)
new_dtype = np.dtype("f" + str(inp1.dtype.itemsize))
conv = new_dtype.type(conv)
'''if offset is not None:
raise InvalidUnitOperation(
"Quantities with units of Fahrenheit or Celsius "
"cannot by multiplied, divided, subtracted or "
"added with data that has different units."
)'''
inp1 = np.asarray(inp1) * conv
# get the unit of the result
mul, unit = unit_operator(u0, u1)
# actually evaluate the ufunc
out_arr = func(
inp0.view(np.ndarray), inp1.view(np.ndarray), out=out_func, **kwargs
)
if unit_operator in (_multiply_units, _divide_units):
if unit.is_dimensionless and unit.base_value != 1.0:
if not u0.is_dimensionless:
if u0.dimensions == u1.dimensions:
out_arr = np.multiply(
out_arr.view(np.ndarray), unit.base_value, out=out_func
)
unit = Unit(registry=unit.registry)
'''if (
u0.base_offset
and u0.dimensions is temperature
or u1.base_offset
and u1.dimensions is temperature
):
print(u0.base_offset,u0.dimensions,u1.base_offset,u1.dimensions)
raise InvalidUnitOperation(
"Quantities with units of Fahrenheit or Celsius TODO "
"cannot by multiplied, divide, subtracted or added."
)'''
else:
raise RuntimeError(
"Support for the %s ufunc with %i inputs has not been"
"added to unyt_array." % (str(ufunc), len(inputs))
)
if unit is None:
out_arr = np.array(out_arr, copy=False)
elif ufunc in (modf, divmod_):
out_arr = tuple((ret_class(o, unit) for o in out_arr))
elif out_arr.size == 1:
out_arr = unyt_quantity(np.asarray(out_arr), unit)
else:
if ret_class is unyt_quantity:
# This happens if you do ndarray * unyt_quantity.
# Explicitly casting to unyt_array avoids creating a
# unyt_quantity with size > 1
out_arr = unyt_array(out_arr, unit)
else:
out_arr = ret_class(out_arr, unit, bypass_validation=True)
if out is not None:
if mul != 1:
multiply(out, mul, out=out)
if np.shares_memory(out_arr, out):
mul = 1
if isinstance(out, unyt_array):
try:
out.units = out_arr.units
except AttributeError:
# out_arr is an ndarray
out.units = Unit("", registry=self.units.registry)
if mul == 1:
return out_arr
return mul * out_arr
[docs]
def uappend(self, val):
import numpy as np
return unyt_array(np.append(self.value, val), self.units)
array.unyt_array.__array_ufunc__=array_ufunc
array.unyt_array.append=uappend
Unit.__mul__=mult
Unit.__truediv__=truediv
[docs]
def Int(self):
print(self.units)
if isinstance(self, unyt_quantity):
return unyt_quantity(int(self.value), self.units)
#array.unyt_array.__int__=Int
cyml_reg = UnitRegistry('mks')
[docs]
class Simulation(object):
def __init__(self, registry):
self.registry = registry
[docs]
def array(self, value, units):
return unyt.unyt_array(value, units, registry=self.registry)
[docs]
def quantity(self, value, units):
return unyt.unyt_quantity(value, units, registry=self.registry)
# dimensionless
dimless = {"plants":"plants", "leaf":"leaf"}
[docs]
def reg_():
if "mmW" not in cyml_reg:
cyml_reg.add("mmW", base_value=1.0, dimensions=unyt.dimensions.mass/unyt.dimensions.area, tex_repr=r"\rm{mm water}")
if "j" not in cyml_reg:
cyml_reg.add("j", base_value=1.0, dimensions=unyt.dimensions.energy, tex_repr=r"\rm{joule}")
if "C_degC" not in cyml_reg:
cyml_reg.add("C_degC", base_value=1.0, offset=273.15, dimensions=unyt.dimensions.temperature, tex_repr=r"\rm{joule}")
for k in dimless.keys():
if k not in cyml_reg:
cyml_reg.add(k, base_value=1.0, dimensions=unyt.dimensions.dimensionless, tex_repr=r"\rm{v}")
return cyml_reg
rg = reg_()
cyml_units = Simulation(rg)
from unyt.unit_systems import add_symbols
[docs]
class UnitContainer(object):
def __init__(self, registry):
add_symbols(vars(self), registry)
u = UnitContainer(cyml_units.registry)
[docs]
def Cint(v):
return int(v)*v.units
[docs]
def Cfloat(v):
return float(v)*v.units