From 8fb6a838382b697e1efe584005bac978a9c1901f Mon Sep 17 00:00:00 2001 From: parazyd Date: Sat, 16 Apr 2022 13:43:03 +0200 Subject: [PATCH] consensus: Use FxIndexMap for participants and clean up. --- Cargo.toml | 1 + src/consensus2/block.rs | 16 ++--- src/consensus2/metadata.rs | 2 +- src/consensus2/mod.rs | 7 +- src/consensus2/state.rs | 127 +++++++++++++++++++++++-------------- src/consensus2/util.rs | 10 +-- 6 files changed, 97 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 44ac03838..377ec69e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -207,6 +207,7 @@ blockchain = [ blockchain2 = [ "blake3", "chrono", + "indexmap", "sled", "crypto", diff --git a/src/consensus2/block.rs b/src/consensus2/block.rs index 0b4ee64e5..3fc0f9674 100644 --- a/src/consensus2/block.rs +++ b/src/consensus2/block.rs @@ -1,8 +1,8 @@ use std::io; -use log::warn; +use log::debug; -use super::{util::Timestamp, Metadata, StreamletMetadata, Tx}; +use super::{Metadata, StreamletMetadata, Timestamp, Tx}; use crate::{ crypto::{keypair::PublicKey, schnorr::Signature}, impl_vec, net, @@ -117,13 +117,13 @@ impl_vec!(BlockProposal); /// This struct represents a sequence of block proposals. #[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] pub struct ProposalChain { - pub genesis: blake3::Hash, + pub genesis_block: blake3::Hash, pub proposals: Vec, } impl ProposalChain { - pub fn new(genesis: blake3::Hash, initial_proposal: BlockProposal) -> Self { - Self { genesis, proposals: vec![initial_proposal] } + pub fn new(genesis_block: blake3::Hash, initial_proposal: BlockProposal) -> Self { + Self { genesis_block, proposals: vec![initial_proposal] } } /// A proposal is considered valid when its parent hash is equal to the @@ -131,14 +131,14 @@ impl ProposalChain { /// excluding the genesis block proposal. /// Additional validity rules can be applied. pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool { - if proposal.st == self.genesis { - warn!("Genesis block proposal provided."); + if proposal.st == self.genesis_block { + debug!("check_proposal(): Genesis block proposal provided."); return false } let prev_hash = previous.hash(); if proposal.st != prev_hash || proposal.sl <= previous.sl { - warn!("Provided proposal is invalid."); + debug!("check_proposal(): Provided proposal is invalid."); return false } diff --git a/src/consensus2/metadata.rs b/src/consensus2/metadata.rs index 0bd353394..bbad7e1a2 100644 --- a/src/consensus2/metadata.rs +++ b/src/consensus2/metadata.rs @@ -1,4 +1,4 @@ -use super::{util::Timestamp, Participant, Vote}; +use super::{Participant, Timestamp, Vote}; use crate::util::serial::{SerialDecodable, SerialEncodable}; /// This struct represents additional [`Block`] information used by diff --git a/src/consensus2/mod.rs b/src/consensus2/mod.rs index 43e026f07..06367e924 100644 --- a/src/consensus2/mod.rs +++ b/src/consensus2/mod.rs @@ -1,6 +1,6 @@ /// Block definition pub mod block; -pub use block::{Block, ProposalChain}; +pub use block::{Block, BlockProposal, ProposalChain}; /// Transactions pub mod tx; @@ -20,10 +20,11 @@ pub use vote::Vote; /// Consensus state pub mod state; -pub use state::ValidatorState; +pub use state::{ValidatorState, ValidatorStatePtr}; -/// Utility functions +/// Utility functions and types pub mod util; +pub use util::Timestamp; use lazy_static::lazy_static; lazy_static! { diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index 6bab5e8c2..9ee963ac2 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -1,21 +1,17 @@ // TODO: Use sets instead of vectors where possible. -use std::{ - collections::BTreeMap, - hash::{Hash, Hasher}, - time::Duration, -}; +use std::time::Duration; use async_std::sync::{Arc, RwLock}; use chrono::{NaiveDateTime, Utc}; -use fxhash::FxHasher; +use fxhash::FxBuildHasher; +use indexmap::IndexMap; use log::{debug, error, info, warn}; -use rand::rngs::OsRng; +use rand::{rngs::OsRng, Rng}; use super::{ - block::{Block, BlockProposal}, - util::{get_current_time, Timestamp}, - Metadata, Participant, ProposalChain, StreamletMetadata, Tx, Vote, + Block, BlockProposal, Metadata, Participant, ProposalChain, StreamletMetadata, Timestamp, Tx, + Vote, }; use crate::{ blockchain2::Blockchain, @@ -27,14 +23,16 @@ use crate::{ Result, }; -const DELTA: u64 = 5; +type FxIndexMap = IndexMap; + +const DELTA: u64 = 10; /// This struct represents the information required by the consensus algorithm #[derive(Debug)] pub struct ConsensusState { /// Genesis block creation timestamp pub genesis_ts: Timestamp, - /// Genesis block + /// Genesis block hash pub genesis_block: blake3::Hash, /// Last finalized block hash, pub last_block: blake3::Hash, @@ -46,7 +44,7 @@ pub struct ConsensusState { /// corresponding block pub orphan_votes: Vec, /// Validators currently participating in the consensus - pub participants: BTreeMap, + pub participants: FxIndexMap, /// Validators to be added on the next epoch as participants pub pending_participants: Vec, } @@ -63,7 +61,7 @@ impl ConsensusState { last_sl: 0, proposals: vec![], orphan_votes: vec![], - participants: BTreeMap::new(), + participants: FxIndexMap::with_hasher(FxBuildHasher::default()), pending_participants: vec![], }) } @@ -103,7 +101,6 @@ impl ValidatorState { let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?; let unconfirmed_txs = vec![]; - // TODO: Maybe async rwlock? let state = Arc::new(RwLock::new(ValidatorState { id, secret, @@ -121,11 +118,11 @@ impl ValidatorState { /// protocol for transactions. pub fn append_tx(&mut self, tx: Tx) -> bool { if self.unconfirmed_txs.contains(&tx) { - warn!("consensus::state::append_tx(): We already have this tx"); + debug!("append_tx(): We already have this tx"); return false } - info!("consensus::state::append_tx(): Appended tx to mempool"); + debug!("append_tx(): Appended tx to mempool"); self.unconfirmed_txs.push(tx); true } @@ -153,13 +150,10 @@ impl ValidatorState { /// Leader calculation is based on how many nodes are participating /// in the network. pub fn epoch_leader(&mut self) -> u64 { - let epoch = self.current_epoch(); - let mut hasher = FxHasher::default(); - epoch.hash(&mut hasher); - self.zero_participants_check(); - // TODO: Division by zero possible - let pos = hasher.finish() % (self.consensus.participants.len() as u64); - self.consensus.participants.iter().nth(pos as usize).unwrap().1.id + let len = self.consensus.participants.len(); + assert!(len > 0); + let idx = rand::thread_rng().gen_range(0..len); + self.consensus.participants.get_index(idx).unwrap().1.id } /// Check if we're the current epoch leader @@ -176,22 +170,19 @@ impl ValidatorState { let unproposed_txs = self.unproposed_txs(); let metadata = Metadata::new( - get_current_time(), + Timestamp::current_time(), String::from("proof"), String::from("r"), String::from("s"), ); let sm = StreamletMetadata::new(self.consensus.participants.values().cloned().collect()); - - let signed_block = self.secret.sign( - &BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata) - .as_bytes()[..], - ); + let prop = BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata); + let signed_proposal = self.secret.sign(&prop.as_bytes()[..]); Ok(Some(BlockProposal::new( self.public, - signed_block, + signed_proposal, self.id, prev_hash, epoch, @@ -312,6 +303,7 @@ impl ValidatorState { }; if !self.extends_notarized_chain(chain) { + debug!("vote(): Proposal does not extend notarized chain"); return Ok(None) } @@ -340,13 +332,13 @@ impl ValidatorState { } if proposal.st == last.st && proposal.sl == last.sl { - warn!("Proposal already received"); + debug!("find_extended_chain_index(): Proposal already received"); return Ok(-2) } } if proposal.st != self.consensus.last_block || proposal.sl <= self.consensus.last_sl { - warn!("Proposal doesn't extend any known chain"); + debug!("find_extended_chain_index(): Proposal doesn't extend any known chain"); return Ok(-2) } @@ -366,17 +358,16 @@ impl ValidatorState { pub fn receive_vote(&mut self, vote: &Vote) -> Result { let mut encoded_proposal = vec![]; - let result = vote.proposal.encode(&mut encoded_proposal); - match result { + match vote.proposal.encode(&mut encoded_proposal) { Ok(_) => (), Err(e) => { - error!("Proposal encoding failed: {:?}", e); + error!(target: "consensus", "Proposal encoding failed: {:?}", e); return Ok(false) } }; if !vote.public_key.verify(&encoded_proposal, &vote.vote) { - warn!("Voter ({}), signature couldn't be verified", vote.id); + warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.id); return Ok(false) } @@ -387,19 +378,26 @@ impl ValidatorState { match self.consensus.participants.get(&vote.id) { Some(participant) => { if self.current_epoch() <= participant.joined { - warn!("Voter ({}) joined after current epoch.", vote.id); + warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.id); return Ok(false) } } None => { - warn!("Voter ({}) is not a participant!", vote.id); + warn!(target: "consensus", "Voter ({}) is not a participant!", vote.id); return Ok(false) } } - let proposal = self.find_proposal(&vote.proposal)?; + let proposal = match self.find_proposal(&vote.proposal) { + Ok(v) => v, + Err(e) => { + error!(target: "consensus", "find_proposal() failed: {}", e); + return Err(e) + } + }; + if proposal.is_none() { - warn!("Received vote for unknown proposal."); + warn!(target: "consensus", "Received vote for unknown proposal."); if !self.consensus.orphan_votes.contains(vote) { self.consensus.orphan_votes.push(vote.clone()); } @@ -409,15 +407,22 @@ impl ValidatorState { let (proposal, chain_idx) = proposal.unwrap(); if proposal.sm.votes.contains(vote) { + debug!("receive_vote(): Already seen this proposal"); return Ok(false) } proposal.sm.votes.push(vote.clone()); if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) { - debug!("Notarized a block"); + debug!("receive_vote(): Notarized a block"); proposal.sm.notarized = true; - self.chain_finalization(chain_idx)?; + match self.chain_finalization(chain_idx) { + Ok(()) => {} + Err(e) => { + error!(target: "consensus", "Block finalization failed: {}", e); + return Err(e) + } + } } // Updating participant vote @@ -465,8 +470,12 @@ impl ValidatorState { /// starting by those proposals are removed. pub fn chain_finalization(&mut self, chain_index: i64) -> Result<()> { let chain = &mut self.consensus.proposals[chain_index as usize]; - let len = chain.proposals.len(); - if len < 3 { + + if chain.proposals.len() < 3 { + debug!( + "chain_finalization(): Less than 3 proposals in chain {}, nothing to finalize", + chain_index + ); return Ok(()) } @@ -481,6 +490,10 @@ impl ValidatorState { } if consecutive < 3 { + debug!( + "chain_finalization(): Less than 3 notarized blocks in chain {}, nothing to finalize", + chain_index + ); return Ok(()) } @@ -497,9 +510,14 @@ impl ValidatorState { chain.proposals.drain(0..(consecutive - 1)); - // Append to canonical chain - debug!(target: "consensus", "Adding finalized block to chain"); - let blockhashes = self.blockchain.add(&finalized)?; + info!(target: "consensus", "Adding finalized block to canonical chain"); + let blockhashes = match self.blockchain.add(&finalized) { + Ok(v) => v, + Err(e) => { + error!(target: "consensus", "Failed appending finalized blocks to canonical chain: {}", e); + return Err(e) + } + }; self.consensus.last_block = *blockhashes.last().unwrap(); self.consensus.last_sl = finalized.last().unwrap().sl; @@ -544,10 +562,15 @@ impl ValidatorState { /// some nodes have not pushed the initial participants in the map. pub fn zero_participants_check(&mut self) { if self.consensus.participants.is_empty() { + debug!("zero_participants_check(): Participants are empty, trying to add pending ones"); for participant in &self.consensus.pending_participants { self.consensus.participants.insert(participant.id, participant.clone()); } + if self.consensus.participants.is_empty() { + debug!("zero_participants_check(): Didn't manage to add any participant, pending were empty"); + } + self.consensus.pending_participants = Vec::new(); } } @@ -555,11 +578,17 @@ impl ValidatorState { /// Refresh the participants map, to retain only the active ones. /// Active nodes are considered those who joined or voted on a previous epoch. pub fn refresh_participants(&mut self) { - // Adding pending participants + debug!("refresh_participants(): Adding pending participants"); for participant in &self.consensus.pending_participants { self.consensus.participants.insert(participant.id, participant.clone()); } + if self.consensus.participants.is_empty() { + debug!( + "refresh_participants(): Didn't manage to add any participant, pending were empty" + ); + } + self.consensus.pending_participants = vec![]; let mut inactive = Vec::new(); diff --git a/src/consensus2/util.rs b/src/consensus2/util.rs index e46297c82..cd01409db 100644 --- a/src/consensus2/util.rs +++ b/src/consensus2/util.rs @@ -7,6 +7,11 @@ use crate::util::serial::{SerialDecodable, SerialEncodable}; pub struct Timestamp(pub i64); impl Timestamp { + /// Generate a `Timestamp` of the current time. + pub fn current_time() -> Self { + Self(Utc::now().timestamp()) + } + /// Calculates elapsed time of a `Timestamp`. pub fn elapsed(&self) -> u64 { let start_time = NaiveDateTime::from_timestamp(self.0, 0); @@ -15,8 +20,3 @@ impl Timestamp { diff.num_seconds() as u64 } } - -/// Generate a [`Timestamp`] of the current time. -pub fn get_current_time() -> Timestamp { - Timestamp(Utc::now().timestamp()) -}