raft: add exclude list during broadcasting

This commit is contained in:
ghassmo
2022-05-11 06:46:57 +03:00
committed by parazyd
parent 85105ad730
commit f0b8afc55b

View File

@@ -3,6 +3,7 @@ use async_std::sync::{Arc, Mutex};
use async_executor::Executor;
use async_trait::async_trait;
use log::debug;
use url::Url;
use crate::{net, Result};
@@ -15,6 +16,7 @@ pub struct ProtocolRaft {
msg_sub: net::MessageSubscription<NetMsg>,
p2p: net::P2pPtr,
msgs: Arc<Mutex<Vec<u64>>>,
channel_address: Url,
}
impl ProtocolRaft {
@@ -29,6 +31,7 @@ impl ProtocolRaft {
message_subsytem.add_dispatch::<NetMsg>().await;
let msg_sub = channel.subscribe_msg::<NetMsg>().await.expect("Missing NetMsg dispatcher!");
let channel_address = channel.address();
Arc::new(Self {
id,
@@ -37,11 +40,13 @@ impl ProtocolRaft {
jobsman: net::ProtocolJobsManager::new("ProtocolRaft", channel),
p2p,
msgs,
channel_address,
})
}
async fn handle_receive_msg(self: Arc<Self>) -> Result<()> {
debug!(target: "raft", "ProtocolRaft::handle_receive_msg() [START]");
let exclude_list = vec![self.channel_address.clone()];
loop {
let msg = self.msg_sub.receive().await?;
@@ -58,7 +63,7 @@ impl ProtocolRaft {
self.msgs.lock().await.push(msg.id);
let msg = (*msg).clone();
self.p2p.broadcast(msg.clone()).await?;
self.p2p.broadcast_with_exclude(msg.clone(), &exclude_list).await?;
match (self.id.clone(), msg.recipient_id.clone()) {
// check if the ids are equal when both