consensus: slot checkpoints sync impl added as part of block_sync task

This commit is contained in:
aggstam
2022-11-29 16:49:45 +02:00
parent 0a030d7067
commit 4d819bf716
6 changed files with 148 additions and 4 deletions

View File

@@ -219,9 +219,19 @@ impl Blockchain {
Ok((slot, block.lead_info.offset))
}
/// Retrieve the last slot checkpoint.
pub fn last_slot_checkpoint(&self) -> Result<SlotCheckpoint> {
self.slot_checkpoints.get_last()
}
/// Retrieve n checkpoints after given start slot.
pub fn get_slot_checkpoints_after(&self, slot: u64, n: u64) -> Result<Vec<SlotCheckpoint>> {
debug!("get_slot_checkpoints_after(): {} -> {}", slot, n);
self.slot_checkpoints.get_after(slot, n)
}
/// Insert a given slice of [`SlotCheckpoint`] into the blockchain database.
pub fn add_slot_checkpoints(&self, slot_checkpoints: &[SlotCheckpoint]) -> Result<()> {
self.slot_checkpoints.insert(slot_checkpoints)
}
}

View File

@@ -123,6 +123,15 @@ impl SlotCheckpointStore {
Ok(ret)
}
/// Fetch the last slot checkpoint in the tree, based on the `Ord`
/// implementation for `Vec<u8>`. This should not be able to
/// fail because we initialize the store with the genesis slot checkpoint.
pub fn get_last(&self) -> Result<SlotCheckpoint> {
let found = self.0.last()?.unwrap();
let checkpoint = deserialize(&found.1)?;
Ok(checkpoint)
}
/// Retrieve records count
pub fn len(&self) -> usize {
self.0.len()

View File

@@ -24,6 +24,7 @@ use smol::Executor;
use crate::{
consensus::{
block::{BlockInfo, BlockOrder, BlockResponse},
state::{SlotCheckpointRequest, SlotCheckpointResponse},
ValidatorStatePtr,
},
net::{
@@ -39,6 +40,7 @@ const BATCH: u64 = 10;
pub struct ProtocolSync {
channel: ChannelPtr,
request_sub: MessageSubscription<BlockOrder>,
slot_checkpoin_request_sub: MessageSubscription<SlotCheckpointRequest>,
block_sub: MessageSubscription<BlockInfo>,
jobsman: ProtocolJobsManagerPtr,
state: ValidatorStatePtr,
@@ -55,14 +57,17 @@ impl ProtocolSync {
) -> Result<ProtocolBasePtr> {
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<BlockOrder>().await;
msg_subsystem.add_dispatch::<SlotCheckpointRequest>().await;
msg_subsystem.add_dispatch::<BlockInfo>().await;
let request_sub = channel.subscribe_msg::<BlockOrder>().await?;
let slot_checkpoin_request_sub = channel.subscribe_msg::<SlotCheckpointRequest>().await?;
let block_sub = channel.subscribe_msg::<BlockInfo>().await?;
Ok(Arc::new(Self {
channel: channel.clone(),
request_sub,
slot_checkpoin_request_sub,
block_sub,
jobsman: ProtocolJobsManager::new("SyncProtocol", channel),
state,
@@ -102,6 +107,52 @@ impl ProtocolSync {
}
}
async fn handle_receive_slot_checkpoint_request(self: Arc<Self>) -> Result<()> {
debug!("ProtocolSync::handle_receive_slot_checkpoint_request() [START]");
loop {
let request = match self.slot_checkpoin_request_sub.receive().await {
Ok(v) => v,
Err(e) => {
error!(
"ProtocolSync::handle_receive_slot_checkpoint_request(): recv fail: {}",
e
);
continue
}
};
debug!("ProtocolSync::handle_receive_slot_checkpoint_request() received {:?}", request);
// Extra validations can be added here
let key = request.slot;
let slot_checkpoints = match self
.state
.read()
.await
.blockchain
.get_slot_checkpoints_after(key, BATCH)
{
Ok(v) => v,
Err(e) => {
error!("ProtocolSync::handle_receive_slot_checkpoint_request(): get_slot_checkpoints_after fail: {}", e);
continue
}
};
debug!(
"ProtocolSync::handle_receive_slot_checkpoint_request(): Found {} slot checkpoints",
slot_checkpoints.len()
);
let response = SlotCheckpointResponse { slot_checkpoints };
if let Err(e) = self.channel.send(response).await {
error!(
"ProtocolSync::handle_receive_slot_checkpoint_request(): channel send fail: {}",
e
)
};
}
}
async fn handle_receive_block(self: Arc<Self>) -> Result<()> {
// Consensus-mode enabled nodes have already performed these steps,
// during proposal finalization.
@@ -155,6 +206,10 @@ impl ProtocolBase for ProtocolSync {
debug!("ProtocolSync::start() [START]");
self.jobsman.clone().start(executor.clone());
self.jobsman.clone().spawn(self.clone().handle_receive_request(), executor.clone()).await;
self.jobsman
.clone()
.spawn(self.clone().handle_receive_slot_checkpoint_request(), executor.clone())
.await;
self.jobsman.clone().spawn(self.clone().handle_receive_block(), executor.clone()).await;
debug!("ProtocolSync::start() [END]");
Ok(())

View File

@@ -122,7 +122,7 @@ impl net::Message for ConsensusResponse {
}
/// Auxiliary structure used to keep track of slot validation parameters.
#[derive(Debug, SerialEncodable, SerialDecodable)]
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct SlotCheckpoint {
/// Slot UID
pub slot: u64,
@@ -148,3 +148,29 @@ impl SlotCheckpoint {
Self::new(0, eta, sigma1, sigma2)
}
}
/// Auxiliary structure used for slot checkpoints syncing
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct SlotCheckpointRequest {
/// Slot UID
pub slot: u64,
}
impl net::Message for SlotCheckpointRequest {
fn name() -> &'static str {
"slotcheckpointrequest"
}
}
/// Auxiliary structure used for slot checkpoints syncing
#[derive(Debug, Clone, SerialEncodable, SerialDecodable)]
pub struct SlotCheckpointResponse {
/// Response blocks.
pub slot_checkpoints: Vec<SlotCheckpoint>,
}
impl net::Message for SlotCheckpointResponse {
fn name() -> &'static str {
"slotcheckpointresponse"
}
}

View File

@@ -19,6 +19,7 @@
use crate::{
consensus::{
block::{BlockOrder, BlockResponse},
state::{SlotCheckpointRequest, SlotCheckpointResponse},
ValidatorStatePtr,
},
net, Result,
@@ -28,10 +29,42 @@ use log::{debug, info, warn};
/// async task used for block syncing.
pub async fn block_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Starting blockchain sync...");
// Getting a random connected channel to ask for peers
// Getting a random connected channel to ask from peers
match p2p.clone().random_channel().await {
Some(channel) => {
// Communication setup
// Communication setup for slot checkpoints
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<SlotCheckpointResponse>().await;
let response_sub = channel.subscribe_msg::<SlotCheckpointResponse>().await?;
// Node sends the last known slot checkpoint of the canonical blockchain
// and loops until the response is the same slot (used to utilize batch requests).
let mut last = state.read().await.blockchain.last_slot_checkpoint()?;
info!("Last known slot checkpoint: {:?}", last.slot);
loop {
// Node creates a `SlotCheckpointRequest` and sends it
let request = SlotCheckpointRequest { slot: last.slot };
channel.send(request).await?;
// Node stores response data.
let resp = response_sub.receive().await?;
// Verify and store retrieved checkpoints
debug!("block_sync_task(): Processing received slot checkpoints");
state.write().await.receive_slot_checkpoints(&resp.slot_checkpoints).await?;
let last_received = state.read().await.blockchain.last_slot_checkpoint()?;
info!("Last received slot checkpoint: {:?}", last_received.slot);
if last.slot == last_received.slot {
break
}
last = last_received;
}
// Communication setup for blocks
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<BlockResponse>().await;
let response_sub = channel.subscribe_msg::<BlockResponse>().await?;

View File

@@ -38,7 +38,7 @@ use serde_json::json;
use super::{
constants,
leadcoin::{LeadCoin, LeadCoinSecrets},
state::ConsensusState,
state::{ConsensusState, SlotCheckpoint},
utils::fbig2base,
BlockInfo, BlockProposal, Float10, Header, LeadInfo, LeadProof, ProposalChain,
};
@@ -1421,4 +1421,15 @@ impl ValidatorState {
Ok(())
}
/// Append to canonical state received finalized slot checkpoints from block sync task.
pub async fn receive_slot_checkpoints(
&mut self,
slot_checkpoints: &[SlotCheckpoint],
) -> Result<()> {
debug!("receive_slot_checkpoints(): Appending slot checkpoints to ledger");
self.blockchain.add_slot_checkpoints(slot_checkpoints)?;
Ok(())
}
}