Resolved conflicts with main

This commit is contained in:
Alberto Soutullo
2023-06-09 17:38:49 +02:00
12 changed files with 996 additions and 239 deletions

View File

@@ -28,4 +28,5 @@ ENV PATH="/opt/venv/bin:$PATH"
ENV PYTHONPATH "${PYTHONPATH}:src"
# Set the entrypoint
# `docker run -it analysis /bin/sh` vs `docker run -it --entrypoint /bin/sh analysis` ?
ENTRYPOINT ["python"]

Binary file not shown.

View File

@@ -0,0 +1,634 @@
import typer
import sys
import os
import stat
import math
from pathlib import Path
import time
import json
import networkx as nx
import re
import logging as log
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
from sklearn.cluster import KMeans
from src import vars
from src import topology
from src import log_parser
from src import analysis
# check if the path exists and is of appropriate type
def path_ok(path : Path, isDir=False):
    """Tell whether ``path`` exists and is of the expected kind.

    :param path: filesystem path to inspect.
    :param isDir: when truthy a directory is expected, otherwise a regular file.
    :return: True when the path exists and has the expected kind; False
        (after logging an error) otherwise.

    Permissions are deliberately not checked here; they are resolved lazily
    when the path is actually opened.
    """
    if not path.exists():
        log.error(f'"{path}" does not exist')
        return False
    mode = path.stat().st_mode
    if isDir:
        if stat.S_ISDIR(mode):
            return True
        log.error(f'Directory expected: "{path}" is not a directory')
        return False
    if stat.S_ISREG(mode):
        return True
    log.error(f'File expected: "{path}" is not a file')
    return False
# define singleton
class Singleton(type):
    """Metaclass that keeps a single shared instance per class.

    The first call of a class using this metaclass constructs the instance;
    every later call returns that same object and its arguments are ignored.
    """
    # class -> its one and only instance (shared by all users of the metaclass)
    _instances = {}

    def __call__(cls, *args, **kwargs):
        registry = cls._instances
        if cls in registry:
            return registry[cls]
        registry[cls] = super().__call__(*args, **kwargs)
        return registry[cls]
# convert human readable sizes to bytes
class Human2BytesConverter(metaclass=Singleton):
    """Convert human readable sizes (e.g. ``"1.5MiB"``) into a byte count.

    The multiplier tables are grouped by suffix length so that the longest
    suffix is tried first ("MiB" before "B").
    """
    def __init__(self): # add any human readable format/size and multiplier here
        self.letters = {
            3: {'GiB' : 1024*1024*1024, 'MiB' : 1024*1024, 'KiB' : 1024},
            # NOTE: upper-case 'GB'/'MB'/'KB' are treated as binary multiples,
            # the lower-case first letter variants as decimal ones.
            2: {'GB' : 1024*1024*1024, 'MB' : 1024*1024, 'KB' : 1024,
                'gB' : 1000*1000*1000, 'mB' : 1000*1000, 'kB' : 1000},
            1: {'B' : 1},
        }

    def convert(self, value):
        """Return ``value`` expressed in bytes as a float.

        :param value: string made of a number followed by a known suffix.
        :return: the size in bytes, or ``np.nan`` when the suffix is unknown
            or the part in front of it is not a number (e.g. ``"--"``, ``"B"``).
        """
        for width in (3, 2, 1):
            multiplier = self.letters[width].get(value[-width:])
            if multiplier is None:
                continue
            try:
                return float(value[:-width]) * multiplier
            except ValueError:
                # known suffix but no usable number in front of it: report it
                # the same way as an unknown suffix instead of raising
                return np.nan
        return np.nan
