Asynq utils (#33)

* Create async queue utils

* Change from signal queue to message list.

* Change send_friend_requests to use asynq utils

* Adjust accept_friend_requests with latest changes

* Modifications to idle_relay

* Update src/signal_client.py

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>

* Update src/async_utils.py

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>

* Add traceback to error

* Directly await enqueue jobs instead of do asyncio.gather. Also move collector_task to gather.

* Refactor double queue approach

* Remove double quotes

* Return complete partial object

---------

Co-authored-by: PearsonWhite <PearsonWhite602@gmail.com>
This commit is contained in:
Alberto Soutullo
2025-10-07 15:14:59 +02:00
committed by GitHub
parent a92c7cde89
commit b2e19c4747
4 changed files with 127 additions and 35 deletions

57
src/async_utils.py Normal file
View File

@@ -0,0 +1,57 @@
# Python Imports
import asyncio
import logging
import traceback
from functools import partial
from typing import List, Tuple
# Project Imports
logger = logging.getLogger(__name__)
# [(node_sender, {node_receiver: (timestamp, message)}), ...]
RequestResult = Tuple[str, dict[str, Tuple[int, str]]]
async def launch_workers(worker_tasks: List[partial], done_queue: asyncio.Queue[tuple[str, object]], intermediate_delay: float,
max_in_flight: int = 0) -> None:
sem = asyncio.Semaphore(max_in_flight) if max_in_flight > 0 else None
for worker in worker_tasks:
if sem is not None:
await sem.acquire()
# worker.args has (nodes, sender, receiver)
logger.info(f"Launching task {worker.func.__name__}: {worker.args[1:]}")
fut = asyncio.create_task(worker())
def _on_done(t: asyncio.Task, j=worker) -> None:
if sem is not None:
sem.release()
try:
result = t.result()
done_queue.put_nowait(("ok", (j, result)))
except Exception as e:
tb = "".join(traceback.format_exception(type(e), e, e.__traceback__))
done_queue.put_nowait(("err", (e, tb)))
fut.add_done_callback(_on_done)
if intermediate_delay:
await asyncio.sleep(intermediate_delay)
async def collect_results(done_q: asyncio.Queue[tuple[str, object]], total_tasks: int) -> List[RequestResult]:
collected: List[RequestResult] = []
for _ in range(total_tasks):
status, payload = await done_q.get()
if status == "ok":
partial_object, results = payload
logger.info(f"Task completed: {partial_object.func.__name__} {partial_object.args[1:]}")
collected.append(results)
else:
e, tb = payload # from the launcher callback
logger.error(f"Task failed: {e}\n{tb}", e, tb)
return collected

View File

@@ -23,8 +23,16 @@ async def idle_relay():
alice = "status-backend-relay-0"
friends = [key for key in relay_nodes.keys() if key != alice]
requests_made = await send_friend_requests(relay_nodes, [alice], friends)
_ = await accept_friend_requests(relay_nodes, requests_made)
requests_made = await send_friend_requests(relay_nodes, [alice], friends, 1)
logger.info("Accepting friend requests")
delays = await accept_friend_requests(relay_nodes, requests_made)
# TODO: These delays include the accumulation of intermediate_delays, they are not accurate.
# Intermediate delay is needed to not saturate status node, otherwise request don't arrive.
# TODO: We should merge send and receive operations in asynq queues as well
logger.info(f"Delays are: {delays}")
logger.info("Waiting 30 seconds")
await asyncio.sleep(30)
logger.info("Shutting down node connections")
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])

View File

