validator: updated sled-overlay version and use new diffs logic for finalization

This commit is contained in:
skoupidi
2024-03-19 16:47:33 +02:00
parent aae713227f
commit 41c9bd28ba
6 changed files with 102 additions and 75 deletions

4
Cargo.lock generated
View File

@@ -6259,9 +6259,9 @@ dependencies = [
[[package]]
name = "sled-overlay"
version = "0.0.8"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "88e44aa08940d9725a49a20955b44cd9807445d5bb7ad3cd3a6ae8eb45352d05"
checksum = "f9b13fe9552bed2194e2ba1274c7ba145558627dacd17037b62ad676b574bca6"
dependencies = [
"sled",
]

View File

@@ -119,7 +119,7 @@ libsqlite3-sys = {version = "0.28.0", features = ["sqlcipher"], optional = true}
# Blockchain store
sled = {version = "0.34.7", optional = true}
sled-overlay = {version = "0.0.8", optional = true}
sled-overlay = {version = "0.1.0", optional = true}
# Miner
randomx = {git = "https://github.com/darkrenaissance/RandomX", optional = true}

View File

@@ -137,10 +137,12 @@ impl Harness {
for (index, fork) in alice.iter().enumerate() {
assert_eq!(fork.proposals.len(), fork_sizes[index]);
assert_eq!(fork.diffs.len(), fork_sizes[index]);
}
for (index, fork) in bob.iter().enumerate() {
assert_eq!(fork.proposals.len(), fork_sizes[index]);
assert_eq!(fork.diffs.len(), fork_sizes[index]);
}
}

View File

@@ -59,6 +59,9 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
// Add them to nodes
th.add_blocks(&vec![block1, block2, block3.clone(), block4.clone()]).await?;
// Nodes must have one fork with 2 blocks
th.validate_fork_chains(1, vec![2]).await;
// Extend current fork sequence
let block5 = th.generate_next_block(&block4).await?;
// Create a new fork extending canonical
@@ -66,8 +69,19 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
// Add them to nodes
th.add_blocks(&vec![block5, block6.clone()]).await?;
// Nodes must have one fork with 2 blocks and one with 1 block
th.validate_fork_chains(2, vec![2, 1]).await;
// Grab current best fork index
let forks = th.alice.validator.consensus.forks.read().await;
// If index corresponds to the small fork, finalization
// did not occur, as its size is not over the threshold.
let small_best = best_fork_index(&forks)? == 1;
drop(forks);
if small_best {
// Nodes must have one fork with 3 blocks and one with 2 blocks
th.validate_fork_chains(2, vec![3, 2]).await;
} else {
// Nodes must have one fork with 2 blocks and one with 1 block
th.validate_fork_chains(2, vec![2, 1]).await;
}
// We are going to create a third node and try to sync from Bob
let mut settings = Settings { localnet: true, inbound_connections: 3, ..Default::default() };
@@ -89,7 +103,7 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let charlie_forks = charlie.consensus.forks.read().await;
assert_eq!(charlie_forks.len(), 1);
assert_eq!(charlie_forks[0].proposals.len(), best_fork.proposals.len());
let small_best = best_fork.proposals.len() == 1;
assert_eq!(charlie_forks[0].diffs.len(), best_fork.diffs.len());
drop(forks);
drop(charlie_forks);
@@ -106,12 +120,15 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
// it will have a single fork with 2 blocks.
assert_eq!(charlie_forks.len(), 1);
assert_eq!(charlie_forks[0].proposals.len(), 2);
assert_eq!(charlie_forks[0].diffs.len(), 2);
} else {
// Charlie didn't originally have the fork, but it
// should be synced when its proposal was received
assert_eq!(charlie_forks.len(), 2);
assert_eq!(charlie_forks[0].proposals.len(), 2);
assert_eq!(charlie_forks[0].diffs.len(), 2);
assert_eq!(charlie_forks[1].proposals.len(), 2);
assert_eq!(charlie_forks[1].diffs.len(), 2);
}
drop(charlie_forks);
@@ -149,6 +166,7 @@ async fn sync_blocks_real(ex: Arc<Executor<'static>>) -> Result<()> {
let charlie_forks = charlie.consensus.forks.read().await;
assert_eq!(charlie_forks.len(), 1);
assert_eq!(charlie_forks[0].proposals.len(), 2);
assert_eq!(charlie_forks[0].diffs.len(), 2);
assert_eq!(last_proposal, charlie_forks[0].proposals[1]);
// Thanks for reading

View File

@@ -20,6 +20,7 @@ use darkfi_sdk::crypto::{MerkleTree, SecretKey};
use darkfi_serial::{async_trait, serialize, SerialDecodable, SerialEncodable};
use log::{debug, error, info};
use num_bigint::BigUint;
use sled_overlay::database::SledDbOverlayState;
use smol::lock::RwLock;
use crate::{
@@ -170,9 +171,11 @@ impl Consensus {
return Ok((original_fork.full_clone()?, Some(f_index)))
}
// TODO: use new sled overlay diffs logic to avoid having to rebuild the whole fork
// Rebuild fork
let mut fork = Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?;
fork.proposals = original_fork.proposals[..p_index + 1].to_vec();
fork.diffs = original_fork.diffs[..p_index + 1].to_vec();
// Retrieve proposals blocks from original fork
let blocks = &original_fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?;
@@ -303,73 +306,53 @@ impl Consensus {
Ok(ret)
}
/// Auxiliary function to purge current forks and rebuild the ones starting
/// with the provided prefix. This function assumes that the prefix blocks have
/// already been appended to canonical chain.
pub async fn rebuild_forks(&self, prefix: &[BlockInfo]) -> Result<()> {
/// Auxiliary function to purge current forks and reset the ones starting
/// with the provided prefix, excluding provided finalized fork.
/// This function assumes that the prefix blocks have already been appended
/// to canonical chain from the finalized fork.
pub async fn reset_forks(&self, prefix: &[blake3::Hash], finalized_fork_index: &usize) {
// Grab a lock over current forks
let mut forks = self.forks.write().await;
// Find all the forks that start with the provided prefix,
// and grab their proposals
let suffix_start_index = prefix.len();
let prefix_last_index = suffix_start_index - 1;
let prefix_last = prefix.last().unwrap().hash()?;
let mut forks_proposals: Vec<Vec<BlockInfo>> = vec![];
for fork in forks.iter() {
if fork.proposals.is_empty() ||
prefix_last_index >= fork.proposals.len() ||
fork.proposals[prefix_last_index] != prefix_last
{
// excluding finalized fork index, and remove their prefixed
// proposals, and their corresponding diffs.
// If the fork does not start with the provided prefix,
// drop it.
let excess = prefix.len();
let prefix_last_index = excess - 1;
let prefix_last = prefix.last().unwrap();
let mut dropped_forks = vec![];
for (index, fork) in forks.iter_mut().enumerate() {
if &index == finalized_fork_index {
continue
}
let suffix_proposals = fork
.overlay
.lock()
.unwrap()
.get_blocks_by_hash(&fork.proposals[suffix_start_index..])?;
// TODO add a stale forks purging logic, aka forks that
// we keep should be close to buffer size, for lower
// memory consumption
forks_proposals.push(suffix_proposals);
}
// Purge existing forks;
*forks = vec![];
// Rebuild forks
for proposals in forks_proposals {
// Create a new fork extending canonical
let mut fork =
Fork::new(self.blockchain.clone(), self.module.read().await.clone()).await?;
// Grab overlay last block
let mut previous = &fork.overlay.lock().unwrap().last_block()?;
// Append all proposals
for proposal in &proposals {
if verify_block(&fork.overlay, &fork.module, proposal, previous).await.is_err() {
error!(target: "validator::consensus::rebuild_best_fork", "Erroneous proposal block found");
fork.overlay.lock().unwrap().overlay.lock().unwrap().purge_new_trees()?;
drop(forks);
return Err(Error::BlockIsInvalid(proposal.hash()?.to_string()))
};
// Append proposal to the fork
fork.append_proposal(&Proposal::new(proposal.clone())?).await?;
// Set proposals as previous
previous = proposal;
if fork.proposals.is_empty() ||
prefix_last_index >= fork.proposals.len() ||
&fork.proposals[prefix_last_index] != prefix_last
{
dropped_forks.push(index);
continue
}
// Push the fork
forks.push(fork);
let rest_proposals = fork.proposals.split_off(excess);
let rest_diffs = fork.diffs.split_off(excess);
let mut diffs = fork.diffs.clone();
fork.proposals = rest_proposals;
fork.diffs = rest_diffs;
for diff in diffs.iter_mut() {
fork.overlay.lock().unwrap().overlay.lock().unwrap().remove_diff(diff);
}
}
// Drop invalid forks
for index in dropped_forks {
forks.remove(index);
}
// Drop forks lock
drop(forks);
Ok(())
}
}
@@ -409,6 +392,8 @@ pub struct Fork {
pub module: PoWModule,
/// Fork proposal hashes sequence
pub proposals: Vec<blake3::Hash>,
/// Fork proposal overlay diffs sequence
pub diffs: Vec<SledDbOverlayState>,
/// Valid pending transaction hashes
pub mempool: Vec<blake3::Hash>,
/// Current fork mining targets rank, cached for better performance
@@ -427,6 +412,7 @@ impl Fork {
overlay,
module,
proposals: vec![],
diffs: vec![],
mempool,
targets_rank: BigUint::from(0u64),
hashes_rank: BigUint::from(0u64),
@@ -495,6 +481,9 @@ impl Fork {
// Push proposal's hash
self.proposals.push(proposal.hash);
// Push proposal overlay diff
self.diffs.push(self.overlay.lock().unwrap().overlay.lock().unwrap().diff(&self.diffs));
Ok(())
}
@@ -592,10 +581,20 @@ impl Fork {
let overlay = self.overlay.lock().unwrap().full_clone()?;
let module = self.module.clone();
let proposals = self.proposals.clone();
let diffs = self.diffs.clone();
let mempool = self.mempool.clone();
let targets_rank = self.targets_rank.clone();
let hashes_rank = self.hashes_rank.clone();
Ok(Self { blockchain, overlay, module, proposals, mempool, targets_rank, hashes_rank })
Ok(Self {
blockchain,
overlay,
module,
proposals,
diffs,
mempool,
targets_rank,
hashes_rank,
})
}
}

View File

@@ -338,31 +338,39 @@ impl Validator {
return Ok(vec![])
}
// Grab fork proposals sequence
let forks = self.consensus.forks.read().await;
let fork = &forks[finalized_fork.unwrap()];
let proposals = fork.overlay.lock().unwrap().get_blocks_by_hash(&fork.proposals)?;
drop(forks);
// Grab the actual best fork
let finalized_fork = finalized_fork.unwrap();
let mut forks = self.consensus.forks.write().await;
let fork = &mut forks[finalized_fork];
// Find the excess over finalization threshold
let excess = (proposals.len() - self.consensus.finalization_threshold) + 1;
let excess = (fork.proposals.len() - self.consensus.finalization_threshold) + 1;
// Append finalized blocks
let finalized = &proposals[..excess];
// Apply finalized proposals diffs
let rest_proposals = fork.proposals.split_off(excess);
let rest_diffs = fork.diffs.split_off(excess);
let finalized = fork.proposals.clone();
let mut diffs = fork.diffs.clone();
fork.proposals = rest_proposals;
fork.diffs = rest_diffs;
info!(target: "validator::finalization", "Finalizing proposals:");
for block in finalized {
info!(target: "validator::finalization", "\t{} - {}", block.hash()?, block.header.height);
for (index, proposal) in finalized.iter().enumerate() {
info!(target: "validator::finalization", "\t{}", proposal);
fork.overlay.lock().unwrap().overlay.lock().unwrap().apply_diff(&mut diffs[index])?;
}
self.add_blocks(finalized).await?;
drop(forks);
// Rebuild forks starting with the finalized blocks
self.consensus.rebuild_forks(finalized).await?;
// Reset forks starting with the finalized blocks
self.consensus.reset_forks(&finalized, &finalized_fork).await;
info!(target: "validator::finalization", "Finalization completed!");
// Release append lock
drop(append_lock);
Ok(proposals)
// Grab finalized blocks
let finalized = self.blockchain.get_blocks_by_hash(&finalized)?;
Ok(finalized)
}
// ==========================