From abfc86c0fb7516455107866af4e557a17166cdd4 Mon Sep 17 00:00:00 2001
From: aggstam
Date: Fri, 16 Dec 2022 18:40:30 +0200
Subject: [PATCH] WIP: consensus/consensus_sync(): redesign

---
 .../proto/protocol_sync_consensus.rs | 43 ++++++++-
 src/consensus/state.rs               | 32 +++++++
 src/consensus/task/consensus_sync.rs | 94 ++++++++++++++++++-
 src/consensus/task/proposal.rs       | 27 +-----
 4 files changed, 170 insertions(+), 26 deletions(-)

diff --git a/src/consensus/proto/protocol_sync_consensus.rs b/src/consensus/proto/protocol_sync_consensus.rs
index 35cc34688..e1665872a 100644
--- a/src/consensus/proto/protocol_sync_consensus.rs
+++ b/src/consensus/proto/protocol_sync_consensus.rs
@@ -23,7 +23,10 @@ use smol::Executor;
 
 use crate::{
     consensus::{
-        state::{ConsensusRequest, ConsensusResponse},
+        state::{
+            ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest,
+            ConsensusSlotCheckpointsResponse,
+        },
         ValidatorStatePtr,
     },
     net::{
@@ -36,6 +39,7 @@ use crate::{
 pub struct ProtocolSyncConsensus {
     channel: ChannelPtr,
     request_sub: MessageSubscription<ConsensusRequest>,
+    slot_checkpoints_request_sub: MessageSubscription<ConsensusSlotCheckpointsRequest>,
     jobsman: ProtocolJobsManagerPtr,
     state: ValidatorStatePtr,
 }
@@ -48,12 +52,16 @@ impl ProtocolSyncConsensus {
     ) -> Result<ProtocolBasePtr> {
         let msg_subsystem = channel.get_message_subsystem();
         msg_subsystem.add_dispatch::<ConsensusRequest>().await;
+        msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsRequest>().await;
 
         let request_sub = channel.subscribe_msg::<ConsensusRequest>().await?;
+        let slot_checkpoints_request_sub =
+            channel.subscribe_msg::<ConsensusSlotCheckpointsRequest>().await?;
 
         Ok(Arc::new(Self {
             channel: channel.clone(),
             request_sub,
+            slot_checkpoints_request_sub,
             jobsman: ProtocolJobsManager::new("SyncConsensusProtocol", channel),
             state,
         }))
@@ -62,7 +70,7 @@ impl ProtocolSyncConsensus {
     async fn handle_receive_request(self: Arc<Self>) -> Result<()> {
         debug!("ProtocolSyncConsensus::handle_receive_request() [START]");
         loop {
-            let order = match self.request_sub.receive().await {
+            let req = match self.request_sub.receive().await {
                 Ok(v) => v,
                 Err(e) => {
                     error!("ProtocolSyncConsensus::handle_receive_request() recv fail: {}", e);
@@ -70,7 +78,7 @@ impl ProtocolSyncConsensus {
                 }
             };
 
-            debug!("ProtocolSyncConsensuss::handle_receive_request() received {:?}", order);
+            debug!("ProtocolSyncConsensus::handle_receive_request() received {:?}", req);
 
             // Extra validations can be added here.
             let lock = self.state.read().await;
@@ -96,6 +104,31 @@ impl ProtocolSyncConsensus {
             };
         }
     }
+
+    async fn handle_receive_slot_checkpoints_request(self: Arc<Self>) -> Result<()> {
+        debug!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() [START]");
+        loop {
+            let req = match self.slot_checkpoints_request_sub.receive().await {
+                Ok(v) => v,
+                Err(e) => {
+                    error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() recv fail: {}", e);
+                    continue
+                }
+            };
+
+            debug!(
+                "ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() received {:?}",
+                req
+            );
+
+            // Extra validations can be added here.
+            let slot_checkpoints = !self.state.read().await.consensus.slot_checkpoints.is_empty();
+            let response = ConsensusSlotCheckpointsResponse { slot_checkpoints };
+            if let Err(e) = self.channel.send(response).await {
+                error!("ProtocolSyncConsensus::handle_receive_slot_checkpoints_request() channel send fail: {}", e);
+            };
+        }
+    }
 }
 
 #[async_trait]
@@ -104,6 +137,10 @@ impl ProtocolBase for ProtocolSyncConsensus {
         debug!("ProtocolSyncConsensus::start() [START]");
         self.jobsman.clone().start(executor.clone());
         self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
+        self.jobsman
+            .clone()
+            .spawn(self.clone().handle_receive_slot_checkpoints_request(), executor.clone())
+            .await;
         debug!("ProtocolSyncConsensus::start() [END]");
         Ok(())
     }
diff --git a/src/consensus/state.rs b/src/consensus/state.rs
index c9fcd720d..532c32938 100644
--- a/src/consensus/state.rs
+++ b/src/consensus/state.rs
@@ -650,6 +650,15 @@ impl ConsensusState {
             }
         }
     }
+
+    /// Auxiliary function to reset the consensus state for a resync
+    pub fn reset(&mut self) {
+        self.participating = None;
+        self.forks = vec![];
+        self.slot_checkpoints = vec![];
+        self.leaders_history = vec![0];
+        self.nullifiers = vec![];
+    }
 }
 
 /// Auxiliary structure used for consensus syncing.
@@ -685,6 +694,29 @@ impl net::Message for ConsensusResponse {
     }
 }
 
+/// Auxiliary structure used for consensus syncing.
+#[derive(Debug, SerialEncodable, SerialDecodable)]
+pub struct ConsensusSlotCheckpointsRequest {}
+
+impl net::Message for ConsensusSlotCheckpointsRequest {
+    fn name() -> &'static str {
+        "consensusslotcheckpointsrequest"
+    }
+}
+
+/// Auxiliary structure used for consensus syncing.
+#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
+pub struct ConsensusSlotCheckpointsResponse {
+    /// Node has hot/live slot checkpoints
+    pub slot_checkpoints: bool,
+}
+
+impl net::Message for ConsensusSlotCheckpointsResponse {
+    fn name() -> &'static str {
+        "consensusslotcheckpointsresponse"
+    }
+}
+
 /// Auxiliary structure used to keep track of slot validation parameters.
 #[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
 pub struct SlotCheckpoint {
diff --git a/src/consensus/task/consensus_sync.rs b/src/consensus/task/consensus_sync.rs
index a3dc63f2c..fa68a3160 100644
--- a/src/consensus/task/consensus_sync.rs
+++ b/src/consensus/task/consensus_sync.rs
@@ -20,7 +20,10 @@ use log::{info, warn};
 
 use crate::{
     consensus::{
-        state::{ConsensusRequest, ConsensusResponse},
+        state::{
+            ConsensusRequest, ConsensusResponse, ConsensusSlotCheckpointsRequest,
+            ConsensusSlotCheckpointsResponse,
+        },
         ValidatorStatePtr,
     },
     net::P2pPtr,
@@ -76,3 +79,92 @@ pub async fn consensus_sync_task(p2p: P2pPtr, state: ValidatorStatePtr) -> Resul
     info!("Consensus state synced!");
     Ok(())
 }
+
+/// Async task used for consensus state syncing.
+pub async fn consensus_sync_task2(p2p: P2pPtr, state: ValidatorStatePtr) -> Result<()> {
+    info!("Starting consensus state sync...");
+    // Loop through connected channels
+    let channels_map = p2p.channels().lock().await;
+    let values = channels_map.values();
+    // Using len here because is_empty() uses the unstable library feature
+    // called 'exact_size_is_empty'.
+    if values.len() == 0 {
+        warn!("Node is not connected to other nodes");
+        info!("Consensus state synced!");
+        return Ok(())
+    }
+
+    // Node iterates the channel peers to check if at least one peer has seen slot checkpoints
+    let mut peer = None;
+    for channel in values {
+        // Communication setup
+        let msg_subsystem = channel.get_message_subsystem();
+        msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsResponse>().await;
+        let response_sub = channel.subscribe_msg::<ConsensusSlotCheckpointsResponse>().await?;
+
+        // Node creates a `ConsensusSlotCheckpointsRequest` and sends it
+        let request = ConsensusSlotCheckpointsRequest {};
+        channel.send(request).await?;
+
+        // Node checks response
+        let response = response_sub.receive().await?;
+        if !response.slot_checkpoints {
+            warn!("Peer has not seen any slot checkpoints, trying next peer...");
+            continue
+        }
+
+        // Keep peer to ask for consensus state
+        peer = Some(channel.clone());
+        break
+    }
+
+    // Release channels lock
+    drop(channels_map);
+
+    // If no peer knows about any slot checkpoints, the network was either bootstrapped
+    // or restarted and no node has started consensus.
+    if peer.is_none() {
+        warn!("No node that has seen any slot checkpoints was found.");
+        info!("Consensus state synced!");
+        return Ok(())
+    }
+    let peer = peer.unwrap();
+
+    // Listen for next finalization
+    info!("Waiting for next finalization...");
+    let subscriber = state.read().await.subscribers.get("blocks").unwrap().clone();
+    let subscription = subscriber.subscribe().await;
+    subscription.receive().await;
+
+    // After finalization occurs, sync our consensus state.
+    // This ensures that the received state always consists of one fork with one proposal.
+    info!("Finalization signal received, requesting consensus state...");
+    // Communication setup
+    let msg_subsystem = peer.get_message_subsystem();
+    msg_subsystem.add_dispatch::<ConsensusResponse>().await;
+    let response_sub = peer.subscribe_msg::<ConsensusResponse>().await?;
+
+    // Node creates a `ConsensusRequest` and sends it
+    let request = ConsensusRequest {};
+    peer.send(request).await?;
+
+    // Node verifies response came from a participating node.
+    // Extra validations can be added here.
+    let response = response_sub.receive().await?;
+
+    // Node stores response data.
+    let mut lock = state.write().await;
+    lock.consensus.offset = response.offset;
+    let mut forks = vec![];
+    for fork in &response.forks {
+        forks.push(fork.clone().into());
+    }
+    lock.consensus.forks = forks;
+    lock.unconfirmed_txs = response.unconfirmed_txs.clone();
+    lock.consensus.slot_checkpoints = response.slot_checkpoints.clone();
+    lock.consensus.leaders_history = response.leaders_history.clone();
+    lock.consensus.nullifiers = response.nullifiers.clone();
+
+    info!("Consensus state synced!");
+    Ok(())
+}
diff --git a/src/consensus/task/proposal.rs b/src/consensus/task/proposal.rs
index 34dba3366..1b9e8c1a8 100644
--- a/src/consensus/task/proposal.rs
+++ b/src/consensus/task/proposal.rs
@@ -180,9 +180,10 @@ pub async fn proposal_task2(
     let mut retries = 0;
     // Sync loop
     loop {
-        // Setting up participating to None, so node can still follow the finalized blocks by
+        // Resetting consensus state, so node can still follow the finalized blocks by
         // the sync p2p network/protocols
-        state.write().await.consensus.participating = None;
+        // TODO: verify that a consensus node that stops participating still receives finalized blocks
+        state.write().await.consensus.reset();
 
         // Checking sync retries
         if retries > constants::SYNC_MAX_RETRIES {
@@ -191,25 +192,6 @@ pub async fn proposal_task2(
             break
         }
 
-        // Node waits just before the current or next epoch last finalization syncing period, so it can
-        // start syncing latest state.
-        let mut seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
-        let sync_offset = Duration::new(constants::FINAL_SYNC_DUR + 1, 0);
-
-        loop {
-            if seconds_until_next_epoch > sync_offset {
-                seconds_until_next_epoch -= sync_offset;
-                break
-            }
-
-            info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
-            sleep(seconds_until_next_epoch.as_secs()).await;
-            seconds_until_next_epoch = state.read().await.consensus.next_n_epoch_start(1);
-        }
-
-        info!("consensus: Waiting for next epoch ({:?} sec)", seconds_until_next_epoch);
-        sleep(seconds_until_next_epoch.as_secs()).await;
-
         // Node syncs its consensus state
         if let Err(e) = consensus_sync_task(consensus_p2p.clone(), state.clone()).await {
             error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
@@ -249,6 +231,7 @@ async fn consensus_loop(
     ex: Arc<Executor<'_>>,
 ) {
     loop {
+        // TODO: Change order, since node exits consensus sync right before next slot
         // Node sleeps until finalization sync period starts
        let next_slot_start = state.read().await.consensus.next_n_slot_start(1);
         let seconds_sync_period = if next_slot_start.as_secs() > constants::FINAL_SYNC_DUR {
@@ -301,7 +284,7 @@ async fn consensus_loop(
 
         // Verify node didn't skip next slot
         let current_slot = state.read().await.consensus.current_slot();
-        if completed_slot == current_slot {
+        if completed_slot != current_slot {
             warn!(
                 "consensus: Node missed slot {} due to finalizated blocks processing, resyncing...",
                 current_slot
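
The new messages follow the request/response pattern already used by `ConsensusRequest`/`ConsensusResponse`: register a dispatcher for the expected response type, subscribe to it before sending the request, then block on the subscription for the reply. Condensed from consensus_sync_task2() above, the client side of the probe handshake looks like this (a sketch for illustration, not additional patch content; `channel` stands for one entry of the p2p channels map):

    // Probe a single peer for hot/live slot checkpoints.
    let msg_subsystem = channel.get_message_subsystem();
    msg_subsystem.add_dispatch::<ConsensusSlotCheckpointsResponse>().await;
    // Subscribe before sending, so the reply cannot be missed.
    let response_sub = channel.subscribe_msg::<ConsensusSlotCheckpointsResponse>().await?;
    // The request carries no payload; the response is a single bool.
    channel.send(ConsensusSlotCheckpointsRequest {}).await?;
    let response = response_sub.receive().await?;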
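As the WIP marker suggests, proposal_task2() still calls the original consensus_sync_task(); the redesigned consensus_sync_task2() is not yet wired in anywhere. Swapping it in would be a one-line change in the sync loop shown above (a sketch under that assumption; the error path is kept as in the current code):

    // In proposal_task2()'s sync loop, replacing the existing call:
    if let Err(e) = consensus_sync_task2(consensus_p2p.clone(), state.clone()).await {
        error!("consensus: Failed syncing consensus state: {}. Quitting consensus.", e);
        // ... error handling unchanged
    }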