consensus/proto: add the node who sent the net message to the exclude list during broadcasting

This commit is contained in:
ghassmo
2022-05-11 06:36:44 +03:00
committed by parazyd
parent f6a85f0360
commit 85105ad730
5 changed files with 41 additions and 9 deletions

View File

@@ -1,7 +1,9 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_executor::Executor;
use async_trait::async_trait;
use log::{debug, error};
use url::Url;
use crate::{
consensus::{Participant, ValidatorStatePtr},
@@ -17,6 +19,7 @@ pub struct ProtocolParticipant {
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
channel_address: Url,
}
impl ProtocolParticipant {
@@ -30,17 +33,20 @@ impl ProtocolParticipant {
msg_subsystem.add_dispatch::<Participant>().await;
let participant_sub = channel.subscribe_msg::<Participant>().await?;
let channel_address = channel.address();
Ok(Arc::new(Self {
participant_sub,
jobsman: ProtocolJobsManager::new("ParticipantProtocol", channel),
state,
p2p,
channel_address,
}))
}
async fn handle_receive_participant(self: Arc<Self>) -> Result<()> {
debug!("ProtocolParticipant::handle_receive_participant() [START]");
let exclude_list = vec![self.channel_address.clone()];
loop {
let participant = match self.participant_sub.receive().await {
Ok(v) => v,
@@ -53,8 +59,11 @@ impl ProtocolParticipant {
debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant);
let participant_copy = (*participant).clone();
if self.state.write().await.append_participant(participant_copy.clone()) {
if let Err(e) = self.p2p.broadcast(participant_copy).await {
if let Err(e) =
self.p2p.broadcast_with_exclude(participant_copy, &exclude_list).await
{
error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e);
continue
};

View File

@@ -1,7 +1,9 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_executor::Executor;
use async_trait::async_trait;
use log::{debug, error, warn};
use url::Url;
use crate::{
consensus::{BlockProposal, ValidatorState, ValidatorStatePtr},
@@ -18,6 +20,7 @@ pub struct ProtocolProposal {
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
channel_address: Url,
}
impl ProtocolProposal {
@@ -32,16 +35,21 @@ impl ProtocolProposal {
let proposal_sub = channel.subscribe_msg::<BlockProposal>().await?;
let channel_address = channel.address();
Ok(Arc::new(Self {
proposal_sub,
jobsman: ProtocolJobsManager::new("ProposalProtocol", channel),
state,
p2p,
channel_address,
}))
}
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!("ProtocolProposal::handle_receive_proposal() [START]");
let exclude_list = vec![self.channel_address.clone()];
loop {
let proposal = match self.proposal_sub.receive().await {
Ok(v) => v,
@@ -87,7 +95,7 @@ impl ProtocolProposal {
}
// Broadcast block to rest of nodes
if let Err(e) = self.p2p.broadcast(proposal_copy).await {
if let Err(e) = self.p2p.broadcast_with_exclude(proposal_copy, &exclude_list).await {
error!("handle_receive_proposal(): proposal broadcast fail: {}", e);
};

View File

@@ -96,6 +96,7 @@ impl ProtocolSync {
}
debug!("handle_receive_block() [START]");
let exclude_list = vec![self.channel.address().clone()];
loop {
let info = match self.block_sub.receive().await {
Ok(v) => v,
@@ -165,7 +166,7 @@ impl ProtocolSync {
continue
};
if let Err(e) = self.p2p.broadcast(info_copy).await {
if let Err(e) = self.p2p.broadcast_with_exclude(info_copy, &exclude_list).await {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
*self.pending.lock().await = false;
continue

View File

@@ -1,7 +1,9 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_executor::Executor;
use async_trait::async_trait;
use log::{debug, error, warn};
use url::Url;
use crate::{
consensus::{ValidatorState, ValidatorStatePtr},
@@ -21,6 +23,7 @@ pub struct ProtocolTx {
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
p2p: P2pPtr,
channel_address: Url,
}
impl net::Message for Transaction {
@@ -40,17 +43,20 @@ impl ProtocolTx {
msg_subsystem.add_dispatch::<Transaction>().await;
let tx_sub = channel.subscribe_msg::<Transaction>().await?;
let channel_address = channel.address();
Ok(Arc::new(Self {
tx_sub,
jobsman: ProtocolJobsManager::new("TxProtocol", channel),
state,
p2p,
channel_address,
}))
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
debug!("ProtocolTx::handle_receive_tx() [START]");
let exclude_list = vec![self.channel_address.clone()];
loop {
let tx = match self.tx_sub.receive().await {
Ok(v) => v,
@@ -90,7 +96,7 @@ impl ProtocolTx {
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().await.append_tx(tx_copy.clone()) {
if let Err(e) = self.p2p.broadcast(tx_copy).await {
if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await {
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
continue
};

View File

@@ -1,7 +1,9 @@
use async_executor::Executor;
use async_std::sync::Arc;
use async_executor::Executor;
use async_trait::async_trait;
use log::{debug, error};
use url::Url;
use crate::{
consensus::{ValidatorStatePtr, Vote},
@@ -18,6 +20,7 @@ pub struct ProtocolVote {
state: ValidatorStatePtr,
sync_p2p: P2pPtr,
consensus_p2p: P2pPtr,
channel_address: Url,
}
impl ProtocolVote {
@@ -32,6 +35,7 @@ impl ProtocolVote {
msg_subsystem.add_dispatch::<Vote>().await;
let vote_sub = channel.subscribe_msg::<Vote>().await?;
let channel_address = channel.address();
Ok(Arc::new(Self {
vote_sub,
@@ -39,11 +43,13 @@ impl ProtocolVote {
state,
sync_p2p,
consensus_p2p,
channel_address,
}))
}
async fn handle_receive_vote(self: Arc<Self>) -> Result<()> {
debug!("ProtocolVote::handle_receive_vote() [START]");
let exclude_list = vec![self.channel_address.clone()];
loop {
let vote = match self.vote_sub.receive().await {
Ok(v) => v,
@@ -67,7 +73,9 @@ impl ProtocolVote {
};
if voted {
if let Err(e) = self.consensus_p2p.broadcast(vote_copy).await {
if let Err(e) =
self.consensus_p2p.broadcast_with_exclude(vote_copy, &exclude_list).await
{
error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e);
continue
};