raft: more robust syncing process

This commit is contained in:
ghassmo
2022-07-09 18:43:51 +03:00
parent df0a20f571
commit 6675cc2262
2 changed files with 50 additions and 23 deletions

View File

@@ -24,9 +24,13 @@ use super::{
DataStore,
};
// Timing and retry constants. All durations are expressed in milliseconds;
// note that SYNC_ATTEMPTS is a plain count, not a duration.

// NOTE(review): presumably the interval related to leader heartbeats; its use
// is not visible in this excerpt, so confirm against the caller.
const HEARTBEATTIMEOUT: u64 = 500;
// NOTE(review): general timeout in milliseconds; its use is not visible in
// this excerpt, so confirm what it bounds.
const TIMEOUT: u64 = 6000;
// NOTE(review): presumably tied to refreshing the known node ids (see
// `load_node_ids_loop`); confirm against the caller.
const TIMEOUT_NODES: u64 = 1000;
// How long a single `waiting_for_sync` call waits before its timeout task
// fires and the wait loop is abandoned (fed to `Duration::from_millis`).
const SYNC_TIMEOUT_FOR_EACH_ATTEMPT: u64 = 2000;
// Upper bound of the listener's sync retry loop (`for i in 0..SYNC_ATTEMPTS`).
const SYNC_ATTEMPTS: u64 = 5;
async fn load_node_ids_loop(
nodes: Arc<Mutex<HashMap<NodeId, Url>>>,
@@ -154,15 +158,36 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
// Sync listener node
if self.role == Role::Listener {
let synced = Arc::new(Mutex::new(false));
let last_term =
if !self.logs.0.is_empty() { self.logs.0.last().unwrap().term } else { 0 };
let sync_request = SyncRequest { logs_len: self.logs.len(), last_term };
info!("Send sync request");
self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?;
info!("Start Syncing...");
for i in 0..SYNC_ATTEMPTS {
if *synced.lock().await {
info!("Synced Successfully!!");
break
}
self.waiting_for_sync(p2p_recv_channel.clone(), stop_signal.clone()).await?;
info!("Send sync request {}", i);
self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?;
self.waiting_for_sync(
executor.clone(),
p2p_recv_channel.clone(),
stop_signal.clone(),
synced.clone(),
)
.await?;
}
if !(*synced.lock().await) {
error!("Syncing failed");
return Ok(())
}
}
let mut rng = rand::thread_rng();
@@ -267,7 +292,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
debug!(target: "raft", "Role: {:?} receive msg id: {} recipient_id: {:?} method: {:?} ",
self.role, msg.id, &msg.recipient_id.is_some(), &msg.method);
self.role, msg.id, &msg.recipient_id.is_some(), &msg.method);
Ok(())
}
@@ -294,15 +319,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
};
info!("Send sync response");
for _ in 0..2 {
self.send(
self.current_leader.clone(),
&serialize(&sync_response),
NetMsgMethod::SyncResponse,
None,
)
.await?;
}
self.send(None, &serialize(&sync_response), NetMsgMethod::SyncResponse, None).await?;
} else {
self.send(
self.current_leader.clone(),
@@ -352,7 +369,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let random_id = if msg_id.is_some() { msg_id.unwrap() } else { OsRng.next_u64() };
debug!(target: "raft","Role: {:?} send a msg id: {} recipient_id: {:?} method: {:?} ",
self.role, random_id, &recipient_id.is_some(), &method);
self.role, random_id, &recipient_id.is_some(), &method);
let net_msg = NetMsg { id: random_id, recipient_id, payload: payload.to_vec(), method };
self.seen_msgs.lock().await.push(random_id);
@@ -363,9 +380,20 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
async fn waiting_for_sync(
&mut self,
executor: Arc<Executor<'_>>,
p2p_recv_channel: async_channel::Receiver<NetMsg>,
stop_signal: async_channel::Receiver<()>,
synced: Arc<Mutex<bool>>,
) -> Result<()> {
let (timeout_s, timeout_r) = async_channel::unbounded::<()>();
executor
.spawn(async move {
task::sleep(Duration::from_millis(SYNC_TIMEOUT_FOR_EACH_ATTEMPT)).await;
timeout_s.send(()).await.unwrap_or(());
})
.detach();
info!("Waiting for sync...");
loop {
select! {
msg = p2p_recv_channel.recv().fuse() => {
@@ -373,9 +401,11 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
if msg.method == NetMsgMethod::SyncResponse {
let sr: SyncResponse = deserialize(&msg.payload)?;
self.receive_sync_response(&sr).await?;
*synced.lock().await = true;
break
}},
_ = stop_signal.recv().fuse() => break,
_ = stop_signal.recv().fuse() => break,
_ = timeout_r.recv().fuse() => break,
}
}
Ok(())

View File

@@ -62,17 +62,14 @@ impl ProtocolRaft {
let msg = (*msg).clone();
self.p2p.broadcast(msg.clone()).await?;
match (self.id.clone(), msg.recipient_id.clone()) {
// check if the ids are equal when both
// the local node and recipient ids are Some(id)
(Some(id), Some(m_id)) => {
if id != m_id {
// check if the ids are equal when both
// the local node and recipient ids are Some(id)
if let Some(self_id) = &self.id {
if let Some(recipient_id) = &msg.recipient_id {
if self_id != recipient_id {
continue
}
}
// reject the message if both the local node id and the recipient id are None
(None, None) => continue,
_ => {}
}
self.notify_queue_sender.send(msg).await?;