consensus: Use FxIndexMap for participants and clean up.

This commit is contained in:
parazyd
2022-04-16 13:43:03 +02:00
parent d43d564813
commit 8fb6a83838
6 changed files with 97 additions and 66 deletions

View File

@@ -207,6 +207,7 @@ blockchain = [
blockchain2 = [
"blake3",
"chrono",
"indexmap",
"sled",
"crypto",

View File

@@ -1,8 +1,8 @@
use std::io;
use log::warn;
use log::debug;
use super::{util::Timestamp, Metadata, StreamletMetadata, Tx};
use super::{Metadata, StreamletMetadata, Timestamp, Tx};
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
@@ -117,13 +117,13 @@ impl_vec!(BlockProposal);
/// This struct represents a sequence of block proposals.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct ProposalChain {
pub genesis: blake3::Hash,
pub genesis_block: blake3::Hash,
pub proposals: Vec<BlockProposal>,
}
impl ProposalChain {
pub fn new(genesis: blake3::Hash, initial_proposal: BlockProposal) -> Self {
Self { genesis, proposals: vec![initial_proposal] }
pub fn new(genesis_block: blake3::Hash, initial_proposal: BlockProposal) -> Self {
Self { genesis_block, proposals: vec![initial_proposal] }
}
/// A proposal is considered valid when its parent hash is equal to the
@@ -131,14 +131,14 @@ impl ProposalChain {
/// excluding the genesis block proposal.
/// Additional validity rules can be applied.
pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool {
if proposal.st == self.genesis {
warn!("Genesis block proposal provided.");
if proposal.st == self.genesis_block {
debug!("check_proposal(): Genesis block proposal provided.");
return false
}
let prev_hash = previous.hash();
if proposal.st != prev_hash || proposal.sl <= previous.sl {
warn!("Provided proposal is invalid.");
debug!("check_proposal(): Provided proposal is invalid.");
return false
}

View File

@@ -1,4 +1,4 @@
use super::{util::Timestamp, Participant, Vote};
use super::{Participant, Timestamp, Vote};
use crate::util::serial::{SerialDecodable, SerialEncodable};
/// This struct represents additional [`Block`] information used by

View File

@@ -1,6 +1,6 @@
/// Block definition
pub mod block;
pub use block::{Block, ProposalChain};
pub use block::{Block, BlockProposal, ProposalChain};
/// Transactions
pub mod tx;
@@ -20,10 +20,11 @@ pub use vote::Vote;
/// Consensus state
pub mod state;
pub use state::ValidatorState;
pub use state::{ValidatorState, ValidatorStatePtr};
/// Utility functions
/// Utility functions and types
pub mod util;
pub use util::Timestamp;
use lazy_static::lazy_static;
lazy_static! {

View File

@@ -1,21 +1,17 @@
// TODO: Use sets instead of vectors where possible.
use std::{
collections::BTreeMap,
hash::{Hash, Hasher},
time::Duration,
};
use std::time::Duration;
use async_std::sync::{Arc, RwLock};
use chrono::{NaiveDateTime, Utc};
use fxhash::FxHasher;
use fxhash::FxBuildHasher;
use indexmap::IndexMap;
use log::{debug, error, info, warn};
use rand::rngs::OsRng;
use rand::{rngs::OsRng, Rng};
use super::{
block::{Block, BlockProposal},
util::{get_current_time, Timestamp},
Metadata, Participant, ProposalChain, StreamletMetadata, Tx, Vote,
Block, BlockProposal, Metadata, Participant, ProposalChain, StreamletMetadata, Timestamp, Tx,
Vote,
};
use crate::{
blockchain2::Blockchain,
@@ -27,14 +23,16 @@ use crate::{
Result,
};
const DELTA: u64 = 5;
type FxIndexMap<K, V> = IndexMap<K, V, FxBuildHasher>;
const DELTA: u64 = 10;
/// This struct represents the information required by the consensus algorithm
#[derive(Debug)]
pub struct ConsensusState {
/// Genesis block creation timestamp
pub genesis_ts: Timestamp,
/// Genesis block
/// Genesis block hash
pub genesis_block: blake3::Hash,
/// Last finalized block hash,
pub last_block: blake3::Hash,
@@ -46,7 +44,7 @@ pub struct ConsensusState {
/// corresponding block
pub orphan_votes: Vec<Vote>,
/// Validators currently participating in the consensus
pub participants: BTreeMap<u64, Participant>,
pub participants: FxIndexMap<u64, Participant>,
/// Validators to be added on the next epoch as participants
pub pending_participants: Vec<Participant>,
}
@@ -63,7 +61,7 @@ impl ConsensusState {
last_sl: 0,
proposals: vec![],
orphan_votes: vec![],
participants: BTreeMap::new(),
participants: FxIndexMap::with_hasher(FxBuildHasher::default()),
pending_participants: vec![],
})
}
@@ -103,7 +101,6 @@ impl ValidatorState {
let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?;
let unconfirmed_txs = vec![];
// TODO: Maybe async rwlock?
let state = Arc::new(RwLock::new(ValidatorState {
id,
secret,
@@ -121,11 +118,11 @@ impl ValidatorState {
/// protocol for transactions.
pub fn append_tx(&mut self, tx: Tx) -> bool {
if self.unconfirmed_txs.contains(&tx) {
warn!("consensus::state::append_tx(): We already have this tx");
debug!("append_tx(): We already have this tx");
return false
}
info!("consensus::state::append_tx(): Appended tx to mempool");
debug!("append_tx(): Appended tx to mempool");
self.unconfirmed_txs.push(tx);
true
}
@@ -153,13 +150,10 @@ impl ValidatorState {
/// Leader calculation is based on how many nodes are participating
/// in the network.
pub fn epoch_leader(&mut self) -> u64 {
let epoch = self.current_epoch();
let mut hasher = FxHasher::default();
epoch.hash(&mut hasher);
self.zero_participants_check();
// TODO: Division by zero possible
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
let len = self.consensus.participants.len();
assert!(len > 0);
let idx = rand::thread_rng().gen_range(0..len);
self.consensus.participants.get_index(idx).unwrap().1.id
}
/// Check if we're the current epoch leader
@@ -176,22 +170,19 @@ impl ValidatorState {
let unproposed_txs = self.unproposed_txs();
let metadata = Metadata::new(
get_current_time(),
Timestamp::current_time(),
String::from("proof"),
String::from("r"),
String::from("s"),
);
let sm = StreamletMetadata::new(self.consensus.participants.values().cloned().collect());
let signed_block = self.secret.sign(
&BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata)
.as_bytes()[..],
);
let prop = BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata);
let signed_proposal = self.secret.sign(&prop.as_bytes()[..]);
Ok(Some(BlockProposal::new(
self.public,
signed_block,
signed_proposal,
self.id,
prev_hash,
epoch,
@@ -312,6 +303,7 @@ impl ValidatorState {
};
if !self.extends_notarized_chain(chain) {
debug!("vote(): Proposal does not extend notarized chain");
return Ok(None)
}
@@ -340,13 +332,13 @@ impl ValidatorState {
}
if proposal.st == last.st && proposal.sl == last.sl {
warn!("Proposal already received");
debug!("find_extended_chain_index(): Proposal already received");
return Ok(-2)
}
}
if proposal.st != self.consensus.last_block || proposal.sl <= self.consensus.last_sl {
warn!("Proposal doesn't extend any known chain");
debug!("find_extended_chain_index(): Proposal doesn't extend any known chain");
return Ok(-2)
}
@@ -366,17 +358,16 @@ impl ValidatorState {
pub fn receive_vote(&mut self, vote: &Vote) -> Result<bool> {
let mut encoded_proposal = vec![];
let result = vote.proposal.encode(&mut encoded_proposal);
match result {
match vote.proposal.encode(&mut encoded_proposal) {
Ok(_) => (),
Err(e) => {
error!("Proposal encoding failed: {:?}", e);
error!(target: "consensus", "Proposal encoding failed: {:?}", e);
return Ok(false)
}
};
if !vote.public_key.verify(&encoded_proposal, &vote.vote) {
warn!("Voter ({}), signature couldn't be verified", vote.id);
warn!(target: "consensus", "Voter ({}), signature couldn't be verified", vote.id);
return Ok(false)
}
@@ -387,19 +378,26 @@ impl ValidatorState {
match self.consensus.participants.get(&vote.id) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
warn!("Voter ({}) joined after current epoch.", vote.id);
warn!(target: "consensus", "Voter ({}) joined after current epoch.", vote.id);
return Ok(false)
}
}
None => {
warn!("Voter ({}) is not a participant!", vote.id);
warn!(target: "consensus", "Voter ({}) is not a participant!", vote.id);
return Ok(false)
}
}
let proposal = self.find_proposal(&vote.proposal)?;
let proposal = match self.find_proposal(&vote.proposal) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus", "find_proposal() failed: {}", e);
return Err(e)
}
};
if proposal.is_none() {
warn!("Received vote for unknown proposal.");
warn!(target: "consensus", "Received vote for unknown proposal.");
if !self.consensus.orphan_votes.contains(vote) {
self.consensus.orphan_votes.push(vote.clone());
}
@@ -409,15 +407,22 @@ impl ValidatorState {
let (proposal, chain_idx) = proposal.unwrap();
if proposal.sm.votes.contains(vote) {
debug!("receive_vote(): Already seen this proposal");
return Ok(false)
}
proposal.sm.votes.push(vote.clone());
if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) {
debug!("Notarized a block");
debug!("receive_vote(): Notarized a block");
proposal.sm.notarized = true;
self.chain_finalization(chain_idx)?;
match self.chain_finalization(chain_idx) {
Ok(()) => {}
Err(e) => {
error!(target: "consensus", "Block finalization failed: {}", e);
return Err(e)
}
}
}
// Updating participant vote
@@ -465,8 +470,12 @@ impl ValidatorState {
/// starting by those proposals are removed.
pub fn chain_finalization(&mut self, chain_index: i64) -> Result<()> {
let chain = &mut self.consensus.proposals[chain_index as usize];
let len = chain.proposals.len();
if len < 3 {
if chain.proposals.len() < 3 {
debug!(
"chain_finalization(): Less than 3 proposals in chain {}, nothing to finalize",
chain_index
);
return Ok(())
}
@@ -481,6 +490,10 @@ impl ValidatorState {
}
if consecutive < 3 {
debug!(
"chain_finalization(): Less than 3 notarized blocks in chain {}, nothing to finalize",
chain_index
);
return Ok(())
}
@@ -497,9 +510,14 @@ impl ValidatorState {
chain.proposals.drain(0..(consecutive - 1));
// Append to canonical chain
debug!(target: "consensus", "Adding finalized block to chain");
let blockhashes = self.blockchain.add(&finalized)?;
info!(target: "consensus", "Adding finalized block to canonical chain");
let blockhashes = match self.blockchain.add(&finalized) {
Ok(v) => v,
Err(e) => {
error!(target: "consensus", "Failed appending finalized blocks to canonical chain: {}", e);
return Err(e)
}
};
self.consensus.last_block = *blockhashes.last().unwrap();
self.consensus.last_sl = finalized.last().unwrap().sl;
@@ -544,10 +562,15 @@ impl ValidatorState {
/// some nodes have not pushed the initial participants in the map.
pub fn zero_participants_check(&mut self) {
if self.consensus.participants.is_empty() {
debug!("zero_participants_check(): Participants are empty, trying to add pending ones");
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
if self.consensus.participants.is_empty() {
debug!("zero_participants_check(): Didn't manage to add any participant, pending were empty");
}
self.consensus.pending_participants = Vec::new();
}
}
@@ -555,11 +578,17 @@ impl ValidatorState {
/// Refresh the participants map, to retain only the active ones.
/// Active nodes are considered those who joined or voted on a previous epoch.
pub fn refresh_participants(&mut self) {
// Adding pending participants
debug!("refresh_participants(): Adding pending participants");
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
if self.consensus.participants.is_empty() {
debug!(
"refresh_participants(): Didn't manage to add any participant, pending were empty"
);
}
self.consensus.pending_participants = vec![];
let mut inactive = Vec::new();

View File

@@ -7,6 +7,11 @@ use crate::util::serial::{SerialDecodable, SerialEncodable};
pub struct Timestamp(pub i64);
impl Timestamp {
/// Generate a `Timestamp` of the current time.
pub fn current_time() -> Self {
Self(Utc::now().timestamp())
}
/// Calculates elapsed time of a `Timestamp`.
pub fn elapsed(&self) -> u64 {
let start_time = NaiveDateTime::from_timestamp(self.0, 0);
@@ -15,8 +20,3 @@ impl Timestamp {
diff.num_seconds() as u64
}
}
/// Generate a [`Timestamp`] of the current time.
pub fn get_current_time() -> Timestamp {
Timestamp(Utc::now().timestamp())
}