Multiple consumers (#35)

* Improve async_utils.py to support multiple consumers and add custom typings

* Use queues to collect sender requests, and add custom typing

* Add function_on_queue_item

* Accept friend requests with multiple consumers

* Use the previous changes in the idle_relay scenario

* Use the consumers variable

* Simplify the sentinel handling to decouple consumers from send_friend_requests and accept_friend_requests (see the sketch below the commit header)

* Add missing dataclasses.py

* Update src/setup_status.py

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>

* Update src/setup_status.py

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>

* Create cleanup_queue_on_event

* Decouple queue cleanup from accept friend request

---------

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>
Commit 3061696515 by Alberto Soutullo, committed via GitHub, 2025-10-17 19:03:35 +02:00
Parent: b2e19c4747
4 changed files with 109 additions and 61 deletions
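
The bullets above boil down to a producer/consumer setup that is shut down with one sentinel per consumer. A minimal, generic sketch of that pattern in plain asyncio (producer, consumer, cleanup and NUM_CONSUMERS are illustrative names, not code from this repo):

import asyncio

NUM_CONSUMERS = 4  # illustrative; the PR exposes this as a `consumers` argument


async def producer(queue: asyncio.Queue, finished_evt: asyncio.Event) -> None:
    for i in range(10):
        queue.put_nowait(i)      # enqueue work items
    finished_evt.set()           # signal that nothing else will be produced


async def consumer(queue: asyncio.Queue, results: asyncio.Queue) -> None:
    while True:
        item = await queue.get()
        if item is None:         # sentinel: this consumer is done
            queue.task_done()
            break
        results.put_nowait(item * 2)  # stand-in for real async work
        queue.task_done()


async def cleanup(finished_evt: asyncio.Event, queue: asyncio.Queue, consumers: int) -> None:
    await finished_evt.wait()    # producer finished
    await queue.join()           # every queued item has been processed
    for _ in range(consumers):
        queue.put_nowait(None)   # one sentinel per consumer


async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    results: asyncio.Queue = asyncio.Queue()
    finished_evt = asyncio.Event()
    await asyncio.gather(
        producer(queue, finished_evt),
        cleanup(finished_evt, queue, NUM_CONSUMERS),
        *[consumer(queue, results) for _ in range(NUM_CONSUMERS)],
    )
    print([results.get_nowait() for _ in range(results.qsize())])


asyncio.run(main())

This is the same shutdown order that cleanup_queue_on_event implements in the diff below: wait for the event, join the queue, then push the sentinels.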

src/async_utils.py

@@ -2,18 +2,27 @@
 import asyncio
 import logging
 import traceback
+from collections.abc import Callable
 from functools import partial
-from typing import List, Tuple
+from typing import Literal, Any
+
+# Project Imports
+from src.dataclasses import ResultEntry
+
+RequestResult = tuple[partial, ResultEntry]
+RequestError = tuple[Exception, str]
+TaskOk = tuple[Literal["ok"], RequestResult]
+TaskErr = tuple[Literal["err"], RequestError]
+TaskResult = TaskOk | TaskErr
+CollectedItem = tuple[str, ResultEntry]
 
 logger = logging.getLogger(__name__)
 
-# [(node_sender, {node_receiver: (timestamp, message)}), ...]
-RequestResult = Tuple[str, dict[str, Tuple[int, str]]]
 
-async def launch_workers(worker_tasks: List[partial], done_queue: asyncio.Queue[tuple[str, object]], intermediate_delay: float,
+async def launch_workers(worker_tasks: list[partial], done_queue: asyncio.Queue[TaskResult], intermediate_delay: float,
                          max_in_flight: int = 0) -> None:
     sem = asyncio.Semaphore(max_in_flight) if max_in_flight > 0 else None
@@ -42,16 +51,40 @@ async def launch_workers(worker_tasks: List[partial], done_queue: asyncio.Queue[
         await asyncio.sleep(intermediate_delay)
 
 
-async def collect_results(done_q: asyncio.Queue[tuple[str, object]], total_tasks: int) -> List[RequestResult]:
-    collected: List[RequestResult] = []
+async def collect_results_from_tasks(done_queue: asyncio.Queue[TaskResult | None],
+                                     results_queue: asyncio.Queue[CollectedItem],
+                                     total_tasks: int, finished_evt: asyncio.Event):
     for _ in range(total_tasks):
-        status, payload = await done_q.get()
+        status, payload = await done_queue.get()
         if status == "ok":
             partial_object, results = payload
             logger.info(f"Task completed: {partial_object.func.__name__} {partial_object.args[1:]}")
-            collected.append(results)
+            results_queue.put_nowait((partial_object.func.__name__, results))
         else:
             e, tb = payload  # from the launcher callback
             logger.error(f"Task failed: {e}\n{tb}", e, tb)
-    return collected
+
+    logger.debug("Event is finished")
+    finished_evt.set()
+
+
+async def function_on_queue_item(queue: asyncio.Queue[CollectedItem], async_func: Callable,
+                                 results: asyncio.Queue[Any]) -> None:
+    while True:
+        item = await queue.get()
+        if item is None:
+            queue.task_done()
+            break
+        result = await async_func(item)
+        results.put_nowait(result)
+        queue.task_done()
+
+
+async def cleanup_queue_on_event(finished_evt: asyncio.Event, queue: asyncio.Queue, consumers: int = 1):
+    await finished_evt.wait()
+    logger.debug("Event triggered. Waiting for queue to be finished.")
+    await queue.join()
+    logger.debug("Queue finished.")
+    for _ in range(consumers):
+        queue.put_nowait(None)
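
A minimal wiring sketch for these helpers, assuming the module layout shown in this diff (src.async_utils, src.dataclasses); fake_request, handle_item and run_demo are illustrative names, not part of the repo:

import asyncio
from functools import partial

from src.async_utils import (launch_workers, collect_results_from_tasks,
                             function_on_queue_item, cleanup_queue_on_event,
                             TaskResult, CollectedItem)
from src.dataclasses import ResultEntry


async def fake_request(sender: str, receiver: str) -> ResultEntry:
    # Stand-in worker: a real worker would call the node's RPC here.
    await asyncio.sleep(0.01)
    return ResultEntry(sender=sender, receiver=receiver, timestamp=0, result="request-id")


async def handle_item(item: CollectedItem) -> str:
    # Consumer callback for function_on_queue_item: receives (worker name, ResultEntry).
    _, entry = item
    return f"{entry.sender}->{entry.receiver}"


async def run_demo(consumers: int = 2) -> list[str]:
    done_queue: asyncio.Queue[TaskResult | None] = asyncio.Queue()
    results_queue: asyncio.Queue[CollectedItem | None] = asyncio.Queue()
    out_queue: asyncio.Queue[str] = asyncio.Queue()
    finished_evt = asyncio.Event()

    jobs = [partial(fake_request, "alice", f"friend-{i}") for i in range(5)]

    await asyncio.gather(
        launch_workers(jobs, done_queue, intermediate_delay=0),
        collect_results_from_tasks(done_queue, results_queue, len(jobs), finished_evt),
        *[function_on_queue_item(results_queue, handle_item, out_queue) for _ in range(consumers)],
        cleanup_queue_on_event(finished_evt, results_queue, consumers),
    )
    return [out_queue.get_nowait() for _ in range(out_queue.qsize())]

This mirrors what send_friend_requests and accept_friend_requests do further down: the collector feeds results_queue, the consumers drain it, and cleanup_queue_on_event injects one sentinel per consumer once the event fires and the queue has been joined.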

idle_relay scenario

@@ -6,13 +6,14 @@ import random
 # Project Imports
 import src.logger
 from src import kube_utils, setup_status
+from src.async_utils import CollectedItem, cleanup_queue_on_event
 from src.inject_messages import inject_messages_one_to_one, inject_messages_group_chat
 from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \
     decline_friend_requests, create_group_chat, add_contacts
 
 logger = logging.getLogger(__name__)
 
-async def idle_relay():
+async def idle_relay(consumers: int = 4):
     # 1 relay node alice
     # 100 relay nodes - friends
     # friends have accepted contact request with alice, who accepted it
@@ -24,15 +25,20 @@ async def idle_relay():
     alice = "status-backend-relay-0"
     friends = [key for key in relay_nodes.keys() if key != alice]
 
-    requests_made = await send_friend_requests(relay_nodes, [alice], friends, 1)
     logger.info("Accepting friend requests")
-    delays = await accept_friend_requests(relay_nodes, requests_made)
-    # TODO: These delays include the accumulation of intermediate_delays, they are not accurate.
-    # Intermediate delay is needed to not saturate status node, otherwise request don't arrive.
-    # TODO: We should merge send and receive operations in asynq queues as well
+    results_queue: asyncio.Queue[CollectedItem | None] = asyncio.Queue()
+    finished_evt = asyncio.Event()
+    send_task = asyncio.create_task(send_friend_requests(relay_nodes, results_queue, [alice], friends, finished_evt))
+    accept_task = asyncio.create_task(accept_friend_requests(relay_nodes, results_queue, consumers))
+    cleanup_task = asyncio.create_task(cleanup_queue_on_event(finished_evt, results_queue, 4))
+    _, delays_queue, _ = await asyncio.gather(send_task, accept_task, cleanup_task)
+
+    delays: list[float] = []
+    while not delays_queue.empty():
+        delays.append(delays_queue.get_nowait())
     logger.info(f"Delays are: {delays}")
 
     logger.info("Waiting 30 seconds")
     await asyncio.sleep(30)
     logger.info("Shutting down node connections")
     await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])

src/dataclasses.py (new file, +10)

@@ -0,0 +1,10 @@
+# Python Imports
+from dataclasses import dataclass
+
+
+@dataclass
+class ResultEntry:
+    sender: str
+    receiver: str
+    timestamp: int
+    result: str
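
For reference, the shapes that flow through the queues, based on ResultEntry above and the type aliases in the async_utils diff (all values here are made up):

from functools import partial

from src.dataclasses import ResultEntry

# What _send_friend_request returns, and what every queue item carries:
entry = ResultEntry(sender="status-backend-relay-0",
                    receiver="status-backend-relay-1",
                    timestamp=1729180800000,  # unix milliseconds, later converted to seconds
                    result="0x1234")          # the contact request id

# TaskResult, as put on done_queue by the launcher ("ok" case): (status, (worker partial, entry))
task_result = ("ok", (partial(print, entry.sender, entry.receiver), entry))

# CollectedItem, as put on results_queue by collect_results_from_tasks: (worker name, entry)
collected_item = ("_send_friend_request", entry)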

src/setup_status.py

@@ -5,19 +5,23 @@ import random
 import string
 import time
 from functools import partial
-from typing import Tuple, List
+from typing import List
 
 # Project Imports
-from src.async_utils import launch_workers, collect_results, RequestResult
+from src.async_utils import launch_workers, collect_results_from_tasks, TaskResult, CollectedItem, \
+    function_on_queue_item
+from src.dataclasses import ResultEntry
 from src.enums import MessageContentType, SignalType
 from src.status_backend import StatusBackend
 
 logger = logging.getLogger(__name__)
 
+NodesInformation = dict[str, StatusBackend]
+
 
-async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> dict[str, StatusBackend]:
+async def initialize_nodes_application(pod_names: list[str], wakuV2LightClient=False) -> NodesInformation:
     # We don't need a lock here because we cannot have two pods with the same name, and no other operations are done.
-    nodes_status: dict[str, StatusBackend] = {}
+    nodes_status: NodesInformation = {}
 
     async def _init_status(pod_name: str):
         try:
@@ -128,19 +132,23 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]):
     logger.info(f"All {len(join_ids)} nodes have been rejected successfully")
 
 
-async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str],
-                               intermediate_delay: float, max_in_flight: int = 0) -> list[RequestResult]:
+async def send_friend_requests(nodes: NodesInformation,
+                               results_queue: asyncio.Queue[CollectedItem | None],
+                               senders: list[str], receivers: list[str],
+                               finished_evt: asyncio.Event,
+                               intermediate_delay: float = 1, max_in_flight: int = 0):
 
-    async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str):
+    async def _send_friend_request(nodes: NodesInformation, sender: str, receiver: str):
         response = await nodes[sender].wakuext_service.send_contact_request(nodes[receiver].public_key, "Friend Request")
         # Get responses and filter by contact requests to obtain request ids
         request_response = await get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value)
-        # Create a dict {receiver: (response_timestamp, request)}, using the first response (there is always only one friend request)
-        request_id = {receiver: (int(request_response[0].get("timestamp")), request_response[0].get("id"))}
+        # Create a ResultEntry using the first response (there is always only one friend request)
+        request_result = ResultEntry(sender=sender, receiver=receiver, timestamp=int(request_response[0].get("timestamp")),
+                                     result=request_response[0].get("id"))
 
-        return sender, request_id
+        return request_result
 
-    done_queue: asyncio.Queue[tuple[str, object]] = asyncio.Queue()
+    done_queue: asyncio.Queue[TaskResult | None] = asyncio.Queue()
 
     workers_to_launch = [
         partial(_send_friend_request, nodes, sender, receiver)
@@ -148,37 +156,30 @@ async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[st
         for receiver in receivers
     ]
 
-    launcher_task = asyncio.create_task(
-        launch_workers(workers_to_launch, done_queue, intermediate_delay, max_in_flight)
-    )
-    collector_task = asyncio.create_task(
-        collect_results(done_queue, total_tasks=len(workers_to_launch))
-    )
+    collector_task = asyncio.create_task(collect_results_from_tasks(done_queue, results_queue, len(workers_to_launch), finished_evt))
+    launcher_task = asyncio.create_task(launch_workers(workers_to_launch, done_queue, intermediate_delay, max_in_flight))
 
-    _, collected = await asyncio.gather(launcher_task, collector_task)
-    logger.info(f"All {len(collected)} friend requests processed (out of {len(jobs)}).")
-    return collected
+    await asyncio.gather(launcher_task, collector_task)
 
 
-async def accept_friend_requests(nodes: dict[str, StatusBackend],
-                                 requests: List[Tuple[str, dict[str, Tuple[int, str]]]]) -> List[float]:
-    # Flatten all tasks into a single list and execute them "concurrently"
-    async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str,
-                                     timestamp_request_id: Tuple[int, str]):
+async def accept_friend_requests(nodes: dict[str, StatusBackend], results_queue: asyncio.Queue[CollectedItem | None],
+                                 consumers: int) -> asyncio.Queue[float]:
+    # TODO: This should be activated when the signal is received instead of getting looped
+    async def _accept_friend_request(queue_result: CollectedItem):
        max_retries = 40
        retry_interval = 2

        for attempt in range(max_retries):
+            function_name, result_entry = queue_result
            try:
-                _ = await nodes[receiver].wakuext_service.accept_contact_request(timestamp_request_id[1])
-                accepted_signal = f"@{nodes[receiver].public_key} accepted your contact request"
-                message = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value,
-                                                                                   event_string=accepted_signal,
-                                                                                   timeout=10)
-                return message[0] - int(timestamp_request_id[0]) // 1000  # Convert unix milliseconds to seconds
+                _ = await nodes[result_entry.receiver].wakuext_service.accept_contact_request(result_entry.result)
+                accepted_signal = f"@{nodes[result_entry.receiver].public_key} accepted your contact request"
+                message = await nodes[result_entry.sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value,
                                                                                                 event_string=accepted_signal,
                                                                                                 timeout=10)
+                return message[0] - int(result_entry.timestamp) // 1000  # Convert unix milliseconds to seconds
            except Exception as e:
-                logging.error(f"Attempt {attempt + 1}/{max_retries} from {sender} to {receiver}: "
+                logging.error(f"Attempt {attempt + 1}/{max_retries} from {result_entry.sender} to {result_entry.receiver}: "
                               f"Unexpected error accepting friend request: {e}")
                time.sleep(2)
@@ -186,17 +187,15 @@ async def accept_friend_requests(nodes: dict[str, StatusBackend],
            f"Failed to accept friend request in {max_retries * retry_interval} seconds."
        )

-    delays = await asyncio.gather(
-        *[
-            _accept_friend_request(nodes, sender, receiver, timestamp_requestid)
-            for sender, receivers in requests
-            for receiver, timestamp_requestid in receivers.items()
-        ]
-    )
+    delays_queue: asyncio.Queue[float] = asyncio.Queue()
 
-    total_requests = sum(len(receivers) for delays, receivers in requests)
-    logger.info(f"All {total_requests} friend requests accepted.")
-    return delays
+    workers = [asyncio.create_task(
+        function_on_queue_item(results_queue, _accept_friend_request, delays_queue))
+        for _ in range(consumers)]
+    await asyncio.gather(*workers)
+
+    return delays_queue
 
 
 async def add_contacts(nodes: dict[str, StatusBackend], adders: list[str], contacts: list[str]):