# Base class for plots and common helpers
class Plots(metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
    """Store the run parameters and reset all derived state.

    :param log_dir: directory holding the logs of the run.
    :param oprefix: prefix prepended to every output file written.
    :param jf: path of the network JSON file read by ``read_network``.
    :param to_plot: plotting options taken from the config.
    :param cfile: path of the JSON config file read by ``set_summary``.
    """
    # caller supplied parameters
    self.log_dir = log_dir
    self.oprefix = oprefix
    self.json_fname = jf
    self.to_plot = to_plot
    self.cfile = cfile
    # dataframe and its bookkeeping
    self.df = pd.DataFrame()
    self.n = 0
    self.keys = []
    self.cols = []
    # lookup tables filled in by the subclasses / helpers
    self.col2title = {}
    self.col2units = {}
    self.key2nodes = {}
    self.msg_settling_times = {}
    self.msg_injection_times = {}
    self.grp2idx = {}
    self.idx2grp = {}
    # plotting state; replaced by real objects in set_panel_size
    self.fig = ""
    self.axes = ""
    self.G = nx.empty_graph()
    self.to_compare = []
    self.run_summary = ""
# waku log processing
def compute_msg_settling_times(self):
    """Parse the waku logs and store message settling and injection times.

    Fills ``self.msg_settling_times`` and ``self.msg_injection_times`` with
    the results of the ``analysis`` helpers.
    """
    base = str(self.log_dir)
    topo = topology.load_json(f'{base}/{vars.G_TOPOLOGY_FILE_NAME}')
    topology.load_topics_into_topology(topo, f'{base}/config/topology_generated/')
    injected = log_parser.load_messages(base)
    _node_logs, msgs, first_ts, last_ts = analysis.analyze_containers(topo, base)
    # presumably the timestamps are in nanoseconds (hence /1e6 for ms) -- TODO confirm
    elapsed_ms = round((last_ts - first_ts) / 1000000)
    log.info((f'Simulation started at {first_ts}, ended at {last_ts}. '
              f'Effective simulation time was {elapsed_ms} ms.'))
    analysis.compute_message_delivery(msgs, injected)
    analysis.compute_message_latencies(msgs)
    self.msg_settling_times = analysis.compute_propagation_times(msgs)
    self.msg_injection_times = analysis.compute_injection_times(injected)
def get_key(self):
    """Return the ``Key`` column of the working dataframe."""
    frame = self.df
    return frame.Key
def set_keys(self):
    """Store the sorted, distinct values of the ``Key`` column in ``self.keys``."""
    distinct = self.df['Key'].unique()
    distinct.sort()
    self.keys = distinct
# set the summary for the run: to be used for plot title
def set_summary(self):
    """Build the one-line run summary used in the plot titles.

    Reads the JSON config file ``self.cfile`` and stores the summary in
    ``self.run_summary``.  When no config file was given (the CLI default is
    an empty string) the summary is left untouched instead of failing on
    ``open("")``.

    :raises OSError: when the given config file cannot be opened.
    :raises KeyError: when the config lacks one of the expected entries.
    """
    if not self.cfile:
        log.warning('No config file given: the run summary is left as is')
        return
    with open(self.cfile, 'r') as f: # Load config file
        conf = json.load(f)
    gennet, wls = conf["gennet"], conf["wls"]
    # packet sizes are reported in KiB
    minsize = int(wls["min_packet_size"]/1024)
    maxsize = int(wls["max_packet_size"]/1024)
    self.run_summary = (f'{gennet["num_nodes"]}by'
                        f'{gennet["container_size"]}fo'
                        f'{gennet["fanout"]}-'
                        f'{wls["message_rate"]}mps-'
                        f'({minsize}-{maxsize})KiB-'
                        f'{wls["simulation_time"]}sec')
    print(f'summary: {self.run_summary} (from {self.cfile})')
# set the fields that go into the comparison panel
def set_compare(self, lst):
    """Remember the columns that go into the comparison panel.

    :param lst: column names, in the order the panel shows them.
    """
    columns = lst
    self.to_compare = columns
# extract the maximal complete sample set
def remove_incomplete_samples(self, grp, err=''):
    """Trim ``self.df`` to the largest sample set that is complete for all groups.

    Rows containing the marker ``err`` in any column are dropped first; then
    every group of ``grp`` is cut down to the row count of the smallest group.

    :param grp: name of the column that identifies the group of a row.
    :param err: cell value that marks a row as unusable.
    """
    tainted = self.df.isin([err]).any(axis=1)
    usable = self.df[~tainted]
    self.df = usable
    rows_per_group = [usable[usable[grp] == gid].shape[0]
                      for gid in usable[grp].unique()]
    shortest = min(rows_per_group, default=sys.maxsize)
    self.df = usable.groupby(grp).head(shortest)
# plot the settling times, both network- and node-wise
def plot_msg_settling_times(self):
    """Draw the settling-time panel and save it as ``<oprefix>-settling-time.pdf``.

    The upper axes only get tick marks and a placeholder label for now; the
    lower axes show a violin plot of ``self.msg_settling_times``.
    """
    self.set_panel_size(2, 1, False)
    msg_count = len(self.msg_settling_times)
    self.fig.suptitle(f'Settling Time: {self.run_summary}, {msg_count} msgs')
    self.fig.supylabel("msecs")
    upper, lower = self.axes[0], self.axes[1]
    upper.set_xticks([pos + 1 for pos, _key in enumerate(self.keys)])
    upper.set_xlabel('TODO: revisit after Jordi added per-container settling times')
    lower.violinplot(self.msg_settling_times, showmedians=True)
    lower.axes.xaxis.set_visible(False)
    plt.savefig(f'{self.oprefix}-settling-time.pdf', format="pdf", bbox_inches="tight")
# set the panel params
def set_panel_size(self, m, n, shareY=False):
    """Create a fresh ``m`` x ``n`` grid of axes on a 12 x 10 figure.

    :param m: number of subplot rows.
    :param n: number of subplot columns.
    :param shareY: forwarded to ``plt.subplots`` as ``sharey``.

    The new figure and axes are stored in ``self.fig`` and ``self.axes``.
    """
    figure, axes = plt.subplots(m, n, layout='constrained', sharey=shareY)
    self.fig, self.axes = figure, axes
    figure.set_figwidth(12)
    figure.set_figheight(10)
# plot col panels for selected columns
def plot_col_panels(self, agg):
    """Draw one column panel per entry of ``self.to_plot["ColPanel"]``.

    :param agg: aggregation flag handed to ``col_panel_helper``; it is
        overridden with True for "CPUPerc" and "MemUse".

    Columns missing from the dataframe are reported and skipped.
    """
    always_aggregated = ("CPUPerc", "MemUse")
    for col in self.to_plot["ColPanel"]:
        if col not in self.df.columns:
            log.error(f"ColPanel: {col} is not in {self.df.columns}, skipping...")
            continue
        use_agg = True if col in always_aggregated else agg
        self.col_panel_helper(col, use_agg)
# plot degree/col panels for the given set of columns
def plot_deg_col_panels(self):
    """Draw one degree/column panel per entry of ``self.to_plot["DegColPanel"]``.

    Columns missing from the dataframe are reported and skipped.
    """
    for col in self.to_plot["DegColPanel"]:
        if col in self.df.columns:
            self.deg_col_panel_helper(col) # only agg for now
        else:
            log.error(f"DegColPanel: {col} is not in {self.df.columns}, skipping...")
# plot the column panel
def col_panel_helper(self, col, agg=True):
# Draw the 2x2 panel for one column and save it as '<oprefix>-col-panel-<col>.pdf'.
#   [0,0] violin plot per key          [0,1] scatter plot per key over sample index
#   [1,0] violin plot over all rows    [1,1] per-sample total and average over the keys
# col : dataframe column to plot; must be present in col2title and col2units.
# agg : True plots the column values as they are; False plots the differences
#       between consecutive samples of each key.
self.set_panel_size(2, 2)
self.fig.suptitle(f'{self.col2title[col]}: {self.run_summary}')
self.fig.supylabel(self.col2units[col])
# one array of samples per key, in the order of self.keys
per_key_arr = []
# per docker violin plot
self.axes[0,0].ticklabel_format(style='plain')
self.axes[0,0].yaxis.grid(True)
for key in self.keys:
if agg:
tmp = self.df[self.get_key() == key][col].values
else:
tmp = self.df[self.get_key() == key][col].diff().dropna().values
per_key_arr.append(tmp)
#all_arr = np.concatenate((all_arr, tmp), axis=0)
#self.axes[0,0].set_xticks([x + 1 for x in range(len(self.keys))])
# NOTE(review): 'labels' is only consumed by the commented-out set_xticklabels call below
labels = [ '{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
#self.axes[0,0].set_xticklabels(labels)
# NOTE(review): this 'legends' value is overwritten further down before it is read
legends = self.axes[0,0].violinplot(dataset=per_key_arr, showmedians=True)
#text = ""
#for key, nodes in self.key2nodes.items():
# text += f'{key} {", ".join(nodes)}\n'
#self.axes[0,0].text(0.675, 0.985, text, transform=self.axes[0,0].transAxes,
# fontsize=7, verticalalignment='top')
# consolidated violin plot
# NOTE(review): this always uses the raw column, also when agg is False
self.axes[1,0].ticklabel_format(style='plain')
self.axes[1,0].yaxis.grid(True)
self.axes[1,0].set_xlabel('All Containers')
self.axes[1,0].violinplot(self.df[col], showmedians=True)
self.axes[1,0].set_xticks([])
self.axes[1,0].axes.xaxis.set_visible(False)
# per docker scatter plot
self.axes[0,1].ticklabel_format(style='plain')
self.axes[0,1].yaxis.grid(True)
self.axes[0,1].set_xlabel('Time')
legends = []
for i, key in enumerate(self.keys):
y = per_key_arr[i]
legends.append(self.axes[0,1].scatter(x=range(0, len(y)), y=y, marker='.'))
#self.axes[0,1].legend(legends, self.keys, scatterpoints=1,
# loc='upper left', ncol=3,
# fontsize=8)
# consolidated/summed-up scatter plot
self.axes[1,1].ticklabel_format(style='plain')
self.axes[1,1].yaxis.grid(True)
self.axes[1,1].set_xlabel('Time')
out, out_avg, nkeys = [], [], len(per_key_arr)
# omit the very last measurement: could be a partial record
# the sample count is taken from the first key, so every key needs at least
# that many samples minus one, and there must be at least one key
jindices, iindices = range (len(per_key_arr[0])-1), range(len(per_key_arr))
for j in jindices:
out.append(0.0)
for i in iindices:
out[j] += per_key_arr[i][j]
out_avg.append(out[j]/nkeys)
# blue: total over all keys per sample; yellow: average over the keys
self.axes[1,1].plot(out, color='b')
self.axes[1,1].plot(out_avg, color='y')
self.axes[1,1].legend([f'Total {self.col2title[col]}', f'Average {self.col2title[col]}'],
loc='upper right', ncol=1, fontsize=8)
plt.savefig(f'{self.oprefix}-col-panel-{col}.pdf')
#plt.show()
# build the key2nodes: useful when $container_size$ > 1
def build_key2nodes(self):
    """Fill ``self.key2nodes`` from the inspect dump ``self.kinspect_fname``.

    The dump is scanned up to the line containing "User Services"; the line
    right after it is skipped and the following lines are parsed until the
    first empty line.  A line whose second field contains "containers_"
    starts a new key (its third field names the first node); a line whose
    first field contains "libp2p-node" adds a node to the current key.
    """
    def node_name(field):
        # "libp2p-<node>:" -> "<node>"
        return field.split("libp2p-")[1].replace(':', '')

    with open(self.kinspect_fname) as f:
        for line in f:
            if "User Services" in line:
                f.readline()
                break
        for row in f:
            if row == "\n":
                break
            fields = row.split()
            if "containers_" in fields[1]:
                key = fields[1]
                self.key2nodes[key] = [node_name(fields[2])]
            elif "libp2p-node" in fields[0]:
                self.key2nodes[key].append(node_name(fields[0]))
# export the df if needed
def get_df(self):
    """Return the working dataframe (the object itself, not a copy)."""
    frame = self.df
    return frame
# build indices or cluster plots
def build_cluster_index(self, grp):
    """Number the distinct values of column ``grp`` and add an index column.

    ``self.grp2idx`` maps each value to its position of first appearance,
    ``self.idx2grp`` is the reverse mapping, and the dataframe gains the
    column ``<grp>_idx`` holding the index of every row's value.
    """
    value2pos, pos2value = {}, {}
    for pos, value in enumerate(self.df[grp].unique()):
        value2pos[value] = pos
        pos2value[pos] = value
    self.grp2idx, self.idx2grp = value2pos, pos2value
    self.df[f'{grp}_idx'] = self.df[grp].map(lambda value: self.grp2idx[value])
# plot the cluster panel
def cluster_plot_helper(self, grp, cols):
# Cluster the samples of the columns 'cols' with k-means and scatter the
# cluster labels onto self.axes[0,1] and self.axes[1,1].
# grp  : column whose distinct values define the groups.
# cols : columns fed to k-means.
# NOTE(review): self.axes must already be a 2-D grid; this method does not
# call set_panel_size itself.
self.build_cluster_index(grp)
kmeans = KMeans(n_clusters=10, n_init='auto')
groups = self.df[grp].unique()
groups.sort()
# xpdf gets one column per group holding that group's flattened samples
xpdf = pd.DataFrame()
for g in groups:
X =self.df.loc[self.df[grp] == g][cols]
Xflat = X.values.flatten()
xpdf[g] = Xflat
# NOTE(review): the original indentation is lost in this view; whether the
# following fit/scatter statements run once per group or once after the loop
# cannot be told from here -- confirm against the repository
labels = kmeans.fit_predict(X)
#TODO: plot better. it is not very interpretable now
self.axes[0,1].scatter(x=range(0, len(labels)), y=labels, marker='.')
#self.axes[0,1].scatter(X.iloc[:, 0], X.iloc[:, 1], c=labels, cmap='plasma')
self.axes[0,1].set_xlabel('Time')
#axis.set_yticks([x for x in range(len(groups))])
self.axes[0,1].set_yticks(range(len(groups)))
# the tick labels come from self.keys while the tick count comes from 'groups'
labels = ['{}{}'.format( ' ', k) for i, k in enumerate(self.keys)]
self.axes[0,1].set_yticklabels(labels)
# second clustering over the per-group columns; the scatter below reads the
# first and third column of xpdf, so it needs at least three groups
labels = kmeans.fit_predict(xpdf)
self.axes[1,1].scatter(xpdf.iloc[:, 0], xpdf.iloc[:, 2], c=labels, cmap='plasma')
# plot the comparison panel
def plot_compare_panel(self):
    """Draw the 2x3 comparison panel and save it as ``<oprefix>-compare.pdf``.

    The first six entries of ``self.to_compare`` are shown row by row, each
    as a violin plot over the whole column.
    """
    self.set_panel_size(2, 3)
    self.fig.suptitle(f'{self.run_summary}')
    for slot in range(2 * 3):
        row, column = divmod(slot, 3)
        col = self.to_compare[slot]
        axis = self.axes[row, column]
        axis.yaxis.grid(True)
        axis.violinplot(self.df[col], showmedians=True)
        axis.set_ylabel(self.col2units[col])
        axis.set_title(self.col2title[col])
    plt.savefig(f'{self.oprefix}-compare.pdf', format="pdf", bbox_inches="tight")
#plt.show()
def phase_plots_helper(self, grp, col):
    """Placeholder for phase plots: does nothing yet."""
    return None
# build the network from json
def read_network(self):
    """Add the edges described in ``self.json_fname`` to the graph ``self.G``.

    Every entry of the JSON "nodes" mapping contributes one edge per element
    of its "static_nodes" list.
    """
    with open(self.json_fname) as f:
        topology_json = json.load(f)
    for src, details in topology_json['nodes'].items():
        for dst in details['static_nodes']:
            self.G.add_edge(src, dst)
# plot the network and degree histogram
def plot_network(self):
    """Draw the graph next to its degree histogram; save ``<oprefix>-network.pdf``.

    The histogram weights every node by 1/N, so the bar heights are fractions
    of the node count.
    """
    self.set_panel_size(1, 2)
    self.fig.suptitle(f'Network & Degree Distribution: {self.run_summary}')
    graph_axis, hist_axis = self.axes[0], self.axes[1]
    nx.draw(self.G, ax=graph_axis, pos=nx.kamada_kawai_layout(self.G), with_labels=True)
    degrees = sorted((deg for _node, deg in self.G.degree()), reverse=True)
    weights = np.ones(len(degrees))/len(degrees)
    heights, edges = np.histogram(degrees, weights=weights, density=False)
    bar_width = (edges[1] - edges[0])
    hist_axis.bar(x=edges[:-1], height=heights, align='center',
                  width=bar_width, edgecolor='k', facecolor='green', alpha=0.5)
    hist_axis.set_xticks(range(max(degrees)+1))
    hist_axis.set_title("Normalised degree histogram")
    hist_axis.set_xlabel("Degree")
    hist_axis.set_ylabel("% of Nodes")
    plt.savefig(f'{self.oprefix}-network.pdf', format="pdf", bbox_inches="tight")
#plt.show()
# plot the degree col panel
def deg_col_panel_helper(self, col):
    """Plot histograms of ``col`` conditioned on node degree, next to the total.

    The left axes overlay one normalised histogram per node degree (built
    from the samples of ALL nodes of that degree); the right axes show the
    normalised histogram of the whole column.  The figure is saved as
    ``<oprefix>-deg-col-panel-<col>.pdf``.

    :param col: dataframe column to plot; must be present in ``col2title``
        and ``col2units``.
    """
    self.set_panel_size(1, 2, shareY=True)
    self.fig.suptitle(f'Conditional/Total Normalised Histograms for {self.col2title[col]} : {self.run_summary}')
    self.fig.supylabel(self.col2units[col])
    # collect the samples of every node under the degree of that node
    by_degree = {}
    for node, degree in self.G.degree():
        samples = self.df[self.df.NodeName == node][col].values
        if degree in by_degree:
            # np.append returns a new array: it has to be stored back, or
            # only the first node of each degree would ever be counted
            by_degree[degree] = np.append(by_degree[degree], samples)
        else:
            by_degree[degree] = samples
    # keep handles and labels in lock-step so that a degree without samples
    # cannot shift the legend entries of the other degrees
    legends, legend_labels = [], []
    for degree in sorted(by_degree):
        d = by_degree[degree]
        if len(d) == 0:
            continue
        w = np.ones(len(d))/len(d)
        hist, bin_edges = np.histogram(d, weights=w, density=False)
        width = (bin_edges[1] - bin_edges[0])
        legends.append(self.axes[0].bar(x=bin_edges[:-1], height=hist, align='center',
                                        width=width, edgecolor='k', alpha=0.75))
        legend_labels.append(degree)
    self.axes[0].legend(legends, legend_labels, scatterpoints=1,
                        loc='upper left', ncol=5,
                        fontsize=8)
    self.axes[0].set_title("Conditional Histogram (degree)")
    d = self.df[col]
    w = np.ones(len(d))/len(d)
    hist, bin_edges = np.histogram(d, weights=w, density=False)
    width = (bin_edges[1] - bin_edges[0])
    self.axes[1].bar(x=bin_edges[:-1], height=hist, align='center',
                     width=width, edgecolor='k', facecolor='green', alpha=0.5)
    self.axes[1].set_title("Total Histogram")
    plt.savefig(f'{self.oprefix}-deg-col-panel-{col}.pdf')
#plt.show()
# handle docker stats
class DStats(Plots, metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
    """Set up the docker-stats file names and the per-column titles and units.

    The parameters are handed to ``Plots.__init__`` unchanged.
    """
    Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
    data_dir = f'{log_dir}/dstats-data'
    self.dstats_fname = f'{data_dir}/docker-stats.out'
    self.kinspect_fname = f'{data_dir}/docker-kinspect.out'
    # (column, plot title, unit label)
    described = [
        ("ContainerID",   "Docker ID",          "ID"),
        ("ContainerName", "Docker Name",        "Name"),
        ("CPUPerc",       "CPU Utilisation",    "Percentage (%)"),
        ("MemUse",        "Memory Usage",       "MiB"),
        ("MemTotal",      "Total Memory",       "MiB"),
        ("MemPerc",       "Memory Utilisation", "Percentage (%)"),
        ("NetRecv",       "Network Received",   "MiB"),
        ("NetSent",       "Network Sent",       "MiB"),
        ("BlockR",        "Block Reads",        "MiB"),
        ("BlockW",        "Block Writes",       "MiB"),
        ("CPIDS",         "Docker PIDS",        "PIDS"),
    ]
    self.col2title = {name: title for name, title, _unit in described}
    self.col2units = {name: unit for name, _title, unit in described}
    self.cols = ["CPUPerc", "MemUse","NetRecv", "NetSent", "BlockR", "BlockW"]
# remove the formatting artefacts
def pre_process(self):
    """Strip terminal escape sequences from the docker-stats file, in place.

    Exits the program (status 0) when the stats file is missing or is not a
    regular file.  Also sets the run summary.  Note that the stats file
    itself is overwritten with the cleaned text.
    """
    if not path_ok(Path(self.dstats_fname)):
        sys.exit(0)
    self.set_summary()
    escape_sequence = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    with open(self.dstats_fname) as source:
        raw_text = source.read()
    cleaned_txt = escape_sequence.sub('', raw_text)
    with open(self.dstats_fname, 'w') as target:
        target.write(cleaned_txt)
# make sure the df is all numeric
def post_process(self):
    """Turn the textual docker-stats columns into numbers and derive the keys.

    Percent columns become floats, size columns become MiB, and the columns
    ``Key`` (container name up to the first "--") and ``NodeName`` (first
    node of the key) are added.
    """
    for name in ("ContainerID", "ContainerName"):
        self.df[name] = self.df[name].map(lambda text: text.strip())
    h2b = Human2BytesConverter()
    for percent in ("CPUPerc", "MemPerc"):
        without_sign = self.df[percent].str.replace('%','')
        self.df[percent] = without_sign.astype(float)
    bytes_per_mib = 1024*1024
    # memory, network and block I/O columns share the same conversion
    for size in ("MemUse", "MemTotal", "NetRecv", "NetSent", "BlockR", "BlockW"):
        self.df[size] = self.df[size].map(
            lambda text: h2b.convert(text.strip())/bytes_per_mib) # MiB
    self.df['Key'] = self.df['ContainerName'].map(lambda name: name.split("--")[0])
    self.build_key2nodes()
    self.df['NodeName'] = self.df['Key'].map(lambda key: self.key2nodes[key][0])
    self.set_keys()
# build df from csv
def process_data(self):
    """Load the docker-stats file into ``self.df`` and write the cleaned copy.

    The file is cleaned first (``pre_process``), read as '/'-separated
    values, normalised (``post_process``), trimmed to complete samples and
    finally written to ``<oprefix>-cleaned.csv``.
    """
    log.info(f'processing {self.dstats_fname}...')
    self.pre_process()
    wanted_columns = ["ContainerID", "ContainerName",
                      "CPUPerc", "MemUse", "MemTotal", "MemPerc",
                      "NetRecv", "NetSent", "BlockR","BlockW", "CPIDS"]
    self.df = pd.read_csv(self.dstats_fname, header=0, comment='#',
                          skipinitialspace=True, delimiter='/',
                          usecols=wanted_columns)
    self.post_process()
    # '--' is the cell value that marks a row as unusable here
    self.remove_incomplete_samples(grp='Key', err='--')
    self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')
class HostProc(Plots, metaclass=Singleton):
def __init__(self, log_dir, oprefix, jf, to_plot, cfile):
    """Set up the host-proc file names and the per-column titles and units.

    The parameters are handed to ``Plots.__init__`` unchanged.
    """
    Plots.__init__(self, log_dir, oprefix, jf, to_plot, cfile)
    self.fname = f'{log_dir}/host-proc-data/docker-proc.out'
    self.kinspect_fname = f'{log_dir}/host-proc-data/docker-kinspect.out'
    # (column, plot title, unit label)
    described = [
        ('CPUPerc',     'CPU Utilisation',                '%'),
        ('VmPeak',      'Peak Virtual Memory Usage',      'MiB'),
        # MemUse is a copy of VmPeak (see post_process), hence the same title
        ('MemUse',      'Peak Virtual Memory Usage',      'MiB'),
        ('VmSize',      'Current Virtual Memory Usage',   'MiB'),
        # VmRSS is the current resident set size; the peak would be VmHWM
        ('VmRSS',       'Current Physical Memory Usage',  'MiB'),
        ('VmData',      'Size of Data Segment',           'MiB'),
        ('VmStk',       'Size of Stack Segment',          'MiB'),
        ('NetRecv',     'Network1: Received Bytes',       'MiB'),
        ('NetRecvPkts', 'Network1: Received Packets',     'Packets'),
        ('NetSent',     'Network1: Transmitted Bytes',    'MiB'),
        ('NetSentPkts', 'Network1: Transmitted Packets',  'Packets'),
        ('NetRX',       'Network2: Received Bytes',       'MiB'),
        ('NetWX',       'Network2: Transmitted Bytes',    'MiB'),
        ('InOctets',    'Network3: InOctets',             'MiB'),
        ('OutOctets',   'Network3: OutOctets',            'MiB'),
        ('BlockR',      'Block Reads',                    'MiB'),
        ('BlockW',      'Block Writes',                   'MiB'),
    ]
    self.col2title = {name: title for name, title, _unit in described}
    self.col2units = {name: unit for name, _title, unit in described}
    # NOTE(review): some of these names (e.g. 'RxBytes', 'InOctets') are not
    # among the columns process_data reads -- confirm they are still needed
    self.cols = ['CPUPerc', 'VmPeak', 'MemUse', 'VmSize', 'VmRSS', 'VmData', 'VmStk',
                 'RxBytes', 'RxPackets', 'TxBytes', 'TxPackets', 'NetRecv', 'NetSent',
                 'InOctets', 'OutOctets', 'BlockR', 'BlockW']
def process_data(self):
    """Load the host-proc file into ``self.df`` and write the cleaned copy.

    Exits the program (status 0) when the input file is missing or is not a
    regular file.  The whitespace-separated file is read, normalised
    (``post_process``), trimmed to complete samples and written to
    ``<oprefix>-cleaned.csv``.
    """
    if not path_ok(Path(self.fname)):
        sys.exit(0)
    self.set_summary()
    # the VETH/InOctets/OutOctets and DockerVIF columns are not read for now
    wanted_columns = ['EpochId', 'PID', 'TimeStamp',
                      'ContainerName', 'ContainerID', 'NodeName',
                      'VmPeak', 'VmPeakUnit', 'VmSize', 'VmSizeUnit',
                      'VmRSS', 'VmRSSUnit', 'VmData','VmDataUnit', 'VmStk', 'VmStkUnit',
                      'HostVIF', 'NetSent', 'NetSentPkts', 'NetRecv', 'NetRecvPkts',
                      'BlockR', 'BlockW',
                      'CPUPerc']
    self.df = pd.read_csv(self.fname, header=0, comment='#',
                          skipinitialspace=True, delimiter=r"\s+",
                          usecols=wanted_columns)
    self.post_process()
    self.remove_incomplete_samples(grp='Key')
    self.df.to_csv(f'{self.oprefix}-cleaned.csv', sep='/')
# normalise the units
def post_process(self):
    """Normalise the units of the host-proc dataframe and derive the keys.

    The Vm* columns are divided by 1024 and the network and block columns by
    1024*1024 so that all of them end up in MiB (per the unit labels of this
    class).  ``Key`` is a copy of ``NodeName`` and ``MemUse`` a copy of
    ``VmPeak``.  Finally the missing values are replaced by 0.
    """
    for size in ['VmPeak', 'VmSize', 'VmRSS', 'VmData', 'VmStk']:
        self.df[size] = self.df[size] / 1024 # MiBs
    for size in ['NetRecv', 'NetSent', 'BlockR', 'BlockW']:
        self.df[size] = self.df[size] / (1024*1024) # MiBs
    self.df['Key'] = self.df['NodeName']
    self.df['MemUse'] = self.df['VmPeak']
    self.build_key2nodes()
    self.set_keys()
    # fillna returns a new frame: it has to be stored back to take effect
    self.df = self.df.fillna(0)
# not very helpful plots atm
def plot_clusters(self, grp, agg, axes):
    """Cluster-plot the CPU and memory columns grouped by ``grp``.

    :param grp: column whose distinct values define the groups.
    :param agg: unused; kept for interface compatibility.
    :param axes: unused; ``cluster_plot_helper`` draws on ``self.axes``.
    """
    # cluster_plot_helper takes (grp, cols): the former 'col='/'axes='
    # keywords made this call fail with a TypeError
    self.cluster_plot_helper(grp=grp,
                             cols=['CPUPerc', 'VmPeak', 'VmRSS', 'MemUse', 'VmData'])
# sanity check config file
def _config_file_callback(ctx: typer.Context, param: typer.CallbackParam, cfile: str):
    """Merge the "plotting" section of the JSON config into the CLI defaults.

    The sub-section named after the invoked command is used when present,
    otherwise the "hproc" sub-section, otherwise the defaults stay as they
    are.  The program exits (status 0) when the config has no "plotting"
    section at all.

    :param ctx: typer context whose ``default_map`` is updated.
    :param param: the callback parameter (unused).
    :param cfile: path of the config file; an empty value skips the merge.
    :return: ``cfile`` unchanged.
    :raises typer.BadParameter: when the file cannot be read or used.
    """
    if not cfile:
        return cfile
    typer.echo(f"Loading config file: {os.path.basename(cfile)}")
    ctx.default_map = ctx.default_map or {}         # Init the default map
    try:
        with open(cfile, 'r') as f:                 # Load config file
            conf = json.load(f)
        if "plotting" not in conf:
            log.error(f"No plotting is requested in {cfile}. Skipping plotting.")
            sys.exit(0)
        plotting = conf["plotting"]
        command = ctx.command.name
        # Merge config and default_map
        if command in plotting:
            ctx.default_map.update(plotting[command])
        elif "hproc" in plotting:
            ctx.default_map.update(plotting["hproc"])
        else:
            log.info(f"No dstats/host-proc params in config. Sticking to defaults")
    except Exception as ex:
        raise typer.BadParameter(str(ex))
    return cfile
# instantiate typer and set the commands
app = typer.Typer()
# common cmd processor/plotter
def cmd_helper(metric_infra, to_plot, agg, to_compare):
    """Process the data of ``metric_infra`` and draw the requested plots.

    :param metric_infra: the metrics object to drive (a ``Plots`` subclass).
    :param to_plot: plotting options; a mapping from option name to its
        value (an empty list requests nothing beyond the comparison panel).
    :param agg: aggregation flag forwarded to the column panels.
    :param to_compare: columns shown in the comparison panel.

    The comparison panel is always drawn; everything else only when its
    option is present and truthy.
    """
    def wanted(option):
        # an option counts only when it is present AND truthy
        return option in to_plot and bool(to_plot[option])

    metric_infra.process_data()
    # always plot the compare plots; rest on demand
    metric_infra.set_compare(to_compare)
    metric_infra.plot_compare_panel()
    log.info(f'to_plot : {to_plot}')
    # the degree panels need the graph too, not only the network plot
    if wanted("Network") or wanted("DegColPanel"):
        metric_infra.read_network()
    if wanted("Network"):
        metric_infra.plot_network()
    if wanted("ColPanel"):
        metric_infra.plot_col_panels(agg)
    if wanted("ValueCluster"):
        # TODO: find interpretable cluster plot
        metric_infra.build_cluster_index('ContainerID')
    if wanted("DegColPanel"):
        metric_infra.plot_deg_col_panels()
    if wanted("SettlingTime"):
        metric_infra.compute_msg_settling_times()
        metric_infra.plot_msg_settling_times()
# process / plot docker-procfs.out
@app.command()
def host_proc(ctx: typer.Context, log_dir: Path, # <- mandatory path
        out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
        aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
        config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
            help="Set the input config file (JSON)")):
    """Process and plot the host-proc measurements found under ``log_dir``.

    Exits (status 0) when ``log_dir`` is not an existing directory.  Any
    existing ``plots`` directory is removed and recreated before plotting.
    """
    if not path_ok(log_dir, True):
        sys.exit(0)
    defaults = ctx.default_map
    to_plot = defaults["to_plot"] if defaults and "to_plot" in defaults else []
    topology_json = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
    # start from an empty output directory
    if os.path.exists("plots"):
        os.system('rm -rf plots')
    os.makedirs("plots")
    collector = HostProc(log_dir, f'plots/{out_prefix}-host-proc', topology_json,
                         to_plot, config_file)
    compare_columns = ["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"]
    cmd_helper(collector, to_plot, agg=aggregate, to_compare=compare_columns)
    log.info(f'Done: {log_dir}')
# process / plot docker-dstats.out
@app.command()
def dstats(ctx: typer.Context, log_dir: Path, # <- mandatory path
        out_prefix: str = typer.Option("output", help="Specify the prefix for the plot pdfs"),
        aggregate: bool = typer.Option(True, help="Specify whether to aggregate"),
        config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True,
            help="Set the input config file (JSON)")):
    """Process and plot the docker-stats measurements found under ``log_dir``.

    Exits (status 0) when ``log_dir`` is not an existing directory.  Any
    existing ``plots`` directory is removed and recreated before plotting.
    """
    if not path_ok(log_dir, True):
        sys.exit(0)
    defaults = ctx.default_map
    to_plot = defaults["to_plot"] if defaults and "to_plot" in defaults else []
    topology_json = f'{os.path.abspath(log_dir)}/config/topology_generated/network_data.json'
    # start from an empty output directory
    if os.path.exists("plots"):
        os.system('rm -rf plots')
    os.makedirs("plots")
    collector = DStats(log_dir, f'plots/{out_prefix}-dstats', topology_json,
                       to_plot, config_file)
    compare_columns = ["CPUPerc", "MemUse", "NetRecv", "NetSent", "BlockR", "BlockW"]
    cmd_helper(collector, to_plot, agg=aggregate, to_compare=compare_columns)
    log.info(f'Done: {log_dir}')
if __name__ == "__main__":
app()

View File

@@ -47,7 +47,6 @@ if __name__ == "__main__":
cadvisor.run(simulation_config, metrics, topology_info, msg_propagation_times, msg_injection_times, min_tss, max_tss, prom_port)
else:
analysis_logger.G_LOGGER.error(f'Unknown infrastructure type: {infra_type}')
analysis.inject_metric_in_dict(plotting_configurations.plotting_config, "propagation",
"Propagation Time (per message)", "Propagation Time (ms)",
"msg_propagation_times", msg_propagation_times)

View File

@@ -1,7 +1,11 @@
# Python Imports
import sys
import json
import tomllib
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib
# Project Imports
from src import analysis_logger

View File

@@ -1,17 +1,23 @@
# Install Docker
sudo apt-get update
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-compose-plugin
sudo apt-get install docker-ce docker-ce-cli containerd.io docker-compose-plugin python3-venv
apt-get install -y jq
# Install the suitable kurtosis-cli
kurtosis_version=0.77.0
echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list
sudo apt update
sudo apt-mark unhold kurtosis-cli
sudo apt install kurtosis-cli=$kurtosis_version
sudo apt-mark hold kurtosis-cli
sudo rm /etc/apt/sources.list.d/kurtosis.list
required_version=0.77.0
installed_version=`kurtosis version | grep -v WARN`
if [ "$installed_version" = "$required_version" ]; then
echo "Kurtosis version is up to date : $installed_version"
else
echo "deb [trusted=yes] https://apt.fury.io/kurtosis-tech/ /" | sudo tee /etc/apt/sources.list.d/kurtosis.list
sudo apt update
sudo apt-mark unhold kurtosis-cli
sudo apt install kurtosis-cli=$kurtosis_version
sudo apt-mark hold kurtosis-cli
sudo rm /etc/apt/sources.list.d/kurtosis.list
fi
# Build the analysis docker image
cd analysis-module
@@ -24,6 +30,14 @@ cd gennet-module
sh ./build_docker.sh
cd ..
echo "host-proc: setup the venv @ /tmp/host-proc"
python3 -m venv /tmp/host-proc
. /tmp/host-proc/bin/activate
python3 -m pip install -r monitoring/host-proc/requirements.txt
deactivate
echo "host-proc: venv is ready"
cd wls-module
docker build -t wls:0.0.1 .
cd ..
@@ -32,6 +46,7 @@ cd monitoring/container-proc
sh ./build.sh
cd ..
# enable as we start using go-waku
#git clone git@github.com:waku-org/go-waku.git
#cd go-waku

View File

@@ -3,10 +3,10 @@ enclave_name=${1:-"wakurtosis"}
# hardcoded files/fifo/folders
rm -f ./kurtosisrun_log.txt
rm -f /tmp/hostproc-signal.fifo
rm -rf ./wakurtosis_logs ./config/topology_generated ./monitoring/host-proc/stats
rm -rf ./wakurtosis_logs ./config/topology_generated ./monitoring/host-proc/stats ./monitoring/dstats/stats monitoring/container-proc/cproc_metrics.json
docker stop gennet cadvisor bootstrap_node > /dev/null 2>&1
docker rm gennet cadvisor bootstrap_node > /dev/null 2>&1
docker stop gennet cadvisor bootstrap_node dstats host-proc analysis > /dev/null 2>&1
docker rm gennet cadvisor bootstrap_node dstats host-proc analysis > /dev/null 2>&1
kurtosis --cli-log-level "error" enclave rm -f $enclave_name > /dev/null 2>&1
@@ -15,3 +15,6 @@ docker rm $(docker ps -qa) > /dev/null 2>&1
toml_file="config/traits/discv5.toml"
sed -i "s/^discv5-bootstrap-node=\".*\"$/discv5-bootstrap-node=""/" "$toml_file"
#cleanup any host waku processes
#sudo killall -15 wakunode waku

View File

@@ -0,0 +1,57 @@
#!/bin/bash
# Record `docker stats` for the waku containers until <container_name> exits.
#
# Usage: main.sh <container_name> [odir]
#   container_name  container to signal (via /wls/start.signal) and to wait for
#   odir            output directory (default: stats)
# NOTE(review): the output directory is not created here (the mkdir below is
# commented out) -- it must exist before the script runs.
# Exits with status 1 on a usage error or an unknown container name.

if [ "$#" -eq 0 ]; then
  echo "Usage: main.sh <container_name> [odir]"
  echo "Will profile all running containers until the <container_name> exits"
  exit 1
fi

#cline=`docker ps -qa | grep $1` # for id
cline=$(docker ps -af "name=$1" | grep -- "$1")  # for name
if [ -z "$cline" ]; then
  echo "Error: $1 is not a valid container name"
  exit 1
fi

wait_cid=$1
odir=${2:-"stats"}
#mkdir -p $odir

# get cpu/mem info for cross-refs
cat /proc/cpuinfo > "$odir/docker-cpuinfo.out"
cat /proc/meminfo > "$odir/docker-meminfo.out"

# get the list of running dockers
dps=$odir/docker-ps.out
filters="--filter ancestor=gowaku --filter ancestor=statusteam/nim-waku:nwaku-trace2 --filter ancestor=statusteam/nim-waku:nwaku-trace3"
# $filters is left unquoted on purpose: it has to split into separate arguments
docker ps --no-trunc --format "{{.ID}}#{{.Names}}#{{.Image}}#{{.Command}}#{{.State}}#{{.Status}}#{{.Ports}}" $filters > "$dps"

# extract the docker ids
dids=$odir/docker-dids.out
cut -f 1 -d '#' "$dps" > "$dids"

dstats=$odir/docker-stats.out
# add date and the names/versions of waku images involved
# also add the generating command to aid parsing/debugging
echo "dstats: starting the dstats monitor"
echo "# dstats started @ $(date)" > "$dstats" # clear the $dstats
echo "# images involved: $(docker images | grep waku | tr '\n' '; ' )" >> "$dstats"
echo '# docker stats --no-trunc --format "{{.Container}} / {{.Name}} / {{.ID}} / {{.CPUPerc}} / {{.MemUsage}} / {{.MemPerc}} / {{.NetIO}} / {{.BlockIO}} / {{.PIDs}}"' >> "$dstats"
# the header has more names than the format has fields: the header line lists
# MemUse/MemTotal, NetRecv/NetSent and BlockR/BlockW separately, presumably
# because MemUsage, NetIO and BlockIO each print two '/'-separated values
echo "ContainerID/ContainerName/ID/CPUPerc/MemUse/MemTotal/MemPerc/NetRecv/NetSent/BlockR/BlockW/CPIDS" >> "$dstats"

# start the docker stats
# $(cat ...) is left unquoted on purpose: one argument per container id
docker stats --no-trunc --format "{{.Container}} / {{.Name}} / {{.ID}} / {{.CPUPerc}} / {{.MemUsage}} / {{.MemPerc}} / {{.NetIO}} / {{.BlockIO}} / {{.PIDs}}" $(cat "$dids") >> "$dstats" &
dstats_pid=$!
echo "dstats: started and running as $dstats_pid"

echo "dstats: signalling WLS"
docker exec "$wait_cid" touch /wls/start.signal

echo "dstats: waiting for WLS to finish : dstats $dstats_pid is running"
docker container wait "$wait_cid"
sleep 10 # make sure you collect the stats until last messages settle down

echo "dstats: WLS finished: stopping the docker monitor $dstats_pid"
kill -15 "$dstats_pid"

View File

@@ -14,82 +14,90 @@ from pathlib import Path
#from procfs import Proc
from collections import defaultdict
# define singleton
class Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
# TODO: return the %CPU utilisation instead?
# pulls system-wide jiffies
def get_cpu_system(f):
f.seek(0)
rbuff = f.readlines()
return f'{rbuff[0].strip()}'
class Parser(metaclass=Singleton):
def __init__(self, csize):
self.csize = csize
# pulls system-wide jiffies
def system_cpu(self, f):
f.seek(0)
line = f.readline()
return sum(int(s) for s in line.split()[2:])
# pulls per-process user/system jiffies
def get_cpu_process(f):
f.seek(0)
rbuff = f.read().strip().split()
lst = [-3, -2] # user jiffies, system jiffies
return f'{rbuff[-3]} {rbuff[-2]}'
# pulls per-process user/system jiffies
def process_cpu(self, f):
f.seek(0)
rbuff = f.readline().split()
ptot = int(rbuff[13]) + int(rbuff[14])
return ptot
# pulls VmPeak, VmSize, VmRSS stats per wakunode
def mem_metrics(self, f):
f.seek(0)
rbuff = f.readlines()
lst = [16, 17, 21, 25, 26] #VmPeak, VmSize, VmRSS, VmData, VmStack
out = [' '.join(rbuff[i].replace("\n", " ").replace(":", "").split()) for i in lst]
res = ' '.join(out)
return res
# pulls VmPeak, VmSize, VmRSS stats per wakunode
def get_mem_metrics(f):
f.seek(0)
rbuff = f.readlines()
lst = [16, 17, 20, 21, 25, 26] #VmPeak, VmSize, VmHWM, VmRSS, VmData, VmStack
out = [' '.join(rbuff[i].replace("\n", " ").replace(":", "").split()) for i in lst]
res = ' '.join(out)
return res
# pulls Rx/Tx Bytes and Packets per wakunode
def net1_metrics(self, f, host_if):
f.seek(0)
rbuff = f.readlines()
out = [line.strip().split() for line in rbuff if "eth0" in line] # docker
if out == []:
out = [line.strip().split() for line in rbuff if host_if in line] # host
out = out[0]
res = f'{out[0]} RxBytes {out[1]} RxPackets {out[2]} TxBytes {out[9]} TxPackets {out[10]}'
return res
# TODO: reconcile with net1 and net3
# pulls Rx/Tx Bytes per wakunode
def net2_metrics(self, f, veth="eth0"):
f.seek(0)
rbuff = f.readlines()
ra = rbuff[3].split()
res = f'InOctets {ra[7]} OutOctets {ra[8]}'
return f'{veth} {res}'
# pulls Rx/Tx Bytes and Packets per wakunode
def get_net1_metrics(f, host_if):
f.seek(0)
rbuff = f.readlines()
out = [line.strip().split() for line in rbuff if "eth0" in line] # docker
if out == []:
out = [line.strip().split() for line in rbuff if host_if in line] # host
out = out[0]
res = f'{out[0]} RxBytes {out[1]} RxPackets {out[2]} TxBytes {out[9]} TxPackets {out[10]}'
return res
# pulls Rx/Tx Bytes per wakunode
def net3_metrics(self, frx, ftx, veth="eth0"):
frx.seek(0)
ftx.seek(0)
return f'{veth} NETRX {frx.read().strip()} NETWX {ftx.read().strip()}'
# TODO: reconcile with net1 and net3
# pulls Rx/Tx Bytes per wakunode
def get_net2_metrics(f, veth="eth0"):
f.seek(0)
rbuff = f.readlines()
ra = rbuff[3].split()
res = f'InOctets {ra[7]} OutOctets {ra[8]}'
return f'{veth} {res}'
# pulls Rx/Tx Bytes per wakunode
def get_net3_metrics(frx, ftx, veth="eth0"):
frx.seek(0)
ftx.seek(0)
return f'{veth} NETRX {frx.read().strip()} NETWX {ftx.read().strip()}'
# pulls the disk read/write bytes per wakunodes
# TODO: demonise block reads: UNIX sockets/IPC/MSG QUEUES
def get_blk_metrics(f):
f.seek(0)
rbuff = f.readlines()
lst = [4, 5] #lst = [0, 1] if csize == 1 else [4, 5]
res = ''.join([rbuff[i].replace("\n", " ").replace("259:0 ", "") for i in lst])
return res
# pulls the disk read/write bytes per wakunodes
# TODO: demonise block reads if csize > 1: UNIX sockets/IPC/MSG QUEUES
def blk_metrics(self, f):
    """Return rows 4 and 5 of a stats file flattened onto one line.

    The collector opens ``/proc/<pid>/io`` for this reader; in that layout
    rows 4 and 5 are the ``read_bytes`` and ``write_bytes`` entries.  Each
    newline becomes a space (so the result ends with a space) and any
    ``"259:0 "`` device prefix is dropped.

    Args:
        f: open, seekable text stream; it is rewound before reading.

    Returns:
        The two rows concatenated, e.g. ``'read_bytes: 1 write_bytes: 2 '``.

    Raises:
        IndexError: the stream has fewer than six rows.
    """
    f.seek(0)
    io_rows = f.readlines()
    # fixed row positions; a per-csize choice existed only as commented-out code
    read_row, write_row = io_rows[4], io_rows[5]
    cleaned = [row.replace("\n", " ").replace("259:0 ", "")
               for row in (read_row, write_row)]
    return ''.join(cleaned)
class MetricsCollector:
def __init__(self, prefix, sampling_interval):
def __init__(self, csize, prefix, sampling_interval):
self.csize = csize
self.prefix = prefix
self.procfs_sampling_interval = sampling_interval
self.pid2procfds = defaultdict(dict)
self.parser = Parser(csize)
self.docker_ps_fname = os.environ["DPS_FNAME"]
self.docker_inspect_fname = os.environ["DINSPECT_FNAME"]
self.ps_pids_fname = os.environ["PIDLIST_FNAME"]
self.docker_pid2veth_fname = os.environ["ID2VETH_FNAME"]
#self.docker_dids = []
self.docker_pids = []
self.docker_npids = len(self.docker_pids)
@@ -98,6 +106,8 @@ class MetricsCollector:
self.did2veth = {}
self.pid2veth = {}
self.pid2did = {}
self.pid2node_name = {}
self.pid2kdid = {}
self.procout_fname = os.environ["PROCOUT_FNAME"]
self.procfs_fd = ""
@@ -111,6 +121,8 @@ class MetricsCollector:
self.last_tstamp = 0
self.start_time = 0
self.last_sys_ctot, self.last_pid_ctot = {}, {}
# check if a wakunode/pid exists
def pid_exists(self, pid):
pid = int(pid)
@@ -134,10 +146,10 @@ class MetricsCollector:
self.pid2procfds[pid]["net2"] = open(f'/proc/{pid}/net/netstat')
self.pid2procfds[pid]["net3rx"] = open(f'/sys/class/net/{self.pid2veth[pid]}/statistics/rx_bytes')
self.pid2procfds[pid]["net3tx"] = open(f'/sys/class/net/{self.pid2veth[pid]}/statistics/tx_bytes')
#blk = ((f'/sys/fs/cgroup/blkio/docker'
# f'/{self.pid2did[pid]}/'
# f'blkio.throttle.io_service_bytes'
# )) if self.csize == 1 else f'/proc/{pid}/io'
#self.pid2procfds[pid]["blk"] = open((f'/sys/fs/cgroup/blkio/docker'
# f'/{self.pid2did[pid]}/'
# f'blkio.throttle.io_service_bytes'
# )) if self.csize == 1 else open(f'/proc/{pid}/io')
self.pid2procfds[pid]["blk"] = open(f'/proc/{pid}/io') # require SUDO
self.procfs_fd = open(self.procout_fname, "a")
t2 = time.time()
@@ -161,32 +173,42 @@ class MetricsCollector:
def procfs_collector(self):
# Take one sample for every pid in self.docker_pids, append one line per pid
# to self.procfs_fd, then re-schedule itself on self.procfs_scheduler unless
# self.got_signal is set.
# NOTE(review): this span appears to interleave the pre-change and post-change
# lines of a diff (get_*_metrics calls next to self.parser.* calls, two layouts
# inside `out`, two `% 50` conditions, two log fragments) -- confirm which side
# is current before editing the code.
for pid in self.docker_pids:
veth = self.pid2veth[pid]
# readers taking the already-open per-pid descriptors from self.pid2procfds
sys_stat = get_cpu_system(self.pid2procfds[0]["cpu"])
stat = get_cpu_process(self.pid2procfds[pid]["cpu"])
mem = get_mem_metrics(self.pid2procfds[pid]["mem"])
net1 = get_net1_metrics(self.pid2procfds[pid]["net1"], self.host_if)
net2 = get_net2_metrics(self.pid2procfds[pid]["net2"], veth) # SNMP MIB
net3 = get_net3_metrics(self.pid2procfds[pid]["net3rx"],
self.pid2procfds[pid]["net3tx"], veth) # sysfs/cgroup stats
blk = get_blk_metrics(self.pid2procfds[pid]["blk"]) # Read, Write
# CPU share since the previous sample: change in the process total divided by
# the change in the system total; the stored totals are then advanced.
# NOTE(review): the division raises ZeroDivisionError if the system total did
# not change between two samples -- confirm whether that can happen.
sys_ctot = self.parser.system_cpu(self.pid2procfds[0]["cpu"])
pid_ctot = self.parser.process_cpu(self.pid2procfds[pid]["cpu"])
cpu_perc = (pid_ctot - self.last_pid_ctot[pid]) / (sys_ctot - self.last_sys_ctot[pid])
self.last_sys_ctot[pid], self.last_pid_ctot[pid] = sys_ctot, pid_ctot
mem = self.parser.mem_metrics(self.pid2procfds[pid]["mem"])
net1 = self.parser.net1_metrics(self.pid2procfds[pid]["net1"], self.host_if)
#net2 = self.parser.net2_metrics(self.pid2procfds[pid]["net2"], veth) # SNMP MIB
#net3 = self.parser.net3_metrics(self.pid2procfds[pid]["net3rx"],
# self.pid2procfds[pid]["net3tx"], veth) # sysfs/cgroup stats
blk = self.parser.blk_metrics(self.pid2procfds[pid]["blk"]) # Read, Write
# one whitespace-separated record per pid and sample
out = ( f'SAMPLE_{self.procfs_sample_cnt} '
f'{pid} {time.time()} '
f'MEM {mem} NET {net1} {net2} {net3} '
f'BLK {blk} CPU-SYS {sys_stat} CPU-process {stat}\n'
f'{pid} {self.pid2node_name[pid]} {time.time()} '
f'{self.pid2did[pid]} {self.pid2kdid[pid]} '
f'MEM {mem} NET {net1} '
f'BLK {blk} CPU {cpu_perc}\n'
)
self.procfs_fd.write(str(out))
self.procfs_sample_cnt += 1 # schedule the next event ASAP
if not self.got_signal: # could be xpensive for n > 1000 : branch
self.procfs_scheduler.enter(self.procfs_sampling_interval, 1, self.procfs_collector, ())
# periodic progress log: elapsed/50 minus the configured interval is reported
# as the time spent per sample
if not self.procfs_sample_cnt % 50: # could be xpensive for n > 1000 : branch + mod
if (self.procfs_sample_cnt-1) % 50 == 0: # could be xpensive for n > 1000 : branch + mod
tstamp = time.time()
n = self.docker_npids
elapsed = tstamp - self.last_tstamp
log.info((f'Metrics: sample cnt = {self.procfs_sample_cnt}: '
f'time took per sample (of {n} wakunodes) ~ '
f'avg time took to sample {n}\t wakunodes ~ '
f'{elapsed/50-self.procfs_sampling_interval:.5f} secs'))
self.last_tstamp = tstamp
# keep track of cpu stats from last run
def set_last_cpu_totals(self):
    """Record the current system and per-process CPU totals as the baseline.

    For every pid in ``self.docker_pids`` the system total (parsed from the
    descriptor stored under key ``0``) and the process total (parsed from
    the pid's own descriptor) are stored in ``self.last_sys_ctot`` and
    ``self.last_pid_ctot``.  The system total is re-read for each pid.
    """
    log.info("Metrics: set last_cpu_totals")
    for pid in self.docker_pids:
        system_total = self.parser.system_cpu(self.pid2procfds[0]["cpu"])
        self.last_sys_ctot[pid] = system_total
        process_total = self.parser.process_cpu(self.pid2procfds[pid]["cpu"])
        self.last_pid_ctot[pid] = process_total
# add headers and schedule /proc reader's first read
def launch_procfs_monitor(self, wls_cid):
t1 = time.time()
@@ -195,36 +217,67 @@ class MetricsCollector:
f'{len(self.docker_pids)}\n'))
self.procfs_fd.write(f'# {", ".join([f"{pid} = {self.pid2did[pid]}" for pid in self.pid2did])} : {len(self.pid2did.keys())}\n')
self.procfs_fd.write(f'# {", ".join([f"{pid} = {self.pid2veth[pid]}" for pid in self.pid2did])} : {len(self.pid2veth.keys())}\n')
self.procfs_fd.write(f'# {", ".join([f"{pid} = {self.pid2node_name[pid]}" for pid in self.pid2did])} : {len(self.pid2node_name.keys())}\n')
# write the df column names
self.procfs_fd.write((f'EpochId PID NodeName TimeStamp ContainerID ContainerName '
f'MEM VmPeakKey VmPeak VmPeakUnit '
f'VmSizeKey VmSize VmSizeUnit '
f'VmRSSKey VmRSS VmRSSUnit '
f'VmDataKey VmData VmDataUnit '
f'VmStkKey VmStk VmStkUnit '
f'NET HostVIF RxBytesKey NetSent RxPacketsKey NetSentPkts '
f'TxBytesKey NetRecv TxPacketsKey NetRecvPkts '
#f'VETH InOctetsKey InOctets OutOctetsKey OutOctets '
#f'DockerVIF NetRXKey NetRX NETWXKey NetWX '
f'BLK READKEY BlockR WRITEKEY BlockW '
#f'CPU-SYS cpu cpu0 cpu1 cpu2 cpu3 cpu4 cpu5 cpu6 cpu7 cpu8 cpu9 '
#f'CPU-Process CPUUTIME CPUSTIME
f'CPU CPUPerc\n'))
log.info("Metrics: launch_procfs_monitor: signalling WLS")
signal_wls = f'docker exec {wls_cid} touch /wls/start.signal'
subprocess.run(signal_wls, shell=True) # revisit after Jordi's pending branch merge
log.info("Metrics: launch_procfs_monitor: signalling dstats")
f = os.open(self.signal_fifo, os.O_WRONLY)
os.write(f, "host-proc: signal dstats\n".encode('utf-8'))
os.close(f)
self.set_last_cpu_totals()
self.start_time = time.time()
self.last_tstamp = time.time()
self.procfs_scheduler.enter(self.procfs_sampling_interval, 1,
self.procfs_collector, ())
self.procfs_scheduler.run()
# build the pid to kurtosis did map from docker ps
def build_pid2kdid(self):
    """Fill ``self.pid2kdid`` from the ``docker ps`` dump.

    Each row of ``self.docker_ps_fname`` is split on ``'#'``; field 0 is
    used as the container id and field 1 as the value stored for it.  Every
    pid in ``self.pid2did`` is then mapped to the value of its container.

    Rows without a ``'#'`` separator (for instance blank lines) are skipped;
    previously such a row raised ``IndexError``.

    Raises:
        KeyError: a container id in ``self.pid2did`` has no row in the dump.
    """
    did2kdid = {}
    with open(self.docker_ps_fname) as f:
        for line in f:
            fields = line.split('#')
            if len(fields) < 2:  # blank or malformed row
                continue
            did2kdid[fields[0]] = fields[1]
    for pid in self.pid2did:
        self.pid2kdid[pid] = did2kdid[self.pid2did[pid]]
