diff --git a/Cargo.lock b/Cargo.lock index 6967d489b..5f1a73be3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4130,6 +4130,7 @@ dependencies = [ "darkfi", "easy-parallel", "futures", + "fxhash", "hex", "log", "notify", diff --git a/bin/tau/taud/Cargo.toml b/bin/tau/taud/Cargo.toml index ee9e9f4c4..dfd21d8db 100644 --- a/bin/tau/taud/Cargo.toml +++ b/bin/tau/taud/Cargo.toml @@ -30,6 +30,7 @@ chrono = "0.4.19" thiserror = "1.0.31" ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]} url = "2.2.2" +fxhash = "0.2.1" # Encoding and parsing serde = {version = "1.0.138", features = ["derive"]} diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index 32bb6b406..bb19bb63b 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -8,6 +8,7 @@ use log::{debug, error, info, warn}; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher}; use smol::future; use structopt_toml::StructOptToml; +use fxhash::FxHashMap; use darkfi::{ async_daemonize, net, @@ -232,7 +233,7 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { //Raft // let net_settings = settings.net; - let seen_net_msgs = Arc::new(Mutex::new(vec![])); + let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default())); let datastore_raft = datastore_path.join("tau.db"); let mut raft = Raft::::new( diff --git a/src/raft/consensus.rs b/src/raft/consensus.rs index f5c1c7625..e36f40e13 100644 --- a/src/raft/consensus.rs +++ b/src/raft/consensus.rs @@ -5,6 +5,7 @@ use async_std::{ use std::{cmp::min, path::PathBuf, time::Duration}; use async_executor::Executor; +use chrono::Utc; use futures::{select, FutureExt}; use fxhash::FxHashMap; use log::{debug, error, info, warn}; @@ -25,12 +26,15 @@ use super::{ DataStore, }; -// In milliseconds +// Milliseconds const HEARTBEATTIMEOUT: u64 = 500; const TIMEOUT: u64 = 6000; const TIMEOUT_NODES: u64 = 1000; const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000; +// Seconds +const SEEN_DURATION: i64 = 120; + const SYNC_ATTEMPTS: u64 = 30; async fn load_node_ids_loop( @@ -53,6 +57,24 @@ async fn load_node_ids_loop( } } +// Auxilary function to periodically prun seen messages, based on when they were received. +// This helps us to prevent broadcasting loops. +async fn prune_seen_messages(map: Arc>>) { + loop { + crate::util::sleep(SEEN_DURATION as u64).await; + debug!("Pruning seen messages"); + + let now = Utc::now().timestamp(); + + let mut map = map.lock().await; + for (k, v) in map.clone().iter() { + if now - v > SEEN_DURATION { + map.remove(k); + } + } + } +} + async fn p2p_send_loop(receiver: async_channel::Receiver, p2p: net::P2pPtr) -> Result<()> { loop { let msg: NetMsg = receiver.recv().await?; @@ -87,14 +109,14 @@ pub struct Raft { datastore: DataStore, - seen_msgs: Arc>>, + seen_msgs: Arc>>, } impl Raft { pub fn new( addr: Option, db_path: PathBuf, - seen_msgs: Arc>>, + seen_msgs: Arc>>, ) -> Result { if db_path.to_str().is_none() { error!(target: "raft", "datastore path is incorrect"); @@ -141,6 +163,8 @@ impl Raft { let load_ips_task = executor.spawn(load_node_ids_loop(self.nodes.clone(), p2p.clone(), self.role.clone())); + let prune_seen_messages_task = executor.spawn(prune_seen_messages(self.seen_msgs.clone())); + let mut synced = false; // Sync listener node @@ -208,6 +232,7 @@ impl Raft { warn!(target: "raft", "Raft Terminating..."); load_ips_task.cancel().await; p2p_send_task.cancel().await; + prune_seen_messages_task.cancel().await; self.datastore.flush().await?; Ok(()) } @@ -288,7 +313,7 @@ impl Raft { } else if self.logs_len() >= sr.logs_len && self.get_log(sr.logs_len - 1)?.term == sr.last_term { - self.slice_logs_from(sr.logs_len)? + self.slice_logs_from(sr.logs_len)?.unwrap() } else { wipe = true; self.logs()?.clone() @@ -352,7 +377,7 @@ impl Raft { self.role, random_id, &recipient_id.is_some(), &method); let net_msg = NetMsg { id: random_id, recipient_id, payload: payload.to_vec(), method }; - self.seen_msgs.lock().await.push(random_id); + self.seen_msgs.lock().await.insert(random_id.to_string(), Utc::now().timestamp()); self.sender.0.send(net_msg).await?; Ok(()) @@ -501,9 +526,9 @@ impl Raft { } }; - let suffix: Logs = match self.slice_logs_from(prefix_len) { - Ok(l) => l, - Err(_) => return Ok(()), + let suffix: Logs = match self.slice_logs_from(prefix_len)? { + Some(l) => l, + None => return Ok(()), }; let mut prefix_term = 0; @@ -706,10 +731,12 @@ impl Raft { fn get_log(&self, index: u64) -> Result { self.datastore.logs.get(index) } - fn slice_logs_from(&self, index: u64) -> Result { - Ok(Logs(self.datastore.logs.get_gt(index)?)) + fn slice_logs_from(&self, index: u64) -> Result> { + let logs = self.logs()?; + Ok(logs.slice_from(index)) } fn slice_logs_to(&self, index: u64) -> Result { - Ok(Logs(self.datastore.logs.get_lt(index)?)) + let logs = self.logs()?; + Ok(logs.slice_to(index)) } } diff --git a/src/raft/datastore.rs b/src/raft/datastore.rs index f1521ab19..01c1fcafb 100644 --- a/src/raft/datastore.rs +++ b/src/raft/datastore.rs @@ -114,40 +114,6 @@ impl DataTree { ))) } - pub fn get_lt(&self, index: u64) -> Result> { - let mut ret: Vec = Vec::new(); - - let index = index.to_be_bytes(); - - for i in self.tree.get_lt(index).iter() { - if i.is_none() { - return Ok(ret) - } - let data = i.as_ref().unwrap(); - let data = deserialize(&data.1)?; - ret.push(data) - } - - Ok(ret) - } - - pub fn get_gt(&self, index: u64) -> Result> { - let mut ret: Vec = Vec::new(); - - let index = (index - 1).to_be_bytes(); - - for i in self.tree.get_gt(index).iter() { - if i.is_none() { - return Ok(ret) - } - let data = i.as_ref().unwrap(); - let data = deserialize(&data.1)?; - ret.push(data) - } - - Ok(ret) - } - pub fn is_empty(&self) -> bool { self.tree.is_empty() } diff --git a/src/raft/primitives.rs b/src/raft/primitives.rs index 064b592eb..387155500 100644 --- a/src/raft/primitives.rs +++ b/src/raft/primitives.rs @@ -104,6 +104,22 @@ impl Logs { self.0.is_empty() } + pub fn slice_from(&self, start: u64) -> Option { + if self.len() >= start { + return Some(Self(self.0[start as usize..].to_vec())) + } + None + } + + pub fn slice_to(&self, end: u64) -> Self { + for i in (0..end).rev() { + if self.len() >= i { + return Self(self.0[..i as usize].to_vec()) + } + } + Self(vec![]) + } + pub fn get(&self, index: u64) -> Result { match self.0.get(index as usize) { Some(l) => Ok(l.clone()), diff --git a/src/raft/protocol_raft.rs b/src/raft/protocol_raft.rs index 97bf3248d..48ac8ff20 100644 --- a/src/raft/protocol_raft.rs +++ b/src/raft/protocol_raft.rs @@ -2,6 +2,7 @@ use async_std::sync::{Arc, Mutex}; use async_executor::Executor; use async_trait::async_trait; +use fxhash::FxHashMap; use log::debug; use crate::{net, Result}; @@ -14,7 +15,7 @@ pub struct ProtocolRaft { notify_queue_sender: async_channel::Sender, msg_sub: net::MessageSubscription, p2p: net::P2pPtr, - seen_msgs: Arc>>, + seen_msgs: Arc>>, } impl ProtocolRaft { @@ -23,7 +24,7 @@ impl ProtocolRaft { channel: net::ChannelPtr, notify_queue_sender: async_channel::Sender, p2p: net::P2pPtr, - seen_msgs: Arc>>, + seen_msgs: Arc>>, ) -> net::ProtocolBasePtr { let message_subsytem = channel.get_message_subsystem(); message_subsytem.add_dispatch::().await; @@ -53,10 +54,10 @@ impl ProtocolRaft { { let mut msgs = self.seen_msgs.lock().await; - if msgs.contains(&msg.id) { + if msgs.contains_key(&msg.id.to_string()) { continue } - msgs.push(msg.id); + msgs.insert(msg.id.to_string(), chrono::Utc::now().timestamp()); } let msg = (*msg).clone();