mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
raft: remove redundant codes
This commit is contained in:
@@ -69,11 +69,9 @@ async fn p2p_send_loop(receiver: async_channel::Receiver<NetMsg>, p2p: net::P2pP
|
||||
|
||||
pub struct Raft<T> {
|
||||
// this will be derived from the ip
|
||||
// if the node doesn't have an id then will become a listener and doesn't have the right
|
||||
// to request/response votes or response a confirmation for log
|
||||
pub id: Option<NodeId>,
|
||||
|
||||
// these five vars should be on local storage
|
||||
// these four vars should be on local storage
|
||||
current_term: u64,
|
||||
voted_for: Option<NodeId>,
|
||||
logs: Logs,
|
||||
@@ -216,7 +214,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
|
||||
warn!(target: "raft", "Raft start() Exit Signal");
|
||||
load_ips_task.cancel().await;
|
||||
p2p_send_task.cancel().await;
|
||||
self.datastore.cancel().await?;
|
||||
self.datastore.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
@@ -34,7 +34,7 @@ impl<T: Encodable + Decodable> DataStore<T> {
|
||||
|
||||
Ok(Self { _db, logs, commits, voted_for, current_term })
|
||||
}
|
||||
pub async fn cancel(&self) -> Result<()> {
|
||||
pub async fn flush(&self) -> Result<()> {
|
||||
debug!(target: "raft", "DataStore flush");
|
||||
self._db.flush_async().await?;
|
||||
Ok(())
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{collections::HashMap, io, net::SocketAddr};
|
||||
|
||||
use crate::{
|
||||
impl_vec,
|
||||
util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
|
||||
Error, Result,
|
||||
};
|
||||
@@ -55,9 +56,6 @@ pub struct LogRequest {
|
||||
pub suffix: Logs,
|
||||
}
|
||||
|
||||
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
|
||||
pub struct BroadcastMsgRequest(pub Vec<u8>);
|
||||
|
||||
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
|
||||
pub struct LogResponse {
|
||||
pub node_id: NodeId,
|
||||
@@ -72,6 +70,9 @@ impl VoteResponse {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(SerialDecodable, SerialEncodable, Clone, Debug)]
|
||||
pub struct BroadcastMsgRequest(pub Vec<u8>);
|
||||
|
||||
#[derive(Clone, Debug, SerialDecodable, SerialEncodable)]
|
||||
pub struct Log {
|
||||
pub term: u64,
|
||||
@@ -89,7 +90,7 @@ impl From<SocketAddr> for NodeId {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
#[derive(Clone, Debug, SerialDecodable, SerialEncodable)]
|
||||
pub struct Logs(pub Vec<Log>);
|
||||
|
||||
impl Logs {
|
||||
@@ -198,32 +199,4 @@ impl Decodable for NetMsgMethod {
|
||||
}
|
||||
}
|
||||
|
||||
impl Encodable for Logs {
|
||||
fn encode<S: io::Write>(&self, s: S) -> Result<usize> {
|
||||
encode_vec(&self.0, s)
|
||||
}
|
||||
}
|
||||
|
||||
impl Decodable for Logs {
|
||||
fn decode<D: io::Read>(d: D) -> Result<Self> {
|
||||
Ok(Self(decode_vec(d)?))
|
||||
}
|
||||
}
|
||||
|
||||
fn encode_vec<T: Encodable, S: io::Write>(vec: &[T], mut s: S) -> Result<usize> {
|
||||
let mut len = 0;
|
||||
len += VarInt(vec.len() as u64).encode(&mut s)?;
|
||||
for c in vec.iter() {
|
||||
len += c.encode(&mut s)?;
|
||||
}
|
||||
Ok(len)
|
||||
}
|
||||
|
||||
fn decode_vec<T: Decodable, D: io::Read>(mut d: D) -> Result<Vec<T>> {
|
||||
let len = VarInt::decode(&mut d)?.0;
|
||||
let mut ret = Vec::with_capacity(len as usize);
|
||||
for _ in 0..len {
|
||||
ret.push(Decodable::decode(&mut d)?);
|
||||
}
|
||||
Ok(ret)
|
||||
}
|
||||
impl_vec!(Log);
|
||||
|
||||
@@ -61,22 +61,16 @@ impl ProtocolRaft {
|
||||
self.p2p.broadcast(msg.clone()).await?;
|
||||
|
||||
match (self.id.clone(), msg.recipient_id.clone()) {
|
||||
// if the local node and the msg recipient have ids
|
||||
// then check if the ids are equal
|
||||
// check if the ids are equal when both
|
||||
// the local node and recipient ids are Some(id)
|
||||
(Some(id), Some(m_id)) => {
|
||||
if id != m_id {
|
||||
continue
|
||||
}
|
||||
}
|
||||
// if the msg doesn't have a recipient id then the msg is a VoteRequest
|
||||
// and if the local node's id is not None then it can receive
|
||||
// and response with a VoteResponse
|
||||
(Some(_), None) => {}
|
||||
// if the local node's id is None but the recipient's id is not None
|
||||
(None, Some(_)) => {}
|
||||
// if the local node's id and msg recipient's id are both None then reject
|
||||
// the msg becuase the local node is listener
|
||||
// reject if both local node and recipient ids are None then
|
||||
(None, None) => continue,
|
||||
_ => {}
|
||||
}
|
||||
|
||||
self.notify_queue_sender.send(msg).await?;
|
||||
|
||||
Reference in New Issue
Block a user