From d54402ebcd11fb106964e6f0e4716fc42956bf0a Mon Sep 17 00:00:00 2001 From: parazyd Date: Sat, 30 Apr 2022 09:39:16 +0200 Subject: [PATCH] consensus/proto: Gracefully handle errors in loops. --- src/consensus/proto/mod.rs | 4 -- src/consensus/proto/protocol_participant.rs | 18 ++++- src/consensus/proto/protocol_proposal.rs | 32 +++++++-- src/consensus/proto/protocol_sync.rs | 69 ++++++++++++++++--- .../proto/protocol_sync_consensus.rs | 20 +++++- src/consensus/proto/protocol_tx.rs | 18 ++++- src/consensus/proto/protocol_vote.rs | 36 ++++++++-- 7 files changed, 165 insertions(+), 32 deletions(-) diff --git a/src/consensus/proto/mod.rs b/src/consensus/proto/mod.rs index 100950e79..7738faaf7 100644 --- a/src/consensus/proto/mod.rs +++ b/src/consensus/proto/mod.rs @@ -1,7 +1,3 @@ -// TODO: FIXME: Handle ? in these modules' loops. There should be no -// uncaught and unhandled errors that could potentially break out of -// the loops. - /// Participant announce protocol mod protocol_participant; pub use protocol_participant::ProtocolParticipant; diff --git a/src/consensus/proto/protocol_participant.rs b/src/consensus/proto/protocol_participant.rs index 0d66fe2dc..c95ec5cbe 100644 --- a/src/consensus/proto/protocol_participant.rs +++ b/src/consensus/proto/protocol_participant.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; -use log::debug; +use log::{debug, error}; use crate::{ consensus::{Participant, ValidatorStatePtr}, @@ -42,13 +42,25 @@ impl ProtocolParticipant { async fn handle_receive_participant(self: Arc) -> Result<()> { debug!("ProtocolParticipant::handle_receive_participant() [START]"); loop { - let participant = self.participant_sub.receive().await?; + let participant = match self.participant_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolParticipant::handle_receive_participant(): recv error: {}", e); + continue + } + }; debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant); let participant_copy = (*participant).clone(); if self.state.write().await.append_participant(participant_copy.clone()) { - self.p2p.broadcast(participant_copy).await?; + match self.p2p.broadcast(participant_copy).await { + Ok(()) => {} + Err(e) => { + error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e); + continue + } + } } } } diff --git a/src/consensus/proto/protocol_proposal.rs b/src/consensus/proto/protocol_proposal.rs index 9322f059c..e69f87c3a 100644 --- a/src/consensus/proto/protocol_proposal.rs +++ b/src/consensus/proto/protocol_proposal.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; -use log::{debug, warn}; +use log::{debug, error, warn}; use crate::{ consensus::{BlockProposal, ValidatorState, ValidatorStatePtr}, @@ -43,7 +43,13 @@ impl ProtocolProposal { async fn handle_receive_proposal(self: Arc) -> Result<()> { debug!("ProtocolProposal::handle_receive_proposal() [START]"); loop { - let proposal = self.proposal_sub.receive().await?; + let proposal = match self.proposal_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolProposal::handle_receive_proposal(): recv fail: {}", e); + continue + } + }; debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal); @@ -69,11 +75,27 @@ impl ProtocolProposal { debug!("Node did not vote for the proposed block."); } else { let vote = v.unwrap(); - self.state.write().await.receive_vote(&vote)?; + match self.state.write().await.receive_vote(&vote) { + Ok(_) => {} + Err(e) => { + error!("receive_vote() error: {}", e); + continue + } + }; // Broadcast block to rest of nodes - self.p2p.broadcast(proposal_copy).await?; + match self.p2p.broadcast(proposal_copy).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_proposal(): proposal broadcast fail: {}", e); + } + }; // Broadcast vote - self.p2p.broadcast(vote).await?; + match self.p2p.broadcast(vote).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_proposal(): vote broadcast fail: {}", e); + } + }; } } Err(e) => { diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index e45b09334..d69059228 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -57,24 +57,47 @@ impl ProtocolSync { async fn handle_receive_request(self: Arc) -> Result<()> { debug!("ProtocolSync::handle_receive_request() [START]"); loop { - let order = self.request_sub.receive().await?; + let order = match self.request_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolSync::handle_receive_request(): recv fail: {}", e); + continue + } + }; debug!("ProtocolSync::handle_receive_request() received {:?}", order); // Extra validations can be added here let key = order.sl; - let blocks = self.state.read().await.blockchain.get_blocks_after(key, BATCH)?; + let blocks = match self.state.read().await.blockchain.get_blocks_after(key, BATCH) { + Ok(v) => v, + Err(e) => { + error!("ProtocolSync::handle_receive_request(): get_blocks_after fail: {}", e); + continue + } + }; debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len()); let response = BlockResponse { blocks }; - self.channel.send(response).await?; + match self.channel.send(response).await { + Ok(()) => {} + Err(e) => { + error!("ProtocolSync::handle_receive_request(): channel send fail: {}", e) + } + }; } } async fn handle_receive_block(self: Arc) -> Result<()> { debug!("ProtocolSync::handle_receive_block() [START]"); loop { - let info = self.block_sub.receive().await?; + let info = match self.block_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolSync::handle_receive_block(): recv fail: {}", e); + continue + } + }; debug!("ProtocolSync::handle_receive_block() received block"); @@ -85,7 +108,16 @@ impl ProtocolSync { // Extra validations can be added here. if !self.consensus_mode { let info_copy = (*info).clone(); - if !self.state.read().await.blockchain.has_block(&info_copy)? { + + let has_block = match self.state.read().await.blockchain.has_block(&info_copy) { + Ok(v) => v, + Err(e) => { + error!("handle_receive_block(): failed checking for has_block(): {}", e); + continue + } + }; + + if !has_block { debug!("handle_receive_block(): Starting state transition validation"); let canon_state_clone = self.state.read().await.state_machine.lock().await.clone(); @@ -104,16 +136,35 @@ impl ProtocolSync { match self.state.write().await.update_canon_state(state_updates, None).await { Ok(()) => {} Err(e) => { - error!("Failed updating canon state machine: {}", e); + error!("handle_receive_block(): Canon statemachine update fail: {}", e); continue } } debug!("ProtocolSync::handle_receive_block(): Appending block to ledger"); - self.state.write().await.blockchain.add(&[info_copy.clone()])?; + match self.state.write().await.blockchain.add(&[info_copy.clone()]) { + Ok(_) => {} + Err(e) => { + error!("handle_receive_block(): blockchain.add() fail: {}", e); + continue + } + }; - self.state.write().await.remove_txs(info_copy.txs.clone())?; - self.p2p.broadcast(info_copy).await?; + match self.state.write().await.remove_txs(info_copy.txs.clone()) { + Ok(()) => {} + Err(e) => { + error!("handle_receive_block(): remove_txs() fail: {}", e); + continue + } + }; + + match self.p2p.broadcast(info_copy).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_block(): p2p broadcast fail: {}", e); + continue + } + }; } } } diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index 54197e7a8..814463fcb 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; -use log::debug; +use log::{debug, error}; use crate::{ consensus::{ @@ -44,14 +44,28 @@ impl ProtocolSyncConsensus { async fn handle_receive_request(self: Arc) -> Result<()> { debug!("ProtocolSyncConsensus::handle_receive_request() [START]"); loop { - let order = self.request_sub.receive().await?; + let order = match self.request_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e); + continue + } + }; debug!("ProtocolSyncConsensuss::handle_receive_request() received {:?}", order); // Extra validations can be added here. let consensus = self.state.read().await.consensus.clone(); let response = ConsensusResponse { consensus }; - self.channel.send(response).await?; + match self.channel.send(response).await { + Ok(()) => {} + Err(e) => { + error!( + "ProtocolSyncConsensus::handle_receive_request() channel send fail: {}", + e + ); + } + }; } } } diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index 4c06fb2d8..92b46f536 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; -use log::{debug, warn}; +use log::{debug, error, warn}; use crate::{ consensus::{Tx, ValidatorState, ValidatorStatePtr}, @@ -43,7 +43,13 @@ impl ProtocolTx { async fn handle_receive_tx(self: Arc) -> Result<()> { debug!("ProtocolTx::handle_receive_tx() [START]"); loop { - let tx = self.tx_sub.receive().await?; + let tx = match self.tx_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolTx::handle_receive_tx(): recv fail: {}", e); + continue + } + }; debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx); @@ -62,7 +68,13 @@ impl ProtocolTx { // Nodes use unconfirmed_txs vector as seen_txs pool. if self.state.write().await.append_tx(tx_copy.clone()) { - self.p2p.broadcast(tx_copy).await?; + match self.p2p.broadcast(tx_copy).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_tx(): p2p broadcast fail: {}", e); + continue + } + }; } } } diff --git a/src/consensus/proto/protocol_vote.rs b/src/consensus/proto/protocol_vote.rs index 85246bd69..47dfcc170 100644 --- a/src/consensus/proto/protocol_vote.rs +++ b/src/consensus/proto/protocol_vote.rs @@ -1,7 +1,7 @@ use async_executor::Executor; use async_std::sync::Arc; use async_trait::async_trait; -use log::debug; +use log::{debug, error}; use crate::{ consensus::{ValidatorStatePtr, Vote}, @@ -45,21 +45,47 @@ impl ProtocolVote { async fn handle_receive_vote(self: Arc) -> Result<()> { debug!("ProtocolVote::handle_receive_vote() [START]"); loop { - let vote = self.vote_sub.receive().await?; + let vote = match self.vote_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!("ProtocolVote::handle_receive_vote(): recv fail: {}", e); + continue + } + }; debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote); let vote_copy = (*vote).clone(); - let (voted, to_broadcast) = self.state.write().await.receive_vote(&vote_copy)?; + let (voted, to_broadcast) = match self.state.write().await.receive_vote(&vote_copy) { + Ok(v) => v, + Err(e) => { + error!("handle_receive_vote(): receive_vote() fail: {}", e); + continue + } + }; + if voted { - self.consensus_p2p.broadcast(vote_copy).await?; + match self.consensus_p2p.broadcast(vote_copy).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e); + continue + } + }; + // Broadcast finalized blocks info, if any match to_broadcast { Some(blocks) => { debug!("handle_receive_vote(): Broadcasting finalized blocks"); for info in blocks { - self.sync_p2p.broadcast(info).await?; + match self.sync_p2p.broadcast(info).await { + Ok(()) => {} + Err(e) => { + error!("handle_receive_vote(): sync p2p broadcast fail: {}", e); + continue + } + }; } } None => {