smoke: upgrade dependencies (#6)

* Consolidate dependencies

* Upgrade rust-libp2p from v0.45.1 to v0.48.0

* Improve the barrier function to use `take_until` instead of `select!`

* Install protoc

* Upgrade testground sdk from v0.3 to v0.4

Co-authored-by: Age Manning <Age@AgeManning.com>
This commit is contained in:
Akihito Nakano
2022-11-22 11:34:12 +09:00
committed by GitHub
parent cb2d84dcc4
commit d24d3c82e0
5 changed files with 700 additions and 745 deletions

View File

@@ -29,6 +29,8 @@ jobs:
sudo apt-get install -y protobuf-compiler
- name: Get latest version of stable Rust
run: rustup update stable
- name: Install Protoc
uses: arduino/setup-protoc@64c0c85d18e984422218383b81c52f8b077404d3 # v1.1.2
- name: Lint code for quality and style with Clippy
run: cargo clippy --workspace --tests -- -D warnings
- name: Certify Cargo.lock freshness

1221
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,12 +6,11 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
libp2p = { version = "0.45.1", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux"] }
libp2p-gossipsub = "0.38.1"
libp2p = { version = "0.48.0", default-features = false, features = ["gossipsub", "dns-tokio", "tcp-tokio", "noise", "mplex", "yamux"] }
rand = "0.8.5"
serde = "1.0.137"
serde_json = "1.0.81"
testground = "0.3.0"
testground = "0.4.0"
tokio = { version = "1.19.2", features = ["macros"] }
tracing = "0.1.35"
tracing-subscriber = { version = "0.3.11", features = ["env-filter"] }

View File

@@ -3,10 +3,12 @@
FROM rust:1.62-bullseye as builder
WORKDIR /usr/src/test-plan
# `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.
# There is a discussion for removing cmake from their dependency.
# https://github.com/tokio-rs/prost/pull/620
RUN apt-get update && apt-get install -y cmake
# * `prost-build`, a dependency of `libp2p-gossipsub`, requires cmake.
# There is a discussion for removing cmake from their dependency.
# https://github.com/tokio-rs/prost/pull/620
# * Since `prost-build` v0.11, `protoc` is required.
# https://github.com/tokio-rs/prost/releases/tag/v0.11.0
RUN apt-get update && apt-get install -y cmake && apt-get install -y protobuf-compiler
# Cache dependencies between test runs,
# See https://blog.mgattozzi.dev/caching-rust-docker-builds/

View File

@@ -1,28 +1,33 @@
extern crate core;
use libp2p::core::muxing::StreamMuxerBox;
use libp2p::core::upgrade::{SelectUpgrade, Version};
use libp2p::core::ConnectedPoint;
use libp2p::dns::TokioDnsConfig;
use libp2p::futures::FutureExt;
use libp2p::futures::StreamExt;
use libp2p::gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p::gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, IdentityTransform,
MessageAuthenticity,
};
use libp2p::identity::Keypair;
use libp2p::mplex::MplexConfig;
use libp2p::multiaddr::Protocol;
use libp2p::noise::NoiseConfig;
use libp2p::swarm::{SwarmBuilder, SwarmEvent};
use libp2p::tcp::TokioTcpConfig;
use libp2p::tcp::{GenTcpConfig, TokioTcpTransport};
use libp2p::yamux::YamuxConfig;
use libp2p::{Multiaddr, PeerId, Transport};
use libp2p_gossipsub::subscription_filter::AllowAllSubscriptionFilter;
use libp2p_gossipsub::{
Gossipsub, GossipsubConfigBuilder, GossipsubEvent, IdentTopic as Topic, IdentityTransform,
MessageAuthenticity,
};
use libp2p::{Multiaddr, PeerId, Swarm, Transport};
use rand::seq::SliceRandom;
use rand::SeedableRng;
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use std::borrow::Cow;
use std::collections::HashSet;
use testground::client::Client;
use testground::RunParameters;
use tracing::{debug, info, warn};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -30,8 +35,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_env_filter(env_filter).init();
}
let (client, run_parameters) = Client::new().await?;
client.wait_network_initialized().await?;
let client = Client::new_and_init().await?;
let local_key = Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
@@ -48,7 +52,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Build a Gossipsub network behaviour.
let gossipsub_config = GossipsubConfigBuilder::default()
.history_length(
run_parameters
client
.run_parameters()
.test_instance_params
.get("gossipsub_history_length")
.ok_or("gossipsub_history_length is not specified")?
@@ -74,7 +79,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let multiaddr = {
let mut multiaddr = Multiaddr::from(
run_parameters
client
.run_parameters()
.data_network_ip()?
.expect("Should have an IP address for the data network"),
);
@@ -86,67 +92,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.listen_on(multiaddr.clone())
.expect("Swarm starts listening");
// Sets a barrier on the supplied state that fires when it reaches all participants.
macro_rules! barrier {
($state: expr) => {
loop {
tokio::select! {
_ = client.signal_and_wait($state, run_parameters.test_instance_count) => {
break;
}
event = swarm.select_next_some() => {
// Record the Swarm events that happen while waiting for the barrier.
match event {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
GossipsubEvent::Subscribed { peer_id, topic } => {
client.record_message(format!(
"Peer {} subscribed to a topic: {}",
peer_id, topic
));
event_subscribed += 1;
}
GossipsubEvent::Message {
propagation_source,
message_id: _,
message,
} => {
client.record_message(format!(
"Message: propagation_source: {}, source: {:?}",
propagation_source, message.source
));
if !event_message.insert(message.source.expect("Source peer id")) {
client
.record_failure(format!(
"Received duplicated message: {:?}",
message
))
.await?;
return Ok(());
}
}
ev => {
client.record_message(format!("{:?}", ev))
}
}
ev => {
client.record_message(format!("{:?}", ev))
}
}
}
}
}
};
}
barrier!("Started listening");
barrier_and_drive_swarm(
&client,
&mut swarm,
"Started listening",
&mut event_subscribed,
&mut event_message,
)
.await?;
// ////////////////////////////////////////////////////////////////////////
// Connect to a peer randomly selected
// ////////////////////////////////////////////////////////////////////////
let peer_addr = {
// Collect addresses of test case participants.
let mut others = publish_and_collect(&client, &run_parameters, multiaddr.to_string())
let mut others = publish_and_collect(&client, multiaddr.to_string())
.await?
.iter()
.filter(|&addr| addr != &multiaddr.to_string())
@@ -154,7 +114,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.collect::<Vec<_>>();
// Select a peer to connect.
let mut rnd = rand::rngs::StdRng::seed_from_u64(run_parameters.test_instance_count);
let mut rnd =
rand::rngs::StdRng::seed_from_u64(client.run_parameters().test_instance_count);
others.shuffle(&mut rnd);
others.pop().unwrap()
};
@@ -180,7 +141,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
barrier!("Connected to a peer");
barrier_and_drive_swarm(
&client,
&mut swarm,
"Connected to a peer",
&mut event_subscribed,
&mut event_message,
)
.await?;
// ////////////////////////////////////////////////////////////////////////
// Subscribe a topic
@@ -213,7 +181,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
}
barrier!("Subscribed a topic");
barrier_and_drive_swarm(
&client,
&mut swarm,
"Subscribed a topic",
&mut event_subscribed,
&mut event_message,
)
.await?;
// ////////////////////////////////////////////////////////////////////////
// Publish a single message
@@ -231,7 +206,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
swarm.behaviour_mut().publish(topic, "message".as_bytes())?;
// Wait until all messages published by participants have been received.
if event_message.len() < (run_parameters.test_instance_count - 1) as usize {
if event_message.len() < (client.run_parameters().test_instance_count - 1) as usize {
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
@@ -255,7 +230,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
return Ok(());
}
if event_message.len() == (run_parameters.test_instance_count - 1) as usize
if event_message.len()
== (client.run_parameters().test_instance_count - 1) as usize
{
break;
}
@@ -269,7 +245,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
client.record_message("Received all the published messages");
barrier!("Published a message");
barrier_and_drive_swarm(
&client,
&mut swarm,
"Published a message",
&mut event_subscribed,
&mut event_message,
)
.await?;
client.record_success().await?;
@@ -278,8 +261,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up an encrypted TCP transport over the Mplex and Yamux protocols.
fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId, StreamMuxerBox)> {
let transport =
TokioDnsConfig::system(TokioTcpConfig::new().nodelay(true)).expect("DNS config");
let transport = TokioDnsConfig::system(TokioTcpTransport::new(
GenTcpConfig::default().nodelay(true),
))
.expect("DNS config");
let noise_keys = libp2p::noise::Keypair::<libp2p::noise::X25519Spec>::new()
.into_authentic(keypair)
@@ -300,21 +285,28 @@ fn build_transport(keypair: &Keypair) -> libp2p::core::transport::Boxed<(PeerId,
// myself.
async fn publish_and_collect<T: Serialize + DeserializeOwned>(
client: &Client,
run_parameters: &RunParameters,
info: T,
) -> Result<Vec<T>, Box<dyn std::error::Error>> {
const TOPIC: &str = "publish_and_collect";
client.publish(TOPIC, serde_json::to_string(&info)?).await?;
client
.publish(
TOPIC,
Cow::Owned(Value::String(serde_json::to_string(&info)?)),
)
.await?;
let mut stream = client.subscribe(TOPIC).await;
let mut stream = client.subscribe(TOPIC, u16::MAX.into()).await;
let mut vec: Vec<T> = vec![];
for _ in 0..run_parameters.test_instance_count {
for _ in 0..client.run_parameters().test_instance_count {
match stream.next().await {
Some(Ok(other)) => {
let info: T = serde_json::from_str(&other)?;
let info: T = match other {
Value::String(str) => serde_json::from_str(&str)?,
_ => unreachable!(),
};
vec.push(info);
}
Some(Err(e)) => return Err(Box::new(e)),
@@ -324,3 +316,58 @@ async fn publish_and_collect<T: Serialize + DeserializeOwned>(
Ok(vec)
}
/// Sets a barrier on the supplied state that fires when it reaches all participants.
async fn barrier_and_drive_swarm(
client: &Client,
swarm: &mut Swarm<Gossipsub>,
state: impl Into<Cow<'static, str>> + Copy,
event_subscribed: &mut usize,
event_message: &mut HashSet<PeerId>,
) -> Result<(), Box<dyn std::error::Error>> {
info!(
"Signal and wait for all peers to signal being done with \"{}\".",
state.into(),
);
let events = swarm
.take_until(
client
.signal_and_wait(state, client.run_parameters().test_instance_count)
.boxed_local(),
)
.collect::<Vec<_>>()
.await;
for event in &events {
match event {
SwarmEvent::Behaviour(gossipsub_event) => match gossipsub_event {
GossipsubEvent::Subscribed { peer_id, topic } => {
client.record_message(format!(
"Peer {} subscribed to a topic: {}",
peer_id, topic
));
*event_subscribed += 1;
}
GossipsubEvent::Message {
propagation_source,
message_id: _,
message,
} => {
client.record_message(format!(
"Message: propagation_source: {}, source: {:?}",
propagation_source, message.source
));
if !event_message.insert(message.source.expect("Source peer id")) {
warn!("Received duplicated message: {:?}", message);
}
}
_ => debug!("GossipsubEvent: {:?}", gossipsub_event),
},
_ => debug!("SwarmEvent: {:?}", event),
};
}
Ok(())
}