Fixed but with dialing. Ok is handled by polling, bot when doint dial

This commit is contained in:
Alberto Soutullo
2023-12-28 00:00:37 +01:00
parent b1f9fb63bf
commit 7c4279a9ec

View File

@@ -1,4 +1,4 @@
use libp2p::{gossipsub, Multiaddr, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use libp2p::{gossipsub, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::{env, thread, u128};
use std::error::Error;
@@ -8,10 +8,10 @@ use libp2p::gossipsub::TopicScoreParams;
use tokio::{io, select, time};
use tracing_subscriber::EnvFilter;
use rand::prelude::*;
use chrono::prelude::*;
use tokio::net::lookup_host;
use libp2p::futures::StreamExt;
use gethostname::gethostname;
use libp2p::swarm::dial_opts::DialOpts;
// We create a custom network behaviour that combines Gossipsub.
#[derive(NetworkBehaviour)]
@@ -92,7 +92,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("{}, {}", my_id, swarm.local_peer_id().to_string());
println!("Waiting 60 seconds for node building...");
thread::sleep(Duration::from_secs(60));
thread::sleep(Duration::from_secs(30));
let peers = env::var("PEERS").expect("$PEERS is not set");
let mut array: Vec<usize> = (0..peers.parse().unwrap()).collect();
@@ -102,7 +102,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
let connect_to = env::var("CONNECTTO").expect("$CONNECTTO is not set");
let mut connected = 0;
for element in &array {
if connected >= connect_to.parse().unwrap() { break };
if connected >= connect_to.parse().unwrap() { break; };
if *element == my_id.parse::<usize>().unwrap() { continue; }
let t_address = format!("pod-{}:5000", element);
println!("Will connect to peer {element}");
println!("Service: {element}");
@@ -125,48 +126,62 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Waiting 15 seconds...");
thread::sleep(Duration::from_secs(15));
continue;
},
}
}
break;
}
loop {
println!("Trying to dial with {}", addrs);
match swarm.dial(addrs.parse::<Multiaddr>().unwrap()) {
match swarm.dial(DialOpts::unknown_peer_id()
.address(addrs.parse().unwrap())
.build()) {
Ok(..) => {
connected += 1;
println!("Connected!");
loop {
match swarm.select_next_some().await {
SwarmEvent::ConnectionEstablished{endpoint, ..} => {
if endpoint.is_dialer() {
connected += 1;
println!("Connected to {:?}!", endpoint);
break;
}
}
swarm_event => {
println!("{:?}", swarm_event);
}
}
}
}
Err(e) => {
eprintln!("Failed to dial: {:?}", e);
println!("Waiting 15 seconds...");
thread::sleep(Duration::from_secs(15));
continue;
},
}
}
break;
}
}
println!("Mesh size: {}", swarm.network_info().num_peers());
println!("Mesh size: {:?}", swarm.network_info().connection_counters());
let turn_to_publish: i32 = hostname.expect("No hostname").trim_start_matches("pod-").parse().unwrap();
let turn_to_publish = my_id.parse::<i32>().unwrap();
println!("Publishing turn is: {:?}", turn_to_publish);
println!("Waiting 30 for connections...");
thread::sleep(Duration::from_secs(30));
let rate = env::var("MSGRATE").expect("$MSGRATE is not set").parse().unwrap();
let mut interval = time::interval(Duration::from_millis(rate));
let msg_size = env::var("MSGSIZE").expect("$MSGSIZE is not set").parse().unwrap();
let int_peers: i32 = peers.parse().unwrap();
let mut counter = 0;
loop {
select! {
_ = interval.tick() => {
let msg_size = env::var("MSGSIZE").expect("$MSGSIZE is not set").parse().unwrap();
let int_peers: i32 = peers.parse().unwrap();
if counter % int_peers == turn_to_publish {
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();
println!("Sent: {}", timestamp_nanos);
// Convert the timestamp to little-endian bytes
let now_bytes: [u8; 16] = u128::to_le_bytes(timestamp_nanos as u128);
// Create a new vector of bytes for the message
@@ -182,8 +197,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
match result {
Ok(message_id) => {
//let datetime: DateTime<Utc> = SystemTime::now().into();
//println!("Sending message {} at: {}", message_id, datetime.format("%d/%m/%Y %T"));
println!("Sent: {}, id: {}", timestamp_nanos, message_id);
}
Err(error) => {
println!("Publish is Err: {}", error);
@@ -198,7 +212,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
message_id: id,
message,
})) => {
// println!("Got message with id: {id} from peer: {peer_id}");
let sent_moment = u128::from_le_bytes(message.data[..16].try_into().unwrap());
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();