diff --git a/src/benchmark_scenarios/private_chats.py b/src/benchmark_scenarios/private_chats.py index 962733b..dea0283 100644 --- a/src/benchmark_scenarios/private_chats.py +++ b/src/benchmark_scenarios/private_chats.py @@ -6,7 +6,7 @@ import random # Project Imports import src.logger from src import kube_utils, setup_status -from src.inject_messages import inject_messages_one_to_one +from src.inject_messages import inject_messages_one_to_one, inject_messages_group_chat from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \ decline_friend_requests, create_group_chat, add_contacts @@ -158,3 +158,41 @@ async def create_private_group(): logger.info("Shutting down node connections") await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) logger.info("Finished create_private_group") + + +async def send_group_message(): + # 10 admin nodes + # 100 single-group members + # Each admin node create a group and invite 10 single-group members in it, who accept the invite + # -> Every member send a message in their group every 10 seconds + kube_utils.setup_kubernetes_client() + backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test") + relay_nodes = await initialize_nodes_application(backend_relay_pods) + + backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods] + + admin_nodes = backend_relay_pods[:10] + members = backend_relay_pods[10:110] + members_pub_keys = [relay_nodes[node].public_key for node in members] + + # In order to create a group, first they need to be friends + friend_requests = await send_friend_requests(relay_nodes, admin_nodes, members) + logger.info("Accepting friend requests") + _ = await accept_friend_requests(relay_nodes, friend_requests) + _ = await add_contacts(relay_nodes, admin_nodes, members) + + # 1 admin to 10 users, no overlap + group_ids = await asyncio.gather(*[create_group_chat(relay_nodes[admin], members_pub_keys[10*i: (10*i)+10]) for i, admin in enumerate(admin_nodes)]) + await asyncio.sleep(10) + + await asyncio.gather(*[ + inject_messages_group_chat(relay_nodes[member], + delay_between_message=10, + group_id=group_ids[i//10], # 10 first nodes to group 0, 10 to group 1, ... + num_messages=10) for i, member in members + ]) + + + logger.info("Shutting down node connections") + await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) + logger.info("Finished send_one_to_one_message") diff --git a/src/inject_messages.py b/src/inject_messages.py index 4f59134..b4de4a5 100644 --- a/src/inject_messages.py +++ b/src/inject_messages.py @@ -45,3 +45,22 @@ async def inject_messages_one_to_one(pod: StatusBackend, delay_between_message: await asyncio.sleep(1) logger.info(f"Finished sending {num_messages} messages") + +async def inject_messages_group_chat(pod: StatusBackend, delay_between_message: float, group_id: str, num_messages: int): + for message_count in range(num_messages): + try: + logger.debug(f"Sending message {message_count}") + await pod.wakuext_service.send_group_chat_message(group_id, f"Message {message_count}") + + if message_count == 0: + logger.info(f"Successfully began sending {num_messages} messages") + elif message_count % 10 == 0: + logger.debug(f"Sent {message_count} messages") + + await asyncio.sleep(delay_between_message) + + except AssertionError as e: + logger.error(f"Error sending message: {e}") + await asyncio.sleep(1) + + logger.info(f"Finished sending {num_messages} messages") \ No newline at end of file diff --git a/src/setup_status.py b/src/setup_status.py index 25628a2..3f82c9e 100644 --- a/src/setup_status.py +++ b/src/setup_status.py @@ -221,6 +221,7 @@ async def create_group_chat(admin: StatusBackend, receivers: list[str]): group_id = response.get("result", {}).get("communities", [{}])[0].get("id") logger.info(f"Group {name} created with ID {group_id}") + return group_id async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]: