From 40739693a1138d2bbb66a8af032d81d518bdcef7 Mon Sep 17 00:00:00 2001 From: skoupidi Date: Wed, 17 Apr 2024 17:42:41 +0300 Subject: [PATCH] darkfid: persist sync headers in a sled tree + some minor beautifications --- bin/darkfid/src/task/sync.rs | 126 ++++++++++----- bin/darkfid/src/tests/harness.rs | 2 + bin/darkfid/src/tests/mod.rs | 2 + .../research/blockchain-explorer/src/main.rs | 19 ++- src/blockchain/block_store.rs | 11 +- src/blockchain/header_store.rs | 145 +++++++++++++++--- src/blockchain/mod.rs | 2 +- src/error.rs | 3 + 8 files changed, 235 insertions(+), 75 deletions(-) diff --git a/bin/darkfid/src/task/sync.rs b/bin/darkfid/src/task/sync.rs index 44cb1abf3..7d79370c0 100644 --- a/bin/darkfid/src/task/sync.rs +++ b/bin/darkfid/src/task/sync.rs @@ -19,14 +19,10 @@ use std::collections::HashMap; use darkfi::{ - blockchain::{Header, HeaderHash}, - net::ChannelPtr, - system::sleep, - util::encoding::base64, - Error, Result, + blockchain::HeaderHash, net::ChannelPtr, system::sleep, util::encoding::base64, Error, Result, }; use darkfi_serial::serialize_async; -use log::{debug, info, warn}; +use log::{debug, error, info, warn}; use rand::{prelude::SliceRandom, rngs::OsRng}; use tinyjson::JsonValue; @@ -50,9 +46,8 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { // TODO: Configure a checkpoint, filter peers that don't have that and start // syncing the sequence until that - // Grab last known block/header - // TODO: grab last known header from the headers sync tree, once added - let mut last = node.validator.blockchain.last()?; + // Grab last known block header + let mut last = last_header(node)?; info!(target: "darkfid::task::sync_task", "Last known block: {} - {}", last.0, last.1); loop { // Grab the most common tip and the corresponding peers @@ -60,12 +55,12 @@ pub async fn sync_task(node: &Darkfid) -> Result<()> { // Retrieve all the headers backawards until our last known one and verify them. 
// We use the next height, in order to also retrieve the peers tip header. - let headers = retrieve_headers(&common_tip_peers, last, common_tip_height + 1).await?; + retrieve_headers(node, &common_tip_peers, last, common_tip_height + 1).await?; // Retrieve all the blocks for those headers and apply them to canonical - retrieve_blocks(node, &peers, headers).await?; + retrieve_blocks(node, &peers).await?; - let last_received = node.validator.blockchain.last()?; + let last_received = last_header(node)?; info!(target: "darkfid::task::sync_task", "Last received block: {} - {}", last_received.0, last_received.1); if last == last_received { @@ -125,11 +120,22 @@ async fn synced_peers(node: &Darkfid) -> Result> { Ok(peers) } +/// Auxiliary function to retrieve last known block header, including existing pending sync ones. +fn last_header(node: &Darkfid) -> Result<(u32, HeaderHash)> { + // First we check if we have pending sync headers + if let Some(last_sync) = node.validator.blockchain.headers.get_last_sync()? { + return Ok((last_sync.height, last_sync.hash())) + } + // Then we grab the last one from the actual canonical chain + node.validator.blockchain.last() +} + /// Auxiliary function to ask all peers for their current tip and find the most common one. 
async fn most_common_tip( peers: &[ChannelPtr], last_tip: &HeaderHash, ) -> Result<(u32, Vec)> { + info!(target: "darkfid::task::sync::most_common_tip", "Receiving tip from peers..."); let mut tips: HashMap<(u32, [u8; 32]), Vec> = HashMap::new(); for peer in peers { // Node creates a `TipRequest` and sends it @@ -164,27 +170,30 @@ async fn most_common_tip( common_tip_peers = peers; } if common_tips.len() > 1 { - info!(target: "darkfid::task::sync::most_common_tip", "Multiple common tips found: {:?}", common_tips); - return Err(Error::UnsupportedChain) + error!(target: "darkfid::task::sync::most_common_tip", "Multiple common tips found: {:?}", common_tips); + return Err(Error::BlockchainSyncError) } + info!(target: "darkfid::task::sync::most_common_tip", "Received tip from peers: {} - {}", common_tips[0].0, HeaderHash::new(common_tips[0].1)); Ok((common_tips[0].0, common_tip_peers)) } /// Auxiliary function to retrieve headers backwards until our last known one and verify them. async fn retrieve_headers( + node: &Darkfid, peers: &[ChannelPtr], last_known: (u32, HeaderHash), tip_height: u32, -) -> Result> { +) -> Result<()> { + info!(target: "darkfid::task::sync::retrieve_headers", "Retrieving missing headers from peers..."); // Communication setup let mut peer_subs = vec![]; for peer in peers { peer_subs.push(peer.subscribe_msg::().await?); } - // TODO: store them in a sled tree - let mut headers = vec![]; + // We subtract 1 since tip_height is increased by one + let total = tip_height - last_known.0 - 1; let mut last_tip_height = tip_height; 'headers_loop: loop { for (index, peer) in peers.iter().enumerate() { @@ -205,38 +214,70 @@ async fn retrieve_headers( break 'headers_loop } - response_headers.extend_from_slice(&headers); - headers = response_headers; - last_tip_height = headers[0].height; + // Store the headers + node.validator.blockchain.headers.insert_sync(&response_headers)?; + last_tip_height = response_headers[0].height; + info!(target: 
"darkfid::task::sync::retrieve_headers", "Headers received: {}/{}", node.validator.blockchain.headers.len_sync(), total); } } // Check if we retrieved any new headers - if headers.is_empty() { - return Ok(headers); + if node.validator.blockchain.headers.is_empty_sync() { + return Ok(()); } // Verify headers sequence. Here we do a quick and dirty verification // of just the hashes and heights sequence. We will formaly verify - // the blocks when we retrieve them. + // the blocks when we retrieve them. We verify them in batches, + // to not load them all in memory. + info!(target: "darkfid::task::sync::retrieve_headers", "Verifying headers sequence..."); + let mut verified_headers = 0; + let total = node.validator.blockchain.headers.len_sync(); + // First we verify the first `BATCH` sequence, using the last canonical known one as + // the first sync header previous. + let last_known = node.validator.blockchain.last()?; + let mut headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?; if headers[0].previous != last_known.1 || headers[0].height != last_known.0 + 1 { return Err(Error::BlockIsInvalid(headers[0].hash().as_string())) } + verified_headers += 1; for (index, header) in headers[1..].iter().enumerate() { if header.previous != headers[index].hash() || header.height != headers[index].height + 1 { return Err(Error::BlockIsInvalid(header.hash().as_string())) } + verified_headers += 1; + } + info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total); + // Now we verify the rest sequences + let mut last_checked = headers.last().unwrap().clone(); + headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?; + while !headers.is_empty() { + if headers[0].previous != last_checked.hash() || + headers[0].height != last_checked.height + 1 + { + return Err(Error::BlockIsInvalid(headers[0].hash().as_string())) + } + verified_headers += 1; + for (index, header) in 
headers[1..].iter().enumerate() { + if header.previous != headers[index].hash() || + header.height != headers[index].height + 1 + { + return Err(Error::BlockIsInvalid(header.hash().as_string())) + } + verified_headers += 1; + } + last_checked = headers.last().unwrap().clone(); + headers = node.validator.blockchain.headers.get_after_sync(last_checked.height, BATCH)?; + info!(target: "darkfid::task::sync::retrieve_headers", "Headers verified: {}/{}", verified_headers, total); } - Ok(headers) + info!(target: "darkfid::task::sync::retrieve_headers", "Headers sequence verified!"); + Ok(()) } /// Auxiliary function to retrieve blocks of provided headers and apply them to canonical. -async fn retrieve_blocks( - node: &Darkfid, - peers: &[ChannelPtr], - mut headers: Vec
, -) -> Result<()> { +async fn retrieve_blocks(node: &Darkfid, peers: &[ChannelPtr]) -> Result<()> { + info!(target: "darkfid::task::sync::retrieve_blocks", "Retrieving missing blocks from peers..."); // Communication setup let mut peer_subs = vec![]; for peer in peers { @@ -244,20 +285,18 @@ async fn retrieve_blocks( } let notif_sub = node.subscribers.get("blocks").unwrap(); + let mut received_blocks = 0; + let total = node.validator.blockchain.headers.len_sync(); 'blocks_loop: loop { for (index, peer) in peers.iter().enumerate() { // Grab first `BATCH` headers - let mut headers_copy = headers.clone(); - let mut request_headers = vec![]; - while request_headers.len() < BATCH { - if headers_copy.is_empty() { - break; - } - request_headers.push(headers_copy.remove(0).hash()); + let headers = node.validator.blockchain.headers.get_after_sync(0, BATCH)?; + if headers.is_empty() { + break 'blocks_loop } // Node creates a `SyncRequest` and sends it - let request = SyncRequest { headers: request_headers }; + let request = SyncRequest { headers: headers.iter().map(|h| h.hash()).collect() }; peer.send(&request).await?; // Node waits for response @@ -269,6 +308,11 @@ async fn retrieve_blocks( debug!(target: "darkfid::task::sync::retrieve_blocks", "Processing received blocks"); node.validator.add_blocks(&response.blocks).await?; + // Remove synced headers + node.validator.blockchain.headers.remove_sync( + &response.blocks.iter().map(|b| b.header.height).collect::>(), + )?; + // Notify subscriber for block in &response.blocks { info!(target: "darkfid::task::sync::retrieve_blocks", "Appended block: {} - {}", block.header.height, block.hash()); @@ -277,11 +321,8 @@ async fn retrieve_blocks( notif_sub.notify(vec![encoded_block].into()).await; } - headers = headers_copy; - - if headers.is_empty() { - break 'blocks_loop - } + received_blocks += response.blocks.len(); + info!(target: "darkfid::task::sync::retrieve_blocks", "Blocks received: {}/{}", received_blocks, total); } } @@ 
-290,6 +331,7 @@ async fn retrieve_blocks( /// Auxiliary function to retrieve best fork state from a random peer. async fn sync_best_fork(node: &Darkfid, peers: &[ChannelPtr], last_tip: &HeaderHash) -> Result<()> { + info!(target: "darkfid::task::sync::sync_best_fork", "Syncing fork states from peers..."); // Getting a random peer to ask for blocks let channel = &peers.choose(&mut OsRng).unwrap(); diff --git a/bin/darkfid/src/tests/harness.rs b/bin/darkfid/src/tests/harness.rs index 0c4082b94..e69974d34 100644 --- a/bin/darkfid/src/tests/harness.rs +++ b/bin/darkfid/src/tests/harness.rs @@ -116,6 +116,8 @@ impl Harness { let alice_blockchain_len = alice.blockchain.len(); assert_eq!(alice_blockchain_len, bob.blockchain.len()); assert_eq!(alice_blockchain_len, total_blocks); + assert!(alice.blockchain.headers.is_empty_sync()); + assert!(bob.blockchain.headers.is_empty_sync()); Ok(()) } diff --git a/bin/darkfid/src/tests/mod.rs b/bin/darkfid/src/tests/mod.rs index 11793c055..b7257632e 100644 --- a/bin/darkfid/src/tests/mod.rs +++ b/bin/darkfid/src/tests/mod.rs @@ -95,6 +95,7 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { let charlie = &charlie.validator; charlie.validate_blockchain(pow_target, pow_fixed_difficulty.clone()).await?; assert_eq!(alice.blockchain.len(), charlie.blockchain.len()); + assert!(charlie.blockchain.headers.is_empty_sync()); // Node must have just the best fork let forks = alice.consensus.forks.read().await; let best_fork = &forks[best_fork_index(&forks)?]; @@ -160,6 +161,7 @@ async fn sync_blocks_real(ex: Arc>) -> Result<()> { charlie.finalization().await?; charlie.validate_blockchain(pow_target, pow_fixed_difficulty).await?; assert_eq!(alice.blockchain.len(), charlie.blockchain.len()); + assert!(charlie.blockchain.headers.is_empty_sync()); assert_eq!(last, charlie.blockchain.last()?.1); let charlie_forks = charlie.consensus.forks.read().await; assert_eq!(charlie_forks.len(), 1); diff --git 
a/script/research/blockchain-explorer/src/main.rs b/script/research/blockchain-explorer/src/main.rs index 32c2851d7..e7de1a486 100644 --- a/script/research/blockchain-explorer/src/main.rs +++ b/script/research/blockchain-explorer/src/main.rs @@ -86,22 +86,33 @@ impl HeaderInfo { #[derive(Debug)] struct HeaderStoreInfo { - _headers: Vec, + _main: Vec, + _sync: Vec, } impl HeaderStoreInfo { pub fn new(headerstore: &HeaderStore) -> HeaderStoreInfo { - let mut _headers = Vec::new(); + let mut _main = Vec::new(); let result = headerstore.get_all(); match result { Ok(iter) => { for (hash, header) in iter.iter() { - _headers.push(HeaderInfo::new(*hash, header)); + _main.push(HeaderInfo::new(*hash, header)); } } Err(e) => println!("Error: {:?}", e), } - HeaderStoreInfo { _headers } + let mut _sync = Vec::new(); + let result = headerstore.get_all_sync(); + match result { + Ok(iter) => { + for (_, header) in iter.iter() { + _sync.push(HeaderInfo::new(header.hash(), header)); + } + } + Err(e) => println!("Error: {:?}", e), + } + HeaderStoreInfo { _main, _sync } } } diff --git a/src/blockchain/block_store.rs b/src/blockchain/block_store.rs index 13c17358a..7c2825783 100644 --- a/src/blockchain/block_store.rs +++ b/src/blockchain/block_store.rs @@ -507,13 +507,13 @@ impl BlockStore { /// Fetch n hashes before given height. In the iteration, if an order /// height is not found, the iteration stops and the function returns what - /// it has found so far in the `BlockOrderStore`. + /// it has found so far in the store's order tree. pub fn get_before(&self, height: u32, n: usize) -> Result> { let mut ret = vec![]; let mut key = height; let mut counter = 0; - while counter <= n { + while counter < n { if let Some(found) = self.order.get_lt(key.to_be_bytes())? { let (height, hash) = parse_u32_key_record(found)?; key = height; @@ -530,10 +530,7 @@ impl BlockStore { /// Fetch the first block hash in the order tree, based on the `Ord` /// implementation for `Vec`. 
pub fn get_first(&self) -> Result<(u32, HeaderHash)> { - let found = match self.order.first()? { - Some(s) => s, - None => return Err(Error::BlockHeightNotFound(0u32)), - }; + let Some(found) = self.order.first()? else { return Err(Error::BlockHeightNotFound(0u32)) }; let (height, hash) = parse_u32_key_record(found)?; Ok((height, hash)) @@ -557,7 +554,7 @@ impl BlockStore { Ok(Some(block_difficulty)) } - /// Fetch the last N records from the difficulty store, in order. + /// Fetch the last N records from the store's difficulty tree, in order. pub fn get_last_n_difficulties(&self, n: usize) -> Result> { // Build an iterator to retrieve last N records let records = self.difficulty.iter().rev().take(n); diff --git a/src/blockchain/header_store.rs b/src/blockchain/header_store.rs index 09e297abf..571bec1ac 100644 --- a/src/blockchain/header_store.rs +++ b/src/blockchain/header_store.rs @@ -30,7 +30,7 @@ use darkfi_serial::{deserialize, serialize, Encodable, SerialDecodable, SerialEn use crate::{util::time::Timestamp, Error, Result}; -use super::{parse_record, SledDbOverlayPtr}; +use super::{parse_record, parse_u32_key_record, SledDbOverlayPtr}; #[derive(Copy, Clone, Debug, Eq, PartialEq, SerialEncodable, SerialDecodable)] // We have to introduce a type rather than using an alias so we can restrict API access @@ -106,30 +106,46 @@ impl Default for Header { } } -/// [`Header`] sled tree const SLED_HEADER_TREE: &[u8] = b"_headers"; +const SLED_SYNC_HEADER_TREE: &[u8] = b"_sync_headers"; -/// The `HeaderStore` is a `sled` tree storing all the blockchain's blocks' headers -/// where the key is the headers' hash, and value is the serialized header. +/// The `HeaderStore` is a structure representing all `sled` trees related +/// to storing the blockchain's blocks' header information. 
#[derive(Clone)] -pub struct HeaderStore(pub sled::Tree); +pub struct HeaderStore { + /// Main `sled` tree, storing all the blockchain's blocks' headers, + /// where the key is the headers' hash, and value is the serialized header. + pub main: sled::Tree, + /// The `sled` tree storing all the node pending headers while syncing, + /// where the key is the height number, and the value is the serialized + /// header. + pub sync: sled::Tree, +} impl HeaderStore { /// Opens a new or existing `HeaderStore` on the given sled database. pub fn new(db: &sled::Db) -> Result { - let tree = db.open_tree(SLED_HEADER_TREE)?; - Ok(Self(tree)) + let main = db.open_tree(SLED_HEADER_TREE)?; + let sync = db.open_tree(SLED_SYNC_HEADER_TREE)?; + Ok(Self { main, sync }) } - /// Insert a slice of [`Header`] into the blockstore. + /// Insert a slice of [`Header`] into the store's main tree. pub fn insert(&self, headers: &[Header]) -> Result> { let (batch, ret) = self.insert_batch(headers); - self.0.apply_batch(batch)?; + self.main.apply_batch(batch)?; Ok(ret) } - /// Generate the sled batch corresponding to an insert, so caller - /// can handle the write operation. + /// Insert a slice of [`Header`] into the store's sync tree. + pub fn insert_sync(&self, headers: &[Header]) -> Result<()> { + let batch = self.insert_batch_sync(headers); + self.sync.apply_batch(batch)?; + Ok(()) + } + + /// Generate the sled batch corresponding to an insert to the main + /// tree, so caller can handle the write operation. /// The header's hash() function output is used as the key, /// while value is the serialized [`Header`] itself. /// On success, the function returns the header hashes in the same @@ -147,21 +163,35 @@ impl HeaderStore { (batch, ret) } - /// Check if the headerstore contains a given headerhash. - pub fn contains(&self, headerhash: &HeaderHash) -> Result { - Ok(self.0.contains_key(headerhash.inner())?) 
+ /// Generate the sled batch corresponding to an insert to the sync + /// tree, so caller can handle the write operation. + /// The header height is used as the key, while value is the serialized + /// [`Header`] itself. + pub fn insert_batch_sync(&self, headers: &[Header]) -> sled::Batch { + let mut batch = sled::Batch::default(); + + for header in headers { + batch.insert(&header.height.to_be_bytes(), serialize(header)); + } + + batch } - /// Fetch given headerhashes from the headerstore. + /// Check if the store's main tree contains a given header hash. + pub fn contains(&self, headerhash: &HeaderHash) -> Result { + Ok(self.main.contains_key(headerhash.inner())?) + } + + /// Fetch given header hashes from the store's main tree. /// The resulting vector contains `Option`, which is `Some` if the header - /// was found in the headerstore, and otherwise it is `None`, if it has not. - /// The second parameter is a boolean which tells the function to fail in - /// case at least one header was not found. + /// was found in the store's main tree, and otherwise it is `None`, if it + /// has not. The second parameter is a boolean which tells the function to + /// fail in case at least one header was not found. pub fn get(&self, headerhashes: &[HeaderHash], strict: bool) -> Result>> { let mut ret = Vec::with_capacity(headerhashes.len()); for hash in headerhashes { - if let Some(found) = self.0.get(hash.inner())? { + if let Some(found) = self.main.get(hash.inner())? { let header = deserialize(&found)?; ret.push(Some(header)); continue @@ -175,18 +205,91 @@ impl HeaderStore { Ok(ret) } - /// Retrieve all headers from the headerstore in the form of a tuple + /// Retrieve all headers from the store's main tree in the form of a tuple /// (`headerhash`, `header`). /// Be careful as this will try to load everything in memory. 
pub fn get_all(&self) -> Result> { let mut headers = vec![]; - for header in self.0.iter() { + for header in self.main.iter() { headers.push(parse_record(header.unwrap())?); } Ok(headers) } + + /// Retrieve all headers from the store's sync tree in the form of a tuple + /// (`height`, `header`). + /// Be careful as this will try to load everything in memory. + pub fn get_all_sync(&self) -> Result> { + let mut headers = vec![]; + + for record in self.sync.iter() { + headers.push(parse_u32_key_record(record.unwrap())?); + } + + Ok(headers) + } + + /// Fetch the last header in the store's sync tree, based on the `Ord` + /// implementation for `Vec<u8>`. + pub fn get_last_sync(&self) -> Result> { + let Some(found) = self.sync.last()? else { return Ok(None) }; + let (_, header) = parse_u32_key_record(found)?; + + Ok(Some(header)) + } + + /// Fetch n headers after given height. In the iteration, if a header + /// height is not found, the iteration stops and the function returns what + /// it has found so far in the store's sync tree. + pub fn get_after_sync(&self, height: u32, n: usize) -> Result> { + let mut ret = vec![]; + + let mut key = height; + let mut counter = 0; + while counter < n { + if let Some(found) = self.sync.get_gt(key.to_be_bytes())? { + let (height, hash) = parse_u32_key_record(found)?; + key = height; + ret.push(hash); + counter += 1; + continue + } + break + } + + Ok(ret) + } + + /// Retrieve store's sync tree records count. + pub fn len_sync(&self) -> usize { + self.sync.len() + } + + /// Check if store's sync tree is empty (contains no records). + pub fn is_empty_sync(&self) -> bool { + self.sync.is_empty() + } + + /// Remove a slice of [`u32`] from the store's sync tree. + pub fn remove_sync(&self, heights: &[u32]) -> Result<()> { + let batch = self.remove_batch_sync(heights); + self.sync.apply_batch(batch)?; + Ok(()) + } + + /// Generate the sled batch corresponding to a remove from the store's sync + /// tree, so caller can handle the write operation. 
+ pub fn remove_batch_sync(&self, heights: &[u32]) -> sled::Batch { + let mut batch = sled::Batch::default(); + + for height in heights { + batch.remove(&height.to_be_bytes()); + } + + batch + } } /// Overlay structure over a [`HeaderStore`] instance. diff --git a/src/blockchain/mod.rs b/src/blockchain/mod.rs index b4083df4a..78a6f8279 100644 --- a/src/blockchain/mod.rs +++ b/src/blockchain/mod.rs @@ -79,7 +79,7 @@ impl Blockchain { // Store header let (headers_batch, _) = self.headers.insert_batch(&[block.header.clone()]); - trees.push(self.headers.0.clone()); + trees.push(self.headers.main.clone()); batches.push(headers_batch); // Store block diff --git a/src/error.rs b/src/error.rs index 2fa7fe996..27de7cbda 100644 --- a/src/error.rs +++ b/src/error.rs @@ -382,6 +382,9 @@ pub enum Error { #[error("Block {0} contains 0 transactions")] BlockContainsNoTransactions(String), + #[error("Blockchain sync failed")] + BlockchainSyncError, + #[error("Contract {0} not found in database")] ContractNotFound(String),