Setup StatusBackend functions (#29)

* Add functions to set up status nodes

* Fixed error when login into a node.

* Add functions to send and accept friend requests

* Split requests into apply and accept and other changes

* Started changing setup_status with previous changes

* Improve logout

* Cleanup setup_status.py

* Reduced logs to DEBUG to avoid noise

* Add subscription_performance

* Fix wrong import in communities.py
This commit is contained in:
Alberto Soutullo
2025-09-25 18:00:49 +02:00
committed by GitHub
parent 2c6f8a57b8
commit 466674bc48
6 changed files with 180 additions and 7 deletions

View File

View File

@@ -0,0 +1,69 @@
# Python Imports
import asyncio
import random
import string
import logging
# Project Imports
import src.logger
from src import kube_utils
from src import setup_status
from src.inject_messages import inject_messages
from src.setup_status import request_join_nodes_to_community, login_nodes, accept_community_requests
logger = logging.getLogger(__name__)
async def subscription_performance():
# 10 relay nodes including 1 publisher node
# 5 service nodes
# 500 light nodes
# One community setup
# All relay and light nodes have joined the community
# -> Relay and service nodes are running
# -> 1 relay node is injecting messages
# -> Start light nodes
# -> Measure time from start to time messages are being received on filter
kube_utils.setup_kubernetes_client()
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
backend_light_pods = kube_utils.get_pods("status-backend-light", "status-go-test")
relay_nodes, light_nodes = await asyncio.gather(
setup_status.initialize_nodes_application(backend_relay_pods),
setup_status.initialize_nodes_application(backend_light_pods, wakuV2LightClient=True)
)
name = f"test_community_{''.join(random.choices(string.ascii_letters, k=10))}"
logger.info(f"Creating community {name}")
response = await relay_nodes["status-backend-relay-0"].wakuext_service.create_community(name)
community_id = response.get("result", {}).get("communities", [{}])[0].get("id")
logger.info(f"Community {name} created with ID {community_id}")
owner = relay_nodes["status-backend-relay-0"]
nodes = [key for key in relay_nodes.keys() if key != "status-backend-relay-0"]
joins_ids_relay, join_ids_light = await asyncio.gather(
request_join_nodes_to_community(relay_nodes, nodes, community_id),
request_join_nodes_to_community(light_nodes, light_nodes.keys(), community_id)
)
logger.info(f"Join IDs: {joins_ids_relay}, \n {join_ids_light}")
chat_id_relays, chat_id_lights = await asyncio.gather(
accept_community_requests(owner, joins_ids_relay),
accept_community_requests(owner, join_ids_light),
)
message_task = asyncio.create_task(inject_messages(owner, 1, community_id + chat_id_relays, 30))
await asyncio.sleep(10)
logger.info("Logging out light nodes")
await asyncio.gather(*[node.logout() for node in light_nodes.values()])
logger.info("Logging in light nodes")
await login_nodes(light_nodes, light_nodes.keys())
await message_task
logger.info("Shutting down node connections")
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
await asyncio.gather(*[node.shutdown() for node in light_nodes.values()])
logger.info("Finished subscription_performance")

View File

@@ -11,11 +11,11 @@ async def inject_messages(pod: StatusBackend, msg_per_sec: int, chat_id: str, nu
delay = 1 / msg_per_sec
for message_count in range(num_messages):
try:
logger.info(f"Sending message {message_count}")
logger.debug(f"Sending message {message_count}")
await pod.wakuext_service.send_chat_message(chat_id, f"Message {message_count}")
if message_count == 0:
logger.info("Successfully began sending messages")
logger.info(f"Successfully began sending {num_messages} messages")
elif message_count % 10 == 0:
logger.debug(f"Sent {message_count} messages")

101
src/setup_status.py Normal file
View File

@@ -0,0 +1,101 @@
# Python Imports
import asyncio
import logging
import time
from concurrent.futures import as_completed, ThreadPoolExecutor
# Project Imports
from src.status_backend import StatusBackend
logger = logging.getLogger(__name__)
async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> dict[str, StatusBackend]:
# We don't need a lock here because we cannot have two pods with the same name, and no other operations are done.
nodes_status: dict[str, StatusBackend] = {}
async def _init_status(pod_name: str):
try:
status_backend = StatusBackend(
url=f"http://{pod_name}:3333",
await_signals=["messages.new", "message.delivered", "node.ready", "node.started", "node.login",
"node.stopped"]
)
await status_backend.start_status_backend()
await status_backend.create_account_and_login(wakuV2LightClient=wakuV2LightClient)
await status_backend.wallet_service.start_wallet()
await status_backend.wakuext_service.start_messenger()
nodes_status[pod_name.split(".")[0]] = status_backend
except AssertionError as e:
logger.error(f"Error initializing StatusBackend for pod {pod_name}: {e}")
raise
await asyncio.gather(*[_init_status(pod) for pod in pod_names])
logger.info(f"All {len(pod_names)} nodes have been initialized successfully")
return nodes_status
async def request_join_nodes_to_community(backend_nodes: dict[str, StatusBackend], nodes_to_join: list[str], community_id: str):
async def _request_to_join_to_community(node: StatusBackend, community_id: str) -> str:
try:
_ = await node.wakuext_service.fetch_community(community_id)
response_to_join = await node.wakuext_service.request_to_join_community(community_id)
join_id = response_to_join.get("result", {}).get("requestsToJoinCommunity", [{}])[0].get("id")
return join_id
except AssertionError as e:
logger.error(f"Error requesting to join on StatusBackend {node.base_url}: {e}")
raise
join_ids = await asyncio.gather(*[_request_to_join_to_community(backend_nodes[node], community_id) for node in nodes_to_join])
logger.info(f"All {len(nodes_to_join)} nodes have been joined successfully to {community_id}")
return join_ids
async def login_nodes(backend_nodes: dict[str, StatusBackend], include: list[str]):
async def _login_node(node: StatusBackend):
try:
await node.login(node.find_key_uid())
await node.wakuext_service.start_messenger()
await node.wallet_service.start_wallet()
except AssertionError as e:
logger.error(f"Error logging out node {node}: {e}")
raise
await asyncio.gather(*[_login_node(backend_nodes[node]) for node in include])
async def accept_community_requests(node_owner: StatusBackend, join_ids: list[str]):
async def _accept_community_request(node: StatusBackend, join_id: str) -> str:
max_retries = 40
retry_interval = 0.5
for attempt in range(max_retries):
try:
response = await node.wakuext_service.accept_request_to_join_community(join_id)
# We need to find the correspondant community of the join_id. We retrieve first chat because should be
# the only one. We do this because there can be several communities if we reuse the node.
# TODO why it returns the information of all communities?
if response.get("result"):
for request in response.get("result").get("requestsToJoinCommunity"):
if request.get("id") == join_id:
for community in response.get("result").get("communities"):
if community.get("id") == request.get("communityId"):
return list(community.get("chats").keys())[0]
except Exception as e:
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
time.sleep(retry_interval)
raise Exception(
f"Failed to accept request to join community in {max_retries * retry_interval} seconds."
)
chat_ids = await asyncio.gather(*[_accept_community_request(node_owner, join_id) for join_id in join_ids])
logger.info(f"All {len(join_ids)} nodes have been accepted successfully")
# Same chat ID for everyone
return chat_ids[0]

View File

@@ -99,7 +99,7 @@ class AsyncSignalClient:
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
try:
signal = await asyncio.wait_for(self.signal_queues[signal_type].get(), timeout)
logger.info(f"Received {signal_type} signal: {signal}")
logger.debug(f"Received {signal_type} signal: {signal}")
return signal
except asyncio.TimeoutError:
raise TimeoutError(f"Signal {signal_type} not received in {timeout} seconds")
@@ -116,9 +116,9 @@ class AsyncSignalClient:
return self.signal_queues[signal_type].recent()
async def wait_for_login(self) -> dict:
logger.info("Waiting for login signal...")
logger.debug("Waiting for login signal...")
signal = await self.wait_for_signal(SignalType.NODE_LOGIN.value)
logger.info(f"Login signal received: {signal}")
logger.debug(f"Login signal received: {signal}")
if "error" in signal.get("event", {}):
error_details = signal["event"]["error"]
assert not error_details, f"Unexpected error during login: {error_details}"

View File

@@ -41,6 +41,7 @@ class StatusBackend:
pass
async def shutdown(self):
await self.logout()
await self.signal.__aexit__(None, None, None)
await self.rpc.__aexit__(None, None, None)
await self.session.close()
@@ -78,7 +79,6 @@ class StatusBackend:
await self.__aenter__()
try:
await self.logout()
logger.debug("Successfully logged out")
except AssertionError:
logger.debug("Failed to log out")
@@ -156,7 +156,10 @@ class StatusBackend:
return response
async def logout(self) -> dict:
return await self.api_valid_request("Logout", {})
json_response = await self.api_valid_request("Logout", {})
_ = await self.signal.wait_for_logout()
logger.debug("Successfully logged out")
return json_response
def set_public_key(self, signal_data: dict):
self.public_key = signal_data.get("event", {}).get("settings", {}).get("public-key")