mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-01-09 15:37:54 -05:00
Contact request (#23)
* Add decline_friend_requests * Add contact_request * Add decline_contact_request to wakuext
This commit is contained in:
@@ -5,8 +5,9 @@ import random
|
|||||||
|
|
||||||
# Project Imports
|
# Project Imports
|
||||||
import src.logger
|
import src.logger
|
||||||
from src import kube_utils
|
from src import kube_utils, setup_status
|
||||||
from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests
|
from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \
|
||||||
|
decline_friend_requests
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -46,3 +47,59 @@ async def idle_light():
|
|||||||
logger.info("Shutting down node connections")
|
logger.info("Shutting down node connections")
|
||||||
await asyncio.gather(*[node.shutdown() for node in light_nodes.values()])
|
await asyncio.gather(*[node.shutdown() for node in light_nodes.values()])
|
||||||
logger.info("Finished idle_light")
|
logger.info("Finished idle_light")
|
||||||
|
|
||||||
|
|
||||||
|
async def contact_request():
|
||||||
|
# relay: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle
|
||||||
|
# light: 25 requesters, 60 requested (20 accept, 20 reject, 20 ignore), 25 idle
|
||||||
|
# -> Each requester send a contact request to 3 nodes in the requested set (random selection)
|
||||||
|
# -> accepting nodes in the set accept the request
|
||||||
|
# -> rejecting nodes rejects the request
|
||||||
|
# -> ignoring nodes ignore the request
|
||||||
|
# measure: Success rate of contact requests received
|
||||||
|
# measure: Success rate of contact request accepted
|
||||||
|
kube_utils.setup_kubernetes_client()
|
||||||
|
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
|
||||||
|
backend_light_pods = kube_utils.get_pods("status-backend-light", "status-go-test")
|
||||||
|
|
||||||
|
relay_nodes, light_nodes = await asyncio.gather(
|
||||||
|
setup_status.initialize_nodes_application(backend_relay_pods),
|
||||||
|
setup_status.initialize_nodes_application(backend_light_pods, wakuV2LightClient=True)
|
||||||
|
)
|
||||||
|
|
||||||
|
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
|
||||||
|
backend_light_pods = [pod_name.split(".")[0] for pod_name in backend_light_pods]
|
||||||
|
|
||||||
|
relay_requesters = backend_relay_pods[:25]
|
||||||
|
light_requesters = backend_light_pods[:25]
|
||||||
|
relay_requested = backend_relay_pods[25:85]
|
||||||
|
light_requested = backend_light_pods[25:85]
|
||||||
|
|
||||||
|
# Returns a list of tuples like: [(sender name, {receiver: request_id, ...})]
|
||||||
|
relay_friend_requests, light_friend_requests = await asyncio.gather(
|
||||||
|
*[send_friend_requests(relay_nodes, [requester], random.sample(relay_requested, 3)) for requester in
|
||||||
|
relay_requesters],
|
||||||
|
*[send_friend_requests(light_nodes, [requester], random.sample(light_requested, 3)) for requester in
|
||||||
|
light_requesters]
|
||||||
|
)
|
||||||
|
|
||||||
|
to_accept_requests_relay = random.sample(relay_friend_requests, 20)
|
||||||
|
remaining_relay = list(set(relay_friend_requests) - set(to_accept_requests_relay))
|
||||||
|
to_reject_requests_relay = random.sample(remaining_relay, 20)
|
||||||
|
|
||||||
|
to_accept_requests_light = random.sample(light_friend_requests, 20)
|
||||||
|
remaining_light = list(set(light_friend_requests) - set(to_accept_requests_light))
|
||||||
|
to_reject_requests_light = random.sample(remaining_light, 20)
|
||||||
|
|
||||||
|
|
||||||
|
_ = await asyncio.gather(
|
||||||
|
*[accept_friend_requests(relay_nodes, to_accept_requests_relay)],
|
||||||
|
*[accept_friend_requests(light_nodes, to_accept_requests_light)],
|
||||||
|
*[decline_friend_requests(relay_nodes, to_reject_requests_relay)],
|
||||||
|
*[decline_friend_requests(light_nodes, to_reject_requests_light)],
|
||||||
|
)
|
||||||
|
|
||||||
|
logger.info("Shutting down node connections")
|
||||||
|
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()],
|
||||||
|
*[node.shutdown() for node in light_nodes.values()])
|
||||||
|
logger.info("Finished contact_request")
|
||||||
|
|||||||
@@ -172,6 +172,36 @@ async def accept_friend_requests(nodes: dict[str, StatusBackend], requests: list
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def decline_friend_requests(nodes: dict[str, StatusBackend], requests: list[(str, dict[str, str])]):
|
||||||
|
# Flatten all tasks into a single list and execute them concurrently
|
||||||
|
async def _decline_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str, request_id: str):
|
||||||
|
max_retries = 40
|
||||||
|
retry_interval = 0.5
|
||||||
|
|
||||||
|
for attempt in range(max_retries):
|
||||||
|
try:
|
||||||
|
_ = await nodes[receiver].wakuext_service.decline_contact_request(request_id)
|
||||||
|
return _
|
||||||
|
except Exception as e:
|
||||||
|
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
|
||||||
|
time.sleep(retry_interval)
|
||||||
|
|
||||||
|
raise Exception(
|
||||||
|
f"Failed to reject friend request in {max_retries * retry_interval} seconds."
|
||||||
|
)
|
||||||
|
|
||||||
|
_ = await asyncio.gather(
|
||||||
|
*[
|
||||||
|
_decline_friend_request(nodes, sender, receiver, request_id)
|
||||||
|
for sender, receivers in requests
|
||||||
|
for receiver, request_id in receivers.items()
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
total_requests = sum(len(receivers) for _, receivers in requests)
|
||||||
|
logger.info(f"All {total_requests} friend requests rejected.")
|
||||||
|
|
||||||
|
|
||||||
async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]:
|
async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]:
|
||||||
matched_messages = []
|
matched_messages = []
|
||||||
messages = response.get("result", {}).get("messages", [])
|
messages = response.get("result", {}).get("messages", [])
|
||||||
|
|||||||
@@ -59,3 +59,8 @@ class WakuextAsyncService(AsyncService):
|
|||||||
params = [{"id": request_id}]
|
params = [{"id": request_id}]
|
||||||
json_response = await self.rpc_request("acceptContactRequest", params)
|
json_response = await self.rpc_request("acceptContactRequest", params)
|
||||||
return json_response
|
return json_response
|
||||||
|
|
||||||
|
async def decline_contact_request(self, request_id: str) -> dict:
|
||||||
|
params = [{"id": request_id}]
|
||||||
|
json_response = await self.rpc_request("declineContactRequest", params)
|
||||||
|
return json_response
|
||||||
|
|||||||
Reference in New Issue
Block a user