Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 284 additions & 4 deletions sasdata/quantities/quantity.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,72 @@ def transpose(a: Union["Quantity[ArrayLike]", ArrayLike], axes: tuple | None = N
return np.transpose(a, axes=axes)


def trace(a: Union["Quantity[ArrayLike]", ArrayLike], offset: int = 0, axis1: int = 0, axis2: int = 1):
"""Find the trace of an array or an array based quantity."""
if isinstance(a, Quantity):
return DerivedQuantity(
value=np.trace(a.value, offset, axis1, axis2),
units=a.units,
history=QuantityHistory.apply_operation(Trace, a.history, offset=offset, axis1=axis1, axis2=axis2),
)
else:
return np.trace(a, offset, axis1, axis2)


def matinv(a: Union["Quantity[ArrayLike]", ArrayLike]):
"""Find the inverse of a matrix."""
if isinstance(a, Quantity):
return DerivedQuantity(
value=np.linalg.inv(a.value),
units=a.units,
history=QuantityHistory.apply_operation(MatInv, a.history),
)
else:
return np.linalg.inv(a)


def norm_1(a: Union["Quantity[ArrayLike]", ArrayLike], axes: int | tuple[int] | None = None):
"""Caculate the 1-norm of an array or an array based quantity."""
if isinstance(a, Quantity):
if axes is None:
return DerivedQuantity(
value=np.linalg.norm(a.value, ord=1, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Norm_1, a.history),
)

else:
return DerivedQuantity(
value=np.linalg.norm(a.value, ord=1, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Norm_1, a.history, axes=axes),
)

else:
return np.linalg.norm(a.value, ord=1, axes=axes)


def norm_2(a: Union["Quantity[ArrayLike]", ArrayLike], axes: int | tuple[int] | None = None):
"""Caculate the 2-norm of an array or an array based quantity."""
if isinstance(a, Quantity):
if axes is None:
return DerivedQuantity(
value=np.linalg.norm(a.value, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Norm_2, a.history),
)

else:
return DerivedQuantity(
value=np.linalg.norm(a.value, axes=axes),
units=a.units,
history=QuantityHistory.apply_operation(Norm_2, a.history, axes=axes),
)

else:
return np.linalg.norm(a.value, axes=axes)


def dot(a: Union["Quantity[ArrayLike]", ArrayLike], b: Union["Quantity[ArrayLike]", ArrayLike]):
"""Dot product of two arrays or two array based quantities"""
a_is_quantity = isinstance(a, Quantity)
Expand Down Expand Up @@ -294,6 +360,34 @@ def __eq__(self, other):
return False


class MatrixIdentity(ConstantBase):
serialisation_name = "identity"

def __init__(self, size):
self.size = size

def evaluate(self, variables: dict[int, T]) -> T:
return np.eye(self.size)

def _derivative(self, hash_value: int):
return Constant(np.zeros((self.size, self.size)))

@staticmethod
def _deserialise(parameters: dict) -> "Operation":
return MatrixIdentity(self.size)

def _serialise_parameters(self) -> dict[str, Any]:
return {"size": numerical_encode(self.size)}

def summary(self, indent_amount: int = 0, indent=" "):
return f"{indent_amount * indent}{self.size} [Matrix Id.]"

def __eq__(self, other):
if isinstance(other, MatrixIdentity):
return self.size == other.size
return False


class Constant(ConstantBase):
serialisation_name = "constant"

Expand Down Expand Up @@ -999,14 +1093,14 @@ def __init__(self, a: Operation, axes: tuple[int] | None = None):
self.axes = axes

def evaluate(self, variables: dict[int, T]) -> T:
return np.transpose(self.a.evaluate(variables))
return np.transpose(self.a.evaluate(variables), self.axes)

def _derivative(self, hash_value: int) -> Operation:
return Transpose(self.a.derivative(hash_value)) # TODO: Check!
return Transpose(self.a.derivative(hash_value), self.axes) # TODO: Check!

def _clean(self):
clean_a = self.a._clean()
return Transpose(clean_a)
return Transpose(clean_a, self.axes)

def _serialise_parameters(self) -> dict[str, Any]:
if self.axes is None:
Expand Down Expand Up @@ -1034,7 +1128,7 @@ def summary(self, indent_amount: int = 0, indent=" "):
f"{indent_amount * indent}Transpose(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{(indent_amount + 1) * indent}{self.axes}\n"
+ f"{(indent_amount + 1) * indent}{list(self.axes)}\n"
+ f"{indent_amount * indent})"
)

Expand All @@ -1044,6 +1138,56 @@ def __eq__(self, other):
return False


class Trace(Operation):
"""Trace operation - as per numpy"""

serialisation_name = "trace"

def __init__(self, a: Operation, offset: int = 0, axis1: int = 0, axis2: int = 1):
self.a = a
self.offset = offset
self.axis1 = axis1
self.axis2 = axis2

def evaluate(self, variables: dict[int, T]) -> T:
return np.trace(self.a.evaluate(variables), self.offset, self.axis1, self.axis2)

def _derivative(self, hash_value: int) -> Operation:
return Trace(self.a.derivative(hash_value), self.offset, self.axis1, self.axis2)

def _clean(self):
clean_a = self.a._clean()
return Trace(clean_a, self.offset, self.axis1, self.axis2)

def _serialise_parameters(self) -> dict[str, Any]:
return {"a": self.a._serialise_json(), "offset": self.offset, "axis1": self.axis1, "axis2": self.axis2}

