diff --git a/bin/tau/taud/src/main.rs b/bin/tau/taud/src/main.rs index f8e64187c..34570d6e7 100644 --- a/bin/tau/taud/src/main.rs +++ b/bin/tau/taud/src/main.rs @@ -133,8 +133,8 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { let datastore_raft = datastore_path.join("tau.db"); let mut raft = Raft::::new(net_settings.inbound.clone(), datastore_raft)?; - let raft_sender = raft.get_broadcast(); - let commits = raft.get_commits(); + let raft_msgs_sender = raft.get_msgs_channel(); + let commits_recv = raft.get_commits_channel(); let datastore_path_cloned = datastore_path.clone(); let recv_update: smol::Task> = executor.spawn(async move { @@ -147,10 +147,10 @@ async fn realmain(settings: Args, executor: Arc>) -> Result<()> { info!(target: "tau", "save the received task {:?}", tk); let encrypted_task = encrypt_task(&tk, &secret_key,&mut rng)?; tk.save(&datastore_path_cloned)?; - raft_sender.send(encrypted_task).await.map_err(Error::from)?; + raft_msgs_sender.send(encrypted_task).await.map_err(Error::from)?; } } - task = commits.recv().fuse() => { + task = commits_recv.recv().fuse() => { let recv = task.map_err(Error::from)?; let task = decrypt_task(&recv, &secret_key); diff --git a/src/raft/consensus.rs b/src/raft/consensus.rs index 1862f33d8..37dbf8291 100644 --- a/src/raft/consensus.rs +++ b/src/raft/consensus.rs @@ -18,7 +18,7 @@ use crate::{ use super::{ primitives::{ - Broadcast, BroadcastMsgRequest, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg, + BroadcastMsgRequest, Channel, Log, LogRequest, LogResponse, Logs, MapLength, NetMsg, NetMsgMethod, NodeId, Role, Sender, SyncRequest, SyncResponse, VoteRequest, VoteResponse, }, DataStore, @@ -93,8 +93,8 @@ pub struct Raft { sender: Sender, - broadcast_msg: Broadcast, - broadcast_commits: Broadcast, + msgs_channel: Channel, + commits_channel: Channel, datastore: DataStore, } @@ -115,8 +115,8 @@ impl Raft { let commit_length = datastore.commits.get_all()?.len() as u64; // broadcasting channels - let broadcast_msg = async_channel::unbounded::(); - let broadcast_commits = async_channel::unbounded::(); + let msgs_channel = async_channel::unbounded::(); + let commits_channel = async_channel::unbounded::(); let sender = async_channel::unbounded::(); @@ -137,8 +137,8 @@ impl Raft { nodes: Arc::new(Mutex::new(HashMap::new())), last_term: 0, sender, - broadcast_msg, - broadcast_commits, + msgs_channel, + commits_channel, datastore, }) } @@ -163,7 +163,7 @@ impl Raft { let sync_request = SyncRequest { logs_len: self.logs.len(), last_term }; - info!("send sync request"); + info!("Send sync request"); self.send(None, &serialize(&sync_request), NetMsgMethod::SyncRequest, None).await?; self.waiting_for_sync(p2p_recv_channel.clone(), stop_signal.clone()).await?; @@ -171,7 +171,7 @@ impl Raft { let mut rng = rand::thread_rng(); - let broadcast_msg_rv = self.broadcast_msg.1.clone(); + let broadcast_msg_rv = self.msgs_channel.1.clone(); loop { let timeout: Duration = if self.role == Role::Leader { @@ -208,12 +208,12 @@ impl Raft { Ok(()) } - pub fn get_commits(&self) -> async_channel::Receiver { - self.broadcast_commits.1.clone() + pub fn get_commits_channel(&self) -> async_channel::Receiver { + self.commits_channel.1.clone() } - pub fn get_broadcast(&self) -> async_channel::Sender { - self.broadcast_msg.0.clone() + pub fn get_msgs_channel(&self) -> async_channel::Sender { + self.msgs_channel.0.clone() } async fn broadcast_msg(&mut self, msg: &T, msg_id: Option) -> Result<()> { @@ -263,7 +263,7 @@ impl Raft { self.broadcast_msg(&d, Some(msg.id)).await?; } NetMsgMethod::SyncRequest => { - info!("receive sync request"); + info!("Receive sync request"); let sr: SyncRequest = deserialize(&msg.payload)?; self.receive_sync_request(&sr, msg.id).await?; } @@ -297,7 +297,7 @@ impl Raft { wipe, }; - info!("send sync response"); + info!("Send sync response"); for _ in 0..2 { self.send( self.current_leader.clone(), @@ -321,7 +321,7 @@ impl Raft { } async fn receive_sync_response(&mut self, sr: &SyncResponse) -> Result<()> { - info!("receive sync response"); + info!("Receive sync response"); if sr.wipe { self.set_commit_length(&0)?; self.push_logs(&sr.logs)?; @@ -676,7 +676,7 @@ impl Raft { } async fn push_commit(&mut self, commit: &[u8]) -> Result<()> { let commit: T = deserialize(commit)?; - self.broadcast_commits.0.send(commit.clone()).await?; + self.commits_channel.0.send(commit.clone()).await?; self.datastore.commits.insert(&commit) } fn push_log(&mut self, log: &Log) -> Result<()> { diff --git a/src/raft/primitives.rs b/src/raft/primitives.rs index 5826e4b40..15d198a8d 100644 --- a/src/raft/primitives.rs +++ b/src/raft/primitives.rs @@ -8,7 +8,7 @@ use crate::{ Error, Result, }; -pub type Broadcast = (async_channel::Sender, async_channel::Receiver); +pub type Channel = (async_channel::Sender, async_channel::Receiver); pub type Sender = (async_channel::Sender, async_channel::Receiver); #[derive(PartialEq, Eq, Debug, Clone)]