diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index cb50d0336..41173048c 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -1,5 +1,5 @@ use async_executor::Executor; -use async_std::sync::Arc; +use async_std::sync::{Arc, Mutex}; use async_trait::async_trait; use log::{debug, error, warn}; @@ -27,6 +27,7 @@ pub struct ProtocolSync { state: ValidatorStatePtr, p2p: P2pPtr, consensus_mode: bool, + pending: Mutex, } impl ProtocolSync { @@ -51,11 +52,12 @@ impl ProtocolSync { state, p2p, consensus_mode, + pending: Mutex::new(false), })) } async fn handle_receive_request(self: Arc) -> Result<()> { - debug!("ProtocolSync::handle_receive_request() [START]"); + debug!("handle_receive_request() [START]"); loop { let order = match self.request_sub.receive().await { Ok(v) => v, @@ -65,7 +67,7 @@ impl ProtocolSync { } }; - debug!("ProtocolSync::handle_receive_request() received {:?}", order); + debug!("handle_receive_request() received {:?}", order); // Extra validations can be added here let key = order.sl; @@ -76,7 +78,7 @@ impl ProtocolSync { continue } }; - debug!("ProtocolSync::handle_receive_request(): Found {} blocks", blocks.len()); + debug!("handle_receive_request(): Found {} blocks", blocks.len()); let response = BlockResponse { blocks }; if let Err(e) = self.channel.send(response).await { @@ -86,7 +88,7 @@ impl ProtocolSync { } async fn handle_receive_block(self: Arc) -> Result<()> { - debug!("ProtocolSync::handle_receive_block() [START]"); + debug!("handle_receive_block() [START]"); loop { let info = match self.block_sub.receive().await { Ok(v) => v, @@ -96,7 +98,13 @@ impl ProtocolSync { } }; - debug!("ProtocolSync::handle_receive_block() received block"); + debug!("handle_receive_block() received block"); + + // We block here if there's a pending validation, otherwise we might + // apply the same block twice. + debug!("handle_receive_block(): Waiting for pending block to apply"); + while *self.pending.lock().await {} + debug!("handle_receive_block(): Pending lock released"); // Node stores finalized block, if it doesn't exist (checking by slot), // and removes its transactions from the unconfirmed_txs vector. @@ -104,12 +112,14 @@ impl ProtocolSync { // during proposal finalization. // Extra validations can be added here. if !self.consensus_mode { + *self.pending.lock().await = true; let info_copy = (*info).clone(); let has_block = match self.state.read().await.blockchain.has_block(&info_copy) { Ok(v) => v, Err(e) => { error!("handle_receive_block(): failed checking for has_block(): {}", e); + *self.pending.lock().await = false; continue } }; @@ -124,35 +134,42 @@ impl ProtocolSync { Ok(v) => v, Err(e) => { warn!("handle_receive_block(): State transition fail: {}", e); + *self.pending.lock().await = false; continue } }; - debug!("ProtocolSync::handle_receive_block(): All state transitions passed"); + debug!("handle_receive_block(): All state transitions passed"); - debug!("ProtocolSync::handle_receive_block(): Updating canon state machine"); + debug!("handle_receive_block(): Updating canon state machine"); if let Err(e) = self.state.write().await.update_canon_state(state_updates, None).await { error!("handle_receive_block(): Canon statemachine update fail: {}", e); + *self.pending.lock().await = false; continue }; - debug!("ProtocolSync::handle_receive_block(): Appending block to ledger"); + debug!("handle_receive_block(): Appending block to ledger"); if let Err(e) = self.state.write().await.blockchain.add(&[info_copy.clone()]) { error!("handle_receive_block(): blockchain.add() fail: {}", e); + *self.pending.lock().await = false; continue }; if let Err(e) = self.state.write().await.remove_txs(info_copy.txs.clone()) { error!("handle_receive_block(): remove_txs() fail: {}", e); + *self.pending.lock().await = false; continue }; if let Err(e) = self.p2p.broadcast(info_copy).await { error!("handle_receive_block(): p2p broadcast fail: {}", e); + *self.pending.lock().await = false; continue }; } + + *self.pending.lock().await = false; } } } diff --git a/src/consensus/proto/protocol_vote.rs b/src/consensus/proto/protocol_vote.rs index c0d0cac23..2e785a4a6 100644 --- a/src/consensus/proto/protocol_vote.rs +++ b/src/consensus/proto/protocol_vote.rs @@ -67,33 +67,24 @@ impl ProtocolVote { }; if voted { - match self.consensus_p2p.broadcast(vote_copy).await { - Ok(()) => {} - Err(e) => { - error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e); - continue - } + if let Err(e) = self.consensus_p2p.broadcast(vote_copy).await { + error!("handle_receive_vote(): consensus p2p broadcast fail: {}", e); + continue }; // Broadcast finalized blocks info, if any - match to_broadcast { - Some(blocks) => { - debug!("handle_receive_vote(): Broadcasting finalized blocks"); - for info in blocks { - match self.sync_p2p.broadcast(info).await { - Ok(()) => {} - Err(e) => { - error!("handle_receive_vote(): sync p2p broadcast fail: {}", e); - continue - } - }; + if let Some(blocks) = to_broadcast { + debug!("handle_receive_vote(): Broadcasting finalized blocks"); + for info in blocks { + if let Err(e) = self.sync_p2p.broadcast(info).await { + error!("handle_receive_vote(): sync p2p broadcast fail: {}", e); + // TODO: Should we quit broadcasting if one fails? + continue } } - None => { - debug!("handle_receive_vote(): No finalized blocks to broadcast"); - continue - } - } + } else { + debug!("handle_receive_vote(): No finalized blocks to broadcast"); + }; } } } diff --git a/src/consensus/state.rs b/src/consensus/state.rs index 5a0a4c38a..29aff04a5 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -24,7 +24,7 @@ use crate::{ }, net, node::{ - state::{state_transition, StateUpdate}, + state::{state_transition, ProgramState, StateUpdate}, Client, MemoryState, State, }, tx::Transaction, @@ -33,7 +33,7 @@ use crate::{ }; /// `2 * DELTA` represents epoch time -pub const DELTA: u64 = 30; +pub const DELTA: u64 = 20; /// This struct represents the information required by the consensus algorithm #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] @@ -151,6 +151,10 @@ impl ValidatorState { burn_vk: Lazy::new(), })); + // Create zk proof verification keys + let _ = state_machine.lock().await.mint_vk(); + let _ = state_machine.lock().await.burn_vk(); + let state = Arc::new(RwLock::new(ValidatorState { address, secret, @@ -506,18 +510,18 @@ impl ValidatorState { let mut encoded_proposal = vec![]; if let Err(e) = vote.proposal.encode(&mut encoded_proposal) { - error!(target: "consensus", "Proposal encoding failed: {:?}", e); + error!("consensus: Proposal encoding failed: {:?}", e); return Ok((false, None)) }; + let va = vote.address.to_string(); if !vote.public_key.verify(&encoded_proposal, &vote.vote) { - warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.address.to_string()); + warn!("consensus: Voter ({}), signature couldn't be verified", va); return Ok((false, None)) } // Node refreshes participants records self.refresh_participants()?; - let node_count = self.consensus.participants.len(); // Checking that the voter can actually vote. @@ -525,7 +529,7 @@ impl ValidatorState { Some(participant) => { let mut participant = participant.clone(); if current_epoch <= participant.joined { - warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.address.to_string()); + warn!("consensus: Voter ({}) joined after current epoch.", va); return Ok((false, None)) } @@ -542,7 +546,7 @@ impl ValidatorState { self.consensus.participants.insert(participant.address, participant); } None => { - warn!(target: "consensus", "Voter ({}) is not a participant!", vote.address.to_string()); + warn!("consensus: Voter ({}) is not a participant!", vote.address.to_string()); return Ok((false, None)) } } @@ -550,7 +554,7 @@ impl ValidatorState { let proposal = match self.find_proposal(&vote.proposal) { Ok(v) => v, Err(e) => { - error!(target: "consensus", "find_proposal() failed: {}", e); + error!("consensus: find_proposal() failed: {}", e); return Err(e) } }; @@ -581,7 +585,7 @@ impl ValidatorState { to_broadcast = v; } Err(e) => { - error!(target: "consensus", "Block finalization failed: {}", e); + error!("consensus: Block finalization failed: {}", e); return Err(e) } } diff --git a/src/crypto/burn_proof.rs b/src/crypto/burn_proof.rs index ea6471b2d..cf6f10cf7 100644 --- a/src/crypto/burn_proof.rs +++ b/src/crypto/burn_proof.rs @@ -152,7 +152,7 @@ pub fn create_burn_proof( let start = Instant::now(); let public_inputs = revealed.make_outputs(); let proof = Proof::create(pk, &[c], &public_inputs, &mut OsRng)?; - debug!("Prove: [{:?}]", start.elapsed()); + debug!("Prove burn: [{:?}]", start.elapsed()); Ok((proof, revealed)) } @@ -162,6 +162,9 @@ pub fn verify_burn_proof( proof: &Proof, revealed: &BurnRevealedValues, ) -> Result<()> { + let start = Instant::now(); let public_inputs = revealed.make_outputs(); - Ok(proof.verify(vk, &public_inputs)?) + proof.verify(vk, &public_inputs)?; + debug!("Verify burn: [{:?}]", start.elapsed()); + Ok(()) } diff --git a/src/crypto/mint_proof.rs b/src/crypto/mint_proof.rs index 7df36f8fb..ecfcb370a 100644 --- a/src/crypto/mint_proof.rs +++ b/src/crypto/mint_proof.rs @@ -103,7 +103,7 @@ pub fn create_mint_proof( let start = Instant::now(); let public_inputs = revealed.make_outputs(); let proof = Proof::create(pk, &[c], &public_inputs, &mut OsRng)?; - debug!("Prove: [{:?}]", start.elapsed()); + debug!("Prove mint: [{:?}]", start.elapsed()); Ok((proof, revealed)) } @@ -113,6 +113,9 @@ pub fn verify_mint_proof( proof: &Proof, revealed: &MintRevealedValues, ) -> Result<()> { + let start = Instant::now(); let public_inputs = revealed.make_outputs(); - Ok(proof.verify(vk, &public_inputs)?) + proof.verify(vk, &public_inputs)?; + debug!("Verify mint: [{:?}]", start.elapsed()); + Ok(()) }