@staticmethod
def _deserialise(parameters: dict) -> "Operation":
return Trace(
a=Operation.deserialise_json(parameters["a"]),
offset=parameters["offset"],
axis1=parameters["axis1"],
axis2=parameters["axis2"],
)

def summary(self, indent_amount: int = 0, indent=" "):
return (
f"{indent_amount * indent}Trace(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{(indent_amount + 1) * indent}{self.offset}\n"
+ f"{(indent_amount + 1) * indent}{self.axis1}\n"
+ f"{(indent_amount + 1) * indent}{self.axis2}\n"
+ f"{indent_amount * indent})"
)

def __eq__(self, other):
if isinstance(other, Trace):
return other.a == self.a
return False


class Dot(BinaryOperation):
"""Dot product - backed by numpy's dot method"""

Expand Down Expand Up @@ -1089,6 +1233,28 @@ def _clean_ab(self, a, b):
return MatMul(a, b)


class MatInv(UnaryOperation):
"""Matrix inversion, using numpy"""

serialisation_name = "matinv"

def evaluate(self, variables: dict[int, T]) -> T:
return np.linalg.inv(self.a.evaluate(variables))

def _derivative(self, hash_value: int) -> Operation:
return Neg(Matmul(MatMul(MatInv(self.a), self.a._derivative(hash_value)), MatInv(self.a)))

def _clean(self):
clean_a = self.a._clean()

if isinstance(clean_a, MatInv):
# Removes double inversions
return clean_a.a

else:
return MatInv(clean_a)


class TensorDot(Operation):
serialisation_name = "tensor_product"

Expand Down Expand Up @@ -1119,6 +1285,116 @@ def _deserialise(parameters: dict) -> "Operation":
)


class Norm_1(Operation):
"""1-norm of a matrix from numpy"""

serialisation_name = "norm_1"

def __init__(self, a: Operation, axes: int | tuple[int] | None = None):
self.a = a
self.axes = axes

def evaluate(self, variables: dict[int, T]) -> T:
return np.linalg.norm(self.a.evaluate(variables), ord=1, axis=self.axes)

def _derivative(self, hash_value: int) -> Operation:
return np.sign(self.a)

def _clean(self):
clean_a = self.a._clean()
return Norm_1(clean_a, self.axes)

def _serialise_parameters(self) -> dict[str, Any]:
if self.axes is None:
return {"a": self.a._serialise_json()}
else:
return {"a": self.a._serialise_json(), "axes": list(self.axes)}

@staticmethod
def _deserialise(parameters: dict) -> "Operation":
if "axes" in parameters:
return Norm_1(a=Operation.deserialise_json(parameters["a"]), axes=tuple(parameters["axes"]))
else:
return Norm_1(a=Operation.deserialise_json(parameters["a"]))

def summary(self, indent_amount: int = 0, indent=" "):
if self.axes is None:
return (
f"{indent_amount * indent}Norm_1(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{indent_amount * indent})"
)
else:
return (
f"{indent_amount * indent}Norm_1(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{(indent_amount + 1) * indent}{list(self.axes)}\n"
+ f"{indent_amount * indent})"
)

def __eq__(self, other):
if isinstance(other, Norm_1):
return other.a == self.a
return False


class Norm_2(Operation):
"""2-norm of a matrix from numpy"""

serialisation_name = "norm_2"

def __init__(self, a: Operation, axes: int | tuple[int] | None = None):
self.a = a
self.axes = axes

def evaluate(self, variables: dict[int, T]) -> T:
return np.linalg.norm(self.a.evaluate(variables), axis=self.axes)

def _derivative(self, hash_value: int) -> Operation:
return Transpose(Div(self.a, Norm_2(self.a, self.axes)))

def _clean(self):
clean_a = self.a._clean()
return Norm_2(clean_a, self.axes)

def _serialise_parameters(self) -> dict[str, Any]:
if self.axes is None:
return {"a": self.a._serialise_json()}
else:
return {"a": self.a._serialise_json(), "axes": list(self.axes)}

@staticmethod
def _deserialise(parameters: dict) -> "Operation":
if "axes" in parameters:
return Norm_2(a=Operation.deserialise_json(parameters["a"]), axes=tuple(parameters["axes"]))
else:
return Norm_2(a=Operation.deserialise_json(parameters["a"]))

def summary(self, indent_amount: int = 0, indent=" "):
if self.axes is None:
return (
f"{indent_amount * indent}Norm_2(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{indent_amount * indent})"
)
else:
return (
f"{indent_amount * indent}Norm_2(\n"
+ self.a.summary(indent_amount + 1, indent)
+ "\n"
+ f"{(indent_amount + 1) * indent}{list(self.axes)}\n"
+ f"{indent_amount * indent})"
)

def __eq__(self, other):
if isinstance(other, Norm_2):
return other.a == self.a
return False


_serialisable_classes = [
AdditiveIdentity,
MultiplicativeIdentity,
Expand All @@ -1141,9 +1417,13 @@ def _deserialise(parameters: dict) -> "Operation":
Pow,
Log,
Transpose,
Trace,
Dot,
MatMul,
MatInv,
TensorDot,
Norm_1,
Norm_2,
]

_serialisation_lookup = {class_.serialisation_name: class_ for class_ in _serialisable_classes}
Expand Down
Loading
Loading