# build the host pid to docker id map : will include non-docker wakunodes
def build_pid2did(self):
with open(self.ps_pids_fname) as f:
self.ps_pids = f.read().strip().split("\n")
#self.docker_pids = [pid for pid in self.ps_pids if self.pid_exists(pid)]
#log.debug((f'{self.docker_pids}:{len(self.docker_pids)} <- '
# f'{self.ps_pids}:{len(self.ps_pids)}'))
for pid in self.ps_pids:
if not self.pid_exists(pid): # assert that these pids are live
continue
with open(f'/proc/{pid}/cmdline') as f:
line = f.readline()
if "waku" not in line: # assert that these pids are waku's
# assert that these pids are waku's and have config enabled
if "waku" not in line or "--config-file=" not in line:
log.info(f'non-waku pid {pid} = {line}')
continue
self.pid2node_name[pid] = line.split("--config-file=")[1].split('/')[3]
did = ""
self.docker_pids.append(pid)
# FORMAT
#'/proc/{pid}/cmdline = /usr/bin/wakunode--rpc-address=0.0.0.0
#--metrics-server-address=0.0.0.0
#--log-level=TRACE
#--config-file=/node/configuration_file/node-6/node-6.toml
#--ports-shift=2m
with open(f'/proc/{pid}/mountinfo') as f: # or /proc/{pid}/cgroup
line = f.readline()
while line:
@@ -247,17 +300,18 @@ class MetricsCollector:
with open(self.docker_pid2veth_fname) as f:
for line in f.readlines():
la = line.strip().split(":")
self.did2veth[la[0]]=la[1]
self.did2veth[la[0]]=la[1].strip().split(' ')[-1]
self.pid2veth = {pid : self.did2veth[self.pid2did[pid]] for pid in self.docker_pids}
# build metadata for the runs: pid2did, did2veth, pid2veth
def process_metadata(self):
# Run the three metadata builders in order (pid2did, pid2kdid, pid2veth) and
# log how long the first two together, the third, and the whole run took.
# NOTE(review): two log lines report the same phase timings ("pid2did =" and
# "pid2did/kdid ="); this looks like the old and new side of a diff shown
# together -- confirm which one should remain.
t1 = time.time()
self.build_pid2did()
self.build_pid2kdid()
t2 = time.time()
self.build_pid2veth()
t3 = time.time()
log.info(f'Metrics: process_metadata: pid2did = {t2-t1:.5f} secs, pid2veth = {t3-t2:0.5f} secs')
log.info(f'Metrics: process_metadata: pid2did/kdid = {t2-t1:.5f} secs, pid2veth = {t3-t2:0.5f} secs')
log.info(f'Metrics: process_metadata took {t3-t1:.5f} secs')
# after metadata is collected, create the threads and launch data collection
@@ -290,9 +344,14 @@ class MetricsCollector:
# ensure sampling_interval > 0
def _sinterval_callback(ctx: typer, Context, sinterval: int):
# Option callback: raise ValueError for a non-positive sampling interval,
# otherwise return the value unchanged.
# NOTE(review): the signature declares three parameters (`ctx` annotated with
# the `typer` module, then `Context`, then `sinterval`); presumably
# `ctx: typer.Context` was intended -- confirm.
if sinterval <= 0:
raise ValueError(f"sampling_interval must be > 0")
# NOTE(review): a second raise directly after the first can never run as
# written; it looks like the post-change wording of the line above -- confirm.
raise ValueError(f"sampling-interval must be > 0")
return sinterval
# ensure container_size > 0
def _csize_callback(ctx: typer, Context, csize: int):
    """Option callback: accept ``csize`` only when it is greater than zero.

    NOTE(review): the signature declares three parameters (``ctx`` annotated
    with the ``typer`` module, then ``Context``, then ``csize``); presumably
    ``ctx: typer.Context`` was intended -- confirm before changing it.

    Returns:
        ``csize`` unchanged.

    Raises:
        ValueError: ``csize`` is zero or negative.
    """
    if not csize <= 0:
        return csize
    raise ValueError(f"container-size must be > 0")
