mirror of
https://github.com/vacp2p/status-benchmarks.git
synced 2026-01-08 22:17:57 -05:00
Minor improvements (#20)
* Improve find_signal_containing_string by retrieving data from the buffer instead. * Delete olf function * Add helper functions for retrieving data in a message * Fix error in reject_community_requests
This commit is contained in:
@@ -79,13 +79,13 @@ async def accept_community_requests(node_owner: StatusBackend, join_ids: list[st
|
||||
response = await node.wakuext_service.accept_request_to_join_community(join_id)
|
||||
# We need to find the correspondant community of the join_id. We retrieve first chat because should be
|
||||
# the only one. We do this because there can be several communities if we reuse the node.
|
||||
# TODO why it returns the information of all communities?
|
||||
if response.get("result"):
|
||||
for request in response.get("result").get("requestsToJoinCommunity"):
|
||||
if request.get("id") == join_id:
|
||||
for community in response.get("result").get("communities"):
|
||||
if community.get("id") == request.get("communityId"):
|
||||
return list(community.get("chats").keys())[0]
|
||||
# TODO why it returns the information of all communities? Getting the chat this way seems weird
|
||||
msgs = await get_messages_by_message_type(response, "requestsToJoinCommunity", join_id)
|
||||
for community in response.get("result").get("communities"):
|
||||
# We always have one msg
|
||||
if community.get("id") == msgs[0].get("communityId"):
|
||||
# We always have one chat
|
||||
return list(community.get("chats").keys())[0]
|
||||
except Exception as e:
|
||||
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
|
||||
time.sleep(retry_interval)
|
||||
@@ -107,24 +107,42 @@ async def reject_community_requests(owner: StatusBackend, join_ids: list[str]):
|
||||
|
||||
for attempt in range(max_retries):
|
||||
try:
|
||||
response = await node.wakuext_service.request_to_join_community(join_id)
|
||||
# We need to find the correspondant community of the join_id. We retrieve first chat because should be
|
||||
# the only one. We do this because there can be several communities if we reuse the node.
|
||||
# TODO why it returns the information of all communities?
|
||||
if response.get("result"):
|
||||
for request in response.get("result").get("requestsToJoinCommunity"):
|
||||
if request.get("id") == join_id:
|
||||
for community in response.get("result").get("communities"):
|
||||
if community.get("id") == request.get("communityId"):
|
||||
return list(community.get("chats").keys())[0]
|
||||
except Exception as e:
|
||||
response = await node.wakuext_service.decline_request_to_join_community(join_id)
|
||||
return response # TODO do we want this
|
||||
except AssertionError as e:
|
||||
logging.error(f"Attempt {attempt + 1}/{max_retries}: Unexpected error: {e}")
|
||||
time.sleep(retry_interval)
|
||||
|
||||
raise Exception(
|
||||
f"Failed to accept request to join community in {max_retries * retry_interval} seconds."
|
||||
f"Failed to reject community request in {max_retries * retry_interval} seconds."
|
||||
)
|
||||
|
||||
_ = await asyncio.gather(*[_reject_community_request(owner, join_id) for join_id in join_ids])
|
||||
|
||||
logger.info(f"All {len(join_ids)} nodes have been rejected successfully")
|
||||
|
||||
|
||||
async def get_messages_by_content_type(response: dict, content_type: str, message_pattern: str="") -> list[dict]:
|
||||
matched_messages = []
|
||||
messages = response.get("result", {}).get("messages", [])
|
||||
for message in messages:
|
||||
if message.get("contentType") != content_type:
|
||||
continue
|
||||
if not message_pattern or message_pattern in str(message):
|
||||
matched_messages.append(message)
|
||||
if matched_messages:
|
||||
return matched_messages
|
||||
else:
|
||||
raise ValueError(f"Failed to find a message with contentType '{content_type}' in response")
|
||||
|
||||
|
||||
async def get_messages_by_message_type(response: dict, message_type: str = "messages", message_pattern: str="") -> list[dict]:
|
||||
matched_messages = []
|
||||
messages = response.get("result", {}).get(message_type, [])
|
||||
for message in messages:
|
||||
if not message_pattern or message_pattern in str(message):
|
||||
matched_messages.append(message)
|
||||
if matched_messages:
|
||||
return matched_messages
|
||||
else:
|
||||
raise ValueError(f"Failed to find a message with message type '{message_type}' in response")
|
||||
|
||||
@@ -128,15 +128,24 @@ class AsyncSignalClient:
|
||||
async def wait_for_logout(self) -> dict:
|
||||
return await self.wait_for_signal(SignalType.NODE_LOGOUT.value)
|
||||
|
||||
async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout=20) -> Optional[dict]:
|
||||
start_time = asyncio.get_event_loop().time()
|
||||
|
||||
async def find_signal_containing_string(self, signal_type: str, event_string: str, timeout: int = 20) \
|
||||
-> Optional[dict]:
|
||||
if signal_type not in self.signal_queues:
|
||||
raise ValueError(f"Signal type {signal_type} is not in the list of awaited signals")
|
||||
|
||||
queue = self.signal_queues[signal_type]
|
||||
end_time = asyncio.get_event_loop().time() + timeout
|
||||
|
||||
while True:
|
||||
try:
|
||||
signal = await asyncio.wait_for(self.signal_queues[signal_type].get(), timeout)
|
||||
for signal in queue.recent():
|
||||
if event_string in json.dumps(signal):
|
||||
logger.info(f"Found {signal_type} containing '{event_string}'")
|
||||
# Remove the found signal from the buffer
|
||||
queue.buffer.remove(signal)
|
||||
logger.info(f"Found {signal_type} containing '{event_string}' in buffer")
|
||||
return signal
|
||||
except asyncio.TimeoutError:
|
||||
raise TimeoutError(
|
||||
f"Signal {signal_type} containing '{event_string}' not received in {timeout} seconds"
|
||||
)
|
||||
|
||||
if asyncio.get_event_loop().time() > end_time:
|
||||
raise TimeoutError(f"{signal_type} containing '{event_string}' not found in {timeout} seconds")
|
||||
|
||||
await asyncio.sleep(0.2)
|
||||
|
||||
Reference in New Issue
Block a user