Switch target crate to sigp's gossipsub implementation (#17)

* Bump the Rust version

* Update smoke plan

* Update scoring plan

* Update censoring plan

* Fix wrong file name

* Update eth-consensus plan

* Fix clippy warnings

* cargo fmt
This commit is contained in:
Akihito Nakano
2024-04-09 14:37:18 +09:00
committed by GitHub
parent 71874683e9
commit 1193aebffe
23 changed files with 5710 additions and 5356 deletions

2420
censoring/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,12 +6,12 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
# unstable branch
gossipsub = { git = "https://github.com/sigp/lighthouse.git", rev = "f8fdb71f50851bf7792e68a4ee31125bd645e19a" }
chrono = { version = "0.4.19", default-features = false, features = ["clock"] } chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
delay_map = "0.1.1" delay_map = "0.1.1"
libp2p = { version = "0.53.2", default-features = false, features = ["dns", "tcp", "tokio", "noise", "yamux", "serde"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
# This is a fork of `libp2p` in order to implement malicious behaviour in the `attacker` module. # This is a fork of `libp2p` in order to implement malicious behaviour in the `attacker` module.
# This `libp2p-testground` is used in `attacker` module instead of `libp2p`. # This `libp2p-testground` is used in `attacker` module instead of `libp2p`.
@@ -26,9 +26,5 @@ testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032
tokio = { version = "1.20.0", features = ["macros"] } tokio = { version = "1.20.0", features = ["macros"] }
tracing = "0.1.35" tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] } tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
prometheus-client = { version = "0.22.2", features = ["protobuf"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
prost = "0.11" prost = "0.11"

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder. # This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder. # See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.67-bullseye as builder FROM rust:1.77-bullseye as builder
WORKDIR /usr/src/test-plan WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake. # * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.

View File

@@ -59,9 +59,10 @@ pub(crate) async fn run(
// Start libp2p // Start libp2p
// //////////////////////////////////////////////////////////////////////// // ////////////////////////////////////////////////////////////////////////
let mut swarm = build_swarm(keypair); let mut swarm = build_swarm(keypair);
swarm.listen_on(instance_info.multiaddr.clone())?; swarm.listen_on(recreate_multiaddr(&instance_info.multiaddr))?;
match swarm.next().await.unwrap() { match swarm.next().await.unwrap() {
SwarmEvent::NewListenAddr { address, .. } if address == instance_info.multiaddr => {} SwarmEvent::NewListenAddr { address, .. }
if address == recreate_multiaddr(&instance_info.multiaddr) => {}
e => panic!("Unexpected event {:?}", e), e => panic!("Unexpected event {:?}", e),
} }
@@ -79,7 +80,7 @@ pub(crate) async fn run(
}; };
client.record_message(format!("Victim: {:?}", victim)); client.record_message(format!("Victim: {:?}", victim));
swarm.dial(victim.multiaddr)?; swarm.dial(recreate_multiaddr(&victim.multiaddr))?;
barrier_and_drive_swarm(&client, &mut swarm, BARRIER_WARMUP).await?; barrier_and_drive_swarm(&client, &mut swarm, BARRIER_WARMUP).await?;
@@ -119,6 +120,12 @@ fn build_transport(
.boxed() .boxed()
} }
// We are recreating from `libp2p::Multiaddr` to `libp2p_testground::Multiaddr` because we use two
// different versions of libp2p.
fn recreate_multiaddr(addr: &libp2p::Multiaddr) -> libp2p_testground::Multiaddr {
libp2p_testground::Multiaddr::try_from(addr.to_vec()).unwrap()
}
type GossipsubNetworkBehaviourAction = type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, GossipsubHandlerIn>; NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, GossipsubHandlerIn>;

View File

@@ -4,26 +4,16 @@ use crate::utils::{
}; };
use crate::{InstanceInfo, Role}; use crate::{InstanceInfo, Role};
use chrono::Local; use chrono::Local;
use libp2p::core::muxing::StreamMuxerBox; use gossipsub::{
use libp2p::core::upgrade::{SelectUpgrade, Version}; AllowAllSubscriptionFilter, Behaviour, ConfigBuilder, IdentTopic, IdentityTransform,
use libp2p::dns::TokioDnsConfig; MessageAuthenticity, MetricsConfig, PeerScoreParams, PeerScoreThresholds, Topic,
use libp2p::futures::StreamExt; TopicScoreParams,
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Behaviour, ConfigBuilder, IdentTopic, IdentityTransform, MessageAuthenticity, PeerScoreParams,
PeerScoreThresholds, Topic, TopicScoreParams,
}; };
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::futures::StreamExt;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig; use libp2p::swarm::{DialError, SwarmEvent};
use libp2p::noise::NoiseConfig; use libp2p::{noise, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, Transport};
use libp2p::swarm::{DialError, SwarmBuilder, SwarmEvent};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::PeerId;
use libp2p::Transport;
use libp2p::{Multiaddr, Swarm};
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::SeedableRng; use rand::SeedableRng;
@@ -289,20 +279,17 @@ pub(crate) async fn run(
/// Set up an encrypted TCP transport over the Mplex and Yamux protocols. /// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> { fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true))) let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
.expect("DNS config"); let transport = libp2p::dns::tokio::Transport::system(tcp)
.expect("DNS")
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new() .boxed();
.into_authentic(keypair)
.expect("Signing libp2p-noise static DH keypair failed.");
transport transport
.upgrade(Version::V1) .upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .authenticate(
.multiplex(SelectUpgrade::new( noise::Config::new(keypair).expect("signing can fail only once during starting a node"),
YamuxConfig::default(), )
MplexConfig::default(), .multiplex(yamux::Config::default())
))
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
.boxed() .boxed()
} }
@@ -354,7 +341,7 @@ impl HonestNetwork {
let mut gs = Behaviour::new_with_subscription_filter_and_transform( let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Signed(keypair.clone()), MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config, gossipsub_config,
Some((registry, Config::default())), Some((registry, MetricsConfig::default())),
AllowAllSubscriptionFilter {}, AllowAllSubscriptionFilter {},
IdentityTransform {}, IdentityTransform {},
) )
@@ -371,12 +358,14 @@ impl HonestNetwork {
gs gs
}; };
let swarm = SwarmBuilder::with_tokio_executor( let transport = build_transport(&keypair);
build_transport(&keypair), let swarm = SwarmBuilder::with_existing_identity(keypair)
gossipsub, .with_tokio()
PeerId::from(keypair.public()), .with_other_transport(|_| transport)
) .expect("infallible")
.build(); .with_behaviour(|_| gossipsub)
.expect("infallible")
.build();
let mut peer_to_instance_name = HashMap::new(); let mut peer_to_instance_name = HashMap::new();
for info in participants { for info in participants {

2628
eth_consensus/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,5 @@
# Get chef # Get chef
FROM rust:1.67-bullseye as chef FROM rust:1.77-bullseye as chef
WORKDIR test-plan WORKDIR test-plan
RUN cargo install cargo-chef RUN cargo install cargo-chef

View File

@@ -6,15 +6,13 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
# unstable branch
gossipsub = { git = "https://github.com/sigp/lighthouse.git", rev = "f8fdb71f50851bf7792e68a4ee31125bd645e19a" }
libp2p = { version = "0.53.2", default-features = false, features = ["dns", "tcp", "tokio", "noise", "yamux", "serde"] }
chrono = { version = "0.4.19", features = [ "std" ]} chrono = { version = "0.4.19", features = [ "std" ]}
delay_map = "0.1.1" delay_map = "0.1.1"
prometheus-client = { version = "0.22.2", features = ["protobuf"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
serde_json = "1.0" serde_json = "1.0"
serde = "1.0" serde = "1.0"
testground.workspace = true testground.workspace = true

View File

@@ -1,12 +1,9 @@
use crate::InstanceInfo; use crate::InstanceInfo;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use libp2p::gossipsub::{ use gossipsub::{Behaviour, Event, IdentTopic, MessageId, Topic as GossipTopic};
Behaviour, Event, HandlerError, IdentTopic, MessageId, Topic as GossipTopic,
};
use libp2p::swarm::SwarmEvent; use libp2p::swarm::SwarmEvent;
use libp2p::PeerId; use libp2p::{PeerId, Swarm};
use libp2p::Swarm;
use npg::Generator; use npg::Generator;
use prometheus_client::registry::Registry; use prometheus_client::registry::Registry;
use rand::Rng; use rand::Rng;
@@ -118,7 +115,7 @@ impl Network {
topic: Topic, topic: Topic,
validator: u64, validator: u64,
mut payload: Vec<u8>, mut payload: Vec<u8>,
) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::PublishError> { ) -> Result<MessageId, gossipsub::PublishError> {
// Plain binary as messages, coupled with the validator // Plain binary as messages, coupled with the validator
payload.append(&mut validator.to_be_bytes().to_vec()); payload.append(&mut validator.to_be_bytes().to_vec());
@@ -135,7 +132,7 @@ impl Network {
} }
// An inbound event or swarm event gets sent here // An inbound event or swarm event gets sent here
fn handle_swarm_event(&mut self, event: SwarmEvent<Event, HandlerError>) { fn handle_swarm_event(&mut self, event: SwarmEvent<Event>) {
match event { match event {
SwarmEvent::Behaviour(Event::Message { SwarmEvent::Behaviour(Event::Message {
propagation_source, propagation_source,

View File

@@ -4,7 +4,7 @@ use crate::utils::{
}; };
use crate::InstanceInfo; use crate::InstanceInfo;
use chrono::{DateTime, FixedOffset}; use chrono::{DateTime, FixedOffset};
use libp2p::gossipsub::IdentTopic; use gossipsub::IdentTopic;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricSet; use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricSet;
use std::sync::Arc; use std::sync::Arc;
use testground::client::Client; use testground::client::Client;
@@ -133,6 +133,9 @@ pub(crate) async fn record_metrics(info: RecordMetricsInfo) {
| "rejected_messages_per_topic" => { | "rejected_messages_per_topic" => {
queries_for_counter(&current, family, node_id, &info.instance_info, run_id) queries_for_counter(&current, family, node_id, &info.instance_info, run_id)
} }
"publish_messages_dropped_per_topic" | "forward_messages_dropped_per_topic" => {
continue;
}
// /////////////////////////////////// // ///////////////////////////////////
// Metrics regarding mesh state // Metrics regarding mesh state
// /////////////////////////////////// // ///////////////////////////////////
@@ -193,6 +196,12 @@ pub(crate) async fn record_metrics(info: RecordMetricsInfo) {
"memcache_misses" => { "memcache_misses" => {
queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id) queries_for_counter(&current, family, info.node_id, &info.instance_info, run_id)
} }
// ///////////////////////////////////
// The size of the priority queue.
// ///////////////////////////////////
"priority_queue_size" | "non_priority_queue_size" => {
continue;
}
_ => unreachable!(), _ => unreachable!(),
}; };

View File

@@ -2,25 +2,17 @@ use crate::utils::{record_instance_info, BARRIER_LIBP2P_READY, BARRIER_TOPOLOGY_
use crate::InstanceInfo; use crate::InstanceInfo;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use gen_topology::Params; use gen_topology::Params;
use libp2p::core::muxing::StreamMuxerBox; use gossipsub::{
use libp2p::core::upgrade::{SelectUpgrade, Version}; AllowAllSubscriptionFilter, Behaviour, ConfigBuilder, IdentityTransform,
use libp2p::dns::TokioDnsConfig; Message as GossipsubMessage, MessageAuthenticity, MessageId, MetricsConfig, PeerScoreParams,
use libp2p::futures::StreamExt; PeerScoreThresholds, ValidationMode,
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Behaviour, ConfigBuilder, IdentityTransform, Message as GossipsubMessage, MessageAuthenticity,
MessageId, PeerScoreParams, PeerScoreThresholds, ValidationMode,
}; };
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::futures::StreamExt;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig; use libp2p::swarm::SwarmEvent;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::PeerId;
use libp2p::Transport; use libp2p::Transport;
use libp2p::{noise, yamux, PeerId, SwarmBuilder};
use npg::slot_generator::{Subnet, ValId}; use npg::slot_generator::{Subnet, ValId};
use npg::Generator; use npg::Generator;
use npg::Message; use npg::Message;
@@ -121,7 +113,7 @@ pub fn setup_gossipsub(registry: &mut Registry) -> Behaviour {
let mut gs = Behaviour::new_with_subscription_filter_and_transform( let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous, MessageAuthenticity::Anonymous,
gossipsub_config, gossipsub_config,
Some((registry, Config::default())), Some((registry, MetricsConfig::default())),
AllowAllSubscriptionFilter {}, AllowAllSubscriptionFilter {},
IdentityTransform {}, IdentityTransform {},
) )
@@ -217,20 +209,17 @@ pub(crate) async fn run(
pub fn build_transport( pub fn build_transport(
keypair: &Keypair, keypair: &Keypair,
) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> { ) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true))) let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
.expect("DNS config"); let transport = libp2p::dns::tokio::Transport::system(tcp)
.expect("DNS")
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new() .boxed();
.into_authentic(keypair)
.expect("Signing libp2p-noise static DH keypair failed.");
transport transport
.upgrade(Version::V1) .upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .authenticate(
.multiplex(SelectUpgrade::new( noise::Config::new(keypair).expect("signing can fail only once during starting a node"),
YamuxConfig::default(), )
MplexConfig::default(), .multiplex(yamux::Config::default())
))
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
.boxed() .boxed()
} }
@@ -250,12 +239,14 @@ impl Network {
) -> Self { ) -> Self {
let gossipsub = setup_gossipsub(&mut registry); let gossipsub = setup_gossipsub(&mut registry);
let swarm = SwarmBuilder::with_tokio_executor( let transport = build_transport(&keypair);
build_transport(&keypair), let swarm = SwarmBuilder::with_existing_identity(keypair)
gossipsub, .with_tokio()
PeerId::from(keypair.public()), .with_other_transport(|_| transport)
) .expect("infallible")
.build(); .with_behaviour(|_| gossipsub)
.expect("infallible")
.build();
info!( info!(
"[{}] running with {} validators", "[{}] running with {} validators",
@@ -373,7 +364,7 @@ impl Network {
event = self.swarm.select_next_some() => self.handle_swarm_event(event), event = self.swarm.select_next_some() => self.handle_swarm_event(event),
Some(x) = self.messages_to_validate.next(), if !self.messages_to_validate.is_empty() => { // Message needs validation Some(x) = self.messages_to_validate.next(), if !self.messages_to_validate.is_empty() => { // Message needs validation
let (message_id, peer_id) = x.into_inner(); let (message_id, peer_id) = x.into_inner();
if let Err(e) = self.swarm.behaviour_mut().report_message_validation_result(&message_id, &peer_id, libp2p::gossipsub::MessageAcceptance::Accept) { if let Err(e) = self.swarm.behaviour_mut().report_message_validation_result(&message_id, &peer_id, gossipsub::MessageAcceptance::Accept) {
warn!("Could not publish message: {} {}", message_id, e); warn!("Could not publish message: {} {}", message_id, e);
} }
} }

2454
scoring/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -11,16 +11,12 @@ delay_map = "0.1.1"
futures = "0.3.24" futures = "0.3.24"
gen_topology = { git = "https://github.com/sigp/gossipsub-testground", rev = "4084d26b9210bc932f4aaa45f9506ecfe5f2bb04" } gen_topology = { git = "https://github.com/sigp/gossipsub-testground", rev = "4084d26b9210bc932f4aaa45f9506ecfe5f2bb04" }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released. # unstable branch
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc gossipsub = { git = "https://github.com/sigp/lighthouse.git", rev = "f8fdb71f50851bf7792e68a4ee31125bd645e19a" }
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
libp2p = { version = "0.53.2", default-features = false, features = ["dns", "tcp", "tokio", "noise", "yamux", "serde"] }
npg = { git = "https://github.com/sigp/eth-npg", rev = "6118cdd63eb34ffa4b230d17e88c8adf7057e138" } npg = { git = "https://github.com/sigp/eth-npg", rev = "6118cdd63eb34ffa4b230d17e88c8adf7057e138" }
prometheus-client = { version = "0.22.2", features = ["protobuf"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
prost = "0.11" prost = "0.11"
rand = "0.8.5" rand = "0.8.5"
serde_json = "1.0" serde_json = "1.0"

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder. # This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder. # See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.67-bullseye as builder FROM rust:1.77-bullseye as builder
WORKDIR /usr/src/test-plan WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake. # * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.

View File

@@ -98,7 +98,9 @@ pub(crate) async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>
record_victim_id(&client, target).await; record_victim_id(&client, target).await;
record_topology_edge(&client, peer_id.to_string(), target.peer_id().to_string()).await; record_topology_edge(&client, peer_id.to_string(), target.peer_id().to_string()).await;
swarm.dial(target.multiaddr().clone())?; // We are recreating the target multiaddr because we use two different versions of libp2p.
let target_addr = Multiaddr::try_from(target.multiaddr().to_vec()).unwrap();
swarm.dial(target_addr)?;
barrier_and_drive_swarm(&client, &mut swarm, BARRIER_TOPOLOGY_READY).await?; barrier_and_drive_swarm(&client, &mut swarm, BARRIER_TOPOLOGY_READY).await?;
barrier_and_drive_swarm(&client, &mut swarm, BARRIER_SIMULATION_COMPLETED).await?; barrier_and_drive_swarm(&client, &mut swarm, BARRIER_SIMULATION_COMPLETED).await?;

View File

@@ -9,26 +9,17 @@ use crate::utils::{
use chrono::TimeZone; use chrono::TimeZone;
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
use gen_topology::Params; use gen_topology::Params;
use libp2p::core::muxing::StreamMuxerBox; use gossipsub::{
use libp2p::core::upgrade::{SelectUpgrade, Version}; AllowAllSubscriptionFilter, Behaviour, ConfigBuilder, Event, IdentTopic, IdentityTransform,
use libp2p::dns::TokioDnsConfig; Message as GossipsubMessage, MessageAuthenticity, MessageId, MetricsConfig, PublishError,
use libp2p::futures::StreamExt;
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Behaviour, ConfigBuilder, Event, FastMessageId, IdentTopic, IdentityTransform,
Message as GossipsubMessage, MessageAuthenticity, MessageId, PublishError, RawMessage,
ValidationMode, ValidationMode,
}; };
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::futures::StreamExt;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig; use libp2p::swarm::SwarmEvent;
use libp2p::swarm::{SwarmBuilder, SwarmEvent}; use libp2p::{noise, yamux, Multiaddr, SwarmBuilder, Transport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::{Multiaddr, Transport};
use libp2p::{PeerId, Swarm}; use libp2p::{PeerId, Swarm};
use npg::slot_generator::Subnet; use npg::slot_generator::Subnet;
use npg::slot_generator::ValId; use npg::slot_generator::ValId;
@@ -212,22 +203,19 @@ pub(crate) async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>
Ok(()) Ok(())
} }
/// Set up an encrypted TCP transport over the Mplex and Yamux protocols. // Set up an encrypted TCP transport.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> { fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true))) let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
.expect("DNS config"); let transport = libp2p::dns::tokio::Transport::system(tcp)
.expect("DNS")
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new() .boxed();
.into_authentic(keypair)
.expect("Signing libp2p-noise static DH keypair failed.");
transport transport
.upgrade(Version::V1) .upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .authenticate(
.multiplex(SelectUpgrade::new( noise::Config::new(keypair).expect("signing can fail only once during starting a node"),
YamuxConfig::default(), )
MplexConfig::default(), .multiplex(yamux::Config::default())
))
.timeout(Duration::from_secs(20)) .timeout(Duration::from_secs(20))
.boxed() .boxed()
} }
@@ -310,8 +298,6 @@ impl Network {
[..20], [..20],
) )
}; };
let fast_gossip_message_id =
|message: &RawMessage| FastMessageId::from(&Sha256::digest(&message.data)[..8]);
let gossipsub_config = ConfigBuilder::default() let gossipsub_config = ConfigBuilder::default()
// Following params are set based on lighthouse. // Following params are set based on lighthouse.
@@ -324,7 +310,6 @@ impl Network {
.validation_mode(ValidationMode::Anonymous) .validation_mode(ValidationMode::Anonymous)
.duplicate_cache_time(Duration::from_secs(33 * SLOT + 1)) .duplicate_cache_time(Duration::from_secs(33 * SLOT + 1))
.message_id_fn(gossip_message_id) .message_id_fn(gossip_message_id)
.fast_message_id_fn(fast_gossip_message_id)
.allow_self_origin(true) .allow_self_origin(true)
// Following params are set based on `NetworkLoad: 4 Average` which defined at lighthouse. // Following params are set based on `NetworkLoad: 4 Average` which defined at lighthouse.
.heartbeat_interval(Duration::from_millis(700)) .heartbeat_interval(Duration::from_millis(700))
@@ -340,7 +325,7 @@ impl Network {
let mut gs = Behaviour::new_with_subscription_filter_and_transform( let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous, MessageAuthenticity::Anonymous,
gossipsub_config, gossipsub_config,
Some((registry, Config::default())), Some((registry, MetricsConfig::default())),
AllowAllSubscriptionFilter {}, AllowAllSubscriptionFilter {},
IdentityTransform {}, IdentityTransform {},
) )
@@ -357,12 +342,14 @@ impl Network {
gs gs
}; };
let swarm = SwarmBuilder::with_tokio_executor( let transport = build_transport(&keypair);
build_transport(&keypair), let swarm = SwarmBuilder::with_existing_identity(keypair)
gossipsub, .with_tokio()
PeerId::from(keypair.public()), .with_other_transport(|_| transport)
) .expect("infallible")
.build(); .with_behaviour(|_| gossipsub)
.expect("infallible")
.build();
info!( info!(
"[{}] running with {} validators", "[{}] running with {} validators",
@@ -865,6 +852,9 @@ async fn record_metrics(metrics_info: RecordMetricsInfo) {
&metrics_info.peer_id, &metrics_info.peer_id,
&run_id, &run_id,
), ),
"publish_messages_dropped_per_topic" | "forward_messages_dropped_per_topic" => {
continue;
}
// /////////////////////////////////// // ///////////////////////////////////
// Metrics regarding mesh state // Metrics regarding mesh state
// /////////////////////////////////// // ///////////////////////////////////
@@ -933,6 +923,12 @@ async fn record_metrics(metrics_info: RecordMetricsInfo) {
&metrics_info.peer_id, &metrics_info.peer_id,
&run_id, &run_id,
), ),
// ///////////////////////////////////
// The size of the priority queue.
// ///////////////////////////////////
"priority_queue_size" | "non_priority_queue_size" => {
continue;
}
_ => unreachable!(), _ => unreachable!(),
}; };
queries.extend(q); queries.extend(q);

View File

@@ -1,9 +1,7 @@
use crate::beacon_node::{ATTESTATION_SUBNETS, SLOT, SLOTS_PER_EPOCH, SYNC_SUBNETS}; use crate::beacon_node::{ATTESTATION_SUBNETS, SLOT, SLOTS_PER_EPOCH, SYNC_SUBNETS};
use crate::topic::Topic; use crate::topic::Topic;
use gen_topology::Params; use gen_topology::Params;
use libp2p::gossipsub::{ use gossipsub::{IdentTopic, PeerScoreParams, PeerScoreThresholds, TopicHash, TopicScoreParams};
IdentTopic, PeerScoreParams, PeerScoreThresholds, TopicHash, TopicScoreParams,
};
use rand::Rng; use rand::Rng;
use std::collections::HashMap; use std::collections::HashMap;
use std::str::FromStr; use std::str::FromStr;

View File

@@ -1,4 +1,4 @@
use libp2p::gossipsub::{IdentTopic, Topic as GossipTopic}; use gossipsub::{IdentTopic, Topic as GossipTopic};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Clone, Debug)] #[derive(Serialize, Deserialize, Clone, Debug)]

3206
smoke/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,9 +6,10 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released. # unstable branch
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc gossipsub = { git = "https://github.com/sigp/lighthouse.git", rev = "f8fdb71f50851bf7792e68a4ee31125bd645e19a" }
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
libp2p = { version = "0.53.2", default-features = false, features = ["dns", "tcp", "tokio", "noise", "yamux", "serde"] }
rand = "0.8.5" rand = "0.8.5"
serde = "1.0.137" serde = "1.0.137"
serde_json = "1.0.81" serde_json = "1.0.81"

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder. # This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder. # See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.67-bullseye as builder FROM rust:1.77-bullseye as builder
WORKDIR /usr/src/test-plan WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake. # * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.
@@ -36,6 +36,6 @@ FROM debian:bullseye-slim
COPY --from=builder /usr/src/test-plan/plan/target/release/smoke /usr/local/bin/smoke COPY --from=builder /usr/src/test-plan/plan/target/release/smoke /usr/local/bin/smoke
# Configure Logging # Configure Logging
# ENV RUST_LOG=libp2p_gossipsub=debug ENV RUST_LOG=libp2p_gossipsub=debug
ENTRYPOINT ["smoke"] ENTRYPOINT ["smoke"]

View File

@@ -1,24 +1,17 @@
extern crate core; extern crate core;
use libp2p::core::muxing::StreamMuxerBox; use gossipsub::AllowAllSubscriptionFilter;
use libp2p::core::upgrade::{SelectUpgrade, Version}; use gossipsub::{
use libp2p::core::ConnectedPoint;
use libp2p::dns::TokioDnsConfig;
use libp2p::futures::FutureExt;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Behaviour, ConfigBuilder, Event, IdentTopic as Topic, IdentityTransform, MessageAuthenticity, Behaviour, ConfigBuilder, Event, IdentTopic as Topic, IdentityTransform, MessageAuthenticity,
}; };
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::ConnectedPoint;
use libp2p::futures::FutureExt;
use libp2p::futures::StreamExt;
use libp2p::identity::Keypair; use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol; use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig; use libp2p::swarm::SwarmEvent;
use libp2p::swarm::{SwarmBuilder, SwarmEvent}; use libp2p::{noise, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder, Transport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::{Multiaddr, PeerId, Swarm, Transport};
use rand::seq::SliceRandom; use rand::seq::SliceRandom;
use rand::SeedableRng; use rand::SeedableRng;
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
@@ -26,6 +19,7 @@ use serde::Serialize;
use serde_json::Value; use serde_json::Value;
use std::borrow::Cow; use std::borrow::Cow;
use std::collections::HashSet; use std::collections::HashSet;
use std::time::Duration;
use testground::client::Client; use testground::client::Client;
use tracing::{debug, info, warn}; use tracing::{debug, info, warn};
@@ -69,8 +63,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
IdentityTransform {}, IdentityTransform {},
) )
.expect("Valid configuration"); .expect("Valid configuration");
let transport = build_transport(&local_key);
SwarmBuilder::with_tokio_executor(build_transport(&local_key), gossipsub, local_peer_id) SwarmBuilder::with_existing_identity(local_key)
.with_tokio()
.with_other_transport(|_| transport)
.expect("infallible")
.with_behaviour(|_| gossipsub)
.expect("infallible")
.build() .build()
}; };
@@ -256,23 +255,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(()) Ok(())
} }
// Set up an encrypted TCP transport over the Mplex and Yamux protocols. // Set up an encrypted TCP transport.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> { fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true))) let tcp = libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::default().nodelay(true));
.expect("DNS config"); let transport = libp2p::dns::tokio::Transport::system(tcp)
.expect("DNS")
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new() .boxed();
.into_authentic(keypair)
.expect("Signing libp2p-noise static DH keypair failed.");
transport transport
.upgrade(Version::V1) .upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(NoiseConfig::xx(noise_keys).into_authenticated()) .authenticate(
.multiplex(SelectUpgrade::new( noise::Config::new(keypair).expect("signing can fail only once during starting a node"),
YamuxConfig::default(), )
MplexConfig::default(), .multiplex(yamux::Config::default())
)) .timeout(Duration::from_secs(20))
.timeout(std::time::Duration::from_secs(20))
.boxed() .boxed()
} }