# does not return
def main(ctx: typer.Context,
@@ -302,10 +361,8 @@ def main(ctx: typer.Context,
help="Specify the path for find the data files"),
local_if: str = typer.Option("eth0",
help="Specify the local interface to account non-docker nw stats"),
#signal_fifo: Path = typer.Option(
# ..., exists=True, file_okay=True, dir_okay=False, writable=True, resolve_path=True),
#container_size: int = typer.Option(1, callback=_csize_callback,
# help="Specify the number of wakunodes per container"),
container_size: int = typer.Option(1, callback=_csize_callback,
help="Specify the number of wakunodes per container"),
sampling_interval: int = typer.Option(1, callback=_sinterval_callback,
help="Set the sampling interval in secs")):
@@ -314,7 +371,9 @@ def main(ctx: typer.Context,
datefmt="%H:%M:%S")
log.info("Metrics: setting up")
metrics = MetricsCollector(prefix=prefix, sampling_interval=sampling_interval)
metrics = MetricsCollector(csize=container_size,
prefix=prefix,
sampling_interval=sampling_interval)
log.info("Metrics: processing system and container metadata...")
metrics.process_metadata()

View File

@@ -4,12 +4,11 @@ odir=$2
usr=$3
grp=$4
signal_fifo=$5
sinterval=1
proclog=$odir/docker-proc-log.out
# blocks until signalled
echo "host-proc: /proc fs monitor : waiting on $signal_fifo"
echo "host-proc: the helper is waiting on $signal_fifo"
cat $signal_fifo
export SIGNAL_FIFO=$signal_fifo
@@ -25,17 +24,20 @@ nfh=$((10*max_wakunodes))
# the limit is held in nfh; the misspelt $nhf was unset, so no limit was applied
ulimit -n $nfh # jack-up the number of file handles for all the children
# add date and the names/versions of waku images present
echo "# /procfs started: $(date)" > $PROCOUT_FNAME
echo "# /procfs images involved: $(docker images | grep waku | tr '\n' '; ' )" >> $PROCOUT_FNAME
echo "#host-proc: started: $(date)" > $PROCOUT_FNAME
echo "#host-proc: images involved: $(docker images | grep waku | tr '\n' '; ' )" >> $PROCOUT_FNAME
echo "host-proc: starting the /proc fs monitor"
python3 ./procfs.py --sampling-interval $sinterval --prefix $odir --wls-cid $WAIT_CID > $proclog 2>&1 &
echo "host-proc: starting the /proc fs helper"
helper=`dirname $odir`/host-proc-helper.py
# explicitly resolve the python interpreter
/tmp/host-proc/bin/python3 $helper --sampling-interval $sinterval --prefix $odir --wls-cid $WAIT_CID > $proclog 2>&1 &
procfs_pid=$!
echo "host-proc: waiting for WLS to finish: procfs $procfs_pid running"
docker container wait $WAIT_CID # now wait for the wakurtosis to finish
sleep 60 # make sure you collect the stats until last messages settle down
echo "host-proc: stopping /proc fs monitor $procfs_pid"
echo "host-proc: stopping the helper $procfs_pid"
kill -15 $procfs_pid # procfs-stats is a su process
echo "host-proc: updating the owner of logs: $usr, $grp"

