script/research/raft_p2p: load p2p hosts ips as nodeids

This commit is contained in:
ghassmo
2022-04-07 10:42:39 +04:00
parent 5f0c516b1f
commit 10de8ad3e2
3 changed files with 81 additions and 45 deletions

View File

@@ -15,16 +15,16 @@ const SLED_COMMITS_LENGTH_TREE: &[u8] = b"_commit_length";
const SLED_VOTED_FOR_TREE: &[u8] = b"_voted_for";
const SLED_CURRENT_TERM_TREE: &[u8] = b"_current_term";
pub struct DataStore {
pub struct DataStore<T> {
_db: sled::Db,
pub logs: DataTree<Log>,
pub commits: DataTree<Vec<u8>>,
pub commits: DataTree<T>,
pub commits_length: DataTree<u64>,
pub voted_for: DataTree<Option<NodeId>>,
pub current_term: DataTree<u64>,
}
impl DataStore {
impl<T: Encodable + Decodable> DataStore<T> {
pub fn new(db_path: &str) -> Result<Self> {
let _db = sled::open(db_path).unwrap();
let logs = DataTree::new(&_db, SLED_LOGS_TREE)?;

View File

@@ -108,7 +108,7 @@ impl<T: BorshSerialize + BorshDeserialize + Clone> VecR<T> {
}
pub fn to_vec(&self) -> Vec<T> {
self.0[..].to_vec()
self.0.clone()
}
}

View File

@@ -10,7 +10,11 @@ use futures::{select, FutureExt};
use log::error;
use rand::Rng;
use darkfi::{net, Result};
use darkfi::{
net,
util::serial::{deserialize, serialize, Decodable, Encodable},
Result,
};
use crate::{
try_from_slice_unchecked, DataStore, Log, LogRequest, LogResponse, NetMsg, NetMsgMethod,
@@ -19,8 +23,12 @@ use crate::{
const HEARTBEATTIMEOUT: u64 = 100;
const TIMEOUT: u64 = 300;
const TIMEOUT_NODES: u64 = 300;
pub struct Raft {
pub type BroadcastMsg<T> = (async_channel::Sender<T>, async_channel::Receiver<T>);
pub type Sender = (async_channel::Sender<NetMsg>, async_channel::Receiver<NetMsg>);
pub struct Raft<T> {
// this will be derived from the ip
id: NodeId,
@@ -30,7 +38,7 @@ pub struct Raft {
logs: VecR<Log>,
commit_length: u64,
// the log will be added to this vector if it's committed by the majority of nodes
commits: Arc<Mutex<Vec<Vec<u8>>>>,
commits: Arc<Mutex<Vec<T>>>,
role: Role,
@@ -41,16 +49,18 @@ pub struct Raft {
sent_length: HashMap<NodeId, u64>,
acked_length: HashMap<NodeId, u64>,
nodes: VecR<NodeId>,
nodes: Arc<Mutex<HashMap<NodeId, SocketAddr>>>,
last_term: u64,
send_queues: Option<async_channel::Sender<NetMsg>>,
sender: Sender,
datastore: DataStore,
broadcast_msg: BroadcastMsg<T>,
datastore: DataStore<T>,
}
impl Raft {
impl<T: Decodable + Encodable + Clone> Raft<T> {
pub fn new(addr: SocketAddr, db_path: PathBuf) -> Result<Self> {
if db_path.to_str().is_none() {
error!(target: "raft", "datastore path is incorrect");
@@ -77,6 +87,9 @@ impl Raft {
DataStore::new(db_path_str)?
};
let broadcast_msg = async_channel::unbounded::<T>();
let sender = async_channel::unbounded::<NetMsg>();
Ok(Self {
id: NodeId::from(addr),
current_term,
@@ -89,9 +102,10 @@ impl Raft {
votes_received: VecR(vec![]),
sent_length: HashMap::new(),
acked_length: HashMap::new(),
nodes: VecR(vec![]),
nodes: Arc::new(Mutex::new(HashMap::new())),
last_term: 0,
send_queues: None,
sender,
broadcast_msg,
datastore,
})
}
@@ -102,9 +116,6 @@ impl Raft {
executor: Arc<Executor<'_>>,
) -> Result<()> {
let (p2p_snd, receive_queues) = async_channel::unbounded::<NetMsg>();
let (send_queues, p2p_recv) = async_channel::unbounded::<NetMsg>();
self.send_queues = Some(send_queues);
let p2p = net::P2p::new(net_settings).await;
let p2p = p2p.clone();
@@ -123,22 +134,38 @@ impl Raft {
// P2p performs seed session
p2p.clone().start(executor.clone()).await?;
// TODO load the peers ips after the seed session finished
// we can drive the nodes ids from the ips
executor.spawn(p2p.clone().run(executor.clone())).detach();
let p2p_cloned = p2p.clone();
let p2p_recv = self.sender.1.clone();
executor
.spawn(async move {
loop {
let msg: NetMsg = p2p_recv.recv().await.unwrap();
p2p.broadcast(msg).await.unwrap();
p2p_cloned.broadcast(msg).await.unwrap();
}
})
.detach();
let self_nodes = self.nodes.clone();
executor
.spawn(async move {
loop {
task::sleep(Duration::from_millis(TIMEOUT_NODES)).await;
let hosts = p2p.hosts().clone();
let nodes_ip = hosts.load_all().await.clone();
let mut nodes = self_nodes.lock().await;
for ip in nodes_ip.iter() {
nodes.insert(NodeId::from(*ip), *ip);
}
}
})
.detach();
let mut rng = rand::thread_rng();
let broadcast_msg_rv = self.broadcast_msg.1.clone();
loop {
let timeout = Duration::from_millis(rng.gen_range(0..200) + TIMEOUT);
let heartbeat_timeout = Duration::from_millis(HEARTBEATTIMEOUT);
@@ -146,31 +173,38 @@ impl Raft {
if self.role == Role::Leader {
select! {
m = receive_queues.recv().fuse() => self.handle_method(m?).await?,
m = broadcast_msg_rv.recv().fuse() => self.broadcast_msg(&m?).await,
_ = task::sleep(heartbeat_timeout).fuse() => self.send_heartbeat().await,
}
} else {
select! {
m = receive_queues.recv().fuse() => self.handle_method(m?).await?,
m = broadcast_msg_rv.recv().fuse() => self.broadcast_msg(&m?).await,
_ = task::sleep(timeout).fuse() => self.send_vote_request().await,
}
}
}
}
pub fn get_commits(&self) -> Arc<Mutex<Vec<Vec<u8>>>> {
pub fn get_commits(&self) -> Arc<Mutex<Vec<T>>> {
self.commits.clone()
}
pub async fn broadcast_msg(&mut self, msg: Vec<u8>) {
pub fn get_broadcast(&self) -> async_channel::Sender<T> {
self.broadcast_msg.0.clone()
}
async fn broadcast_msg(&mut self, msg: &T) {
if self.role == Role::Leader {
let msg = serialize(msg);
let log = Log { msg, term: self.current_term };
self.push_log(&log).unwrap();
self.acked_length.insert(self.id.clone(), self.logs.len());
let nodes = self.nodes.0.clone();
let nodes = self.nodes.lock().await.clone();
for node in nodes.iter() {
self.update_logs(node).await;
self.update_logs(node.0).await;
}
}
}
@@ -197,18 +231,16 @@ impl Raft {
Ok(())
}
async fn send(&self, recipient_id: Option<NodeId>, payload: &[u8], method: NetMsgMethod) {
if let Some(sender) = &self.send_queues {
let rnd = rand::random();
let net_msg = NetMsg { id: rnd, recipient_id, payload: payload.to_vec(), method };
sender.send(net_msg).await.unwrap();
}
let rnd = rand::random();
let net_msg = NetMsg { id: rnd, recipient_id, payload: payload.to_vec(), method };
self.sender.0.send(net_msg).await.unwrap();
}
async fn send_heartbeat(&self) {
if self.role == Role::Leader {
let nodes = self.nodes.0.clone();
let nodes = self.nodes.lock().await.clone();
for node in nodes.iter() {
self.update_logs(node).await;
self.update_logs(node.0).await;
}
}
}
@@ -268,15 +300,17 @@ impl Raft {
if self.role == Role::Candidate && vr.current_term == self.current_term && vr.ok {
self.votes_received.push(&vr.node_id);
if self.votes_received.len() >= ((self.nodes.len() + 1) / 2) {
let nodes = self.nodes.lock().await;
if self.votes_received.len() >= (((nodes.len() + 1) / 2) as u64) {
self.role = Role::Leader;
self.current_leader = Some(self.id.clone());
for node in self.nodes.0.iter() {
self.sent_length.insert(node.clone(), self.logs.len());
self.acked_length.insert(node.clone(), 0);
self.update_logs(node).await;
for node in nodes.iter() {
self.sent_length.insert(node.0.clone(), self.logs.len());
self.acked_length.insert(node.0.clone(), 0);
self.update_logs(node.0).await;
}
}
drop(nodes);
} else if vr.current_term > self.current_term {
self.set_current_term(&vr.current_term).unwrap();
self.role = Role::Follower;
@@ -362,21 +396,22 @@ impl Raft {
}
}
fn acks(&self, length: u64) -> VecR<NodeId> {
VecR::<NodeId>(
self.nodes.0.clone().into_iter().filter(|n| self.acked_length[n] >= length).collect(),
)
fn acks(&self, nodes: HashMap<NodeId, SocketAddr>, length: u64) -> HashMap<NodeId, SocketAddr> {
nodes.into_iter().filter(|n| self.acked_length[&n.0] >= length).collect()
}
async fn commit_log(&mut self) {
let min_acks = (self.nodes.len() + 1) / 2;
let nodes_ptr = self.nodes.lock().await;
let min_acks = ((nodes_ptr.len() + 1) / 2) as usize;
let nodes = nodes_ptr.clone();
drop(nodes_ptr);
let ready: Vec<u64> = self
.logs
.0
.iter()
.enumerate()
.filter(|(i, _)| self.acks(*i as u64).len() >= min_acks)
.filter(|(i, _)| self.acks(nodes.clone(), *i as u64).len() >= min_acks)
.map(|(i, _)| i as u64)
.collect();
@@ -429,9 +464,10 @@ impl Raft {
self.voted_for = i.clone();
self.datastore.voted_for.insert(i)
}
async fn push_commit(&mut self, i: &Vec<u8>) -> Result<()> {
self.commits.lock().await.push(i.clone());
self.datastore.commits.insert(i)
async fn push_commit(&mut self, commit: &Vec<u8>) -> Result<()> {
let commit: T = deserialize(commit)?;
self.commits.lock().await.push(commit.clone());
self.datastore.commits.insert(&commit)
}
fn push_log(&mut self, i: &Log) -> Result<()> {
self.logs.push(i);