Adjusted node behavior

This commit is contained in:
Alberto Soutullo
2023-12-24 15:51:24 +01:00
parent 51e92da99e
commit b1f9fb63bf
2 changed files with 51 additions and 31 deletions

View File

@@ -6,18 +6,17 @@ COPY . .
RUN cargo build --release
FROM rust:latest as build
FROM rust:latest
RUN apt-get update &&
RUN apt-get update && apt-get install cron -y
WORKDIR /node
COPY --from=build /node/target/release/node /node/node
COPY --from=build /node/target/release/main /node/main
COPY cron_runner.sh .
RUN chmod +x cron_runner.sh
RUN chmod +x node
RUN chmod +x main
EXPOSE 5000

View File

@@ -1,6 +1,6 @@
use libp2p::{gossipsub, Multiaddr, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::{env, thread};
use std::{env, thread, u128};
use std::error::Error;
use std::hash::{Hash, Hasher};
use std::time::{Duration, SystemTime};
@@ -46,7 +46,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let gossipsub_config = gossipsub::ConfigBuilder::default()
.flood_publish(true)
//.opportunistic_graft_ticks(-10000)
.heartbeat_interval(Duration::from_secs(1)) // This is set to aid debugging by not cluttering the log space
.heartbeat_interval(Duration::from_secs(1))
.prune_backoff(Duration::from_secs(60))
.gossip_factor(0.25)
.mesh_n(6)
@@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
.retain_scores(6)
.mesh_outbound_min(3)
.gossip_lazy(6)
.validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing)
.validation_mode(gossipsub::ValidationMode::Strict)
.message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated.
.build()
.map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`.
@@ -81,6 +81,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let topic = gossipsub::IdentTopic::new("test-net");
swarm.behaviour_mut().gossipsub.subscribe(&topic)?;
let binding = gethostname();
let hostname = binding.to_str();
println!("Hostname: {:?}", hostname);
// Listen on all interfaces and whatever port the OS assigns
swarm.listen_on("/ip4/0.0.0.0/tcp/5000".parse()?)?;
@@ -103,7 +107,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
println!("Will connect to peer {element}");
println!("Service: {element}");
let mut addrs = String::new();
let mut addrs = String::from("/ip4/");
loop {
match lookup_host(&t_address).await {
@@ -111,7 +115,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
for addr in lookup_result {
if addr.is_ipv4() {
println!("Resolved IPv4 address: {}", addr.ip());
addrs.push(format!("{}:5000", addr.ip().to_string()).as_str().parse().unwrap());
addrs.push_str(addr.ip().to_string().as_str());
addrs.push_str("/tcp/5000");
}
}
}
@@ -126,6 +131,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
loop {
println!("Trying to dial with {}", addrs);
match swarm.dial(addrs.parse::<Multiaddr>().unwrap()) {
Ok(..) => {
connected += 1;
@@ -143,47 +149,62 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
println!("Mesh size: {}", swarm.network_info().num_peers());
let binding = gethostname();
let hostname = binding.to_str();
println!("Hostname: {:?}", hostname);
let turn_to_publish: i32 = hostname.expect("No hostname").trim_start_matches("pod-").parse().unwrap();
println!("Publishing turn is: {:?}", turn_to_publish);
let mut interval = time::interval(Duration::from_millis(1000));
let rate = env::var("MSGRATE").expect("$MSGRATE is not set").parse().unwrap();
let mut interval = time::interval(Duration::from_millis(rate));
let mut counter = 0;
loop {
select! {
_ = interval.tick() => {
let msg_size = 1000;
let msg_size = env::var("MSGSIZE").expect("$MSGSIZE is not set").parse().unwrap();
let int_peers: i32 = peers.parse().unwrap();
for msg in 0..10000 {
if msg % int_peers.clone() == turn_to_publish {
let datetime: DateTime<Utc> = SystemTime::now().into();
println!("Sending message at: {}", datetime.format("%d/%m/%Y %T"));
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();
let now_bytes = timestamp_nanos.to_le_bytes();
if counter % int_peers == turn_to_publish {
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();
println!("Sent: {}", timestamp_nanos);
// Convert the timestamp to little-endian bytes
let now_bytes: [u8; 16] = u128::to_le_bytes(timestamp_nanos as u128);
// Create a new vector of bytes for the message
let new_seq: Vec<u8> = vec![0; msg_size];
// Concatenate the little-endian bytes and the new sequence
let mut now_bytes_vec = Vec::new();
now_bytes_vec.extend_from_slice(&now_bytes);
now_bytes_vec.extend_from_slice(&new_seq);
let mut now_bytes_vec = Vec::from(&now_bytes[..]);
now_bytes_vec.resize(msg_size.clone(), 0);
let result = swarm
.behaviour_mut().gossipsub
.publish(gossipsub::IdentTopic::new("test-net"), now_bytes_vec);
let _ = swarm
.behaviour_mut().gossipsub
.publish(gossipsub::IdentTopic::new("test-net"), now_bytes_vec);
match result {
Ok(message_id) => {
//let datetime: DateTime<Utc> = SystemTime::now().into();
//println!("Sending message {} at: {}", message_id, datetime.format("%d/%m/%Y %T"));
}
Err(error) => {
println!("Publish is Err: {}", error);
}
}
}
counter += 1;
}
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: id,
message,
})) => println!(
"Got message: '{}' with id: {id} from peer: {peer_id}",
String::from_utf8_lossy(&message.data),
),
})) => {
// println!("Got message with id: {id} from peer: {peer_id}");
let sent_moment = u128::from_le_bytes(message.data[..16].try_into().unwrap());
let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let timestamp_nanos = duration_since_epoch.as_nanos();
let diff = timestamp_nanos - sent_moment;
println!("{}, milliseconds: {}", sent_moment, Duration::from_nanos(diff as u64).as_millis());
}
_ => {}
}
}