From 4eaabd153ada1b2dec7c6e1c1fc4df2020a9156f Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Fri, 26 Sep 2025 20:56:50 +0200 Subject: [PATCH] Send one-to-one (#24) * Add inject_messages_one_to_one * Add send_one_to_one_message scenario --- src/benchmark_scenarios/private_chats.py | 26 ++++++++++++++++++++++++ src/inject_messages.py | 20 ++++++++++++++++++ src/wakuext_service.py | 20 ++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/src/benchmark_scenarios/private_chats.py b/src/benchmark_scenarios/private_chats.py index 2ac6ea5..5fab782 100644 --- a/src/benchmark_scenarios/private_chats.py +++ b/src/benchmark_scenarios/private_chats.py @@ -6,6 +6,7 @@ import random # Project Imports import src.logger from src import kube_utils, setup_status +from src.inject_messages import inject_messages_one_to_one from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \ decline_friend_requests @@ -103,3 +104,28 @@ async def contact_request(): await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()], *[node.shutdown() for node in light_nodes.values()]) logger.info("Finished contact_request") + + +async def send_one_to_one_message(): + # 50 sending nodes + # 50 receiving nodes + # 50 idle nodes + # Each sending node has sent a contact request to a receiving node, that accepted it + # -> Sending nodes send one text message per 10 sec + kube_utils.setup_kubernetes_client() + backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test") + relay_nodes = await initialize_nodes_application(backend_relay_pods) + + backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods] + + senders = backend_relay_pods[:50] + receiving = backend_relay_pods[50:100] + + friend_requests = await send_friend_requests(relay_nodes, senders, receiving) + _ = await accept_friend_requests(relay_nodes, friend_requests) + + await asyncio.gather(*[inject_messages_one_to_one(relay_nodes[senders[i]], 10, relay_nodes[receiving[i]].public_key, 60) for i in range(50)]) + + logger.info("Shutting down node connections") + await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) + logger.info("Finished send_one_to_one_message") diff --git a/src/inject_messages.py b/src/inject_messages.py index d108c6f..4f59134 100644 --- a/src/inject_messages.py +++ b/src/inject_messages.py @@ -25,3 +25,23 @@ async def inject_messages(pod: StatusBackend, delay_between_message: float, chat await asyncio.sleep(1) logger.info(f"Finished sending {num_messages} messages") + +# TODO merge in same function +async def inject_messages_one_to_one(pod: StatusBackend, delay_between_message: float, contact_id: str, num_messages: int): + for message_count in range(num_messages): + try: + logger.debug(f"Sending message {message_count}") + await pod.wakuext_service.send_one_to_one_message(contact_id, f"Message {message_count}") + + if message_count == 0: + logger.info(f"Successfully began sending {num_messages} messages") + elif message_count % 10 == 0: + logger.debug(f"Sent {message_count} messages") + + await asyncio.sleep(delay_between_message) + + except AssertionError as e: + logger.error(f"Error sending message: {e}") + await asyncio.sleep(1) + + logger.info(f"Finished sending {num_messages} messages") diff --git a/src/wakuext_service.py b/src/wakuext_service.py index ed002cc..39cbfa4 100644 --- a/src/wakuext_service.py +++ b/src/wakuext_service.py @@ -64,3 +64,23 @@ class WakuextAsyncService(AsyncService): params = [{"id": request_id}] json_response = await self.rpc_request("declineContactRequest", params) return json_response + + async def send_one_to_one_message(self, contact_id: str, message: str): + params = [{"id": contact_id, "message": message}] + json_response = await self.rpc_request("sendOneToOneMessage", params) + return json_response + + async def create_group_chat_with_members(self, pubkey_list: list, group_chat_name: str): + params = [None, group_chat_name, pubkey_list] + json_response = await self.rpc_request("createGroupChatWithMembers", params) + return json_response + + async def send_group_chat_message(self, group_id: str, message: str): + params = [{"id": group_id, "message": message}] + json_response = await self.rpc_request("sendGroupChatMessage", params) + return json_response + + async def add_contact(self, contact_id: str, displayName: str): + params = [{"id": contact_id, "nickname": "fake_nickname", "displayName": displayName, "ensName": ""}] + json_response = await self.rpc_request("addContact", params) + return json_response