darkfid: optional checkpoint usage during syncing added

This commit is contained in:
skoupidi
2024-04-22 21:14:56 +03:00
parent 7085ac34b1
commit ee2859554a
16 changed files with 173 additions and 22 deletions

View File

@@ -40,6 +40,12 @@ recipient = "5ZHfYpt4mpJcwBNxfEyxLzeFJUEeoePs5NQ5jVEgHrMf"
# Skip syncing process and start node right away
skip_sync = true
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections
@@ -114,6 +120,12 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Testnet P2P network settings
[network_config."testnet".net]
# P2P accept addresses the instance listens on for inbound connections
@@ -198,6 +210,12 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Mainnet P2P network settings
[network_config."mainnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -29,7 +29,7 @@ use url::Url;
use darkfi::{
async_daemonize,
blockchain::BlockInfo,
blockchain::{BlockInfo, HeaderHash},
cli_desc,
net::{settings::SettingsOpt, P2pPtr},
rpc::{
@@ -138,6 +138,14 @@ pub struct BlockchainNetwork {
/// Skip syncing process and start node right away
pub skip_sync: bool,
#[structopt(long)]
/// Optional sync checkpoint height
pub checkpoint_height: Option<u32>,
#[structopt(long)]
/// Optional sync checkpoint hash
pub checkpoint: Option<String>,
/// P2P network settings
#[structopt(flatten)]
pub net: SettingsOpt,
@@ -308,7 +316,18 @@ async fn realmain(args: Args, ex: Arc<smol::Executor<'static>>) -> Result<()> {
// Sync blockchain
if !blockchain_config.skip_sync {
sync_task(&darkfid).await?;
// Parse configured checkpoint
if blockchain_config.checkpoint_height.is_some() && blockchain_config.checkpoint.is_none() {
return Err(Error::ParseFailed("Blockchain configured checkpoint hash missing"))
}
let checkpoint = if let Some(height) = blockchain_config.checkpoint_height {
Some((height, HeaderHash::from_str(&blockchain_config.checkpoint.unwrap())?))
} else {
None
};
sync_task(&darkfid, checkpoint).await?;
} else {
*darkfid.validator.synced.write().await = true;
}

View File

@@ -38,7 +38,8 @@ use crate::{
// TODO: Parallelize independent requests.
// We can also make them be like torrents, where we retrieve chunks not in order.
/// async task used for block syncing.
pub async fn sync_task(node: &Darkfid) -> Result<()> {
/// A checkpoint can be provided to ensure node syncs the correct sequence.
pub async fn sync_task(node: &Darkfid, checkpoint: Option<(u32, HeaderHash)>) -> Result<()> {
info!(target: "darkfid::task::sync_task", "Starting blockchain sync...");
// Generate a new fork to be able to extend
@@ -48,9 +49,6 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
// Grab blocks subscriber
let block_sub = node.subscribers.get("blocks").unwrap();
// TODO: Configure a checkpoint, filter peers that don't have that and start
// syncing the sequence until that
// Grab last known block header, including existing pending sync ones
let mut last = node.validator.blockchain.last()?;
// Check sync headers first record is the next one
@@ -67,11 +65,28 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
}
info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1);
// Grab synced peers tips
let mut tips = synced_peers(node, &last.1).await?;
// Grab the most common tip and the corresponding peers
let (mut common_tip_height, mut common_tip_peers) = most_common_tip(tips).await?;
let (mut common_tip_height, mut common_tip_peers) =
most_common_tip(node, &last.1, checkpoint).await?;
// If last known block header is before the checkpoint, we sync until that first.
if let Some(checkpoint) = checkpoint {
if checkpoint.0 > last.0 {
info!(target: "darkfid::task::sync_task", "Syncing until configured checkpoint: {} - {}", checkpoint.0, checkpoint.1);
// Retrieve all the headers backwards until our last known one and verify them.
// We use the next height, in order to also retrieve the checkpoint header.
retrieve_headers(node, &common_tip_peers, last.0, checkpoint.0 + 1).await?;
// TODO: Create a more minimal verification so checkpoint blocks can be
// applied directly, skipping the full formal block checks.
// Retrieve all the blocks for those headers and apply them to canonical
last = retrieve_blocks(node, &common_tip_peers, last, block_sub).await?;
info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last.0, last.1);
// Grab synced peers most common tip again
(common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await?;
}
}
// Sync headers and blocks
loop {
@@ -90,8 +105,7 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
last = last_received;
// Grab synced peers most common tip again
tips = synced_peers(node, &last.1).await?;
(common_tip_height, common_tip_peers) = most_common_tip(tips).await?;
(common_tip_height, common_tip_peers) = most_common_tip(node, &last.1, None).await?;
}
// Sync best fork
@@ -118,6 +132,7 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> {
async fn synced_peers(
node: &Darkfid,
last_tip: &HeaderHash,
checkpoint: Option<(u32, HeaderHash)>,
) -> Result<HashMap<(u32, [u8; 32]), Vec<ChannelPtr>>> {
info!(target: "darkfid::task::sync::synced_peers", "Receiving tip from peers...");
let mut tips = HashMap::new();
@@ -129,6 +144,28 @@ async fn synced_peers(
if !peers.is_empty() {
// Ask each peer if they are synced
for peer in peers {
// If a checkpoint was provider, we check that the peer follows that sequence
if let Some(c) = checkpoint {
// Communication setup
let response_sub = peer.subscribe_msg::<HeaderSyncResponse>().await?;
// Node creates a `HeaderSyncRequest` and sends it
let request = HeaderSyncRequest { height: c.0 + 1 };
peer.send(&request).await?;
// Node waits for response
let Ok(response) = response_sub.receive_with_timeout(COMMS_TIMEOUT).await
else {
continue
};
// Handle response
if response.headers.is_empty() || response.headers.last().unwrap().hash() != c.1
{
continue
}
}
// Communication setup
let response_sub = peer.subscribe_msg::<TipResponse>().await?;
@@ -167,8 +204,13 @@ async fn synced_peers(
/// Auxiliary function to ask all peers for their current tip and find the most common one.
async fn most_common_tip(
tips: HashMap<(u32, [u8; 32]), Vec<ChannelPtr>>,
node: &Darkfid,
last_tip: &HeaderHash,
checkpoint: Option<(u32, HeaderHash)>,
) -> Result<(u32, Vec<ChannelPtr>)> {
// Grab synced peers tips
let tips = synced_peers(node, last_tip, checkpoint).await?;
// Grab the most common highest tip peers
info!(target: "darkfid::task::sync::most_common_tip", "Finding most common tip...");
let mut common_tip = (0, [0u8; 32], vec![]);

View File

@@ -19,7 +19,7 @@
use std::{collections::HashMap, sync::Arc};
use darkfi::{
blockchain::{BlockInfo, Header},
blockchain::{BlockInfo, Header, HeaderHash},
net::Settings,
rpc::jsonrpc::JsonSubscriber,
system::sleep,
@@ -91,13 +91,13 @@ impl Harness {
// Alice
let alice_url = Url::parse("tcp+tls://127.0.0.1:18340")?;
settings.inbound_addrs = vec![alice_url.clone()];
let alice = generate_node(&vks, &validator_config, &settings, ex, true, true).await?;
let alice = generate_node(&vks, &validator_config, &settings, ex, true, true, None).await?;
// Bob
let bob_url = Url::parse("tcp+tls://127.0.0.1:18341")?;
settings.inbound_addrs = vec![bob_url];
settings.peers = vec![alice_url];
let bob = generate_node(&vks, &validator_config, &settings, ex, true, false).await?;
let bob = generate_node(&vks, &validator_config, &settings, ex, true, false, None).await?;
Ok(Self { config, vks, validator_config, alice, bob })
}
@@ -228,6 +228,7 @@ pub async fn generate_node(
ex: &Arc<smol::Executor<'static>>,
miner: bool,
skip_sync: bool,
checkpoint: Option<(u32, HeaderHash)>,
) -> Result<Darkfid> {
let sled_db = sled::Config::new().temporary(true).open()?;
vks::inject(&sled_db, vks)?;
@@ -245,7 +246,7 @@ pub async fn generate_node(
p2p.start().await?;
if !skip_sync {
sync_task(&node).await?;
sync_task(&node, checkpoint).await?;
} else {
*node.validator.synced.write().await = true;
}

View File

@@ -55,7 +55,7 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let block4 = th.generate_next_block(&block3).await?;
// Add them to nodes
th.add_blocks(&vec![block1, block2, block3.clone(), block4.clone()]).await?;
th.add_blocks(&vec![block1, block2.clone(), block3.clone(), block4.clone()]).await?;
// Nodes must have one fork with 2 blocks
th.validate_fork_chains(1, vec![2]).await;
@@ -88,8 +88,16 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
settings.inbound_addrs = vec![charlie_url];
let bob_url = th.bob.p2p.settings().inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false).await?;
let charlie = generate_node(
&th.vks,
&th.validator_config,
&settings,
&ex,
false,
false,
Some((block2.header.height, block2.hash())),
)
.await?;
// Verify node synced
let alice = &th.alice.validator;
let charlie = &charlie.validator;

View File

@@ -68,7 +68,7 @@ async fn sync_forks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let bob_url = th.bob.p2p.settings().inbound_addrs[0].clone();
settings.peers = vec![bob_url];
let charlie =
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false).await?;
generate_node(&th.vks, &th.validator_config, &settings, &ex, false, false, None).await?;
// Verify node synced the best fork
let forks = th.alice.validator.consensus.forks.read().await;

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -40,6 +40,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -37,6 +37,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = true
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -45,6 +45,12 @@ recipient = "9vw6WznKk7xEFQwwXhJWMMdjUPi3cXL8NrFKQpKifG1U"
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -32,6 +32,12 @@ miner = false
# Skip syncing process and start node right away
skip_sync = false
# Optional sync checkpoint height
#checkpoint_height = 0
# Optional sync checkpoint hash
#checkpoint = ""
## Localnet P2P network settings
[network_config."localnet".net]
# P2P accept addresses the instance listens on for inbound connections

View File

@@ -16,11 +16,12 @@
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
use std::fmt;
use std::{fmt, str::FromStr};
use darkfi_sdk::{
blockchain::block_version,
crypto::{MerkleNode, MerkleTree},
hex::decode_hex_arr,
AsHex,
};
@@ -51,6 +52,14 @@ impl HeaderHash {
}
}
impl FromStr for HeaderHash {
type Err = Error;
fn from_str(header_hash_str: &str) -> Result<Self> {
Ok(Self(decode_hex_arr(header_hash_str)?))
}
}
impl fmt::Display for HeaderHash {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.0.hex())