consensus: Reviewed implementation.

This commit is contained in:
parazyd
2022-04-15 21:19:26 +02:00
parent b9aaa3bae1
commit cfd3e38baf
13 changed files with 1073 additions and 40 deletions

View File

@@ -206,8 +206,11 @@ blockchain = [
blockchain2 = [
"blake3",
"chrono",
"sled",
"crypto",
"net",
"util",
]

View File

@@ -1,58 +1,54 @@
use sled::Batch;
use crate::{
util::serial::{deserialize, serialize, SerialDecodable, SerialEncodable},
consensus2::{util::Timestamp, Block},
util::serial::{deserialize, serialize},
Result,
};
const SLED_BLOCK_TREE: &[u8] = b"_blocks";
// TODO: The block structure should be as follows
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Block {
/// Previous block hash (blake3)
pub st: blake3::Hash,
/// Slot UID, generated by the beacon
pub sl: u64,
/// Transaction hashes (blake3)
/// The actual transactions are in [`TxStore`]
pub txs: Vec<blake3::Hash>,
/// Additional block information
pub metadata: String,
}
pub struct BlockStore(sled::Tree);
impl BlockStore {
/// Opens a new or existing blockstore tree given a sled database.
pub fn new(db: &sled::Db) -> Result<Self> {
/// Opens a new or existing `BlockStore` on the given sled database.
/// The database is typically initialized as a global instance reference
/// with e.g. lazy_static.
pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
let tree = db.open_tree(SLED_BLOCK_TREE)?;
Ok(Self(tree))
let store = Self(tree);
// In case the store is empty, create the genesis block.
if store.0.is_empty() {
store.insert(&[Block::genesis_block(genesis_ts, genesis_data)])?;
}
Ok(store)
}
/// Insert a vector of [`Block`] into the blockstore.
/// The blocks are hashed with blake3 and this blockhash is used as
// the key, where value is the serialized block itself.
pub fn insert(&self, blocks: Vec<Block>) -> Result<()> {
/// Insert a slice of [`Block`] into the blockstore. With sled, the
/// operation is done as a batch.
/// The blocks are hashed with BLAKE3 and this blockhash is used as
/// the key, where value is the serialized block itself.
pub fn insert(&self, blocks: &[Block]) -> Result<()> {
let mut batch = Batch::default();
for i in &blocks {
for i in blocks {
let serialized = serialize(i);
let blockhash = blake3::hash(&serialized);
batch.insert(blockhash.as_bytes(), serialized);
}
self.0.apply_batch(batch)?;
Ok(())
}
/// Fetch given blockhashes from the blockstore.
/// The resulting vector contains `Option` which is `Some` if the block
/// was found in the blockstore, and `None`, if it has not.
pub fn get(&self, blockhashes: Vec<blake3::Hash>) -> Result<Vec<Option<Block>>> {
pub fn get(&self, blockhashes: &[blake3::Hash]) -> Result<Vec<Option<Block>>> {
let mut ret: Vec<Option<Block>> = Vec::with_capacity(blockhashes.len());
for i in &blockhashes {
for i in blockhashes {
if let Some(found) = self.0.get(i.as_bytes())? {
let block = deserialize(&found)?;
ret.push(Some(block));
@@ -69,6 +65,7 @@ impl BlockStore {
Ok(self.0.contains_key(blockhash.as_bytes())?)
}
/*
/// Fetch the first block in the tree, based on the Ord implementation for Vec<u8>.
pub fn get_first(&self) -> Result<Option<(blake3::Hash, Block)>> {
if let Some(found) = self.0.first()? {
@@ -131,4 +128,5 @@ impl BlockStore {
self.0.range(start..end)
}
*/
}

View File

@@ -0,0 +1,20 @@
use crate::{consensus2::StreamletMetadata, util::serial::serialize, Result};
const SLED_STREAMLET_METADATA_TREE: &[u8] = b"_streamlet_metadata";
pub struct StreamletMetadataStore(sled::Tree);
impl StreamletMetadataStore {
pub fn new(db: &sled::Db) -> Result<Self> {
let tree = db.open_tree(SLED_STREAMLET_METADATA_TREE)?;
Ok(Self(tree))
}
/// Insert [`StreamletMetadata`] into the `MetadataStore`.
/// The blockhash for the metadata is used as the key,
/// where value is the serialized metadata.
pub fn insert(&self, block: blake3::Hash, metadata: &StreamletMetadata) -> Result<()> {
self.0.insert(block.as_bytes(), serialize(metadata))?;
Ok(())
}
}

View File

@@ -1,38 +1,51 @@
use std::io;
use crate::{
consensus2::{block::BlockProposal, util::Timestamp},
impl_vec,
util::serial::{Decodable, Encodable, ReadExt, VarInt, WriteExt},
Result,
};
pub mod blockstore;
pub mod nfstore;
pub mod rootstore;
pub mod txstore;
use blockstore::BlockStore;
pub mod metadatastore;
use metadatastore::StreamletMetadataStore;
pub mod nfstore;
use nfstore::NullifierStore;
pub mod rootstore;
use rootstore::RootStore;
pub mod txstore;
use txstore::TxStore;
pub struct Blockchain {
pub db: sled::Db,
/// Blocks sled tree
pub blocks: BlockStore,
/// Transactions sled tree
pub transactions: TxStore,
pub nullifiers: NullifierStore,
pub merkle_roots: RootStore,
/// Streamlet metadata sled tree
pub streamlet_metadata: StreamletMetadataStore,
// TODO:
//pub nullifiers: NullifierStore,
//pub merkle_roots: RootStore,
}
impl Blockchain {
pub fn new(db_path: &str) -> Result<Self> {
let db = sled::open(db_path)?;
let blocks = BlockStore::new(&db)?;
let transactions = TxStore::new(&db)?;
let nullifiers = NullifierStore::new(&db)?;
let merkle_roots = RootStore::new(&db)?;
pub fn new(db: &sled::Db, genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
let blocks = BlockStore::new(db, genesis_ts, genesis_data)?;
let transactions = TxStore::new(db)?;
let streamlet_metadata = StreamletMetadataStore::new(db)?;
Ok(Self { db, blocks, transactions, nullifiers, merkle_roots })
Ok(Self { blocks, transactions, streamlet_metadata })
}
/// Batch insert [`BlockProposal`]s.
pub fn add(&mut self, proposals: &[BlockProposal]) -> Result<Vec<blake3::Hash>> {
todo!()
}
}

179
src/consensus2/block.rs Normal file
View File

@@ -0,0 +1,179 @@
use std::io;
use log::warn;
use super::{util::Timestamp, Metadata, StreamletMetadata, Tx};
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{serialize, Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a tuple of the form (`st`, `sl`, txs`, `metadata`).
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct Block {
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transaction hashes
pub txs: Vec<blake3::Hash>,
/// Additional block information
pub metadata: Metadata,
}
impl Block {
pub fn new(st: blake3::Hash, sl: u64, txs: Vec<blake3::Hash>, metadata: Metadata) -> Self {
Self { st, sl, txs, metadata }
}
/// Generate the genesis block.
pub fn genesis_block(genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Self {
let metadata =
Metadata::new(genesis_ts, String::from("proof"), String::from("r"), String::from("s"));
Self::new(genesis_data, 0, vec![], metadata)
}
}
/// This struct represents a block proposal, used for consensus.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct BlockProposal {
/// Leader public key
pub public_key: PublicKey,
/// Block signature
pub signature: Signature,
/// Leader ID
pub id: u64,
/// Previous block hash
pub st: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Transactions payload
pub txs: Vec<Tx>,
/// Additional proposal information
pub metadata: Metadata,
/// Proposal information used by Streamlet consensus
pub sm: StreamletMetadata,
}
impl BlockProposal {
#[allow(clippy::too_many_arguments)]
pub fn new(
public_key: PublicKey,
signature: Signature,
id: u64,
st: blake3::Hash,
sl: u64,
txs: Vec<Tx>,
metadata: Metadata,
sm: StreamletMetadata,
) -> Self {
Self { public_key, signature, id, st, sl, txs, metadata, sm }
}
/// Produce proposal hash using `st`, `sl`, `txs`, and `metadata`.
pub fn hash(&self) -> blake3::Hash {
Self::to_proposal_hash(self.st, self.sl, &self.txs, &self.metadata)
}
/// Generate a proposal hash using provided `st`, `sl`, `txs`, and `metadata`.
pub fn to_proposal_hash(
st: blake3::Hash,
sl: u64,
transactions: &[Tx],
metadata: &Metadata,
) -> blake3::Hash {
let mut txs = Vec::with_capacity(transactions.len());
for tx in transactions {
txs.push(blake3::hash(&serialize(tx)));
}
blake3::hash(&serialize(&Block::new(st, sl, txs, metadata.clone())))
}
}
impl PartialEq for BlockProposal {
fn eq(&self, other: &Self) -> bool {
self.public_key == other.public_key &&
self.signature == other.signature &&
self.id == other.id &&
self.st == other.st &&
self.sl == other.sl &&
self.txs == other.txs &&
self.metadata == other.metadata
}
}
impl net::Message for BlockProposal {
fn name() -> &'static str {
"proposal"
}
}
impl_vec!(BlockProposal);
/// This struct represents a sequence of block proposals.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct ProposalChain {
pub genesis: blake3::Hash,
pub proposals: Vec<BlockProposal>,
}
impl ProposalChain {
pub fn new(genesis: blake3::Hash, initial_proposal: BlockProposal) -> Self {
Self { genesis, proposals: vec![initial_proposal] }
}
/// A proposal is considered valid when its parent hash is equal to the
/// hash of the previous proposal and their epochs are incremental,
/// excluding the genesis block proposal.
/// Additional validity rules can be applied.
pub fn check_proposal(&self, proposal: &BlockProposal, previous: &BlockProposal) -> bool {
if proposal.st == self.genesis {
warn!("Genesis block proposal provided.");
return false
}
let prev_hash = previous.hash();
if proposal.st != prev_hash || proposal.sl <= previous.sl {
warn!("Provided proposal is invalid.");
return false
}
true
}
/// A proposals chain is considered valid when every proposal is valid,
/// based on the `check_proposal` function.
pub fn check_chain(&self) -> bool {
for (index, proposal) in self.proposals[1..].iter().enumerate() {
if !self.check_proposal(proposal, &self.proposals[index]) {
return false
}
}
true
}
/// Insertion of a valid proposal.
pub fn add(&mut self, proposal: &BlockProposal) {
if self.check_proposal(proposal, self.proposals.last().unwrap()) {
self.proposals.push(proposal.clone());
}
}
/// Proposals chain notarization check.
pub fn notarized(&self) -> bool {
for proposal in &self.proposals {
if !proposal.sm.notarized {
return false
}
}
true
}
}
impl_vec!(ProposalChain);

View File

@@ -0,0 +1,56 @@
use super::{util::Timestamp, Participant, Vote};
use crate::util::serial::{SerialDecodable, SerialEncodable};
/// This struct represents additional [`Block`] information used by
/// the consensus protocol
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Metadata {
/// Block creation timestamp
pub timestamp: Timestamp,
/// Block information used by the Ouroboros Praos consensus
pub om: OuroborosMetadata,
}
impl Metadata {
pub fn new(timestamp: Timestamp, proof: String, r: String, s: String) -> Self {
Self { timestamp, om: OuroborosMetadata::new(proof, r, s) }
}
}
/// This struct represents [`Block`] information used by the Ouroboros
/// Praos consensus protocol.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct OuroborosMetadata {
/// Proof that the stakeholder is the block owner
pub proof: String,
/// Random seed for VRF
pub r: String,
/// Block owner signature
pub s: String,
}
impl OuroborosMetadata {
pub fn new(proof: String, r: String, s: String) -> Self {
Self { proof, r, s }
}
}
/// This struct represents [`Block`] information used by the Streamlet
/// consensus protocol.
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct StreamletMetadata {
/// Epoch votes
pub votes: Vec<Vote>,
/// Block notarization flag
pub notarized: bool,
/// Block finalization flag
pub finalized: bool,
/// Nodes participated in the voting process
pub participants: Vec<Participant>,
}
impl StreamletMetadata {
pub fn new(participants: Vec<Participant>) -> Self {
Self { votes: vec![], notarized: false, finalized: false, participants }
}
}

35
src/consensus2/mod.rs Normal file
View File

@@ -0,0 +1,35 @@
/// Block definition
pub mod block;
pub use block::{Block, ProposalChain};
/// Transactions
pub mod tx;
pub use tx::Tx;
/// Consensus metadata
pub mod metadata;
pub use metadata::{Metadata, StreamletMetadata};
/// Consensus participant
pub mod participant;
pub use participant::Participant;
/// Consensus vote
pub mod vote;
pub use vote::Vote;
/// Consensus state
pub mod state;
pub use state::ValidatorState;
/// Utility functions
pub mod util;
use lazy_static::lazy_static;
lazy_static! {
/// Genesis hash for the mainnet chain
pub static ref MAINNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_mainnet").as_bytes();
/// Genesis hash for the testnet chain
pub static ref TESTNET_GENESIS_HASH_BYTES: [u8; 32] = *blake3::hash(b"darkfi_testnet").as_bytes();
}

View File

@@ -0,0 +1,56 @@
use std::{collections::BTreeMap, io};
use crate::{
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a tuple of the form:
/// (`node_id`, `epoch_joined`, `last_epoch_voted`)
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Participant {
// Node ID
pub id: u64,
/// Epoch node joined the network
pub joined: u64,
/// Last epoch node voted
pub voted: Option<u64>,
}
impl Participant {
pub fn new(id: u64, joined: u64) -> Self {
Self { id, joined, voted: None }
}
}
impl net::Message for Participant {
fn name() -> &'static str {
"participant"
}
}
impl Encodable for BTreeMap<u64, Participant> {
fn encode<S: io::Write>(&self, mut s: S) -> Result<usize> {
let mut len = 0;
len += VarInt(self.len() as u64).encode(&mut s)?;
for c in self.iter() {
len += c.1.encode(&mut s)?;
}
Ok(len)
}
}
impl Decodable for BTreeMap<u64, Participant> {
fn decode<D: io::Read>(mut d: D) -> Result<Self> {
let len = VarInt::decode(&mut d)?.0;
let mut ret = BTreeMap::new();
for _ in 0..len {
let participant: Participant = Decodable::decode(&mut d)?;
ret.insert(participant.id, participant);
}
Ok(ret)
}
}
impl_vec!(Participant);

584
src/consensus2/state.rs Normal file
View File

@@ -0,0 +1,584 @@
// TODO: Pending participants can also be a map?
use std::{
collections::BTreeMap,
hash::{Hash, Hasher},
sync::{Arc, RwLock},
time::Duration,
};
use chrono::{NaiveDateTime, Utc};
use fxhash::FxHasher;
use log::{debug, error, warn};
use rand::rngs::OsRng;
use super::{
block::{Block, BlockProposal},
util::{get_current_time, Timestamp},
Metadata, Participant, ProposalChain, StreamletMetadata, Tx, Vote,
};
use crate::{
blockchain2::Blockchain,
crypto::{
keypair::{PublicKey, SecretKey},
schnorr::{SchnorrPublic, SchnorrSecret},
},
util::serial::{serialize, Encodable},
Result,
};
const DELTA: u64 = 60;
/// This struct represents the information required by the consensus algorithm
#[derive(Debug)]
pub struct ConsensusState {
/// Genesis block creation timestamp
pub genesis_ts: Timestamp,
/// Genesis block
pub genesis_block: blake3::Hash,
/// Last finalized block hash,
pub last_block: blake3::Hash,
/// Last finalized block slot,
pub last_sl: u64,
/// Fork chains containing block proposals
pub proposals: Vec<ProposalChain>,
/// Orphan votes pool, in case a vote reaches a node before the
/// corresponding block
pub orphan_votes: Vec<Vote>,
/// Validators currently participating in the consensus
pub participants: BTreeMap<u64, Participant>,
/// Validators to be added on the next epoch as participants
pub pending_participants: Vec<Participant>,
}
impl ConsensusState {
pub fn new(genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
let genesis_block =
blake3::hash(&serialize(&Block::genesis_block(genesis_ts, genesis_data)));
Ok(Self {
genesis_ts,
genesis_block,
last_block: genesis_block,
last_sl: 0,
proposals: vec![],
orphan_votes: vec![],
participants: BTreeMap::new(),
pending_participants: vec![],
})
}
}
/// Atomic pointer to validator state.
pub type ValidatorStatePtr = Arc<RwLock<ValidatorState>>;
/// This struct represents the state of a validator node.
pub struct ValidatorState {
/// Validator ID
pub id: u64,
/// Secret key, to sign messages
pub secret: SecretKey,
/// Validator public key
pub public: PublicKey,
/// Hot/Live data used by the consensus algorithm
pub consensus: ConsensusState,
/// Canonical (finalized) blockchain
pub blockchain: Blockchain,
/// Pending transactions
pub unconfirmed_txs: Vec<Tx>,
}
impl ValidatorState {
// TODO: Clock sync
// TODO: ID shouldn't be done like this
pub fn new(
db: &sled::Db, // <-- TODO: Avoid this with some wrapping, sled should only be in blockchain2
id: u64,
genesis_ts: Timestamp,
genesis_data: blake3::Hash,
) -> Result<ValidatorStatePtr> {
let secret = SecretKey::random(&mut OsRng);
let public = PublicKey::from_secret(secret);
let consensus = ConsensusState::new(genesis_ts, genesis_data)?;
let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?;
let unconfirmed_txs = vec![];
// TODO: Maybe async rwlock?
let state = Arc::new(RwLock::new(ValidatorState {
id,
secret,
public,
consensus,
blockchain,
unconfirmed_txs,
}));
Ok(state)
}
/// The node retrieves a transaction and appends it to the unconfirmed
/// transactions list. Additional validity rules must be defined by the
/// protocol for transactions.
pub fn append_tx(&mut self, tx: Tx) -> bool {
if self.unconfirmed_txs.contains(&tx) {
warn!("consensus::state::append_tx(): We already have this tx");
return false
}
self.unconfirmed_txs.push(tx);
true
}
/// Calculates current epoch, based on elapsed time from the genesis block.
/// Epoch duration is configured using the `DELTA` value.
pub fn current_epoch(&self) -> u64 {
self.consensus.genesis_ts.elapsed() / (2 * DELTA)
}
/// Calculates seconds until next epoch starting time.
/// Epochs durationis configured using the delta value.
pub fn next_epoch_start(&self) -> Duration {
let start_time = NaiveDateTime::from_timestamp(self.consensus.genesis_ts.0, 0);
let current_epoch = self.current_epoch() + 1;
let next_epoch_start = (current_epoch * (2 * DELTA)) + (start_time.timestamp() as u64);
let next_epoch_start = NaiveDateTime::from_timestamp(next_epoch_start as i64, 0);
let current_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = next_epoch_start - current_time;
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Find epoch leader, using a simple hash method.
/// Leader calculation is based on how many nodes are participating
/// in the network.
pub fn epoch_leader(&mut self) -> u64 {
let epoch = self.current_epoch();
let mut hasher = FxHasher::default();
epoch.hash(&mut hasher);
self.zero_participants_check();
let pos = hasher.finish() % (self.consensus.participants.len() as u64);
self.consensus.participants.iter().nth(pos as usize).unwrap().1.id
}
/// Check if we're the current epoch leader
pub fn is_epoch_leader(&mut self) -> bool {
self.id == self.epoch_leader()
}
/// Generate a block proposal for the current epoch, containing all
/// unconfirmed transactions. Proposal extends the longest notarized fork
/// chain the node is holding.
pub fn propose(&self) -> Result<Option<BlockProposal>> {
let epoch = self.current_epoch();
let prev_hash = self.longest_notarized_chain_last_hash().unwrap();
let unproposed_txs = self.unproposed_txs();
let metadata = Metadata::new(
get_current_time(),
String::from("proof"),
String::from("r"),
String::from("s"),
);
let sm = StreamletMetadata::new(self.consensus.participants.values().cloned().collect());
let signed_block = self.secret.sign(
&BlockProposal::to_proposal_hash(prev_hash, epoch, &unproposed_txs, &metadata)
.as_bytes()[..],
);
Ok(Some(BlockProposal::new(
self.public,
signed_block,
self.id,
prev_hash,
epoch,
unproposed_txs,
metadata,
sm,
)))
}
/// Retrieve all unconfirmed transactions not proposed in previous blocks.
pub fn unproposed_txs(&self) -> Vec<Tx> {
let mut unproposed_txs = self.unconfirmed_txs.clone();
for chain in &self.consensus.proposals {
for proposal in &chain.proposals {
for tx in &proposal.txs {
if let Some(pos) = unproposed_txs.iter().position(|txs| *txs == *tx) {
unproposed_txs.remove(pos);
}
}
}
}
unproposed_txs
}
/// Finds the longest fully notarized blockchain the node holds and
/// returns the last block hash.
pub fn longest_notarized_chain_last_hash(&self) -> Result<blake3::Hash> {
let hash = if !self.consensus.proposals.is_empty() {
let mut longest_notarized_chain = &self.consensus.proposals[0];
let mut length = longest_notarized_chain.proposals.len();
if self.consensus.proposals.len() > 1 {
for chain in &self.consensus.proposals[1..] {
if chain.notarized() && chain.proposals.len() > length {
length = chain.proposals.len();
longest_notarized_chain = chain;
}
}
}
longest_notarized_chain.proposals.last().unwrap().hash()
} else {
self.consensus.last_block
};
Ok(hash)
}
/// Receive the proposed block, verify its sender (epoch leader),
/// and proceed with voting on it.
pub fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
let leader = self.epoch_leader();
if leader != proposal.id {
warn!(
"Received proposal not from epoch leader ({}), but from ({})",
leader, proposal.id
);
return Ok(None)
}
if !proposal.public_key.verify(
BlockProposal::to_proposal_hash(
proposal.st,
proposal.sl,
&proposal.txs,
&proposal.metadata,
)
.as_bytes(),
&proposal.signature,
) {
warn!("Proposer ({}) signature could not be verified", proposal.id);
return Ok(None)
}
self.vote(proposal)
}
/// Given a proposal, the node finds which blockchain it extends.
/// If the proposal extends the canonical blockchain, a new fork chain
// is created. The node votes on the proposal only if it extends the
/// longest notarized fork chain it has seen.
pub fn vote(&mut self, proposal: &BlockProposal) -> Result<Option<Vote>> {
self.zero_participants_check();
let mut proposal = proposal.clone();
// Generate proposal hash
let proposal_hash = proposal.hash();
// Add orphan votes
let mut orphans = Vec::new();
for vote in self.consensus.orphan_votes.iter() {
if vote.proposal == proposal_hash {
proposal.sm.votes.push(vote.clone());
orphans.push(vote.clone());
}
}
for vote in orphans {
self.consensus.orphan_votes.retain(|v| *v != vote);
}
let index = self.find_extended_chain_index(&proposal)?;
if index == -2 {
return Ok(None)
}
let chain = match index {
-1 => {
let pc = ProposalChain::new(self.consensus.genesis_block, proposal.clone());
self.consensus.proposals.push(pc);
self.consensus.proposals.last().unwrap()
}
_ => {
self.consensus.proposals[index as usize].add(&proposal);
&self.consensus.proposals[index as usize]
}
};
if !self.extends_notarized_chain(chain) {
return Ok(None)
}
let signed_hash = self.secret.sign(&serialize(&proposal_hash));
Ok(Some(Vote::new(self.public, signed_hash, proposal_hash, proposal.sl, self.id)))
}
/// Verify if the provided chain is notarized excluding the last block.
pub fn extends_notarized_chain(&self, chain: &ProposalChain) -> bool {
for proposal in &chain.proposals[..(chain.proposals.len() - 1)] {
if !proposal.sm.notarized {
return false
}
}
true
}
/// Given a proposal, find the index of the chain it extends.
pub fn find_extended_chain_index(&self, proposal: &BlockProposal) -> Result<i64> {
for (index, chain) in self.consensus.proposals.iter().enumerate() {
let last = chain.proposals.last().unwrap();
let hash = last.hash();
if proposal.st == hash && proposal.sl > last.sl {
return Ok(index as i64)
}
if proposal.st == last.st && proposal.sl == last.sl {
warn!("Proposal already received");
return Ok(-2)
}
}
if proposal.st != self.consensus.last_block || proposal.sl <= self.consensus.last_sl {
warn!("Proposal doesn't extend any known chain");
return Ok(-2)
}
Ok(-1)
}
/// Receive a vote for a proposal.
/// First, sender is verified using their public key.
/// The proposal is then searched for in the node's fork chains.
/// If the vote wasn't received before, it is appended to the proposal
/// votes list.
/// When a node sees 2n/3 votes for a proposal, it notarizes it.
/// When a proposal gets notarized, the transactions it contains are
/// removed from the node's unconfirmed tx list.
/// Finally, we check if the notarization of the proposal can finalize
/// parent proposals in its chain.
pub fn receive_vote(&mut self, vote: &Vote) -> Result<bool> {
let mut encoded_proposal = vec![];
let result = vote.proposal.encode(&mut encoded_proposal);
match result {
Ok(_) => (),
Err(e) => {
error!("Proposal encoding failed: {:?}", e);
return Ok(false)
}
};
if !vote.public_key.verify(&encoded_proposal, &vote.vote) {
warn!("Voter ({}), signature couldn't be verified", vote.id);
return Ok(false)
}
let node_count = self.consensus.participants.len();
self.zero_participants_check();
// Checking that the voter can actually vote.
match self.consensus.participants.get(&vote.id) {
Some(participant) => {
if self.current_epoch() <= participant.joined {
warn!("Voter ({}) joined after current epoch.", vote.id);
return Ok(false)
}
}
None => {
warn!("Voter ({}) is not a participant!", vote.id);
return Ok(false)
}
}
let proposal = self.find_proposal(&vote.proposal)?;
if proposal.is_none() {
warn!("Received vote for unknown proposal.");
if !self.consensus.orphan_votes.contains(vote) {
self.consensus.orphan_votes.push(vote.clone());
}
return Ok(false)
}
let (proposal, chain_idx) = proposal.unwrap();
if proposal.sm.votes.contains(vote) {
return Ok(false)
}
proposal.sm.votes.push(vote.clone());
if !proposal.sm.notarized && proposal.sm.votes.len() > (2 * node_count / 3) {
debug!("Notarized a block");
proposal.sm.notarized = true;
self.chain_finalization(chain_idx)?;
}
// Updating participant vote
let mut participant = match self.consensus.participants.get(&vote.id) {
Some(p) => p.clone(),
None => Participant::new(vote.id, vote.sl),
};
match participant.voted {
Some(voted) => {
if vote.sl > voted {
participant.voted = Some(vote.sl);
}
}
None => participant.voted = Some(vote.sl),
}
self.consensus.participants.insert(participant.id, participant);
Ok(true)
}
/// Search the chains we're holding for the given proposal.
pub fn find_proposal(
&mut self,
vote_proposal: &blake3::Hash,
) -> Result<Option<(&mut BlockProposal, i64)>> {
for (index, chain) in &mut self.consensus.proposals.iter_mut().enumerate() {
for proposal in chain.proposals.iter_mut().rev() {
let proposal_hash = proposal.hash();
if vote_proposal == &proposal_hash {
return Ok(Some((proposal, index as i64)))
}
}
}
Ok(None)
}
/// Provided an index, the node checks if the chain can be finalized.
/// Consensus finalization logic:
/// - If the node has observed the notarization of 3 consecutive
/// proposals in a fork chain, it finalizes (appends to canonical
/// blockchain) all proposals up to the middle block.
/// When fork chain proposals are finalized, the rest of fork chains not
/// starting by those proposals are removed.
pub fn chain_finalization(&mut self, chain_index: i64) -> Result<()> {
let chain = &mut self.consensus.proposals[chain_index as usize];
let len = chain.proposals.len();
if len < 3 {
return Ok(())
}
let mut consecutive = 0;
for proposal in &chain.proposals {
if proposal.sm.notarized {
consecutive += 1;
continue
}
break
}
if consecutive < 3 {
return Ok(())
}
let mut finalized = vec![];
for proposal in &mut chain.proposals[..(consecutive - 1)] {
proposal.sm.finalized = true;
finalized.push(proposal.clone());
for tx in proposal.txs.clone() {
if let Some(pos) = self.unconfirmed_txs.iter().position(|txs| *txs == tx) {
self.unconfirmed_txs.remove(pos);
}
}
}
chain.proposals.drain(0..(consecutive - 1));
// Append to canonical chain
let blockhashes = self.blockchain.add(&finalized)?;
self.consensus.last_block = *blockhashes.last().unwrap();
self.consensus.last_sl = finalized.last().unwrap().sl;
let mut dropped = vec![];
for chain in self.consensus.proposals.iter() {
let first = chain.proposals.first().unwrap();
if first.st != self.consensus.last_block || first.sl <= self.consensus.last_sl {
dropped.push(chain.clone());
}
}
for chain in dropped {
self.consensus.proposals.retain(|c| *c != chain);
}
// Remove orphan votes
let mut orphans = vec![];
for vote in self.consensus.orphan_votes.iter() {
if vote.sl <= self.consensus.last_sl {
orphans.push(vote.clone());
}
}
for vote in orphans {
self.consensus.orphan_votes.retain(|v| *v != vote);
}
Ok(())
}
/// Append a new participant to the pending participants list.
pub fn append_participant(&mut self, participant: Participant) -> bool {
if self.consensus.pending_participants.contains(&participant) {
return false
}
self.consensus.pending_participants.push(participant);
true
}
/// Prevent the extreme case scenario where network is initialized, but
/// some nodes have not pushed the initial participants in the map.
pub fn zero_participants_check(&mut self) {
if self.consensus.participants.is_empty() {
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
self.consensus.pending_participants = Vec::new();
}
}
/// Refresh the participants map, to retain only the active ones.
/// Active nodes are considered those who joined or voted on a previous epoch.
pub fn refresh_participants(&mut self) {
// Adding pending participants
for participant in &self.consensus.pending_participants {
self.consensus.participants.insert(participant.id, participant.clone());
}
self.consensus.pending_participants = vec![];
let mut inactive = Vec::new();
let previous_epoch = self.current_epoch() - 1;
for (index, participant) in self.consensus.participants.clone().iter() {
match participant.voted {
Some(epoch) => {
if epoch < previous_epoch {
inactive.push(*index);
}
}
None => {
if participant.joined < previous_epoch {
inactive.push(*index);
}
}
}
}
for index in inactive {
self.consensus.participants.remove(&index);
}
}
}

21
src/consensus2/tx.rs Normal file
View File

@@ -0,0 +1,21 @@
use std::io;
use crate::{
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// Temporary transaction representation.
#[derive(Debug, Clone, PartialEq, SerialEncodable, SerialDecodable)]
pub struct Tx {
pub payload: String,
}
impl net::Message for Tx {
fn name() -> &'static str {
"tx"
}
}
impl_vec!(Tx);

22
src/consensus2/util.rs Normal file
View File

@@ -0,0 +1,22 @@
use chrono::{NaiveDateTime, Utc};
use crate::util::serial::{SerialDecodable, SerialEncodable};
/// Wrapper struct to represent [`chrono`] UTC timestamps.
#[derive(Debug, Copy, Clone, PartialEq, SerialDecodable, SerialEncodable)]
pub struct Timestamp(pub i64);
impl Timestamp {
/// Calculates elapsed time of a `Timestamp`.
pub fn elapsed(&self) -> u64 {
let start_time = NaiveDateTime::from_timestamp(self.0, 0);
let end_time = NaiveDateTime::from_timestamp(Utc::now().timestamp(), 0);
let diff = end_time - start_time;
diff.num_seconds() as u64
}
}
/// Generate a [`Timestamp`] of the current time.
pub fn get_current_time() -> Timestamp {
Timestamp(Utc::now().timestamp())
}

43
src/consensus2/vote.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::io;
use crate::{
crypto::{keypair::PublicKey, schnorr::Signature},
impl_vec, net,
util::serial::{Decodable, Encodable, SerialDecodable, SerialEncodable, VarInt},
Result,
};
/// This struct represents a `Vote` used by the Streamlet consensus
#[derive(Debug, Clone, PartialEq, SerialDecodable, SerialEncodable)]
pub struct Vote {
/// Node public key
pub public_key: PublicKey,
/// Block signature
pub vote: Signature,
/// Block proposal hash to vote on
pub proposal: blake3::Hash,
/// Slot uid, generated by the beacon
pub sl: u64,
/// Node ID
pub id: u64,
}
impl Vote {
pub fn new(
public_key: PublicKey,
vote: Signature,
proposal: blake3::Hash,
sl: u64,
id: u64,
) -> Self {
Self { public_key, vote, proposal, sl, id }
}
}
impl net::Message for Vote {
fn name() -> &'static str {
"vote"
}
}
impl_vec!(Vote);

View File

@@ -10,6 +10,9 @@ pub mod blockchain2;
#[cfg(feature = "blockchain")]
pub mod consensus;
#[cfg(feature = "blockchain2")]
pub mod consensus2;
#[cfg(feature = "wasm-runtime")]
pub mod runtime;