From d6c71775a60b81392e4cdf6a1a7f63662a22be54 Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Thu, 25 Sep 2025 19:09:48 +0200 Subject: [PATCH] Message sending scenario (#31) * Change parameter in inject_messages to a better naming * Add message_sending community performance * Add missing await * Add missing community id and shut down nodes --- src/benchmark_scenarios/communities.py | 27 ++++++++++++++++++++++++++ src/inject_messages.py | 5 ++--- 2 files changed, 29 insertions(+), 3 deletions(-) diff --git a/src/benchmark_scenarios/communities.py b/src/benchmark_scenarios/communities.py index e532c93..87b5c78 100644 --- a/src/benchmark_scenarios/communities.py +++ b/src/benchmark_scenarios/communities.py @@ -124,3 +124,30 @@ async def store_performance(): await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) await asyncio.gather(*[node.shutdown() for node in light_nodes.values()]) logger.info("Finished store_performance") + +async def message_sending(): + # 1 community owner + # 500 users + # all joined + # -> 100 nodes send 1 message every 5s + kube_utils.setup_kubernetes_client() + backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test") + relay_nodes = await setup_status.initialize_nodes_application(backend_relay_pods) + + name = f"test_community_{''.join(random.choices(string.ascii_letters, k=10))}" + logger.info(f"Creating community {name}") + response = await relay_nodes["status-backend-relay-0"].wakuext_service.create_community(name) + community_id = response.get("result", {}).get("communities", [{}])[0].get("id") + logger.info(f"Community {name} created with ID {community_id}") + + owner = relay_nodes["status-backend-relay-0"] + nodes = [key for key in relay_nodes.keys() if key != "status-backend-relay-0"] + join_ids = await request_join_nodes_to_community(relay_nodes, nodes, community_id) + chat_id = await accept_community_requests(owner, join_ids) + + await asyncio.gather(*[inject_messages(relay_nodes[node], 5, community_id+chat_id, 100) for node in nodes[:100]]) + + logger.info("Shutting down node connections") + await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) + logger.info("Finished store_performance") + diff --git a/src/inject_messages.py b/src/inject_messages.py index 6e4d084..d108c6f 100644 --- a/src/inject_messages.py +++ b/src/inject_messages.py @@ -7,8 +7,7 @@ from src.status_backend import StatusBackend logger = logging.getLogger(__name__) -async def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, num_messages: int): - delay = 1 / msg_per_sec +async def inject_messages(pod: StatusBackend, delay_between_message: float, chat_id: str, num_messages: int): for message_count in range(num_messages): try: logger.debug(f"Sending message {message_count}") @@ -19,7 +18,7 @@ async def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, nu elif message_count % 10 == 0: logger.debug(f"Sent {message_count} messages") - await asyncio.sleep(delay) + await asyncio.sleep(delay_between_message) except AssertionError as e: logger.error(f"Error sending message: {e}")