From 466674bc4825c33a7a406e1f972cf074412645d8 Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Thu, 25 Sep 2025 18:00:49 +0200 Subject: [PATCH] Setup StatusBackend functions (#29) * Add functions to set up status nodes * Fixed error when login into a node. * Add functions to send and accept friend requests * Split requests into apply and accept and other changes * Started changing setup_status with previous changes * Improve logout * Cleanup setup_status.py * Reduced logs to DEBUG to avoid noise * Add subscription_performance * Fix wrong import in communities.py --- src/benchmark_scenarios/__init__.py | 0 src/benchmark_scenarios/communities.py | 69 +++++++++++++++++ src/inject_messages.py | 4 +- src/setup_status.py | 101 +++++++++++++++++++++++++ src/signal_client.py | 6 +- src/status_backend.py | 7 +- 6 files changed, 180 insertions(+), 7 deletions(-) create mode 100644 src/benchmark_scenarios/__init__.py create mode 100644 src/benchmark_scenarios/communities.py create mode 100644 src/setup_status.py diff --git a/src/benchmark_scenarios/__init__.py b/src/benchmark_scenarios/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/benchmark_scenarios/communities.py b/src/benchmark_scenarios/communities.py new file mode 100644 index 0000000..4d87ae7 --- /dev/null +++ b/src/benchmark_scenarios/communities.py @@ -0,0 +1,69 @@ +# Python Imports +import asyncio +import random +import string +import logging + +# Project Imports +import src.logger +from src import kube_utils +from src import setup_status +from src.inject_messages import inject_messages +from src.setup_status import request_join_nodes_to_community, login_nodes, accept_community_requests + +logger = logging.getLogger(__name__) + + +async def subscription_performance(): + # 10 relay nodes including 1 publisher node + # 5 service nodes + # 500 light nodes + # One community setup + # All relay and light nodes have joined the community + # -> Relay and service nodes are running + # -> 1 relay node is injecting messages + # -> Start light nodes + # -> Measure time from start to time messages are being received on filter + kube_utils.setup_kubernetes_client() + backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test") + backend_light_pods = kube_utils.get_pods("status-backend-light", "status-go-test") + + relay_nodes, light_nodes = await asyncio.gather( + setup_status.initialize_nodes_application(backend_relay_pods), + setup_status.initialize_nodes_application(backend_light_pods, wakuV2LightClient=True) + ) + + name = f"test_community_{''.join(random.choices(string.ascii_letters, k=10))}" + logger.info(f"Creating community {name}") + response = await relay_nodes["status-backend-relay-0"].wakuext_service.create_community(name) + community_id = response.get("result", {}).get("communities", [{}])[0].get("id") + logger.info(f"Community {name} created with ID {community_id}") + + owner = relay_nodes["status-backend-relay-0"] + nodes = [key for key in relay_nodes.keys() if key != "status-backend-relay-0"] + + joins_ids_relay, join_ids_light = await asyncio.gather( + request_join_nodes_to_community(relay_nodes, nodes, community_id), + request_join_nodes_to_community(light_nodes, light_nodes.keys(), community_id) + ) + + logger.info(f"Join IDs: {joins_ids_relay}, \n {join_ids_light}") + + chat_id_relays, chat_id_lights = await asyncio.gather( + accept_community_requests(owner, joins_ids_relay), + accept_community_requests(owner, join_ids_light), + ) + + message_task = asyncio.create_task(inject_messages(owner, 1, community_id + chat_id_relays, 30)) + await asyncio.sleep(10) + logger.info("Logging out light nodes") + await asyncio.gather(*[node.logout() for node in light_nodes.values()]) + logger.info("Logging in light nodes") + await login_nodes(light_nodes, light_nodes.keys()) + + await message_task + + logger.info("Shutting down node connections") + await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) + await asyncio.gather(*[node.shutdown() for node in light_nodes.values()]) + logger.info("Finished subscription_performance") diff --git a/src/inject_messages.py b/src/inject_messages.py index 7ee2f96..6e4d084 100644 --- a/src/inject_messages.py +++ b/src/inject_messages.py @@ -11,11 +11,11 @@ async def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, nu delay = 1 / msg_per_sec for message_count in range(num_messages): try: - logger.info(f"Sending message {message_count}") + logger.debug(f"Sending message {message_count}") await pod.wakuext_service.send_chat_message(chat_id, f"Message {message_count}") if message_count == 0: - logger.info("Successfully began sending messages") + logger.info(f"Successfully began sending {num_messages} messages") elif message_count % 10 == 0: logger.debug(f"Sent {message_count} messages") diff --git a/src/setup_status.py b/src/setup_status.py new file mode 100644 index 0000000..934606d --- /dev/null +++ b/src/setup_status.py @@ -0,0 +1,101 @@ +# Python Imports +import asyncio +import logging +import time +from concurrent.futures import as_completed, ThreadPoolExecutor + +# Project Imports +from src.status_backend import StatusBackend + +logger = logging.getLogger(__name__) + + +async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> dict[str, StatusBackend]: + # We don't need a lock here because we cannot have two pods with the same name, and no other operations are done. + nodes_status: dict[str, StatusBackend] = {} + + async def _init_status(pod_name: str): + try: + status_backend = StatusBackend( + url=f"http://{pod_name}:3333", + await_signals=["messages.new", "message.delivered", "node.ready", "node.started", "node.login", + "node.stopped"] + ) + await status_backend.start_status_backend() + await status_backend.create_account_and_login(wakuV2LightClient=wakuV2LightClient) + await status_backend.wallet_service.start_wallet() + await status_backend.wakuext_service.start_messenger() + nodes_status[pod_name.split(".")[0]] = status_backend + except AssertionError as e: + logger.error(f"Error initializing StatusBackend for pod {pod_name}: {e}") + raise + + await asyncio.gather(*[_init_status(pod) for pod in pod_names]) + + logger.info(f"All {len(pod_names)} nodes have been initialized successfully") + return nodes_status + + +async def request_join_nodes_to_community(backend_nodes: dict[str, StatusBackend], nodes_to_join: list[str], community_id: str): + async def _request_to_join_to_community(node: StatusBackend, community_id: str) -> str: + try: + _ = await node.wakuext_service.fetch_community(community_id) + response_to_join = await node.wakuext_service.request_to_join_community(community_id) + join_id = response_to_join.get("result", {}).get("requestsToJoinCommunity", [{}])[0].get("id") + + return join_id + + except AssertionError as e: + logger.error(f"Error requesting to join on StatusBackend {node.base_url}: {e}") + raise + + join_ids = await asyncio.gather(*[_request_to_join_to_community(backend_nodes[node], community_id) for node in nodes_to_join]) + + logger.info(f"All {len(nodes_to_join)} nodes have been joined successfully to {community_id}") + + return join_ids + + +async def login_nodes(backend_nodes: dict[str, StatusBackend], include: list[str]): + async def _login_node(node: StatusBackend): + try: + await node.login(node.find_key_uid()) + await node.wakuext_service.start_messenger() + await node.wallet_service.start_wallet() + except AssertionError as e: + logger.error(f"Error logging out node {node}: {e}") + raise + + await asyncio.gather(*[_login_node(backend_nodes[node]) for node in include]) + + +async def accept_community_requests(node_owner: StatusBackend, join_ids: list[str]): + async def _accept_community_request(node: StatusBackend, join_id: str) -> str: + max_retries = 40 + retry_interval = 0.5 + + for attempt in range(max_retries): + try: + response = await node.wakuext_service.accept_request_to_join_community(join_id) + # We need to find the correspondant community of the join_id. We retrieve first chat because should be + # the only one. We do this because there can be several communities if we reuse the node. + # TODO why it returns the information of all communities? + if response.get("result"): + for request in response.get("result").get("requestsToJoinCommunity"): + if request.get("id") == join_id: + for community in response.get("result").get("communities"): + if community.get("id") == request.get("communityId"): + return list(community.get("chats").keys())[0] + except Exception as e: + logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}") + time.sleep(retry_interval) + + raise Exception( + f"Failed to accept request to join community in {max_retries * retry_interval} seconds." + ) + + chat_ids = await asyncio.gather(*[_accept_community_request(node_owner, join_id) for join_id in join_ids]) + logger.info(f"All {len(join_ids)} nodes have been accepted successfully") + + # Same chat ID for everyone + return chat_ids[0] diff --git a/src/signal_client.py b/src/signal_client.py index a39e8cf..332f9cb 100644 --- a/src/signal_client.py +++ b/src/signal_client.py @@ -99,7 +99,7 @@ class AsyncSignalClient: raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals") try: signal = await asyncio.wait_for(self.signal_queues[signal_type].get(), timeout) - logger.info(f"Received {signal_type} signal: {signal}") + logger.debug(f"Received {signal_type} signal: {signal}") return signal except asyncio.TimeoutError: raise TimeoutError(f"Signal {signal_type} not received in {timeout} seconds") @@ -116,9 +116,9 @@ class AsyncSignalClient: return self.signal_queues[signal_type].recent() async def wait_for_login(self) -> dict: - logger.info("Waiting for login signal...") + logger.debug("Waiting for login signal...") signal = await self.wait_for_signal(SignalType.NODE_LOGIN.value) - logger.info(f"Login signal received: {signal}") + logger.debug(f"Login signal received: {signal}") if "error" in signal.get("event", {}): error_details = signal["event"]["error"] assert not error_details, f"Unexpected error during login: {error_details}" diff --git a/src/status_backend.py b/src/status_backend.py index c8fd638..554bbb3 100644 --- a/src/status_backend.py +++ b/src/status_backend.py @@ -41,6 +41,7 @@ class StatusBackend: pass async def shutdown(self): + await self.logout() await self.signal.__aexit__(None, None, None) await self.rpc.__aexit__(None, None, None) await self.session.close() @@ -78,7 +79,6 @@ class StatusBackend: await self.__aenter__() try: await self.logout() - logger.debug("Successfully logged out") except AssertionError: logger.debug("Failed to log out") @@ -156,7 +156,10 @@ class StatusBackend: return response async def logout(self) -> dict: - return await self.api_valid_request("Logout", {}) + json_response = await self.api_valid_request("Logout", {}) + _ = await self.signal.wait_for_logout() + logger.debug("Successfully logged out") + return json_response def set_public_key(self, signal_data: dict): self.public_key = signal_data.get("event", {}).get("settings", {}).get("public-key")