View File

@@ -1,5 +1,4 @@
#!/bin/bash
odir=./stats
if [ "$#" -eq 0 ]; then
echo "Usage: main.sh <container_name> [odir] [signal_fifo]"
@@ -20,19 +19,26 @@ signal_fifo=${3:-"/tmp/hostproc-signal.fifo"}
#mkdir -p $odir
echo "host-proc: gathering docker/process meta-data..."
# TODO: add more images to ancestor
dps=$odir/docker-ps.out
docker ps --no-trunc --filter "ancestor=statusteam/nim-waku" --filter "ancestor=gowaku" --filter "ancestor=statusteam/nim-waku:nwaku-trace2" --format "{{.ID}}#{{.Names}}#{{.Image}}#{{.Command}}#{{.State}}#{{.Status}}#{{.Ports}}" > $dps
echo "host-proc: gathering hw/docker/process meta-data..."
cat /proc/cpuinfo > $odir/docker-cpuinfo.out
cat /proc/meminfo > $odir/docker-meminfo.out
dids=$odir/docker-ids.out
# get the list of running dockers
dps=$odir/docker-ps.out
filters="--filter ancestor=gowaku --filter ancestor=statusteam/nim-waku:nwaku-trace2 --filter ancestor=statusteam/nim-waku:nwaku-trace3"
docker ps --no-trunc --format "{{.ID}}#{{.Names}}#{{.Image}}#{{.Command}}#{{.State}}#{{.Status}}#{{.Ports}}" $filters > $dps
# extract the docker ids
dids=$odir/docker-dids.out
cut -f 1 -d '#' $dps > $dids
dinspect=$odir/docker-inspect.out
docker inspect --format "{{.State.Pid}}{{.Name}}/{{.Image}}/{{.State}}" $(cat $dids) > $dinspect
#only pick up wakunodes with explicit config
pidlist=$odir/docker-pids.out
ps -ef | grep -E "wakunode|waku" |grep -v docker | grep -v grep | awk '{print $2}' > $pidlist
ps -ef | grep -E "wakunode|waku" | grep config-file | awk '{print $2}' > $pidlist
cat /proc/cpuinfo > $odir/docker-cpuinfo.out
cat /proc/meminfo > $odir/docker-meminfo.out
@@ -56,26 +62,7 @@ procout=$odir/docker-proc.out
echo "export DPS_FNAME=$dps DINSPECT_FNAME=$dinspect PIDLIST_FNAME=$pidlist ID2VETH_FNAME=$id2veth PROCOUT_FNAME=$procout LOCAL_IF=$lif WAIT_CID=$wait_cid" > $rclist
#signal the host-proc: unblocks /proc fs
echo "host-proc: collected all requisite docker/process meta-data\nSignalling the /proc fs"
echo "host-proc: collected all requisite docker/process meta-data"
echo "host-proc: signalling the monitor"
# *should* be non-blocking as attendant read is already issued
echo "host-proc: signal /proc fs " > $signal_fifo
# blocks
cat $signal_fifo
dstats=$odir/docker-stats.out
echo "host-proc: starting the dstats monitor"
echo "# dstats started: $(date)" > $dstats
echo "# images involed: $(docker images | grep waku | tr '\n' '; ' )" >> $dstats
echo '# docker stats --no-trunc --format "{{.Container}} / {{.Name}} / {{.ID}} / {{.CPUPerc}} / {{.MemUsage}} / {{.MemPerc}} / {{.NetIO}} / {{.BlockIO}} / {{.PIDs}}"' >> $dstats
# add date and the names/versions of waku images present
docker stats --no-trunc --format "{{.Container}} / {{.Name}} / {{.ID}} / {{.CPUPerc}} / {{.MemUsage}} / {{.MemPerc}} / {{.NetIO}} / {{.BlockIO}} / {{.PIDs}}" $(cat $dids) >> $dstats &
dstats_pid=$!
echo "host-proc: waiting for WLS to finish : dstats $dstats_pid is running"
docker container wait $wait_cid
sleep 60 # make sure you collect the stats until last messages settle down
echo "host-proc: stopping the docker monitor $dstats_pid"
kill -15 $dstats_pid
echo "host-proc: start the /procfs" > $signal_fifo

