Filter lightpush (#43)

* Change get enr container to use rest API and add service selection

* Add protocols selection, port and sharding to traffic script

* Improve victoria query

* Add function to check filter messages from get-filter-messages container

* Filter lightpush-servers until they don't have proper logging

* Create docker image to get addresses instead of enrs

* Create docker image to get messages from filter nodes

* Fix typo in comment

* Mark filter and lightpush as done
This commit is contained in:
Alberto Soutullo
2024-11-04 09:22:08 +01:00
committed by GitHub
parent f51ad0b10f
commit 17a6efd121
10 changed files with 275 additions and 42 deletions

View File

@@ -105,8 +105,8 @@ Inside each plot, we will have as many subplots as metrics in `data`.
### Secondary objectives
- [X] Add QoS parameter support to the 10k tool
- [ ] Run further Waku protocols:
- [ ] Filter
- [ ] Lightpush
- [X] Filter
- [X] Lightpush
- [X] Store
- [ ] Peer exchange

View File

@@ -0,0 +1,13 @@
# Minimal image that queries Waku nodes for their listen addresses.
FROM alpine:latest
# Install DNS tools (nslookup), bash and curl in a single layer, and
# create the /app directory for the script.
RUN apk update && \
    apk add bind-tools bash curl && \
    # Create /app directory
    mkdir /app
# COPY is preferred over ADD for plain local files (no archive/URL semantics).
COPY getaddress.sh /app/getaddress.sh
RUN chmod +x /app/getaddress.sh
ENTRYPOINT ["/app/getaddress.sh"]

View File

@@ -0,0 +1,56 @@
#!/bin/bash
# Query Waku nodes for their listen multiaddrs via the REST debug API and
# export them as shell variables (addrs1, addrs2, ...) in /etc/addrs/addrs.env.

# Number of addrs to process, default to 1 if not specified
num_addrs=${1:-1}
# Service name to query, default to "zerotesting-bootstrap.zerotesting" if not specified
service_name=${2:-zerotesting-bootstrap.zerotesting}
# Find the IPv4 IPs of the selected service using nslookup
readarray -t pod_ips < <(nslookup "$service_name" | awk '/^Address: / { print $2 }' | head -n "$num_addrs")
# Prepare the directory for addrs data
mkdir -p /etc/addrs
addrs_file="/etc/addrs/addrs.env"
> "$addrs_file" # Clear the file to start fresh
# A valid multiaddr starts with "/ip" (e.g. /ip4/.../tcp/...)
validate_addrs() {
    [[ $1 =~ ^/ip ]]
}
# Counter for valid addrs
valid_addrs_count=0
# Get and validate the addrs data from up to the specified number of IPs
for pod_ip in "${pod_ips[@]}"; do
    echo "Querying IP: $pod_ip"
    # -s silences the curl progress meter so only real errors reach stderr
    addrs=$(curl -s -X GET "http://$pod_ip:8645/debug/v1/info" -H "accept: application/json" | sed -n 's/.*"listenAddresses":\["\([^"]*\)".*/\1/p')
    # Test the validation's exit status directly instead of inspecting $?,
    # which is fragile (any statement in between would clobber it).
    if validate_addrs "$addrs"; then
        # Save the valid addrs to the file
        ((valid_addrs_count++))
        echo "export addrs$valid_addrs_count='$addrs'" >> "$addrs_file"
        if [ "$valid_addrs_count" -eq "$num_addrs" ]; then
            break # Exit loop after the specified number of valid addrs
        fi
    else
        echo "Invalid addrs data received from IP $pod_ip"
    fi
done
# Fail loudly if nothing valid was collected so the caller can retry
if [ "$valid_addrs_count" -eq 0 ]; then
    echo "No valid addrs data received from any IPs"
    exit 1
fi
# Output for debugging
echo "addrs data saved successfully:"
cat "$addrs_file"

View File

@@ -2,7 +2,7 @@ FROM alpine:latest
# Update the package repository and install bind-tools and wget
RUN apk update && \
apk add bind-tools wget bash && \
apk add bind-tools bash curl && \
# Create /app directory
mkdir /app

View File

@@ -3,8 +3,11 @@
# Number of ENRs to process, default to 3 if not specified
num_enrs=${1:-3}
# Service name to query, default to "zerotesting-bootstrap.zerotesting" if not specified
service_name=${2:-zerotesting-bootstrap.zerotesting}
# Find the IPv4 IPs of "zerotesting-bootstrap.zerotesting" using nslookup
readarray -t pod_ips < <(nslookup zerotesting-bootstrap.zerotesting | awk '/^Address: / { print $2 }' | head -n "$num_enrs")
readarray -t pod_ips < <(nslookup "$service_name" | awk '/^Address: / { print $2 }' | head -n "$num_enrs")
# Shuffle the IPs before processing them to help randomise which nodes we connect to and peer with
# Disabled for now
@@ -30,7 +33,7 @@ valid_enr_count=0
# Get and validate the ENR data from up to the specified number of IPs
for pod_ip in "${pod_ips[@]}"; do
echo "Querying IP: $pod_ip"
enr=$(wget -O - --post-data='{"jsonrpc":"2.0","method":"get_waku_v2_debug_v1_info","params":[],"id":1}' --header='Content-Type:application/json' "$pod_ip:8545" 2>/dev/null | sed -n 's/.*"enrUri":"\([^"]*\)".*/\1/p')
enr=$(curl -X GET "http://$pod_ip:8645/debug/v1/info" -H "accept: application/json" | sed -n 's/.*"enrUri":"\([^"]*\)".*/\1/p')
# Validate the ENR
validate_enr "$enr"

View File

@@ -0,0 +1,5 @@
FROM python:3.11.9-alpine
ADD filter_msg_retriever.py /app/filter_msg_retriever.py
RUN pip install requests

View File

@@ -0,0 +1,91 @@
# Python Imports
import argparse
import logging
import urllib
import requests
import socket
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import Dict, List, Optional
logging.basicConfig(level=logging.INFO)
def resolve_dns(address: str) -> str:
    """Resolve the host portion of a "host:port" string to an IPv4 address.

    Logs the DNS lookup latency in milliseconds and returns "ip:port".
    """
    lookup_start = time.time()
    host, port = address.split(":")
    resolved_ip = socket.gethostbyname(host)
    elapsed = (time.time() - lookup_start) * 1000
    logging.info(f"{address} DNS Response took {elapsed} ms")
    logging.info(f"Talking with {address}, ip address: {resolved_ip}")
    return f"{resolved_ip}:{port}"
def parse_args(argv=None) -> argparse.Namespace:
    """Parse command-line options for the filter message retriever.

    Args:
        argv: Optional list of argument strings; defaults to ``sys.argv[1:]``.
              Passing a list makes the function testable without touching
              the process arguments (backward-compatible addition).

    Returns:
        Namespace with ``contentTopic`` (str) and ``numNodes`` (int).
    """
    parser = argparse.ArgumentParser(description="Waku filter message retriever")
    parser.add_argument('-c', '--contentTopic', type=str, help='Content topic', default="/my-app/1/dst/proto")
    parser.add_argument('-n', '--numNodes', type=int, help='Number of filter nodes to get messages from', default=1)
    return parser.parse_args(argv)
def fetch_all_messages(base_url: str, headers: Dict, address: str) -> Optional[List[str]]:
    """Fetch every message payload exposed by a filter node's REST endpoint.

    Returns the list of base64 payload strings, or None when the node
    answers with a non-200 status (the error body is logged).
    """
    response = requests.get(base_url, headers=headers)
    if response.status_code != 200:
        logging.error(f"Error fetching data: {response.status_code}")
        logging.error(response.text)
        return None
    payloads = [entry['payload'] for entry in response.json()]
    logging.info(f"Retrieved {len(payloads)} messages from {address}")
    return payloads
def process_node_messages(address: str, content_topic: str) -> Optional[List[str]]:
    """Retrieve all payloads for ``content_topic`` from one filter node.

    Args:
        address: "host:port" of the filter node.
        content_topic: Raw (unescaped) content topic.

    Returns:
        The payload list from the node, or None on a non-200 response.
    """
    # BUG FIX: the file does `import urllib`, which does not guarantee the
    # `urllib.parse` submodule is loaded — it only worked because `requests`
    # imports it transitively. Import it explicitly here.
    from urllib.parse import quote

    node_ip = resolve_dns(address)
    escaped_topic = quote(content_topic, safe='')
    url = f"http://{node_ip}/filter/v2/messages/{escaped_topic}"
    logging.debug(f"Query to {url}")
    headers = {"accept": "text/plain"}
    return fetch_all_messages(url, headers, address)
def main():
    """Collect messages from every filter node in parallel and print whether
    all retrieved payloads share the same length ("True"/"False" on stdout)."""
    args = parse_args()
    logging.info(f"Arguments: {vars(args)}")

    port = "8645"
    # Filter nodes are reachable as filter-0, filter-1, ... on the REST port.
    node_addresses = [f"filter-{i}:{port}" for i in range(args.numNodes)]

    all_messages = []
    with ProcessPoolExecutor() as executor:
        pending = [executor.submit(process_node_messages, addr, args.contentTopic)
                   for addr in node_addresses]
        for done in as_completed(pending):
            payloads = done.result()
            if payloads:
                all_messages.extend(payloads)

    # All payloads must be equally long, and at least one must exist:
    # exactly one distinct length <=> "True" (matches the original
    # next()/all() comparison, including the 0- and 1-message cases).
    distinct_lengths = {len(payload) for payload in all_messages}
    print("True" if len(distinct_lengths) == 1 else "False")


if __name__ == "__main__":
    main()

View File

@@ -1,3 +1,4 @@
import random
import time
import os
import base64
@@ -6,32 +7,73 @@ import asyncio
import aiohttp
import argparse
import socket
import logging
logging.basicConfig(level=logging.INFO)
async def check_dns_time(node: str) -> str:
async def check_dns_time(service: str) -> (str, str):
start_time = time.time()
name, port = node.split(":")
ip_address = socket.gethostbyname(name)
ip_address = socket.gethostbyname(service)
elapsed = (time.time() - start_time) * 1000
print(f"{name} DNS Response took {elapsed} ms")
return f"{ip_address}:{port}"
entire_hostname = socket.gethostbyaddr(ip_address)
hostname = entire_hostname[0].split('.')[0]
logging.info(f'{service} DNS Response took {elapsed} ms. Resolved to {hostname}.')
return f'{ip_address}', hostname
async def send_waku_msg(node: str, kbytes: int, pubsub_topic: str, content_topic: str, debug: bool,
stats: dict, i: int):
payload = base64.b64encode(os.urandom(kbytes * 1000)).decode('ascii').rstrip("=")
# print("Message size:", len(payload) * 3 / 4 / 1000, "KBytes")
body = {"payload": payload, "contentTopic": content_topic, "version": 1}
topic = urllib.parse.quote(pubsub_topic, safe='')
node = await check_dns_time(node) if debug else node
url = f"http://{node}/relay/v1/messages/{topic}"
# print(f"Waku REST API: {url} PubSubTopic: {pubsub_topic}, ContentTopic: {content_topic}")
print(f"Message {i} sent to {node} at {time.strftime('%H:%M:%S')}")
async def get_node_shard(ip_port: str) -> str:
    """Derive the shard index from a node's reverse-DNS hostname.

    Args:
        ip_port: Either a bare IP or "ip:port".

    Returns:
        The shard component of the pod hostname (assumes hostnames look
        like "<name>-<shard>.<domain>" — TODO confirm against the cluster).
    """
    # BUG FIX: the caller (send_to_relay) passes a bare IP, so the original
    # `ip, port = ip_port.split(':')` raised ValueError. Take the first
    # component to accept both "ip" and "ip:port".
    ip = ip_port.split(':')[0]
    hostname = socket.gethostbyaddr(ip)
    node_shard = hostname[0].split('.')[0].split('-')[1]
    logging.info(f'Using shard {node_shard}')
    return node_shard
async def send_to_relay(args: argparse.Namespace):
    """Build the request for publishing one message via the relay REST API.

    Returns:
        (url, headers, body, node_hostname) ready for an HTTP POST.
    """
    if args.debug:
        node_address, node_hostname = await check_dns_time('zerotesting-service')
    else:
        # BUG FIX: socket.gethostbyname() returns a single string; the
        # original code unpacked it into two variables, raising ValueError
        # whenever --debug was not set. Resolve address and hostname
        # separately (mirrors what check_dns_time does).
        node_address = socket.gethostbyname('zerotesting-service')
        node_hostname = socket.gethostbyaddr(node_address)[0].split('.')[0]
    node_shard = await get_node_shard(node_address) if args.shards > 1 else '0'
    topic = urllib.parse.quote(args.pubsub_topic + node_shard, safe='')
    url = f'http://{node_address}:{args.port}/relay/v1/messages/{topic}'
    # Random payload of the requested size, base64-encoded without padding.
    payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode('ascii').rstrip("=")
    headers = {'Content-Type': 'application/json'}
    body = {'payload': payload, 'contentTopic': args.content_topic, 'version': 1}
    return url, headers, body, node_hostname
async def send_to_lightpush(args: argparse.Namespace):
    """Build the request for publishing one message via the lightpush REST API.

    Returns:
        (url, headers, body, node_hostname) ready for an HTTP POST.
    """
    if args.debug:
        node_address, node_hostname = await check_dns_time('zerotesting-lightpush-client')
    else:
        # BUG FIX: socket.gethostbyname() returns a single string; the
        # original code unpacked it into two variables, raising ValueError
        # whenever --debug was not set.
        node_address = socket.gethostbyname('zerotesting-lightpush-client')
        node_hostname = socket.gethostbyaddr(node_address)[0].split('.')[0]
    url = f'http://{node_address}:{args.port}/lightpush/v1/message'
    # Random payload of the requested size, base64-encoded without padding.
    payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode('ascii').rstrip("=")
    headers = {'Content-Type': 'application/json', 'Accept': 'application/json'}
    # TODO HANDLE BETTER THIS SHARD
    body = {'pubsubTopic': args.pubsub_topic+'0', 'message': {'payload': payload, 'contentTopic': args.content_topic,
                                                              'version': 1}}
    return url, headers, body, node_hostname
# Maps each value accepted by --protocols to the coroutine that builds the
# HTTP request for that protocol; send_waku_msg picks one at random per message.
service_dispatcher = {
    'relay': send_to_relay,
    'lightpush': send_to_lightpush
}
async def send_waku_msg(args: argparse.Namespace, stats: dict, i: int):
protocol = random.choice(args.protocols)
protocol_function = service_dispatcher[protocol]
url, headers, body, node_hostname = await protocol_function(args)
logging.info(f"Message {i+1} sent at {time.strftime('%H:%M:%S')}")
start_time = time.time()
try:
async with aiohttp.ClientSession() as session:
async with session.post(url, json=body,
headers={'Content-Type': 'application/json'}) as response:
async with session.post(url, json=body, headers=headers) as response:
elapsed_time = (time.time() - start_time) * 1000
if response.status == 200:
stats['success'] += 1
@@ -39,8 +81,10 @@ async def send_waku_msg(node: str, kbytes: int, pubsub_topic: str, content_topic
stats['failure'] += 1
stats['total'] += 1
success_rate = (stats['success'] / stats['total']) * 100 if stats['total'] > 0 else 0
print(
f"Response from message {i} sent to {node}: status:{response.status}, Time: [{elapsed_time:.4f} ms], "
response_text = await response.text()
logging.info(
f"Response from message {i+1} sent to {node_hostname} status:{response.status}, {response_text}, "
f"Time: [{elapsed_time:.4f} ms], "
f"Success: {stats['success']}, Failure: {stats['failure']}, "
f"Success Rate: {success_rate:.2f}%")
except Exception as e:
@@ -48,50 +92,52 @@ async def send_waku_msg(node: str, kbytes: int, pubsub_topic: str, content_topic
stats['failure'] += 1
stats['total'] += 1
success_rate = (stats['success'] / stats['total']) * 100 if stats['total'] > 0 else 0
print(
f"Exception during message {i} sent to {node}: {str(e)}, Time: [{elapsed_time:.4f} ms], "
logging.info(
f"Exception during message {i} sent to {node_hostname} : {str(e)}, Time: [{elapsed_time:.4f} ms], "
f"Success: {stats['success']}, Failure: {stats['failure']}, "
f"Success Rate: {success_rate:.2f}%")
async def inject_message(background_tasks: set, args: argparse.Namespace, node: str, stats: dict,
i: int):
task = asyncio.create_task(
send_waku_msg(node, args.msg_size_kbytes, args.pubsub_topic, args.content_topic, args.debug,
stats, i))
async def inject_message(background_tasks: set, args: argparse.Namespace, stats: dict, i: int):
    """Schedule one send_waku_msg task and track it in background_tasks."""
    new_task = asyncio.create_task(send_waku_msg(args, stats, i))
    # Hold a strong reference so the task is not garbage-collected mid-flight,
    # then drop it automatically once the task completes.
    background_tasks.add(new_task)
    new_task.add_done_callback(background_tasks.discard)
async def main(service: str, args: argparse.Namespace):
async def main(args: argparse.Namespace):
background_tasks = set()
stats = {'success': 0, 'failure': 0, 'total': 0}
for i in range(args.messages):
await inject_message(background_tasks, args, service, stats, i+1)
await inject_message(background_tasks, args, stats, i)
await asyncio.sleep(args.delay_seconds)
await asyncio.gather(*background_tasks)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Waku message sender")
parser = argparse.ArgumentParser(description="Waku message injector")
parser.add_argument('--debug', action='store_true', help='Show DNS resolve times')
parser.add_argument('-c', '--content-topic', type=str, help='Content topic', default="kubekube")
parser.add_argument('-p', '--pubsub-topic', type=str, help='Pubsub topic',
default="/waku/2/kubetopic")
parser.add_argument('-pt', '--pubsub-topic', type=str, help='Pubsub topic',
default="/waku/2/rs/2/")
parser.add_argument('-ct', '--content-topic', type=str, help='Content topic',
default="/my-app/1/dst/proto")
parser.add_argument('-s', '--msg-size-kbytes', type=int, help='Message size in kBytes',
default=10)
parser.add_argument('-d', '--delay-seconds', type=float, help='Delay between messages',
default=1)
parser.add_argument('-m', '--messages', type=int, help='Number of messages to inject',
default=10)
parser.add_argument('-sh', '--shards', type=int, help='Number of shards',
default=1)
parser.add_argument('-ps', '--protocols', nargs='+', default=['relay'],
required=True, help='Protocols used inject messages')
parser.add_argument('-p', '--port', type=int, default=8645, help='Waku REST port')
return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
service = "zerotesting-service:8645"
print(f"Starting message injection to {service}. {args}")
asyncio.run(main(service, args))
logging.info(f"{args}")
asyncio.run(main(args))

View File

@@ -39,6 +39,8 @@ class WakuTracer(MessageTracer):
def trace(self, parsed_logs: List) -> List[pd.DataFrame]:
    """Run every configured tracing over its parsed log set.

    Keeps only rows whose pod name starts with 'nodes' (relay pods) in the
    first resulting frame; the other frames are returned untouched.
    """
    frames = [tracing(parsed_logs[idx])
              for idx, tracing in enumerate(self._tracings)
              if tracing is not None]
    logger.warning("Filtering pods that are not 'nodes' (relay)")
    relay_mask = frames[0]['pod-name'].str.startswith('nodes')
    frames[0] = frames[0][relay_mask]
    return frames

View File

@@ -63,7 +63,7 @@ class WakuMessageLogAnalyzer:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_container_name:waku AND 'my_peer_id=16U*{peer_id}' AND _time:{self._timestamp} | limit 1"}}
"query": f"kubernetes_pod_name:nodes AND kubernetes_container_name:waku AND 'my_peer_id=16U*{peer_id}' AND _time:{self._timestamp} | limit 1"}}
reader = VictoriaReader(victoria_config, None)
result = reader.single_query_info()
@@ -219,7 +219,7 @@ class WakuMessageLogAnalyzer:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_container_name:waku AND _time:{self._timestamp} AND kubernetes_pod_name:nodes | uniq by (kubernetes_pod_name)"}
"query": f"kubernetes_pod_name:nodes AND kubernetes_container_name:waku AND _time:{self._timestamp} | uniq by (kubernetes_pod_name)"}
}
reader = VictoriaReader(victoria_config, None)
@@ -276,6 +276,23 @@ class WakuMessageLogAnalyzer:
if result.is_ok():
logger.info(f'Messages from store saved in {result.ok_value}')
def check_filter_messages(self):
    """Check the newest 'get-filter-messages' pod log line in VictoriaLogs.

    That pod prints the literal string "True" or "False" depending on
    whether all filter-node payloads had equal length; log the outcome.
    """
    victoria_config = {
        "url": "https://vmselect.riff.cc/select/logsql/query",
        "headers": {"Content-Type": "application/json"},
        "params": {
            "query": f"kubernetes_pod_name:get-filter-messages AND _time:{self._timestamp} | sort by (_time) desc | limit 1"}
    }
    reader = VictoriaReader(victoria_config, None)
    result = reader.single_query_info()
    # NOTE(review): nothing is logged when the query fails — preserved as-is.
    if result.is_ok():
        # literal_eval turns the "True"/"False" text into a bool.
        verdict = ast.literal_eval(result.unwrap()['_msg'])
        if verdict:
            logger.info("Messages from filter match in length.")
        else:
            logger.error("Messages from filter do not match.")
def analyze_message_timestamps(self, time_difference_threshold: int):
"""