Source code for ethp2psim.simulator

import numpy as np
from tqdm.auto import tqdm
from scipy.stats import entropy
from .message import Message
from .protocols import Protocol, DandelionProtocol
from .adversary import Adversary
from .network import Network
from typing import Optional, List, Iterable


[docs]class Simulator: """ Abstraction to simulate message passing on a P2P network Parameters ---------- adversary : adversary.Adversary adversary that observe messages in the P2P network num_msg : Optional[int] (Default: 10) number of messages to simulate use_node_weights : bool sample message sources with respect to node weights messages : Optional[List[Message]] Set messages manually seed: int (optional) Random seed (disabled by default) Examples -------- Sample message sources with respect to stake distribution >>> from .network import * >>> from .adversary import Adversary >>> from .protocols import BroadcastProtocol >>> nw_gen = NodeWeightGenerator('stake') >>> ew_gen = EdgeWeightGenerator('normal') >>> net = Network(nw_gen, ew_gen, 10, 3) >>> protocol = BroadcastProtocol(net, broadcast_mode='all') >>> adversary = Adversary(protocol, 0.4) >>> simulator = Simulator(adversary, 20, use_node_weights=True) >>> len(simulator.messages) 20 Set 5 messages originating from node 0 >>> from .network import * >>> from .message import Message >>> from .adversary import Adversary >>> from .protocols import BroadcastProtocol >>> nw_gen = NodeWeightGenerator('stake') >>> ew_gen = EdgeWeightGenerator('normal') >>> net = Network(nw_gen, ew_gen, 10, 3) >>> protocol = BroadcastProtocol(net, broadcast_mode='all') >>> adversary = Adversary(protocol, 0.4) >>> simulator = Simulator(adversary, messages=[Message(0) for _ in range(5)]) >>> len(simulator.messages) 5 >>> simulator.message_sources [0, 0, 0, 0, 0] """ def __init__( self, adversary: Adversary, num_msg: Optional[int] = 10, use_node_weights: bool = False, messages: Optional[List[Message]] = None, seed: Optional[int] = None, verbose: bool = False, ): self._rng = np.random.default_rng(seed) if num_msg > 10: self.verbose = False else: self.verbose = verbose self.adversary = adversary self.use_node_weights = use_node_weights if messages != None: self._messages = messages elif num_msg != None: # NOTE: by default adversary nodes don't send messages in the simulation - they only observe self._messages = [ Message(sender) for sender in self.adversary.protocol.network.sample_random_nodes( num_msg, replace=True, use_weights=use_node_weights, exclude=self.adversary.nodes, rng=self._rng, ) ] else: raise ValueError("One of `num_msg` or `messages` should not be None!") self._executed = False @property def messages(self): return self._messages @property def message_sources(self): return [msg.source for msg in self.messages] def run( self, coverage_threshold: float = 1.0, max_trials: int = 100, disable_progress_bar: bool = True, ) -> list: """ Run simulation Parameters ---------- coverage_threshold : float stop propagating a message if it reached the given fraction of network nodes max_trials : int stop propagating a message if it does not reach any new nodes within `max_trials` steps Examples -------- Run simulation until each message reaches 90% of all nodes >>> from .network import * >>> from .adversary import Adversary >>> from .protocols import BroadcastProtocol >>> seed = 42 >>> nw_gen = NodeWeightGenerator('stake') >>> ew_gen = EdgeWeightGenerator('normal') >>> net = Network(nw_gen, ew_gen, 10, 3, seed=seed) >>> protocol = BroadcastProtocol(net, broadcast_mode='all', seed=seed) >>> adversary = Adversary(protocol, 0.4, seed=seed) >>> simulator = Simulator(adversary, 5, use_node_weights=True, seed=seed) >>> len(simulator.messages) 5 >>> simulator.run(coverage_threshold=0.9) [0.9, 0.9, 0.9, 0.9, 0.9] """ coverage_for_messages = [] for msg in tqdm(self.messages, disable=disable_progress_bar): node_coverage = 0.0 delta = 1.0 num_trials = 0 while node_coverage < coverage_threshold and num_trials < max_trials: old_node_coverage = node_coverage node_coverage, spreading_phase, stop = msg.process(self.adversary) if stop: break if node_coverage > old_node_coverage: num_trials = 0 else: num_trials += 1 if self.verbose: print(msg.mid, node_coverage, num_trials) # NOTE: flushing message queue is essetial to correctly calculate the deanonymization power of the adversary msg.flush_queue(self.adversary) if self.verbose: print() coverage_for_messages.append(node_coverage) self._executed = True return coverage_for_messages def node_contact_time_quantiles( self, q=np.arange(0.1, 1.0, 0.1) ) -> Iterable[np.array]: """ Calculate the mean and the standard deviation for first node contact time quantiles Parameters ---------- q : list (Default: numpy.arange(0.1, 1.0, 0.1))) Node quantiles Examples -------- Observe the mean and standard deviation of propagation times until the messages reach 50% and 95% of all nodes. >>> from .network import * >>> from .adversary import Adversary >>> from .protocols import BroadcastProtocol >>> seed = 42 >>> nw_gen = NodeWeightGenerator('stake') >>> ew_gen = EdgeWeightGenerator('normal') >>> net = Network(nw_gen, ew_gen, 10, 3, seed=seed) >>> protocol = BroadcastProtocol(net, broadcast_mode='all', seed=seed) >>> adversary = Adversary(protocol, 0.4, seed=seed) >>> simulator = Simulator(adversary, 5, use_node_weights=True, seed=seed) >>> _ = simulator.run() >>> mean_t, std_t = simulator.node_contact_time_quantiles(q=[0.5,0.95]) >>> # messages on average reach 50% of all nodes within 273 milliseconds. >>> mean_t array([273.25546384, 460.11754328]) >>> std_t array([24.51004285, 11.76789887]) """ if self._executed: contact_time_quantiles = [] for msg in self.messages: first_contact_times = [ contasts[0].delay for node, contasts in msg.history.items() ] contact_time_quantiles.append(list(np.quantile(first_contact_times, q))) quantile_mx = np.array(contact_time_quantiles) mean_quantiles = np.mean(quantile_mx, axis=0) std_quantiles = np.std(quantile_mx, axis=0) return (mean_quantiles, std_quantiles) else: raise RuntimeError( "Execute the `run()` function before querying node contact times!" )
[docs]class Evaluator: """ Measures the deanonymization performance of the adversary for a given simulation Parameters ---------- simulator : Simulator Specify the simulation to evaluate estimator : {'first_reach', 'first_sent', 'dummy'}, default 'first_reach' Define adversary stategy to predict source node for each message: * first_reach: the node from whom the adversary first heard the message is assigned 1.0 probability while every other node receives zero. * first_sent: the node that sent the message the earliest to the receiver * dummy: the probability is divided equally between non-adversary nodes. Examples -------- Observe the complete evaluation pipeline below. First, initialize network, protocol, adversary. Then, simulate 20 messages with these components. Finally, query the report using the first sent estimator for the aversary. >>> from .network import * >>> from .adversary import Adversary >>> from .protocols import BroadcastProtocol >>> seed = 42 >>> nw_gen = NodeWeightGenerator('stake') >>> ew_gen = EdgeWeightGenerator('normal') >>> net = Network(nw_gen, ew_gen, 10, 3, seed=seed) >>> protocol = BroadcastProtocol(net, broadcast_mode='all', seed=seed) >>> adversary = Adversary(protocol, 0.4, seed=seed) >>> simulator = Simulator(adversary, 20, use_node_weights=True, seed=seed) >>> _ = simulator.run() >>> evaluator = Evaluator(simulator, estimator='first_sent') >>> evaluator.get_report() {'estimator': 'first_sent', 'hit_ratio': 0.25, 'inverse_rank': 0.32499999999999996, 'entropy': 0.0, 'ndcg': 0.46679861973841597, 'message_spread_ratio': 1.0} """ def __init__(self, simulator: Simulator, estimator: str = "first_reach"): self.simulator = simulator self.estimator = estimator self.probas = simulator.adversary.predict_msg_source(estimator=self.estimator) # method='first' is used to properly resolve ties for calculating exact hits self.proba_ranks = self.probas.rank(axis=1, ascending=False, method="first") @property def num_nodes(self): return self.simulator.adversary.protocol.network.num_nodes @property def message_spread_ratios(self): return [len(msg.history) / self.num_nodes for msg in self.simulator.messages] @property def exact_hits(self): hits = [] for msg in self.simulator.messages: # adversary might not see every message if ( msg.mid in self.probas.index and self.proba_ranks.loc[msg.mid, msg.source] == 1.0 ): hits.append(1.0) else: hits.append(0.0) return np.array(hits) @property def ranks(self): ranks = [] for msg in self.simulator.messages: # adversary might not see every message if msg.mid in self.probas.index: ranks.append(self.proba_ranks.loc[msg.mid, msg.source]) else: # passive approach: let's suppose we make the worst prediction ranks.append(self.num_nodes) return np.array(ranks) @property def inverse_ranks(self): return 1.0 / self.ranks @property def ndcg_scores(self): scores = [] for msg in self.simulator.messages: # adversary might not see every message if msg.mid in self.probas.index: rank = self.proba_ranks.loc[msg.mid, msg.source] else: # passive approach: let's suppose we make the worst prediction rank = self.num_nodes ndcg = 1.0 / np.log2(1.0 + rank) scores.append(ndcg) return np.array(scores) @property def entropies(self): rnd_entropy = entropy(1.0 / self.num_nodes * np.ones(self.num_nodes), base=2) entropies = [] for msg in self.simulator.messages: # adversary might not see every message if msg.mid in self.probas.index: entropies.append(entropy(self.probas.loc[msg.mid].values, base=2)) else: entropies.append(rnd_entropy) return np.array(entropies) def get_report(self) -> dict: """Calculate mean performance of the adversary for the given simulation""" return { "estimator": self.estimator, "hit_ratio": np.mean(self.exact_hits), "inverse_rank": np.mean(self.inverse_ranks), "entropy": np.mean(self.entropies), "ndcg": np.mean(self.ndcg_scores), "message_spread_ratio": np.mean(self.message_spread_ratios), }