From 101754353f2d44f17e319ad265c0cc4713b48089 Mon Sep 17 00:00:00 2001 From: aggstam Date: Thu, 21 Apr 2022 15:28:58 +0200 Subject: [PATCH] consensus: Add tasks for syncing the blockchain and consensus forks. --- src/consensus2/mod.rs | 3 ++ src/consensus2/state.rs | 17 +++++++++ src/consensus2/task/block_sync.rs | 57 +++++++++++++++++++++++++++++++ src/consensus2/task/fork_sync.rs | 40 ++++++++++++++++++++++ src/consensus2/task/mod.rs | 5 +++ 5 files changed, 122 insertions(+) create mode 100644 src/consensus2/task/block_sync.rs create mode 100644 src/consensus2/task/fork_sync.rs create mode 100644 src/consensus2/task/mod.rs diff --git a/src/consensus2/mod.rs b/src/consensus2/mod.rs index 2f8a164cc..03b6b753a 100644 --- a/src/consensus2/mod.rs +++ b/src/consensus2/mod.rs @@ -29,6 +29,9 @@ pub use util::Timestamp; /// P2P net protocols pub mod proto; +/// async tasks to utilize the protocols +pub mod task; + use lazy_static::lazy_static; lazy_static! { /// Genesis hash for the mainnet chain diff --git a/src/consensus2/state.rs b/src/consensus2/state.rs index ff682885f..2595adfe9 100644 --- a/src/consensus2/state.rs +++ b/src/consensus2/state.rs @@ -609,4 +609,21 @@ impl ValidatorState { self.consensus.participants.remove(&index); } } + + /// Utility function to reset the current consensus state. + pub fn reset_consensus_state(&mut self) -> Result<()> { + let genesis_ts = self.consensus.genesis_ts.clone(); + let genesis_block = self.consensus.genesis_block.clone(); + let consensus = ConsensusState { + genesis_ts, + genesis_block, + proposals: vec![], + orphan_votes: vec![], + participants: FxIndexMap::with_hasher(FxBuildHasher::default()), + pending_participants: vec![], + }; + + self.consensus = consensus; + Ok(()) + } } diff --git a/src/consensus2/task/block_sync.rs b/src/consensus2/task/block_sync.rs new file mode 100644 index 000000000..abc8f65fc --- /dev/null +++ b/src/consensus2/task/block_sync.rs @@ -0,0 +1,57 @@ +use crate::{ + consensus2::{ + block::{BlockOrder, BlockResponse}, + ValidatorStatePtr, + }, + net, Result, +}; +use log::{info, warn}; + +/// async task used for block syncing. +pub async fn block_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { + info!("Starting blockchain sync..."); + + // we retrieve p2p network connected channels, so we can use it to + // parallelize downloads. + // Using len here because is_empty() uses unstable library feature + // called 'exact_size_is_empty'. + if p2p.channels().lock().await.values().len() != 0 { + // Currently we will just use the last channel + let channel = p2p.channels().lock().await.values().last().unwrap().clone(); + + // Communication setup + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + let response_sub = channel.subscribe_msg::().await?; + + // Node sends the last known block hash of the canonical blockchain + // and loops until the response is the same block (used to utilize + // batch requests). + let mut last = state.read().await.blockchain.last()?.unwrap(); + info!("Last known block: {:?} - {:?}", last.0, last.1); + + loop { + // Node creates a `BlockOrder` and sends it + let order = BlockOrder { sl: last.0, block: last.1 }; + channel.send(order).await?; + + // Node stores response data. Extra validations can be added here. + let response = response_sub.receive().await?; + state.write().await.blockchain.add(&response.blocks)?; + + let last_received = state.read().await.blockchain.last()?.unwrap(); + info!("Last received block: {:?} - {:?}", last_received.0, last_received.1); + + if last == last_received { + break + } + + last = last_received; + } + } else { + warn!("Node is not connected to other nodes"); + } + + info!("Blockchain synced!"); + Ok(()) +} diff --git a/src/consensus2/task/fork_sync.rs b/src/consensus2/task/fork_sync.rs new file mode 100644 index 000000000..c25b38e12 --- /dev/null +++ b/src/consensus2/task/fork_sync.rs @@ -0,0 +1,40 @@ +use log::{info, warn}; + +use crate::{ + consensus2::{ + block::{ForkOrder, ForkResponse}, + ValidatorStatePtr, + }, + net, Result, +}; + +/// async task used for consensus fork syncing. +pub async fn fork_sync_task(p2p: net::P2pPtr, state: ValidatorStatePtr) -> Result<()> { + info!("Starting forks sync..."); + + // Using len here beacuse is_empty() uses unstable library feature + // called 'exact_size_is_empty'. + if p2p.channels().lock().await.values().len() != 0 { + // Nodes ask for the fork chains of the last channel peer + let channel = p2p.channels().lock().await.values().last().unwrap().clone(); + + // Communication setup + let msg_subsystem = channel.get_message_subsystem(); + msg_subsystem.add_dispatch::().await; + let response_sub = channel.subscribe_msg::().await?; + + // Node creates a `ForkOrder` and sends it + let order = ForkOrder { id: state.read().await.id }; + channel.send(order).await?; + + // Node stores response data. Extra validations can be added here. + let response = response_sub.receive().await?; + state.write().await.consensus.proposals = response.proposals.clone(); + } else { + warn!("Node is not connected to other nodes, resetting consensus state."); + state.write().await.reset_consensus_state()?; + } + + info!("Forks synced!"); + Ok(()) +} diff --git a/src/consensus2/task/mod.rs b/src/consensus2/task/mod.rs new file mode 100644 index 000000000..a5061f718 --- /dev/null +++ b/src/consensus2/task/mod.rs @@ -0,0 +1,5 @@ +mod block_sync; +pub use block_sync::block_sync_task; + +mod fork_sync; +pub use fork_sync::fork_sync_task;