diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index 8671d260b..2cc7e93ae 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -219,9 +219,19 @@ impl Blockchain { Ok((slot, block.lead_info.offset)) } + /// Retrieve the last slot checkpoint. + pub fn last_slot_checkpoint(&self) -> Result { + self.slot_checkpoints.get_last() + } + /// Retrieve n checkpoints after given start slot. pub fn get_slot_checkpoints_after(&self, slot: u64, n: u64) -> Result> { debug!("get_slot_checkpoints_after(): {} -> {}", slot, n); self.slot_checkpoints.get_after(slot, n) } + + /// Insert a given slice of [`SlotCheckpoint`] into the blockchain database. + pub fn add_slot_checkpoints(&self, slot_checkpoints: &[SlotCheckpoint]) -> Result<()> { + self.slot_checkpoints.insert(slot_checkpoints) + } } diff --git a/src/blockchain/slotcheckpointstore.rs b/src/blockchain/slotcheckpointstore.rs index 867ea0ef8..a22991995 100644 --- a/src/blockchain/slotcheckpointstore.rs +++ b/src/blockchain/slotcheckpointstore.rs @@ -123,6 +123,15 @@ impl SlotCheckpointStore { Ok(ret) } + /// Fetch the last slot checkpoint in the tree, based on the `Ord` + /// implementation for `Vec`. This should not be able to + /// fail because we initialize the store with the genesis slot checkpoint. + pub fn get_last(&self) -> Result { + let found = self.0.last()?.unwrap(); + let checkpoint = deserialize(&found.1)?; + Ok(checkpoint) + } + /// Retrieve records count pub fn len(&self) -> usize { self.0.len() diff --git a/src/consensus/proto/protocol_sync.rs b/src/consensus/proto/protocol_sync.rs index 463c3d0c9..eb97c8d61 100644 --- a/src/consensus/proto/protocol_sync.rs +++ b/src/consensus/proto/protocol_sync.rs @@ -24,6 +24,7 @@ use smol::Executor; use crate::{ consensus::{ block::{BlockInfo, BlockOrder, BlockResponse}, + state::{SlotCheckpointRequest, SlotCheckpointResponse}, ValidatorStatePtr, }, net::{ @@ -39,6 +40,7 @@ const BATCH: u64 = 10; pub struct ProtocolSync { channel: ChannelPtr, request_sub: MessageSubscription, + slot_checkpoin_request_sub: MessageSubscription, block_sub: MessageSubscription, jobsman: ProtocolJobsManagerPtr, state: ValidatorStatePtr, @@ -55,14 +57,17 @@ impl ProtocolSync { ) -> Result { let msg_subsystem = channel.get_message_subsystem(); msg_subsystem.add_dispatch::().await; + msg_subsystem.add_dispatch::().await; msg_subsystem.add_dispatch::().await; let request_sub = channel.subscribe_msg::().await?; + let slot_checkpoin_request_sub = channel.subscribe_msg::().await?; let block_sub = channel.subscribe_msg::().await?; Ok(Arc::new(Self { channel: channel.clone(), request_sub, + slot_checkpoin_request_sub, block_sub, jobsman: ProtocolJobsManager::new("SyncProtocol", channel), state, @@ -102,6 +107,52 @@ impl ProtocolSync { } } + async fn handle_receive_slot_checkpoint_request(self: Arc) -> Result<()> { + debug!("ProtocolSync::handle_receive_slot_checkpoint_request() [START]"); + loop { + let request = match self.slot_checkpoin_request_sub.receive().await { + Ok(v) => v, + Err(e) => { + error!( + "ProtocolSync::handle_receive_slot_checkpoint_request(): recv fail: {}", + e + ); + continue + } + }; + + debug!("ProtocolSync::handle_receive_slot_checkpoint_request() received {:?}", request); + + // Extra validations can be added here + let key = request.slot; + let slot_checkpoints = match self + .state + .read() + .await + .blockchain + .get_slot_checkpoints_after(key, BATCH) + { + Ok(v) => v, + Err(e) => { + error!("ProtocolSync::handle_receive_slot_checkpoint_request(): get_slot_checkpoints_after fail: {}", e); + continue + } + }; + debug!( + "ProtocolSync::handle_receive_slot_checkpoint_request(): Found {} slot checkpoints", + slot_checkpoints.len() + ); + + let response = SlotCheckpointResponse { slot_checkpoints }; + if let Err(e) = self.channel.send(response).await { + error!( + "ProtocolSync::handle_receive_slot_checkpoint_request(): channel send fail: {}", + e + ) + }; + } + } + async fn handle_receive_block(self: Arc) -> Result<()> { // Consensus-mode enabled nodes have already performed these steps, // during proposal finalization. @@ -155,6 +206,10 @@ impl ProtocolBase for ProtocolSync { debug!("ProtocolSync::start() [START]"); self.jobsman.clone().start(executor.clone()); self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await; + self.jobsman + .clone() + .spawn(self.clone().handle_receive_slot_checkpoint_request(), executor.clone()) + .await; self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await; debug!("ProtocolSync::start() [END]"); Ok(()) diff --git a/src/consensus/state.rs b/src/consensus/state.rs index fcc6482d2..9def27b95 100644 --- a/src/consensus/state.rs +++ b/src/consensus/state.rs @@ -122,7 +122,7 @@ impl net::Message for ConsensusResponse { } /// Auxiliary structure used to keep track of slot validation parameters. -#[derive(Debug, SerialEncodable, SerialDecodable)] +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] pub struct SlotCheckpoint { /// Slot UID pub slot: u64, @@ -148,3 +148,29 @@ impl SlotCheckpoint { Self::new(0, eta, sigma1, sigma2) } } + +/// Auxiliary structure used for slot checkpoints syncing +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct SlotCheckpointRequest { + /// Slot UID + pub slot: u64, +} + +impl net::Message for SlotCheckpointRequest { + fn name() -> &'static str { + "slotcheckpointrequest" + } +} + +/// Auxiliary structure used for slot checkpoints syncing +#[derive(Debug, Clone, SerialEncodable, SerialDecodable)] +pub struct SlotCheckpointResponse { + /// Response blocks. + pub slot_checkpoints: Vec, +} + +impl net::Message for SlotCheckpointResponse { + fn name() -> &'static str { + "slotcheckpointresponse" + } +} diff --git a/src/consensus/task/block_sync.rs b/src/consensus/task/block_sync.rs index bbe5f1aca..56802b965 100644 --- a/src/consensus/task/block_sync.rs +++ b/src/consensus/task/block_sync.rs @@ -19,6 +19,7 @@ use crate::{ consensus::{ block::{BlockOrder, BlockResponse}, + state::{SlotCheckpointRequest, SlotCheckpointResponse}, ValidatorStatePtr, }, net, Result, @@ -28,10 +29,42 @@ use log::{debug, info, warn}; /// async task used for block syncing. pub async fn block_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { info!("Starting blockchain sync..."); - // Getting a random connected channel to ask for peers + // Getting a random connected channel to ask from peers match p2p.clone().random_channel().await { Some(channel) => { - // Communication setup + // Communication setup for slot checkpoints + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + let response_sub = channel.subscribe_msg::().await?; + + // Node sends the last known slot checkpoint of the canonical blockchain + // and loops until the response is the same slot (used to utilize batch requests). + let mut last = state.read().await.blockchain.last_slot_checkpoint()?; + info!("Last known slot checkpoint: {:?}", last.slot); + + loop { + // Node creates a `SlotCheckpointRequest` and sends it + let request = SlotCheckpointRequest { slot: last.slot }; + channel.send(request).await?; + + // Node stores response data. + let resp = response_sub.receive().await?; + + // Verify and store retrieved checkpoints + debug!("block_sync_task(): Processing received slot checkpoints"); + state.write().await.receive_slot_checkpoints(&resp.slot_checkpoints).await?; + + let last_received = state.read().await.blockchain.last_slot_checkpoint()?; + info!("Last received slot checkpoint: {:?}", last_received.slot); + + if last.slot == last_received.slot { + break + } + + last = last_received; + } + + // Communication setup for blocks let msg_subsystem = channel.get_message_subsystem(); msg_subsystem.add_dispatch::().await; let response_sub = channel.subscribe_msg::().await?; diff --git a/src/consensus/validator.rs b/src/consensus/validator.rs index 97f900624..b08aa05d8 100644 --- a/src/consensus/validator.rs +++ b/src/consensus/validator.rs @@ -38,7 +38,7 @@ use serde_json::json; use super::{ constants, leadcoin::{LeadCoin, LeadCoinSecrets}, - state::ConsensusState, + state::{ConsensusState, SlotCheckpoint}, utils::fbig2base, BlockInfo, BlockProposal, Float10, Header, LeadInfo, LeadProof, ProposalChain, }; @@ -1421,4 +1421,15 @@ impl ValidatorState { Ok(()) } + + /// Append to canonical state received finalized slot checkpoints from block sync task. + pub async fn receive_slot_checkpoints( + &mut self, + slot_checkpoints: &[SlotCheckpoint], + ) -> Result<()> { + debug!("receive_slot_checkpoints(): Appending slot checkpoints to ledger"); + self.blockchain.add_slot_checkpoints(slot_checkpoints)?; + + Ok(()) + } }