from hashlib import sha256
from random import randint, random, getrandbits
from collections import Counter
import time
import math
import asyncio
import logging
from logging import debug, error, info

# NOTE: the plotting dependencies (matplotlib, networkx, numpy) are imported
# lazily inside the functions that use them, so the simulation core can run
# in a headless environment where they are not installed.

EventId = str
EventIds = list[EventId]


def ntp_request() -> float:
    """Return the current time with a simulated random clock drift.

    Emulates imperfect NTP synchronization between nodes: half the time
    the clock is ahead by up to 10 seconds, half the time behind.
    """
    # add random clock drift
    if bool(getrandbits(1)):
        return time.time() + randint(0, 10)
    else:
        return time.time() - randint(0, 10)


class NetworkPool:
    """Simulated network: holds every running node so a missing event can
    be fetched from whichever peer has it."""

    def __init__(self, nodes):
        self.nodes = nodes

    def request(self, event_id: EventId):
        """Return the event with `event_id` from any node, or None if no
        node has it (e.g. it was pruned everywhere)."""
        for n in self.nodes:
            event = n.get_event(event_id)
            if event is not None:
                return event
        return None


class Event:
    """A DAG event: a (drifted) timestamp plus the sorted ids of its
    parent events. The event id is the hash of both."""

    def __init__(self, parents: EventIds):
        self.timestamp = ntp_request()
        # Sorted so that the hash is independent of parent order
        self.parents = sorted(parents)

    def set_timestamp(self, timestamp):
        self.timestamp = timestamp

    # Hash of timestamp and the parents
    def hash(self) -> str:
        m = sha256()
        m.update(str.encode(str(self.timestamp)))
        for p in self.parents:
            m.update(str.encode(str(p)))
        return m.digest().hex()

    def __str__(self):
        res = f"{self.hash()}"
        for p in self.parents:
            res += f"\n |"
            res += f"\n - {p}"
        res += f"\n"
        return res


class Graph:
    """An append-only DAG of events keyed by their hash.

    heads: events that are not yet a parent of any event in the graph
           (the tips new events should link to).
    tails: maintained with the same remove-parents rule as heads on
           insert, plus re-adding events whose parents were pruned
           (see Node.clean_pools).
    """

    def __init__(self, max_time_diff):
        self.events = dict()
        self.heads = []
        self.tails = []
        # Age (in seconds) beyond which an event is considered old
        self.max_time_diff = max_time_diff

    def add_event(self, event: Event):
        """Insert the event and refresh heads/tails; no-op if present."""
        event_id = event.hash()
        if self.events.get(event_id) is not None:
            return
        self.events[event_id] = event
        self.update_heads(event)
        self.update_tails(event)

    def update_tails(self, event):
        event_hash = event.hash()
        if event_hash in self.tails:
            return
        # Remove tails if they are parents of the given event
        for p in event.parents:
            if p in self.tails:
                self.tails.remove(p)
        # Add the event to tails
        self.tails.append(event_hash)
        self.tails = sorted(self.tails)

    def update_heads(self, event):
        event_hash = event.hash()
        if event_hash in self.heads:
            return
        # Remove heads if they are parents of the given event
        for p in event.parents:
            if p in self.heads:
                self.heads.remove(p)
        # Add the event to heads
        self.heads.append(event_hash)
        self.heads = sorted(self.heads)

    # Check if the event is too old from now, by subtracting current
    # timestamp from event timestamp, it must be more than `max_time_diff'
    # to be considered an old event
    def is_old_event(self, event: Event):
        # Ignore genesis event
        if event.timestamp == 0.0:
            return False
        current_timestamp = ntp_request()
        diff = current_timestamp - event.timestamp
        if diff > self.max_time_diff:
            return True
        return False

    def prune_old_events(self):
        """Drop every event older than `max_time_diff` from the graph."""
        # Find the old events
        old_events = [eh for eh, ev in self.events.items()
                      if self.is_old_event(ev)]
        # Remove the old events
        for eh in old_events:
            self.remove_event(eh)

    def remove_event(self, eh: EventId):
        """Remove the event (if present) and drop it from heads/tails."""
        self.events.pop(eh, None)
        # Remove old events from heads
        if eh in self.heads:
            self.heads.remove(eh)
        # Remove old events from tails
        if eh in self.tails:
            self.tails.remove(eh)

    # Check if given events exist in the graph
    # return a list of missing events
    def check_events(self, events: EventIds) -> EventIds:
        return [e for e in events if self.events.get(e) is None]

    def __str__(self):
        res = ""
        for event in self.events.values():
            res += f"\n {event}"
        return res