198
run.sh
View File

@@ -1,20 +1,18 @@
#!/bin/sh
#TODO: make them functions
##################### SETUP & CLEANUP
if [ "$#" -eq 0 ]; then
echo "Error: Must select the measurement infra: cadvisor, host-proc, container-proc"
echo "Error: Must select the measurement infra: cadvisor, dstats, host-proc, container-proc"
echo "Usage: sh ./run.sh <measurement_infra> [enclave_name] [config_file]"
exit 1
fi
dir=$(pwd)
# Parse args
# get the args
metrics_infra=${1:-"cadvisor"}
enclave_name=${2:-"wakurtosis"}
wakurtosis_config_file=${3:-"config.json"}
dir=$(pwd)
loglevel="error"
echo "- Metrics Infra: " "$metrics_infra"
echo "- Enclave name: " "$enclave_name"
@@ -24,29 +22,51 @@ echo "- Configuration file: " "$wakurtosis_config_file"
echo "Cleaning up previous runs"
sh ./cleanup.sh $enclave_name
echo "Done cleaning up previous runs"
# make sure the prometheus and grafana configs are readable
chmod a+r monitoring/prometheus.yml monitoring/configuration/config/grafana.ini ./monitoring/configuration/config/provisioning/dashboards/dashboard.yaml
##################### END
kurtosis_run="kurtosis_run.log"
kurtosis_inspect="kurtosis_inspect.log"
##################### CADVISOR
if [ "$metrics_infra" = "cadvisor" ];
then
# Preparing enclave
usr=`id -u`
grp=`id -g`
stats_dir=stats
signal_fifo=/tmp/hostproc-signal.fifo # do not create fifo under ./stats, or inside the repo
##################### MONITORING MODULE PROLOGUES
if [ "$metrics_infra" = "cadvisor" ]; then #CADVISOR
# prepare the enclave
echo "Preparing the enclave..."
kurtosis --cli-log-level $loglevel enclave add --name ${enclave_name}
enclave_prefix=$(kurtosis --cli-log-level $loglevel enclave inspect --full-uuids $enclave_name | grep UUID: | awk '{print $2}')
echo "Enclave network: "$enclave_prefix
# Get enclave last IP
# get the last IP of the enclave
subnet="$(docker network inspect $enclave_prefix | jq -r '.[].IPAM.Config[0].Subnet')"
echo "Enclave subnetork: $subnet"
last_ip="$(ipcalc $subnet | grep HostMax | awk '{print $2}')"
echo "cAdvisor IP: $last_ip"
# Set up Cadvisor
docker run --volume=/:/rootfs:ro --volume=/var/run:/var/run:rw --volume=/var/lib/docker/:/var/lib/docker:ro --volume=/dev/disk/:/dev/disk:ro --volume=/sys:/sys:ro --volume=/etc/machine-id:/etc/machine-id:ro --publish=8080:8080 --detach=true --name=cadvisor --privileged --device=/dev/kmsg --network $enclave_prefix --ip=$last_ip gcr.io/cadvisor/cadvisor:v0.47.0 >/dev/null 2>&1
# set up the cadvisor
docker run --volume=/:/rootfs:ro --volume=/var/run:/var/run:rw --volume=/var/lib/docker/:/var/lib/docker:ro --volume=/dev/disk/:/dev/disk:ro --volume=/sys:/sys:ro --volume=/etc/machine-id:/etc/machine-id:ro --publish=8080:8080 --detach=true --name=cadvisor --privileged --device=/dev/kmsg --network $enclave_prefix --ip=$last_ip gcr.io/cadvisor/cadvisor:v0.47.0
elif [ "$metrics_infra" = "dstats" ]; then # HOST-PROC
odir=./monitoring/dstats/$stats_dir
mkdir $odir
elif [ "$metrics_infra" = "host-proc" ]; then # HOST-PROC
odir=./monitoring/host-proc/$stats_dir
rclist=$odir/docker-rc-list.out
mkdir $odir
mkfifo $signal_fifo
chmod 0777 $signal_fifo
# get the sudo sorted out in the main thread itself
echo "host-proc: need sudo rights, please enter suitable credentials at the prompt"
sudo echo "host-proc: got the credentials, starting the host-proc helper" # dummy sudo cmd
sudo sh ./monitoring/host-proc/host-proc-helper.sh $rclist $odir $usr $grp $signal_fifo &
fi
##################### END
##################### BOOTSTRAP NODE
echo "Setting up bootstrap node"
# Get bootstrap IP in enclave
@@ -79,14 +99,14 @@ toml_file="config/traits/discv5.toml"
sed -i "s|discv5-bootstrap-node=\(.*\)|discv5-bootstrap-node=[\"${NODE_ENR}\"]|" $toml_file
##################### END
##################### GENNET
# Run Gennet docker container
echo "Running network generation"
docker run --name cgennet -v ${dir}/config/:/config:ro gennet --config-file /config/"${wakurtosis_config_file}" --traits-dir /config/traits
err=$?
if [ $err != 0 ]
then
if [ $err != 0 ]; then
echo "Gennet failed with error code $err"
exit
fi
@@ -96,81 +116,51 @@ docker rm cgennet > /dev/null 2>&1
##################### END
##################### HOST PROCFS MONITOR : PROLOGUE
if [ "$metrics_infra" = "host-proc" ];
then
usr=`id -u`
grp=`id -g`
odir=./stats
signal_fifo=/tmp/hostproc-signal.fifo # do not create fifo under ./stats, or inside the repo
rclist=$odir/docker-rc-list.out
cd monitoring/host-proc
mkdir -p $odir
mkfifo $signal_fifo
chmod 0777 $signal_fifo
sudo sh ./procfs.sh $rclist $odir $usr $grp $signal_fifo &
cd -
fi
##################### END
##################### KURTOSIS RUN
# Create the new enclave and run the simulation
jobs=$(cat "${dir}"/config/"${wakurtosis_config_file}" | jq -r ".kurtosis.jobs")
echo "\nSetting up the enclave: $enclave_name"
kurtosis_cmd="kurtosis --cli-log-level \"$loglevel\" run --enclave ${enclave_name} . '{\"wakurtosis_config_file\" : \"config/${wakurtosis_config_file}\"}' --parallelism ${jobs} > kurtosisrun_log.txt 2>&1"
kurtosis_cmd="kurtosis --cli-log-level \"$loglevel\" run --full-uuids --enclave ${enclave_name} . '{\"wakurtosis_config_file\" : \"config/${wakurtosis_config_file}\"}' --parallelism ${jobs} > $kurtosis_run 2>&1"
START=$(date +%s)
eval "$kurtosis_cmd"
END1=$(date +%s)
DIFF1=$(( $END1 - $START ))
echo -e "Enclave $enclave_name is up and running: took $DIFF1 secs to setup"
echo "Enclave $enclave_name is up and running: took $DIFF1 secs to setup"
sed -n '/Starlark code successfully run. No output was returned./,$p' $kurtosis_run > $kurtosis_inspect
# Extract the WLS service name
wls_service_name=$(kurtosis --cli-log-level $loglevel enclave inspect $enclave_name | grep "\<wls\>" | awk '{print $1}')
echo -e "\n--> To see simulation logs run: kurtosis service logs $enclave_name $wls_service_name <--"
##################### END
wls_service_name=$(grep "\<wls\>" $kurtosis_inspect | awk '{print $1}')
echo "\n--> To see simulation logs run: kurtosis service logs $enclave_name $wls_service_name <--"
##################### EXTRACT WLS CID
# Get the container prefix/suffix for the WLS service
service_name=$(kurtosis --cli-log-level $loglevel enclave inspect $enclave_name | grep $wls_service_name | awk '{print $2}')
service_uuid=$(kurtosis --cli-log-level $loglevel enclave inspect --full-uuids $enclave_name | grep $wls_service_name | awk '{print $1}')
wls_sname=$(grep $wls_service_name $kurtosis_inspect | awk '{print $2}')
wls_suuid=$(grep $wls_service_name $kurtosis_inspect | awk '{print $1}')
# Construct the fully qualified container name that kurtosis has created
wls_cid="$service_name--$service_uuid"
wls_cid="$wls_sname--$wls_suuid"
#echo "The WLS_CID = $wls_cid"
##################### END
##################### CADVISOR MONITOR: EPILOGUE
if [ "$metrics_infra" = "cadvisor" ];
then
echo "Signaling WLS"
##################### MONITORING MODULE EPILOGUE: WLS SIGNALLING
if [ "$metrics_infra" = "cadvisor" ]; then
echo "cadvisor: signaling WLS"
docker exec $wls_cid touch /wls/start.signal
fi
##################### END
##################### HOST PROCFS MONITOR: EPILOGUE
if [ "$metrics_infra" = "host-proc" ];
then
echo "Starting the /proc fs and docker stat measurements"
cd monitoring/host-proc
sh ./dstats.sh $wls_cid $odir $signal_fifo &
cd -
fi
##################### END
##################### Container-Proc MONITOR
if [ "$metrics_infra" = "container-proc" ];
then
elif [ "$metrics_infra" = "dstats" ]; then
echo "Starting dstats measurements.."
# collect container/node mapping via kurtosis
kinspect=$odir/docker-kinspect.out
cp $kurtosis_inspect $kinspect
sh ./monitoring/dstats/dstats.sh $wls_cid $odir & # the process subtree takes care of itself
elif [ "$metrics_infra" = "host-proc" ]; then
echo "Starting host-proc measurements.."
kinspect=$odir/docker-kinspect.out
cp $kurtosis_inspect $kinspect
sh ./monitoring/host-proc/host-proc.sh $wls_cid $odir $signal_fifo &
elif [ "$metrics_infra" = "container-proc" ]; then
echo "Starting monitoring with probes in the containers"
# Start process-level monitoring (in the background; it waits for the WLS to be created)
docker run \
@@ -178,7 +168,6 @@ then
-v $(pwd)/monitoring/container-proc/:/cproc-mon/ \
-v $(pwd)/config/config.json:/cproc-mon/config/config.json \
container-proc:latest &
monitor_pid=$!
fi
##################### END
@@ -187,73 +176,80 @@ fi
##################### GRAFANA
# Fetch the Grafana address & port
grafana_host=$(kurtosis enclave inspect $enclave_name | grep "\<grafana\>" | awk '{print $6}')
#grafana_host=$(kurtosis enclave inspect $enclave_name | grep "\<grafana\>" | awk '{print $6}')
grafana_host=$(grep "\<grafana\>" $kurtosis_inspect | awk '{print $6}')
echo -e "\n--> Statistics in Grafana server at http://$grafana_host/ <--"
echo "Output of kurtosis run command written in kurtosisrun_log.txt"
echo "\n--> Statistics in Grafana server at http://$grafana_host/ <--"
echo "Output of kurtosis run command written in $kurtosis_run"
##################### END
# Get the container prefix/suffix for the WLS service
service_name="$(kurtosis --cli-log-level $loglevel enclave inspect $enclave_name | grep $wls_service_name | awk '{print $2}')"
service_uuid="$(kurtosis --cli-log-level $loglevel enclave inspect --full-uuids $enclave_name | grep $wls_service_name | awk '{print $1}')"
##################### WAIT FOR THE WLS TO FINISH
# Wait for the container to halt; this will block
echo -e "Waiting for simulation to finish ..."
echo "Waiting for simulation to finish ..."
status_code="$(docker container wait $wls_cid)"
echo -e "Simulation ended with code $status_code Results in ./${enclave_name}_logs"
echo "Simulation ended with code $status_code Results in ./${enclave_name}_logs"
END2=$(date +%s)
DIFF2=$(( $END2 - $END1 ))
echo "Simulation took $DIFF1 + $DIFF2 = $(( $END2 - $START)) secs"
##################### END
# give time for the messages to settle down before we collect logs
# sleep 60
##################### GATHER CONFIG, LOGS & METRICS
# give time for the messages to settle down before we collect the logs
sleep 60
# dump logs
echo "Dumping Kurtosis logs"
kurtosis enclave dump ${enclave_name} ${enclave_name}_logs > /dev/null 2>&1
cp kurtosisrun_log.txt ${enclave_name}_logs
cp $kurtosis_run $kurtosis_inspect ${enclave_name}_logs
# copy metrics data, config, network_data to the logs dir
cp -r ./config ${enclave_name}_logs
if [ "$metrics_infra" = "host-proc" ];
then
echo "Copying the /proc fs and docker stat measurements"
cp -r ./monitoring/host-proc/stats ${enclave_name}_logs/host-proc-stats
fi
if [ "$metrics_infra" = "container-proc" ];
then
echo -e "Waiting monitoring to finish ..."
##################### MONITORING MODULE - COPY
if [ "$metrics_infra" = "dstats" ]; then
# unfortunately there is no way to introduce a race-free finish signalling
echo "dstats: copying the dstats data"
cp -r ./monitoring/dstats/stats ${enclave_name}_logs/dstats-data
elif [ "$metrics_infra" = "host-proc" ]; then
echo "Copying the host-proc data"
cp -r ./monitoring/host-proc/stats ${enclave_name}_logs/host-proc-data
elif [ "$metrics_infra" = "container-proc" ]; then
echo "Waiting monitoring to finish ..."
wait $monitor_pid
echo "Copying the container-proc measurements"
cp ./monitoring/container-proc/cproc_metrics.json "./${enclave_name}_logs/cproc_metrics.json" > /dev/null 2>&1
# \rm -r ./monitoring/container-proc/cproc_metrics.json > /dev/null 2>&1
fi
# Copy simulation results
docker cp "$wls_cid:/wls/messages.json" "./${enclave_name}_logs"
docker cp "$wls_cid:/wls/network_topology/network_data.json" "./${enclave_name}_logs"
echo "- Metrics Infra: $metrics_infra" > ./${enclave_name}_logs/run_args
echo "- Enclave name: $enclave_name" >> ./${enclave_name}_logs/run_args
echo "- Configuration file: $wakurtosis_config_file" >> ./${enclave_name}_logs/run_args
# Copy simulation results
docker cp "$wls_cid:/wls/network_topology/network_data.json" "./${enclave_name}_logs"
docker cp "$wls_cid:/wls/messages.json" "./${enclave_name}_logs"
# Run analysis
if jq -e ."plotting" >/dev/null 2>&1 "./config/${wakurtosis_config_file}";
then
if [ "$metrics_infra" = "container-proc" ];
then
if jq -e ."plotting" >/dev/null 2>&1 "./config/${wakurtosis_config_file}"; then
if [ "$metrics_infra" = "dstats" ]; then
docker run --name "dstats" --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/hproc.py dstats /simulation_data/ --config-file /simulation_data/config/config.json >/dev/null 2>&1
docker cp dstats:/analysis/plots/ wakurtosis_logs/dstats-plots
cd wakurtosis_logs
ln -s dstats-plots/output-dstats-compare.pdf analysis.pdf
cd ..
elif [ "$metrics_infra" = "host-proc" ]; then
docker run --name "host-proc" --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/hproc.py host-proc /simulation_data/ --config-file /simulation_data/config/config.json >/dev/null 2>&1
docker cp host-proc:/analysis/plots/ wakurtosis_logs/host-proc-plots
cd wakurtosis_logs
ln -s host-proc-plots/output-host-proc-compare.pdf analysis.pdf
cd ..
elif [ "$metrics_infra" = "container-proc" ]; then
docker run --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/main.py -i container-proc >/dev/null 2>&1
elif [ "$metrics_infra" = "cadvisor" ]; then
prometheus_port=$(kurtosis enclave inspect wakurtosis | grep "\<prometheus\>" | awk '{print $6}' | awk -F':' '{print $2}')
prometheus_port=$(grep "\<prometheus\>" $kurtosis_inspect | awk '{print $6}' | awk -F':' '{print $2}')
docker run --network "host" -v "$(pwd)/wakurtosis_logs:/simulation_data/" --add-host=host.docker.internal:host-gateway analysis src/main.py -i cadvisor -p "$prometheus_port" >/dev/null 2>&1
fi
fi