From 71dc531e6884b69c6d090ee504384cd12206bbb9 Mon Sep 17 00:00:00 2001 From: rakita Date: Fri, 27 Jan 2023 11:28:30 +0100 Subject: [PATCH] feat: HistoryStateProvider (#1057) --- bin/reth/src/test_eth_chain/runner.rs | 4 +- crates/interfaces/src/provider.rs | 7 +- crates/primitives/src/storage.rs | 2 +- crates/staged-sync/src/utils/init.rs | 7 +- crates/stages/src/stages/execution.rs | 8 +- crates/storage/db/src/abstraction/cursor.rs | 2 +- .../storage/db/src/implementation/mdbx/mod.rs | 4 +- .../storage/db/src/tables/models/accounts.rs | 2 +- crates/storage/provider/Cargo.toml | 14 +- .../provider/src/providers/historical.rs | 272 ++++++++++++++++-- 10 files changed, 272 insertions(+), 50 deletions(-) diff --git a/bin/reth/src/test_eth_chain/runner.rs b/bin/reth/src/test_eth_chain/runner.rs index 649e1aa993..48fda304d9 100644 --- a/bin/reth/src/test_eth_chain/runner.rs +++ b/bin/reth/src/test_eth_chain/runner.rs @@ -4,7 +4,7 @@ use eyre::eyre; use reth_db::{ cursor::DbCursorRO, database::Database, - mdbx::{test_utils::create_test_rw_db, WriteMap}, + mdbx::test_utils::create_test_rw_db, tables, transaction::{DbTx, DbTxMut}, Error as DbError, @@ -128,7 +128,7 @@ pub async fn run_test(path: PathBuf) -> eyre::Result { let has_block_reward = chain_spec.paris_status().block_number().is_some(); // Create db and acquire transaction - let db = create_test_rw_db::(); + let db = create_test_rw_db(); let tx = db.tx_mut()?; // insert genesis diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index e4070fd716..63dcdb5288 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -1,4 +1,4 @@ -use reth_primitives::{BlockHash, BlockNumber}; +use reth_primitives::{Address, BlockHash, BlockNumber, TransitionId, H256}; /// KV error type. They are using u32 to represent error code. #[allow(missing_docs)] @@ -12,7 +12,6 @@ pub enum Error { BlockBody { block_number: BlockNumber, block_hash: BlockHash }, #[error("Block transition id does not exist for block #{block_number}")] BlockTransition { block_number: BlockNumber }, - #[error("Block number {block_number} from block hash #{block_hash} does not exist in canonical chain")] BlockCanonical { block_number: BlockNumber, block_hash: BlockHash }, #[error("Block number {block_number} with hash #{received_hash:?} is not canonical block. Canonical block hash is #{expected_hash:?}")] @@ -21,4 +20,8 @@ pub enum Error { expected_hash: BlockHash, received_hash: BlockHash, }, + #[error("Storage ChangeSet address: ({address:?} key: {storage_key:?}) for transition:#{transition_id} does not exist")] + StorageChangeset { transition_id: TransitionId, address: Address, storage_key: H256 }, + #[error("Account {address:?} ChangeSet for transition #{transition_id} does not exist")] + AccountChangeset { transition_id: TransitionId, address: Address }, } diff --git a/crates/primitives/src/storage.rs b/crates/primitives/src/storage.rs index 8482dea017..2ce36381f9 100644 --- a/crates/primitives/src/storage.rs +++ b/crates/primitives/src/storage.rs @@ -3,7 +3,7 @@ use reth_codecs::Compact; use serde::Serialize; /// Account storage entry. -#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize)] pub struct StorageEntry { /// Storage key. pub key: H256, diff --git a/crates/staged-sync/src/utils/init.rs b/crates/staged-sync/src/utils/init.rs index 72f875b6a8..fc711fff48 100644 --- a/crates/staged-sync/src/utils/init.rs +++ b/crates/staged-sync/src/utils/init.rs @@ -73,7 +73,6 @@ pub fn init_genesis(db: Arc, chain: ChainSpec) -> Result(); + let db = create_test_rw_db(); let genesis_hash = init_genesis(db.clone(), MAINNET.clone()).unwrap(); // actual, expected @@ -90,7 +89,7 @@ mod tests { #[test] fn success_init_genesis_goerli() { - let db = create_test_rw_db::(); + let db = create_test_rw_db(); let genesis_hash = init_genesis(db.clone(), GOERLI.clone()).unwrap(); // actual, expected @@ -99,7 +98,7 @@ mod tests { #[test] fn success_init_genesis_sepolia() { - let db = create_test_rw_db::(); + let db = create_test_rw_db(); let genesis_hash = init_genesis(db.clone(), SEPOLIA.clone()).unwrap(); // actual, expected diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 9df2597666..650e35d4c2 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -230,6 +230,7 @@ impl Stage for ExecutionStage { let mut cursor_storage_changeset = tx.cursor_write::()?; + cursor_storage_changeset.seek_exact(storage_id)?; if wipe_storage { // iterate over storage and save them before entry is deleted. @@ -240,7 +241,7 @@ impl Stage for ExecutionStage { }) .try_for_each(|entry| { let (_, old_value) = entry?; - cursor_storage_changeset.append(storage_id.clone(), old_value) + cursor_storage_changeset.append(storage_id, old_value) })?; // delete all entries @@ -262,8 +263,7 @@ impl Stage for ExecutionStage { let old_entry = StorageEntry { key, value: old_value }; let new_entry = StorageEntry { key, value: new_value }; // insert into StorageChangeSet - cursor_storage_changeset - .append(storage_id.clone(), old_entry.clone())?; + cursor_storage_changeset.append(storage_id, old_entry)?; // Always delete old value as duplicate table, put will not override it tx.delete::(address, Some(old_entry))?; @@ -361,7 +361,7 @@ impl Stage for ExecutionStage { // revert all changes to PlainStorage for (key, storage) in storage_changeset_batch.into_iter().rev() { let address = key.address(); - tx.put::(address, storage.clone())?; + tx.put::(address, storage)?; if storage.value == U256::ZERO { // delete value that is zero tx.delete::(address, Some(storage))?; diff --git a/crates/storage/db/src/abstraction/cursor.rs b/crates/storage/db/src/abstraction/cursor.rs index f99027f27d..dea015a3ad 100644 --- a/crates/storage/db/src/abstraction/cursor.rs +++ b/crates/storage/db/src/abstraction/cursor.rs @@ -61,7 +61,7 @@ pub trait DbDupCursorRO<'tx, T: DupSort> { fn next_dup_val(&mut self) -> ValueOnlyResult; /// Seek by key and subkey - fn seek_by_key_subkey(&mut self, key: T::Key, value: T::SubKey) -> ValueOnlyResult; + fn seek_by_key_subkey(&mut self, key: T::Key, subkey: T::SubKey) -> ValueOnlyResult; /// Returns an iterator starting at a key greater or equal than `start_key` of a DupSort /// table. diff --git a/crates/storage/db/src/implementation/mdbx/mod.rs b/crates/storage/db/src/implementation/mdbx/mod.rs index 13e8c88377..89a36e688f 100644 --- a/crates/storage/db/src/implementation/mdbx/mod.rs +++ b/crates/storage/db/src/implementation/mdbx/mod.rs @@ -110,6 +110,8 @@ impl Deref for Env { /// Collection of database test utilities #[cfg(any(test, feature = "test-utils"))] pub mod test_utils { + use reth_libmdbx::WriteMap; + use super::{Env, EnvKind, EnvironmentKind, Path}; use std::sync::Arc; @@ -121,7 +123,7 @@ pub mod test_utils { pub const ERROR_TEMPDIR: &str = "Not able to create a temporary directory."; /// Create rw database for testing - pub fn create_test_rw_db() -> Arc> { + pub fn create_test_rw_db() -> Arc> { create_test_db(EnvKind::RW) } /// Create database for testing diff --git a/crates/storage/db/src/tables/models/accounts.rs b/crates/storage/db/src/tables/models/accounts.rs index d4cbd466bb..3c9a1c18e6 100644 --- a/crates/storage/db/src/tables/models/accounts.rs +++ b/crates/storage/db/src/tables/models/accounts.rs @@ -42,7 +42,7 @@ impl Compact for AccountBeforeTx { /// [`TxNumber`] concatenated with [`Address`]. Used as a key for [`StorageChangeSet`] /// /// Since it's used as a key, it isn't compressed when encoding it. -#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd)] +#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, Serialize, Deserialize, Ord, PartialOrd)] pub struct TransitionIdAddress(pub (TransitionId, Address)); impl TransitionIdAddress { diff --git a/crates/storage/provider/Cargo.toml b/crates/storage/provider/Cargo.toml index e78ff4b85d..32b827c5fe 100644 --- a/crates/storage/provider/Cargo.toml +++ b/crates/storage/provider/Cargo.toml @@ -34,7 +34,11 @@ heapless = "0.7.16" # feature test-utils arbitrary = { version = "1.1.7", features = ["derive"], optional = true } -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"], optional = true } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +], optional = true } parking_lot = { version = "0.12", optional = true } [dev-dependencies] @@ -42,9 +46,13 @@ reth-db = { path = "../db", features = ["test-utils"] } test-fuzz = "3.0.4" tokio = { version = "1.21.2", features = ["full"] } tokio-stream = { version = "0.1.11", features = ["sync"] } -arbitrary = { version = "1.1.7", features = ["derive"]} +arbitrary = { version = "1.1.7", features = ["derive"] } hex-literal = "0.3" -secp256k1 = { version = "0.24.2", default-features = false, features = ["alloc", "recovery", "rand"] } +secp256k1 = { version = "0.24.2", default-features = false, features = [ + "alloc", + "recovery", + "rand", +] } parking_lot = "0.12" [features] diff --git a/crates/storage/provider/src/providers/historical.rs b/crates/storage/provider/src/providers/historical.rs index 11bbee3800..8168f6a2ab 100644 --- a/crates/storage/provider/src/providers/historical.rs +++ b/crates/storage/provider/src/providers/historical.rs @@ -1,6 +1,9 @@ -use crate::{AccountProvider, BlockHashProvider, StateProvider}; +use crate::{AccountProvider, BlockHashProvider, Error, StateProvider}; use reth_db::{ - cursor::DbCursorRO, models::storage_sharded_key::StorageShardedKey, tables, transaction::DbTx, + cursor::{DbCursorRO, DbDupCursorRO}, + models::{storage_sharded_key::StorageShardedKey, ShardedKey}, + tables, + transaction::DbTx, }; use reth_interfaces::Result; use reth_primitives::{ @@ -31,12 +34,35 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> { Self { tx, transition, _phantom: PhantomData {} } } } - impl<'a, 'b, TX: DbTx<'a>> AccountProvider for HistoricalStateProviderRef<'a, 'b, TX> { /// Get basic account information. - fn basic_account(&self, _address: Address) -> Result> { - // TODO add when AccountHistory is defined - Ok(None) + fn basic_account(&self, address: Address) -> Result> { + // history key to search IntegerList of transition id changesets. + let history_key = ShardedKey::new(address, self.transition); + + let Some(changeset_transition_id) = self.tx.cursor_read::()? + .seek(history_key)? + .filter(|(key,_)| key.key == address) + .map(|(_,list)| list.0.enable_rank().successor(self.transition as usize).map(|i| i as u64)) else { + return Ok(None) + }; + + // if changeset transition id is present we are getting value from changeset + if let Some(changeset_transition_id) = changeset_transition_id { + let account = self + .tx + .cursor_dup_read::()? + .seek_by_key_subkey(changeset_transition_id, address)? + .ok_or(Error::AccountChangeset { + transition_id: changeset_transition_id, + address, + })?; + Ok(account.info) + } else { + // if changeset is not present that means that there was history shard but we need to + // use newest value from plain state + Ok(self.tx.get::(address)?) + } } } @@ -49,33 +75,38 @@ impl<'a, 'b, TX: DbTx<'a>> BlockHashProvider for HistoricalStateProviderRef<'a, impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b, TX> { /// Get storage. - fn storage(&self, account: Address, storage_key: StorageKey) -> Result> { - // TODO when StorageHistory is defined - let transition_id = StorageShardedKey::new(account, storage_key, self.transition); - let transaction_number = - self.tx.get::(transition_id)?.map(|_integer_list| - // TODO select integer that is one less from transaction_number <- // TODO: (rkrasiuk) not sure this comment is still relevant - self.transition); + fn storage(&self, address: Address, storage_key: StorageKey) -> Result> { + // history key to search IntegerList of transition id changesets. + let history_key = StorageShardedKey::new(address, storage_key, self.transition); - if transaction_number.is_none() { - return Ok(None) + let Some(changeset_transition_id) = self.tx.cursor_read::()? + .seek(history_key)? + .filter(|(key,_)| key.address == address && key.sharded_key.key == storage_key) + .map(|(_,list)| list.0.enable_rank().successor(self.transition as usize).map(|i| i as u64)) else { + return Ok(None) + }; + + // if changeset transition id is present we are getting value from changeset + if let Some(changeset_transition_id) = changeset_transition_id { + let storage_entry = self + .tx + .cursor_dup_read::()? + .seek_by_key_subkey((changeset_transition_id, address).into(), storage_key)? + .ok_or(Error::StorageChangeset { + transition_id: changeset_transition_id, + address, + storage_key, + })?; + Ok(Some(storage_entry.value)) + } else { + // if changeset is not present that means that there was history shard but we need to + // use newest value from plain state + Ok(self + .tx + .cursor_dup_read::()? + .seek_by_key_subkey(address, storage_key) + .map(|r| r.map(|entry| entry.value))?) } - let num = transaction_number.unwrap(); - let mut cursor = self.tx.cursor_dup_read::()?; - - if let Some((_, entry)) = cursor.seek_exact((num, account).into())? { - if entry.key == storage_key { - return Ok(Some(entry.value)) - } - - // TODO(rakita) this will be reworked shortly in StorageHistory PR. - // if let Some((_, entry)) = cursor.seek(storage_key)? { - // if entry.key == storage_key { - // return Ok(Some(entry.value)) - // } - // } - } - Ok(None) } /// Get account code by its hash @@ -122,3 +153,182 @@ derive_from_ref!( fn storage(&self, account: Address, storage_key: StorageKey) -> Result>, fn bytecode_by_hash(&self, code_hash: H256) -> Result> ); + +#[cfg(test)] +mod tests { + use reth_db::{ + database::Database, + mdbx::test_utils::create_test_rw_db, + models::{storage_sharded_key::StorageShardedKey, AccountBeforeTx, ShardedKey}, + tables, + transaction::{DbTx, DbTxMut}, + TransitionList, + }; + use reth_primitives::{hex_literal::hex, Account, StorageEntry, H160, H256, U256}; + + use crate::{AccountProvider, HistoricalStateProviderRef, StateProvider}; + + const ADDRESS: H160 = H160(hex!("0000000000000000000000000000000000000001")); + const STORAGE: H256 = + H256(hex!("0000000000000000000000000000000000000000000000000000000000000001")); + + #[test] + fn history_provider_get_account() { + let db = create_test_rw_db(); + let tx = db.tx_mut().unwrap(); + + tx.put::( + ShardedKey { key: ADDRESS, highest_transition_id: 7 }, + TransitionList::new([3, 7]).unwrap(), + ) + .unwrap(); + tx.put::( + ShardedKey { key: ADDRESS, highest_transition_id: u64::MAX }, + TransitionList::new([10, 15]).unwrap(), + ) + .unwrap(); + + let acc_plain = Account { nonce: 100, balance: U256::ZERO, bytecode_hash: None }; + let acc_at15 = Account { nonce: 15, balance: U256::ZERO, bytecode_hash: None }; + let acc_at10 = Account { nonce: 10, balance: U256::ZERO, bytecode_hash: None }; + let acc_at7 = Account { nonce: 7, balance: U256::ZERO, bytecode_hash: None }; + let acc_at3 = Account { nonce: 3, balance: U256::ZERO, bytecode_hash: None }; + + // setup + tx.put::( + 3, + AccountBeforeTx { address: ADDRESS, info: Some(acc_at3) }, + ) + .unwrap(); + tx.put::( + 7, + AccountBeforeTx { address: ADDRESS, info: Some(acc_at7) }, + ) + .unwrap(); + tx.put::( + 10, + AccountBeforeTx { address: ADDRESS, info: Some(acc_at10) }, + ) + .unwrap(); + tx.put::( + 15, + AccountBeforeTx { address: ADDRESS, info: Some(acc_at15) }, + ) + .unwrap(); + + // setup plain state + tx.put::(ADDRESS, acc_plain).unwrap(); + tx.commit().unwrap(); + + let tx = db.tx().unwrap(); + + // run + assert_eq!( + HistoricalStateProviderRef::new(&tx, 1).basic_account(ADDRESS), + Ok(Some(acc_at3)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 3).basic_account(ADDRESS), + Ok(Some(acc_at3)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 4).basic_account(ADDRESS), + Ok(Some(acc_at7)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 7).basic_account(ADDRESS), + Ok(Some(acc_at7)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 9).basic_account(ADDRESS), + Ok(Some(acc_at10)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 10).basic_account(ADDRESS), + Ok(Some(acc_at10)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 11).basic_account(ADDRESS), + Ok(Some(acc_at15)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 16).basic_account(ADDRESS), + Ok(Some(acc_plain)) + ); + } + + #[test] + fn history_provider_get_storage() { + let db = create_test_rw_db(); + let tx = db.tx_mut().unwrap(); + + tx.put::( + StorageShardedKey { + address: ADDRESS, + sharded_key: ShardedKey { key: STORAGE, highest_transition_id: 7 }, + }, + TransitionList::new([3, 7]).unwrap(), + ) + .unwrap(); + tx.put::( + StorageShardedKey { + address: ADDRESS, + sharded_key: ShardedKey { key: STORAGE, highest_transition_id: u64::MAX }, + }, + TransitionList::new([10, 15]).unwrap(), + ) + .unwrap(); + + let entry_plain = StorageEntry { key: STORAGE, value: U256::from(100) }; + let entry_at15 = StorageEntry { key: STORAGE, value: U256::from(15) }; + let entry_at10 = StorageEntry { key: STORAGE, value: U256::from(10) }; + let entry_at7 = StorageEntry { key: STORAGE, value: U256::from(7) }; + let entry_at3 = StorageEntry { key: STORAGE, value: U256::from(0) }; + + // setup + tx.put::((3, ADDRESS).into(), entry_at3).unwrap(); + tx.put::((7, ADDRESS).into(), entry_at7).unwrap(); + tx.put::((10, ADDRESS).into(), entry_at10).unwrap(); + tx.put::((15, ADDRESS).into(), entry_at15).unwrap(); + + // setup plain state + tx.put::(ADDRESS, entry_plain).unwrap(); + tx.commit().unwrap(); + + let tx = db.tx().unwrap(); + + // run + assert_eq!( + HistoricalStateProviderRef::new(&tx, 0).storage(ADDRESS, STORAGE), + Ok(Some(entry_at3.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE), + Ok(Some(entry_at3.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 4).storage(ADDRESS, STORAGE), + Ok(Some(entry_at7.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 7).storage(ADDRESS, STORAGE), + Ok(Some(entry_at7.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 9).storage(ADDRESS, STORAGE), + Ok(Some(entry_at10.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 10).storage(ADDRESS, STORAGE), + Ok(Some(entry_at10.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 11).storage(ADDRESS, STORAGE), + Ok(Some(entry_at15.value)) + ); + assert_eq!( + HistoricalStateProviderRef::new(&tx, 16).storage(ADDRESS, STORAGE), + Ok(Some(entry_plain.value)) + ); + } +}