From 435df441276be7f66519b56eb47ed394e952acb5 Mon Sep 17 00:00:00 2001 From: rakita Date: Fri, 28 Apr 2023 23:46:45 +0200 Subject: [PATCH] feat(tree): unconnected block buffering (#2397) Co-authored-by: Matthias Seitz --- Cargo.lock | 10 + crates/blockchain-tree/Cargo.toml | 5 +- crates/blockchain-tree/src/block_buffer.rs | 398 ++++++++++++++++++ crates/blockchain-tree/src/block_indices.rs | 38 +- crates/blockchain-tree/src/blockchain_tree.rs | 203 ++++++--- crates/blockchain-tree/src/chain.rs | 4 +- crates/blockchain-tree/src/config.rs | 17 +- crates/blockchain-tree/src/lib.rs | 4 + crates/blockchain-tree/src/shareable.rs | 15 +- crates/consensus/beacon/src/engine/mod.rs | 17 +- crates/interfaces/src/blockchain_tree.rs | 20 +- crates/interfaces/src/executor.rs | 10 +- crates/primitives/src/block.rs | 2 +- crates/primitives/src/header.rs | 11 +- crates/storage/provider/src/chain.rs | 2 +- crates/storage/provider/src/providers/mod.rs | 8 +- 16 files changed, 657 insertions(+), 107 deletions(-) create mode 100644 crates/blockchain-tree/src/block_buffer.rs diff --git a/Cargo.lock b/Cargo.lock index 148473f3e1..86277781c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3307,6 +3307,15 @@ dependencies = [ "hashbrown 0.13.2", ] +[[package]] +name = "lru" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03f1160296536f10c833a82dca22267d5486734230d47bf00bf435885814ba1e" +dependencies = [ + "hashbrown 0.13.2", +] + [[package]] name = "lru-cache" version = "0.1.2" @@ -4686,6 +4695,7 @@ version = "0.1.0" dependencies = [ "aquamarine", "assert_matches", + "lru 0.10.0", "parking_lot 0.12.1", "reth-db", "reth-interfaces", diff --git a/crates/blockchain-tree/Cargo.toml b/crates/blockchain-tree/Cargo.toml index 379f691f1b..b56b4f1452 100644 --- a/crates/blockchain-tree/Cargo.toml +++ b/crates/blockchain-tree/Cargo.toml @@ -19,11 +19,10 @@ reth-interfaces = { path = "../interfaces" } reth-db = { path = "../storage/db" } reth-provider = 
{ path = "../storage/provider" } -# tracing -tracing = "0.1" - # common parking_lot = { version = "0.12"} +lru = "0.10" +tracing = "0.1" # mics aquamarine = "0.3.0" diff --git a/crates/blockchain-tree/src/block_buffer.rs b/crates/blockchain-tree/src/block_buffer.rs new file mode 100644 index 0000000000..f17bca6271 --- /dev/null +++ b/crates/blockchain-tree/src/block_buffer.rs @@ -0,0 +1,398 @@ +use lru::LruCache; +use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, SealedBlockWithSenders}; +use std::{ + collections::{btree_map::Entry, hash_map, BTreeMap, HashMap, HashSet}, + num::NonZeroUsize, +}; +/// Type that contains blocks by number and hash. +pub type BufferedBlocks = BTreeMap>; + +/// Contains the Tree of pending blocks that are not executed but buffered +/// It allows us to store unconnected blocks for potential inclusion. +/// +/// It has three main functionality: +/// * `insert_block` for inserting blocks inside the buffer. +/// * `take_all_childrens` for connecting blocks if the parent gets received and inserted. +/// * `clean_old_blocks` to clear old blocks that are below finalized line. +/// +/// Note: Buffer is limited by number of blocks that it can contains and eviction of the block +/// is done by last recently used block. +#[derive(Debug)] +pub struct BlockBuffer { + /// Blocks ordered by block number inside the BTreeMap. + /// + /// Note: BTreeMap is used so that we can remove the finalized old blocks + /// from the buffer + blocks: BufferedBlocks, + /// Needed for removal of the blocks. and to connect the potential unconnected block + /// to the connected one. + parent_to_child: HashMap>, + /// LRU used for tracing oldest inserted blocks that are going to be + /// first in line for evicting if `max_blocks` limit is hit. + /// + /// Used as counter of amount of blocks inside buffer. 
+ lru: LruCache, +} + +impl BlockBuffer { + /// Create new buffer with max limit of blocks + pub fn new(limit: usize) -> Self { + Self { + blocks: Default::default(), + parent_to_child: Default::default(), + lru: LruCache::new(NonZeroUsize::new(limit).unwrap()), + } + } + + /// Insert block inside the buffer. + pub fn insert_block(&mut self, block: SealedBlockWithSenders) { + let num_hash = block.num_hash(); + + self.parent_to_child.entry(block.parent_hash).or_default().insert(block.num_hash()); + self.blocks.entry(block.number).or_default().insert(block.hash, block); + + if let Some((evicted_num_hash, _)) = + self.lru.push(num_hash, ()).filter(|(b, _)| *b != num_hash) + { + // evict the block if limit is hit + if let Some(evicted_block) = self.remove_from_blocks(&evicted_num_hash) { + // evict the block if limit is hit + self.remove_from_parent(evicted_block.parent_hash, &evicted_num_hash); + } + } + } + + /// Get all the children of the block and its child children. + /// This is used to get all the blocks that are dependent on the block that is included. + /// + /// Note: that order of returned blocks is important and the blocks with lower block number + /// in the chain will come first so that they can be executed in the correct order. + pub fn take_all_childrens(&mut self, parent: BlockNumHash) -> Vec { + // remove parent block if present + let mut taken = Vec::new(); + if let Some(block) = self.remove_from_blocks(&parent) { + taken.push(block); + } + + taken.extend(self.remove_children(vec![parent]).into_iter()); + taken + } + + /// Clean up the old blocks from the buffer as blocks before finalization are not needed + /// anymore. We can discard them from the buffer. + pub fn clean_old_blocks(&mut self, finalized_number: BlockNumber) { + let mut remove_parent_children = Vec::new(); + + // discard all blocks that are before the finalized number. 
+ while let Some(entry) = self.blocks.first_entry() { + if *entry.key() > finalized_number { + break + } + let blocks = entry.remove(); + remove_parent_children.extend( + blocks.into_iter().map(|(hash, block)| BlockNumHash::new(block.number, hash)), + ); + } + // remove from lru + for block in remove_parent_children.iter() { + self.lru.pop(block); + } + + self.remove_children(remove_parent_children); + } + + /// Return reference to buffered blocks + pub fn blocks(&self) -> &BufferedBlocks { + &self.blocks + } + + /// Return reference to the asked block. + pub fn block(&self, block: BlockNumHash) -> Option<&SealedBlockWithSenders> { + self.blocks.get(&block.number)?.get(&block.hash) + } + + /// Return number of blocks inside buffer. + pub fn len(&self) -> usize { + self.lru.len() + } + + /// Return if buffer is empty. + pub fn is_empty(&self) -> bool { + self.lru.is_empty() + } + + /// Remove from parent child connection. Dont touch childrens. + fn remove_from_parent(&mut self, parent: BlockHash, block: &BlockNumHash) { + // remove from parent to child connection, but only for this block parent. + if let hash_map::Entry::Occupied(mut entry) = self.parent_to_child.entry(parent) { + entry.get_mut().remove(block); + // if set is empty remove block entry. + if entry.get().is_empty() { + entry.remove(); + } + }; + } + + /// Remove block from `self.blocks`, This will also remove block from `self.lru`. + /// + /// Note: This function will not remove block from the `self.parent_to_child` connection. + fn remove_from_blocks(&mut self, block: &BlockNumHash) -> Option { + if let Entry::Occupied(mut entry) = self.blocks.entry(block.number) { + let ret = entry.get_mut().remove(&block.hash); + // if set is empty remove block entry. + if entry.get().is_empty() { + entry.remove(); + } + self.lru.pop(block); + return ret + }; + None + } + + /// Remove all children and their descendants for the given blocks and return them. 
+ fn remove_children(&mut self, parent_blocks: Vec) -> Vec { + // remove all parent child connection and all the child children blocks that are connected + // to the discarded parent blocks. + let mut remove_parent_children = parent_blocks; + let mut removed_blocks = Vec::new(); + while let Some(parent_num_hash) = remove_parent_children.pop() { + // get this child blocks children and add them to the remove list. + if let Some(parent_childrens) = self.parent_to_child.remove(&parent_num_hash.hash) { + // remove child from buffer + for child in parent_childrens.iter() { + if let Some(block) = self.remove_from_blocks(child) { + removed_blocks.push(block); + } + } + remove_parent_children.extend(parent_childrens.into_iter()); + } + } + removed_blocks + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use reth_interfaces::test_utils::generators::random_block; + use reth_primitives::{BlockHash, BlockNumHash, SealedBlockWithSenders}; + + use crate::BlockBuffer; + + fn create_block(number: u64, parent: BlockHash) -> SealedBlockWithSenders { + let block = random_block(number, Some(parent), None, None); + block.seal_with_senders().unwrap() + } + + #[test] + fn simple_insertion() { + let block1 = create_block(10, BlockHash::random()); + let mut buffer = BlockBuffer::new(3); + + buffer.insert_block(block1.clone()); + assert_eq!(buffer.len(), 1); + assert_eq!(buffer.block(block1.num_hash()), Some(&block1)); + } + + #[test] + fn take_all_chain_of_childrens() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(12, block2.hash); + let block4 = create_block(14, BlockHash::random()); + + let mut buffer = BlockBuffer::new(5); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + assert_eq!(buffer.len(), 4); + 
assert_eq!(buffer.take_all_childrens(main_parent), vec![block1, block2, block3]); + assert_eq!(buffer.len(), 1); + } + + #[test] + fn take_all_multi_level_childrens() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(11, block1.hash); + let block4 = create_block(12, block2.hash); + + let mut buffer = BlockBuffer::new(5); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + assert_eq!(buffer.len(), 4); + assert_eq!( + buffer + .take_all_childrens(main_parent) + .into_iter() + .map(|b| (b.hash, b)) + .collect::>(), + HashMap::from([ + (block1.hash, block1), + (block2.hash, block2), + (block3.hash, block3), + (block4.hash, block4) + ]) + ); + assert_eq!(buffer.len(), 0); + } + + #[test] + fn take_self_with_childs() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(11, block1.hash); + let block4 = create_block(12, block2.hash); + + let mut buffer = BlockBuffer::new(5); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + assert_eq!(buffer.len(), 4); + assert_eq!( + buffer + .take_all_childrens(block1.num_hash()) + .into_iter() + .map(|b| (b.hash, b)) + .collect::>(), + HashMap::from([ + (block1.hash, block1), + (block2.hash, block2), + (block3.hash, block3), + (block4.hash, block4) + ]) + ); + assert_eq!(buffer.len(), 0); + } + + #[test] + fn clean_chain_of_childres() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(12, block2.hash); + let 
block4 = create_block(14, BlockHash::random()); + + let mut buffer = BlockBuffer::new(5); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + assert_eq!(buffer.len(), 4); + buffer.clean_old_blocks(block1.number); + assert_eq!(buffer.len(), 1); + } + + #[test] + fn clean_all_multi_level_childrens() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(11, block1.hash); + let block4 = create_block(12, block2.hash); + + let mut buffer = BlockBuffer::new(5); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + assert_eq!(buffer.len(), 4); + buffer.clean_old_blocks(block1.number); + assert_eq!(buffer.len(), 0); + } + + #[test] + fn clean_multi_chains() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block1a = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block2a = create_block(11, block1.hash); + let random_block1 = create_block(10, BlockHash::random()); + let random_block2 = create_block(11, BlockHash::random()); + let random_block3 = create_block(12, BlockHash::random()); + + let mut buffer = BlockBuffer::new(10); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block1a.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block2a.clone()); + buffer.insert_block(random_block1.clone()); + buffer.insert_block(random_block2.clone()); + buffer.insert_block(random_block3.clone()); + + assert_eq!(buffer.len(), 7); + buffer.clean_old_blocks(10); + assert_eq!(buffer.len(), 2); + } + + fn assert_block_existance(buffer: &BlockBuffer, block: &SealedBlockWithSenders) { + 
assert!(buffer.blocks.get(&block.number).and_then(|t| t.get(&block.hash)).is_none()); + assert!(buffer + .parent_to_child + .get(&block.parent_hash) + .and_then(|p| p.get(&block.num_hash())) + .is_none()); + } + + #[test] + fn evict_with_gap() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(12, block2.hash); + let block4 = create_block(13, BlockHash::random()); + + let mut buffer = BlockBuffer::new(3); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + // block1 gets evicted + assert_block_existance(&buffer, &block1); + + assert_eq!(buffer.len(), 3); + } + + #[test] + fn simple_eviction() { + let main_parent = BlockNumHash::new(9, BlockHash::random()); + let block1 = create_block(10, main_parent.hash); + let block2 = create_block(11, block1.hash); + let block3 = create_block(12, block2.hash); + let block4 = create_block(13, BlockHash::random()); + + let mut buffer = BlockBuffer::new(3); + + buffer.insert_block(block1.clone()); + buffer.insert_block(block2.clone()); + buffer.insert_block(block3.clone()); + buffer.insert_block(block4.clone()); + + // block3 gets evicted + assert_block_existance(&buffer, &block1); + + assert_eq!(buffer.len(), 3); + } +} diff --git a/crates/blockchain-tree/src/block_indices.rs b/crates/blockchain-tree/src/block_indices.rs index 92c4cb129b..dc523f80ad 100644 --- a/crates/blockchain-tree/src/block_indices.rs +++ b/crates/blockchain-tree/src/block_indices.rs @@ -118,11 +118,15 @@ impl BlockIndices { pub fn update_block_hashes( &mut self, hashes: BTreeMap, - ) -> BTreeSet { - let mut new_hashes = hashes.iter(); + ) -> (BTreeSet, Vec) { + // set new canonical hashes. 
+ self.canonical_chain = hashes.clone(); + + let mut new_hashes = hashes.into_iter(); let mut old_hashes = self.canonical_chain().clone().into_iter(); - let mut remove = Vec::new(); + let mut removed = Vec::new(); + let mut added = Vec::new(); let mut new_hash = new_hashes.next(); let mut old_hash = old_hashes.next(); @@ -130,6 +134,11 @@ impl BlockIndices { loop { let Some(old_block_value) = old_hash else { // end of old_hashes canonical chain. New chain has more block then old chain. + while let Some(new) = new_hash { + // add new blocks to added list. + added.push(new.into()); + new_hash = new_hashes.next(); + } break }; let Some(new_block_value) = new_hash else { @@ -137,7 +146,7 @@ impl BlockIndices { // remove all present block. // this is mostly not going to happen as reorg should make new chain in Tree. while let Some(rem) = old_hash { - remove.push(rem); + removed.push(rem); old_hash = old_hashes.next(); } break; @@ -146,29 +155,34 @@ impl BlockIndices { match new_block_value.0.cmp(&old_block_value.0) { std::cmp::Ordering::Less => { // new chain has more past blocks than old chain + added.push(new_block_value.into()); new_hash = new_hashes.next(); } std::cmp::Ordering::Equal => { - if *new_block_value.1 != old_block_value.1 { + if new_block_value.1 != old_block_value.1 { // remove block hash as it is different - remove.push(old_block_value); + removed.push(old_block_value); + added.push(new_block_value.into()); } new_hash = new_hashes.next(); old_hash = old_hashes.next(); } std::cmp::Ordering::Greater => { // old chain has more past blocks that new chain - remove.push(old_block_value); + removed.push(old_block_value); old_hash = old_hashes.next() } } } - self.canonical_chain = hashes; - remove.into_iter().fold(BTreeSet::new(), |mut fold, (number, hash)| { - fold.extend(self.remove_block(number, hash)); - fold - }) + // remove childs of removed blocks + ( + removed.into_iter().fold(BTreeSet::new(), |mut fold, (number, hash)| { + 
fold.extend(self.remove_block(number, hash)); + fold + }), + added, + ) } /// Remove chain from indices and return dependent chains that needs to be removed. diff --git a/crates/blockchain-tree/src/blockchain_tree.rs b/crates/blockchain-tree/src/blockchain_tree.rs index d3cd345fab..fd9128bcd9 100644 --- a/crates/blockchain-tree/src/blockchain_tree.rs +++ b/crates/blockchain-tree/src/blockchain_tree.rs @@ -1,10 +1,15 @@ //! Implementation of [`BlockchainTree`] +use crate::{ + chain::BlockChainId, AppendableChain, BlockBuffer, BlockIndices, BlockchainTreeConfig, + PostStateData, TreeExternals, +}; use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{ blockchain_tree::BlockStatus, consensus::Consensus, executor::Error as ExecError, Error, }; use reth_primitives::{ - BlockHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders, U256, + BlockHash, BlockNumHash, BlockNumber, ForkBlock, Hardfork, SealedBlock, SealedBlockWithSenders, + U256, }; use reth_provider::{ chain::{ChainSplit, SplitAt}, @@ -16,12 +21,7 @@ use std::{ collections::{BTreeMap, HashMap}, sync::Arc, }; -use tracing::trace; - -use crate::{ - chain::BlockChainId, AppendableChain, BlockIndices, BlockchainTreeConfig, PostStateData, - TreeExternals, -}; +use tracing::{info, trace}; #[cfg_attr(doc, aquamarine::aquamarine)] /// Tree of chains and its identifications. @@ -67,6 +67,8 @@ use crate::{ pub struct BlockchainTree { /// The tracked chains and their current data. chains: HashMap, + /// Unconnected block buffer. + buffered_blocks: BlockBuffer, /// Static blockchain ID generator block_chain_id_generator: u64, /// Indices to block and their connection to the canonical chain. 
@@ -118,6 +120,7 @@ impl BlockchainTree Ok(Self { externals, + buffered_blocks: BlockBuffer::new(config.max_unconnected_blocks()), block_chain_id_generator: 0, chains: Default::default(), block_indices: BlockIndices::new( @@ -145,15 +148,6 @@ impl BlockchainTree }) } - // we will not even try to insert blocks that are too far in the future. - if block.number > last_finalized_block + self.config.max_blocks_in_chain() { - return Err(ExecError::PendingBlockIsInFuture { - block_number: block.number, - block_hash: block.hash(), - last_finalized: last_finalized_block, - }) - } - Ok(()) } @@ -227,11 +221,13 @@ impl BlockchainTree &mut self, block: SealedBlockWithSenders, ) -> Result { - // check if block parent can be found in Tree + let parent = block.parent_num_hash(); + let block_num_hash = block.num_hash(); - // Create a new sidechain by forking the given chain, or append the block if the parent - // block is the top of the given chain. - if let Some(chain_id) = self.block_indices.get_blocks_chain_id(&block.parent_hash) { + // check if block parent can be found in Tree + if let Some(chain_id) = self.block_indices.get_blocks_chain_id(&parent.hash) { + // Create a new sidechain by forking the given chain, or append the block if the parent + // block is the top of the given chain. let block_hashes = self.all_chain_hashes(chain_id); // get canonical fork. @@ -249,7 +245,7 @@ impl BlockchainTree let canonical_block_hashes = self.block_indices.canonical_chain(); // append the block if it is continuing the chain. - return if chain_tip == block.parent_hash { + let status = if chain_tip == block.parent_hash { let block_hash = block.hash(); let block_number = block.number; parent_chain.append_block( @@ -272,17 +268,19 @@ impl BlockchainTree )?; self.insert_chain(chain); Ok(BlockStatus::Accepted) - } + }; + self.try_connect_buffered_blocks(block_num_hash); + return status } + // if not found, check if the parent can be found inside canonical chain. 
- if Some(block.parent_hash) == self.block_indices.canonical_hash(block.number - 1) { + if Some(parent.hash) == self.block_indices.canonical_hash(parent.number) { // create new chain that points to that block //return self.fork_canonical_chain(block.clone()); // TODO save pending block to database // https://github.com/paradigmxyz/reth/issues/1713 let db = self.externals.shareable_db(); - let fork_block = ForkBlock { number: block.number - 1, hash: block.parent_hash }; // Validate that the block is post merge let parent_td = db @@ -312,16 +310,22 @@ impl BlockchainTree &block, &parent_header, canonical_block_hashes, - fork_block, + parent, &self.externals, )?; self.insert_chain(chain); + self.try_connect_buffered_blocks(block_num_hash); return Ok(block_status) } - // NOTE: Block doesn't have a parent, and if we receive this block in `make_canonical` - // function this could be a trigger to initiate p2p syncing, as we are missing the - // parent. + // if there is a parent inside the buffer, validate against it. + if let Some(buffered_parent) = self.buffered_blocks.block(parent) { + self.externals.consensus.validate_header_against_parent(&block, buffered_parent)? + } + + // insert block inside unconnected block buffer. Delaying it execution. + self.buffered_blocks.insert_block(block); + Ok(BlockStatus::Disconnected) } @@ -423,6 +427,38 @@ impl BlockchainTree self.insert_in_range_block_with_senders(block) } + /// Insert block for future execution. 
+ pub fn buffer_block(&mut self, block: SealedBlockWithSenders) -> Result<(), Error> { + self.validate_block(&block)?; + self.buffered_blocks.insert_block(block); + Ok(()) + } + + /// Validate if block is correct and if i + fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), Error> { + if let Err(e) = + self.externals.consensus.validate_header_with_total_difficulty(block, U256::MAX) + { + info!( + "Failed to validate header for TD related check with error: {e:?}, block:{:?}", + block + ); + return Err(e.into()) + } + + if let Err(e) = self.externals.consensus.validate_header(block) { + info!("Failed to validate header with error: {e:?}, block:{:?}", block); + return Err(e.into()) + } + + if let Err(e) = self.externals.consensus.validate_block(block) { + info!("Failed to validate blocks with error: {e:?}, block:{:?}", block); + return Err(e.into()) + } + + Ok(()) + } + /// Same as [BlockchainTree::insert_block_with_senders] but expects that the block is in range, /// See [BlockchainTree::ensure_block_is_in_range]. pub(crate) fn insert_in_range_block_with_senders( @@ -446,21 +482,28 @@ impl BlockchainTree return Ok(BlockStatus::Valid) } + // validate block hashes + self.validate_block(&block)?; + + // try to insert block self.try_insert_block(block) } /// Finalize blocks up until and including `finalized_block`, and remove them from the tree. pub fn finalize_block(&mut self, finalized_block: BlockNumber) { + // remove blocks let mut remove_chains = self.block_indices.finalize_canonical_blocks( finalized_block, self.config.num_of_additional_canonical_block_hashes(), ); - + // remove chains of removed blocks while let Some(chain_id) = remove_chains.pop_first() { if let Some(chain) = self.chains.remove(&chain_id) { remove_chains.extend(self.block_indices.remove_chain(&chain)); } } + // clean block buffer. 
+ self.buffered_blocks.clean_old_blocks(finalized_block); } /// Reads the last `N` canonical hashes from the database and updates the block indices of the @@ -491,7 +534,8 @@ impl BlockchainTree .take(num_of_canonical_hashes as usize) .collect::, _>>()?; - let mut remove_chains = self.block_indices.update_block_hashes(last_canonical_hashes); + let (mut remove_chains, _) = + self.block_indices.update_block_hashes(last_canonical_hashes.clone()); // remove all chains that got discarded while let Some(chain_id) = remove_chains.first() { @@ -500,9 +544,35 @@ impl BlockchainTree } } + // check unconnected block buffer for the childs of new added blocks, + for added_block in last_canonical_hashes.into_iter() { + self.try_connect_buffered_blocks(added_block.into()) + } + + // check unconnected block buffer for childs of the chains. + let mut all_chain_blocks = Vec::new(); + for (_, chain) in self.chains.iter() { + for (&number, blocks) in chain.blocks.iter() { + all_chain_blocks.push(BlockNumHash { number, hash: blocks.hash }) + } + } + for block in all_chain_blocks.into_iter() { + self.try_connect_buffered_blocks(block) + } + Ok(()) } + /// Connect unconnected blocks + fn try_connect_buffered_blocks(&mut self, new_block: BlockNumHash) { + let include_blocks = self.buffered_blocks.take_all_childrens(new_block); + // insert child blocks + for block in include_blocks.into_iter() { + // dont fail on error, just ignore the block. + let _ = self.insert_block_with_senders(block); + } + } + /// Split a sidechain at the given point, and return the canonical part of it. /// /// The pending part of the chain is reinserted into the tree with the same `chain_id`. 
@@ -687,6 +757,8 @@ impl BlockchainTree #[cfg(test)] mod tests { + use crate::block_buffer::BufferedBlocks; + use super::*; use assert_matches::assert_matches; use reth_db::{ @@ -747,6 +819,8 @@ mod tests { fork_to_child: Option>>, /// Pending blocks pending_blocks: Option<(BlockNumber, HashSet)>, + /// Buffered blocks + buffered_blocks: Option, } impl TreeTester { @@ -766,6 +840,11 @@ mod tests { self } + fn with_buffered_blocks(mut self, buffered_blocks: BufferedBlocks) -> Self { + self.buffered_blocks = Some(buffered_blocks); + self + } + fn with_pending_blocks( mut self, pending_blocks: (BlockNumber, HashSet), @@ -792,6 +871,9 @@ mod tests { let hashes = hashes.into_iter().collect::>(); assert_eq!((num, hashes), pending_blocks); } + if let Some(buffered_blocks) = self.buffered_blocks { + assert_eq!(*tree.buffered_blocks.blocks(), buffered_blocks); + } } } @@ -800,15 +882,16 @@ mod tests { let data = BlockChainTestData::default_with_numbers(11, 12); let (block1, exec1) = data.blocks[0].clone(); let (block2, exec2) = data.blocks[1].clone(); + let genesis = data.genesis; // test pops execution results from vector, so order is from last to first. let externals = setup_externals(vec![exec2.clone(), exec1.clone(), exec2, exec1]); // last finalized block would be number 9. 
- setup_genesis(externals.db.clone(), data.genesis); + setup_genesis(externals.db.clone(), genesis.clone()); // make tree - let config = BlockchainTreeConfig::new(1, 2, 3); + let config = BlockchainTreeConfig::new(1, 2, 3, 2); let (sender, mut canon_notif) = tokio::sync::broadcast::channel(10); let mut tree = BlockchainTree::new(externals, sender, config).expect("failed to create tree"); @@ -816,31 +899,29 @@ mod tests { // genesis block 10 is already canonical assert_eq!(tree.make_canonical(&H256::zero()), Ok(())); - // insert block2 hits max chain size - assert_eq!( - tree.insert_block_with_senders(block2.clone()), - Err(ExecError::PendingBlockIsInFuture { - block_number: block2.number, - block_hash: block2.hash(), - last_finalized: 9, - } - .into()) - ); - // make genesis block 10 as finalized tree.finalize_block(10); - // block 2 parent is not known. + // block 2 parent is not known, block2 is buffered. assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Disconnected)); - // insert block1 - assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid)); - // already inserted block will return true. + // Buffered block: [block2] + // Trie state: + // | + // g1 (canonical blocks) + // | + + TreeTester::default() + .with_buffered_blocks(BTreeMap::from([( + block2.number, + HashMap::from([(block2.hash(), block2.clone())]), + )])) + .assert(&tree); + + // insert block1 and buffered block2 is inserted assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid)); - // insert block2 - assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Valid)); - + // Buffered blocks: [] // Trie state: // b2 (pending block) // | @@ -857,6 +938,12 @@ mod tests { .with_pending_blocks((block1.number, HashSet::from([block1.hash]))) .assert(&tree); + // already inserted block will return true. 
+ assert_eq!(tree.insert_block_with_senders(block1.clone()), Ok(BlockStatus::Valid)); + + // block two is already inserted. + assert_eq!(tree.insert_block_with_senders(block2.clone()), Ok(BlockStatus::Valid)); + // make block1 canonical assert_eq!(tree.make_canonical(&block1.hash()), Ok(())); // check notification @@ -882,6 +969,8 @@ mod tests { .with_fork_to_child(HashMap::from([])) .assert(&tree); + /**** INSERT SIDE BLOCKS *** */ + let mut block1a = block1.clone(); let block1a_hash = H256([0x33; 32]); block1a.hash = block1a_hash; @@ -1069,6 +1158,19 @@ mod tests { Ok(CanonStateNotification::Commit{ new}) if *new.blocks() == BTreeMap::from([(block2.number,block2.clone())])); + // insert unconnected block2b + let mut block2b = block2a.clone(); + block2b.hash = H256([0x99; 32]); + block2b.parent_hash = H256([0x88; 32]); + + assert_eq!(tree.insert_block_with_senders(block2b.clone()), Ok(BlockStatus::Disconnected)); + TreeTester::default() + .with_buffered_blocks(BTreeMap::from([( + block2b.number, + HashMap::from([(block2b.hash(), block2b.clone())]), + )])) + .assert(&tree); + // update canonical block to b2, this would make b2a be removed assert_eq!(tree.restore_canonical_hashes(12), Ok(())); // Trie state: @@ -1083,6 +1185,7 @@ mod tests { .with_block_to_chain(HashMap::from([])) .with_fork_to_child(HashMap::from([])) .with_pending_blocks((block2.number + 1, HashSet::from([]))) + .with_buffered_blocks(BTreeMap::from([])) .assert(&tree); } } diff --git a/crates/blockchain-tree/src/chain.rs b/crates/blockchain-tree/src/chain.rs index 67ee8e2fa7..93d7f51b9b 100644 --- a/crates/blockchain-tree/src/chain.rs +++ b/crates/blockchain-tree/src/chain.rs @@ -138,10 +138,8 @@ impl AppendableChain { C: Consensus, EF: ExecutorFactory, { - externals.consensus.validate_header_with_total_difficulty(&block, U256::MAX)?; - externals.consensus.validate_header(&block)?; + // some checks are done before blocks comes here. 
externals.consensus.validate_header_against_parent(&block, parent_block)?; - externals.consensus.validate_block(&block)?; let (unseal, senders) = block.into_components(); let unseal = unseal.unseal(); diff --git a/crates/blockchain-tree/src/config.rs b/crates/blockchain-tree/src/config.rs index d830f066fd..681c8f61b3 100644 --- a/crates/blockchain-tree/src/config.rs +++ b/crates/blockchain-tree/src/config.rs @@ -9,6 +9,8 @@ pub struct BlockchainTreeConfig { max_blocks_in_chain: u64, /// The number of blocks that can be re-orged (finalization windows) max_reorg_depth: u64, + /// The number of unconnected blocks that we are buffering + max_unconnected_blocks: usize, /// For EVM's "BLOCKHASH" opcode we require last 256 block hashes. So we need to specify /// at least `additional_canonical_block_hashes`+`max_reorg_depth`, for eth that would be /// 256+64. @@ -25,6 +27,8 @@ impl Default for BlockchainTreeConfig { max_blocks_in_chain: 65, // EVM requires that last 256 block hashes are available. num_of_additional_canonical_block_hashes: 256, + // max unconnected blocks. + max_unconnected_blocks: 200, } } } @@ -35,11 +39,17 @@ impl BlockchainTreeConfig { max_reorg_depth: u64, max_blocks_in_chain: u64, num_of_additional_canonical_block_hashes: u64, + max_unconnected_blocks: usize, ) -> Self { if max_reorg_depth > max_blocks_in_chain { panic!("Side chain size should be more then finalization window"); } - Self { max_blocks_in_chain, max_reorg_depth, num_of_additional_canonical_block_hashes } + Self { + max_blocks_in_chain, + max_reorg_depth, + num_of_additional_canonical_block_hashes, + max_unconnected_blocks, + } } /// Return the maximum reorg depth. 
@@ -57,4 +67,9 @@ impl BlockchainTreeConfig { pub fn num_of_additional_canonical_block_hashes(&self) -> u64 { self.num_of_additional_canonical_block_hashes } + + /// Return max number of unconnected blocks that we are buffering + pub fn max_unconnected_blocks(&self) -> usize { + self.max_unconnected_blocks + } } diff --git a/crates/blockchain-tree/src/lib.rs b/crates/blockchain-tree/src/lib.rs index 6452ad2d38..4c453264ad 100644 --- a/crates/blockchain-tree/src/lib.rs +++ b/crates/blockchain-tree/src/lib.rs @@ -29,3 +29,7 @@ pub use shareable::ShareableBlockchainTree; pub mod post_state_data; pub use post_state_data::{PostStateData, PostStateDataRef}; + +/// Buffer of not executed blocks. +pub mod block_buffer; +pub use block_buffer::BlockBuffer; diff --git a/crates/blockchain-tree/src/shareable.rs b/crates/blockchain-tree/src/shareable.rs index df7b05322e..6843514373 100644 --- a/crates/blockchain-tree/src/shareable.rs +++ b/crates/blockchain-tree/src/shareable.rs @@ -35,9 +35,7 @@ impl ShareableBlockchainTree BlockchainTreeEngine for ShareableBlockchainTree { - /// Recover senders and call [`BlockchainTreeEngine::insert_block_with_senders`]. 
- fn insert_block(&self, block: SealedBlock) -> Result { - trace!(target: "blockchain_tree", ?block, "Inserting block"); + fn insert_block_without_senders(&self, block: SealedBlock) -> Result { let mut tree = self.tree.write(); tree.ensure_block_is_in_range(&block)?; let block = block @@ -46,11 +44,12 @@ impl BlockchainTreeEngine tree.insert_in_range_block_with_senders(block) } - fn insert_block_with_senders( - &self, - block: SealedBlockWithSenders, - ) -> Result { - trace!(target: "blockchain_tree", ?block, "Inserting block with senders"); + fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), Error> { + self.tree.write().buffer_block(block) + } + + fn insert_block(&self, block: SealedBlockWithSenders) -> Result { + trace!(target: "blockchain_tree", ?block, "Inserting block"); self.tree.write().insert_block_with_senders(block) } diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index b1d2fba6e5..b416d04a41 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -389,7 +389,7 @@ where let status = if self.is_pipeline_idle() { let block_hash = block.hash; - match self.blockchain_tree.insert_block(block) { + match self.blockchain_tree.insert_block_without_senders(block) { Ok(status) => { let latest_valid_hash = matches!(status, BlockStatus::Valid).then_some(block_hash); @@ -404,15 +404,16 @@ where let latest_valid_hash = matches!(error, Error::Execution(ExecutorError::BlockPreMerge { .. })) .then_some(H256::zero()); - let status = match error { - Error::Execution(ExecutorError::PendingBlockIsInFuture { .. 
}) => { - PayloadStatusEnum::Syncing - } - error => PayloadStatusEnum::Invalid { validation_error: error.to_string() }, - }; + let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; PayloadStatus::new(status, latest_valid_hash) } } + } else if let Err(error) = self.blockchain_tree.buffer_block_without_sender(block) { + let latest_valid_hash = + matches!(error, Error::Execution(ExecutorError::BlockPreMerge { .. })) + .then_some(H256::zero()); + let status = PayloadStatusEnum::Invalid { validation_error: error.to_string() }; + PayloadStatus::new(status, latest_valid_hash) } else { PayloadStatus::from_status(PayloadStatusEnum::Syncing) }; @@ -751,7 +752,7 @@ mod tests { // Setup blockchain tree let externals = TreeExternals::new(db.clone(), consensus, executor_factory, chain_spec); - let config = BlockchainTreeConfig::new(1, 2, 3); + let config = BlockchainTreeConfig::new(1, 2, 3, 2); let (canon_state_notification_sender, _) = tokio::sync::broadcast::channel(3); let tree = ShareableBlockchainTree::new( BlockchainTree::new(externals, canon_state_notification_sender, config) diff --git a/crates/interfaces/src/blockchain_tree.rs b/crates/interfaces/src/blockchain_tree.rs index 00c9ee8ba2..37cd9fad74 100644 --- a/crates/interfaces/src/blockchain_tree.rs +++ b/crates/interfaces/src/blockchain_tree.rs @@ -10,17 +10,23 @@ use std::collections::{BTreeMap, HashSet}; /// finalize and commit it to db. If we dont have the block, pipeline syncing should start to /// fetch the blocks from p2p. Do reorg in tables if canonical chain if needed. pub trait BlockchainTreeEngine: BlockchainTreeViewer + Send + Sync { - /// Recover senders and call [`BlockchainTreeEngine::insert_block_with_senders`]. - fn insert_block(&self, block: SealedBlock) -> Result { + /// Recover senders and call [`BlockchainTreeEngine::insert_block`]. 
+ fn insert_block_without_senders(&self, block: SealedBlock) -> Result { let block = block.seal_with_senders().ok_or(ExecutionError::SenderRecoveryError)?; - self.insert_block_with_senders(block) + self.insert_block(block) } + /// Recover senders and call [`BlockchainTreeEngine::buffer_block`]. + fn buffer_block_without_sender(&self, block: SealedBlock) -> Result<(), Error> { + let block = block.seal_with_senders().ok_or(ExecutionError::SenderRecoveryError)?; + self.buffer_block(block) + } + + /// buffer block with senders + fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<(), Error>; + /// Insert block with senders - fn insert_block_with_senders( - &self, - block: SealedBlockWithSenders, - ) -> Result; + fn insert_block(&self, block: SealedBlockWithSenders) -> Result; /// Finalize blocks up until and including `finalized_block`, and remove them from the tree. fn finalize_block(&self, finalized_block: BlockNumber); diff --git a/crates/interfaces/src/executor.rs b/crates/interfaces/src/executor.rs index 466a52d3b0..7a0256a981 100644 --- a/crates/interfaces/src/executor.rs +++ b/crates/interfaces/src/executor.rs @@ -1,4 +1,4 @@ -use reth_primitives::{BlockHash, BlockNumber, Bloom, H256}; +use reth_primitives::{BlockHash, BlockNumHash, BlockNumber, Bloom, H256}; use thiserror::Error; /// BlockExecutor Errors @@ -39,7 +39,7 @@ pub enum Error { #[error( "Appending chain on fork (other_chain_fork:?) 
is not possible as the tip is {chain_tip:?}" )] - AppendChainDoesntConnect { chain_tip: (u64, H256), other_chain_fork: (u64, H256) }, + AppendChainDoesntConnect { chain_tip: BlockNumHash, other_chain_fork: BlockNumHash }, #[error("Canonical chain header #{block_hash} can't be found ")] CanonicalChain { block_hash: BlockHash }, #[error("Can't insert #{block_number} {block_hash} as last finalized block number is {last_finalized}")] @@ -48,12 +48,6 @@ pub enum Error { block_number: BlockNumber, last_finalized: BlockNumber, }, - #[error("Block #{block_number} ({block_hash:?}) too far into the future. Last finalized block is #{last_finalized}")] - PendingBlockIsInFuture { - block_hash: BlockHash, - block_number: BlockNumber, - last_finalized: BlockNumber, - }, #[error("Block number #{block_number} not found in blockchain tree chain")] BlockNumberNotFoundInChain { block_number: BlockNumber }, #[error("Block hash {block_hash} not found in blockchain tree chain")] diff --git a/crates/primitives/src/block.rs b/crates/primitives/src/block.rs index e8b39b1333..5e197aa325 100644 --- a/crates/primitives/src/block.rs +++ b/crates/primitives/src/block.rs @@ -656,7 +656,7 @@ impl AsRef for RpcBlockHash { } /// Block number and hash. 
-#[derive(Clone, Copy, Default, PartialEq, Eq)] +#[derive(Clone, Copy, Hash, Default, PartialEq, Eq)] pub struct BlockNumHash { /// Block number pub number: BlockNumber, diff --git a/crates/primitives/src/header.rs b/crates/primitives/src/header.rs index 575652e932..c1a8b80a12 100644 --- a/crates/primitives/src/header.rs +++ b/crates/primitives/src/header.rs @@ -2,7 +2,7 @@ use crate::{ basefee::calculate_next_block_base_fee, keccak256, proofs::{EMPTY_LIST_HASH, EMPTY_ROOT}, - BlockHash, BlockNumber, Bloom, Bytes, H160, H256, U256, + BlockHash, BlockNumHash, BlockNumber, Bloom, Bytes, H160, H256, U256, }; use bytes::{Buf, BufMut, BytesMut}; use ethers_core::types::{Block, H256 as EthersH256, H64}; @@ -118,6 +118,11 @@ impl Default for Header { } impl Header { + /// Return parent block number and hash + pub fn parent_num_hash(&self) -> BlockNumHash { + BlockNumHash { number: self.number.saturating_sub(1), hash: self.parent_hash } + } + /// Heavy function that will calculate hash of data and will *not* save the change to metadata. /// Use [`Header::seal`], [`SealedHeader`] and unlock if you need hash to be persistent. pub fn hash_slow(&self) -> H256 { @@ -309,8 +314,8 @@ impl SealedHeader { } /// Return the number hash tuple.
- pub fn num_hash(&self) -> (BlockNumber, BlockHash) { - (self.number, self.hash) + pub fn num_hash(&self) -> BlockNumHash { + BlockNumHash::new(self.number, self.hash) } } diff --git a/crates/storage/provider/src/chain.rs b/crates/storage/provider/src/chain.rs index 20a407bf20..01b88da984 100644 --- a/crates/storage/provider/src/chain.rs +++ b/crates/storage/provider/src/chain.rs @@ -143,7 +143,7 @@ impl Chain { if chain_tip.hash != chain.fork_block_hash() { return Err(ExecError::AppendChainDoesntConnect { chain_tip: chain_tip.num_hash(), - other_chain_fork: chain.fork_block().into_components(), + other_chain_fork: chain.fork_block(), } .into()) } diff --git a/crates/storage/provider/src/providers/mod.rs b/crates/storage/provider/src/providers/mod.rs index 00e69297e6..580ab3405d 100644 --- a/crates/storage/provider/src/providers/mod.rs +++ b/crates/storage/provider/src/providers/mod.rs @@ -339,8 +339,12 @@ where DB: Send + Sync, Tree: BlockchainTreeEngine, { - fn insert_block_with_senders(&self, block: SealedBlockWithSenders) -> Result { - self.tree.insert_block_with_senders(block) + fn buffer_block(&self, block: SealedBlockWithSenders) -> Result<()> { + self.tree.buffer_block(block) + } + + fn insert_block(&self, block: SealedBlockWithSenders) -> Result { + self.tree.insert_block(block) } fn finalize_block(&self, finalized_block: BlockNumber) {