mirror of
https://github.com/darkrenaissance/darkfi.git
synced 2026-04-28 03:00:18 -04:00
consensus/proto: Gracefully handle errors in loops.
This commit is contained in:
@@ -1,7 +1,3 @@
|
||||
// TODO: FIXME: Handle ? in these modules' loops. There should be no
|
||||
// uncaught and unhandled errors that could potentially break out of
|
||||
// the loops.
|
||||
|
||||
/// Participant announce protocol
|
||||
mod protocol_participant;
|
||||
pub use protocol_participant::ProtocolParticipant;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_executor::Executor;
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use log::{debug, error};
|
||||
|
||||
use crate::{
|
||||
consensus::{Participant, ValidatorStatePtr},
|
||||
@@ -42,13 +42,25 @@ impl ProtocolParticipant {
|
||||
async fn handle_receive_participant(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolParticipant::handle_receive_participant() [START]");
|
||||
loop {
|
||||
let participant = self.participant_sub.receive().await?;
|
||||
let participant = match self.participant_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolParticipant::handle_receive_participant(): recv error: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolParticipant::handle_receive_participant() recv: {:?}", participant);
|
||||
|
||||
let participant_copy = (*participant).clone();
|
||||
if self.state.write().await.append_participant(participant_copy.clone()) {
|
||||
self.p2p.broadcast(participant_copy).await?;
|
||||
match self.p2p.broadcast(participant_copy).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e);
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_executor::Executor;
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, warn};
|
||||
use log::{debug, error, warn};
|
||||
|
||||
use crate::{
|
||||
consensus::{BlockProposal, ValidatorState, ValidatorStatePtr},
|
||||
@@ -43,7 +43,13 @@ impl ProtocolProposal {
|
||||
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolProposal::handle_receive_proposal() [START]");
|
||||
loop {
|
||||
let proposal = self.proposal_sub.receive().await?;
|
||||
let proposal = match self.proposal_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolProposal::handle_receive_proposal(): recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolProposal::handle_receive_proposal() recv: {:?}", proposal);
|
||||
|
||||
@@ -69,11 +75,27 @@ impl ProtocolProposal {
|
||||
debug!("Node did not vote for the proposed block.");
|
||||
} else {
|
||||
let vote = v.unwrap();
|
||||
self.state.write().await.receive_vote(&vote)?;
|
||||
match self.state.write().await.receive_vote(&vote) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("receive_vote() error: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
// Broadcast block to rest of nodes
|
||||
self.p2p.broadcast(proposal_copy).await?;
|
||||
match self.p2p.broadcast(proposal_copy).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_proposal(): proposal broadcast fail: {}", e);
|
||||
}
|
||||
};
|
||||
// Broadcast vote
|
||||
self.p2p.broadcast(vote).await?;
|
||||
match self.p2p.broadcast(vote).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_proposal(): vote broadcast fail: {}", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
|
||||
@@ -57,24 +57,47 @@ impl ProtocolSync {
|
||||
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolSync::handle_receive_request() [START]");
|
||||
loop {
|
||||
let order = self.request_sub.receive().await?;
|
||||
let order = match self.request_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolSync::handle_receive_request(): recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolSync::handle_receive_request() received {:?}", order);
|
||||
|
||||
// Extra validations can be added here
|
||||
let key = order.sl;
|
||||
let blocks = self.state.read().await.blockchain.get_blocks_after(key, BATCH)?;
|
||||
let blocks = match self.state.read().await.blockchain.get_blocks_after(key, BATCH) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolSync::handle_receive_request(): get_blocks_after fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len());
|
||||
|
||||
let response = BlockResponse { blocks };
|
||||
self.channel.send(response).await?;
|
||||
match self.channel.send(response).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("ProtocolSync::handle_receive_request(): channel send fail: {}", e)
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolSync::handle_receive_block() [START]");
|
||||
loop {
|
||||
let info = self.block_sub.receive().await?;
|
||||
let info = match self.block_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolSync::handle_receive_block(): recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolSync::handle_receive_block() received block");
|
||||
|
||||
@@ -85,7 +108,16 @@ impl ProtocolSync {
|
||||
// Extra validations can be added here.
|
||||
if !self.consensus_mode {
|
||||
let info_copy = (*info).clone();
|
||||
if !self.state.read().await.blockchain.has_block(&info_copy)? {
|
||||
|
||||
let has_block = match self.state.read().await.blockchain.has_block(&info_copy) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("handle_receive_block(): failed checking for has_block(): {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
if !has_block {
|
||||
debug!("handle_receive_block(): Starting state transition validation");
|
||||
let canon_state_clone =
|
||||
self.state.read().await.state_machine.lock().await.clone();
|
||||
@@ -104,16 +136,35 @@ impl ProtocolSync {
|
||||
match self.state.write().await.update_canon_state(state_updates, None).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("Failed updating canon state machine: {}", e);
|
||||
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
debug!("ProtocolSync::handle_receive_block(): Appending block to ledger");
|
||||
self.state.write().await.blockchain.add(&[info_copy.clone()])?;
|
||||
match self.state.write().await.blockchain.add(&[info_copy.clone()]) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_block(): blockchain.add() fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
self.state.write().await.remove_txs(info_copy.txs.clone())?;
|
||||
self.p2p.broadcast(info_copy).await?;
|
||||
match self.state.write().await.remove_txs(info_copy.txs.clone()) {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_block(): remove_txs() fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
match self.p2p.broadcast(info_copy).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_block(): p2p broadcast fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_executor::Executor;
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use log::{debug, error};
|
||||
|
||||
use crate::{
|
||||
consensus::{
|
||||
@@ -44,14 +44,28 @@ impl ProtocolSyncConsensus {
|
||||
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolSyncConsensus::handle_receive_request() [START]");
|
||||
loop {
|
||||
let order = self.request_sub.receive().await?;
|
||||
let order = match self.request_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolSyncConsensuss::handle_receive_request() received {:?}", order);
|
||||
|
||||
// Extra validations can be added here.
|
||||
let consensus = self.state.read().await.consensus.clone();
|
||||
let response = ConsensusResponse { consensus };
|
||||
self.channel.send(response).await?;
|
||||
match self.channel.send(response).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"ProtocolSyncConsensus::handle_receive_request() channel send fail: {}",
|
||||
e
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_executor::Executor;
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, warn};
|
||||
use log::{debug, error, warn};
|
||||
|
||||
use crate::{
|
||||
consensus::{Tx, ValidatorState, ValidatorStatePtr},
|
||||
@@ -43,7 +43,13 @@ impl ProtocolTx {
|
||||
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolTx::handle_receive_tx() [START]");
|
||||
loop {
|
||||
let tx = self.tx_sub.receive().await?;
|
||||
let tx = match self.tx_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolTx::handle_receive_tx(): recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx);
|
||||
|
||||
@@ -62,7 +68,13 @@ impl ProtocolTx {
|
||||
|
||||
// Nodes use unconfirmed_txs vector as seen_txs pool.
|
||||
if self.state.write().await.append_tx(tx_copy.clone()) {
|
||||
self.p2p.broadcast(tx_copy).await?;
|
||||
match self.p2p.broadcast(tx_copy).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use async_executor::Executor;
|
||||
use async_std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use log::{debug, error};
|
||||
|
||||
use crate::{
|
||||
consensus::{ValidatorStatePtr, Vote},
|
||||
@@ -45,21 +45,47 @@ impl ProtocolVote {
|
||||
async fn handle_receive_vote(self: Arc<Self>) -> Result<()> {
|
||||
debug!("ProtocolVote::handle_receive_vote() [START]");
|
||||
loop {
|
||||
let vote = self.vote_sub.receive().await?;
|
||||
let vote = match self.vote_sub.receive().await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("ProtocolVote::handle_receive_vote(): recv fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
debug!("ProtocolVote::handle_receive_vote() recv: {:?}", vote);
|
||||
|
||||
let vote_copy = (*vote).clone();
|
||||
|
||||
let (voted, to_broadcast) = self.state.write().await.receive_vote(&vote_copy)?;
|
||||
let (voted, to_broadcast) = match self.state.write().await.receive_vote(&vote_copy) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
error!("handle_receive_vote(): receive_vote() fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
if voted {
|
||||
self.consensus_p2p.broadcast(vote_copy).await?;
|
||||
match self.consensus_p2p.broadcast(vote_copy).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
|
||||
// Broadcast finalized blocks info, if any
|
||||
match to_broadcast {
|
||||
Some(blocks) => {
|
||||
debug!("handle_receive_vote(): Broadcasting finalized blocks");
|
||||
for info in blocks {
|
||||
self.sync_p2p.broadcast(info).await?;
|
||||
match self.sync_p2p.broadcast(info).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("handle_receive_vote(): sync p2p broadcast fail: {}", e);
|
||||
continue
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
None => {
|
||||
|
||||
Reference in New Issue
Block a user