consensus: fixed consensus state sync

This commit is contained in:
aggstam
2023-02-07 23:59:28 +02:00
parent 90f3a0fea7
commit a9fe80715b
3 changed files with 54 additions and 29 deletions

View File

@@ -94,20 +94,32 @@ impl ProtocolSyncConsensus {
// Extra validations can be added here.
let lock = self.state.read().await;
let bootstrap_slot = lock.consensus.bootstrap_slot;
let current_slot = lock.consensus.current_slot();
let mut forks = vec![];
for fork in &lock.consensus.forks {
forks.push(fork.clone().into());
}
let unconfirmed_txs = lock.unconfirmed_txs.clone();
let slot_checkpoints = lock.consensus.slot_checkpoints.clone();
let previous_leaders = lock.consensus.previous_leaders.clone();
let mut f_history = vec![];
for f in &lock.consensus.f_history {
let f_str = format!("{:}", f);
f_history.push(f_str);
}
let mut err_history = vec![];
for err in &lock.consensus.err_history {
let err_str = format!("{:}", err);
err_history.push(err_str);
}
let nullifiers = lock.consensus.nullifiers.clone();
let response = ConsensusResponse {
bootstrap_slot,
current_slot,
forks,
unconfirmed_txs,
slot_checkpoints,
previous_leaders,
f_history,
err_history,
nullifiers,
};
if let Err(e) = self.channel.send(response).await {
@@ -147,8 +159,9 @@ impl ProtocolSyncConsensus {
// Extra validations can be added here.
let lock = self.state.read().await;
let bootstrap_slot = lock.consensus.bootstrap_slot;
let proposing = lock.consensus.proposing;
let is_empty = lock.consensus.slot_checkpoints_is_empty();
let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, is_empty };
let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, proposing, is_empty };
if let Err(e) = self.channel.send(response).await {
error!(
target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()",

View File

@@ -77,9 +77,9 @@ pub struct ConsensusState {
pub slot_checkpoints: Vec<SlotCheckpoint>,
/// Last slot leaders count
pub previous_leaders: u64,
/// controller output history
/// Controller output history
pub f_history: Vec<Float10>,
/// controller proportional error history
/// Controller proportional error history
pub err_history: Vec<Float10>,
// TODO: Aren't these already in db after finalization?
/// Canonical competing coins
@@ -215,14 +215,8 @@ impl ConsensusState {
// Initialize node lead coins and set current epoch and eta.
pub async fn init_coins(&mut self) -> Result<()> {
self.epoch = self.current_epoch();
// Create slot checkpoint if not on genesis slot (already in db)
if self.slot_checkpoints.is_empty() && self.current_slot() != 0 {
let (sigma1, sigma2) = self.sigmas();
self.generate_slot_checkpoint(sigma1, sigma2);
};
self.coins = self.create_coins().await?;
self.update_forks_checkpoints();
Ok(())
}
@@ -247,13 +241,11 @@ impl ConsensusState {
pub fn sigmas(&mut self) -> (pallas::Base, pallas::Base) {
let f = self.win_inv_prob_with_full_stake();
let total_stake = self.total_stake();
let total_sigma = Float10::try_from(total_stake).unwrap();
Self::calc_sigmas(f, total_sigma)
self.calc_sigmas(f, total_sigma)
}
fn calc_sigmas(f: Float10, total_sigma: Float10) -> (pallas::Base, pallas::Base) {
fn calc_sigmas(&self, f: Float10, total_sigma: Float10) -> (pallas::Base, pallas::Base) {
info!(target: "consensus::state", "sigmas(): f: {}", f);
info!(target: "consensus::state", "sigmas(): total network stake: {:}", total_sigma);
@@ -285,12 +277,7 @@ impl ConsensusState {
/// Generate coins for provided sigmas.
/// NOTE: The strategy here is having a single competing coin per slot.
// TODO: DRK coin need to be burned, and consensus coin to be minted.
async fn create_coins(
&mut self,
//eta: pallas::Base,
) -> Result<Vec<LeadCoin>> {
let slot = self.current_slot();
async fn create_coins(&mut self) -> Result<Vec<LeadCoin>> {
// TODO: cleanup LeadCoinSecrets, no need to keep a vector
let (seeds, epoch_secrets) = {
let mut rng = thread_rng();
@@ -326,7 +313,7 @@ impl ConsensusState {
//let stake = self.initial_distribution;
let c = LeadCoin::new(
0,
slot,
self.current_slot(),
epoch_secrets.secret_keys[0].inner(),
epoch_secrets.merkle_roots[0],
0,
@@ -652,6 +639,8 @@ impl ConsensusState {
self.forks = vec![];
self.slot_checkpoints = vec![];
self.previous_leaders = 0;
self.f_history = vec![constants::FLOAT10_ZERO.clone()];
self.err_history = vec![constants::FLOAT10_ZERO.clone(), constants::FLOAT10_ZERO.clone()];
self.nullifiers = vec![];
}
}
@@ -671,14 +660,20 @@ impl net::Message for ConsensusRequest {
/// Full consensus "hot" state snapshot a node sends to a peer that is
/// performing consensus state sync.
pub struct ConsensusResponse {
/// Slot the network was bootstrapped
pub bootstrap_slot: u64,
/// Current slot
pub current_slot: u64,
/// Hot/live data used by the consensus algorithm
pub forks: Vec<ForkInfo>,
/// Pending transactions
pub unconfirmed_txs: Vec<Transaction>,
/// Hot/live slot checkpoints
pub slot_checkpoints: Vec<SlotCheckpoint>,
/// Last slot leaders count
pub previous_leaders: u64,
// TODO: When Float10 supports encoding/decoding this should be
// replaced by directly using Vec<Float10>
/// Controller output history, serialized as strings (see TODO above)
pub f_history: Vec<String>,
/// Controller proportional error history, serialized as strings (see TODO above)
pub err_history: Vec<String>,
/// Seen nullifiers from proposals
pub nullifiers: Vec<pallas::Base>,
}
@@ -704,6 +699,8 @@ impl net::Message for ConsensusSlotCheckpointsRequest {
/// Lightweight probe response used during sync so a requester can decide
/// whether this node is a suitable peer to sync consensus state from.
pub struct ConsensusSlotCheckpointsResponse {
/// Node known bootstrap slot
pub bootstrap_slot: u64,
/// Node is able to propose proposals
pub proposing: bool,
/// Whether the node's hot/live slot checkpoints set is empty
pub is_empty: bool,
}

View File

@@ -24,7 +24,7 @@ use crate::{
ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest,
ConsensusSlotCheckpointsResponse,
},
ValidatorStatePtr,
Float10, ValidatorStatePtr,
},
net::P2pPtr,
util::async_util::sleep,
@@ -67,6 +67,10 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
warn!(target: "consensus::consensus_sync", "Network was just bootstraped, checking rest nodes");
continue
}
if !response.proposing {
warn!(target: "consensus::consensus_sync", "Node is not proposing, checking rest nodes");
continue
}
if response.is_empty {
warn!(target: "consensus::consensus_sync", "Node has not seen any slot checkpoints, retrying...");
continue
@@ -113,7 +117,7 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
let mut response = response_sub.receive().await?;
// Verify that peer has finished finalizing forks
loop {
if response.forks.len() != 1 {
if response.forks.len() != 0 {
warn!(target: "consensus::consensus_sync", "Peer has not finished finalization, retrying...");
sleep(1).await;
peer.send(ConsensusRequest {}).await?;
@@ -124,9 +128,8 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
}
// Verify that the node has received all finalized blocks
let last_finalized_slot = response.forks[0].sequence[0].proposal.block.header.slot - 1;
loop {
if !state.read().await.blockchain.has_slot(last_finalized_slot)? {
if !state.read().await.blockchain.has_slot(response.current_slot)? {
warn!(target: "consensus::consensus_sync", "Node has not finished finalization, retrying...");
sleep(1).await;
continue
@@ -144,7 +147,19 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
lock.consensus.forks = forks;
lock.unconfirmed_txs = response.unconfirmed_txs.clone();
lock.consensus.slot_checkpoints = response.slot_checkpoints.clone();
lock.consensus.previous_leaders = response.previous_leaders.clone();
lock.consensus.previous_leaders = 1;
let mut f_history = vec![];
for f in &response.f_history {
let f_float = Float10::try_from(f.as_str()).unwrap();
f_history.push(f_float);
}
lock.consensus.f_history = f_history;
let mut err_history = vec![];
for err in &response.err_history {
let err_float = Float10::try_from(err.as_str()).unwrap();
err_history.push(err_float);
}
lock.consensus.err_history = err_history;
lock.consensus.nullifiers = response.nullifiers.clone();
lock.consensus.init_coins().await?;