raft: proper names for msgs and commits channels & minor changes

This commit is contained in:
ghassmo
2022-06-30 03:38:38 +03:00
parent d97e1d8bae
commit 65393bc322
3 changed files with 22 additions and 22 deletions

View File

@@ -133,8 +133,8 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
let datastore_raft = datastore_path.join("tau.db");
let mut raft = Raft::<EncryptedTask>::new(net_settings.inbound.clone(), datastore_raft)?;
let raft_sender = raft.get_broadcast();
let commits = raft.get_commits();
let raft_msgs_sender = raft.get_msgs_channel();
let commits_recv = raft.get_commits_channel();
let datastore_path_cloned = datastore_path.clone();
let recv_update: smol::Task<TaudResult<()>> = executor.spawn(async move {
@@ -147,10 +147,10 @@ async fn realmain(settings: Args, executor: Arc<Executor<'_>>) -> Result<()> {
info!(target: "tau", "save the received task {:?}", tk);
let encrypted_task = encrypt_task(&tk, &secret_key,&mut rng)?;
tk.save(&datastore_path_cloned)?;
raft_sender.send(encrypted_task).await.map_err(Error::from)?;
raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?;
}
}
task = commits.recv().fuse() => {
task = commits_recv.recv().fuse() => {
let recv = task.map_err(Error::from)?;
let task = decrypt_task(&recv, &secret_key);

View File

@@ -18,7 +18,7 @@ use crate::{
use super::{
primitives::{
Broadcast, BroadcastMsgRequest, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg,
BroadcastMsgRequest, Channel, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg,
NetMsgMethod, NodeId, Role, Sender, SyncRequest, SyncResponse, VoteRequest, VoteResponse,
},
DataStore,
@@ -93,8 +93,8 @@ pub struct Raft<T> {
sender: Sender,
broadcast_msg: Broadcast<T>,
broadcast_commits: Broadcast<T>,
msgs_channel: Channel<T>,
commits_channel: Channel<T>,
datastore: DataStore<T>,
}
@@ -115,8 +115,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let commit_length = datastore.commits.get_all()?.len() as u64;
// broadcasting channels
let broadcast_msg = async_channel::unbounded::<T>();
let broadcast_commits = async_channel::unbounded::<T>();
let msgs_channel = async_channel::unbounded::<T>();
let commits_channel = async_channel::unbounded::<T>();
let sender = async_channel::unbounded::<NetMsg>();
@@ -137,8 +137,8 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
nodes: Arc::new(Mutex::new(HashMap::new())),
last_term: 0,
sender,
broadcast_msg,
broadcast_commits,
msgs_channel,
commits_channel,
datastore,
})
}
@@ -163,7 +163,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let sync_request = SyncRequest { logs_len: self.logs.len(), last_term };
info!("send sync request");
info!("Send sync request");
self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?;
self.waiting_for_sync(p2p_recv_channel.clone(), stop_signal.clone()).await?;
@@ -171,7 +171,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
let mut rng = rand::thread_rng();
let broadcast_msg_rv = self.broadcast_msg.1.clone();
let broadcast_msg_rv = self.msgs_channel.1.clone();
loop {
let timeout: Duration = if self.role == Role::Leader {
@@ -208,12 +208,12 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
Ok(())
}
pub fn get_commits(&self) -> async_channel::Receiver<T> {
self.broadcast_commits.1.clone()
pub fn get_commits_channel(&self) -> async_channel::Receiver<T> {
self.commits_channel.1.clone()
}
pub fn get_broadcast(&self) -> async_channel::Sender<T> {
self.broadcast_msg.0.clone()
pub fn get_msgs_channel(&self) -> async_channel::Sender<T> {
self.msgs_channel.0.clone()
}
async fn broadcast_msg(&mut self, msg: &T, msg_id: Option<u64>) -> Result<()> {
@@ -263,7 +263,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
self.broadcast_msg(&d, Some(msg.id)).await?;
}
NetMsgMethod::SyncRequest => {
info!("receive sync request");
info!("Receive sync request");
let sr: SyncRequest = deserialize(&msg.payload)?;
self.receive_sync_request(&sr, msg.id).await?;
}
@@ -297,7 +297,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
wipe,
};
info!("send sync response");
info!("Send sync response");
for _ in 0..2 {
self.send(
self.current_leader.clone(),
@@ -321,7 +321,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> {
info!("receive sync response");
info!("Receive sync response");
if sr.wipe {
self.set_commit_length(&0)?;
self.push_logs(&sr.logs)?;
@@ -676,7 +676,7 @@ impl<T: Decodable + Encodable + Clone> Raft<T> {
}
async fn push_commit(&mut self, commit: &[u8]) -> Result<()> {
let commit: T = deserialize(commit)?;
self.broadcast_commits.0.send(commit.clone()).await?;
self.commits_channel.0.send(commit.clone()).await?;
self.datastore.commits.insert(&commit)
}
fn push_log(&mut self, log: &Log) -> Result<()> {

View File

@@ -8,7 +8,7 @@ use crate::{
Error, Result,
};
pub type Broadcast<T> = (async_channel::Sender<T>, async_channel::Receiver<T>);
pub type Channel<T> = (async_channel::Sender<T>, async_channel::Receiver<T>);
pub type Sender = (async_channel::Sender<NetMsg>, async_channel::Receiver<NetMsg>);
#[derive(PartialEq, Eq, Debug, Clone)]