class Node:
    """A simulated peer.

    Keeps two pools: `active_pool` (events fully linked back to known
    history) and `orphan_pool` (events whose parents are still missing),
    plus `pruned_events`, the ids known to be dropped network-wide.
    """

    def __init__(self, name: str, queue, max_time_diff):
        self.name = name
        self.orphan_pool = Graph(max_time_diff)
        self.active_pool = Graph(max_time_diff)
        self.queue = queue
        # Pruned events from active pool
        self.pruned_events = []
        # The active pool should always start with one event
        genesis_event = Event([])
        genesis_event.set_timestamp(0.0)
        self.active_pool.add_event(genesis_event)

    # On create new event
    def new_event(self):
        # Pruning old events from active pool
        self.active_pool.prune_old_events()
        return Event(self.active_pool.heads)

    # On receive new event
    def receive_new_event(self, event: Event, peer, np):
        debug(f"{self.name} receive event from {peer}: \n {event}")
        event_hash = event.hash()
        # Reject event with no parents
        if not event.parents:
            return
        # XXX Reject old event
        # no need for this simulation
        # if self.is_old_event(event):
        #    return
        # Reject event already exist in active pool
        if not self.active_pool.check_events([event_hash]):
            return
        # Reject event already exist in orphan pool
        if not self.orphan_pool.check_events([event_hash]):
            return
        # Add the new event to the orphan pool
        self.orphan_pool.add_event(event)
        # This function is the core of syncing algorithm
        #
        # Find all the links from the new event to events in orphan pool
        # Bring these events to the active pool then add the new event
        self.relink_orphan(event, np)

    def relink_orphan(self, orphan, np):
        """Pull the orphan's whole missing ancestry into the active pool,
        then activate the orphan itself and tidy both pools."""
        # Check if the parents of the orphan
        # are not missing from active pool
        missing_parents = self.active_pool.check_events(orphan.parents)
        missing_parents = self.check_pruned_events(missing_parents)

        if missing_parents:
            # Check the missing parents from orphan pool and sync with the
            # network for missing ones
            self.check_and_sync(list(missing_parents), np)

            # At this stage all the missing parents must be in the orphan
            # pool. The next step is to move them to active pool
            self.add_linked_events_to_active_pool(missing_parents, [])

            # Check again that the parents of the orphan are in the
            # active pool
            missing_parents = self.active_pool.check_events(orphan.parents)
            missing_parents = self.check_pruned_events(missing_parents)
            assert (not missing_parents)

            # Add the event to active pool
            self.activate_event(orphan)
        else:
            self.activate_event(orphan)

        # Last stage, Cleaning up the orphan pool:
        # - Remove orphan if it is too old according to `max_time_diff`
        # - Move orphan to active pool if it doesn't have any missing
        #   parents
        self.clean_pools()

    def clean_pools(self):
        self.active_pool.prune_old_events()

        for event in self.active_pool.events.values():
            # Check if the event parents are old events
            old_parents = self.active_pool.check_events(event.parents)
            if not old_parents:
                continue
            # Add the event to tails if it has only old events as parents
            self.active_pool.update_tails(event)

        self.orphan_pool.prune_old_events()

        # Repeatedly sweep the orphan pool: activating one orphan may
        # unblock others, so loop until a sweep activates nothing
        while True:
            active_list = []
            for orphan in self.orphan_pool.events.values():
                # Move the orphan to active pool if it doesn't have
                # missing parents in active pool
                missing_parents = self.active_pool.check_events(
                    orphan.parents)
                if not missing_parents:
                    active_list.append(orphan)

            if not active_list:
                break

            for ev in active_list:
                self.activate_event(ev)

    def check_and_sync(self, missing_events, np):
        """Keep requesting unknown ancestors from the network until the
        orphan pool contains the full missing lineage."""
        debug(f"{self.name} check_and_sync() {missing_events}")
        while True:
            # Check if all missing parents are in orphan pool, otherwise
            # add them to request list
            request_list = []
            self.scan_orphan_pool(request_list, missing_events, [])
            if not request_list:
                break
            missing_events = self.fetch_events_from_network(request_list,
                                                            np)

    # Check the missing links inside orphan pool
    def scan_orphan_pool(self, request_list, events: EventIds, visited):
        debug(f"{self.name} check_missing_parents() {events}")
        for event_hash in events:
            # Check if the function already visited this event
            if event_hash in visited:
                continue
            visited.append(event_hash)

            # If the event is in the orphan pool, do a recursive call to
            # check its parents as well, otherwise add the event to
            # request_list
            event = self.orphan_pool.events.get(event_hash)
            if event is None:
                # Check first if it's not in the active pool
                if self.active_pool.events.get(event_hash) is not None:
                    continue
                # Check if it's not in pruned events
                if event_hash in self.pruned_events:
                    continue
                request_list.append(event_hash)
            else:
                # Recursive call
                # Climb up for the event parents
                self.scan_orphan_pool(request_list, event.parents, visited)

    def fetch_events_from_network(self, request_list, np):
        debug(f"{self.name} fetch_events() {request_list}")
        # XXX
        # Send the events in request_list to the node who sent this event.
        #
        # For simulation purpose the node fetches the missed events from
        # the network pool which contains all the nodes and their events
        result = []
        for p in request_list:
            debug(f"{self.name} request from the network: {p}")
            # Request from the network
            requested_event = np.request(p)
            if requested_event is None:
                # Nobody has it anymore: remember it as pruned so we stop
                # asking for it
                if p not in self.pruned_events:
                    self.pruned_events.append(p)
                continue
            # Add it to the orphan pool
            self.orphan_pool.add_event(requested_event)
            result.extend(requested_event.parents)

        # Return parents of requested events
        return result

    def add_linked_events_to_active_pool(self, events, visited):
        debug(f"{self.name} add_linked_events_to_active_pool() {events}")
        for event_hash in events:
            # Check if it already visited this event
            if event_hash in visited:
                continue
            visited.append(event_hash)

            if self.active_pool.events.get(event_hash) is not None:
                continue
            if event_hash in self.pruned_events:
                continue

            # Get the event from the orphan pool
            event = self.orphan_pool.events.get(event_hash)
            assert (event is not None)

            # Add it to the active pool
            self.activate_event(event)

            # Recursive call
            # Climb up for the event parents
            self.add_linked_events_to_active_pool(event.parents, visited)

    def activate_event(self, event):
        # Add the event to active pool
        self.active_pool.add_event(event)
        # Remove event from orphan pool
        self.orphan_pool.remove_event(event.hash())

    # Get an event from orphan pool or active pool
    def get_event(self, event_id: EventId):
        # Check the active_pool
        event = self.active_pool.events.get(event_id)
        # Check the orphan_pool
        if event is None:
            # BUGFIX: the lookup previously used `event` (None here) as
            # the key, so orphan-pool events were never found by peers
            event = self.orphan_pool.events.get(event_id)
        return event

    # Clean up the given events from pruned events
    def check_pruned_events(self, events):
        return [ev for ev in events if ev not in self.pruned_events]

    def __str__(self):
        return f""" \n Name: {self.name} \n Active Pool: {self.active_pool} \n Orphan Pool: {self.orphan_pool}"""


