From acd16ee99903e418992d950276123efb652cd857 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Thu, 10 Nov 2022 10:19:43 +0400 Subject: [PATCH] script/research/event_graph: a little change in the implementation --- script/research/event_graph/main.py | 221 ++++++++++++---------------- 1 file changed, 95 insertions(+), 126 deletions(-) diff --git a/script/research/event_graph/main.py b/script/research/event_graph/main.py index f5e06ac70..6f0af0809 100644 --- a/script/research/event_graph/main.py +++ b/script/research/event_graph/main.py @@ -65,50 +65,22 @@ class Event: class Graph: - def __init__(self): + def __init__(self, max_time_diff): self.events = dict() + self.heads = [] + self.tails = [] + self.max_time_diff = max_time_diff def add_event(self, event: Event): - self.events[event.hash()] = event + event_id = event.hash() - def remove_event(self, event_id: EventId): - self.events.pop(event_id, None) + if self.events.get(event_id) != None: + return - # Check if given events are exist in the graph - # return a list of missing events - def check_events(self, events: EventIds) -> EventIds: - return [e for e in events if self.events.get(e) == None] + self.events[event_id] = event - def __str__(self): - res = "" - for event in self.events.values(): - res += f"\n {event}" - return res - - -class Node: - def __init__(self, name: str, queue, max_time_diff): - self.name = name - self.orphan_pool = Graph() - self.active_pool = Graph() - self.queue = queue - self.max_time_diff = max_time_diff - # Pruned events from active pool - self.pruned_events = [] - - # These variables are for the demonstration purpose - self.unpruned_active_pool = Graph() - self.unpruned_orphan_pool = Graph() - - # The active pool should always start with one event - genesis_event = Event([]) - genesis_event.set_timestamp(0.0) - self.active_pool.add_event(genesis_event) - - # On the initialization add the genesis event to both heads and tails - # list - self.tails = [genesis_event.hash()] - self.heads = [genesis_event.hash()] + self.update_heads(event) + self.update_tails(event) def update_tails(self, event): event_hash = event.hash() @@ -140,11 +112,71 @@ class Node: self.heads.append(event_hash) self.heads = sorted(self.heads) + # Check if the event is too old from now, by subtracting current timestamp + # from event timestamp, it must be more than `max_time_diff' to be consider + # old event + def is_old_event(self, event: Event): + # Ignore genesis event + if event.timestamp == 0.0: + return False + + current_timestamp = ntp_request() + diff = current_timestamp - event.timestamp + if diff > self.max_time_diff: + return True + return False + + def prune_old_events(self): + # Find the old events + old_events = [eh for eh, ev in self.events.items() if + self.is_old_event(ev)] + + # Remove the old events + for eh in old_events: + self.remove_event(eh) + + def remove_event(self, eh: EventId): + self.events.pop(eh, None) + # Remove old events from heads + if eh in self.heads: + self.heads.remove(eh) + + # Remove old events from tails + if eh in self.tails: + self.tails.remove(eh) + + # Check if given events are exist in the graph + # return a list of missing events + def check_events(self, events: EventIds) -> EventIds: + return [e for e in events if self.events.get(e) == None] + + def __str__(self): + res = "" + for event in self.events.values(): + res += f"\n {event}" + return res + + +class Node: + def __init__(self, name: str, queue, max_time_diff): + self.name = name + self.orphan_pool = Graph(max_time_diff) + self.active_pool = Graph(max_time_diff) + self.queue = queue + + # Pruned events from active pool + self.pruned_events = [] + + # The active pool should always start with one event + genesis_event = Event([]) + genesis_event.set_timestamp(0.0) + self.active_pool.add_event(genesis_event) + # On create new event def new_event(self): # Pruning old events from active pool - self.prune_old_events() - return Event(self.heads) + self.active_pool.prune_old_events() + return Event(self.active_pool.heads) # On receive new event def receive_new_event(self, event: Event, peer, np): @@ -170,10 +202,6 @@ class Node: # Add the new event to the orphan pool self.orphan_pool.add_event(event) - self.unpruned_orphan_pool.add_event(event) - - # Pruning old events from active pool - self.prune_old_events() # This function is the core of syncing algorithm # @@ -185,7 +213,7 @@ class Node: # Check if the parents of the orphan # are not missing from active pool missing_parents = self.active_pool.check_events(orphan.parents) - missing_parents = self.check_pruned_events(missing_parents) + missing_parents = self.check_pruned_events(missing_parents) if missing_parents: # Check the missing parents from orphan pool and sync with the # network for missing ones @@ -197,83 +225,53 @@ class Node: # Check again that the parents of the orphan are in the active pool missing_parents = self.active_pool.check_events(orphan.parents) - missing_parents = self.check_pruned_events(missing_parents) + missing_parents = self.check_pruned_events(missing_parents) assert (not missing_parents) # Add the event to active pool - self.add_to_active_pool(orphan) + self.activate_event(orphan) else: - self.add_to_active_pool(orphan) + self.activate_event(orphan) # Last stage, Cleaning up the orphan pool: # - Remove orphan if it is too old according to `max_time_diff` # - Move orphan to active pool if it doesn't have any missing parents - self.clean_orphan_pool() + self.clean_pools() + - def prune_old_events(self): - debug(f"{self.name} prune_old_events()") - - # old events in active pool - oe_active_p = [eh for eh, ev in self.active_pool.events.items() if - self.is_old_event(ev)] - - if not oe_active_p: - return - - # Remove the old events in active pool - for eh in oe_active_p: - self.active_pool.remove_event(eh) - self.pruned_events.append(eh) - - # Remove old events from heads - if eh in self.heads: - self.heads.remove(eh) - - # Remove old events from tails - if eh in self.tails: - self.tails.remove(eh) + def clean_pools(self): + self.active_pool.prune_old_events() for event in self.active_pool.events.values(): # Check if the event parents are old events - parents = self.check_pruned_events(event.parents) + old_parents = self.active_pool.check_events(event.parents) - if parents: + if not old_parents: continue - + # Add the event to tails if it has only old events as parents - self.update_tails(event) + self.active_pool.update_tails(event) + self.orphan_pool.prune_old_events() - - def clean_orphan_pool(self): - debug(f"{self.name} clean_orphan_pool()") while True: active_list = [] - old_events = [] - for oh, orphan in self.orphan_pool.events.items(): - - if self.is_old_event(orphan): - old_events.append(oh) - continue + for orphan in self.orphan_pool.events.values(): # Move the orphan to active pool if it doesn't have missing # parents in active pool missing_parents = self.active_pool.check_events(orphan.parents) - missing_parents = self.check_pruned_events(missing_parents) if not missing_parents: active_list.append(orphan) - - for oh in old_events: - self.orphan_pool.remove_event(oh) - self.pruned_events.append(oh) - + if not active_list: break for ev in active_list: - self.add_to_active_pool(ev) + self.activate_event(ev) + def check_and_sync(self, missing_events, np): debug(f"{self.name} check_and_sync() {missing_events}") @@ -339,7 +337,6 @@ class Node: # Add it to the orphan pool self.orphan_pool.add_event(requested_event) - self.unpruned_orphan_pool.add_event(requested_event) result.extend(requested_event.parents) # Return parents of requested events @@ -365,22 +362,17 @@ class Node: assert (event != None) # Add it to the active pool - self.add_to_active_pool(event) + self.activate_event(event) # Recursive call # Climb up for the event parents self.add_linked_events_to_active_pool(event.parents, visited) - def add_to_active_pool(self, event): + def activate_event(self, event): # Add the event to active pool self.active_pool.add_event(event) - self.unpruned_active_pool.add_event(event) - - # Update heads - self.update_heads(event) # Remove event from orphan pool self.orphan_pool.remove_event(event.hash()) - self.unpruned_orphan_pool.remove_event(event.hash()) # Get an event from orphan pool or active pool def get_event(self, event_id: EventId): @@ -396,28 +388,11 @@ class Node: def check_pruned_events(self, events): return [ev for ev in events if ev not in self.pruned_events] - # Check if the event is too old from now, by subtracting current timestamp - # from event timestamp, it must be more than `max_time_diff' to be consider - # old event - def is_old_event(self, event: Event): - # Ignore genesis event - if event.timestamp == 0.0: - return False - - current_timestamp = ntp_request() - diff = current_timestamp - event.timestamp - if diff > self.max_time_diff: - return True - return False - def __str__(self): return f""" \n Name: {self.name} \n Active Pool: {self.active_pool} - \n Orphan Pool: {self.orphan_pool} - \n Pruned Events: {self.pruned_events} - \n Heads: {self.heads} - \n Tails: {self.tails}""" + \n Orphan Pool: {self.orphan_pool}""" # Each node has `nodes_n` of this function running in the background @@ -524,12 +499,6 @@ async def run(nodes_n=3, podm=0.30, broadcast_attempt=3, max_time_diff=180.0, assert (all(n.orphan_pool.events.keys() == nodes[0].orphan_pool.events.keys() for n in nodes)) - # Assert if all heads are equal - assert (all(n.heads == nodes[0].heads for n in nodes)) - - # Assert if all tails are equal - assert (all(n.tails == nodes[0].tails for n in nodes)) - return nodes except asyncio.exceptions.TimeoutError: @@ -734,10 +703,10 @@ if __name__ == "__main__": handlers=[logging.FileHandler("debug.log", mode="w"), logging.StreamHandler()]) - # nodes = asyncio.run(run(nodes_n=14, podm=0, broadcast_attempt=4, - # max_time_diff=30,check=True)) + #nodes = asyncio.run(run(nodes_n=14, podm=0, broadcast_attempt=4, + # max_time_diff=30,check=True)) # print_network_graph(nodes[0]) # print_network_graph(nodes[0], unpruned=True) - asyncio.run(main(time_diff_decrease=True)) + # asyncio.run(main(time_diff_decrease=True))