mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-01-07 22:54:01 -05:00
Idle relay (#21)
* Add send_friend_requests and accept_friend_requests setup functions * Add idle_relay scenario * Add missing import
This commit is contained in:
29
src/benchmark_scenarios/private_chats.py
Normal file
29
src/benchmark_scenarios/private_chats.py
Normal file
@@ -0,0 +1,29 @@
|
||||
# Python Imports
|
||||
import asyncio
|
||||
import logging
|
||||
import random
|
||||
|
||||
# Project Imports
|
||||
import src.logger
|
||||
from src import kube_utils
|
||||
from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
async def idle_relay():
|
||||
# 1 relay node alice
|
||||
# 100 relay nodes - friends
|
||||
# friends have accepted contact request with alice, who accepted it
|
||||
|
||||
kube_utils.setup_kubernetes_client()
|
||||
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||
relay_nodes = await initialize_nodes_application(backend_relay_pods)
|
||||
|
||||
alice = "status-backend-relay-0"
|
||||
friends = [key for key in relay_nodes.keys() if key != alice]
|
||||
requests_made = await send_friend_requests(relay_nodes, [alice], friends)
|
||||
_ = await accept_friend_requests(relay_nodes, requests_made)
|
||||
|
||||
logger.info("Shutting down node connections")
|
||||
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
|
||||
logger.info("Finished idle_relay")
|
||||
@@ -4,6 +4,7 @@ import logging
|
||||
import time
|
||||
|
||||
# Project Imports
|
||||
from src.enums import MessageContentType, SignalType
|
||||
from src.status_backend import StatusBackend
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -121,6 +122,55 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]):
|
||||
|
||||
logger.info(f"All {len(join_ids)} nodes have been rejected successfully")
|
||||
|
||||
async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str]):
|
||||
async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receivers: list[str]):
|
||||
# Send contact requests from sender -> receivers
|
||||
responses = await asyncio.gather(*[nodes[sender].wakuext_service.send_contact_request(nodes[node].public_key, "asd") for node in receivers])
|
||||
# Get responses and filter by contact requests
|
||||
request_responses = await asyncio.gather(*[get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value) for response in responses])
|
||||
# Create a dict {receiver: request}, using the first response (there is always only one friend request)
|
||||
request_ids = {receiver: request_responses[i][0].get("id") for i, receiver in enumerate(receivers)}
|
||||
|
||||
return sender, request_ids
|
||||
|
||||
requests_made = await asyncio.gather(*[_send_friend_request(nodes, sender, receivers) for sender in senders])
|
||||
# Returns a list of tuples like: [(sender name, {receiver: request_id, ...})]
|
||||
logger.info(f"All {len(receivers)} friend requests sent.")
|
||||
return requests_made
|
||||
|
||||
|
||||
async def accept_friend_requests(nodes: dict[str, StatusBackend], requests: list[(str, dict[str, str])]):
|
||||
# Flatten all tasks into a single list and execute them concurrently
|
||||
async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str, request_id: str):
|
||||
max_retries = 40
|
||||
retry_interval = 0.5
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
_ = await nodes[receiver].wakuext_service.accept_contact_request(request_id)
|
||||
accepted_signal = f"@{nodes[receiver].public_key} accepted your contact request"
|
||||
signal = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value, event_string=accepted_signal)
|
||||
return signal
|
||||
except Exception as e:
|
||||
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
|
||||
time.sleep(retry_interval)
|
||||
|
||||
raise Exception(
|
||||
f"Failed to accept friend request in {max_retries * retry_interval} seconds."
|
||||
)
|
||||
|
||||
_ = await asyncio.gather(
|
||||
*[
|
||||
_accept_friend_request(nodes, sender, receiver, request_id)
|
||||
for sender, receivers in requests
|
||||
for receiver, request_id in receivers.items()
|
||||
]
|
||||
)
|
||||
|
||||
total_requests = sum(len(receivers) for _, receivers in requests)
|
||||
logger.info(f"All {total_requests} friend requests accepted.")
|
||||
|
||||
|
||||
|
||||
async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]:
|
||||
matched_messages = []
|
||||
|
||||
Reference in New Issue
Block a user