From 8ae0a1f24a86c0cf526b07fb32188477e09a706a Mon Sep 17 00:00:00 2001 From: Alberto Soutullo Date: Fri, 26 Sep 2025 16:21:45 +0200 Subject: [PATCH] Minor improvements (#20) * Improve find_signal_containing_string by retrieving data from the buffer instead. * Delete olf function * Add helper functions for retrieving data in a message * Fix error in reject_community_requests --- src/setup_status.py | 56 +++++++++++++++++++++++++++++--------------- src/signal_client.py | 27 ++++++++++++++------- 2 files changed, 55 insertions(+), 28 deletions(-) diff --git a/src/setup_status.py b/src/setup_status.py index 9eedef6..ee4fa09 100644 --- a/src/setup_status.py +++ b/src/setup_status.py @@ -79,13 +79,13 @@ async def accept_community_requests(node_owner: StatusBackend, join_ids: list[st response = await node.wakuext_service.accept_request_to_join_community(join_id) # We need to find the correspondant community of the join_id. We retrieve first chat because should be # the only one. We do this because there can be several communities if we reuse the node. - # TODO why it returns the information of all communities? - if response.get("result"): - for request in response.get("result").get("requestsToJoinCommunity"): - if request.get("id") == join_id: - for community in response.get("result").get("communities"): - if community.get("id") == request.get("communityId"): - return list(community.get("chats").keys())[0] + # TODO why it returns the information of all communities? Getting the chat this way seems weird + msgs = await get_messages_by_message_type(response, "requestsToJoinCommunity", join_id) + for community in response.get("result").get("communities"): + # We always have one msg + if community.get("id") == msgs[0].get("communityId"): + # We always have one chat + return list(community.get("chats").keys())[0] except Exception as e: logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}") time.sleep(retry_interval) @@ -107,24 +107,42 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]): for attempt in range(max_retries): try: - response = await node.wakuext_service.request_to_join_community(join_id) - # We need to find the correspondant community of the join_id. We retrieve first chat because should be - # the only one. We do this because there can be several communities if we reuse the node. - # TODO why it returns the information of all communities? - if response.get("result"): - for request in response.get("result").get("requestsToJoinCommunity"): - if request.get("id") == join_id: - for community in response.get("result").get("communities"): - if community.get("id") == request.get("communityId"): - return list(community.get("chats").keys())[0] - except Exception as e: + response = await node.wakuext_service.decline_request_to_join_community(join_id) + return response # TODO do we want this + except AssertionError as e: logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}") time.sleep(retry_interval) raise Exception( - f"Failed to accept request to join community in {max_retries * retry_interval} seconds." + f"Failed to reject community request in {max_retries * retry_interval} seconds." ) _ = await asyncio.gather(*[_reject_community_request(owner, join_id) for join_id in join_ids]) logger.info(f"All {len(join_ids)} nodes have been rejected successfully") + + +async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]: + matched_messages = [] + messages = response.get("result", {}).get("messages", []) + for message in messages: + if message.get("contentType") != content_type: + continue + if not message_pattern or message_pattern in str(message): + matched_messages.append(message) + if matched_messages: + return matched_messages + else: + raise ValueError(f"Failed to find a message with contentType '{content_type}' in response") + + +async def get_messages_by_message_type(response: dict, message_type: str = "messages", message_pattern: str="") -> list[dict]: + matched_messages = [] + messages = response.get("result", {}).get(message_type, []) + for message in messages: + if not message_pattern or message_pattern in str(message): + matched_messages.append(message) + if matched_messages: + return matched_messages + else: + raise ValueError(f"Failed to find a message with message type '{message_type}' in response") diff --git a/src/signal_client.py b/src/signal_client.py index 332f9cb..3204451 100644 --- a/src/signal_client.py +++ b/src/signal_client.py @@ -128,15 +128,24 @@ class AsyncSignalClient: async def wait_for_logout(self) -> dict: return await self.wait_for_signal(SignalType.NODE_LOGOUT.value) - async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout=20) -> Optional[dict]: - start_time = asyncio.get_event_loop().time() + + async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout: int = 20) \ + -> Optional[dict]: + if signal_type not in self.signal_queues: + raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals") + + queue = self.signal_queues[signal_type] + end_time = asyncio.get_event_loop().time() + timeout + while True: - try: - signal = await asyncio.wait_for(self.signal_queues[signal_type].get(), timeout) + for signal in queue.recent(): if event_string in json.dumps(signal): - logger.info(f"Found {signal_type} containing '{event_string}'") + # Remove the found signal from the buffer + queue.buffer.remove(signal) + logger.info(f"Found {signal_type} containing '{event_string}' in buffer") return signal - except asyncio.TimeoutError: - raise TimeoutError( - f"Signal {signal_type} containing '{event_string}' not received in {timeout} seconds" - ) + + if asyncio.get_event_loop().time() > end_time: + raise TimeoutError(f"{signal_type} containing '{event_string}' not found in {timeout} seconds") + + await asyncio.sleep(0.2)