Send one-to-one (#24)

* Add inject_messages_one_to_one

* Add send_one_to_one_message scenario
This commit is contained in:
Alberto Soutullo
2025-09-26 20:56:50 +02:00
committed by GitHub
parent 375eac575d
commit 4eaabd153a
3 changed files with 66 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ import random
# Project Imports
import src.logger
from src import kube_utils, setup_status
from src.inject_messages import inject_messages_one_to_one
from src.setup_status import initialize_nodes_application, send_friend_requests, accept_friend_requests, \
decline_friend_requests
@@ -103,3 +104,28 @@ async def contact_request():
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()],
*[node.shutdown() for node in light_nodes.values()])
logger.info("Finished contact_request")
async def send_one_to_one_message():
# 50 sending nodes
# 50 receiving nodes
# 50 idle nodes
# Each sending node has sent a contact request to a receiving node, that accepted it
# -> Sending nodes send one text message per 10 sec
kube_utils.setup_kubernetes_client()
backend_relay_pods = kube_utils.get_pods("status-backend-relay", "status-go-test")
relay_nodes = await initialize_nodes_application(backend_relay_pods)
backend_relay_pods = [pod_name.split(".")[0] for pod_name in backend_relay_pods]
senders = backend_relay_pods[:50]
receiving = backend_relay_pods[50:100]
friend_requests = await send_friend_requests(relay_nodes, senders, receiving)
_ = await accept_friend_requests(relay_nodes, friend_requests)
await asyncio.gather(*[inject_messages_one_to_one(relay_nodes[senders[i]], 10, relay_nodes[receiving[i]].public_key, 60) for i in range(50)])
logger.info("Shutting down node connections")
await asyncio.gather(*[node.shutdown() for node in relay_nodes.values()])
logger.info("Finished send_one_to_one_message")

View File

@@ -25,3 +25,23 @@ async def inject_messages(pod: StatusBackend, delay_between_message: float, chat
await asyncio.sleep(1)
logger.info(f"Finished sending {num_messages} messages")
# TODO merge in same function
async def inject_messages_one_to_one(pod: StatusBackend, delay_between_message: float, contact_id: str, num_messages: int):
for message_count in range(num_messages):
try:
logger.debug(f"Sending message {message_count}")
await pod.wakuext_service.send_one_to_one_message(contact_id, f"Message {message_count}")
if message_count == 0:
logger.info(f"Successfully began sending {num_messages} messages")
elif message_count % 10 == 0:
logger.debug(f"Sent {message_count} messages")
await asyncio.sleep(delay_between_message)
except AssertionError as e:
logger.error(f"Error sending message: {e}")
await asyncio.sleep(1)
logger.info(f"Finished sending {num_messages} messages")

View File

@@ -64,3 +64,23 @@ class WakuextAsyncService(AsyncService):
params = [{"id": request_id}]
json_response = await self.rpc_request("declineContactRequest", params)
return json_response
async def send_one_to_one_message(self, contact_id: str, message: str):
params = [{"id": contact_id, "message": message}]
json_response = await self.rpc_request("sendOneToOneMessage", params)
return json_response
async def create_group_chat_with_members(self, pubkey_list: list, group_chat_name: str):
params = [None, group_chat_name, pubkey_list]
json_response = await self.rpc_request("createGroupChatWithMembers", params)
return json_response
async def send_group_chat_message(self, group_id: str, message: str):
params = [{"id": group_id, "message": message}]
json_response = await self.rpc_request("sendGroupChatMessage", params)
return json_response
async def add_contact(self, contact_id: str, displayName: str):
params = [{"id": contact_id, "nickname": "fake_nickname", "displayName": displayName, "ensName": ""}]
json_response = await self.rpc_request("addContact", params)
return json_response