Changed connection handling logic.

This commit is contained in:
Alberto Soutullo
2023-12-29 16:40:39 +01:00
parent 7c4279a9ec
commit b9b0d56170

View File

@@ -2,8 +2,9 @@ use libp2p::{gossipsub, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp,
use std::collections::hash_map::DefaultHasher;
use std::{env, thread, u128};
use std::error::Error;
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::time::{Duration, SystemTime};
use std::time::{Duration, Instant, SystemTime};
use libp2p::gossipsub::TopicScoreParams;
use tokio::{io, select, time};
use tracing_subscriber::EnvFilter;
@@ -11,6 +12,7 @@ use rand::prelude::*;
use tokio::net::lookup_host;
use libp2p::futures::StreamExt;
use gethostname::gethostname;
use libp2p::core::ConnectedPoint;
use libp2p::swarm::dial_opts::DialOpts;
// We create a custom network behaviour that combines Gossipsub.
@@ -91,8 +93,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let my_id = env::var("PEERNUMBER").expect("$PEERNUMBER is not set");
println!("{}, {}", my_id, swarm.local_peer_id().to_string());
println!("Waiting 60 seconds for node building...");
thread::sleep(Duration::from_secs(30));
println!("Waiting 30 seconds for node building...");
tokio::time::sleep(Duration::from_secs(30)).await;
let peers = env::var("PEERS").expect("$PEERS is not set");
let mut array: Vec<usize> = (0..peers.parse().unwrap()).collect();
@@ -123,8 +125,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
Err(e) => {
eprintln!("Failed to resolve address: {:?}", e);
println!("Waiting 15 seconds...");
thread::sleep(Duration::from_secs(15));
continue;
}
}
@@ -136,40 +136,44 @@ async fn main() -> Result<(), Box<dyn Error>> {
.address(addrs.parse().unwrap())
.build()) {
Ok(..) => {
loop {
match swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished{endpoint, ..} => {
if endpoint.is_dialer() {
connected += 1;
println!("Connected to {:?}!", endpoint);
break;
}
}
swarm_event => {
println!("{:?}", swarm_event);
}
}
}
println!("Dial sent to {}", addrs);
break;
}
Err(e) => {
eprintln!("Failed to dial: {:?}", e);
println!("Waiting 15 seconds...");
thread::sleep(Duration::from_secs(15));
continue;
}
}
break;
}
}
let timeout_duration = Duration::from_secs(30);
println!("Will check connections for 30 seconds");
println!("Started checking connections at {:?}", Instant::now());
let result = tokio::time::timeout(timeout_duration, async {
loop {
match swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished {
endpoint: ConnectedPoint::Dialer { address, .. },
established_in,
..
} => {
println!("Connected to {:?}, took {:?}", address, established_in);
}
swarm_event => {
println!("{:?}", swarm_event);
}
}
}
}).await;
println!("Finished checking connections at {:?}", Instant::now());
println!("Mesh size: {:?}", swarm.network_info().connection_counters());
let turn_to_publish = my_id.parse::<i32>().unwrap();
println!("Publishing turn is: {:?}", turn_to_publish);
println!("Waiting 30 for connections...");
thread::sleep(Duration::from_secs(30));
let rate = env::var("MSGRATE").expect("$MSGRATE is not set").parse().unwrap();
let mut interval = time::interval(Duration::from_millis(rate));
let msg_size = env::var("MSGSIZE").expect("$MSGSIZE is not set").parse().unwrap();
@@ -216,7 +220,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();
let diff = timestamp_nanos - sent_moment;
println!("{}, milliseconds: {}", sent_moment, Duration::from_nanos(diff as u64).as_millis());
println!("{}, milliseconds: {}, id: {}, peer_id: {}", sent_moment, Duration::from_nanos(diff as u64).as_millis(), id, peer_id);
}
_ => {}
}