diff --git a/Dockerfile b/Dockerfile index 68b5492..e0f0a12 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,18 +6,17 @@ COPY . . RUN cargo build --release -FROM rust:latest as build +FROM rust:latest -RUN apt-get update && +RUN apt-get update && apt-get install cron -y WORKDIR /node -COPY --from=build /node/target/release/node /node/node - +COPY --from=build /node/target/release/main /node/main COPY cron_runner.sh . RUN chmod +x cron_runner.sh -RUN chmod +x node +RUN chmod +x main EXPOSE 5000 diff --git a/src/main.rs b/src/main.rs index b8bb48f..4dc33c5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use libp2p::{gossipsub, Multiaddr, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux}; use std::collections::hash_map::DefaultHasher; -use std::{env, thread}; +use std::{env, thread, u128}; use std::error::Error; use std::hash::{Hash, Hasher}; use std::time::{Duration, SystemTime}; @@ -46,7 +46,7 @@ async fn main() -> Result<(), Box> { let gossipsub_config = gossipsub::ConfigBuilder::default() .flood_publish(true) //.opportunistic_graft_ticks(-10000) - .heartbeat_interval(Duration::from_secs(1)) // This is set to aid debugging by not cluttering the log space + .heartbeat_interval(Duration::from_secs(1)) .prune_backoff(Duration::from_secs(60)) .gossip_factor(0.25) .mesh_n(6) @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box> { .retain_scores(6) .mesh_outbound_min(3) .gossip_lazy(6) - .validation_mode(gossipsub::ValidationMode::Strict) // This sets the kind of message validation. The default is Strict (enforce message signing) + .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(message_id_fn) // content-address messages. No two messages of the same content will be propagated. .build() .map_err(|msg| io::Error::new(io::ErrorKind::Other, msg))?; // Temporary hack because `build` does not return a proper `std::error::Error`. @@ -81,6 +81,10 @@ async fn main() -> Result<(), Box> { let topic = gossipsub::IdentTopic::new("test-net"); swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + let binding = gethostname(); + let hostname = binding.to_str(); + println!("Hostname: {:?}", hostname); + // Listen on all interfaces and whatever port the OS assigns swarm.listen_on("/ip4/0.0.0.0/tcp/5000".parse()?)?; @@ -103,7 +107,7 @@ async fn main() -> Result<(), Box> { println!("Will connect to peer {element}"); println!("Service: {element}"); - let mut addrs = String::new(); + let mut addrs = String::from("/ip4/"); loop { match lookup_host(&t_address).await { @@ -111,7 +115,8 @@ async fn main() -> Result<(), Box> { for addr in lookup_result { if addr.is_ipv4() { println!("Resolved IPv4 address: {}", addr.ip()); - addrs.push(format!("{}:5000", addr.ip().to_string()).as_str().parse().unwrap()); + addrs.push_str(addr.ip().to_string().as_str()); + addrs.push_str("/tcp/5000"); } } } @@ -126,6 +131,7 @@ async fn main() -> Result<(), Box> { } loop { + println!("Trying to dial with {}", addrs); match swarm.dial(addrs.parse::().unwrap()) { Ok(..) => { connected += 1; @@ -143,47 +149,62 @@ async fn main() -> Result<(), Box> { } println!("Mesh size: {}", swarm.network_info().num_peers()); - let binding = gethostname(); - let hostname = binding.to_str(); - println!("Hostname: {:?}", hostname); let turn_to_publish: i32 = hostname.expect("No hostname").trim_start_matches("pod-").parse().unwrap(); println!("Publishing turn is: {:?}", turn_to_publish); - let mut interval = time::interval(Duration::from_millis(1000)); + let rate = env::var("MSGRATE").expect("$MSGRATE is not set").parse().unwrap(); + let mut interval = time::interval(Duration::from_millis(rate)); + let mut counter = 0; loop { select! { _ = interval.tick() => { - let msg_size = 1000; + let msg_size = env::var("MSGSIZE").expect("$MSGSIZE is not set").parse().unwrap(); let int_peers: i32 = peers.parse().unwrap(); - for msg in 0..10000 { - if msg % int_peers.clone() == turn_to_publish { - let datetime: DateTime = SystemTime::now().into(); - println!("Sending message at: {}", datetime.format("%d/%m/%Y %T")); - let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); - let timestamp_nanos = duration_since_epoch.as_nanos(); - let now_bytes = timestamp_nanos.to_le_bytes(); + if counter % int_peers == turn_to_publish { + let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let timestamp_nanos = duration_since_epoch.as_nanos(); + println!("Sent: {}", timestamp_nanos); + // Convert the timestamp to little-endian bytes + let now_bytes: [u8; 16] = u128::to_le_bytes(timestamp_nanos as u128); + // Create a new vector of bytes for the message + let new_seq: Vec = vec![0; msg_size]; + // Concatenate the little-endian bytes and the new sequence + let mut now_bytes_vec = Vec::new(); + now_bytes_vec.extend_from_slice(&now_bytes); + now_bytes_vec.extend_from_slice(&new_seq); - let mut now_bytes_vec = Vec::from(&now_bytes[..]); - now_bytes_vec.resize(msg_size.clone(), 0); + let result = swarm + .behaviour_mut().gossipsub + .publish(gossipsub::IdentTopic::new("test-net"), now_bytes_vec); - let _ = swarm - .behaviour_mut().gossipsub - .publish(gossipsub::IdentTopic::new("test-net"), now_bytes_vec); + match result { + Ok(message_id) => { + //let datetime: DateTime = SystemTime::now().into(); + //println!("Sending message {} at: {}", message_id, datetime.format("%d/%m/%Y %T")); + } + Err(error) => { + println!("Publish is Err: {}", error); + } } } + counter += 1; } event = swarm.select_next_some() => match event { SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { propagation_source: peer_id, message_id: id, message, - })) => println!( - "Got message: '{}' with id: {id} from peer: {peer_id}", - String::from_utf8_lossy(&message.data), - ), + })) => { + // println!("Got message with id: {id} from peer: {peer_id}"); + let sent_moment = u128::from_le_bytes(message.data[..16].try_into().unwrap()); + let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let timestamp_nanos = duration_since_epoch.as_nanos(); + let diff = timestamp_nanos - sent_moment; + println!("{}, milliseconds: {}", sent_moment, Duration::from_nanos(diff as u64).as_millis()); + } _ => {} } }