consensus: Add tasks for syncing the blockchain and consensus forks.

This commit is contained in:
aggstam
2022-04-21 15:28:58 +02:00
committed by parazyd
parent 95d74a034a
commit 101754353f
5 changed files with 122 additions and 0 deletions

View File

@@ -29,6 +29,9 @@ pub use util::Timestamp;
/// P2P net protocols
pub mod proto;
/// async tasks to utilize the protocols
pub mod task;
use lazy_static::lazy_static;
lazy_static! {
/// Genesis hash for the mainnet chain

View File

@@ -609,4 +609,21 @@ impl ValidatorState {
self.consensus.participants.remove(&index);
}
}
/// Utility function to reset the current consensus state.
pub fn reset_consensus_state(&mut self) -> Result<()> {
let genesis_ts = self.consensus.genesis_ts.clone();
let genesis_block = self.consensus.genesis_block.clone();
let consensus = ConsensusState {
genesis_ts,
genesis_block,
proposals: vec![],
orphan_votes: vec![],
participants: FxIndexMap::with_hasher(FxBuildHasher::default()),
pending_participants: vec![],
};
self.consensus = consensus;
Ok(())
}
}

View File

@@ -0,0 +1,57 @@
use crate::{
consensus2::{
block::{BlockOrder, BlockResponse},
ValidatorStatePtr,
},
net, Result,
};
use log::{info, warn};
/// async task used for block syncing.
pub async fn block_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Starting blockchain sync...");
// we retrieve p2p network connected channels, so we can use it to
// parallelize downloads.
// Using len here because is_empty() uses unstable library feature
// called 'exact_size_is_empty'.
if p2p.channels().lock().await.values().len() != 0 {
// Currently we will just use the last channel
let channel = p2p.channels().lock().await.values().last().unwrap().clone();
// Communication setup
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<BlockResponse>().await;
let response_sub = channel.subscribe_msg::<BlockResponse>().await?;
// Node sends the last known block hash of the canonical blockchain
// and loops until the response is the same block (used to utilize
// batch requests).
let mut last = state.read().await.blockchain.last()?.unwrap();
info!("Last known block: {:?} - {:?}", last.0, last.1);
loop {
// Node creates a `BlockOrder` and sends it
let order = BlockOrder { sl: last.0, block: last.1 };
channel.send(order).await?;
// Node stores response data. Extra validations can be added here.
let response = response_sub.receive().await?;
state.write().await.blockchain.add(&response.blocks)?;
let last_received = state.read().await.blockchain.last()?.unwrap();
info!("Last received block: {:?} - {:?}", last_received.0, last_received.1);
if last == last_received {
break
}
last = last_received;
}
} else {
warn!("Node is not connected to other nodes");
}
info!("Blockchain synced!");
Ok(())
}

View File

@@ -0,0 +1,40 @@
use log::{info, warn};
use crate::{
consensus2::{
block::{ForkOrder, ForkResponse},
ValidatorStatePtr,
},
net, Result,
};
/// async task used for consensus fork syncing.
pub async fn fork_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> {
info!("Starting forks sync...");
// Using len here beacuse is_empty() uses unstable library feature
// called 'exact_size_is_empty'.
if p2p.channels().lock().await.values().len() != 0 {
// Nodes ask for the fork chains of the last channel peer
let channel = p2p.channels().lock().await.values().last().unwrap().clone();
// Communication setup
let msg_subsystem = channel.get_message_subsystem();
msg_subsystem.add_dispatch::<ForkResponse>().await;
let response_sub = channel.subscribe_msg::<ForkResponse>().await?;
// Node creates a `ForkOrder` and sends it
let order = ForkOrder { id: state.read().await.id };
channel.send(order).await?;
// Node stores response data. Extra validations can be added here.
let response = response_sub.receive().await?;
state.write().await.consensus.proposals = response.proposals.clone();
} else {
warn!("Node is not connected to other nodes, resetting consensus state.");
state.write().await.reset_consensus_state()?;
}
info!("Forks synced!");
Ok(())
}

View File

@@ -0,0 +1,5 @@
mod block_sync;
pub use block_sync::block_sync_task;
mod fork_sync;
pub use fork_sync::fork_sync_task;