Gennet: traits and complete configurability (#102)

* added initial set of traits

* refactor

* renaming

* added checks for trait distributions

* .gitignore

* traits distribution type enforcement

* added support for all traits

* .gitignore

* .gitignore softlinks

* minor edits

* updated traits to handle |G| >= num_nodes correctly

* traits dir

* updated the config.json

* now new traits need only tomls, no explicit declaration needed

* recoded the invert_dict_of_list

* unused vars

* generate_subnets comments

* PEP8 space colon

* dict_to_arrays, trait dir fix

* .gitignore

* updated config.json

* added node type check

* disabled nat=any
0xFugue
2023-03-31 17:47:07 +05:30
committed by GitHub
parent 7581e0566a
commit 04a4cc87ef
19 changed files with 398 additions and 102 deletions


@@ -7,27 +7,52 @@ import numpy as np
import random, math
import sys, os
import json, ast
from collections import defaultdict
import time, tracemalloc
import string
import typer, tomli
from enum import Enum, EnumMeta
# Enums & Consts
class MetaEnum(EnumMeta):
def __contains__(cls, item):
try:
cls(item)
except ValueError:
return False
return True
class BaseEnum(Enum, metaclass=MetaEnum):
pass
class Trait(BaseEnum):
NWAKU = "nwaku"
GOWAKU = "gowaku"
DISCV5 = "discv5"
DNSDISC = "dnsdisc"
DNS = "dns"
FLTER = "flter"
LIGHTPUSH = "lightpush"
METRICS = "metrics"
NODE = "node"
PEER = "peer"
PEERXCHNG = "peerxchng"
RELAY = "relay"
REST = "rest"
RLN = "rln"
RPC = "rpc"
STORE = "store"
SWAP = "swap"
WEBSOCKET = "websocket"
# To add a new node type, add appropriate entries to the nodeType and nodeTypeToDocker
class nodeType(BaseEnum):
NWAKU = "nwaku" # waku desktop config
GOWAKU = "gowaku" # waku mobile config
nodeTypeToTomlDefault = {
nodeType.NWAKU: "rpc-admin = true\nkeep-alive = true\nmetrics-server = true\n",
nodeType.GOWAKU: "rpc-admin = true\nmetrics-server = true\nrpc = true\n"
}
nodeTypeToDocker = {
nodeType.NWAKU: "nim-waku",
nodeType.GOWAKU: "go-waku"
@@ -49,6 +74,7 @@ NW_DATA_FNAME = "network_data.json"
EXTERNAL_NODES_PREFIX, NODE_PREFIX, SUBNET_PREFIX, CONTAINER_PREFIX = \
"nodes", "node", "subnetwork", "containers"
ID_STR_SEPARATOR = "-"
DEFAULT_TRAITS_DIR = "../config/traits"
### I/O related fns ##############################################################
@@ -131,22 +157,26 @@ def generate_config_model(ctx):
degrees[-1] += 1
return nx.configuration_model(degrees) # generate the graph
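For context: nx.configuration_model requires the degree sequence to sum to an even number, which is what the degrees[-1] += 1 nudge above guarantees. A self-contained sketch (degree values assumed):

    import random
    import networkx as nx

    degrees = [random.randint(1, 4) for _ in range(6)]
    if sum(degrees) % 2 != 0:   # an odd number of edge stubs cannot be paired up
        degrees[-1] += 1
    G = nx.configuration_model(degrees)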
# |G| = n
def generate_scalefree_graph(ctx):
n = ctx.params["num_nodes"]
return nx.scale_free_graph(n)
# |G| = n; n must be larger than k=D=3
def generate_newmanwattsstrogatz_graph(ctx):
n = ctx.params["num_nodes"]
fanout = ctx.params["fanout"]
return nx.newman_watts_strogatz_graph(n, fanout, 0.5)
# |G| = n (if odd); n+1 (if even)
def generate_barbell_graph(ctx):
n = ctx.params["num_nodes"]
return nx.barbell_graph(int(n / 2), 1)
# |G| = (fanout^(height+1) - 1) / (fanout - 1), where height = floor(log(n) / log(fanout))
def generate_balanced_tree(ctx):
n = ctx.params["num_nodes"]
@@ -154,6 +184,7 @@ def generate_balanced_tree(ctx):
height = int(math.log(n) / math.log(fanout))
return nx.balanced_tree(fanout, height)
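Worked example of the bound above: with num_nodes = 10 and fanout = 3, height = floor(log 10 / log 3) = 2, and the resulting tree has (3^3 - 1)/(3 - 1) = 13 nodes, i.e. the generator may overshoot num_nodes:

    import math
    import networkx as nx

    n, fanout = 10, 3
    height = int(math.log(n) / math.log(fanout))   # 2
    G = nx.balanced_tree(fanout, height)
    print(G.number_of_nodes())                     # 13 >= n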
# nomostree is a balanced binary tree with an even number of leaves
# |G| = n (if odd); n+1 (if even)
def generate_nomos_tree(ctx):
@@ -166,9 +197,9 @@ def generate_nomos_tree(ctx):
i, diff = 0, G.number_of_nodes() - n
leaves = [x for x in G.nodes() if G.degree(x) == 1]
nleaves = len(leaves)
if (nleaves - diff) % 2 != 0:
diff -= 1
for node in leaves:
if i == diff:
break
G.remove_node(node)
@@ -176,6 +207,7 @@ def generate_nomos_tree(ctx):
G = nx.convert_node_labels_to_integers(G)
return G
# |G| = n
def generate_star_graph(ctx):
n = ctx.params["num_nodes"]
@@ -203,35 +235,43 @@ def generate_network(ctx):
def postprocess_network(G):
G = nx.Graph(G) # prune out parallel/multi edges
G.remove_edges_from(nx.selfloop_edges(G)) # remove the self-loops
mapping = {i : f"{NODE_PREFIX}{ID_STR_SEPARATOR}{i}" for i in range(len(G))}
return nx.relabel_nodes(G, mapping) # label the nodes
def generate_subnets(G, num_subnets):
n = len(G.nodes)
if num_subnets == n: # if num_subnets == size of the network
return {f"{NODE_PREFIX}{ID_STR_SEPARATOR}{i}": f"{SUBNET_PREFIX}_{i}" for i in range(n)}
return {f"{NODE_PREFIX}{ID_STR_SEPARATOR}{i}" : f"{SUBNET_PREFIX}_{i}" for i in range(n)}
# Permute the node indices; this makes sure that the nodes are assigned randomly to subnets
lst = list(range(n))
random.shuffle(lst)
# Select (without replacement) num_subnets - 1 offsets; make sure the final offset is n-1.
# Each offset demarcates a subnet boundary
offsets = sorted(random.sample(range(0, n), num_subnets - 1))
offsets.append(n - 1) # we have num_subnets offsets
start, subnet_id, node2subnet = 0, 0, {}
for end in offsets:
# Build the node2subnet map as follows:
# From the permuted lst, pick the nodes whose indices lie in the closed interval [start, end].
# Remember, the offsets are *sorted*, lie in the range 0..n-1, and were sampled without
# replacement; so they all index correctly.
# Finally, assign all these nodes to the current subnet.
for i in range(start, end + 1):
node2subnet[f"{NODE_PREFIX}{ID_STR_SEPARATOR}{lst[i]}"] = f"{SUBNET_PREFIX}_{subnet_id}"
start = end # roll over the start to the end of the last offset
subnet_id += 1 # increment the subnet_id
return node2subnet
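A worked trace of generate_subnets with assumed values n = 6, num_subnets = 3 (the shuffle and sample outcomes are made up for illustration):

    # lst     = [4, 0, 5, 2, 1, 3]   # shuffled node indices
    # offsets = [1, 3] + [5]         # two sampled boundaries plus the final n-1
    #
    # end=1: lst[0..1] -> node-4, node-0 assigned to subnetwork_0
    # end=3: lst[1..3] -> node-0, node-5, node-2 assigned to subnetwork_1
    # end=5: lst[3..5] -> node-2, node-1, node-3 assigned to subnetwork_2
    #
    # Since start rolls over to end (not end + 1), each boundary node is visited
    # twice; its final subnet assignment is the later one.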
### file format related fns ###########################################################
# Generate per node toml configs
def generate_toml(traits_dir, topics, traits_list):
topics, node_type, tomls = get_random_sublist(topics), nodeType(traits_list[0]), ""
if node_type == nodeType.GOWAKU: # comma separated list of quoted topics
topic_str = ", ".join(f"\"{t}\"" for t in topics)
topic_str = f"[{topic_str}]"
@@ -239,51 +279,83 @@ def generate_toml(topics, configuration, node_type=nodeType.NWAKU):
topic_str = " ".join(topics)
topic_str = f"\"{topic_str}\""
for trait in traits_list[1:]: # skip the first trait as it is the docker/node selector
with open(f"{traits_dir}/{trait}.toml", 'rb') as f:
toml = ""
for key, value in tomli.load(f).items():
value = str(value).lower() if isinstance(value, bool) else value # TOML booleans are lowercase
toml += f"{key} = {value}\n"
tomls += toml
return f"{tomls}topics = {topic_str}\n"
# Convert a dict to pair of arrays
def dict_to_arrays(dic):
keys, vals = zip(*dic.items())
return keys, vals
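The zip(*dic.items()) idiom unzips a dict into two parallel tuples:

    keys, vals = zip(*{"nwaku": 60, "gowaku": 40}.items())
    # keys == ("nwaku", "gowaku"), vals == (60, 40)
    # NB: zip(*{}.items()) raises ValueError on unpacking; callers always pass a
    # non-empty distribution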
# Check for range failures in a list
def range_fails(lst, min=0, max=100):
return any(x < min or x > max for x in lst)
# Check for sum failures in a list
def sum_fails(lst, sum_expected=100):
return not sum(lst) == sum_expected
# Construct the nodeType from the trait
def traits_to_nodeType(s):
return nodeType(s.split(':')[0])
# Validate the traits distribution (percentages only, since the exact node count may vary after generation)
def validate_traits_distribution(traits_dir, traits_distribution):
traits, traits_freq = dict_to_arrays(traits_distribution)
if range_fails(traits_freq, max=100):
raise ValueError(f"{traits_distribution} : invalid percentage (> 100 or < 0)")
if sum_fails(traits_freq, sum_expected=100):
raise ValueError(f"{traits_distribution} : percentages do not sum to 100")
if not os.path.exists(traits_dir):
raise ValueError(f"{traits_dir} : trait directory does not exist!")
for s in traits:
traits_list = s.split(":")
if traits_list[0] not in nodeType:
raise ValueError(f"{traits_distribution} : unknown node type {traits_list[0]} in {s}")
for t in traits_list[1:]:
if t not in Trait and not os.path.exists(f"{traits_dir}/{t}.toml"):
raise ValueError(f"{traits_distribution} : unknown trait {t} in {s}")
# Generate a list of trait strings that respects the node type distribution
def generate_traits_distribution(node_type_distribution, G):
num_nodes = G.number_of_nodes()
nodes, node_percentage = dict_to_arrays(node_type_distribution)
traits_distribution = []
for node, percentage in zip(nodes, node_percentage):
traits_distribution += [node] * math.ceil(percentage * num_nodes / 100)
random.shuffle(traits_distribution)
return traits_distribution
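The ceil() rounds every share up, and num_nodes here is G.number_of_nodes(), so the list always covers the (possibly larger) generated graph; e.g. with 5 nodes and a 50/50 split:

    import math
    num_nodes, shares = 5, {"nwaku": 50, "gowaku": 50}
    counts = {k: math.ceil(v * num_nodes / 100) for k, v in shares.items()}
    # counts == {'nwaku': 3, 'gowaku': 3} -> 6 entries for 5 nodes; the writer
    # loop below simply consumes the first num_nodes entries after the shuffle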
# Inverts a dictionary; if idx is given, the values are lists/tuples and val[idx] keys the result
def invert_dict_of_list(d, idx=None):
inv = defaultdict(list)
for key, val in d.items():
inv[val if idx is None else val[idx]].append(key)
return inv
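Sketch of both inversion modes (values assumed; idx picks the key out of tuple-valued dicts):

    node2subnet = {"node-0": "subnetwork_0", "node-1": "subnetwork_0"}
    node2container = {"node-0": (0, "containers_0"), "node-1": (1, "containers_0")}

    # invert_dict_of_list(node2subnet)       -> {'subnetwork_0': ['node-0', 'node-1']}
    # invert_dict_of_list(node2container, 1) -> {'containers_0': ['node-0', 'node-1']}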
# TODO: reduce container packer memory consumption
# Packs the nodes into containers in a subnet-aware manner : optimal
# Number of containers =
# $$ O(\sum_{i=1}^{num\_subnets} \log_{container\_size}(\#Nodes_i) + num\_subnets) $$
def pack_nodes(container_size, node2subnet):
subnet2nodes = invert_dict_of_list(node2subnet)
port_shift, cid, node2container = 0, 0, {}
for subnet in subnet2nodes:
for node in subnet2nodes[subnet]:
if port_shift >= container_size:
port_shift, cid = 0, cid+1
node2container[node] = (port_shift, f"{CONTAINER_PREFIX}_{cid}")
port_shift += 1
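A short trace with container_size = 2 and two subnets (assumed layout): cid never resets, so a container can straddle a subnet boundary when the previous subnet leaves it partially filled:

    # subnet2nodes = {"subnetwork_0": ["node-0", "node-1", "node-2"],
    #                 "subnetwork_1": ["node-3"]}
    #
    # node-0 -> (0, "containers_0")
    # node-1 -> (1, "containers_0")
    # node-2 -> (0, "containers_1")   # rolled over into a fresh container
    # node-3 -> (1, "containers_1")   # shares containers_1 across the boundary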
@@ -294,29 +366,20 @@ def pack_nodes(container_size, node2subnet, G):
def generate_and_write_files(ctx: typer.Context, G):
topics = generate_topics(ctx.params["num_topics"])
node2subnet = generate_subnets(G, ctx.params["num_subnets"])
traits_distribution = generate_traits_distribution(ctx.params["node_type_distribution"], G)
node2container = pack_nodes(ctx.params["container_size"], node2subnet)
container2nodes = invert_dict_of_list(node2container, 1)
json_dump, json_dump[CONTAINER_PREFIX], json_dump[EXTERNAL_NODES_PREFIX] = {}, {}, {}
for container, nodes in container2nodes.items():
json_dump[CONTAINER_PREFIX][container] = nodes
i, traits_dir = 0, ctx.params["traits_dir"]
for node in G.nodes:
# package container_size nodes per container
# write the per node toml for the i^th node of the appropriate type
traits_list, i = traits_distribution[i].split(":"), i+1
node_type = nodeType(traits_list[0])
write_toml(ctx.params["output_dir"], node, generate_toml(traits_dir, topics, traits_list))
json_dump[EXTERNAL_NODES_PREFIX][node] = {}
json_dump[EXTERNAL_NODES_PREFIX][node]["static_nodes"] = []
for edge in G.edges(node):
@@ -334,20 +397,20 @@ def generate_and_write_files(ctx: typer.Context, G):
# sanity check : valid json with "gennet" config
def _config_file_callback(ctx: typer.Context, param: typer.CallbackParam, cfile: str):
if cfile:
typer.echo(f"Loading config file: {cfile.split('/')[-1]}")
typer.echo(f"Loading config file: {os.path.basename(cfile)}")
ctx.default_map = ctx.default_map or {} # Init the default map
try:
with open(cfile, 'r') as f: # Load config file
conf = json.load(f)
if "gennet" not in conf:
print(f"Gennet configuration not found in {cfile}. Skipping topology generation.")
print(
f"Gennet configuration not found in {cfile}. Skipping topology generation.")
sys.exit(0)
if "general" in conf and "prng_seed" in conf["general"]:
conf["gennet"]["prng_seed"] = conf["general"]["prng_seed"]
# TODO : type-check and sanity-check the values in config.json
ctx.default_map.update(conf["gennet"]) # Merge config and default_map
except Exception as ex:
raise typer.BadParameter(str(ex))
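A minimal config.json that this callback accepts could look as follows (keys illustrative, mirroring the CLI options below; "general"/"prng_seed" is folded into the gennet section as shown above):

    {
      "general": { "prng_seed": 1 },
      "gennet": {
        "num_nodes": 4,
        "num_subnets": 1,
        "network_type": "newmanwattsstrogatz",
        "node_type_distribution": { "nwaku:rpc": 100 }
      }
    }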
@@ -369,16 +432,16 @@ def _num_subnets_callback(ctx: typer.Context, num_subnets: int):
num_subnets = num_nodes
if num_subnets > num_nodes:
raise ValueError(
f"num_subnets must be <= num_nodes : num_subnets={num_subnets}, num_nodes={num_nodes}")
return num_subnets
def main(ctx: typer.Context,
benchmark: bool = typer.Option(False, help="Measure CPU/Mem usage of Gennet"),
draw: bool = typer.Option(False, help="Draw the generated network"),
container_size: int = typer.Option(1, help="Set the number of nodes per container"),
output_dir: str = typer.Option("network_data", help="Set the output directory for Gennet generated files"),
prng_seed: int = typer.Option(1, help="Set the random seed"),
num_nodes: int = typer.Option(4, help="Set the number of nodes"),
num_topics: int = typer.Option(1, help="Set the number of topics"),
fanout: int = typer.Option(3, help="Set the arity for trees & newmanwattsstrogatz"),
@@ -387,33 +450,25 @@ def main(ctx: typer.Context,
network_type: networkType = typer.Option(networkType.NEWMANWATTSSTROGATZ.value, help="Set the network type"),
num_subnets: int = typer.Option(1, callback=_num_subnets_callback, help="Set the number of subnets"),
num_partitions: int = typer.Option(1, callback=_num_partitions_callback, help="Set the number of network partitions"),
config_file: str = typer.Option("", callback=conf_callback, is_eager=True, help="Set the input config file (JSON)")):
config_file: str = typer.Option("", callback=_config_file_callback, is_eager=True, help="Set the input config file (JSON)")):
# Benchmarking: record start time and start tracing mallocs
if benchmark:
tracemalloc.start()
start = time.time()
# re-read the conf file to set node_type_distribution -- no cli equivalent
conf = {}
if config_file != "" :
with open(config_file, 'r') as f: # Load config file
conf = json.load(f)
#print(conf)
# set the random seed : networkx uses numpy.random as well
print("Setting the random seed to", prng_seed)
print("Setting the random seed to ", prng_seed)
random.seed(prng_seed)
np.random.seed(prng_seed)
# leaving this here for now, should any json parsing issues pop up
# Extract the node type distribution from config.json or use the default
# no cli equivalent for node type distribution (NTD)
# if "gennet" in conf and "node_type_distribution" in conf ["gennet"]:
# node_type_distribution = conf["gennet"]["node_type_distribution"]
# else:
# node_type_distribution = { "nwaku" : 100 } # default NTD is all nwaku
if ctx.params["config_file"] == "":
ctx.params["traits_dir"] = DEFAULT_TRAITS_DIR
else:
ctx.params["traits_dir"] = os.path.dirname(ctx.params["config_file"]) + f"/traits"
# validate node type distribution
validate_traits_distribution(ctx.params["traits_dir"], node_type_distribution)
# Generate the network
# G = generate_network(num_nodes, networkType(network_type), tree_arity)
@@ -424,7 +479,9 @@ def main(ctx: typer.Context,
# Generate file format specific data structs and write the files
generate_and_write_files(ctx, G)
# Draw the graph if need be
if draw:
draw_network(output_dir, G)
end = time.time()
@@ -432,7 +489,7 @@ def main(ctx: typer.Context,
print(f"For {G.number_of_nodes()}/{num_nodes} nodes, network generation took {time_took} secs.\nThe generated network is under ./{output_dir}")
# Benchmarking. Record finish time and stop the malloc tracing
if benchmark:
mem_curr, mem_max = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"STATS: For {num_nodes} nodes, time took is {time_took} secs, peak memory usage is {mem_max/(1024*1024)} MBs\n")