# Each node has `nodes_n` of this function running in the background
# for receiving events from each node separately
async def recv_loop(podm, node, peer, queue, np):
    while True:
        # Wait new event
        event = await queue.get()
        queue.task_done()
        # A None event is the shutdown signal from run()
        if event is None:
            break
        # Randomly drop the event to simulate network loss
        if random() < podm:
            debug(f"{node.name} dropped: \n {event}")
            continue
        node.receive_new_event(event, peer, np)


# Send new event at random intervals
# Each node has this function running in the background
async def send_loop(nodes_n, max_delay, broadcast_attempt, node):
    for _ in range(broadcast_attempt):
        await asyncio.sleep(randint(0, max_delay))
        # Create new event with the last heads as parents
        event = node.new_event()
        debug(f"{node.name} broadcast event: \n {event}")
        # One copy per listening peer (every node listens to this queue)
        for _ in range(nodes_n):
            await node.queue.put(event)
        await node.queue.join()


async def run(nodes_n=3, podm=0.30, broadcast_attempt=3,
              max_time_diff=180.0, check=False, max_delay=None):
    """
    Run a simulation with the provided params:
        nodes_n: number of nodes
        podm: probability of dropping events (ex: 0.30 -> %30)
        broadcast_attempt: number of events each node should broadcast
        max_time_diff: a max difference in time to detect an old event
        check: check if all nodes have the same graph
        max_delay: max random delay between broadcasts (derived from
            nodes_n when None)

    Returns the list of nodes, or None on broadcast timeout.
    """
    debug(f"Running simulation with nodes: {nodes_n}, podm: {podm}, "
          f"broadcast_attempt: {broadcast_attempt}")

    if max_delay is None:
        max_delay = round(math.log(nodes_n))

    broadcast_timeout = nodes_n * broadcast_attempt * max_delay

    nodes = []

    info(f"Run {nodes_n} Nodes")

    try:
        # Initialize `nodes_n` nodes
        for i in range(nodes_n):
            queue = asyncio.Queue()
            node = Node(f"Node{i}", queue, max_time_diff)
            nodes.append(node)

        # Initialize NetworkPool contains all nodes
        np = NetworkPool(nodes)

        # Initialize `nodes_n` * `nodes_n` coroutine tasks for receiving
        # events. Each node listens to all queues from the running nodes
        recv_tasks = []
        for node in nodes:
            for n in nodes:
                recv_tasks.append(recv_loop(podm, node, n.name, n.queue,
                                            np))

        r_g = asyncio.gather(*recv_tasks)

        # Create coroutine task contains send_loop function for each node
        # Run and wait for send tasks
        s_g = asyncio.gather(*[send_loop(nodes_n, max_delay,
                                         broadcast_attempt, n)
                               for n in nodes])
        await asyncio.wait_for(s_g, broadcast_timeout)

        # Gracefully stop all receiving tasks
        for n in nodes:
            for _ in range(nodes_n):
                await n.queue.put(None)
            await n.queue.join()

        await r_g

        if check:
            for node in nodes:
                debug(node)

            # Assert if all nodes share the same active pool graph
            assert (all(n.active_pool.events.keys() ==
                        nodes[0].active_pool.events.keys() for n in nodes))
            # Assert if all nodes share the same orphan pool graph
            assert (all(n.orphan_pool.events.keys() ==
                        nodes[0].orphan_pool.events.keys() for n in nodes))

        return nodes

    except asyncio.exceptions.TimeoutError:
        error("Broadcast TimeoutError")


