diff --git a/src/async_utils.py b/src/async_utils.py index 632f868..53ed7d6 100644 --- a/src/async_utils.py +++ b/src/async_utils.py @@ -2,18 +2,27 @@ import asyncio import logging import traceback +from collections.abc import Callable from functools import partial -from typing import List, Tuple +from typing import Literal, Any # Project Imports +from src.dataclasses import ResultEntry +RequestResult = tuple[partial, ResultEntry] +RequestError = tuple[Exception, str] + +TaskOk = tuple[Literal["ok"], RequestResult] +TaskErr = tuple[Literal["err"], RequestError] +TaskResult = TaskOk | TaskErr + +CollectedItem = tuple[str, ResultEntry] + logger = logging.getLogger(__name__) -# [(node_sender, {node_receiver: (timestamp, message)}), ...] -RequestResult = Tuple[str, dict[str, Tuple[int, str]]] -async def launch_workers(worker_tasks: List[partial], done_queue: asyncio.Queue[tuple[str, object]], intermediate_delay: float, +async def launch_workers(worker_tasks: list[partial], done_queue: asyncio.Queue[TaskResult], intermediate_delay: float, max_in_flight: int = 0) -> None: sem = asyncio.Semaphore(max_in_flight) if max_in_flight > 0 else None @@ -42,16 +51,40 @@ async def launch_workers(worker_tasks: List[partial], done_queue: asyncio.Queue[ await asyncio.sleep(intermediate_delay) -async def collect_results(done_q: asyncio.Queue[tuple[str, object]], total_tasks: int) -> List[RequestResult]: - collected: List[RequestResult] = [] +async def collect_results_from_tasks(done_queue: asyncio.Queue[TaskResult | None], + results_queue: asyncio.Queue[CollectedItem], + total_tasks: int, finished_evt: asyncio.Event): for _ in range(total_tasks): - status, payload = await done_q.get() + status, payload = await done_queue.get() if status == "ok": partial_object, results = payload logger.info(f"Task completed: {partial_object.func.__name__} {partial_object.args[1:]}") - collected.append(results) + results_queue.put_nowait((partial_object.func.__name__, results)) else: e, tb = payload # from the launcher callback logger.error(f"Task failed: {e}\n{tb}", e, tb) - return collected + logger.debug("Event is finished") + finished_evt.set() + + +async def function_on_queue_item(queue: asyncio.Queue[CollectedItem], async_func: Callable, + results: asyncio.Queue[Any]) -> None: + while True: + item = await queue.get() + if item is None: + queue.task_done() + break + result = await async_func(item) + results.put_nowait(result) + queue.task_done() + + +async def cleanup_queue_on_event(finished_evt: asyncio.Event, queue: asyncio.Queue, consumers: int = 1): + await finished_evt.wait() + logger.debug("Event triggered. Waiting for queue to be finished.") + await queue.join() + logger.debug("Queue finished.") + + for _ in range(consumers): + queue.put_nowait(None) diff --git a/src/benchmark_scenarios/private_chats.py b/src/benchmark_scenarios/private_chats.py index 840bc19..9331508 100644 --- a/src/benchmark_scenarios/private_chats.py +++ b/src/benchmark_scenarios/private_chats.py @@ -6,13 +6,14 @@ import random # Project Imports import src.logger from src import kube_utils, setup_status +from src.async_utils import CollectedItem, cleanup_queue_on_event from src.inject_messages import inject_messages_one_to_one, inject_messages_group_chat from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \ decline_friend_requests, create_group_chat, add_contacts logger = logging.getLogger(__name__) -async def idle_relay(): +async def idle_relay(consumers: int = 4): # 1 relay node alice # 100 relay nodes - friends # friends have accepted contact request with alice, who accepted it @@ -24,15 +25,20 @@ async def idle_relay(): alice = "status-backend-relay-0" friends = [key for key in relay_nodes.keys() if key != alice] - requests_made = await send_friend_requests(relay_nodes, [alice], friends, 1) - logger.info("Accepting friend requests") - delays = await accept_friend_requests(relay_nodes, requests_made) - # TODO: These delays include the accumulation of intermediate_delays, they are not accurate. - # Intermediate delay is needed to not saturate status node, otherwise request don't arrive. - # TODO: We should merge send and receive operations in asynq queues as well + results_queue: asyncio.Queue[CollectedItem | None] = asyncio.Queue() + finished_evt = asyncio.Event() + + send_task = asyncio.create_task(send_friend_requests(relay_nodes, results_queue, [alice], friends, finished_evt)) + accept_task = asyncio.create_task(accept_friend_requests(relay_nodes, results_queue, consumers)) + cleanup_task = asyncio.create_task(cleanup_queue_on_event(finished_evt, results_queue, 4)) + _, delays_queue, _ = await asyncio.gather(send_task, accept_task, cleanup_task) + + + delays: list[float] = [] + while not delays_queue.empty(): + delays.append(delays_queue.get_nowait()) + logger.info(f"Delays are: {delays}") - logger.info("Waiting 30 seconds") - await asyncio.sleep(30) logger.info("Shutting down node connections") await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()]) diff --git a/src/dataclasses.py b/src/dataclasses.py new file mode 100644 index 0000000..073c336 --- /dev/null +++ b/src/dataclasses.py @@ -0,0 +1,10 @@ +# Python Imports +from dataclasses import dataclass + + +@dataclass +class ResultEntry: + sender: str + receiver: str + timestamp: int + result: str diff --git a/src/setup_status.py b/src/setup_status.py index d5e1ed9..21008aa 100644 --- a/src/setup_status.py +++ b/src/setup_status.py @@ -5,19 +5,23 @@ import random import string import time from functools import partial -from typing import Tuple, List +from typing import List # Project Imports -from src.async_utils import launch_workers, collect_results, RequestResult +from src.async_utils import launch_workers, collect_results_from_tasks, TaskResult, CollectedItem, \ + function_on_queue_item +from src.dataclasses import ResultEntry from src.enums import MessageContentType, SignalType from src.status_backend import StatusBackend logger = logging.getLogger(__name__) +NodesInformation = dict[str, StatusBackend] -async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> dict[str, StatusBackend]: + +async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> NodesInformation: # We don't need a lock here because we cannot have two pods with the same name, and no other operations are done. - nodes_status: dict[str, StatusBackend] = {} + nodes_status: NodesInformation = {} async def _init_status(pod_name: str): try: @@ -128,19 +132,23 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]): logger.info(f"All {len(join_ids)} nodes have been rejected successfully") -async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str], - intermediate_delay: float, max_in_flight: int = 0) -> list[RequestResult]: +async def send_friend_requests(nodes: NodesInformation, + results_queue: asyncio.Queue[CollectedItem | None], + senders: list[str], receivers: list[str], + finished_evt: asyncio.Event, + intermediate_delay: float = 1, max_in_flight: int = 0): - async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str): + async def _send_friend_request(nodes: NodesInformation, sender: str, receiver: str): response = await nodes[sender].wakuext_service.send_contact_request(nodes[receiver].public_key, "Friend Request") # Get responses and filter by contact requests to obtain request ids request_response = await get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value) - # Create a dict {receiver: (response_timestamp, request)}, using the first response (there is always only one friend request) - request_id = {receiver: (int(request_response[0].get("timestamp")), request_response[0].get("id"))} + # Create a ResultEntry using the first response (there is always only one friend request) + request_result = ResultEntry(sender=sender, receiver=receiver, timestamp=int(request_response[0].get("timestamp")), + result=request_response[0].get("id")) - return sender, request_id + return request_result - done_queue: asyncio.Queue[tuple[str, object]] = asyncio.Queue() + done_queue: asyncio.Queue[TaskResult | None] = asyncio.Queue() workers_to_launch = [ partial(_send_friend_request, nodes, sender, receiver) @@ -148,37 +156,30 @@ async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[st for receiver in receivers ] - launcher_task = asyncio.create_task( - launch_workers(workers_to_launch, done_queue, intermediate_delay, max_in_flight) - ) - collector_task = asyncio.create_task( - collect_results(done_queue, total_tasks=len(workers_to_launch)) - ) + collector_task = asyncio.create_task(collect_results_from_tasks(done_queue, results_queue, len(workers_to_launch), finished_evt)) + launcher_task = asyncio.create_task(launch_workers(workers_to_launch, done_queue, intermediate_delay, max_in_flight)) - _, collected = await asyncio.gather(launcher_task, collector_task) - - logger.info(f"All {len(collected)} friend requests processed (out of {len(jobs)}).") - return collected + await asyncio.gather(launcher_task, collector_task) -async def accept_friend_requests(nodes: dict[str, StatusBackend], - requests: List[Tuple[str, dict[str, Tuple[int, str]]]]) -> List[float]: - # Flatten all tasks into a single list and execute them "concurrently" - async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str, - timestamp_request_id: Tuple[int, str]): +async def accept_friend_requests(nodes: dict[str, StatusBackend], results_queue: asyncio.Queue[CollectedItem | None], + consumers: int) -> asyncio.Queue[float]: + # TODO: This should be activated when the signal is received instead of getting looped + async def _accept_friend_request(queue_result: CollectedItem): max_retries = 40 retry_interval = 2 for attempt in range(max_retries): + function_name, result_entry = queue_result try: - _ = await nodes[receiver].wakuext_service.accept_contact_request(timestamp_request_id[1]) - accepted_signal = f"@{nodes[receiver].public_key} accepted your contact request" - message = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value, - event_string=accepted_signal, - timeout=10) - return message[0] - int(timestamp_request_id[0]) // 1000 # Convert unix milliseconds to seconds + _ = await nodes[result_entry.receiver].wakuext_service.accept_contact_request(result_entry.result) + accepted_signal = f"@{nodes[result_entry.receiver].public_key} accepted your contact request" + message = await nodes[result_entry.sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value, + event_string=accepted_signal, + timeout=10) + return message[0] - int(result_entry.timestamp) // 1000 # Convert unix milliseconds to seconds except Exception as e: - logging.error(f"Attempt {attempt + 1}/{max_retries} from {sender} to {receiver}: " + logging.error(f"Attempt {attempt + 1}/{max_retries} from {result_entry.sender} to {result_entry.receiver}: " f"Unexpected error accepting friend request: {e}") time.sleep(2) @@ -186,17 +187,15 @@ async def accept_friend_requests(nodes: dict[str, StatusBackend], f"Failed to accept friend request in {max_retries * retry_interval} seconds." ) - delays = await asyncio.gather( - *[ - _accept_friend_request(nodes, sender, receiver, timestamp_requestid) - for sender, receivers in requests - for receiver, timestamp_requestid in receivers.items() - ] - ) + delays_queue: asyncio.Queue[float] = asyncio.Queue() - total_requests = sum(len(receivers) for delays, receivers in requests) - logger.info(f"All {total_requests} friend requests accepted.") - return delays + workers = [asyncio.create_task( + function_on_queue_item(results_queue, _accept_friend_request, delays_queue)) + for _ in range(consumers)] + + await asyncio.gather(*workers) + + return delays_queue async def add_contacts(nodes: dict[str, StatusBackend], adders: list[str], contacts: list[str]):