diff --git a/Cargo.toml b/Cargo.toml index 8419a4a6f..44ac03838 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -206,8 +206,11 @@ blockchain = [ blockchain2 = [ "blake3", + "chrono", "sled", + "crypto", + "net", "util", ] diff --git a/src/blockchain2/blockstore.rs b/src/blockchain2/blockstore.rs index a8d22887a..238e76c0f 100644 --- a/src/blockchain2/blockstore.rs +++ b/src/blockchain2/blockstore.rs @@ -1,58 +1,54 @@ use sled::Batch; use crate::{ - util::serial::{deserialize, serialize, SerialDecodable, SerialEncodable}, + consensus2::{util::Timestamp, Block}, + util::serial::{deserialize, serialize}, Result, }; const SLED_BLOCK_TREE: &[u8] = b"_blocks"; -// TODO: The block structure should be as follows -#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] -pub struct Block { - /// Previous block hash (blake3) - pub st: blake3::Hash, - /// Slot UID, generated by the beacon - pub sl: u64, - /// Transaction hashes (blake3) - /// The actual transactions are in [`TxStore`] - pub txs: Vec, - /// Additional block information - pub metadata: String, -} - pub struct BlockStore(sled::Tree); impl BlockStore { - /// Opens a new or existing blockstore tree given a sled database. - pub fn new(db: &sled::Db) -> Result { + /// Opens a new or existing `BlockStore` on the given sled database. + /// The database is typically initialized as a global instance reference + /// with e.g. lazy_static. + pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result { let tree = db.open_tree(SLED_BLOCK_TREE)?; - Ok(Self(tree)) + let store = Self(tree); + + // In case the store is empty, create the genesis block. + if store.0.is_empty() { + store.insert(&[Block::genesis_block(genesis_ts, genesis_data)])?; + } + + Ok(store) } - /// Insert a vector of [`Block`] into the blockstore. - /// The blocks are hashed with blake3 and this blockhash is used as - // the key, where value is the serialized block itself. - pub fn insert(&self, blocks: Vec) -> Result<()> { + /// Insert a slice of [`Block`] into the blockstore. With sled, the + /// operation is done as a batch. + /// The blocks are hashed with BLAKE3 and this blockhash is used as + /// the key, where value is the serialized block itself. + pub fn insert(&self, blocks: &[Block]) -> Result<()> { let mut batch = Batch::default(); - for i in &blocks { + for i in blocks { let serialized = serialize(i); let blockhash = blake3::hash(&serialized); batch.insert(blockhash.as_bytes(), serialized); } self.0.apply_batch(batch)?; - Ok(()) } /// Fetch given blockhashes from the blockstore. /// The resulting vector contains `Option` which is `Some` if the block /// was found in the blockstore, and `None`, if it has not. - pub fn get(&self, blockhashes: Vec) -> Result>> { + pub fn get(&self, blockhashes: &[blake3::Hash]) -> Result>> { let mut ret: Vec> = Vec::with_capacity(blockhashes.len()); - for i in &blockhashes { + for i in blockhashes { if let Some(found) = self.0.get(i.as_bytes())? { let block = deserialize(&found)?; ret.push(Some(block)); @@ -69,6 +65,7 @@ impl BlockStore { Ok(self.0.contains_key(blockhash.as_bytes())?) } + /* /// Fetch the first block in the tree, based on the Ord implementation for Vec. pub fn get_first(&self) -> Result> { if let Some(found) = self.0.first()? { @@ -131,4 +128,5 @@ impl BlockStore { self.0.range(start..end) } + */ } diff --git a/src/blockchain2/metadatastore.rs b/src/blockchain2/metadatastore.rs new file mode 100644 index 000000000..563a02725 --- /dev/null +++ b/src/blockchain2/metadatastore.rs @@ -0,0 +1,20 @@ +use crate::{consensus2::StreamletMetadata, util::serial::serialize, Result}; + +const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata"; + +pub struct StreamletMetadataStore(sled::Tree); + +impl StreamletMetadataStore { + pub fn new(db: &sled::Db) -> Result { + let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?; + Ok(Self(tree)) + } + + /// Insert [`StreamletMetadata`] into the `MetadataStore`. + /// The blockhash for the metadata is used as the key, + /// where value is the serialized metadata. + pub fn insert(&self, block: blake3::Hash, metadata: &StreamletMetadata) -> Result<()> { + self.0.insert(block.as_bytes(), serialize(metadata))?; + Ok(()) + } +} diff --git a/src/blockchain2/mod.rs b/src/blockchain2/mod.rs index 2a97fd1ba..93ea15938 100644 --- a/src/blockchain2/mod.rs +++ b/src/blockchain2/mod.rs @@ -1,38 +1,51 @@ use std::io; use crate::{ + consensus2::{block::BlockProposal, util::Timestamp}, impl_vec, util::serial::{Decodable, Encodable, ReadExt, VarInt, WriteExt}, Result, }; pub mod blockstore; -pub mod nfstore; -pub mod rootstore; -pub mod txstore; - use blockstore::BlockStore; + +pub mod metadatastore; +use metadatastore::StreamletMetadataStore; + +pub mod nfstore; use nfstore::NullifierStore; + +pub mod rootstore; use rootstore::RootStore; + +pub mod txstore; use txstore::TxStore; pub struct Blockchain { - pub db: sled::Db, + /// Blocks sled tree pub blocks: BlockStore, + /// Transactions sled tree pub transactions: TxStore, - pub nullifiers: NullifierStore, - pub merkle_roots: RootStore, + /// Streamlet metadata sled tree + pub streamlet_metadata: StreamletMetadataStore, + // TODO: + //pub nullifiers: NullifierStore, + //pub merkle_roots: RootStore, } impl Blockchain { - pub fn new(db_path: &str) -> Result { - let db = sled::open(db_path)?; - let blocks = BlockStore::new(&db)?; - let transactions = TxStore::new(&db)?; - let nullifiers = NullifierStore::new(&db)?; - let merkle_roots = RootStore::new(&db)?; + pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result { + let blocks = BlockStore::new(db, genesis_ts, genesis_data)?; + let transactions = TxStore::new(db)?; + let streamlet_metadata = StreamletMetadataStore::new(db)?; - Ok(Self { db, blocks, transactions, nullifiers, merkle_roots }) + Ok(Self { blocks, transactions, streamlet_metadata }) + } + + /// Batch insert [`BlockProposal`]s. + pub fn add(&mut self, proposals: &[BlockProposal]) -> Result> { + todo!() } } diff --git a/src/consensus2/block.rs b/src/consensus2/block.rs new file mode 100644 index 000000000..0b4ee64e5 --- /dev/null +++ b/src/consensus2/block.rs @@ -0,0 +1,179 @@ +use std::io; + +use log::warn; + +use super::{util::Timestamp, Metadata, StreamletMetadata, Tx}; +use crate::{ + crypto::{keypair::PublicKey, schnorr::Signature}, + impl_vec, net, + util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, + Result, +}; + +/// This struct represents a tuple of the form (`st`, `sl`, txs`, `metadata`). +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct Block { + /// Previous block hash + pub st: blake3::Hash, + /// Slot uid, generated by the beacon + pub sl: u64, + /// Transaction hashes + pub txs: Vec, + /// Additional block information + pub metadata: Metadata, +} + +impl Block { + pub fn new(st: blake3::Hash, sl: u64, txs: Vec, metadata: Metadata) -> Self { + Self { st, sl, txs, metadata } + } + + /// Generate the genesis block. + pub fn genesis_block(genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Self { + let metadata = + Metadata::new(genesis_ts, String::from("proof"), String::from("r"), String::from("s")); + + Self::new(genesis_data, 0, vec![], metadata) + } +} + +/// This struct represents a block proposal, used for consensus. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct BlockProposal { + /// Leader public key + pub public_key: PublicKey, + /// Block signature + pub signature: Signature, + /// Leader ID + pub id: u64, + /// Previous block hash + pub st: blake3::Hash, + /// Slot uid, generated by the beacon + pub sl: u64, + /// Transactions payload + pub txs: Vec, + /// Additional proposal information + pub metadata: Metadata, + /// Proposal information used by Streamlet consensus + pub sm: StreamletMetadata, +} + +impl BlockProposal { + #[allow(clippy::too_many_arguments)] + pub fn new( + public_key: PublicKey, + signature: Signature, + id: u64, + st: blake3::Hash, + sl: u64, + txs: Vec, + metadata: Metadata, + sm: StreamletMetadata, + ) -> Self { + Self { public_key, signature, id, st, sl, txs, metadata, sm } + } + + /// Produce proposal hash using `st`, `sl`, `txs`, and `metadata`. + pub fn hash(&self) -> blake3::Hash { + Self::to_proposal_hash(self.st, self.sl, &self.txs, &self.metadata) + } + + /// Generate a proposal hash using provided `st`, `sl`, `txs`, and `metadata`. + pub fn to_proposal_hash( + st: blake3::Hash, + sl: u64, + transactions: &[Tx], + metadata: &Metadata, + ) -> blake3::Hash { + let mut txs = Vec::with_capacity(transactions.len()); + for tx in transactions { + txs.push(blake3::hash(&serialize(tx))); + } + + blake3::hash(&serialize(&Block::new(st, sl, txs, metadata.clone()))) + } +} + +impl PartialEq for BlockProposal { + fn eq(&self, other: &Self) -> bool { + self.public_key == other.public_key && + self.signature == other.signature && + self.id == other.id && + self.st == other.st && + self.sl == other.sl && + self.txs == other.txs && + self.metadata == other.metadata + } +} + +impl net::Message for BlockProposal { + fn name() -> &'static str { + "proposal" + } +} + +impl_vec!(BlockProposal); + +/// This struct represents a sequence of block proposals. +#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] +pub struct ProposalChain { + pub genesis: blake3::Hash, + pub proposals: Vec, +} + +impl ProposalChain { + pub fn new(genesis: blake3::Hash, initial_proposal: BlockProposal) -> Self { + Self { genesis, proposals: vec![initial_proposal] } + } + + /// A proposal is considered valid when its parent hash is equal to the + /// hash of the previous proposal and their epochs are incremental, + /// excluding the genesis block proposal. + /// Additional validity rules can be applied. + pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool { + if proposal.st == self.genesis { + warn!("Genesis block proposal provided."); + return false + } + + let prev_hash = previous.hash(); + if proposal.st != prev_hash || proposal.sl <= previous.sl { + warn!("Provided proposal is invalid."); + return false + } + + true + } + + /// A proposals chain is considered valid when every proposal is valid, + /// based on the `check_proposal` function. + pub fn check_chain(&self) -> bool { + for (index, proposal) in self.proposals[1..].iter().enumerate() { + if !self.check_proposal(proposal, &self.proposals[index]) { + return false + } + } + + true + } + + /// Insertion of a valid proposal. + pub fn add(&mut self, proposal: &BlockProposal) { + if self.check_proposal(proposal, self.proposals.last().unwrap()) { + self.proposals.push(proposal.clone()); + } + } + + /// Proposals chain notarization check. + pub fn notarized(&self) -> bool { + for proposal in &self.proposals { + if !proposal.sm.notarized { + return false + } + } + + true + } +} + +impl_vec!(ProposalChain); diff --git a/src/consensus2/metadata.rs b/src/consensus2/metadata.rs new file mode 100644 index 000000000..0bd353394 --- /dev/null +++ b/src/consensus2/metadata.rs @@ -0,0 +1,56 @@ +use super::{util::Timestamp, Participant, Vote}; +use crate::util::serial::{SerialDecodable, SerialEncodable}; + +/// This struct represents additional [`Block`] information used by +/// the consensus protocol +#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] +pub struct Metadata { + /// Block creation timestamp + pub timestamp: Timestamp, + /// Block information used by the Ouroboros Praos consensus + pub om: OuroborosMetadata, +} + +impl Metadata { + pub fn new(timestamp: Timestamp, proof: String, r: String, s: String) -> Self { + Self { timestamp, om: OuroborosMetadata::new(proof, r, s) } + } +} + +/// This struct represents [`Block`] information used by the Ouroboros +/// Praos consensus protocol. +#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] +pub struct OuroborosMetadata { + /// Proof that the stakeholder is the block owner + pub proof: String, + /// Random seed for VRF + pub r: String, + /// Block owner signature + pub s: String, +} + +impl OuroborosMetadata { + pub fn new(proof: String, r: String, s: String) -> Self { + Self { proof, r, s } + } +} + +/// This struct represents [`Block`] information used by the Streamlet +/// consensus protocol. +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct StreamletMetadata { + /// Epoch votes + pub votes: Vec, + /// Block notarization flag + pub notarized: bool, + /// Block finalization flag + pub finalized: bool, + /// Nodes participated in the voting process + pub participants: Vec, +} + +impl StreamletMetadata { + pub fn new(participants: Vec) -> Self { + Self { votes: vec![], notarized: false, finalized: false, participants } + } +} diff --git a/src/consensus2/mod.rs b/src/consensus2/mod.rs new file mode 100644 index 000000000..84df310d4 --- /dev/null +++ b/src/consensus2/mod.rs @@ -0,0 +1,35 @@ +/// Block definition +pub mod block; +pub use block::{Block, ProposalChain}; + +/// Transactions +pub mod tx; +pub use tx::Tx; + +/// Consensus metadata +pub mod metadata; +pub use metadata::{Metadata, StreamletMetadata}; + +/// Consensus participant +pub mod participant; +pub use participant::Participant; + +/// Consensus vote +pub mod vote; +pub use vote::Vote; + +/// Consensus state +pub mod state; +pub use state::ValidatorState; + +/// Utility functions +pub mod util; + +use lazy_static::lazy_static; +lazy_static! { + /// Genesis hash for the mainnet chain + pub static ref MAINNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_mainnet").as_bytes(); + + /// Genesis hash for the testnet chain + pub static ref TESTNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_testnet").as_bytes(); +} diff --git a/src/consensus2/participant.rs b/src/consensus2/participant.rs new file mode 100644 index 000000000..0b5131c8b --- /dev/null +++ b/src/consensus2/participant.rs @@ -0,0 +1,56 @@ +use std::{collections::BTreeMap, io}; + +use crate::{ + impl_vec, net, + util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, + Result, +}; + +/// This struct represents a tuple of the form: +/// (`node_id`, `epoch_joined`, `last_epoch_voted`) +#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] +pub struct Participant { + // Node ID + pub id: u64, + /// Epoch node joined the network + pub joined: u64, + /// Last epoch node voted + pub voted: Option, +} + +impl Participant { + pub fn new(id: u64, joined: u64) -> Self { + Self { id, joined, voted: None } + } +} + +impl net::Message for Participant { + fn name() -> &'static str { + "participant" + } +} + +impl Encodable for BTreeMap { + fn encode(&self, mut s: S) -> Result { + let mut len = 0; + len += VarInt(self.len() as u64).encode(&mut s)?; + for c in self.iter() { + len += c.1.encode(&mut s)?; + } + Ok(len) + } +} + +impl Decodable for BTreeMap { + fn decode(mut d: D) -> Result { + let len = VarInt::decode(&mut d)?.0; + let mut ret = BTreeMap::new(); + for _ in 0..len { + let participant: Participant = Decodable::decode(&mut d)?; + ret.insert(participant.id, participant); + } + Ok(ret) + } +} + +impl_vec!(Participant); diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs new file mode 100644 index 000000000..ce66cfbc9 --- /dev/null +++ b/src/consensus2/state.rs @@ -0,0 +1,584 @@ +// TODO: Pending participants can also be a map? + +use std::{ + collections::BTreeMap, + hash::{Hash, Hasher}, + sync::{Arc, RwLock}, + time::Duration, +}; + +use chrono::{NaiveDateTime, Utc}; +use fxhash::FxHasher; +use log::{debug, error, warn}; +use rand::rngs::OsRng; + +use super::{ + block::{Block, BlockProposal}, + util::{get_current_time, Timestamp}, + Metadata, Participant, ProposalChain, StreamletMetadata, Tx, Vote, +}; +use crate::{ + blockchain2::Blockchain, + crypto::{ + keypair::{PublicKey, SecretKey}, + schnorr::{SchnorrPublic, SchnorrSecret}, + }, + util::serial::{serialize, Encodable}, + Result, +}; + +const DELTA: u64 = 60; + +/// This struct represents the information required by the consensus algorithm +#[derive(Debug)] +pub struct ConsensusState { + /// Genesis block creation timestamp + pub genesis_ts: Timestamp, + /// Genesis block + pub genesis_block: blake3::Hash, + /// Last finalized block hash, + pub last_block: blake3::Hash, + /// Last finalized block slot, + pub last_sl: u64, + /// Fork chains containing block proposals + pub proposals: Vec, + /// Orphan votes pool, in case a vote reaches a node before the + /// corresponding block + pub orphan_votes: Vec, + /// Validators currently participating in the consensus + pub participants: BTreeMap, + /// Validators to be added on the next epoch as participants + pub pending_participants: Vec, +} + +impl ConsensusState { + pub fn new(genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result { + let genesis_block = + blake3::hash(&serialize(&Block::genesis_block(genesis_ts, genesis_data))); + + Ok(Self { + genesis_ts, + genesis_block, + last_block: genesis_block, + last_sl: 0, + proposals: vec![], + orphan_votes: vec![], + participants: BTreeMap::new(), + pending_participants: vec![], + }) + } +} + +/// Atomic pointer to validator state. +pub type ValidatorStatePtr = Arc>; + +/// This struct represents the state of a validator node. +pub struct ValidatorState { + /// Validator ID + pub id: u64, + /// Secret key, to sign messages + pub secret: SecretKey, + /// Validator public key + pub public: PublicKey, + /// Hot/Live data used by the consensus algorithm + pub consensus: ConsensusState, + /// Canonical (finalized) blockchain + pub blockchain: Blockchain, + /// Pending transactions + pub unconfirmed_txs: Vec, +} + +impl ValidatorState { + // TODO: Clock sync + // TODO: ID shouldn't be done like this + pub fn new( + db: &sled::Db, // <-- TODO: Avoid this with some wrapping, sled should only be in blockchain2 + id: u64, + genesis_ts: Timestamp, + genesis_data: blake3::Hash, + ) -> Result { + let secret = SecretKey::random(&mut OsRng); + let public = PublicKey::from_secret(secret); + let consensus = ConsensusState::new(genesis_ts, genesis_data)?; + let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?; + let unconfirmed_txs = vec![]; + + // TODO: Maybe async rwlock? + let state = Arc::new(RwLock::new(ValidatorState { + id, + secret, + public, + consensus, + blockchain, + unconfirmed_txs, + })); + + Ok(state) + } + + /// The node retrieves a transaction and appends it to the unconfirmed + /// transactions list. Additional validity rules must be defined by the + /// protocol for transactions. + pub fn append_tx(&mut self, tx: Tx) -> bool { + if self.unconfirmed_txs.contains(&tx) { + warn!("consensus::state::append_tx(): We already have this tx"); + return false + } + + self.unconfirmed_txs.push(tx); + true + } + + /// Calculates current epoch, based on elapsed time from the genesis block. + /// Epoch duration is configured using the `DELTA` value. + pub fn current_epoch(&self) -> u64 { + self.consensus.genesis_ts.elapsed() / (2 * DELTA) + } + + /// Calculates seconds until next epoch starting time. + /// Epochs durationis configured using the delta value. + pub fn next_epoch_start(&self) -> Duration { + let start_time = NaiveDateTime::from_timestamp(self.consensus.genesis_ts.0, 0); + let current_epoch = self.current_epoch() + 1; + let next_epoch_start = (current_epoch * (2 * DELTA)) + (start_time.timestamp() as u64); + let next_epoch_start = NaiveDateTime::from_timestamp(next_epoch_start as i64, 0); + let current_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0); + let diff = next_epoch_start - current_time; + + Duration::new(diff.num_seconds().try_into().unwrap(), 0) + } + + /// Find epoch leader, using a simple hash method. + /// Leader calculation is based on how many nodes are participating + /// in the network. + pub fn epoch_leader(&mut self) -> u64 { + let epoch = self.current_epoch(); + let mut hasher = FxHasher::default(); + epoch.hash(&mut hasher); + self.zero_participants_check(); + let pos = hasher.finish() % (self.consensus.participants.len() as u64); + self.consensus.participants.iter().nth(pos as usize).unwrap().1.id + } + + /// Check if we're the current epoch leader + pub fn is_epoch_leader(&mut self) -> bool { + self.id == self.epoch_leader() + } + + /// Generate a block proposal for the current epoch, containing all + /// unconfirmed transactions. Proposal extends the longest notarized fork + /// chain the node is holding. + pub fn propose(&self) -> Result> { + let epoch = self.current_epoch(); + let prev_hash = self.longest_notarized_chain_last_hash().unwrap(); + let unproposed_txs = self.unproposed_txs(); + + let metadata = Metadata::new( + get_current_time(), + String::from("proof"), + String::from("r"), + String::from("s"), + ); + + let sm = StreamletMetadata::new(self.consensus.participants.values().cloned().collect()); + + let signed_block = self.secret.sign( + &BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata) + .as_bytes()[..], + ); + + Ok(Some(BlockProposal::new( + self.public, + signed_block, + self.id, + prev_hash, + epoch, + unproposed_txs, + metadata, + sm, + ))) + } + + /// Retrieve all unconfirmed transactions not proposed in previous blocks. + pub fn unproposed_txs(&self) -> Vec { + let mut unproposed_txs = self.unconfirmed_txs.clone(); + for chain in &self.consensus.proposals { + for proposal in &chain.proposals { + for tx in &proposal.txs { + if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) { + unproposed_txs.remove(pos); + } + } + } + } + + unproposed_txs + } + + /// Finds the longest fully notarized blockchain the node holds and + /// returns the last block hash. + pub fn longest_notarized_chain_last_hash(&self) -> Result { + let hash = if !self.consensus.proposals.is_empty() { + let mut longest_notarized_chain = &self.consensus.proposals[0]; + let mut length = longest_notarized_chain.proposals.len(); + if self.consensus.proposals.len() > 1 { + for chain in &self.consensus.proposals[1..] { + if chain.notarized() && chain.proposals.len() > length { + length = chain.proposals.len(); + longest_notarized_chain = chain; + } + } + } + + longest_notarized_chain.proposals.last().unwrap().hash() + } else { + self.consensus.last_block + }; + + Ok(hash) + } + + /// Receive the proposed block, verify its sender (epoch leader), + /// and proceed with voting on it. + pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result> { + let leader = self.epoch_leader(); + if leader != proposal.id { + warn!( + "Received proposal not from epoch leader ({}), but from ({})", + leader, proposal.id + ); + return Ok(None) + } + + if !proposal.public_key.verify( + BlockProposal::to_proposal_hash( + proposal.st, + proposal.sl, + &proposal.txs, + &proposal.metadata, + ) + .as_bytes(), + &proposal.signature, + ) { + warn!("Proposer ({}) signature could not be verified", proposal.id); + return Ok(None) + } + + self.vote(proposal) + } + + /// Given a proposal, the node finds which blockchain it extends. + /// If the proposal extends the canonical blockchain, a new fork chain + // is created. The node votes on the proposal only if it extends the + /// longest notarized fork chain it has seen. + pub fn vote(&mut self, proposal: &BlockProposal) -> Result> { + self.zero_participants_check(); + let mut proposal = proposal.clone(); + + // Generate proposal hash + let proposal_hash = proposal.hash(); + + // Add orphan votes + let mut orphans = Vec::new(); + for vote in self.consensus.orphan_votes.iter() { + if vote.proposal == proposal_hash { + proposal.sm.votes.push(vote.clone()); + orphans.push(vote.clone()); + } + } + + for vote in orphans { + self.consensus.orphan_votes.retain(|v| *v != vote); + } + + let index = self.find_extended_chain_index(&proposal)?; + + if index == -2 { + return Ok(None) + } + + let chain = match index { + -1 => { + let pc = ProposalChain::new(self.consensus.genesis_block, proposal.clone()); + self.consensus.proposals.push(pc); + self.consensus.proposals.last().unwrap() + } + _ => { + self.consensus.proposals[index as usize].add(&proposal); + &self.consensus.proposals[index as usize] + } + }; + + if !self.extends_notarized_chain(chain) { + return Ok(None) + } + + let signed_hash = self.secret.sign(&serialize(&proposal_hash)); + Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.sl, self.id))) + } + + /// Verify if the provided chain is notarized excluding the last block. + pub fn extends_notarized_chain(&self, chain: &ProposalChain) -> bool { + for proposal in &chain.proposals[..(chain.proposals.len() - 1)] { + if !proposal.sm.notarized { + return false + } + } + + true + } + + /// Given a proposal, find the index of the chain it extends. + pub fn find_extended_chain_index(&self, proposal: &BlockProposal) -> Result { + for (index, chain) in self.consensus.proposals.iter().enumerate() { + let last = chain.proposals.last().unwrap(); + let hash = last.hash(); + if proposal.st == hash && proposal.sl > last.sl { + return Ok(index as i64) + } + + if proposal.st == last.st && proposal.sl == last.sl { + warn!("Proposal already received"); + return Ok(-2) + } + } + + if proposal.st != self.consensus.last_block || proposal.sl <= self.consensus.last_sl { + warn!("Proposal doesn't extend any known chain"); + return Ok(-2) + } + + Ok(-1) + } + + /// Receive a vote for a proposal. + /// First, sender is verified using their public key. + /// The proposal is then searched for in the node's fork chains. + /// If the vote wasn't received before, it is appended to the proposal + /// votes list. + /// When a node sees 2n/3 votes for a proposal, it notarizes it. + /// When a proposal gets notarized, the transactions it contains are + /// removed from the node's unconfirmed tx list. + /// Finally, we check if the notarization of the proposal can finalize + /// parent proposals in its chain. + pub fn receive_vote(&mut self, vote: &Vote) -> Result { + let mut encoded_proposal = vec![]; + + let result = vote.proposal.encode(&mut encoded_proposal); + match result { + Ok(_) => (), + Err(e) => { + error!("Proposal encoding failed: {:?}", e); + return Ok(false) + } + }; + + if !vote.public_key.verify(&encoded_proposal, &vote.vote) { + warn!("Voter ({}), signature couldn't be verified", vote.id); + return Ok(false) + } + + let node_count = self.consensus.participants.len(); + self.zero_participants_check(); + + // Checking that the voter can actually vote. + match self.consensus.participants.get(&vote.id) { + Some(participant) => { + if self.current_epoch() <= participant.joined { + warn!("Voter ({}) joined after current epoch.", vote.id); + return Ok(false) + } + } + None => { + warn!("Voter ({}) is not a participant!", vote.id); + return Ok(false) + } + } + + let proposal = self.find_proposal(&vote.proposal)?; + if proposal.is_none() { + warn!("Received vote for unknown proposal."); + if !self.consensus.orphan_votes.contains(vote) { + self.consensus.orphan_votes.push(vote.clone()); + } + + return Ok(false) + } + + let (proposal, chain_idx) = proposal.unwrap(); + if proposal.sm.votes.contains(vote) { + return Ok(false) + } + + proposal.sm.votes.push(vote.clone()); + + if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) { + debug!("Notarized a block"); + proposal.sm.notarized = true; + self.chain_finalization(chain_idx)?; + } + + // Updating participant vote + let mut participant = match self.consensus.participants.get(&vote.id) { + Some(p) => p.clone(), + None => Participant::new(vote.id, vote.sl), + }; + + match participant.voted { + Some(voted) => { + if vote.sl > voted { + participant.voted = Some(vote.sl); + } + } + None => participant.voted = Some(vote.sl), + } + + self.consensus.participants.insert(participant.id, participant); + Ok(true) + } + + /// Search the chains we're holding for the given proposal. + pub fn find_proposal( + &mut self, + vote_proposal: &blake3::Hash, + ) -> Result> { + for (index, chain) in &mut self.consensus.proposals.iter_mut().enumerate() { + for proposal in chain.proposals.iter_mut().rev() { + let proposal_hash = proposal.hash(); + if vote_proposal == &proposal_hash { + return Ok(Some((proposal, index as i64))) + } + } + } + + Ok(None) + } + + /// Provided an index, the node checks if the chain can be finalized. + /// Consensus finalization logic: + /// - If the node has observed the notarization of 3 consecutive + /// proposals in a fork chain, it finalizes (appends to canonical + /// blockchain) all proposals up to the middle block. + /// When fork chain proposals are finalized, the rest of fork chains not + /// starting by those proposals are removed. + pub fn chain_finalization(&mut self, chain_index: i64) -> Result<()> { + let chain = &mut self.consensus.proposals[chain_index as usize]; + let len = chain.proposals.len(); + if len < 3 { + return Ok(()) + } + + let mut consecutive = 0; + for proposal in &chain.proposals { + if proposal.sm.notarized { + consecutive += 1; + continue + } + + break + } + + if consecutive < 3 { + return Ok(()) + } + + let mut finalized = vec![]; + for proposal in &mut chain.proposals[..(consecutive - 1)] { + proposal.sm.finalized = true; + finalized.push(proposal.clone()); + for tx in proposal.txs.clone() { + if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) { + self.unconfirmed_txs.remove(pos); + } + } + } + + chain.proposals.drain(0..(consecutive - 1)); + + // Append to canonical chain + let blockhashes = self.blockchain.add(&finalized)?; + self.consensus.last_block = *blockhashes.last().unwrap(); + self.consensus.last_sl = finalized.last().unwrap().sl; + + let mut dropped = vec![]; + for chain in self.consensus.proposals.iter() { + let first = chain.proposals.first().unwrap(); + if first.st != self.consensus.last_block || first.sl <= self.consensus.last_sl { + dropped.push(chain.clone()); + } + } + + for chain in dropped { + self.consensus.proposals.retain(|c| *c != chain); + } + + // Remove orphan votes + let mut orphans = vec![]; + for vote in self.consensus.orphan_votes.iter() { + if vote.sl <= self.consensus.last_sl { + orphans.push(vote.clone()); + } + } + + for vote in orphans { + self.consensus.orphan_votes.retain(|v| *v != vote); + } + + Ok(()) + } + + /// Append a new participant to the pending participants list. + pub fn append_participant(&mut self, participant: Participant) -> bool { + if self.consensus.pending_participants.contains(&participant) { + return false + } + + self.consensus.pending_participants.push(participant); + true + } + + /// Prevent the extreme case scenario where network is initialized, but + /// some nodes have not pushed the initial participants in the map. + pub fn zero_participants_check(&mut self) { + if self.consensus.participants.is_empty() { + for participant in &self.consensus.pending_participants { + self.consensus.participants.insert(participant.id, participant.clone()); + } + + self.consensus.pending_participants = Vec::new(); + } + } + + /// Refresh the participants map, to retain only the active ones. + /// Active nodes are considered those who joined or voted on a previous epoch. + pub fn refresh_participants(&mut self) { + // Adding pending participants + for participant in &self.consensus.pending_participants { + self.consensus.participants.insert(participant.id, participant.clone()); + } + + self.consensus.pending_participants = vec![]; + + let mut inactive = Vec::new(); + let previous_epoch = self.current_epoch() - 1; + for (index, participant) in self.consensus.participants.clone().iter() { + match participant.voted { + Some(epoch) => { + if epoch < previous_epoch { + inactive.push(*index); + } + } + + None => { + if participant.joined < previous_epoch { + inactive.push(*index); + } + } + } + } + + for index in inactive { + self.consensus.participants.remove(&index); + } + } +} diff --git a/src/consensus2/tx.rs b/src/consensus2/tx.rs new file mode 100644 index 000000000..370d89231 --- /dev/null +++ b/src/consensus2/tx.rs @@ -0,0 +1,21 @@ +use std::io; + +use crate::{ + impl_vec, net, + util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, + Result, +}; + +/// Temporary transaction representation. +#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)] +pub struct Tx { + pub payload: String, +} + +impl net::Message for Tx { + fn name() -> &'static str { + "tx" + } +} + +impl_vec!(Tx); diff --git a/src/consensus2/util.rs b/src/consensus2/util.rs new file mode 100644 index 000000000..e46297c82 --- /dev/null +++ b/src/consensus2/util.rs @@ -0,0 +1,22 @@ +use chrono::{NaiveDateTime, Utc}; + +use crate::util::serial::{SerialDecodable, SerialEncodable}; + +/// Wrapper struct to represent [`chrono`] UTC timestamps. +#[derive(Debug, Copy, Clone, PartialEq, SerialDecodable, SerialEncodable)] +pub struct Timestamp(pub i64); + +impl Timestamp { + /// Calculates elapsed time of a `Timestamp`. + pub fn elapsed(&self) -> u64 { + let start_time = NaiveDateTime::from_timestamp(self.0, 0); + let end_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0); + let diff = end_time - start_time; + diff.num_seconds() as u64 + } +} + +/// Generate a [`Timestamp`] of the current time. +pub fn get_current_time() -> Timestamp { + Timestamp(Utc::now().timestamp()) +} diff --git a/src/consensus2/vote.rs b/src/consensus2/vote.rs new file mode 100644 index 000000000..0f77942b9 --- /dev/null +++ b/src/consensus2/vote.rs @@ -0,0 +1,43 @@ +use std::io; + +use crate::{ + crypto::{keypair::PublicKey, schnorr::Signature}, + impl_vec, net, + util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt}, + Result, +}; + +/// This struct represents a `Vote` used by the Streamlet consensus +#[derive(Debug, Clone, PartialEq, SerialDecodable, SerialEncodable)] +pub struct Vote { + /// Node public key + pub public_key: PublicKey, + /// Block signature + pub vote: Signature, + /// Block proposal hash to vote on + pub proposal: blake3::Hash, + /// Slot uid, generated by the beacon + pub sl: u64, + /// Node ID + pub id: u64, +} + +impl Vote { + pub fn new( + public_key: PublicKey, + vote: Signature, + proposal: blake3::Hash, + sl: u64, + id: u64, + ) -> Self { + Self { public_key, vote, proposal, sl, id } + } +} + +impl net::Message for Vote { + fn name() -> &'static str { + "vote" + } +} + +impl_vec!(Vote); diff --git a/src/lib.rs b/src/lib.rs index 372a0a1b8..71ad2fca5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,9 @@ pub mod blockchain2; #[cfg(feature = "blockchain")] pub mod consensus; +#[cfg(feature = "blockchain2")] +pub mod consensus2; + #[cfg(feature = "wasm-runtime")] pub mod runtime;