async def main(sim_n=6, nodes_increase=False, podm_increase=False,
               time_diff_decrease=False):
    """Run the simulation `sim_n` times, while enabling one of these
    params, then plot the results:
      - increasing `nodes_n`
      - increasing `podm`
      - decreasing `max_time_diff`
    """
    # Plotting-only dependencies, imported lazily (see module NOTE)
    import numpy as np
    import matplotlib.pyplot as plt

    # The three modes are mutually exclusive, in priority order
    if nodes_increase:
        podm_increase = False
        time_diff_decrease = False

    if podm_increase:
        time_diff_decrease = False

    # number of nodes
    nodes_n = 100
    # probability of dropping events
    podm = 0.0
    # a max difference in time to detect an old event
    max_time_diff = 60  # seconds
    # number of events each node should broadcast
    broadcast_attempt = 10

    # Number of nodes get increase in each simulation
    sim_nodes_inc = int(nodes_n / 5)
    # A value get add to `podm` in each simulation
    sim_podm_inc = podm / 5
    # A value get subtract from `max_time_diff` in each simulation
    sim_diff_time_dec = max_time_diff / 10

    # Contains the nodes for each simulation
    simulations = []
    # Contains the `podm` variables for each simulation
    podm_list = []
    # Contains the `max_time_diff` variables for each simulation
    mtd_list = []

    podm_tmp = podm
    nodes_n_tmp = nodes_n
    time_diff_tmp = max_time_diff

    for _ in range(sim_n):
        # NOTE(review): run() returns None on broadcast timeout, which
        # would make the stats loop below raise — confirm intended
        nodes = await run(nodes_n_tmp, podm_tmp, broadcast_attempt,
                          max_time_diff=time_diff_tmp)
        simulations.append(nodes)
        podm_list.append(podm_tmp)
        mtd_list.append(time_diff_tmp)

        if nodes_increase:
            nodes_n_tmp += sim_nodes_inc
        if podm_increase:
            podm_tmp += sim_podm_inc
        if time_diff_decrease:
            time_diff_tmp -= sim_diff_time_dec

    # Numbers of nodes for each simulation
    nodes_n_list = []
    # Synced events percentage for each simulation
    events_synced_perc = []
    # Number of events in active pool for each simulations
    active_events = []
    # Number of events in pruned events list for each simulations
    pruned_events = []

    for nodes in simulations:
        nodes_n = len(nodes)
        nodes_n_list.append(nodes_n)

        events = Counter()
        p_events = Counter()
        for node in nodes:
            events.update(list(node.active_pool.events.keys()))
            p_events.update(node.pruned_events)

        # +1 accounts for the shared genesis event
        expect_events_synced = (nodes_n * broadcast_attempt) + 1

        actual_events_synced = 0
        for val in events.values():
            # If the event is fully synced with all nodes
            if val == nodes_n:
                actual_events_synced += 1

        pruned_events_synced = 0
        for val in p_events.values():
            # If the pruned event is fully synced with all nodes
            if val == nodes_n:
                pruned_events_synced += 1

        res = (actual_events_synced * 100) / expect_events_synced
        events_synced_perc.append(res)
        active_events.append(actual_events_synced)
        pruned_events.append(pruned_events_synced)

        info(f"nodes_n: {nodes_n}")
        info(f"actual_events_synced: {actual_events_synced}")
        info(f"expect_events_synced: {expect_events_synced}")
        info(f"pruned_events_synced: {pruned_events_synced}")
        info(f"res: %{res}")

    # Disable logging for matplotlib
    logging.disable()

    if nodes_increase:
        plt.plot(nodes_n_list, events_synced_perc)
        plt.ylim(0, 100)
        plt.title(
            f"Event Graph simulation with %{podm * 100} probability of dropping messages")
        plt.ylabel(
            f"Events sync percentage (each node broadcast {broadcast_attempt} events)")
        plt.xlabel("Number of nodes")
        plt.show()
        return

    if podm_increase:
        plt.plot(podm_list, events_synced_perc)
        plt.ylim(0, 100)
        plt.title(f"Event Graph simulation with {nodes_n} nodes")
        plt.ylabel(
            f"Events sync percentage (each node broadcast {broadcast_attempt} events)")
        plt.xlabel("Probability of dropping messages")
        plt.show()
        return

    if time_diff_decrease:
        x = np.arange(len(mtd_list))  # the label locations
        width = 0.35  # the width of the bars
        fig, ax = plt.subplots()
        rects1 = ax.bar(x - width/2, active_events, width, label='Active')
        rects2 = ax.bar(x + width/2, pruned_events, width, label='Pruned')
        plt.title(
            f"Event Graph simulation with %{podm * 100} probability of dropping messages, and {nodes_n} nodes")
        plt.ylabel(
            f"Number of events broadcasted during the simulation (each node broadcast {broadcast_attempt} events)")
        plt.xlabel("A time duration to detect old events (in seconds)")
        plt.ylim(0, (broadcast_attempt * nodes_n) + 1)
        ax.set_xticks(x, mtd_list)
        ax.legend()
        ax.bar_label(rects1, padding=3)
        ax.bar_label(rects2, padding=3)
        fig.tight_layout()
        plt.show()
        return


