cleanup logs for consensus::proto::*

This commit is contained in:
x
2023-01-02 16:41:19 +01:00
parent f6dd466ccb
commit ff6eba45ec
4 changed files with 183 additions and 55 deletions

View File

@@ -63,27 +63,40 @@ impl ProtocolProposal {
}
async fn handle_receive_proposal(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal() [START]");
debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "START");
let exclude_list = vec![self.channel_address.clone()];
loop {
let proposal = match self.proposal_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): recv fail: {}", e);
debug!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"recv fail: {}",
e
);
continue
}
};
debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): recv: {}", proposal);
trace!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): Full proposal: {:?}", proposal);
debug!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"recv: {}", proposal);
trace!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"Full proposal: {:?}",
proposal
);
let proposal_copy = (*proposal).clone();
// Verify we have the proposal already
let mut lock = self.state.write().await;
if lock.consensus.proposal_exists(&proposal_copy.hash) {
debug!(target: "consensus::protocol_proposal::handle_receive_proposal()", "ProtocolProposal::handle_receive_proposal(): Proposal already received.");
debug!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"Proposal already received."
);
continue
}
@@ -96,7 +109,7 @@ impl ProtocolProposal {
{
error!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"ProtocolProposal::handle_receive_proposal(): proposal broadcast fail: {}",
"proposal broadcast fail: {}",
e
);
};
@@ -105,7 +118,7 @@ impl ProtocolProposal {
Err(e) => {
error!(
target: "consensus::protocol_proposal::handle_receive_proposal()",
"ProtocolProposal::handle_receive_proposal(): receive_proposal error: {}",
"receive_proposal error: {}",
e
);
continue
@@ -118,10 +131,10 @@ impl ProtocolProposal {
#[async_trait]
impl ProtocolBase for ProtocolProposal {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "consensus::protocol_proposal::start()", "ProtocolProposal::start() [START]");
debug!(target: "consensus::protocol_proposal::start()", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_proposal(), executor.clone()).await;
debug!(target: "consensus::protocol_proposal::start()", "ProtocolProposal::start() [END]");
debug!(target: "consensus::protocol_proposal::start()", "END");
Ok(())
}

View File

@@ -81,44 +81,71 @@ impl ProtocolSync {
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request() [START]");
debug!(
target: "consensus::protocol_sync::handle_receive_request()",
"START"
);
loop {
let order = match self.request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): recv fail: {}", e);
debug!(
target: "consensus::protocol_sync::handle_receive_request()",
"recv fail: {}",
e
);
continue
}
};
debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request() received {:?}", order);
debug!(
target: "consensus::protocol_sync::handle_receive_request()",
"received {:?}",
order
);
// Extra validations can be added here
let key = order.slot;
let blocks = match self.state.read().await.blockchain.get_blocks_after(key, BATCH) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): get_blocks_after fail: {}", e);
error!(
target: "consensus::protocol_sync::handle_receive_request()",
"get_blocks_after fail: {}",
e
);
continue
}
};
debug!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len());
debug!(
target: "consensus::protocol_sync::handle_receive_request()",
"Found {} blocks",
blocks.len()
);
let response = BlockResponse { blocks };
if let Err(e) = self.channel.send(response).await {
error!(target: "consensus::protocol_sync::handle_receive_request()", "ProtocolSync::handle_receive_request(): channel send fail: {}", e)
error!(
target: "consensus::protocol_sync::handle_receive_request()",
"channel send fail: {}",
e
)
};
}
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block() [START]");
debug!(target: "consensus::protocol_sync::handle_receive_block()", "START");
let exclude_list = vec![self.channel.address()];
loop {
let info = match self.block_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): recv fail: {}", e);
debug!(
target: "consensus::protocol_sync::handle_receive_block()",
"recv fail: {}",
e
);
continue
}
};
@@ -136,55 +163,76 @@ impl ProtocolSync {
if current >= slot {
debug!(
target: "consensus::protocol_sync::handle_receive_block()",
"ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..."
"node runs in consensus mode, skipping..."
);
continue
}
}
}
info!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): Received block: {}", info.blockhash());
info!(
target: "consensus::protocol_sync::handle_receive_block()",
"Received block: {}",
info.blockhash()
);
debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): Processing received block");
debug!(
target: "consensus::protocol_sync::handle_receive_block()",
"Processing received block"
);
let info_copy = (*info).clone();
match self.state.write().await.receive_finalized_block(info_copy.clone()).await {
Ok(v) => {
if v {
debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolProposal::handle_receive_block(): block processed successfully, broadcasting...");
debug!(
target: "consensus::protocol_sync::handle_receive_block()",
"block processed successfully, broadcasting..."
);
if let Err(e) =
self.p2p.broadcast_with_exclude(info_copy, &exclude_list).await
{
error!(
target: "consensus::protocol_sync::handle_receive_block()",
"ProtocolSync::handle_receive_block(): p2p broadcast fail: {}",
"p2p broadcast fail: {}",
e
);
};
}
}
Err(e) => {
debug!(target: "consensus::protocol_sync::handle_receive_block()", "ProtocolSync::handle_receive_block(): error processing finalized block: {}", e);
debug!(
target: "consensus::protocol_sync::handle_receive_block()",
"error processing finalized block: {}",
e
);
}
};
}
}
async fn handle_receive_slot_checkpoint_request(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request() [START]");
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"START"
);
loop {
let request = match self.slot_checkpoin_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"ProtocolSync::handle_receive_slot_checkpoint_request(): recv fail: {}",
"recv fail: {}",
e
);
continue
}
};
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request() received {:?}", request);
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"received {:?}",
request
);
// Extra validations can be added here
let key = request.slot;
@@ -197,13 +245,17 @@ impl ProtocolSync {
{
Ok(v) => v,
Err(e) => {
error!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()", "ProtocolSync::handle_receive_slot_checkpoint_request(): get_slot_checkpoints_after fail: {}", e);
error!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"get_slot_checkpoints_after fail: {}",
e
);
continue
}
};
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"ProtocolSync::handle_receive_slot_checkpoint_request(): Found {} slot checkpoints",
"Found {} slot checkpoints",
slot_checkpoints.len()
);
@@ -211,7 +263,7 @@ impl ProtocolSync {
if let Err(e) = self.channel.send(response).await {
error!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint_request()",
"ProtocolSync::handle_receive_slot_checkpoint_request(): channel send fail: {}",
"channel send fail: {}",
e
)
};
@@ -219,13 +271,20 @@ impl ProtocolSync {
}
async fn handle_receive_slot_checkpoint(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint() [START]");
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"START"
);
let exclude_list = vec![self.channel.address()];
loop {
let slot_checkpoint = match self.slot_checkpoints_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): recv fail: {}", e);
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"recv fail: {}",
e
);
continue
}
};
@@ -243,7 +302,7 @@ impl ProtocolSync {
if current >= slot {
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..."
"node runs in consensus mode, skipping..."
);
continue
}
@@ -252,11 +311,14 @@ impl ProtocolSync {
info!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"ProtocolSync::handle_receive_slot_checkpoint(): Received slot checkpoint: {}",
"Received slot checkpoint: {}",
slot_checkpoint.slot
);
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): Processing received slot checkpoint");
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"Processing received slot checkpoint"
);
let slot_checkpoint_copy = (*slot_checkpoint).clone();
match self
.state
@@ -267,7 +329,10 @@ impl ProtocolSync {
{
Ok(v) => {
if v {
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolProposal::handle_receive_slot_checkpoint(): slot checkpoint processed successfully, broadcasting...");
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"slot checkpoint processed successfully, broadcasting..."
);
if let Err(e) = self
.p2p
.broadcast_with_exclude(slot_checkpoint_copy, &exclude_list)
@@ -275,14 +340,18 @@ impl ProtocolSync {
{
error!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"ProtocolSync::handle_receive_slot_checkpoint(): p2p broadcast fail: {}",
"p2p broadcast fail: {}",
e
);
};
}
}
Err(e) => {
debug!(target: "consensus::protocol_sync::handle_receive_slot_checkpoint()", "ProtocolSync::handle_receive_slot_checkpoint(): error processing finalized slot checkpoint: {}", e);
debug!(
target: "consensus::protocol_sync::handle_receive_slot_checkpoint()",
"error processing finalized slot checkpoint: {}",
e
);
}
};
}
@@ -292,7 +361,7 @@ impl ProtocolSync {
#[async_trait]
impl ProtocolBase for ProtocolSync {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "consensus::protocol_sync::start()", "ProtocolSync::start() [START]");
debug!(target: "consensus::protocol_sync::start()", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
@@ -304,7 +373,7 @@ impl ProtocolBase for ProtocolSync {
.clone()
.spawn(self.clone().handle_receive_slot_checkpoint(), executor.clone())
.await;
debug!(target: "consensus::protocol_sync::start()", "ProtocolSync::start() [END]");
debug!(target: "consensus::protocol_sync::start()", "END");
Ok(())
}

View File

@@ -68,17 +68,28 @@ impl ProtocolSyncConsensus {
}
async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() [START]");
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_request()",
"START"
);
loop {
let req = match self.request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e);
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_request()",
"recv fail: {}",
e
);
continue
}
};
debug!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensuss::handle_receive_request() received {:?}", req);
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_request()",
"received {:?}",
req
);
// Extra validations can be added here.
let lock = self.state.read().await;
@@ -102,25 +113,36 @@ impl ProtocolSyncConsensus {
nullifiers,
};
if let Err(e) = self.channel.send(response).await {
error!(target: "consensus::protocol_sync_consensus::handle_receive_request()", "ProtocolSyncConsensus::handle_receive_request() channel send fail: {}", e);
error!(
target: "consensus::protocol_sync_consensus::handle_receive_request()",
"channel send fail: {}",
e
);
};
}
}
async fn handle_receive_slot_checkpoints_request(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() [START]");
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()",
"START"
);
loop {
let req = match self.slot_checkpoints_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() recv fail: {}", e);
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()",
"recv fail: {}",
e
);
continue
}
};
debug!(
target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()",
"ProtocolSyncConsensuss::handle_receive_slot_checkpoints_request() received {:?}",
"received {:?}",
req
);
@@ -130,7 +152,11 @@ impl ProtocolSyncConsensus {
let is_empty = lock.consensus.slot_checkpoints.is_empty();
let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, is_empty };
if let Err(e) = self.channel.send(response).await {
error!(target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e);
error!(
target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()",
"channel send fail: {}",
e
);
};
}
}
@@ -139,14 +165,20 @@ impl ProtocolSyncConsensus {
#[async_trait]
impl ProtocolBase for ProtocolSyncConsensus {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "consensus::protocol_sync_consensus::start()", "ProtocolSyncConsensus::start() [START]");
debug!(
target: "consensus::protocol_sync_consensus::start()",
"START"
);
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_slot_checkpoints_request(), executor.clone())
.await;
debug!(target: "consensus::protocol_sync_consensus::start()", "ProtocolSyncConsensus::start() [END]");
debug!(
target: "consensus::protocol_sync_consensus::start()",
"END"
);
Ok(())
}

