diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 0f633eae0..aa4a5c166 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -125,12 +125,13 @@ impl ProtocolSync { // Check if node started participating in consensus. // Consensus-mode enabled nodes have already performed these steps, - // during proposal finalization. + // during proposal finalization. They still listen to this sub, + // in case they go out of sync and become a none-consensus node. if self.consensus_mode && self.state.read().await.consensus.participating.is_some() { debug!( "ProtocolSync::handle_receive_block(): node runs in consensus mode, skipping..." ); - return Ok(()) + continue } info!("ProtocolSync::handle_receive_block(): Received block: {}", info.blockhash()); @@ -218,12 +219,13 @@ impl ProtocolSync { // Check if node started participating in consensus. // Consensus-mode enabled nodes have already performed these steps, - // during proposal finalization. + // during proposal finalization. They still listen to this sub, + // in case they go out of sync and become a none-consensus node. if self.consensus_mode && self.state.read().await.consensus.participating.is_some() { debug!( "ProtocolSync::handle_receive_slot_checkpoint(): node runs in consensus mode, skipping..." ); - return Ok(()) + continue } info!( diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs index e1665872a..9525361e2 100644 --- a/src/consensus/proto/protocol_sync_consensus.rs +++ b/src/consensus/proto/protocol_sync_consensus.rs @@ -122,8 +122,8 @@ impl ProtocolSyncConsensus { ); // Extra validations can be added here. - let slot_checkpoints = !self.state.read().await.consensus.slot_checkpoints.is_empty(); - let response = ConsensusSlotCheckpointsResponse { slot_checkpoints }; + let is_empty = self.state.read().await.consensus.slot_checkpoints.is_empty(); + let response = ConsensusSlotCheckpointsResponse { is_empty }; if let Err(e) = self.channel.send(response).await { error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e); }; diff --git a/src/consensus/state.rs b/src/consensus/state.rs index 532c32938..11998dedd 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -48,6 +48,8 @@ pub struct ConsensusState { pub genesis_block: blake3::Hash, /// Participating start slot pub participating: Option, + /// Node is able to propose proposals + pub proposing: bool, /// Last slot node check for finalization pub checked_finalization: u64, /// Slots offset since genesis, @@ -83,6 +85,7 @@ impl ConsensusState { genesis_ts, genesis_block, participating: None, + proposing: false, checked_finalization: 0, offset: None, forks: vec![], @@ -448,7 +451,8 @@ impl ConsensusState { return constants::MAX_F.clone() } let hist_len = self.leaders_history.len(); - if self.leaders_history[hist_len - 1] == 0 && + if hist_len > 3 && + self.leaders_history[hist_len - 1] == 0 && self.leaders_history[hist_len - 2] == 0 && self.leaders_history[hist_len - 3] == 0 && i == constants::FLOAT10_ZERO.clone() @@ -464,6 +468,10 @@ impl ConsensusState { /// * 'sigma1', 'sigma2': slot sigmas /// Returns: (check: bool, idx: usize) where idx is the winning coin's index pub fn is_slot_leader(&mut self, sigma1: pallas::Base, sigma2: pallas::Base) -> (bool, usize) { + // Check if node can produce proposals + if !self.proposing { + return (false, 0) + } let competing_coins = &self.coins.clone(); let mut won = false; @@ -589,7 +597,7 @@ impl ConsensusState { /// Auxillary function to set nodes leaders count history to the largest fork sequence /// of leaders, by using provided index. - pub fn set_leader_history(&mut self, index: i64) { + pub fn set_leader_history(&mut self, index: i64, current_slot: u64) { // Check if we found longest fork to extract sequence from match index { -1 => { @@ -598,7 +606,7 @@ impl ConsensusState { _ => { info!("set_leader_history(): Checking last proposal of fork: {}", index); let last_proposal = &self.forks[index as usize].sequence.last().unwrap().proposal; - if last_proposal.block.header.slot == self.current_slot() { + if last_proposal.block.header.slot == current_slot { // Replacing our last history element with the leaders one self.leaders_history.pop(); self.leaders_history.push(last_proposal.block.lead_info.leaders); @@ -654,6 +662,7 @@ impl ConsensusState { /// Auxiliary structure to reset consensus state for a resync pub fn reset(&mut self) { self.participating = None; + self.proposing = false; self.forks = vec![]; self.slot_checkpoints = vec![]; self.leaders_history = vec![0]; @@ -708,7 +717,7 @@ impl net::Message for ConsensusSlotCheckpointsRequest { #[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct ConsensusSlotCheckpointsResponse { /// Node has hot/live slot checkpoints - pub slot_checkpoints: bool, + pub is_empty: bool, } impl net::Message for ConsensusSlotCheckpointsResponse { diff --git a/src/consensus/task/consensus_sync.rs b/src/consensus/task/consensus_sync.rs index fa68a3160..8e0fba536 100644 --- a/src/consensus/task/consensus_sync.rs +++ b/src/consensus/task/consensus_sync.rs @@ -31,57 +31,9 @@ use crate::{ }; /// async task used for consensus state syncing. -pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> { - info!("Starting consensus state sync..."); - let channels_map = p2p.channels().lock().await; - let values = channels_map.values(); - // Using len here because is_empty() uses unstable library feature - // called 'exact_size_is_empty'. - if values.len() != 0 { - // Node iterates the channel peers to ask for their consensus state - for channel in values { - // Communication setup - let msg_subsystem = channel.get_message_subsystem(); - msg_subsystem.add_dispatch::().await; - let response_sub = channel.subscribe_msg::().await?; - - // Node creates a `ConsensusRequest` and sends it - let request = ConsensusRequest {}; - channel.send(request).await?; - - // Node verifies response came from a participating node. - // Extra validations can be added here. - let response = response_sub.receive().await?; - if response.nullifiers.is_empty() { - warn!("Retrieved consensus state from a new node, retrying..."); - continue - } - - // Node stores response data. - let mut lock = state.write().await; - lock.consensus.offset = response.offset; - let mut forks = vec![]; - for fork in &response.forks { - forks.push(fork.clone().into()); - } - lock.consensus.forks = forks; - lock.unconfirmed_txs = response.unconfirmed_txs.clone(); - lock.consensus.slot_checkpoints = response.slot_checkpoints.clone(); - lock.consensus.leaders_history = response.leaders_history.clone(); - lock.consensus.nullifiers = response.nullifiers.clone(); - - break - } - } else { - warn!("Node is not connected to other nodes"); - } - - info!("Consensus state synced!"); - Ok(()) -} - -/// async task used for consensus state syncing. -pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> { +/// Returns flag if node is not connected to other peers or consensus hasn't started, +/// so it can immediately start proposing proposals. +pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Result { info!("Starting consensus state sync..."); // Loop through connected channels let channels_map = p2p.channels().lock().await; @@ -91,7 +43,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu if values.len() == 0 { warn!("Node is not connected to other nodes"); info!("Consensus state synced!"); - return Ok(()) + return Ok(true) } // Node iterates the channel peers to check if at least on peer has seen slot checkpoints @@ -108,7 +60,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu // Node checks response let response = response_sub.receive().await?; - if !response.slot_checkpoints { + if response.is_empty { warn!("Node has not seen any slot checkpoints, retrying..."); continue } @@ -126,7 +78,7 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu if peer.is_none() { warn!("No node that has seen any slot checkpoints was found."); info!("Consensus state synced!"); - return Ok(()) + return Ok(true) } let peer = peer.unwrap(); @@ -166,5 +118,5 @@ pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Resu lock.consensus.nullifiers = response.nullifiers.clone(); info!("Consensus state synced!"); - Ok(()) + Ok(false) } diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs index 1b9e8c1a8..b295d9b39 100644 --- a/src/consensus/task/proposal.rs +++ b/src/consensus/task/proposal.rs @@ -34,155 +34,12 @@ pub async fn proposal_task( sync_p2p: P2pPtr, state: ValidatorStatePtr, ex: Arc>, -) { - // Node waits just before the current or next epoch last finalization syncing period, so it can - // start syncing latest state. - let mut seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1); - let sync_offset = Duration::new(constants::FINAL_SYNC_DUR + 1, 0); - - loop { - if seconds_until_next_epoch > sync_offset { - seconds_until_next_epoch -= sync_offset; - break - } - - info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch); - sleep(seconds_until_next_epoch.as_secs()).await; - seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1); - } - - info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch); - sleep(seconds_until_next_epoch.as_secs()).await; - - // Node syncs its consensus state - if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await { - error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e); - // TODO: Perhaps notify over a channel in order to - // stop consensus p2p protocols. - return - }; - - // Node modifies its participating slot to next. - match state.write().await.consensus.set_participating() { - Ok(()) => info!("consensus: Node will start participating in the next slot"), - Err(e) => error!("consensus: Failed to set participation slot: {}", e), - } - - loop { - // Node sleeps until finalization sync period start (2 seconds before next slot) - let seconds_sync_period = (state.read().await.consensus.next_n_slot_start(1) - - Duration::new(constants::FINAL_SYNC_DUR, 0)) - .as_secs(); - info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period); - sleep(seconds_sync_period).await; - - // Check if any forks can be finalized - match state.write().await.chain_finalization().await { - Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => { - // Broadcasting in background - if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() { - let _sync_p2p = sync_p2p.clone(); - ex.spawn(async move { - // Broadcast finalized blocks info, if any: - info!("consensus: Broadcasting finalized blocks"); - for info in to_broadcast_block { - match _sync_p2p.broadcast(info).await { - Ok(()) => info!("consensus: Broadcasted block"), - Err(e) => error!("consensus: Failed broadcasting block: {}", e), - } - } - - // Broadcast finalized slot checkpoints, if any: - info!("consensus: Broadcasting finalized slot checkpoints"); - for slot_checkpoint in to_broadcast_slot_checkpoints { - match _sync_p2p.broadcast(slot_checkpoint).await { - Ok(()) => info!("consensus: Broadcasted slot_checkpoint"), - Err(e) => { - error!("consensus: Failed broadcasting slot_checkpoint: {}", e) - } - } - } - }) - .detach(); - } else { - info!("consensus: No finalized blocks or slot checkpoints to broadcast"); - } - } - Err(e) => { - error!("consensus: Finalization check failed: {}", e); - } - } - - // Node sleeps until next slot - let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs(); - info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot); - sleep(seconds_next_slot).await; - - // Retrieve slot sigmas - let (sigma1, sigma2) = state.write().await.consensus.sigmas(); - // Node checks if epoch has changed, to generate new epoch coins - let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await; - match epoch_changed { - Ok(changed) => { - if changed { - info!("consensus: New epoch started: {}", state.read().await.consensus.epoch); - } - } - Err(e) => { - error!("consensus: Epoch check failed: {}", e); - continue - } - }; - // Node checks if it's the slot leader to generate a new proposal - // for that slot. - let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2); - let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) }; - let (proposal, coin) = match result { - Ok(pair) => { - if pair.is_none() { - info!("consensus: Node is not the slot lead"); - continue - } - pair.unwrap() - } - Err(e) => { - error!("consensus: Block proposal failed: {}", e); - continue - } - }; - - // Node stores the proposal and broadcast to rest nodes - info!("consensus: Node is the slot leader: Proposed block: {}", proposal); - debug!("consensus: Full proposal: {:?}", proposal); - match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await { - Ok(()) => { - info!("consensus: Block proposal saved successfully"); - // Broadcast proposal to other consensus nodes - match consensus_p2p.broadcast(proposal).await { - Ok(()) => info!("consensus: Proposal broadcasted successfully"), - Err(e) => error!("consensus: Failed broadcasting proposal: {}", e), - } - } - Err(e) => { - error!("consensus: Block proposal save failed: {}", e); - } - } - } -} - -/// async task used for participating in the consensus protocol -pub async fn proposal_task2( - consensus_p2p: P2pPtr, - sync_p2p: P2pPtr, - state: ValidatorStatePtr, - ex: Arc>, ) { let mut retries = 0; // Sync loop loop { // Resetting consensus state, so node can still follow the finalized blocks by // the sync p2p network/protocols - // TODO: verify that if a consensus node stops participating will receive finalized blocks state.write().await.consensus.reset(); // Checking sync retries @@ -193,11 +50,21 @@ pub async fn proposal_task2( } // Node syncs its consensus state - if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await { - error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e); - // TODO: Perhaps notify over a channel in order to - // stop consensus p2p protocols. - return + match consensus_sync_task(consensus_p2p.clone(), state.clone()).await { + Ok(p) => { + // Check if node is not connected to other nodes and can + // start proposing immediately. + if p { + info!("consensus: Node can start proposing!"); + state.write().await.consensus.proposing = p; + } + } + Err(e) => { + error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e); + // TODO: Perhaps notify over a channel in order to + // stop consensus p2p protocols. + return + } }; // Node modifies its participating slot to next. @@ -230,135 +97,174 @@ async fn consensus_loop( state: ValidatorStatePtr, ex: Arc>, ) { + // Note: when a node can start produce proposals is only enforced in code, + // where we verify if the hardware can keep up with the consensus, by + // counting how many consecutive slots node successfully listend and process + // everything. Later, this will be enforced via contract, where it will be explicit + // when a node can produce proposals, and after which slot they can be considered as valid. + let mut listened_slots = 0; + let mut changed_status = false; loop { - // TODO: Change order, since node extis consensus sync right before next slot - // Node sleeps until finalization sync period starts - let next_slot_start = state.read().await.consensus.next_n_slot_start(1); - let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR { - (next_slot_start - Duration::new(constants::FINAL_SYNC_DUR, 0)).as_secs() + // Check if node can start proposing. + // This code ensures that we only change the status once + // and listened_slots doesn't increment further. + if listened_slots > constants::EPOCH_LENGTH { + if !changed_status { + info!("consensus: Node can start proposing!"); + state.write().await.consensus.proposing = true; + changed_status = true; + } } else { - next_slot_start.as_secs() - }; - info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period); - sleep(seconds_sync_period).await; - - // Keep a record of slot to verify if next slot got skipped during processing - let completed_slot = state.read().await.consensus.current_slot(); - - // Check if any forks can be finalized - match state.write().await.chain_finalization().await { - Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => { - // Broadcasting in background - if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() { - let _sync_p2p = sync_p2p.clone(); - ex.spawn(async move { - // Broadcast finalized blocks info, if any: - info!("consensus: Broadcasting finalized blocks"); - for info in to_broadcast_block { - match _sync_p2p.broadcast(info).await { - Ok(()) => info!("consensus: Broadcasted block"), - Err(e) => error!("consensus: Failed broadcasting block: {}", e), - } - } - - // Broadcast finalized slot checkpoints, if any: - info!("consensus: Broadcasting finalized slot checkpoints"); - for slot_checkpoint in to_broadcast_slot_checkpoints { - match _sync_p2p.broadcast(slot_checkpoint).await { - Ok(()) => info!("consensus: Broadcasted slot_checkpoint"), - Err(e) => { - error!("consensus: Failed broadcasting slot_checkpoint: {}", e) - } - } - } - }) - .detach(); - } else { - info!("consensus: No finalized blocks or slot checkpoints to broadcast"); - } - } - Err(e) => { - error!("consensus: Finalization check failed: {}", e); - } + listened_slots += 1; } - // Verify node didn't skip next slot - let current_slot = state.read().await.consensus.current_slot(); - if completed_slot != current_slot { + // Node waits and execute consensus protocol propose period. + if propose_period(consensus_p2p.clone(), state.clone()).await { + // Node needs to resync warn!( - "consensus: Node missed slot {} due to finalizated blocks processing, resyncing...", - current_slot + "consensus: Node missed slot {} due to proposal processing, resyncing...", + state.read().await.consensus.current_slot() ); break } - // Node sleeps until next slot - let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs(); - info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot); - sleep(seconds_next_slot).await; - - // Keep a record of slot to verify if next slot got skipped during processing - let processing_slot = state.read().await.consensus.current_slot(); - - // Retrieve slot sigmas - let (sigma1, sigma2) = state.write().await.consensus.sigmas(); - // Node checks if epoch has changed and generate slot checkpoint - let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await; - match epoch_changed { - Ok(changed) => { - if changed { - info!("consensus: New epoch started: {}", state.read().await.consensus.epoch); - } - } - Err(e) => { - error!("consensus: Epoch check failed: {}", e); - continue - } - }; - - // Node checks if it's the slot leader to generate a new proposal - // for that slot. - let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2); - let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) }; - let (proposal, coin) = match result { - Ok(pair) => { - if pair.is_none() { - info!("consensus: Node is not the slot lead"); - continue - } - pair.unwrap() - } - Err(e) => { - error!("consensus: Block proposal failed: {}", e); - continue - } - }; - - // Node stores the proposal and broadcast to rest nodes - info!("consensus: Node is the slot leader: Proposed block: {}", proposal); - debug!("consensus: Full proposal: {:?}", proposal); - match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await { - Ok(()) => { - info!("consensus: Block proposal saved successfully"); - // Broadcast proposal to other consensus nodes - match consensus_p2p.broadcast(proposal).await { - Ok(()) => info!("consensus: Proposal broadcasted successfully"), - Err(e) => error!("consensus: Failed broadcasting proposal: {}", e), - } - } - Err(e) => { - error!("consensus: Block proposal save failed: {}", e); - } - } - - // Verify node didn't skip next slot - let current_slot = state.read().await.consensus.current_slot(); - if processing_slot != current_slot { + // Node waits and execute consensus protocol finalization period. + if finalization_period(sync_p2p.clone(), state.clone(), ex.clone()).await { + // Node needs to resync warn!( - "consensus: Node missed slot {} due to proposal processing, resyncing...", - current_slot + "consensus: Node missed slot {} due to finalizated blocks processing, resyncing...", + state.read().await.consensus.current_slot() ); break } } } + +/// async function to wait and execute consensus protocol propose period. +/// Propose period consists of 2 parts: +/// - Generate slot sigmas and checkpoint +/// - Check if slot leader to generate and broadcast proposal +/// Returns flag in case node needs to resync. +async fn propose_period(consensus_p2p: P2pPtr, state: ValidatorStatePtr) -> bool { + // Node sleeps until next slot + let seconds_next_slot = state.read().await.consensus.next_n_slot_start(1).as_secs(); + info!("consensus: Waiting for next slot ({} sec)", seconds_next_slot); + sleep(seconds_next_slot).await; + + // Keep a record of slot to verify if next slot got skipped during processing + let processing_slot = state.read().await.consensus.current_slot(); + + // Retrieve slot sigmas + let (sigma1, sigma2) = state.write().await.consensus.sigmas(); + // Node checks if epoch has changed and generate slot checkpoint + let epoch_changed = state.write().await.consensus.epoch_changed(sigma1, sigma2).await; + match epoch_changed { + Ok(changed) => { + if changed { + info!("consensus: New epoch started: {}", state.read().await.consensus.epoch); + } + } + Err(e) => { + error!("consensus: Epoch check failed: {}", e); + return false + } + }; + + // Node checks if it's the slot leader to generate a new proposal + // for that slot. + let (won, idx) = state.write().await.consensus.is_slot_leader(sigma1, sigma2); + let result = if won { state.write().await.propose(idx, sigma1, sigma2) } else { Ok(None) }; + let (proposal, coin) = match result { + Ok(pair) => { + if pair.is_none() { + info!("consensus: Node is not the slot lead"); + return false + } + pair.unwrap() + } + Err(e) => { + error!("consensus: Block proposal failed: {}", e); + return false + } + }; + + // Node stores the proposal and broadcast to rest nodes + info!("consensus: Node is the slot leader: Proposed block: {}", proposal); + debug!("consensus: Full proposal: {:?}", proposal); + match state.write().await.receive_proposal(&proposal, Some((idx, coin))).await { + Ok(()) => { + info!("consensus: Block proposal saved successfully"); + // Broadcast proposal to other consensus nodes + match consensus_p2p.broadcast(proposal).await { + Ok(()) => info!("consensus: Proposal broadcasted successfully"), + Err(e) => error!("consensus: Failed broadcasting proposal: {}", e), + } + } + Err(e) => { + error!("consensus: Block proposal save failed: {}", e); + } + } + + // Verify node didn't skip next slot + processing_slot != state.read().await.consensus.current_slot() +} + +/// async function to wait and execute consensus protocol finalization period. +/// Returns flag in case node needs to resync. +async fn finalization_period( + sync_p2p: P2pPtr, + state: ValidatorStatePtr, + ex: Arc>, +) -> bool { + // Node sleeps until finalization sync period starts + let next_slot_start = state.read().await.consensus.next_n_slot_start(1); + let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR { + (next_slot_start - Duration::new(constants::FINAL_SYNC_DUR, 0)).as_secs() + } else { + next_slot_start.as_secs() + }; + info!("consensus: Waiting for finalization sync period ({} sec)", seconds_sync_period); + sleep(seconds_sync_period).await; + + // Keep a record of slot to verify if next slot got skipped during processing + let completed_slot = state.read().await.consensus.current_slot(); + + // Check if any forks can be finalized + match state.write().await.chain_finalization().await { + Ok((to_broadcast_block, to_broadcast_slot_checkpoints)) => { + // Broadcasting in background + if !to_broadcast_block.is_empty() || !to_broadcast_slot_checkpoints.is_empty() { + ex.spawn(async move { + // Broadcast finalized blocks info, if any: + info!("consensus: Broadcasting finalized blocks"); + for info in to_broadcast_block { + match sync_p2p.broadcast(info).await { + Ok(()) => info!("consensus: Broadcasted block"), + Err(e) => error!("consensus: Failed broadcasting block: {}", e), + } + } + + // Broadcast finalized slot checkpoints, if any: + info!("consensus: Broadcasting finalized slot checkpoints"); + for slot_checkpoint in to_broadcast_slot_checkpoints { + match sync_p2p.broadcast(slot_checkpoint).await { + Ok(()) => info!("consensus: Broadcasted slot_checkpoint"), + Err(e) => { + error!("consensus: Failed broadcasting slot_checkpoint: {}", e) + } + } + } + }) + .detach(); + } else { + info!("consensus: No finalized blocks or slot checkpoints to broadcast"); + } + } + Err(e) => { + error!("consensus: Finalization check failed: {}", e); + } + } + + // Verify node didn't skip next slot + completed_slot != state.read().await.consensus.current_slot() +} diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index d44ebf93a..1d59353ad 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -259,12 +259,16 @@ impl ValidatorState { sigma1: pallas::Base, sigma2: pallas::Base, ) -> Result> { + // Check if node can produce proposals + if !self.consensus.proposing { + return Ok(None) + } + + // Generate proposal let slot = self.consensus.current_slot(); let (prev_hash, index) = self.consensus.longest_chain_last_hash().unwrap(); let unproposed_txs = self.unproposed_txs(index); - // TODO: [PLACEHOLDER] Create and add rewards transaction - let mut tree = BridgeTree::::new(100); // The following is pretty weird, so something better should be done. for tx in &unproposed_txs { @@ -581,12 +585,14 @@ impl ValidatorState { let mut fork_index = -1; // Use this index to extract leaders count sequence from longest fork let mut index_for_history = -1; + let mut max_length_for_history = 0; let mut max_length = 0; for (index, fork) in self.consensus.forks.iter().enumerate() { let length = fork.sequence.len(); // Check if greater than max to retain index for history - if length > max_length { + if length > max_length_for_history { index_for_history = index as i64; + max_length_for_history = length; } // Ignore forks with less that 3 blocks if length < 3 { @@ -612,12 +618,12 @@ impl ValidatorState { match fork_index { -2 => { info!("chain_finalization(): Eligible forks with same height exist, nothing to finalize."); - self.consensus.set_leader_history(index_for_history); + self.consensus.set_leader_history(index_for_history, slot); return Ok((vec![], vec![])) } -1 => { info!("chain_finalization(): All chains have less than 3 proposals, nothing to finalize."); - self.consensus.set_leader_history(index_for_history); + self.consensus.set_leader_history(index_for_history, slot); return Ok((vec![], vec![])) } _ => info!("chain_finalization(): Chain {} can be finalized!", fork_index),