bin/tau: using channels instead of vector to receive logs from raft

This commit is contained in:
ghassmo
2022-04-14 08:15:15 +04:00
parent 54def23988
commit 039392caa4
2 changed files with 24 additions and 34 deletions

View File

@@ -12,7 +12,6 @@ use darkfi::{
util::{
cli::{log_config, spawn_config, Config},
path::get_config_path,
sleep,
},
Error,
};
@@ -48,7 +47,6 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
let initial_sync_commits = commits.clone();
let initial_sync_raft_sender = raft_sender.clone();
//
@@ -67,53 +65,48 @@ async fn start(settings: Settings, executor: Arc<Executor<'_>>) -> TaudResult<()
let rpc_interface = Arc::new(JsonRpcInterface::new(rpc_snd, settings.dataset_path.clone()));
let dataset_path_cloned = settings.dataset_path.clone();
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
let recv_update_from_rpc: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
let task_info = rpc_rcv.recv().await.map_err(Error::from)?;
if let Some(tk) = task_info {
info!(target: "tau", "save the received task {:?}", tk);
tk.save(&dataset_path_cloned)?;
raft_sender.send(tk).await.map_err(Error::from)?;
}
}
});
let recv_commits = commits.lock().await;
for task_info in recv_commits.iter() {
info!(target: "tau", "update from the commits");
task_info.save(&dataset_path_cloned)?;
}
let dataset_path_cloned = settings.dataset_path.clone();
let recv_update_from_raft: smol::Task<TaudResult<()>> = executor.spawn(async move {
loop {
let task = commits.recv().await.map_err(Error::from)?;
info!(target: "tau", "update from the commits");
task.save(&dataset_path_cloned)?;
}
});
let dataset_path_cloned = settings.dataset_path.clone();
let initial_sync: smol::Task<TaudResult<()>> = executor.spawn(async move {
info!(target: "tau", "Start initial sync waiting the network for 5 seconds");
sleep(5).await;
let recv_commits = initial_sync_commits.lock().await;
for task_info in recv_commits.iter() {
info!(target: "tau", "Save received tasks {:?}", task_info);
task_info.save(&dataset_path_cloned)?;
}
info!(target: "tau", "Start initial sync");
info!(target: "tau", "Upload local tasks");
let tasks = MonthTasks::load_current_open_tasks(&dataset_path_cloned)?;
for task in tasks {
info!(target: "tau", "send local task {:?}", task);
initial_sync_raft_sender.send(task).await.map_err(Error::from)?;
}
Ok(())
});
let ex2 = executor.clone();
ex2.spawn(listen_and_serve(server_config, rpc_interface, executor.clone())).detach();
let executor_cloned = executor.clone();
executor_cloned
.spawn(listen_and_serve(server_config, rpc_interface, executor.clone()))
.detach();
// blocking
raft.start(p2p_settings.clone(), executor.clone()).await?;
recv_update_from_rpc.cancel().await;
recv_update_from_raft.cancel().await;
initial_sync.cancel().await;
Ok(())

View File

@@ -24,7 +24,7 @@ const HEARTBEATTIMEOUT: u64 = 100;
const TIMEOUT: u64 = 300;
const TIMEOUT_NODES: u64 = 300;
pub type BroadcastMsg<T> = (async_channel::Sender<T>, async_channel::Receiver<T>);
pub type Broadcast<T> = (async_channel::Sender<T>, async_channel::Receiver<T>);
type Sender = (async_channel::Sender<NetMsg>, async_channel::Receiver<NetMsg>);
pub struct Raft<T> {
@@ -38,8 +38,6 @@ pub struct Raft<T> {
voted_for: Option<NodeId>,
logs: Logs,
commit_length: u64,
// the log will be added to this vector if it's committed by the majority of nodes
commits: Arc<Mutex<Vec<T>>>,
role: Role,
@@ -56,7 +54,9 @@ pub struct Raft<T> {
sender: Sender,
broadcast_msg: BroadcastMsg<T>,
broadcast_msg: Broadcast<T>,
broadcast_commits: Broadcast<T>,
datastore: DataStore<T>,
}
@@ -74,7 +74,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut voted_for = None;
let mut logs = Logs(vec![]);
let mut commit_length = 0;
let mut commits = Arc::new(Mutex::new(vec![]));
let datastore = if db_path.exists() {
let datastore = DataStore::new(db_path_str)?;
@@ -82,13 +81,13 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
voted_for = datastore.voted_for.get_last()?.flatten();
logs = Logs(datastore.logs.get_all()?);
commit_length = datastore.commits_length.get_last()?.unwrap_or(0);
commits = Arc::new(Mutex::new(datastore.commits.get_all()?));
datastore
} else {
DataStore::new(db_path_str)?
};
let broadcast_msg = async_channel::unbounded::<T>();
let broadcast_commits = async_channel::unbounded::<T>();
let sender = async_channel::unbounded::<NetMsg>();
Ok(Self {
@@ -97,7 +96,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
voted_for,
logs,
commit_length,
commits,
role: Role::Follower,
current_leader: None,
votes_received: vec![],
@@ -107,6 +105,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
last_term: 0,
sender,
broadcast_msg,
broadcast_commits,
datastore,
})
}
@@ -167,8 +166,6 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
if self_id.is_none() {
return
}
// wait the network
task::sleep(Duration::from_secs(5)).await;
loop {
debug!(target: "raft", "load node ids from p2p hosts ips");
task::sleep(Duration::from_millis(TIMEOUT_NODES * 10)).await;
@@ -211,8 +208,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
}
pub fn get_commits(&self) -> Arc<Mutex<Vec<T>>> {
self.commits.clone()
pub fn get_commits(&self) -> async_channel::Receiver<T> {
self.broadcast_commits.1.clone()
}
pub fn get_broadcast(&self) -> async_channel::Sender<T> {
@@ -572,7 +569,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn push_commit(&mut self, commit: &[u8]) -> Result<()> {
let commit: T = deserialize(commit)?;
self.commits.lock().await.push(commit.clone());
self.broadcast_commits.0.send(commit.clone()).await?;
self.datastore.commits.insert(&commit)
}
fn push_log(&mut self, i: &Log) -> Result<()> {