View File

@@ -53,7 +53,10 @@ impl ProtocolTx {
state: ValidatorStatePtr,
p2p: P2pPtr,
) -> Result<ProtocolBasePtr> {
debug!(target: "consensus::protocol_tx::init()", "Adding ProtocolTx to the protocol registry");
debug!(
target: "consensus::protocol_tx::init()",
"Adding ProtocolTx to the protocol registry"
);
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<Transaction>().await;
@@ -70,13 +73,20 @@ impl ProtocolTx {
}
async fn handle_receive_tx(self: Arc<Self>) -> Result<()> {
debug!(target: "consensus::protocol_tx::handle_receive_tx()", "ProtocolTx::handle_receive_tx() [START]");
debug!(
target: "consensus::protocol_tx::handle_receive_tx()",
"START"
);
let exclude_list = vec![self.channel_address.clone()];
loop {
let tx = match self.tx_sub.receive().await {
Ok(v) => v,
Err(e) => {
debug!(target: "consensus::protocol_tx::handle_receive_tx()", "ProtocolTx::handle_receive_tx(): recv fail: {}", e);
debug!(
target: "consensus::protocol_tx::handle_receive_tx()",
"recv fail: {}",
e
);
continue
}
};
@@ -86,7 +96,11 @@ impl ProtocolTx {
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().await.append_tx(tx_copy.clone()).await {
if let Err(e) = self.p2p.broadcast_with_exclude(tx_copy, &exclude_list).await {
error!(target: "consensus::protocol_tx::handle_receive_tx()", "handle_receive_tx(): p2p broadcast fail: {}", e);
error!(
target: "consensus::protocol_tx::handle_receive_tx()",
"p2p broadcast fail: {}",
e
);
};
}
}
@@ -96,10 +110,10 @@ impl ProtocolTx {
#[async_trait]
impl ProtocolBase for ProtocolTx {
async fn start(self: Arc<Self>, executor: Arc<Executor<'_>>) -> Result<()> {
debug!(target: "consensus::protocol_tx::start()", "ProtocolTx::start() [START]");
debug!(target: "consensus::protocol_tx::start()", "START");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_tx(), executor.clone()).await;
debug!(target: "consensus::protocol_tx::start()", "ProtocolTx::start() [END]");
debug!(target: "consensus::protocol_tx::start()", "END");
Ok(())
}