Source code for ddf.communicator

from abc import ABC, abstractmethod

from .message import Message


[docs] class Communicator(ABC):
[docs] @abstractmethod def send_message(self, msg: Message) -> None: raise NotImplementedError()
[docs] @abstractmethod def get_messages(self) -> list[Message]: raise NotImplementedError()
[docs] class MockCommunicatorBuilder: """Create mock communicators out of a builder. All `Communicators` created from one instance of `MockCommunicatorBuilder` will be able to send messages to each other. There is no filtering on who will get messages - everyone will get every message (except for messages sent by themselves). """ communicators: list[tuple[Communicator, list[Message]]]
[docs] class InternalCommunicator(Communicator): parent: "MockCommunicatorBuilder" index: int def __init__(self, parent: "MockCommunicatorBuilder", index: int) -> None: self.parent = parent self.index = index
[docs] def send_message(self, msg: Message) -> None: return self.parent.send_message(msg, self.index)
[docs] def get_messages(self) -> list[Message]: return self.parent.get_messages(self.index)
[docs] def peek_messages(self) -> list[Message]: """Retrieve messages without clearing them.""" return self.parent.communicators[self.index][1].copy()
def __init__(self) -> None: self.communicators = []
[docs] def get_instance(self) -> InternalCommunicator: c = MockCommunicatorBuilder.InternalCommunicator(self, len(self.communicators)) self.communicators.append((c, [])) return c
[docs] def send_message(self, msg: Message, index: int) -> None: """Send messages to all communicators except index.""" for i, c in enumerate(self.communicators): if i != index: c[1].append(msg)
[docs] def get_messages(self, index: int) -> list[Message]: """Retreive all messages for communicator at index.""" result = self.communicators[index][1].copy() self.communicators[index][1].clear() return result