Moved analysis module to external repo

This commit is contained in:
Alberto Soutullo Rendo
2023-07-30 14:32:20 +02:00
parent bd6d625167
commit ca095b14b7
31 changed files with 0 additions and 2603 deletions

View File

@@ -1,32 +0,0 @@
# Create the build image
FROM python:3.11-slim AS build-image
# Create the virtualenv
RUN python -m venv /opt/venv
# Use the virtualenv
ENV PATH="/opt/venv/bin:$PATH"
# Perform the installs
COPY requirements.txt .
RUN pip install -r requirements.txt
# Create the production image
FROM python:3.11-slim AS prod-image
LABEL authors="AlbertoSoutullo"
# Copy the requisite files from the build image to the production image
COPY --from=build-image /opt/venv /opt/venv
# Copy the gennet files to the production image
WORKDIR /analysis
COPY . .
# Deploy the virtualenv in production image
ENV PATH="/opt/venv/bin:$PATH"
ENV PYTHONPATH "${PYTHONPATH}:src"
# Set the entrypoint. With ENTRYPOINT ["python"], `docker run -it analysis /bin/sh`
# would execute `python /bin/sh`; use `docker run -it --entrypoint /bin/sh analysis`
# to get an interactive shell instead.
ENTRYPOINT ["python"]

View File

@@ -1,40 +0,0 @@
Mount:
1 - Logs folder (`/simulation_data`)
Run:
- `docker run --network "host" -v $(pwd)/wakurtosis_logs:/simulation_data/ --add-host=host.docker.internal:host-gateway <image> <script> -p <prometheus_port> -i <infra_type>`
Examples:
- `docker run --network "host" -v $(pwd)/wakurtosis_logs:/simulation_data/ --add-host=host.docker.internal:host-gateway analysis ./src/main.py -i container-proc`
- `docker run --network "host" -v $(pwd)/wakurtosis_logs:/simulation_data/ --add-host=host.docker.internal:host-gateway analysis ./src/main.py -i cadvisor -p 123456`
To run tests:
- `docker run --network "host" -v $(pwd)/wakurtosis_logs:/simulation_data/ --add-host=host.docker.internal:host-gateway <image> -m unittest discover -s tests -p "*.py"`
## Plotting configuration
The configuration is set in `config.json`, under the `"plotting"` key.
Each metric name must match a metric that exists in Prometheus, i.e. any metric exposed by cAdvisor or Waku.
```json
{
"plotting": {
"by_node": [
"container_cpu_load_average_10s",
"container_memory_usage_bytes",
"container_network_receive_bytes_total",
"container_network_transmit_bytes_total",
"container_fs_reads_bytes_total",
"container_fs_writes_bytes_total"
]
}
}
```
`by_node`: the metric is gathered per node, yielding the distribution of each node's maximum value over the entire simulation.
`by_simulation`: a single value accumulated across the entire simulation. **Deprecated.**
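For reference, this is roughly how a `by_node` metric is pulled from Prometheus (a minimal sketch mirroring `src/prometheus.py`; the port, IP and time window below are illustrative placeholders):

```python
from datetime import datetime, timedelta
from prometheus_api_client import PrometheusConnect

prom = PrometheusConnect("http://host.docker.internal:52005", disable_ssl=True)
end_ts = datetime.utcnow()
start_ts = end_ts - timedelta(minutes=5)
# One series per container, filtered by its Kurtosis-assigned private IP
result = prom.custom_query_range(
    "container_memory_usage_bytes{container_label_com_kurtosistech_private_ip = '212.209.64.2'}",
    start_time=start_ts, end_time=end_ts, step="1s")
```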

View File

@@ -1,3 +0,0 @@
docker build -t analysis .

View File

@@ -1,23 +0,0 @@
# Run analysis
#if jq -e ."plotting" >/dev/null 2>&1 "./config/${wakurtosis_config_file}"; then
# if [ "$metrics_infra" = "dstats" ]; then
# docker run --name "dstats" --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/hproc.py dstats /simulation_data/ --config-file /simulation_data/config/config.json >/dev/null 2>&1
# docker cp dstats:/analysis/plots/ wakurtosis_logs/dstats-plots
# cd wakurtosis_logs
# ln -s dstats-plots/output-dstats-compare.pdf analysis.pdf
# cd ..
# elif [ "$metrics_infra" = "host-proc" ]; then
# docker run --name "host-proc" --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/hproc.py host-proc /simulation_data/ --config-file /simulation_data/config/config.json >/dev/null 2>&1
# docker cp host-proc:/analysis/plots/ wakurtosis_logs/host-proc-plots
# cd wakurtosis_logs
# ln -s host-proc-plots/output-host-proc-compare.pdf analysis.pdf
# cd ..
# elif [ "$metrics_infra" = "container-proc" ]; then
# docker run --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/main.py -i container-proc >/dev/null 2>&1
# elif [ "$metrics_infra" = "cadvisor" ]; then
# prometheus_port=$(grep "\<prometheus\>" $kurtosis_inspect | awk '{print $6}' | awk -F':' '{print $2}')
# docker run --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/main.py -i cadvisor -p "$prometheus_port" >/dev/null 2>&1
# fi
#fi

Binary file not shown.

View File

@@ -1,163 +0,0 @@
# Python Imports
import re
import sys
import glob
from tqdm_loggable.auto import tqdm
# Project Imports
from src import analysis_logger
from src import log_parser
def update_min_max_tss(tss, min_tss, max_tss):
    # Two independent checks (not if/elif): the very first timestamp seen must
    # update both bounds, since min_tss starts at sys.maxsize and max_tss below it.
    if tss < min_tss:
        min_tss = tss
    if tss > max_tss:
        max_tss = tss
    return min_tss, max_tss
def get_relay_line_info(log_line):
msg_topics = re.search(r'topics="([^"]+)"', log_line).group(1)
msg_topic = re.search(r'pubsubTopic=([^ ]+)', log_line).group(1)
msg_hash = re.search(r'hash=([^ ]+)', log_line).group(1)
msg_peer_id = re.search(r'peerId=([^ ]+)', log_line).group(1)
return msg_topics, msg_topic, msg_hash, msg_peer_id
def compute_injection_times(injected_msgs_dict):
return [msg['injection_time'] for msg in injected_msgs_dict.values() if msg['status'] == 200]
def analyze_published(log_line, node_logs, msgs_dict, msg_publishTime):
msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
node_logs[msg_peer_id]['published'].append([msg_publishTime, msg_topics, msg_topic, msg_hash])
if msg_hash not in msgs_dict:
msgs_dict[msg_hash] = {'published': [{'ts': msg_publishTime, 'peer_id': msg_peer_id}],
'received': []}
else:
msgs_dict[msg_hash]['published'].append(
{'ts': msg_publishTime, 'peer_id': msg_peer_id})
def analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime):
msg_topics, msg_topic, msg_hash, msg_peer_id = get_relay_line_info(log_line)
node_logs[msg_peer_id]['received'].append([msg_receivedTime, msg_topics, msg_topic, msg_hash])
if msg_hash not in msgs_dict:
msgs_dict[msg_hash] = {'published': [], 'received': [
{'ts': msg_receivedTime, 'peer_id': msg_peer_id}]}
else:
msgs_dict[msg_hash]['received'].append(
{'ts': msg_receivedTime, 'peer_id': msg_peer_id})
def parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss):
for log_line in file:
if 'waku.relay' in log_line:
if 'published' in log_line:
msg_publishTime = int(re.search(r'publishTime=([\d]+)', log_line).group(1))
analyze_published(log_line, node_logs, msgs_dict, msg_publishTime)
min_tss, max_tss = update_min_max_tss(msg_publishTime, min_tss, max_tss)
elif 'received' in log_line:
msg_receivedTime = int(re.search(r'receivedTime=([\d]+)', log_line).group(1))
analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime)
min_tss, max_tss = update_min_max_tss(msg_receivedTime, min_tss, max_tss)
return min_tss, max_tss
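# Illustration: a fabricated relay log line in the shape the regexes above expect
# (all field values are placeholders, not taken from a real run):
#   'waku.relay published message topics="/waku/2/default-waku/proto" '
#   'pubsubTopic=/waku/2/default-waku/proto hash=0xdeadbeef peerId=16U*ab12cd '
#   'publishTime=1690718000000000000'
# get_relay_line_info() would yield ('/waku/2/default-waku/proto',
# '/waku/2/default-waku/proto', '0xdeadbeef', '16U*ab12cd').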
def compute_message_latencies(msgs_dict):
# Compute message latencies and propagation times throughout the network
pbar = tqdm(msgs_dict.items())
for msg_hash, msg_data in pbar:
# NOTE: Careful here as I am assuming that every message is published once ...
if len(msg_data['published']) > 1:
analysis_logger.G_LOGGER.warning('Several publishers of message %s' % msg_hash)
published_ts = int(msg_data['published'][0]['ts'])
peer_id = msg_data['published'][0]['peer_id']
pbar.set_description('Computing latencies of message %s' % msg_hash)
# Compute latencies
latencies = []
for received_data in msg_data['received']:
# Skip self
if received_data['peer_id'] == peer_id:
analysis_logger.G_LOGGER.warning('Message %s received by the same node that published it' % msg_hash)
continue
# NOTE: We are getting some negative latencies meaning that the message appears to be received before it was sent ...
# I assume this must be because those are the nodes that got the message injected in the first place
# TLDR: Should be safe to ignore all the negative latencies
latency = int(received_data['ts']) - published_ts
latencies.append(latency)
msgs_dict[msg_hash]['latencies'] = latencies
def compute_propagation_times(msgs_dict):
msg_propagation_times = []
pbar = tqdm(msgs_dict.items())
for msg_hash, msg_data in pbar:
pbar.set_description('Computing propagation time of message %s' % msg_hash)
# todo check Why do we round here
# msg_propagation_times.append(round(max(msg_data['latencies']) / 1000000))
msg_propagation_times.append(max(msg_data['latencies']) / 1000000)
return msg_propagation_times
def compute_message_delivery(msgs_dict, injected_msgs_dict):
# Compute message delivery
total_messages = len(injected_msgs_dict)
delivered_messages = len(msgs_dict)
lost_messages = total_messages - delivered_messages
delivery_rate = delivered_messages * 100 / total_messages
analysis_logger.G_LOGGER.info(f'{delivered_messages} of {total_messages} messages delivered. '
                              f'Lost: {lost_messages}. Delivery rate: {delivery_rate}%')
return delivery_rate
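# Worked example for compute_message_delivery above: 100 injected and 95
# delivered messages give 5 lost and a delivery rate of 95 * 100 / 100 = 95.0%.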
def analyze_containers(topology, simulation_path):
node_logs = {}
msgs_dict = {}
max_tss = -sys.maxsize - 1
min_tss = sys.maxsize
for container_name, container_info in topology["containers"].items():
node_pbar = tqdm(container_info["nodes"])
node_pbar.set_description(f"Parsing log of container {container_name}")
log_parser.prepare_node_in_logs(node_pbar, topology, node_logs, container_name)
folder = glob.glob(f'{simulation_path}/{container_name}--*')
    if len(folder) != 1:
        raise RuntimeError(f"Error: expected exactly one log folder for {container_name}, found: {folder}")
file = log_parser.open_file(folder)
min_tss, max_tss = parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss)
file.close()
return node_logs, msgs_dict, min_tss, max_tss
def inject_metric_in_dict(metrics, key_name, title, y_label, metric_name, values):
    metrics[key_name] = {
        "title": title,
        "y_label": y_label,
        "metric_name": metric_name,
        "values": values
    }

View File

@@ -1,180 +0,0 @@
# Python Imports
import re
import sys
import json
from datetime import datetime
from typing import Any, Dict, List, Set, Optional, Tuple
# Project Imports
from src import analysis_logger
def compute_simulation_time_window(min_tss: float, max_tss: float) -> Tuple[float, float, float]:
simulation_start_ts = min_tss
simulation_end_ts = max_tss
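# The tss values are nanosecond timestamps (prometheus.py divides them by 1e9
# to obtain seconds), so /1000000 below converts the span to milliseconds.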
simulation_time_ms = round((simulation_end_ts - simulation_start_ts) / 1000000)
return simulation_start_ts, simulation_end_ts, simulation_time_ms
def build_summary(config_obj: Dict[str, Any], metrics_info: Dict[str, Any], msgs_dict: Dict[str, Any],
node_logs: Dict[str, Any], topics: Set[str], min_tss: float, max_tss: float,
avg_samples_per_node: float) -> Dict[str, Any]:
simulation_summary = {'general' : {}, 'nodes' : {}, 'messages' : {}, 'parameters' : {}}
simulation_summary['general']['datetime'] = datetime.now().isoformat()
simulation_summary['general']['num_messages'] = len(msgs_dict)
simulation_summary['general']['num_nodes'] = len(node_logs)
simulation_summary['general']['num_topics'] = len(topics)
simulation_summary['general']['topics'] = list(topics)
simulation_start_ts, simulation_end_ts, simulation_time_ms = compute_simulation_time_window(min_tss, max_tss)
simulation_summary['general']['simulation_start_ts'] = simulation_start_ts
simulation_summary['general']['simulation_end_ts'] = simulation_end_ts
simulation_summary['general']['simulation_time_ms'] = simulation_time_ms
simulation_summary['general']['metrics'] = metrics_info
simulation_summary['general']['metrics']['avg_samples_per_node'] = avg_samples_per_node
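# 'esr' below is the effective sampling rate: average samples per node divided
# by the simulation length in seconds (shown as "samples/s" in the figure title).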
simulation_summary['general']['metrics']['esr'] = simulation_summary['general']['metrics']['avg_samples_per_node'] / (simulation_summary['general']['simulation_time_ms'] / 1000.0)
simulation_summary['parameters'] = config_obj
return simulation_summary
def extract_node_id(s: str) -> str:
pattern = r"node-(\d+)\.toml"
match = re.search(pattern, s)
if match:
return f"node_{match.group(1)}"
else:
return None
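# Example for extract_node_id above: "node-12.toml" -> "node_12"; anything that
# does not match the node-<N>.toml pattern yields None.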
def add_sample_to_metrics(sample: Dict[str, Any], node_id: str, metrics_dict: Dict[str, Dict[str, Any]]) -> int:
if node_id in metrics_dict:
metrics_dict[node_id]['samples'].append(sample)
return 0
else:
metrics_dict[node_id] = {'samples' : [sample]}
return 1
def parse_container_nodes(container_id: str, container_data: Dict[str, Any], container_nodes: Dict[int, Any], metrics_dict: Dict[Any, Dict[str, Any]]) -> int:
nodes_cnt = 0
for sample in container_data['samples']:
pid = sample['PID']
if pid not in container_nodes:
analysis_logger.G_LOGGER.error('Couldn\'t find node id for PID %d in container %s' %(pid, container_id))
continue
node_id = container_nodes[pid]
if not node_id:
analysis_logger.G_LOGGER.error('Couldn\'t find node id for PID %d in container %s' %(pid, container_id))
continue
nodes_cnt += add_sample_to_metrics(sample, node_id, metrics_dict)
return nodes_cnt
def extract_container_nodes(container_id: str, container_data: Dict[str, Any]) -> Dict[int, Any]:
container_nodes = {}
for process in container_data['info']['processes']:
node_id = extract_node_id(process['binary'])
if not node_id:
analysis_logger.G_LOGGER.error('Couldn\'t match %s to node id in container %s' %(process['binary'], container_id))
continue
pid = process['pid']
container_nodes[pid] = node_id
return container_nodes
def load_metrics_file(metrics_file_path: str) -> Dict[str, Any]:
with open(metrics_file_path, 'r') as file:
return json.load(file)
def process_metrics_file(metrics_obj: Dict[str, Any]) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
metrics_dict = {}
info = metrics_obj['header']
all_samples = metrics_obj['containers']
if len(all_samples) != info['num_containers']:
analysis_logger.G_LOGGER.error('Number of containers in header does not match number of containers in samples')
return metrics_dict, info
for container_id, container_data in all_samples.items():
container_nodes = extract_container_nodes(container_id, container_data)
parse_container_nodes(container_id, container_data, container_nodes, metrics_dict)
return metrics_dict, info
def load_process_level_metrics(metrics_file_path: str) -> Tuple[Dict[str, Any], Optional[Dict[str, Any]]]:
try:
metrics_obj = load_metrics_file(metrics_file_path)
metrics_dict, info = process_metrics_file(metrics_obj)
except Exception as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__,e))
sys.exit()
analysis_logger.G_LOGGER.info('Loaded metrics for %d nodes.' %len(metrics_dict))
return metrics_dict, info
def compute_node_metrics(node_obj: Dict[str, Any]) -> Tuple[int, float, float, float, float, float, float]:
num_samples = len(node_obj['samples'])
""" Peak values """
max_cpu_usage = max(obj['CPUPercentage'] for obj in node_obj['samples'])
max_memory_usage = max(obj['MemoryUsageMB'] for obj in node_obj['samples'])
""" Accumulated """
total_rx_mbytes = max(obj['NetStats']['all']['total_received'] for obj in node_obj['samples']) / (1024*1024)
total_tx_mbytes = max(obj['NetStats']['all']['total_sent'] for obj in node_obj['samples']) / (1024*1024)
""" Accumulated """
max_disk_read_mbytes = max(obj['DiskIORChar'] for obj in node_obj['samples']) / (1024*1024)
max_disk_write_mbytes = max(obj['DiskIOWChar'] for obj in node_obj['samples']) / (1024*1024)
return num_samples, max_cpu_usage, max_memory_usage, total_rx_mbytes, total_tx_mbytes, max_disk_read_mbytes, max_disk_write_mbytes
def compute_process_level_metrics(simulation_path: str, config_obj: Dict[str, Any]) -> Tuple[Dict[str, Any], List[float], List[float], Dict[str, List[float]], Dict[str, List[float]], float]:
""" Load Metrics """
metrics_file_path = f'{simulation_path}/cproc_metrics.json'
node_metrics, metrics_info = load_process_level_metrics(metrics_file_path)
""" Compute Metrics """
max_cpu_usage = []
max_memory_usage = []
total_network_usage = {'rx_mbytes' : [], 'tx_mbytes' : []}
max_disk_usage = {'disk_read_mbytes' : [], 'disk_write_mbytes' : []}
num_samples = []
for _, node_obj in node_metrics.items():
samples, cpu, mem, rx, tx, disk_read, disk_write = compute_node_metrics(node_obj)
num_samples.append(samples)
max_cpu_usage.append(cpu)
max_memory_usage.append(mem)
total_network_usage['rx_mbytes'].append(rx)
total_network_usage['tx_mbytes'].append(tx)
max_disk_usage['disk_read_mbytes'].append(disk_read)
max_disk_usage['disk_write_mbytes'].append(disk_write)
avg_samples_per_node = sum(num_samples) / len(num_samples)
return metrics_info, max_cpu_usage, max_memory_usage, total_network_usage, max_disk_usage, avg_samples_per_node
def export_summary(simulation_path: str, summary: Dict[str, Any]) -> None:
summary_path = f'{simulation_path}/summary.json'
with open(summary_path, 'w') as fp:
json.dump(summary, fp, indent=4)
analysis_logger.G_LOGGER.info(f'Analysis summary saved in {summary_path}')
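# For reference, summary.json takes roughly this shape (values illustrative):
# {
#   "general": {"datetime": "...", "num_messages": 0, "num_nodes": 0,
#               "num_topics": 0, "topics": [],
#               "simulation_start_ts": 0, "simulation_end_ts": 0,
#               "simulation_time_ms": 0,
#               "metrics": {..., "avg_samples_per_node": 0.0, "esr": 0.0}},
#   "nodes": {}, "messages": {},
#   "parameters": {...}   # the loaded config.json
# }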

View File

@@ -1,48 +0,0 @@
# Python Imports
import sys
import logging
from tqdm_loggable.tqdm_logging import tqdm_logging
# Project Imports
from src import vars
class CustomFormatter(logging.Formatter):
# Set different formats for every logging level
time_name_stamp = "[%(asctime)s.%(msecs)03d] [" + vars.G_APP_NAME + "]"
FORMATS = {
logging.ERROR: time_name_stamp + " ERROR in %(module)s.py %(funcName)s() %(lineno)d - %(msg)s",
logging.WARNING: time_name_stamp + " WARNING - %(msg)s",
logging.CRITICAL: time_name_stamp + " CRITICAL in %(module)s.py %(funcName)s() %(lineno)d - %(msg)s",
logging.INFO: time_name_stamp + " %(msg)s",
logging.DEBUG: time_name_stamp + " %(funcName)s() %(msg)s",
'DEFAULT': time_name_stamp + " %(msg)s",
}
def format(self, record):
log_fmt = self.FORMATS.get(record.levelno, self.FORMATS['DEFAULT'])
formatter = logging.Formatter(log_fmt, '%d-%m-%Y %H:%M:%S')
return formatter.format(record)
def init_logger():
global G_LOGGER
""" Init Logging """
G_LOGGER = logging.getLogger(vars.G_APP_NAME)
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(CustomFormatter())
G_LOGGER.addHandler(handler)
tqdm_logging.set_level(logging.INFO)
# Set loglevel from config
G_LOGGER.setLevel(vars.G_LOG_LEVEL)
handler.setLevel(vars.G_LOG_LEVEL)
G_LOGGER.info('Started')
return G_LOGGER
G_LOGGER = init_logger()

View File

@@ -1,22 +0,0 @@
# Python Imports
import argparse
# Project Imports
from src import vars
def parse_args():
""" Parse command line args """
parser = argparse.ArgumentParser()
parser.add_argument("-sp", "--simulation_path", help="Simulation results path",
default=vars.G_DEFAULT_SIMULATION_PATH)
parser.add_argument("-p", "--prometheus_port", help="Pometheus port")
parser.add_argument("-i", "--infra", help="Metrics infrastructure type")
args = parser.parse_args()
simulation_path = args.simulation_path
port = args.prometheus_port
infra_type = args.infra
return simulation_path, port, infra_type
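# Typical invocations (taken from the README):
#   ./src/main.py -i container-proc
#   ./src/main.py -i cadvisor -p <prometheus_port>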

View File

@@ -1,20 +0,0 @@
# Python Imports
# Project Imports
from src import analysis_logger
from src import analysis
from src import prometheus
from src import plotting
def run(simulation_config, metrics, topology_info, msg_propagation_times, msg_injection_times, min_tss, max_tss, prom_port):
analysis_logger.G_LOGGER.info('Generating stats for CADVISOR infrastructure ...')
analysis.inject_metric_in_dict(metrics, "propagation", "Propagation Time (per message)", "Propagation Time (ms)",
"msg_propagation_times", msg_propagation_times)
analysis.inject_metric_in_dict(metrics, "injection", "Injection Time (per message)", "Milliseconds (ms)",
"msg_injection_times", msg_injection_times)
prometheus.get_hardware_metrics(metrics, topology_info, min_tss, max_tss, prom_port)
""" Generate Figure """
plotting.plot_figure_ex(simulation_config)

View File

@@ -1,20 +0,0 @@
# Python Imports
import sys
import json
# Project Imports
from src import analysis_logger
from src import vars
def load_config(simulation_path):
config_path = f'{simulation_path}/config/config.json'
try:
with open(config_path, "r") as read_file:
config = json.load(read_file)
except Exception as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
sys.exit()
return config

View File

@@ -1,23 +0,0 @@
# Python Imports
# Project Imports
from src import analysis_logger
from src import plotting
from src import analysis_cproc
def run(simulation_config, simulation_path, msgs_dict, node_logs, msg_propagation_times, msg_injection_times, min_tss, max_tss):
analysis_logger.G_LOGGER.info('Generating stats for CPROC infrastructure ...')
metrics_info, max_cpu_usage, max_memory_usage, total_network_usage, max_disk_usage, avg_samples_per_node = analysis_cproc.compute_process_level_metrics(simulation_path, simulation_config)
""" Build simulation summary """
summary = analysis_cproc.build_summary(simulation_config, metrics_info, msgs_dict, node_logs, [], min_tss, max_tss, avg_samples_per_node)
""" Generate Figure """
plotting.plot_figure_cproc(msg_propagation_times, max_cpu_usage, max_memory_usage, total_network_usage, max_disk_usage,
msg_injection_times, summary['general'], summary['parameters'])
""" Export summary """
analysis_cproc.export_summary(simulation_path, summary)

View File

@@ -1,634 +0,0 @@
import typer
import sys
import os
import stat
import math
from pathlib import Path
import time
import json
import networkx as nx
import re
import logging as log
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from sklearn.cluster import KMeans
from src import vars
from src import topology
from src import log_parser
from src import analysis
# check if the path exists and is of appropriate type
def path_ok(path : Path, isDir=False):
if not path.exists():
log.error(f'"{path}" does not exist')
return False
mode = path.stat().st_mode
if not isDir and not stat.S_ISREG(mode):
log.error(f'File expected: "{path}" is not a file')
return False
if isDir and not stat.S_ISDIR(mode):
log.error(f'Directory expected: "{path}" is not a directory')
return False
# lay off the permission checks; resolve them lazily with open
return True
# define singleton
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
# convert human readable sizes to bytes
class Human2BytesConverter(metaclass=Singleton):
def __init__(self): # add any human readable format/size and multiplier here
self.letters = {}
self.letters[3] = {'GiB' : 1024*1024*1024, 'MiB' : 1024*1024, 'KiB' : 1024}
self.letters[2] = {'GB' : 1024*1024*1024, 'MB' : 1024*1024, 'KB' : 1024,
'gB' : 1000*1000*1000, 'mB' : 1000*1000, 'kB' : 1000}
self.letters[1] = {'B':1}
def convert(self, value):
for i in [3, 2, 1]:
k = value[-i:]
if k in self.letters[i]:
return float(value[:-i]) * self.letters[i][k]
return np.nan
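# A few illustrative conversions (inputs chosen for the example):
#   Human2BytesConverter().convert('1.5GiB') -> 1610612736.0
#   Human2BytesConverter().convert('512kB')  -> 512000.0   (decimal kB)
#   Human2BytesConverter().convert('7B')     -> 7.0
# Unrecognised suffixes yield np.nan.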
# Base class for plots and common helpers
class Plots(metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
self.log_dir, self.oprefix = log_dir, oprefix
self.df, self.n, self.keys, self.cols = pd.DataFrame(), 0, [], []
self.col2title, self.col2units, self.key2nodes = {}, {}, {}
self.msg_settling_times, self.msg_injection_times = {}, {}
self.grp2idx, self.idx2grp = {}, {}
self.fig, self.axes = "", ""
self.json_fname, self.G = jf, nx.empty_graph()
self.to_plot, self.to_compare = to_plot, []
self.run_summary, self.cfile = "", cfile
# waku log processing
def compute_msg_settling_times(self):
ldir = str(self.log_dir)
topology_info = topology.load_json(f'{ldir}/{vars.G_TOPOLOGY_FILE_NAME}')
topology.load_topics_into_topology(topology_info, f'{ldir}/config/topology_generated/')
injected_msgs_dict = log_parser.load_messages(ldir)
node_logs, msgs_dict, min_tss, max_tss = analysis.analyze_containers(topology_info, ldir)
simulation_time_ms = round((max_tss - min_tss) / 1000000)
log.info((f'Simulation started at {min_tss}, ended at {max_tss}. '
f'Effective simulation time was {simulation_time_ms} ms.'))
analysis.compute_message_delivery(msgs_dict, injected_msgs_dict)
analysis.compute_message_latencies(msgs_dict)
self.msg_settling_times = analysis.compute_propagation_times(msgs_dict)
self.msg_injection_times = analysis.compute_injection_times(injected_msgs_dict)
#print("message propagation_times: ", self.msg_settling_times)
def get_key(self):
return self.df.Key
def set_keys(self):
self.keys = self.df['Key'].unique()
self.keys.sort()
# set the summary for the run: to be used for plot title
def set_summary(self):
with open(self.cfile, 'r') as f: # Load config file
conf = json.load(f)
minsize = int(conf["wls"]["min_packet_size"]/1024)
maxsize = int(conf["wls"]["max_packet_size"]/1024)
self.run_summary = (f'{conf["gennet"]["num_nodes"]}by'
f'{conf["gennet"]["container_size"]}fo'
f'{conf["gennet"]["fanout"]}-'
f'{conf["wls"]["message_rate"]}mps-'
f'({minsize}-{maxsize})KiB-'
f'{conf["wls"]["simulation_time"]}sec')
print(f'summary: {self.run_summary} (from {self.cfile})')
# set the fields that go into the comparison panel
def set_compare(self, lst):
self.to_compare = lst
# extract the maximal complete sample set
def remove_incomplete_samples(self, grp, err=''):
#if not err:
        self.df = self.df[~self.df.isin([err]).any(axis=1)]
        minRows = sys.maxsize
        for cid in self.df[grp].unique():
            rows = self.df[self.df[grp] == cid].shape[0]
            minRows = min(minRows, rows)
        self.df = self.df.groupby(grp).head(minRows)
# plot the settling times, both network- and node-wise
def plot_msg_settling_times(self):
self.set_panel_size(2, 1, False)
nmsgs = len(self.msg_settling_times)
self.fig.suptitle(f'Settling Time: {self.run_summary}, {nmsgs} msgs')
self.fig.supylabel("msecs")
self.axes[0].set_xticks([x + 1 for x in range(len(self.keys))])
#axes[0].set_xticks(ticks=[x + 1 for x in range(len(self.waku_cids))], labels=self.df["ContainerID"].unique())
self.axes[0].set_xlabel('TODO: revisit after Jordi added per-container settling times')
self.axes[1].violinplot(self.msg_settling_times, showmedians=True)
self.axes[1].axes.xaxis.set_visible(False)
plt.savefig(f'{self.oprefix}-settling-time.pdf', format="pdf", bbox_inches="tight")
#plt.show()
# set the panel params
def set_panel_size(self, m, n, shareY=False):
self.fig, self.axes = plt.subplots(m, n, layout='constrained', sharey=shareY)
self.fig.set_figwidth(12)
self.fig.set_figheight(10)
# plot col panels for selected columns
def plot_col_panels(self, agg):
for col in self.to_plot["ColPanel"]:
if col not in self.df.columns:
log.error(f"ColPanel: {col} is not in {self.df.columns}, skipping...")
continue
if col in ["CPUPerc", "MemUse"]:
self.col_panel_helper(col)
else:
self.col_panel_helper(col, agg)
# plot degree/col panels for the given set of columns
def plot_deg_col_panels(self):
for col in self.to_plot["DegColPanel"]:
if col not in self.df.columns:
log.error(f"DegColPanel: {col} is not in {self.df.columns}, skipping...")
continue
self.deg_col_panel_helper(col) # only agg for now
# plot the column panel
def col_panel_helper(self, col, agg=True):
self.set_panel_size(2, 2)
self.fig.suptitle(f'{self.col2title[col]}: {self.run_summary}')
self.fig.supylabel(self.col2units[col])
per_key_arr = []
# per docker violin plot
self.axes[0,0].ticklabel_format(style='plain')
self.axes[0,0].yaxis.grid(True)
for key in self.keys:
if agg:
tmp = self.df[self.get_key() == key][col].values
else:
tmp = self.df[self.get_key() == key][col].diff().dropna().values
per_key_arr.append(tmp)
#all_arr = np.concatenate((all_arr, tmp), axis=0)
#self.axes[0,0].set_xticks([x + 1 for x in range(len(self.keys))])
labels = [ '{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
#self.axes[0,0].set_xticklabels(labels)
legends = self.axes[0,0].violinplot(dataset=per_key_arr, showmedians=True)
#text = ""
#for key, nodes in self.key2nodes.items():
# text += f'{key} {", ".join(nodes)}\n'
#self.axes[0,0].text(0.675, 0.985, text, transform=self.axes[0,0].transAxes,
# fontsize=7, verticalalignment='top')
# consolidated violin plot
self.axes[1,0].ticklabel_format(style='plain')
self.axes[1,0].yaxis.grid(True)
self.axes[1,0].set_xlabel('All Containers')
self.axes[1,0].violinplot(self.df[col], showmedians=True)
self.axes[1,0].set_xticks([])
self.axes[1,0].axes.xaxis.set_visible(False)
# per docker scatter plot
self.axes[0,1].ticklabel_format(style='plain')
self.axes[0,1].yaxis.grid(True)
self.axes[0,1].set_xlabel('Time')
legends = []
for i, key in enumerate(self.keys):
y = per_key_arr[i]
legends.append(self.axes[0,1].scatter(x=range(0, len(y)), y=y, marker='.'))
#self.axes[0,1].legend(legends, self.keys, scatterpoints=1,
# loc='upper left', ncol=3,
# fontsize=8)
# consolidated/summed-up scatter plot
self.axes[1,1].ticklabel_format(style='plain')
self.axes[1,1].yaxis.grid(True)
self.axes[1,1].set_xlabel('Time')
out, out_avg, nkeys = [], [], len(per_key_arr)
# omit the very last measurement: could be a partial record
jindices, iindices = range (len(per_key_arr[0])-1), range(len(per_key_arr))
for j in jindices:
out.append(0.0)
for i in iindices:
out[j] += per_key_arr[i][j]
out_avg.append(out[j]/nkeys)
self.axes[1,1].plot(out, color='b')
self.axes[1,1].plot(out_avg, color='y')
self.axes[1,1].legend([f'Total {self.col2title[col]}', f'Average {self.col2title[col]}'],
loc='upper right', ncol=1, fontsize=8)
plt.savefig(f'{self.oprefix}-col-panel-{col}.pdf')
#plt.show()
# build the key2nodes: useful when $container_size$ > 1
def build_key2nodes(self):
with open(self.kinspect_fname) as f:
for line in f:
if "User Services" in line:
f.readline()
break
for line in f:
if line == "\n":
break
larray = line.split()
if "containers_" in larray[1]:
key = larray[1]
self.key2nodes[key] = [larray[2].split("libp2p-")[1].replace(':', '')]
elif "libp2p-node" in larray[0]:
self.key2nodes[key].append(larray[0].split("libp2p-")[1].replace(':', ''))
# export the df if needed
def get_df(self):
return self.df
# build indices or cluster plots
def build_cluster_index(self, grp):
lst = self.df[grp].unique()
self.grp2idx = { val : i for i, val in enumerate(lst)}
self.idx2grp = { i : val for i, val in enumerate(lst)}
self.df[f'{grp}_idx'] = self.df[grp].map(lambda x: self.grp2idx[x])
# plot the cluster panel
def cluster_plot_helper(self, grp, cols):
self.build_cluster_index(grp)
kmeans = KMeans(n_clusters=10, n_init='auto')
groups = self.df[grp].unique()
groups.sort()
xpdf = pd.DataFrame()
for g in groups:
X =self.df.loc[self.df[grp] == g][cols]
Xflat = X.values.flatten()
xpdf[g] = Xflat
labels = kmeans.fit_predict(X)
#TODO: plot better. it is not very interpretable now
self.axes[0,1].scatter(x=range(0, len(labels)), y=labels, marker='.')
#self.axes[0,1].scatter(X.iloc[:, 0], X.iloc[:, 1], c=labels, cmap='plasma')
self.axes[0,1].set_xlabel('Time')
#axis.set_yticks([x for x in range(len(groups))])
self.axes[0,1].set_yticks(range(len(groups)))
labels = ['{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
self.axes[0,1].set_yticklabels(labels)
labels = kmeans.fit_predict(xpdf)
self.axes[1,1].scatter(xpdf.iloc[:, 0], xpdf.iloc[:, 2], c=labels, cmap='plasma')
# plot the comparison panel
def plot_compare_panel(self):
self.set_panel_size(2, 3)
self.fig.suptitle(f'{self.run_summary}')
k = 0
for i in [0,1]:
for j in [0,1,2]:
col = self.to_compare[k]
#self.axes[i,j].ticklabel_format(style='plain')
self.axes[i,j].yaxis.grid(True)
pc = self.axes[i,j].violinplot(self.df[col], showmedians=True)
self.axes[i,j].set_ylabel(self.col2units[col])
self.axes[i,j].set_title(self.col2title[col])
#for p in pc['bodies']:
# p.set_facecolor('green')
# p.set_edgecolor('k')
# p.set_alpha(0.5)
k += 1
plt.savefig(f'{self.oprefix}-compare.pdf', format="pdf", bbox_inches="tight")
#plt.show()
def phase_plots_helper(self, grp, col):
pass
# build the network from json
def read_network(self):
with open(self.json_fname) as f:
js_graph = json.load(f)
for src in js_graph['nodes'].keys():
for dst in js_graph['nodes'][src]['static_nodes']:
self.G.add_edge(src, dst)
# plot the network and degree histogram
def plot_network(self):
self.set_panel_size(1, 2)
self.fig.suptitle(f'Network & Degree Distribution: {self.run_summary}')
nx.draw(self.G, ax=self.axes[0], pos=nx.kamada_kawai_layout(self.G), with_labels=True)
degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True)
w = np.ones(len(degree_sequence))/len(degree_sequence)
hist, bin_edges = np.histogram(degree_sequence, weights=w, density=False)
width = (bin_edges[1] - bin_edges[0])
self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center',
width=width, edgecolor='k', facecolor='green', alpha=0.5)
self.axes[1].set_xticks(range(max(degree_sequence)+1))
self.axes[1].set_title("Normalised degree histogram")
self.axes[1].set_xlabel("Degree")
self.axes[1].set_ylabel("% of Nodes")
plt.savefig(f'{self.oprefix}-network.pdf', format="pdf", bbox_inches="tight")
#plt.show()
# plot the degree col panel
def deg_col_panel_helper(self, col):
self.set_panel_size(1, 2, shareY=True)
self.fig.suptitle(f'Conditional/Total Normalised Histograms for {self.col2title[col]} : {self.run_summary}')
self.fig.supylabel(self.col2units[col])
degree_sequence = sorted((d for n, d in self.G.degree()), reverse=True)
x, y = np.unique(degree_sequence, return_counts=True)
by_degree = [[] for i in range(x[-1]+1)]
    for node, degree in self.G.degree():
        curr = self.df[self.df.NodeName == node][col].values
        if len(by_degree[degree]) == 0:
            by_degree[degree] = curr
        else:
            # np.append returns a new array; the original code discarded the result
            by_degree[degree] = np.append(by_degree[degree], curr)
legends = []
for d in by_degree:
if len(d) == 0:
continue
w = np.ones(len(d))/len(d)
hist, bin_edges = np.histogram(d, weights=w, density=False)
width = (bin_edges[1] - bin_edges[0])
legends.append(self.axes[0].bar(x=bin_edges[:-1], height=hist, align='center',
width=width, edgecolor='k', alpha=0.75))
self.axes[0].legend(legends, x, scatterpoints=1,
loc='upper left', ncol=5,
fontsize=8)
self.axes[0].set_title("Conditional Histogram (degree)")
d = self.df[col]
w = np.ones(len(d))/len(d)
hist, bin_edges = np.histogram(d, weights=w, density=False)
width = (bin_edges[1] - bin_edges[0])
self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center',
width=width, edgecolor='k', facecolor='green', alpha=0.5)
self.axes[1].set_title("Total Histogram")
plt.savefig(f'{self.oprefix}-deg-col-panel-{col}.pdf')
#plt.show()
# handle docker stats
class DStats(Plots, metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
self.dstats_fname = f'{log_dir}/dstats-data/docker-stats.out'
self.kinspect_fname = f'{log_dir}/dstats-data/docker-kinspect.out'
self.col2title = { "ContainerID" : "Docker ID",
"ContainerName" : "Docker Name",
"CPUPerc" : "CPU Utilisation",
"MemUse" : "Memory Usage",
"MemTotal" : "Total Memory",
"MemPerc" : "Memory Utilisation",
"NetRecv" : "Network Received",
"NetSent" : "Network Sent",
"BlockR" : "Block Reads",
"BlockW" : "Block Writes",
"CPIDS" : "Docker PIDS"
}
self.col2units = { "ContainerID" : "ID",
"ContainerName" : "Name",
"CPUPerc" : "Percentage (%)",
"MemUse" : "MiB",
"MemTotal" : "MiB",
"MemPerc" : "Percentage (%)",
"NetRecv" : "MiB",
"NetSent" : "MiB",
"BlockR" : "MiB",
"BlockW" : "MiB",
"CPIDS" : "PIDS"
}
self.cols = ["CPUPerc", "MemUse","NetRecv", "NetSent", "BlockR", "BlockW"]
# remove the formatting artefacts
def pre_process(self):
if not path_ok(Path(self.dstats_fname)):
sys.exit(0)
self.set_summary()
regex = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
with open(self.dstats_fname) as f:
cleaned_txt = regex.sub('', f.read())
with open(self.dstats_fname, 'w') as f:
f.write(cleaned_txt)
# make sure the df is all numeric
def post_process(self):
for name in ["ContainerID", "ContainerName"]:
self.df[name] = self.df[name].map(lambda x: x.strip())
h2b, n = Human2BytesConverter(), len(self.keys)
for percent in ["CPUPerc", "MemPerc"]:
self.df[percent] = self.df[percent].str.replace('%','').astype(float)
for size in ["MemUse", "MemTotal"]:
self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB
for size in ["NetRecv", "NetSent"]:
self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB
for size in ["BlockR", "BlockW"]:
self.df[size] = self.df[size].map(lambda x:h2b.convert(x.strip())/(1024*1024)) # MiB
self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0])
self.build_key2nodes()
self.df['NodeName'] = self.df['Key'].map(lambda x: self.key2nodes[x][0])
self.set_keys()
# build df from csv
def process_data(self):
log.info(f'processing {self.dstats_fname}...')
self.pre_process()
self.df = pd.read_csv(self.dstats_fname, header=0, comment='#',
skipinitialspace = True, delimiter='/',
usecols=["ContainerID", "ContainerName",
"CPUPerc", "MemUse", "MemTotal", "MemPerc",
"NetRecv", "NetSent", "BlockR","BlockW", "CPIDS"])
self.post_process()
self.remove_incomplete_samples(grp='Key', err='--')
self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')
class HostProc(Plots, metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
self.fname = f'{log_dir}/host-proc-data/docker-proc.out'
self.kinspect_fname = f'{log_dir}/host-proc-data/docker-kinspect.out'
self.col2title = { 'CPUPerc' : 'CPU Utilisation',
'VmPeak' : 'Peak Virtual Memory Usage',
'MemUse' : 'Peak Virtual Memory Usage',
'VmSize' : 'Current Virtual Memory Usage',
'VmRSS' : 'Peak Physical Memory Usage',
'VmData' : 'Size of Data Segment',
'VmStk' : 'Size of Stack Segment',
'NetRecv' : 'Network1: Received Bytes',
'NetRecvPkts' : 'Network1: Received Packets',
'NetSent' : 'Network1: Transmitted Bytes',
'NetSentPkts' : 'Network1: Transmitted Packets',
'NetRX' : 'Network2: Received Bytes',
'NetWX' : 'Network2: Transmitted Bytes',
'InOctets' : 'Network3: InOctets',
'OutOctets' : 'Network3: OutOctets',
'BlockR' : 'Block Reads',
'BlockW' : 'Block Writes'
}
self.col2units = { 'CPUPerc' : '%',
'VmPeak' : 'MiB',
'MemUse' : 'MiB',
'VmSize' : 'MiB',
'VmRSS' : 'MiB',
'VmData' : 'MiB',
'VmStk' : 'MiB',
'NetRecv' : 'MiB',
'NetRecvPkts' : 'Packets',
'NetSent' : 'MiB',
'NetSentPkts' : 'Packets',
'NetRX' : 'MiB',
'NetWX' : 'MiB',
'InOctets' : 'MiB',
'OutOctets' : 'MiB',
'BlockR' : 'MiB',
'BlockW' : 'MiB'
}
self.cols = ['CPUPerc', 'VmPeak', 'MemUse', 'VmSize', 'VmRSS', 'VmData', 'VmStk',
'RxBytes', 'RxPackets', 'TxBytes', 'TxPackets', 'NetRecv', 'NetSent',
'InOctets', 'OutOctets', 'BlockR', 'BlockW']
def process_data(self):
if not path_ok(Path(self.fname)):
sys.exit(0)
self.set_summary()
self.df = pd.read_csv(self.fname, header=0, comment='#',
skipinitialspace = True, delimiter=r"\s+",
usecols= ['EpochId', 'PID', 'TimeStamp',
'ContainerName', 'ContainerID', 'NodeName',
'VmPeak', 'VmPeakUnit', 'VmSize', 'VmSizeUnit',
'VmRSS', 'VmRSSUnit', 'VmData','VmDataUnit', 'VmStk', 'VmStkUnit',
'HostVIF', 'NetSent', 'NetSentPkts', 'NetRecv', 'NetRecvPkts',
#'VETH', 'InOctets', 'OutOctets',
#'DockerVIF', 'NetRecv', 'NetSent',
#'VETH', 'InOctets', 'OutOctets',
'BlockR', 'BlockW',
'CPUPerc'])
self.post_process()
self.remove_incomplete_samples(grp='Key')
self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')
# normalise the units
def post_process(self):
#h2b = Human2BytesConverter()
for size in ['VmPeak', 'VmSize','VmRSS', 'VmData','VmStk']:
self.df[size] = self.df[size].map(lambda x: x/1024) # MiBs
for size in ['NetRecv','NetSent']:
self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs
for size in ['BlockR', 'BlockW']:
self.df[size] = self.df[size].map(lambda x: x/(1024*1024)) # MiBs
#self.df['Key'] = self.df['ContainerName'].map(lambda x: x.split("--")[0])
self.df['Key'] = self.df['NodeName']#.map(lambda x: x.split("--")[0])
self.df['MemUse'] = self.df['VmPeak']
self.build_key2nodes()
#self.df.rename(columns={'NodeName': 'Key'}, inplace=True)
self.set_keys()
self.df = self.df.fillna(0)
# not very helpful plots atm
def plot_clusters(self, grp, agg, axes):
self.cluster_plot_helper(grp=grp, cols=['CPUPerc', 'VmPeak', 'VmRSS', 'MemUse', 'VmData'])
# sanity check config file
def _config_file_callback(ctx: typer.Context, param: typer.CallbackParam, cfile: str):
if cfile:
typer.echo(f"Loading config file: {os.path.basename(cfile)}")
ctx.default_map = ctx.default_map or {} # Init the default map
try:
with open(cfile, 'r') as f: # Load config file
conf = json.load(f)
if "plotting" not in conf:
log.error(f"No plotting is requested in {cfile}. Skipping plotting.")
sys.exit(0)
# Merge config and default_map
if ctx.command.name in conf["plotting"]:
ctx.default_map.update(conf["plotting"][ctx.command.name])
else:
if "hproc" in conf["plotting"]:
ctx.default_map.update(conf["plotting"]["hproc"])
else:
log.info(f"No dstats/host-proc params in config. Sticking to defaults")
#ctx.default_map.update(conf["plotting"]) # Merge config and default_map
except Exception as ex:
raise typer.BadParameter(str(ex))
return cfile
# instantiate typer and set the commands
app = typer.Typer()
# common cmd processor/plotter
def cmd_helper(metric_infra, to_plot, agg, to_compare):
metric_infra.process_data()
# always plot the compare plots; rest on demand
metric_infra.set_compare(to_compare)
metric_infra.plot_compare_panel()
log.info(f'to_plot : {to_plot}')
if "Network" in to_plot and to_plot["Network"]:
metric_infra.read_network()
metric_infra.plot_network()
if "ColPanel" in to_plot and to_plot["ColPanel"]:
metric_infra.plot_col_panels(agg)
if "ValueCluster" in to_plot and to_plot["ValueCluster"]:
# TODO: find interpretable cluster plot
metric_infra.build_cluster_index('ContainerID')
if "DegColPanel" in to_plot:
metric_infra.plot_deg_col_panels()
if "SettlingTime" in to_plot and to_plot["SettlingTime"]:
metric_infra.compute_msg_settling_times()
metric_infra.plot_msg_settling_times()
#if "Compare" in to_plot and to_plot["Compare"]:
# process / plot docker-procfs.out
@app.command()
def host_proc(ctx: typer.Context, log_dir: Path, # <- mandatory path
out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
help="Set the input config file (JSON)")):
if not path_ok(log_dir, True):
sys.exit(0)
to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else []
jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
if os.path.exists("plots"):
os.system('rm -rf plots')
os.makedirs("plots")
host_proc = HostProc(log_dir, f'plots/{out_prefix}-host-proc', jf, to_plot, config_file)
cmd_helper(host_proc, to_plot, agg=aggregate,
to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"])
log.info(f'Done: {log_dir}')
# process / plot docker-dstats.out
@app.command()
def dstats(ctx: typer.Context, log_dir: Path, # <- mandatory path
out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
help="Set the input config file (JSON)")):
if not path_ok(log_dir, True):
sys.exit(0)
to_plot = ctx.default_map["to_plot"] if ctx.default_map and "to_plot" in ctx.default_map else []
jf = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
if os.path.exists("plots"):
os.system('rm -rf plots')
os.makedirs("plots")
dstats = DStats(log_dir, f'plots/{out_prefix}-dstats', jf, to_plot, config_file)
cmd_helper(dstats, to_plot, agg=aggregate,
to_compare=["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"])
log.info(f'Done: {log_dir}')
if __name__ == "__main__":
app()

View File

@@ -1,39 +0,0 @@
# Python Imports
import sys
import json
# Project Imports
from src import analysis_logger
def load_messages(simulation_path):
try:
with open(f'{simulation_path}/messages.json', 'r') as f:
injected_msgs_dict = json.load(f)
except OSError as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
sys.exit()
analysis_logger.G_LOGGER.info(f'Loaded {len(injected_msgs_dict)} messages.')
return injected_msgs_dict
def prepare_node_in_logs(node_pbar, topology, node_logs, container_name):
for node in node_pbar:
node_info = topology["nodes"][node]
peer_id = node_info["peer_id"][:3] + "*" + node_info["peer_id"][-6:]
node_logs[peer_id] = {'published': [], 'received': [],
'container_name': container_name, 'peer_id': node}
def open_file(folder):
try:
file = open(f'{folder[0]}/output.log', mode='r')
except OSError as e:
analysis_logger.G_LOGGER.error(f'{e.__doc__}: {e}')
sys.exit()
return file

View File

@@ -1,63 +0,0 @@
# Python Imports
# Project Imports
from src import vars
from src import arg_parser
from src import topology
from src import log_parser
from src import analysis
from src import analysis_logger
from src import cproc
from src import cadvisor
from src import plotting
from src import plotting_configurations
from src import prometheus
if __name__ == "__main__":
""" Parse args """
simulation_path, prom_port, infra_type = arg_parser.parse_args()
"""Load Configuration"""
simulation_config = topology.load_json(simulation_path + "config/config.json")
metrics = simulation_config["plotting"]
""" Load Topics Structure """
topology_info = topology.load_json(simulation_path + vars.G_TOPOLOGY_FILE_NAME)
topology.load_topics_into_topology(topology_info, simulation_path + "config/topology_generated/")
""" Load Simulation Messages """
injected_msgs_dict = log_parser.load_messages(simulation_path)
node_logs, msgs_dict, min_tss, max_tss = analysis.analyze_containers(topology_info,
simulation_path)
""" Compute simulation time window """
simulation_time_ms = round((max_tss - min_tss) / 1000000)
analysis_logger.G_LOGGER.info(f'Simulation started at {min_tss}, ended at {max_tss}. '
f'Effective simulation time was {simulation_time_ms} ms.')
analysis.compute_message_delivery(msgs_dict, injected_msgs_dict)
analysis.compute_message_latencies(msgs_dict)
msg_propagation_times = analysis.compute_propagation_times(msgs_dict)
msg_injection_times = analysis.compute_injection_times(injected_msgs_dict)
""" Generate stats depending on the type of measurements infrastructure """
if infra_type == 'container-proc':
cproc.run(simulation_config, simulation_path, msgs_dict, node_logs, msg_propagation_times, msg_injection_times, min_tss, max_tss)
elif infra_type == 'cadvisor':
cadvisor.run(simulation_config, metrics, topology_info, msg_propagation_times, msg_injection_times, min_tss, max_tss, prom_port)
else:
analysis_logger.G_LOGGER.error(f'Unknown infrastructure type: {infra_type}')
analysis.inject_metric_in_dict(plotting_configurations.plotting_config, "propagation",
"Propagation Time (per message)", "Propagation Time (ms)",
"msg_propagation_times", msg_propagation_times)
analysis.inject_metric_in_dict(plotting_configurations.plotting_config, "injection",
"Injection Time (per message)", "Milliseconds (ms)",
"msg_injection_times", msg_injection_times)
prometheus.get_hardware_metrics(metrics, topology_info, min_tss, max_tss, prom_port)
""" Generate Figure """
plotting.plot_figure_ex(simulation_config)
""" We are done """
analysis_logger.G_LOGGER.info('Ended')

View File

@@ -1,179 +0,0 @@
# Python Imports
import math
import numpy as np
import matplotlib.pyplot as plt
# Project Imports
from src import vars
from src import analysis_logger
from src import plotting_configurations
def plot_figure_cproc(msg_propagation_times, cpu_usage, memory_usage, network_usage, disk_usage, injection_times, simulation_summary, simulation_config):
def style_violin(parts, ax):
# Change the extrema lines to dashed grey lines
for line in parts['cmaxes'].get_segments() + parts['cmins'].get_segments():
line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
ax.add_line(line_obj)
# Remove the original extrema lines
parts['cmaxes'].set_visible(False)
parts['cmins'].set_visible(False)
# Change the vertical lines to dashed grey lines
for line in parts['cbars'].get_segments():
line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
ax.add_line(line_obj)
# Remove the original vertical lines
parts['cbars'].set_visible(False)
cmean_colors = parts['cmeans'].get_color()
colors = [cmean_colors[0],'red',cmean_colors[0],cmean_colors[0]]
parts['cmeans'].set_color(colors)
# loop over the paths of the mean lines
xy = [[l.vertices[:,0].mean(),l.vertices[0,1]] for l in parts['cmeans'].get_paths()]
xy = np.array(xy)
ax.scatter(xy[:,0], xy[:,1],s=25, c="crimson", marker="o", zorder=3)
# make lines invisible
parts['cmeans'].set_visible(False)
fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(15, 15))
if msg_propagation_times:
parts = ax1.violinplot(msg_propagation_times, showmeans=True)
ax1.set_title('Propagation Time (per message)')
ax1.set_ylabel('Propagation Time (ms)')
ax1.spines[['right', 'top']].set_visible(False)
ax1.axes.xaxis.set_visible(False)
style_violin(parts, ax1)
parts = ax2.violinplot(cpu_usage, showmeans=True)
ax2.set_title('Peak CPU Usage (per node)')
ax2.set_ylabel('CPU Usage (%)')
ax2.spines[['right', 'top']].set_visible(False)
ax2.axes.xaxis.set_visible(False)
style_violin(parts, ax2)
parts = ax3.violinplot(memory_usage, showmeans=True)
ax3.set_title('Peak Memory Usage (per node)')
ax3.set_ylabel('Memory (MBytes)')
ax3.spines[['right', 'top']].set_visible(False)
ax3.axes.xaxis.set_visible(False)
style_violin(parts, ax3)
parts = ax4.violinplot([network_usage['rx_mbytes'], network_usage['tx_mbytes']], showmeans=True)
ax4.set_title('Total Network IO (per node)')
ax4.set_ylabel('Bandwidth (MBytes)')
ax4.spines[['right', 'top']].set_visible(False)
ax4.set_xticks([1, 2])
ax4.set_xticklabels(['Received (Rx)', 'Sent (Tx)'])
style_violin(parts, ax4)
parts = ax5.violinplot(injection_times, showmeans=True)
ax5.set_title('Injection Time (per message)')
ax5.set_ylabel('Milliseconds (ms)')
ax5.spines[['right', 'top']].set_visible(False)
ax5.axes.xaxis.set_visible(False)
style_violin(parts, ax5)
parts = ax6.violinplot([disk_usage['disk_read_mbytes'], disk_usage['disk_write_mbytes']], showmeans=True)
ax6.set_title('Peak Disk IO (per node)')
ax6.set_ylabel('Disk IO (MBytes)')
ax6.spines[['right', 'top']].set_visible(False)
ax6.set_xticks([1, 2])
ax6.set_xticklabels(['Read', 'Write'])
style_violin(parts, ax6)
fig.suptitle('Wakurtosis Simulation Node Level Analysis\n(%d nodes, %d topic(s), Rate: %d msg/s, Time: %.2f s. Sampling Rate: %.2f samples/s.)\n' %(simulation_summary['num_nodes'], \
simulation_summary['num_topics'], simulation_config['wls']['message_rate'], simulation_summary['simulation_time_ms'] / 1000.0, \
simulation_summary['metrics']['esr']), fontsize=20)
plt.tight_layout()
figure_path = f'{vars.G_DEFAULT_SIMULATION_PATH}{vars.G_DEFAULT_FIG_FILENAME}'
plt.savefig(figure_path, format="pdf", bbox_inches="tight")
analysis_logger.G_LOGGER.info(f'Figure saved in {figure_path}')
def plot_figure_ex(simulation_config):
def style_violin(parts, ax):
# Change the extrema lines to dashed grey lines
for line in parts['cmaxes'].get_segments() + parts['cmins'].get_segments():
line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
ax.add_line(line_obj)
# Remove the original extrema lines
parts['cmaxes'].set_visible(False)
parts['cmins'].set_visible(False)
# Change the vertical lines to dashed grey lines
for line in parts['cbars'].get_segments():
line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
ax.add_line(line_obj)
# Remove the original vertical lines
parts['cbars'].set_visible(False)
cmean_colors = parts['cmeans'].get_color()
colors = [cmean_colors[0], 'red', cmean_colors[0], cmean_colors[0]]
parts['cmeans'].set_color(colors)
# loop over the paths of the mean lines
xy = [[l.vertices[:, 0].mean(), l.vertices[0, 1]] for l in parts['cmeans'].get_paths()]
xy = np.array(xy)
ax.scatter(xy[:, 0], xy[:, 1], s=25, c="crimson", marker="o", zorder=3)
# make lines invisible
parts['cmeans'].set_visible(False)
metrics = plotting_configurations.plotting_config
num_subplots = len(metrics.keys())
num_cols = 3
num_rows = math.ceil(num_subplots / num_cols)
fig, axs = plt.subplots(num_rows, num_cols, figsize=(15, 15))
axs = axs.flatten()
# Remove unused subplots
for i in range(num_subplots, num_rows * num_cols):
fig.delaxes(axs[i])
for i, key in enumerate(metrics.keys()):
# if type(metrics[key]) is list:
# if sum([plotting_configurations[val]["values"] for val in metrics[key]]) == 0:
# continue
metric = metrics[key]
analysis_logger.G_LOGGER.info(f"Plotting {key}: {metric['values']}")
parts = axs[i].violinplot(metric["values"], showmeans=True)
axs[i].set_title(metric["title"])
axs[i].set_ylabel(metric["y_label"])
axs[i].spines[['right', 'top']].set_visible(False)
axs[i].axes.xaxis.set_visible(False)
if "xtic_labels" in metric.keys():
axs[i].set_xticks([i+1 for i in range(len(metric["xtic_labels"]))])
axs[i].set_xticklabels(metric["xtic_labels"])
axs[i].axes.xaxis.set_visible(True)
style_violin(parts, axs[i])
    fig.suptitle(
        'Wakurtosis Simulation Node Level Analysis\n(%d nodes, %d topic(s), Rate: %d msg/s, Time: %.2f s. Min/Max size: %d/%d.)\n' % (
            simulation_config['gennet']['num_nodes'],
            simulation_config['gennet']['num_topics'],
            simulation_config['wls']['message_rate'],
            simulation_config['wls']['simulation_time'],
            simulation_config['wls']['min_packet_size'],
            simulation_config['wls']['max_packet_size']
        ), fontsize=20)
plt.tight_layout()
figure_path = f'{vars.G_DEFAULT_SIMULATION_PATH}{vars.G_DEFAULT_FIG_FILENAME}'
plt.savefig(figure_path, format="pdf", bbox_inches="tight")
analysis_logger.G_LOGGER.info(f'Figure saved in {figure_path}')

View File

@@ -1,59 +0,0 @@
plotting_config = {
"container_cpu_load_average_10s": {
"title": "Peak CPU Usage (per node)",
"y_label": "CPU Usage (%)",
"statistic": "max",
"toMB": False
},
"container_memory_usage_bytes": {
"title": "Peak Memory Usage (per node)",
"y_label": "Memory (MBytes)",
"statistic": "max",
"toMB": True
},
"container_network_receive_bytes_total": {
"title": "Total Netowrk IO (per node)",
"y_label": "Bandwidth (MBytes)",
"xtic_labels": [
"Received (Rx)"
],
"statistic": "max",
"toMB": True
},
"container_network_transmit_bytes_total": {
"title": "Total Netowrk IO (per node)",
"y_label": "Bandwidth (MBytes)",
"metric_name": [
"container_network_transmit_bytes_total"
],
"xtic_labels": [
"Sent (Tx)"
],
"statistic": "max",
"toMB": True
},
"container_fs_reads_bytes_total": {
"title": "Peak Disk IO (per node)",
"y_label": "Disk IO (MBytes)",
"metric_name": [
"container_fs_reads_bytes_total"
],
"xtic_labels": [
"Read"
],
"statistic": "max",
"toMB": True
},
"container_fs_writes_bytes_total": {
"title": "Peak Disk IO (per node)",
"y_label": "Disk IO (MBytes)",
"metric_name": [
"container_fs_writes_bytes_total"
],
"xtic_labels": [
"Write"
],
"statistic": "max",
"toMB": True
}
}
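# The "statistic" field above selects the aggregation applied to each fetched
# series; it is resolved via function_dispatcher in src/prometheus.py, e.g.
#   function_dispatcher["max"]([10.0, 42.5, 17.3]) -> 42.5 (the per-node peak).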

View File

@@ -1,121 +0,0 @@
# Python Imports
from datetime import datetime
from tqdm import tqdm
from prometheus_api_client import PrometheusConnect
# Project Imports
from src import analysis_logger
from src import plotting_configurations
def connect_to_prometheus(port):
url = f"http://host.docker.internal:{port}"
try:
analysis_logger.G_LOGGER.debug('Connecting to Prometheus server in %s' % url)
prometheus = PrometheusConnect(url, disable_ssl=True)
except Exception as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
return None
return prometheus
def get_hardware_metrics(metrics, topology, min_tss, max_tss, prom_port):
container_ips = [info["kurtosis_ip"] for info in topology["containers"].values()]
pbar = tqdm(container_ips)
prometheus = connect_to_prometheus(prom_port)
for container_ip in pbar:
pbar.set_description(f'Fetching hardware stats from container {container_ip}')
try:
fetch_cadvisor_stats_from_prometheus_by_node(metrics, prometheus, container_ip, min_tss,
max_tss)
except Exception as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
continue
#fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prometheus, container_ips, min_tss,
# max_tss)
def fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prom, container_ips, start_ts,
end_ts):
start_timestamp = datetime.utcfromtimestamp(start_ts / 1e9)
end_timestamp = datetime.utcfromtimestamp(end_ts / 1e9)  # UTC for both, avoiding a UTC/local mismatch
for metric in metrics["by_simulation"]:
plotting_config = plotting_configurations.plotting_config[metric]
plotting_config.setdefault("values", []).append(
fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips,
start_timestamp,
end_timestamp, plotting_config["toMB"]))
def fetch_cadvisor_stats_from_prometheus_by_node(metrics, prom, container_ip, start_ts, end_ts):
# Prometheus query example:
# container_network_transmit_bytes_total{container_label_com_kurtosistech_private_ip = "212.209.64.2"}
start_timestamp = datetime.utcfromtimestamp(start_ts / 1e9)
end_timestamp = datetime.utcfromtimestamp(end_ts / 1e9)  # UTC for both, avoiding a UTC/local mismatch
for metric in metrics["by_node"]:
plotting_config = plotting_configurations.plotting_config[metric]
stat_function = function_dispatcher[plotting_config["statistic"]]
values = fetch_metric(prom, metric, container_ip, start_timestamp, end_timestamp,
plotting_config["toMB"])
plotting_config.setdefault("values", []).append(stat_function(values))
def fetch_accumulated_metric_for_all_nodes(prom, metric, container_ips, start_timestamp,
end_timestamp,
to_mbytes=False):
result = {}
for ip in container_ips:
values = fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp)
for item in values:
timestamp, value = item
value = int(value)
if to_mbytes:
value = value / (1024 * 1024)
if timestamp in result:
result[timestamp] += value
else:
result[timestamp] = value
return list(result.values())
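# Illustrative note (not in the original file): the accumulation above sums
# values per timestamp across nodes. For two nodes with samples
#   node A: [(t1, 1), (t2, 3)]  and  node B: [(t1, 4), (t2, 5)]
# the returned list is [5, 8].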
def fetch_metric(prom, metric, ip, start_timestamp, end_timestamp, to_mbytes=False):
metric_result = prom.custom_query_range(
f"{metric}{{container_label_com_kurtosistech_private_ip = '{ip}'}}",
start_time=start_timestamp, end_time=end_timestamp, step="1s")
if not metric_result:
analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
return [0]
metric_values = [float(metric_result[0]['values'][i][1]) for i in
range(len(metric_result[0]['values']))]
if to_mbytes:
metric_values = [value / (1024 * 1024) for value in metric_values]
return metric_values
def fetch_metric_with_timestamp(prom, metric, ip, start_timestamp, end_timestamp):
metric_result = prom.custom_query_range(
f"{metric}{{container_label_com_kurtosistech_private_ip = '{ip}'}}",
start_time=start_timestamp, end_time=end_timestamp, step="1s")
if not metric_result:
analysis_logger.G_LOGGER.error(f"{metric} returns no data. Adding zero.")
return [[0, 0]]
return metric_result[0]['values']
function_dispatcher = {
"max": max,
"min": min,
"average": lambda x: sum(x) / len(x)
}
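# Illustrative sketch (not part of the original file): minimal end-to-end usage
# of this module. The host port 52118 is hypothetical; the topology literal
# only mimics the shape read by get_hardware_metrics, and the timestamps are
# nanoseconds since the epoch, matching the callers above.
if __name__ == "__main__":
    metrics = {"by_node": ["container_memory_usage_bytes"]}
    topology = {"containers": {"c1": {"kurtosis_ip": "212.209.64.2"}}}
    get_hardware_metrics(metrics, topology,
                         1681806688591453952, 1681806690147236096, 52118)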

View File

@@ -1,48 +0,0 @@
# Python Imports
import sys
import json
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib
# Project Imports
from src import analysis_logger
from src import vars
def _load_topics(node_info, nodes, node, tomls_folder):
topics = None
with open(tomls_folder + node_info["node_config"], mode='rb') as read_file:
toml_config = tomllib.load(read_file)
if node_info["image"] == vars.G_NWAKU_IMAGE_NAME:
topics = list(toml_config["topics"].split(" "))
elif node_info["image"] == vars.G_GOWAKU_IMAGE_NAME:
topics = toml_config["topics"]
else:
raise ValueError("Unknown image type")
# Load topics into topology for easier access
nodes[node]["topics"] = topics
def load_topics_into_topology(topology, tomls_folder):
nodes = topology["nodes"]
for node, node_info in nodes.items():
try:
_load_topics(node_info, nodes, node, tomls_folder)
except ValueError as e:
analysis_logger.G_LOGGER.error('%s: %s' % (e.__doc__, e))
sys.exit()
analysis_logger.G_LOGGER.info('Loaded nodes topics from toml files')
def load_json(json_file):
with open(json_file, 'r') as read_file:
jfile = json.load(read_file)
analysis_logger.G_LOGGER.debug(jfile)
analysis_logger.G_LOGGER.info(f'{json_file} loaded')
return jfile
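# Illustrative sketch (not part of the original file): the typical call order
# for this module; the wrapper name is hypothetical.
def load_topology_with_topics(simulation_path, tomls_folder):
    topology = load_json(simulation_path + vars.G_TOPOLOGY_FILE_NAME)
    load_topics_into_topology(topology, tomls_folder)
    return topology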

View File

@@ -1,12 +0,0 @@
G_DEFAULT_SIMULATION_PATH = "/simulation_data/"
G_DEFAULT_FIG_FILENAME = 'analysis.pdf'
G_DEFAULT_SUMMARY_FILENAME = 'summary.json'
G_TOPOLOGY_FILE_NAME = "network_data.json"
G_NWAKU_IMAGE_NAME = "nim-waku"
G_GOWAKU_IMAGE_NAME = "go-waku"
G_APP_NAME = 'WLS-ANALYSIS'
G_LOG_LEVEL = 'DEBUG'

View File

@@ -1,240 +0,0 @@
# Python Imports
import unittest
import sys
# Project Imports
from src import analysis
class TestAnalysis(unittest.TestCase):
def test_update_min_max_tss_min(self):
tss = 0
min_tss = 5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, 0)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_min_2(self):
tss = 7
min_tss = 5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, 5)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_min_negative(self):
tss = 0
min_tss = -5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, -5)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_min_negative_2(self):
tss = -6
min_tss = -5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, -6)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_min_negative_3(self):
tss = -4
min_tss = -5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, -5)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_max(self):
tss = 15
min_tss = 5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, 5)
self.assertEqual(max_tss, 15)
def test_update_min_max_tss_same(self):
tss = 6
min_tss = 5
max_tss = 10
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, 5)
self.assertEqual(max_tss, 10)
def test_update_min_max_tss_same_2(self):
tss = -4
min_tss = -10
max_tss = -5
min_tss, max_tss = analysis.update_min_max_tss(tss, min_tss, max_tss)
self.assertEqual(min_tss, -10)
self.assertEqual(max_tss, -4)
def test_get_relay_line_info(self):
log_line = "TRC 2023-04-18 08:31:28.591+00:00 waku.relay received topics=\"waku node\" tid=1 file=waku_node.nim:395 peerId=16U*96opVg pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc receivedTime=1681806688591971328"
msg_topics, msg_topic, msg_hash, msg_peer_id = analysis.get_relay_line_info(log_line)
self.assertEqual(msg_topics, "waku node")
self.assertEqual(msg_topic, "topic_C")
self.assertEqual(msg_hash, "0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc")
self.assertEqual(msg_peer_id, "16U*96opVg")
def test_compute_injection_times(self):
injected_msgs_dict = {
1: {'status': 200, 'injection_time': 1651372800},
2: {'status': 404, 'injection_time': 1651376400},
3: {'status': 200, 'injection_time': 1651380000}
}
expected_result = [1651372800, 1651380000]
self.assertEqual(analysis.compute_injection_times(injected_msgs_dict), expected_result)
def test_compute_injection_times_empty(self):
injected_msgs_dict = {}
expected_result = []
self.assertEqual(analysis.compute_injection_times(injected_msgs_dict), expected_result)
def test_analyze_published_first_time(self):
log_line = "TRC 2023-04-18 08:31:30.147+00:00 waku.relay published topics=\"waku node\" tid=1 file=waku_node.nim:484 peerId=16U*96opVg pubsubTopic=topic_C hash=0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463 publishTime=1681806690147236096"
published_time = 1681806690147236096
msgs_dict = {}
node_logs = {"16U*96opVg": {'published': [], 'received': [],
'container_name': "test", 'peer_id': "test"}}
analysis.analyze_published(log_line, node_logs, msgs_dict, published_time)
self.assertEqual(node_logs["16U*96opVg"]["published"][0][2], "topic_C")
self.assertEqual(node_logs["16U*96opVg"]["published"][0][3], "0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463")
self.assertEqual(msgs_dict, {'0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463':
{'published': [{'ts': published_time, 'peer_id': "16U*96opVg"}], 'received': []}})
def test_analyze_published_second_time(self):
log_line = "TRC 2023-04-18 08:31:30.147+00:00 waku.relay published topics=\"waku node\" tid=1 file=waku_node.nim:484 peerId=16U*96opVg pubsubTopic=topic_C hash=0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463 publishTime=1681806690147236096"
log_line2 = "TRC 2023-04-18 08:31:30.147+00:00 waku.relay published topics=\"waku node\" tid=1 file=waku_node.nim:484 peerId=16U*96opVg pubsubTopic=topic_C hash=0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463 publishTime=1681806690147236097"
published_time = 1681806690147236096
published_time2 = 1681806690147236097
msgs_dict = {}
node_logs = {}
node_logs["16U*96opVg"] = {'published': [], 'received': [],
'container_name': "test", 'peer_id': "test"}
analysis.analyze_published(log_line, node_logs, msgs_dict, published_time)
analysis.analyze_published(log_line2, node_logs, msgs_dict, published_time2)
self.assertEqual(node_logs["16U*96opVg"]["published"][1][2], "topic_C")
self.assertEqual(node_logs["16U*96opVg"]["published"][1][3],
"0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463")
self.assertEqual(msgs_dict, {'0x122071785e0803dafa9cdd288ed7db0d32277f7cab110b798f631d0fcd2d58dd7463':
{'published': [{'ts': published_time, 'peer_id': "16U*96opVg"},
{'ts': published_time2, 'peer_id': "16U*96opVg"}], 'received': []}})
def test_analyze_received_first_time(self):
log_line = "TRC 2023-04-18 08:31:28.591+00:00 waku.relay received topics=\"waku node\" tid=1 file=waku_node.nim:395 peerId=16U*96opVg pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc receivedTime=1681806688591971328"
received_time = 1681806688591971328
msgs_dict = {}
node_logs = {}
node_logs["16U*96opVg"] = {'published': [], 'received': [],
'container_name': "test", 'peer_id': "test"}
analysis.analyze_received(log_line, node_logs, msgs_dict, received_time)
self.assertEqual(node_logs["16U*96opVg"]["received"][0][2], "topic_C")
self.assertEqual(node_logs["16U*96opVg"]["received"][0][3],
"0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc")
self.assertEqual(msgs_dict, {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [], 'received': [{'ts': received_time, 'peer_id': "16U*96opVg"}]}})
def test_analyze_received_second_time(self):
log_line = "TRC 2023-04-18 08:31:28.591+00:00 waku.relay received topics=\"waku node\" tid=1 file=waku_node.nim:395 peerId=16U*96opVg pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc receivedTime=1681806688591971328"
log_line2 = "TRC 2023-04-18 08:31:28.591+00:00 waku.relay received topics=\"waku node\" tid=1 file=waku_node.nim:395 peerId=16U*96opVg pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc receivedTime=1681806688591971329"
received_time = 1681806688591971328
received_time2 = 1681806688591971329
msgs_dict = {}
node_logs = {}
node_logs["16U*96opVg"] = {'published': [], 'received': [],
'container_name': "test", 'peer_id': "test"}
analysis.analyze_received(log_line, node_logs, msgs_dict, received_time)
analysis.analyze_received(log_line2, node_logs, msgs_dict, received_time2)
self.assertEqual(node_logs["16U*96opVg"]["received"][1][2], "topic_C")
self.assertEqual(node_logs["16U*96opVg"]["received"][1][3],
"0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc")
self.assertEqual(msgs_dict, {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [], 'received': [{'ts': received_time, 'peer_id': "16U*96opVg"},
{'ts': received_time2, 'peer_id': "16U*96opVg"}]}})
def test_parse_lines_in_file(self):
lines = [
"TRC 2023-04-18 08:31:28.591+00:00 publish topics=\"waku node\" tid=1 file=protocol.nim:160 pubsubTopic=topic_C",
"TRC 2023-04-18 08:31:28.591+00:00 waku.relay received topics=\"waku node\" tid=1 file=waku_node.nim:395 peerId=16U*92yDon pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc receivedTime=1681806688591453952",
"TRC 2023-04-18 08:31:28.591+00:00 waku.relay published topics=\"waku node\" tid=1 file=waku_node.nim:484 peerId=16U*92yDon pubsubTopic=topic_C hash=0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc publishTime=1681806688591803648"
]
msgs_dict = {}
node_logs = {}
max_tss = -sys.maxsize - 1
min_tss = sys.maxsize
node_logs["16U*92yDon"] = {'published': [], 'received': [],
'container_name': "test", 'peer_id': "test"}
min_tss, max_tss = analysis.parse_lines_in_file(lines, node_logs, msgs_dict, min_tss, max_tss)
self.assertEqual(node_logs["16U*92yDon"]["received"][0][2], "topic_C")
self.assertEqual(node_logs["16U*92yDon"]["received"][0][3],
"0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc")
self.assertEqual(node_logs["16U*92yDon"]["published"][0][2], "topic_C")
self.assertEqual(node_logs["16U*92yDon"]["published"][0][3],
"0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc")
self.assertEqual(min_tss, 1681806688591453952)
self.assertEqual(max_tss, 1681806688591803648)
def test_compute_message_latencies(self):
msgs_dict = {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}], 'received': [{'ts': 20, 'peer_id': "16U*96opVg"}]}}
analysis.compute_message_latencies(msgs_dict)
self.assertEqual(msgs_dict["0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc"]["latencies"][0], 10)
def test_compute_message_latencies_multiple(self):
msgs_dict = {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}],
'received': [{'ts': 20, 'peer_id': "16U*96opVg"},
{'ts': 30, 'peer_id': "16U*96opVg"},
{'ts': 40, 'peer_id': "16U*96opVg"}]}}
analysis.compute_message_latencies(msgs_dict)
self.assertEqual(msgs_dict["0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc"]["latencies"],
[10, 20, 30])
def test_compute_propagation_times(self):
msgs_dict = {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}], 'received': [{'ts': 20, 'peer_id': "16U*96opVg"}],
"latencies": [1000000]}}
result = analysis.compute_propagation_times(msgs_dict)
self.assertEqual(result[0], 1)
def test_compute_propagation_times_multiple_latencies(self):
msgs_dict = {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}], 'received': [{'ts': 20, 'peer_id': "16U*96opVg"}],
"latencies": [1000000, 489, 481239, 0.71341]}}
result = analysis.compute_propagation_times(msgs_dict)
self.assertEqual(result[0], 1)
def test_compute_propagation_times_multiple_received_latencies(self):
msgs_dict = {'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadc':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}],
'received': [{'ts': 20, 'peer_id': "16U*96opVg"}],
"latencies": [1000000, 489, 481239, 0.71341]},
'0x12208ff2358cd9e488cd5f2806c9859dbd28768c52b6f52614c3148e45c5c12edadj':
{'published': [{'ts': 10, 'peer_id': "16U*96opVi"}],
'received': [{'ts': 20, 'peer_id': "16U*96opVg"}],
"latencies": [5000000, 512312, 5643, 0.1243]}
}
result = analysis.compute_propagation_times(msgs_dict)
self.assertEqual(result, [1, 5])
def test_compute_message_delivery(self):
mesages = {"a": 1, "b": 2, "c": 3}
delivered = {"a": 1, "b": 2, "c": 3}
result = analysis.compute_message_delivery(mesages, delivered)
self.assertEqual(result, 100)
def test_inject_metric_in_dict(self):
metrics = {}
analysis.inject_metric_in_dict(metrics, "test", "title", "y_label", "metric_name", [1, 2, 3])
self.assertEqual(metrics["test"]["title"], "title")
self.assertEqual(metrics["test"]["y_label"], "y_label")
self.assertEqual(metrics["test"]["metric_name"], "metric_name")
self.assertEqual(metrics["test"]["values"], [1, 2, 3])

View File

@@ -1,230 +0,0 @@
# Python Imports
import unittest
from unittest.mock import patch, mock_open
# Project Imports
from src import analysis_cproc
class TestAnalysisCProc(unittest.TestCase):
def test_compute_simulation_time_window(self):
min_tss = 1000000
max_tss = 2000000
expected_simulation_start_ts = min_tss
expected_simulation_end_ts = max_tss
expected_simulation_time_ms = round((max_tss - min_tss) / 1000000)
actual_simulation_start_ts, actual_simulation_end_ts, actual_simulation_time_ms = analysis_cproc.compute_simulation_time_window(min_tss, max_tss)
self.assertEqual(actual_simulation_start_ts, expected_simulation_start_ts)
self.assertEqual(actual_simulation_end_ts, expected_simulation_end_ts)
self.assertEqual(actual_simulation_time_ms, expected_simulation_time_ms)
def test_extract_node_id(self):
# Case 1: Standard string with node ID
s = "node-123.toml"
expected_node_id = "node_123"
self.assertEqual(analysis_cproc.extract_node_id(s), expected_node_id)
# Case 2: String with node ID but additional characters
s = "prefix-node-456.toml-suffix"
expected_node_id = "node_456"
self.assertEqual(analysis_cproc.extract_node_id(s), expected_node_id)
# Case 3: String without node ID
s = "node.toml"
expected_node_id = None
self.assertEqual(analysis_cproc.extract_node_id(s), expected_node_id)
# Case 4: Empty string
s = ""
expected_node_id = None
self.assertEqual(analysis_cproc.extract_node_id(s), expected_node_id)
def test_add_sample_to_metrics(self):
# Case 1: Adding a sample to an empty metrics_dict
metrics_dict = {}
node_id = "node_123"
sample = {'PID': 123}
nodes_cnt = analysis_cproc.add_sample_to_metrics(sample, node_id, metrics_dict)
self.assertEqual(nodes_cnt, 1)
self.assertEqual(metrics_dict, {node_id: {'samples' : [sample]}})
# Case 2: Adding a sample to a metrics_dict that already contains the node_id
sample2 = {'PID': 456}
nodes_cnt = analysis_cproc.add_sample_to_metrics(sample2, node_id, metrics_dict)
self.assertEqual(nodes_cnt, 0) # It should return 0 because the node_id already exists in the metrics_dict
self.assertEqual(metrics_dict, {node_id: {'samples' : [sample, sample2]}})
def test_parse_container_nodes(self):
# Case 1: Sample PID exists in container nodes
container_id = 'container_123'
container_data = {'samples': [{'PID': 1}, {'PID': 2}]}
container_nodes = {1: 'node_1', 2: 'node_2'}
metrics_dict = {}
nodes_cnt = analysis_cproc.parse_container_nodes(container_id, container_data, container_nodes, metrics_dict)
self.assertEqual(nodes_cnt, 2)
self.assertDictEqual(metrics_dict, {'node_1': {'samples': [{'PID': 1}]}, 'node_2': {'samples': [{'PID': 2}]}})
# Case 2: Sample PID does not exist in container nodes
container_id = 'container_456'
container_data = {'samples': [{'PID': 3}]}
container_nodes = {1: 'node_1', 2: 'node_2'}
metrics_dict = {}
nodes_cnt = analysis_cproc.parse_container_nodes(container_id, container_data, container_nodes, metrics_dict)
self.assertEqual(nodes_cnt, 0)
self.assertDictEqual(metrics_dict, {})
def test_extract_container_nodes(self):
# Case 1: All processes have node IDs
container_id = 'container_123'
container_data = {'info': {'processes': [{'binary': 'node-1.toml', 'pid': 1}, {'binary': 'node-2.toml', 'pid': 2}]}}
container_nodes = analysis_cproc.extract_container_nodes(container_id, container_data)
self.assertDictEqual(container_nodes, {1: 'node_1', 2: 'node_2'})
# Case 2: Some processes don't have node IDs
container_id = 'container_456'
container_data = {'info': {'processes': [{'binary': 'node.toml', 'pid': 1}, {'binary': 'node-2.toml', 'pid': 2}]}}
container_nodes = analysis_cproc.extract_container_nodes(container_id, container_data)
self.assertDictEqual(container_nodes, {2: 'node_2'})
# Case 3: No processes have node IDs
container_id = 'container_789'
container_data = {'info': {'processes': [{'binary': 'node.toml', 'pid': 1}, {'binary': 'node.toml', 'pid': 2}]}}
container_nodes = analysis_cproc.extract_container_nodes(container_id, container_data)
self.assertDictEqual(container_nodes, {})
@patch("json.load")
@patch("builtins.open", new_callable=mock_open, read_data="data")
def test_load_metrics_file(self, mock_file, mock_json):
# Case 1: Successful load
mock_json.return_value = {'header': 'header', 'containers': {'container1': 'data1', 'container2': 'data2'}}
metrics_file_path = 'path/to/metrics_file.json'
metrics_obj = analysis_cproc.load_metrics_file(metrics_file_path)
self.assertDictEqual(metrics_obj, {'header': 'header', 'containers': {'container1': 'data1', 'container2': 'data2'}})
# Reset the mock
mock_file.reset_mock()
# Case 2: Unsuccessful load (file does not exist)
mock_file.side_effect = FileNotFoundError()
metrics_file_path = 'path/to/non_existent_file.json'
with self.assertRaises(FileNotFoundError): # The function should raise FileNotFoundError, not SystemExit
analysis_cproc.load_metrics_file(metrics_file_path)
@patch("builtins.open", new_callable=mock_open, read_data='{"key":"value"}')
@patch("analysis_cproc.load_metrics_file")
@patch("analysis_cproc.process_metrics_file")
@patch("analysis_cproc.analysis_logger.G_LOGGER")
def test_load_process_level_metrics(self, mock_logger, mock_process_metrics_file, mock_load_metrics_file, mock_open):
# Case 1: Normal execution
metrics_file_path = 'path/to/metrics_file.json'
mock_load_metrics_file.return_value = {
'header': 'header',
'containers': {
'container_1': {
'info': {'processes': [{'binary': 'node-1.toml', 'pid': 1}]},
'samples': [{'PID': 1}]
}
}
}
mock_process_metrics_file.return_value = (mock_load_metrics_file.return_value, None)
try:
analysis_cproc.load_process_level_metrics(metrics_file_path)
except SystemExit:
pass
mock_load_metrics_file.assert_called_once_with(metrics_file_path)
mock_process_metrics_file.assert_called_once_with(mock_load_metrics_file.return_value)
# Case 2: Exception handling
mock_load_metrics_file.reset_mock()
mock_load_metrics_file.side_effect = Exception("Test exception")
try:
analysis_cproc.load_process_level_metrics(metrics_file_path)
except SystemExit:
pass
mock_logger.error.assert_called_once()
def test_compute_node_metrics(self):
# Case 1: Valid node data
node_obj = {
'samples': [
{'CPUPercentage': 10, 'MemoryUsageMB': 500,
'NetStats': {'all': {'total_received': 2048, 'total_sent': 2048}},
'DiskIORChar': 2048, 'DiskIOWChar': 2048}
]
}
num_samples, max_cpu_usage, max_memory_usage, total_rx_mbytes, total_tx_mbytes, max_disk_read_mbytes, max_disk_write_mbytes = analysis_cproc.compute_node_metrics(node_obj)
self.assertEqual(num_samples, 1)
self.assertEqual(max_cpu_usage, 10)
self.assertEqual(max_memory_usage, 500)
self.assertEqual(total_rx_mbytes, 0.001953125)
self.assertEqual(total_tx_mbytes, 0.001953125)
self.assertEqual(max_disk_read_mbytes, 0.001953125)
self.assertEqual(max_disk_write_mbytes, 0.001953125)
@patch("analysis_cproc.load_metrics_file")
@patch("analysis_cproc.compute_node_metrics")
def test_compute_process_level_metrics(self, mock_compute_node_metrics, mock_load_metrics_file):
# Case 1: Normal execution
simulation_path = 'path/to/simulation'
config_obj = {'key': 'value'}
mock_load_metrics_file.return_value = (
{
'header': 'header',
'containers': {
'container_1': {
'info': {'processes': [{'binary': 'node-1.toml', 'pid': 1}]},
'samples': [{'PID': 1}]
}
}
},
'info'
)
mock_compute_node_metrics.return_value = (0, 0, 0, 0, 0, 0, 0) # mock_compute_node_metrics should return a tuple
result = analysis_cproc.compute_process_level_metrics(simulation_path, config_obj)
self.assertIsInstance(result, tuple)
self.assertEqual(len(result), 6)
mock_compute_node_metrics.assert_called()
@patch("builtins.open", new_callable=mock_open)
@patch("json.dump")
def test_export_summary(self, mock_json_dump, mock_open):
# Case 1: Normal execution
simulation_path = 'path/to/simulation'
summary = {'key': 'value'}
analysis_cproc.export_summary(simulation_path, summary)
mock_open.assert_called_with(f'{simulation_path}/summary.json', 'w')
mock_json_dump.assert_called_with(summary, mock_open(), indent=4)

View File

@@ -1,24 +0,0 @@
# Python Imports
import unittest
from unittest.mock import patch
# Project Imports
from src import arg_parser
class TestArgParser(unittest.TestCase):
@patch('sys.argv', ['my_script.py', '-sp', 'test1', '-p', '41234', '-i', 'test2'])
def test_parse_args(self):
sim, port, infra = arg_parser.parse_args()
self.assertEqual(sim, "test1")
self.assertEqual(port, "41234")
self.assertEqual(infra, "test2")
@patch('sys.argv', ['my_script.py'])
def test_parse_args_default(self):
sim, port, infra = arg_parser.parse_args()
self.assertEqual(sim, "/simulation_data/")
self.assertEqual(port, None)
self.assertEqual(infra, None)

View File

@@ -1,51 +0,0 @@
INF 2023-05-05 10:07:12.631+00:00 mounting relay protocol topics="waku node" tid=1 file=waku_node.nim:520
DBG 2023-05-05 10:07:12.631+00:00 init waku relay topics="waku relay" tid=1 file=protocol.nim:68
INF 2023-05-05 10:07:12.631+00:00 relay mounted successfully topics="waku node" tid=1 file=waku_node.nim:542
DBG 2023-05-05 10:07:12.631+00:00 subscribe topics="waku node" tid=1 file=waku_node.nim:433 pubsubTopic=topic_S
INF 2023-05-05 10:07:12.631+00:00 mounting libp2p ping protocol topics="waku node" tid=1 file=waku_node.nim:959
INF 2023-05-05 10:07:12.631+00:00 Starting Waku node topics="waku node" tid=1 file=waku_node.nim:1075 version=v0.15.0-23-g67db35
INF 2023-05-05 10:07:12.631+00:00 PeerInfo topics="waku node" tid=1 file=waku_node.nim:1078 peerId=16U*deUs4f addrs=@[]
INF 2023-05-05 10:07:12.631+00:00 Listening on topics="waku node" tid=1 file=waku_node.nim:1085 full=[/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAm6vmtUwfj6i2xzV3eStVPdpZDzBgtSNoCGktRYNdeUs4f]
INF 2023-05-05 10:07:12.631+00:00 DNS: discoverable ENR topics="waku node" tid=1 file=waku_node.nim:1086 enr=enr:-Iu4QEx0VTRoPzHD1BBQt-CvC0EuYInxQ2aEnw7vLnn_KMJjLJzxi_TwhTIdxWp6K0cztgZhhnoisuZDoBPv5DXA4VgBgmlkgnY0gmlwhAAAAACJc2VjcDI1NmsxoQKq5RSREqRr-Aor-xIx0dVvddxUN1dm1Z654Q3xXE_EpIN0Y3CC6mCFd2FrdTIB
INF 2023-05-05 10:07:12.632+00:00 starting relay protocol topics="waku node" tid=1 file=waku_node.nim:492
DBG 2023-05-05 10:07:12.632+00:00 start topics="waku relay" tid=1 file=protocol.nim:116
INF 2023-05-05 10:07:12.632+00:00 relay started successfully topics="waku node" tid=1 file=waku_node.nim:513
DBG 2023-05-05 10:07:12.632+00:00 start topics="waku relay" tid=1 file=protocol.nim:116
INF 2023-05-05 10:07:12.632+00:00 Node started successfully topics="waku node" tid=1 file=waku_node.nim:1104
DBG 2023-05-05 10:07:12.632+00:00 Starting relay connectivity loop topics="waku node peer_manager" tid=1 file=peer_manager.nim:489
INF 2023-05-05 10:07:12.632+00:00 Relay peer connections topics="waku node peer_manager" tid=1 file=peer_manager.nim:411 connectedPeers=0 targetConnectedPeers=50 notConnectedPeers=0 outsideBackoffPeers=0
DBG 2023-05-05 10:07:12.632+00:00 Starting prune peerstore loop topics="waku node peer_manager" tid=1 file=peer_manager.nim:482
DBG 2023-05-05 10:07:12.632+00:00 subscribe topics="waku node rest relay_api" tid=1 file=waku_node.nim:444 pubsubTopic=topic_S
DBG 2023-05-05 10:07:12.632+00:00 subscribe topics="waku node rest relay_api" tid=1 file=protocol.nim:132 pubsubTopic=topic_S
INF 2023-05-05 10:07:12.632+00:00 Starting metrics HTTP server topics="waku node metrics" tid=1 file=waku_metrics.nim:29 serverIp=0.0.0.0 serverPort=8008
INF 2023-05-05 10:07:12.632+00:00 Metrics HTTP server started topics="waku node metrics" tid=1 file=waku_metrics.nim:36 serverIp=0.0.0.0 serverPort=8008
DBG 2023-05-05 10:07:19.744+00:00 get_waku_v2_debug_v1_info topics="waku node jsonrpc debug_api" tid=1 file=handlers.nim:21
INF 2023-05-05 10:07:27.633+00:00 Relay peer connections topics="waku node peer_manager" tid=1 file=peer_manager.nim:411 connectedPeers=0 targetConnectedPeers=50 notConnectedPeers=0 outsideBackoffPeers=0
DBG 2023-05-05 10:07:30.995+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*4ZwUxQ:6454d5624a8a39a218f7c4e2 protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:31.087+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*4ZwUxQ:6454d5634a8a39a218f7c4e4 protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:33.099+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*PavmwL:6454d5654a8a39a218f7c4ea protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:33.191+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*PavmwL:6454d5654a8a39a218f7c4ec protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:33.512+00:00 post_waku_v2_admin_v1_peers topics="waku node jsonrpc admin_api" tid=1 file=handlers.nim:52
DBG 2023-05-05 10:07:33.512+00:00 Dialing peer topics="waku node peer_manager" tid=1 file=peer_manager.nim:101 wireAddr=@[/ip4/82.68.192.4/tcp/60000] peerId=16U*4ZwUxQ failedAttempts=0
DBG 2023-05-05 10:07:33.559+00:00 Dialing peer topics="waku node peer_manager" tid=1 file=peer_manager.nim:101 wireAddr=@[/ip4/82.68.192.5/tcp/60000] peerId=16U*PavmwL failedAttempts=0
TRC 2023-05-05 10:07:33.603+00:00 Adding newly dialed peer to manager topics="waku node peer_manager" tid=1 file=peer_manager.nim:346 peerId=16Uiu2HAm4zMx9t8Qv3a7cKaftALSUdnYELmfrBtGH1QV1pM4M2SU address=/ip4/82.68.192.8/tcp/60000 proto=/vac/waku/relay/2.0.0
TRC 2023-05-05 10:07:33.603+00:00 Adding peer to manager topics="waku node peer_manager" tid=1 file=peer_manager.nim:274 peerId=16U*M4M2SU addresses=@[/ip4/82.68.192.8/tcp/60000]
DBG 2023-05-05 10:07:33.603+00:00 Dialing peer topics="waku node peer_manager" tid=1 file=peer_manager.nim:101 wireAddr=@[/ip4/82.68.192.8/tcp/60000] peerId=16U*M4M2SU failedAttempts=0
DBG 2023-05-05 10:07:33.699+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*M4M2SU:6454d5654a8a39a218f7c4f2 protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:33.923+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*M4M2SU:6454d5654a8a39a218f7c4f7 protocol=/vac/waku/relay/2.0.0
TRC 2023-05-05 10:07:39.967+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220498a4a96d695ed3b9e74199a02164e158252a7774b53ef783774c4e2d4a99fcf receivedTime=1683281259967951872
TRC 2023-05-05 10:07:39.970+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x12203225437be14dbedeaf29ec89353bb41dbb3f795f94b982b28fffb01beffcd922 receivedTime=1683281259970652672
TRC 2023-05-05 10:07:40.282+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x12200d76cbc0ba1da64ddb4537f9658f257ce879e4cec9a5e0bf14c21c0a41942ad1 receivedTime=1683281260282779904
TRC 2023-05-05 10:07:40.322+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x122029b5f6d094cc4cdbea01861812c4ac33c69cac55302f216cf8a4d9f276257ded receivedTime=1683281260322647040
TRC 2023-05-05 10:07:40.822+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220ebaa398fb70fd5d75df57ffbeacdba8ccf0957d43c7b634e12c5c3bbe03b22ad receivedTime=1683281260822106880
TRC 2023-05-05 10:07:40.985+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x12200888a86ff1d02cf84bc92f6eee6baf27347847b900f6dc9f12bbba3deca2522f receivedTime=1683281260985984256
TRC 2023-05-05 10:07:41.050+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220601b82fb10a8975609ae1551481c369549c37836e2104c80b88c2b4fc9bc9496 receivedTime=1683281261050683648
TRC 2023-05-05 10:07:41.075+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x12201bc5ec24014103df1f5ea5e36e742941e6d954530e60624bd191ea661db841db receivedTime=1683281261075174144
TRC 2023-05-05 10:07:41.354+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x12200c31d928bcad93c105330cc30623af7afc3bff1d170469492541e0ddbf90b40e receivedTime=1683281261354184704
TRC 2023-05-05 10:07:41.431+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220c6b6cdd5f3fcba7544f5b74491c07a0de0f5e02a34d5e112d804f4a5eff5559b receivedTime=1683281261431243264
DBG 2023-05-05 10:07:41.623+00:00 post_waku_v2_relay_v1_message topics="waku node jsonrpc relay_api" tid=1 file=handlers.nim:77
TRC 2023-05-05 10:07:41.623+00:00 publish topics="waku node" tid=1 file=protocol.nim:160 pubsubTopic=topic_S
TRC 2023-05-05 10:07:41.623+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220c02ef297a04f121d957679cecf12ab7bcf5a4a7325c2c41b405b950488a98b2d receivedTime=1683281261623202816
TRC 2023-05-05 10:07:41.623+00:00 waku.relay published topics="waku node" tid=1 file=waku_node.nim:484 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220c02ef297a04f121d957679cecf12ab7bcf5a4a7325c2c41b405b950488a98b2d publishTime=1683281261623424000
TRC 2023-05-05 10:07:41.817+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*deUs4f pubsubTopic=topic_S hash=0x1220323525214cc401cd6944d9db9a237b47e442ab6c19a9d205456d48fc61d0b96e receivedTime=1683281261817536512
DBG 2023-05-05 10:07:41.818+00:00 post_waku_v2_relay_v1_message

View File

@@ -1,55 +0,0 @@
INF 2023-05-05 10:07:15.792+00:00 mounting relay protocol topics="waku node" tid=1 file=waku_node.nim:520
DBG 2023-05-05 10:07:15.792+00:00 init waku relay topics="waku relay" tid=1 file=protocol.nim:68
INF 2023-05-05 10:07:15.792+00:00 relay mounted successfully topics="waku node" tid=1 file=waku_node.nim:542
DBG 2023-05-05 10:07:15.792+00:00 subscribe topics="waku node" tid=1 file=waku_node.nim:433 pubsubTopic=topic_S
INF 2023-05-05 10:07:15.793+00:00 mounting libp2p ping protocol topics="waku node" tid=1 file=waku_node.nim:959
INF 2023-05-05 10:07:15.793+00:00 Starting Waku node topics="waku node" tid=1 file=waku_node.nim:1075 version=v0.15.0-23-g67db35
INF 2023-05-05 10:07:15.793+00:00 PeerInfo topics="waku node" tid=1 file=waku_node.nim:1078 peerId=16U*5KpGua addrs=@[]
INF 2023-05-05 10:07:15.793+00:00 Listening on topics="waku node" tid=1 file=waku_node.nim:1085 full=[/ip4/0.0.0.0/tcp/60000/p2p/16Uiu2HAmKs8nzp9zrdKfv9JGbdYg91S7D92SkkQhJ6TNuJ5KpGua]
INF 2023-05-05 10:07:15.793+00:00 DNS: discoverable ENR topics="waku node" tid=1 file=waku_node.nim:1086 enr=enr:-Iu4QMPWKj0K-62Z5EfbHPIEmNuxjhkTY6TJ_H2NuNh3tMzYGOENq7hKMuA1JVTFf9ELDi1YgdJA2bUAkwnq4gz1HC4BgmlkgnY0gmlwhAAAAACJc2VjcDI1NmsxoQNrHlMyoVE75h5eX3N7TSOy8B6b4UeS_ILbIvJY-veBPYN0Y3CC6mCFd2FrdTIB
INF 2023-05-05 10:07:15.793+00:00 starting relay protocol topics="waku node" tid=1 file=waku_node.nim:492
DBG 2023-05-05 10:07:15.793+00:00 start topics="waku relay" tid=1 file=protocol.nim:116
INF 2023-05-05 10:07:15.793+00:00 relay started successfully topics="waku node" tid=1 file=waku_node.nim:513
DBG 2023-05-05 10:07:15.793+00:00 start topics="waku relay" tid=1 file=protocol.nim:116
INF 2023-05-05 10:07:15.793+00:00 Node started successfully topics="waku node" tid=1 file=waku_node.nim:1104
DBG 2023-05-05 10:07:15.793+00:00 Starting relay connectivity loop topics="waku node peer_manager" tid=1 file=peer_manager.nim:489
INF 2023-05-05 10:07:15.793+00:00 Relay peer connections topics="waku node peer_manager" tid=1 file=peer_manager.nim:411 connectedPeers=0 targetConnectedPeers=50 notConnectedPeers=0 outsideBackoffPeers=0
DBG 2023-05-05 10:07:15.793+00:00 Starting prune peerstore loop topics="waku node peer_manager" tid=1 file=peer_manager.nim:482
DBG 2023-05-05 10:07:15.793+00:00 subscribe topics="waku node rest relay_api" tid=1 file=waku_node.nim:444 pubsubTopic=topic_S
DBG 2023-05-05 10:07:15.793+00:00 subscribe topics="waku node rest relay_api" tid=1 file=protocol.nim:132 pubsubTopic=topic_S
INF 2023-05-05 10:07:15.793+00:00 Starting metrics HTTP server topics="waku node metrics" tid=1 file=waku_metrics.nim:29 serverIp=0.0.0.0 serverPort=8008
INF 2023-05-05 10:07:15.793+00:00 Metrics HTTP server started topics="waku node metrics" tid=1 file=waku_metrics.nim:36 serverIp=0.0.0.0 serverPort=8008
DBG 2023-05-05 10:07:19.786+00:00 get_waku_v2_debug_v1_info topics="waku node jsonrpc debug_api" tid=1 file=handlers.nim:21
DBG 2023-05-05 10:07:30.723+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*4ZwUxQ:6454d562f837b39597b9ab03 protocol=/vac/waku/relay/2.0.0
INF 2023-05-05 10:07:30.794+00:00 Relay peer connections topics="waku node peer_manager" tid=1 file=peer_manager.nim:411 connectedPeers=1 targetConnectedPeers=50 notConnectedPeers=0 outsideBackoffPeers=0
DBG 2023-05-05 10:07:30.811+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*4ZwUxQ:6454d562f837b39597b9ab05 protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:34.863+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*MMKSHQ:6454d566f837b39597b9ab0b protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:34.959+00:00 Incoming WakuRelay connection topics="waku relay" tid=1 file=protocol.nim:50 connection=16U*MMKSHQ:6454d566f837b39597b9ab0d protocol=/vac/waku/relay/2.0.0
DBG 2023-05-05 10:07:35.008+00:00 post_waku_v2_admin_v1_peers topics="waku node jsonrpc admin_api" tid=1 file=handlers.nim:52
DBG 2023-05-05 10:07:35.008+00:00 Dialing peer topics="waku node peer_manager" tid=1 file=peer_manager.nim:101 wireAddr=@[/ip4/82.68.192.4/tcp/60000] peerId=16U*4ZwUxQ failedAttempts=0
DBG 2023-05-05 10:07:35.051+00:00 Dialing peer topics="waku node peer_manager" tid=1 file=peer_manager.nim:101 wireAddr=@[/ip4/82.68.192.7/tcp/60000] peerId=16U*MMKSHQ failedAttempts=0
TRC 2023-05-05 10:07:39.967+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220498a4a96d695ed3b9e74199a02164e158252a7774b53ef783774c4e2d4a99fcf receivedTime=1683281259967774720
TRC 2023-05-05 10:07:39.970+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12203225437be14dbedeaf29ec89353bb41dbb3f795f94b982b28fffb01beffcd922 receivedTime=1683281259970201088
DBG 2023-05-05 10:07:40.281+00:00 post_waku_v2_relay_v1_message topics="waku node jsonrpc relay_api" tid=1 file=handlers.nim:77
TRC 2023-05-05 10:07:40.281+00:00 publish topics="waku node" tid=1 file=protocol.nim:160 pubsubTopic=topic_S
TRC 2023-05-05 10:07:40.281+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12200d76cbc0ba1da64ddb4537f9658f257ce879e4cec9a5e0bf14c21c0a41942ad1 receivedTime=1683281260281698048
TRC 2023-05-05 10:07:40.281+00:00 waku.relay published topics="waku node" tid=1 file=waku_node.nim:484 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12200d76cbc0ba1da64ddb4537f9658f257ce879e4cec9a5e0bf14c21c0a41942ad1 publishTime=1683281260281901056
TRC 2023-05-05 10:07:40.322+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x122029b5f6d094cc4cdbea01861812c4ac33c69cac55302f216cf8a4d9f276257ded receivedTime=1683281260322366976
TRC 2023-05-05 10:07:40.821+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220ebaa398fb70fd5d75df57ffbeacdba8ccf0957d43c7b634e12c5c3bbe03b22ad receivedTime=1683281260821294080
TRC 2023-05-05 10:07:40.985+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12200888a86ff1d02cf84bc92f6eee6baf27347847b900f6dc9f12bbba3deca2522f receivedTime=1683281260985957376
DBG 2023-05-05 10:07:41.049+00:00 post_waku_v2_relay_v1_message topics="waku node jsonrpc relay_api" tid=1 file=handlers.nim:77
TRC 2023-05-05 10:07:41.049+00:00 publish topics="waku node" tid=1 file=protocol.nim:160 pubsubTopic=topic_S
TRC 2023-05-05 10:07:41.049+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220601b82fb10a8975609ae1551481c369549c37836e2104c80b88c2b4fc9bc9496 receivedTime=1683281261049372416
TRC 2023-05-05 10:07:41.049+00:00 waku.relay published topics="waku node" tid=1 file=waku_node.nim:484 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220601b82fb10a8975609ae1551481c369549c37836e2104c80b88c2b4fc9bc9496 publishTime=1683281261049528832
TRC 2023-05-05 10:07:41.075+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12201bc5ec24014103df1f5ea5e36e742941e6d954530e60624bd191ea661db841db receivedTime=1683281261075706368
TRC 2023-05-05 10:07:41.353+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12200c31d928bcad93c105330cc30623af7afc3bff1d170469492541e0ddbf90b40e receivedTime=1683281261353893888
TRC 2023-05-05 10:07:41.430+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220c6b6cdd5f3fcba7544f5b74491c07a0de0f5e02a34d5e112d804f4a5eff5559b receivedTime=1683281261430506496
TRC 2023-05-05 10:07:41.624+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220c02ef297a04f121d957679cecf12ab7bcf5a4a7325c2c41b405b950488a98b2d receivedTime=1683281261624496640
TRC 2023-05-05 10:07:41.817+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220323525214cc401cd6944d9db9a237b47e442ab6c19a9d205456d48fc61d0b96e receivedTime=1683281261817491456
TRC 2023-05-05 10:07:41.819+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x12209712985f998221fa811f92c54d2fbce7b7dd4adaa6da4859391b4ee906d6fd4c receivedTime=1683281261819732992
TRC 2023-05-05 10:07:41.897+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220e341c5260c491718eb9a390e7c35fa916c70b8d41fed7ca4d89f4defc72d7569 receivedTime=1683281261897427200
DBG 2023-05-05 10:07:41.946+00:00 post_waku_v2_relay_v1_message topics="waku node jsonrpc relay_api" tid=1 file=handlers.nim:77
TRC 2023-05-05 10:07:41.946+00:00 publish topics="waku node" tid=1 file=protocol.nim:160 pubsubTopic=topic_S
TRC 2023-05-05 10:07:41.946+00:00 waku.relay received topics="waku node" tid=1 file=waku_node.nim:395 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220690e1527b2991263463c3b67469fbc06c4a3891bd00a24c294e31d6742e2f3a5 receivedTime=1683281261946718976
TRC 2023-05-05 10:07:41.946+00:00 waku.relay published topics="waku node" tid=1 file=waku_node.nim:484 peerId=16U*5KpGua pubsubTopic=topic_S hash=0x1220690e1527b2991263463c3b67469fbc06c4a3891bd00a24c294e31d6742e2f3a5 publishTime=1683281261946988544
TRC 2023-05-05 10:07:41.973+00:00 waku.relay received

View File

@@ -1,10 +0,0 @@
{
"c5ee88beed421d6899f9f03252253f8dc625c7d2d9d2910b1ad2d41971973158": {
"ts": 1681806688582342645,
"injection_point": "http://151.42.16.3:8545/",
"nonce": 0,
"topic": "topic_C",
"payload": "RJh8R+OkqkkoLq59Pr+XZYbhtPj5hLNxuzurrNDhKI/omHE8IgAqPawT60LMf7yHBaKlTB5iSnDEPAHzgNo+GM5OgGOFu8IJGUWAPx+e4Ek9DXKLtUXM6P5N9P8pcgJ1gCYcWJ/Oq/NbrQ+c590cetn1jXtdmgebH3GsVE55jJ1lLdS+ACiGWYcdf8qAD7ygqGt1VnmKWV3RIMeQVi1TzPWxOPwqiYbrbka36lX9kd9C105YXZ5vxwacF11KCIpjWWSWQ1FQwkW9jLZn8hAw6Rv1d3QMtF3EnQDYSxoEQhMM8bunc3c5tOzDmiLqYv8BGEkJAE/hMrob/sr8bsQ18gya/DAzTAOGuwF6GrxdAGckYPT94HCboBUGsjjClvmoR38orTdxQ4mT6oRqiPkUD75DpSGmFri60WM8Hu3Yo54DmjUhu8DTIrA8SNrioy4CNvXVUmMyuNtP1nv/wyxYisjpf7oqYaI58fkSsyr5QBszr+PCPqk8ODeto+ZQYpxtU/beW9teX+FPgFEYAHzLcVjuFSIcsni/IHlRzQtVghzHi1AmWGn0SFdGckIkU/uDz7O12Hf5VwrEghekk+tQxAU05cZr5R6YXTGyRnTCYKeBDvuh0svUO6K3YqqdYB8HOfhsZ+oHjwrLX9h5HudUijNBGCvuWLu3FN/865WPWvoINKMdfWOIAtt1W93MieGSckjm76J8rO8mVvoNGqQ1s7WcCKzVIz1TlaxK16Zx40KSO22x1eo8lFOUk0xyjuSTnpRq8fmxPKunw68gRr01Ym/iAZ5aYTu/mdqXicq7x/Li9XXfzV5uQ7zEldxI/FEHezoV4W3EpM4hWIiPxctqdjezNTDDcjOfnO1nkl48XZD7DeoUaZ76m3eCeosy+oD6mBIFWyU1VzvaIn3iXXpTFuI97KeazcA1NlClt2J8ECMlWO0INmX3eC9iSDOkfBXSJh57EMMrArBUvSQQjJx+Vr68ghhxp6ZeqSlADoQh9Fwgg1p6KavKBInYakHJtYnhYqPfvgFC6DodFHt7XaOzEQuD3eHcXqmY1FU9l3koqS+dURIXQQwvs6yyXemFxuvsHFyGYa/Gmn0HE/QCuwMBQ5sgIaOmce4j",
"payload_size": 840
}
}

View File

@@ -1,35 +0,0 @@
# Python Imports
import unittest
from unittest.mock import patch
# Project Imports
from src import log_parser
class TestLogParser(unittest.TestCase):
@patch('builtins.open')
@patch('json.load')
def test_load_messages(self, mock_json_load, mock_open):
mock_data = {'key': 'value'}
mock_file = mock_open.return_value.__enter__.return_value
mock_file.read.return_value = 'dummy'
mock_json_load.return_value = mock_data
data = log_parser.load_messages('/path/to/simulation')
mock_open.assert_called_once_with('/path/to/simulation/messages.json', 'r')
mock_json_load.assert_called_once_with(mock_file)
self.assertEqual(data, mock_data)
def test_prepare_node_in_logs(self):
node_pbar = ['node1', 'node2']
topology = {'nodes': {'node1': {'peer_id': 'peer1'}, 'node2': {'peer_id': 'peer2'}}}
node_logs = {}
container_name = 'container1'
log_parser.prepare_node_in_logs(node_pbar, topology, node_logs, container_name)
self.assertEqual(node_logs, {'pee*peer1': {'published': [], 'received': [],
'container_name': 'container1', 'peer_id': 'node1'},
'pee*peer2': {'published': [], 'received': [],
'container_name': 'container1', 'peer_id': 'node2'}})

View File

@@ -1,167 +0,0 @@
# Python Imports
import unittest
from unittest.mock import patch
from prometheus_api_client import PrometheusConnect
# Project Imports
from src import prometheus
from src import plotting_configurations
class TestPrometheus(unittest.TestCase):
@patch('src.prometheus.fetch_metric')
def test_fetch_cadvisor_stats_from_prometheus_by_node(self, mock_fetch_metric):
mock_fetch_metric.side_effect = [
[10, 20, 30]
]
metrics = {
"by_node": ["container_cpu_load_average_10s"]
}
prom = None
container_ip = "192.168.1.1"
start_ts = 1630000000000000000
end_ts = 1630000100000000000
prometheus.fetch_cadvisor_stats_from_prometheus_by_node(metrics, prom, container_ip, start_ts, end_ts)
self.assertEqual(plotting_configurations.plotting_config["container_cpu_load_average_10s"]["values"], [30])
@patch.object(PrometheusConnect, 'custom_query_range')
def test_fetch_metric(self, mock_custom_query_range):
mock_custom_query_range.return_value = [
{
'metric': {'__name__': 'container_memory_usage_bytes'},
'values': [
[1683281260, '5382144'],
[1683281261, '5382144'],
[1683281262, '5382144'],
[1683281263, '6291456'],
[1683281320, '10289152']
]
}
]
prom = PrometheusConnect()
metric_values = prometheus.fetch_metric(prom, None, None, None, None, False)
self.assertEqual(metric_values, [5382144, 5382144, 5382144, 6291456, 10289152])
@patch.object(PrometheusConnect, 'custom_query_range')
def test_fetch_metric_mbytes(self, mock_custom_query_range):
mock_custom_query_range.return_value = [
{
'metric': {'__name__': 'container_memory_usage_bytes'},
'values': [
[1683281260, '5382144'],
[1683281261, '5382144'],
[1683281262, '5382144'],
[1683281263, '6291456'],
[1683281320, '10289152']
]
}
]
prom = PrometheusConnect()
metric_values = prometheus.fetch_metric(prom, None, None, None, None, True)
self.assertEqual(metric_values, [5.1328125, 5.1328125, 5.1328125, 6.0, 9.8125])
@patch.object(PrometheusConnect, 'custom_query_range')
def test_fetch_metric_empty(self, mock_custom_query_range):
mock_custom_query_range.return_value = []
prom = PrometheusConnect()
metric_values = prometheus.fetch_metric(prom, None, None, None, None, True)
self.assertEqual(metric_values, [0])
@patch('src.prometheus.fetch_accumulated_metric_for_all_nodes')
def test_fetch_cadvisor_stats_from_prometheus_by_simulation(self, mock_fetch_accumulated_metric_for_all_nodes):
metrics = {
"by_simulation": [
"container_network_receive_bytes_total",
"container_network_transmit_bytes_total"
]
}
mock_fetch_accumulated_metric_for_all_nodes.side_effect = [[12, 17, 13, 5, 8, 4], [1, 4, 3]]
prom = PrometheusConnect()
prometheus.fetch_cadvisor_stats_from_prometheus_by_simulation(metrics, prom, None, 0, 0)
self.assertEqual(plotting_configurations.plotting_config["container_network_receive_bytes_total"]["values"],
[[12, 17, 13, 5, 8, 4]])
self.assertEqual(plotting_configurations.plotting_config["container_network_transmit_bytes_total"]["values"],
[[1, 4, 3]])
@patch('src.prometheus.fetch_metric_with_timestamp')
def test_fetch_accumulated_metric_for_all_nodes_1(self, mock_fetch_metric_with_timestamp):
mock_fetch_metric_with_timestamp.side_effect = [[[1683281260, 1],
[1683281260, 4],
[1683281261, 3],
[1683281261, 3],
[1683281262, 5],
[1683281263, 4],
[1683281320, 8]]]
prom = PrometheusConnect()
ips = ["test"]
metric_values = prometheus.fetch_accumulated_metric_for_all_nodes(prom, None, ips, None, None, None)
self.assertEqual(metric_values, [5, 6, 5, 4, 8])
@patch('src.prometheus.fetch_metric_with_timestamp')
def test_fetch_accumulated_metric_for_all_nodes_2(self, mock_fetch_metric_with_timestamp):
mock_fetch_metric_with_timestamp.side_effect = [[[1683281260, 1], [1683281260, 4], [1683281261, 3],
[1683281261, 3], [1683281262, 5], [1683281263, 4],
[1683281320, 8]],
[[1683281260, 7], [1683281260, 1], [1683281261, 6],
[1683281261, 5], [1683281262, 8], [1683281263, 1],
[1683281330, 4]]]
prom = PrometheusConnect()
ips = ["test1", "test2"]
metric_values = prometheus.fetch_accumulated_metric_for_all_nodes(prom, None, ips, None, None, None)
self.assertEqual(metric_values, [13, 17, 13, 5, 8, 4])
@patch('src.prometheus.fetch_metric_with_timestamp')
def test_fetch_accumulated_metric_for_all_nodes_empty(self, mock_fetch_metric_with_timestamp):
mock_fetch_metric_with_timestamp.side_effect = [[[0, 0]], [[0, 0]]]
prom = PrometheusConnect()
ips = ["test1", "test2"]
metric_values = prometheus.fetch_accumulated_metric_for_all_nodes(prom, None, ips, None, None, None)
self.assertEqual(metric_values, [0])
@patch.object(PrometheusConnect, 'custom_query_range')
def test_fetch_metric_with_timestamp(self, mock_custom_query_range):
mock_custom_query_range.return_value = [
{
'metric': {'__name__': 'container_memory_usage_bytes'},
'values': [
[1683281260, '5382144'],
[1683281261, '5382144'],
[1683281262, '5382144'],
[1683281263, '6291456'],
[1683281320, '10289152']
]
}
]
prom = PrometheusConnect()
metric_values = prometheus.fetch_metric_with_timestamp(prom, None, None, None, None)
self.assertEqual(metric_values,
[[1683281260, '5382144'], [1683281261, '5382144'], [1683281262, '5382144'],
[1683281263, '6291456'],
[1683281320, '10289152']])
@patch.object(PrometheusConnect, 'custom_query_range')
def test_fetch_metric_with_timestamp_empty_data(self, mock_custom_query_range):
mock_custom_query_range.return_value = []
prom = PrometheusConnect()
metric_values = prometheus.fetch_metric_with_timestamp(prom, None, None, None, None)
self.assertEqual(metric_values, [[0, 0]])

View File

@@ -1,62 +0,0 @@
# Python Imports
import unittest
from unittest.mock import patch
# Project Imports
from src import topology
class TestTopology(unittest.TestCase):
@patch('builtins.open')
@patch('tomllib.load')
def test_load_topics_nwaku(self, mock_toml_load, mock_open):
mock_node_info = {
"node_config": "config.toml",
"image": "nim-waku",
}
mock_nodes = {"node1": {}, "node2": {}}
mock_toml_config = {"topics": "topic1 topic2 topic3"}
mock_toml_load.return_value = mock_toml_config
topology._load_topics(mock_node_info, mock_nodes, "node1", "/path/to/tomls/")
mock_open.assert_called_once_with('/path/to/tomls/config.toml', mode='rb')
mock_toml_load.assert_called_once_with(mock_open.return_value.__enter__.return_value)
self.assertListEqual(mock_nodes["node1"]["topics"], ["topic1", "topic2", "topic3"])
@patch('builtins.open')
@patch('tomllib.load')
def test_load_topics_gowaku(self, mock_toml_load, mock_open):
mock_node_info = {
"node_config": "config.toml",
"image": "go-waku",
}
mock_nodes = {"node1": {}, "node2": {}}
mock_toml_config = {"topics": ["topic1", "topic2", "topic3"]}
mock_toml_load.return_value = mock_toml_config
topology._load_topics(mock_node_info, mock_nodes, "node1", "/path/to/tomls/")
mock_open.assert_called_once_with('/path/to/tomls/config.toml', mode='rb')
mock_toml_load.assert_called_once_with(mock_open.return_value.__enter__.return_value)
self.assertListEqual(mock_nodes["node1"]["topics"], ["topic1", "topic2", "topic3"])
@patch('builtins.open')
@patch('tomllib.load')
def test_load_topics_into_topology(self, mock_toml_load, mock_open):
nodes = {
"nodes": {
"node1": {
"node_config": "config.toml",
"image": "nim-waku",
},
"node2": {
"node_config": "config.toml",
"image": "nim-waku",
}}
}
mock_toml_config = {"topics": "topic1 topic2 topic3"}
mock_toml_load.return_value = mock_toml_config
topology.load_topics_into_topology(nodes, "")
self.assertEqual(nodes["nodes"]["node1"]["topics"], ["topic1", "topic2", "topic3"])
self.assertEqual(nodes["nodes"]["node2"]["topics"], ["topic1", "topic2", "topic3"])