mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-04-10 03:01:16 -04:00
Update private chat scenarios (#42)
* Update imports * Update idle_relay * Update idle_light * Update contact_request * Update send_one_to_one_message * Update create_private_group * Update send_group_message
This commit is contained in:
@@ -6,13 +6,14 @@ import random
|
||||
# Project Imports
|
||||
import src.logger
|
||||
from src import kube_utils, setup_status
|
||||
from src.async_utils import CollectedItem, cleanup_queue_on_event
|
||||
from src.benchmark_scenarios.scenario_utils import send_friend_requests_util
|
||||
from src.inject_messages import inject_messages_one_to_one, inject_messages_group_chat
|
||||
from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \
|
||||
from src.setup_status import initialize_nodes_application, accept_friend_requests, \
|
||||
decline_friend_requests, create_group_chat, add_contacts
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
async def idle_relay(consumers: int = 4):
|
||||
# 1 relay node alice
|
||||
# 100 relay nodes - friends
|
||||
@@ -22,22 +23,14 @@ async def idle_relay(consumers: int = 4):
|
||||
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||
relay_nodes = await initialize_nodes_application(backend_relay_pods)
|
||||
|
||||
for node_id, node in relay_nodes.items():
|
||||
peers = await node.wakuext_service.peers()
|
||||
logger.info(f"Peers from {node_id}: {peers}")
|
||||
|
||||
alice = "status-backend-relay-0"
|
||||
friends = [key for key in relay_nodes.keys() if key != alice]
|
||||
|
||||
results_queue: asyncio.Queue[CollectedItem | None] = asyncio.Queue()
|
||||
finished_evt = asyncio.Event()
|
||||
|
||||
send_task = asyncio.create_task(send_friend_requests(relay_nodes, results_queue, [alice], friends, finished_evt))
|
||||
accept_task = asyncio.create_task(accept_friend_requests(relay_nodes, results_queue, consumers))
|
||||
cleanup_task = asyncio.create_task(cleanup_queue_on_event(finished_evt, results_queue, 4))
|
||||
_, delays_queue, _ = await asyncio.gather(send_task, accept_task, cleanup_task)
|
||||
|
||||
|
||||
delays: list[float] = []
|
||||
while not delays_queue.empty():
|
||||
delays.append(delays_queue.get_nowait())
|
||||
|
||||
delays = await send_friend_requests_util(relay_nodes, [alice], friends, accept_friend_requests, consumers)
|
||||
logger.info(f"Delays are: {delays}")
|
||||
|
||||
logger.info("Shutting down node connections")
|
||||
@@ -45,28 +38,32 @@ async def idle_relay(consumers: int = 4):
|
||||
logger.info("Finished idle_relay")
|
||||
|
||||
|
||||
async def idle_light():
|
||||
async def idle_light(consumers: int = 4):
|
||||
# 1 light node alice
|
||||
# 100 light nodes - friends
|
||||
# friends have accepted contact request with alice
|
||||
# friends have accepted contact request with alice, who accepted it
|
||||
|
||||
kube_utils.setup_kubernetes_client()
|
||||
backend_light_pods = kube_utils.get_pods("status-backend-light", "status-go-test")
|
||||
light_nodes = await initialize_nodes_application(backend_light_pods)
|
||||
light_nodes = await initialize_nodes_application(backend_light_pods, wakuV2LightClient=True)
|
||||
|
||||
await asyncio.sleep(10)
|
||||
|
||||
alice = "status-backend-light-0"
|
||||
friends = [key for key in light_nodes.keys() if key != alice]
|
||||
requests_made = await send_friend_requests(light_nodes, [alice], friends)
|
||||
_ = await accept_friend_requests(light_nodes, requests_made)
|
||||
|
||||
delays = await send_friend_requests_util(light_nodes, [alice], friends, accept_friend_requests, consumers)
|
||||
|
||||
logger.info(f"Delays are: {delays}")
|
||||
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in light_nodes.values()])
|
||||
logger.info("Finished idle_light")
|
||||
|
||||
|
||||
async def contact_request():
|
||||
# relay: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle
|
||||
# light: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle
|
||||
async def contact_request(consumers: int = 4):
|
||||
# relay: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle ## 5 12 5 (4,4,4)
|
||||
# light: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle ## 5 12 5 (4,4,4)
|
||||
# -> Each requester send a contact request to 3 nodes in the requested set (random selection)
|
||||
# -> accepting nodes in the set accept the request
|
||||
# -> rejecting nodes rejects the request
|
||||
@@ -82,37 +79,29 @@ async def contact_request():
|
||||
setup_status.initialize_nodes_application(backend_light_pods, wakuV2LightClient=True)
|
||||
)
|
||||
|
||||
await asyncio.sleep(20)
|
||||
|
||||
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
|
||||
backend_light_pods = [pod_name.split(".")[0] for pod_name in backend_light_pods]
|
||||
|
||||
relay_requesters = backend_relay_pods[:25]
|
||||
light_requesters = backend_light_pods[:25]
|
||||
relay_requested = backend_relay_pods[25:85]
|
||||
light_requested = backend_light_pods[25:85]
|
||||
requesters = backend_relay_pods[:25] + backend_light_pods[:25]
|
||||
receiver_accept = backend_relay_pods[25:45] + backend_light_pods[25:45]
|
||||
receiver_reject = backend_relay_pods[45:65] + backend_light_pods[45:65]
|
||||
receiver_ignore = backend_relay_pods[65:85] + backend_light_pods[65:85]
|
||||
|
||||
# Returns a list of tuples like: [(sender name, {receiver: request_id, ...})]
|
||||
relay_friend_requests, light_friend_requests = await asyncio.gather(
|
||||
*[send_friend_requests(relay_nodes, [requester], random.sample(relay_requested, 3)) for requester in
|
||||
relay_requesters],
|
||||
*[send_friend_requests(light_nodes, [requester], random.sample(light_requested, 3)) for requester in
|
||||
light_requesters]
|
||||
)
|
||||
random.shuffle(requesters)
|
||||
random.shuffle(receiver_accept)
|
||||
random.shuffle(receiver_reject)
|
||||
|
||||
to_accept_requests_relay = random.sample(relay_friend_requests, 20)
|
||||
remaining_relay = list(set(relay_friend_requests) - set(to_accept_requests_relay))
|
||||
to_reject_requests_relay = random.sample(remaining_relay, 20)
|
||||
logger.info("Sending friend requests")
|
||||
delays_accept = await send_friend_requests_util(light_nodes, requesters, receiver_accept, accept_friend_requests, 3, consumers)
|
||||
delays_reject = await send_friend_requests_util(light_nodes, requesters, receiver_reject, decline_friend_requests, 3, consumers)
|
||||
_ = await send_friend_requests_util(light_nodes, requesters, receiver_reject, None)
|
||||
|
||||
to_accept_requests_light = random.sample(light_friend_requests, 20)
|
||||
remaining_light = list(set(light_friend_requests) - set(to_accept_requests_light))
|
||||
to_reject_requests_light = random.sample(remaining_light, 20)
|
||||
logger.info(f"Accept delays ({len(delays_accept)}) are: {delays_accept}")
|
||||
logger.info(f"Reject delays ({len(delays_reject)}) are: {delays_reject}")
|
||||
|
||||
|
||||
_ = await asyncio.gather(
|
||||
*[accept_friend_requests(relay_nodes, to_accept_requests_relay)],
|
||||
*[accept_friend_requests(light_nodes, to_accept_requests_light)],
|
||||
*[decline_friend_requests(relay_nodes, to_reject_requests_relay)],
|
||||
*[decline_friend_requests(light_nodes, to_reject_requests_light)],
|
||||
)
|
||||
await asyncio.sleep(10)
|
||||
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()],
|
||||
@@ -120,7 +109,7 @@ async def contact_request():
|
||||
logger.info("Finished contact_request")
|
||||
|
||||
|
||||
async def send_one_to_one_message():
|
||||
async def send_one_to_one_message(consumers: int = 4):
|
||||
# 50 sending nodes
|
||||
# 50 receiving nodes
|
||||
# 50 idle nodes
|
||||
@@ -130,22 +119,38 @@ async def send_one_to_one_message():
|
||||
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||
relay_nodes = await initialize_nodes_application(backend_relay_pods)
|
||||
|
||||
logger.info("Waiting 60 seconds after nodes initialization")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
|
||||
|
||||
senders = backend_relay_pods[:50]
|
||||
receiving = backend_relay_pods[50:100]
|
||||
receivers = backend_relay_pods[50:100]
|
||||
|
||||
friend_requests = await send_friend_requests(relay_nodes, senders, receiving)
|
||||
_ = await accept_friend_requests(relay_nodes, friend_requests)
|
||||
delays = await send_friend_requests_util(relay_nodes, senders, receivers, accept_friend_requests, consumers)
|
||||
|
||||
await asyncio.gather(*[inject_messages_one_to_one(relay_nodes[senders[i]], 10, relay_nodes[receiving[i]].public_key, 60) for i in range(50)])
|
||||
logger.info("Waiting 20 seconds after accepting requests")
|
||||
await asyncio.sleep(10)
|
||||
|
||||
await asyncio.gather(*[inject_messages_one_to_one(relay_nodes[senders[i]], 10, relay_nodes[receivers[i]].public_key, 18) for i in range(50)])
|
||||
|
||||
logger.info("Waiting 20 seconds")
|
||||
await asyncio.sleep(20)
|
||||
|
||||
expected_messages = {f"Message {i}" for i in range(7)}
|
||||
for receiver in receivers:
|
||||
received_messages = {tup[1] for tup in relay_nodes[receiver].signal.signal_queues["messages.new"].messages if tup[1].startswith("Message ")}
|
||||
missing_messages = expected_messages - received_messages
|
||||
if missing_messages:
|
||||
logger.error(f"{receiver} is missing messages, got {len(received_messages)}/{len(expected_messages)} messages")
|
||||
|
||||
# TODO: Retrieve latencies
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
|
||||
logger.info("Finished send_one_to_one_message")
|
||||
|
||||
|
||||
async def create_private_group():
|
||||
async def create_private_group(consumers: int = 4):
|
||||
# 10 admin nodes
|
||||
# 100 single-group members
|
||||
# Each admin node create a group and invite 10 single-group members in it, who accept the invite
|
||||
@@ -154,27 +159,30 @@ async def create_private_group():
|
||||
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||
relay_nodes = await initialize_nodes_application(backend_relay_pods)
|
||||
|
||||
logger.info("Waiting 60 seconds after nodes initialization")
|
||||
await asyncio.sleep(60)
|
||||
|
||||
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
|
||||
|
||||
admin_nodes = backend_relay_pods[:10]
|
||||
members = backend_relay_pods[10:110]
|
||||
members = backend_relay_pods[10:]
|
||||
members_pub_keys = [relay_nodes[node].public_key for node in members]
|
||||
|
||||
# In order to create a group, first they need to be friends
|
||||
friend_requests = await send_friend_requests(relay_nodes, admin_nodes, members)
|
||||
logger.info("Accepting friend requests")
|
||||
_ = await accept_friend_requests(relay_nodes, friend_requests)
|
||||
delays = await send_friend_requests_util(relay_nodes, admin_nodes, members, accept_friend_requests, consumers)
|
||||
_ = await add_contacts(relay_nodes, admin_nodes, members)
|
||||
|
||||
await asyncio.sleep(30)
|
||||
|
||||
# 1 admin to 10 users, no overlap
|
||||
await asyncio.gather(*[create_group_chat(relay_nodes[admin], members_pub_keys[10*i: (10*i)+10]) for i, admin in enumerate(admin_nodes)])
|
||||
|
||||
# TODO check they really are in the group chat?
|
||||
await asyncio.sleep(30)
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
|
||||
logger.info("Finished create_private_group")
|
||||
|
||||
|
||||
async def send_group_message():
|
||||
async def send_group_message(consumers: int = 4):
|
||||
# 10 admin nodes
|
||||
# 100 single-group members
|
||||
# Each admin node create a group and invite 10 single-group members in it, who accept the invite
|
||||
@@ -183,30 +191,43 @@ async def send_group_message():
|
||||
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||
relay_nodes = await initialize_nodes_application(backend_relay_pods)
|
||||
|
||||
logger.info("Waiting 30 seconds after nodes initialization")
|
||||
await asyncio.sleep(30)
|
||||
|
||||
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
|
||||
|
||||
admin_nodes = backend_relay_pods[:10]
|
||||
members = backend_relay_pods[10:110]
|
||||
admin_nodes = backend_relay_pods[:2]
|
||||
members = backend_relay_pods[2:]
|
||||
members_pub_keys = [relay_nodes[node].public_key for node in members]
|
||||
|
||||
# In order to create a group, first they need to be friends
|
||||
friend_requests = await send_friend_requests(relay_nodes, admin_nodes, members)
|
||||
logger.info("Accepting friend requests")
|
||||
_ = await accept_friend_requests(relay_nodes, friend_requests)
|
||||
delays = await send_friend_requests_util(relay_nodes, admin_nodes, members, accept_friend_requests, consumers)
|
||||
_ = await add_contacts(relay_nodes, admin_nodes, members)
|
||||
|
||||
await asyncio.sleep(30)
|
||||
|
||||
# 1 admin to 10 users, no overlap
|
||||
group_ids = await asyncio.gather(*[create_group_chat(relay_nodes[admin], members_pub_keys[10*i: (10*i)+10]) for i, admin in enumerate(admin_nodes)])
|
||||
await asyncio.sleep(10)
|
||||
group_ids = await asyncio.gather(
|
||||
*[create_group_chat(relay_nodes[admin], members_pub_keys[10 * i: (10 * i) + 10]) for i, admin in
|
||||
enumerate(admin_nodes)])
|
||||
# TODO check they really are in the group chat
|
||||
await asyncio.sleep(30)
|
||||
|
||||
await asyncio.gather(*[
|
||||
inject_messages_group_chat(relay_nodes[member],
|
||||
delay_between_message=10,
|
||||
group_id=group_ids[i//10], # 10 first nodes to group 0, 10 to group 1, ...
|
||||
num_messages=10) for i, member in members
|
||||
group_id=group_ids[i // 10], # 10 first nodes to group 0, 10 to group 1, ...
|
||||
num_messages=10) for i, member in enumerate(members)
|
||||
])
|
||||
|
||||
logger.info("Waiting 30 seconds")
|
||||
await asyncio.sleep(30)
|
||||
expected_messages = 100
|
||||
for name, node in relay_nodes.items():
|
||||
received_messages = [tup[1] for tup in node.signal.signal_queues["messages.new"].messages if
|
||||
tup[1].startswith("Message ")]
|
||||
if len(received_messages) != expected_messages:
|
||||
logger.error(f"{name} is missing messages, got {len(received_messages)}/{expected_messages} messages")
|
||||
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
|
||||
logger.info("Finished send_one_to_one_message")
|
||||
logger.info("Finished send_group_message")
|
||||
|
||||
Reference in New Issue
Block a user