From 2b1a34116df2731a4e4abc28e4a05133558bc1a0 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 23 Jun 2023 18:47:55 +0300 Subject: [PATCH] chore(provider): simplify history unwind (#3355) --- Cargo.lock | 1 + crates/storage/db/Cargo.toml | 1 + .../db/src/tables/models/sharded_key.rs | 6 + .../src/tables/models/storage_sharded_key.rs | 5 +- .../src/providers/database/provider.rs | 201 ++++++++---------- 5 files changed, 105 insertions(+), 109 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 96ccb4205b..847fc94405 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5128,6 +5128,7 @@ dependencies = [ "async-trait", "bytes", "criterion", + "derive_more", "futures", "heapless", "iai", diff --git a/crates/storage/db/Cargo.toml b/crates/storage/db/Cargo.toml index 818c22738b..5254d69bb6 100644 --- a/crates/storage/db/Cargo.toml +++ b/crates/storage/db/Cargo.toml @@ -37,6 +37,7 @@ page_size = "0.4.2" thiserror = { workspace = true } tempfile = { version = "3.3.0", optional = true } parking_lot = "0.12" +derive_more = "0.99" # arbitrary utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } diff --git a/crates/storage/db/src/tables/models/sharded_key.rs b/crates/storage/db/src/tables/models/sharded_key.rs index 71985f1f8e..a38c3af3a3 100644 --- a/crates/storage/db/src/tables/models/sharded_key.rs +++ b/crates/storage/db/src/tables/models/sharded_key.rs @@ -24,6 +24,12 @@ pub struct ShardedKey { pub highest_block_number: BlockNumber, } +impl AsRef> for ShardedKey { + fn as_ref(&self) -> &ShardedKey { + self + } +} + impl ShardedKey { /// Creates a new `ShardedKey`. pub fn new(key: T, highest_block_number: BlockNumber) -> Self { diff --git a/crates/storage/db/src/tables/models/storage_sharded_key.rs b/crates/storage/db/src/tables/models/storage_sharded_key.rs index 2b8025a8f3..984933d1f1 100644 --- a/crates/storage/db/src/tables/models/storage_sharded_key.rs +++ b/crates/storage/db/src/tables/models/storage_sharded_key.rs @@ -4,7 +4,7 @@ use crate::{ table::{Decode, Encode}, DatabaseError, }; - +use derive_more::AsRef; use reth_primitives::{BlockNumber, H160, H256}; use serde::{Deserialize, Serialize}; @@ -19,11 +19,12 @@ pub const NUM_OF_INDICES_IN_SHARD: usize = 2_000; /// `Address | Storagekey | 200` -> data is from transition 0 to 200. /// /// `Address | StorageKey | 300` -> data is from transition 201 to 300. -#[derive(Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Default, Clone, Eq, Ord, PartialOrd, PartialEq, AsRef, Serialize, Deserialize)] pub struct StorageShardedKey { /// Storage account address. pub address: H160, /// Storage slot with highest transition id. + #[as_ref] pub sharded_key: ShardedKey, } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 766975fbae..66a72d21d4 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -18,7 +18,7 @@ use reth_db::{ }, table::Table, tables, - transaction::{DbTx, DbTxMut, DbTxMutGAT}, + transaction::{DbTx, DbTxMut}, BlockNumberList, DatabaseError, }; use reth_interfaces::Result; @@ -102,75 +102,51 @@ impl<'this, TX: DbTxMut<'this>> DatabaseProvider<'this, TX> { } } -/// Unwind all history shards. For boundary shard, remove it from database and -/// return last part of shard with still valid items. If all full shard were removed, return list -/// would be empty. -fn unwind_account_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>( - cursor: &mut >::CursorMut, - address: Address, +/// For a given key, unwind all history shards that are below the given block number. +/// +/// S - Sharded key subtype. +/// T - Table to walk over. +/// C - Cursor implementation. +/// +/// This function walks the entries from the given start key and deletes all shards that belong to +/// the key and are below the given block number. +/// +/// The boundary shard (the shard is split by the block number) is removed from the database. Any +/// indices that are above the block number are filtered out. The boundary shard is returned for +/// reinsertion (if it's not empty). +fn unwind_history_shards<'a, S, T, C>( + cursor: &mut C, + start_key: T::Key, block_number: BlockNumber, -) -> Result> { - let mut item = cursor.seek_exact(ShardedKey::new(address, u64::MAX))?; - + mut shard_belongs_to_key: impl FnMut(&T::Key) -> bool, +) -> Result> +where + T: Table, + T::Key: AsRef>, + C: DbCursorRO<'a, T> + DbCursorRW<'a, T>, +{ + let mut item = cursor.seek_exact(start_key)?; while let Some((sharded_key, list)) = item { - // there is no more shard for address - if sharded_key.key != address { + // If the shard does not belong to the key, break. + if !shard_belongs_to_key(&sharded_key) { break } cursor.delete_current()?; - // check first item and if it is more and eq than `block_number` delete current - // item. - let first = list.iter(0).next().expect("List can't empty"); + + // Check the first item. + // If it is greater or eq to the block number, delete it. + let first = list.iter(0).next().expect("List can't be empty"); if first >= block_number as usize { item = cursor.prev()?; continue - } else if block_number <= sharded_key.highest_block_number { - // if first element is in scope whole list would be removed. - // so at least this first element is present. - return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::>()) - } else { - let new_list = list.iter(0).collect::>(); - return Ok(new_list) - } - } - Ok(Vec::new()) -} - -/// Unwind all history shards. For boundary shard, remove it from database and -/// return last part of shard with still valid items. If all full shard were removed, return list -/// would be empty but this does not mean that there is none shard left but that there is no -/// split shards. -fn unwind_storage_history_shards<'a, TX: reth_db::transaction::DbTxMutGAT<'a>>( - cursor: &mut >::CursorMut, - address: Address, - storage_key: H256, - block_number: BlockNumber, -) -> Result> { - let mut item = cursor.seek_exact(StorageShardedKey::new(address, storage_key, u64::MAX))?; - - while let Some((storage_sharded_key, list)) = item { - // there is no more shard for address - if storage_sharded_key.address != address || - storage_sharded_key.sharded_key.key != storage_key - { - // there is no more shard for address and storage_key. - break - } - cursor.delete_current()?; - // check first item and if it is more and eq than `block_number` delete current - // item. - let first = list.iter(0).next().expect("List can't empty"); - if first >= block_number as usize { - item = cursor.prev()?; - continue - } else if block_number <= storage_sharded_key.sharded_key.highest_block_number { - // if first element is in scope whole list would be removed. - // so at least this first element is present. + } else if block_number <= sharded_key.as_ref().highest_block_number { + // Filter out all elements greater than block number. return Ok(list.iter(0).take_while(|i| *i < block_number as usize).collect::>()) } else { return Ok(list.iter(0).collect::>()) } } + Ok(Vec::new()) } @@ -1651,47 +1627,6 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider Ok(()) } - fn unwind_storage_history_indices(&self, range: Range) -> Result { - let storage_changesets = self - .tx - .cursor_read::()? - .walk_range(range)? - .collect::, _>>()?; - let changesets = storage_changesets.len(); - - let last_indices = storage_changesets - .into_iter() - // reverse so we can get lowest block number where we need to unwind account. - .rev() - // fold all storages and get last block number - .fold( - BTreeMap::new(), - |mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| { - // we just need address and lowest block number. - accounts.insert((index.address(), storage.key), index.block_number()); - accounts - }, - ); - - let mut cursor = self.tx.cursor_write::()?; - for ((address, storage_key), rem_index) in last_indices { - let shard_part = - unwind_storage_history_shards::(&mut cursor, address, storage_key, rem_index)?; - - // check last shard_part, if present, items needs to be reinserted. - if !shard_part.is_empty() { - // there are items in list - self.tx.put::( - StorageShardedKey::new(address, storage_key, u64::MAX), - BlockNumberList::new(shard_part) - .expect("There is at least one element in list and it is sorted."), - )?; - } - } - - Ok(changesets) - } - fn insert_account_history_index( &self, account_transitions: BTreeMap>, @@ -1744,6 +1679,53 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider Ok(()) } + fn unwind_storage_history_indices(&self, range: Range) -> Result { + let storage_changesets = self + .tx + .cursor_read::()? + .walk_range(range)? + .collect::, _>>()?; + let changesets = storage_changesets.len(); + + let last_indices = storage_changesets + .into_iter() + // reverse so we can get lowest block number where we need to unwind account. + .rev() + // fold all storages and get last block number + .fold( + BTreeMap::new(), + |mut accounts: BTreeMap<(Address, H256), u64>, (index, storage)| { + // we just need address and lowest block number. + accounts.insert((index.address(), storage.key), index.block_number()); + accounts + }, + ); + + let mut cursor = self.tx.cursor_write::()?; + for ((address, storage_key), rem_index) in last_indices { + let partial_shard = unwind_history_shards::<_, tables::StorageHistory, _>( + &mut cursor, + StorageShardedKey::last(address, storage_key), + rem_index, + |storage_sharded_key| { + storage_sharded_key.address == address && + storage_sharded_key.sharded_key.key == storage_key + }, + )?; + + // Check the last returned partial shard. + // If it's not empty, the shard needs to be reinserted. + if !partial_shard.is_empty() { + cursor.insert( + StorageShardedKey::last(address, storage_key), + BlockNumberList::new_pre_sorted(partial_shard), + )?; + } + } + + Ok(changesets) + } + fn unwind_account_history_indices(&self, range: RangeInclusive) -> Result { let account_changeset = self .tx @@ -1762,18 +1744,23 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HistoryWriter for DatabaseProvider accounts.insert(account.address, index); accounts }); - // try to unwind the index + + // Unwind the account history index. let mut cursor = self.tx.cursor_write::()?; for (address, rem_index) in last_indices { - let shard_part = unwind_account_history_shards::(&mut cursor, address, rem_index)?; + let partial_shard = unwind_history_shards::<_, tables::AccountHistory, _>( + &mut cursor, + ShardedKey::last(address), + rem_index, + |sharded_key| sharded_key.key == address, + )?; - // check last shard_part, if present, items needs to be reinserted. - if !shard_part.is_empty() { - // there are items in list - self.tx.put::( - ShardedKey::new(address, u64::MAX), - BlockNumberList::new(shard_part) - .expect("There is at least one element in list and it is sorted."), + // Check the last returned partial shard. + // If it's not empty, the shard needs to be reinserted. + if !partial_shard.is_empty() { + cursor.insert( + ShardedKey::last(address), + BlockNumberList::new_pre_sorted(partial_shard), )?; } }