@@ -4,8 +4,11 @@ import logging
import random
import string
import time
from functools import partial
from typing import Tuple, List
# Project Imports
from src.async_utils import launch_workers, collect_results, RequestResult
from src.enums import MessageContentType, SignalType
from src.status_backend import StatusBackend
@@ -124,53 +127,76 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]):
logger.info(f"All {len(join_ids)} nodes have been rejected successfully")
async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str]):
async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receivers: list[str]):
# Send contact requests from sender -> receivers
responses = await asyncio.gather(*[nodes[sender].wakuext_service.send_contact_request(nodes[node].public_key, "asd") for node in receivers])
# Get responses and filter by contact requests
request_responses = await asyncio.gather(*[get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value) for response in responses])
# Create a dict {receiver: request}, using the first response (there is always only one friend request)
request_ids = {receiver: request_responses[i][0].get("id") for i, receiver in enumerate(receivers)}
return sender, request_ids
async def send_friend_requests(nodes: dict[str, StatusBackend], senders: list[str], receivers: list[str],
intermediate_delay: float, max_in_flight: int = 0) -> list[RequestResult]:
requests_made = await asyncio.gather(*[_send_friend_request(nodes, sender, receivers) for sender in senders])
# Returns a list of tuples like: [(sender name, {receiver: request_id, ...})]
logger.info(f"All {len(receivers)} friend requests sent.")
return requests_made
async def _send_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str):
response = await nodes[sender].wakuext_service.send_contact_request(nodes[receiver].public_key, "Friend Request")
# Get responses and filter by contact requests to obtain request ids
request_response = await get_messages_by_content_type(response, MessageContentType.CONTACT_REQUEST.value)
# Create a dict {receiver: (response_timestamp, request)}, using the first response (there is always only one friend request)
request_id = {receiver: (int(request_response[0].get("timestamp")), request_response[0].get("id"))}
return sender, request_id
done_queue: asyncio.Queue[tuple[str, object]] = asyncio.Queue()
workers_to_launch = [
partial(_send_friend_request, nodes, sender, receiver)
for sender in senders
for receiver in receivers
]
launcher_task = asyncio.create_task(
launch_workers(workers_to_launch, done_queue, intermediate_delay, max_in_flight)
)
collector_task = asyncio.create_task(
collect_results(done_queue, total_tasks=len(workers_to_launch))
)
_, collected = await asyncio.gather(launcher_task, collector_task)
logger.info(f"All {len(collected)} friend requests processed (out of {len(jobs)}).")
return collected
async def accept_friend_requests(nodes: dict[str, StatusBackend], requests: list[(str, dict[str, str])]):
# Flatten all tasks into a single list and execute them concurrently
async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str, request_id: str):
async def accept_friend_requests(nodes: dict[str, StatusBackend],
requests: List[Tuple[str, dict[str, Tuple[int, str]]]]) -> List[float]:
# Flatten all tasks into a single list and execute them "concurrently"
async def _accept_friend_request(nodes: dict[str, StatusBackend], sender: str, receiver: str,
timestamp_request_id: Tuple[int, str]):
max_retries = 40
retry_interval = 0.5
retry_interval = 2
for attempt in range(max_retries):
try:
_ = await nodes[receiver].wakuext_service.accept_contact_request(request_id)
_ = await nodes[receiver].wakuext_service.accept_contact_request(timestamp_request_id[1])
accepted_signal = f"@{nodes[receiver].public_key} accepted your contact request"
signal = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value, event_string=accepted_signal)
return signal
message = await nodes[sender].signal.find_signal_containing_string(SignalType.MESSAGES_NEW.value,
event_string=accepted_signal,
timeout=10)
return message[0] - int(timestamp_request_id[0]) // 1000 # Convert unix milliseconds to seconds
except Exception as e:
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
time.sleep(retry_interval)
logging.error(f"Attempt {attempt + 1}/{max_retries} from {sender} to {receiver}: "
f"Unexpected error accepting friend request: {e}")
time.sleep(2)
raise Exception(
f"Failed to accept friend request in {max_retries * retry_interval} seconds."
)
_ = await asyncio.gather(
delays = await asyncio.gather(
*[
_accept_friend_request(nodes, sender, receiver, request_id)
_accept_friend_request(nodes, sender, receiver, timestamp_requestid)
for sender, receivers in requests
for receiver, request_id in receivers.items()
for receiver, timestamp_requestid in receivers.items()
]
)
total_requests = sum(len(receivers) for _, receivers in requests)
total_requests = sum(len(receivers) for delays, receivers in requests)
logger.info(f"All {total_requests} friend requests accepted.")
return delays
async def add_contacts(nodes: dict[str, StatusBackend], adders: list[str], contacts: list[str]):

View File

@@ -148,8 +148,8 @@ class AsyncSignalClient:
async def wait_for_logout(self) -> dict:
return await self.wait_for_signal(SignalType.NODE_LOGOUT.value)
async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout: int = 20) \
# TODO should be applied to other places
async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout: int = 10) \
-> Optional[dict]:
if signal_type not in self.signal_queues:
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
@@ -157,13 +157,14 @@ class AsyncSignalClient:
queue = self.signal_queues[signal_type]
end_time = asyncio.get_event_loop().time() + timeout
# TODO MAKE THIS TO BE TRIGGERED AUTOMATICALLY WHEN MESSAGE APPEARS
while True:
for signal in queue.recent():
if event_string in json.dumps(signal):
for message in queue.messages:
if event_string in message[1]:
# Remove the found signal from the buffer
queue.buffer.remove(signal)
logger.info(f"Found {signal_type} containing '{event_string}' in buffer")
return signal
# queue.buffer.remove(signal)
logger.info(f"Found {signal_type} containing '{event_string}' in messages")
return message
if asyncio.get_event_loop().time() > end_time:
raise TimeoutError(f"{signal_type} containing '{event_string}' not found in {timeout} seconds")