raft: prune seen messages

This commit is contained in:
ghassmo
2022-07-17 17:33:50 +04:00
parent 5f06f4090d
commit edf2458ee9
7 changed files with 63 additions and 50 deletions

1
Cargo.lock generated
View File

@@ -4130,6 +4130,7 @@ dependencies = [
"darkfi",
"easy-parallel",
"futures",
"fxhash",
"hex",
"log",
"notify",

View File

@@ -30,6 +30,7 @@ chrono = "0.4.19"
thiserror = "1.0.31"
ctrlc-async = {version= "3.2.2", default-features = false, features = ["async-std", "termination"]}
url = "2.2.2"
fxhash = "0.2.1"
# Encoding and parsing
serde = {version = "1.0.138", features = ["derive"]}

View File

@@ -8,6 +8,7 @@ use log::{debug, error, info, warn};
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use smol::future;
use structopt_toml::StructOptToml;
use fxhash::FxHashMap;
use darkfi::{
async_daemonize, net,
@@ -232,7 +233,7 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
//Raft
//
let net_settings = settings.net;
let seen_net_msgs = Arc::new(Mutex::new(vec![]));
let seen_net_msgs = Arc::new(Mutex::new(FxHashMap::default()));
let datastore_raft = datastore_path.join("tau.db");
let mut raft = Raft::<EncryptedTask>::new(

View File

@@ -5,6 +5,7 @@ use async_std::{
use std::{cmp::min, path::PathBuf, time::Duration};
use async_executor::Executor;
use chrono::Utc;
use futures::{select, FutureExt};
use fxhash::FxHashMap;
use log::{debug, error, info, warn};
@@ -25,12 +26,15 @@ use super::{
DataStore,
};
// In milliseconds
// Milliseconds
const HEARTBEATTIMEOUT: u64 = 500;
const TIMEOUT: u64 = 6000;
const TIMEOUT_NODES: u64 = 1000;
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000;
// Seconds
const SEEN_DURATION: i64 = 120;
const SYNC_ATTEMPTS: u64 = 30;
async fn load_node_ids_loop(
@@ -53,6 +57,24 @@ async fn load_node_ids_loop(
}
}
// Auxilary function to periodically prun seen messages, based on when they were received.
// This helps us to prevent broadcasting loops.
async fn prune_seen_messages(map: Arc<Mutex<fxhash::FxHashMap<String, i64>>>) {
loop {
crate::util::sleep(SEEN_DURATION as u64).await;
debug!("Pruning seen messages");
let now = Utc::now().timestamp();
let mut map = map.lock().await;
for (k, v) in map.clone().iter() {
if now - v > SEEN_DURATION {
map.remove(k);
}
}
}
}
async fn p2p_send_loop(receiver: async_channel::Receiver<NetMsg>, p2p: net::P2pPtr) -> Result<()> {
loop {
let msg: NetMsg = receiver.recv().await?;
@@ -87,14 +109,14 @@ pub struct Raft<T> {
datastore: DataStore<T>,
seen_msgs: Arc<Mutex<Vec<u64>>>,
seen_msgs: Arc<Mutex<FxHashMap<String, i64>>>,
}
impl<T: Decodable + Encodable + Clone> Raft<T> {
pub fn new(
addr: Option<Url>,
db_path: PathBuf,
seen_msgs: Arc<Mutex<Vec<u64>>>,
seen_msgs: Arc<Mutex<FxHashMap<String, i64>>>,
) -> Result<Self> {
if db_path.to_str().is_none() {
error!(target: "raft", "datastore path is incorrect");
@@ -141,6 +163,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let load_ips_task =
executor.spawn(load_node_ids_loop(self.nodes.clone(), p2p.clone(), self.role.clone()));
let prune_seen_messages_task = executor.spawn(prune_seen_messages(self.seen_msgs.clone()));
let mut synced = false;
// Sync listener node
@@ -208,6 +232,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
warn!(target: "raft", "Raft Terminating...");
load_ips_task.cancel().await;
p2p_send_task.cancel().await;
prune_seen_messages_task.cancel().await;
self.datastore.flush().await?;
Ok(())
}
@@ -288,7 +313,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
} else if self.logs_len() >= sr.logs_len &&
self.get_log(sr.logs_len - 1)?.term == sr.last_term
{
self.slice_logs_from(sr.logs_len)?
self.slice_logs_from(sr.logs_len)?.unwrap()
} else {
wipe = true;
self.logs()?.clone()
@@ -352,7 +377,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.role, random_id, &recipient_id.is_some(), &method);
let net_msg = NetMsg { id: random_id, recipient_id, payload: payload.to_vec(), method };
self.seen_msgs.lock().await.push(random_id);
self.seen_msgs.lock().await.insert(random_id.to_string(), Utc::now().timestamp());
self.sender.0.send(net_msg).await?;
Ok(())
@@ -501,9 +526,9 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
};
let suffix: Logs = match self.slice_logs_from(prefix_len) {
Ok(l) => l,
Err(_) => return Ok(()),
let suffix: Logs = match self.slice_logs_from(prefix_len)? {
Some(l) => l,
None => return Ok(()),
};
let mut prefix_term = 0;
@@ -706,10 +731,12 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
fn get_log(&self, index: u64) -> Result<Log> {
self.datastore.logs.get(index)
}
fn slice_logs_from(&self, index: u64) -> Result<Logs> {
Ok(Logs(self.datastore.logs.get_gt(index)?))
fn slice_logs_from(&self, index: u64) -> Result<Option<Logs>> {
let logs = self.logs()?;
Ok(logs.slice_from(index))
}
fn slice_logs_to(&self, index: u64) -> Result<Logs> {
Ok(Logs(self.datastore.logs.get_lt(index)?))
let logs = self.logs()?;
Ok(logs.slice_to(index))
}
}

View File

@@ -114,40 +114,6 @@ impl<T: Decodable + Encodable> DataTree<T> {
)))
}
pub fn get_lt(&self, index: u64) -> Result<Vec<T>> {
let mut ret: Vec<T> = Vec::new();
let index = index.to_be_bytes();
for i in self.tree.get_lt(index).iter() {
if i.is_none() {
return Ok(ret)
}
let data = i.as_ref().unwrap();
let data = deserialize(&data.1)?;
ret.push(data)
}
Ok(ret)
}
pub fn get_gt(&self, index: u64) -> Result<Vec<T>> {
let mut ret: Vec<T> = Vec::new();
let index = (index - 1).to_be_bytes();
for i in self.tree.get_gt(index).iter() {
if i.is_none() {
return Ok(ret)
}
let data = i.as_ref().unwrap();
let data = deserialize(&data.1)?;
ret.push(data)
}
Ok(ret)
}
pub fn is_empty(&self) -> bool {
self.tree.is_empty()
}

View File

@@ -104,6 +104,22 @@ impl Logs {
self.0.is_empty()
}
pub fn slice_from(&self, start: u64) -> Option<Self> {
if self.len() >= start {
return Some(Self(self.0[start as usize..].to_vec()))
}
None
}
pub fn slice_to(&self, end: u64) -> Self {
for i in (0..end).rev() {
if self.len() >= i {
return Self(self.0[..i as usize].to_vec())
}
}
Self(vec![])
}
pub fn get(&self, index: u64) -> Result<Log> {
match self.0.get(index as usize) {
Some(l) => Ok(l.clone()),

View File

@@ -2,6 +2,7 @@ use async_std::sync::{Arc, Mutex};
use async_executor::Executor;
use async_trait::async_trait;
use fxhash::FxHashMap;
use log::debug;
use crate::{net, Result};
@@ -14,7 +15,7 @@ pub struct ProtocolRaft {
notify_queue_sender: async_channel::Sender<NetMsg>,
msg_sub: net::MessageSubscription<NetMsg>,
p2p: net::P2pPtr,
seen_msgs: Arc<Mutex<Vec<u64>>>,
seen_msgs: Arc<Mutex<FxHashMap<String, i64>>>,
}
impl ProtocolRaft {
@@ -23,7 +24,7 @@ impl ProtocolRaft {
channel: net::ChannelPtr,
notify_queue_sender: async_channel::Sender<NetMsg>,
p2p: net::P2pPtr,
seen_msgs: Arc<Mutex<Vec<u64>>>,
seen_msgs: Arc<Mutex<FxHashMap<String, i64>>>,
) -> net::ProtocolBasePtr {
let message_subsytem = channel.get_message_subsystem();
message_subsytem.add_dispatch::<NetMsg>().await;
@@ -53,10 +54,10 @@ impl ProtocolRaft {
{
let mut msgs = self.seen_msgs.lock().await;
if msgs.contains(&msg.id) {
if msgs.contains_key(&msg.id.to_string()) {
continue
}
msgs.push(msg.id);
msgs.insert(msg.id.to_string(), chrono::Utc::now().timestamp());
}
let msg = (*msg).clone();