Gossipsub Simulations (#9)

* episub-sim

* network generation

* mainnet network generation

* no pretty print

* add small network for testing

* upgrade versions, make the thing compile

* checkpoint

* this is ridiculous

* make gen_topology reachable by testground

* add 16 instances for testing

* add running command

* remove params

* pass new participants param

* update upstream reg

* update upstream reg

* put msg generation back in

* put publishing back in

* adding metrics

* record metrics on intervals

* add gossip max limit

* stop fighting testground. For now

* update README

* Revert "stop fghting testground. For now"

This reverts commit a976c5371b.

* use composition files to get the docker build context a layer up

* fix params and logs

* remove unused files

* cache workspace deps

* cache workspace deps _the right_ way

* updates

* some docs

* Update CI and remove root workspace

* Add duplicates, fix clippy, improve dash

* Reduce message sizes, some debugging

* Update dash and logs

* Add scripts folder

* Fix executor lockup

* fmt and clippy

* Add some docs

* Dot to mermaid

Co-authored-by: Diva M <divma@protonmail.com>
Commit a260f69580 (parent e31e401776) by Age Manning, committed by GitHub on
2022-11-29 16:06:53 +11:00.
29 changed files with 404,356 additions and 608 deletions.

.dockerignore (new file)

@@ -0,0 +1,3 @@
target/
*.data
*.tar.gz

GitHub Actions CI workflow (.github/workflows)

@@ -7,20 +7,25 @@ on:
     - main
 jobs:
   cargo-fmt:
     runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        simulations: ["eth_consensus", "censoring", "smoke"]
     steps:
     - uses: actions/checkout@v2
     - name: Get latest version of stable rust
       run: rustup update stable
     - name: Check formatting with cargofmt
-      run: cargo fmt --all -- --check
+      run: cd "${{ matrix.simulations }}" && cargo fmt --all -- --check
   clippy:
-    needs: cargo-fmt
     name: clippy
     runs-on: ubuntu-latest
+    needs: cargo-fmt
+    strategy:
+      matrix:
+        simulations: ["eth_consensus", "censoring", "smoke"]
     steps:
     - uses: actions/checkout@v1
     - name: Install protoc
@@ -32,6 +37,6 @@ jobs:
     - name: Install Protoc
       uses: arduino/setup-protoc@64c0c85d18e984422218383b81c52f8b077404d3 # v1.1.2
     - name: Lint code for quality and style with Clippy
-      run: cargo clippy --workspace --tests -- -D warnings
+      run: cd "${{ matrix.simulations }}" && cargo clippy --workspace --tests -- -D warnings
     - name: Certify Cargo.lock freshness
-      run: git diff --exit-code Cargo.lock
+      run: cd "${{ matrix.simulations }}" && git diff --exit-code Cargo.lock

Cargo.toml (root workspace, deleted)

@@ -1,6 +0,0 @@
[workspace]
members = [
"smoke",
"censoring",
]

README.md

@@ -41,6 +41,15 @@ be examined to determine their effectiveness.
See the [censoring documentation](./censoring/README) for instructions on how to run
the simulation.
### [Ethereum Consensus](./eth_consensus/README.md)
This is a simulation of a standard gossipsub network for various sizes of the
Ethereum consensus layer. It can be used to model network traffic on the
Ethereum consensus layer.
See the [Ethereum consensus documentation](./eth_consensus/README.md) for instructions on how to run
the simulation.
## Dashboards
Grafana dashboards are provided for some of the simulations. These are provided

censoring/README.md

@@ -7,7 +7,7 @@ parameters to mitigate censoring attacks on gossipsub networks.
## Running the simulation
```shell
-testground run composition -f censoring/compositions/censoring.toml --wait
+testground run composition -f censoring/compositions/composition.toml --wait
```
## How the Simulation Works

docker-compose.yml

@@ -9,6 +9,7 @@ services:
- ./grafana/datasources.yml:/etc/grafana/provisioning/datasources/datasources.yml
- ./grafana/dashboards.yml:/etc/grafana/provisioning/dashboards/dashboards.yml
- ./censoring/dashboards:/var/lib/grafana/dashboards/censoring
- ./eth_consensus/dashboards:/var/lib/grafana/dashboards/eth_consensus
networks:
# Connect containers to `testground-control` to access Testground-supplied containers.

eth_consensus/.dockerignore (new file)

@@ -0,0 +1,3 @@
target/
*.data
*.tar.gz

eth_consensus/Cargo.lock (file diff suppressed because it is too large)

eth_consensus/Cargo.toml (new file)

@@ -0,0 +1,26 @@
[workspace]
members = [
"simulation",
"utils/gen_topology",
"utils/gen_topology_files"
]
[workspace.dependencies]
serde_json = "1.0"
serde = "1.0"
testground = "0.4.0"
tokio = { version = "1.21.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
[patch]
[patch.crates-io]
types = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
eth2_ssz = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
eth2_ssz_types = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
eth2_serde_utils = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
tree_hash = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
eth2_hashing = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
eth2_ssz_derive = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
tree_hash_derive = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }

eth_consensus/Dockerfile (new file)

@@ -0,0 +1,32 @@
# Get chef
FROM rust:1.65-bullseye as chef
WORKDIR test-plan
RUN cargo install cargo-chef
# Get chef to create a skeleton workspace
FROM chef AS planner
COPY ./plan .
RUN cargo chef prepare --recipe-path recipe.json
FROM chef as builder
# Build dependencies
RUN apt-get update && apt-get -y upgrade && apt-get install -y protobuf-compiler
COPY --from=planner /test-plan/recipe.json ./recipe.json
# Cache the deps using the fake workspace
RUN cargo chef cook --release --recipe-path recipe.json
# Get the real code
COPY ./plan .
# Enjoy
RUN cargo build --release -p simulation
FROM debian:bullseye-slim
COPY --from=builder /test-plan/target/release/simulation /usr/local/bin/eth_consensus
#ENV RUST_LOG=libp2p_gossipsub=debug,simulation=debug
ENV RUST_LOG=simulation=info
ENTRYPOINT ["eth_consensus"]

eth_consensus/README.md (new file)

@@ -0,0 +1,80 @@
# Ethereum Consensus Simulation
This simulation mimics the timing, frequency and sizes of messages that would usually
occur on a normal Ethereum consensus gossipsub network. The number of validators and
nodes on the network can be specified in order to model various sizes of Ethereum
consensus networks.
## Running the Simulation
This simulation can be run with the following command (from within the repo's
root directory):
```sh
testground run composition -f ./eth_consensus/compositions/composition.toml --wait
```
Various aspects of the simulation can be modified. Please read the `eth_consensus/manifest.toml` to understand test parameters and `eth_consensus/compositions/composition.toml` to modify them.
## InfluxDB Queries
The results of the simulation are stored in an InfluxDB instance. Queries
inside Grafana can be used to build dashboards. An example query is given:
`SELECT derivative("count", 10s) FROM "topic_msg_recv_bytes" WHERE $timeFilter GROUP BY "hash", "instance_name", "run_id"`
- `derivative`: calculates the rate of change
- `hash`: the topic
- `instance_name`: the number given to the instance inside the test run, starting from 0
- `run_id`: the id of the run
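A second example in the same style charts duplicate messages per topic. The simulation pre-computes the `topic_msg_recv_duplicates` measurement before writing it (joins across measurements are awkward in InfluxQL), so it can be charted the same way:
`SELECT derivative("count", 10s) FROM "topic_msg_recv_duplicates" WHERE $timeFilter GROUP BY "hash", "instance_name", "run_id"`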
## Controlling the Topology
The topology of the network can be created and visualised using the `utils/gen_topology` and `utils/gen_topology_files` crates.
The `gen_topology_files` binary can output json and dot files to visualise and
understand the topology of the network to be created.
There are a number of input parameters required for generating a network
topology, each of these can be customised in the composition file
`compositions/composition.toml`.
- `seed`: int - Seeds the random number generator; changing this number
produces differing variants of the topology.
- `total_validators`: int - The total number of validators to be used
within the network.
- `total_nodes_with_vals`: int - The total number of nodes that will be assigned
validators.
- `min_peers_per_node`: int - The minimum number of connections per node to
target when generating the node topology.
- `max_peers_per_node`: int - The maximum number of connections per node to
target when generating the node topology.
These parameters can be set in the composition in order to generate a specific
topology for the simulation.
To visualise the topology, a dot file and json file can be produced by running:
```
cargo run --bin gen_topology_file <seed> <total_validators>
<total_nodes_with_vals> <total_nodes_without_vals> <min_peers_per_node>
<max_peers_per_node> <output-dot-file> <output-json-file>
```
An example is:
```
cargo run --bin gen_topology_file 40 200 2 3 2 3 output.dot output.json
```
which produces the following topology:
```mermaid
graph TD;
0-->2;
0-->3;
1-->2;
2-->4;
3-->1;
4-->1;
```
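For programmatic use, the following is a minimal sketch (assuming the `gen_topology` crate API added in this PR) that generates the same topology in Rust and prints its edges as mermaid rather than dot:
```rust
use gen_topology::{Network, Params};

fn main() -> Result<(), String> {
    // Same inputs as the CLI example above: seed, total validators, nodes
    // with validators, nodes without validators, and the min/max (inclusive)
    // outbound-peer range.
    let params = Params::new(40, 200, 2, 3, 2, 3)?;
    let network = Network::generate(params)?;
    // Emit the outbound-peer map as a mermaid graph instead of a dot file.
    println!("graph TD;");
    for (peer_a, dialed_peers) in network.outbound_peers() {
        for peer_b in dialed_peers {
            println!("    {peer_a}-->{peer_b};");
        }
    }
    Ok(())
}
```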

eth_consensus/compositions/composition.toml (new file)

@@ -0,0 +1,20 @@
[metadata]
name = "eth_consensus"
[global]
builder = "docker:generic"
plan = "gossipsub-testground/eth_consensus"
case = "eth_consensus"
runner = "local:docker"
[global.build_config]
path = "./"
[[groups]]
id = "main-instances"
instances = { count = 3 }
[groups.run]
# Check the plan (./eth_consensus/manifest.toml) to understand the meaning of each param
test_params = { seed = "40", no_val_percentage = "60", total_validators = "200", min_peers_per_node = "2", max_peers_per_node_inclusive = "3", run = "120" }

eth_consensus/dashboards/BasicGossipsub.json (file diff suppressed because it is too large)

eth_consensus/manifest.toml (new file)

@@ -0,0 +1,23 @@
name = "eth_consensus"
[defaults]
builder = "docker:generic"
runner = "local:docker"
disable_metrics = false
[builders."docker:generic"]
enabled = true
[runners."local:docker"]
enabled = true
[[testcases]]
name = "eth_consensus"
instances = { min = 3, max = 100, default = 3 }
[testcases.params]
seed = { type = "int", desc = "Seed to use for the rng", default = 40 }
no_val_percentage = { type = "int", desc = "% of nodes without vals", default = 60 }
total_validators = { type = "int", desc = "number of validators", default = 200 }
min_peers_per_node = { type = "int", desc = "minimum number of peers to connect to", default = 2 }
max_peers_per_node_inclusive = { type = "int", desc = "maximum number of peers to connect to", default = 3 }
run = { type = "int", desc = "Time to run the emulation", default=120, unit="sec" }

eth_consensus/simulation/Cargo.toml (new file)

@@ -0,0 +1,24 @@
[package]
name = "simulation"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
chrono = { version = "0.4.19", features = [ "std" ]}
delay_map = "0.1.1"
# libp2p = { version = "0.48.0", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
prometheus-client = { git = "https://github.com/prometheus/client_rust.git", rev = "682b24ee8c6c857b76c0683b1dd7df5a97b75c27", features = ["protobuf"] }
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-ptotobuf-support", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
serde_json = "1.0"
serde = "1.0"
testground = "0.4.0"
tokio = { version = "1.21.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
gen_topology = { path = "../utils/gen_topology" }
futures = "0.3.24"
npg = { git = "https://github.com/sigp/eth-npg", branch = "timing-change" }
sha2 = "0.10.6"

eth_consensus/simulation/src/main.rs (new file)

@@ -0,0 +1,60 @@
mod node_run;
mod utils;
use crate::utils::publish_and_collect;
use libp2p::identity::Keypair;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
use testground::client::Client;
use tracing::info;
use std::collections::HashMap;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
if let Ok(env_filter) = tracing_subscriber::EnvFilter::try_from_default_env() {
tracing_subscriber::fmt().with_env_filter(env_filter).init();
}
let client = Client::new_and_init().await?;
let local_key = Keypair::generate_ed25519();
let peer_id = PeerId::from(local_key.public());
let multiaddr = {
let mut multiaddr = Multiaddr::from(
client
.run_parameters()
.data_network_ip()?
.expect("Should have an IP address for the data network"),
);
multiaddr.push(Protocol::Tcp(9000));
multiaddr
};
// The network definition starts at 0 and the testground sequences start at 1, so adjust
// accordingly.
let node_id = client.global_seq() as usize - 1;
info!("THIS IS MY NUMBER {node_id}");
let instance_info = InstanceInfo { peer_id, multiaddr };
let participants = {
let infos =
publish_and_collect("node_info", &client, (node_id, instance_info.clone())).await?;
info!("Found {}", infos.len());
infos
.into_iter()
.filter(|(other_node_id, _)| *other_node_id != node_id)
.collect::<HashMap<usize, InstanceInfo>>()
};
node_run::run(client, node_id, instance_info, participants, local_key).await?;
Ok(())
}
#[derive(Clone, Debug, Serialize, Deserialize)]
struct InstanceInfo {
peer_id: PeerId,
multiaddr: Multiaddr,
}

eth_consensus/simulation/src/node_run.rs (new file)

@@ -0,0 +1,665 @@
use crate::utils::{
queries_for_counter, queries_for_counter_join, queries_for_gauge, queries_for_histogram,
record_instance_info, BARRIER_LIBP2P_READY, BARRIER_TOPOLOGY_READY,
};
use crate::InstanceInfo;
use chrono::{DateTime, Utc};
use gen_topology::Params;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::dns::TokioDnsConfig;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, IdentTopic,
IdentityTransform, MessageAuthenticity, MessageId, PeerScoreParams, PeerScoreThresholds,
Topic as GossipTopic, ValidationMode,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::yamux::YamuxConfig;
use libp2p::PeerId;
use libp2p::Swarm;
use libp2p::Transport;
use npg::slot_generator::{Subnet, ValId};
use npg::{Generator, Message};
use prometheus_client::encoding::proto::openmetrics_data_model::MetricSet;
use prometheus_client::encoding::proto::EncodeMetric;
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
use testground::client::Client;
use tokio::time::{interval, Interval};
use tracing::{debug, error, info};
const ATTESTATION_SUBNETS: u64 = 4;
const SYNC_SUBNETS: u64 = 4;
const SLOTS_PER_EPOCH: u64 = 2;
const SLOT_DURATION: u64 = 12;
#[derive(Serialize, Deserialize, Clone, Debug)]
enum Topic {
Blocks,
Attestations(u64),
Aggregates(u64),
SyncMessages(u64),
SignedContributionAndProof(u64),
}
impl From<Topic> for IdentTopic {
fn from(t: Topic) -> Self {
let rep = serde_json::to_string(&t).expect("json serialization of topics never fails");
GossipTopic::new(rep)
}
}
impl From<IdentTopic> for Topic {
fn from(t: IdentTopic) -> Self {
let repr = t.hash().into_string();
serde_json::from_str(&repr).expect("json deserialization of topics never fails")
}
}
pub(crate) fn parse_params(
instance_count: usize,
instance_params: HashMap<String, String>,
) -> Result<(Duration, Params), Box<dyn std::error::Error>> {
let seed = instance_params
.get("seed")
.ok_or("seed is not specified.")?
.parse::<u64>()?;
let no_val_percentage = instance_params
.get("no_val_percentage")
.ok_or("`no_val_percentage` is not specified")?
.parse::<usize>()?
.min(100);
let total_validators = instance_params
.get("total_validators")
.ok_or("`total_validators` not specified")?
.parse::<usize>()
.map_err(|e| format!("Error reading total_validators {}", e))?;
let min_peers_per_node = instance_params
.get("min_peers_per_node")
.ok_or("`min_peers_per_node` not specified")?
.parse::<usize>()?;
let max_peers_per_node_inclusive = instance_params
.get("max_peers_per_node_inclusive")
.ok_or("`max_peers_per_node_inclusive` not specified")?
.parse::<usize>()?;
let total_nodes_without_vals = instance_count * no_val_percentage / 100;
let total_nodes_with_vals = instance_count - total_nodes_without_vals;
let run = instance_params
.get("run")
.ok_or("run is not specified.")?
.parse::<u64>()?;
let params = Params::new(
seed,
total_validators,
total_nodes_with_vals,
total_nodes_without_vals,
min_peers_per_node,
max_peers_per_node_inclusive,
)?;
Ok((Duration::from_secs(run), params))
}
pub(crate) async fn run(
client: Client,
node_id: usize,
instance_info: InstanceInfo,
participants: HashMap<usize, InstanceInfo>,
keypair: Keypair,
) -> Result<(), Box<dyn std::error::Error>> {
let test_instance_count = client.run_parameters().test_instance_count;
let (run_duration, params) = parse_params(
test_instance_count as usize,
client.run_parameters().test_instance_params,
)?;
let (params, outbound_peers, validator_assignments) =
gen_topology::Network::generate(params)?.destructure();
info!(
"Running with params {params:?} and {} participants",
participants.len()
);
let validator_set = validator_assignments
.get(&node_id)
.cloned()
.unwrap_or_default();
info!("[{}] Validators on this node: {:?}", node_id, validator_set);
let validator_set: HashSet<ValId> =
validator_set.into_iter().map(|v| ValId(v as u64)).collect();
record_instance_info(
&client,
node_id,
&instance_info.peer_id,
&client.run_parameters().test_run,
)
.await?;
let registry: Registry<Box<dyn EncodeMetric>> = Registry::default();
let mut network = Network::new(
registry,
keypair,
node_id,
instance_info,
participants.clone(),
client.clone(),
validator_set,
params,
);
client
.signal_and_wait(BARRIER_TOPOLOGY_READY, test_instance_count)
.await?;
// Set up the listening address
network.start_libp2p().await;
client
.signal_and_wait(BARRIER_LIBP2P_READY, test_instance_count)
.await?;
// Dial the designated outbound peers
network.dial_peers(outbound_peers).await;
client
.signal_and_wait(
BARRIER_TOPOLOGY_READY,
client.run_parameters().test_instance_count,
)
.await?;
if let Err(e) = network.subscribe_topics() {
error!("[{}] Failed to subscribe to topics {e}", network.node_id);
};
network.run_sim(run_duration).await;
client.record_success().await?;
Ok(())
}
/// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
.expect("Signing libp2p-noise static DH keypair failed.");
transport
.upgrade(Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated())
.multiplex(SelectUpgrade::new(
YamuxConfig::default(),
MplexConfig::default(),
))
.timeout(Duration::from_secs(20))
.boxed()
}
// A context struct for passing information into the `record_metrics` function that can be spawned
// into its own task.
struct RecordMetricsInfo {
client: Arc<Client>,
metrics: MetricSet,
node_id: usize,
instance_info: InstanceInfo,
current: DateTime<Utc>,
}
pub(crate) struct Network {
/// Libp2p swarm.
swarm: Swarm<Gossipsub>,
/// Node id for this node, local to the test run.
node_id: usize,
/// This node's contact info.
instance_info: InstanceInfo,
/// Metrics registry.
registry: Registry<Box<dyn EncodeMetric>>,
/// Information of every other participant in the network, indexed by their (local to the test
/// run) node_id.
participants: HashMap<usize, InstanceInfo>,
/// Testground client.
client: Arc<Client>,
/// Chrono time reported by testground as the start of the test run.
start_time: DateTime<Utc>,
/// Instant at which the simulation starts running, according to the local time.
local_start_time: Instant,
/// How often metrics are recorded.
metrics_interval: Interval,
/// Generator of messages per slot.
messages_gen: Generator,
}
impl Network {
#[allow(clippy::too_many_arguments)]
fn new(
mut registry: Registry<Box<dyn EncodeMetric>>,
keypair: Keypair,
node_id: usize,
instance_info: InstanceInfo,
participants: HashMap<usize, InstanceInfo>,
client: Client,
validator_set: HashSet<ValId>,
params: Params,
) -> Self {
let gossipsub = {
let gossip_message_id = move |message: &GossipsubMessage| {
MessageId::from(
&Sha256::digest([message.topic.as_str().as_bytes(), &message.data].concat())
[..20],
)
};
let gossipsub_config = GossipsubConfigBuilder::default()
.max_transmit_size(10 * 1_048_576) // gossip_max_size(true)
// .heartbeat_interval(Duration::from_secs(1))
.prune_backoff(Duration::from_secs(60))
.mesh_n(8)
.mesh_n_low(4)
.mesh_n_high(12)
.gossip_lazy(6)
.fanout_ttl(Duration::from_secs(60))
.history_length(12)
.max_messages_per_rpc(Some(500)) // Responses to IWANT can be quite large
.history_gossip(3)
// .validate_messages() // TODO: Reintroduce message validation delays
.validation_mode(ValidationMode::Anonymous)
.duplicate_cache_time(Duration::from_secs(SLOT_DURATION * SLOTS_PER_EPOCH + 1))
.message_id_fn(gossip_message_id)
.allow_self_origin(true)
.build()
.expect("valid gossipsub configuration");
let mut gs = Gossipsub::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous,
gossipsub_config,
Some((&mut registry, Config::default())),
AllowAllSubscriptionFilter {},
IdentityTransform {},
)
.expect("Valid configuration");
// Setup the scoring system.
let peer_score_params = PeerScoreParams::default();
gs.with_peer_score(peer_score_params, PeerScoreThresholds::default())
.expect("Valid score params and thresholds");
gs
};
let swarm = SwarmBuilder::new(
build_transport(&keypair),
gossipsub,
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build();
info!(
"[{}] running with {} validators",
node_id,
validator_set.len()
);
let genesis_slot = 0;
let genesis_duration = Duration::ZERO;
let slot_duration = Duration::from_secs(SLOT_DURATION);
let slots_per_epoch = SLOTS_PER_EPOCH;
let sync_subnet_size = 2;
let target_aggregators = 14;
let messages_gen = Generator::builder()
.slot_clock(genesis_slot, genesis_duration, slot_duration)
.slots_per_epoch(slots_per_epoch)
.sync_subnet_size(sync_subnet_size)
.sync_committee_subnets(SYNC_SUBNETS)
.total_validators(params.total_validators() as u64)
.target_aggregators(target_aggregators)
.attestation_subnets(ATTESTATION_SUBNETS)
.build(validator_set)
.expect("need to adjust these params");
let start_time: DateTime<Utc> =
DateTime::parse_from_rfc3339(&client.run_parameters().test_start_time)
.expect("Correct time date format from testground")
.into();
let local_start_time = Instant::now();
Network {
swarm,
node_id,
instance_info,
participants,
client: Arc::new(client),
metrics_interval: interval(slot_duration / 3),
messages_gen,
start_time,
local_start_time,
registry,
}
}
async fn start_libp2p(&mut self) {
self.swarm
.listen_on(self.instance_info.multiaddr.clone())
.expect("Swarm starts listening");
match self.swarm.next().await.unwrap() {
SwarmEvent::NewListenAddr { address, .. } => {
assert_eq!(address, self.instance_info.multiaddr)
}
e => panic!("Unexpected event {:?}", e),
};
}
// Generates the necessary amount of information to record metrics.
fn record_metrics_info(&self) -> RecordMetricsInfo {
// Encode the metrics to an instance of the OpenMetrics protobuf format.
// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto
let metrics = prometheus_client::encoding::proto::encode(&self.registry);
let elapsed = chrono::Duration::from_std(self.local_start_time.elapsed())
.expect("Durations are small");
let current = self.start_time + elapsed;
RecordMetricsInfo {
client: self.client.clone(),
metrics,
node_id: self.node_id,
instance_info: self.instance_info.clone(),
current,
}
}
pub async fn dial_peers(
&mut self,
outbound_peers: std::collections::BTreeMap<usize, Vec<usize>>,
) {
let mut dialed_peers = 0;
if let Some(outbound_peers) = outbound_peers.get(&self.node_id) {
for peer_node_id in outbound_peers {
let InstanceInfo { peer_id, multiaddr } = self
.participants
.get(peer_node_id)
.unwrap_or_else(|| {
panic!("[{}] All outbound peers are participants of the network {peer_node_id} {:?}", self.node_id,self.participants.keys().collect::<Vec<_>>())
})
.clone();
info!(
"[{}] dialing {} on {}",
self.node_id, peer_node_id, multiaddr
);
if let Err(e) = self.swarm.dial(
libp2p::swarm::dial_opts::DialOpts::peer_id(peer_id)
.addresses(vec![multiaddr])
.build(),
) {
panic!(
"[{}] Dialing -> {} failed {}",
self.node_id, peer_node_id, e
);
}
dialed_peers += 1;
}
}
info!("[{}] dialed {} peers", self.node_id, dialed_peers);
}
async fn run_sim(&mut self, run_duration: Duration) {
let deadline = tokio::time::sleep(run_duration);
futures::pin_mut!(deadline);
loop {
tokio::select! {
_ = deadline.as_mut() => {
// Sim complete
break;
}
Some(m) = self.messages_gen.next() => {
let payload = m.payload();
let (topic, val) = match m {
Message::BeaconBlock { proposer: ValId(v), slot: _ } => {
(Topic::Blocks, v)
},
Message::AggregateAndProofAttestation { aggregator: ValId(v), subnet: Subnet(s), slot: _ } => {
(Topic::Aggregates(s), v)
},
Message::Attestation { attester: ValId(v), subnet: Subnet(s), slot: _ } => {
(Topic::Attestations(s), v)
},
Message::SignedContributionAndProof { validator: ValId(v), subnet: Subnet(s), slot: _ } => {
(Topic::SignedContributionAndProof(s), v)
},
Message::SyncCommitteeMessage { validator: ValId(v), subnet: Subnet(s), slot: _ } => {
(Topic::SyncMessages(s), v)
},
};
if let Err(e) = self.publish(topic.clone(), val, &payload) {
error!("Failed to publish message {e} to topic {topic:?}");
}
}
// Record peer scores
_ = self.metrics_interval.tick() => {
let metrics_info = self.record_metrics_info();
// Spawn into its own task
tokio::spawn(record_metrics(metrics_info));
}
event = self.swarm.select_next_some() => {
match event {
SwarmEvent::Behaviour(GossipsubEvent::Message { propagation_source,
message_id: _,
message,
}
) => {
let src_node = self.participants.iter().find(|(_k,v)| v.peer_id == propagation_source).map(|(k,_v)| k);
if message.topic.as_str() == "\"Blocks\"" {
info!("[{}] Received block from: {:?}, size {}", self.node_id, src_node, message.data.len());
}
}
_ => debug!("SwarmEvent: {:?}", event),
}
}
}
}
}
fn publish(
&mut self,
topic: Topic,
validator: u64,
payload: &[u8],
) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::error::PublishError> {
// simple tuples as messages
let msg =
serde_json::to_vec(&(validator, payload)).expect("json serialization never fails");
if let Topic::Blocks = topic {
info!(
"[{}] Publishing message topic: {}, size: {}",
self.node_id,
IdentTopic::from(topic.clone()),
msg.len()
);
}
let ident_topic: IdentTopic = topic.into();
self.swarm.behaviour_mut().publish(ident_topic, msg)
}
pub fn subscribe_topics(&mut self) -> Result<(), Box<dyn std::error::Error>> {
// blocks, attestations and aggregates, sync messages and aggregates
let blocks_topic: IdentTopic =
GossipTopic::new(serde_json::to_string(&Topic::Blocks).unwrap());
self.swarm.behaviour_mut().subscribe(&blocks_topic)?;
for subnet_n in 0..ATTESTATION_SUBNETS {
let attestation_subnet: IdentTopic = Topic::Attestations(subnet_n).into();
let aggregate_subnet: IdentTopic = Topic::Aggregates(subnet_n).into();
self.swarm.behaviour_mut().subscribe(&attestation_subnet)?;
self.swarm.behaviour_mut().subscribe(&aggregate_subnet)?;
}
for subnet_n in 0..SYNC_SUBNETS {
let sync_subnet: IdentTopic = Topic::SyncMessages(subnet_n).into();
let sync_aggregates: IdentTopic = Topic::SignedContributionAndProof(subnet_n).into();
self.swarm.behaviour_mut().subscribe(&sync_subnet)?;
self.swarm.behaviour_mut().subscribe(&sync_aggregates)?;
}
Ok(())
}
}
async fn record_metrics(info: RecordMetricsInfo) {
let run_id = &info.client.run_parameters().test_run;
// Encode the metrics to an instance of the OpenMetrics protobuf format.
// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto
let metric_set = info.metrics;
let mut queries = vec![];
let current = info.current;
let node_id = info.node_id;
for family in metric_set.metric_families.iter() {
let q = match family.name.as_str() {
// ///////////////////////////////////
// Metrics per known topic
// ///////////////////////////////////
"topic_subscription_status" => queries_for_gauge(
&current,
family,
node_id,
&info.instance_info,
run_id,
"status",
),
"topic_peers_counts" => queries_for_gauge(
&current,
family,
node_id,
&info.instance_info,
run_id,
"count",
),
"invalid_messages_per_topic"
| "accepted_messages_per_topic"
| "ignored_messages_per_topic"
| "rejected_messages_per_topic" => {
queries_for_counter(&current, family, node_id, &info.instance_info, run_id)
}
// ///////////////////////////////////
// Metrics regarding mesh state
// ///////////////////////////////////
"mesh_peer_counts" => queries_for_gauge(
&current,
family,
info.node_id,
&info.instance_info,
run_id,
"count",
),
"mesh_peer_inclusion_events" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
"mesh_peer_churn_events" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
// ///////////////////////////////////
// Metrics regarding messages sent/received
// ///////////////////////////////////
"topic_msg_sent_counts"
| "topic_msg_published"
| "topic_msg_sent_bytes"
| "topic_msg_recv_counts_unfiltered"
| "topic_msg_recv_counts"
| "topic_msg_recv_bytes" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
// ///////////////////////////////////
// Metrics related to scoring
// ///////////////////////////////////
"score_per_mesh" => {
queries_for_histogram(&current, family, info.node_id, &info.instance_info, run_id)
}
"scoring_penalties" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
// ///////////////////////////////////
// General Metrics
// ///////////////////////////////////
"peers_per_protocol" => queries_for_gauge(
&current,
family,
info.node_id,
&info.instance_info,
run_id,
"peers",
),
"heartbeat_duration" => {
queries_for_histogram(&current, family, info.node_id, &info.instance_info, run_id)
}
// ///////////////////////////////////
// Performance metrics
// ///////////////////////////////////
"topic_iwant_msgs" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
"memcache_misses" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
}
_ => unreachable!(),
};
queries.extend(q);
}
// We can't do joins in InfluxDB easily, so do some custom queries here to calculate
// duplicates.
let recvd_unfiltered = metric_set
.metric_families
.iter()
.find(|family| family.name.as_str() == "topic_msg_recv_counts_unfiltered");
if let Some(recvd_unfiltered) = recvd_unfiltered {
let recvd = metric_set
.metric_families
.iter()
.find(|family| family.name.as_str() == "topic_msg_recv_counts");
if let Some(recvd) = recvd {
let q = queries_for_counter_join(
&current,
recvd_unfiltered,
recvd,
"topic_msg_recv_duplicates",
info.node_id,
&info.instance_info,
run_id,
|a, b| a.saturating_sub(b),
);
queries.extend(q);
}
}
for query in queries {
if let Err(e) = info.client.record_metric(query).await {
error!("Failed to record metrics: {:?}", e);
}
}
}

eth_consensus/simulation/src/utils.rs (new file)

@@ -0,0 +1,261 @@
use crate::InstanceInfo;
use chrono::{DateTime, Local, Utc};
use libp2p::futures::StreamExt;
use libp2p::PeerId;
use prometheus_client::encoding::proto::openmetrics_data_model::counter_value;
use prometheus_client::encoding::proto::openmetrics_data_model::gauge_value;
use prometheus_client::encoding::proto::openmetrics_data_model::metric_point;
use prometheus_client::encoding::proto::openmetrics_data_model::Metric;
use prometheus_client::encoding::proto::openmetrics_data_model::MetricFamily;
use prometheus_client::encoding::proto::HistogramValue;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
use testground::client::Client;
use testground::WriteQuery;
// States for `barrier()`
pub(crate) const BARRIER_LIBP2P_READY: &str = "Started libp2p";
pub(crate) const BARRIER_TOPOLOGY_READY: &str = "Topology generated";
// Tags for InfluxDB
const TAG_INSTANCE_PEER_ID: &str = "instance_peer_id";
const TAG_INSTANCE_NAME: &str = "instance_name";
const TAG_RUN_ID: &str = "run_id";
/// Publish info and collect it from the participants. The return value includes the info
/// published by this instance itself.
pub(crate) async fn publish_and_collect<T: Serialize + DeserializeOwned>(
topic: &'static str,
client: &Client,
info: T,
) -> Result<Vec<T>, Box<dyn std::error::Error>> {
let instance_count = client.run_parameters().test_instance_count as usize;
let serialized = Cow::Owned(serde_json::to_value(&info)?);
client.publish(topic, serialized).await?;
let mut stream = client.subscribe(topic, instance_count * 2).await;
let mut vec: Vec<T> = Vec::with_capacity(instance_count);
for _ in 0..instance_count {
match stream.next().await {
Some(Ok(other)) => {
let info: T = serde_json::from_value(other)?;
vec.push(info);
}
Some(Err(e)) => return Err(Box::new(e)),
None => unreachable!(),
}
}
Ok(vec)
}
/// Create InfluxDB queries for Counter metrics.
pub(crate) fn queries_for_counter(
datetime: &DateTime<Utc>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,
run_id: &str,
) -> Vec<WriteQuery> {
let mut queries = vec![];
for metric in family.metrics.iter() {
let mut query = WriteQuery::new((*datetime).into(), family.name.clone())
.add_tag(TAG_INSTANCE_PEER_ID, instance_info.peer_id.to_string())
.add_tag(TAG_INSTANCE_NAME, node_id.to_string())
.add_tag(TAG_RUN_ID, run_id.to_owned())
.add_field(
"count",
get_counter_value(metric).0.expect("should have int value"),
);
for l in &metric.labels {
query = query.add_tag(l.name.clone(), l.value.clone());
}
queries.push(query);
}
queries
}
/// Create InfluxDB queries joining counter metrics
#[allow(clippy::too_many_arguments)]
pub(crate) fn queries_for_counter_join(
datetime: &DateTime<Utc>,
family1: &MetricFamily,
family2: &MetricFamily,
name: &str,
node_id: usize,
instance_info: &InstanceInfo,
run_id: &str,
predicate: fn(u64, u64) -> u64,
) -> Vec<WriteQuery> {
let mut queries = vec![];
for metric in family1.metrics.iter() {
// Match on metric values
let value = {
let current_val = get_counter_value(metric).0.expect("should have int value");
let other_val = family2
.metrics
.iter()
.find(|m2| {
// match on all labels
let mut found = true;
for label in &metric.labels {
if !m2
.labels
.iter()
.any(|l| l.name == label.name && l.value == label.value)
{
found = false;
break;
}
}
found
})
.and_then(|m| get_counter_value(m).0);
other_val.map(|other| predicate(current_val, other))
};
if let Some(val) = value {
let mut query = WriteQuery::new((*datetime).into(), name)
.add_tag(TAG_INSTANCE_PEER_ID, instance_info.peer_id.to_string())
.add_tag(TAG_INSTANCE_NAME, node_id.to_string())
.add_tag(TAG_RUN_ID, run_id.to_owned())
.add_field("count", val);
for l in &metric.labels {
query = query.add_tag(l.name.clone(), l.value.clone());
}
queries.push(query);
}
}
queries
}
/// Create InfluxDB queries for Gauge metrics.
pub(crate) fn queries_for_gauge(
datetime: &DateTime<Utc>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,
run_id: &str,
field_name: &str,
) -> Vec<WriteQuery> {
let mut queries = vec![];
for metric in family.metrics.iter() {
let mut query = WriteQuery::new((*datetime).into(), family.name.clone())
.add_tag(TAG_INSTANCE_PEER_ID, instance_info.peer_id.to_string())
.add_tag(TAG_INSTANCE_NAME, node_id.to_string())
.add_tag(TAG_RUN_ID, run_id.to_owned())
.add_field(
field_name,
get_gauge_value(metric).0.expect("should have int value"),
);
for l in &metric.labels {
query = query.add_tag(l.name.clone(), l.value.clone());
}
queries.push(query);
}
queries
}
/// Create InfluxDB queries for Histogram metrics.
pub(crate) fn queries_for_histogram(
datetime: &DateTime<Utc>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,
run_id: &str,
) -> Vec<WriteQuery> {
let mut queries = vec![];
for metric in family.metrics.iter() {
let histogram = get_histogram_value(metric);
for bucket in histogram.buckets.iter() {
let mut query = WriteQuery::new((*datetime).into(), family.name.clone())
.add_tag(TAG_INSTANCE_PEER_ID, instance_info.peer_id.to_string())
.add_tag(TAG_INSTANCE_NAME, node_id.to_string())
.add_tag(TAG_RUN_ID, run_id.to_owned())
.add_field("count", bucket.count)
.add_field("upper_bound", bucket.upper_bound);
for l in &metric.labels {
query = query.add_tag(l.name.clone(), l.value.clone());
}
queries.push(query);
}
}
queries
}
fn get_gauge_value(metric: &Metric) -> (Option<i64>, Option<f64>) {
assert_eq!(1, metric.metric_points.len());
let metric_point = metric.metric_points.first().unwrap();
let metric_point_value = metric_point.value.as_ref().unwrap().clone();
match metric_point_value {
metric_point::Value::GaugeValue(gauge_value) => match gauge_value.value {
Some(gauge_value::Value::IntValue(i)) => (Some(i), None),
Some(gauge_value::Value::DoubleValue(f)) => (None, Some(f)),
_ => unreachable!(),
},
_ => unreachable!(),
}
}
pub(crate) fn get_counter_value(metric: &Metric) -> (Option<u64>, Option<f64>) {
assert_eq!(1, metric.metric_points.len());
let metric_point = metric.metric_points.first().unwrap();
let metric_point_value = metric_point.value.as_ref().unwrap().clone();
match metric_point_value {
metric_point::Value::CounterValue(counter_value) => match counter_value.total {
Some(counter_value::Total::IntValue(i)) => (Some(i), None),
Some(counter_value::Total::DoubleValue(f)) => (None, Some(f)),
_ => unreachable!(),
},
_ => unreachable!(),
}
}
fn get_histogram_value(metric: &Metric) -> HistogramValue {
assert_eq!(1, metric.metric_points.len());
let metric_point = metric.metric_points.first().unwrap();
let metric_point_value = metric_point.value.as_ref().unwrap().clone();
match metric_point_value {
metric_point::Value::HistogramValue(histogram_value) => histogram_value,
_ => unreachable!(),
}
}
/// Record an InstanceInfo to InfluxDB. This is useful on Grafana dashboard.
pub(crate) async fn record_instance_info(
client: &Client,
node_id: usize,
peer_id: &PeerId,
run_id: &str,
) -> Result<(), testground::errors::Error> {
let query = WriteQuery::new(Local::now().into(), "participants")
.add_tag(TAG_RUN_ID, run_id.to_owned())
// Add below as "field" not tag, because in InfluxQL, SELECT clause can't specify only tag.
// https://docs.influxdata.com/influxdb/v1.8/query_language/explore-data/#select-clause
// > The SELECT clause must specify at least one field when it includes a tag.
.add_field(TAG_INSTANCE_NAME, node_id.to_string())
.add_field(TAG_INSTANCE_PEER_ID, peer_id.to_string());
client.record_metric(query).await
}

eth_consensus/utils/gen_topology/Cargo.toml (new file)

@@ -0,0 +1,10 @@
[package]
name = "gen_topology"
version = "0.1.0"
edition = "2021"
[dependencies]
rand = "0.8.5"
rand_chacha = "0.3.1"
serde = { workspace = true, features = [ "derive" ]}

eth_consensus/utils/gen_topology/src/lib.rs (new file)

@@ -0,0 +1,280 @@
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::RangeInclusive;
use rand::seq::SliceRandom;
use rand::Rng;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;
use serde::{Deserialize, Serialize};
pub type NodeId = usize;
pub type ValId = usize;
#[derive(Serialize, Deserialize, Debug)]
pub struct Params {
seed: u64,
total_validators: usize,
total_nodes_with_vals: usize,
total_nodes: usize,
connections_range: RangeInclusive<usize>,
}
impl Params {
pub fn new(
seed: u64,
total_validators: usize,
total_nodes_with_vals: usize,
total_nodes_without_vals: usize,
min_peers_per_node: usize,
max_peers_per_node_inclusive: usize,
) -> Result<Self, &'static str> {
let total_nodes = total_nodes_with_vals + total_nodes_without_vals;
if total_nodes_with_vals > total_nodes {
return Err("bad number of nodes with validators");
}
if total_nodes == 0 {
return Err("Empty network");
}
if total_validators == 0 {
return Err("no validators in the network");
}
let connections_range = min_peers_per_node..=max_peers_per_node_inclusive;
if connections_range.is_empty() {
return Err("bad connection range");
}
Ok(Self {
seed,
total_validators,
total_nodes_with_vals,
total_nodes,
connections_range,
})
}
pub fn seed(&self) -> u64 {
self.seed
}
pub fn total_validators(&self) -> usize {
self.total_validators
}
pub fn total_nodes_with_vals(&self) -> usize {
self.total_nodes_with_vals
}
pub fn total_nodes(&self) -> usize {
self.total_nodes
}
pub fn connections_range(&self) -> std::ops::RangeInclusive<usize> {
self.connections_range.clone()
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Network {
/// Params used to generate this network configuration. Stored to allow reproduction.
params: Params,
/// Validators managed by each node.
validator_assignments: BTreeMap<NodeId, BTreeSet<ValId>>,
/// Peers to connect to for each node.
outbound_peers: BTreeMap<NodeId, Vec<NodeId>>,
}
impl Network {
fn new(
params: Params,
validator_assignments: BTreeMap<NodeId, BTreeSet<ValId>>,
outbound_peers: BTreeMap<NodeId, Vec<NodeId>>,
) -> Result<Self, String> {
let total_vals: usize = validator_assignments
.values()
.map(|validator_list| validator_list.len())
.sum();
//
// validator assignment checks
//
// first check that under possibility of repetition the number of validators is right
if total_vals != params.total_validators {
return Err(format!("the number of assigned validators does not match the expected number. Found {}, expected {}", total_vals, params.total_validators));
}
let assigned_vals: HashSet<&ValId> = validator_assignments
.values()
.flat_map(|validator_list| validator_list.iter())
.collect();
if assigned_vals.len() != total_vals {
return Err("a validator id was assigned more than once".to_string());
}
if assigned_vals.iter().max().expect("total_validators > 0")
!= &&(params.total_validators - 1)
{
return Err("validator ids do not cover the expected range (wrong max)".to_string());
}
if assigned_vals.iter().min().expect("total_validators > 0") != &&0 {
return Err("validator ids do not cover the expected range (wrong min)".to_string());
}
//
// topology checks
//
let connected_peers: HashSet<NodeId> = outbound_peers
.iter()
.flat_map(|(peer_a, dialed_peers)| dialed_peers.iter().chain(Some(peer_a)))
.cloned()
.collect();
let expected_peers: HashSet<usize> = (0..params.total_nodes).collect();
if connected_peers != expected_peers {
return Err(format!(
"set of dialed peers and expected peers differ: missing {:?}",
expected_peers.difference(&connected_peers)
));
}
println!("Connectedness should be checked with an external tool!");
Ok(Self {
params,
validator_assignments,
outbound_peers,
})
}
pub fn generate(params: Params) -> Result<Self, String> {
// Use a deterministic pseudo random number generator
let mut gen = ChaCha8Rng::seed_from_u64(params.seed);
let validator_assignments = {
// Assign validators to each node that has any validator at all
let mut all_validators = (0..params.total_validators).collect::<Vec<_>>();
let mut cuts: Vec<_> = all_validators
.choose_multiple(&mut gen, params.total_nodes_with_vals - 1)
.cloned()
.collect();
cuts.push(params.total_validators);
cuts.push(0);
all_validators.shuffle(&mut gen);
cuts.sort();
let validator_assignments: BTreeMap<NodeId, BTreeSet<ValId>> = cuts
.windows(2)
.enumerate()
.map(|(node_id, current_cut)| {
// NOTE: this means the nodes that have validators are the first
// `params.total_nodes_with_vals`. Since the node_id is an abstract construct not
// used anywhere I don't think it's worth randomizing this part
let start = current_cut[0];
let end = current_cut[1];
let assigned_vals: BTreeSet<ValId> =
all_validators[start..end].iter().cloned().collect();
(node_id, assigned_vals)
})
.collect();
validator_assignments
};
let outbound_peers = {
let mut outbound_peers: BTreeMap<NodeId, Vec<NodeId>> = BTreeMap::default();
// First build the set of all possible connections (a,b) ignoring connection direction
let mut all_connections: Vec<(usize, usize)> = (0..params.total_nodes)
.flat_map(|i| (i + 1..params.total_nodes).map(move |j| (i, j)))
.collect();
// Keep track of all connections (inbound and outbound) per peer.
// BTreeSet is useful for debugging with consistent order.
type IsOutbound = bool;
let mut topology = BTreeMap::<NodeId, (usize, BTreeMap<NodeId, IsOutbound>)>::default();
let connections_range = params.connections_range();
// for each node_id, generate a random expected number of connections within the given
// range
for p in 0..params.total_nodes {
// decide how many connections should the node have.
let num_peers = gen.gen_range(connections_range.clone());
// store the expected number of connections and a default map to keep track of the
// added connections.
topology.insert(p, (num_peers, BTreeMap::default()));
}
// Pick a random connection and add it if it's useful.
all_connections.shuffle(&mut gen);
while let Some((a, b)) = all_connections.pop() {
let (expected, current) = topology.get(&a).unwrap();
if current.len() >= *expected {
continue;
}
let (expected, current) = topology.get(&b).unwrap();
if current.len() >= *expected {
continue;
}
let from_a_to_b = gen.gen_ratio(3, 5);
topology.get_mut(&a).unwrap().1.insert(b, from_a_to_b);
topology.get_mut(&b).unwrap().1.insert(a, !from_a_to_b);
if from_a_to_b {
outbound_peers.entry(a).or_default().push(b);
} else {
outbound_peers.entry(b).or_default().push(a);
}
}
if topology
.values()
.any(|(_expected_connections, connections)| {
!connections_range.contains(&connections.len())
})
{
// The _expected_connections number might not be reached for a few nodes. We really
// care about the number of connections being within the set range. I haven't seen
// this happen so far.
eprintln!("Some nodes didn't reach the expected number of connections");
}
outbound_peers
};
Network::new(params, validator_assignments, outbound_peers)
.map_err(|e| format!("Network generation failed: {e}"))
}
pub fn outbound_peers(&self) -> &BTreeMap<NodeId, Vec<NodeId>> {
&self.outbound_peers
}
pub fn validator_assignments(&self) -> &BTreeMap<NodeId, BTreeSet<ValId>> {
&self.validator_assignments
}
pub fn params(&self) -> &Params {
&self.params
}
#[allow(clippy::type_complexity)]
pub fn destructure(
self,
) -> (
Params,
BTreeMap<NodeId, Vec<NodeId>>,
BTreeMap<NodeId, BTreeSet<ValId>>,
) {
let Network {
params,
validator_assignments,
outbound_peers,
} = self;
(params, outbound_peers, validator_assignments)
}
}

eth_consensus/utils/gen_topology_files/Cargo.toml (new file)

@@ -0,0 +1,11 @@
[package]
name = "gen_topology_files"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
gen_topology = { path = "../gen_topology" }
clap = { version = "4.0.10", features = ["derive"] }
serde_json.workspace = true

File diff suppressed because it is too large

File diff suppressed because one or more lines are too long

eth_consensus/utils/gen_topology_files/src/main.rs (new file)

@@ -0,0 +1,65 @@
use clap::Parser;
extern crate gen_topology;
use std::io::Write;
#[derive(Parser)]
pub struct RunParams {
seed: u64,
total_validators: usize,
total_nodes_with_vals: usize,
total_nodes: usize,
min_peers_per_node: usize,
max_peers_per_node_inc: usize,
dotfile_name: String,
config_file_name: String,
}
fn main() {
if let Err(e) = gen_and_save() {
eprintln!("{}", e);
std::process::exit(1)
}
}
fn gen_and_save() -> Result<(), String> {
let RunParams {
seed,
total_validators,
total_nodes_with_vals,
total_nodes,
min_peers_per_node,
max_peers_per_node_inc,
dotfile_name,
config_file_name,
} = RunParams::parse();
let params = gen_topology::Params::new(
seed,
total_validators,
total_nodes_with_vals,
total_nodes,
min_peers_per_node,
max_peers_per_node_inc,
)?;
let network = gen_topology::Network::generate(params)?;
// gen the dotfile
let mut file =
std::fs::File::create(dotfile_name).map_err(|e| format!("Failed to create dotfile {e}"))?;
writeln!(file, "digraph {{").map_err(|e| format!("Failed writing dotfile {e}"))?;
for (peer_a, dialed_peers) in network.outbound_peers() {
for peer_b in dialed_peers {
writeln!(file, "\t{peer_a} -> {peer_b};")
.map_err(|e| format!("Failed writing dotfile {e}"))?;
}
}
writeln!(file, "}}").map_err(|e| format!("Failed writing dotfile {e}"))?;
// gen the config file
let mut file = std::fs::File::create(config_file_name)
.map_err(|e| format!("Failed creating network file {e}"))?;
let network_rep = serde_json::to_string(&network)
.map_err(|e| format!("Failed to create network file {e}"))?;
write!(file, "{}", network_rep).map_err(|e| format!("Failed to write network file {e}"))?;
Ok(())
}

grafana/dashboards.yml

@@ -13,4 +13,9 @@ providers:
type: file
options:
path: /var/lib/grafana/dashboards/censoring/PeerScores.json
- name: 'Basic Gossipsub'
orgId: 1
folder: 'Eth Consensus'
type: file
options:
path: /var/lib/grafana/dashboards/eth_consensus/BasicGossipsub.json

scripts/clear_db.sh (new executable file)

@@ -0,0 +1,6 @@
#! /bin/sh
# This script is used to drop and re-create the InfluxDB docker container,
# removing all past testground runs.
docker rm -f testground-influxdb
testground healthcheck --runner local:docker --fix

smoke/Dockerfile

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
-FROM rust:1.62-bullseye as builder
+FROM rust:1.64-bullseye as builder
WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.
@@ -16,6 +16,7 @@ RUN apt-get update && apt-get install -y cmake && apt-get install -y protobuf-co
RUN mkdir -p ./plan/src/
# This is a placeholder main function to build only the dependencies.
RUN echo "fn main() { println!(\"If you see this message, you may want to clean up the target directory or the Docker build cache.\") }" > ./plan/src/main.rs
COPY ./plan/Cargo.toml ./plan/
RUN cd ./plan/ && cargo build --release
@@ -37,4 +38,4 @@ COPY --from=builder /usr/src/test-plan/plan/target/release/smoke /usr/local/bin/
# Configure Logging
# ENV RUST_LOG=libp2p_gossipsub=debug
-ENTRYPOINT ["smoke"]
+ENTRYPOINT ["smoke"]

smoke/manifest.toml

@@ -14,5 +14,5 @@ enabled = true
name = "smoke"
instances = { min = 2, max = 100, default = 2 }
-[testcases.params]
-gossipsub_history_length = { type = "int", desc = "Number of heartbeats to keep in the `memcache`", default = 5 }
+[testcases.params]
+gossipsub_history_length = { type = "int", desc = "Number of heartbeats to keep in the `memcache`", default = 5 }