# Source code for ddf.operator

from abc import ABC, abstractmethod

import numpy as np

from .information import ExchangeVector, InternalState


class Operator(ABC):
    """Abstract interface for a map applied to an ``InternalState``.

    Subclasses provide the evaluated vector, its length and the Jacobian of
    the map; this base class derives the covariance of the result and bundles
    mean and covariance into an ``ExchangeVector``.
    """

    @abstractmethod
    def get_eval_vec_len(self) -> int:
        """Return the length of the evaluated vector.

        That is, the value of ``self.evaluate(state).shape[0]``.
        """
        raise NotImplementedError()

    @abstractmethod
    def evaluate(self, state: InternalState) -> np.ndarray:
        """Evaluate the operator on the given state."""
        raise NotImplementedError()

    def get_cov(self, state: InternalState) -> np.ndarray:
        """Calculate the covariance of the state that will be transmitted
        to a neighbor.

        The result is ``J @ state.covariance @ J.T`` where ``J`` is the
        Jacobian returned by :meth:`get_jacobian` for ``state``.
        """
        jacobian = self.get_jacobian(state)
        return jacobian @ state.covariance @ jacobian.T

    @abstractmethod
    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Calculate the Jacobian of the operator at the given state."""
        raise NotImplementedError()

    def get_exchange_vector(self, state: InternalState) -> ExchangeVector:
        """Apply the operator to get the full ``ExchangeVector``.

        The mean comes from :meth:`evaluate` and the covariance from
        :meth:`get_cov`, both computed for the same ``state``.
        """
        mean = self.evaluate(state)
        cov = self.get_cov(state)
        return ExchangeVector(mean, cov)


class IdentityOperator(Operator):
    """Operator that passes the state's mean and covariance through unchanged."""

    def __init__(self, n: int) -> None:
        """Store ``n``, the vector length reported by ``get_eval_vec_len``."""
        self.n = n

    def get_eval_vec_len(self) -> int:
        """Return the ``n`` given at construction."""
        return self.n

    def evaluate(self, state: InternalState) -> np.ndarray:
        """Return ``state.mean`` as-is (no copy is made)."""
        return state.mean

    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Return the identity matrix sized by ``state.mean.shape[0]``.

        Note that the size is taken from the state, not from ``self.n``.
        """
        return np.eye(state.mean.shape[0])

    def get_cov(self, state: InternalState) -> np.ndarray:
        """Return ``state.covariance`` as-is.

        Overrides the base implementation to skip the matrix products with
        the identity Jacobian.
        """
        return state.covariance


class LinearOperator(Operator):
    """Operator defined by a fixed matrix applied to the state's mean."""

    def __init__(self, matrix: np.ndarray) -> None:
        """Store ``matrix`` with at least two dimensions.

        ``np.atleast_2d`` turns a 1-D input into a single-row matrix, so a
        vector argument yields an operator with one output component.
        """
        self.matrix = np.atleast_2d(matrix)

    def get_eval_vec_len(self) -> int:
        """Return the number of rows of the stored matrix."""
        return self.matrix.shape[0]

    def evaluate(self, state: InternalState) -> np.ndarray:
        """Return ``self.matrix @ state.mean``."""
        return self.matrix @ state.mean

    def get_jacobian(self, state: InternalState) -> np.ndarray:
        """Return the stored matrix; it does not depend on ``state``."""
        return self.matrix