script/research/event_graph: a little change in the implementation

This commit is contained in:
ghassmo
2022-11-10 10:19:43 +04:00
parent 5f072d1c7f
commit acd16ee999

View File

@@ -65,50 +65,22 @@ class Event:
class Graph:
def __init__(self):
def __init__(self, max_time_diff):
self.events = dict()
self.heads = []
self.tails = []
self.max_time_diff = max_time_diff
def add_event(self, event: Event):
self.events[event.hash()] = event
event_id = event.hash()
def remove_event(self, event_id: EventId):
self.events.pop(event_id, None)
if self.events.get(event_id) != None:
return
# Check if given events are exist in the graph
# return a list of missing events
def check_events(self, events: EventIds) -> EventIds:
return [e for e in events if self.events.get(e) == None]
self.events[event_id] = event
def __str__(self):
res = ""
for event in self.events.values():
res += f"\n {event}"
return res
class Node:
def __init__(self, name: str, queue, max_time_diff):
self.name = name
self.orphan_pool = Graph()
self.active_pool = Graph()
self.queue = queue
self.max_time_diff = max_time_diff
# Pruned events from active pool
self.pruned_events = []
# These variables are for the demonstration purpose
self.unpruned_active_pool = Graph()
self.unpruned_orphan_pool = Graph()
# The active pool should always start with one event
genesis_event = Event([])
genesis_event.set_timestamp(0.0)
self.active_pool.add_event(genesis_event)
# On the initialization add the genesis event to both heads and tails
# list
self.tails = [genesis_event.hash()]
self.heads = [genesis_event.hash()]
self.update_heads(event)
self.update_tails(event)
def update_tails(self, event):
event_hash = event.hash()
@@ -140,11 +112,71 @@ class Node:
self.heads.append(event_hash)
self.heads = sorted(self.heads)
# Check if the event is too old from now, by subtracting current timestamp
# from event timestamp, it must be more than `max_time_diff' to be consider
# old event
def is_old_event(self, event: Event):
# Ignore genesis event
if event.timestamp == 0.0:
return False
current_timestamp = ntp_request()
diff = current_timestamp - event.timestamp
if diff > self.max_time_diff:
return True
return False
def prune_old_events(self):
# Find the old events
old_events = [eh for eh, ev in self.events.items() if
self.is_old_event(ev)]
# Remove the old events
for eh in old_events:
self.remove_event(eh)
def remove_event(self, eh: EventId):
self.events.pop(eh, None)
# Remove old events from heads
if eh in self.heads:
self.heads.remove(eh)
# Remove old events from tails
if eh in self.tails:
self.tails.remove(eh)
# Check if given events are exist in the graph
# return a list of missing events
def check_events(self, events: EventIds) -> EventIds:
return [e for e in events if self.events.get(e) == None]
def __str__(self):
res = ""
for event in self.events.values():
res += f"\n {event}"
return res
class Node:
def __init__(self, name: str, queue, max_time_diff):
self.name = name
self.orphan_pool = Graph(max_time_diff)
self.active_pool = Graph(max_time_diff)
self.queue = queue
# Pruned events from active pool
self.pruned_events = []
# The active pool should always start with one event
genesis_event = Event([])
genesis_event.set_timestamp(0.0)
self.active_pool.add_event(genesis_event)
# On create new event
def new_event(self):
# Pruning old events from active pool
self.prune_old_events()
return Event(self.heads)
self.active_pool.prune_old_events()
return Event(self.active_pool.heads)
# On receive new event
def receive_new_event(self, event: Event, peer, np):
@@ -170,10 +202,6 @@ class Node:
# Add the new event to the orphan pool
self.orphan_pool.add_event(event)
self.unpruned_orphan_pool.add_event(event)
# Pruning old events from active pool
self.prune_old_events()
# This function is the core of syncing algorithm
#
@@ -185,7 +213,7 @@ class Node:
# Check if the parents of the orphan
# are not missing from active pool
missing_parents = self.active_pool.check_events(orphan.parents)
missing_parents = self.check_pruned_events(missing_parents)
missing_parents = self.check_pruned_events(missing_parents)
if missing_parents:
# Check the missing parents from orphan pool and sync with the
# network for missing ones
@@ -197,83 +225,53 @@ class Node:
# Check again that the parents of the orphan are in the active pool
missing_parents = self.active_pool.check_events(orphan.parents)
missing_parents = self.check_pruned_events(missing_parents)
missing_parents = self.check_pruned_events(missing_parents)
assert (not missing_parents)
# Add the event to active pool
self.add_to_active_pool(orphan)
self.activate_event(orphan)
else:
self.add_to_active_pool(orphan)
self.activate_event(orphan)
# Last stage, Cleaning up the orphan pool:
# - Remove orphan if it is too old according to `max_time_diff`
# - Move orphan to active pool if it doesn't have any missing parents
self.clean_orphan_pool()
self.clean_pools()
def prune_old_events(self):
debug(f"{self.name} prune_old_events()")
# old events in active pool
oe_active_p = [eh for eh, ev in self.active_pool.events.items() if
self.is_old_event(ev)]
if not oe_active_p:
return
# Remove the old events in active pool
for eh in oe_active_p:
self.active_pool.remove_event(eh)
self.pruned_events.append(eh)
# Remove old events from heads
if eh in self.heads:
self.heads.remove(eh)
# Remove old events from tails
if eh in self.tails:
self.tails.remove(eh)
def clean_pools(self):
self.active_pool.prune_old_events()
for event in self.active_pool.events.values():
# Check if the event parents are old events
parents = self.check_pruned_events(event.parents)
old_parents = self.active_pool.check_events(event.parents)
if parents:
if not old_parents:
continue
# Add the event to tails if it has only old events as parents
self.update_tails(event)
self.active_pool.update_tails(event)
self.orphan_pool.prune_old_events()
def clean_orphan_pool(self):
debug(f"{self.name} clean_orphan_pool()")
while True:
active_list = []
old_events = []
for oh, orphan in self.orphan_pool.events.items():
if self.is_old_event(orphan):
old_events.append(oh)
continue
for orphan in self.orphan_pool.events.values():
# Move the orphan to active pool if it doesn't have missing
# parents in active pool
missing_parents = self.active_pool.check_events(orphan.parents)
missing_parents = self.check_pruned_events(missing_parents)
if not missing_parents:
active_list.append(orphan)
for oh in old_events:
self.orphan_pool.remove_event(oh)
self.pruned_events.append(oh)
if not active_list:
break
for ev in active_list:
self.add_to_active_pool(ev)
self.activate_event(ev)
def check_and_sync(self, missing_events, np):
debug(f"{self.name} check_and_sync() {missing_events}")
@@ -339,7 +337,6 @@ class Node:
# Add it to the orphan pool
self.orphan_pool.add_event(requested_event)
self.unpruned_orphan_pool.add_event(requested_event)
result.extend(requested_event.parents)
# Return parents of requested events
@@ -365,22 +362,17 @@ class Node:
assert (event != None)
# Add it to the active pool
self.add_to_active_pool(event)
self.activate_event(event)
# Recursive call
# Climb up for the event parents
self.add_linked_events_to_active_pool(event.parents, visited)
def add_to_active_pool(self, event):
def activate_event(self, event):
# Add the event to active pool
self.active_pool.add_event(event)
self.unpruned_active_pool.add_event(event)
# Update heads
self.update_heads(event)
# Remove event from orphan pool
self.orphan_pool.remove_event(event.hash())
self.unpruned_orphan_pool.remove_event(event.hash())
# Get an event from orphan pool or active pool
def get_event(self, event_id: EventId):
@@ -396,28 +388,11 @@ class Node:
def check_pruned_events(self, events):
return [ev for ev in events if ev not in self.pruned_events]
# Check if the event is too old from now, by subtracting current timestamp
# from event timestamp, it must be more than `max_time_diff' to be consider
# old event
def is_old_event(self, event: Event):
# Ignore genesis event
if event.timestamp == 0.0:
return False
current_timestamp = ntp_request()
diff = current_timestamp - event.timestamp
if diff > self.max_time_diff:
return True
return False
def __str__(self):
return f"""
\n Name: {self.name}
\n Active Pool: {self.active_pool}
\n Orphan Pool: {self.orphan_pool}
\n Pruned Events: {self.pruned_events}
\n Heads: {self.heads}
\n Tails: {self.tails}"""
\n Orphan Pool: {self.orphan_pool}"""
# Each node has `nodes_n` of this function running in the background
@@ -524,12 +499,6 @@ async def run(nodes_n=3, podm=0.30, broadcast_attempt=3, max_time_diff=180.0,
assert (all(n.orphan_pool.events.keys() ==
nodes[0].orphan_pool.events.keys() for n in nodes))
# Assert if all heads are equal
assert (all(n.heads == nodes[0].heads for n in nodes))
# Assert if all tails are equal
assert (all(n.tails == nodes[0].tails for n in nodes))
return nodes
except asyncio.exceptions.TimeoutError:
@@ -734,10 +703,10 @@ if __name__ == "__main__":
handlers=[logging.FileHandler("debug.log", mode="w"),
logging.StreamHandler()])
# nodes = asyncio.run(run(nodes_n=14, podm=0, broadcast_attempt=4,
# max_time_diff=30,check=True))
#nodes = asyncio.run(run(nodes_n=14, podm=0, broadcast_attempt=4,
# max_time_diff=30,check=True))
# print_network_graph(nodes[0])
# print_network_graph(nodes[0], unpruned=True)
asyncio.run(main(time_diff_decrease=True))
# asyncio.run(main(time_diff_decrease=True))