diff --git a/src/consensus/proto/protocol_proposal.rs b/src/consensus/proto/protocol_proposal.rs index 32ac4db58..db7105dbd 100644 --- a/src/consensus/proto/protocol_proposal.rs +++ b/src/consensus/proto/protocol_proposal.rs @@ -63,27 +63,40 @@ impl ProtocolProposal { } async fn handle_receive_proposal(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal() [START]"); + debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "START"); let exclude_list = vec![self.channel_address.clone()]; loop { let proposal = match self.proposal_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): recv fail: {}", e); + debug!( + target: "consensus::protocol_proposal::handle_receive_proposal()", + "recv fail: {}", + e + ); continue } }; - debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): recv: {}", proposal); - trace!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): Full proposal: {:?}", proposal); + debug!( + target: "consensus::protocol_proposal::handle_receive_proposal()", + "recv: {}", proposal); + trace!( + target: "consensus::protocol_proposal::handle_receive_proposal()", + "Full proposal: {:?}", + proposal + ); let proposal_copy = (*proposal).clone(); // Verify we have the proposal already let mut lock = self.state.write().await; if lock.consensus.proposal_exists(&proposal_copy.hash) { - debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): Proposal already received."); + debug!( + target: "consensus::protocol_proposal::handle_receive_proposal()", + "Proposal already received." + ); continue } @@ -96,7 +109,7 @@ impl ProtocolProposal { { error!( target: "consensus::protocol_proposal::handle_receive_proposal()", - "ProtocolProposal::handle_receive_proposal(): proposal broadcast fail: {}", + "proposal broadcast fail: {}", e ); }; @@ -105,7 +118,7 @@ impl ProtocolProposal { Err(e) => { error!( target: "consensus::protocol_proposal::handle_receive_proposal()", - "ProtocolProposal::handle_receive_proposal(): receive_proposal error: {}", + "receive_proposal error: {}", e ); continue @@ -118,10 +131,10 @@ impl ProtocolProposal { #[async_trait] impl ProtocolBase for ProtocolProposal { async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "consensus::protocol_proposal::start()", "ProtocolProposal::start() [START]"); + debug!(target: "consensus::protocol_proposal::start()", "START"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await; - debug!(target: "consensus::protocol_proposal::start()", "ProtocolProposal::start() [END]"); + debug!(target: "consensus::protocol_proposal::start()", "END"); Ok(()) } diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 90356c4a8..0a0d33877 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -81,44 +81,71 @@ impl ProtocolSync { } async fn handle_receive_request(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request() [START]"); + debug!( + target: "consensus::protocol_sync::handle_receive_request()", + "START" + ); loop { let order = match self.request_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): recv fail: {}", e); + debug!( + target: "consensus::protocol_sync::handle_receive_request()", + "recv fail: {}", + e + ); continue } }; - debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request() received {:?}", order); + debug!( + target: "consensus::protocol_sync::handle_receive_request()", + "received {:?}", + order + ); // Extra validations can be added here let key = order.slot; let blocks = match self.state.read().await.blockchain.get_blocks_after(key, BATCH) { Ok(v) => v, Err(e) => { - error!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): get_blocks_after fail: {}", e); + error!( + target: "consensus::protocol_sync::handle_receive_request()", + "get_blocks_after fail: {}", + e + ); continue } }; - debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len()); + debug!( + target: "consensus::protocol_sync::handle_receive_request()", + "Found {} blocks", + blocks.len() + ); let response = BlockResponse { blocks }; if let Err(e) = self.channel.send(response).await { - error!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): channel send fail: {}", e) + error!( + target: "consensus::protocol_sync::handle_receive_request()", + "channel send fail: {}", + e + ) }; } } async fn handle_receive_block(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block() [START]"); + debug!(target: "consensus::protocol_sync::handle_receive_block()", "START"); let exclude_list = vec![self.channel.address()]; loop { let info = match self.block_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): recv fail: {}", e); + debug!( + target: "consensus::protocol_sync::handle_receive_block()", + "recv fail: {}", + e + ); continue } }; @@ -136,55 +163,76 @@ impl ProtocolSync { if current >= slot { debug!( target: "consensus::protocol_sync::handle_receive_block()", - "ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..." + "node runs in consensus mode, skipping..." ); continue } } } - info!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): Received block: {}", info.blockhash()); + info!( + target: "consensus::protocol_sync::handle_receive_block()", + "Received block: {}", + info.blockhash() + ); - debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): Processing received block"); + debug!( + target: "consensus::protocol_sync::handle_receive_block()", + "Processing received block" + ); let info_copy = (*info).clone(); match self.state.write().await.receive_finalized_block(info_copy.clone()).await { Ok(v) => { if v { - debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolProposal::handle_receive_block(): block processed successfully, broadcasting..."); + debug!( + target: "consensus::protocol_sync::handle_receive_block()", + "block processed successfully, broadcasting..." + ); if let Err(e) = self.p2p.broadcast_with_exclude(info_copy, &exclude_list).await { error!( target: "consensus::protocol_sync::handle_receive_block()", - "ProtocolSync::handle_receive_block(): p2p broadcast fail: {}", + "p2p broadcast fail: {}", e ); }; } } Err(e) => { - debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): error processing finalized block: {}", e); + debug!( + target: "consensus::protocol_sync::handle_receive_block()", + "error processing finalized block: {}", + e + ); } }; } } async fn handle_receive_slot_checkpoint_request(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request() [START]"); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", + "START" + ); loop { let request = match self.slot_checkpoin_request_sub.receive().await { Ok(v) => v, Err(e) => { debug!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", - "ProtocolSync::handle_receive_slot_checkpoint_request(): recv fail: {}", + "recv fail: {}", e ); continue } }; - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request() received {:?}", request); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", + "received {:?}", + request + ); // Extra validations can be added here let key = request.slot; @@ -197,13 +245,17 @@ impl ProtocolSync { { Ok(v) => v, Err(e) => { - error!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request(): get_slot_checkpoints_after fail: {}", e); + error!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", + "get_slot_checkpoints_after fail: {}", + e + ); continue } }; debug!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", - "ProtocolSync::handle_receive_slot_checkpoint_request(): Found {} slot checkpoints", + "Found {} slot checkpoints", slot_checkpoints.len() ); @@ -211,7 +263,7 @@ impl ProtocolSync { if let Err(e) = self.channel.send(response).await { error!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", - "ProtocolSync::handle_receive_slot_checkpoint_request(): channel send fail: {}", + "channel send fail: {}", e ) }; @@ -219,13 +271,20 @@ impl ProtocolSync { } async fn handle_receive_slot_checkpoint(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint() [START]"); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", + "START" + ); let exclude_list = vec![self.channel.address()]; loop { let slot_checkpoint = match self.slot_checkpoints_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): recv fail: {}", e); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", + "recv fail: {}", + e + ); continue } }; @@ -243,7 +302,7 @@ impl ProtocolSync { if current >= slot { debug!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", - "ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..." + "node runs in consensus mode, skipping..." ); continue } @@ -252,11 +311,14 @@ impl ProtocolSync { info!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", - "ProtocolSync::handle_receive_slot_checkpoint(): Received slot checkpoint: {}", + "Received slot checkpoint: {}", slot_checkpoint.slot ); - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): Processing received slot checkpoint"); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", + "Processing received slot checkpoint" + ); let slot_checkpoint_copy = (*slot_checkpoint).clone(); match self .state @@ -267,7 +329,10 @@ impl ProtocolSync { { Ok(v) => { if v { - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolProposal::handle_receive_slot_checkpoint(): slot checkpoint processed successfully, broadcasting..."); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", + "slot checkpoint processed successfully, broadcasting..." + ); if let Err(e) = self .p2p .broadcast_with_exclude(slot_checkpoint_copy, &exclude_list) @@ -275,14 +340,18 @@ impl ProtocolSync { { error!( target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", - "ProtocolSync::handle_receive_slot_checkpoint(): p2p broadcast fail: {}", + "p2p broadcast fail: {}", e ); }; } } Err(e) => { - debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): error processing finalized slot checkpoint: {}", e); + debug!( + target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", + "error processing finalized slot checkpoint: {}", + e + ); } }; } @@ -292,7 +361,7 @@ impl ProtocolSync { #[async_trait] impl ProtocolBase for ProtocolSync { async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "consensus::protocol_sync::start()", "ProtocolSync::start() [START]"); + debug!(target: "consensus::protocol_sync::start()", "START"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; self.jobsman @@ -304,7 +373,7 @@ impl ProtocolBase for ProtocolSync { .clone() .spawn(self.clone().handle_receive_slot_checkpoint(), executor.clone()) .await; - debug!(target: "consensus::protocol_sync::start()", "ProtocolSync::start() [END]"); + debug!(target: "consensus::protocol_sync::start()", "END"); Ok(()) } diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index a98b1882b..7da7224a5 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -68,17 +68,28 @@ impl ProtocolSyncConsensus { } async fn handle_receive_request(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() [START]"); + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_request()", + "START" + ); loop { let req = match self.request_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e); + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_request()", + "recv fail: {}", + e + ); continue } }; - debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensuss::handle_receive_request() received {:?}", req); + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_request()", + "received {:?}", + req + ); // Extra validations can be added here. let lock = self.state.read().await; @@ -102,25 +113,36 @@ impl ProtocolSyncConsensus { nullifiers, }; if let Err(e) = self.channel.send(response).await { - error!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() channel send fail: {}", e); + error!( + target: "consensus::protocol_sync_consensus::handle_receive_request()", + "channel send fail: {}", + e + ); }; } } async fn handle_receive_slot_checkpoints_request(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() [START]"); + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", + "START" + ); loop { let req = match self.slot_checkpoints_request_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() recv fail: {}", e); + debug!( + target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", + "recv fail: {}", + e + ); continue } }; debug!( target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", - "ProtocolSyncConsensuss::handle_receive_slot_checkpoints_request() received {:?}", + "received {:?}", req ); @@ -130,7 +152,11 @@ impl ProtocolSyncConsensus { let is_empty = lock.consensus.slot_checkpoints.is_empty(); let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, is_empty }; if let Err(e) = self.channel.send(response).await { - error!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e); + error!( + target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", + "channel send fail: {}", + e + ); }; } } @@ -139,14 +165,20 @@ impl ProtocolSyncConsensus { #[async_trait] impl ProtocolBase for ProtocolSyncConsensus { async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "consensus::protocol_sync_consensus::start()", "ProtocolSyncConsensus::start() [START]"); + debug!( + target: "consensus::protocol_sync_consensus::start()", + "START" + ); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; self.jobsman .clone() .spawn(self.clone().handle_receive_slot_checkpoints_request(), executor.clone()) .await; - debug!(target: "consensus::protocol_sync_consensus::start()", "ProtocolSyncConsensus::start() [END]"); + debug!( + target: "consensus::protocol_sync_consensus::start()", + "END" + ); Ok(()) } diff --git a/src/consensus/proto/protocol_tx.rs b/src/consensus/proto/protocol_tx.rs index f08365fb8..f895ecb6f 100644 --- a/src/consensus/proto/protocol_tx.rs +++ b/src/consensus/proto/protocol_tx.rs @@ -53,7 +53,10 @@ impl ProtocolTx { state: ValidatorStatePtr, p2p: P2pPtr, ) -> Result { - debug!(target: "consensus::protocol_tx::init()", "Adding ProtocolTx to the protocol registry"); + debug!( + target: "consensus::protocol_tx::init()", + "Adding ProtocolTx to the protocol registry" + ); let msg_subsystem = channel.get_message_subsystem(); msg_subsystem.add_dispatch::().await; @@ -70,13 +73,20 @@ impl ProtocolTx { } async fn handle_receive_tx(self: Arc) -> Result<()> { - debug!(target: "consensus::protocol_tx::handle_receive_tx()", "ProtocolTx::handle_receive_tx() [START]"); + debug!( + target: "consensus::protocol_tx::handle_receive_tx()", + "START" + ); let exclude_list = vec![self.channel_address.clone()]; loop { let tx = match self.tx_sub.receive().await { Ok(v) => v, Err(e) => { - debug!(target: "consensus::protocol_tx::handle_receive_tx()", "ProtocolTx::handle_receive_tx(): recv fail: {}", e); + debug!( + target: "consensus::protocol_tx::handle_receive_tx()", + "recv fail: {}", + e + ); continue } }; @@ -86,7 +96,11 @@ impl ProtocolTx { // Nodes use unconfirmed_txs vector as seen_txs pool. if self.state.write().await.append_tx(tx_copy.clone()).await { if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await { - error!(target: "consensus::protocol_tx::handle_receive_tx()", "handle_receive_tx(): p2p broadcast fail: {}", e); + error!( + target: "consensus::protocol_tx::handle_receive_tx()", + "p2p broadcast fail: {}", + e + ); }; } } @@ -96,10 +110,10 @@ impl ProtocolTx { #[async_trait] impl ProtocolBase for ProtocolTx { async fn start(self: Arc, executor: Arc>) -> Result<()> { - debug!(target: "consensus::protocol_tx::start()", "ProtocolTx::start() [START]"); + debug!(target: "consensus::protocol_tx::start()", "START"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await; - debug!(target: "consensus::protocol_tx::start()", "ProtocolTx::start() [END]"); + debug!(target: "consensus::protocol_tx::start()", "END"); Ok(()) }