script/research/event_graph: clean up the code

This commit is contained in:
ghassmo
2022-10-23 14:33:02 +04:00
parent 83102a2726
commit 208b29fa4f

View File

@@ -81,13 +81,7 @@ class Graph:
# Check if given events are exist in the graph
# return a list of missing events
def check(self, events: EventIds) -> EventIds:
missing_events = []
for e in events:
if self.events.get(e) == None:
missing_events.append(e)
return missing_events
return [e for e in events if self.events.get(e) == None]
def __str__(self):
res = ""
@@ -112,7 +106,7 @@ class Node:
# On the initialization make the root node as head
self.heads = [genesis_event.hash()]
# Remove the parents for the event if they are exist in heads
# Remove the heads if they are parents of the event
def remove_heads(self, event):
for p in event.parents:
if p in self.heads:
@@ -155,43 +149,61 @@ class Node:
# Check if the parents of the orphan
# are not missing from active pool
missing_parents = self.active_pool.check(orphan.parents)
if not missing_parents:
if missing_parents:
# Check the missing parents from orphan pool and sync with the
# network for missing ones
self.check_and_sync(list(missing_parents), np)
# At this stage all the missing parents must be in the orphan pool
# The next step is to move them to active pool
self.add_linked_events_to_active_pool(missing_parents, [])
# Check again that the parents of the orphan are in the active pool
missing_parents = self.active_pool.check(orphan.parents)
assert (not missing_parents)
# Add the event to active pool
self.add_to_active_pool(orphan)
else:
self.add_to_active_pool(orphan)
return
# Check the missing parents from orphan pool and sync with the network for
# missing ones
self.check_orphan_pool(list(missing_parents), np)
# Last stage is cleaning up the orphan pool
self.clean_orphan_pool()
# At this stage all the missing parents must be in the orphan pool
# The next step is to move them to active pool
self.update_active_pool(missing_parents, [])
def clean_orphan_pool(self):
debug(f"{self.name} clean_orphan_pool()")
while True:
remove_list = []
for orphan in self.orphan_pool.events.values():
# Move the orphan to active pool if it doesn't have missing
# parents in active pool
missing_parents = self.active_pool.check(orphan.parents)
# Check again that the parents of the orphan are in the active pool
missing_parents = self.active_pool.check(orphan.parents)
assert (not missing_parents)
if not missing_parents:
remove_list.append(orphan)
# Last stage, add the event to active pool
self.add_to_active_pool(orphan)
if not remove_list:
break
for ev in remove_list:
self.add_to_active_pool(ev)
def check_orphan_pool(self, missing_events, np):
debug(f"{self.name} check_orphan_pool() {missing_events}")
def check_and_sync(self, missing_events, np):
debug(f"{self.name} check_and_sync() {missing_events}")
while True:
# Check if all missing parents are in orphan pool, otherwise
# add them to request list
request_list = []
self.check_missing_parents(request_list, missing_events, [])
self.scan_orphan_pool(request_list, missing_events, [])
if not request_list:
break
missing_events = self.fetch_events(request_list, np)
missing_events = self.fetch_events_from_network(request_list, np)
def check_missing_parents(self, request_list, events: EventIds, visited):
# Check the missing links inside orphan pool
def scan_orphan_pool(self, request_list, events: EventIds, visited):
debug(f"{self.name} check_missing_parents() {events}")
for event_hash in events:
@@ -210,9 +222,9 @@ class Node:
else:
# Recursive call
# Climb up for the event parents
self.check_missing_parents(request_list, event.parents, visited)
self.scan_orphan_pool(request_list, event.parents, visited)
def fetch_events(self, request_list, np):
def fetch_events_from_network(self, request_list, np):
debug(f"{self.name} fetch_events() {request_list}")
# XXX
# Send the events in request_list to the node who send this event.
@@ -235,9 +247,8 @@ class Node:
# Return parents of requested events
return result
def update_active_pool(self, events, visited):
debug(f"{self.name} update_active_pool() {events}")
def add_linked_events_to_active_pool(self, events, visited):
debug(f"{self.name} add_linked_events_to_active_pool() {events}")
for event_hash in events:
# Check if it already visit this event
if event_hash in visited:
@@ -257,7 +268,7 @@ class Node:
# Recursive call
# Climb up for the event parents
self.update_active_pool(event.parents, visited)
self.add_linked_events_to_active_pool(event.parents, visited)
def add_to_active_pool(self, event):
# Add the event to active pool
@@ -393,21 +404,21 @@ async def run(nodes_n=3, podm=0.30, broadcast_attempt=3, check=False):
error("Broadcast TimeoutError")
async def main(sim_n=6, nodes_increase=False, podm_increase=False ):
async def main(sim_n=6, nodes_increase=False, podm_increase=False):
# run the simulation `sim_n` times with increasing `podm` and `nodes_n`
if nodes_increase:
podm_increase = False
# number of nodes
nodes_n = 5
nodes_n = 10
# probability of dropping events
podm = 0.20
# number of events each node should broadcast
broadcast_attempt = 5
broadcast_attempt = 10
sim_nodes_inc = int(nodes_n / 5)
sim_podm_inc = podm / 5
sim_podm_inc = podm / 5
sim = []
nodes_n_list = []
@@ -416,13 +427,14 @@ async def main(sim_n=6, nodes_increase=False, podm_increase=False ):
podm_tmp = podm
nodes_n_tmp = nodes_n
for _ in range(sim_n):
nodes = await run(nodes_n_tmp, podm_tmp, broadcast_attempt)
sim.append(nodes)
podm_list.append(podm_tmp)
if nodes_increase:
nodes_n_tmp += sim_nodes_inc
nodes_n_tmp += sim_nodes_inc
if podm_increase:
podm_tmp += sim_podm_inc
@@ -437,7 +449,7 @@ async def main(sim_n=6, nodes_increase=False, podm_increase=False ):
events.update(list(node.active_pool.events.keys()))
# Remove the genesis event
del events["8aed642bf5118b9d3c859bd4be35ecac75b6e873cce34e7b6f554b06f75550d7"]
del events[node.genesis_event]
expect_events_synced = (nodes_n * broadcast_attempt)
actual_events_synced = 0
@@ -456,20 +468,20 @@ async def main(sim_n=6, nodes_increase=False, podm_increase=False ):
info(f"expect_events_synced: {expect_events_synced}")
info(f"res: %{res}")
logging.disable()
if nodes_increase:
plt.plot(nodes_n_list, events_synced)
plt.ylim(0, 100)
plt.title(f"Event Graph simulation with %{podm * 100} probability of dropping messages")
plt.title(
f"Event Graph simulation with %{podm * 100} probability of dropping messages")
plt.ylabel("Events sync percentage")
plt.xlabel("Number of nodes")
plt.show()
if podm_increase:
if podm_increase:
plt.plot(podm_list, events_synced)
plt.ylim(0, 100)
@@ -480,7 +492,6 @@ async def main(sim_n=6, nodes_increase=False, podm_increase=False ):
plt.show()
def print_network_graph(nodes):
for (i, node) in enumerate(nodes):
graph = nx.Graph()
@@ -512,5 +523,5 @@ if __name__ == "__main__":
handlers=[logging.FileHandler("debug.log", mode="w"),
logging.StreamHandler()])
#asyncio.run(run(nodes_n=3, podm=0, broadcast_attempt=3, check=True))
asyncio.run(main(nodes_increase=True))