ddf package

Subpackages

Submodules

ddf.ant_object_reference module

ANT Object Reference.

class ddf.ant_object_reference.AntObjectReference(parent_node: Node, object_id: ~ddf.ddf.ObjectId, initial_state: ~ddf.information.InternalState, state: ~ddf.information.InternalState, measurements: list[~ddf.measurement.measurement.Measurement] = <factory>, info_subscribers: list[~ddf.ant_object_reference.InfoSubscriber] = <factory>, up_to_date: bool = False, optimizer_lsqs_method: str = 'trf', lower_bound_x_value: float = -inf, upper_bound_x_value: float = inf, probe_storage: ~ddf.measurement.gradient_storage.GradientStorage = <factory>)[source]

Bases: object

A single point in time, or a single batch.

add_new_measurement(operator: Operator, exchange_state: ExchangeVector, source: AntLink) None[source]

Add or overwrite a measurement.

  1. locals: always just add (they come from the default_info_packets)

  2. external: always overwrite if there is something to overwrite

  3. internal: always overwrite if there is something to overwrite

backpropagate(loss_gradient: ndarray, loss_channel: LossChannel) None[source]

Backpropagate the translated loss gradients through the measurements.

backpropagate_from_neighbor(gradient_wrt_exchange_vector: ndarray, loss_channel: LossChannel, node_id: NodeId) None[source]

Backpropagate the gradient from a source (neighbor, another AntObjReference) to the internal state.

evaluate_operators(state: InternalState) list[ndarray][source]

Composite model for the AntObjectReference.

It combines the outputs of all models.

Parameters:

state (InternalState) – State vector.

Returns:

The composite model output.

Return type:

list[np.ndarray]

generate_gradient_messages() list[GradientMessage][source]

Generate gradient messages from measurements.

generate_info_messages() list[InfoMessage][source]

Generate info messages for info subscribers.

get_weight_projection_mat() ndarray[source]

Resize the weights vector.

Resizing is needed to perform Covariance Intersection with a vector of weights which only has one weight for all the measurements whose origin is the same node.

Returns:

“Weight Projection Matrix”

Return type:

np.ndarray
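
The resizing described above can be pictured as a 0/1 matrix that expands one weight per origin node into one weight per measurement. The following is only a sketch under that assumption; the function name `weight_projection_matrix` and the string node labels are hypothetical and are not part of the ddf API.

```python
import numpy as np


def weight_projection_matrix(origins: list[str]) -> np.ndarray:
    """Build a (n_measurements x n_origins) matrix of zeros and ones.

    Row i has a single 1 in the column of the node that measurement i
    originated from (hypothetical layout, for illustration only).
    """
    unique_origins = list(dict.fromkeys(origins))  # keeps first-seen order
    mat = np.zeros((len(origins), len(unique_origins)))
    for row, origin in enumerate(origins):
        mat[row, unique_origins.index(origin)] = 1.0
    return mat


# Two measurements from node_a and one from node_b share two weights.
origins = ["node_a", "node_a", "node_b"]
projection = weight_projection_matrix(origins)
per_measurement_weights = projection @ np.array([0.25, 0.75])
```

Multiplying the matrix with the short weight vector yields one entry per measurement, with measurements from the same node receiving the same weight.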

info_subscribers: list[InfoSubscriber]
initial_state: InternalState

initial state estimate, kept for the covariance limiter

lower_bound_x: ndarray

lower bounds for least squares optimizer

lower_bound_x_value: float = -inf

value used to populate AntObjectReference.lower_bound_x

measurements: list[Measurement]
object_id: ObjectId
operator_jacobians(state: InternalState) list[ndarray][source]

Jacobian of the composite model for the AntObjectReference.

It combines the jacobians of all models

optimize() None[source]
optimizer_lsqs_method: str = 'trf'
parent_node: Node
probe_storage: GradientStorage
softmax(x: ndarray) ndarray[source]

The softmax function of vector x.

Parameters:

x (1D ndarray) – Input vector

Returns:

softmax(x)

Return type:

1D ndarray
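
For reference, a softmax over a 1D vector can be written as below. This is a generic, numerically stable formulation and not necessarily the one used by this method; the name `softmax_sketch` is hypothetical.

```python
import numpy as np


def softmax_sketch(x: np.ndarray) -> np.ndarray:
    """Softmax of a 1D vector: exp(x_i) / sum_j exp(x_j)."""
    shifted = x - np.max(x)  # subtracting the max avoids overflow, result unchanged
    exponentials = np.exp(shifted)
    return exponentials / exponentials.sum()


uniform = softmax_sketch(np.array([0.0, 0.0, 0.0]))
skewed = softmax_sketch(np.array([1.0, 2.0, 3.0]))
```

The output is non-negative and sums to one, which makes it a convenient way to turn unconstrained optimizer variables into fusion weights.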

solve() None[source]

Solve the data fusion problem using Covariance Intersection.

This function determines the weights for the different external sources plus a single weight for the internal measurements. Then a weighted sum of the sources of information is performed.

Returns:

None.
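
As background, the textbook form of Covariance Intersection fuses estimates by a weighted sum of their information matrices and information vectors. The sketch below shows only that fusion step for given weights; how solve() chooses the weights is not shown, and the function name `covariance_intersection` is hypothetical.

```python
import numpy as np


def covariance_intersection(means, covariances, weights):
    """Fuse Gaussian estimates with fixed weights (assumed to sum to 1).

    Fused information matrix: sum_i w_i * P_i^-1
    Fused information vector: sum_i w_i * P_i^-1 @ x_i
    """
    info_matrix = np.zeros_like(covariances[0], dtype=float)
    info_vector = np.zeros_like(means[0], dtype=float)
    for mean, cov, weight in zip(means, covariances, weights):
        cov_inv = np.linalg.inv(cov)
        info_matrix += weight * cov_inv
        info_vector += weight * (cov_inv @ mean)
    fused_cov = np.linalg.inv(info_matrix)
    return fused_cov @ info_vector, fused_cov


fused_mean, fused_cov = covariance_intersection(
    means=[np.array([0.0, 0.0]), np.array([2.0, 2.0])],
    covariances=[np.eye(2), np.eye(2)],
    weights=[0.5, 0.5],
)
```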

state: InternalState

current best guess for state estimate

up_to_date: bool = False

this AntObjectReference just finished DataFusion and there is no new information in measurements

update() None[source]

Like an info message, but only through history.

It does not need to be sent over the network.

This is only done for AntLinkInternalObjectID; the others have different Nodes as target.

upper_bound_x: ndarray

upper bounds for least squares optimizer

upper_bound_x_value: float = inf

value used to populate AntObjectReference.upper_bound_x

weighted_inference(measmt_weights: ndarray, save_results: bool = False) InternalState | None[source]

Weighted inference for the AntObjectReference.

The weights determine the influence of each measurement in the data fusion. Using these weights, the measurements are combined as information matrices to infer the state.

Parameters:
  • measmt_weights (np.ndarray) – Weights for the measurements.

  • save_results (bool, optional) – Whether to save the state and state jacobians for AntObjectReferences and Measurements. Defaults to False.

Returns:

The inferred state.

Return type:

InternalState | None

class ddf.ant_object_reference.InfoSubscriber(operator: Operator, source: AntLink, last_message: InfoMessage | None, communication_threshold: float = 1e-06)[source]

Bases: object

Information Subscriber Info.

Information about another node that needs to be updated when this node has new information.

communication_threshold: float = 1e-06
generate_info_message(node: Node, state: InternalState) None | InfoMessage[source]
kl_div(new_state: ExchangeVector, old_state: ExchangeVector) float[source]

Check if the info message is different from the current message.

last_message: InfoMessage | None
operator: Operator
source: AntLink
update(state: InternalState) None[source]
ddf.ant_object_reference.cov2whitening(cov: ndarray, method: str = 'pca', fudge: float = 1e-18) ndarray[source]

Function for determining whitening matrices from covariance matrices.

Parameters:
  • cov (2D ndarray symmetric) – covariance matrix

  • method (str, optional) – Method. ‘pca’: eigenvalue decomposition with the nearest positive semi-definite approximation (if needed). ‘chol’: Cholesky decomposition, only for non-singular matrices. Defaults to ‘pca’.

  • fudge (float, optional) – Small fudge parameter to avoid infinities. Defaults to 1e-18.

Returns:

whitening matrix

Return type:

2D ndarray
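
A whitening matrix W for a covariance C satisfies W C Wᵀ = I. The sketch below shows one common way to obtain W for the two methods named above; it is an illustration under that definition, not the library's implementation, and `whitening_sketch` is a hypothetical name.

```python
import numpy as np


def whitening_sketch(cov: np.ndarray, method: str = "pca", fudge: float = 1e-18) -> np.ndarray:
    """Return W such that W @ cov @ W.T is (approximately) the identity."""
    if method == "pca":
        eigvals, eigvecs = np.linalg.eigh(cov)  # cov = V diag(eigvals) V^T
        eigvals = np.clip(eigvals, 0.0, None)   # drop negative eigenvalues (PSD approximation)
        return np.diag(1.0 / np.sqrt(eigvals + fudge)) @ eigvecs.T
    if method == "chol":
        lower = np.linalg.cholesky(cov)         # cov = L L^T, requires positive definite cov
        return np.linalg.inv(lower)
    raise ValueError(f"unknown method: {method}")


cov = np.array([[2.0, 0.5], [0.5, 1.0]])
w_pca = whitening_sketch(cov, "pca")
w_chol = whitening_sketch(cov, "chol")
```

Both results whiten the same covariance but differ by a rotation, so they are not expected to be equal to each other.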

ddf.ant_object_reference.kl_div(mu_post: ndarray, gamma_post: ndarray, mu_prior: ndarray, gamma_prior: ndarray) float[source]

Calculate the KL divergence in bits.

Parameters:
  • mu_post (1D ndarray) – Posterior mean vector

  • gamma_post (2D ndarray) – Posterior covariance

  • mu_prior (1D ndarray) – Prior mean

  • gamma_prior (2D ndarray) – Prior covariance

Raises:

Exception – _description_

Returns:

KL divergence in bits

Return type:

float
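
For two multivariate normal distributions the KL divergence has a closed form. The sketch below evaluates KL(posterior ‖ prior) and converts from nats to bits; it assumes the gamma arguments are covariance matrices, as listed above, and `kl_div_bits` is a hypothetical name rather than the library function.

```python
import numpy as np


def kl_div_bits(mu_post, gamma_post, mu_prior, gamma_prior) -> float:
    """KL(N(mu_post, gamma_post) || N(mu_prior, gamma_prior)) in bits."""
    dim = mu_post.shape[0]
    diff = mu_prior - mu_post
    trace_term = np.trace(np.linalg.solve(gamma_prior, gamma_post))
    mahalanobis = diff @ np.linalg.solve(gamma_prior, diff)
    _, logdet_prior = np.linalg.slogdet(gamma_prior)
    _, logdet_post = np.linalg.slogdet(gamma_post)
    kl_nats = 0.5 * (trace_term + mahalanobis - dim + logdet_prior - logdet_post)
    return float(kl_nats / np.log(2.0))  # nats -> bits


same = kl_div_bits(np.zeros(2), np.eye(2), np.zeros(2), np.eye(2))
shifted = kl_div_bits(np.array([0.0]), np.eye(1), np.array([1.0]), np.eye(1))
```

Identical distributions give zero, and a unit shift of the mean at unit variance gives 0.5 nats, which is 0.5 / ln 2 bits.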

ddf.communicator module

class ddf.communicator.Communicator[source]

Bases: ABC

abstractmethod get_messages() list[Message][source]
abstractmethod send_message(msg: Message) None[source]
class ddf.communicator.MockCommunicatorBuilder[source]

Bases: object

Create mock communicators out of a builder.

All Communicators created from one instance of MockCommunicatorBuilder will be able to send messages to each other.

There is no filtering on who will get messages - everyone will get every message (except for messages sent by themselves).

class InternalCommunicator(parent: MockCommunicatorBuilder, index: int)[source]

Bases: Communicator

get_messages() list[Message][source]
index: int
parent: MockCommunicatorBuilder
peek_messages() list[Message][source]

Retrieve messages without clearing them.

send_message(msg: Message) None[source]
communicators: list[tuple[Communicator, list[Message]]]
get_instance() InternalCommunicator[source]
get_messages(index: int) list[Message][source]

Retrieve all messages for communicator at index.

send_message(msg: Message, index: int) None[source]

Send messages to all communicators except index.
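
The broadcast behaviour described for MockCommunicatorBuilder can be illustrated with a small stand-alone model. The classes `BroadcastHub` and `Endpoint` below are hypothetical stand-ins, messages are plain strings, and the sketch assumes that get_messages clears the inbox while peek_messages does not.

```python
class BroadcastHub:
    """Hypothetical stand-in for a builder that owns one inbox per endpoint."""

    def __init__(self) -> None:
        self.inboxes: list[list[str]] = []

    def get_instance(self) -> "Endpoint":
        self.inboxes.append([])
        return Endpoint(self, len(self.inboxes) - 1)

    def send_message(self, msg: str, index: int) -> None:
        # Deliver to every inbox except the sender's own.
        for i, inbox in enumerate(self.inboxes):
            if i != index:
                inbox.append(msg)

    def get_messages(self, index: int) -> list[str]:
        # Return the pending messages and leave an empty inbox behind.
        messages, self.inboxes[index] = self.inboxes[index], []
        return messages


class Endpoint:
    """Hypothetical stand-in for a communicator created by the hub."""

    def __init__(self, hub: BroadcastHub, index: int) -> None:
        self.hub = hub
        self.index = index

    def send_message(self, msg: str) -> None:
        self.hub.send_message(msg, self.index)

    def get_messages(self) -> list[str]:
        return self.hub.get_messages(self.index)

    def peek_messages(self) -> list[str]:
        return list(self.hub.inboxes[self.index])  # copy, inbox stays untouched


hub = BroadcastHub()
a, b, c = hub.get_instance(), hub.get_instance(), hub.get_instance()
a.send_message("hello")
```

After the call, b and c each hold the message and a holds nothing, which mirrors the "everyone except the sender" rule.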

ddf.ddf module

Sample Implementation of Node and AntLink classes.

class ddf.ddf.Id(node_id: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId | None)[source]

Bases: object

node_id: NodeId
object_id: ObjectId | None
class ddf.ddf.NodeId(uuid: UUID)[source]

Bases: object

Represent a Node.

For security reasons: use any version of UUID (e.g. UUIDv4)

uuid: UUID
class ddf.ddf.ObjectId(uuid: UUID)[source]

Bases: object

Represent an object.

This could be a reference to a timestamp or a batch.

uuid: UUID
ddf.ddf.UUID_VERSION = 7

UUIDs should use version 7 to be able to encode a datetime.

ddf.ddf.pretty_print_uuid7(uuid: UUID) str[source]

ddf.information module

MVN estimates.

Includes two subclasses that carry type information about the kind of MVN estimate.

class ddf.information.ExchangeVector(mean: ndarray, covariance: ndarray)[source]

Bases: MVNEstimate

MVNEstimate representing an exchange vector.

class ddf.information.InternalState(mean: ndarray, covariance: ndarray)[source]

Bases: MVNEstimate

MVNEstimate representing internal state.

class ddf.information.MVNEstimate(mean: ndarray, covariance: ndarray)[source]

Bases: object

Information class for storing information about the state of the system.

copy() MVNEstimate[source]
covariance: ndarray

Covariance matrix of the state.

get_whitening_op(method: str = 'pca', fudge: float = 1e-18) ndarray[source]

Function for determining the whitening matrix from this estimate's covariance matrix.

Parameters:
  • method (str, optional) – Method. ‘pca’: eigenvalue decomposition with the nearest positive semi-definite approximation (if needed). ‘chol’: Cholesky decomposition, only for non-singular matrices. Defaults to ‘pca’.

  • fudge (float, optional) – Small fudge parameter to avoid infinities. Defaults to 1e-18.

Returns:

whitening matrix

Return type:

2D ndarray

mean: ndarray

Mean vector of the state

ddf.message module

class ddf.message.AddNeighborMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, neighbor: ddf.ddf.NodeId, req_id: uuid.UUID)[source]

Bases: Message

neighbor: NodeId
req_id: UUID
class ddf.message.ControlCommand(*values)[source]

Bases: Enum

FZB = 0
GetNodeContext = 3
GetNodeInfo = 2
UUI = 1
class ddf.message.ControlMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, request_id: uuid.UUID, command: ddf.message.ControlCommand)[source]

Bases: Message

command: ControlCommand
request_id: UUID
class ddf.message.GradientMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId, gradient: numpy.ndarray, loss_channel: ddf.ant_link.loss_channel.LossChannel)[source]

Bases: Message

gradient: ndarray
loss_channel: LossChannel
object_id: ObjectId
class ddf.message.InfoMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, object_id: ddf.ddf.ObjectId, state: ddf.information.ExchangeVector, message_counter: int)[source]

Bases: Message

message_counter: int
object_id: ObjectId
state: ExchangeVector
class ddf.message.Message(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId)[source]

Bases: ABC

recipient: NodeId
sender: NodeId
class ddf.message.NodeContextMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, fixed_context: str, user_context: str, req_id: uuid.UUID)[source]

Bases: Message

fixed_context: str
req_id: UUID
user_context: str
class ddf.message.NodeInfoMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, available: list[str], exchange: list[str], alias: str, req_id: uuid.UUID)[source]

Bases: Message

alias: str
available: list[str]
exchange: list[str]
req_id: UUID
class ddf.message.ProbeResponseMessage(sender: ddf.ddf.NodeId, recipient: ddf.ddf.NodeId, reference: ddf.ddf.ObjectId, gradient: numpy.ndarray, loss_channel: ddf.ant_link.loss_channel.LossChannel)[source]

Bases: Message

gradient: ndarray
loss_channel: LossChannel
reference: ObjectId

ddf.node module

ANT Node Module.

class ddf.node.Neighbor(node_id: ddf.ddf.NodeId, operator: ddf.operator.Operator)[source]

Bases: object

node_id: NodeId
operator: Operator
class ddf.node.Node(node_id: ~ddf.ddf.NodeId, history: list[~ddf.ant_object_reference.AntObjectReference], neighbors: dict[~ddf.ddf.NodeId, ~ddf.node.Neighbor], info_subscribers: dict[~ddf.ddf.NodeId, ~ddf.node.Neighbor], internal_operator: ~ddf.operator.Operator, alias: str, initial_state: ~ddf.information.InternalState, communicator: ~ddf.communicator.Communicator, default_information_packets: list[tuple[~ddf.operator.Operator, ~ddf.information.ExchangeVector, list[~ddf.ant_link.ant_link.GradientSubscription]]], fixed_context: str, frozen: bool = False, mode: ~ddf.node.ProcessingMode = ProcessingMode.INFO_MODE, calculate_residual_loss: bool = False, user_context: str = '', available_labels: list[str] = <factory>)[source]

Bases: object

ANT Node class.

add_info_subscriber(node_id: NodeId, op: Operator, eval_vec_len: int) None[source]
add_neighbor(node_id: NodeId, op: Operator, eval_vec_len: int) None[source]
alias: str
available_labels: list[str]
calculate_residual_loss: bool = False
communicator: Communicator
create_ant_object_reference(object_id: ObjectId | None, update_counter: int = 0) AntObjectReference[source]
default_information_packets: list[tuple[Operator, ExchangeVector, list[GradientSubscription]]]
find_object_reference(object_id: ObjectId) AntObjectReference | None[source]

Search for specific AntObjectReference in this Node.

fixed_context: str
frozen: bool = False
history: list[AntObjectReference]
info_subscribers: dict[NodeId, Neighbor]
initial_state: InternalState
insert_measurement_from_info_message(ref: AntObjectReference, msg: InfoMessage) None[source]
insert_measurement_from_sensor(object_id: ObjectId, op: Operator, sensor_data: ExchangeVector) None[source]
internal_operator: Operator
mode: ProcessingMode = 0
neighbors: dict[NodeId, Neighbor]
node_id: NodeId
object_ref_in_past(ref: AntObjectReference) bool[source]
optimize() None[source]
poll_history_from_object_id(object_id: ObjectId) AntObjectReference | None[source]

Search for specific Object Id in history.

Parameters:

object_id (ObjectId) – the object id to search for in this node's history.

Returns:

if the object id is found, return the AntObjectReference with this object id, otherwise None is returned.

Return type:

reference (AntObjectReference | None)

residual_loss() None[source]
run() None[source]

Run the step function forever.

set_available_labels(labels: list[str]) None[source]

Set all available label texts.

This will be used when asking for GetNodeInfo.

set_user_context(user_context: str) None[source]

Set the user_context of this node.

This will be used when sending NodeContext Messages.

solve(force: bool = False) None[source]
step() None[source]

Process and send messages once.

user_context: str = ''
visualize() str[source]

Visualize the node as a string.

zerogradients() None[source]
class ddf.node.ProcessingMode(*values)[source]

Bases: Enum

GRADIENT_MODE = 1
INFO_MODE = 0

ddf.object_id_utils module

ddf.object_id_utils.convert_epoch(epoch_in_s: float) tuple[int, int][source]

Convert epoch to seconds and nanoseconds.

Should be fine until year=6429
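
Splitting a floating-point epoch into whole seconds and nanoseconds can be sketched as follows. The name `split_epoch` is hypothetical, and the rounding and carry handling are assumptions of this sketch rather than documented behaviour of convert_epoch.

```python
def split_epoch(epoch_in_s: float) -> tuple[int, int]:
    """Split an epoch in seconds into (whole seconds, nanoseconds)."""
    seconds = int(epoch_in_s // 1)  # floor to whole seconds
    nanoseconds = int(round((epoch_in_s - seconds) * 1_000_000_000))
    if nanoseconds == 1_000_000_000:  # rounding carried into the next second
        seconds += 1
        nanoseconds = 0
    return seconds, nanoseconds


half_second = split_epoch(1.5)
whole_second = split_epoch(2.0)
```

Note that a double-precision epoch near the present day resolves only a fraction of a microsecond, so the low nanosecond digits carry no real information.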

ddf.object_id_utils.create_uuidv7_from_datetime(dt: datetime) UUID[source]
ddf.object_id_utils.create_uuidv7_from_epoch(epoch_in_s: float) UUID[source]
ddf.object_id_utils.read_datetime_from_uuidv7(uuid: UUID) datetime[source]
ddf.object_id_utils.zero_non_timestamp_fields(uuid: UUID) UUID[source]

Zero out the non-timestamp fields of a UUIDv7.
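
The helpers in this module can be pictured with the UUID version 7 layout from RFC 9562: a 48-bit Unix timestamp in milliseconds, followed by version, variant, and random bits. The sketch below assumes that layout and assumes that version and variant bits are kept when zeroing; the function names are hypothetical and the library may encode or zero fields differently.

```python
import uuid
from datetime import datetime, timezone


def uuid7_from_ms(unix_ms: int, rand_a: int = 0, rand_b: int = 0) -> uuid.UUID:
    """Assemble a UUIDv7 from a millisecond timestamp and optional random bits."""
    value = (unix_ms & ((1 << 48) - 1)) << 80  # bits 127..80: timestamp
    value |= 0x7 << 76                         # bits 79..76: version 7
    value |= (rand_a & 0xFFF) << 64            # bits 75..64: rand_a
    value |= 0b10 << 62                        # bits 63..62: RFC variant
    value |= rand_b & ((1 << 62) - 1)          # bits 61..0: rand_b
    return uuid.UUID(int=value)


def datetime_from_uuid7(u: uuid.UUID) -> datetime:
    """Read the millisecond timestamp back as a UTC datetime."""
    return datetime.fromtimestamp((u.int >> 80) / 1000, tz=timezone.utc)


def zero_random_bits(u: uuid.UUID) -> uuid.UUID:
    """Keep timestamp, version and variant; set all random bits to zero."""
    return uuid7_from_ms(u.int >> 80)


moment = datetime(2024, 1, 1, tzinfo=timezone.utc)
moment_ms = int(moment.timestamp() * 1000)
object_uuid = uuid7_from_ms(moment_ms, rand_a=0xABC, rand_b=12345)
```

Zeroing the random bits makes two UUIDs created for the same millisecond compare equal, which is useful when the UUID is meant to identify a point in time.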

ddf.operator module

class ddf.operator.IdentityOperator(n: int)[source]

Bases: Operator

evaluate(state: InternalState) ndarray[source]

Evaluate the operator on the given state.

get_cov(state: InternalState) ndarray[source]

Calculates the covariance of the state that will be transmitted to a neighbor.

get_eval_vec_len() int[source]

Length of the evaluation vector, i.e. op.evaluate(x).shape[0].

get_jacobian(state: InternalState) ndarray[source]

Calculate jacobian of the operator at the given state.

class ddf.operator.LinearOperator(matrix: ndarray)[source]

Bases: Operator

evaluate(state: InternalState) ndarray[source]

Evaluate the operator on the given state.

get_eval_vec_len() int[source]

Length of the evaluation vector, i.e. op.evaluate(x).shape[0].

get_jacobian(state: InternalState) ndarray[source]

Calculate jacobian of the operator at the given state.

class ddf.operator.Operator[source]

Bases: ABC

abstractmethod evaluate(state: InternalState) ndarray[source]

Evaluate the operator on the given state.

get_cov(state: InternalState) ndarray[source]

Calculates the covariance of the state that will be transmitted to a neighbor.

abstractmethod get_eval_vec_len() int[source]

Length of the evaluation vector, i.e. op.evaluate(x).shape[0].

get_exchange_vector(state: InternalState) ExchangeVector[source]

Apply Operator to get full ExchangeVector.

(i.e. evaluate and get_cov)

abstractmethod get_jacobian(state: InternalState) ndarray[source]

Calculate jacobian of the operator at the given state.
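
The operator interface can be illustrated with a stand-alone linear example. `State` and `MatrixOperator` below are hypothetical stand-ins that do not import ddf, and the covariance rule J P Jᵀ used in get_cov is an assumption of this sketch (first-order propagation), not a statement about the library.

```python
from dataclasses import dataclass

import numpy as np


@dataclass
class State:
    """Hypothetical stand-in for InternalState (mean and covariance)."""

    mean: np.ndarray
    covariance: np.ndarray


class MatrixOperator:
    """Hypothetical linear operator y = H @ x with a constant Jacobian H."""

    def __init__(self, matrix: np.ndarray) -> None:
        self.matrix = matrix

    def evaluate(self, state: State) -> np.ndarray:
        return self.matrix @ state.mean

    def get_jacobian(self, state: State) -> np.ndarray:
        return self.matrix  # independent of the state for a linear map

    def get_eval_vec_len(self) -> int:
        return self.matrix.shape[0]

    def get_cov(self, state: State) -> np.ndarray:
        jac = self.get_jacobian(state)
        return jac @ state.covariance @ jac.T  # assumed propagation rule


state = State(mean=np.array([1.0, 2.0, 3.0]), covariance=np.diag([1.0, 4.0, 9.0]))
select_first_two = MatrixOperator(np.array([[1.0, 0.0, 0.0], [0.0, 1.0, 0.0]]))
exchange_mean = select_first_two.evaluate(state)
exchange_cov = select_first_two.get_cov(state)
```

Here the operator exposes only the first two state components, so the exchanged mean and covariance are the corresponding sub-vector and sub-block.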

Module contents