diff --git a/src/consensus/proto/protocol_participant.rs b/src/consensus/proto/protocol_participant.rs index a896f2200..f35303788 100644 --- a/src/consensus/proto/protocol_participant.rs +++ b/src/consensus/proto/protocol_participant.rs @@ -1,7 +1,9 @@ -use async_executor::Executor; use async_std::sync::Arc; + +use async_executor::Executor; use async_trait::async_trait; use log::{debug, error}; +use url::Url; use crate::{ consensus::{Participant, ValidatorStatePtr}, @@ -17,6 +19,7 @@ pub struct ProtocolParticipant { jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, p2p: P2pPtr, + channel_address: Url, } impl ProtocolParticipant { @@ -30,17 +33,20 @@ impl ProtocolParticipant { msg_subsystem.add_dispatch::().await; let participant_sub = channel.subscribe_msg::().await?; + let channel_address = channel.address(); Ok(Arc::new(Self { participant_sub, jobsman: ProtocolJobsManager::new("ParticipantProtocol", channel), state, p2p, + channel_address, })) } async fn handle_receive_participant(self: Arc) -> Result<()> { debug!("ProtocolParticipant::handle_receive_participant() [START]"); + let exclude_list = vec![self.channel_address.clone()]; loop { let participant = match self.participant_sub.receive().await { Ok(v) => v, @@ -53,8 +59,11 @@ impl ProtocolParticipant { debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant); let participant_copy = (*participant).clone(); + if self.state.write().await.append_participant(participant_copy.clone()) { - if let Err(e) = self.p2p.broadcast(participant_copy).await { + if let Err(e) = + self.p2p.broadcast_with_exclude(participant_copy, &exclude_list).await + { error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e); continue }; diff --git a/src/consensus/proto/protocol_proposal.rs b/src/consensus/proto/protocol_proposal.rs index 186416d8c..74bd33273 100644 --- a/src/consensus/proto/protocol_proposal.rs +++ b/src/consensus/proto/protocol_proposal.rs @@ -1,7 +1,9 @@ -use async_executor::Executor; use async_std::sync::Arc; + +use async_executor::Executor; use async_trait::async_trait; use log::{debug, error, warn}; +use url::Url; use crate::{ consensus::{BlockProposal, ValidatorState, ValidatorStatePtr}, @@ -18,6 +20,7 @@ pub struct ProtocolProposal { jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, p2p: P2pPtr, + channel_address: Url, } impl ProtocolProposal { @@ -32,16 +35,21 @@ impl ProtocolProposal { let proposal_sub = channel.subscribe_msg::().await?; + let channel_address = channel.address(); + Ok(Arc::new(Self { proposal_sub, jobsman: ProtocolJobsManager::new("ProposalProtocol", channel), state, p2p, + channel_address, })) } async fn handle_receive_proposal(self: Arc) -> Result<()> { debug!("ProtocolProposal::handle_receive_proposal() [START]"); + + let exclude_list = vec![self.channel_address.clone()]; loop { let proposal = match self.proposal_sub.receive().await { Ok(v) => v, @@ -87,7 +95,7 @@ impl ProtocolProposal { } // Broadcast block to rest of nodes - if let Err(e) = self.p2p.broadcast(proposal_copy).await { + if let Err(e) = self.p2p.broadcast_with_exclude(proposal_copy, &exclude_list).await { error!("handle_receive_proposal(): proposal broadcast fail: {}", e); }; diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 106de45e4..fd6f5d332 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -96,6 +96,7 @@ impl ProtocolSync { } debug!("handle_receive_block() [START]"); + let exclude_list = vec![self.channel.address().clone()]; loop { let info = match self.block_sub.receive().await { Ok(v) => v, @@ -165,7 +166,7 @@ impl ProtocolSync { continue }; - if let Err(e) = self.p2p.broadcast(info_copy).await { + if let Err(e) = self.p2p.broadcast_with_exclude(info_copy, &exclude_list).await { error!("handle_receive_block(): p2p broadcast fail: {}", e); *self.pending.lock().await = false; continue diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index 7c2463605..fa94b87c6 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -1,7 +1,9 @@ -use async_executor::Executor; use async_std::sync::Arc; + +use async_executor::Executor; use async_trait::async_trait; use log::{debug, error, warn}; +use url::Url; use crate::{ consensus::{ValidatorState, ValidatorStatePtr}, @@ -21,6 +23,7 @@ pub struct ProtocolTx { jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, p2p: P2pPtr, + channel_address: Url, } impl net::Message for Transaction { @@ -40,17 +43,20 @@ impl ProtocolTx { msg_subsystem.add_dispatch::().await; let tx_sub = channel.subscribe_msg::().await?; + let channel_address = channel.address(); Ok(Arc::new(Self { tx_sub, jobsman: ProtocolJobsManager::new("TxProtocol", channel), state, p2p, + channel_address, })) } async fn handle_receive_tx(self: Arc) -> Result<()> { debug!("ProtocolTx::handle_receive_tx() [START]"); + let exclude_list = vec![self.channel_address.clone()]; loop { let tx = match self.tx_sub.receive().await { Ok(v) => v, @@ -90,7 +96,7 @@ impl ProtocolTx { // Nodes use unconfirmed_txs vector as seen_txs pool. if self.state.write().await.append_tx(tx_copy.clone()) { - if let Err(e) = self.p2p.broadcast(tx_copy).await { + if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await { error!("handle_receive_tx(): p2p broadcast fail: {}", e); continue }; diff --git a/src/consensus/proto/protocol_vote.rs b/src/consensus/proto/protocol_vote.rs index 2e785a4a6..3c9048d0a 100644 --- a/src/consensus/proto/protocol_vote.rs +++ b/src/consensus/proto/protocol_vote.rs @@ -1,7 +1,9 @@ -use async_executor::Executor; use async_std::sync::Arc; + +use async_executor::Executor; use async_trait::async_trait; use log::{debug, error}; +use url::Url; use crate::{ consensus::{ValidatorStatePtr, Vote}, @@ -18,6 +20,7 @@ pub struct ProtocolVote { state: ValidatorStatePtr, sync_p2p: P2pPtr, consensus_p2p: P2pPtr, + channel_address: Url, } impl ProtocolVote { @@ -32,6 +35,7 @@ impl ProtocolVote { msg_subsystem.add_dispatch::().await; let vote_sub = channel.subscribe_msg::().await?; + let channel_address = channel.address(); Ok(Arc::new(Self { vote_sub, @@ -39,11 +43,13 @@ impl ProtocolVote { state, sync_p2p, consensus_p2p, + channel_address, })) } async fn handle_receive_vote(self: Arc) -> Result<()> { debug!("ProtocolVote::handle_receive_vote() [START]"); + let exclude_list = vec![self.channel_address.clone()]; loop { let vote = match self.vote_sub.receive().await { Ok(v) => v, @@ -67,7 +73,9 @@ impl ProtocolVote { }; if voted { - if let Err(e) = self.consensus_p2p.broadcast(vote_copy).await { + if let Err(e) = + self.consensus_p2p.broadcast_with_exclude(vote_copy, &exclude_list).await + { error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e); continue };