Request to join comm mix (#17)

* Add reject_community_requests

* Add decline_request_to_join_community

* Minor comments

* Add missing code

* Fix rebased code
This commit is contained in:
Alberto Soutullo
2025-09-26 14:47:41 +02:00
committed by GitHub
parent d6c71775a6
commit a38160ac1a
3 changed files with 71 additions and 3 deletions

View File

@@ -9,7 +9,8 @@ import src.logger
from src import kube_utils
from src import setup_status
from src.inject_messages import inject_messages
from src.setup_status import request_join_nodes_to_community, login_nodes, accept_community_requests
from src.setup_status import request_join_nodes_to_community, login_nodes, accept_community_requests, \
reject_community_requests
logger = logging.getLogger(__name__)
@@ -151,3 +152,36 @@ async def message_sending():
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
logger.info("Finished store_performance")
async def request_to_join_community_mix():
# 1 community owner
# 500 user nodes
# 200 joined
# 300 didn't
# -> 200/300 send request to owner
# -> request each second or all at same time
# -> accepts 100 and reject 100 from those 200
kube_utils.setup_kubernetes_client()
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
relay_nodes = await setup_status.initialize_nodes_application(backend_relay_pods)
name = f"test_community_{''.join(random.choices(string.ascii_letters, k=10))}"
logger.info(f"Creating community {name}")
response = relay_nodes["status-backend-relay-0"].wakuext_service.create_community(name)
community_id = response.get("result", {}).get("communities", [{}])[0].get("id")
logger.info(f"Community {name} created with ID {community_id}")
owner = relay_nodes["status-backend-relay-0"]
nodes = [key for key in relay_nodes.keys() if key != "status-backend-relay-0"]
nodes_200 = nodes[:200]
nodes_300 = nodes[200:]
join_ids = await request_join_nodes_to_community(relay_nodes, nodes_200, community_id)
_ = await accept_community_requests(owner, join_ids)
join_ids = await request_join_nodes_to_community(relay_nodes, nodes_300[:200], community_id)
_ = accept_community_requests(owner, join_ids[:100])
await reject_community_requests(owner, join_ids[:100]) # TODO fails because can't find community?
logger.info("Shutting down node connections")
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
logger.info("Finished store_performance")

View File

@@ -2,7 +2,6 @@
import asyncio
import logging
import time
from concurrent.futures import as_completed, ThreadPoolExecutor
# Project Imports
from src.status_backend import StatusBackend
@@ -51,7 +50,7 @@ async def request_join_nodes_to_community(backend_nodes: dict[str, StatusBackend
join_ids = await asyncio.gather(*[_request_to_join_to_community(backend_nodes[node], community_id) for node in nodes_to_join])
logger.info(f"All {len(nodes_to_join)} nodes have been joined successfully to {community_id}")
logger.info(f"All {len(nodes_to_join)} nodes have requested joined a community successfully to {community_id}")
return join_ids
@@ -69,6 +68,7 @@ async def login_nodes(backend_nodes: dict[str, StatusBackend], include: list[str
await asyncio.gather(*[_login_node(backend_nodes[node]) for node in include])
# TODO add an accept rate
async def accept_community_requests(node_owner: StatusBackend, join_ids: list[str]):
async def _accept_community_request(node: StatusBackend, join_id: str) -> str:
max_retries = 40
@@ -99,3 +99,32 @@ async def accept_community_requests(node_owner: StatusBackend, join_ids: list[st
# Same chat ID for everyone
return chat_ids[0]
async def reject_community_requests(owner: StatusBackend, join_ids: list[str]):
async def _reject_community_request(node: StatusBackend, join_id: str):
max_retries = 40
retry_interval = 0.5
for attempt in range(max_retries):
try:
response = await node.wakuext_service.request_to_join_community(join_id)
# We need to find the correspondant community of the join_id. We retrieve first chat because should be
# the only one. We do this because there can be several communities if we reuse the node.
# TODO why it returns the information of all communities?
if response.get("result"):
for request in response.get("result").get("requestsToJoinCommunity"):
if request.get("id") == join_id:
for community in response.get("result").get("communities"):
if community.get("id") == request.get("communityId"):
return list(community.get("chats").keys())[0]
except Exception as e:
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
time.sleep(retry_interval)
raise Exception(
f"Failed to accept request to join community in {max_retries * retry_interval} seconds."
)
_ = await asyncio.gather(*[_reject_community_request(owner, join_id) for join_id in join_ids])
logger.info(f"All {len(join_ids)} nodes have been rejected successfully")

View File

@@ -39,6 +39,11 @@ class WakuextAsyncService(AsyncService):
json_response = await self.rpc_request("acceptRequestToJoinCommunity", params)
return json_response
async def decline_request_to_join_community(self, request_to_join_id: str) -> dict:
params = [{"id": request_to_join_id}]
json_response = await self.rpc_request("declineRequestToJoinCommunity", params)
return json_response
async def send_chat_message(self, chat_id: str, message: str, content_type: int = 1) -> dict:
# TODO content type can always be 1? (plain TEXT), does it need to be community type for communities?
params = [{"chatId": chat_id, "text": message, "contentType": content_type}]