ddf package
Subpackages
- ddf.ant_link package
- ddf.lossnode package
- ddf.measurement package
- ddf.mqtt_communication package
- Submodules
- ddf.mqtt_communication.communicator module
- ddf.mqtt_communication.internal module
InternalCommunicator
InternalCommunicator.alias
InternalCommunicator.autorun
InternalCommunicator.base_topic
InternalCommunicator.client_id
InternalCommunicator.mqtt_host
InternalCommunicator.mqtt_port
InternalCommunicator.send_message()
InternalCommunicator.start()
InternalCommunicator.stop()
InternalCommunicator.test_send()
- ddf.mqtt_communication.message module
- Module contents
Submodules
ddf.ant_object_reference module
ANT Object Reference.
- class ddf.ant_object_reference.AntObjectReference(parent_node: Node, object_id: ~ddf.ddf.ObjectId, initial_state: ~ddf.information.InternalState, state: ~ddf.information.InternalState, measurements: list[~ddf.measurement.measurement.Measurement] = <factory>, info_subscribers: list[~ddf.ant_object_reference.InfoSubscriber] = <factory>, up_to_date: bool = False, optimizer_lsqs_method: str = 'trf', lower_bound_x_value: float = -inf, upper_bound_x_value: float = inf, probe_storage: ~ddf.measurement.gradient_storage.GradientStorage = <factory>)[source]
Bases:
object
A single point in time, or a single batch.
- add_new_measurement(operator: Operator, exchange_state: ExchangeVector, source: AntLink) None [source]
Add or overwrite a measurement.
locals: always just add (they come from the default_info_packets)
external: always overwrite if there is something to overwrite
internal: always overwrite if there is something to overwrite
- backpropagate(loss_gradient: ndarray, loss_channel: LossChannel) None [source]
Backpropagate the translated loss gradients through the measurements.
- backpropagate_from_neighbor(gradient_wrt_exchange_vector: ndarray, loss_channel: LossChannel, node_id: NodeId) None [source]
Backpropagate the gradient from a source (neighbor, another AntObjReference) to the internal state.
- evaluate_operators(state: InternalState) list[ndarray] [source]
Composite model for the AntObjectReference.
It combines the outputs of all models :param state: State vector. :type state: np.ndarray
- Returns:
The composite model output.
- Return type:
np.ndarray
- generate_gradient_messages() list[GradientMessage] [source]
Generate gradient messages from measurements.
- generate_info_messages() list[InfoMessage] [source]
Generate info messages for info subscribers.
- get_weight_projection_mat() ndarray [source]
Resize the weights vector.
Resizing is needed to perform Covariance Intersection with a vector of weights which only has one weight for all the measurements whose origin is the same node.
- Returns:
“Weight Projection Matrix”
- Return type:
np.ndarray
- info_subscribers: list[InfoSubscriber]
- initial_state: InternalState
initial state estimate, keep for covarianve limiter
- lower_bound_x: ndarray
lower bounds for least squares optimizer
- lower_bound_x_value: float = -inf
value used to populate AntObjectReference.lower_bound_x
- measurements: list[Measurement]
- operator_jacobians(state: InternalState) list[ndarray] [source]
Jacobian of the composite model for the AntObjectReference.
It combines the jacobians of all models
- optimizer_lsqs_method: str = 'trf'
- probe_storage: GradientStorage
- softmax(x: ndarray) ndarray [source]
The softmax function of vector x.
- Parameters:
x (1D ndarray) – Input vector
- Returns:
softmax(x)
- Return type:
1D ndarray
- solve() None [source]
Solve the data fusion problem using Covariance Intersection.
This function determines the weights fot the diferent external + a single weight for the internal measurements. Then a weighted sum of the sources of information is performed.
- Returns:
None.
- state: InternalState
current best guess for state estimate
- up_to_date: bool = False
this AntObjectReference just finished DataFusion and there is no new information in measurements
- update() None [source]
Like an info message, but only through history.
we do not need to send it over the network
this is only done for AntLinkInternalObjectID - the others have different Nodes as target
- upper_bound_x: ndarray
upper bounds for least squares optimizer
- upper_bound_x_value: float = inf
value used to populate AntObjectReference.upper_bound_x
- weighted_inference(measmt_weights: ndarray, save_results: bool = False) InternalState | None [source]
Weighted inference for the AntObjectReference.
Calculation of the weights that determine the influence of each measurement in the data fusion. With these weights the measurements are combined as information matrices that are used.
- Parameters:
measmt_weights (np.ndarray) – Weights for the measurements.
save_results – save state and state jacobians for AntObjectReferences and Measurements?
- Returns:
The infered state.
- Return type:
Information
- class ddf.ant_object_reference.InfoSubscriber(operator: Operator, source: AntLink, last_message: InfoMessage | None, communication_threshold: float = 1e-06)[source]
Bases:
object
Information Subscriber Info.
Information about another node that needs to be updated when this node has new information.
- communication_threshold: float = 1e-06
- generate_info_message(node: Node, state: InternalState) None | InfoMessage [source]
- kl_div(new_state: ExchangeVector, old_state: ExchangeVector) float [source]
Check if the info message is different from the current message.
- last_message: InfoMessage | None
- update(state: InternalState) None [source]
- ddf.ant_object_reference.cov2whitening(cov: ndarray, method: str = 'pca', fudge: float = 1e-18) ndarray [source]
Function for determining whitening matrices from covariance matrices.
- Parameters:
cov (2D ndarray symmetric) – covariance matrix
method (str, optional) – Method. ‘pca’ Eigenvalue decomposition nearest semi-positive definite approximation (if needed). ‘chol’ cholesky decomposition, only for non singular matrices. Defaults to ‘pca’.
fudge (float, optional) – Small fudge parameter to avoid infinities. Defaults to 1e-18.
- Returns:
whitening matrix
- Return type:
2D ndarray
- ddf.ant_object_reference.kl_div(mu_post: ndarray, gamma_post: ndarray, mu_prior: ndarray, gamma_prior: ndarray) float [source]
Calculate the KL divergence in bits.
- Parameters:
mu_post (1D ndarray) – Posterior mean vector
gamma_post (2D ndarray) – Posterior covariance
mu_prior (1D ndarray) – Prior mean
gamma_prior (2D ndarray) – Prior covariance
- Raises:
Exception – _description_
- Returns:
KL divergence in bits
- Return type:
float
ddf.communicator module
- class ddf.communicator.MockCommunicatorBuilder[source]
Bases:
object
Create mock communicators out of a builder.
All Communicators created from one instance of MockCommunicatorBuilder will be able to send messages to each other.
There is no filtering on who will get messages - everyone will get every message (except for messages sent by themselves).
- class InternalCommunicator(parent: MockCommunicatorBuilder, index: int)[source]
Bases:
Communicator
- index: int
- parent: MockCommunicatorBuilder
- communicators: list[tuple[Communicator, list[Message]]]
- get_instance() InternalCommunicator [source]
ddf.ddf module
Sample Implementation of Node and AntLink classes.
- class ddf.ddf.Id(node_id: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId | None)[source]
Bases:
object
- class ddf.ddf.NodeId(uuid: UUID)[source]
Bases:
object
Represent a Node.
For security reasons: use any version of UUID (i.e. UUIDv4)
- uuid: UUID
- class ddf.ddf.ObjectId(uuid: UUID)[source]
Bases:
object
Represent an object.
This could be a reference to a timestamp or a batch.
- uuid: UUID
- ddf.ddf.UUID_VERSION = 7
UUIDs should use version 7 to be able to encode a datetime.
ddf.information module
MVN estimates.
Include two subclasses to have type information about the type of MVN estimate
- class ddf.information.ExchangeVector(mean: ndarray, covariance: ndarray)[source]
Bases:
MVNEstimate
MVNEstimate representing an exchange vector.
- class ddf.information.InternalState(mean: ndarray, covariance: ndarray)[source]
Bases:
MVNEstimate
MVNEstimate representing internal state.
- class ddf.information.MVNEstimate(mean: ndarray, covariance: ndarray)[source]
Bases:
object
Information class for storing information about the state of the system.
- copy() MVNEstimate [source]
- covariance: ndarray
Covariance matrix of the state.
- get_whitening_op(method: str = 'pca', fudge: float = 1e-18) ndarray [source]
Function for determining whitening matrices from covariance matrices.
- Parameters:
cov (2D ndarray symmetric) – covariance matrix
method (str, optional) – Method. ‘pca’ Eigenvalue decomposition nearest semi-positive definite approximation (if needed). ‘chol’ cholesky decomposition, only for non singular matrices. Defaults to ‘pca’.
fudge (float, optional) – Small fudge parameter to avoid infinities. Defaults to 1e-18.
- Returns:
whitening matrix
- Return type:
2D ndarray
- mean: ndarray
Mean vector of the state
ddf.message module
- class ddf.message.AddNeighborMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, neighbor: ddf.ddf.NodeId, req_id: uuid.UUID)[source]
Bases:
Message
- req_id: UUID
- class ddf.message.ControlCommand(*values)[source]
Bases:
Enum
- FZB = 0
- GetNodeContext = 3
- GetNodeInfo = 2
- UUI = 1
- class ddf.message.ControlMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, request_id: uuid.UUID, command: ddf.message.ControlCommand)[source]
Bases:
Message
- command: ControlCommand
- request_id: UUID
- class ddf.message.GradientMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId, gradient: numpy.ndarray, loss_channel: ddf.ant_link.loss_channel.LossChannel)[source]
Bases:
Message
- gradient: ndarray
- loss_channel: LossChannel
- class ddf.message.InfoMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId, state: ddf.information.ExchangeVector, message_counter: int)[source]
Bases:
Message
- message_counter: int
- state: ExchangeVector
- class ddf.message.Message(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId)[source]
Bases:
ABC
- class ddf.message.NodeContextMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, fixed_context: str, user_context: str, req_id: uuid.UUID)[source]
Bases:
Message
- fixed_context: str
- req_id: UUID
- user_context: str
- class ddf.message.NodeInfoMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, available: list[str], exchange: list[str], alias: str, req_id: uuid.UUID)[source]
Bases:
Message
- alias: str
- available: list[str]
- exchange: list[str]
- req_id: UUID
- class ddf.message.ProbeResponseMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, reference: ddf.ddf.ObjectId, gradient: numpy.ndarray, loss_channel: ddf.ant_link.loss_channel.LossChannel)[source]
Bases:
Message
- gradient: ndarray
- loss_channel: LossChannel
ddf.node module
ANT Node Module.
- class ddf.node.Neighbor(node_id: ddf.ddf.NodeId, operator: ddf.operator.Operator)[source]
Bases:
object
- class ddf.node.Node(node_id: ~ddf.ddf.NodeId, history: list[~ddf.ant_object_reference.AntObjectReference], neighbors: dict[~ddf.ddf.NodeId, ~ddf.node.Neighbor], info_subscribers: dict[~ddf.ddf.NodeId, ~ddf.node.Neighbor], internal_operator: ~ddf.operator.Operator, alias: str, initial_state: ~ddf.information.InternalState, communicator: ~ddf.communicator.Communicator, default_information_packets: list[tuple[~ddf.operator.Operator, ~ddf.information.ExchangeVector, list[~ddf.ant_link.ant_link.GradientSubscription]]], fixed_context: str, frozen: bool = False, mode: ~ddf.node.ProcessingMode = ProcessingMode.INFO_MODE, calculate_residual_loss: bool = False, user_context: str = '', available_labels: list[str] = <factory>)[source]
Bases:
object
ANT Node class.
- alias: str
- available_labels: list[str]
- calculate_residual_loss: bool = False
- communicator: Communicator
- create_ant_object_reference(object_id: ObjectId | None, update_counter: int = 0) AntObjectReference [source]
- default_information_packets: list[tuple[Operator, ExchangeVector, list[GradientSubscription]]]
- find_object_reference(object_id: ObjectId) AntObjectReference | None [source]
Search for specific AntObjectReference in this Node.
- fixed_context: str
- frozen: bool = False
- history: list[AntObjectReference]
- initial_state: InternalState
- insert_measurement_from_info_message(ref: AntObjectReference, msg: InfoMessage) None [source]
- insert_measurement_from_sensor(object_id: ObjectId, op: Operator, sensor_data: ExchangeVector) None [source]
- mode: ProcessingMode = 0
- object_ref_in_past(ref: AntObjectReference) bool [source]
- poll_history_from_object_id(object_id: ObjectId) AntObjectReference | None [source]
Search for specific Object Id in history.
- Parameters:
object_id (ObjectId) – the object id to search for in this nodes history.
- Returns:
if the object id is found, return the AntObjectReference with this object id, otherwise None is returned.
- Return type:
reference (AntObjectReference | None)
- set_available_labels(labels: list[str]) None [source]
Set all available label texts.
This will be used when asking for GetNodeInfo.
- set_user_context(user_context: str) None [source]
Set the user_context of this node.
This will be used when sending NodeContext Messages.
- user_context: str = ''
ddf.object_id_utils module
ddf.operator module
- class ddf.operator.IdentityOperator(n: int)[source]
Bases:
Operator
- evaluate(state: InternalState) ndarray [source]
Evaluate the operator on the given state.
- get_cov(state: InternalState) ndarray [source]
Calculates the covariance of the state that will be transmitted to a neighbor.
- get_jacobian(state: InternalState) ndarray [source]
Calculate jacobian of the operator at the given state.
- class ddf.operator.LinearOperator(matrix: ndarray)[source]
Bases:
Operator
- evaluate(state: InternalState) ndarray [source]
Evaluate the operator on the given state.
- get_jacobian(state: InternalState) ndarray [source]
Calculate jacobian of the operator at the given state.
- class ddf.operator.Operator[source]
Bases:
ABC
- abstractmethod evaluate(state: InternalState) ndarray [source]
Evaluate the operator on the given state.
- get_cov(state: InternalState) ndarray [source]
Calculates the covariance of the state that will be transmitted to a neighbor.
- get_exchange_vector(state: InternalState) ExchangeVector [source]
Apply Operator to get full ExchangeVector.
(i.e. evaluate and get_cov)
- abstractmethod get_jacobian(state: InternalState) ndarray [source]
Calculate jacobian of the operator at the given state.