diff --git a/src/raft/consensus.rs b/src/raft/consensus.rs index c7ead0d33..96a66d69a 100644 --- a/src/raft/consensus.rs +++ b/src/raft/consensus.rs @@ -69,11 +69,9 @@ async fn p2p_send_loop(receiver: async_channel::Receiver, p2p: net::P2pP pub struct Raft { // this will be derived from the ip - // if the node doesn't have an id then will become a listener and doesn't have the right - // to request/response votes or response a confirmation for log pub id: Option, - // these five vars should be on local storage + // these four vars should be on local storage current_term: u64, voted_for: Option, logs: Logs, @@ -216,7 +214,7 @@ impl Raft { warn!(target: "raft", "Raft start() Exit Signal"); load_ips_task.cancel().await; p2p_send_task.cancel().await; - self.datastore.cancel().await?; + self.datastore.flush().await?; Ok(()) } diff --git a/src/raft/datastore.rs b/src/raft/datastore.rs index beb789160..e82eaf0bd 100644 --- a/src/raft/datastore.rs +++ b/src/raft/datastore.rs @@ -34,7 +34,7 @@ impl DataStore { Ok(Self { _db, logs, commits, voted_for, current_term }) } - pub async fn cancel(&self) -> Result<()> { + pub async fn flush(&self) -> Result<()> { debug!(target: "raft", "DataStore flush"); self._db.flush_async().await?; Ok(()) diff --git a/src/raft/primitives.rs b/src/raft/primitives.rs index c56cfe22a..968192ee2 100644 --- a/src/raft/primitives.rs +++ b/src/raft/primitives.rs @@ -1,6 +1,7 @@ use std::{collections::HashMap, io, net::SocketAddr}; use crate::{ + impl_vec, util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, Error, Result, }; @@ -55,9 +56,6 @@ pub struct LogRequest { pub suffix: Logs, } -#[derive(SerialDecodable, SerialEncodable, Clone, Debug)] -pub struct BroadcastMsgRequest(pub Vec); - #[derive(SerialDecodable, SerialEncodable, Clone, Debug)] pub struct LogResponse { pub node_id: NodeId, @@ -72,6 +70,9 @@ impl VoteResponse { } } +#[derive(SerialDecodable, SerialEncodable, Clone, Debug)] +pub struct BroadcastMsgRequest(pub Vec); + #[derive(Clone, Debug, SerialDecodable, SerialEncodable)] pub struct Log { pub term: u64, @@ -89,7 +90,7 @@ impl From for NodeId { } } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, SerialDecodable, SerialEncodable)] pub struct Logs(pub Vec); impl Logs { @@ -198,32 +199,4 @@ impl Decodable for NetMsgMethod { } } -impl Encodable for Logs { - fn encode(&self, s: S) -> Result { - encode_vec(&self.0, s) - } -} - -impl Decodable for Logs { - fn decode(d: D) -> Result { - Ok(Self(decode_vec(d)?)) - } -} - -fn encode_vec(vec: &[T], mut s: S) -> Result { - let mut len = 0; - len += VarInt(vec.len() as u64).encode(&mut s)?; - for c in vec.iter() { - len += c.encode(&mut s)?; - } - Ok(len) -} - -fn decode_vec(mut d: D) -> Result> { - let len = VarInt::decode(&mut d)?.0; - let mut ret = Vec::with_capacity(len as usize); - for _ in 0..len { - ret.push(Decodable::decode(&mut d)?); - } - Ok(ret) -} +impl_vec!(Log); diff --git a/src/raft/protocol_raft.rs b/src/raft/protocol_raft.rs index c6558ca32..b707f2245 100644 --- a/src/raft/protocol_raft.rs +++ b/src/raft/protocol_raft.rs @@ -61,22 +61,16 @@ impl ProtocolRaft { self.p2p.broadcast(msg.clone()).await?; match (self.id.clone(), msg.recipient_id.clone()) { - // if the local node and the msg recipient have ids - // then check if the ids are equal + // check if the ids are equal when both + // the local node and recipient ids are Some(id) (Some(id), Some(m_id)) => { if id != m_id { continue } } - // if the msg doesn't have a recipient id then the msg is a VoteRequest - // and if the local node's id is not None then it can receive - // and response with a VoteResponse - (Some(_), None) => {} - // if the local node's id is None but the recipient's id is not None - (None, Some(_)) => {} - // if the local node's id and msg recipient's id are both None then reject - // the msg becuase the local node is listener + // reject if both local node and recipient ids are None then (None, None) => continue, + _ => {} } self.notify_queue_sender.send(msg).await?;