def print_network_graph(node, unpruned=False):
    """Draw the node's active-pool DAG; tails are red, heads yellow."""
    # Plotting-only dependencies, imported lazily (see module NOTE)
    import matplotlib.pyplot as plt
    import networkx as nx

    logging.disable()
    graph = nx.Graph()

    if unpruned:
        # NOTE(review): `unpruned_active_pool` is not defined anywhere on
        # Node; this branch raises AttributeError — confirm intent
        for (h, ev) in node.unpruned_active_pool.events.items():
            graph.add_node(h[:5])
            graph.add_edges_from([(h[:5], p[:5]) for p in ev.parents])
    else:
        for (h, ev) in node.active_pool.events.items():
            graph.add_node(h[:5])
            graph.add_edges_from([(h[:5], p[:5]) for p in ev.parents])

    colors = []
    for n in graph.nodes():
        # BUGFIX: heads/tails live on the Graph (node.active_pool), not
        # on Node itself
        if any(n == t[:5] for t in node.active_pool.tails):
            colors.append("red")
        elif any(n == h[:5] for h in node.active_pool.heads):
            colors.append("yellow")
        else:
            colors.append("#697aff")

    nx.draw_networkx(graph, with_labels=True, node_color=colors)
    plt.show()


if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG,
                        handlers=[logging.FileHandler("debug.log", mode="w"),
                                  logging.StreamHandler()])
    # nodes = asyncio.run(run(nodes_n=14, podm=0, broadcast_attempt=4,
    #                         max_time_diff=30, check=True))
    # print_network_graph(nodes[0])
    # print_network_graph(nodes[0], unpruned=True)
    # asyncio.run(main(time_diff_decrease=True))