diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index d9e9e8fe0..f799679bc 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -94,20 +94,32 @@ impl ProtocolSyncConsensus { // Extra validations can be added here. let lock = self.state.read().await; let bootstrap_slot = lock.consensus.bootstrap_slot; + let current_slot = lock.consensus.current_slot(); let mut forks = vec![]; for fork in &lock.consensus.forks { forks.push(fork.clone().into()); } let unconfirmed_txs = lock.unconfirmed_txs.clone(); let slot_checkpoints = lock.consensus.slot_checkpoints.clone(); - let previous_leaders = lock.consensus.previous_leaders.clone(); + let mut f_history = vec![]; + for f in &lock.consensus.f_history { + let f_str = format!("{:}", f); + f_history.push(f_str); + } + let mut err_history = vec![]; + for err in &lock.consensus.err_history { + let err_str = format!("{:}", err); + err_history.push(err_str); + } let nullifiers = lock.consensus.nullifiers.clone(); let response = ConsensusResponse { bootstrap_slot, + current_slot, forks, unconfirmed_txs, slot_checkpoints, - previous_leaders, + f_history, + err_history, nullifiers, }; if let Err(e) = self.channel.send(response).await { @@ -147,8 +159,9 @@ impl ProtocolSyncConsensus { // Extra validations can be added here. let lock = self.state.read().await; let bootstrap_slot = lock.consensus.bootstrap_slot; + let proposing = lock.consensus.proposing; let is_empty = lock.consensus.slot_checkpoints_is_empty(); - let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, is_empty }; + let response = ConsensusSlotCheckpointsResponse { bootstrap_slot, proposing, is_empty }; if let Err(e) = self.channel.send(response).await { error!( target: "consensus::protocol_sync_consensus::handle_receive_slot_checkpoints_request()", diff --git a/src/consensus/state.rs b/src/consensus/state.rs index fae2853f0..cd560b968 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -77,9 +77,9 @@ pub struct ConsensusState { pub slot_checkpoints: Vec, /// Last slot leaders count pub previous_leaders: u64, - /// controller output history + /// Controller output history pub f_history: Vec, - /// controller proportional error history + /// Controller proportional error history pub err_history: Vec, // TODO: Aren't these already in db after finalization? /// Canonical competing coins @@ -215,14 +215,8 @@ impl ConsensusState { // Initialize node lead coins and set current epoch and eta. pub async fn init_coins(&mut self) -> Result<()> { self.epoch = self.current_epoch(); - // Create slot checkpoint if not on genesis slot (already in db) - if self.slot_checkpoints.is_empty() && self.current_slot() != 0 { - let (sigma1, sigma2) = self.sigmas(); - self.generate_slot_checkpoint(sigma1, sigma2); - }; self.coins = self.create_coins().await?; self.update_forks_checkpoints(); - Ok(()) } @@ -247,13 +241,11 @@ impl ConsensusState { pub fn sigmas(&mut self) -> (pallas::Base, pallas::Base) { let f = self.win_inv_prob_with_full_stake(); let total_stake = self.total_stake(); - let total_sigma = Float10::try_from(total_stake).unwrap(); - - Self::calc_sigmas(f, total_sigma) + self.calc_sigmas(f, total_sigma) } - fn calc_sigmas(f: Float10, total_sigma: Float10) -> (pallas::Base, pallas::Base) { + fn calc_sigmas(&self, f: Float10, total_sigma: Float10) -> (pallas::Base, pallas::Base) { info!(target: "consensus::state", "sigmas(): f: {}", f); info!(target: "consensus::state", "sigmas(): total network stake: {:}", total_sigma); @@ -285,12 +277,7 @@ impl ConsensusState { /// Generate coins for provided sigmas. /// NOTE: The strategy here is having a single competing coin per slot. // TODO: DRK coin need to be burned, and consensus coin to be minted. - async fn create_coins( - &mut self, - //eta: pallas::Base, - ) -> Result> { - let slot = self.current_slot(); - + async fn create_coins(&mut self) -> Result> { // TODO: cleanup LeadCoinSecrets, no need to keep a vector let (seeds, epoch_secrets) = { let mut rng = thread_rng(); @@ -326,7 +313,7 @@ impl ConsensusState { //let stake = self.initial_distribution; let c = LeadCoin::new( 0, - slot, + self.current_slot(), epoch_secrets.secret_keys[0].inner(), epoch_secrets.merkle_roots[0], 0, @@ -652,6 +639,8 @@ impl ConsensusState { self.forks = vec![]; self.slot_checkpoints = vec![]; self.previous_leaders = 0; + self.f_history = vec![constants::FLOAT10_ZERO.clone()]; + self.err_history = vec![constants::FLOAT10_ZERO.clone(), constants::FLOAT10_ZERO.clone()]; self.nullifiers = vec![]; } } @@ -671,14 +660,20 @@ impl net::Message for ConsensusRequest { pub struct ConsensusResponse { /// Slot the network was bootstrapped pub bootstrap_slot: u64, + /// Current slot + pub current_slot: u64, /// Hot/live data used by the consensus algorithm pub forks: Vec, /// Pending transactions pub unconfirmed_txs: Vec, /// Hot/live slot checkpoints pub slot_checkpoints: Vec, - /// Last slot leaders count - pub previous_leaders: u64, + // TODO: When Float10 supports encoding/decoding this should be + // replaced by directly using Vec + /// Controller output history + pub f_history: Vec, + /// Controller proportional error history + pub err_history: Vec, /// Seen nullifiers from proposals pub nullifiers: Vec, } @@ -704,6 +699,8 @@ impl net::Message for ConsensusSlotCheckpointsRequest { pub struct ConsensusSlotCheckpointsResponse { /// Node known bootstrap slot pub bootstrap_slot: u64, + /// Node is able to propose proposals + pub proposing: bool, /// Node has hot/live slot checkpoints pub is_empty: bool, } diff --git a/src/consensus/task/consensus_sync.rs b/src/consensus/task/consensus_sync.rs index 5f3ea1865..57b64cac7 100644 --- a/src/consensus/task/consensus_sync.rs +++ b/src/consensus/task/consensus_sync.rs @@ -24,7 +24,7 @@ use crate::{ ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest, ConsensusSlotCheckpointsResponse, }, - ValidatorStatePtr, + Float10, ValidatorStatePtr, }, net::P2pPtr, util::async_util::sleep, @@ -67,6 +67,10 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul warn!(target: "consensus::consensus_sync", "Network was just bootstraped, checking rest nodes"); continue } + if !response.proposing { + warn!(target: "consensus::consensus_sync", "Node is not proposing, checking rest nodes"); + continue + } if response.is_empty { warn!(target: "consensus::consensus_sync", "Node has not seen any slot checkpoints, retrying..."); continue @@ -113,7 +117,7 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul let mut response = response_sub.receive().await?; // Verify that peer has finished finalizing forks loop { - if response.forks.len() != 1 { + if response.forks.len() != 0 { warn!(target: "consensus::consensus_sync", "Peer has not finished finalization, retrying..."); sleep(1).await; peer.send(ConsensusRequest {}).await?; @@ -124,9 +128,8 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul } // Verify that the node has received all finalized blocks - let last_finalized_slot = response.forks[0].sequence[0].proposal.block.header.slot - 1; loop { - if !state.read().await.blockchain.has_slot(last_finalized_slot)? { + if !state.read().await.blockchain.has_slot(response.current_slot)? { warn!(target: "consensus::consensus_sync", "Node has not finished finalization, retrying..."); sleep(1).await; continue @@ -144,7 +147,19 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul lock.consensus.forks = forks; lock.unconfirmed_txs = response.unconfirmed_txs.clone(); lock.consensus.slot_checkpoints = response.slot_checkpoints.clone(); - lock.consensus.previous_leaders = response.previous_leaders.clone(); + lock.consensus.previous_leaders = 1; + let mut f_history = vec![]; + for f in &response.f_history { + let f_float = Float10::try_from(f.as_str()).unwrap(); + f_history.push(f_float); + } + lock.consensus.f_history = f_history; + let mut err_history = vec![]; + for err in &response.err_history { + let err_float = Float10::try_from(err.as_str()).unwrap(); + err_history.push(err_float); + } + lock.consensus.err_history = err_history; lock.consensus.nullifiers = response.nullifiers.clone(); lock.consensus.init_coins().await?;