from abc import ABC, abstractmethod
import numpy as np
from .information import ExchangeVector, InternalState
[docs]
class Operator(ABC):
    """Abstract base class for operators that act on an ``InternalState``.

    Concrete subclasses implement :meth:`get_eval_vec_len`,
    :meth:`evaluate` and :meth:`get_jacobian`.  Covariance propagation
    (:meth:`get_cov`) and assembly of the resulting ``ExchangeVector``
    (:meth:`get_exchange_vector`) are built on top of those here.
    """

    @abstractmethod
    def get_eval_vec_len(self) -> int:
        """Return the length of the evaluated vector.

        That is, the value of ``self.evaluate(state).shape[0]``.
        """
        raise NotImplementedError()

    @abstractmethod
    def evaluate(self, state: InternalState) -> np.ndarray:
        """Evaluate the operator on the given state and return the result."""
        raise NotImplementedError()

    @abstractmethod
    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Return the Jacobian of the operator at the given state."""
        raise NotImplementedError()

    def get_cov(self, state: InternalState) -> np.ndarray:
        """Return the covariance of the state that will be transmitted to a neighbor.

        Computed as ``J @ state.covariance @ J.T`` where ``J`` is the
        Jacobian returned by :meth:`get_jacobian` for ``state``.
        """
        jac = self.get_jacobian(state)
        # Same left-to-right association as ``jac @ cov @ jac.T``.
        left_product = jac @ state.covariance
        return left_product @ jac.T

    def get_exchange_vector(self, state: InternalState) -> ExchangeVector:
        """Apply the operator and bundle the result as an ``ExchangeVector``.

        The mean comes from :meth:`evaluate` and the covariance from
        :meth:`get_cov`, evaluated in that order.
        """
        return ExchangeVector(self.evaluate(state), self.get_cov(state))
[docs]
class IdentityOperator(Operator):
    """Operator that hands back the state's mean and covariance unchanged."""

    def __init__(self, n: int) -> None:
        # Only get_eval_vec_len() reads ``n``; the other methods take their
        # sizes from the state they are given.
        self.n = n

    def get_eval_vec_len(self) -> int:
        """Return the length ``n`` supplied at construction."""
        return self.n

    def evaluate(self, state: InternalState) -> np.ndarray:
        """Return ``state.mean`` itself; no copy is made."""
        return state.mean

    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Return an identity matrix sized by the first dimension of ``state.mean``."""
        dim = state.mean.shape[0]
        return np.eye(dim)

    def get_cov(self, state: InternalState) -> np.ndarray:
        """Return ``state.covariance`` directly.

        Overrides the base-class computation: this method returns the
        stored covariance without forming the Jacobian product.
        """
        return state.covariance
[docs]
class LinearOperator(Operator):
    """Operator that multiplies the state's mean by a fixed matrix."""

    def __init__(self, matrix: np.ndarray) -> None:
        # atleast_2d promotes scalar or 1-D input to a 2-D array, so the
        # stored matrix always has a first (row) dimension.
        self.matrix = np.atleast_2d(matrix)

    def get_eval_vec_len(self) -> int:
        """Return the size of the stored matrix's first dimension."""
        row_count = self.matrix.shape[0]
        return row_count

    def evaluate(self, state: InternalState) -> np.ndarray:
        """Return the product of the stored matrix with ``state.mean``."""
        mean = state.mean
        return self.matrix @ mean

    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Return the stored matrix; ``state`` is not consulted."""
        return self.matrix