From e23e1d3f88a19f3e08f6e2df59c6ea73f6087ef8 Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Fri, 26 Sep 2025 16:53:42 +0200 Subject: [PATCH] Idle relay (#21) * Add send_friend_requests and accept_friend_requests setup functions * Add idle_relay scenario * Add missing import --- src/benchmark_scenarios/private_chats.py | 29 ++++++++++++++ src/setup_status.py | 50 ++++++++++++++++++++++++ 2 files changed, 79 insertions(+) create mode 100644 src/benchmark_scenarios/private_chats.py diff --git a/src/benchmark_scenarios/private_chats.py b/src/benchmark_scenarios/private_chats.py new file mode 100644 index 0000000..90a683d --- /dev/null +++ b/src/benchmark_scenarios/private_chats.py @@ -0,0 +1,29 @@ +# Python Imports +import asyncio +import logging +import random + +# Project Imports +import src.logger +from src import kube_utils +from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests + +logger = logging.getLogger(__name__) + +async def idle_relay(): + # 1 relay node alice + # 100 relay nodes - friends + # friends have accepted contact request with alice, who accepted it + + kube_utils.setup_kubernetes_client() + backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test") + relay_nodes = await initialize_nodes_application(backend_relay_pods) + + alice = "status-backend-relay-0" + friends = [key for key in relay_nodes.keys() if key != alice] + requests_made = await send_friend_requests(relay_nodes, [alice], friends) + _ = await accept_friend_requests(relay_nodes, requests_made) + + logger.info("Shutting down node connections") + await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) + logger.info("Finished idle_relay") diff --git a/src/setup_status.py b/src/setup_status.py index ee4fa09..fda064c 100644 --- a/src/setup_status.py +++ b/src/setup_status.py @@ -4,6 +4,7 @@ import logging import time # Project Imports +from src.enums import MessageContentType, SignalType from src.status_backend import StatusBackend logger = logging.getLogger(__name__) @@ -121,6 +122,55 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]): logger.info(f"All {len(join_ids)} nodes have been rejected successfully") +async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str]): + async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receivers: list[str]): + # Send contact requests from sender -> receivers + responses = await asyncio.gather(*[nodes[sender].wakuext_service.send_contact_request(nodes[node].public_key, "asd") for node in receivers]) + # Get responses and filter by contact requests + request_responses = await asyncio.gather(*[get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value) for response in responses]) + # Create a dict {receiver: request}, using the first response (there is always only one friend request) + request_ids = {receiver: request_responses[i][0].get("id") for i, receiver in enumerate(receivers)} + + return sender, request_ids + + requests_made = await asyncio.gather(*[_send_friend_request(nodes, sender, receivers) for sender in senders]) + # Returns a list of tuples like: [(sender name, {receiver: request_id, ...})] + logger.info(f"All {len(receivers)} friend requests sent.") + return requests_made + + +async def accept_friend_requests(nodes: dict[str, StatusBackend], requests: list[(str, dict[str, str])]): + # Flatten all tasks into a single list and execute them concurrently + async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str, request_id: str): + max_retries = 40 + retry_interval = 0.5 + + for attempt in range(max_retries): + try: + _ = await nodes[receiver].wakuext_service.accept_contact_request(request_id) + accepted_signal = f"@{nodes[receiver].public_key} accepted your contact request" + signal = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value, event_string=accepted_signal) + return signal + except Exception as e: + logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}") + time.sleep(retry_interval) + + raise Exception( + f"Failed to accept friend request in {max_retries * retry_interval} seconds." + ) + + _ = await asyncio.gather( + *[ + _accept_friend_request(nodes, sender, receiver, request_id) + for sender, receivers in requests + for receiver, request_id in receivers.items() + ] + ) + + total_requests = sum(len(receivers) for _, receivers in requests) + logger.info(f"All {total_requests} friend requests accepted.") + + async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]: matched_messages = []