Mixed improvements for analysis (#46)

* Delete repeated Docker utility

* Add shards for filter message retriever script

* Fix default content topic for store message retriever

* Change the queried service so the store message retriever can query any relay node

* Reduce publisher complexity, since shards are now implicit in stateful set names

* Delete unnecessary sleeps

* Add shard to dataframe, and infer the shard from the stateful set name

* Refactor code to match the sharding changes

* Add helper function to plot message distribution time

* Improve logging when a peer misses messages by showing the missing message hashes

* Delete unused import

* Update example file with the newest changes
Author: Alberto Soutullo
Date: 2025-01-12 12:15:36 +01:00 (committed by GitHub)
Parent: e2d9b5746e
Commit: d76b0f9412
9 changed files with 101 additions and 162 deletions

View File

@@ -26,6 +26,7 @@ def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Waku filter message retriever")
parser.add_argument('-c', '--contentTopic', type=str, help='Content topic', default="/my-app/1/dst/proto")
parser.add_argument('-n', '--numNodes', type=int, help='Number of filter nodes to get messages from', default=1)
parser.add_argument('-s', '--numShards', type=int, help='Number of shards in the cluster', default=1)
return parser.parse_args()
@@ -59,15 +60,18 @@ def main():
args_dict = vars(args)
logging.info(f"Arguments: {args_dict}")
hostname = "filter"
hostname = "fclient"
port = "8645"
addresses = [f"{hostname}-{i}:{port}" for i in range(args.numNodes)]
content_topic = args.contentTopic
addresses = [
f"{hostname}-{shard}-{node}:{port}"
for shard in range(args.numShards)
for node in range(args.numNodes)
]
all_messages = []
with ProcessPoolExecutor() as executor:
futures = [executor.submit(process_node_messages, address, content_topic) for address in addresses]
futures = [executor.submit(process_node_messages, address, args.contentTopic) for address in addresses]
for future in as_completed(futures):
result = future.result()
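
For reference, a minimal standalone sketch of how the new shard-aware address list expands (the hostname and port come from the script above; the shard and node counts are made-up argument values):

```python
# Sketch of the shard-aware address expansion used by the filter retriever.
hostname = "fclient"
port = "8645"
num_shards = 2   # illustrative value for --numShards
num_nodes = 3    # illustrative value for --numNodes

addresses = [
    f"{hostname}-{shard}-{node}:{port}"
    for shard in range(num_shards)
    for node in range(num_nodes)
]
print(addresses)
# ['fclient-0-0:8645', 'fclient-0-1:8645', 'fclient-0-2:8645',
#  'fclient-1-0:8645', 'fclient-1-1:8645', 'fclient-1-2:8645']
```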

View File

@@ -24,7 +24,7 @@ def resolve_dns(node: str) -> str:
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Waku storage retriever")
parser.add_argument('-c', '--contentTopics', type=str, help='Content topic', default="kubekube")
parser.add_argument('-c', '--contentTopics', type=str, help='Content topic', default="/my-app/1/dst/proto")
parser.add_argument('-p', '--pubsubTopic', type=str, help='Pubsub topic',
default="/waku/2/rs/2/0")
parser.add_argument('-ps', '--pageSize', type=int,
@@ -73,7 +73,7 @@ def main():
args_dict = vars(args)
logging.info(f"Arguments: {args_dict}")
service = "zerotesting-store:8645"
service = "zerotesting-service:8645"
node = resolve_dns(service)
url = f"http://{node}/store/v3/messages"
logging.info(f"Query to {url}")

View File

@@ -1,5 +0,0 @@
FROM python:3.11-alpine
ADD ./store_msg_retriever.py /app/store_msg_retriever.py
RUN pip install requests

View File

@@ -1,84 +0,0 @@
# Python Imports
import argparse
import logging
import requests
import socket
import time
from typing import Dict, List
logging.basicConfig(level=logging.DEBUG)
def check_dns_time(node: str) -> str:
start_time = time.time()
name, port = node.split(":")
ip_address = socket.gethostbyname(name)
elapsed = (time.time() - start_time) * 1000
logging.info(f"{name} DNS Response took {elapsed} ms")
return f"{ip_address}:{port}"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Waku storage retriever")
parser.add_argument('-c', '--contentTopics', type=str, help='Content topic', default="kubekube")
parser.add_argument('-p', '--pubsubTopic', type=str, help='Pubsub topic',
default="/waku/2/rs/2/0")
parser.add_argument('-ps', '--pageSize', type=int,
help='Number of messages to retrieve per page', default=60)
parser.add_argument('-cs', '--cursor', type=str,
help='Cursor field intended for pagination purposes. ', default="")
return parser.parse_args()
def next_cursor(data: Dict) -> str | None:
cursor = data.get('paginationCursor')
if not cursor:
logging.info("No more messages")
return None
return cursor
def fetch_all_messages(base_url: str, initial_params: Dict, headers: Dict) -> List[str]:
all_messages = []
params = initial_params.copy()
while True:
response = requests.get(base_url, headers=headers, params=params)
if response.status_code != 200:
logging.error(f"Error fetching data: {response.status_code}")
break
data = response.json()
logging.info(data)
all_messages.extend([message['messageHash'] for message in data['messages']])
cursor = next_cursor(data)
if not cursor:
break
params["cursor"] = cursor
return all_messages
def main():
args = parse_args()
args_dict = vars(args)
logging.info(f"Arguments: {args_dict}")
service = "zerotesting-service:8645"
node = check_dns_time(service)
url = f"http://{node}/store/v3/messages"
logging.info(f"Query to {url}")
headers = {"accept": "application/json"}
messages = fetch_all_messages(url, args_dict, headers)
logging.info("List of messages")
# We do a print here, so it is easier to parse when reading from victoria logs
print(messages)
if __name__ == "__main__":
main()

View File

@@ -13,20 +13,19 @@ from typing import Tuple, Dict
logging.basicConfig(level=logging.INFO)
async def check_dns_time(service: str, num_nodes: int, num_shards: int) -> tuple[str, str, str]:
async def check_dns_time(service: str) -> tuple[str, str, str]:
start_time = time.time()
ip_address = socket.gethostbyname(service)
elapsed = (time.time() - start_time) * 1000
entire_hostname = socket.gethostbyaddr(ip_address)
hostname = entire_hostname[0].split('.')[0]
node_shard = int(int(hostname.split('-')[1]) % (num_nodes / num_shards))
node_shard = int(hostname.split('-')[1])
logging.info(f'{service} DNS Response took {elapsed} ms. Resolved to {hostname} with shard {node_shard}.')
return f'{ip_address}', hostname, f'{node_shard}'
async def send_to_relay(args: argparse.Namespace) -> Tuple[str, Dict[str, str], Dict[str, str | int], str]:
node_address, node_hostname, node_shard = await check_dns_time('zerotesting-service', args.number_nodes,
args.shards)
node_address, node_hostname, node_shard = await check_dns_time('zerotesting-service')
topic = urllib.parse.quote(args.pubsub_topic + node_shard, safe='')
url = f'http://{node_address}:{args.port}/relay/v1/messages/{topic}'
@@ -38,8 +37,7 @@ async def send_to_relay(args: argparse.Namespace) -> Tuple[str, Dict[str, str],
async def send_to_lightpush(args: argparse.Namespace) -> Tuple[str, Dict[str, str], Dict[str, dict[str, str | int]], str]:
node_address, node_hostname, shard = await check_dns_time('zerotesting-service', args.number_nodes,
args.shards)
node_address, node_hostname, shard = await check_dns_time('zerotesting-lightpush-client')
url = f'http://{node_address}:{args.port}/lightpush/v1/message'
payload = base64.b64encode(os.urandom(args.msg_size_kbytes * 1000)).decode('ascii').rstrip("=")
@@ -121,10 +119,6 @@ def parse_args() -> argparse.Namespace:
default=1)
parser.add_argument('-m', '--messages', type=int, help='Number of messages to inject',
default=10)
parser.add_argument('-sh', '--shards', type=int, help='Number of shards',
default=1)
parser.add_argument('-n', '--number-nodes', type=int,
help='Number of waku nodes. Needed with more than 1 shard')
parser.add_argument('-ps', '--protocols', nargs='+', default=['relay'],
help='Protocols used inject messages')
parser.add_argument('-p', '--port', type=int, default=8645, help='Waku REST port')
@@ -134,8 +128,5 @@ def parse_args() -> argparse.Namespace:
if __name__ == "__main__":
args = parse_args()
if args.shards > 1 and not args.number_nodes:
logging.error('Number of nodes needs to be specified if there are multiple shards')
exit(1)
logging.info(f'{args}')
asyncio.run(main(args))
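
A pure-string illustration of why the --shards/--number-nodes flags could be dropped: the shard is now read straight off the resolved pod hostname (the hostname value below is made up):

```python
# Previously the shard was derived from node counts; now it is parsed from the
# "<statefulset>-<shard>-<ordinal>" pod name returned by socket.gethostbyaddr(...).
hostname = "nodes-3-17"                  # illustrative resolved pod hostname
node_shard = int(hostname.split("-")[1])
topic_suffix = str(node_shard)           # appended to args.pubsub_topic before publishing
print(node_shard)                        # 3
```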

View File

@@ -7,9 +7,10 @@ from src.mesh_analysis.waku_message_log_analyzer import WakuMessageLogAnalyzer
if __name__ == '__main__':
# Timestamp of the simulation
timestamp = "[2024-08-14T11:11:00, 2024-08-14T12:05:00]"
timestamp = "[2024-12-26T16:01:00, 2024-12-26T16:20:00]"
stateful_sets = ["fserver", "lpserver", "store"]
# Example of data analysis from cluster
log_analyzer = WakuMessageLogAnalyzer(2, timestamp, dump_analysis_dir='local_data/shard_tests/')
log_analyzer = WakuMessageLogAnalyzer(stateful_sets, timestamp, dump_analysis_dir='local_data/mixed_enviroment/')
# Example of data analysis from local
# log_analyzer = WakuMessageLogAnalyzer(local_folder_to_analyze='lpt_duptest_debug', dump_analysis_dir='lpt_duptest_debug/notion')
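
A hypothetical end-to-end usage of the updated analyzer API (the constructor arguments mirror the example above; the method calls exist in the analyzer diff below, and parallel=True is an assumed choice):

```python
from src.mesh_analysis.waku_message_log_analyzer import WakuMessageLogAnalyzer

stateful_sets = ["fserver", "lpserver", "store"]
timestamp = "[2024-12-26T16:01:00, 2024-12-26T16:20:00]"

log_analyzer = WakuMessageLogAnalyzer(stateful_sets, timestamp,
                                      dump_analysis_dir='local_data/mixed_enviroment/')
log_analyzer.analyze_message_logs(parallel=True)
log_analyzer.plot_message_distribution()
```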

View File

@@ -2,7 +2,6 @@
import json
import logging
import re
import time
import pandas as pd
import requests
from typing import Dict, List, Optional, Iterator
@@ -62,7 +61,6 @@ class VictoriaReader:
return dfs
def single_query_info(self) -> Result[Dict, Response]:
time.sleep(10)
response = requests.post(self._config['url'], headers=self._config['headers'], params=self._config['params'])
if response.status_code != 200:
logger.error(f'Request failed with status code: {response.status_code}')
@@ -78,7 +76,6 @@ class VictoriaReader:
return Err(response)
def multi_query_info(self) -> Result[Iterator, str]:
time.sleep(10)
response = requests.post(self._config['url'], headers=self._config['headers'], params=self._config['params'])
if response.status_code != 200:
logger.error(f'Request failed with status code: {response.status_code}')

View File

@@ -8,7 +8,7 @@ from result import Ok, Err
# Project Imports
from src.mesh_analysis.tracers.message_tracer import MessageTracer
from src.utils import path_utils, file_utils
from src.utils import path_utils
logger = logging.getLogger(__name__)
@@ -39,8 +39,6 @@ class WakuTracer(MessageTracer):
def trace(self, parsed_logs: List) -> List[pd.DataFrame]:
dfs = [trace(parsed_logs[i]) for i, trace in enumerate(self._tracings) if trace is not None]
logger.warning("Filtering pods that are not 'nodes' (relay)")
dfs[0] = dfs[0][dfs[0]['pod-name'].str.startswith('nodes')]
return dfs
@@ -98,14 +96,20 @@ class WakuTracer(MessageTracer):
filtered_sums = column_sums[column_sums != unique_messages]
result_list = list(filtered_sums.items())
for result in result_list:
logger.warning(f'Peer {result[0]} {result[1]}/{unique_messages} messages received')
peer_id, count = result
missing_hashes = df[df[peer_id] == 0].index.tolist()
missing_hashes.extend(df[df[peer_id].isna()].index.tolist())
logger.warning(f'Peer {result[0]} {result[1]}/{unique_messages}: {missing_hashes}')
def check_if_msg_has_been_sent(self, peers: List, missed_messages: List, sent_df: pd.DataFrame) -> List:
messages_sent_to_peer = []
for peer in peers:
filtered_df = sent_df.loc[missed_messages]
filtered_df = filtered_df[filtered_df['receiver_peer_id'] == peer]
messages_sent_to_peer.append((peer, filtered_df))
try:
filtered_df = sent_df.loc[(slice(None), missed_messages), :]
filtered_df = filtered_df[filtered_df['receiver_peer_id'] == peer]
messages_sent_to_peer.append((peer, filtered_df))
except KeyError as _:
logger.warning(f'Messages {missed_messages} have not been sent to {peer} by any other node.')
return messages_sent_to_peer
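
A small self-contained toy (made-up peer IDs and hashes) of the missing-hash lookup that the new logging above relies on; it assumes the received-message table is indexed by msg_hash with one 0/1 column per peer:

```python
import numpy as np
import pandas as pd

# Toy received-message matrix: rows are message hashes, columns are peers,
# 1 = received, 0/NaN = not seen.
df = pd.DataFrame(
    {"peerA": [1, 1, 1], "peerB": [1, 0, np.nan]},
    index=["0xaaa", "0xbbb", "0xccc"],
)
unique_messages = len(df)

column_sums = df.sum()
filtered_sums = column_sums[column_sums != unique_messages]
for peer_id, count in filtered_sums.items():
    missing_hashes = df[df[peer_id] == 0].index.tolist()
    missing_hashes.extend(df[df[peer_id].isna()].index.tolist())
    print(f"Peer {peer_id} {int(count)}/{unique_messages}: {missing_hashes}")
# Peer peerB 1/3: ['0xbbb', '0xccc']
```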

View File

@@ -18,10 +18,10 @@ logger = logging.getLogger(__name__)
class WakuMessageLogAnalyzer:
def __init__(self, num_shards: int, timestamp_to_analyze: str = None, dump_analysis_dir: str = None,
local_folder_to_analyze: str = None):
self._num_shards = num_shards
self._num_nodes = None
def __init__(self, stateful_sets: List[str], timestamp_to_analyze: str = None,
dump_analysis_dir: str = None, local_folder_to_analyze: str = None):
self._stateful_sets = stateful_sets
self._num_nodes: List[int] = []
self._validate_analysis_location(timestamp_to_analyze, local_folder_to_analyze)
self._set_up_paths(dump_analysis_dir, local_folder_to_analyze)
self._timestamp = timestamp_to_analyze
@@ -40,15 +40,14 @@ class WakuMessageLogAnalyzer:
logger.error(result.err_value)
exit(1)
def _get_victoria_config_parallel(self, node_index: int, num_nodes: int, num_shards: int) -> Dict:
shard = int(node_index // (num_nodes / num_shards))
def _get_victoria_config_parallel(self, stateful_set_name: str, node_index: int) -> Dict:
return {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": [
{
"query": f"kubernetes_container_name:waku AND kubernetes_pod_name:nodes-{shard}-{node_index} AND received relay message AND _time:{self._timestamp}"},
"query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{stateful_set_name}-{node_index} AND received relay message AND _time:{self._timestamp}"},
{
"query": f"kubernetes_container_name:waku AND kubernetes_pod_name:nodes-{shard}-{node_index} AND sent relay message AND _time:{self._timestamp}"}]
"query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{stateful_set_name}-{node_index} AND sent relay message AND _time:{self._timestamp}"}]
}
def _get_victoria_config_single(self) -> Dict:
@@ -66,7 +65,7 @@ class WakuMessageLogAnalyzer:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_pod_name:nodes AND kubernetes_container_name:waku AND 'my_peer_id=16U*{peer_id}' AND _time:{self._timestamp} | limit 1"}}
"query": f"kubernetes_container_name:waku AND 'my_peer_id=16U*{peer_id}' AND _time:{self._timestamp} | limit 1"}}
reader = VictoriaReader(victoria_config, None)
result = reader.single_query_info()
@@ -123,7 +122,7 @@ class WakuMessageLogAnalyzer:
reader = FileReader(self._local_path_to_analyze, waku_tracer)
dfs = reader.read()
has_issues = waku_tracer.has_message_reliability_issues('msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
has_issues = waku_tracer.has_message_reliability_issues('shard', 'msg_hash', 'receiver_peer_id', dfs[0], dfs[1],
self._dump_analysis_dir)
return has_issues
@@ -140,34 +139,36 @@ class WakuMessageLogAnalyzer:
return has_issues
def _read_logs_for_node(self, node_index, victoria_config_func) -> List[pd.DataFrame]:
def _read_logs_for_node(self, stateful_set_name: str, node_index: int, victoria_config_func) -> List[pd.DataFrame]:
waku_tracer = WakuTracer()
waku_tracer.with_received_pattern()
waku_tracer.with_sent_pattern()
config = victoria_config_func(node_index, self._num_nodes, self._num_shards)
config = victoria_config_func(stateful_set_name, node_index)
reader = VictoriaReader(config, waku_tracer)
data = reader.read()
logger.debug(f'Nodes-{node_index} analyzed')
logger.debug(f'{stateful_set_name}-{node_index} analyzed')
return data
def _read_logs_concurrently(self) -> List[pd.DataFrame]:
dfs = []
with ProcessPoolExecutor() as executor:
futures = {executor.submit(self._read_logs_for_node, i, self._get_victoria_config_parallel): i
for i in range(self._num_nodes)}
for stateful_set_name, num_nodes_in_stateful_set in zip(self._stateful_sets, self._num_nodes):
with ProcessPoolExecutor(8) as executor:
futures = {executor.submit(self._read_logs_for_node, stateful_set_name, node_index,
self._get_victoria_config_parallel):
node_index for node_index in range(num_nodes_in_stateful_set)}
for i, future in enumerate(as_completed(futures)):
i = i + 1
try:
df = future.result()
dfs.append(df)
if i % 50 == 0:
logger.info(f'Processed {i}/{self._num_nodes} nodes')
for i, future in enumerate(as_completed(futures)):
i = i + 1
try:
df = future.result()
dfs.append(df)
if i % 50 == 0:
logger.info(f'Processed {i}/{num_nodes_in_stateful_set} nodes in stateful set <{stateful_set_name}>')
except Exception as e:
logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
except Exception as e:
logger.error(f'Error retrieving logs for node {futures[future]}: {e}')
return dfs
@@ -195,7 +196,7 @@ class WakuMessageLogAnalyzer:
dfs = list(zip(*dfs))
dfs = [pd.concat(tup, axis=0) for tup in dfs]
dfs = [df.assign(shard=df['pod-name'].str.extract(r'nodes-(\d+)-').astype(int))
dfs = [df.assign(shard=df['pod-name'].str.extract(r'.*-(\d+)-').astype(int))
.set_index(['shard', 'msg_hash', 'timestamp'])
.sort_index()
for df in dfs]
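
A quick illustration of what the relaxed regex buys (pod names below are made up): the old pattern only matched pods from the "nodes" stateful set, while the new one extracts the shard from any "<set>-<shard>-<ordinal>" pod name:

```python
import pandas as pd

pods = pd.Series(["nodes-0-12", "fserver-1-3", "store-2-0"], name="pod-name")

old_shards = pods.str.extract(r"nodes-(\d+)-", expand=False)  # only the "nodes" set matches
new_shards = pods.str.extract(r".*-(\d+)-", expand=False)     # any stateful set matches
print(old_shards.tolist())  # ['0', nan, nan]
print(new_shards.tolist())  # ['0', '1', '2']
```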
@@ -221,26 +222,31 @@ class WakuMessageLogAnalyzer:
return Ok(None)
def _get_number_nodes(self) -> int:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_pod_name:nodes AND kubernetes_container_name:waku AND _time:{self._timestamp} | uniq by (kubernetes_pod_name)"}
}
def _get_number_nodes(self) -> List[int]:
num_nodes_per_stateful_set = []
reader = VictoriaReader(victoria_config, None)
result = reader.multi_query_info()
if result.is_ok():
return len(list(result.ok_value))
else:
logger.error(result.err_value)
exit(1)
for stateful_set in self._stateful_sets:
victoria_config = {"url": "https://vmselect.riff.cc/select/logsql/query",
"headers": {"Content-Type": "application/json"},
"params": {
"query": f"kubernetes_container_name:waku AND kubernetes_pod_name:{stateful_set} AND _time:{self._timestamp} | uniq by (kubernetes_pod_name)"}
}
def analyze_message_logs(self, parallel=False):
reader = VictoriaReader(victoria_config, None)
result = reader.multi_query_info()
if result.is_ok():
num_nodes_per_stateful_set.append(len(list(result.ok_value)))
else:
logger.error(result.err_value)
exit(1)
return num_nodes_per_stateful_set
def analyze_message_logs(self, parallel: bool = False):
if self._timestamp is not None:
logger.info('Analyzing from server')
self._num_nodes = self._get_number_nodes()
logger.info(f'Detected {self._num_nodes} pods')
logger.info(f'Detected {self._num_nodes} pods in {self._stateful_sets}')
has_issues = self._has_issues_in_cluster_parallel() if parallel else self._has_issues_in_cluster_single()
if has_issues:
match file_utils.get_files_from_folder_path(Path(self._dump_analysis_dir), extension="csv"):
@@ -265,7 +271,7 @@ class WakuMessageLogAnalyzer:
if result.is_ok():
messages_string = result.unwrap()['_msg']
messages_list = ast.literal_eval(messages_string)
messages_list = ['0x'+base64.b64decode(msg).hex() for msg in messages_list]
messages_list = ['0x' + base64.b64decode(msg).hex() for msg in messages_list]
logger.debug(f'Messages from store: {messages_list}')
if len(self._message_hashes) != len(messages_list):
@@ -316,3 +322,28 @@ class WakuMessageLogAnalyzer:
for jump in time_jumps:
logger.info(f'{file}: {jump[0]} to {jump[1]} -> {jump[2]}')
def plot_message_distribution(self):
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_theme()
df = pd.read_csv('local_data/mixed_enviroment/summary/received.csv', parse_dates=['timestamp'])
df.set_index(['shard', 'msg_hash', 'timestamp'], inplace=True)
time_ranges = df.groupby(level='msg_hash').apply(
lambda x: (x.index.get_level_values('timestamp').max() - x.index.get_level_values(
'timestamp').min()).total_seconds()
)
time_ranges_df = time_ranges.reset_index(name='time_to_reach')
plt.figure(figsize=(12, 6))
sns.boxplot(x='time_to_reach', data=time_ranges_df, color='skyblue')
plt.xlabel('Time to Reach All Nodes (seconds)')
plt.title('210 Nodes - 1msg/s - 1KB - 600 messages \n Message time distribution')
plt.savefig("distribution-mixed")
plt.show()
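
For clarity, a toy illustration (made-up hash, peers, and timestamps; the CSV schema is assumed from the code above) of the per-message time range plotted by this helper: for each msg_hash, the spread between the earliest and latest reception timestamp across all pods.

```python
import pandas as pd

# Toy stand-in for received.csv with the assumed (shard, msg_hash, timestamp) index.
df = pd.DataFrame({
    "shard": [0, 0, 0],
    "msg_hash": ["0xaaa", "0xaaa", "0xaaa"],
    "timestamp": pd.to_datetime(["2024-12-26T16:01:00.0",
                                 "2024-12-26T16:01:00.4",
                                 "2024-12-26T16:01:01.2"]),
    "receiver_peer_id": ["peerA", "peerB", "peerC"],
}).set_index(["shard", "msg_hash", "timestamp"])

time_ranges = df.groupby(level="msg_hash").apply(
    lambda x: (x.index.get_level_values("timestamp").max()
               - x.index.get_level_values("timestamp").min()).total_seconds()
)
print(time_ranges)  # 0xaaa -> 1.2 seconds until every node had the message
```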