consensus: moved consensus state corresponding functions from ValidatorState(validator.rs) to ConsensusState(state.rs)

This commit is contained in:
aggstam
2022-12-01 12:34:41 +02:00
parent 4009320e86
commit 13d2b3e9a0
5 changed files with 570 additions and 562 deletions

View File

@@ -38,10 +38,6 @@ Pseudocode:
loop {
wait_for_next_slot_start()
if epoch_changed() {
create_competing_coins()
}
if is_slot_leader() {
block = propose_block()
p2p.broadcast_block(block)

View File

@@ -82,7 +82,7 @@ impl ProtocolProposal {
// Verify we have the proposal already
let mut lock = self.state.write().await;
if lock.proposal_exists(&proposal_copy.hash) {
if lock.consensus.proposal_exists(&proposal_copy.hash) {
debug!("ProtocolProposal::handle_receive_proposal(): Proposal already received.");
continue
}

View File

@@ -16,18 +16,29 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::time::Duration;
use chrono::{NaiveDateTime, Utc};
use darkfi_sdk::crypto::{constants::MERKLE_DEPTH, MerkleNode};
use darkfi_serial::{SerialDecodable, SerialEncodable};
use incrementalmerkletree::bridgetree::BridgeTree;
use pasta_curves::pallas;
use log::{debug, info};
use pasta_curves::{group::ff::PrimeField, pallas};
use rand::{thread_rng, Rng};
use super::{constants, leadcoin::LeadCoin, Block, Float10, ProposalChain};
use super::{
constants,
leadcoin::{LeadCoin, LeadCoinSecrets},
utils::fbig2base,
Block, BlockProposal, Float10, ProposalChain,
};
use crate::{net, tx::Transaction, util::time::Timestamp, Result};
use crate::{blockchain::Blockchain, net, tx::Transaction, util::time::Timestamp, Error, Result};
/// This struct represents the information required by the consensus algorithm
#[derive(Debug)]
pub struct ConsensusState {
/// Canonical (finalized) blockchain
pub blockchain: Blockchain,
/// Genesis block creation timestamp
pub genesis_ts: Timestamp,
/// Genesis block hash
@@ -68,9 +79,14 @@ pub struct ConsensusState {
}
impl ConsensusState {
pub fn new(genesis_ts: Timestamp, genesis_data: blake3::Hash) -> Result<Self> {
pub fn new(
blockchain: Blockchain,
genesis_ts: Timestamp,
genesis_data: blake3::Hash,
) -> Result<Self> {
let genesis_block = Block::genesis_block(genesis_ts, genesis_data).blockhash();
Ok(Self {
blockchain,
genesis_ts,
genesis_block,
participating: None,
@@ -91,6 +107,522 @@ impl ConsensusState {
prev_sigma2: pallas::Base::zero(),
})
}
/// Calculates current epoch.
pub fn current_epoch(&self) -> u64 {
self.slot_epoch(self.current_slot())
}
/// Calculates the epoch of the provided slot.
/// Epoch duration is configured using the `EPOCH_LENGTH` value.
pub fn slot_epoch(&self, slot: u64) -> u64 {
slot / constants::EPOCH_LENGTH as u64
}
/// Calculates current slot, based on elapsed time from the genesis block.
/// Slot duration is configured using the `SLOT_TIME` constant.
pub fn current_slot(&self) -> u64 {
self.genesis_ts.elapsed() / constants::SLOT_TIME
}
/// Calculates the relative number of the provided slot.
pub fn relative_slot(&self, slot: u64) -> u64 {
slot % constants::EPOCH_LENGTH as u64
}
/// Finds the last slot a proposal or block was generated.
pub fn last_slot(&self) -> Result<u64> {
let mut slot = 0;
for chain in &self.proposals {
for proposal in &chain.proposals {
if proposal.block.header.slot > slot {
slot = proposal.block.header.slot;
}
}
}
// We return here in case proposals exist,
// so we don't query the sled database.
if slot > 0 {
return Ok(slot)
}
let (last_slot, _) = self.blockchain.last()?;
Ok(last_slot)
}
/// Calculates seconds until next Nth slot starting time.
/// Slots duration is configured using the SLOT_TIME constant.
pub fn next_n_slot_start(&self, n: u64) -> Duration {
assert!(n > 0);
let start_time = NaiveDateTime::from_timestamp_opt(self.genesis_ts.0, 0).unwrap();
let current_slot = self.current_slot() + n;
let next_slot_start =
(current_slot * constants::SLOT_TIME) + (start_time.timestamp() as u64);
let next_slot_start = NaiveDateTime::from_timestamp_opt(next_slot_start as i64, 0).unwrap();
let current_time = NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap();
let diff = next_slot_start - current_time;
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Calculate slots until next Nth epoch.
/// Epoch duration is configured using the EPOCH_LENGTH value.
pub fn slots_to_next_n_epoch(&self, n: u64) -> u64 {
assert!(n > 0);
let slots_till_next_epoch =
constants::EPOCH_LENGTH as u64 - self.relative_slot(self.current_slot());
((n - 1) * constants::EPOCH_LENGTH as u64) + slots_till_next_epoch
}
/// Calculates seconds until next Nth epoch starting time.
pub fn next_n_epoch_start(&self, n: u64) -> Duration {
self.next_n_slot_start(self.slots_to_next_n_epoch(n))
}
/// Set participating slot to next.
pub fn set_participating(&mut self) -> Result<()> {
self.participating = Some(self.current_slot() + 1);
Ok(())
}
/// Generate current slot checkpoint
fn generate_slot_checkpoint(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) {
let slot = self.current_slot();
let checkpoint = SlotCheckpoint { slot, eta: self.epoch_eta, sigma1, sigma2 };
self.slot_checkpoints.push(checkpoint);
}
/// Check if new epoch has started, to create new epoch coins.
/// Returns flag to signify if epoch has changed and vector of
/// new epoch competing coins.
pub async fn epoch_changed(
&mut self,
sigma1: pallas::Base,
sigma2: pallas::Base,
) -> Result<bool> {
let epoch = self.current_epoch();
self.prev_sigma1 = sigma1;
self.prev_sigma2 = sigma2;
if epoch <= self.epoch {
self.generate_slot_checkpoint(sigma1.clone(), sigma2.clone());
return Ok(false)
}
let eta = self.get_eta();
// At start of epoch, relative slot is 0.
if self.coins.len() == 0 {
//TODO: DRK coin need to be burned, and consensus coin to be minted.
self.coins = self.create_epoch_coins(eta, epoch).await?;
}
self.epoch = epoch;
self.prev_epoch_eta = self.epoch_eta;
self.epoch_eta = eta;
self.generate_slot_checkpoint(sigma1.clone(), sigma2.clone());
Ok(true)
}
/// return 2-term target approximation sigma coefficients.
pub fn sigmas(&mut self) -> (pallas::Base, pallas::Base) {
let f = self.win_prob_with_full_stake();
// Generate sigmas
let mut total_stake = self.total_stake(); // Only used for fine-tuning
// at genesis epoch first slot, of absolute index 0,
// the total stake would be 0, to avoid division by zero,
// we asume total stake at first division is GENESIS_TOTAL_STAKE.
if total_stake == 0 {
total_stake = constants::GENESIS_TOTAL_STAKE;
}
info!("consensus::sigmas(): f: {}", f);
info!("consensus::sigmas(): stake: {}", total_stake);
let one = constants::FLOAT10_ONE.clone();
let two = constants::FLOAT10_TWO.clone();
let field_p = Float10::from_str_native(constants::P)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let total_sigma =
Float10::try_from(total_stake).unwrap().with_precision(constants::RADIX_BITS).value();
let x = one - f;
let c = x.ln();
let sigma1_fbig = c.clone() / total_sigma.clone() * field_p.clone();
let sigma1 = fbig2base(sigma1_fbig);
let sigma2_fbig = (c / total_sigma).powf(two.clone()) * (field_p / two);
let sigma2 = fbig2base(sigma2_fbig);
(sigma1, sigma2)
}
/// Generate epoch-competing coins
async fn create_epoch_coins(
&mut self,
eta: pallas::Base,
epoch: u64,
) -> Result<Vec<Vec<LeadCoin>>> {
info!("Consensus: Creating coins for epoch: {}", epoch);
self.create_coins(eta).await
}
/// Generate coins for provided sigmas.
/// NOTE: The strategy here is having a single competing coin per slot.
async fn create_coins(&mut self, eta: pallas::Base) -> Result<Vec<Vec<LeadCoin>>> {
let slot = self.current_slot();
let mut rng = thread_rng();
let mut seeds: Vec<u64> = Vec::with_capacity(constants::EPOCH_LENGTH);
for _ in 0..constants::EPOCH_LENGTH {
seeds.push(rng.gen());
}
let epoch_secrets = LeadCoinSecrets::generate();
//let mut tree_cm = BridgeTree::<MerkleNode, MERKLE_DEPTH>::new(constants::EPOCH_LENGTH);
// LeadCoin matrix where each row represents a slot and contains its competing coins.
let mut coins: Vec<Vec<LeadCoin>> = Vec::with_capacity(constants::EPOCH_LENGTH);
// TODO: TESTNET: Here we would look into the wallet to find coins we're able to use.
// The wallet has specific tables for consensus coins.
// TODO: TESTNET: Token ID still has to be enforced properly in the consensus.
// Temporarily, we compete with zero stake
for i in 0..constants::EPOCH_LENGTH {
let coin = LeadCoin::new(
eta,
constants::LOTTERY_HEAD_START, // TODO: TESTNET: Why is this constant being used?
slot + i as u64,
epoch_secrets.secret_keys[i].inner(),
epoch_secrets.merkle_roots[i],
i,
epoch_secrets.merkle_paths[i],
seeds[i],
epoch_secrets.secret_keys[i],
&mut self.coins_tree,
);
coins.push(vec![coin]);
}
Ok(coins)
}
/// leadership reward, assuming constant reward
/// TODO (res) implement reward mechanism with accord to DRK,DARK token-economics
fn reward() -> u64 {
constants::REWARD
}
/// Auxillary function to receive current slot offset.
/// If offset is None, its setted up as last block slot offset.
pub fn get_current_offset(&mut self, current_slot: u64) -> u64 {
// This is the case were we restarted our node, didn't receive offset from other nodes,
// so we need to find offset from last block, exluding network dead period.
if self.offset.is_none() {
let (last_slot, last_offset) = self.blockchain.get_last_offset().unwrap();
let offset = last_offset + (current_slot - last_slot);
info!("get_current_offset(): Setting slot offset: {}", offset);
self.offset = Some(offset);
}
self.offset.unwrap()
}
/// Auxillary function to calculate overall empty slots.
/// We keep an offset from genesis indicating when the first slot actually started.
/// This offset is shared between nodes.
fn overall_empty_slots(&mut self, current_slot: u64) -> u64 {
// Retrieve existing blocks excluding genesis
let blocks = (self.blockchain.len() as u64) - 1;
// Setup offset if only have genesis and havent received offset from other nodes
if blocks == 0 && self.offset.is_none() {
info!(
"overall_empty_slots(): Blockchain contains only genesis, setting slot offset: {}",
current_slot
);
self.offset = Some(current_slot);
}
// Retrieve longest fork length, to also those proposals in the calculation
let max_fork_length = self.longest_chain_length() as u64;
current_slot - blocks - self.get_current_offset(current_slot) - max_fork_length
}
/// total stake
/// assuming constant Reward.
fn total_stake(&mut self) -> i64 {
let current_slot = self.current_slot();
((current_slot - self.overall_empty_slots(current_slot)) * Self::reward()) as i64
}
/// Calculate how many leaders existed in previous slot and appends
/// it to history, to report it if win. On finalization sync period,
/// node replaces its leaders history with the sequence extracted by
/// the longest fork.
fn extend_leaders_history(&mut self) -> Float10 {
let slot = self.current_slot();
let previous_slot = slot - 1;
let mut count = 0;
for chain in &self.proposals {
// Previous slot proposals exist at end of each fork
if chain.proposals.last().unwrap().block.header.slot == previous_slot {
count += 1;
}
}
self.leaders_history.push(count);
debug!("extend_leaders_history(): Current leaders history: {:?}", self.leaders_history);
Float10::try_from(count as i64).unwrap().with_precision(constants::RADIX_BITS).value()
}
fn pid_error(feedback: Float10) -> Float10 {
let target = constants::FLOAT10_ONE.clone();
target - feedback
}
fn f_dif(&mut self) -> Float10 {
Self::pid_error(self.extend_leaders_history())
}
fn f_der(&self) -> Float10 {
let len = self.leaders_history.len();
let last = Float10::try_from(self.leaders_history[len - 1] as i64)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let second_to_last = Float10::try_from(self.leaders_history[len - 2] as i64)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let mut der =
(Self::pid_error(second_to_last) - Self::pid_error(last)) / constants::DT.clone();
der = if der > constants::MAX_DER.clone() { constants::MAX_DER.clone() } else { der };
der = if der < constants::MIN_DER.clone() { constants::MIN_DER.clone() } else { der };
der
}
fn f_int(&self) -> Float10 {
let mut sum = constants::FLOAT10_ZERO.clone();
let lead_history_len = self.leaders_history.len();
let history_begin_index = if lead_history_len > 10 { lead_history_len - 10 } else { 0 };
for lf in &self.leaders_history[history_begin_index..] {
sum += Self::pid_error(Float10::try_from(lf.clone()).unwrap());
}
sum
}
fn pid(p: Float10, i: Float10, d: Float10) -> Float10 {
constants::KP.clone() * p + constants::KI.clone() * i + constants::KD.clone() * d
}
/// the probability of winnig lottery having all the stake
/// returns f
fn win_prob_with_full_stake(&mut self) -> Float10 {
let p = self.f_dif();
let i = self.f_int();
let d = self.f_der();
info!("PID: P: {:?}", p);
info!("PID: I: {:?}", i);
info!("PID: D: {:?}", d);
let mut f = Self::pid(p, i, d);
info!("Consensus::win_prob_with_full_stake(): pid f: {}", f);
f = if f >= constants::FLOAT10_ONE.clone() {
constants::MAX_F.clone()
} else if f <= constants::FLOAT10_ZERO.clone() {
constants::MIN_F.clone()
} else {
f
};
info!("Consensus::win_prob_with_full_stake(): clipped f: {}", f);
f
}
/// Check that the provided participant/stakeholder coins win the slot lottery.
/// If the stakeholder has multiple competing winning coins, only the highest value
/// coin is selected, since the stakeholder can't give more than one proof per block/slot.
/// * 'sigma1', 'sigma2': slot sigmas
/// Returns: (check: bool, idx: usize) where idx is the winning coin's index
pub fn is_slot_leader(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) -> (bool, usize) {
// Slot relative index
let slot = self.relative_slot(self.current_slot());
// Stakeholder's epoch coins
let coins = &self.coins;
info!("Consensus::is_leader(): slot: {}, coins len: {}", slot, coins.len());
assert!((slot as usize) < coins.len());
let competing_coins = &coins[slot as usize];
let mut won = false;
let mut highest_stake = 0;
let mut highest_stake_idx = 0;
for (winning_idx, coin) in competing_coins.iter().enumerate() {
let first_winning = coin.is_leader(sigma1, sigma2);
if first_winning && !won {
highest_stake_idx = winning_idx;
}
won |= first_winning;
if won && coin.value > highest_stake {
highest_stake = coin.value;
highest_stake_idx = winning_idx;
}
}
(won, highest_stake_idx)
}
/// Finds the longest blockchain the node holds and
/// returns the last block hash and the chain index.
pub fn longest_chain_last_hash(&self) -> Result<(blake3::Hash, i64)> {
let mut longest: Option<ProposalChain> = None;
let mut length = 0;
let mut index = -1;
if !self.proposals.is_empty() {
for (i, chain) in self.proposals.iter().enumerate() {
if chain.proposals.len() > length {
longest = Some(chain.clone());
length = chain.proposals.len();
index = i as i64;
}
}
}
let hash = match longest {
Some(chain) => chain.proposals.last().unwrap().hash,
None => self.blockchain.last()?.1,
};
Ok((hash, index))
}
/// Finds the length of longest fork chain the node holds.
pub fn longest_chain_length(&self) -> usize {
let mut max = 0;
for proposal in &self.proposals {
if proposal.proposals.len() > max {
max = proposal.proposals.len();
}
}
max
}
/// Given a proposal, find the index of the fork chain it extends.
pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result<i64> {
// We iterate through all forks to find which fork to extend
let mut chain_index = -1;
let mut prop_index = 0;
for (c_index, chain) in self.proposals.iter().enumerate() {
// Traverse proposals in reverse
for (p_index, prop) in chain.proposals.iter().enumerate().rev() {
if proposal.block.header.previous == prop.hash {
chain_index = c_index as i64;
prop_index = p_index;
break
}
}
if chain_index != -1 {
break
}
}
// If no fork was found, we check with canonical
if chain_index == -1 {
let (last_slot, last_block) = self.blockchain.last()?;
if proposal.block.header.previous != last_block ||
proposal.block.header.slot <= last_slot
{
debug!("find_extended_chain_index(): Proposal doesn't extend any known chain");
return Ok(-2)
}
// Proposal extends canonical chain
return Ok(-1)
}
// Found fork chain
let chain = &self.proposals[chain_index as usize];
// Proposal extends fork at last proposal
if prop_index == (chain.proposals.len() - 1) {
return Ok(chain_index)
}
debug!("find_extended_chain_index(): Proposal to fork a forkchain was received.");
let mut chain = self.proposals[chain_index as usize].clone();
// We keep all proposals until the one it extends
chain.proposals.drain((prop_index + 1)..);
self.proposals.push(chain);
Ok(self.proposals.len() as i64 - 1)
}
/// Search the chains we're holding for the given proposal.
pub fn proposal_exists(&self, input_proposal: &blake3::Hash) -> bool {
for chain in self.proposals.iter() {
for proposal in chain.proposals.iter() {
if input_proposal == &proposal.hash {
return true
}
}
}
false
}
/// Auxillary function to set nodes leaders count history to the largest fork sequence
/// of leaders, by using provided index.
pub fn set_leader_history(&mut self, index: i64) {
// Check if we found longest fork to extract sequence from
match index {
-1 => {
debug!("set_leader_history(): No fork exists.");
}
_ => {
debug!("set_leader_history(): Checking last proposal of fork: {}", index);
let last_proposal = self.proposals[index as usize].proposals.last().unwrap();
if last_proposal.block.header.slot == self.current_slot() {
// Replacing our last history element with the leaders one
self.leaders_history.pop();
self.leaders_history.push(last_proposal.block.lead_info.leaders);
debug!("set_leader_history(): New leaders history: {:?}", self.leaders_history);
return
}
}
}
self.leaders_history.push(0);
}
/// Utility function to extract leader selection lottery randomness(eta),
/// defined as the hash of the previous lead proof converted to pallas base.
fn get_eta(&self) -> pallas::Base {
let proof_tx_hash = self.blockchain.get_last_proof_hash().unwrap();
let mut bytes: [u8; 32] = *proof_tx_hash.as_bytes();
// read first 254 bits
bytes[30] = 0;
bytes[31] = 0;
pallas::Base::from_repr(bytes).unwrap()
}
/// Auxillary function to retrieve slot checkpoint of provided slot UID.
pub fn get_slot_checkpoint(&self, slot: u64) -> Result<SlotCheckpoint> {
// Check hot/live slot checkpoints
for slot_checkpoint in self.slot_checkpoints.iter().rev() {
if slot_checkpoint.slot == slot {
return Ok(slot_checkpoint.clone())
}
}
// Check if slot is finalized
if let Ok(slot_checkpoints) = self.blockchain.get_slot_checkpoints_by_slot(&[slot]) {
if slot_checkpoints.len() > 0 {
if let Some(slot_checkpoint) = &slot_checkpoints[0] {
return Ok(slot_checkpoint.clone())
}
}
}
Err(Error::SlotCheckpointNotFound(slot))
}
}
/// Auxiliary structure used for consensus syncing.

View File

@@ -31,7 +31,7 @@ use crate::{
pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: ValidatorStatePtr) {
// Node waits just before the current or next epoch last finalization syncing period, so it can
// start syncing latest state.
let mut seconds_until_next_epoch = state.read().await.next_n_epoch_start(1);
let mut seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
let sync_offset = Duration::new(constants::FINAL_SYNC_DUR + 1, 0);
loop {
@@ -42,7 +42,7 @@ pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: Valid
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
sleep(seconds_until_next_epoch.as_secs()).await;
seconds_until_next_epoch = state.read().await.next_n_epoch_start(1);
seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
}
info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
@@ -57,14 +57,14 @@ pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: Valid
};
// Node modifies its participating slot to next.
match state.write().await.set_participating() {
match state.write().await.consensus.set_participating() {
Ok(()) => info!("consensus: Node will start participating in the next slot"),
Err(e) => error!("consensus: Failed to set participation slot: {}", e),
}
loop {
// Node sleeps until finalization sync period start (2 seconds before next slot)
let seconds_sync_period = (state.read().await.next_n_slot_start(1) -
let seconds_sync_period = (state.read().await.consensus.next_n_slot_start(1) -
Duration::new(constants::FINAL_SYNC_DUR, 0))
.as_secs();
info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period);
@@ -92,14 +92,14 @@ pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: Valid
}
// Node sleeps until next slot
let seconds_next_slot = state.read().await.next_n_slot_start(1).as_secs();
let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs();
info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot);
sleep(seconds_next_slot).await;
// Retrieve slot sigmas
let (sigma1, sigma2) = state.write().await.sigmas();
let (sigma1, sigma2) = state.write().await.consensus.sigmas();
// Node checks if epoch has changed, to generate new epoch coins
let epoch_changed = state.write().await.epoch_changed(sigma1, sigma2).await;
let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await;
match epoch_changed {
Ok(changed) => {
if changed {
@@ -113,7 +113,7 @@ pub async fn proposal_task(consensus_p2p: P2pPtr, sync_p2p: P2pPtr, state: Valid
};
// Node checks if it's the slot leader to generate a new proposal
// for that slot.
let (won, idx) = state.write().await.is_slot_leader(sigma1, sigma2);
let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2);
let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) };
let proposal = match result {
Ok(prop) => {

View File

@@ -16,10 +16,9 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::{collections::HashMap, io::Cursor, time::Duration};
use std::{collections::HashMap, io::Cursor};
use async_std::sync::{Arc, RwLock};
use chrono::{NaiveDateTime, Utc};
use darkfi_sdk::{
crypto::{
constants::MERKLE_DEPTH,
@@ -32,15 +31,14 @@ use darkfi_serial::{deserialize, serialize, Decodable, Encodable, WriteExt};
use incrementalmerkletree::{bridgetree::BridgeTree, Tree};
use log::{debug, error, info, warn};
use pasta_curves::{group::ff::PrimeField, pallas};
use rand::{rngs::OsRng, thread_rng, Rng};
use rand::rngs::OsRng;
use serde_json::json;
use super::{
constants,
leadcoin::{LeadCoin, LeadCoinSecrets},
leadcoin::LeadCoin,
state::{ConsensusState, SlotCheckpoint},
utils::fbig2base,
BlockInfo, BlockProposal, Float10, Header, LeadInfo, LeadProof, ProposalChain,
BlockInfo, BlockProposal, Header, LeadInfo, LeadProof, ProposalChain,
};
use crate::{
@@ -119,8 +117,8 @@ impl ValidatorState {
None
};
let consensus = ConsensusState::new(genesis_ts, genesis_data)?;
let blockchain = Blockchain::new(db, genesis_ts, genesis_data)?;
let consensus = ConsensusState::new(blockchain.clone(), genesis_ts, genesis_data)?;
let unconfirmed_txs = vec![];
@@ -246,374 +244,6 @@ impl ValidatorState {
true
}
/// Calculates current epoch.
pub fn current_epoch(&self) -> u64 {
self.slot_epoch(self.current_slot())
}
/// Calculates the epoch of the provided slot.
/// Epoch duration is configured using the `EPOCH_LENGTH` value.
pub fn slot_epoch(&self, slot: u64) -> u64 {
slot / constants::EPOCH_LENGTH as u64
}
/// Calculates current slot, based on elapsed time from the genesis block.
/// Slot duration is configured using the `SLOT_TIME` constant.
pub fn current_slot(&self) -> u64 {
self.consensus.genesis_ts.elapsed() / constants::SLOT_TIME
}
/// Calculates the relative number of the provided slot.
pub fn relative_slot(&self, slot: u64) -> u64 {
slot % constants::EPOCH_LENGTH as u64
}
/// Finds the last slot a proposal or block was generated.
pub fn last_slot(&self) -> Result<u64> {
let mut slot = 0;
for chain in &self.consensus.proposals {
for proposal in &chain.proposals {
if proposal.block.header.slot > slot {
slot = proposal.block.header.slot;
}
}
}
// We return here in case proposals exist,
// so we don't query the sled database.
if slot > 0 {
return Ok(slot)
}
let (last_slot, _) = self.blockchain.last()?;
Ok(last_slot)
}
/// Calculates seconds until next Nth slot starting time.
/// Slots duration is configured using the SLOT_TIME constant.
pub fn next_n_slot_start(&self, n: u64) -> Duration {
assert!(n > 0);
let start_time = NaiveDateTime::from_timestamp_opt(self.consensus.genesis_ts.0, 0).unwrap();
let current_slot = self.current_slot() + n;
let next_slot_start =
(current_slot * constants::SLOT_TIME) + (start_time.timestamp() as u64);
let next_slot_start = NaiveDateTime::from_timestamp_opt(next_slot_start as i64, 0).unwrap();
let current_time = NaiveDateTime::from_timestamp_opt(Utc::now().timestamp(), 0).unwrap();
let diff = next_slot_start - current_time;
Duration::new(diff.num_seconds().try_into().unwrap(), 0)
}
/// Calculate slots until next Nth epoch.
/// Epoch duration is configured using the EPOCH_LENGTH value.
pub fn slots_to_next_n_epoch(&self, n: u64) -> u64 {
assert!(n > 0);
let slots_till_next_epoch =
constants::EPOCH_LENGTH as u64 - self.relative_slot(self.current_slot());
((n - 1) * constants::EPOCH_LENGTH as u64) + slots_till_next_epoch
}
/// Calculates seconds until next Nth epoch starting time.
pub fn next_n_epoch_start(&self, n: u64) -> Duration {
self.next_n_slot_start(self.slots_to_next_n_epoch(n))
}
/// Set participating slot to next.
pub fn set_participating(&mut self) -> Result<()> {
self.consensus.participating = Some(self.current_slot() + 1);
Ok(())
}
/// Generate current slot checkpoint
fn generate_slot_checkpoint(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) {
let slot = self.current_slot();
let checkpoint = SlotCheckpoint { slot, eta: self.consensus.epoch_eta, sigma1, sigma2 };
self.consensus.slot_checkpoints.push(checkpoint);
}
/// Check if new epoch has started, to create new epoch coins.
/// Returns flag to signify if epoch has changed and vector of
/// new epoch competing coins.
pub async fn epoch_changed(
&mut self,
sigma1: pallas::Base,
sigma2: pallas::Base,
) -> Result<bool> {
let epoch = self.current_epoch();
self.consensus.prev_sigma1 = sigma1;
self.consensus.prev_sigma2 = sigma2;
if epoch <= self.consensus.epoch {
self.generate_slot_checkpoint(sigma1.clone(), sigma2.clone());
return Ok(false)
}
let eta = self.get_eta();
// At start of epoch, relative slot is 0.
if self.consensus.coins.len() == 0 {
//TODO: DRK coin need to be burned, and consensus coin to be minted.
self.consensus.coins = self.create_epoch_coins(eta, epoch).await?;
}
self.consensus.epoch = epoch;
self.consensus.prev_epoch_eta = self.consensus.epoch_eta;
self.consensus.epoch_eta = eta;
self.generate_slot_checkpoint(sigma1.clone(), sigma2.clone());
Ok(true)
}
/// return 2-term target approximation sigma coefficients.
pub fn sigmas(&mut self) -> (pallas::Base, pallas::Base) {
let f = self.win_prob_with_full_stake();
// Generate sigmas
let mut total_stake = self.total_stake(); // Only used for fine-tuning
// at genesis epoch first slot, of absolute index 0,
// the total stake would be 0, to avoid division by zero,
// we asume total stake at first division is GENESIS_TOTAL_STAKE.
if total_stake == 0 {
total_stake = constants::GENESIS_TOTAL_STAKE;
}
info!("consensus::sigmas(): f: {}", f);
info!("consensus::sigmas(): stake: {}", total_stake);
let one = constants::FLOAT10_ONE.clone();
let two = constants::FLOAT10_TWO.clone();
let field_p = Float10::from_str_native(constants::P)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let total_sigma =
Float10::try_from(total_stake).unwrap().with_precision(constants::RADIX_BITS).value();
let x = one - f;
let c = x.ln();
let sigma1_fbig = c.clone() / total_sigma.clone() * field_p.clone();
let sigma1 = fbig2base(sigma1_fbig);
let sigma2_fbig = (c / total_sigma).powf(two.clone()) * (field_p / two);
let sigma2 = fbig2base(sigma2_fbig);
(sigma1, sigma2)
}
/// Generate epoch-competing coins
async fn create_epoch_coins(
&mut self,
eta: pallas::Base,
epoch: u64,
) -> Result<Vec<Vec<LeadCoin>>> {
info!("Consensus: Creating coins for epoch: {}", epoch);
self.create_coins(eta).await
}
/// Generate coins for provided sigmas.
/// NOTE: The strategy here is having a single competing coin per slot.
async fn create_coins(&mut self, eta: pallas::Base) -> Result<Vec<Vec<LeadCoin>>> {
let slot = self.current_slot();
let mut rng = thread_rng();
let mut seeds: Vec<u64> = Vec::with_capacity(constants::EPOCH_LENGTH);
for _ in 0..constants::EPOCH_LENGTH {
seeds.push(rng.gen());
}
let epoch_secrets = LeadCoinSecrets::generate();
//let mut tree_cm = BridgeTree::<MerkleNode, MERKLE_DEPTH>::new(constants::EPOCH_LENGTH);
// LeadCoin matrix where each row represents a slot and contains its competing coins.
let mut coins: Vec<Vec<LeadCoin>> = Vec::with_capacity(constants::EPOCH_LENGTH);
// TODO: TESTNET: Here we would look into the wallet to find coins we're able to use.
// The wallet has specific tables for consensus coins.
// TODO: TESTNET: Token ID still has to be enforced properly in the consensus.
// Temporarily, we compete with zero stake
for i in 0..constants::EPOCH_LENGTH {
let coin = LeadCoin::new(
eta,
constants::LOTTERY_HEAD_START, // TODO: TESTNET: Why is this constant being used?
slot + i as u64,
epoch_secrets.secret_keys[i].inner(),
epoch_secrets.merkle_roots[i],
i,
epoch_secrets.merkle_paths[i],
seeds[i],
epoch_secrets.secret_keys[i],
&mut self.consensus.coins_tree,
);
coins.push(vec![coin]);
}
Ok(coins)
}
/// leadership reward, assuming constant reward
/// TODO (res) implement reward mechanism with accord to DRK,DARK token-economics
fn reward() -> u64 {
constants::REWARD
}
/// Auxillary function to receive current slot offset.
/// If offset is None, its setted up as last block slot offset.
fn get_current_offset(&mut self, current_slot: u64) -> u64 {
// This is the case were we restarted our node, didn't receive offset from other nodes,
// so we need to find offset from last block, exluding network dead period.
if self.consensus.offset.is_none() {
let (last_slot, last_offset) = self.blockchain.get_last_offset().unwrap();
let offset = last_offset + (current_slot - last_slot);
info!("get_current_offset(): Setting slot offset: {}", offset);
self.consensus.offset = Some(offset);
}
self.consensus.offset.unwrap()
}
/// Auxillary function to calculate overall empty slots.
/// We keep an offset from genesis indicating when the first slot actually started.
/// This offset is shared between nodes.
fn overall_empty_slots(&mut self, current_slot: u64) -> u64 {
// Retrieve existing blocks excluding genesis
let blocks = (self.blockchain.len() as u64) - 1;
// Setup offset if only have genesis and havent received offset from other nodes
if blocks == 0 && self.consensus.offset.is_none() {
info!(
"overall_empty_slots(): Blockchain contains only genesis, setting slot offset: {}",
current_slot
);
self.consensus.offset = Some(current_slot);
}
// Retrieve longest fork length, to also those proposals in the calculation
let max_fork_length = self.longest_chain_length() as u64;
current_slot - blocks - self.get_current_offset(current_slot) - max_fork_length
}
/// total stake
/// assuming constant Reward.
fn total_stake(&mut self) -> i64 {
let current_slot = self.current_slot();
((current_slot - self.overall_empty_slots(current_slot)) * Self::reward()) as i64
}
/// Calculate how many leaders existed in previous slot and appends
/// it to history, to report it if win. On finalization sync period,
/// node replaces its leaders history with the sequence extracted by
/// the longest fork.
fn extend_leaders_history(&mut self) -> Float10 {
let slot = self.current_slot();
let previous_slot = slot - 1;
let mut count = 0;
for chain in &self.consensus.proposals {
// Previous slot proposals exist at end of each fork
if chain.proposals.last().unwrap().block.header.slot == previous_slot {
count += 1;
}
}
self.consensus.leaders_history.push(count);
debug!(
"extend_leaders_history(): Current leaders history: {:?}",
self.consensus.leaders_history
);
Float10::try_from(count as i64).unwrap().with_precision(constants::RADIX_BITS).value()
}
fn pid_error(feedback: Float10) -> Float10 {
let target = constants::FLOAT10_ONE.clone();
target - feedback
}
fn f_dif(&mut self) -> Float10 {
Self::pid_error(self.extend_leaders_history())
}
fn f_der(&self) -> Float10 {
let len = self.consensus.leaders_history.len();
let last = Float10::try_from(self.consensus.leaders_history[len - 1] as i64)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let second_to_last = Float10::try_from(self.consensus.leaders_history[len - 2] as i64)
.unwrap()
.with_precision(constants::RADIX_BITS)
.value();
let mut der =
(Self::pid_error(second_to_last) - Self::pid_error(last)) / constants::DT.clone();
der = if der > constants::MAX_DER.clone() { constants::MAX_DER.clone() } else { der };
der = if der < constants::MIN_DER.clone() { constants::MIN_DER.clone() } else { der };
der
}
fn f_int(&self) -> Float10 {
let mut sum = constants::FLOAT10_ZERO.clone();
let lead_history_len = self.consensus.leaders_history.len();
let history_begin_index = if lead_history_len > 10 { lead_history_len - 10 } else { 0 };
for lf in &self.consensus.leaders_history[history_begin_index..] {
sum += Self::pid_error(Float10::try_from(lf.clone()).unwrap());
}
sum
}
fn pid(p: Float10, i: Float10, d: Float10) -> Float10 {
constants::KP.clone() * p + constants::KI.clone() * i + constants::KD.clone() * d
}
/// the probability of winnig lottery having all the stake
/// returns f
fn win_prob_with_full_stake(&mut self) -> Float10 {
let p = self.f_dif();
let i = self.f_int();
let d = self.f_der();
info!("PID: P: {:?}", p);
info!("PID: I: {:?}", i);
info!("PID: D: {:?}", d);
let mut f = Self::pid(p, i, d);
info!("Consensus::win_prob_with_full_stake(): pid f: {}", f);
f = if f >= constants::FLOAT10_ONE.clone() {
constants::MAX_F.clone()
} else if f <= constants::FLOAT10_ZERO.clone() {
constants::MIN_F.clone()
} else {
f
};
info!("Consensus::win_prob_with_full_stake(): clipped f: {}", f);
f
}
/// Check that the provided participant/stakeholder coins win the slot lottery.
/// If the stakeholder has multiple competing winning coins, only the highest value
/// coin is selected, since the stakeholder can't give more than one proof per block/slot.
/// * 'sigma1', 'sigma2': slot sigmas
/// Returns: (check: bool, idx: usize) where idx is the winning coin's index
pub fn is_slot_leader(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) -> (bool, usize) {
// Slot relative index
let slot = self.relative_slot(self.current_slot());
// Stakeholder's epoch coins
let coins = &self.consensus.coins;
info!("Consensus::is_leader(): slot: {}, coins len: {}", slot, coins.len());
assert!((slot as usize) < coins.len());
let competing_coins = &coins[slot as usize];
let mut won = false;
let mut highest_stake = 0;
let mut highest_stake_idx = 0;
for (winning_idx, coin) in competing_coins.iter().enumerate() {
let first_winning = coin.is_leader(sigma1, sigma2);
if first_winning && !won {
highest_stake_idx = winning_idx;
}
won |= first_winning;
if won && coin.value > highest_stake {
highest_stake = coin.value;
highest_stake_idx = winning_idx;
}
}
(won, highest_stake_idx)
}
/// Generate a block proposal for the current slot, containing all
/// unconfirmed transactions. Proposal extends the longest fork
/// chain the node is holding.
@@ -623,8 +253,8 @@ impl ValidatorState {
sigma1: pallas::Base,
sigma2: pallas::Base,
) -> Result<Option<BlockProposal>> {
let slot = self.current_slot();
let (prev_hash, index) = self.longest_chain_last_hash().unwrap();
let slot = self.consensus.current_slot();
let (prev_hash, index) = self.consensus.longest_chain_last_hash().unwrap();
let unproposed_txs = self.unproposed_txs(index);
// TODO: [PLACEHOLDER] Create and add rewards transaction
@@ -641,15 +271,20 @@ impl ValidatorState {
//let eta = self.consensus.epoch_eta;
// Generating leader proof
let relative_slot = self.relative_slot(slot) as usize;
let relative_slot = self.consensus.relative_slot(slot) as usize;
let coin = self.consensus.coins[relative_slot][idx];
let (proof, public_inputs) =
coin.create_lead_proof(sigma1, sigma2, self.lead_proving_key.as_ref().unwrap());
// Signing using coin
let secret_key = coin.secret_key;
let header =
Header::new(prev_hash, self.slot_epoch(slot), slot, Timestamp::current_time(), root);
let header = Header::new(
prev_hash,
self.consensus.slot_epoch(slot),
slot,
Timestamp::current_time(),
root,
);
let signed_proposal = secret_key.sign(&mut OsRng, &header.headerhash().as_bytes()[..]);
let public_key = PublicKey::from_secret(secret_key);
@@ -660,7 +295,7 @@ impl ValidatorState {
coin.slot,
coin.eta,
LeadProof::from(proof?),
self.get_current_offset(slot),
self.consensus.get_current_offset(slot),
self.consensus.leaders_history.last().unwrap().clone(),
);
// Replacing old coin with the derived coin
@@ -694,47 +329,10 @@ impl ValidatorState {
unproposed_txs
}
/// Finds the longest blockchain the node holds and
/// returns the last block hash and the chain index.
pub fn longest_chain_last_hash(&self) -> Result<(blake3::Hash, i64)> {
let mut longest: Option<ProposalChain> = None;
let mut length = 0;
let mut index = -1;
if !self.consensus.proposals.is_empty() {
for (i, chain) in self.consensus.proposals.iter().enumerate() {
if chain.proposals.len() > length {
longest = Some(chain.clone());
length = chain.proposals.len();
index = i as i64;
}
}
}
let hash = match longest {
Some(chain) => chain.proposals.last().unwrap().hash,
None => self.blockchain.last()?.1,
};
Ok((hash, index))
}
/// Finds the length of longest fork chain the node holds.
pub fn longest_chain_length(&self) -> usize {
let mut max = 0;
for proposal in &self.consensus.proposals {
if proposal.proposals.len() > max {
max = proposal.proposals.len();
}
}
max
}
/// Given a proposal, the node verify its sender (slot leader) and finds which blockchain
/// it extends. If the proposal extends the canonical blockchain, a new fork chain is created.
pub async fn receive_proposal(&mut self, proposal: &BlockProposal) -> Result<()> {
let current = self.current_slot();
let current = self.consensus.current_slot();
// Node hasn't started participating
match self.consensus.participating {
Some(start) => {
@@ -761,7 +359,7 @@ impl ValidatorState {
}
// Check if proposal extends any existing fork chains
let index = self.find_extended_chain_index(proposal)?;
let index = self.consensus.find_extended_chain_index(proposal)?;
if index == -2 {
return Err(Error::ExtendedChainIndexNotFound)
}
@@ -794,7 +392,7 @@ impl ValidatorState {
}
// Verify proposal offset
let offset = self.get_current_offset(current);
let offset = self.consensus.get_current_offset(current);
if offset != lf.offset {
warn!(
"receive_proposal(): Received proposal contains different offset: {} - {}",
@@ -811,7 +409,7 @@ impl ValidatorState {
info!("receive_proposal(): Leader proof verified successfully!");
// Validate proposal public value against coin creation slot checkpoint
let checkpoint = self.get_slot_checkpoint(lf.coin_slot)?;
let checkpoint = self.consensus.get_slot_checkpoint(lf.coin_slot)?;
if checkpoint.eta != lf.coin_eta {
return Err(Error::ProposalDifferentCoinEtaError)
}
@@ -836,7 +434,7 @@ impl ValidatorState {
}
// Validate proposal coin sigmas against current slot checkpoint
let checkpoint = self.get_slot_checkpoint(current)?;
let checkpoint = self.consensus.get_slot_checkpoint(current)?;
// sigma1
let prop_sigma1 = lf.public_inputs[constants::PI_SIGMA1_INDEX];
if checkpoint.sigma1 != prop_sigma1 {
@@ -905,67 +503,6 @@ impl ValidatorState {
Ok(())
}
/// Given a proposal, find the index of the fork chain it extends.
pub fn find_extended_chain_index(&mut self, proposal: &BlockProposal) -> Result<i64> {
// We iterate through all forks to find which fork to extend
let mut chain_index = -1;
let mut prop_index = 0;
for (c_index, chain) in self.consensus.proposals.iter().enumerate() {
// Traverse proposals in reverse
for (p_index, prop) in chain.proposals.iter().enumerate().rev() {
if proposal.block.header.previous == prop.hash {
chain_index = c_index as i64;
prop_index = p_index;
break
}
}
if chain_index != -1 {
break
}
}
// If no fork was found, we check with canonical
if chain_index == -1 {
let (last_slot, last_block) = self.blockchain.last()?;
if proposal.block.header.previous != last_block ||
proposal.block.header.slot <= last_slot
{
debug!("find_extended_chain_index(): Proposal doesn't extend any known chain");
return Ok(-2)
}
// Proposal extends canonical chain
return Ok(-1)
}
// Found fork chain
let chain = &self.consensus.proposals[chain_index as usize];
// Proposal extends fork at last proposal
if prop_index == (chain.proposals.len() - 1) {
return Ok(chain_index)
}
debug!("find_extended_chain_index(): Proposal to fork a forkchain was received.");
let mut chain = self.consensus.proposals[chain_index as usize].clone();
// We keep all proposals until the one it extends
chain.proposals.drain((prop_index + 1)..);
self.consensus.proposals.push(chain);
Ok(self.consensus.proposals.len() as i64 - 1)
}
/// Search the chains we're holding for the given proposal.
pub fn proposal_exists(&self, input_proposal: &blake3::Hash) -> bool {
for chain in self.consensus.proposals.iter() {
for proposal in chain.proposals.iter() {
if input_proposal == &proposal.hash {
return true
}
}
}
false
}
/// Remove provided transactions vector from unconfirmed_txs if they exist.
pub fn remove_txs(&mut self, transactions: &Vec<Transaction>) -> Result<()> {
for tx in transactions {
@@ -977,33 +514,6 @@ impl ValidatorState {
Ok(())
}
/// Auxillary function to set nodes leaders count history to the largest fork sequence
/// of leaders, by using provided index.
fn set_leader_history(&mut self, index: i64) {
// Check if we found longest fork to extract sequence from
match index {
-1 => {
debug!("set_leader_history(): No fork exists.");
}
_ => {
debug!("set_leader_history(): Checking last proposal of fork: {}", index);
let last_proposal =
self.consensus.proposals[index as usize].proposals.last().unwrap();
if last_proposal.block.header.slot == self.current_slot() {
// Replacing our last history element with the leaders one
self.consensus.leaders_history.pop();
self.consensus.leaders_history.push(last_proposal.block.lead_info.leaders);
debug!(
"set_leader_history(): New leaders history: {:?}",
self.consensus.leaders_history
);
return
}
}
}
self.consensus.leaders_history.push(0);
}
/// Node checks if any of the fork chains can be finalized.
/// Consensus finalization logic:
/// - If the node has observed the creation of 3 proposals in a fork chain and no other
@@ -1012,7 +522,7 @@ impl ValidatorState {
/// When fork chain proposals are finalized, the rest of fork chains are removed and all
/// slot checkpoints until current slot are apppended to canonical state.
pub async fn chain_finalization(&mut self) -> Result<Vec<BlockInfo>> {
let slot = self.current_slot();
let slot = self.consensus.current_slot();
debug!("chain_finalization(): Started finalization check for slot: {}", slot);
// Set last slot finalization check occured to current slot
self.consensus.checked_finalization = slot;
@@ -1052,12 +562,12 @@ impl ValidatorState {
match chain_index {
-2 => {
debug!("chain_finalization(): Eligible forks with same height exist, nothing to finalize.");
self.set_leader_history(index_for_history);
self.consensus.set_leader_history(index_for_history);
return Ok(vec![])
}
-1 => {
debug!("chain_finalization(): All chains have less than 3 proposals, nothing to finalize.");
self.set_leader_history(index_for_history);
self.consensus.set_leader_history(index_for_history);
return Ok(vec![])
}
_ => debug!("chain_finalization(): Chain {} can be finalized!", chain_index),
@@ -1153,36 +663,6 @@ impl ValidatorState {
Ok(finalized)
}
/// Utility function to extract leader selection lottery randomness(eta),
/// defined as the hash of the previous lead proof converted to pallas base.
fn get_eta(&self) -> pallas::Base {
let proof_tx_hash = self.blockchain.get_last_proof_hash().unwrap();
let mut bytes: [u8; 32] = *proof_tx_hash.as_bytes();
// read first 254 bits
bytes[30] = 0;
bytes[31] = 0;
pallas::Base::from_repr(bytes).unwrap()
}
/// Auxillary function to retrieve slot checkpoint of provided slot UID.
fn get_slot_checkpoint(&self, slot: u64) -> Result<SlotCheckpoint> {
// Check hot/live slot checkpoints
for slot_checkpoint in self.consensus.slot_checkpoints.iter().rev() {
if slot_checkpoint.slot == slot {
return Ok(slot_checkpoint.clone())
}
}
// Check if slot is finalized
if let Ok(slot_checkpoints) = self.blockchain.get_slot_checkpoints_by_slot(&[slot]) {
if slot_checkpoints.len() > 0 {
if let Some(slot_checkpoint) = &slot_checkpoints[0] {
return Ok(slot_checkpoint.clone())
}
}
}
Err(Error::SlotCheckpointNotFound(slot))
}
// ==========================
// State transition functions
// ==========================