Update dependencies (#13)

* censoring: Update libp2p to the latest revision

* scoring: Update libp2p to the latest revision

* eth_consensus: Update libp2p to the latest revision

* cargo fmt

* eth_consensus: Update libp2p and prometheus-client

* censoring: Update libp2p and prometheus-client

* scoring: Update libp2p and prometheus-client

* smoke: Update libp2p and prometheus-client

* Update testground

* Bump up the rust version
This commit is contained in:
Akihito Nakano
2023-03-07 06:08:17 +09:00
committed by GitHub
parent 5a2a4d649d
commit 7d806cda1c
24 changed files with 8072 additions and 2414 deletions

2693
censoring/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,30 +9,26 @@ edition = "2021"
chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
delay_map = "0.1.1"
# TODO:
# * Switch back from the forked one, once the `protobuf` encoding is supported in rust-libp2p (libp2p-gossipsub).
# * See https://github.com/libp2p/rust-libp2p/pull/2911 for the progress to support the `protobuf` encoding.
# libp2p = { version = "0.48.0", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
# NOTE:
# * We are using the fork whose metrics encoding format is fixed to `protobuf`.
# https://github.com/libp2p/rust-libp2p/compare/master...ackintosh:rust-libp2p:prometheus-ptotobuf-support
# * That is due to that `protobuf` encoding is not supported in rust-libp2p (libp2p-gossipsub) yet.
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-ptotobuf-support", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
# This is a fork of `libp2p` in order to implement malicious behaviour in the `attacker` module.
# This `libp2p-testground` is used in `attacker` module instead of `libp2p`.
# See https://github.com/ackintosh/rust-libp2p/pull/50
libp2p-testground = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "testground", package = "libp2p", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
libp2p-testground = { git = "https://github.com/ackintosh/rust-libp2p.git", rev = "8e8be3f465cb9815fd84184c32805541db546aa7", package = "libp2p", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
rand = "0.8.5"
serde = "1.0.139"
serde_json = "1.0.82"
testground = "0.4"
# TODO: Update testground once the next version(v0.5.0).
testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032ec29361a00b25c0c8a6bac5f19a43019eb" }
tokio = { version = "1.20.0", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
# TODO: Update once v0.19.0 has been released.
prometheus-client = { git = "https://github.com/prometheus/client_rust.git", rev = "682b24ee8c6c857b76c0683b1dd7df5a97b75c27", features = ["protobuf"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
prost = "0.11"

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.62-bullseye as builder
FROM rust:1.67-bullseye as builder
WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.

View File

@@ -23,13 +23,13 @@ use libp2p_testground::swarm::{
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters, SwarmBuilder, SwarmEvent,
};
use libp2p_testground::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p_testground::tcp::tokio::Transport as TcpTransport;
use libp2p_testground::tcp::Config;
use libp2p_testground::yamux::YamuxConfig;
use libp2p_testground::Transport;
use libp2p_testground::{PeerId, Swarm};
use prost::Message;
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use testground::client::Client;
@@ -90,24 +90,19 @@ pub(crate) async fn run(
}
fn build_swarm(keypair: Keypair) -> Swarm<MaliciousBehaviour> {
SwarmBuilder::new(
SwarmBuilder::with_tokio_executor(
build_transport(&keypair),
MaliciousBehaviour::new(),
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build()
}
fn build_transport(
keypair: &Keypair,
) -> libp2p_testground::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(Config::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p_testground::noise::Keypair::<X25519Spec>::new()
.into_authentic(keypair)
@@ -125,7 +120,7 @@ fn build_transport(
}
type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, GossipsubHandlerIn>;
pub struct MaliciousBehaviour {
/// Configuration providing gossipsub performance parameters.
@@ -225,7 +220,7 @@ impl MaliciousBehaviour {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: Arc::new(GossipsubHandlerIn::Message(message)),
event: GossipsubHandlerIn::Message(message),
handler: NotifyHandler::Any,
})
}
@@ -415,10 +410,7 @@ impl NetworkBehaviour for MaliciousBehaviour {
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| {
// clone send event reference if others references are present
Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone())
}));
return Poll::Ready(event);
}
loop {

View File

@@ -3,7 +3,7 @@ use crate::utils::{
BARRIER_DONE, BARRIER_STARTED_LIBP2P, BARRIER_WARMUP,
};
use crate::{InstanceInfo, Role};
use chrono::{DateTime, Local, Utc};
use chrono::Local;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::dns::TokioDnsConfig;
@@ -11,19 +11,19 @@ use libp2p::futures::StreamExt;
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, IdentTopic, IdentityTransform, MessageAuthenticity,
PeerScoreParams, PeerScoreThresholds, Topic, TopicScoreParams,
Behaviour, ConfigBuilder, IdentTopic, IdentityTransform, MessageAuthenticity, PeerScoreParams,
PeerScoreThresholds, Topic, TopicScoreParams,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{DialError, SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::PeerId;
use libp2p::Transport;
use libp2p::{Multiaddr, Swarm};
use prometheus_client::encoding::proto::EncodeMetric;
use prometheus_client::registry::Registry;
use rand::seq::SliceRandom;
use rand::SeedableRng;
@@ -93,7 +93,7 @@ pub(crate) async fn run(
// ////////////////////////////////////////////////////////////////////////
// Start libp2p
// ////////////////////////////////////////////////////////////////////////
let mut registry: Registry<Box<dyn EncodeMetric>> = Registry::default();
let mut registry = Registry::default();
registry.sub_registry_with_prefix("gossipsub");
let network_send = spawn_honest_network(
@@ -154,45 +154,63 @@ pub(crate) async fn run(
// Encode the metrics to an instance of the OpenMetrics protobuf format.
// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto
let metric_set = prometheus_client::encoding::proto::encode(&registry);
let metric_set = prometheus_client::encoding::protobuf::encode(&registry)?;
let mut queries = vec![];
// The `test_start_time` is used as the timestamp of metrics instead of local time of each
// instance so the timestamps between metrics of each instance are aligned.
// This is helpful when we want to sort metrics by something not timestamp(e.g. instance_name).
let test_start_time: DateTime<Utc> =
DateTime::parse_from_rfc3339(&client.run_parameters().test_start_time)?.into();
for family in metric_set.metric_families.iter() {
let q = match family.name.as_str() {
// ///////////////////////////////////
// Metrics per known topic
// ///////////////////////////////////
"topic_subscription_status" => {
queries_for_gauge(&test_start_time, family, &instance_info, run_id, "status")
}
"topic_peers_counts" => {
queries_for_gauge(&test_start_time, family, &instance_info, run_id, "count")
}
"topic_subscription_status" => queries_for_gauge(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
"status",
),
"topic_peers_counts" => queries_for_gauge(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
"count",
),
"invalid_messages_per_topic"
| "accepted_messages_per_topic"
| "ignored_messages_per_topic"
| "rejected_messages_per_topic" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
| "rejected_messages_per_topic" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
// ///////////////////////////////////
// Metrics regarding mesh state
// ///////////////////////////////////
"mesh_peer_counts" => {
queries_for_gauge(&test_start_time, family, &instance_info, run_id, "count")
}
"mesh_peer_inclusion_events" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
"mesh_peer_churn_events" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
"mesh_peer_counts" => queries_for_gauge(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
"count",
),
"mesh_peer_inclusion_events" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
"mesh_peer_churn_events" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
// ///////////////////////////////////
// Metrics regarding messages sent/received
// ///////////////////////////////////
@@ -201,36 +219,58 @@ pub(crate) async fn run(
| "topic_msg_sent_bytes"
| "topic_msg_recv_counts_unfiltered"
| "topic_msg_recv_counts"
| "topic_msg_recv_bytes" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
| "topic_msg_recv_bytes" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
// ///////////////////////////////////
// Metrics related to scoring
// ///////////////////////////////////
"score_per_mesh" => {
queries_for_histogram(&test_start_time, family, &instance_info, run_id)
}
"scoring_penalties" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
"score_per_mesh" => queries_for_histogram(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
"scoring_penalties" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
// ///////////////////////////////////
// General Metrics
// ///////////////////////////////////
"peers_per_protocol" => {
queries_for_gauge(&test_start_time, family, &instance_info, run_id, "peers")
}
"heartbeat_duration" => {
queries_for_histogram(&test_start_time, family, &instance_info, run_id)
}
"peers_per_protocol" => queries_for_gauge(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
"peers",
),
"heartbeat_duration" => queries_for_histogram(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
// ///////////////////////////////////
// Performance metrics
// ///////////////////////////////////
"topic_iwant_msgs" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
"memcache_misses" => {
queries_for_counter(&test_start_time, family, &instance_info, run_id)
}
"topic_iwant_msgs" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
"memcache_misses" => queries_for_counter(
&client.run_parameters().test_start_time,
family,
&instance_info,
run_id,
),
_ => unreachable!(),
};
@@ -249,10 +289,8 @@ pub(crate) async fn run(
/// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
@@ -283,7 +321,7 @@ enum PublishState {
}
pub(crate) struct HonestNetwork {
swarm: Swarm<Gossipsub>,
swarm: Swarm<Behaviour>,
instance_info: InstanceInfo,
participants: HashMap<PeerId, String>,
client: Client,
@@ -297,7 +335,7 @@ pub(crate) struct HonestNetwork {
impl HonestNetwork {
#[allow(clippy::too_many_arguments)]
fn new(
registry: &mut Registry<Box<dyn EncodeMetric>>,
registry: &mut Registry,
keypair: Keypair,
instance_info: InstanceInfo,
participants: &Vec<InstanceInfo>,
@@ -307,13 +345,13 @@ impl HonestNetwork {
recv: UnboundedReceiver<HonestMessage>,
) -> Self {
let gossipsub = {
let gossipsub_config = GossipsubConfigBuilder::default()
let gossipsub_config = ConfigBuilder::default()
.prune_backoff(Duration::from_secs(PRUNE_BACKOFF))
.history_length(12)
.build()
.expect("Valid configuration");
let mut gs = Gossipsub::new_with_subscription_filter_and_transform(
let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Signed(keypair.clone()),
gossipsub_config,
Some((registry, Config::default())),
@@ -333,14 +371,11 @@ impl HonestNetwork {
gs
};
let swarm = SwarmBuilder::new(
let swarm = SwarmBuilder::with_tokio_executor(
build_transport(&keypair),
gossipsub,
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build();
let mut peer_to_instance_name = HashMap::new();
@@ -443,7 +478,7 @@ impl HonestNetwork {
}
async fn spawn_honest_network(
registry: &mut Registry<Box<dyn EncodeMetric>>,
registry: &mut Registry,
keypair: Keypair,
instance_info: InstanceInfo,
participants: &Vec<InstanceInfo>,

View File

@@ -1,13 +1,13 @@
use crate::InstanceInfo;
use chrono::{DateTime, Local, Utc};
use chrono::{DateTime, FixedOffset, Local};
use libp2p::futures::FutureExt;
use libp2p::futures::{Stream, StreamExt};
use prometheus_client::encoding::proto::openmetrics_data_model::counter_value;
use prometheus_client::encoding::proto::openmetrics_data_model::gauge_value;
use prometheus_client::encoding::proto::openmetrics_data_model::metric_point;
use prometheus_client::encoding::proto::openmetrics_data_model::Metric;
use prometheus_client::encoding::proto::openmetrics_data_model::MetricFamily;
use prometheus_client::encoding::proto::HistogramValue;
use prometheus_client::encoding::protobuf::openmetrics_data_model::counter_value;
use prometheus_client::encoding::protobuf::openmetrics_data_model::gauge_value;
use prometheus_client::encoding::protobuf::openmetrics_data_model::metric_point;
use prometheus_client::encoding::protobuf::openmetrics_data_model::HistogramValue;
use prometheus_client::encoding::protobuf::openmetrics_data_model::Metric;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricFamily;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
@@ -93,7 +93,7 @@ where
/// Create InfluxDB queries for Counter metrics.
pub(crate) fn queries_for_counter(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
instance_info: &InstanceInfo,
run_id: &str,
@@ -122,7 +122,7 @@ pub(crate) fn queries_for_counter(
/// Create InfluxDB queries for Gauge metrics.
pub(crate) fn queries_for_gauge(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
instance_info: &InstanceInfo,
run_id: &str,
@@ -152,7 +152,7 @@ pub(crate) fn queries_for_gauge(
/// Create InfluxDB queries for Histogram metrics.
pub(crate) fn queries_for_histogram(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
instance_info: &InstanceInfo,
run_id: &str,

2248
eth_consensus/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -9,7 +9,8 @@ members = [
[workspace.dependencies]
serde_json = "1.0"
serde = "1.0"
testground = "0.4.0"
# TODO: Update testground once the next version(v0.5.0).
testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032ec29361a00b25c0c8a6bac5f19a43019eb" }
tokio = { version = "1.21.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }

View File

@@ -1,5 +1,5 @@
# Get chef
FROM rust:1.65-bullseye as chef
FROM rust:1.67-bullseye as chef
WORKDIR test-plan
RUN cargo install cargo-chef

View File

@@ -9,12 +9,15 @@ edition = "2021"
chrono = { version = "0.4.19", features = [ "std" ]}
delay_map = "0.1.1"
# libp2p = { version = "0.48.0", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
prometheus-client = { git = "https://github.com/prometheus/client_rust.git", rev = "682b24ee8c6c857b76c0683b1dd7df5a97b75c27", features = ["protobuf"] }
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-ptotobuf-support", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
serde_json = "1.0"
serde = "1.0"
testground = "0.4.0"
testground.workspace = true
tokio = { version = "1.21.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
@@ -22,5 +25,5 @@ gen_topology = { path = "../utils/gen_topology" }
futures = "0.3.24"
npg = { git = "https://github.com/sigp/eth-npg"}
sha2 = "0.10.6"
rand = "0.8.5"
rand = { version = "0.8.5", features = ["small_rng"] }
tokio-util = { version = "0.7.4", features = ["time"] }

View File

@@ -1,15 +1,13 @@
use crate::InstanceInfo;
use chrono::{DateTime, Utc};
use chrono::{DateTime, FixedOffset};
use futures::stream::FuturesUnordered;
use libp2p::gossipsub::{
error::GossipsubHandlerError, Gossipsub, GossipsubEvent, IdentTopic, MessageId,
Topic as GossipTopic,
Behaviour, Event, HandlerError, IdentTopic, MessageId, Topic as GossipTopic,
};
use libp2p::swarm::SwarmEvent;
use libp2p::PeerId;
use libp2p::Swarm;
use npg::Generator;
use prometheus_client::encoding::proto::EncodeMetric;
use prometheus_client::registry::Registry;
use rand::Rng;
use std::collections::HashMap;
@@ -30,20 +28,20 @@ use run::{ATTESTATION_SUBNETS, SYNC_SUBNETS};
/// Main struct to run the simulation.
pub struct Network {
/// Libp2p2 swarm.
swarm: Swarm<Gossipsub>,
swarm: Swarm<Behaviour>,
/// Node id for this node, local to the test run.
node_id: usize,
/// This nodes contact info.
instance_info: InstanceInfo,
/// Metrics registry.
registry: Registry<Box<dyn EncodeMetric>>,
registry: Registry,
/// Information of every other participant in the network, indexed by their (local to the test
/// run) node_id.
participants: HashMap<usize, InstanceInfo>,
/// Testground client.
client: Arc<Client>,
/// Chronos time reported by testground as the start of the test run.
start_time: DateTime<Utc>,
start_time: DateTime<FixedOffset>,
/// Instant in which the simmulation starts running, according to the local time.
local_start_time: Instant,
/// How often metrics are recorded.
@@ -120,7 +118,7 @@ impl Network {
topic: Topic,
validator: u64,
mut payload: Vec<u8>,
) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::error::PublishError> {
) -> Result<libp2p::gossipsub::MessageId, libp2p::gossipsub::PublishError> {
// Plain binary as messages, coupled with the validator
payload.append(&mut validator.to_be_bytes().to_vec());
@@ -137,9 +135,9 @@ impl Network {
}
// An inbound event or swarm event gets sent here
fn handle_swarm_event(&mut self, event: SwarmEvent<GossipsubEvent, GossipsubHandlerError>) {
fn handle_swarm_event(&mut self, event: SwarmEvent<Event, HandlerError>) {
match event {
SwarmEvent::Behaviour(GossipsubEvent::Message {
SwarmEvent::Behaviour(Event::Message {
propagation_source,
message_id,
message,

View File

@@ -3,9 +3,9 @@ use crate::utils::{
queries_for_histogram,
};
use crate::InstanceInfo;
use chrono::{DateTime, Utc};
use chrono::{DateTime, FixedOffset};
use libp2p::gossipsub::IdentTopic;
use prometheus_client::encoding::proto::openmetrics_data_model::MetricSet;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricSet;
use std::sync::Arc;
use testground::client::Client;
use tracing::error;
@@ -21,7 +21,7 @@ pub(crate) struct RecordMetricsInfo {
metrics: MetricSet,
node_id: usize,
instance_info: InstanceInfo,
current: DateTime<Utc>,
current: DateTime<FixedOffset>,
}
impl Network {
@@ -29,7 +29,7 @@ impl Network {
pub(super) fn record_metrics_info(&self) -> RecordMetricsInfo {
// Encode the metrics to an instance of the OpenMetrics protobuf format.
// https://github.com/OpenObservability/OpenMetrics/blob/main/proto/openmetrics_data_model.proto
let metrics = prometheus_client::encoding::proto::encode(&self.registry);
let metrics = prometheus_client::encoding::protobuf::encode(&self.registry).unwrap();
let elapsed = chrono::Duration::from_std(self.local_start_time.elapsed())
.expect("Durations are small");

View File

@@ -1,6 +1,5 @@
use crate::utils::{record_instance_info, BARRIER_LIBP2P_READY, BARRIER_TOPOLOGY_READY};
use crate::InstanceInfo;
use chrono::{DateTime, Utc};
use futures::stream::FuturesUnordered;
use gen_topology::Params;
use libp2p::core::muxing::StreamMuxerBox;
@@ -10,22 +9,24 @@ use libp2p::futures::StreamExt;
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubMessage, IdentityTransform, MessageAuthenticity,
Behaviour, ConfigBuilder, IdentityTransform, Message as GossipsubMessage, MessageAuthenticity,
MessageId, PeerScoreParams, PeerScoreThresholds, ValidationMode,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::PeerId;
use libp2p::Transport;
use npg::slot_generator::{Subnet, ValId};
use npg::Generator;
use npg::Message;
use prometheus_client::encoding::proto::EncodeMetric;
use prometheus_client::registry::Registry;
use rand::rngs::SmallRng;
use rand::SeedableRng;
use sha2::{Digest, Sha256};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
@@ -90,14 +91,14 @@ pub(crate) fn parse_params(
}
// Sets up the gossipsub configuration to be used in the simulation.
pub fn setup_gossipsub(registry: &mut Registry<Box<dyn EncodeMetric>>) -> Gossipsub {
pub fn setup_gossipsub(registry: &mut Registry) -> Behaviour {
let gossip_message_id = move |message: &GossipsubMessage| {
MessageId::from(
&Sha256::digest([message.topic.as_str().as_bytes(), &message.data].concat())[..20],
)
};
let gossipsub_config = GossipsubConfigBuilder::default()
let gossipsub_config = ConfigBuilder::default()
.max_transmit_size(10 * 1_048_576) // gossip_max_size(true)
// .heartbeat_interval(Duration::from_secs(1))
.prune_backoff(Duration::from_secs(60))
@@ -117,7 +118,7 @@ pub fn setup_gossipsub(registry: &mut Registry<Box<dyn EncodeMetric>>) -> Gossip
.build()
.expect("valid gossipsub configuration");
let mut gs = Gossipsub::new_with_subscription_filter_and_transform(
let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous,
gossipsub_config,
Some((registry, Config::default())),
@@ -171,7 +172,7 @@ pub(crate) async fn run(
)
.await?;
let registry: Registry<Box<dyn EncodeMetric>> = Registry::default();
let registry = Registry::default();
let mut network = Network::new(
registry,
keypair,
@@ -216,10 +217,8 @@ pub(crate) async fn run(
pub fn build_transport(
keypair: &Keypair,
) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
@@ -240,7 +239,7 @@ impl Network {
// Sets up initial conditions and configuration
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
mut registry: Registry<Box<dyn EncodeMetric>>,
mut registry: Registry,
keypair: Keypair,
node_id: usize,
instance_info: InstanceInfo,
@@ -251,14 +250,11 @@ impl Network {
) -> Self {
let gossipsub = setup_gossipsub(&mut registry);
let swarm = SwarmBuilder::new(
let swarm = SwarmBuilder::with_tokio_executor(
build_transport(&keypair),
gossipsub,
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build();
info!(
@@ -285,10 +281,10 @@ impl Network {
.build(validator_set)
.expect("need to adjust these params");
let start_time: DateTime<Utc> =
DateTime::parse_from_rfc3339(&client.run_parameters().test_start_time)
.expect("Correct time date format from testground")
.into();
let start_time = client.run_parameters().test_start_time;
// DateTime::parse_from_rfc3339(&client.run_parameters().test_start_time)
// .expect("Correct time date format from testground")
// .into();
let local_start_time = Instant::now();
Network {
@@ -334,6 +330,8 @@ impl Network {
self.record_metrics_info(),
)));
let mut small_rng = SmallRng::from_entropy();
loop {
tokio::select! {
_ = deadline.as_mut() => {
@@ -341,7 +339,7 @@ impl Network {
break;
}
Some(m) = self.messages_gen.next() => {
let payload = m.payload();
let payload = m.payload(&mut small_rng);
let (topic, val) = match m {
Message::BeaconBlock { proposer: ValId(v), slot: _ } => {
(Topic::Blocks, v)

View File

@@ -1,13 +1,13 @@
use crate::InstanceInfo;
use chrono::{DateTime, Local, Utc};
use chrono::{DateTime, FixedOffset, Local};
use libp2p::futures::StreamExt;
use libp2p::PeerId;
use prometheus_client::encoding::proto::openmetrics_data_model::counter_value;
use prometheus_client::encoding::proto::openmetrics_data_model::gauge_value;
use prometheus_client::encoding::proto::openmetrics_data_model::metric_point;
use prometheus_client::encoding::proto::openmetrics_data_model::Metric;
use prometheus_client::encoding::proto::openmetrics_data_model::MetricFamily;
use prometheus_client::encoding::proto::HistogramValue;
use prometheus_client::encoding::protobuf::openmetrics_data_model::counter_value;
use prometheus_client::encoding::protobuf::openmetrics_data_model::gauge_value;
use prometheus_client::encoding::protobuf::openmetrics_data_model::metric_point;
use prometheus_client::encoding::protobuf::openmetrics_data_model::HistogramValue;
use prometheus_client::encoding::protobuf::openmetrics_data_model::Metric;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricFamily;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;
@@ -54,7 +54,7 @@ pub(crate) async fn publish_and_collect<T: Serialize + DeserializeOwned>(
/// Create InfluxDB queries for Counter metrics.
pub(crate) fn queries_for_counter(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,
@@ -84,7 +84,7 @@ pub(crate) fn queries_for_counter(
/// Create InfluxDB queries for Counter metrics.
pub(crate) fn initialise_counter(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
name: String,
hash: String,
node_id: usize,
@@ -102,7 +102,7 @@ pub(crate) fn initialise_counter(
/// Create InfluxDB queries joining counter metrics
#[allow(clippy::too_many_arguments)]
pub(crate) fn queries_for_counter_join(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family1: &MetricFamily,
family2: &MetricFamily,
name: &str,
@@ -159,7 +159,7 @@ pub(crate) fn queries_for_counter_join(
/// Create InfluxDB queries for Gauge metrics.
pub(crate) fn queries_for_gauge(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,
@@ -190,7 +190,7 @@ pub(crate) fn queries_for_gauge(
/// Create InfluxDB queries for Histogram metrics.
pub(crate) fn queries_for_histogram(
datetime: &DateTime<Utc>,
datetime: &DateTime<FixedOffset>,
family: &MetricFamily,
node_id: usize,
instance_info: &InstanceInfo,

2687
scoring/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -10,16 +10,25 @@ chrono = { version = "0.4.19", default-features = false, features = ["clock"] }
delay_map = "0.1.1"
futures = "0.3.24"
gen_topology = { git = "https://github.com/sigp/gossipsub-testground", rev = "4084d26b9210bc932f4aaa45f9506ecfe5f2bb04" }
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-ptotobuf-support", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
npg = { git = "https://github.com/sigp/eth-npg", rev = "6118cdd63eb34ffa4b230d17e88c8adf7057e138" }
prometheus-client = { git = "https://github.com/prometheus/client_rust.git", rev = "682b24ee8c6c857b76c0683b1dd7df5a97b75c27", features = ["protobuf"] }
# TODO: Update prometheus-client once the next version, which includes the fix, has been released.
# See https://github.com/prometheus/client_rust/pull/123
prometheus-client = { git = "https://github.com/ackintosh/client_rust.git", branch = "fix/protobuf-labels", features = ["protobuf"] }
prost = "0.11"
rand = "0.8.5"
serde_json = "1.0"
serde = "1.0"
sha2 = "0.10"
slot_clock = { git = "https://github.com/divagant-martian/lighthouse", branch = "sizes" }
testground = "0.4.0"
# TODO: Update testground once the next version(v0.5.0).
testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032ec29361a00b25c0c8a6bac5f19a43019eb" }
tokio = { version = "1.21.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.14", features = ["env-filter"] }
@@ -28,7 +37,7 @@ types = { git = "https://github.com/divagant-martian/lighthouse", branch = "size
# This is a fork of `libp2p` in order to implement malicious behaviour in the `attacker` module.
# This `libp2p-testground` is used in `attacker` module instead of `libp2p`.
# See https://github.com/ackintosh/rust-libp2p/pull/50
libp2p-testground = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "testground", package = "libp2p", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux", "serde"] }
libp2p-testground = { git = "https://github.com/ackintosh/rust-libp2p.git", rev = "8e8be3f465cb9815fd84184c32805541db546aa7", package = "libp2p", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
[patch]
[patch.crates-io]

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.64-bullseye as builder
FROM rust:1.67-bullseye as builder
WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.

View File

@@ -29,7 +29,8 @@ use libp2p_testground::swarm::{
ConnectionHandler, IntoConnectionHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters, SwarmBuilder, SwarmEvent,
};
use libp2p_testground::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p_testground::tcp::tokio::Transport as TcpTransport;
use libp2p_testground::tcp::Config as TcpConfig;
use libp2p_testground::yamux::YamuxConfig;
use libp2p_testground::{Multiaddr, Transport};
use libp2p_testground::{PeerId, Swarm};
@@ -37,7 +38,6 @@ use prost::Message;
use std::borrow::Cow;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use testground::client::Client;
@@ -108,24 +108,19 @@ pub(crate) async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>
}
fn build_swarm(keypair: Keypair) -> Swarm<MaliciousBehaviour> {
SwarmBuilder::new(
SwarmBuilder::with_tokio_executor(
build_transport(&keypair),
MaliciousBehaviour::new(),
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build()
}
fn build_transport(
keypair: &Keypair,
) -> libp2p_testground::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p_testground::noise::Keypair::<X25519Spec>::new()
.into_authentic(keypair)
@@ -205,7 +200,7 @@ where
}
type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, GossipsubHandlerIn>;
pub struct MaliciousBehaviour {
/// Configuration providing gossipsub performance parameters.
@@ -316,7 +311,7 @@ impl MaliciousBehaviour {
self.events
.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id,
event: Arc::new(GossipsubHandlerIn::Message(message)),
event: GossipsubHandlerIn::Message(message),
handler: NotifyHandler::Any,
})
}
@@ -519,10 +514,7 @@ impl NetworkBehaviour for MaliciousBehaviour {
_params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| {
// clone send event reference if others references are present
Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone())
}));
return Poll::Ready(event);
}
loop {

View File

@@ -13,27 +13,27 @@ use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::dns::TokioDnsConfig;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::error::PublishError;
use libp2p::gossipsub::metrics::Config;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
FastMessageId, Gossipsub, GossipsubConfigBuilder, GossipsubEvent, GossipsubMessage, IdentTopic,
IdentityTransform, MessageAuthenticity, MessageId, RawGossipsubMessage, ValidationMode,
Behaviour, ConfigBuilder, Event, FastMessageId, IdentTopic, IdentityTransform,
Message as GossipsubMessage, MessageAuthenticity, MessageId, PublishError, RawMessage,
ValidationMode,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::{Multiaddr, Transport};
use libp2p::{PeerId, Swarm};
use npg::slot_generator::Subnet;
use npg::slot_generator::ValId;
use npg::{Generator, Message};
use prometheus_client::encoding::proto::openmetrics_data_model::MetricSet;
use prometheus_client::encoding::proto::EncodeMetric;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricSet;
use prometheus_client::registry::Registry;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
@@ -157,7 +157,7 @@ pub(crate) async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>
// /////////////////////////////////////////////////////////////////////////////////////////////
// Start libp2p and dial the designated outbound peers
// /////////////////////////////////////////////////////////////////////////////////////////////
let mut registry: Registry<Box<dyn EncodeMetric>> = Registry::default();
let mut registry = Registry::default();
registry.sub_registry_with_prefix("gossipsub");
let mut network = Network::new(
&mut registry,
@@ -214,10 +214,8 @@ pub(crate) async fn run(client: Client) -> Result<(), Box<dyn std::error::Error>
/// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
@@ -276,7 +274,7 @@ struct RecordMetricsInfo {
}
pub(crate) struct Network {
swarm: Swarm<Gossipsub>,
swarm: Swarm<Behaviour>,
node_id: usize,
beacon_node_info: BeaconNodeInfo,
participants: HashMap<usize, BeaconNodeInfo>,
@@ -295,7 +293,7 @@ pub(crate) struct Network {
impl Network {
#[allow(clippy::too_many_arguments)]
fn new(
registry: &mut Registry<Box<dyn EncodeMetric>>,
registry: &mut Registry,
keypair: Keypair,
node_id: usize,
beacon_node_info: BeaconNodeInfo,
@@ -312,11 +310,10 @@ impl Network {
[..20],
)
};
let fast_gossip_message_id = |message: &RawGossipsubMessage| {
FastMessageId::from(&Sha256::digest(&message.data)[..8])
};
let fast_gossip_message_id =
|message: &RawMessage| FastMessageId::from(&Sha256::digest(&message.data)[..8]);
let gossipsub_config = GossipsubConfigBuilder::default()
let gossipsub_config = ConfigBuilder::default()
// Following params are set based on lighthouse.
.max_transmit_size(10 * 1_048_576) // 10M
.prune_backoff(Duration::from_secs(PRUNE_BACKOFF))
@@ -340,7 +337,7 @@ impl Network {
.build()
.expect("Valid gossipsub configuration");
let mut gs = Gossipsub::new_with_subscription_filter_and_transform(
let mut gs = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Anonymous,
gossipsub_config,
Some((registry, Config::default())),
@@ -360,14 +357,11 @@ impl Network {
gs
};
let swarm = SwarmBuilder::new(
let swarm = SwarmBuilder::with_tokio_executor(
build_transport(&keypair),
gossipsub,
PeerId::from(keypair.public()),
)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
.build();
info!(
@@ -504,11 +498,7 @@ impl Network {
Ok(())
}
async fn run_sim(
&mut self,
run_duration: Duration,
registry: &Registry<Box<dyn EncodeMetric>>,
) {
async fn run_sim(&mut self, run_duration: Duration, registry: &Registry) {
let deadline = tokio::time::sleep(run_duration);
futures::pin_mut!(deadline);
@@ -592,9 +582,9 @@ impl Network {
self.swarm.behaviour_mut().publish(ident_topic, msg)
}
fn handle_gossipsub_event(&mut self, event: GossipsubEvent) {
fn handle_gossipsub_event(&mut self, event: Event) {
match event {
GossipsubEvent::Message {
Event::Message {
propagation_source: _,
message_id: _,
message,
@@ -811,10 +801,10 @@ impl Network {
}
}
fn metrics_info(&self, registry: &Registry<Box<dyn EncodeMetric>>) -> RecordMetricsInfo {
fn metrics_info(&self, registry: &Registry) -> RecordMetricsInfo {
RecordMetricsInfo {
client: self.client.clone(),
metrics: prometheus_client::encoding::proto::encode(registry),
metrics: prometheus_client::encoding::protobuf::encode(registry).unwrap(),
peer_id: self.beacon_node_info.peer_id,
current: Local::now(),
}

View File

@@ -2,10 +2,10 @@ use crate::beacon_node::BeaconNodeInfo;
use chrono::{DateTime, Local};
use libp2p::futures::StreamExt;
use libp2p::PeerId;
use prometheus_client::encoding::proto::openmetrics_data_model::counter_value;
use prometheus_client::encoding::proto::openmetrics_data_model::metric_point;
use prometheus_client::encoding::proto::openmetrics_data_model::Metric;
use prometheus_client::encoding::proto::openmetrics_data_model::MetricFamily;
use prometheus_client::encoding::protobuf::openmetrics_data_model::counter_value;
use prometheus_client::encoding::protobuf::openmetrics_data_model::metric_point;
use prometheus_client::encoding::protobuf::openmetrics_data_model::Metric;
use prometheus_client::encoding::protobuf::openmetrics_data_model::MetricFamily;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::borrow::Cow;

2378
smoke/Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,11 +6,14 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libp2p = { version = "0.48.0", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux"] }
# TODO: Update libp2p once the next version, which includes prometheus-client v0.20, has been released.
# See https://github.com/ackintosh/rust-libp2p/commit/df09870c8c2294cbaeb881f58d4f9752125562bc
libp2p = { git = "https://github.com/ackintosh/rust-libp2p.git", branch = "prometheus-client-0.20.0", default-features = false, features = ["gossipsub", "dns", "tcp", "tokio", "noise", "mplex", "yamux", "serde"] }
rand = "0.8.5"
serde = "1.0.137"
serde_json = "1.0.81"
testground = "0.4.0"
# TODO: Update testground once the next version(v0.5.0).
testground = { git = "https://github.com/testground/sdk-rust.git", rev = "1fd032ec29361a00b25c0c8a6bac5f19a43019eb" }
tokio = { version = "1.19.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }

View File

@@ -1,6 +1,6 @@
# This Dockerfile is for the `docker:generic` builder.
# See https://docs.testground.ai/builder-library/docker-generic for details about the builder.
FROM rust:1.64-bullseye as builder
FROM rust:1.67-bullseye as builder
WORKDIR /usr/src/test-plan
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.

View File

@@ -8,15 +8,15 @@ use libp2p::futures::FutureExt;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, IdentityTransform,
MessageAuthenticity,
Behaviour, ConfigBuilder, Event, IdentTopic as Topic, IdentityTransform, MessageAuthenticity,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::tcp::tokio::Transport as TcpTransport;
use libp2p::tcp::Config as TcpConfig;
use libp2p::yamux::YamuxConfig;
use libp2p::{Multiaddr, PeerId, Swarm, Transport};
use rand::seq::SliceRandom;
@@ -50,7 +50,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// ////////////////////////////////////////////////////////////////////////
let mut swarm = {
// Build a Gossipsub network behaviour.
let gossipsub_config = GossipsubConfigBuilder::default()
let gossipsub_config = ConfigBuilder::default()
.history_length(
client
.run_parameters()
@@ -61,7 +61,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.build()
.expect("Valid configuration");
let gossipsub = Gossipsub::new_with_subscription_filter_and_transform(
let gossipsub = Behaviour::new_with_subscription_filter_and_transform(
MessageAuthenticity::Signed(local_key.clone()),
gossipsub_config,
None,
@@ -70,10 +70,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
)
.expect("Valid configuration");
SwarmBuilder::new(build_transport(&local_key), gossipsub, local_peer_id)
.executor(Box::new(|future| {
tokio::spawn(future);
}))
SwarmBuilder::with_tokio_executor(build_transport(&local_key), gossipsub, local_peer_id)
.build()
};
@@ -164,7 +161,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
GossipsubEvent::Subscribed { peer_id, topic } => {
Event::Subscribed { peer_id, topic } => {
client.record_message(format!(
"Peer {} subscribed to a topic: {}",
peer_id, topic
@@ -210,7 +207,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
GossipsubEvent::Message {
Event::Message {
propagation_source,
message_id: _,
message,
@@ -261,10 +258,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let transport = TokioDnsConfig::system(TcpTransport::new(TcpConfig::default().nodelay(true)))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
@@ -320,7 +315,7 @@ async fn publish_and_collect<T: Serialize + DeserializeOwned>(
/// Sets a barrier on the supplied state that fires when it reaches all participants.
async fn barrier_and_drive_swarm(
client: &Client,
swarm: &mut Swarm<Gossipsub>,
swarm: &mut Swarm<Behaviour>,
state: impl Into<Cow<'static, str>> + Copy,
event_subscribed: &mut usize,
event_message: &mut HashSet<PeerId>,
@@ -342,14 +337,14 @@ async fn barrier_and_drive_swarm(
for event in &events {
match event {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
GossipsubEvent::Subscribed { peer_id, topic } => {
Event::Subscribed { peer_id, topic } => {
client.record_message(format!(
"Peer {} subscribed to a topic: {}",
peer_id, topic
));
*event_subscribed += 1;
}
GossipsubEvent::Message {
Event::Message {
propagation_source,
message_id: _,
message,