Files
wakurtosis/analysis.py

704 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Description: Wakurtosis simulation analysis
"""
import os
import subprocess
""" Dependencies """
import sys, logging, json, argparse, tomllib, glob, re, statistics
from datetime import datetime
from tqdm_loggable.auto import tqdm
from tqdm_loggable.tqdm_logging import tqdm_logging
import matplotlib.pyplot as plt
from datetime import datetime
import numpy as np
from prometheus_api_client import PrometheusConnect
""" Globals """
# Logger name; also embedded in the prefix of every log line by CustomFormatter.
G_APP_NAME = 'WLS-ANALYSIS'
# Level applied to both the logger and its stdout handler in _innit_logger().
G_LOG_LEVEL = 'INFO'
# Simulation configuration file; build_summary() stores its contents under 'parameters'.
G_DEFAULT_CONFIG_FILE = './config/config.json'
# NOTE(review): not referenced in the visible code (_load_topics hard-codes the same
# directory) - confirm whether it is still needed.
G_DEFAULT_TOPOLOGY_PATH = './config/topology_generated'
# Default value of --simulation_path; figures and the summary are always written here.
G_DEFAULT_SIMULATION_PATH = './wakurtosis_logs'
# File name of the figure produced by plot_figure().
G_DEFAULT_CONTAINER_FIG_FILENAME = 'container_analysis.pdf'
# File name of the figure produced by plot_figure_ex().
G_DEFAULT_NODES_FIG_FILENAME = 'node_analysis.pdf'
# File name of the JSON summary written by main().
G_DEFAULT_SUMMARY_FILENAME = 'summary.json'
# Process level metrics file; main() skips the process level analysis when it is missing.
G_DEFAULT_METRICS_FILENAME = './monitoring/metrics.json'
# Module-wide logger, assigned by _innit_logger().
G_LOGGER = None
""" Custom logging formatter """
class CustomFormatter(logging.Formatter):
    """Formatter that selects the line layout from the record's level.

    Every layout starts with a "[timestamp.msecs] [G_APP_NAME]" prefix. ERROR
    and CRITICAL records additionally carry module, function and line number,
    DEBUG records carry the function name.

    The layouts use ``%(message)s`` (the message with its arguments merged)
    rather than ``%(msg)s`` (the raw template), so lazy calls such as
    ``logger.info("x=%s", x)`` are rendered correctly. For records logged
    without arguments both attributes render the same text.
    """

    # Set different formats for every logging level
    time_name_stamp = "[%(asctime)s.%(msecs)03d] [" + G_APP_NAME + "]"
    DATE_FORMAT = '%d-%m-%Y %H:%M:%S'
    FORMATS = {
        logging.ERROR: time_name_stamp + " ERROR in %(module)s.py %(funcName)s() %(lineno)d - %(message)s",
        logging.WARNING: time_name_stamp + " WARNING - %(message)s",
        logging.CRITICAL: time_name_stamp + " CRITICAL in %(module)s.py %(funcName)s() %(lineno)d - %(message)s",
        logging.INFO: time_name_stamp + " %(message)s",
        logging.DEBUG: time_name_stamp + " %(funcName)s() %(message)s",
        'DEFAULT': time_name_stamp + " %(message)s",
    }

    def format(self, record):
        """Render *record* with the layout registered for its level.

        Levels without a dedicated entry fall back to the 'DEFAULT' layout.
        """
        log_fmt = self.FORMATS.get(record.levelno, self.FORMATS['DEFAULT'])
        formatter = logging.Formatter(log_fmt, self.DATE_FORMAT)
        return formatter.format(record)
def plot_figure_ex(msg_propagation_times, cpu_usage, memory_usage, network_usage, disk_usage, injection_times, simulation_summary, simulation_config):
    """Draw the node level analysis figure (2x3 violin plots) and save it as PDF.

    Parameters:
        msg_propagation_times: values for the propagation time panel.
        cpu_usage: values for the peak CPU panel.
        memory_usage: values for the peak memory panel.
        network_usage: dict with 'rx_mbytes' and 'tx_mbytes' sequences.
        disk_usage: dict with 'disk_read_mbytes' and 'disk_write_mbytes' sequences.
        injection_times: values for the injection time panel.
        simulation_summary: dict providing 'num_nodes', 'num_topics',
            'simulation_time_ms' and 'metrics' -> 'esr' for the figure title.
        simulation_config: dict providing 'wls' -> 'message_rate' for the title.

    The figure is written to G_DEFAULT_SIMULATION_PATH/G_DEFAULT_NODES_FIG_FILENAME
    and closed afterwards so that repeated calls do not accumulate open figures.
    """

    def style_violin(parts, ax):
        # Change the extrema lines to dashed grey lines
        for line in parts['cmaxes'].get_segments() + parts['cmins'].get_segments():
            line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
            ax.add_line(line_obj)
        # Remove the original extrema lines
        parts['cmaxes'].set_visible(False)
        parts['cmins'].set_visible(False)
        # Change the vertical lines to dashed grey lines
        for line in parts['cbars'].get_segments():
            line_obj = plt.Line2D(line[:, 0], line[:, 1], color='grey', linestyle='dashed', linewidth=0.5)
            ax.add_line(line_obj)
        # Remove the original vertical lines
        parts['cbars'].set_visible(False)
        cmean_colors = parts['cmeans'].get_color()
        colors = [cmean_colors[0], 'red', cmean_colors[0], cmean_colors[0]]
        parts['cmeans'].set_color(colors)
        # Replace every mean line with a single marker at its centre
        xy = [[l.vertices[:, 0].mean(), l.vertices[0, 1]] for l in parts['cmeans'].get_paths()]
        xy = np.array(xy)
        ax.scatter(xy[:, 0], xy[:, 1], s=25, c="crimson", marker="o", zorder=3)
        # make lines invisible
        parts['cmeans'].set_visible(False)

    def draw_panel(ax, data, title, ylabel, tick_labels=None):
        # One violin panel; without tick labels the x axis is hidden entirely.
        parts = ax.violinplot(data, showmeans=True)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.spines[['right', 'top']].set_visible(False)
        if tick_labels is None:
            ax.axes.xaxis.set_visible(False)
        else:
            ax.set_xticks(list(range(1, len(tick_labels) + 1)))
            ax.set_xticklabels(tick_labels)
        style_violin(parts, ax)

    fig, ((ax1, ax2, ax3), (ax4, ax5, ax6)) = plt.subplots(2, 3, figsize=(15, 15))
    try:
        draw_panel(ax1, msg_propagation_times, 'Propagation Time (per message)', 'Propagation Time (ms)')
        draw_panel(ax2, cpu_usage, 'Peak CPU Usage (per node)', 'CPU Usage (%)')
        draw_panel(ax3, memory_usage, 'Peak Memory Usage (per node)', 'Memory (MBytes)')
        draw_panel(ax4, [network_usage['rx_mbytes'], network_usage['tx_mbytes']],
                   'Total Network IO (per node)', 'Bandwidth (MBytes)',
                   tick_labels=['Received (Rx)', 'Sent (Tx)'])
        draw_panel(ax5, injection_times, 'Injection Time (per message)', 'Milliseconds (ms)')
        draw_panel(ax6, [disk_usage['disk_read_mbytes'], disk_usage['disk_write_mbytes']],
                   'Peak Disk IO (per node)', 'Disk IO (MBytes)',
                   tick_labels=['Read', 'Write'])

        fig.suptitle(
            'Wakurtosis Simulation Node Level Analysis\n(%d nodes, %d topic(s), Rate: %d msg/s, Time: %.2f s. Sampling Rate: %.2f samples/s.)\n' % (
                simulation_summary['num_nodes'],
                simulation_summary['num_topics'],
                simulation_config['wls']['message_rate'],
                simulation_summary['simulation_time_ms'] / 1000.0,
                simulation_summary['metrics']['esr']),
            fontsize=20)

        plt.tight_layout()
        figure_path = '%s/%s' % (G_DEFAULT_SIMULATION_PATH, G_DEFAULT_NODES_FIG_FILENAME)
        plt.savefig(figure_path, format="pdf", bbox_inches="tight")
    finally:
        # Release the figure whether or not drawing/saving succeeded.
        plt.close(fig)
    G_LOGGER.info('Nodes analysis figure saved in %s' % figure_path)
def plot_figure(msg_propagation_times, cpu_usage, memory_usage, bandwith_in, bandwith_out):
    """Draw the container level analysis figure (1x5 violin plots) and save it as PDF.

    Each argument is a sequence of values for one panel. A panel whose data is
    empty or None is left as a blank axis. Every title reports the sample size
    of the data actually plotted in that panel.

    The figure is written to
    G_DEFAULT_SIMULATION_PATH/G_DEFAULT_CONTAINER_FIG_FILENAME and then closed.
    """
    fig, axes = plt.subplots(1, 5, figsize=(15, 10))
    panels = (
        (msg_propagation_times, 'Message propagation times \n(sample size: %d messages)', 'Propagation Time (ms)'),
        (cpu_usage, 'Maximum CPU usage per Waku node \n(sample size: %d nodes)', 'CPU Cycles'),
        (memory_usage, 'Maximum memory usage per Waku node \n(sample size: %d nodes)', 'Bytes'),
        (bandwith_in, 'Bandwidth IN usage per Waku node \n(sample size: %d nodes)', 'Bytes'),
        (bandwith_out, 'Bandwidth OUT usage per Waku node \n(sample size: %d nodes)', 'Bytes'),
    )
    try:
        for ax, (data, title_template, ylabel) in zip(axes, panels):
            if not data:
                continue
            ax.violinplot(data, showmedians=True)
            ax.set_title(title_template % len(data))
            ax.set_ylabel(ylabel)
            ax.spines[['right', 'top']].set_visible(False)
            ax.axes.xaxis.set_visible(False)
        plt.tight_layout()
        figure_path = '%s/%s' % (G_DEFAULT_SIMULATION_PATH, G_DEFAULT_CONTAINER_FIG_FILENAME)
        plt.savefig(figure_path, format="pdf", bbox_inches="tight")
    finally:
        # Release the figure whether or not drawing/saving succeeded.
        plt.close(fig)
    G_LOGGER.info('Figure saved in %s' % figure_path)
def prometheus_connect(prometheus_port=52118):
    """Create a Prometheus client for the service running in the wakurtosis enclave.

    The service address is taken from the output of
    `kurtosis enclave inspect wakurtosis`.

    Parameters:
        prometheus_port: currently unused; kept for backward compatibility.

    Returns:
        A PrometheusConnect instance, or None when the address cannot be
        determined or the client cannot be created.
    """
    try:
        raw_address = subprocess.check_output(
            "kurtosis enclave inspect wakurtosis | grep '\\<prometheus\\>' | awk '{print $6}'",
            shell=True)
    except (subprocess.CalledProcessError, OSError) as e:
        G_LOGGER.error('%s: %s' % (e.__doc__, e))
        return None

    address = raw_address.decode("utf-8").strip()
    if not address:
        # The pipeline exits successfully even when grep matches nothing.
        G_LOGGER.error('Prometheus service address not found in the wakurtosis enclave')
        return None

    url = f'http://{address}'
    try:
        G_LOGGER.debug('Connecting to Prometheus server in %s' % url)
        return PrometheusConnect(url, disable_ssl=True)
    except Exception as e:
        G_LOGGER.error('%s: %s' % (e.__doc__, e))
        return None
def fetch_cadvisor_stats_from_prometheus(prometheus, container_ip, start_ts, end_ts):
    """Fetch cAdvisor time series of one container from Prometheus.

    Parameters:
        prometheus: client exposing custom_query_range(query, start_time,
            end_time, step).
        container_ip: value matched against the
            container_label_com_kurtosistech_private_ip label.
        start_ts, end_ts: bounds of the query window; they are divided by 1e9
            before conversion, i.e. treated as nanoseconds since the epoch.

    Returns:
        dict with 'cpu_usage', 'memory_usage', 'bandwidth_in' and
        'bandwidth_out', each a list of ints sampled with a 1s step.

    Raises:
        IndexError: when a query returns no series.
    """
    # Both bounds use the same conversion. Previously the start was converted
    # with utcfromtimestamp() and the end with fromtimestamp(), which skews the
    # window by the local UTC offset.
    # NOTE(review): naive local datetimes are assumed to be what the client
    # expects (it is presumed to call .timestamp() on them) - confirm.
    start_timestamp = datetime.fromtimestamp(start_ts / 1e9)
    end_timestamp = datetime.fromtimestamp(end_ts / 1e9)

    # container_network_transmit_bytes_total{container_label_com_kurtosistech_private_ip = "212.209.64.2"}
    kurtosis_ip_template = "container_label_com_kurtosistech_private_ip"

    def query_series(metric_name):
        # Return the values of the first series matching this container.
        query = f"{metric_name}{{{kurtosis_ip_template} = '{container_ip}'}}"
        result = prometheus.custom_query_range(query, start_time=start_timestamp,
                                               end_time=end_timestamp, step="1s")
        # Values arrive as strings; going through float() also accepts
        # non-integer renderings such as "1.5".
        return [int(float(sample[1])) for sample in result[0]['values']]

    cpu = query_series("container_cpu_load_average_10s")
    mem = query_series("container_memory_usage_bytes")
    net_in = query_series("container_network_receive_bytes_total")
    net_out = query_series("container_network_transmit_bytes_total")
    return {'cpu_usage': cpu, 'memory_usage': mem, 'bandwidth_in': net_in, 'bandwidth_out': net_out}
def _load_topics(node_info, nodes, node):
topics = None
with open("config/topology_generated/" + node_info["node_config"], mode='rb') as read_file:
toml_config = tomllib.load(read_file)
if node_info["image"] == "nim-waku":
topics = list(toml_config["topics"].split(" "))
elif node_info["image"] == "go-waku":
topics = toml_config["topics"]
else:
raise ValueError("Unknown image type")
# Load topics into topology for easier access
nodes[node]["topics"] = topics
def load_topics_into_topology(topology):
    """ Load the topics of every node in topology["nodes"] from its toml file.

    Each node entry gains a "topics" key (set by _load_topics). If a node
    cannot be processed (unknown image, unreadable or invalid toml file,
    missing key) the error is logged and the process exits with status 1.
    """
    nodes = topology["nodes"]
    for node, node_info in nodes.items():
        try:
            _load_topics(node_info, nodes, node)
        except (ValueError, KeyError, OSError) as e:
            G_LOGGER.error('%s: %s' % (e.__doc__, e))
            # Non-zero status so callers/scripts can detect the failure.
            sys.exit(1)
    G_LOGGER.info('Loaded nodes topics from toml files')
def load_topology(topology_file):
    """ Parse the JSON topology stored in *topology_file* and return it. """
    with open(topology_file, 'r') as topology_fp:
        parsed_topology = json.loads(topology_fp.read())
    # The full structure is only emitted at debug level.
    G_LOGGER.debug(parsed_topology)
    G_LOGGER.info('Topology loaded')
    return parsed_topology
def load_messages(simulation_path):
    """Load the injected messages from <simulation_path>/messages.json.

    Returns:
        The parsed JSON content.

    If the file cannot be read or does not contain valid JSON the error is
    logged and the process exits with status 1.
    """
    try:
        with open(f'{simulation_path}/messages.json', 'r') as f:
            injected_msgs_dict = json.load(f)
    except (OSError, json.JSONDecodeError) as e:
        G_LOGGER.error('%s: %s' % (e.__doc__, e))
        # Non-zero status so callers/scripts can detect the failure.
        sys.exit(1)
    G_LOGGER.info(f'Loaded {len(injected_msgs_dict)} messages.')
    return injected_msgs_dict
def compare_tss(tss, min_tss, max_tss):
    """Fold timestamp *tss* into the running (min_tss, max_tss) pair.

    Both bounds are updated independently, so a value that lowers the minimum
    can also raise the maximum. This matters for the very first timestamp,
    when both bounds still hold their sentinel values.

    Returns:
        Tuple (new_min_tss, new_max_tss).
    """
    return min(tss, min_tss), max(tss, max_tss)
def _innit_logger():
    """ Init Logging: create the application logger and publish it as G_LOGGER. """
    global G_LOGGER

    # Handler writing to stdout with the per-level formatter
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(CustomFormatter())

    app_logger = logging.getLogger(G_APP_NAME)
    app_logger.addHandler(stdout_handler)
    G_LOGGER = app_logger

    tqdm_logging.set_level(logging.INFO)

    # Set loglevel from config
    app_logger.setLevel(G_LOG_LEVEL)
    stdout_handler.setLevel(G_LOG_LEVEL)

    G_LOGGER.info('Started')
def _parse_args():
    """ Parse command line args and return the simulation results path. """
    cli = argparse.ArgumentParser()
    cli.add_argument("-sp", "--simulation_path", help="Simulation results path",
                     default=G_DEFAULT_SIMULATION_PATH)
    return cli.parse_args().simulation_path
def get_relay_line_info(log_line):
    """Extract the relay fields of a log line.

    Returns:
        Tuple (topics, pubsub_topic, hash, peer_id) holding the first match of
        the `topics="..."`, `pubsubTopic=`, `hash=` and `peerId=` fields.

    Raises:
        AttributeError: when one of the fields is absent from *log_line*.
    """
    field_patterns = (
        r'topics="([^"]+)"',
        r'pubsubTopic=([^ ]+)',
        r'hash=([^ ]+)',
        r'peerId=([^ ]+)',
    )
    return tuple(re.search(pattern, log_line).group(1) for pattern in field_patterns)
def analyze_published(log_line, node_logs, msgs_dict, msg_publishTime):
    """Record a 'published' relay event in *node_logs* and *msgs_dict*.

    The event is appended to the 'published' list of the emitting peer in
    node_logs and to the 'published' list of the message (keyed by hash) in
    msgs_dict; the message entry is created on first sight.
    """
    topics, pubsub_topic, msg_hash, peer_id = get_relay_line_info(log_line)
    node_logs[peer_id]['published'].append([msg_publishTime, topics, pubsub_topic, msg_hash])

    msg_entry = msgs_dict.setdefault(msg_hash, {'published': [], 'received': []})
    msg_entry['published'].append({'ts': msg_publishTime, 'peer_id': peer_id})
def analyze_received(log_line, node_logs, msgs_dict, msg_receivedTime):
    """Record a 'received' relay event in *node_logs* and *msgs_dict*.

    The event is appended to the 'received' list of the receiving peer in
    node_logs and to the 'received' list of the message (keyed by hash) in
    msgs_dict; the message entry is created on first sight.
    """
    topics, pubsub_topic, msg_hash, peer_id = get_relay_line_info(log_line)
    node_logs[peer_id]['received'].append([msg_receivedTime, topics, pubsub_topic, msg_hash])

    msg_entry = msgs_dict.setdefault(msg_hash, {'published': [], 'received': []})
    msg_entry['received'].append({'ts': msg_receivedTime, 'peer_id': peer_id})
def parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss):
    """Scan a node log for waku.relay publish/receive events.

    Every matching line is recorded through analyze_published() or
    analyze_received() and its timestamp is folded into the running
    (min_tss, max_tss) pair, which is returned.
    """
    for line in file:
        # Only relay lines are of interest
        if 'waku.relay' not in line:
            continue

        if 'published' in line:
            event_ts = int(re.search(r'publishTime=([\d]+)', line).group(1))
            analyze_published(line, node_logs, msgs_dict, event_ts)
        elif 'received' in line:
            event_ts = int(re.search(r'receivedTime=([\d]+)', line).group(1))
            analyze_received(line, node_logs, msgs_dict, event_ts)
        else:
            continue

        min_tss, max_tss = compare_tss(event_ts, min_tss, max_tss)

    return min_tss, max_tss
def open_file(folder):
    """Open output.log inside the first directory listed in *folder*.

    Parameters:
        folder: list of candidate directories (analyze_containers passes a
            glob result); only the first entry is used.

    Returns:
        The open text file; the caller is responsible for closing it.

    If *folder* is empty or the file cannot be opened the error is logged and
    the process exits with status 1.
    """
    if not folder:
        # An empty glob result would otherwise surface as a bare IndexError.
        G_LOGGER.error('No log folder found for container')
        sys.exit(1)
    try:
        file = open(f'{folder[0]}/output.log', mode='r')
    except OSError as e:
        G_LOGGER.error(f'{e.__doc__}: {e}')
        sys.exit(1)
    return file
def prepare_node_in_logs(node_pbar, topology, node_logs, container_name):
    """Create an empty log entry in *node_logs* for every node in *node_pbar*.

    Entries are keyed by the shortened peer id (first 3 characters, '*', last
    6 characters of the node's "peer_id" in the topology). Note that the
    entry's 'peer_id' field holds the node name.
    """
    for node_name in node_pbar:
        full_peer_id = topology["nodes"][node_name]["peer_id"]
        short_peer_id = "*".join((full_peer_id[:3], full_peer_id[-6:]))
        node_logs[short_peer_id] = dict(
            published=[],
            received=[],
            container_name=container_name,
            peer_id=node_name,
        )
def analyze_containers(topology, simulation_path):
    """Parse the log of every container listed in topology["containers"].

    Returns:
        Tuple (node_logs, msgs_dict, min_tss, max_tss): per-peer events,
        per-message events and the smallest/largest timestamp seen.

    Raises:
        RuntimeError: when more than one directory matches
            "<simulation_path>/<container_name>--*".
    """
    node_logs = {}
    msgs_dict = {}
    # Sentinels that any real timestamp will replace
    max_tss = -sys.maxsize - 1
    min_tss = sys.maxsize

    for container_name, container_info in topology["containers"].items():
        node_pbar = tqdm(container_info["nodes"])
        node_pbar.set_description(f"Parsing log of container {container_name}")
        prepare_node_in_logs(node_pbar, topology, node_logs, container_name)

        folder = glob.glob(f'{simulation_path}/{container_name}--*')
        if len(folder) > 1:
            raise RuntimeError(f"Error: Multiple containers with same name: {folder}")

        # Context manager so the log is closed even if parsing raises.
        with open_file(folder) as file:
            min_tss, max_tss = parse_lines_in_file(file, node_logs, msgs_dict, min_tss, max_tss)

    return node_logs, msgs_dict, min_tss, max_tss
def compute_message_latencies(msgs_dict):
    """Compute message latencies throughout the network, in place.

    For every message a 'latencies' list is stored in its msgs_dict entry:
    the difference between each reception timestamp and the timestamp of the
    first recorded publication, skipping receptions by the publisher itself.
    A message without any recorded publication gets an empty list.
    """
    pbar = tqdm(msgs_dict.items())
    for msg_id, msg_data in pbar:
        pbar.set_description('Computing latencies of message %s' % msg_id)

        publications = msg_data['published']
        if not publications:
            # Seen as received only: there is no reference timestamp.
            G_LOGGER.warning('No publisher found for message %s' % msg_id)
            msg_data['latencies'] = []
            continue

        # NOTE: Careful here, every message is assumed to be published once ...
        if len(publications) > 1:
            G_LOGGER.warning('Several publishers of message %s' % msg_id)

        published_ts = int(publications[0]['ts'])
        publisher_id = publications[0]['peer_id']

        # NOTE: Some latencies are negative, meaning that the message appears to be
        # received before it was sent. The original author assumed those belong to the
        # nodes that got the message injected in the first place. They are kept as is.
        msg_data['latencies'] = [
            int(received_data['ts']) - published_ts
            for received_data in msg_data['received']
            if received_data['peer_id'] != publisher_id  # Skip self
        ]
def compute_propagation_times(msgs_dict):
    """Return the propagation time of every message that has latencies.

    The propagation time is the largest latency of the message divided by
    1000000 and rounded. Messages without any latency (never received by
    another peer, or latencies not computed) are skipped with a warning
    because they have no defined propagation time.
    """
    msg_propagation_times = []
    pbar = tqdm(msgs_dict.items())
    for msg_id, msg_data in pbar:
        pbar.set_description('Computing propagation time of message %s' % msg_id)
        latencies = msg_data.get('latencies')
        if not latencies:
            # max() of an empty sequence would raise ValueError.
            G_LOGGER.warning('No latencies available for message %s' % msg_id)
            continue
        msg_propagation_times.append(round(max(latencies) / 1000000))
    return msg_propagation_times
def compute_injection_times(injected_msgs_dict):
    """Return the 'injection_time' of every injected message whose 'status' is 200."""
    injection_times = []
    for injected_msg in injected_msgs_dict.values():
        if injected_msg['status'] == 200:
            injection_times.append(injected_msg['injection_time'])
    return injection_times
def get_hardware_metrics(topology, node_logs, min_tss, max_tss):
    """Fetch peak hardware metrics of every container from Prometheus.

    Parameters:
        topology: dict whose "containers" entries provide "kurtosis_ip".
        node_logs: currently unused.
        min_tss, max_tss: bounds of the query window.

    Returns:
        Tuple (cpu_usage, memory_usage, bandwith_in, bandwith_out) of lists
        holding the maximum of each series per container, or four Nones when
        no Prometheus connection is available. Containers whose stats cannot
        be fetched, or whose series are empty, are logged and skipped.
    """
    prometheus = prometheus_connect()
    if not prometheus:
        return None, None, None, None

    # Fetch Hardware metrics from Node containers
    cpu_usage = []
    memory_usage = []
    bandwith_in = []
    bandwith_out = []
    node_container_ips = [info["kurtosis_ip"] for info in topology["containers"].values()]
    pbar = tqdm(node_container_ips)
    for container_ip in pbar:
        pbar.set_description(f'Fetching hardware stats from container {container_ip}')
        try:
            container_stats = fetch_cadvisor_stats_from_prometheus(prometheus, container_ip, min_tss, max_tss)
            # NOTE: Here we could also choose a different statistic such as mean or average instead of max.
            # Computed inside the try so that an empty series skips the container
            # instead of aborting the whole analysis.
            peak_cpu = max(container_stats['cpu_usage'])
            peak_memory = max(container_stats['memory_usage'])
            peak_in = max(container_stats['bandwidth_in'])
            peak_out = max(container_stats['bandwidth_out'])
        except Exception as e:
            G_LOGGER.error('%s: %s' % (e.__doc__, e))
            continue
        cpu_usage.append(peak_cpu)
        memory_usage.append(peak_memory)
        bandwith_in.append(peak_in)
        bandwith_out.append(peak_out)

    return cpu_usage, memory_usage, bandwith_in, bandwith_out
def compute_message_delivery(msgs_dict, injected_msgs_dict):
    """Log how many of the injected messages were seen in the node logs.

    The delivery rate is the number of entries in msgs_dict as a percentage
    of the number of injected messages; it is reported as 0.0 when nothing
    was injected.
    """
    # Compute message delivery
    total_messages = len(injected_msgs_dict)
    delivered_messages = len(msgs_dict)
    lost_messages = total_messages - delivered_messages
    # Guard against a division by zero when no message was injected.
    delivery_rate = delivered_messages * 100 / total_messages if total_messages else 0.0
    G_LOGGER.info(f'{delivered_messages} of {total_messages} messages delivered. '
                  f'Lost: {lost_messages}. Delivery rate {delivery_rate}')
def extract_node_id(s: str) -> str:
    """Derive the node id from a string containing a "node-<N>.toml" file name.

    Returns "node_<N>" for the first occurrence, or None when the string does
    not contain such a file name.
    """
    if (found := re.search(r"node-(\d+)\.toml", s)) is None:
        return None
    return "node_" + found.group(1)
def load_process_level_metrics(metrics_file_path: str):
    """Load the process level metrics file and group its samples per node.

    The file is expected to hold a 'header' (with 'num_containers') and a
    'containers' mapping. Every container lists its processes under
    'info' -> 'processes' ('pid', 'binary') and its 'samples' (each with a
    'PID'). Processes are mapped to node ids through extract_node_id().

    Returns:
        Tuple (metrics_dict, info): metrics_dict maps node id to
        {'samples': [...]} and info is the file header. ({}, None) is
        returned when the header's container count does not match the data.

    Any error while reading or parsing the file is logged and the process
    exits with status 1.
    """
    metrics_dict = {}
    try:
        with open(metrics_file_path, 'r') as file:
            metrics_obj = json.load(file)
        info = metrics_obj['header']
        all_samples = metrics_obj['containers']

        if len(all_samples) != info['num_containers']:
            G_LOGGER.error('Number of containers in header does not match number of containers in samples')
            return {}, None

        for container_id, container_data in all_samples.items():
            # tomls file names are unique per node
            container_nodes = {}
            for process in container_data['info']['processes']:
                node_id = extract_node_id(process['binary'])
                if not node_id:
                    G_LOGGER.error('Couldn\'t match %s to node id in container %s' % (process['binary'], container_id))
                    continue
                container_nodes[process['pid']] = node_id

            # Parse samples for each node
            for sample in container_data['samples']:
                node_id = container_nodes.get(sample['PID'])
                if node_id is None:
                    G_LOGGER.error('Couldn\'t find node id for PID %d in container %s' % (sample['PID'], container_id))
                    continue
                metrics_dict.setdefault(node_id, {'samples': []})['samples'].append(sample)
    except Exception as e:
        G_LOGGER.error('%s: %s' % (e.__doc__, e))
        # Non-zero status so callers/scripts can detect the failure.
        sys.exit(1)

    G_LOGGER.info('Loaded metrics for %d nodes.' % len(metrics_dict))
    return metrics_dict, info
def build_summary(metrics_info, simulation_config, msgs_dict, node_logs, topics, min_tss, max_tss, avg_samples_per_node):
    """Assemble the simulation summary dictionary.

    Parameters:
        metrics_info: header of the metrics file (dict) or None; it is copied,
            not modified.
        simulation_config: currently unused; the parameters are read from
            G_DEFAULT_CONFIG_FILE instead.
        msgs_dict, node_logs, topics: only their sizes (and the list of
            topics) are recorded.
        min_tss, max_tss: bounds of the effective simulation window.
        avg_samples_per_node: stored in the metrics section and used for the
            effective sampling rate ('esr').

    Returns:
        dict with the sections 'general', 'nodes', 'messages' and
        'parameters'. 'parameters' stays empty when the configuration file
        cannot be loaded (the error is logged).
    """
    simulation_summary = {'general': {}, 'nodes': {}, 'messages': {}, 'parameters': {}}
    general = simulation_summary['general']
    general['datetime'] = datetime.now().isoformat()
    general['num_messages'] = len(msgs_dict)
    general['num_nodes'] = len(node_logs)
    general['num_topics'] = len(topics)
    general['topics'] = list(topics)

    # Compute effective simulation time window
    simulation_start_ts = min_tss
    simulation_end_ts = max_tss
    simulation_time_ms = round((simulation_end_ts - simulation_start_ts) / 1000000)
    general['simulation_start_ts'] = simulation_start_ts
    general['simulation_end_ts'] = simulation_end_ts
    general['simulation_time_ms'] = simulation_time_ms

    # Work on a copy so the caller's header is left untouched; tolerate None.
    metrics = dict(metrics_info) if metrics_info else {}
    metrics['avg_samples_per_node'] = avg_samples_per_node
    simulation_time_s = simulation_time_ms / 1000.0
    # Effective sampling rate; 0.0 when the window is empty (avoids a division by zero).
    metrics['esr'] = avg_samples_per_node / simulation_time_s if simulation_time_s else 0.0
    general['metrics'] = metrics

    # Load Simulation Parameters
    try:
        with open(G_DEFAULT_CONFIG_FILE, "r") as read_file:
            simulation_summary['parameters'] = json.load(read_file)
    except Exception as e:
        G_LOGGER.error('%s: %s' % (e.__doc__, e))

    return simulation_summary
def compute_process_level_metrics():
    """Load the process level metrics and reduce them to per-node values.

    Returns:
        Tuple (metrics_info, max_cpu_usage, max_memory_usage,
        total_network_usage, max_disk_usage, avg_samples_per_node). The lists
        hold one value per node: the maximum 'CPUPercentage' and
        'MemoryUsageMB', and the maximum network/disk counters divided by
        1024*1024. avg_samples_per_node is 0 when no node metrics were loaded.
    """
    # Load Metrics
    node_metrics, metrics_info = load_process_level_metrics(G_DEFAULT_METRICS_FILENAME)

    # Compute Metrics
    bytes_per_mbyte = 1024 * 1024
    max_cpu_usage = []
    max_memory_usage = []
    total_network_usage = {'rx_mbytes': [], 'tx_mbytes': []}
    max_disk_usage = {'disk_read_mbytes': [], 'disk_write_mbytes': []}
    num_samples = []
    for node_id, node_obj in node_metrics.items():
        samples = node_obj['samples']
        num_samples.append(len(samples))

        # Peak values
        max_cpu_usage.append(max(obj['CPUPercentage'] for obj in samples))
        max_memory_usage.append(max(obj['MemoryUsageMB'] for obj in samples))

        # Accumulated
        total_network_usage['rx_mbytes'].append(
            max(obj['NetStats']['all']['total_received'] for obj in samples) / bytes_per_mbyte)
        total_network_usage['tx_mbytes'].append(
            max(obj['NetStats']['all']['total_sent'] for obj in samples) / bytes_per_mbyte)

        # Accumulated
        max_disk_usage['disk_read_mbytes'].append(max(obj['DiskIORChar'] for obj in samples) / bytes_per_mbyte)
        max_disk_usage['disk_write_mbytes'].append(max(obj['DiskIOWChar'] for obj in samples) / bytes_per_mbyte)

    # statistics.mean() raises StatisticsError on an empty sequence.
    avg_samples_per_node = statistics.mean(num_samples) if num_samples else 0

    return metrics_info, max_cpu_usage, max_memory_usage, total_network_usage, max_disk_usage, avg_samples_per_node
def main():
    """Run the analysis: load the inputs, compute the statistics, write figures and summary."""
    _innit_logger()
    simulation_path = _parse_args()

    # Load Topics Structure
    topology = load_topology(simulation_path + "/network_data.json")
    load_topics_into_topology(topology)

    # Load Simulation Messages
    injected_msgs_dict = load_messages(simulation_path)

    node_logs, msgs_dict, min_tss, max_tss = analyze_containers(topology, simulation_path)

    # Compute simulation time window
    simulation_time_ms = round((max_tss - min_tss) / 1000000)
    G_LOGGER.info(f'Simulation started at {min_tss}, ended at {max_tss}. '
                  f'Effective simulation time was {simulation_time_ms} ms.')

    compute_message_delivery(msgs_dict, injected_msgs_dict)
    compute_message_latencies(msgs_dict)
    msg_propagation_times = compute_propagation_times(msgs_dict)
    msg_injection_times = compute_injection_times(injected_msgs_dict)

    # Pull hardware metrics from cAdvisor at a container level
    container_metrics = get_hardware_metrics(topology, node_logs, min_tss, max_tss)
    cpu_usage, memory_usage, bandwith_in, bandwith_out = container_metrics

    # Generate Figure
    plot_figure(msg_propagation_times, cpu_usage, memory_usage, bandwith_in, bandwith_out)

    # Pull metrics from process level monitoring
    if not os.path.exists(G_DEFAULT_METRICS_FILENAME):
        G_LOGGER.info('No metrics file found. Skipping process level metrics.')
    else:
        process_metrics = compute_process_level_metrics()
        (metrics_info, max_cpu_usage, max_memory_usage,
         total_network_usage, max_disk_usage, avg_samples_per_node) = process_metrics

        # Build simulation summary
        summary = build_summary(metrics_info, topology, msgs_dict, node_logs, [], min_tss, max_tss,
                                avg_samples_per_node)

        # Plot figure
        plot_figure_ex(msg_propagation_times, max_cpu_usage, max_memory_usage, total_network_usage,
                       max_disk_usage, msg_injection_times, summary['general'], summary['parameters'])

        # Generate summary
        summary_path = '%s/%s' % (G_DEFAULT_SIMULATION_PATH, G_DEFAULT_SUMMARY_FILENAME)
        with open(summary_path, 'w') as summary_fp:
            json.dump(summary, summary_fp, indent=4)
        G_LOGGER.info('Analsysis sumnmary saved in %s' % summary_path)

    # We are done
    G_LOGGER.info('Ended')
# Run the analysis only when the file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()