diff --git a/script/research/event_graph/main.py b/script/research/event_graph/main.py index 1f98e833b..18d0f62f6 100644 --- a/script/research/event_graph/main.py +++ b/script/research/event_graph/main.py @@ -1,7 +1,12 @@ from hashlib import sha256 from datetime import datetime +from random import randint import asyncio +import matplotlib.pyplot as plt +import networkx as nx + + EventId = str EventIds = list[EventId] @@ -34,24 +39,26 @@ class Event: """ ## Graph Example - E1: [] + E1: [] E2: [E1] E3: [E1] E4: [E3] E5: [E3] E6: [E4, E5] E7: [E4] - E8: [E2] + E8: [E2] """ - - class Graph: def __init__(self): self.events = dict() + def heads(self): # NOTE: we will need to keep track of heads for creating new events. # Not needed for this demo though. + # XXX this for testing purpose + return [list(self.events.keys())[-1]] + def add_event(self, event: Event): self.events[event.hash()] = event @@ -90,6 +97,8 @@ class Node: self.genesis_event = genesis_event self.active_pool.add_event(genesis_event) + def last_event(self): + return self.active_pool.heads() def receive_new_event(self, event: Event): @@ -105,7 +114,7 @@ class Node: self.active_pool.add_event(event) # events list to be removed from orphan pool - # after relink + # after relink remove_list: EventIds = [] self.relink(event, remove_list) @@ -147,7 +156,6 @@ class Node: if event.hash() not in orphan.parents: continue - missing_parents = self.active_pool.check(orphan.parents) if len(missing_parents) == 0: @@ -164,24 +172,88 @@ class Node: \n Orphan Pool: {self.orphan_pool}""" -async def run_node(name): - print(f"{name} Started") - node = Node(name) +NODES_N = 10 +BROADCAST_ATTEMPT = 3 - print(f"{name} End") +MAX_BROADCAST_DELAY = 2 +MIN_BROADCAST_DELAY = 0 + + +async def recv_loop(node, peer, queue): + while True: + event = await queue.get() + queue.task_done() + node.receive_new_event(event) + print(f"{node.name} receive: {event.hash()} from {peer}") + + +async def send_loop(node, queue): + for _ in range(BROADCAST_ATTEMPT): + + await asyncio.sleep(randint(MIN_BROADCAST_DELAY, MAX_BROADCAST_DELAY)) + event = Event(node.last_event()) + + for _ in range(NODES_N): + await queue.put(event) + await queue.join() + + print(f"{node.name} broadcast: {event.hash()}") async def main(): - tasks = await asyncio.gather( - run_node("NodeA"), - run_node("NodeB"), - run_node("NodeC")) + + send_tasks = [] + recv_tasks = [] + + nodes = [] + queues = dict() + + print(f"Run {NODES_N} Nodes") + + for i in range(NODES_N): + node = Node(f"Node{i}") + nodes.append(node) + + queue = asyncio.Queue() + queues[node.name] = queue + + send_task = asyncio.create_task(send_loop(node, queue)) + send_tasks.append(send_task) + + for node in nodes: + for (peer, queue) in queues.items(): + recv_task = asyncio.create_task(recv_loop(node, peer, queue)) + recv_tasks.append(recv_task) + + g = asyncio.gather(*recv_tasks) + + await asyncio.gather(*send_tasks) + + for node in nodes: + print(node.name) + print(len(node.active_pool.events)) + print(len(node.orphan_pool.events)) + print("###############") + + graph = nx.Graph() + + node_to_draw = nodes[0] + print(node_to_draw) + + for (h, ev) in node_to_draw.active_pool.events.items(): + graph.add_node(h[:5]) + graph.add_edges_from([(h[:5], p[:5]) for p in ev.parents]) + + nx.draw(graph, with_labels=True, node_color="#69aaff", node_size=400) + plt.show() + + await g def test_node(): node_a = Node("NodeA") - event0 = node_a.genesis_event + event0 = node_a.genesis_event event1 = Event([event0.hash()]) event2 = Event([event1.hash()]) event3 = Event([event2.hash(), event0.hash()]) @@ -200,5 +272,6 @@ def test_node(): if __name__ == "__main__": - test_node() + + # test_node() asyncio.run(main())