From 36e9faf2f702eab28e687e1eab9aa73b52aa2790 Mon Sep 17 00:00:00 2001 From: ghassmo Date: Fri, 1 Jul 2022 09:58:19 +0300 Subject: [PATCH] raft: major fix in protocol_raft --- src/raft/protocol_raft.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/raft/protocol_raft.rs b/src/raft/protocol_raft.rs index 70ef60e0e..36eeb593d 100644 --- a/src/raft/protocol_raft.rs +++ b/src/raft/protocol_raft.rs @@ -3,7 +3,6 @@ use async_std::sync::{Arc, Mutex}; use async_executor::Executor; use async_trait::async_trait; use log::debug; -use url::Url; use crate::{net, Result}; @@ -16,7 +15,6 @@ pub struct ProtocolRaft { msg_sub: net::MessageSubscription, p2p: net::P2pPtr, msgs: Arc>>, - channel_address: Url, } impl ProtocolRaft { @@ -31,7 +29,6 @@ impl ProtocolRaft { message_subsytem.add_dispatch::().await; let msg_sub = channel.subscribe_msg::().await.expect("Missing NetMsg dispatcher!"); - let channel_address = channel.address(); Arc::new(Self { id, @@ -40,13 +37,11 @@ impl ProtocolRaft { jobsman: net::ProtocolJobsManager::new("ProtocolRaft", channel), p2p, msgs, - channel_address, }) } async fn handle_receive_msg(self: Arc) -> Result<()> { debug!(target: "raft", "ProtocolRaft::handle_receive_msg() [START]"); - let exclude_list = vec![self.channel_address.clone()]; loop { let msg = self.msg_sub.receive().await?; @@ -56,14 +51,16 @@ impl ProtocolRaft { &msg.id, &msg.method ); - if self.msgs.lock().await.contains(&msg.id) { - continue + { + let mut msgs = self.msgs.lock().await; + if msgs.contains(&msg.id) { + continue + } + msgs.push(msg.id); } - self.msgs.lock().await.push(msg.id); - let msg = (*msg).clone(); - self.p2p.broadcast_with_exclude(msg.clone(), &exclude_list).await?; + self.p2p.broadcast(msg.clone()).await?; match (self.id.clone(), msg.recipient_id.clone()) { // check if the ids are equal when both