consensus/proto: Cleanup.

This commit is contained in:
parazyd
2022-05-01 12:01:42 +02:00
parent 2972a2d5e6
commit 4ff4d43ca4
5 changed files with 49 additions and 83 deletions

View File

@@ -54,13 +54,10 @@ impl ProtocolParticipant {
let participant_copy = (*participant).clone();
if self.state.write().await.append_participant(participant_copy.clone()) {
match self.p2p.broadcast(participant_copy).await {
Ok(()) => {}
Err(e) => {
error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e);
continue
}
}
if let Err(e) = self.p2p.broadcast(participant_copy).await {
error!("ProtocolParticipant::handle_receive_participant(): p2p broadcast failed: {}", e);
continue
};
}
}
}

View File

@@ -58,49 +58,42 @@ impl ProtocolProposal {
debug!("handle_receive_proposal(): Starting state transition validation");
let canon_state_clone = self.state.read().await.state_machine.lock().await.clone();
let mem_state = MemoryState::new(canon_state_clone);
match ValidatorState::validate_state_transitions(mem_state, &proposal_copy.block.txs) {
Ok(_) => {
debug!("handle_receive_proposal(): State transition valid")
}
Ok(_) => debug!("handle_receive_proposal(): State transition valid"),
Err(e) => {
warn!("handle_receive_proposal(): State transition fail: {}", e);
continue
}
}
let vote = self.state.write().await.receive_proposal(&proposal_copy);
match vote {
let vote = match self.state.write().await.receive_proposal(&proposal_copy) {
Ok(v) => {
if v.is_none() {
debug!("Node did not vote for the proposed block.");
} else {
let vote = v.unwrap();
match self.state.write().await.receive_vote(&vote).await {
Ok(_) => {}
Err(e) => {
error!("receive_vote() error: {}", e);
continue
}
};
// Broadcast block to rest of nodes
match self.p2p.broadcast(proposal_copy).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_proposal(): proposal broadcast fail: {}", e);
}
};
// Broadcast vote
match self.p2p.broadcast(vote).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_proposal(): vote broadcast fail: {}", e);
}
};
debug!("handle_receive_proposal(): Node didn't vote for proposed block.");
continue
}
v.unwrap()
}
Err(e) => {
debug!("ProtocolProposal::handle_receive_proposal() error processing proposal: {:?}", e);
debug!("ProtocolProposal::handle_receive_proposal(): error processing proposal: {}", e);
continue
}
};
if let Err(e) = self.state.write().await.receive_vote(&vote).await {
error!("handle_receive_proposal(): receive_vote error: {}", e);
continue
}
// Broadcast block to rest of nodes
if let Err(e) = self.p2p.broadcast(proposal_copy).await {
error!("handle_receive_proposal(): proposal broadcast fail: {}", e);
};
// Broadcast vote
if let Err(e) = self.p2p.broadcast(vote).await {
error!("handle_receive_proposal(): vote broadcast fail: {}", e);
}
}
}

View File

@@ -79,11 +79,8 @@ impl ProtocolSync {
debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len());
let response = BlockResponse { blocks };
match self.channel.send(response).await {
Ok(()) => {}
Err(e) => {
error!("ProtocolSync::handle_receive_request(): channel send fail: {}", e)
}
if let Err(e) = self.channel.send(response).await {
error!("ProtocolSync::handle_receive_request(): channel send fail: {}", e)
};
}
}
@@ -133,37 +130,27 @@ impl ProtocolSync {
debug!("ProtocolSync::handle_receive_block(): All state transitions passed");
debug!("ProtocolSync::handle_receive_block(): Updating canon state machine");
match self.state.write().await.update_canon_state(state_updates, None).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
continue
}
}
if let Err(e) =
self.state.write().await.update_canon_state(state_updates, None).await
{
error!("handle_receive_block(): Canon statemachine update fail: {}", e);
continue
};
debug!("ProtocolSync::handle_receive_block(): Appending block to ledger");
match self.state.write().await.blockchain.add(&[info_copy.clone()]) {
Ok(_) => {}
Err(e) => {
error!("handle_receive_block(): blockchain.add() fail: {}", e);
continue
}
if let Err(e) = self.state.write().await.blockchain.add(&[info_copy.clone()]) {
error!("handle_receive_block(): blockchain.add() fail: {}", e);
continue
};
match self.state.write().await.remove_txs(info_copy.txs.clone()) {
Ok(()) => {}
Err(e) => {
error!("handle_receive_block(): remove_txs() fail: {}", e);
continue
}
if let Err(e) = self.state.write().await.remove_txs(info_copy.txs.clone()) {
error!("handle_receive_block(): remove_txs() fail: {}", e);
continue
};
match self.p2p.broadcast(info_copy).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
continue
}
if let Err(e) = self.p2p.broadcast(info_copy).await {
error!("handle_receive_block(): p2p broadcast fail: {}", e);
continue
};
}
}

View File

@@ -57,14 +57,8 @@ impl ProtocolSyncConsensus {
// Extra validations can be added here.
let consensus = self.state.read().await.consensus.clone();
let response = ConsensusResponse { consensus };
match self.channel.send(response).await {
Ok(()) => {}
Err(e) => {
error!(
"ProtocolSyncConsensus::handle_receive_request() channel send fail: {}",
e
);
}
if let Err(e) = self.channel.send(response).await {
error!("ProtocolSyncConsensus::handle_receive_request() channel send fail: {}", e);
};
}
}

View File

@@ -60,8 +60,6 @@ impl ProtocolTx {
}
};
debug!("ProtocolTx::handle_receive_tx() recv: {:?}", tx);
let tx_copy = (*tx).clone();
let tx_hash = blake3::hash(&serialize(&tx_copy));
@@ -92,12 +90,9 @@ impl ProtocolTx {
// Nodes use unconfirmed_txs vector as seen_txs pool.
if self.state.write().await.append_tx(tx_copy.clone()) {
match self.p2p.broadcast(tx_copy).await {
Ok(()) => {}
Err(e) => {
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
continue
}
if let Err(e) = self.p2p.broadcast(tx_copy).await {
error!("handle_receive_tx(): p2p broadcast fail: {}", e);
continue
};
}
}